Advanced Go Concurrency Patterns for Scalable Applications

Go's concurrency model, built around goroutines and channels, is one of its most powerful features. While basic concurrency is straightforward, mastering advanced patterns is crucial for building scalable, efficient, and robust applications. This article delves into several sophisticated concurrency patterns that can significantly improve your Go programs.

1. Worker Pool Pattern

The worker pool pattern is a fundamental technique for managing a fixed set of goroutines to process a stream of tasks. It prevents the creation of an unbounded number of goroutines, which can lead to resource exhaustion.

Why Use a Worker Pool?

  1. Resource Management: Limits the number of concurrent goroutines, controlling memory and CPU usage.
  2. Performance: Reduces the overhead of constantly creating and destroying goroutines.
  3. Predictability: Makes the system's behavior more predictable under load.

Implementation

go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Task represents a unit of work.
type Task struct {
	ID   int
	Data string
}

// Result represents the outcome of a task.
type Result struct {
	TaskID int
	Output string
	Error  error
}

// Worker registers its task channel with the pool, processes tasks, and sends results.
type Worker struct {
	ID         int
	TaskChan   chan Task
	WorkerPool chan chan Task
	ResultChan chan Result
	QuitChan   chan bool
}

// NewWorker creates a new worker instance with its own task channel.
func NewWorker(id int, workerPool chan chan Task, resultChan chan Result) *Worker {
	return &Worker{
		ID:         id,
		TaskChan:   make(chan Task),
		WorkerPool: workerPool,
		ResultChan: resultChan,
		QuitChan:   make(chan bool),
	}
}

// Start begins the worker's processing loop.
func (w *Worker) Start(wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		// Announce availability: put this worker's task channel back in the pool.
		w.WorkerPool <- w.TaskChan
		select {
		case task := <-w.TaskChan:
			// Simulate work
			fmt.Printf("Worker %d processing task %d\n", w.ID, task.ID)
			time.Sleep(100 * time.Millisecond) // Simulate processing time
			result := Result{
				TaskID: task.ID,
				Output: fmt.Sprintf("Processed: %s", task.Data),
			}
			w.ResultChan <- result
		case <-w.QuitChan:
			fmt.Printf("Worker %d stopping\n", w.ID)
			return
		}
	}
}

// Stop signals the worker to stop processing.
func (w *Worker) Stop() {
	w.QuitChan <- true
}

// Dispatcher manages a pool of workers.
type Dispatcher struct {
	WorkerPool chan chan Task
	MaxWorkers int
	TaskQueue  chan Task
	ResultChan chan Result
	Workers    []*Worker
	QuitChan   chan bool
}

// NewDispatcher creates a new dispatcher.
func NewDispatcher(maxWorkers int, taskQueueSize int) *Dispatcher {
	return &Dispatcher{
		WorkerPool: make(chan chan Task, maxWorkers),
		MaxWorkers: maxWorkers,
		TaskQueue:  make(chan Task, taskQueueSize),
		ResultChan: make(chan Result, taskQueueSize), // Buffer to prevent blocking
		Workers:    make([]*Worker, 0, maxWorkers),
		QuitChan:   make(chan bool),
	}
}

// Run starts the dispatcher and its worker pool.
func (d *Dispatcher) Run() {
	var wg sync.WaitGroup
	// Start workers
	for i := 0; i < d.MaxWorkers; i++ {
		worker := NewWorker(i+1, d.WorkerPool, d.ResultChan)
		d.Workers = append(d.Workers, worker)
		wg.Add(1)
		go worker.Start(&wg)
	}

	// Start the dispatcher loop
	go d.dispatch()

	// Handle graceful shutdown. In this demo, Stop is only called after every
	// result has been collected, so the task queue is already drained here.
	go func() {
		<-d.QuitChan
		close(d.TaskQueue)
		for _, worker := range d.Workers {
			worker.Stop()
		}
		wg.Wait() // Wait for all workers to finish
		close(d.ResultChan)
	}()
}

// dispatch pulls tasks from the queue and hands each one to the next
// available worker; it returns when the task queue is closed.
func (d *Dispatcher) dispatch() {
	for task := range d.TaskQueue {
		// Block until a worker registers its task channel as available.
		workerTaskChan := <-d.WorkerPool
		// Send the task to that worker.
		workerTaskChan <- task
	}
}

// Submit adds a task to the queue.
func (d *Dispatcher) Submit(task Task) {
	d.TaskQueue <- task
}

// Results returns the result channel.
func (d *Dispatcher) Results() <-chan Result {
	return d.ResultChan
}

// Stop gracefully shuts down the dispatcher.
func (d *Dispatcher) Stop() {
	close(d.QuitChan)
}

func main() {
	// Create a dispatcher with 3 workers and a task queue of 100
	dispatcher := NewDispatcher(3, 100)
	dispatcher.Run()

	// Submit tasks
	numTasks := 10
	for i := 1; i <= numTasks; i++ {
		dispatcher.Submit(Task{ID: i, Data: fmt.Sprintf("Task-%d-Data", i)})
	}

	// Collect results
	completed := 0
	for result := range dispatcher.Results() {
		fmt.Printf("Received result for task %d: %s\n", result.TaskID, result.Output)
		completed++
		if completed == numTasks {
			break
		}
	}

	// Stop the dispatcher
	dispatcher.Stop()
	fmt.Println("All tasks completed.")
}

Key Concepts

  • Task Queue: A buffered channel where tasks are queued.
  • Worker Pool: A channel of channels, where each inner channel belongs to a worker. Workers signal availability by sending their task channel to the pool.
  • Dispatcher: Coordinates tasks from the queue to available workers.
  • Graceful Shutdown: Ensures all workers finish their current tasks before terminating.
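The dispatcher above shows the full channel-of-channels machinery. For many workloads, a simpler variant in which every worker ranges over one shared task channel is sufficient. Here is a minimal sketch of that alternative; the pool size of 3 and the doubling "work" are placeholders:

go
package main

import (
	"fmt"
	"sync"
)

func main() {
	tasks := make(chan int)
	results := make(chan int)
	var wg sync.WaitGroup

	// Fixed pool: every worker pulls from the same task channel.
	for w := 1; w <= 3; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range tasks { // loop ends when tasks is closed
				results <- t * 2
			}
		}()
	}

	// Produce tasks, then close the channel to signal "no more work".
	go func() {
		for i := 1; i <= 5; i++ {
			tasks <- i
		}
		close(tasks)
	}()

	// Close results once every worker has exited.
	go func() {
		wg.Wait()
		close(results)
	}()

	for r := range results {
		fmt.Println("result:", r)
	}
}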

2. Fan-Out and Fan-In Patterns

Fan-out and fan-in are patterns for distributing work among multiple goroutines and then collecting the results.

Fan-Out

Fan-out involves starting multiple goroutines that read from the same channel. This is useful for parallelizing CPU-bound or I/O-bound tasks.

Fan-In

Fan-in involves merging multiple result channels into a single channel. This simplifies collecting results from multiple workers.

Implementation

go
package main

import (
	"fmt"
	"sync"
	"time"
)

// process simulates a worker that processes data. Each worker owns its
// output channel exclusively, so it closes it on exit; this is what lets
// fanIn (below) detect that every stream has been drained.
func process(id int, input <-chan int, output chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	defer close(output)
	for v := range input {
		// Simulate work
		fmt.Printf("Worker %d processing %d\n", id, v)
		time.Sleep(50 * time.Millisecond) // Simulate processing time
		output <- v * v // Simple transformation
	}
	fmt.Printf("Worker %d finished\n", id)
}

// fanIn merges results from multiple channels into one.
func fanIn(channels []<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(len(channels))

	for _, c := range channels {
		// Launch a goroutine for each input channel
		go func(ch <-chan int) {
			defer wg.Done()
			for n := range ch {
				out <- n
			}
		}(c)
	}

	// Close the output channel when all inputs are drained
	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

func main() {
	const numWorkers = 3
	const numJobs = 10

	// Create input channel
	jobs := make(chan int, numJobs)
	
	// Create result channels for each worker
	resultChannels := make([]<-chan int, numWorkers)
	var wg sync.WaitGroup

	// Start workers (Fan-Out)
	for i := 0; i < numWorkers; i++ {
		// Each worker gets its own result channel
		resultChan := make(chan int, numJobs) // Buffer to prevent blocking
		resultChannels[i] = resultChan
		wg.Add(1)
		go process(i+1, jobs, resultChan, &wg)
	}

	// Send jobs
	for j := 1; j <= numJobs; j++ {
		jobs <- j
	}
	close(jobs) // Close input channel to signal workers to finish

	// Fan-In: Merge all result channels
	finalResults := fanIn(resultChannels)

	// Collect results
	fmt.Println("Collecting results...")
	for result := range finalResults {
		fmt.Printf("Result: %d\n", result)
	}

	// Wait for all workers to complete
	wg.Wait()
	fmt.Println("All workers finished.")
}

Key Concepts

  • Parallelization: Fan-out distributes tasks to multiple workers for concurrent processing.
  • Result Merging: Fan-in simplifies the collection of results from multiple sources.
  • Channel Management: Properly managing channel lifecycle is crucial to prevent goroutine leaks; a context-aware merge is sketched below.
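To illustrate that last point: if the consumer of the merged channel stops reading early, the forwarding goroutines inside fanIn above block forever on out <- n. One hedge, sketched below on the assumption that a context.Context is already threaded through your code, is to let each forwarder abort on cancellation:

go
package main

import (
	"context"
	"fmt"
	"sync"
)

// fanInCtx merges channels like fanIn, but each forwarding goroutine
// also watches ctx so it can exit if the consumer abandons the output.
func fanInCtx(ctx context.Context, channels ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(len(channels))
	for _, c := range channels {
		go func(ch <-chan int) {
			defer wg.Done()
			for n := range ch {
				select {
				case out <- n:
				case <-ctx.Done():
					return // consumer is gone; stop forwarding
				}
			}
		}(c)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	a, b := make(chan int, 2), make(chan int, 2)
	a <- 1
	a <- 2
	b <- 3
	close(a)
	close(b)

	for n := range fanInCtx(ctx, a, b) {
		fmt.Println("merged:", n)
	}
}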

3. Semaphore Pattern

A semaphore is a synchronization primitive that controls access to a resource by limiting the number of concurrent operations. Where Go's sync.Mutex admits only one goroutine into a critical section at a time, a semaphore admits up to N goroutines.

Implementation

Go's golang.org/x/sync/semaphore package provides an efficient weighted implementation, which the full example below uses. The underlying idea, though, can be expressed with nothing more than a buffered channel.
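Here is that minimal channel-based sketch: the buffer capacity is the semaphore's count, a send acquires a slot, and a receive releases it. The limit of 3 and the sleep are arbitrary stand-ins for real work:

go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	sem := make(chan struct{}, 3) // capacity = maximum concurrency
	var wg sync.WaitGroup

	for i := 1; i <= 10; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			sem <- struct{}{}        // acquire: blocks while all 3 slots are taken
			defer func() { <-sem }() // release the slot on exit
			fmt.Printf("Worker %d holds a slot\n", id)
			time.Sleep(200 * time.Millisecond) // Simulate work
		}(i)
	}

	wg.Wait()
}

The x/sync package used below adds what this sketch lacks: context-aware acquisition and weighted slots.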

go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"golang.org/x/sync/semaphore"
)

const (
	maxConcurrency = 3
	numTasks       = 10
)

func worker(ctx context.Context, sem *semaphore.Weighted, id int, wg *sync.WaitGroup) {
	defer wg.Done()

	// Acquire a semaphore slot. If the context is cancelled, this will return an error.
	if err := sem.Acquire(ctx, 1); err != nil {
		fmt.Printf("Worker %d: Context cancelled before acquiring semaphore: %v\n", id, err)
		return
	}
	// Ensure the semaphore slot is released
	defer sem.Release(1)

	// Simulate work that requires limited resources
	fmt.Printf("Worker %d: Starting work (concurrency limited to %d)\n", id, maxConcurrency)
	time.Sleep(2 * time.Second) // Simulate work
	fmt.Printf("Worker %d: Finished work\n", id)
}

func main() {
	// Create a semaphore with a weight of maxConcurrency
	sem := semaphore.NewWeighted(maxConcurrency)
	
	// Use a context for potential cancellation
	ctx := context.Background()
	// Example with timeout context
	// ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	// defer cancel()

	var wg sync.WaitGroup

	for i := 1; i <= numTasks; i++ {
		wg.Add(1)
		go worker(ctx, sem, i, &wg)
	}

	wg.Wait()
	fmt.Println("All workers completed.")
}

Key Concepts

  • Resource Limiting: Prevents overwhelming a system resource (e.g., database connections, file descriptors).
  • Context Integration: Allows for cancellation and timeout handling.
  • Weighted Semaphores: semaphore.Weighted lets callers acquire and release varying weights, which is useful when operations consume different amounts of a shared resource (see the sketch below).
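A minimal sketch of weighted acquisition; the capacity of 8 units and the 3-unit "heavy job" are illustrative assumptions, not values taken from the example above:

go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/semaphore"
)

func main() {
	ctx := context.Background()
	// Assume the semaphore models 8 units of some shared capacity,
	// e.g. a memory budget.
	sem := semaphore.NewWeighted(8)

	// A heavyweight operation reserves 3 units at once.
	if err := sem.Acquire(ctx, 3); err != nil {
		fmt.Println("acquire failed:", err)
		return
	}
	fmt.Println("heavy job running with 3 of 8 units")
	sem.Release(3)
}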

4. Context Management for Cancellation and Timeout

Proper use of context.Context is essential for building robust concurrent applications. It provides a way to carry deadlines, cancellation signals, and request-scoped values across API boundaries and between processes.

Implementation

go
package main

import (
	"context"
	"fmt"
	"math/rand"
	"time"
)

// Simulates a long-running task that can be cancelled.
func longRunningTask(ctx context.Context, taskID int) error {
	// Create a random duration for the task to simulate variability
	duration := time.Duration(rand.Intn(5)+1) * time.Second
	fmt.Printf("Task %d: Starting, will take %v\n", taskID, duration)

	select {
	case <-time.After(duration):
		fmt.Printf("Task %d: Completed successfully\n", taskID)
		return nil
	case <-ctx.Done():
		// Context was cancelled or timed out.
		fmt.Printf("Task %d: Cancelled or timed out: %v\n", taskID, ctx.Err())
		return ctx.Err() // Return the cancellation error
	}
}

func main() {
	// Example 1: Cancellation with context cancellation
	fmt.Println("--- Example 1: Manual Cancellation ---")
	ctx1, cancel1 := context.WithCancel(context.Background())
	
	// Start the task
	go func() {
		err := longRunningTask(ctx1, 1)
		if err != nil {
			fmt.Printf("Task 1 finished with error: %v\n", err)
		}
	}()

	// Simulate some work in the main goroutine
	time.Sleep(2 * time.Second)
	// Cancel the context
	cancel1()
	// Give the goroutine time to print its result
	time.Sleep(1 * time.Second)

	// Example 2: Timeout with context
	fmt.Println("\n--- Example 2: Timeout ---")
	// Create a context that times out after 2 seconds
	ctx2, cancel2 := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel2() // Always call cancel to release resources

	// Start the task
	done := make(chan error, 1)
	go func() {
		done <- longRunningTask(ctx2, 2)
	}()

	// Wait for the task to complete or the context to timeout
	select {
	case err := <-done:
		if err != nil {
			fmt.Printf("Task 2 finished with error: %v\n", err)
		} else {
			fmt.Println("Task 2 completed successfully")
		}
	case <-time.After(5 * time.Second): // Safety timeout for the example
		fmt.Println("Main: Timeout waiting for task 2")
	}
}

Key Concepts

  • context.WithCancel: Creates a context that can be cancelled manually.
  • context.WithTimeout / context.WithDeadline: Creates a context that is automatically cancelled after a duration or at a specific time.
  • ctx.Done(): A channel that is closed when the context is cancelled or times out.
  • ctx.Err(): Returns the error that caused the context to be cancelled (Canceled or DeadlineExceeded).
  • Propagation: Contexts should be passed down through function calls, allowing for coordinated cancellation across a tree of goroutines (a minimal sketch follows).
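A minimal sketch of that propagation bullet: every goroutine receives a context derived from the same parent, so a single cancel call stops the entire tree:

go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// child blocks until its (inherited) context is cancelled.
func child(ctx context.Context, id int, wg *sync.WaitGroup) {
	defer wg.Done()
	<-ctx.Done()
	fmt.Printf("child %d stopping: %v\n", id, ctx.Err())
}

func main() {
	parent, cancel := context.WithCancel(context.Background())

	var wg sync.WaitGroup
	for i := 1; i <= 3; i++ {
		wg.Add(1)
		go child(parent, i, &wg) // all children share the parent's lifetime
	}

	time.Sleep(100 * time.Millisecond)
	cancel() // one call cancels every derived context
	wg.Wait()
}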

5. Pipeline Pattern

The pipeline pattern chains together stages of processing, where each stage is a goroutine that receives values from upstream via a channel and sends values downstream via another channel. This pattern is useful for breaking down complex processing into smaller, composable steps.

Implementation

go
package main

import (
	"fmt"
	"sync"
)

// generator generates integers from 0 to n-1.
func generator(n int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; i < n; i++ {
			out <- i
		}
	}()
	return out
}

// square takes a channel of integers and returns a channel of their squares.
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for num := range in {
			out <- num * num
		}
	}()
	return out
}

// filter filters out even numbers.
func filter(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for num := range in {
			if num%2 != 0 { // Keep odd numbers
				out <- num
			}
		}
	}()
	return out
}

// sink receives values from a channel and prints them.
func sink(name string, in <-chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	for num := range in {
		fmt.Printf("%s received: %d\n", name, num)
	}
}

func main() {
	// Create the pipeline stages
	nums := generator(10)
	squared := square(nums)
	odds := filter(squared)

	// Two sinks competing for one channel: each value is delivered to exactly
	// one receiver, so the results are split between them.
	var wg sync.WaitGroup
	wg.Add(2)
	go sink("Sink1", odds, &wg)
	go sink("Sink2", odds, &wg) // receives whichever values Sink1 does not

	wg.Wait()
	fmt.Println("Pipeline completed.")
	
	// To give every consumer the complete output, re-run the pipeline and
	// broadcast the collected results.
	fmt.Println("\n--- Re-running pipeline for multiple consumers ---")
	nums2 := generator(10)
	squared2 := square(nums2)
	odds2 := filter(squared2)
	
	// Collect all results first so they can be broadcast to multiple sinks
	var results []int
	for num := range odds2 {
		results = append(results, num)
	}
	
	// Broadcast the full result set to each sink
	var wg2 sync.WaitGroup
	wg2.Add(2)
	go func() {
		defer wg2.Done()
		for _, num := range results {
			fmt.Printf("SinkA received: %d\n", num)
		}
	}()
	go func() {
		defer wg2.Done()
		for _, num := range results {
			fmt.Printf("SinkB received: %d\n", num)
		}
	}()
	
	wg2.Wait()
}

Key Concepts

  • Decoupling: Each stage operates independently.
  • Composability: Stages can be easily combined to form complex data flows.
  • Backpressure: Channels naturally provide backpressure; a slow downstream stage will slow the entire pipeline.
  • Resource Management: Channels must be properly closed to signal the end of data and prevent goroutine leaks; a cancellable stage is sketched below.
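Closing channels covers the normal end of a stream, but a stage can still leak if its consumer walks away mid-stream. Below is a sketch of a cancellable stage, assuming a context.Context is threaded through the pipeline; squareCtx is a hypothetical variant of square above:

go
package main

import (
	"context"
	"fmt"
)

// squareCtx is like square above, but aborts if the pipeline is abandoned:
// each send either succeeds or gives up when ctx is cancelled.
func squareCtx(ctx context.Context, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for num := range in {
			select {
			case out <- num * num:
			case <-ctx.Done():
				return // consumer gone; exit instead of blocking forever
			}
		}
	}()
	return out
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	// A pre-filled, closed input keeps the demo focused on the stage itself.
	in := make(chan int, 100)
	for i := 0; i < 100; i++ {
		in <- i
	}
	close(in)

	out := squareCtx(ctx, in)
	fmt.Println(<-out, <-out) // consume only two results...
	cancel()                  // ...then abandon the stage; it exits instead of leaking
}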

Conclusion

Mastering these advanced concurrency patterns in Go is essential for building high-performance, scalable applications. The worker pool pattern manages resources effectively, fan-in/fan-out enables parallel processing, semaphores control access to limited resources, context provides cancellation and timeout mechanisms, and pipelines help structure complex data flows.

By understanding and applying these patterns, you can write Go code that is not only concurrent but also robust, maintainable, and efficient. As with any powerful tool, it's important to use these patterns judiciously and understand their trade-offs in the context of your specific application's requirements.
