How to Use Client Streaming RPC in Go

Client streaming RPCs let a client send a sequence of messages to the server within a single call. The server reads the messages as they arrive and returns exactly one response, normally after the client has finished sending. You implement this by marking the request type with the stream keyword in your .proto file, generating the Go code, calling Send() in a loop on the client, and then calling CloseAndRecv() to half-close the stream and wait for the response.

First, define your service in the .proto file using the stream keyword for the request message:

service DataProcessor {
  rpc ProcessStream (stream DataItem) returns (ProcessResult);
}

message DataItem {
  string value = 1;
}

message ProcessResult {
  int32 total_count = 1;
  string summary = 2;
}

After running protoc with the protoc-gen-go and protoc-gen-go-grpc plugins to generate the Go code, the client implementation involves opening the stream, sending multiple items, and then closing the send side of the stream so the server can produce its response.

Here is a practical client implementation:

package main

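import (
    "context"
    "log"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    pb "your/module/path" // Replace with your actual import path
)

func main() {
    // grpc.WithInsecure is deprecated; insecure.NewCredentials() replaces it.
    // Plaintext transport is suitable for local development only.
    // grpc.NewClient needs grpc-go v1.63 or later; on older versions call
    // grpc.Dial with the same arguments.
    conn, err := grpc.NewClient("localhost:50051", grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        log.Fatalf("failed to create client: %v", err)
    }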
    defer conn.Close()

    client := pb.NewDataProcessorClient(conn)
    
    // Create the streaming RPC
    stream, err := client.ProcessStream(context.Background())
    if err != nil {
        log.Fatalf("failed to create stream: %v", err)
    }

    // Send multiple messages
    items := []string{"data-1", "data-2", "data-3"}
    for _, item := range items {
        if err := stream.Send(&pb.DataItem{Value: item}); err != nil {
            log.Fatalf("failed to send: %v", err)
        }
    }

    // Close the stream and receive the single response
    resp, err := stream.CloseAndRecv()
    if err != nil {
        log.Fatalf("failed to receive response: %v", err)
    }

    log.Printf("Server response: Count=%d, Summary=%s", resp.TotalCount, resp.Summary)
}
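
One detail the client above glosses over: in grpc-go, when the RPC has already ended on the server side, Send() reports io.EOF rather than the real status, and the actual error is obtained from CloseAndRecv(). The sketch below illustrates that handling without a running server. It uses only the standard library, and uploadStream, fakeStream, sendAll, and errRejected are hypothetical stand-ins for the generated stream type, not part of any generated code.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// errRejected is a hypothetical error that the fake server reports.
var errRejected = errors.New("server rejected the batch")

// uploadStream is a hypothetical stand-in for the generated client stream.
type uploadStream interface {
	Send(value string) error
	CloseAndRecv() (string, error)
}

// fakeStream accepts limit messages, then behaves like a stream whose
// server side has already ended the RPC with errRejected.
type fakeStream struct {
	limit    int
	sent     int
	rejected bool
}

func (f *fakeStream) Send(string) error {
	if f.sent == f.limit {
		f.rejected = true
		return io.EOF // the RPC is over; the real status comes from CloseAndRecv
	}
	f.sent++
	return nil
}

func (f *fakeStream) CloseAndRecv() (string, error) {
	if f.rejected {
		return "", errRejected
	}
	return fmt.Sprintf("received %d items", f.sent), nil
}

// sendAll streams every value, stops early if the stream has ended,
// and always asks CloseAndRecv for the final result or error.
func sendAll(s uploadStream, values []string) (string, error) {
	for _, v := range values {
		err := s.Send(v)
		if errors.Is(err, io.EOF) {
			break // stop sending; CloseAndRecv reports why the stream ended
		}
		if err != nil {
			return "", err
		}
	}
	return s.CloseAndRecv()
}

func main() {
	values := []string{"data-1", "data-2", "data-3"}

	summary, err := sendAll(&fakeStream{limit: 10}, values)
	fmt.Println(summary, err) // received 3 items <nil>

	summary, err = sendAll(&fakeStream{limit: 2}, values)
	fmt.Printf("%q %v\n", summary, err) // "" server rejected the batch
}
```

With the real generated stream, the same shape applies: break out of the send loop on io.EOF and let CloseAndRecv() surface the server's status instead of logging a bare "EOF".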

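On the server side, you implement the ProcessStream method by accepting a DataProcessor_ProcessStreamServer stream. Depending on the protoc-gen-go-grpc version, this name may be an alias for the generic grpc.ClientStreamingServer type, but it is used the same way. Loop over stream.Recv() to read incoming messages until it returns io.EOF, process them, and finally send the single result with SendAndClose(). The snippet below needs the io package imported in addition to the generated pb package.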

func (s *server) ProcessStream(stream pb.DataProcessor_ProcessStreamServer) error {
    count := 0
    for {
        item, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            return err
        }
        count++
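        // Process item.GetValue() here. The blank assignment only keeps this
        // skeleton compiling, since Go rejects unused variables.
        _ = item.GetValue()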
    }

    return stream.SendAndClose(&pb.ProcessResult{
        TotalCount: int32(count),
        Summary:    "Processing complete",
    })
}
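
The receive loop can be exercised without gRPC at all, which makes the io.EOF handling easier to see. The following is a minimal sketch using only the standard library. The names itemStream, sliceStream, drain, and errReset are hypothetical and exist only for illustration; the real handler would use the generated stream type instead.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// errReset is a hypothetical transport failure used by the fake stream.
var errReset = errors.New("connection reset")

// itemStream is a hypothetical stand-in for the generated server stream;
// it mimics only the Recv method.
type itemStream interface {
	Recv() (string, error)
}

// sliceStream replays a fixed list of values. Once the list is empty it
// returns failErr if set, otherwise io.EOF.
type sliceStream struct {
	values  []string
	failErr error
}

func (s *sliceStream) Recv() (string, error) {
	if len(s.values) == 0 {
		if s.failErr != nil {
			return "", s.failErr
		}
		return "", io.EOF
	}
	v := s.values[0]
	s.values = s.values[1:]
	return v, nil
}

// drain counts messages until the stream ends cleanly with io.EOF and
// propagates any other error to the caller.
func drain(s itemStream) (int, error) {
	count := 0
	for {
		_, err := s.Recv()
		if errors.Is(err, io.EOF) {
			return count, nil // the client finished sending
		}
		if err != nil {
			return count, err // a real failure, not end of stream
		}
		count++
	}
}

func main() {
	n, err := drain(&sliceStream{values: []string{"data-1", "data-2", "data-3"}})
	fmt.Println(n, err) // 3 <nil>

	n, err = drain(&sliceStream{values: []string{"data-1"}, failErr: errReset})
	fmt.Println(n, err) // 1 connection reset
}
```

The important design choice is that io.EOF is treated as normal completion while every other error aborts the handler, which mirrors the two branches in the server method above.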

Key points to remember: always call CloseAndRecv() on the client to half-close the stream and receive the response. On the server, treat io.EOF from Recv() as the signal that the client has finished sending, and return any other error from the handler. Call SendAndClose() once, after the Recv() loop has ended. A handler that returns before the client has finished sending (for example, with an error) ends the whole RPC, so later Send() calls on the client will fail.