How to Use Reactive Streams Patterns in Go

In Go, you implement reactive streams patterns with channels and goroutines, which carry asynchronous data flow and enforce backpressure.

When the producer outpaces the consumer

You are building a service that ingests events from a high-traffic API. The events arrive in bursts of thousands per second. Your processing logic writes each event to a database, which takes a few milliseconds per write. If you process events sequentially, the network buffer fills up, client requests start timing out, and latency spikes. You need a way to let the fast producer keep flowing while the slow consumer catches up, without allocating unbounded memory or dropping data.

Go does not have a built-in Reactive Streams library. You do not need one. Channels and goroutines implement reactive patterns directly. A buffered channel provides the buffer. A goroutine provides the concurrency. The channel capacity provides the backpressure: when the buffer fills, the producer blocks and stays blocked until the consumer reads. No complex protocol is required.

Channels are reactive streams

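Reactive streams describe asynchronous data flow with non-blocking backpressure. Go's channels give you blocking backpressure instead: a send on a full channel parks the sending goroutine until there is room. That is cheap, because a parked goroutine does not tie up an operating system thread. In Go, a channel is a typed pipe that connects goroutines. You send values into one end and receive them from the other. The channel can be unbuffered or buffered.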

An unbuffered channel requires both sender and receiver to be ready at the same time. The send blocks until a receive happens. This is strict synchronization. A buffered channel holds a fixed number of values. The send blocks only when the buffer is full. This decouples the sender and receiver. The sender can push values into the buffer and return immediately, as long as there is space.
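A minimal sketch of the difference, using a non-blocking send (a select with a default case) to probe whether a send would block. The helper trySend is hypothetical, written only for this illustration.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether it succeeded.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		// the send would block: the buffer is full, or, for an
		// unbuffered channel, no receiver is waiting.
		return false
	}
}

func main() {
	unbuffered := make(chan int)
	buffered := make(chan int, 2)

	fmt.Println(trySend(unbuffered, 1))       // false: no receiver is ready
	fmt.Println(trySend(buffered, 1))         // true: slot 1 of 2
	fmt.Println(trySend(buffered, 2))         // true: slot 2 of 2
	fmt.Println(trySend(buffered, 3))         // false: buffer full
	fmt.Println(len(buffered), cap(buffered)) // 2 2
}
```

In a real pipeline you would normally use the plain blocking send; the select/default form is useful when you would rather drop or report a value than wait.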

Think of a conveyor belt in a factory. The belt has a fixed length. Workers place items on one end. Workers remove items from the other. If the belt is full, the upstream worker cannot place more items until space opens up. The belt size is your buffer. The workers are goroutines. The belt enforces the flow rate.

Minimal stream with backpressure

Here is the simplest stream: a producer and a consumer linked by a buffered channel. The producer sends integers. The consumer doubles and prints them. The channel buffer limits how much data can pile up.

package main

import "fmt"

// main sets up a pipeline with backpressure.
func main() {
	// buffered channel acts as the stream buffer.
	// size 5 limits how much data can accumulate.
	stream := make(chan int, 5)

	// producer runs in a goroutine to avoid blocking main.
	go func() {
		for i := 0; i < 20; i++ {
			stream <- i
		}
		// signal completion so the consumer can exit.
		close(stream)
	}()

	// consumer reads until the channel closes.
	for v := range stream {
		fmt.Println(v * 2)
	}
}

The producer sends 20 values through a buffer of 5. It can run at most 5 values ahead of the consumer: once the buffer is full, the next send blocks. Each time the consumer reads and prints a value, a slot frees up, the producer unblocks, and it sends another value. This continues until all 20 values are processed. When the producer finishes, it closes the channel. The consumer's range loop drains the values still in the buffer, detects the close, and exits. No goroutine leaks.

Close your channels, or your goroutines leak. Always close the channel from the producer side; a receiver should never close a channel it reads from.

How the buffer controls pressure

The buffer size determines the trade-off between latency and throughput. A small buffer means the producer blocks frequently. This keeps memory usage low and ensures the consumer never falls far behind the producer. A large buffer allows the producer to run ahead. This smooths out bursts but increases memory usage and latency: each item waits longer in the buffer, so the consumer works on older data.

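Tune the buffer size based on your workload. If events arrive in bursts and the consumer catches up between them, a larger buffer keeps the producer from stalling during each burst. If the consumer is slower on average, no buffer size fixes that; a larger buffer only postpones the stall. If the consumer is fast, a small buffer is sufficient. The memory cost of a buffer is fixed, not open-ended: make allocates the full capacity when the channel is created, and a full buffer blocks the producer rather than growing. An oversized buffer therefore costs a large up-front allocation (capacity times element size) and, when the consumer stalls, a backlog of stale data.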
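A rough sketch that makes the bound visible: a fast producer, a consumer slowed by time.Sleep, and an atomic counter tracking how far ahead the producer gets. The function maxLead and the timings are illustrative assumptions, not measurements of any real workload. Whatever the scheduling, the gap cannot exceed the capacity plus one: capacity items waiting in the buffer and one item the consumer holds but has not finished.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// maxLead pushes items through a channel of the given capacity to a
// consumer that spends `work` on each item, and returns the largest
// observed gap between items sent and items finished.
func maxLead(capacity, items int, work time.Duration) int64 {
	ch := make(chan int, capacity)
	var finished, lead int64
	done := make(chan struct{})

	go func() {
		defer close(done)
		for range ch {
			time.Sleep(work) // simulate a slow write, e.g. a database call
			atomic.AddInt64(&finished, 1)
		}
	}()

	for i := 0; i < items; i++ {
		ch <- i // blocks while the buffer is full: this is the backpressure
		sent := int64(i + 1)
		if gap := sent - atomic.LoadInt64(&finished); gap > lead {
			lead = gap
		}
	}
	close(ch)
	<-done
	return lead
}

func main() {
	for _, c := range []int{0, 1, 5, 50} {
		fmt.Printf("capacity %2d: producer ran at most %d items ahead\n",
			c, maxLead(c, 100, time.Millisecond))
	}
}
```

The larger the capacity, the further the producer can run ahead, which absorbs bursts but means a new item can wait behind up to capacity older ones.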

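The compiler does not check buffer sizes or channel lifetimes. If every goroutine ends up blocked, for example because a producer forgot to close a channel a consumer is ranging over, or because a goroutine sends on a channel that nothing will ever receive from, the runtime stops the program with fatal error: all goroutines are asleep - deadlock!. This is a fatal error, not a recoverable panic. The check fires only when no goroutine at all can make progress; if other goroutines are still running, such as an HTTP server, the blocked goroutines simply leak. This usually means a missing close or a logic error in the pipeline.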

Buffer size is a tuning knob, not a magic fix. Measure your burst patterns and set the capacity accordingly.

Realistic pipeline with context

Real streams need error handling and cancellation. Here is a pipeline that transforms and filters data and respects context cancellation. The context allows the stream to stop if a timeout expires or a client disconnects.

package main

import (
	"context"
	"fmt"
)

// processStream reads from input, transforms values, and sends to output.
// It stops when ctx is cancelled or input is closed, and it closes output
// on every return path so the consumer's range loop can exit.
func processStream(ctx context.Context, input <-chan int, output chan<- int) {
	defer close(output)
	for {
		select {
		case <-ctx.Done():
			// context cancelled, stop processing immediately.
			return
		case v, ok := <-input:
			if !ok {
				// input channel closed, drain complete.
				return
			}
			// skip multiples of 5 for this example.
			if v%5 == 0 {
				continue
			}
			// the send also watches ctx: a plain output <- v * 2 would
			// block forever if the consumer stopped reading.
			select {
			case output <- v * 2:
			case <-ctx.Done():
				return
			}
		}
	}
}

// main demonstrates a cancellable stream.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	input := make(chan int, 10)
	output := make(chan int, 10)

	go processStream(ctx, input, output)

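	// simulate producer. It also watches ctx so it cannot block forever
	// on a full input channel after processStream has stopped.
	go func() {
		defer close(input)
		for i := 1; i <= 10; i++ {
			select {
			case input <- i:
			case <-ctx.Done():
				return
			}
		}
	}()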

	// consume output.
	for v := range output {
		fmt.Println(v)
	}
}

The processStream function uses a select statement to listen on both the input channel and the context. If the context is cancelled, it closes the output channel and returns. This propagates the cancellation downstream. The consumer's range loop exits when the output closes.

The convention is to pass context.Context as the first parameter, named ctx. Functions that accept a context should respect cancellation and deadlines. This allows callers to control the lifetime of long-running operations.
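A sketch of a producer that follows this convention and stops on a deadline. The ticks generator and the countTicks helper are hypothetical; context.WithTimeout supplies the deadline, and ctx.Err() reports why the stream ended.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// ticks is a hypothetical endless producer. Following the convention, ctx
// is its first parameter; it stops and closes its output when ctx is done.
func ticks(ctx context.Context, every time.Duration) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; ; i++ {
			// wait between values, but wake up early on cancellation.
			select {
			case <-time.After(every):
			case <-ctx.Done():
				return
			}
			// the send also watches ctx, so a consumer that stops
			// reading cannot strand this goroutine.
			select {
			case out <- i:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// countTicks consumes ticks until a deadline expires and reports how many
// values arrived and why the stream ended.
func countTicks(timeout, every time.Duration) (int, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	n := 0
	for range ticks(ctx, every) {
		n++
	}
	return n, ctx.Err()
}

func main() {
	n, err := countTicks(100*time.Millisecond, 10*time.Millisecond)
	fmt.Println("received", n, "values; stream ended with:", err)
}
```

The caller owns the lifetime: the producer never decides when to stop on its own, it only reacts to ctx.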

Context is plumbing. Run it through every long-lived call site.

Multi-stage transformations

Complex streams often chain multiple transformations. Here is a two-stage pipeline where one goroutine feeds another. Each stage runs in its own goroutine. The channels connect the stages.

package main

import "fmt"

// generate produces integers and sends them to out.
func generate(out chan<- int) {
	for i := 1; i <= 5; i++ {
		out <- i
	}
	close(out)
}

// square reads from in, squares values, and sends to out.
func square(in <-chan int, out chan<- int) {
	for v := range in {
		out <- v * v
	}
	close(out)
}

// main chains generate and square, then prints results.
func main() {
	nums := make(chan int)
	squares := make(chan int)

	go generate(nums)
	go square(nums, squares)

	for s := range squares {
		fmt.Println(s)
	}
}

The generate function sends values and closes the channel. The square function reads until the input closes, then closes its output. The main function ranges over the final output. The pipeline drains completely.

Each stage is responsible for closing its output channel. The producer closes. The intermediate stages close when their input closes. This ensures the downstream consumers can detect completion.
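A common variant of the same pipeline, sketched here with hypothetical gen, sq, and collect stages, has each stage create and return its own output channel. The stage that makes the channel is then the only code that can close it, which makes the closing rule hard to break.

```go
package main

import "fmt"

// gen owns the channel it returns: it creates it, sends on it, and closes it.
func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// sq is an intermediate stage. It closes its output once its input is closed.
func sq(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			out <- v * v
		}
	}()
	return out
}

// collect drains a stream into a slice.
func collect(in <-chan int) []int {
	var res []int
	for v := range in {
		res = append(res, v)
	}
	return res
}

func main() {
	// stages compose like function calls; sq(sq(...)) adds a third stage.
	fmt.Println(collect(sq(sq(gen(1, 2, 3))))) // [1 16 81]
}
```

Returning a receive-only channel (<-chan int) also lets the compiler reject any attempt by a downstream stage to send on or close it.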

Chain the closes. Every stage must signal when it is done.

Pitfalls and runtime panics

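Streams introduce concurrency bugs that are hard to debug. The most common issue is sending on a closed channel. If a goroutine tries to send to a channel that is already closed, the runtime panics with panic: send on closed channel. This happens when you close a channel too early, for example while another producer is still sending. Closing a channel twice, which happens when multiple goroutines each try to close the same channel, panics with panic: close of closed channel. The fix for both is a single owner: exactly one goroutine closes the channel, and only after every sender has finished.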
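When several goroutines send on one channel, none of them should close it. A sketch of the usual fix, with a hypothetical mergeProducers function: the producers share a sync.WaitGroup, and one extra goroutine closes the channel after wg.Wait returns.

```go
package main

import (
	"fmt"
	"sync"
)

// mergeProducers starts n producers that each send `per` values on a
// shared channel. No producer closes it; a single closer goroutine does,
// after every producer has finished, so neither a send on a closed
// channel nor a double close can happen.
func mergeProducers(n, per int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	for p := 0; p < n; p++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for i := 0; i < per; i++ {
				out <- id*per + i
			}
		}(p)
	}
	// the only goroutine allowed to close out.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	count := 0
	for range mergeProducers(4, 25) {
		count++
	}
	fmt.Println("received", count, "values") // received 100 values
}
```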

Another issue is forgetting to close a channel. If the producer does not close the channel, the consumer's range loop blocks forever. The goroutine leaks, and if your program is waiting on that goroutine, the program hangs. The runtime reports a deadlock only when every goroutine is blocked; in a long-running service with other active goroutines, the leak goes unreported.

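Buffer sizing is also a pitfall. A buffer that is too small causes the producer to block frequently, reducing throughput. A buffer that is too large wastes memory and increases latency, because items sit in the queue longer. If the consumer stalls, the buffer fills and the producer blocks, so memory stays bounded by the capacity. Memory grows without limit only when code sidesteps the backpressure, for example by spawning a new goroutine for each send or appending incoming items to an unbounded slice.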

Error handling in streams requires care. You cannot return an error from a goroutine directly. You must send the error on a channel or use context cancellation. If you use an error channel, you need a way to propagate the error through the pipeline. This adds complexity. Context cancellation is often simpler. Cancel the context when an error occurs, and have all stages check the context.
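A minimal sketch of the context-cancellation approach, assuming a hypothetical run function and a made-up validation rule (negative values are errors). The failing stage records its error once through sync.Once and cancels the shared context; the producer selects on ctx.Done() and exits instead of blocking on a stage that stopped reading.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// errNegative is a hypothetical validation error for this sketch.
var errNegative = errors.New("negative value")

// run feeds values through a validating stage. The first error cancels
// the shared context, which stops the producer too, and run returns it
// along with the results delivered before the failure.
func run(values []int) ([]int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var (
		once     sync.Once
		firstErr error
	)
	// fail keeps only the first error, even if several stages fail.
	fail := func(err error) {
		once.Do(func() {
			firstErr = err
			cancel()
		})
	}

	in := make(chan int)
	go func() {
		defer close(in)
		for _, v := range values {
			select {
			case in <- v:
			case <-ctx.Done():
				return
			}
		}
	}()

	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			if v < 0 {
				fail(fmt.Errorf("validate %d: %w", v, errNegative))
				return
			}
			select {
			case out <- v * 10:
			case <-ctx.Done():
				return
			}
		}
	}()

	var results []int
	for v := range out {
		results = append(results, v)
	}
	// firstErr is safe to read: it was written before out was closed.
	return results, firstErr
}

func main() {
	res, err := run([]int{1, 2, 3})
	fmt.Println(res, err) // [10 20 30] <nil>
	res, err = run([]int{1, 2, -3, 4})
	fmt.Println(res, err) // [10 20] validate -3: negative value
}
```

Wrapping with %w keeps the original error inspectable with errors.Is by callers.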

The worst goroutine bug is the one that never logs. Always add logging or metrics to detect stalls and leaks.
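A rough sketch of the cheapest such metric: runtime.NumGoroutine, sampled before and after a piece of work. The helpers leaky, fixed, and goroutineDelta are hypothetical; in a real service you would export the count as a metric and watch for steady growth rather than compare two samples.

```go
package main

import (
	"fmt"
	"runtime"
	"time"
)

// leaky starts a consumer but never closes its input, so the
// consumer's range loop blocks forever after the first value.
func leaky() {
	ch := make(chan int)
	go func() {
		for range ch {
		}
	}()
	ch <- 1
	// missing close(ch): the goroutine above never exits.
}

// fixed is the same code with the close in place.
func fixed() {
	ch := make(chan int)
	go func() {
		for range ch {
		}
	}()
	ch <- 1
	close(ch)
}

// goroutineDelta runs f and reports how many goroutines it left behind,
// giving goroutines that are already exiting up to ~100ms to finish.
func goroutineDelta(f func()) int {
	before := runtime.NumGoroutine()
	f()
	after := runtime.NumGoroutine()
	for i := 0; i < 100 && after > before; i++ {
		time.Sleep(time.Millisecond)
		after = runtime.NumGoroutine()
	}
	return after - before
}

func main() {
	fmt.Println("leaky left behind:", goroutineDelta(leaky))
	fmt.Println("fixed left behind:", goroutineDelta(fixed))
}
```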

Decision: when to use streams vs alternatives

Use a buffered channel when you need to decouple producer and consumer rates and allow a small amount of data to accumulate. Use an unbuffered channel when you want strict synchronization: the producer must wait for the consumer to be ready for every single value. Use a worker pool with a job channel when you need to cap the number of operations running at once. Use context.Context with your stream when the operation can be cancelled by a timeout, a client disconnect, or a parent request. Use a select statement when a goroutine needs to react to multiple channels or a cancellation signal without blocking on just one. Use plain sequential code when you do not need concurrency: the simplest thing that works is usually the right thing.
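The worker pool is the one option in that list without an example, so here is a minimal sketch, assuming a hypothetical workerPool function: a fixed number of goroutines range over a shared jobs channel, so at most that many jobs run at once. Results arrive in completion order, not submission order.

```go
package main

import (
	"fmt"
	"sync"
)

// workerPool runs work on every job with at most `workers` goroutines
// active at once. The jobs channel is the queue; the worker count is
// the concurrency limit.
func workerPool(workers int, jobs []int, work func(int) int) []int {
	jobCh := make(chan int)
	resCh := make(chan int)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobCh {
				resCh <- work(j)
			}
		}()
	}

	// feed jobs, then close so the workers' range loops end.
	go func() {
		defer close(jobCh)
		for _, j := range jobs {
			jobCh <- j
		}
	}()

	// close results once every worker has returned.
	go func() {
		wg.Wait()
		close(resCh)
	}()

	var results []int
	for r := range resCh {
		results = append(results, r)
	}
	return results // completion order, not submission order
}

func main() {
	res := workerPool(3, []int{1, 2, 3, 4, 5}, func(n int) int { return n * n })
	sum := 0
	for _, r := range res {
		sum += r
	}
	fmt.Println(len(res), sum) // 5 55
}
```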

Where to go next