Can I make multiple consumers for multiple subjects on one stream on NATS JetStream? – Go

by
Ali Hasan
algorithm nats-jetstream nats-streaming-server nats.io

Quick Fix: Yes, You can create JetStream consumer with multiple filters using nats-server 2.10 and latest clients. Also, it’s recommended to use the new JS API to simplify usage.

The Problem:

A developer wants to create multiple consumers for multiple subjects on the same stream in NATS JetStream. They are using the Pull subscribe feature to establish a pull/request relationship between the NATS server and the message source. The developer is unsure if it is possible to keep both subjects active simultaneously and seeks guidance on how to achieve this.

The Solutions:

Solution 1: Using multiple consumers on a single stream

You can create multiple consumers for a single stream on NATS JetStream, allowing you to subscribe to different subjects within that stream.

Each consumer can be configured with a specific subject filter, which determines the messages it will receive. For example, if you have a stream with subjects "events.one", "events.two", and "data.one", you could create one consumer to receive all messages on "events." and another consumer to receive all messages on "data.".

Benefits of using multiple consumers:

  • Scalability: If you have a large number of messages being published to a single subject, it may be more efficient to create multiple consumers to handle the load.
  • Flexibility: Multiple consumers allow you to filter messages based on different criteria, such as subject or message content.
  • Reliability: If one consumer fails, other consumers will still be able to receive messages.

How to use multiple consumers:

To create multiple consumers on a single stream, you can use the following steps:

  1. Create a stream using the nats.NewStream() function.
  2. For each subject you want to consume, create a consumer using the nats.NewConsumer() function.
  3. Configure the consumer with the desired subject filter and ack policy.
  4. Start the consumer by calling the nats.Start() function.

Once you have created multiple consumers, you can use the nats.Pull() function to receive messages from each consumer.

Code example:

package main

import (
	"context"
	"errors"
	"log"

	"github.com/nats-io/go/v2"
	"github.com/nats-io/go/v2/jetstream"
)

func main() {
	// Create a NATS connection
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	// Create a JetStream management interface
	js, err := jetstream.New(nc)
	if err != nil {
		log.Fatal(err)
	}

	// Get the stream handle
	stream, err := js.Stream(context.Background(), "ORDERS")
	if err != nil {
		log.Fatal(err)
	}

	// Create a consumer for each subject
	consumers := make(map[string]*jetstream.Consumer)
	for _, subject := range []string{"events.*", "data.*"} {
		consumer, err := stream.CreateOrUpdateConsumer(context.Background(), jetstream.ConsumerConfig{
			Durable:   "my-consumer",
			AckPolicy: jetstream.AckExplicitPolicy,
			Subject:   subject,
		})
		if err != nil {
			log.Fatal(err)
		}
		consumers[subject] = consumer
	}

	// Start the consumers
	for _, consumer := range consumers {
		if err := consumer.Start(context.Background()); err != nil {
			log.Fatal(err)
		}
	}

	// Pull messages from the consumers
	for {
		select {
		case msg, ok := <-consumers["events.*"].Msgs:
			if ok {
				log.Printf("Received message from events.*: %s", string(msg.Data))
				msg.Ack()
			} else {
				log.Fatal(errors.New("consumer closed"))
			}
		case msg, ok := <-consumers["data.*"].Msgs:
			if ok {
				log.Printf("Received message from data.*: %s", string(msg.Data))
				msg.Ack()
			} else {
				log.Fatal(errors.New("consumer closed"))
			}
		}
	}
}

Solution 2: Multiple Consumers and Subjects with NATS JetStream

To create multiple consumers for multiple subjects on a single NATS JetStream stream, follow these steps:

  1. Configure the Stream:

    • Create a stream configuration (nats.StreamConfig) with the desired stream name and a list of subject patterns.
  2. Add the Stream:

    • Use js.AddStream() to add the configured stream to JetStream.
  3. Add Consumers:

    • For each subject you wish to consume from, create a consumer configuration (nats.ConsumerConfig) with the desired consumer options (e.g., durable, delivery subject, ack policy).
    • Use js.AddConsumer() to add each consumer to the stream. Set the DeliverySubject to an empty string.
  4. Subscribe to Messages:

    • For each consumer, use js.PullSubscribe() to bind the consumer to the stream and create a subscription for the specified subject.

By following these steps, you can create multiple consumers that can consume messages from multiple subjects within the same NATS JetStream stream.

Code Sample:

import (
    "github.com/nats-io/nats.go"
)

func main() {
    js, _ := nats.Connect(nats.DefaultURL)

    // Configure the stream
    streamConfig := &nats.StreamConfig{
        Name:     "ORDERS",
        Subjects: []string{"ORDERS.*"},
    }

    // Add the stream
    if _, err := js.AddStream(streamConfig); err != nil {
        // Handle error
    }

    // Add consumers
    for subject := range subjects {
        consumerName := "consumer-" + subject

        consumerConfig := &nats.ConsumerConfig{
            Durable:     consumerName,
            AckPolicy:   nats.AckExplicitPolicy,
        }

        if _, err := js.AddConsumer(streamConfig.Name, consumerConfig); err != nil {
            // Handle error
        }

        // Subscribe to messages
        if sub, err := js.PullSubscribe(subject, consumerName, nats.Bind(streamConfig.Name, consumerName), nats.MaxDeliver(2)); err != nil {
            // Handle error
        } else {
            go handleMessages(sub)
        }
    }
}

// handleMessages handles incoming messages from the subscription.
func handleMessages(sub *nats.Subscription) {
    for {
        msg, err := sub.NextMsg()
        if err != nil {
            // Handle error
        }

        // Process the message
        _ = msg.Ack()
    }
}

Q&A

Can you make multiple consumers for multiple subjects on one stream on NATS JetStream?

Yes, you can make multiple consumers for multiple subjects on one stream on NATS JetStream.

If you need to make a filter for the JetStream, what field should you use?

FilterSubject

Video Explanation:

The following video, titled "The ONE feature that makes NATS more powerful than Kafka, Pulsar ...", provides additional insights and in-depth exploration related to the topics discussed in this post.

Play video

JetStream's consumers are so incredibly flexible that it makes it possible to create entirely new applications with a single dataset as a ...