How to Use pgx for Advanced PostgreSQL Features in Go (COPY, LISTEN/NOTIFY)

Use `pgx`'s `CopyFrom` method for high-performance bulk inserts and its `Conn` interface to handle `LISTEN` and `NOTIFY` for real-time event streaming.

Use pgx's CopyFrom method for high-performance bulk inserts and its Conn interface to handle LISTEN and NOTIFY for real-time event streaming. These features bypass standard query overhead and provide native support for PostgreSQL's asynchronous messaging capabilities.

For bulk data ingestion, CopyFrom is significantly faster than individual INSERT statements because it streams data directly to the database server in a single transaction. You define the column names and pass a slice of data; pgx handles the serialization.

import (
    "context"
    "github.com/jackc/pgx/v5"
    "github.com/jackc/pgx/v5/pgtype"
)

// Assuming 'rows' is a slice of structs or []interface{} matching your table schema
rows := []struct {
    ID   int32
    Name string
    Age  int32
}{
    {1, "Alice", 30},
    {2, "Bob", 25},
}

ctx := context.Background()
conn, err := pgx.Connect(ctx, "postgres://user:pass@localhost:5432/dbname")
if err != nil {
    panic(err)
}
defer conn.Close(ctx)

// Execute COPY FROM STDIN
count, err := conn.CopyFrom(
    ctx,
    pgx.Identifier{"users"}, // Table name
    []string{"id", "name", "age"}, // Column names
    pgx.CopyFromRows(rows),
)
if err != nil {
    panic(err)
}
fmt.Printf("Inserted %d rows\n", count)

For real-time updates, LISTEN and NOTIFY allow your Go application to react to database events immediately. You establish a listener on a specific channel, then use Conn.ReceiveNotification to block and wait for messages sent from other database sessions or triggers.

// Start listening on a channel named "inventory_updates"
_, err = conn.Exec(ctx, "LISTEN inventory_updates")
if err != nil {
    panic(err)
}

// Run this in a goroutine to avoid blocking the main thread
go func() {
    for {
        // Blocks until a notification arrives or context is cancelled
        notif, err := conn.ReceiveNotification(ctx)
        if err != nil {
            // Handle context cancellation or connection errors
            if ctx.Err() != nil {
                return
            }
            panic(err)
        }

        fmt.Printf("Received notification: %s (Payload: %s)\n", notif.Channel, notif.Payload)
        // Process the payload here
    }
}()

// Example of sending a notification from another session (or same session)
_, err = conn.Exec(ctx, "NOTIFY inventory_updates, 'item_sold'")

Remember that ReceiveNotification blocks, so always run it in a separate goroutine if your application needs to perform other tasks concurrently. For production systems, consider using pgxpool instead of a single Conn for better connection management, but note that LISTEN requires a dedicated connection since it holds the connection open while waiting for events.