The Outbox Pattern ensures reliable messaging by writing the message to a database table within the same transaction as the business logic, then using a separate process to read and send those messages. This prevents data loss if the message broker is unavailable or the application crashes after the transaction commits. Implement it by creating an outbox table, inserting a message record during your main transaction, and running a background worker to poll and dispatch pending messages.
// 1. Define the outbox table schema (PostgreSQL example)
// CREATE TABLE outbox (
// id BIGSERIAL PRIMARY KEY,
// payload JSONB NOT NULL,
// destination VARCHAR(255) NOT NULL,
// status VARCHAR(50) DEFAULT 'pending',
// created_at TIMESTAMP DEFAULT NOW()
// );
// 2. Write business logic and outbox entry in a single transaction
func (s *Service) CreateOrder(ctx context.Context, order Order) error {
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
// Save order
if err := s.saveOrder(tx, order); err != nil {
return err
}
// Write to outbox table
if err := s.saveOutboxMessage(tx, order.ID, "order-created", order); err != nil {
return err
}
return tx.Commit()
}
// 3. Background worker to poll and send messages
func (s *Service) StartOutboxProcessor(ctx context.Context) {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s.processPendingMessages(ctx)
}
}
}
func (s *Service) processPendingMessages(ctx context.Context) {
// Fetch pending messages
rows, err := s.db.QueryContext(ctx, "SELECT id, payload, destination FROM outbox WHERE status = 'pending' LIMIT 100")
if err != nil { return }
defer rows.Close()
for rows.Next() {
var id int64
var payload json.RawMessage
var dest string
if err := rows.Scan(&id, &payload, &dest); err != nil { continue }
// Send to broker (e.g., Kafka, RabbitMQ)
if err := s.sendToBroker(dest, payload); err != nil {
// Log error, do not update status yet (retry later)
continue
}
// Mark as sent
s.db.ExecContext(ctx, "UPDATE outbox SET status = 'sent' WHERE id = $1", id)
}
}