How to Pipeline Data with Channels in Go

Pipe data between Go goroutines safely by sending values into a channel and receiving them with a range loop.

You have a server spitting out thousands of log lines per second. You need to filter for errors, extract the user IDs, and count how many times each user hit a bug. A single loop gets messy fast. You end up with nested if statements and temporary buffers that grow until the memory runs out. Go offers a different shape for this problem. You break the work into stages. Each stage runs in its own goroutine. Channels connect the stages like pipes. Data flows from one stage to the next without blocking the whole program.

The assembly line model

Think of a car assembly line. One worker installs the engine. They pass the car to the next worker who installs the wheels. The third worker adds the paint. Each worker focuses on one task. They don't wait for the whole car to be finished before starting their part. They just grab the car when it arrives, do their job, and push it forward. If the next worker is busy, the first worker pauses until there's space. If the first worker is slow, the second worker waits for a car.

Channels are the handoff points. They carry the data between workers. The workers are goroutines. The pipeline structure keeps each stage simple. You test each stage in isolation. You compose them by connecting the channels.

Channels are the wires. Goroutines are the workers. Keep the wires connected.

Minimal pipeline

Here's the skeleton of a pipeline. One goroutine generates numbers. A second goroutine squares them. The main function reads the results.

package main

import "fmt"

// generate sends numbers into the out channel.
func generate(out chan<- int) {
	for i := 1; i <= 5; i++ {
		out <- i // Send i to the channel. Blocks until a receiver is ready.
	}
	close(out) // Signal that no more values are coming.
}

// square reads from in, squares the value, and sends to out.
func square(in <-chan int, out chan<- int) {
	for n := range in { // Loop until the in channel closes.
		out <- n * n // Send the squared value.
	}
	close(out) // Close out when the input is exhausted.
}

func main() {
	ch1 := make(chan int) // Unbuffered channel for generate -> square.
	ch2 := make(chan int) // Unbuffered channel for square -> main.

	go generate(ch1) // Start the generator in a goroutine.
	go square(ch1, ch2) // Start the squarer in a goroutine.

	for n := range ch2 { // Read results until ch2 closes.
		fmt.Println(n)
	}
}
# output:
1
4
9
16
25

How the runtime flows

The main function creates two channels. It launches two goroutines. The generate goroutine tries to send 1 to ch1. The channel is unbuffered, so the send blocks until someone receives. The square goroutine is running. It tries to receive from ch1. The Go runtime scheduler matches the sender and receiver. The value 1 moves across.

square computes 1 * 1 and tries to send to ch2. main is waiting on range ch2, so the send completes and main prints 1. square then loops back to receive from ch1, which unblocks generate's send of 2. The pipeline flows.

When generate finishes the loop, it calls close(ch1). The square loop sees ch1 is closed and exits. square calls close(ch2). main sees ch2 is closed and exits. The program ends.
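The shutdown sequence above relies on what a range loop does with a closed channel. As a rough sketch, the loop behaves like the explicit two-value receive below. The helper name drain is made up for this illustration, and a buffered channel is used only so the example fits in one goroutine.

```go
package main

import "fmt"

// drain is a hypothetical helper. It receives from in until the channel is
// closed, using the explicit two-value receive that a range loop performs
// implicitly.
func drain(in <-chan int) []int {
	var got []int
	for {
		n, ok := <-in
		if !ok {
			return got // ok is false once the channel is closed and empty.
		}
		got = append(got, n)
	}
}

func main() {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	ch <- 3
	close(ch) // Values already in the buffer are still delivered after close.

	fmt.Println(drain(ch)) // prints [1 2 3]
}
```

Closing does not discard pending values. The receiver sees every value that was sent before the close, and only then sees ok == false.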

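The sender always closes the channel. Receivers should never close a channel. If a receiver closes a bidirectional channel, the compiler won't stop you, but any sender that sends afterward will panic. Declaring the parameter as receive-only (<-chan int) turns that mistake into a compile error. This rule keeps the ownership clear. The sender knows when the work is done.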
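One way to make the ownership rule hard to break is to let each stage create, fill, and close its own output channel, and hand callers only a receive-only view. The sketch below assumes that shape. The names numbers, squared, and collect are illustrative and are not the functions from the skeleton above.

```go
package main

import "fmt"

// numbers owns its output channel: it creates it, sends on it, and closes it.
// Callers only get a receive-only channel, so calling close on their side
// would not compile.
func numbers(vals ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out) // Only the sending goroutine closes.
		for _, v := range vals {
			out <- v
		}
	}()
	return out
}

// squared follows the same shape: it owns and closes the channel it returns.
func squared(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

// collect drains a channel into a slice until the channel is closed.
func collect(in <-chan int) []int {
	var got []int
	for n := range in {
		got = append(got, n)
	}
	return got
}

func main() {
	fmt.Println(collect(squared(numbers(1, 2, 3)))) // prints [1 4 9]
}
```

With this layout the stages compose by nesting calls, and no caller ever holds a channel it is allowed to close.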

Close the channel when the work is done. Receivers listen until the signal stops.

Buffered versus unbuffered

Unbuffered channels synchronize the sender and receiver. The send blocks until the receive happens. This forces the stages to stay in lockstep. Buffered channels decouple them slightly. You create a buffer with make(chan int, 10). You can send up to ten items without blocking.

Use a buffer when you want to absorb bursts of data or when the producer is briefly faster than the consumer. A buffer of one lets the producer send one item and move on without waiting for the receive. The consumer catches up later. A buffer does not fix a consumer that is always slower: it fills up and the producer blocks again. Don't use a buffer to hide a design problem. If you need a huge buffer, you probably have a bottleneck.
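The difference is easy to observe with the built-in len and cap functions. The sketch below uses two made-up helpers, fillBuffer and trySend, to show sends succeeding while the buffer has room and a send being refused once it is full.

```go
package main

import "fmt"

// fillBuffer sends n values into a channel with the given capacity while no
// receiver is running, then reports how many values are queued.
// It assumes n <= capacity; a larger n would block forever.
func fillBuffer(capacity, n int) (queued, size int) {
	ch := make(chan int, capacity)
	for i := 0; i < n; i++ {
		ch <- i // Does not block while the buffer has room.
	}
	return len(ch), cap(ch)
}

// trySend attempts a send and reports false instead of blocking when the
// buffer is full (or, for an unbuffered channel, when no receiver is ready).
func trySend(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	queued, size := fillBuffer(10, 3)
	fmt.Printf("queued=%d capacity=%d\n", queued, size) // prints queued=3 capacity=10

	ch := make(chan int, 1)
	fmt.Println(trySend(ch, 1)) // prints true: the buffer had one free slot
	fmt.Println(trySend(ch, 2)) // prints false: the buffer is full
}
```

A plain send would have blocked where trySend returned false. That blocking is the back-pressure that keeps a fast producer from running away from a slow consumer.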

Trust gofmt. It normalizes the spacing around channel types and direction arrows, so chan<- int and <-chan int look the same in every file. Don't argue about formatting. Let the tool decide.

Realistic log processor

Here's a pipeline that processes log lines. It filters for errors and counts them by user. The filter stage defers close so its output channel closes on every exit path, including an early return or a panic.

package main

import (
	"fmt"
	"strings"
)

// LogLine represents a parsed log entry.
type LogLine struct {
	Level string
	User  string
}

// parseLog splits a raw line into a structured LogLine.
func parseLog(line string) LogLine {
	parts := strings.Split(line, " ")
	if len(parts) < 2 {
		return LogLine{}
	}
	return LogLine{Level: parts[0], User: parts[1]}
}

// filterErrors reads log lines and sends only errors to the output channel.
func filterErrors(in <-chan string, out chan<- LogLine) {
	defer close(out) // Close the channel when the function returns, signaling downstream stages.
	for line := range in {
		entry := parseLog(line)
		if entry.Level == "ERROR" {
			out <- entry // Forward only error entries.
		}
	}
}

// countUsers reads error logs and prints a summary per user.
func countUsers(in <-chan LogLine) {
	counts := make(map[string]int) // Accumulate error counts per user.
	for entry := range in {
		if entry.User != "" {
			counts[entry.User]++
		}
	}
	for user, count := range counts {
		fmt.Printf("User %s: %d errors\n", user, count)
	}
}

func main() {
	logs := make(chan string)
	errs := make(chan LogLine)
	done := make(chan struct{}) // Closed once countUsers has printed its summary.

	go filterErrors(logs, errs)
	go func() {
		defer close(done)
		countUsers(errs)
	}()

	logs <- "INFO user1 login" // Send simulated log lines to the pipeline.
	logs <- "ERROR user2 crash"
	logs <- "ERROR user2 timeout"
	logs <- "WARN user1 slow_query"
	logs <- "ERROR user3 panic"
	close(logs) // Close the input channel so the stages drain and exit.

	<-done // Wait for the summary; otherwise main can exit before it prints.
}

In production code, pass a context.Context as the first argument to every stage. This lets you cancel the pipeline if the request times out. Functions that take a context should respect cancellation and deadlines.
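A context-aware stage might look like the sketch below. The names naturals, squareCtx, and firstSquares are hypothetical. The idea is that every send and receive sits inside a select that also watches ctx.Done(), so a stage never blocks on a neighbor that has already stopped.

```go
package main

import (
	"context"
	"fmt"
)

// naturals sends 1, 2, 3, ... until ctx is cancelled.
func naturals(ctx context.Context) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 1; ; i++ {
			select {
			case out <- i:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// squareCtx squares values from in until in closes or ctx is cancelled.
func squareCtx(ctx context.Context, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for {
			select {
			case <-ctx.Done():
				return
			case n, ok := <-in:
				if !ok {
					return // Upstream finished normally.
				}
				select {
				case out <- n * n:
				case <-ctx.Done():
					return // Downstream gave up; do not block on the send.
				}
			}
		}
	}()
	return out
}

// firstSquares reads n results from the pipeline, then cancels it.
func firstSquares(n int) []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // Cancelling releases both stage goroutines.

	results := squareCtx(ctx, naturals(ctx))
	var got []int
	for len(got) < n {
		got = append(got, <-results)
	}
	return got
}

func main() {
	fmt.Println(firstSquares(3)) // prints [1 4 9]
}
```

The generator here never finishes on its own, so cancellation is the only thing that stops it. With a request-scoped context, the same pattern would tie the pipeline's lifetime to the request's deadline.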

Defer the close. Let the pipeline drain naturally.

Pitfalls and runtime errors

Channels are simple but have sharp edges. Forgetting to close a channel leaves the receiver blocked forever. If every goroutine ends up blocked, the runtime aborts with fatal error: all goroutines are asleep - deadlock! This is a fatal error, not a panic, so recover cannot catch it, and the runtime does not report it while any other goroutine can still run. Closing a channel twice panics with panic: close of closed channel. Sending on a closed channel panics with panic: send on closed channel.
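The two panics can be reproduced safely by recovering from them. The sketch below uses a made-up helper, panicMessage, to capture the panic text, and also shows that receiving from a closed channel is not an error.

```go
package main

import "fmt"

// panicMessage is a hypothetical helper. It runs f and returns the text of
// the panic f raised, or "" if f returned normally.
func panicMessage(f func()) (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	f()
	return ""
}

func main() {
	fmt.Println(panicMessage(func() {
		ch := make(chan int, 1)
		close(ch)
		ch <- 1 // Sending after close panics, even with buffer space left.
	})) // prints send on closed channel

	fmt.Println(panicMessage(func() {
		ch := make(chan int)
		close(ch)
		close(ch) // The second close panics.
	})) // prints close of closed channel

	// Receiving from a closed channel does not panic. It returns the zero
	// value and ok == false.
	ch := make(chan int)
	close(ch)
	v, ok := <-ch
	fmt.Println(v, ok) // prints 0 false
}
```

Recovering like this is useful for demonstrating the behavior, not as a way to run a pipeline. In real code the fix is to give each channel exactly one closer.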

Goroutine leaks happen when a goroutine waits on a channel that never gets closed. Always have a cancellation path. If the pipeline stops, every stage must stop. A common leak occurs when a stage reads from a channel that the sender never closes because the sender panicked or returned early. Use defer close to guarantee the channel closes when the goroutine exits.

The worst goroutine bug is the one that never logs. Always have a cancellation path.

When to use a pipeline

Use a pipeline when data flows through a fixed sequence of transformations and each stage can process items independently. Use a worker pool when you have a batch of independent tasks that can run in parallel but share a bounded resource like a database connection. Use a fan-out/fan-in pattern when you need to split work across multiple workers and merge the results back together. Use sequential code when the stages depend on each other's results or when the overhead of goroutines outweighs the benefit of concurrency.
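For comparison, here is a rough fan-out/fan-in sketch. Several workers read from one shared input channel and write to one shared output channel, and a sync.WaitGroup decides when the output can be closed. The names worker and fanOutIn are illustrative, and the sketch assumes at least one worker.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// worker squares every value it receives. Several workers can share one input.
func worker(in <-chan int, out chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for n := range in {
		out <- n * n
	}
}

// fanOutIn distributes vals across the given number of workers (assumed to
// be at least 1) and merges their results into one slice. Arrival order is
// not deterministic, so the slice is sorted before returning.
func fanOutIn(vals []int, workers int) []int {
	in := make(chan int)
	out := make(chan int)

	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go worker(in, out, &wg)
	}

	// Feed the workers, then close the input so their range loops end.
	go func() {
		defer close(in)
		for _, v := range vals {
			in <- v
		}
	}()

	// Close the merged output only after every worker has finished sending.
	go func() {
		wg.Wait()
		close(out)
	}()

	var results []int
	for r := range out {
		results = append(results, r)
	}
	sort.Ints(results)
	return results
}

func main() {
	fmt.Println(fanOutIn([]int{1, 2, 3, 4, 5}, 3)) // prints [1 4 9 16 25]
}
```

The output channel has several senders, so no single worker may close it. The goroutine that waits on the WaitGroup acts as the one owner that closes it.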

Pick the shape that matches the data flow. Concurrency is a tool, not a goal.

Where to go next