Advanced Go Concurrency Patterns for Scalable Applications
Go's concurrency model, built around goroutines and channels, is one of its most powerful features. While basic concurrency is straightforward, mastering advanced patterns is crucial for building scalable, efficient, and robust applications. This article delves into several sophisticated concurrency patterns that can significantly improve your Go programs.
1. Worker Pool Pattern
The worker pool pattern is a fundamental technique for managing a fixed set of goroutines to process a stream of tasks. It prevents the creation of an unbounded number of goroutines, which can lead to resource exhaustion.
Why Use a Worker Pool?
- Resource Management: Limits the number of concurrent goroutines, controlling memory and CPU usage.
- Performance: Reduces the overhead of constantly creating and destroying goroutines.
- Predictability: Makes the system's behavior more predictable under load.
Implementation
package main

import (
    "fmt"
    "sync"
    "time"
)

// Task represents a unit of work.
type Task struct {
    ID   int
    Data string
}

// Result represents the outcome of a task.
type Result struct {
    TaskID int
    Output string
    Error  error
}

// Worker registers its personal task channel with the shared pool to signal
// availability, processes the tasks it receives, and reports results.
type Worker struct {
    ID         int
    TaskChan   chan Task
    WorkerPool chan chan Task
    ResultChan chan Result
    QuitChan   chan bool
}

// NewWorker creates a new worker attached to the given pool.
func NewWorker(id int, workerPool chan chan Task, resultChan chan Result) *Worker {
    return &Worker{
        ID:         id,
        TaskChan:   make(chan Task),
        WorkerPool: workerPool,
        ResultChan: resultChan,
        QuitChan:   make(chan bool),
    }
}

// Start begins the worker's processing loop.
func (w *Worker) Start(wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        // Signal availability by placing our task channel in the pool.
        w.WorkerPool <- w.TaskChan
        select {
        case task := <-w.TaskChan:
            // Simulate work
            fmt.Printf("Worker %d processing task %d\n", w.ID, task.ID)
            time.Sleep(100 * time.Millisecond) // Simulate processing time
            w.ResultChan <- Result{
                TaskID: task.ID,
                Output: fmt.Sprintf("Processed: %s", task.Data),
            }
        case <-w.QuitChan:
            fmt.Printf("Worker %d stopping\n", w.ID)
            return
        }
    }
}

// Stop signals the worker to stop processing.
func (w *Worker) Stop() {
    w.QuitChan <- true
}

// Dispatcher manages a pool of workers.
type Dispatcher struct {
    WorkerPool chan chan Task
    MaxWorkers int
    TaskQueue  chan Task
    ResultChan chan Result
    Workers    []*Worker
    QuitChan   chan bool
}

// NewDispatcher creates a new dispatcher.
func NewDispatcher(maxWorkers int, taskQueueSize int) *Dispatcher {
    return &Dispatcher{
        WorkerPool: make(chan chan Task, maxWorkers),
        MaxWorkers: maxWorkers,
        TaskQueue:  make(chan Task, taskQueueSize),
        ResultChan: make(chan Result, taskQueueSize), // Buffered so workers never block on results
        Workers:    make([]*Worker, 0, maxWorkers),
        QuitChan:   make(chan bool),
    }
}

// Run starts the dispatcher and its worker pool.
func (d *Dispatcher) Run() {
    var wg sync.WaitGroup
    // Start workers
    for i := 0; i < d.MaxWorkers; i++ {
        worker := NewWorker(i+1, d.WorkerPool, d.ResultChan)
        d.Workers = append(d.Workers, worker)
        wg.Add(1)
        go worker.Start(&wg)
    }
    // Start the dispatcher loop
    go d.dispatch()
    // Handle graceful shutdown
    go func() {
        <-d.QuitChan
        close(d.TaskQueue)
        for _, worker := range d.Workers {
            worker.Stop()
        }
        wg.Wait() // Wait for all workers to finish
        close(d.ResultChan)
    }()
}

// dispatch forwards queued tasks to whichever worker is available next.
func (d *Dispatcher) dispatch() {
    for task := range d.TaskQueue { // Exits when the queue is closed and drained
        // Wait for an available worker's task channel...
        workerTaskChan := <-d.WorkerPool
        // ...and hand it the task.
        workerTaskChan <- task
    }
}

// Submit adds a task to the queue.
func (d *Dispatcher) Submit(task Task) {
    d.TaskQueue <- task
}

// Results returns the result channel.
func (d *Dispatcher) Results() <-chan Result {
    return d.ResultChan
}

// Stop gracefully shuts down the dispatcher.
func (d *Dispatcher) Stop() {
    close(d.QuitChan)
}

func main() {
    // Create a dispatcher with 3 workers and a task queue of 100
    dispatcher := NewDispatcher(3, 100)
    dispatcher.Run()
    // Submit tasks
    numTasks := 10
    for i := 1; i <= numTasks; i++ {
        dispatcher.Submit(Task{ID: i, Data: fmt.Sprintf("Task-%d-Data", i)})
    }
    // Collect results
    completed := 0
    for result := range dispatcher.Results() {
        fmt.Printf("Received result for task %d: %s\n", result.TaskID, result.Output)
        completed++
        if completed == numTasks {
            break
        }
    }
    // Stop the dispatcher
    dispatcher.Stop()
    fmt.Println("All tasks completed.")
}
Key Concepts
- Task Queue: A buffered channel where submitted tasks wait to be dispatched.
- Worker Pool: A channel of channels; each inner channel belongs to one worker, and a worker signals availability by sending its task channel into the pool.
- Dispatcher: Pairs tasks from the queue with available workers.
- Graceful Shutdown: Closes the queue and signals each worker, so in-flight tasks finish before the pool terminates. A simpler shared-queue variant of the pattern is sketched below.
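The dispatcher above is deliberately explicit about which worker takes which task. When that level of control is not needed, a simpler and very common variant has all workers receive from a single shared task channel, and closing that channel doubles as the graceful-shutdown signal. A minimal sketch of this variant (the integer tasks and the doubling step stand in for real work):

package main

import (
    "fmt"
    "sync"
)

func main() {
    const numWorkers = 3
    const numTasks = 10
    tasks := make(chan int, numTasks)
    results := make(chan int, numTasks)

    var wg sync.WaitGroup
    for w := 0; w < numWorkers; w++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            // range ends once tasks is closed and drained: graceful shutdown.
            for t := range tasks {
                results <- t * 2 // placeholder for real work
            }
        }()
    }

    for i := 1; i <= numTasks; i++ {
        tasks <- i
    }
    close(tasks) // no more work; workers drain the queue and exit

    // Close results after all workers finish so the range below terminates.
    go func() {
        wg.Wait()
        close(results)
    }()

    for r := range results {
        fmt.Println("result:", r)
    }
}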
2. Fan-Out and Fan-In Patterns
Fan-out and fan-in are patterns for distributing work among multiple goroutines and then collecting the results.
Fan-Out
Fan-out involves starting multiple goroutines to handle input from the same channel. This is useful for parallelizing CPU or I/O-bound tasks.
Fan-In
Fan-in involves merging multiple result channels into a single channel. This simplifies collecting results from multiple workers.
Implementation
package main

import (
    "fmt"
    "sync"
    "time"
)

// process simulates a worker that squares each value it receives.
func process(id int, input <-chan int, output chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    // Each worker owns its output channel, so it must close it when done;
    // otherwise fanIn's forwarding goroutines would block forever.
    defer close(output)
    for v := range input {
        // Simulate work
        fmt.Printf("Worker %d processing %d\n", id, v)
        time.Sleep(50 * time.Millisecond) // Simulate processing time
        output <- v * v // Simple transformation
    }
    fmt.Printf("Worker %d finished\n", id)
}

// fanIn merges results from multiple channels into one.
func fanIn(channels []<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup
    wg.Add(len(channels))
    for _, c := range channels {
        // Launch a forwarding goroutine for each input channel
        go func(ch <-chan int) {
            defer wg.Done()
            for n := range ch {
                out <- n
            }
        }(c)
    }
    // Close the output channel once every input is drained
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

func main() {
    const numWorkers = 3
    const numJobs = 10
    // Create the shared input channel
    jobs := make(chan int, numJobs)
    // One result channel per worker
    resultChannels := make([]<-chan int, numWorkers)
    var wg sync.WaitGroup
    // Start workers (Fan-Out)
    for i := 0; i < numWorkers; i++ {
        resultChan := make(chan int, numJobs) // Buffered so workers never block
        resultChannels[i] = resultChan
        wg.Add(1)
        go process(i+1, jobs, resultChan, &wg)
    }
    // Send jobs
    for j := 1; j <= numJobs; j++ {
        jobs <- j
    }
    close(jobs) // Closing the input signals workers to finish
    // Fan-In: merge all result channels
    finalResults := fanIn(resultChannels)
    // Collect results; the loop ends when fanIn closes finalResults
    fmt.Println("Collecting results...")
    for result := range finalResults {
        fmt.Printf("Result: %d\n", result)
    }
    // All workers have already exited at this point
    wg.Wait()
    fmt.Println("All workers finished.")
}
Key Concepts
- Parallelization: Fan-out distributes tasks to multiple workers for concurrent processing.
- Result Merging: Fan-in simplifies the collection of results from multiple sources.
- Channel Management: Properly managing channel lifecycles is crucial to prevent goroutine leaks; above, each worker closes the output channel it owns, and a cancellable fanIn variant is sketched after this list.
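If the consumer of the merged channel may stop reading early, the forwarding goroutines inside fanIn would block forever trying to send. A common guard is a done channel they can select on. Below is a sketch of such a variant; the done parameter is an addition to the example above, and the function is meant to drop into that program alongside its existing imports:

// fanInDone merges channels like fanIn, but abandons forwarding once done
// is closed, so the inner goroutines cannot leak if the consumer stops early.
func fanInDone(done <-chan struct{}, channels []<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup
    wg.Add(len(channels))
    for _, c := range channels {
        go func(ch <-chan int) {
            defer wg.Done()
            for n := range ch {
                select {
                case out <- n:
                case <-done:
                    return // consumer gave up; stop forwarding
                }
            }
        }(c)
    }
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}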
3. Semaphore Pattern
A semaphore is a synchronization primitive that controls access to a resource by limiting the number of concurrent operations. While Go's sync.Mutex allows only one goroutine at a time, a semaphore allows up to N concurrent holders.
Implementation
Go's golang.org/x/sync/semaphore package provides an efficient implementation, which the example below uses; the same idea can also be expressed with a buffered channel, as sketched after the example.
package main

import (
    "context"
    "fmt"
    "sync"
    "time"

    "golang.org/x/sync/semaphore"
)

const (
    maxConcurrency = 3
    numTasks       = 10
)

func worker(ctx context.Context, sem *semaphore.Weighted, id int, wg *sync.WaitGroup) {
    defer wg.Done()
    // Acquire a semaphore slot; if the context is cancelled first, this returns an error.
    if err := sem.Acquire(ctx, 1); err != nil {
        fmt.Printf("Worker %d: context cancelled before acquiring semaphore: %v\n", id, err)
        return
    }
    // Ensure the semaphore slot is released
    defer sem.Release(1)
    // Simulate work that requires a limited resource
    fmt.Printf("Worker %d: starting work (concurrency limited to %d)\n", id, maxConcurrency)
    time.Sleep(2 * time.Second) // Simulate work
    fmt.Printf("Worker %d: finished work\n", id)
}

func main() {
    // Create a semaphore with a capacity of maxConcurrency
    sem := semaphore.NewWeighted(maxConcurrency)
    // Use a context for potential cancellation
    ctx := context.Background()
    // Example with a timeout context:
    // ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    // defer cancel()
    var wg sync.WaitGroup
    for i := 1; i <= numTasks; i++ {
        wg.Add(1)
        go worker(ctx, sem, i, &wg)
    }
    wg.Wait()
    fmt.Println("All workers completed.")
}
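For comparison, here is the buffered-channel version mentioned earlier: the channel's capacity is the concurrency limit, a send acquires a slot, and a receive releases it. A minimal sketch:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    const maxConcurrency = 3
    sem := make(chan struct{}, maxConcurrency) // capacity = concurrency limit

    var wg sync.WaitGroup
    for i := 1; i <= 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            sem <- struct{}{}        // acquire: blocks while all slots are taken
            defer func() { <-sem }() // release the slot on the way out
            fmt.Printf("worker %d holds a slot\n", id)
            time.Sleep(500 * time.Millisecond) // simulate work
        }(i)
    }
    wg.Wait()
}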
Key Concepts
- Resource Limiting: Prevents overwhelming a system resource (e.g., database connections, file descriptors).
- Context Integration: Allows for cancellation and timeout handling.
- Weighted Semaphores: semaphore.Weighted lets callers acquire and release varying weights, which is useful for resources of different sizes; see the sketch after this list.
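For instance, jobs might acquire weight in proportion to how much of a fixed budget (say, units of memory) they consume. A short sketch under that assumption; the budget of 8 and the per-job costs are illustrative:

package main

import (
    "context"
    "fmt"
    "sync"

    "golang.org/x/sync/semaphore"
)

func main() {
    ctx := context.Background()
    sem := semaphore.NewWeighted(8) // total budget of 8 units

    jobs := []int64{1, 4, 2, 4, 1} // per-job cost in budget units (illustrative)
    var wg sync.WaitGroup
    for i, cost := range jobs {
        wg.Add(1)
        go func(id int, w int64) {
            defer wg.Done()
            // A heavy job (weight 4) occupies as much of the budget as four light ones.
            if err := sem.Acquire(ctx, w); err != nil {
                return
            }
            defer sem.Release(w)
            fmt.Printf("job %d holding %d units\n", id, w)
        }(i, cost)
    }
    wg.Wait()
}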
4. Context Management for Cancellation and Timeout
Proper use of context.Context is essential for building robust concurrent applications. It provides a way to carry deadlines, cancellation signals, and request-scoped values across API boundaries and between processes.
Implementation
package main

import (
    "context"
    "fmt"
    "math/rand"
    "time"
)

// longRunningTask simulates a long-running task that can be cancelled.
func longRunningTask(ctx context.Context, taskID int) error {
    // Pick a random duration to simulate variability
    duration := time.Duration(rand.Intn(5)+1) * time.Second
    fmt.Printf("Task %d: Starting, will take %v\n", taskID, duration)
    select {
    case <-time.After(duration):
        fmt.Printf("Task %d: Completed successfully\n", taskID)
        return nil
    case <-ctx.Done():
        // Context was cancelled or timed out
        fmt.Printf("Task %d: Cancelled or timed out: %v\n", taskID, ctx.Err())
        return ctx.Err() // Return the cancellation error
    }
}

func main() {
    // Example 1: Manual cancellation
    fmt.Println("--- Example 1: Manual Cancellation ---")
    ctx1, cancel1 := context.WithCancel(context.Background())
    // Start the task
    go func() {
        if err := longRunningTask(ctx1, 1); err != nil {
            fmt.Printf("Task 1 finished with error: %v\n", err)
        }
    }()
    // Simulate some work in the main goroutine
    time.Sleep(2 * time.Second)
    // Cancel the context
    cancel1()
    // Give the goroutine time to print its result
    time.Sleep(1 * time.Second)

    // Example 2: Timeout
    fmt.Println("\n--- Example 2: Timeout ---")
    // Create a context that times out after 2 seconds
    ctx2, cancel2 := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel2() // Always call cancel to release resources
    // Start the task
    done := make(chan error, 1)
    go func() {
        done <- longRunningTask(ctx2, 2)
    }()
    // Wait for the task to report back (completion or timeout-driven cancellation)
    select {
    case err := <-done:
        if err != nil {
            fmt.Printf("Task 2 finished with error: %v\n", err)
        } else {
            fmt.Println("Task 2 completed successfully")
        }
    case <-time.After(5 * time.Second): // Safety timeout for the example
        fmt.Println("Main: timeout waiting for task 2")
    }
}
Key Concepts
- context.WithCancel: Creates a context that can be cancelled manually.
- context.WithTimeout / context.WithDeadline: Create a context that is automatically cancelled after a duration or at a specific time.
- ctx.Done(): A channel that is closed when the context is cancelled or times out.
- ctx.Err(): Returns the error that caused the context to end (context.Canceled or context.DeadlineExceeded).
- Propagation: Contexts should be passed down through function calls, allowing for coordinated cancellation across a tree of goroutines, as the sketch below illustrates.
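As a brief illustration of propagation: cancelling a parent context also cancels every context derived from it, so a single cancel call can stop a whole tree of goroutines. A minimal sketch:

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

func waiter(ctx context.Context, name string, wg *sync.WaitGroup) {
    defer wg.Done()
    <-ctx.Done() // blocks until this context, or an ancestor, is cancelled
    fmt.Printf("%s stopped: %v\n", name, ctx.Err())
}

func main() {
    parent, cancel := context.WithCancel(context.Background())
    child, childCancel := context.WithCancel(parent) // derived from parent
    defer childCancel()

    var wg sync.WaitGroup
    wg.Add(2)
    go waiter(parent, "parent worker", &wg)
    go waiter(child, "child worker", &wg)

    time.Sleep(100 * time.Millisecond)
    cancel() // cancelling the parent also cancels the derived child
    wg.Wait()
}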
5. Pipeline Pattern
The pipeline pattern chains together stages of processing, where each stage is a goroutine that receives values from upstream via a channel and sends values downstream via another channel. This pattern is useful for breaking down complex processing into smaller, composable steps.
Implementation
package main

import (
    "fmt"
    "sync"
)

// generator emits the integers 0 to n-1.
func generator(n int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for i := 0; i < n; i++ {
            out <- i
        }
    }()
    return out
}

// square receives integers and emits their squares.
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for num := range in {
            out <- num * num
        }
    }()
    return out
}

// filter drops even numbers.
func filter(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for num := range in {
            if num%2 != 0 { // Keep odd numbers
                out <- num
            }
        }
    }()
    return out
}

// sink receives values from a channel and prints them.
func sink(name string, in <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for num := range in {
        fmt.Printf("%s received: %d\n", name, num)
    }
}

func main() {
    // Assemble the pipeline stages
    nums := generator(10)
    squared := square(nums)
    odds := filter(squared)
    // Two sinks reading the same channel compete for values: each value is
    // delivered to exactly one of them, so the output is split, not duplicated.
    var wg sync.WaitGroup
    wg.Add(2)
    go sink("Sink1", odds, &wg)
    go sink("Sink2", odds, &wg)
    wg.Wait()
    fmt.Println("Pipeline completed.")

    // To broadcast every value to multiple consumers, re-run the pipeline
    // and materialize the results first.
    fmt.Println("\n--- Re-running pipeline for multiple consumers ---")
    nums2 := generator(10)
    squared2 := square(nums2)
    odds2 := filter(squared2)
    // Collect results into a slice, then hand the slice to each sink
    var results []int
    for num := range odds2 {
        results = append(results, num)
    }
    var wg2 sync.WaitGroup
    wg2.Add(2)
    go func() {
        defer wg2.Done()
        for _, num := range results {
            fmt.Printf("SinkA received: %d\n", num)
        }
    }()
    go func() {
        defer wg2.Done()
        for _, num := range results {
            fmt.Printf("SinkB received: %d\n", num)
        }
    }()
    wg2.Wait()
}
Key Concepts
- Decoupling: Each stage operates independently.
- Composability: Stages can be easily combined to form complex data flows.
- Backpressure: Channels naturally provide backpressure; a slow downstream stage will slow the entire pipeline.
- Resource Management: Channels must be properly closed to signal the end of data and prevent goroutine leaks; a cancellable stage variant is sketched below.
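To make that concrete, a stage can accept a context so that a consumer that stops early unwinds the pipeline instead of leaking goroutines; when every stage, including the generator, takes the same ctx, one cancel call shuts the whole pipeline down. A sketch of such a variant of square (the ctx parameter is an addition, and it assumes context is added to the program's imports):

// squareCtx is a cancellable variant of square: it stops forwarding as soon
// as ctx is cancelled instead of blocking forever on a send nobody receives.
func squareCtx(ctx context.Context, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for num := range in {
            select {
            case out <- num * num:
            case <-ctx.Done():
                return // downstream gave up; exit without sending
            }
        }
    }()
    return out
}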
Conclusion
Mastering these advanced concurrency patterns in Go is essential for building high-performance, scalable applications. The worker pool pattern manages resources effectively, fan-in/fan-out enables parallel processing, semaphores control access to limited resources, context provides cancellation and timeout mechanisms, and pipelines help structure complex data flows.
By understanding and applying these patterns, you can write Go code that is not only concurrent but also robust, maintainable, and efficient. As with any powerful tool, it's important to use these patterns judiciously and understand their trade-offs in the context of your specific application's requirements.