How to Use RabbitMQ in Go with amqp091-go

Connect to RabbitMQ in Go by dialing the server URL and publishing messages via an amqp091-go channel.

The signup form that waits too long

You build a signup form. The user clicks "Create Account". Your server validates the password, writes to the database, and returns a 200 OK. Then you realize you also need to send a welcome email. If you send the email inside the handler, the user waits three seconds for the SMTP server to respond. If the email server is down, the signup fails. You need a way to hand off the work without blocking.

RabbitMQ solves this by letting you drop a message in a queue and move on. Another service picks up the message later and sends the email. Your signup stays fast and reliable. The queue absorbs spikes in traffic. If ten thousand users sign up at once, the messages pile up in the queue. The email worker processes them at a steady pace. The system doesn't crash.

Connections, channels, and queues

RabbitMQ speaks AMQP 0-9-1, a version of the Advanced Message Queuing Protocol. It's a standard for how applications talk to a message broker. The library github.com/rabbitmq/amqp091-go, the maintained successor to github.com/streadway/amqp, implements this protocol for Go. The model has a few layers.

A connection is the TCP link to the broker. It's expensive to create. You negotiate credentials, virtual hosts, and protocol versions over the connection. A channel is a lightweight virtual connection inside the TCP connection. You do all your work on channels. Channels multiplex requests over a single TCP socket. A queue holds messages until a consumer reads them. An exchange decides where messages go based on routing keys.

Think of the connection as a highway. Channels are the lanes. You don't build a new highway every time you drive; you just pick a lane. Queues are the destinations. Exchanges are the traffic cops directing cars to the right destination.

Channels are cheap. Connections are not. Reuse connections and create channels per logical task.

Minimal publisher

Here's the skeleton. Connect, open a channel, declare a queue, publish a message, and close everything.

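// Assumes imports "log" and amqp "github.com/rabbitmq/amqp091-go"; the snippets run inside func main.
// Connect to the broker and open a channel.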
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
    log.Fatalf("connection failed: %v", err)
}
defer conn.Close()

ch, err := conn.Channel()
if err != nil {
    log.Fatalf("channel failed: %v", err)
}
defer ch.Close()

The Dial call establishes the TCP connection and performs the AMQP handshake. The Channel call opens a new channel on that connection. Dial fails if the broker is unreachable or the credentials are wrong. Channel fails if the connection has already closed or has no channel IDs left. Go error handling is verbose by design. Check every error. The boilerplate makes the unhappy path visible.

// Declare the queue and publish a message.
q, err := ch.QueueDeclare("hello", false, false, false, false, nil)
if err != nil {
    log.Fatalf("queue declare failed: %v", err)
}

err = ch.Publish("", q.Name, false, false, amqp.Publishing{Body: []byte("Hello")})
if err != nil {
    log.Fatalf("publish failed: %v", err)
}

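QueueDeclare creates the queue if it doesn't exist. If it does exist, the broker checks that the declared properties match; a mismatch closes the channel with a PRECONDITION_FAILED error. The empty exchange name in Publish sends to the default exchange, which routes each message to the queue whose name equals the routing key. Publish returns as soon as the frame is written. It does not wait for the broker to accept the message. Recent amqp091-go releases deprecate Publish in favor of PublishWithContext, which takes a context.Context as its first argument and otherwise has the same parameters.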

Declare queues before you publish. The broker doesn't guess.

How the frames flow

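When you call Dial, the library opens a TCP socket and sends the protocol header. The broker responds with connection.start, which lists its capabilities and authentication mechanisms. The client answers with its credentials, the two sides agree on limits with connection.tune, and the client selects a virtual host with connection.open. The broker confirms with connection.open-ok. Calling Channel sends a channel.open frame carrying a channel ID chosen by the client library. The broker replies with channel.open-ok.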

QueueDeclare sends a queue.declare frame. The broker checks permissions and queue state. It replies with the queue name, message count, and consumer count. Publish sends a basic.publish method frame followed by a content header frame and the message body. If you use the default exchange, the routing key must match the queue name exactly. The broker enqueues the message and returns nothing unless you enable confirmations.
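To make the word "frame" concrete, here is a minimal sketch of the AMQP 0-9-1 general frame layout: a one-byte type, a two-byte channel ID, a four-byte payload size, the payload, and a frame-end byte. The helper names encodeFrame and methodPayload are illustrative and not part of amqp091-go, which does this encoding internally. The class and method IDs (20 and 10 for channel.open) come from the protocol specification.

```go
package main

import (
    "encoding/binary"
    "fmt"
)

// Frame type for method frames and the frame-end marker from AMQP 0-9-1.
const (
    frameMethod = 1
    frameEnd    = 0xCE
)

// methodPayload prefixes method arguments with the class ID and method ID.
func methodPayload(classID, methodID uint16, args []byte) []byte {
    p := make([]byte, 4+len(args))
    binary.BigEndian.PutUint16(p[0:2], classID)
    binary.BigEndian.PutUint16(p[2:4], methodID)
    copy(p[4:], args)
    return p
}

// encodeFrame builds one frame: type (1 byte), channel (2 bytes),
// payload size (4 bytes), payload, frame-end marker (1 byte).
func encodeFrame(frameType byte, channel uint16, payload []byte) []byte {
    buf := make([]byte, 7+len(payload)+1)
    buf[0] = frameType
    binary.BigEndian.PutUint16(buf[1:3], channel)
    binary.BigEndian.PutUint32(buf[3:7], uint32(len(payload)))
    copy(buf[7:], payload)
    buf[len(buf)-1] = frameEnd
    return buf
}

func main() {
    // channel.open is class 20, method 10, with one empty reserved string.
    frame := encodeFrame(frameMethod, 1, methodPayload(20, 10, []byte{0}))
    fmt.Printf("% x\n", frame)
}
```

The channel ID in bytes two and three is how many channels share one socket: every frame names the channel it belongs to.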

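The library writes frames to the socket and reads replies in the background. If the network drops, those reads and writes fail, the library marks the connection closed, and later calls return errors such as connection reset by peer or amqp.ErrClosed. The library does not reconnect for you. You need a reconnection loop for production code.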

Exchanges and routing

The default exchange is a shortcut. Real systems use named exchanges. An exchange receives messages and routes them to queues based on bindings. A binding links an exchange to a queue with a routing key. This decouples producers from queues. Producers publish to an exchange. Consumers listen to queues. The broker handles the mapping.

There are four built-in exchange types, and three of them cover most uses. A direct exchange routes by exact key match. A fanout exchange broadcasts to all bound queues. A topic exchange uses wildcard patterns. The fourth, headers, routes on message header values instead of the routing key. You declare exchanges with ExchangeDeclare. You bind them with QueueBind.

// Declare a fanout exchange to broadcast messages to multiple queues.
err = ch.ExchangeDeclare(
    "logs",   // name
    "fanout", // type
    true,     // durable
    false,    // auto-deleted
    false,    // internal
    false,    // no-wait
    nil,      // arguments
)
if err != nil {
    log.Fatalf("exchange declare failed: %v", err)
}

// Bind the queue to the exchange. Fanout ignores the routing key.
err = ch.QueueBind(
    q.Name, // queue name
    "",     // routing key (ignored for fanout)
    "logs", // exchange name
    false,  // no-wait
    nil,    // arguments
)
if err != nil {
    log.Fatalf("queue bind failed: %v", err)
}

Publish to the exchange instead of the queue. The routing key in Publish must match the binding key for direct and topic exchanges. Fanout ignores the key. If no queue matches the routing key, the message is dropped unless you set the mandatory flag. If mandatory is true, the broker sends the message back with basic.return. Register a channel with ch.NotifyReturn and read from it to catch routing failures.

Exchanges add flexibility. Queues hold the data. Bindings wire them together.
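The routing rules in this section can be modeled in a few lines. The sketch below is an in-process model, not broker code: the binding type and the route and matchTopic functions are hypothetical names. It assumes the standard topic wildcards, where "*" matches exactly one dot-separated word and "#" matches zero or more.

```go
package main

import (
    "fmt"
    "strings"
)

// binding links a queue to an exchange with a binding key.
type binding struct {
    queue string
    key   string
}

// matchTopic reports whether a dot-separated routing key matches a topic
// pattern, where "*" is exactly one word and "#" is zero or more words.
func matchTopic(pattern, key string) bool {
    return matchWords(strings.Split(pattern, "."), strings.Split(key, "."))
}

func matchWords(p, k []string) bool {
    if len(p) == 0 {
        return len(k) == 0
    }
    switch p[0] {
    case "#":
        // "#" either absorbs nothing, or absorbs one word and tries again.
        if matchWords(p[1:], k) {
            return true
        }
        return len(k) > 0 && matchWords(p, k[1:])
    case "*":
        return len(k) > 0 && matchWords(p[1:], k[1:])
    default:
        return len(k) > 0 && p[0] == k[0] && matchWords(p[1:], k[1:])
    }
}

// route returns the queues that would receive a message published with
// the given routing key to an exchange of the given type.
func route(exchangeType string, bindings []binding, key string) []string {
    var queues []string
    for _, b := range bindings {
        matched := false
        switch exchangeType {
        case "fanout":
            matched = true // the key is ignored
        case "direct":
            matched = b.key == key
        case "topic":
            matched = matchTopic(b.key, key)
        }
        if matched {
            queues = append(queues, b.queue)
        }
    }
    return queues
}

func main() {
    bindings := []binding{
        {queue: "errors", key: "logs.error"},
        {queue: "all-logs", key: "logs.#"},
    }
    fmt.Println(route("direct", bindings, "logs.error"))
    fmt.Println(route("topic", bindings, "logs.info.db"))
    // An empty result is the case the mandatory flag exists for.
    fmt.Println(len(route("direct", bindings, "metrics")) == 0)
}
```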

Controlling flow with QoS

By default, RabbitMQ pushes messages to a consumer as fast as it can. If your consumer is slow, unacked messages pile up in the consumer's memory. You need to set a prefetch count. ch.Qos(prefetchCount, 0, false) tells the broker to allow at most prefetchCount unacked messages in flight. The broker holds back further messages until the consumer acks some.

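This prevents memory overflow and balances load across multiple consumers. Set prefetch to 1 for strict one-at-a-time processing. Set it higher for throughput. The second argument is the prefetch size in bytes. Zero means no byte limit, and RabbitMQ does not implement byte-based prefetch, so leave it at zero. The third argument is global, and RabbitMQ interprets it differently from the AMQP specification. False applies the limit separately to each consumer on the channel. True shares one limit across all consumers on the channel.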

// Set prefetch count to 1. The broker won't send more than one unacked message.
err = ch.Qos(1, 0, false)
if err != nil {
    log.Fatalf("qos failed: %v", err)
}

Without QoS, a single slow consumer can starve others. The broker floods the slow consumer with messages while fast consumers sit idle. QoS ensures fair distribution.

Set QoS before consuming. With global set to false, the limit applies to consumers started after the call.
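The effect of prefetch on distribution can be seen in a small simulation. This is a simplified model, not broker code: the consumer type and the dispatch and simulate functions are hypothetical. It assumes round-robin dispatch that skips any consumer whose prefetch window is full, one slow consumer that never acks during the burst, and one fast consumer that acks immediately.

```go
package main

import "fmt"

// consumer models the broker's view of one consumer.
type consumer struct {
    prefetch int  // 0 means unlimited
    unacked  int  // deliveries not yet acknowledged
    received int  // total deliveries handed to this consumer
    acksFast bool // true: each delivery is acked before the next dispatch
}

func (c *consumer) hasCapacity() bool {
    return c.prefetch == 0 || c.unacked < c.prefetch
}

// dispatch hands n messages to consumers round-robin, skipping consumers
// whose prefetch window is full. It returns how many stayed in the queue.
func dispatch(n int, consumers []*consumer) (queued int) {
    next := 0
    for i := 0; i < n; i++ {
        delivered := false
        for tries := 0; tries < len(consumers) && !delivered; tries++ {
            c := consumers[next]
            next = (next + 1) % len(consumers)
            if !c.hasCapacity() {
                continue
            }
            c.received++
            if !c.acksFast {
                c.unacked++
            }
            delivered = true
        }
        if !delivered {
            queued++
        }
    }
    return queued
}

// simulate runs n messages past one slow and one fast consumer.
func simulate(n, prefetch int) (slow, fast int) {
    s := &consumer{prefetch: prefetch}
    f := &consumer{prefetch: prefetch, acksFast: true}
    dispatch(n, []*consumer{s, f})
    return s.received, f.received
}

func main() {
    slow, fast := simulate(10, 0)
    fmt.Printf("unlimited prefetch: slow=%d fast=%d\n", slow, fast)
    slow, fast = simulate(10, 1)
    fmt.Printf("prefetch 1:         slow=%d fast=%d\n", slow, fast)
}
```

In this model the slow consumer is handed 5 of 10 messages with unlimited prefetch and only 1 of 10 with a prefetch of 1; the other 9 go to the fast consumer.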

Realistic consumer with acks

Real code needs to handle message processing without blocking the broker. Here's a consumer that reads messages, processes them, and acknowledges them. The consumer runs in a goroutine. The main goroutine waits for a signal to stop.

// Start consuming messages. Auto-ack is disabled so we can ack after processing.
msgs, err := ch.Consume(
    q.Name, // queue
    "",     // consumer tag
    false,  // auto-ack
    false,  // exclusive
    false,  // no-local
    false,  // no-wait
    nil,    // args
)
if err != nil {
    log.Fatalf("consume failed: %v", err)
}

// Process messages in a loop. process is the application's own handler, func([]byte) error.
go func() {
    for msg := range msgs {
        if err := process(msg.Body); err != nil {
            // Nack with requeue to retry the message later.
            msg.Nack(false, true)
            continue
        }
        // Acknowledge the message to remove it from the queue.
        msg.Ack(false)
    }
}()

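The Consume call returns a channel of Delivery structs. The msgs channel stays open until the consumer is cancelled or the AMQP channel or connection closes, at which point the range loop ends. If none of those ever happens, the goroutine leaks. Use ch.Cancel(consumerTag, false) to stop consuming, or close the channel. An empty consumer tag makes the library generate one, so pass your own tag to Consume if you plan to call Cancel.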

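The msg.Ack(false) call tells the broker the message is processed. The false means "don't ack multiple". If you set it to true, the broker acks every unacked delivery on the channel up to and including this one. Use multiple acks only if you process messages in batches. msg.Nack(false, true) rejects the message and requeues it. The second argument is requeue. If it is false, the message is discarded or sent to a dead letter exchange if one is configured. A message that always fails and is always requeued is redelivered in a tight loop, so cap retries or dead-letter it.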

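The context.Context convention applies here. Functions that take a context should respect cancellation. The Delivery channel returned by Consume knows nothing about your context, so a plain range loop keeps running until that channel closes. Wrap the consume loop in a select on ctx.Done() to handle shutdown. Recent amqp091-go releases also offer ConsumeWithContext; check the version you depend on before relying on it.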
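A context-aware consume loop might look like the sketch below. To keep it runnable without a broker, it uses a hypothetical delivery struct in place of the library's Delivery type; with the real library, msgs would be the channel returned by Consume and the ack and nack fields would be calls to msg.Ack and msg.Nack. The consume and demo functions are illustrative names.

```go
package main

import (
    "context"
    "errors"
    "fmt"
)

// delivery is a stand-in for the library's Delivery type, reduced to
// the parts this loop needs.
type delivery struct {
    body []byte
    ack  func()
    nack func(requeue bool)
}

var errDeliveriesClosed = errors.New("delivery channel closed")

// consume processes deliveries until ctx is cancelled or msgs is closed.
func consume(ctx context.Context, msgs <-chan delivery, process func([]byte) error) error {
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case msg, ok := <-msgs:
            if !ok {
                return errDeliveriesClosed
            }
            if err := process(msg.body); err != nil {
                msg.nack(true) // requeue for a later retry
                continue
            }
            msg.ack()
        }
    }
}

// demo feeds three deliveries through consume and cancels the context
// while the last one is being processed.
func demo() (acked, requeued int, cancelled bool) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    msgs := make(chan delivery, 3)
    for _, body := range []string{"ok", "bad", "last"} {
        msgs <- delivery{
            body: []byte(body),
            ack:  func() { acked++ },
            nack: func(bool) { requeued++ },
        }
    }
    process := func(body []byte) error {
        switch string(body) {
        case "bad":
            return errors.New("processing failed")
        case "last":
            cancel() // simulate a shutdown signal arriving mid-run
        }
        return nil
    }
    err := consume(ctx, msgs, process)
    return acked, requeued, errors.Is(err, context.Canceled)
}

func main() {
    acked, requeued, cancelled := demo()
    fmt.Println(acked, requeued, cancelled)
}
```

The in-flight message is finished and acked before the loop notices the cancellation, so a shutdown never abandons a message halfway through processing.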

Acknowledge messages only after processing. If the consumer dies first, the broker requeues its unacked messages and delivers them again.

Durability and data loss

If the broker restarts, non-durable queues and messages disappear. Set durable: true on QueueDeclare and ExchangeDeclare. Set DeliveryMode: amqp.Persistent on Publishing. This writes messages to disk. Durability adds latency. Use it only when you cannot afford to lose messages. Transient queues are faster and fine for logs or metrics.

// Publish a persistent message.
err = ch.Publish("", q.Name, false, false, amqp.Publishing{
    DeliveryMode: amqp.Persistent,
    Body:         []byte("Important data"),
})
if err != nil {
    log.Fatalf("publish failed: %v", err)
}

Forgetting persistent delivery mode is a common mistake. The queue is durable, but the message is transient. On restart, the queue survives, but the message is gone. The compiler won't warn you about transient messages. You lose data on restart.

Test restarts. Restart the broker and check if messages survive. If they don't, you missed a durability flag.

Pitfalls and error handling

Channels can close independently of the connection. If the broker detects a protocol violation or a resource limit, it closes the channel. Your code gets a channel closed error. You must check ch.NotifyClose to catch these events. The notification channel fires when the channel closes.

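// Monitor channel closure. NotifyClose returns the channel it is given.
closeCh := ch.NotifyClose(make(chan *amqp.Error, 1))
go func() {
    // A nil error means the application closed the channel cleanly.
    if err := <-closeCh; err != nil {
        log.Printf("channel closed: %v", err)
    }
}()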

Connections drop too. Network blips cause connection reset by peer. You need a reconnection loop. Wrap Dial in a loop with exponential backoff. Reopen channels and redeclare queues after reconnecting. State is lost on reconnect.
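The retry part of such a loop can be sketched without a broker. In the code below, dial is a hypothetical stand-in for a function that calls amqp.Dial, reopens channels, and redeclares queues; the backoff, connectWithRetry, and simulate names, the 100 ms base delay, and the 5 s cap are all assumptions chosen for illustration. Production code would usually add random jitter to the delay as well.

```go
package main

import (
    "errors"
    "fmt"
    "time"
)

// backoff returns base doubled attempt times, capped at max.
func backoff(attempt int, base, max time.Duration) time.Duration {
    d := base
    for i := 0; i < attempt; i++ {
        d *= 2
        if d >= max {
            return max
        }
    }
    if d > max {
        return max
    }
    return d
}

// connectWithRetry calls dial until it succeeds or maxAttempts (at least 1)
// is reached, sleeping between attempts. It returns the attempts used.
func connectWithRetry(dial func() error, maxAttempts int, sleep func(time.Duration)) (int, error) {
    var err error
    for attempt := 0; attempt < maxAttempts; attempt++ {
        if err = dial(); err == nil {
            return attempt + 1, nil
        }
        if attempt < maxAttempts-1 {
            sleep(backoff(attempt, 100*time.Millisecond, 5*time.Second))
        }
    }
    return maxAttempts, fmt.Errorf("giving up after %d attempts: %w", maxAttempts, err)
}

// simulate runs connectWithRetry against a dial that fails a fixed number
// of times, recording the delays instead of sleeping.
func simulate(failures, maxAttempts int) (int, []time.Duration, error) {
    var delays []time.Duration
    calls := 0
    dial := func() error {
        calls++
        if calls <= failures {
            return errors.New("connection refused")
        }
        return nil
    }
    record := func(d time.Duration) { delays = append(delays, d) }
    attempts, err := connectWithRetry(dial, maxAttempts, record)
    return attempts, delays, err
}

func main() {
    attempts, delays, err := simulate(3, 10)
    fmt.Println(attempts, delays, err)
}
```

In real use, pass time.Sleep as the sleep argument and start the loop again whenever the connection's close notification fires.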

Publishing to a non-existent queue through the default exchange is dropped silently unless you set the mandatory flag. If mandatory is true and no queue matches, the broker returns the message to the channel registered with ch.NotifyReturn, with reply code 312 (NO_ROUTE). If you ignore returns, messages vanish.

Goroutine leaks happen when the goroutine waits on a channel that never gets closed. Always have a cancellation path. Use context.Context to signal shutdown. Cancel consumers and close channels on exit.

The worst goroutine bug is the one that never logs. Monitor channel closures and returns.

When to use RabbitMQ

Use RabbitMQ when you need durable message persistence and complex routing topologies. Use a simple in-memory channel when you are connecting goroutines within the same process and don't need durability. Use NATS when you need high throughput and a simpler publish-subscribe model without the overhead of exchanges and queues. Use direct HTTP calls when the downstream service is fast and you can tolerate synchronous latency.
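For comparison, the in-process alternative is a buffered Go channel and one worker goroutine. The sketch below is illustrative only: enqueue, runWorker, and demo are hypothetical names, and the buffer size of 2 is arbitrary. It shows the trade-off directly, since a full buffer rejects work and anything still in the buffer is lost if the process exits.

```go
package main

import (
    "fmt"
    "sync"
)

// enqueue attempts a non-blocking handoff and reports whether the job fit
// in the buffer.
func enqueue(jobs chan<- string, email string) bool {
    select {
    case jobs <- email:
        return true
    default:
        return false
    }
}

// runWorker drains jobs in a background goroutine and returns a function
// that blocks until the worker has finished.
func runWorker(jobs <-chan string, send func(string)) (wait func()) {
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        for email := range jobs {
            send(email)
        }
    }()
    return wg.Wait
}

// demo offers three jobs to a buffer of two, then lets a worker drain it.
func demo() (accepted, sent int) {
    jobs := make(chan string, 2)
    for _, email := range []string{"a@example.com", "b@example.com", "c@example.com"} {
        if enqueue(jobs, email) {
            accepted++
        }
    }
    wait := runWorker(jobs, func(string) { sent++ })
    close(jobs) // no more jobs; the worker exits once the buffer is empty
    wait()
    return accepted, sent
}

func main() {
    accepted, sent := demo()
    fmt.Println(accepted, sent)
}
```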

Pick the tool that matches your durability needs. Don't over-engineer a queue for a local handoff.

Where to go next