River is a job queue library for Go that uses PostgreSQL as its storage backend. You define jobs as Go structs and enqueue them, and River handles retries, scheduling, and error handling. You initialize a client with your database connection, register a worker for each job type, and use the client to insert jobs into the river_job table. A client that has been started with queues and workers, in the same process or a separate one, then picks them up for execution.
First, install the library, its pgx driver, and the River CLI, then set up your PostgreSQL schema using River's migration tools. You can run migrations via the CLI, as shown here, or programmatically from Go.

    go get github.com/riverqueue/river
    go get github.com/riverqueue/river/riverdriver/riverpgxv5
    go install github.com/riverqueue/river/cmd/river@latest
    # Run migrations to create the river_job table and necessary indexes
    river migrate-up --database-url "postgres://user:pass@host:5432/dbname"
Next, define your job arguments as a plain struct with a Kind method that returns a unique name for the job type. A worker embeds river.WorkerDefaults and implements Work, which receives the arguments wrapped in a river.Job. Workers are collected in a river.Workers bundle that is passed to the client through its Config, so the client knows how to decode and execute each kind of job.

    package main

    import (
        "context"
        "log"

        "github.com/jackc/pgx/v5/pgxpool"
        "github.com/riverqueue/river"
        "github.com/riverqueue/river/riverdriver/riverpgxv5"
    )

    // SendEmailArgs is the job payload; River stores it as JSON in river_job.
    type SendEmailArgs struct {
        To      string `json:"to"`
        Subject string `json:"subject"`
        Body    string `json:"body"`
    }

    // Kind returns the unique name that ties stored jobs to their worker.
    func (SendEmailArgs) Kind() string { return "send_email" }

    // SendEmailWorker executes SendEmailArgs jobs.
    type SendEmailWorker struct {
        river.WorkerDefaults[SendEmailArgs]
    }

    func (w *SendEmailWorker) Work(ctx context.Context, job *river.Job[SendEmailArgs]) error {
        // Your business logic here
        log.Printf("Sending email to %s: %s", job.Args.To, job.Args.Subject)
        return nil
    }

    func main() {
        ctx := context.Background()

        dbPool, err := pgxpool.New(ctx, "postgres://user:pass@host:5432/dbname")
        if err != nil {
            log.Fatal(err)
        }
        defer dbPool.Close()

        // Register workers before creating the client
        workers := river.NewWorkers()
        river.AddWorker(workers, &SendEmailWorker{}) // panics if the kind is already registered

        // Initialize River client with the pgx v5 driver
        client, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
            Queues: map[string]river.QueueConfig{
                river.QueueDefault: {MaxWorkers: 10},
            },
            Workers: workers,
        })
        if err != nil {
            log.Fatal(err)
        }

        // Start fetching and working jobs in the background
        if err := client.Start(ctx); err != nil {
            log.Fatal(err)
        }

        // Enqueue a job
        if _, err := client.Insert(ctx, SendEmailArgs{
            To:      "user@example.com",
            Subject: "Hello",
            Body:    "This is a test",
        }, nil); err != nil {
            log.Fatal(err)
        }

        // In a real service, block here until a shutdown signal arrives,
        // then stop gracefully so running jobs can finish.
        if err := client.Stop(ctx); err != nil {
            log.Fatal(err)
        }
    }
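The Insert call adds a row to the river_job table. Start launches background goroutines that fetch available jobs from the database, run the Work method of the matching registered worker, and schedule a retry automatically if Work returns an error. Per-job settings such as MaxAttempts, ScheduledAt, and UniqueOpts go in a river.InsertOpts value, either passed as the last argument to Insert (nil in the example above) or returned from an optional InsertOpts method on the job struct. The retry schedule itself is configured on the client through Config.RetryPolicy, or per worker by overriding NextRetry.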