Use pgx's CopyFrom method for high-performance bulk inserts, and run LISTEN and NOTIFY through a pgx.Conn for real-time event streaming. CopyFrom uses the PostgreSQL COPY protocol, which avoids per-statement overhead, and the notification API gives native access to PostgreSQL's asynchronous messaging.
For bulk data ingestion, CopyFrom is significantly faster than individual INSERT statements because it streams all rows to the server in a single COPY operation, which either succeeds or fails as a whole. You pass the table name, the column names, and a CopyFromSource; pgx.CopyFromRows builds one from a [][]any, and pgx handles the serialization.
import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)
// pgx.CopyFromRows expects [][]any: each inner slice holds one row, in column order
rows := [][]any{
	{int32(1), "Alice", int32(30)},
	{int32(2), "Bob", int32(25)},
}
ctx := context.Background()
conn, err := pgx.Connect(ctx, "postgres://user:pass@localhost:5432/dbname")
if err != nil {
	panic(err)
}
defer conn.Close(ctx)
// Execute COPY FROM STDIN
count, err := conn.CopyFrom(
	ctx,
	pgx.Identifier{"users"},       // Table name
	[]string{"id", "name", "age"}, // Column names
	pgx.CopyFromRows(rows),
)
if err != nil {
	panic(err)
}
fmt.Printf("Inserted %d rows\n", count)
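When the data already lives in typed structs, one option is to feed CopyFrom from a small iterator instead of building a [][]any up front. The sketch below is a minimal illustration, not part of pgx: User and userSource are hypothetical names, and the three methods are written to match the shape of the pgx.CopyFromSource interface (Next, Values, Err). It uses only the standard library so it can run without a database.

```go
package main

import "fmt"

// User is a hypothetical record type standing in for a row of the users table.
type User struct {
	ID   int32
	Name string
	Age  int32
}

// userSource walks a []User one row at a time. Its Next, Values, and Err
// methods follow the shape of pgx.CopyFromSource, so a *userSource could be
// passed to conn.CopyFrom in place of pgx.CopyFromRows(...).
type userSource struct {
	users []User
	idx   int // number of rows handed out so far
}

func newUserSource(users []User) *userSource {
	return &userSource{users: users}
}

// Next advances to the next row and reports whether one is available.
func (s *userSource) Next() bool {
	if s.idx >= len(s.users) {
		return false
	}
	s.idx++
	return true
}

// Values returns the current row in the same order as the column list.
// It must only be called after Next has returned true.
func (s *userSource) Values() ([]any, error) {
	u := s.users[s.idx-1]
	return []any{u.ID, u.Name, u.Age}, nil
}

// Err reports any error hit while iterating; an in-memory slice has none.
func (s *userSource) Err() error { return nil }

func main() {
	// Drive the source by hand the way a consumer would: Next, then Values.
	src := newUserSource([]User{{1, "Alice", 30}, {2, "Bob", 25}})
	for src.Next() {
		row, _ := src.Values()
		fmt.Println(row...)
	}
}
```

With a live connection, the call would then look like conn.CopyFrom(ctx, pgx.Identifier{"users"}, []string{"id", "name", "age"}, newUserSource(users)). The same pattern can wrap a file reader or a channel, which avoids holding every row in memory at once.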
For real-time updates, LISTEN and NOTIFY allow your Go application to react to database events immediately. You issue LISTEN on a specific channel, then call Conn.WaitForNotification to block until a message arrives from another database session or a trigger.
// Start listening on a channel named "inventory_updates"
_, err = conn.Exec(ctx, "LISTEN inventory_updates")
if err != nil {
	panic(err)
}
// Run this in a goroutine to avoid blocking the main goroutine
go func() {
	for {
		// Blocks until a notification arrives or the context is cancelled
		notif, err := conn.WaitForNotification(ctx)
		if err != nil {
			// Stop quietly on context cancellation; treat anything else as a connection error
			if ctx.Err() != nil {
				return
			}
			panic(err)
		}
		fmt.Printf("Received notification: %s (Payload: %s)\n", notif.Channel, notif.Payload)
		// Process the payload here
	}
}()
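// Send a notification from a different connection: a pgx.Conn is not safe for
// concurrent use, and conn is busy in the listening goroutine
sender, err := pgx.Connect(ctx, "postgres://user:pass@localhost:5432/dbname")
if err != nil {
	panic(err)
}
defer sender.Close(ctx)
_, err = sender.Exec(ctx, "NOTIFY inventory_updates, 'item_sold'")
if err != nil {
	panic(err)
}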
Remember that WaitForNotification blocks, so run it in a separate goroutine if your application needs to perform other tasks concurrently, and do not issue other queries on that connection while it waits, because a pgx.Conn is not safe for concurrent use. For production systems, consider using pgxpool instead of a single Conn for better connection management, but note that LISTEN requires a dedicated connection: the subscription belongs to the session that issued it, and that connection stays occupied while waiting for events.
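The listener loop leaves payload processing open. A NOTIFY payload is a plain string, so any structure has to be agreed on between sender and receiver. The sketch below assumes, purely for illustration, that the sender publishes JSON such as {"action":"item_sold","item_id":42}; inventoryEvent and decodeInventoryEvent are hypothetical names, not part of pgx.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// inventoryEvent is a hypothetical payload shape; the sender would have to
// produce matching JSON, for example from a trigger.
type inventoryEvent struct {
	Action string `json:"action"`
	ItemID int64  `json:"item_id"`
}

// decodeInventoryEvent parses the payload string of one notification.
func decodeInventoryEvent(payload string) (inventoryEvent, error) {
	var ev inventoryEvent
	if err := json.Unmarshal([]byte(payload), &ev); err != nil {
		return inventoryEvent{}, fmt.Errorf("decode payload %q: %w", payload, err)
	}
	return ev, nil
}

func main() {
	// In the listener loop this string would come from notif.Payload.
	ev, err := decodeInventoryEvent(`{"action":"item_sold","item_id":42}`)
	if err != nil {
		fmt.Println("skipping malformed payload:", err)
		return
	}
	fmt.Printf("%s: item %d\n", ev.Action, ev.ItemID)
}
```

Returning an error instead of panicking lets the loop skip a malformed message and keep listening. Because payloads are size-limited by the server, a common approach is to send only an identifier and fetch the full row with a regular query on a separate connection.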