Go channels do not have built-in 'Generator', 'Fan-Out', 'Fan-In', or 'Pipeline' keywords; these are concurrency patterns implemented using goroutines and channels. You create a generator by launching a goroutine that sends data, fan-out by having multiple goroutines read from a single channel, fan-in by having multiple goroutines send to a single channel, and a pipeline by chaining these stages together.
// Generator: produces data
func gen(nums ...int) <-chan int {
c := make(chan int)
go func() {
for _, n := range nums {
c <- n
}
close(c)
}()
return c
}
// Fan-Out: multiple consumers read from one source
func worker(id int, c <-chan int) {
for n := range c {
fmt.Printf("worker %d got %d\n", id, n)
}
}
// Fan-In: multiple producers send to one consumer
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
wg.Add(len(cs))
for _, c := range cs {
go func(in <-chan int) {
for n := range in {
out <- n
}
wg.Done()
}(c)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
// Pipeline: chaining stages
func main() {
c1 := gen(1, 2, 3)
c2 := gen(4, 5, 6)
merged := merge(c1, c2)
go worker(1, merged)
go worker(2, merged)
go worker(3, merged)
select {}
}