The Problem:
A developer wants to create multiple consumers for multiple subjects on the same stream in NATS JetStream. They are using pull subscriptions (PullSubscribe), where the client requests messages from the NATS server on demand. The developer is unsure whether both subjects can be kept active simultaneously and seeks guidance on how to achieve this.
The Solutions:
Solution 1: Using multiple consumers on a single stream
You can create multiple consumers for a single stream on NATS JetStream, allowing you to subscribe to different subjects within that stream.
Each consumer can be configured with a specific subject filter, which determines the messages it will receive. For example, if you have a stream with subjects "events.one", "events.two", and "data.one", you could create one consumer to receive all messages on "events.*" and another consumer to receive all messages on "data.*".
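To make the filter behaviour concrete, here is a minimal sketch of how a subject filter selects messages. It assumes the standard NATS wildcard rules ("*" matches exactly one token, ">" matches one or more trailing tokens). The subjectMatches helper is hypothetical and written only for illustration; it is not part of the NATS client and is not the server's actual matching code.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether a concrete subject matches a filter.
// "*" matches exactly one token; ">" matches one or more trailing tokens.
// Illustrative sketch only, not the NATS server's implementation.
func subjectMatches(filter, subject string) bool {
	f := strings.Split(filter, ".")
	s := strings.Split(subject, ".")
	for i, tok := range f {
		if tok == ">" {
			// ">" must be the last filter token and needs at least one token left.
			return i == len(f)-1 && len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(f) == len(s)
}

func main() {
	filters := []string{"events.*", "data.*"}
	for _, subject := range []string{"events.one", "events.two", "data.one"} {
		for _, filter := range filters {
			if subjectMatches(filter, subject) {
				fmt.Printf("%s -> consumer filtered on %s\n", subject, filter)
			}
		}
	}
}
```

With the three example subjects, the first two are routed to the "events.*" consumer and the third to the "data.*" consumer.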
Benefits of using multiple consumers:
- Scalability: Splitting a busy stream across several consumers by subject lets each workload be processed independently. Several application instances can also pull from the same durable consumer to share its load.
- Flexibility: Each consumer has its own configuration, such as its subject filter, ack policy, and starting position. Filtering is done by subject, not by message content.
- Reliability: Each consumer tracks its own delivery and acknowledgement state, so if one consumer fails or falls behind, the others still receive their messages.
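The independence between consumers can be pictured with a toy in-memory model. This is only a sketch under simplifying assumptions: the consumer type, its prefix filter, and the pull method are hypothetical stand-ins invented for this illustration and do not use the NATS client at all. The point it shows is that each consumer keeps its own read position over the same stored messages.

```go
package main

import (
	"fmt"
	"strings"
)

// consumer is a toy stand-in for a JetStream consumer: a subject
// prefix filter plus its own read position in the shared stream.
type consumer struct {
	prefix string
	next   int // index of the next stream entry to inspect
}

// pull returns up to max matching subjects and advances only this consumer.
func (c *consumer) pull(stream []string, max int) []string {
	var out []string
	for c.next < len(stream) && len(out) < max {
		if strings.HasPrefix(stream[c.next], c.prefix) {
			out = append(out, stream[c.next])
		}
		c.next++
	}
	return out
}

func main() {
	// The "stream" here is just the ordered list of published subjects.
	stream := []string{"events.one", "data.one", "events.two"}
	events := &consumer{prefix: "events."}
	data := &consumer{prefix: "data."}

	fmt.Println(events.pull(stream, 10)) // prints [events.one events.two]
	// The data consumer has not pulled yet, but nothing was lost for it.
	fmt.Println(data.pull(stream, 10)) // prints [data.one]
}
```

In real JetStream the server stores this position and the pending acknowledgements for each consumer, which is why both subjects can stay active at the same time.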
How to use multiple consumers:
To create multiple consumers on a single stream with the Go client's jetstream package (github.com/nats-io/nats.go/jetstream), you can use the following steps:
- Create the stream with js.CreateStream(), or get a handle to an existing stream with js.Stream().
- For each subject you want to consume, create a consumer with stream.CreateOrUpdateConsumer().
- Configure each consumer with a unique durable name, the desired subject filter (FilterSubject), and an ack policy.
- There is no separate start step: a consumer is ready to use as soon as it has been created.
Once you have created the consumers, you can receive messages from each one with consumer.Fetch(), consumer.Messages(), or consumer.Consume().
Code example:
package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"strings"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)

func main() {
	ctx := context.Background()

	// Create a NATS connection
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	// Create a JetStream management interface
	js, err := jetstream.New(nc)
	if err != nil {
		log.Fatal(err)
	}

	// Get the stream handle. The stream is assumed to exist already
	// and to capture the subjects "events.*" and "data.*".
	stream, err := js.Stream(ctx, "ORDERS")
	if err != nil {
		log.Fatal(err)
	}

	// Create a consumer for each subject
	consumers := make(map[string]jetstream.Consumer)
	for _, subject := range []string{"events.*", "data.*"} {
		// Each consumer needs its own durable name, e.g. "events-consumer"
		name := strings.TrimSuffix(subject, ".*") + "-consumer"
		consumer, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
			Durable:       name,
			AckPolicy:     jetstream.AckExplicitPolicy,
			FilterSubject: subject,
		})
		if err != nil {
			log.Fatal(err)
		}
		consumers[subject] = consumer
	}

	// Receive messages from all consumers concurrently
	for subject, consumer := range consumers {
		subject := subject // capture the per-iteration value (needed before Go 1.22)
		cc, err := consumer.Consume(func(msg jetstream.Msg) {
			log.Printf("Received message from %s: %s", subject, string(msg.Data()))
			if err := msg.Ack(); err != nil {
				log.Printf("ack failed: %v", err)
			}
		})
		if err != nil {
			log.Fatal(err)
		}
		defer cc.Stop()
	}

	// Keep running until interrupted (Ctrl+C)
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, os.Interrupt)
	<-sig
}
Solution 2: Multiple Consumers and Subjects with NATS JetStream
To create multiple consumers for multiple subjects on a single NATS JetStream stream using the JetStreamContext API of the nats.go package, follow these steps:
- Configure the Stream:
  - Create a stream configuration (nats.StreamConfig) with the desired stream name and a list of subject patterns.
- Add the Stream:
  - Use js.AddStream() to add the configured stream to JetStream.
- Add Consumers:
  - For each subject you wish to consume from, create a consumer configuration (nats.ConsumerConfig) with the desired consumer options (e.g., durable name, filter subject, ack policy).
  - Use js.AddConsumer() to add each consumer to the stream. Leave DeliverSubject empty so that the consumer is a pull consumer.
- Subscribe to Messages:
  - For each consumer, use js.PullSubscribe() to bind to the existing consumer and create a pull subscription for the specified subject.
By following these steps, you can create multiple consumers that consume messages from multiple subjects within the same NATS JetStream stream.
Code Sample:
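package main

import (
	"errors"
	"log"
	"strings"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()
	// Obtain a JetStream context from the connection
	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}
	// Configure the stream
	streamConfig := &nats.StreamConfig{
		Name:     "ORDERS",
		Subjects: []string{"ORDERS.*"},
	}
	// Add the stream
	if _, err := js.AddStream(streamConfig); err != nil {
		log.Fatal(err)
	}
	// Example subjects within the stream; replace with your own
	subjects := []string{"ORDERS.new", "ORDERS.shipped"}
	// Add consumers
	for _, subject := range subjects {
		// Durable names cannot contain dots, so replace them
		consumerName := "consumer-" + strings.ReplaceAll(subject, ".", "-")
		consumerConfig := &nats.ConsumerConfig{
			Durable:       consumerName,
			AckPolicy:     nats.AckExplicitPolicy,
			FilterSubject: subject,
			MaxDeliver:    2,
		}
		if _, err := js.AddConsumer(streamConfig.Name, consumerConfig); err != nil {
			log.Fatal(err)
		}
		// Subscribe to messages by binding to the consumer created above
		sub, err := js.PullSubscribe(subject, consumerName, nats.Bind(streamConfig.Name, consumerName))
		if err != nil {
			log.Fatal(err)
		}
		go handleMessages(sub)
	}
	// Keep the program running while the handlers work
	select {}
}

// handleMessages fetches and acknowledges messages from a pull subscription.
func handleMessages(sub *nats.Subscription) {
	for {
		// Pull subscriptions must use Fetch; NextMsg is not valid for them
		msgs, err := sub.Fetch(10, nats.MaxWait(2*time.Second))
		if err != nil {
			if errors.Is(err, nats.ErrTimeout) {
				continue // no messages available yet
			}
			log.Printf("fetch failed: %v", err)
			return
		}
		for _, msg := range msgs {
			// Process the message
			_ = msg.Ack()
		}
	}
}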
Q&A
Can you make multiple consumers for multiple subjects on one stream on NATS JetStream?
Yes. A single stream can have many consumers, and each consumer can have its own subject filter.
If you need to make a filter for the JetStream, what field should you use?
Use the FilterSubject field of the consumer configuration.
Video Explanation:
The following video, titled "The ONE feature that makes NATS more powerful than Kafka, Pulsar ...", provides additional insights and in-depth exploration related to the topics discussed in this post.
JetStream's consumers are so incredibly flexible that it makes it possible to create entirely new applications with a single dataset as a ...