NATS and JetStream
Overview of Pub/Sub and Queuing
NATS serves as our pub/sub + queuing + event streaming system. We don't use RabbitMQ/SQS/Kafka/Kinesis.
This article does not explain how NATS works. Read the NATS documentation (opens in a new tab) to get oriented. This article is about specific standards and patterns that we use at Pivot.
NATS uses the concept of 'messages' addressed to subjects, which conflicts with our application concept of 'messages' in rooms. We therefore say 'NATS message' for an event sent by a publisher via NATS and 'Pivot chat message' for a message sent in a Pivot room by a Pivot user. The two interact, because the Messenger service sends NATS messages whenever Pivot chat messages are sent!
Local Development
NATS is included in our dependencies-compose.yaml file in the pivot repo and
is configured locally with a user for each service that uses it.
Cloud Deployment
We use Synadia Cloud, a hosted service from the makers of NATS. It is API compatible with open source NATS.
Patterns and Practices
Subject-Based Messaging
Multiple services should not publish to the same subject. A service should only publish to subjects whose first segment is the service name (or whose second segment is, in the case of ephemeral subjects). In fact, we create a NATS user for each service that restricts the subjects it can publish to to exactly those two prefixes.
The interesting thing about NATS that makes it so useful is that consumers can decide whether to consume a subject as traditional pub/sub (each instance of our Dealer service reads every message for the subjects it consumes) or more like a message queue (our Buzzbuzz service instances consume messages in round-robin fashion and ack each when done processing). This is a fancier version of Kafka's consumer groups concept, implemented via JetStream consumers.
A publisher should only publish a message to one subject. It is up to consumers to subscribe to that subject if they wish. They can configure that subscription in a way that supports their use case, as described below.
Schemas
NATS message bodies are just bytes. Producers are responsible for defining Protobuf schemas for their subjects and publishing with those schemas. Producers document on this site the name(s) of the subjects they publish to. Consumers can then subscribe to the subjects and decode messages using the schemas provided by the producer. Subjects need to be specific enough that a single Protobuf schema can correspond to all messages in that exact subject.
Protobuf schemas and NATS subjects should be versioned, but the version (e.g. v1) should be part of the last segment of the subject rather than its own segment (e.g. messenger.change_feed.room.createdv1, not .created.v1), because the version identifies the specific Protobuf schema for the message, not a 'subcategory' of NATS message.
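As a minimal sketch of this convention (the make_subject helper is illustrative, not an existing Pivot utility), the schema version can be folded into the final segment like so:

```python
def make_subject(service: str, *segments: str, version: int) -> str:
    """Build a NATS subject with the schema version appended to the
    last segment (…createdv1), rather than as its own segment (…created.v1)."""
    if not segments:
        raise ValueError("at least one segment is required")
    *head, last = segments
    return ".".join([service, *head, f"{last}v{version}"])
```

For example, make_subject("messenger", "change_feed", "room", "created", version=1) yields "messenger.change_feed.room.createdv1".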
JetStream vs. core NATS
When we initialize a NATS instance/cluster, in addition to creating NATS users
and permissions for each service, we create a JetStream stream called
pivot_main and configure the subjects it is subscribed to. While we can always
add more streams in the future, it is more efficient to centrally store all
messages in one and use replicas for durability and throughput rather than
storing the same messages in multiple streams.
Each consuming service is responsible for creating/joining a durable JetStream consumer on this stream for queuing use cases, or for consuming with per-instance ephemeral consumers for pub/sub use cases. This is described further in the next sections.
Because pivot_main uses interest-based message retention, a message is retained only if it was published after the durable consumers interested in it were created, which happens when each service starts up. We assume that services successfully established their durable consumers before any subsequent failure, and therefore that NATS will retain the messages those durable consumers have not yet acked.
Publishers should publish to the pivot_main stream unless their messages are intended to be ephemeral: high volume, low value. Publishers can publish via core NATS to subjects prefixed with ephemeral.servicename., such as ephemeral.blockhead.views. This prefix signals to consumers that 1) the subject is not included in the JetStream stream and 2) the publisher is using core NATS to publish, so messages will be delivered at most once.
It is acceptable for consumers to subscribe via core NATS subjects rather than the JetStream stream if they don't care about acknowledgements or durability and just want to listen to the flow of NATS messages. Publishers, however, should always use the JetStream publish method in the NATS client library, not the core NATS publish method, when publishing to a subject prefixed with servicename. rather than ephemeral.servicename..
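A small sketch of how a publisher might pick the right publish method from the subject prefix (the helper names are illustrative; real code would call the corresponding NATS client method):

```python
def is_ephemeral_subject(subject: str) -> bool:
    """Subjects prefixed 'ephemeral.<servicename>.' are core-NATS only
    and are not captured by the pivot_main stream."""
    return subject.startswith("ephemeral.")

def publish_method(subject: str) -> str:
    """Return which client publish call a publisher should use.
    'jetstream' = JetStream publish, 'core' = core NATS publish."""
    return "core" if is_ephemeral_subject(subject) else "jetstream"
```

For example, ephemeral.blockhead.views gets the core NATS publish, while messenger.change_feed.room.createdv1 gets the JetStream publish.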
JetStream supports consumer-side subject filtering, so consumers can still choose to consume only a subset of the pivot_main stream, as defined in the consumer configuration, which should live in the service's application code.
Queue
If a service wants each message to be consumed by exactly one of its instances and manually acked, it should consume via JetStream using a durable consumer (i.e., at service start, every instance sets the same durable name in the consumer configuration). This results in 'round-robin' message delivery.
Example of this pattern: the Buzzbuzz service needs to process each event that could potentially trigger a notification exactly once. If one instance of Buzzbuzz successfully processes a NATS message, it does not need to be delivered to another instance as well.
Note that a service must delete an old durable consumer once it is no longer used, or the stream will fill up! This is intended JetStream behavior: just because all instances of a durable consumer are offline doesn't mean it should be cleaned up.
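An illustrative sketch of such a configuration (field names mirror JetStream's consumer-config fields; the Python dict stands in for whatever struct the service's NATS client uses, and the filter subject is hypothetical):

```python
# Illustrative only: a dict mirroring JetStream consumer-config fields.
# Every instance of Buzzbuzz binds to the same durable name, so each
# message is delivered to exactly one instance (queue semantics).
buzzbuzz_consumer = {
    "durable_name": "buzzbuzz",       # identical across all instances
    "filter_subject": "messenger.>",  # consume a subset of pivot_main
    "ack_policy": "explicit",         # instance acks after processing
}
```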
Pub/Sub
If a service wants all instances of itself to receive and process each message, then it should either consume via core NATS or use JetStream without setting durable on the created consumer.
Example of this pattern: a single instance of the Dealer service cannot process NATS messages on behalf of all instances. Every Dealer instance must individually read each NATS message for the subjects Dealer is subscribed to, so that it can assess whether any of the userIds corresponding to its websocket connections need to know about the event that has taken place. This is a good use case for a core NATS subscription, because Dealer can't reliably deliver messages over Pilot websockets anyway, so there is no real need to introduce acking.
Addressing the Dual-Write Problem
We don't use change data capture to publish to NATS. See the Service Oriented Architecture article for more on this.
Message Contents
Message content should be limited with our 5 MB maximum message size in mind (stay well below it), but at the same time publishers should attempt to provide all the information a consumer needs to process the message. For example, when the Messenger service publishes a message.sent event to NATS after a Pivot chat message is sent in a Pivot room, at least 100 to 200 characters of the message text, as well as the array of mentioned userIds, should be included in the body of the NATS message. While the Dealer service only needs the messageId and userIds, the Buzzbuzz service needs more information and should not have to make a gRPC query to Messenger; it should be able to rely on the NATS message alone.
userIds[]
Often, NATS messages should include an array of userIds. This way, services
like Dealer can determine which users need to receive notification of new data
and services like Buzzbuzz can determine who might need a notification, without
going back to the publishing service for more information.
For example, when a Pivot chat message is sent in a room, the Messenger service sends a NATS message. This NATS message should include the userIds of room members (or space members with access to the room block), so that Dealer can simply iterate over that array to determine which websocket connections need to receive a message. Likewise, Buzzbuzz knows that it needs to consider creating a notification for each userId in the array.
Handling Large Audience Messaging
Consider the Messenger service where each event (a Pivot chat message being
sent) can potentially have a large number of recipients. The event data includes
a messageId, roomId, and an array of userIds, with each userId being
approximately 30 characters long.
The design challenge is to find a balance between sending as few NATS messages as possible to minimize load on NATS, and providing enough information in each NATS message to minimize the need for consuming services to query the database for additional data.
Given that our NATS configuration supports a maximum message size of 5MB, it's
feasible to include the entire array of recipient userIds in each message, as
long as the total number of recipients doesn't exceed a certain threshold. From
initial calculations, a single NATS message can accommodate around 100,000
userIds, alongside other message content. However, to account for variability
and potential overheads, we recommend setting a much lower threshold. For
instance, you might choose to send multiple messages if the number of recipients
exceeds 10,000.
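The rough arithmetic behind those numbers (sizes are approximate; Protobuf framing and the rest of the message body add overhead, which is one reason for the conservative threshold):

```python
MAX_MESSAGE_BYTES = 5 * 1024 * 1024  # our 5 MB NATS message-size limit
USER_ID_BYTES = 30                   # each userId is ~30 characters

# 100,000 userIds consume roughly 3 MB, comfortably under 5 MB,
# leaving room for the rest of the message content:
payload = 100_000 * USER_ID_BYTES    # 3,000,000 bytes

# The recommended chunking threshold is far more conservative:
threshold = 10_000 * USER_ID_BYTES   # 300,000 bytes per chunk
```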
Here's how this pattern works:
When preparing to send an event, the publishing service collects the necessary
data, including the list of recipient userIds. If the number of userIds is
below the defined threshold (e.g., 10,000), the service includes all userIds
in a single message and publishes it. If the number of userIds exceeds the
threshold, the service splits them into chunks of 10,000 (or your chosen
threshold) and sends multiple messages. Each message contains the same event
data, but a different chunk of userIds. By following this pattern, you can
ensure that each message is functional on its own and includes all the data
needed for consuming services to process the event, minimizing the need for
additional database queries.
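A minimal sketch of the chunking step described above (the function name, dict shape, and threshold are illustrative):

```python
from typing import Iterator

CHUNK_SIZE = 10_000  # conservative per-message recipient threshold

def chunk_user_ids(event: dict, user_ids: list[str],
                   chunk_size: int = CHUNK_SIZE) -> Iterator[dict]:
    """Yield one self-contained NATS message body per chunk of userIds.
    Each message repeats the same event data so consumers never need a
    follow-up query; only the userIds slice differs between messages."""
    for start in range(0, len(user_ids), chunk_size):
        yield {**event, "userIds": user_ids[start:start + chunk_size]}
```

With 25,000 recipients this produces three messages (10,000 + 10,000 + 5,000 userIds), each carrying the full event data.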
This pattern assumes that consuming services do not require message ordering and that they can use the messageId (or whatever record ID applies) as an idempotency key. Each message is functional on its own and can be processed independently.
Observability
It's essential to monitor consumer pending message counts and stream size, and to alert properly on those metrics. Stream size growth tells us there is a problem, but not what the problem is; consumer pending counts tell us which service is causing it.