How to Use Google Cloud Pub/Sub from Go

Use the cloud.google.com/go/pubsub library to create topics and subscriptions, then publish and receive messages in Go.

The decoupling problem

You build a service that processes images. A user uploads a photo, and your API needs to resize it, generate thumbnails, and update a database. If you do all that in the HTTP handler, the user stares at a loading spinner for ten seconds. You want the API to return immediately while the heavy work happens in the background. Google Cloud Pub/Sub gives you a reliable message queue to decouple the producer from the consumer. The producer sends a message and moves on. The consumer picks up the message whenever it's ready.

Pub/Sub in plain words

Pub/Sub stands for Publish/Subscribe. Think of it like a radio station and a set of radios. The station broadcasts a signal. Any radio tuned to that frequency hears the message. In cloud terms, the topic is the frequency. The subscription is a radio. You publish a message to a topic. Every subscription attached to that topic gets a copy. If no one is listening, the message sits in the subscription's buffer until a subscriber shows up.

This is different from a direct function call. The sender doesn't know who receives the message. The receiver doesn't know who sent it. They only agree on the topic name and the message format. The library handles retries, buffering, and scaling so you don't have to write that logic yourself.
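
The radio analogy can be sketched with nothing but Go channels. The following is a toy, in-memory model, not the Pub/Sub client library; the Topic type, NewTopic, Subscribe, and Publish here are hypothetical names invented for illustration. It shows the two properties described above: every subscription gets its own copy, and a message waits in the subscription's buffer until someone reads it.

```go
package main

import "fmt"

// Topic is a toy, in-memory stand-in for a Pub/Sub topic.
// It is NOT the cloud.google.com/go/pubsub type.
type Topic struct {
	subs map[string]chan string // one buffered queue per subscription
}

// NewTopic returns a topic with no subscriptions attached.
func NewTopic() *Topic {
	return &Topic{subs: make(map[string]chan string)}
}

// Subscribe attaches a named subscription with its own buffer.
func (t *Topic) Subscribe(name string, buffer int) <-chan string {
	ch := make(chan string, buffer)
	t.subs[name] = ch
	return ch
}

// Publish copies the message into every subscription's buffer.
// The publisher never learns who the receivers are.
// In this toy model Publish blocks if a buffer is full.
func (t *Topic) Publish(msg string) {
	for _, ch := range t.subs {
		ch <- msg
	}
}

func main() {
	topic := NewTopic()
	thumbs := topic.Subscribe("thumbnails", 8)
	audit := topic.Subscribe("audit-log", 8)

	// No receiver is running yet; the message waits in each buffer.
	topic.Publish("img-001 uploaded")

	fmt.Println("thumbnails got:", <-thumbs)
	fmt.Println("audit-log got:", <-audit)
}
```

The real service adds durability, retries, and network transport on top of this shape, but the contract between sender and receiver is the same: a topic name and a message format.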

Minimal example

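Here's the skeleton. You create a client, create a topic, and publish a message; a separate function creates a subscription and receives messages. A subscription only receives messages published after it exists, so in a real run, create the subscription before you publish. The code is split into two parts to keep the blocks manageable.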

package main

import (
	"context"
	"fmt"
	"log"

	"cloud.google.com/go/pubsub"
)

func main() {
	// Context carries cancellation signals and deadlines through the call stack.
	ctx := context.Background()

	// NewClient authenticates automatically using the default credentials chain.
	client, err := pubsub.NewClient(ctx, "my-project-id")
	if err != nil {
		log.Fatal(err)
	}
	// Close releases the underlying gRPC connections and goroutines.
	defer client.Close()

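	// CreateTopic sends the RPC that provisions the topic and returns a handle
	// to it. It fails if the topic already exists.
	topic, err := client.CreateTopic(ctx, "my-topic")
	if err != nil {
		log.Fatal(err)
	}
	// Stop flushes buffered messages and shuts down the publish goroutines.
	defer topic.Stop()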

	// Publish returns a result immediately; the message is buffered locally.
	result := topic.Publish(ctx, &pubsub.Message{Data: []byte("hello")})

	// Get blocks until the server assigns a message ID or returns an error.
	id, err := result.Get(ctx)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("Published:", id)
}

The receiving side runs a loop that processes messages as they arrive.

func receiveMessages(ctx context.Context, client *pubsub.Client) {
	// CreateSubscription provisions the subscription and returns a handle.
	// client.Topic builds a topic handle locally, without an RPC.
	sub, err := client.CreateSubscription(ctx, "my-sub", pubsub.SubscriptionConfig{
		Topic: client.Topic("my-topic"),
	})
	if err != nil {
		log.Fatal(err)
	}

	// Receive blocks until ctx is cancelled and runs the callback concurrently.
	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		fmt.Println("Received:", string(msg.Data))
		// Ack tells the server the message was processed successfully.
		msg.Ack()
	})
	if err != nil {
		log.Printf("Receive error: %v", err)
	}
}

How the client works

The client uses Application Default Credentials. You don't pass a key file unless you're running in an environment without metadata, like a local machine without gcloud configured. Topic and Subscription are just handles; building one makes no network call. CreateTopic and CreateSubscription send the request that provisions the resource, and they return an error if it already exists. In production code, you usually create topics and subscriptions via the console or Terraform, then just use the handles in your Go code.

Publish returns immediately. It buffers the message in memory and sends it asynchronously. Get blocks until the server acknowledges the message. This is crucial. If you don't call Get, you don't know if the message failed. The library retries automatically, but you need the ID to confirm success. If Get returns an error, the publish failed after those retries and the server never accepted the message; it is up to you to retry or report it.

Receive spawns goroutines to fetch messages. It runs until the context is cancelled. The callback runs concurrently for multiple messages. You must call Ack or Nack on every message. Ack removes the message from the subscription. Nack tells the server to redeliver it. If you call neither, the library keeps extending the ack deadline up to MaxExtension; after that the deadline expires and the server redelivers the message.

Context is the kill switch. Pass it everywhere.

Realistic usage

Real code runs in a server. You publish from an HTTP handler. You receive in a long-running goroutine. The handler needs to return quickly, but it also needs to ensure the message reached the server.

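// HandleUpload publishes a job and returns once the server has accepted it.
func HandleUpload(ctx context.Context, client *pubsub.Client) error {
	// For brevity the handle is created here; a real server would create it
	// once at startup and reuse it, since each handle keeps its own buffers.
	topic := client.Topic("image-jobs")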
	msg := &pubsub.Message{
		Data: []byte(`{"url": "gs://bucket/img.jpg"}`),
		// Attributes let you add metadata without parsing the payload.
		Attributes: map[string]string{"priority": "high"},
	}
	result := topic.Publish(ctx, msg)
	// Get ensures the message reached the server before we return.
	_, err := result.Get(ctx)
	return err
}

The receiver runs as a background task. It starts when the server starts and stops when the server shuts down.

// StartReceiver runs the subscriber loop in the background.
func StartReceiver(ctx context.Context, client *pubsub.Client) {
	sub := client.Subscription("image-workers")
	err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		// Process the message.
		fmt.Println("Processing:", string(msg.Data))
		// Ack tells the server we succeeded. The message is deleted.
		msg.Ack()
	})
	if err != nil {
		log.Printf("Receiver error: %v", err)
	}
}
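
The callback above always acks. A real worker usually has to choose between Ack and Nack based on what went wrong. The sketch below isolates that decision in a plain function so it can be tested without a Pub/Sub connection. ImageJob, Decision, decide, and errUnavailable are hypothetical names, and acking a malformed payload is one possible policy, not the only one.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// ImageJob mirrors the JSON payload used in the handler example.
// The struct and its field names are assumptions for illustration.
type ImageJob struct {
	URL string `json:"url"`
}

// Decision is what the receive callback should do with the message.
type Decision int

const (
	Ack  Decision = iota // processed, or unprocessable: do not redeliver
	Nack                 // transient failure: ask for redelivery
)

// errUnavailable is a hypothetical transient failure.
var errUnavailable = errors.New("storage unavailable")

// decide parses the payload, runs the work, and picks Ack or Nack.
func decide(data []byte, process func(ImageJob) error) Decision {
	var job ImageJob
	if err := json.Unmarshal(data, &job); err != nil || job.URL == "" {
		// A malformed payload will never succeed, so redelivery is pointless.
		return Ack
	}
	if err := process(job); err != nil {
		return Nack
	}
	return Ack
}

func main() {
	ok := func(ImageJob) error { return nil }
	fail := func(ImageJob) error { return errUnavailable }
	payload := []byte(`{"url": "gs://bucket/img.jpg"}`)

	fmt.Println("success acks:", decide(payload, ok) == Ack)
	fmt.Println("transient failure nacks:", decide(payload, fail) == Nack)
	fmt.Println("bad payload acks:", decide([]byte(`not json`), ok) == Ack)
}
```

Inside the Receive callback you would call msg.Ack() or msg.Nack() depending on the result of decide(msg.Data, ...). If the subscription has a dead letter topic, nacking malformed payloads instead lets them end up there for inspection.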

The convention is to pass context.Context as the first parameter, named ctx. Functions that take a context should respect cancellation and deadlines. The receiver loop checks the context implicitly. When you cancel the context, Receive stops fetching new messages and waits for in-flight callbacks to finish.
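
The shutdown behavior described above can be modeled with the standard library alone. In this sketch, receive is a hypothetical stand-in for a blocking subscriber loop, not the Pub/Sub client: it starts callbacks until the context is cancelled, then waits for the in-flight ones before returning. The timings are arbitrary.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// receive is a toy stand-in for a blocking subscriber loop. It starts a
// callback roughly every millisecond until ctx is cancelled, then waits for
// in-flight callbacks and reports how many were started and finished.
func receive(ctx context.Context, handle func(id int)) (started, finished int) {
	var (
		wg sync.WaitGroup
		mu sync.Mutex
	)
	for {
		select {
		case <-ctx.Done():
			wg.Wait() // stop fetching, but let in-flight callbacks finish
			return started, finished
		case <-time.After(time.Millisecond):
			started++
			wg.Add(1)
			go func(id int) {
				defer wg.Done()
				handle(id)
				mu.Lock()
				finished++
				mu.Unlock()
			}(started)
		}
	}
}

// demo cancels the context after a short delay, as a server's shutdown
// path would, and returns the counters from receive.
func demo() (started, finished int) {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		time.Sleep(20 * time.Millisecond)
		cancel()
	}()
	return receive(ctx, func(int) { time.Sleep(5 * time.Millisecond) })
}

func main() {
	started, finished := demo()
	fmt.Println("every started callback finished:", started == finished)
}
```

In a server, the cancel call typically lives in the signal handler or shutdown hook, and the code that launched the receiver waits for it to return before exiting.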

Tuning the receiver

The default settings work for many cases, but high-throughput systems need tuning. ReceiveSettings controls concurrency and deadlines.

sub.ReceiveSettings = pubsub.ReceiveSettings{
	// MaxExtension is the total time the library keeps extending a message's
	// ack deadline while the callback is still running.
	MaxExtension: 10 * time.Minute,
	// MaxOutstandingMessages limits concurrency to prevent overwhelming the worker.
	MaxOutstandingMessages: 100,
	// MaxExtensionPeriod caps how far each individual extension pushes the deadline.
	MaxExtensionPeriod: 1 * time.Minute,
}

MaxOutstandingMessages caps the number of messages processed concurrently. If you hit the limit, the receiver pauses fetching until some messages complete. This protects your worker from being overwhelmed. If a callback runs past the ack deadline, the server redelivers the message. The library prevents that by extending the deadline in the background for as long as the callback runs, up to MaxExtension.

Flow control protects your worker. Tune the limits.
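
The idea behind MaxOutstandingMessages is a counting semaphore. The sketch below models it with a buffered channel; processAll is a hypothetical helper, not part of the client library, and it makes no claim about how the library implements flow control internally.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// processAll runs handle for every message but never lets more than
// maxOutstanding callbacks run at once. It returns the peak concurrency seen.
func processAll(messages []string, maxOutstanding int, handle func(string)) int {
	sem := make(chan struct{}, maxOutstanding) // counting semaphore
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		running int
		peak    int
	)
	for _, m := range messages {
		sem <- struct{}{} // blocks here when the limit is reached
		wg.Add(1)
		go func(m string) {
			defer wg.Done()
			defer func() { <-sem }() // free a slot for the next message

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			handle(m)

			mu.Lock()
			running--
			mu.Unlock()
		}(m)
	}
	wg.Wait()
	return peak
}

func main() {
	messages := make([]string, 50)
	peak := processAll(messages, 4, func(string) {
		time.Sleep(time.Millisecond) // simulate work
	})
	fmt.Println("peak concurrency within limit:", peak <= 4)
}
```

The fetch loop blocks on the semaphore instead of dropping or queuing without bound, which is the pause-until-complete behavior described above.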

Ordering and dead letters

Pub/Sub delivers messages in order only if you use ordering keys. By default, messages are delivered in an arbitrary order. If you need order, set an OrderingKey on the message, set EnableMessageOrdering on the topic handle you publish with, and enable message ordering on the subscription. Messages with the same key are then delivered one after another, in the order the service received them. This reduces throughput. Use ordering keys only when sequence matters, like applying database updates.

msg := &pubsub.Message{
	Data:        []byte("update"),
	OrderingKey: "user-123",
}
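
To see why ordering keys cost throughput, it helps to model the delivery rule. The sketch below is an in-memory illustration, not the client library: Msg, deliver, and collect are hypothetical names. Messages that share a key go through one lane and are handled one at a time; different keys run in parallel.

```go
package main

import (
	"fmt"
	"sync"
)

// Msg is a toy message; only the two fields used here are modeled.
type Msg struct {
	OrderingKey string
	Data        string
}

// deliver handles messages with the same key one at a time, in publish
// order, while different keys proceed in parallel.
func deliver(msgs []Msg, handle func(Msg)) {
	lanes := make(map[string]chan Msg)
	var wg sync.WaitGroup
	for _, m := range msgs {
		lane, ok := lanes[m.OrderingKey]
		if !ok {
			lane = make(chan Msg, len(msgs))
			lanes[m.OrderingKey] = lane
			wg.Add(1)
			go func() {
				defer wg.Done()
				for next := range lane {
					handle(next) // one message per key at a time
				}
			}()
		}
		lane <- m
	}
	for _, lane := range lanes {
		close(lane)
	}
	wg.Wait()
}

// collect records, per key, the order in which messages were handled.
func collect(msgs []Msg) map[string][]string {
	var mu sync.Mutex
	seen := make(map[string][]string)
	deliver(msgs, func(m Msg) {
		mu.Lock()
		defer mu.Unlock()
		seen[m.OrderingKey] = append(seen[m.OrderingKey], m.Data)
	})
	return seen
}

func main() {
	seen := collect([]Msg{
		{OrderingKey: "user-123", Data: "create"},
		{OrderingKey: "user-456", Data: "create"},
		{OrderingKey: "user-123", Data: "rename"},
		{OrderingKey: "user-123", Data: "delete"},
	})
	fmt.Println("user-123:", seen["user-123"])
	fmt.Println("user-456:", seen["user-456"])
}
```

Each key is limited to one callback at a time, so a single hot key cannot use more than one worker. That is the throughput trade-off in miniature.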

Messages that repeatedly fail processing can be routed to a dead letter topic, but only if you configure a dead letter policy on the subscription. The service, not the client library, counts delivery attempts. Once a message exceeds the maximum number of deliveries you set, Pub/Sub forwards it to the dead letter topic. Attach a subscription to that topic so you can inspect the failures and debug them.

Dead letter topics catch the poison. Inspect them regularly.
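
The delivery-attempt rule is easy to model. The following sketch is a local simulation under stated assumptions, not the service's implementation: deliverWithDeadLetter and errBadPayload are hypothetical names, and a failed handler call stands in for a Nack or an expired ack deadline.

```go
package main

import (
	"errors"
	"fmt"
)

// errBadPayload is a hypothetical processing failure.
var errBadPayload = errors.New("bad payload")

// deliverWithDeadLetter calls handle until it succeeds or the message has
// been delivered maxAttempts times. It reports the attempts used and whether
// the message would be dead-lettered.
func deliverWithDeadLetter(msg string, maxAttempts int, handle func(string) error) (attempts int, deadLettered bool) {
	for attempts = 1; attempts <= maxAttempts; attempts++ {
		if err := handle(msg); err == nil {
			return attempts, false // acked
		}
		// A failure here plays the role of a Nack or an expired ack deadline.
	}
	return maxAttempts, true
}

func main() {
	// A poison message fails every time.
	poison := func(string) error { return errBadPayload }

	// A flaky handler succeeds on its third call.
	calls := 0
	flaky := func(string) error {
		calls++
		if calls < 3 {
			return errBadPayload
		}
		return nil
	}

	attempts, dead := deliverWithDeadLetter("job-1", 5, poison)
	fmt.Println("poison:", attempts, "attempts, dead-lettered:", dead)

	attempts, dead = deliverWithDeadLetter("job-2", 5, flaky)
	fmt.Println("flaky:", attempts, "attempts, dead-lettered:", dead)
}
```

The poison message uses up every attempt and is set aside, while the flaky one recovers on its own. Without the cap, the poison message would be redelivered indefinitely and keep a worker slot busy.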

Pitfalls and errors

The compiler rejects topic.Publish(msg) with not enough arguments in call to topic.Publish if you forget the context. Context is mandatory. Every RPC needs one.

Runtime errors happen when the network fails or permissions are wrong. The compiler can't catch these; they come back as error values from the call. Get returns context deadline exceeded if it waits too long. Set a timeout on the context. Calls fail with a permission denied error if the service account lacks IAM roles. Check the Cloud Console. Calls fail with a not found error if you reference a missing topic or subscription.
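
Setting a timeout and recognizing the resulting error looks roughly like this. The sketch uses only the standard library; waitForResult is a hypothetical stand-in for a blocking call such as waiting on a publish result, and the message ID and durations are made up.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Durations for the demo; the values are arbitrary.
const (
	demoTimeout = 100 * time.Millisecond
	fastServer  = 1 * time.Millisecond
	slowServer  = 5 * time.Second
)

// waitForResult is a stand-in for a blocking call. It returns a made-up
// message ID after delay, or the context's error if ctx ends first.
func waitForResult(ctx context.Context, delay time.Duration) (string, error) {
	select {
	case <-time.After(delay):
		return "message-id-1", nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// publishWithTimeout bounds the wait and classifies the outcome.
func publishWithTimeout(timeout, delay time.Duration) string {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel() // always release the timer

	id, err := waitForResult(ctx, delay)
	switch {
	case errors.Is(err, context.DeadlineExceeded):
		return "timed out"
	case err != nil:
		return "failed: " + err.Error()
	default:
		return "published " + id
	}
}

func main() {
	fmt.Println(publishWithTimeout(demoTimeout, fastServer))
	fmt.Println(publishWithTimeout(demoTimeout, slowServer))
}
```

Using errors.Is rather than comparing strings keeps the check working when the error is wrapped by intermediate layers.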

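The real danger is a receiver that never stops. Receive blocks and spawns goroutines to fetch messages. If you never cancel the context, Receive never returns, those goroutines stay alive, and any shutdown code that waits for the receiver hangs. The Go runtime does not wait for goroutines when main returns, so exiting without cancelling cuts in-flight callbacks off mid-work instead. Always pass a cancellable context to Receive and call the cancel function when you're done.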

Another trap is ignoring Get. Publish only buffers the message locally and sends it in the background. Get blocks until that message is sent or fails for good. If you skip Get, you assume the message was sent when it may still be sitting in memory or may already have failed, and if the process exits first, it is gone. The worst bug is the silent drop. Check Get errors.

When to use Pub/Sub

Use Google Cloud Pub/Sub when you need to decouple services that run independently. Use Pub/Sub when you have a fan-out pattern where one event triggers multiple downstream workers. Use a direct HTTP call when the caller needs an immediate response and the latency is acceptable. Use a database queue when you need strong transactional guarantees with your data and don't want to manage external infrastructure. Use a simple Go channel when the producer and consumer live in the same process.
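
For comparison, the same-process case needs no external service at all. This is a minimal sketch of the channel alternative; resizeAll and the thumb- prefix are hypothetical and stand in for real image work.

```go
package main

import "fmt"

// resizeAll is a hypothetical in-process pipeline: a producer goroutine
// feeds uploads into a channel and a consumer goroutine turns each one
// into a thumbnail name.
func resizeAll(uploads []string) []string {
	jobs := make(chan string)
	results := make(chan string)

	go func() { // producer
		defer close(jobs)
		for _, u := range uploads {
			jobs <- u
		}
	}()

	go func() { // consumer
		defer close(results)
		for j := range jobs {
			results <- "thumb-" + j
		}
	}()

	var out []string
	for r := range results {
		out = append(out, r)
	}
	return out
}

func main() {
	fmt.Println(resizeAll([]string{"cat.jpg", "dog.jpg"}))
}
```

A channel gives you decoupling inside one process, but nothing survives a crash or restart. Once producer and consumer need to run, scale, or fail independently, that is the point where Pub/Sub earns its place.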

Where to go next