How to Use River for Background Jobs with PostgreSQL in Go

River is a robust job queue library for Go that uses PostgreSQL as its storage backend, allowing you to define jobs as Go structs and enqueue them with automatic retry logic, scheduling, and error handling. You initialize a client with a database driver, register a worker for each job type, and then use the client to insert jobs into the river_job table. Any started River client with a matching worker picks them up for execution, whether it runs in the same process or in a separate worker process.

First, install the library and ensure your PostgreSQL schema is set up using River's migration tools. You can run the migration directly via the CLI or programmatically.

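go get github.com/riverqueue/river
go get github.com/riverqueue/river/riverdriver/riverpgxv5
# Install the River CLI
go install github.com/riverqueue/river/cmd/river@latest
# Run migrations to create the river_job table and necessary indexes
river migrate-up --database-url "postgres://user:pass@host:5432/dbname"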

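Next, define your job as a plain struct that holds the payload fields and has a Kind method returning a unique name for the job type. Then define a worker that embeds river.WorkerDefaults and implements Work, and register it so River knows how to decode and execute jobs of that kind.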

package main

import (
    "context"
    "log"

    "github.com/jackc/pgx/v5/pgxpool"
    "github.com/riverqueue/river"
    "github.com/riverqueue/river/riverdriver/riverpgxv5"
)

// SendEmailArgs is the job payload. River serializes it to JSON and
// stores it in the river_job table.
type SendEmailArgs struct {
    To      string `json:"to"`
    Subject string `json:"subject"`
    Body    string `json:"body"`
}

// Kind returns the unique name River uses to match a job to its worker.
func (SendEmailArgs) Kind() string { return "send_email" }

// SendEmailWorker processes SendEmailArgs jobs. Embedding WorkerDefaults
// supplies default implementations of the optional worker methods.
type SendEmailWorker struct {
    river.WorkerDefaults[SendEmailArgs]
}

func (w *SendEmailWorker) Work(ctx context.Context, job *river.Job[SendEmailArgs]) error {
    // Your business logic here
    log.Printf("Sending email to %s: %s", job.Args.To, job.Args.Subject)
    // Simulate sending...
    return nil
}

func main() {
    ctx := context.Background()

    // River's pgx v5 driver works with a pgxpool connection pool
    dbPool, err := pgxpool.New(ctx, "postgres://user:pass@host:5432/dbname")
    if err != nil {
        log.Fatal(err)
    }
    defer dbPool.Close()

    // Register workers; AddWorker panics if a kind is registered twice
    workers := river.NewWorkers()
    river.AddWorker(workers, &SendEmailWorker{})

    // Initialize River client with the queues to work and the worker bundle
    client, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
        Queues: map[string]river.QueueConfig{
            river.QueueDefault: {MaxWorkers: 10},
        },
        Workers: workers,
    })
    if err != nil {
        log.Fatal(err)
    }

    // Start fetching and working jobs in the background
    if err := client.Start(ctx); err != nil {
        log.Fatal(err)
    }

    // Enqueue a job
    _, err = client.Insert(ctx, SendEmailArgs{
        To:      "user@example.com",
        Subject: "Hello",
        Body:    "This is a test",
    }, nil)
    if err != nil {
        log.Fatal(err)
    }

    // A real service would block here until a shutdown signal arrives.
    // Stop waits for jobs that are already running to finish.
    if err := client.Stop(ctx); err != nil {
        log.Fatal(err)
    }
}
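To make the stored payload concrete, here is a small standard-library sketch of the JSON that a payload with these three fields and tags encodes to, and how it decodes back before a worker sees it. The names emailArgs, encodeArgs, and decodeArgs are hypothetical and are not part of River; the sketch only assumes that the payload goes through encoding/json, as the struct tags suggest.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// emailArgs is a hypothetical stand-in for the email job payload; it carries
// the same three fields and JSON tags as the example above.
type emailArgs struct {
	To      string `json:"to"`
	Subject string `json:"subject"`
	Body    string `json:"body"`
}

// encodeArgs marshals the payload into the JSON text that would be stored
// alongside the job row.
func encodeArgs(a emailArgs) (string, error) {
	b, err := json.Marshal(a)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

// decodeArgs is the reverse step: turning stored JSON back into a typed
// struct before the business logic runs.
func decodeArgs(s string) (emailArgs, error) {
	var a emailArgs
	err := json.Unmarshal([]byte(s), &a)
	return a, err
}

func main() {
	encoded, err := encodeArgs(emailArgs{
		To:      "user@example.com",
		Subject: "Hello",
		Body:    "This is a test",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(encoded)

	decoded, err := decodeArgs(encoded)
	if err != nil {
		panic(err)
	}
	fmt.Println(decoded.To, decoded.Subject)
}
```

Because the payload is plain JSON, renaming a field or changing its tag affects jobs that are already queued, so it is worth treating the tags as a stable contract.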

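The Insert call adds the job to the river_job table. The Start method launches background goroutines that fetch available jobs from the database, run the Work method of the worker registered for each job's kind, and schedule a retry automatically if Work returns an error. Per-job options such as maximum attempts, scheduled run time, queue, priority, and uniqueness are set through the river.InsertOpts value passed as the last argument to Insert (nil in the example), or as per-type defaults by implementing an InsertOpts method on the job struct. The retry schedule itself can be customized with a NextRetry method on the worker.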