How to Implement the Pipeline Pattern in Go

Implement the pipeline pattern in Go by chaining goroutines that pass data through channels to process streams concurrently.

The assembly line for data

You have a stream of log lines pouring in from a server. You need to filter out the noise, extract the error codes, and count how many of each appear. A single function that reads, filters, extracts, and counts works, but it blocks the caller until everything finishes and mixes concerns. You want to process data as it arrives, with each step running independently. The pipeline pattern lets you chain stages together so data flows through them like water through a series of filters.

How pipelines work

Think of a factory assembly line. One worker welds the frame and passes it down. The next worker installs the engine. The third paints it. Each worker has a small buffer of parts. If the welder is fast but the painter is slow, the welder fills the buffer and waits. If the painter catches up, the buffer drains.

In Go, goroutines are the workers. Channels are the conveyor belts. Data flows from one goroutine to the next without shared memory. Each stage reads from an input channel, does its work, and writes to an output channel. The runtime handles scheduling. Channels synchronize the flow. When a stage closes its output channel, the signal propagates downstream until the final consumer exits.

Pipelines give you backpressure for free. If a downstream stage slows down, upstream stages block on their sends. The system throttles itself instead of piling up an unbounded backlog in memory.

Minimal example

Here's the skeleton of a pipeline stage. It takes an input channel, spawns a goroutine to process values, and returns an output channel.

package main

import "fmt"

// double reads integers from in, multiplies by two, and sends to out.
func double(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        // defer closes out on every exit path, so downstream range loops terminate
        defer close(out)
        for n := range in {
            out <- n * 2
        }
    }()
    return out
}

func main() {
    in := make(chan int)
    out := double(in)

    // producer runs concurrently to avoid deadlock
    go func() {
        in <- 1
        in <- 2
        close(in) // signal no more data
    }()

    for n := range out {
        fmt.Println(n)
    }
}

Close the channel when the sender is done. Never close a channel you're receiving from.

What happens at runtime

The program starts in main. It creates in and calls double. The double function creates out, spawns a goroutine, and returns out immediately. The goroutine blocks on range in because nothing is sent yet.

main spawns another goroutine to send 1 and 2. That goroutine sends 1. The double goroutine wakes up, multiplies to 2, and sends to out. main receives 2 and prints it. This continues until close(in) runs. The range loop in double exits. defer close(out) runs. main's range out exits. The program ends.

Channels synchronize without locks. The close call propagates the end-of-stream signal. If you forget to close in, the double goroutine waits forever. In this small program the runtime detects that every goroutine is blocked and aborts with fatal error: all goroutines are asleep - deadlock! because main blocks on out while double blocks on in. This is a fatal error rather than a panic, so recover cannot catch it. Always close the source channel.

Closing a channel is the sender's responsibility. The receiver should never close a channel. If you close a channel you're receiving from, you risk a panic if another sender tries to write. Always close the output channel in the goroutine that writes to it.
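
Because every stage accepts a receive-only channel and returns one, stages compose by passing the output of one call as the input of the next. The following is a minimal sketch of that chaining. The helpers generate, square, and collect are hypothetical names introduced here for illustration, and double repeats the stage shown earlier.

```go
package main

import "fmt"

// generate is a hypothetical source stage: it emits nums, then closes out.
func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

// double multiplies each value by two (same shape as the stage above).
func double(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * 2
        }
    }()
    return out
}

// square is a second hypothetical transform stage.
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

// collect drains a channel into a slice; it acts as the final consumer.
func collect(in <-chan int) []int {
    var got []int
    for n := range in {
        got = append(got, n)
    }
    return got
}

func main() {
    // Data flows generate -> double -> square -> collect.
    fmt.Println(collect(square(double(generate(1, 2, 3))))) // prints [4 16 36]
}
```

Each stage owns exactly one output channel and closes it when its input is exhausted, so the end-of-stream signal travels from generate all the way to collect without extra coordination.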

Realistic example

Real pipelines chain multiple stages and have to decide what to do with errors. Here's a two-stage pipeline that fetches URLs and measures the size of each response body. It skips failed requests to keep the example short.

package main

import (
    "fmt"
    "io"
    "net/http"
)

// fetcher downloads URLs and sends response bodies to out.
func fetcher(urls <-chan string) <-chan io.ReadCloser {
    out := make(chan io.ReadCloser)
    go func() {
        defer close(out)
        for url := range urls {
            resp, err := http.Get(url)
            if err != nil {
                continue // skip errors to keep pipeline flowing
            }
            out <- resp.Body
        }
    }()
    return out
}

// counter measures body sizes and sends counts to out.
func counter(bodies <-chan io.ReadCloser) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for body := range bodies {
            n, _ := io.Copy(io.Discard, body)
            body.Close() // release the connection
            out <- int(n)
        }
    }()
    return out
}

func main() {
    urls := make(chan string)
    bodies := fetcher(urls)
    sizes := counter(bodies)

    go func() {
        urls <- "https://example.com"
        close(urls)
    }()

    for size := range sizes {
        fmt.Println(size)
    }
}

Close every ReadCloser. Leaks are silent until they aren't.

HTTP responses hold network connections. Failing to close the body leaks connections and file descriptors, and a long-running process eventually runs out of them. The underscore discards the error from io.Copy here for brevity. In production code, you'd handle the error, perhaps by sending it through a separate error channel or wrapping it in the pipeline result.

Buffering and backpressure

Unbuffered channels synchronize every send. The sender blocks until the receiver reads. This provides natural backpressure. If the consumer is slow, the producer slows down.

Buffered channels decouple the stages slightly. A buffer of size N allows the producer to send N items before blocking. This can improve throughput if the stages have variable latency. But buffers hide problems. If you buffer too much, you consume memory and mask bottlenecks.

// bufferedDouble allows up to 10 items to queue before blocking.
func bufferedDouble(in <-chan int) <-chan int {
    out := make(chan int, 10)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * 2
        }
    }()
    return out
}

Start with unbuffered channels. Add buffering only when profiling shows a bottleneck.
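
To make the difference concrete, the sketch below counts how many sends complete while nobody is receiving. The helper sendsBeforeBlocking is a hypothetical name written for this illustration. It uses select with a default case so that a send that would block is detected instead of performed.

```go
package main

import "fmt"

// sendsBeforeBlocking is a hypothetical helper: it reports how many values
// a channel of the given capacity accepts while no goroutine is receiving.
func sendsBeforeBlocking(capacity int) int {
    ch := make(chan int, capacity)
    sent := 0
    for {
        select {
        case ch <- sent:
            sent++ // the buffer had room, so the send completed immediately
        default:
            return sent // the next send would block
        }
    }
}

func main() {
    fmt.Println(sendsBeforeBlocking(0))  // unbuffered: prints 0
    fmt.Println(sendsBeforeBlocking(10)) // buffer of 10: prints 10
}
```

An unbuffered channel accepts nothing without a waiting receiver, while a buffer of N lets the producer run N items ahead. That head start is the memory you pay for the smoother throughput.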

Errors and context

Pipelines often need to carry errors. You can't just return an error from a goroutine. You have to send it through a channel. A common pattern is to send a special error value or use a struct that holds both data and error. Or use a separate error channel.
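
One way to carry errors in-band is sketched below, assuming a hypothetical result struct that holds either a value or an error. The names result, source, parse, and sum are illustrative only. The consumer decides what a failure means instead of the stage dropping it silently.

```go
package main

import (
    "fmt"
    "strconv"
)

// result is a hypothetical envelope that carries either a value or an error.
type result struct {
    value int
    err   error
}

// source emits the given strings and then closes its channel.
func source(items ...string) <-chan string {
    out := make(chan string)
    go func() {
        defer close(out)
        for _, s := range items {
            out <- s
        }
    }()
    return out
}

// parse converts each string to an int. Failures travel downstream
// as results with err set.
func parse(in <-chan string) <-chan result {
    out := make(chan result)
    go func() {
        defer close(out)
        for s := range in {
            n, err := strconv.Atoi(s)
            out <- result{value: n, err: err}
        }
    }()
    return out
}

// sum is the final consumer: it adds good values and counts bad ones.
func sum(in <-chan result) (total, failures int) {
    for r := range in {
        if r.err != nil {
            failures++
            continue
        }
        total += r.value
    }
    return total, failures
}

func main() {
    total, failures := sum(parse(source("1", "two", "3")))
    fmt.Println(total, failures) // prints 4 1
}
```

Keeping the error next to the value preserves ordering and needs only one channel per stage. A separate error channel is the alternative when errors should be handled by a different consumer than the data.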

In production, pass a context.Context as the first argument to every pipeline stage. Check ctx.Done() in the loop to stop processing when the request is cancelled. This prevents goroutine leaks when the client disconnects.

// ctxDouble respects cancellation via context.
func ctxDouble(ctx context.Context, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for {
            select {
            case <-ctx.Done():
                return // stop processing on cancellation
            case n, ok := <-in:
                if !ok {
                    return
                }
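                // Guard the send too, so a stalled consumer cannot block
                // this goroutine after cancellation.
                select {
                case <-ctx.Done():
                    return
                case out <- n * 2:
                }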
            }
        }
    }()
    return out
}

Context is plumbing. Run it through every long-lived call site.

The select statement lets the goroutine react to both the input channel and the context. When the context cancels, the goroutine returns, defer close(out) runs, and the pipeline shuts down cleanly. Without context, a cancelled request leaves goroutines running until the input channel closes, which might never happen.
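
The caller's side of cancellation can be sketched as follows. This is an illustration under stated assumptions: naturals is a hypothetical endless source, doubleCtx is a stage written for this sketch that watches ctx on both its receive and its send, and firstN is a hypothetical consumer that stops early.

```go
package main

import (
    "context"
    "fmt"
)

// naturals is a hypothetical endless source: it emits 1, 2, 3, ...
// until ctx is cancelled, then closes its channel.
func naturals(ctx context.Context) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := 1; ; n++ {
            select {
            case <-ctx.Done():
                return
            case out <- n:
            }
        }
    }()
    return out
}

// doubleCtx doubles values and checks ctx on both the receive and the send.
func doubleCtx(ctx context.Context, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for {
            select {
            case <-ctx.Done():
                return
            case n, ok := <-in:
                if !ok {
                    return
                }
                select {
                case <-ctx.Done():
                    return
                case out <- n * 2:
                }
            }
        }
    }()
    return out
}

// firstN takes n values, cancels the pipeline, and waits for it to shut down.
func firstN(n int) []int {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    out := doubleCtx(ctx, naturals(ctx))
    var got []int
    for i := 0; i < n; i++ {
        got = append(got, <-out)
    }
    cancel()
    // Drain until the stage closes out. This loop ends only because
    // cancellation makes every stage return and close its channel.
    for range out {
    }
    return got
}

func main() {
    fmt.Println(firstN(3)) // prints [2 4 6]
}
```

The source never closes on its own, so the context is the only thing that stops these goroutines. Calling cancel is what lets the consumer walk away without leaking them.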

Pitfalls and errors

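Pipelines are fragile if you break the flow. Forgetting to close the input channel leaves the first stage waiting forever. If every goroutine ends up blocked, the runtime aborts with fatal error: all goroutines are asleep - deadlock! because the main goroutine blocks on the output channel while the pipeline goroutine blocks on the input channel. In a larger program where other goroutines keep running, the detector never fires and the stuck stage leaks silently.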

Sending on a closed channel triggers panic: send on closed channel. This happens if you close a channel while another goroutine is still writing to it. Closing a channel twice triggers a different panic, close of closed channel. Close each channel exactly once, with defer close in the goroutine that sends on it.

Goroutine leaks happen when a goroutine waits on a channel that never gets closed. Always have a cancellation path. The worst goroutine bug is the one that never logs. Add timeouts or context cancellation to long-running stages.

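If a stage panics, the panic doesn't travel through the channel as a value. An unrecovered panic in any goroutine crashes the whole program, not just that stage. Deferred calls such as close(out) still run while the stack unwinds, but the process exits right after. Use recover inside the stage goroutine if you need to turn a panic into an error and keep the pipeline alive. The errgroup package (golang.org/x/sync/errgroup) helps coordinate returned errors and cancellation across goroutines.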
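
A stage that survives a panic in its per-item work might look like the sketch below. It assumes a hypothetical result envelope and hypothetical helpers emit, divide, safeDivide, and collect. Integer division by zero stands in for any code that can panic.

```go
package main

import "fmt"

// result is a hypothetical envelope carrying a value or an error.
type result struct {
    value int
    err   error
}

// emit sends the given numbers and then closes its channel.
func emit(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

// divide computes 100/n and converts a panic on this item into an error.
func divide(n int) (r result) {
    defer func() {
        if p := recover(); p != nil {
            r = result{err: fmt.Errorf("recovered: %v", p)}
        }
    }()
    return result{value: 100 / n} // panics when n is zero
}

// safeDivide is a stage that keeps running after a bad item.
func safeDivide(in <-chan int) <-chan result {
    out := make(chan result)
    go func() {
        defer close(out)
        for n := range in {
            out <- divide(n)
        }
    }()
    return out
}

// collect drains the stage into a slice.
func collect(in <-chan result) []result {
    var got []result
    for r := range in {
        got = append(got, r)
    }
    return got
}

func main() {
    for _, r := range collect(safeDivide(emit(5, 0, 20))) {
        fmt.Println(r.value, r.err)
    }
}
```

Recovering per item, rather than once around the whole loop, means one bad input produces one error result and the stage carries on with the next value.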

When to use pipelines

Use a pipeline when data flows through a fixed sequence of stages and each stage transforms the stream. Use a worker pool when tasks are independent and you need to bound concurrency to protect downstream resources. Use a fan-out pattern when you can split work across multiple identical workers and merge results later. Use sequential code when the overhead of channels and goroutines outweighs the benefit of concurrency.

Pipelines flow. Worker pools scale. Pick the shape that matches your data.

Where to go next