How to Use Message Queues (NATS, RabbitMQ, Kafka) in Go

Connect to NATS, RabbitMQ, or Kafka in Go by installing their respective third-party client libraries and using their connection methods.

The shock absorber for your system

You built a service that processes images. It works fine with ten requests per second. Then a marketing campaign hits, and a thousand requests arrive at once. Your database locks up. The HTTP server runs out of memory. The whole thing grinds to a halt. The fix isn't always more hardware. Sometimes you need a shock absorber. You need a message queue.

A message queue sits between the producer (the code that creates work) and the consumer (the code that does the work). Think of it as a conveyor belt. The producer drops a message on the belt and moves on. The consumer picks up messages at its own pace. If the belt fills up, the producer slows down or drops messages, but it doesn't crash the consumer. If the consumer is slow, messages pile up safely. The two sides stop talking directly. They talk to the queue. This decoupling saves systems from cascading failures.
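A minimal in-process sketch of that behavior, using a buffered Go channel as the belt. The names here (job, enqueue, produce, the capacity of 3) are made up for illustration. A real broker adds persistence and a network hop, but the buffering idea is the same.

```go
package main

import "fmt"

// job is a hypothetical unit of work for this sketch.
type job struct{ id int }

// enqueue tries to put j on the queue without blocking.
// It reports false when the buffer is full, so the caller can
// decide whether to retry, slow down, or drop the job.
func enqueue(queue chan<- job, j job) bool {
	select {
	case queue <- j:
		return true
	default:
		return false
	}
}

// produce offers n jobs to the queue and returns how many were
// accepted and how many were dropped because the buffer was full.
func produce(queue chan<- job, n int) (accepted, dropped int) {
	for i := 0; i < n; i++ {
		if enqueue(queue, job{id: i}) {
			accepted++
		} else {
			dropped++
		}
	}
	return accepted, dropped
}

func main() {
	queue := make(chan job, 3) // the buffer: room for three pending jobs

	// A burst of five jobs arrives while no consumer is running.
	accepted, dropped := produce(queue, 5)
	fmt.Println("accepted:", accepted, "dropped:", dropped)

	// The consumer drains the backlog at its own pace.
	close(queue)
	for j := range queue {
		fmt.Println("processing job", j.id)
	}
}
```

Dropping is only one policy. Replacing the default branch with a blocking send would make the producer slow down instead.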

Decoupling with a buffer

Queues solve three problems. They absorb spikes in traffic by buffering requests. They let services run at different speeds. They isolate failures: if the consumer is down, the queue holds the messages. When the consumer comes back, it processes the backlog. The producer never sees the downtime.

Go's standard library doesn't include clients for external message brokers. The philosophy is to keep the core small and let the community build high-quality third-party tools. You'll use libraries like nats.go (github.com/nats-io/nats.go), amqp091-go (github.com/rabbitmq/amqp091-go), or sarama (github.com/IBM/sarama, formerly hosted under Shopify). These are mature, well-maintained packages. You add them to your module with go get.

NATS: Speed and simplicity

NATS is a lightweight messaging system. It focuses on low latency and simple publish-subscribe patterns. The nats.go client is idiomatic and easy to use. It handles reconnection and buffering for you.

Here's the simplest NATS connection: connect, publish a message, flush the buffer, and close.

package main

import (
	"fmt"
	"log"

	"github.com/nats-io/nats.go"
)

func main() {
	// Connect to the default NATS server URL.
	// nats.Connect handles the network handshake and returns a connection object.
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		// Log the error and exit. In production, handle this gracefully.
		log.Fatal(err)
	}
	// Close the connection when main exits to free resources.
	defer nc.Close()

	// Publish a message to the "orders" subject.
	// NATS is a publish-subscribe system; the subject acts like a topic.
	err = nc.Publish("orders", []byte("new order received"))
	if err != nil {
		log.Fatal(err)
	}

	// Flush ensures the message reaches the server before we proceed.
	// Without this, the client might close the connection before sending.
	err = nc.Flush()
	if err != nil {
		log.Fatal(err)
	}

	fmt.Println("Message sent")
}

The nats.Connect call opens a TCP connection to the broker. The Publish call writes the message into the client's outbound buffer. Flush forces that buffer to the wire and waits for the server to acknowledge it. This matters because network writes are asynchronous. If you skip Flush, the program might exit before the message leaves the client. The deferred nc.Close shuts the connection down cleanly. Go's defer runs when the function returns, which is perfect for cleanup.

NATS is fast. Don't over-engineer the routing.

RabbitMQ: Reliability and routing

RabbitMQ uses the AMQP protocol. It's heavier than NATS but offers more features. You get complex routing, message persistence, and reliable delivery guarantees. The amqp091-go client is the standard choice.

RabbitMQ separates connections from channels. A connection is an expensive TCP socket. A channel is a lightweight logical stream over that socket. You create one connection and multiple channels. This saves resources and simplifies management.

Here's how you set up a connection and channel.

// Connect to RabbitMQ and open a channel.
// amqp.Dial creates the TCP connection and authenticates.
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
	log.Fatal(err)
}
// Close the connection when done.
defer conn.Close()

// Create a channel for publishing and consuming.
// Channels are cheap; reuse them instead of creating new ones.
ch, err := conn.Channel()
if err != nil {
	log.Fatal(err)
}
defer ch.Close()

Once you have a channel, you declare a queue and publish messages. Queue declaration ensures the queue exists. Marking the queue durable makes the queue itself survive a broker restart. For the messages to survive too, they must also be published as persistent.

// Declare a queue named "tasks".
// Arguments: name, durable, auto-delete, exclusive, no-wait, args.
q, err := ch.QueueDeclare("tasks", true, false, false, false, nil)
if err != nil {
	log.Fatal(err)
}

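// Publish a message to the default exchange.
// The routing key matches the queue name for direct binding.
// Arguments: exchange, routing key, mandatory, immediate, message.
// Newer amqp091-go releases prefer ch.PublishWithContext, which takes
// a context.Context as its first argument.
err = ch.Publish("", q.Name, false, false, amqp.Publishing{
	// Persistent delivery asks the broker to write the message to disk,
	// so it can survive a restart together with the durable queue.
	DeliveryMode: amqp.Persistent,
	Body:         []byte("heavy task"),
})
if err != nil {
	log.Fatal(err)
}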

The QueueDeclare call creates the queue if it doesn't exist. The true flag makes it durable. The Publish call sends the message. The empty string for the exchange uses the default exchange, which routes directly to queues by name. RabbitMQ is reliable. Define your queues explicitly.

Kafka: The distributed log

Kafka is a distributed commit log. It's designed for high-throughput data pipelines. The sarama client gives you access to Kafka's partitioning model. You don't just publish to a topic; each message lands in one partition of that topic, usually chosen from the message key. This allows strict ordering within partitions and massive parallelism across partitions.
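The ordering guarantee comes from routing every message with the same key to the same partition. The sketch below models that idea with an FNV-1a hash from the standard library. The function partitionFor and the partition count are hypothetical, and a real client's partitioner may pick partitions differently, so read this as an illustration of the concept and not as Sarama's exact behavior.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a message key to one of numPartitions partitions.
// Equal keys always hash to the same partition, which is what keeps
// the messages for one key in order. numPartitions must be positive.
func partitionFor(key string, numPartitions int) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(numPartitions))
}

func main() {
	const numPartitions = 4 // assumed partition count for the example

	// "customer-1" appears twice and lands on the same partition both times.
	for _, key := range []string{"customer-1", "customer-2", "customer-1"} {
		fmt.Printf("key %q -> partition %d\n", key, partitionFor(key, numPartitions))
	}
}
```

Messages with different keys may share a partition. Ordering is only guaranteed per partition, so pick keys that group the events you need ordered.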

Sarama is lower level than the NATS or RabbitMQ clients. You configure producers and consumers explicitly. You handle partition assignment and offset management. This gives you control but adds complexity.

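Use a SyncProducer when you need to know if a message was accepted before proceeding. Use an AsyncProducer when you want to fire and forget. Its results come back on the Successes and Errors channels, not through callbacks, and you must drain those channels or the producer can stall. With a plain partition consumer, Sarama doesn't manage offsets for you, so you must store and commit them in your application. A consumer group tracks and commits the offsets you mark.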

Kafka is a log. Respect the partition order.

Context and graceful shutdown

Go applications use context.Context for cancellation and deadlines. Message queue clients often manage their own lifecycles. You need to bridge the gap. Wrap your queue logic in a goroutine that listens on the context's done channel.

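func runConsumer(ctx context.Context, ch *amqp.Channel) error {
	// Register the consumer. Consume does not block; it returns right
	// away with a Go channel of deliveries.
	// Arguments: queue, consumer tag, auto-ack, exclusive, no-local, no-wait, args.
	msgs, err := ch.Consume("tasks", "", true, false, false, false, nil)
	if err != nil {
		// Return the error so the caller decides how to react.
		return err
	}

	// done tells the watcher goroutine to exit if the loop ends first.
	done := make(chan struct{})
	defer close(done)

	// Start a goroutine to listen for context cancellation.
	go func() {
		select {
		case <-ctx.Done():
			// Context cancelled; closing the AMQP channel closes msgs.
			ch.Close()
		case <-done:
			// The delivery channel closed on its own; nothing to do.
		}
	}()

	// Block until the delivery channel is closed.
	for msg := range msgs {
		processMessage(msg)
	}
	// ctx.Err() is nil if the loop ended before the context was cancelled.
	return ctx.Err()
}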

The select statement waits for the context to finish. When it does, the goroutine closes the AMQP channel. Consume itself returns immediately with a Go channel of deliveries; closing the AMQP channel closes that delivery channel, which ends the range loop. This pattern ensures your consumer stops cleanly when the application shuts down. Always pass context.Context as the first parameter to functions that start long-running operations.
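Another way to shape this is to select on both the context and the delivery channel inside the consuming loop, so no second goroutine has to close anything. The sketch below shows that shape with a plain Go channel standing in for the broker's delivery channel. The names consume, handle, and demo are placeholders for illustration, not part of any client library.

```go
package main

import (
	"context"
	"fmt"
)

// consume processes messages until msgs is closed or ctx is cancelled.
// It returns the number of messages handled and the reason it stopped:
// nil when the source closed, ctx.Err() when the context ended.
func consume(ctx context.Context, msgs <-chan string, handle func(string)) (int, error) {
	handled := 0
	for {
		select {
		case <-ctx.Done():
			return handled, ctx.Err()
		case m, ok := <-msgs:
			if !ok {
				return handled, nil // source closed; nothing left to read
			}
			handle(m)
			handled++
		}
	}
}

// demo feeds two messages to a consumer, then cancels the context
// to mimic an application shutdown.
func demo() (int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	msgs := make(chan string) // unbuffered: each send waits for the consumer
	type result struct {
		n   int
		err error
	}
	done := make(chan result)

	go func() {
		n, err := consume(ctx, msgs, func(m string) { fmt.Println("got", m) })
		done <- result{n, err}
	}()

	msgs <- "a"
	msgs <- "b"
	cancel() // shutdown signal
	r := <-done
	return r.n, r.err
}

func main() {
	n, err := demo()
	fmt.Println("handled", n, "stopped with:", err)
}
```

With a real broker, the caller would still close the AMQP channel or connection after consume returns, so the server stops delivering.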

Context is plumbing. Run it through every long-lived call site.

Pitfalls and errors

Message queues introduce network dependencies. If the broker is down, your code needs to handle it. nats.Connect returns an error like nats: no servers available if it can't reach the server. You must check this error. If you ignore it, you are left holding a nil connection, and every later call on it fails.
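One way to cope with a broker that is briefly unavailable at startup is to retry the connection with a growing delay. The sketch below is library-agnostic: connectWithRetry, flakyDialer, and the dial callback are hypothetical names, and in real code dial would wrap a call such as nats.Connect or amqp.Dial. Client libraries often ship their own reconnect settings too, so check those before rolling your own.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// connectWithRetry calls dial up to attempts times (attempts should be
// at least 1), doubling the wait between tries. It returns nil on the
// first success, or the last error wrapped with the attempt count.
func connectWithRetry(dial func() error, attempts int, wait time.Duration) error {
	var err error
	for i := 1; i <= attempts; i++ {
		if err = dial(); err == nil {
			return nil
		}
		if i < attempts {
			time.Sleep(wait)
			wait *= 2 // simple exponential backoff
		}
	}
	return fmt.Errorf("connect failed after %d attempts: %w", attempts, err)
}

// flakyDialer returns a dial function that fails its first `failures`
// calls. It stands in for a broker that is still starting up.
func flakyDialer(failures int) func() error {
	calls := 0
	return func() error {
		calls++
		if calls <= failures {
			return errors.New("broker unavailable")
		}
		return nil
	}
}

func main() {
	// Succeeds on the third try.
	err := connectWithRetry(flakyDialer(2), 5, 10*time.Millisecond)
	fmt.Println("connected:", err == nil)

	// Never succeeds within three tries.
	err = connectWithRetry(flakyDialer(10), 3, 10*time.Millisecond)
	fmt.Println("gave up:", err)
}
```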

Subscriptions hold resources. If you create a subscription and never unsubscribe, the client keeps its goroutine alive and the server keeps delivering messages for it. Always call Unsubscribe or close the connection. The worst goroutine bug is the one that never logs.

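Publishing is buffered. Publish writes into the client's outbound buffer and usually returns at once, but it can stall if the network can't keep up. While the NATS client is disconnected, messages accumulate in a bounded reconnect buffer, and once that buffer is full, Publish returns an error. Check the error from every Publish, and call Flush or FlushTimeout at sensible points so a slow broker pushes back on the producer.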

RabbitMQ channels can be closed by the server if an error occurs. Register a Go channel with NotifyClose to detect this. If you don't, your code might hang waiting on a dead channel.

Go conventions for queue code

Go has strong conventions for error handling and structure. Follow them to keep your code readable.

Check every error. if err != nil { return err } is verbose by design. The community accepts the boilerplate because it makes the unhappy path visible. Don't use _ to discard errors unless you have a specific reason.

Use gofmt to format your code. Don't argue about indentation; let the tool decide. Most editors run it on save.

Name your receivers with one or two letters matching the type. (c *Client) Publish(...) is better than (this *Client).

Public names start with a capital letter. Private names start lowercase. No keywords like public or private.

Accept interfaces, return structs. If you wrap a queue client, return a concrete struct. If you depend on a queue, accept an interface. This makes testing easier.
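A short sketch of that last convention, under the assumption that your business logic only needs to publish bytes to a named subject. Publisher, OrderService, and fakePublisher are hypothetical names invented for this example. In production, a thin wrapper around your NATS, RabbitMQ, or Kafka client would satisfy the interface.

```go
package main

import (
	"errors"
	"fmt"
)

// Publisher is the small interface the business logic depends on.
type Publisher interface {
	Publish(subject string, data []byte) error
}

// OrderService accepts any Publisher, so tests can pass a fake.
type OrderService struct {
	pub Publisher
}

// NewOrderService returns a concrete struct, not an interface.
func NewOrderService(pub Publisher) *OrderService {
	return &OrderService{pub: pub}
}

// PlaceOrder validates the id and publishes it to the "orders" subject.
func (s *OrderService) PlaceOrder(id string) error {
	if id == "" {
		return errors.New("empty order id")
	}
	if err := s.pub.Publish("orders", []byte(id)); err != nil {
		return fmt.Errorf("publish order %s: %w", id, err)
	}
	return nil
}

// fakePublisher records messages in memory instead of using a broker.
type fakePublisher struct {
	sent []string
}

// Publish appends "subject:data" to the in-memory log and never fails.
func (f *fakePublisher) Publish(subject string, data []byte) error {
	f.sent = append(f.sent, subject+":"+string(data))
	return nil
}

func main() {
	fake := &fakePublisher{}
	svc := NewOrderService(fake)

	if err := svc.PlaceOrder("42"); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(fake.sent) // prints [orders:42]
}
```

Keeping the interface this small means the service can be tested without a running broker, and swapping brokers only touches the wrapper.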

Choosing the right tool

Use NATS when you need low-latency pub-sub or request-reply patterns with minimal configuration. Use RabbitMQ when you require complex routing, message persistence, and reliable delivery guarantees for business-critical workflows. Use Kafka when you are building a data pipeline that needs to store high-volume streams and replay history for analytics. Use Go channels when the producer and consumer run in the same process and you want zero network overhead. Use direct HTTP calls when the consumer must respond immediately and you can tolerate tight coupling between services.

Queues decouple time. Respect the buffer.

Where to go next