How to Use pgx for Advanced PostgreSQL Features in Go (COPY, LISTEN/NOTIFY)

Use `pgx`'s `CopyFrom` method for high-performance bulk inserts, and a dedicated `*pgx.Conn` to handle `LISTEN` and `NOTIFY` for real-time event signaling.

The bottleneck of row-by-row inserts

You are building a data pipeline. You have ten thousand rows to load into a PostgreSQL table. You write a loop, call db.Exec with an INSERT statement for each row, and watch the progress bar crawl. Each statement pays for a network round trip, a parse and plan step, and, outside an explicit transaction, its own commit. The database spends more time on that per-statement overhead than on storing your data. You need a faster way to move bulk data without rewriting your application in C or switching to a different database.

How the COPY protocol actually works

PostgreSQL solves this with the COPY protocol. Instead of sending individual SQL statements, you issue one COPY command and then stream raw row values to the table. The database parses and plans once instead of once per row, avoids per-row statement overhead, and inserts the rows in bulk. pg_dump uses the same mechanism by default to dump and restore table data. pgx exposes it through CopyFrom, which handles the binary encoding and chunking so you do not have to write a custom wire protocol encoder.

Think of standard INSERT like mailing individual letters. Each one needs an envelope, a stamp, and a trip to the post office. COPY is like loading a moving truck. You pack everything at once, drive it to the destination, and unload the entire contents in one go. The database receives a continuous stream of binary-encoded values, checks them against the table's column types and constraints, and appends them to the table in bulk.

Streaming data with CopyFrom

Here is the simplest way to stream rows into a table using pgx. The code demonstrates how to convert a Go slice into a database-ready iterator and execute the bulk load.

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/jackc/pgx/v5"
)

func main() {
    // Context carries cancellation signals for the entire operation
    ctx := context.Background()
    conn, err := pgx.Connect(ctx, "postgres://localhost:5432/mydb")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close(ctx)

    // Define the data structure matching the target table columns
    type User struct {
        ID   int32
        Name string
        Age  int32
    }

    rows := []User{
        {1, "Alice", 30},
        {2, "Bob", 25},
        {3, "Charlie", 35},
    }

    // Convert the structs into [][]any, one inner slice per row,
    // with values in the same order as the column list below
    data := make([][]any, 0, len(rows))
    for _, u := range rows {
        data = append(data, []any{u.ID, u.Name, u.Age})
    }

    // pgx.CopyFromRows wraps the [][]any in a pgx.CopyFromSource iterator
    rowSrc := pgx.CopyFromRows(data)

    // Execute the bulk load as a single COPY operation
    // pgx.Identifier quotes the table name safely
    count, err := conn.CopyFrom(
        ctx,
        pgx.Identifier{"users"},
        []string{"id", "name", "age"},
        rowSrc,
    )
    if err != nil {
        log.Fatal(err)
    }

    fmt.Printf("Loaded %d rows\n", count)
}

What happens under the hood

The code above does three things under the hood. First, pgx.CopyFromRows wraps your rows in an iterator that implements pgx.CopyFromSource. When CopyFrom runs, it asks the iterator for the next row, encodes each value in PostgreSQL's binary format, and sends it over the connection. Second, conn.CopyFrom opens a COPY ... FROM STDIN session with the database. The server suspends normal query processing for that connection and waits for raw data. Third, when the iterator reaches the end, pgx signals that the copy is complete, the server finishes the statement, and CopyFrom returns the row count. If the iterator reports an error instead, pgx aborts the copy and no rows are loaded.
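When the data does not fit comfortably in memory, you can implement the iterator yourself instead of building a slice first. The sketch below is one possible shape, not the way pgx does it internally: lineSource and its "id,name,age" input format are assumptions made up for this example. Its Next, Values, and Err methods match the method set of pgx.CopyFromSource, so a value of this type could be passed as the last argument to CopyFrom. The example uses only the standard library so it runs without a database.

```go
package main

import (
	"bufio"
	"fmt"
	"strconv"
	"strings"
)

// lineSource reads "id,name,age" lines one at a time.
// The type name and the input format are assumptions for this sketch.
type lineSource struct {
	sc  *bufio.Scanner
	row []any
	err error
}

func newLineSource(input string) *lineSource {
	return &lineSource{sc: bufio.NewScanner(strings.NewReader(input))}
}

// Next parses the next line and reports whether a row is available.
func (s *lineSource) Next() bool {
	if s.err != nil || !s.sc.Scan() {
		return false
	}
	parts := strings.Split(s.sc.Text(), ",")
	if len(parts) != 3 {
		s.err = fmt.Errorf("expected 3 fields, got %d", len(parts))
		return false
	}
	id, err := strconv.ParseInt(parts[0], 10, 32)
	if err != nil {
		s.err = fmt.Errorf("bad id %q: %w", parts[0], err)
		return false
	}
	age, err := strconv.ParseInt(parts[2], 10, 32)
	if err != nil {
		s.err = fmt.Errorf("bad age %q: %w", parts[2], err)
		return false
	}
	// Values are in column order: id, name, age.
	s.row = []any{int32(id), parts[1], int32(age)}
	return true
}

// Values returns the row parsed by the most recent call to Next.
func (s *lineSource) Values() ([]any, error) { return s.row, nil }

// Err reports a parse error, or a read error from the scanner.
func (s *lineSource) Err() error {
	if s.err != nil {
		return s.err
	}
	return s.sc.Err()
}

func main() {
	src := newLineSource("1,Alice,30\n2,Bob,25\n")
	for src.Next() {
		vals, _ := src.Values()
		fmt.Println(vals...)
	}
	if err := src.Err(); err != nil {
		fmt.Println("error:", err)
	}
}
```

Only one row is held in memory at a time, which is the point of the iterator interface. In a real loader the input would come from a file or network reader rather than a string.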

The entire operation is a single statement, so it is atomic. If the server rejects a row, for example because of a unique violation (SQLSTATE 23505), the whole batch rolls back. Type mismatches usually surface earlier: pgx encodes values on the client, so a Go value it cannot convert to the column type fails inside pgx before the server sees it. The compiler only catches the outermost mistake. Passing anything other than [][]any to CopyFromRows is a compile-time type mismatch, while the values inside each row are checked at run time.
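If all-or-nothing is too coarse for a very large load, one option is to split the input into chunks and run CopyFrom once per chunk, so a bad row fails only its own chunk. The sketch below shows just the splitting step. chunkRows is a hypothetical helper written for this example, not part of pgx, and the chunk size of 2 is only for demonstration.

```go
package main

import "fmt"

// chunkRows splits rows into consecutive groups of at most size rows.
// It is a hypothetical helper for this sketch, not part of pgx.
func chunkRows(rows [][]any, size int) [][][]any {
	if size <= 0 {
		return nil
	}
	var chunks [][][]any
	for start := 0; start < len(rows); start += size {
		end := start + size
		if end > len(rows) {
			end = len(rows)
		}
		chunks = append(chunks, rows[start:end])
	}
	return chunks
}

func main() {
	rows := [][]any{
		{int32(1), "Alice", int32(30)},
		{int32(2), "Bob", int32(25)},
		{int32(3), "Charlie", int32(35)},
		{int32(4), "Dana", int32(41)},
		{int32(5), "Eve", int32(29)},
	}
	for i, chunk := range chunkRows(rows, 2) {
		// In a real loader, each chunk would be passed to
		// conn.CopyFrom(ctx, table, columns, pgx.CopyFromRows(chunk)).
		fmt.Printf("chunk %d: %d rows\n", i, len(chunk))
	}
}
```

The trade-off is that chunks loaded before a failure stay committed, so the loader has to record which chunk failed and decide whether to retry or skip it.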

Go conventions favor explicit error handling here. The if err != nil { log.Fatal(err) } pattern looks repetitive, but it forces you to acknowledge failure points. In production code, you would return the error up the call stack instead of calling log.Fatal. Context always goes as the first parameter, conventionally named ctx. Functions that take a context should respect cancellation and deadlines. This keeps your shutdown logic predictable.

COPY moves data fast. Keep the iterator lightweight and let the database handle the writes.

Real-time events with LISTEN and NOTIFY

Bulk loading solves the write problem. Real-time event signaling solves the read problem. PostgreSQL includes a built-in pub/sub system using LISTEN and NOTIFY. A session subscribes to a named channel, and any other session can broadcast a message to that channel. Messages are delivered when the sending transaction commits. The listener blocks until a message arrives or its context is cancelled.

This pattern shines when you need to react to database events without polling. A background worker can wait for new orders, a cache invalidation service can watch for updates, or a notification system can push alerts to connected clients.

Here is how you wire it up in a long-running service. The example shows proper goroutine isolation, context propagation, and graceful shutdown handling.

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/jackc/pgx/v5"
)

// startListener subscribes to a PostgreSQL channel and processes incoming payloads.
// conn must be dedicated to listening: a *pgx.Conn is not safe for concurrent use.
// The returned channel is closed when the receive loop exits.
func startListener(ctx context.Context, conn *pgx.Conn, channel string) (<-chan struct{}, error) {
    // Quote the channel name as an identifier instead of concatenating raw input
    _, err := conn.Exec(ctx, "LISTEN "+pgx.Identifier{channel}.Sanitize())
    if err != nil {
        return nil, fmt.Errorf("listen on %s: %w", channel, err)
    }

    done := make(chan struct{})

    // Run the blocking receive loop in the background
    go func() {
        defer close(done)
        for {
            // Blocks until a notification arrives or ctx is cancelled
            notif, err := conn.WaitForNotification(ctx)
            if err != nil {
                // Context cancellation is expected during graceful shutdown
                if ctx.Err() != nil {
                    log.Println("listener stopped: context cancelled")
                    return
                }
                log.Printf("notification error: %v", err)
                return
            }

            // Process the payload according to your application logic
            fmt.Printf("channel=%s pid=%d payload=%s\n", notif.Channel, notif.PID, notif.Payload)
        }
    }()
    return done, nil
}

func main() {
    // Stop the demo after two seconds; a real service would cancel on shutdown
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    const dsn = "postgres://localhost:5432/mydb"

    // Dedicated connection for LISTEN
    listenConn, err := pgx.Connect(ctx, dsn)
    if err != nil {
        log.Fatal(err)
    }
    defer listenConn.Close(context.Background())

    // Separate connection for sending, because listenConn is busy waiting
    notifyConn, err := pgx.Connect(ctx, dsn)
    if err != nil {
        log.Fatal(err)
    }
    defer notifyConn.Close(context.Background())

    done, err := startListener(ctx, listenConn, "inventory_updates")
    if err != nil {
        log.Fatal(err)
    }

    // Simulate a notification from another session or trigger
    if _, err := notifyConn.Exec(ctx, "NOTIFY inventory_updates, 'item_sold_42'"); err != nil {
        log.Fatal(err)
    }

    // Wait for the listener to exit once the context expires
    <-done
}

The connection trap and production realities

The LISTEN command tells PostgreSQL to route messages sent to inventory_updates to this specific connection. In pgx v5 the call that receives them is WaitForNotification, which blocks the goroutine until the database delivers a message or the context is cancelled or reaches its deadline. It takes no timeout argument; to bound the wait, pass a context created with context.WithTimeout. When another session runs NOTIFY inventory_updates, 'payload', the blocked call returns a *pgconn.Notification containing the channel name, the process ID of the notifying backend, and the payload string.

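In a long-running service, run WaitForNotification in its own goroutine. If you call it on the main goroutine, your application does nothing else until a message arrives. A *pgx.Conn is not safe for concurrent use, so that goroutine should own the connection: run your other queries, including any NOTIFY, on a different connection or a pool. The listener then processes events independently while the rest of your program handles HTTP requests, background jobs, or CLI commands.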

Context cancellation is the standard way to shut down the listener. When you cancel the parent context, WaitForNotification returns promptly with an error. The goroutine checks ctx.Err(), logs a clean exit, and returns. This prevents goroutine leaks when your service restarts or scales down. The worst goroutine bug is the one that never logs.

The biggest trap with LISTEN is connection pooling. pgxpool manages a pool of connections and routes queries to whichever one is available. A subscription belongs to one session, so it lives and dies with one physical connection. If the pool hands your next call to a different connection, or closes the one you subscribed on, your listener silently disappears. Use a dedicated *pgx.Conn for any channel you subscribe to, and keep that connection separate from your query pool.

Another common mistake is ignoring payload size limits. In the default configuration, a NOTIFY payload must be shorter than 8000 bytes. If you try to send a large JSON blob, the server rejects it with ERROR: payload string too long (SQLSTATE 22023). Write large data to a table or a message queue instead. Use NOTIFY only for lightweight signals like order_created or cache_invalidate.

Go developers often reach for *string or interface{} when passing data around, but pgx works best with concrete types. The pgtype package handles conversions, but you get fewer surprises when you define explicit structs and convert them to rows in one place. CopyFromRows accepts only [][]any, so passing a []User directly fails to compile with an error along the lines of cannot use rows (variable of type []User) as [][]any value in argument to pgx.CopyFromRows. Go has no public or private keywords: exported names start with a capital letter and unexported names start lowercase. Follow the naming rules and the code reads naturally.

Error handling in database code follows a strict pattern. Check the error immediately after the call. Return it up the stack. Do not swallow it with _ unless you have a documented reason. The if err != nil boilerplate is verbose by design. It makes failure paths visible in the control flow. Trust the pattern. It scales.

LISTEN requires a dedicated connection. Isolate it, cancel it cleanly, and keep payloads small.

When to reach for which tool

Use CopyFrom when you need to load thousands of rows and want to avoid per-statement overhead. Use standard INSERT statements when you are writing single records or small batches of a few dozen rows. Use LISTEN and NOTIFY when you need lightweight, database-native event signaling between sessions; every session listening at that moment receives the message, but a listener that is disconnected misses it. Use a message broker like Redis Streams or Kafka when you need durable messages, replay, or payloads that approach the 8000-byte limit. Use a polling loop with SELECT ... FOR UPDATE SKIP LOCKED when you need a durable job queue in which several workers claim rows without blocking one another; it does not by itself guarantee strict ordering across workers, so keep handlers idempotent. Use pgxpool for general query routing, but isolate LISTEN channels on dedicated connections.

COPY moves data fast. LISTEN moves signals fast. Keep them separate, respect the connection boundaries, and let PostgreSQL do the heavy lifting.

Where to go next