How to Use Machinery for Distributed Task Processing in Go

Machinery is a robust asynchronous task queue and job scheduler for Go that allows you to distribute work across multiple workers using a message broker like Redis or RabbitMQ.

When synchronous work blocks your server

Your web server receives a request to resize a batch of uploaded images. The image processing takes four seconds. Every user waiting for that response holds a connection and a handler for the full duration. After a hundred concurrent uploads, the server starts rejecting new requests. You need to hand the heavy lifting off to something else, return an immediate acknowledgment to the user, and let background processes finish the work without tying up request handlers.

The kitchen rail analogy

A distributed task queue solves this by separating the code that creates work from the code that executes it. Think of a restaurant kitchen. Waiters take orders and drop tickets on a metal rail. They never step behind the counter. Cooks stand at the rail, grab the next ticket, and start cooking. If the kitchen gets busy, you hire more cooks. If the rail gets crowded, orders wait their turn. The rail is your message broker. The tickets are your tasks. Machinery is the framework that manages the rail, tracks which cook is working on which ticket, and stores the finished meals until the waiter comes back to pick them up.

Task queues turn unpredictable latency into predictable throughput. Your HTTP handler returns a 202 Accepted response in milliseconds. The actual computation happens elsewhere. The user gets instant feedback. The server stays responsive.
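
The handoff can be sketched with the standard library alone. This is a minimal illustration, not Machinery code: the buffered jobs channel stands in for the broker, and the enqueue helper, the resizeHandler name, and the /resize route are hypothetical names chosen for the example.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// jobs is an in-memory stand-in for the message broker (assumption for this sketch).
var jobs = make(chan string, 100)

// enqueue hands a job to the queue without waiting for it to be processed.
func enqueue(imageID string) error {
	select {
	case jobs <- imageID:
		return nil
	default:
		return fmt.Errorf("queue full")
	}
}

// resizeHandler acknowledges the request immediately and leaves the work to a worker.
func resizeHandler(w http.ResponseWriter, r *http.Request) {
	imageID := r.URL.Query().Get("image")
	if imageID == "" {
		http.Error(w, "missing image parameter", http.StatusBadRequest)
		return
	}
	if err := enqueue(imageID); err != nil {
		http.Error(w, err.Error(), http.StatusServiceUnavailable)
		return
	}
	w.WriteHeader(http.StatusAccepted) // 202: accepted, not yet processed
	fmt.Fprintf(w, "queued %s\n", imageID)
}

// demoRequest sends one request through the handler and reports the status
// code and the number of jobs currently waiting in the queue.
func demoRequest(target string) (int, int) {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodPost, target, nil)
	resizeHandler(rec, req)
	return rec.Code, len(jobs)
}

func main() {
	code, queued := demoRequest("/resize?image=img-1")
	fmt.Println("status:", code, "queued jobs:", queued)
}
```

The handler never touches the image. It only validates the request, places a ticket on the rail, and answers. With Machinery, the enqueue step would become a SendTask call.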

A minimal worker process

Here is the simplest worker setup. It connects to a local Redis instance, registers a single task, and starts listening for jobs.

package main

import (
	"log"
	"os"
	"os/signal"
	"syscall"

	// The single-argument NewServer used below is the v1 API.
	"github.com/RichardKnop/machinery/v1"
	"github.com/RichardKnop/machinery/v1/config"
)

// Add takes two integers and returns their sum.
func Add(x, y int) (int, error) {
	return x + y, nil
}

func main() {
	// Read broker URL from environment or fall back to local Redis
	brokerURL := os.Getenv("REDIS_URL")
	if brokerURL == "" {
		brokerURL = "redis://localhost:6379"
	}

	// Broker handles job distribution. ResultBackend stores results.
	cfg := &config.Config{
		Broker:          brokerURL,
		ResultBackend:   brokerURL,
		DefaultQueue:    "default",
		ResultsExpireIn: 3600, // seconds
		NoUnixSignals:   true, // this program handles SIGINT/SIGTERM itself
	}

	// Initialize the server that routes tasks to registered handlers
	server, err := machinery.NewServer(cfg)
	if err != nil {
		log.Fatalf("Failed to initialize server: %v", err)
	}

	// Map the string identifier "add" to the actual Go function.
	// RegisterTask validates the function at runtime and returns an error.
	if err := server.RegisterTask("add", Add); err != nil {
		log.Fatalf("Failed to register task: %v", err)
	}

	// Concurrency of 2 means up to two tasks run simultaneously per process
	worker := server.NewWorker("adder_worker", 2)

	// Launch blocks while the worker consumes, so run it in its own goroutine
	errCh := make(chan error, 1)
	go func() { errCh <- worker.Launch() }()

	// Block until the worker fails or the OS signals a shutdown request
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	select {
	case err := <-errCh:
		log.Fatalf("Worker stopped: %v", err)
	case <-sigChan:
		log.Println("Shutting down worker...")
		worker.Quit() // stop consuming new messages
	}
}

Workers are long-running processes. They do not exit after handling one job. They stay alive, poll the broker, and execute tasks as they arrive. Keep your worker binaries lean. Deploy them behind a process manager like systemd or a container orchestrator so they restart automatically after crashes.

What happens under the hood

When you run the worker, it opens a connection to Redis and starts consuming from the default queue. Launch blocks its calling goroutine for as long as the worker is consuming, so it must either be the last call in main or run in its own goroutine while main waits for a shutdown signal. On the producer side, Machinery serializes the task name and arguments into JSON and pushes the message to the broker. When a message appears, the worker pulls it, deserializes the arguments, looks up the registered function by name, and executes it in a separate goroutine. The result is stored in the result backend under the task's UUID. If the task panics, Machinery recovers, marks the job as failed, and logs the stack trace.

The framework handles argument conversion automatically. The producer sends 15 and 25 together with the name of their type. On the worker side, Machinery decodes the JSON numbers, converts them to the declared Go types, and calls the registered function through reflection. If a value cannot be converted to the expected type, the worker reports an error and marks the task as failed. This automatic adapter saves you from writing boilerplate unmarshaling code, but it also means you must keep your task signatures stable across deployments. Changing a parameter type without updating the producer will break deserialization.
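
To make the conversion step concrete, here is a minimal sketch of the general technique using only encoding/json and reflect. It is an illustration of the idea, not Machinery's actual implementation, and callWithJSONArgs is a hypothetical helper written for this example.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// Add is the same task function registered by the worker.
func Add(x, y int) (int, error) {
	return x + y, nil
}

var errorType = reflect.TypeOf((*error)(nil)).Elem()

// callWithJSONArgs decodes a JSON array, converts each element to the
// parameter type that fn declares, and calls fn through reflection.
// fn must have the shape func(...) (T, error).
func callWithJSONArgs(fn interface{}, rawArgs string) (interface{}, error) {
	var decoded []interface{}
	if err := json.Unmarshal([]byte(rawArgs), &decoded); err != nil {
		return nil, fmt.Errorf("decode args: %w", err)
	}

	fnValue := reflect.ValueOf(fn)
	if fnValue.Kind() != reflect.Func {
		return nil, fmt.Errorf("task must be a function")
	}
	fnType := fnValue.Type()
	if fnType.NumOut() != 2 || !fnType.Out(1).Implements(errorType) {
		return nil, fmt.Errorf("task must return (value, error)")
	}
	if fnType.NumIn() != len(decoded) {
		return nil, fmt.Errorf("want %d arguments, got %d", fnType.NumIn(), len(decoded))
	}

	in := make([]reflect.Value, len(decoded))
	for i, arg := range decoded {
		want := fnType.In(i)
		got := reflect.ValueOf(arg) // JSON numbers arrive as float64
		if !got.IsValid() || !got.Type().ConvertibleTo(want) {
			return nil, fmt.Errorf("argument %d: cannot convert %T to %s", i, arg, want)
		}
		in[i] = got.Convert(want)
	}

	out := fnValue.Call(in)
	if errVal := out[1].Interface(); errVal != nil {
		return nil, errVal.(error)
	}
	return out[0].Interface(), nil
}

func main() {
	sum, err := callWithJSONArgs(Add, "[15, 25]")
	fmt.Println(sum, err)

	_, err = callWithJSONArgs(Add, `["fifteen", 25]`)
	fmt.Println(err)
}
```

The sketch also shows why stable signatures matter: the only link between producer and worker is the JSON payload, so a mismatch can only be detected when the worker tries to make the call. Note that this naive conversion would silently truncate a fractional number such as 15.7 to 15.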

Sending jobs from a producer

Producers live in your web server or CLI tools. They never execute the task logic. They only construct the job signature and push it to the broker. Here is how you send a task with retry logic and a delay.

package main

import (
	"log"
	"time"

	"github.com/RichardKnop/machinery/v1"
	"github.com/RichardKnop/machinery/v1/config"
	"github.com/RichardKnop/machinery/v1/tasks"
)

func main() {
	// Reuse the same broker/backend configuration as the worker
	cfg := &config.Config{
		Broker:        "redis://localhost:6379",
		ResultBackend: "redis://localhost:6379",
		DefaultQueue:  "default",
	}

	server, err := machinery.NewServer(cfg)
	if err != nil {
		log.Fatalf("Server init failed: %v", err)
	}

	// ETA tells workers not to run the task before this time (5 seconds from now)
	eta := time.Now().UTC().Add(5 * time.Second)

	// Configure execution behavior. Each argument carries its type name
	// so the worker can convert it back to a Go value.
	signature := &tasks.Signature{
		Name: "add",
		Args: []tasks.Arg{
			{Type: "int", Value: 15},
			{Type: "int", Value: 25},
		},
		RetryCount:   3,         // retry up to 3 times on failure
		RetryTimeout: 1,         // seconds to wait before a retry
		ETA:          &eta,      // delayed execution
		RoutingKey:   "default", // queue to publish to
	}

	// Push the job to the broker and return immediately
	asyncResult, err := server.SendTask(signature)
	if err != nil {
		log.Fatalf("Failed to enqueue task: %v", err)
	}

	log.Printf("Task dispatched with UUID %s", asyncResult.Signature.UUID)
}

Producers should treat the broker as a fire-and-forget destination unless you need synchronous result polling. The SendTask call serializes the signature, publishes it to Redis, and returns an AsyncResult. Its Signature.UUID field holds the task UUID, which you can store in your database to track job status later. The Go community convention for error handling applies here: check the error immediately and handle it where it occurs. The if err != nil { log.Fatalf(...) } pattern is verbose by design. It makes the unhappy path visible and keeps a failed enqueue from passing unnoticed. In a long-running web server, return the error to the caller instead of calling log.Fatalf, which exits the process.

Fetching results and following conventions

A task function can return one or more values followed by an error. Machinery marshals the return values into JSON and stores them in the result backend under the task UUID. You can wait for them through the AsyncResult that SendTask returned.

// asyncResult is the *result.AsyncResult returned by server.SendTask.
// GetWithTimeout polls the backend every 500ms and gives up after 10 seconds.
results, err := asyncResult.GetWithTimeout(10*time.Second, 500*time.Millisecond)
if err != nil {
	log.Fatalf("Result unavailable: %v", err)
}

// results holds the task's return values as []reflect.Value.
// tasks.HumanReadableResults formats them for logging.
log.Printf("Task returned: %s", tasks.HumanReadableResults(results))

Polling results introduces latency. In production, you usually avoid synchronous waiting. Instead, you store the task UUID in your application database, let the worker update the record via a webhook or direct DB write, and serve the status to users through standard API endpoints. This decouples your web tier from the queue backend.
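
The status-tracking pattern can be sketched as follows. This is a hedged illustration under stated assumptions: StatusStore is a hypothetical in-memory stand-in for your application database, and the state names are chosen for the example rather than taken from Machinery.

```go
package main

import (
	"fmt"
	"sync"
)

// JobStatus is a hypothetical record the web tier serves to users.
type JobStatus struct {
	State  string // "pending", "success", or "failure"
	Result string
}

// StatusStore is an in-memory stand-in for the application database.
type StatusStore struct {
	mu   sync.RWMutex
	jobs map[string]JobStatus
}

// NewStatusStore returns an empty store.
func NewStatusStore() *StatusStore {
	return &StatusStore{jobs: make(map[string]JobStatus)}
}

// MarkPending is called by the producer right after the task is enqueued.
func (s *StatusStore) MarkPending(uuid string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.jobs[uuid] = JobStatus{State: "pending"}
}

// MarkDone is called by the worker when the task finishes.
func (s *StatusStore) MarkDone(uuid, result string, failed bool) {
	state := "success"
	if failed {
		state = "failure"
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.jobs[uuid] = JobStatus{State: state, Result: result}
}

// Lookup is what an HTTP status endpoint would call.
func (s *StatusStore) Lookup(uuid string) (JobStatus, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	status, ok := s.jobs[uuid]
	return status, ok
}

func main() {
	store := NewStatusStore()

	store.MarkPending("task-1") // producer side
	status, _ := store.Lookup("task-1")
	fmt.Println("after enqueue:", status.State)

	store.MarkDone("task-1", "40", false) // worker side
	status, _ = store.Lookup("task-1")
	fmt.Println("after completion:", status.State, status.Result)
}
```

The web tier only ever reads from the store, so a slow or unavailable result backend never stalls a status request.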

Follow Go naming conventions when building around Machinery. Exported names start with a capital letter; unexported names start lowercase. Go has no public or private keywords, so capitalization alone controls visibility. Functions that accept a context.Context should take it as the first parameter, conventionally named ctx. A context cannot be serialized into task arguments, so a task cannot inherit the cancellation of the HTTP request that enqueued it. Machinery can supply its own context when a task function declares context.Context as its first parameter; check ctx.Err() inside long-running tasks. Trust gofmt. Argue logic, not formatting. Most editors run it on save, and it keeps your codebase consistent across teams.

Common pitfalls and runtime traps

Task functions should be self-contained. A task may run on a different machine, in a different process, long after it was enqueued, so it cannot depend on in-memory state, open file handles, or database connections that belong to the producer. Anything it needs must arrive through its arguments or be created by the worker process. RegisterTask accepts any value as its second parameter, so the compiler cannot check the function's shape. Validation happens at runtime: RegisterTask returns an error if, for example, the function's last return value is not an error. Check that return value instead of discarding it. Machinery calls registered functions through reflection, so you register ordinary typed functions such as Add directly, without a wrapper. If a producer sends arguments whose count or types do not match the registered function, the mismatch only surfaces at runtime, when the worker tries to execute the task.

Another common trap is assuming the broker is always available. If Redis goes down while a worker is mid-execution, the job status becomes unknown. Set ResultsExpireIn so stale results do not accumulate in your backend. Goroutine leaks happen when a task blocks on a channel that never receives a value. Machinery runs each task in its own goroutine and caps the number running at once at the worker's concurrency setting. A task that hangs forever holds its concurrency slot forever. Once every slot is held by a hung task, the worker stops picking up new jobs. Always attach timeouts or context cancellation to long-running tasks. The worst goroutine bug is the one that never logs.
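
One way to bound a task's running time is a small wrapper around context.WithTimeout. The sketch below uses only the standard library; runWithTimeout, stuckTask, quickTask, and isTimeout are hypothetical names for illustration, and the wrapper assumes the task body cooperates by watching ctx.Done().

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// runWithTimeout runs task and stops waiting for it once timeout elapses.
func runWithTimeout(timeout time.Duration, task func(ctx context.Context) error) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	done := make(chan error, 1) // buffered so the goroutine can always deliver and exit
	go func() { done <- task(ctx) }()

	select {
	case err := <-done:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}

// stuckTask waits on a channel that never receives a value,
// but gives up as soon as its context is cancelled.
func stuckTask(ctx context.Context) error {
	never := make(chan struct{})
	select {
	case <-never:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// quickTask finishes immediately.
func quickTask(ctx context.Context) error {
	return nil
}

// isTimeout reports whether err was caused by the deadline passing.
func isTimeout(err error) bool {
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	err := runWithTimeout(50*time.Millisecond, stuckTask)
	fmt.Println("stuck task timed out:", isTimeout(err))

	err = runWithTimeout(50*time.Millisecond, quickTask)
	fmt.Println("quick task error:", err)
}
```

The wrapper frees the caller even if the task ignores its context, but in that case the task's goroutine keeps running in the background. Only a task that checks ctx.Done() actually stops.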

Serialization limits also catch developers off guard. Machinery encodes tasks as JSON. You cannot pass channels, mutexes, or functions as arguments. Machinery's documented argument types are booleans, numbers, strings, and slices of those, so a common way to pass a struct is to marshal it to a JSON string yourself and unmarshal it inside the task. If you forget to export a struct field, the JSON encoder drops it silently. The worker receives a zero value and your logic fails in unpredictable ways. Round-trip your payload types through encoding/json in a unit test during development. Catch missing fields before they hit production.
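
The silent drop is easy to reproduce with encoding/json alone. In this sketch, ResizeJob and roundTrip are hypothetical names for illustration; the width field is deliberately left unexported to show what the worker would receive.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ResizeJob is a hypothetical task payload.
type ResizeJob struct {
	ImageID string `json:"image_id"`
	Height  int    `json:"height"`
	width   int    // unexported: encoding/json skips it when encoding and decoding
}

// roundTrip encodes job to JSON and decodes it again, the way a payload
// travels from producer to worker. It returns the wire format and the
// value the worker would see.
func roundTrip(job ResizeJob) (string, ResizeJob, error) {
	payload, err := json.Marshal(job)
	if err != nil {
		return "", ResizeJob{}, err
	}
	var decoded ResizeJob
	if err := json.Unmarshal(payload, &decoded); err != nil {
		return "", ResizeJob{}, err
	}
	return string(payload), decoded, nil
}

func main() {
	sent := ResizeJob{ImageID: "img-1", Height: 600, width: 800}
	payload, received, err := roundTrip(sent)
	if err != nil {
		fmt.Println("round trip failed:", err)
		return
	}
	fmt.Println("wire format:", payload)
	fmt.Println("width sent:", sent.width, "width received:", received.width)
}
```

A unit test that compares the sent and received values field by field turns this silent loss into a failing build.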

Choosing the right concurrency model

Use Machinery when you need a battle-tested queue backed by Redis or RabbitMQ, with built-in retry logic, result storage, and horizontal scaling. Use a simple channel-based producer-consumer pattern when your work stays within a single process and you want zero external dependencies. Use a framework such as Celery or Sidekiq when your team already works in Python or Ruby and your tasks are written in those languages. Use plain goroutines with a sync.WaitGroup when you need lightweight concurrency for a fixed number of tasks and don't require persistence or retry guarantees.
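
For comparison, the single-process alternative fits in a few lines. This is a minimal sketch of a channel-based producer-consumer pool with a sync.WaitGroup; sumOfSquares is a hypothetical example workload, not something Machinery provides.

```go
package main

import (
	"fmt"
	"sync"
)

// sumOfSquares fans the inputs out to a fixed number of worker goroutines
// and adds up their results.
func sumOfSquares(inputs []int, workers int) int {
	if workers < 1 {
		workers = 1
	}
	jobs := make(chan int)
	results := make(chan int)

	// Consumers: each worker takes jobs until the channel is closed.
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for n := range jobs {
				results <- n * n
			}
		}()
	}

	// Producer: feed the jobs, then signal that no more are coming.
	go func() {
		for _, n := range inputs {
			jobs <- n
		}
		close(jobs)
	}()

	// Close results once every worker has finished.
	go func() {
		wg.Wait()
		close(results)
	}()

	total := 0
	for r := range results {
		total += r
	}
	return total
}

func main() {
	fmt.Println(sumOfSquares([]int{1, 2, 3, 4}, 2))
}
```

Nothing here survives a process restart, and there are no retries. When you need either, that is the point at which a broker-backed queue starts to pay for its complexity.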

Task queues add infrastructure complexity. They require monitoring, broker maintenance, and careful retry tuning. Pick the simplest tool that solves your latency problem. Add distributed queues only when single-process concurrency hits its limits.

Where to go next