Fan-out fan-in pattern with channels

The fan-out fan-in pattern spreads independent work across goroutines and merges the results into a single stream. The Go compiler uses a variant of it, built on a shared queue, to compile functions in parallel.


You have a list of 200 image URLs to resize. Processing them one by one takes minutes. You spawn 200 goroutines to resize them in parallel. Now you have 200 goroutines screaming resized bytes back to you. How do you collect the results without losing data, leaving goroutines blocked forever, or panicking because a goroutine sends on a channel you have already closed?

This is the fan-out fan-in pattern. Fan-out distributes work to multiple workers. Fan-in merges the results back into a single stream. The pattern appears everywhere in Go: batch processing, HTTP handlers that aggregate microservices, data pipelines, and even inside the Go compiler itself.

The pattern relies on channels to carry the data, and on sync.WaitGroup together with the built-in close to signal completion. The coordination logic ensures the program terminates cleanly when the work is done.

Concept in plain words

Imagine a restaurant kitchen. The head chef receives an order for ten burgers. The chef fans out the work: one cook grills patties, another toasts buns, a third prepares toppings. Each cook works independently. When a burger is ready, the cook places it on the serving line. The head chef fans in the results by taking finished burgers from the line and plating them.

In Go, the head chef is the main goroutine. The cooks are worker goroutines. The serving line is a channel. Fan-out happens when the main goroutine launches workers and sends them tasks. Fan-in happens when the main goroutine reads from the channel to collect results.

The tricky part is signaling when the work is finished. The head chef needs to know when the last burger is plated. If the chef stops taking burgers too early, a cook is left holding one that nobody will take: in Go, that is a worker blocked forever on a send, a leaked goroutine. If the chef never learns that the last burger is done, the chef waits at the line forever: a deadlock. Go solves this with close and sync.WaitGroup.

Minimal example

This example processes a slice of strings concurrently and collects the results. It uses a buffered channel to prevent workers from blocking if the collector is slow, and a sync.WaitGroup to track when all workers finish.

package main

import (
	"fmt"
	"sync"
)

// ProcessItem simulates work and returns a result.
func ProcessItem(item string) string {
	return fmt.Sprintf("processed: %s", item)
}

// FanOutFanIn distributes items to goroutines and collects results.
func FanOutFanIn(items []string) []string {
	// Buffer size matches item count, so a worker's send never blocks even
	// if the collector is slow. The cost is memory for up to len(items)
	// pending results.
	results := make(chan string, len(items))
	var wg sync.WaitGroup

	// Fan-out: launch a goroutine for each item.
	for _, item := range items {
		// Add before starting the goroutine, so wg.Wait cannot observe a
		// zero counter while workers are still being launched.
		wg.Add(1)
		// Pass item as an argument so each goroutine gets its own copy.
		// (Since Go 1.22 each loop iteration has its own variable anyway.)
		go func(i string) {
			defer wg.Done()
			results <- ProcessItem(i)
		}(item)
	}

	// Closer: wait for every worker, then close the channel.
	// Closing is what ends the range loop below.
	go func() {
		wg.Wait()
		close(results)
	}()

	// Fan-in: collect results until the channel is closed.
	var collected []string
	for r := range results {
		collected = append(collected, r)
	}
	return collected
}

func main() {
	items := []string{"a", "b", "c"}
	for _, res := range FanOutFanIn(items) {
		fmt.Println(res)
	}
}

The code launches one goroutine per item. Each goroutine computes a result and sends it to the channel. A helper goroutine waits for all workers via wg.Wait, then closes the channel. The main goroutine ranges over the channel, which automatically stops when the channel closes.

Goroutines are cheap. Channels are not magic. The channel here is just a wire that connects producers to consumers. The close call is the signal that the wire is done carrying data.
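The minimal example shares one results channel among all workers. When each producer owns its own output channel instead, fan-in means merging several channels into one. The sketch below shows one way to do that with the standard library; produce and merge are illustrative names, not a library API.

```go
package main

import (
	"fmt"
	"sync"
)

// produce is a hypothetical producer: it sends each value on its own
// channel and closes that channel when it is done.
func produce(vals ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, v := range vals {
			out <- v
		}
	}()
	return out
}

// merge fans in any number of input channels into one output channel.
// The output is closed only after every input has been drained.
func merge(inputs ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(len(inputs))
	for _, in := range inputs {
		// One forwarding goroutine per input channel.
		go func(in <-chan int) {
			defer wg.Done()
			for v := range in {
				out <- v
			}
		}(in)
	}
	// Close out once all forwarding goroutines have finished.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	sum := 0
	for v := range merge(produce(1, 2, 3), produce(10, 20)) {
		sum += v
	}
	fmt.Println(sum) // 36
}
```

Because merge closes its output only after every input is drained, the caller can range over it exactly as FanOutFanIn ranges over results.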

Walk through what happens

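When FanOutFanIn runs, the loop calls wg.Add(1) and then launches a goroutine, three times in total. Calling Add in the loop, before the go statement, matters: if each goroutine called Add itself, wg.Wait could run first, see a zero counter, and close the channel while workers are still about to send. Each goroutine executes ProcessItem and sends the result to results. Because the channel is buffered with capacity three, the sends succeed immediately without blocking.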

The helper goroutine calls wg.Wait. This blocks until the counter reaches zero. As each worker finishes, it calls wg.Done, decrementing the counter. Once the counter hits zero, wg.Wait returns, and the helper goroutine calls close(results).

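Closing a channel does not send data; it marks the channel as closed. Any later send on the closed channel panics with send on closed channel. Receives behave differently: values already in the buffer are still delivered after close, and only once the buffer is empty does a receive return the zero value immediately, with the two-value form v, ok := <-results reporting ok as false. The range loop receives every buffered value, then exits cleanly.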
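A short program makes these rules concrete. It is a self-contained sketch; receiveAfterClose and sendOnClosed are illustrative helpers.

```go
package main

import "fmt"

// receiveAfterClose fills a buffered channel, closes it, and then
// receives three times, reporting each value and the comma-ok flag.
func receiveAfterClose() ([]string, []bool) {
	ch := make(chan string, 2)
	ch <- "first"
	ch <- "second"
	close(ch)

	var vals []string
	var oks []bool
	for i := 0; i < 3; i++ {
		v, ok := <-ch // never blocks: the channel is closed
		vals = append(vals, v)
		oks = append(oks, ok)
	}
	return vals, oks
}

// sendOnClosed shows that sending on a closed channel panics and returns
// the recovered panic message.
func sendOnClosed() (msg string) {
	ch := make(chan string, 1)
	close(ch)
	defer func() {
		msg = fmt.Sprint(recover())
	}()
	ch <- "late" // panics
	return "no panic"
}

func main() {
	vals, oks := receiveAfterClose()
	fmt.Printf("%q %v\n", vals, oks) // ["first" "second" ""] [true true false]
	fmt.Println(sendOnClosed())      // send on closed channel
}
```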

The main goroutine collects values from the channel. The order of results is non-deterministic because goroutines schedule independently. If you need ordered results, you must attach an index to each result and sort after collection.
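A minimal sketch of that approach, assuming a ProcessItem identical to the one in the minimal example; FanOutFanInOrdered and indexed are hypothetical names. Instead of calling a sort function, it writes each result into its slot by index, which has the same effect in linear time.

```go
package main

import (
	"fmt"
	"sync"
)

// ProcessItem matches the function from the minimal example.
func ProcessItem(item string) string {
	return fmt.Sprintf("processed: %s", item)
}

// indexed pairs a result with the position of its input.
type indexed struct {
	idx int
	val string
}

// FanOutFanInOrdered returns results in the same order as items.
func FanOutFanInOrdered(items []string) []string {
	results := make(chan indexed, len(items))
	var wg sync.WaitGroup
	for i, item := range items {
		wg.Add(1)
		go func(i int, s string) {
			defer wg.Done()
			results <- indexed{idx: i, val: ProcessItem(s)}
		}(i, item)
	}
	go func() {
		wg.Wait()
		close(results)
	}()

	// Results arrive in any order; the index puts each one back in place.
	ordered := make([]string, len(items))
	for r := range results {
		ordered[r.idx] = r.val
	}
	return ordered
}

func main() {
	fmt.Println(FanOutFanInOrdered([]string{"a", "b", "c"}))
	// [processed: a processed: b processed: c]
}
```

Since each goroutine touches a different index, the workers could also assign to ordered[i] directly and skip the channel; writes to distinct slice elements do not race, and wg.Wait makes them visible to the caller.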

Context is plumbing: thread it through every long-lived call site. In a real application, ProcessItem would accept a context.Context as its first parameter. The context allows cancellation. If the caller cancels the context, workers should stop processing and return early.

Realistic example: The Go compiler

The Go compiler uses a fan-out fan-in pattern to compile functions in parallel. The implementation lives in compileFunctions, which starts as many workers as the compiler's -c concurrency flag (base.Flag.LowerC) requests. Rather than channels, the compiler uses a mutex-protected slice as its work queue.

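// Simplified excerpt from the Go compiler showing fan-out with a shared queue.
// It is not standalone: ir, ssagen, base, and profile come from the
// compiler's internal packages. The queue is used as a stack: workers
// push and pop at the end of the slice.

var mu sync.Mutex
var wg sync.WaitGroup
var compilequeue []*ir.Func

// Fan-out: launch base.Flag.LowerC workers (the -c concurrency flag).
// Ranging over an int requires Go 1.22 or later.
for workerId := range base.Flag.LowerC {
	wg.Add(1)
	go func(id int) {
		defer wg.Done()
		var closures []*ir.Func
		for {
			mu.Lock()
			// Push the closures of the function compiled in the previous
			// iteration, so any worker can compile them.
			compilequeue = append(compilequeue, closures...)
			if len(compilequeue) == 0 {
				// Nothing left to take. Workers still compiling will push
				// and handle their own closures, so this one can exit.
				mu.Unlock()
				return
			}
			// Pop the last function from the stack.
			fn := compilequeue[len(compilequeue)-1]
			compilequeue = compilequeue[:len(compilequeue)-1]
			mu.Unlock()

			// Compile outside the lock so workers run in parallel.
			ssagen.Compile(fn, id, profile)
			closures = fn.Closures
		}
	}(workerId)
}

// Fan-in: wait for all workers to finish.
wg.Wait()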

// Fan-in: wait for all workers to finish.
wg.Wait()

The compiler code reveals a different trade-off. It uses a sync.Mutex to protect a shared slice. Workers lock the mutex, pop a function, unlock, compile, and repeat. When a worker finishes a function, it pushes that function's closures onto the queue on its next pass, so other workers can pick them up.

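Why a mutex instead of channels? This workload feeds itself: compiling one function can produce new work in the form of closures. With a mutex, a worker pushes its new items and pops its next one in a single short critical section, and "the queue is empty" is a simple, safe exit condition. With a channel, workers that send new work into the same channel they receive from can deadlock once its buffer fills, and nobody can close the channel until it is certain no more work will appear. The mutex version also keeps the hot path to a plain slice append and pop under one lock. The compiler optimizes for raw throughput.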

This is a shared work queue rather than work stealing in the strict sense, where each worker owns its own queue and idle workers steal from others. It still balances load dynamically: if one function produces many closures, they land on the shared queue and any idle worker can take them. The wg.Wait at the end ensures the compiler waits until all functions are compiled.
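The compiler excerpt depends on internal packages and does not build on its own. The sketch below reproduces its shape with the standard library only: node is a hypothetical stand-in for *ir.Func, and children plays the role of fn.Closures.

```go
package main

import (
	"fmt"
	"sync"
)

// node is a hypothetical unit of work; processing it yields child work,
// much as compiling a function yields its closures.
type node struct {
	name     string
	children []*node
}

// processAll drains a mutex-protected stack with nWorkers goroutines and
// returns the names of all processed nodes, in no particular order.
func processAll(roots []*node, nWorkers int) []string {
	var (
		mu    sync.Mutex // guards queue
		resMu sync.Mutex // guards done
		wg    sync.WaitGroup
		queue = append([]*node(nil), roots...) // copy; caller's slice untouched
		done  []string
	)
	for w := 0; w < nWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			var newWork []*node
			for {
				mu.Lock()
				queue = append(queue, newWork...) // publish last round's children
				if len(queue) == 0 {
					mu.Unlock()
					return
				}
				n := queue[len(queue)-1] // pop from the end (LIFO)
				queue = queue[:len(queue)-1]
				mu.Unlock()

				// The "work" happens outside the queue lock.
				resMu.Lock()
				done = append(done, n.name)
				resMu.Unlock()
				newWork = n.children
			}
		}()
	}
	wg.Wait()
	return done
}

func main() {
	leafA := &node{name: "leafA"}
	leafB := &node{name: "leafB"}
	root := &node{name: "root", children: []*node{leafA, leafB}}
	fmt.Println(len(processAll([]*node{root}, 4))) // 3
}
```

A worker that finds the queue empty may exit while another is still busy; that is safe because the busy worker pushes its own children and keeps going, as in the compiler.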

The compiler optimizes for speed, not readability. You should optimize for correctness first. Use channels unless profiling proves the overhead is a bottleneck.

Pitfalls and common errors

Fan-out fan-in introduces concurrency hazards. The most common bugs involve goroutine leaks, deadlocks, and unhandled errors.

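Goroutine leaks happen in two common ways. A worker that ranges over an input channel blocks forever if the sender never closes it. A worker that sends results blocks forever if the collector stops reading, for example because it returned early on an error. Each channel should be closed by its sender once no more values will be sent, and a collector that may return early needs a cancellation signal such as a context. The minimal example avoids the second leak by sizing the buffer to hold every result and closing the channel only after wg.Wait returns.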
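A minimal sketch of a bounded pool in which every channel is closed by its only sender, so every range loop terminates. Unlike the minimal example, it caps the number of goroutines at nWorkers, which suits a workload like the 200 image URLs. squareAll is a hypothetical name, and squaring stands in for real work.

```go
package main

import (
	"fmt"
	"sync"
)

// squareAll fans nums out to nWorkers workers (nWorkers must be at least 1)
// and fans the squared results back in as a sum.
func squareAll(nums []int, nWorkers int) int {
	jobs := make(chan int)
	results := make(chan int)

	// Producer: the only sender on jobs, so it closes jobs.
	go func() {
		defer close(jobs)
		for _, n := range nums {
			jobs <- n
		}
	}()

	// Workers: each exits when jobs is closed and drained.
	var wg sync.WaitGroup
	for w := 0; w < nWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for n := range jobs {
				results <- n * n
			}
		}()
	}

	// Closer: results is closed only after every worker has returned.
	go func() {
		wg.Wait()
		close(results)
	}()

	// Collector: reads until results is closed, so no worker blocks on send.
	sum := 0
	for r := range results {
		sum += r
	}
	return sum
}

func main() {
	fmt.Println(squareAll([]int{1, 2, 3, 4}, 2)) // 30
}
```

The collector here always drains results completely. If it could return early, it would also need to cancel the producer and workers, for example through a context.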

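Deadlocks occur when goroutines wait for each other in a cycle. A classic fan-in mistake is calling wg.Wait before draining an unbuffered results channel: the worker cannot finish until someone receives, and nobody receives until the worker finishes. When every goroutine is blocked like this, the Go runtime (not the compiler) aborts with fatal error: all goroutines are asleep - deadlock!. The runtime can only detect this when every goroutine is stuck; in a server with other live goroutines, the same bug shows up as a silent hang.

// BAD: waiting for the worker before receiving from an unbuffered channel.
ch := make(chan string)
var wg sync.WaitGroup
wg.Add(1)
go func() {
	defer wg.Done()
	ch <- "hello" // Blocks until someone receives.
}()
wg.Wait()         // Blocks until the worker is done: it never is. Deadlock.
fmt.Println(<-ch) // Never reached.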
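A deadlock-free ordering for an unbuffered results channel: the collector receives before, or concurrently with, waiting for the worker, never after. A minimal sketch; greet is an illustrative name.

```go
package main

import (
	"fmt"
	"sync"
)

// greet receives from the unbuffered channel first, then waits.
func greet() string {
	ch := make(chan string) // unbuffered
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		ch <- "hello" // blocks only until greet receives
	}()
	msg := <-ch // receive first: this unblocks the worker
	wg.Wait()   // returns once the worker has finished
	return msg
}

func main() {
	fmt.Println(greet()) // hello
}
```

With more than one worker, move wg.Wait and close into a separate goroutine and range over the channel, as the minimal example does.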

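Error handling requires care. Channels carry results, but what happens when a worker fails? If a worker panics, the whole program crashes unless that same worker goroutine recovers; a recover in the collector or in main cannot catch a panic raised in another goroutine. If a worker returns an error, you need a way to propagate it to the collector.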

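The standard approach is to wrap results and errors in a struct, or use golang.org/x/sync/errgroup. errgroup.WithContext returns a Group and a derived context. Wait blocks until every function started with Go has returned, then reports the first non-nil error; the derived context is canceled as soon as any function returns an error (or when Wait returns), so the other workers can stop early.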

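// Good pattern: use errgroup for error propagation.
// ProcessWithCtx stands for a context-aware version of ProcessItem that
// returns (string, error); the results are discarded here for brevity.
g, ctx := errgroup.WithContext(context.Background())
for _, item := range items {
	item := item // needed before Go 1.22; harmless after
	g.Go(func() error {
		// ctx is canceled once any worker fails, so the rest can stop early.
		_, err := ProcessWithCtx(ctx, item)
		return err
	})
}
if err := g.Wait(); err != nil {
	return err
}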
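The struct approach keeps every failure rather than only the first. A minimal sketch; result, process, and collectAll are hypothetical names, and the panic on "boom" is contrived to show that each worker must recover its own panic.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// result carries either a value or an error from one worker.
type result struct {
	item string
	val  string
	err  error
}

// process is a hypothetical worker function: it fails on empty input
// and panics on "boom".
func process(item string) (string, error) {
	if item == "" {
		return "", errors.New("empty item")
	}
	if item == "boom" {
		panic("something went wrong")
	}
	return "processed: " + item, nil
}

// collectAll runs one goroutine per item and returns every result,
// including failures, instead of stopping at the first error.
func collectAll(items []string) []result {
	out := make(chan result, len(items))
	var wg sync.WaitGroup
	for _, item := range items {
		wg.Add(1)
		go func(s string) {
			defer wg.Done()
			// recover only works in the goroutine that panicked, so each
			// worker converts its own panic into an error result. This
			// deferred call runs before wg.Done, so the send happens first.
			defer func() {
				if p := recover(); p != nil {
					out <- result{item: s, err: fmt.Errorf("panic: %v", p)}
				}
			}()
			v, err := process(s)
			out <- result{item: s, val: v, err: err}
		}(item)
	}
	go func() {
		wg.Wait()
		close(out)
	}()

	var all []result
	for r := range out {
		all = append(all, r)
	}
	return all
}

func main() {
	for _, r := range collectAll([]string{"a", "", "boom"}) {
		if r.err != nil {
			fmt.Printf("%q failed: %v\n", r.item, r.err)
		} else {
			fmt.Printf("%q ok: %s\n", r.item, r.val)
		}
	}
}
```

If a caller needs a single error value, errors.Join (Go 1.20 and later) can combine the collected errors.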

The worst goroutine bug is the one that never logs. Always add logging or tracing to workers so you can diagnose hangs. If a worker blocks, logs reveal where it stopped.

Decision: when to use this pattern

Choose the right concurrency tool based on your data flow and error requirements.

Use fan-out fan-in with channels when you have a stream of independent tasks and the consumer processes results as they arrive. Unbuffered or small buffered channels provide backpressure: if the consumer is slow, workers block on send, which bounds memory use. A buffer sized to the whole input, as in the minimal example, gives up that backpressure.

Use errgroup when you need to wait for all tasks to complete and propagate the first error. errgroup handles context cancellation and reports the first error (it does not aggregate the others), reducing boilerplate, and its SetLimit method can cap how many goroutines run at once. It is the standard choice for batch operations where any failure aborts the batch.

Use a mutex-protected queue when you have a shared resource and need fine-grained control over work distribution. This pattern suits high-performance scenarios where channel overhead is measurable, or when workers generate dynamic work that must be redistributed.

Use sequential code when the overhead of concurrency exceeds the benefit. If tasks are CPU-bound and you have few cores, or if tasks are fast and I/O light, spawning goroutines adds complexity without speed. The simplest thing that works is usually the right thing.

Where to go next