How to Implement the Publish-Subscribe Pattern in Go

Implement the Publish-Subscribe pattern in Go using channels and a Topic struct to decouple message producers from multiple consumers.

The broadcast problem

You build a service that tracks market prices. The feed updates every second. You need a dashboard to render the numbers, a logger to write them to disk, and a risk engine to check thresholds. You could wire them together directly, but every new component means touching the feed code. You want a broadcast line. One update goes out. Every listener gets a copy. The feed does not care who is listening.

The publish-subscribe pattern solves this by decoupling producers from consumers. The publisher drops a message into a shared distribution point. Subscribers register interest ahead of time. When the message arrives, the system fans it out. Think of a radio station. The station does not know how many cars are tuned in. It just transmits. Every car with the dial set to the right frequency gets the signal. In Go, we usually build this with channels or interfaces. Channels give you built-in backpressure and cancellation. Interfaces give you explicit contracts and easy testing. The trade-off is between flow control and structural clarity.

How pub-sub actually works in Go

Go does not ship with a built-in pub-sub library. The standard library gives you channels, goroutines, and synchronization primitives. You assemble the pattern yourself. The core idea is simple: maintain a registry of listeners, protect it with a mutex, and fan out messages when they arrive.

The naive approach stores a slice of interfaces and spawns a goroutine for each subscriber during publish. That works for a quick prototype. It also creates unbounded goroutines, blocks the publisher if a subscriber hangs, and makes cleanup nearly impossible. The robust approach uses a single broadcast channel and a forwarder goroutine per subscriber. The publisher writes once. The forwarder goroutines read from the broadcast channel and write to dedicated subscriber channels. This gives you automatic backpressure, clean lifecycle management, and predictable memory usage.

The minimal implementation

Here is the simplest working broadcast line. It uses a buffered channel for the main feed and spawns a forwarder goroutine for each subscriber.

package main

import (
	"fmt"
	"sync"
)

// Topic represents a named broadcast channel.
type Topic struct {
	name string
	// buffered to 1 so the publisher can return immediately on the first send
	ch chan string
	// protects the subscriber registry during concurrent subscribe/unsubscribe
	mu sync.Mutex
}

// NewTopic creates a broadcast line with a fixed buffer size.
func NewTopic(name string, bufSize int) *Topic {
	return &Topic{
		name: name,
		ch:   make(chan string, bufSize),
	}
}

// Subscribe returns a read-only channel for the subscriber.
func (t *Topic) Subscribe() <-chan string {
	t.mu.Lock()
	defer t.mu.Unlock()
	// create a dedicated channel for this subscriber
	subCh := make(chan string, 1)
	go func() {
		for msg := range t.ch {
			// forward the message to the subscriber's channel
			subCh <- msg
		}
		close(subCh)
	}()
	return subCh
}

// Publish sends a message to all active subscribers.
func (t *Topic) Publish(msg string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	// send to the main broadcast channel
	t.ch <- msg
}

The receiver name is one letter matching the type. Go convention favors (t *Topic) over (this *Topic) or (self *Topic). Keep it short. The compiler does not care, but the community does.

Walking through the flow

When you call Subscribe, the method locks the mutex, creates a new buffered channel, and spawns a forwarder goroutine. The forwarder runs a range loop over the main broadcast channel. It reads each message and writes it to the subscriber channel. The subscriber channel has a buffer of one. That means the forwarder can drop the message and return immediately, even if the subscriber is slow. The publisher never blocks on subscriber speed.

When you call Publish, the method locks the mutex and sends the message into the main channel. The main channel is buffered. If the buffer is full, the publisher blocks until a forwarder reads a message. This is intentional. Backpressure protects your system from memory exhaustion. You tune the buffer size based on your expected message rate and subscriber latency.

When the publisher is done, you close the main channel. The range loops in the forwarder goroutines detect the close, exit cleanly, and close their subscriber channels. The subscribers read until their channels close, then stop. No goroutine leaks. No dangling references.

Goroutines are cheap. Channels are not magic.

Real-world wiring

Production code needs lifecycle management. You usually wrap the topic in a struct that accepts a context.Context as the first parameter. The context carries cancellation signals and deadlines. Functions that take a context should respect cancellation and deadlines. This is a hard convention in Go.

Here is a realistic event bus that handles startup, shutdown, and graceful cancellation.

package main

import (
	"context"
	"fmt"
	"sync"
)

// EventBus manages topics and handles graceful shutdown.
type EventBus struct {
	topics map[string]*Topic
	mu     sync.Mutex
}

// NewEventBus creates a registry for broadcast topics.
func NewEventBus() *EventBus {
	return &EventBus{
		topics: make(map[string]*Topic),
	}
}

// CreateTopic registers a new broadcast line.
func (e *EventBus) CreateTopic(name string, bufSize int) *Topic {
	e.mu.Lock()
	defer e.mu.Unlock()
	topic := NewTopic(name, bufSize)
	e.topics[name] = topic
	return topic
}

// Shutdown closes all topics and waits for forwarders to exit.
func (e *EventBus) Shutdown(ctx context.Context) error {
	e.mu.Lock()
	defer e.mu.Unlock()
	for _, t := range e.topics {
		close(t.ch)
	}
	// wait for context cancellation or timeout
	<-ctx.Done()
	return ctx.Err()
}

The Shutdown method closes every broadcast channel. The forwarder goroutines detect the close and exit. The context parameter lets you enforce a deadline. If the shutdown takes too long, the context cancels and you get a context deadline exceeded error from the runtime. You handle it the same way you handle any other error: check it, log it, and move on. The community accepts the if err != nil boilerplate because it makes the unhappy path visible. Do not hide it behind panics or silent returns.

Accept interfaces, return structs. This bus returns a concrete *Topic because the caller needs to call Publish and Subscribe. If you were building a library that consumed events, you would define an EventPublisher interface and accept it as a parameter. That keeps your code testable and decoupled from implementation details.

Where things break

Pub-sub looks simple until you hit the edge cases. The most common failure is a goroutine leak. The forwarder goroutine waits on the main channel. If you never close the main channel, the goroutine runs forever. Always have a cancellation path. Close channels when the publisher is done, or use a context to signal shutdown.

Another failure is the blocking publisher. If you use an unbuffered main channel and all subscribers are slow, Publish blocks until a subscriber reads. Your request handler hangs. Your metrics spike. Your users see timeouts. Buffer the main channel. Size it to your expected burst rate. Monitor channel length in production.

The compiler will catch some mistakes early. If you try to send on a closed channel, the runtime panics with send on closed channel. If you forget to import a package, you get undefined: pkg. If you pass the wrong type to a function, the compiler rejects it with cannot use x (untyped int constant) as string value in argument. These errors are plain text. Read them. They tell you exactly what went wrong.

Do not pass a *string to your subscribers. Strings are already cheap to pass by value. They are immutable and backed by a pointer and length. Passing a pointer adds an extra dereference without saving memory. Pass the string directly.

The worst goroutine bug is the one that never logs. Add a trace ID to your messages. Print it when subscribers receive it. You will spot deadlocks and dropped messages before they hit production.

When to reach for pub-sub

Concurrency patterns are tools. Pick the right one for the job.

Use a channel-based pub-sub when you need backpressure and automatic lifecycle management. Use an interface-based registry when you want explicit contracts and synchronous delivery. Use a dedicated message broker when you need persistence, exactly-once delivery, or cross-process communication. Use direct function calls when you have a fixed number of consumers and do not need decoupling.

Trust the standard library. Go gives you everything you need to build reliable event distribution. Keep the buffer sizes small. Close channels when you are done. Respect context cancellation. The pattern will serve you well.

Where to go next