NATS and JetStream

Overview of Pub/Sub and Queuing

NATS serves as our pub/sub + queuing + event streaming system. We don't use RabbitMQ/SQS/Kafka/Kinesis.

This article does not explain how NATS works. Read the NATS documentation to get oriented. This article is about specific standards and patterns that we use at Pivot.

NATS uses the concept of 'messages' addressed to subjects, which conflicts with our application concept of 'messages' in rooms. Therefore we say 'NATS message' for an event sent by a publisher via NATS, and 'Pivot chat message' for a message sent in a Pivot room by a Pivot user. These concepts interact, because the Messenger service sends NATS messages whenever Pivot chat messages are sent!

Local Development

NATS is included in our dependencies-compose.yaml file in the pivot repo and is configured locally with a user for each service that uses it.

Cloud Deployment

We use Synadia Cloud, a hosted service from the makers of NATS. It is API compatible with open source NATS.

Patterns and Practices

Subject-Based Messaging

Multiple services should not publish to the same subject. A service should publish only to subjects that carry the service name in the first portion of the subject (or the second portion, in the case of ephemeral subjects). In fact, we create a NATS user for each service that restricts the subjects it can publish to to exactly those two prefixes.
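
The permission rule above can be expressed as a small predicate. This is a sketch, not the actual NATS account configuration; the service and subject names are illustrative.

```python
def allowed_to_publish(service: str, subject: str) -> bool:
    """Mirror of the per-service NATS user permissions: a service may only
    publish under its own name, or under ephemeral.<service>."""
    return subject.startswith(f"{service}.") or subject.startswith(f"ephemeral.{service}.")

# Messenger may publish to its own subjects, durable or ephemeral...
assert allowed_to_publish("messenger", "messenger.change_feed.room.createdv1")
assert allowed_to_publish("blockhead", "ephemeral.blockhead.views")
# ...but never to another service's subjects.
assert not allowed_to_publish("messenger", "blockhead.views")
```

In the real deployment this is enforced server-side by the per-service NATS user's publish permissions, not by application code.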

What makes NATS so useful is that consumers can decide whether to consume a subject as a traditional pub/sub service (each instance of our Dealer service reads every message for the subjects it consumes) or more like a message queue (our Buzzbuzz service instances consume messages in round-robin fashion, acking each when done processing). This is a more flexible version of Kafka's consumer groups concept, and it leverages NATS JetStream consumers.

A publisher should only publish a message to one subject. It is up to consumers to subscribe to that subject if they wish. They can configure that subscription in a way that supports their use case, as described below.

Schemas

NATS message bodies are just bytes. Producers are responsible for defining Protobuf schemas for their subjects and publishing with those schemas. Producers document on this site the name(s) of the subjects they publish to. Consumers can then subscribe to the subjects and decode messages using the schemas provided by the producer. Subjects need to be specific enough that a single Protobuf schema can correspond to all messages in that exact subject.

Protobuf schemas and NATS subjects should be versioned, but the version (e.g. v1) should be part of the last segment of the subject rather than its own segment (e.g. messenger.change_feed.room.createdv1, not .created.v1), because the version represents the specific Protobuf schema for the message, not a 'subcategory' of NATS message.
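
A tiny helper makes the convention above concrete; the subject name is taken from the example in this section, and the helper itself is illustrative rather than part of any shared library.

```python
def versioned_subject(base: str, version: int) -> str:
    """Append the schema version to the LAST segment of the subject
    (createdv1), rather than adding a new segment for it (created.v1)."""
    return f"{base}v{version}"

assert versioned_subject("messenger.change_feed.room.created", 1) == \
    "messenger.change_feed.room.createdv1"
```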

JetStream vs. core NATS

When we initialize a NATS instance/cluster, in addition to creating NATS users and permissions for each service, we create a JetStream stream called pivot_main and configure the subjects it captures. While we can always add more streams in the future, it is more efficient to store all messages centrally in one stream and use replicas for durability and throughput than to store the same messages in multiple streams.
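
Stream creation at initialization time might look like the following sketch, written against the nats-py client (assuming Python; adapt to your client library). The subject list and replica count are illustrative; note that ephemeral.> subjects are deliberately excluded from the stream.

```python
# Sketch: create the central stream during cluster initialization.
# `nc` is assumed to be an established nats-py connection.
async def create_pivot_main(nc):
    jsm = nc.jsm()  # JetStream management context
    await jsm.add_stream(
        name="pivot_main",
        # one wildcard per publishing service (example names);
        # ephemeral.> subjects are intentionally NOT captured
        subjects=["messenger.>", "buzzbuzz.>"],
        retention="interest",  # keep messages until all bound consumers ack
        num_replicas=3,        # replicas for durability/throughput, not extra streams
    )
```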

Each publishing service is responsible for creating/joining a JetStream durable consumer on this stream for queuing use cases, or for consuming with per-instance ephemeral consumers for pub/sub use cases. This is further described in the next sections.

Because pivot_main uses interest-based message retention, a message is retained only if a durable consumer with interest in its subject existed when it was published, and it is kept until every such consumer has acked it. Durable consumers are created when each service starts up, so we assume a service successfully established its durable consumer before any subsequent failure, and therefore that NATS retains the messages that consumer has not yet acked.

Publishers should publish to subjects captured by the pivot_main stream, unless their messages are intended to be ephemeral – high volume, low value. For ephemeral messages, publishers can publish via core NATS to subjects prefixed with ephemeral.servicename., such as ephemeral.blockhead.views. This prefix signals to consumers that 1) the subject is not included in the JetStream stream, and 2) the publisher is using core NATS to publish, so messages will be delivered at most once.

It is acceptable for consumers to subscribe via core NATS rather than through the JetStream stream if they don't care about acknowledgements or durability and just want to listen to the flow of NATS messages. Publishers, however, should always use the JetStream publish method in the NATS client library, not the core NATS publish method, when publishing to a subject prefixed with servicename. rather than ephemeral.servicename..
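
The two publisher rules above can be combined into one dispatch helper. This is a sketch using nats-py-style calls on an assumed connection `nc`; the subject names are examples, not real subjects.

```python
def is_ephemeral(subject: str) -> bool:
    """Subjects under ephemeral.<service>. bypass the pivot_main stream."""
    return subject.startswith("ephemeral.")

async def publish(nc, subject: str, payload: bytes):
    # Ephemeral subjects go out over core NATS (at-most-once delivery);
    # everything else uses the JetStream publish method, so the write
    # into pivot_main is confirmed by the server.
    if is_ephemeral(subject):
        await nc.publish(subject, payload)
    else:
        await nc.jetstream().publish(subject, payload)

assert is_ephemeral("ephemeral.blockhead.views")
assert not is_ephemeral("messenger.change_feed.room.createdv1")
```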

JetStream supports consumer-side subject filtering, so consumers can still choose to consume only a subset of the pivot_main stream, as defined in the consumer configuration, which should live in the service's application code.

Queue

If a service wants each message to be consumed by exactly one of its instances and manually acked, it should consume via JetStream using a durable consumer (i.e., every instance sets the same durable name in the consumer configuration at service start time). This results in 'round-robin' message delivery.

Example of this pattern: the Buzzbuzz service needs to process each event that could potentially trigger a notification exactly once. If an instance of Buzzbuzz successfully processes a NATS message, the message should not also go to another instance.
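
The queue pattern might be sketched as follows with the nats-py client (assuming Python services; the durable name, filter subject, and handler body are illustrative).

```python
DURABLE_NAME = "buzzbuzz_main"  # illustrative; same value on every instance

async def consume_as_queue(nc):
    # `nc` is assumed to be an established nats-py connection. Because every
    # Buzzbuzz instance binds the same durable name, JetStream delivers each
    # message to exactly one instance ("round-robin").
    js = nc.jetstream()

    async def handle(msg):
        # ... process the event, e.g. decide whether to create a notification ...
        await msg.ack()  # explicit ack: JetStream may now drop the message

    await js.subscribe(
        "messenger.>",         # consumer-side filter against pivot_main
        stream="pivot_main",
        durable=DURABLE_NAME,  # shared durable name -> queue semantics
        cb=handle,
        manual_ack=True,       # we ack manually after processing succeeds
    )
```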

Note that a service must delete an old durable consumer once it is no longer used, or the stream will fill up! This is intended JetStream behavior – the fact that all instances bound to a durable consumer are offline doesn't mean it should be cleaned up.
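
Cleaning up a retired durable consumer might look like this nats-py-style sketch (connection `nc` and consumer name are assumptions):

```python
async def remove_durable(nc, name: str):
    # Delete a durable consumer that is no longer used, so interest
    # retention stops holding unacked messages on its behalf.
    await nc.jsm().delete_consumer("pivot_main", name)
```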

Pub/Sub

If a service wants all instances of itself to receive and process each message, it should either consume via core NATS or use JetStream without setting durable on the created consumer.

Example of this pattern: a single instance of the Dealer service cannot process NATS messages on behalf of all instances. Every Dealer instance must individually read each NATS message for the subjects Dealer is subscribed to, so that it can assess whether any of the userIds corresponding to its websocket connections need to know about the event that has taken place. This is a good use case for a core NATS subscription: Dealer can't reliably deliver messages via Pilot websockets anyway, so there really isn't a need to introduce acking.
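
The pub/sub side is even simpler, sketched again with nats-py-style calls on an assumed connection `nc` (subject and handler are illustrative):

```python
async def consume_as_pubsub(nc):
    # Core NATS subscription: every Dealer instance runs this, so every
    # instance receives every message for the subject; there is no ack.
    async def handle(msg):
        # Decode the message and fan it out to the websocket connections
        # whose userIds appear in the body (decoding omitted here).
        ...

    await nc.subscribe("messenger.>", cb=handle)
```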

Addressing the Dual-Write Problem

We don't use change data capture to publish to NATS. See the Service Oriented Architecture article for more on this.

Message Contents

Message content should be limited with our 5 MB maximum size in mind (stay well below it), but at the same time, publishers should attempt to provide all the information a consumer would need to process the message. For example, when the Messenger service publishes a message.sent event to NATS after a Pivot chat message is sent in a Pivot room, the body of the NATS message should include at least 100 to 200 characters of the message text as well as the array of mentioned userIds. While the Dealer service only needs the messageId and userIds, the Buzzbuzz service needs more information and should not have to make a gRPC query back to Messenger; it should be able to rely on the NATS message alone.

userIds[]

Often, NATS messages should include an array of userIds. This way, services like Dealer can determine which users need to receive notification of new data and services like Buzzbuzz can determine who might need a notification, without going back to the publishing service for more information.

For example, when a Pivot chat message is sent in a room, the Messenger service sends a NATS message. This NATS message should include the userIds of room members (or space members with access to the room block), so that Dealer can simply iterate over that array to determine which websocket connections need to receive a message. Likewise, Buzzbuzz knows that it needs to consider creating a notification for each userId in the array.

Handling Large Audience Messaging

Consider the Messenger service where each event (a Pivot chat message being sent) can potentially have a large number of recipients. The event data includes a messageId, roomId, and an array of userIds, with each userId being approximately 30 characters long.

The design challenge is to find a balance between sending as few NATS messages as possible to minimize load on NATS, and providing enough information in each NATS message to minimize the need for consuming services to query the database for additional data.

Given that our NATS configuration supports a maximum message size of 5MB, it's feasible to include the entire array of recipient userIds in each message, as long as the total number of recipients doesn't exceed a certain threshold. From initial calculations, a single NATS message can accommodate around 100,000 userIds, alongside other message content. However, to account for variability and potential overheads, we recommend setting a much lower threshold. For instance, you might choose to send multiple messages if the number of recipients exceeds 10,000.

Here's how this pattern works:

When preparing to send an event, the publishing service collects the necessary data, including the list of recipient userIds. If the number of userIds is below the defined threshold (e.g., 10,000), the service includes all userIds in a single message and publishes it. If the number of userIds exceeds the threshold, the service splits them into chunks of 10,000 (or your chosen threshold) and sends multiple messages. Each message contains the same event data, but a different chunk of userIds. By following this pattern, you can ensure that each message is functional on its own and includes all the data needed for consuming services to process the event, minimizing the need for additional database queries.
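
The chunking step above is plain list slicing; a minimal sketch, assuming the 10,000-recipient threshold suggested in this section:

```python
def chunk_user_ids(user_ids: list[str], threshold: int = 10_000) -> list[list[str]]:
    """Split a recipient list into chunks of at most `threshold` userIds.
    Each chunk goes out as its own NATS message carrying the same event
    data; a list at or under the threshold yields a single chunk."""
    return [user_ids[i:i + threshold] for i in range(0, len(user_ids), threshold)]

# 25,000 recipients -> three messages with the same event data
chunks = chunk_user_ids([f"user{i}" for i in range(25_000)])
assert [len(c) for c in chunks] == [10_000, 10_000, 5_000]
```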

This pattern assumes that consuming services do not require message ordering and that they can use the messageId (or whatever the record ID is) as an idempotency key. Each message is functional on its own and can be processed independently.

Observability

It's essential to monitor consumer pending message counts and stream size, and to alert properly on those metrics. Stream size growth tells us there is a problem but not what the problem is; consumer pending counts tell us which service is causing the issue.