Go Concurrency Patterns for Distributed Systems: Advanced Synchronization and Coordination

In the realm of distributed systems, concurrency is both a powerful tool and a significant challenge. While Go’s concurrency primitives—goroutines and channels—provide an elegant foundation for concurrent programming, building robust distributed systems requires mastering advanced patterns that can handle the complexities of coordination across multiple services, nodes, and failure domains.

This comprehensive guide explores sophisticated concurrency patterns specifically designed for distributed systems in Go. We’ll move beyond basic goroutines and channels to examine advanced synchronization techniques, coordination mechanisms, error handling strategies, and performance optimization approaches that can help you build resilient, scalable distributed applications.


Advanced Channel Patterns

Channels are Go’s primary mechanism for communication between goroutines, but in distributed systems, we need to leverage more sophisticated patterns to handle complex coordination scenarios.

Fan-Out/Fan-In Pattern

The fan-out/fan-in pattern distributes work across multiple goroutines and then collects the results:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Task represents a unit of work to be processed
type Task struct {
	ID     int
	Input  string
	Result string
	Err    error
}

// fanOut distributes tasks to multiple worker goroutines
func fanOut(ctx context.Context, tasks []Task, workerCount int) <-chan Task {
	tasksCh := make(chan Task)
	
	go func() {
		defer close(tasksCh)
		
		for _, task := range tasks {
			select {
			case tasksCh <- task:
			case <-ctx.Done():
				return
			}
		}
	}()
	
	return tasksCh
}

// processTask simulates processing a task with potential failures
func processTask(ctx context.Context, task Task) Task {
	// Simulate processing time
	select {
	case <-time.After(100 * time.Millisecond):
		task.Result = fmt.Sprintf("Processed: %s", task.Input)
		return task
	case <-ctx.Done():
		task.Err = ctx.Err()
		return task
	}
}

// worker processes tasks from the input channel and sends results to the output channel
func worker(ctx context.Context, id int, tasksCh <-chan Task, resultsCh chan<- Task) {
	for task := range tasksCh {
		select {
		case <-ctx.Done():
			return
		default:
			task.Result = fmt.Sprintf("Worker %d processed: %s", id, task.Input)
			
			// Simulate occasional failures
			if task.ID%7 == 0 {
				task.Err = fmt.Errorf("processing error on task %d", task.ID)
				task.Result = ""
			}
			
			// Send the result
			select {
			case resultsCh <- task:
			case <-ctx.Done():
				return
			}
		}
	}
}

// fanIn collects results from multiple workers into a single channel
func fanIn(ctx context.Context, workerCount int, resultsCh chan Task) <-chan Task {
	multiplexedCh := make(chan Task)
	var wg sync.WaitGroup
	wg.Add(workerCount)
	
	// Start a goroutine for each worker to collect results
	for i := 0; i < workerCount; i++ {
		go func() {
			defer wg.Done()
			for {
				select {
				case result, ok := <-resultsCh:
					if !ok {
						return
					}
					select {
					case multiplexedCh <- result:
					case <-ctx.Done():
						return
					}
				case <-ctx.Done():
					return
				}
			}
		}()
	}
	
	// Close the multiplexed channel once all workers are done
	go func() {
		wg.Wait()
		close(multiplexedCh)
	}()
	
	return multiplexedCh
}

// distributeAndProcess implements the complete fan-out/fan-in pattern
func distributeAndProcess(ctx context.Context, tasks []Task, workerCount int) []Task {
	// Create a buffered channel for results to prevent blocking
	resultsCh := make(chan Task, len(tasks))
	
	// Fan out: distribute tasks to workers
	tasksCh := fanOut(ctx, tasks, workerCount)
	
	// Start workers
	var wg sync.WaitGroup
	wg.Add(workerCount)
	for i := 0; i < workerCount; i++ {
		go func(workerID int) {
			defer wg.Done()
			worker(ctx, workerID, tasksCh, resultsCh)
		}(i)
	}
	
	// Close results channel when all workers are done
	go func() {
		wg.Wait()
		close(resultsCh)
	}()
	
	// Fan in: collect results
	collectedResults := make([]Task, 0, len(tasks))
	for result := range resultsCh {
		collectedResults = append(collectedResults, result)
	}
	
	return collectedResults
}

func main() {
	// Create a context with timeout
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	
	// Generate sample tasks
	tasks := make([]Task, 20)
	for i := 0; i < 20; i++ {
		tasks[i] = Task{
			ID:    i,
			Input: fmt.Sprintf("Task %d input", i),
		}
	}
	
	// Process tasks using fan-out/fan-in pattern
	results := distributeAndProcess(ctx, tasks, 5)
	
	// Print results
	fmt.Println("Results:")
	successCount := 0
	failureCount := 0
	
	for _, result := range results {
		if result.Err != nil {
			fmt.Printf("Task %d failed: %v\n", result.ID, result.Err)
			failureCount++
		} else {
			fmt.Printf("Task %d succeeded: %s\n", result.ID, result.Result)
			successCount++
		}
	}
	
	fmt.Printf("\nSummary: %d succeeded, %d failed\n", successCount, failureCount)
}

This pattern is particularly useful for distributed systems where you need to:

  • Process a large number of tasks in parallel
  • Handle failures gracefully
  • Collect and aggregate results efficiently
  • Implement backpressure mechanisms
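
The example above relies on a buffered results channel to bound memory; when you also need to cap how much work is in flight at once, a buffered channel used as a counting semaphore is the simplest building block. A minimal, self-contained sketch (the limit of 3 and the work inside the goroutine are illustrative):

package main

import (
	"fmt"
	"sync"
)

func main() {
	const maxInFlight = 3                   // illustrative concurrency limit
	sem := make(chan struct{}, maxInFlight) // buffered channel used as a counting semaphore
	var wg sync.WaitGroup

	for i := 0; i < 10; i++ {
		wg.Add(1)
		sem <- struct{}{} // blocks once maxInFlight goroutines are already running
		go func(id int) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot when this unit of work finishes
			fmt.Printf("processing item %d\n", id)
		}(i)
	}

	wg.Wait()
}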

Multiplexing with Select

In distributed systems, you often need to coordinate multiple channels with different priorities and timeouts:

package main

import (
	"context"
	"fmt"
	"math/rand"
	"sort"
	"time"
)

// Event represents a message in our system
type Event struct {
	Source  string
	Type    string
	Payload interface{}
	Time    time.Time
}

// EventSource generates events from different parts of a distributed system
func EventSource(ctx context.Context, name string, interval time.Duration, priority int) <-chan Event {
	ch := make(chan Event)
	
	go func() {
		defer close(ch)
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		
		for {
			select {
			case <-ticker.C:
				// Generate an event
				event := Event{
					Source:  name,
					Type:    fmt.Sprintf("event-type-%d", rand.Intn(3)),
					Payload: fmt.Sprintf("data from %s", name),
					Time:    time.Now(),
				}
				
				// Try to send the event
				select {
				case ch <- event:
					// Event sent successfully
				case <-ctx.Done():
					return
				case <-time.After(50 * time.Millisecond):
					// Couldn't send within timeout, log and continue
					fmt.Printf("Warning: Dropped event from %s due to backpressure\n", name)
				}
				
			case <-ctx.Done():
				return
			}
		}
	}()
	
	return ch
}

// PriorityMultiplexer combines multiple event sources with priority handling
func PriorityMultiplexer(ctx context.Context, sources map[string]<-chan Event, priorities map[string]int) <-chan Event {
	multiplexed := make(chan Event)
	
	go func() {
		defer close(multiplexed)
		
		// Keep track of active sources
		remaining := len(sources)
		
		// Poll the sources until all of them are closed
		for remaining > 0 {
			// Collect whatever events are ready right now from each open source
			var pending []Event
			
			for name, ch := range sources {
				if ch == nil {
					continue // This source is already closed
				}
				
				// Try to receive from this source with a short timeout
				select {
				case event, ok := <-ch:
					if !ok {
						// Source is closed, remove it
						sources[name] = nil
						remaining--
						continue
					}
					pending = append(pending, event)
				case <-time.After(1 * time.Millisecond):
					// No event available from this source right now
				}
			}
			
			// No events available from any source, wait a bit and poll again
			if len(pending) == 0 {
				select {
				case <-time.After(10 * time.Millisecond):
				case <-ctx.Done():
					return
				}
				continue
			}
			
			// Forward everything we received, highest priority first, so that
			// lower-priority events are delayed rather than silently dropped
			sort.Slice(pending, func(i, j int) bool {
				return priorities[pending[i].Source] > priorities[pending[j].Source]
			})
			
			for _, event := range pending {
				select {
				case multiplexed <- event:
					fmt.Printf("Forwarded event from %s (priority %d)\n",
						event.Source, priorities[event.Source])
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	
	return multiplexed
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	
	// Create event sources with different priorities
	sources := make(map[string]<-chan Event)
	priorities := make(map[string]int)
	
	// High priority source (critical system events)
	sources["critical"] = EventSource(ctx, "critical", 500*time.Millisecond, 10)
	priorities["critical"] = 10
	
	// Medium priority source (user actions)
	sources["user"] = EventSource(ctx, "user", 200*time.Millisecond, 5)
	priorities["user"] = 5
	
	// Low priority source (metrics)
	sources["metrics"] = EventSource(ctx, "metrics", 100*time.Millisecond, 1)
	priorities["metrics"] = 1
	
	// Multiplex the sources with priority handling
	multiplexed := PriorityMultiplexer(ctx, sources, priorities)
	
	// Process the multiplexed events
	for event := range multiplexed {
		fmt.Printf("Processed: %s event from %s at %v\n", 
			event.Type, event.Source, event.Time.Format(time.RFC3339Nano))
	}
}

This pattern enables sophisticated event handling in distributed systems by:

  • Prioritizing critical events over less important ones
  • Implementing backpressure to prevent overwhelming consumers
  • Gracefully handling source failures
  • Efficiently multiplexing multiple event streams
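
When there are only two priority levels, the polling multiplexer above can often be replaced by the standard nested-select idiom: a non-blocking select drains the high-priority channel first, and only then does the loop block on both channels. A minimal, self-contained sketch with illustrative channel names; note that Go's select chooses randomly among ready cases, so the bias applies only to events that are already queued:

package main

import (
	"fmt"
	"time"
)

// drain empties highPri before it ever looks at lowPri, so queued
// high-priority events are always handled first
func drain(highPri, lowPri <-chan string, done <-chan struct{}) {
	for {
		// Non-blocking pass: handle anything already waiting on highPri
		select {
		case msg := <-highPri:
			fmt.Println("high:", msg)
			continue
		default:
		}
		
		// Nothing urgent is queued, so block until something arrives
		select {
		case msg := <-highPri:
			fmt.Println("high:", msg)
		case msg := <-lowPri:
			fmt.Println("low:", msg)
		case <-done:
			return
		}
	}
}

func main() {
	high := make(chan string, 2)
	low := make(chan string, 2)
	done := make(chan struct{})
	
	high <- "critical alert"
	low <- "metrics sample"
	
	// Stop the loop once the buffered events have had time to drain
	go func() {
		time.Sleep(100 * time.Millisecond)
		close(done)
	}()
	
	drain(high, low, done)
}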

Timed Channel Operations

In distributed systems, timeouts are crucial for preventing deadlocks and ensuring responsiveness:

package main

import (
	"context"
	"fmt"
	"math/rand"
	"time"
)

// Result represents the outcome of a distributed operation
type Result struct {
	Value interface{}
	Err   error
}

// simulateDistributedOperation mimics a call to a remote service with variable latency
func simulateDistributedOperation(ctx context.Context, name string, minLatency, maxLatency time.Duration) <-chan Result {
	resultCh := make(chan Result, 1) // Buffered to prevent goroutine leak
	
	go func() {
		// Calculate a random latency between min and max
		latency := minLatency + time.Duration(rand.Int63n(int64(maxLatency-minLatency)))
		
		// Simulate processing
		select {
		case <-time.After(latency):
			// 10% chance of error
			if rand.Intn(10) == 0 {
				resultCh <- Result{nil, fmt.Errorf("%s operation failed", name)}
			} else {
				resultCh <- Result{fmt.Sprintf("%s result", name), nil}
			}
		case <-ctx.Done():
			resultCh <- Result{nil, ctx.Err()}
		}
		
		close(resultCh)
	}()
	
	return resultCh
}

// firstResponse returns the first successful result, or an error if every operation fails or the overall timeout expires
func firstResponse(ctx context.Context, timeout time.Duration, operations ...func(context.Context) <-chan Result) Result {
	// Create a context with timeout
	opCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	
	// Create a channel for the first response
	firstCh := make(chan Result, 1)
	
	// Launch all operations
	for _, op := range operations {
		go func(operation func(context.Context) <-chan Result) {
			resultCh := operation(opCtx)
			result := <-resultCh
			
			// Only forward successful results or the last error
			if result.Err == nil {
				// Try to send the successful result, but don't block
				select {
				case firstCh <- result:
					// Successfully sent the result
					cancel() // Cancel other operations
				default:
					// Channel already has a result, do nothing
				}
			} else if opCtx.Err() != nil {
				// The shared context was canceled, likely because another operation succeeded
				return
			} else {
				// This was an operation error, send it but don't cancel others
				select {
				case firstCh <- result:
					// Sent the error
				default:
					// Channel already has a result, do nothing
				}
			}
		}(op)
	}
	
	// Wait for the first result or timeout
	select {
	case result := <-firstCh:
		return result
	case <-opCtx.Done():
		return Result{nil, opCtx.Err()}
	}
}

func main() {
	// Seed the random number generator
	rand.Seed(time.Now().UnixNano())
	
	// Create a parent context
	ctx := context.Background()
	
	// Define operations with different latency profiles
	fastButUnreliable := func(ctx context.Context) <-chan Result {
		return simulateDistributedOperation(ctx, "fast-service", 50*time.Millisecond, 150*time.Millisecond)
	}
	
	mediumLatency := func(ctx context.Context) <-chan Result {
		return simulateDistributedOperation(ctx, "medium-service", 100*time.Millisecond, 300*time.Millisecond)
	}
	
	slowButReliable := func(ctx context.Context) <-chan Result {
		return simulateDistributedOperation(ctx, "slow-service", 200*time.Millisecond, 500*time.Millisecond)
	}
	
	// Try multiple operations with a timeout
	fmt.Println("Executing distributed operations with redundancy...")
	result := firstResponse(ctx, 400*time.Millisecond, fastButUnreliable, mediumLatency, slowButReliable)
	
	if result.Err != nil {
		fmt.Printf("All operations failed or timed out: %v\n", result.Err)
	} else {
		fmt.Printf("Operation succeeded with result: %v\n", result.Value)
	}
	
	// Demonstrate a more complex scenario with multiple attempts
	fmt.Println("\nExecuting with multiple attempts...")
	
	for attempt := 1; attempt <= 3; attempt++ {
		fmt.Printf("Attempt %d...\n", attempt)
		
		// Increase timeout with each attempt
		timeout := time.Duration(attempt) * 200 * time.Millisecond
		
		result = firstResponse(ctx, timeout, fastButUnreliable, mediumLatency, slowButReliable)
		
		if result.Err == nil {
			fmt.Printf("Success on attempt %d: %v\n", attempt, result.Value)
			break
		} else {
			fmt.Printf("Attempt %d failed: %v\n", attempt, result.Err)
		}
	}
}

This pattern is essential for distributed systems where:

  • Services may have variable latency
  • You need to implement redundancy across multiple services
  • Graceful degradation is required when services are slow or unavailable
  • Timeouts must be carefully managed to prevent cascading failures
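
A closely related technique is the hedged request: rather than firing every redundant call immediately as firstResponse does, start a single backup call only if the primary has not answered within a short delay, which trims tail latency without doubling the load on every request. A minimal, self-contained sketch (the delay and the slowCall function are illustrative):

package main

import (
	"context"
	"fmt"
	"time"
)

// hedged issues call immediately and, if no answer arrives within delay,
// issues exactly one backup call, returning whichever result comes first
func hedged(ctx context.Context, delay time.Duration, call func(context.Context) (string, error)) (string, error) {
	type outcome struct {
		val string
		err error
	}
	results := make(chan outcome, 2) // buffered so the slower attempt never blocks
	
	run := func() {
		v, err := call(ctx)
		results <- outcome{v, err}
	}
	
	go run() // primary attempt
	
	backup := time.NewTimer(delay)
	defer backup.Stop()
	
	select {
	case r := <-results:
		return r.val, r.err // primary answered before the hedge delay
	case <-backup.C:
		go run() // primary is slow: hedge with a single backup attempt
	case <-ctx.Done():
		return "", ctx.Err()
	}
	
	// Take whichever of the two attempts finishes first
	select {
	case r := <-results:
		return r.val, r.err
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	
	slowCall := func(ctx context.Context) (string, error) {
		time.Sleep(300 * time.Millisecond) // illustrative slow dependency
		return "response", nil
	}
	
	fmt.Println(hedged(ctx, 100*time.Millisecond, slowCall))
}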

Worker Pool and Pipeline Patterns

Worker pools and pipelines are powerful patterns for processing data efficiently in distributed systems.

Advanced Worker Pool with Adaptive Scaling

This implementation adjusts the number of workers based on load:

package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"
)

// Job represents a unit of work
type Job struct {
	ID       int
	Payload  interface{}
	Duration time.Duration // Simulated processing time
}

// Result represents the outcome of job processing
type Result struct {
	JobID   int
	Output  interface{}
	Err     error
	Latency time.Duration
}

// AdaptiveWorkerPool implements a worker pool that scales based on load
type AdaptiveWorkerPool struct {
	jobQueue       chan Job
	resultQueue    chan Result
	workerCount    int32
	activeWorkers  int32
	maxWorkers     int32
	minWorkers     int32
	pendingJobs    int32
	mu             sync.Mutex
	stopCh         chan struct{}
	workerWg       sync.WaitGroup
	metrics        *PoolMetrics
	scaleInterval  time.Duration
	scaleThreshold float64 // Threshold for scaling (0-1)
}

// PoolMetrics tracks performance metrics for the worker pool
type PoolMetrics struct {
	totalJobs      int64
	completedJobs  int64
	failedJobs     int64
	totalLatency   int64 // in nanoseconds
	queueHighWater int32
}

// NewAdaptiveWorkerPool creates a new adaptive worker pool
func NewAdaptiveWorkerPool(ctx context.Context, minWorkers, maxWorkers, queueSize int) *AdaptiveWorkerPool {
	pool := &AdaptiveWorkerPool{
		jobQueue:       make(chan Job, queueSize),
		resultQueue:    make(chan Result, queueSize),
		minWorkers:     int32(minWorkers),
		maxWorkers:     int32(maxWorkers),
		workerCount:    0,
		activeWorkers:  0,
		pendingJobs:    0,
		stopCh:         make(chan struct{}),
		metrics:        &PoolMetrics{},
		scaleInterval:  500 * time.Millisecond,
		scaleThreshold: 0.7, // Scale up when 70% of workers are busy
	}
	
	// Start the minimum number of workers
	for i := 0; i < minWorkers; i++ {
		pool.startWorker(ctx)
	}
	
	// Start the autoscaler
	go pool.autoscaler(ctx)
	
	return pool
}

// startWorker launches a new worker goroutine
func (p *AdaptiveWorkerPool) startWorker(ctx context.Context) {
	p.workerWg.Add(1)
	atomic.AddInt32(&p.workerCount, 1)
	
	go func() {
		defer p.workerWg.Done()
		defer atomic.AddInt32(&p.workerCount, -1)
		
		for {
			select {
			case job, ok := <-p.jobQueue:
				if !ok {
					return // Channel closed
				}
				
				// Mark worker as active
				atomic.AddInt32(&p.activeWorkers, 1)
				atomic.AddInt32(&p.pendingJobs, -1)
				
				// Process the job
				startTime := time.Now()
				var result Result
				
				// Simulate processing with potential failures
				time.Sleep(job.Duration)
				
				if rand.Intn(10) < 1 { // 10% failure rate
					result = Result{
						JobID:   job.ID,
						Output:  nil,
						Err:     fmt.Errorf("processing error on job %d", job.ID),
						Latency: time.Since(startTime),
					}
					atomic.AddInt64(&p.metrics.failedJobs, 1)
				} else {
					result = Result{
						JobID:   job.ID,
						Output:  fmt.Sprintf("Processed result for job %d", job.ID),
						Err:     nil,
						Latency: time.Since(startTime),
					}
				}
				
				// Update metrics
				atomic.AddInt64(&p.metrics.completedJobs, 1)
				atomic.AddInt64(&p.metrics.totalLatency, int64(result.Latency))
				
				// Send the result
				select {
				case p.resultQueue <- result:
				case <-ctx.Done():
					return
				}
				
				// Mark worker as inactive
				atomic.AddInt32(&p.activeWorkers, -1)
				
			case <-p.stopCh:
				return
				
			case <-ctx.Done():
				return
			}
		}
	}()
}

// autoscaler adjusts the number of workers based on load
func (p *AdaptiveWorkerPool) autoscaler(ctx context.Context) {
	ticker := time.NewTicker(p.scaleInterval)
	defer ticker.Stop()
	
	for {
		select {
		case <-ticker.C:
			p.adjustWorkerCount(ctx)
		case <-p.stopCh:
			return
		case <-ctx.Done():
			return
		}
	}
}

// adjustWorkerCount scales the worker pool up or down based on current load
func (p *AdaptiveWorkerPool) adjustWorkerCount(ctx context.Context) {
	currentWorkers := atomic.LoadInt32(&p.workerCount)
	activeWorkers := atomic.LoadInt32(&p.activeWorkers)
	pendingJobs := atomic.LoadInt32(&p.pendingJobs)
	
	// Calculate utilization
	var utilization float64
	if currentWorkers > 0 {
		utilization = float64(activeWorkers) / float64(currentWorkers)
	}
	
	// Scale up if utilization is high and there are pending jobs
	if utilization >= p.scaleThreshold && pendingJobs > 0 && currentWorkers < p.maxWorkers {
		// Calculate how many workers to add (up to 25% more, at least 1)
		toAdd := max(1, int(float64(currentWorkers)*0.25))
		
		// Don't exceed max workers
		if currentWorkers+int32(toAdd) > p.maxWorkers {
			toAdd = int(p.maxWorkers - currentWorkers)
		}
		
		fmt.Printf("Scaling up: Adding %d workers (utilization: %.2f, pending: %d)\n", 
			toAdd, utilization, pendingJobs)
		
		for i := 0; i < toAdd; i++ {
			p.startWorker(ctx)
		}
	}
	
	// Scale down if utilization is low and we have more than minimum workers
	if utilization < p.scaleThreshold*0.5 && currentWorkers > p.minWorkers && pendingJobs == 0 {
		// Calculate how many workers to remove (up to 15% fewer, at least 1)
		toRemove := max(1, int(float64(currentWorkers)*0.15))
		
		// Don't go below min workers
		if currentWorkers-int32(toRemove) < p.minWorkers {
			toRemove = int(currentWorkers - p.minWorkers)
		}
		
		fmt.Printf("Scaling down: Removing %d workers (utilization: %.2f)\n", 
			toRemove, utilization)
		
		// Signal idle workers to stop; a labeled break is required because a
		// bare break would only exit the select, not the loop
	signalLoop:
		for i := 0; i < toRemove; i++ {
			select {
			case p.stopCh <- struct{}{}:
			default:
				// No idle worker is ready to receive a stop signal right now
				break signalLoop
			}
		}
	}
}

// Submit adds a job to the pool
func (p *AdaptiveWorkerPool) Submit(ctx context.Context, job Job) error {
	select {
	case p.jobQueue <- job:
		atomic.AddInt64(&p.metrics.totalJobs, 1)
		atomic.AddInt32(&p.pendingJobs, 1)
		
		// Update high water mark for queue
		pending := atomic.LoadInt32(&p.pendingJobs)
		for {
			highWater := atomic.LoadInt32(&p.metrics.queueHighWater)
			if pending <= highWater || atomic.CompareAndSwapInt32(&p.metrics.queueHighWater, highWater, pending) {
				break
			}
		}
		
		return nil
	case <-ctx.Done():
		return ctx.Err()
	default:
		return fmt.Errorf("job queue is full")
	}
}

// Results returns the channel for receiving results
func (p *AdaptiveWorkerPool) Results() <-chan Result {
	return p.resultQueue
}

// Shutdown gracefully shuts down the worker pool
func (p *AdaptiveWorkerPool) Shutdown() {
	close(p.jobQueue)
	p.workerWg.Wait()
	close(p.resultQueue)
	close(p.stopCh)
}

// GetMetrics returns the current pool metrics
func (p *AdaptiveWorkerPool) GetMetrics() PoolMetrics {
	completed := atomic.LoadInt64(&p.metrics.completedJobs)
	metrics := PoolMetrics{
		totalJobs:      atomic.LoadInt64(&p.metrics.totalJobs),
		completedJobs:  completed,
		failedJobs:     atomic.LoadInt64(&p.metrics.failedJobs),
		queueHighWater: atomic.LoadInt32(&p.metrics.queueHighWater),
	}
	
	// Calculate average latency
	if completed > 0 {
		metrics.totalLatency = atomic.LoadInt64(&p.metrics.totalLatency) / completed
	}
	
	return metrics
}

// Helper function for max of two integers
func max(a, b int) int {
	if a > b {
		return a
	}
	return b
}

func main() {
	// Seed random number generator
	rand.Seed(time.Now().UnixNano())
	
	// Create a context with timeout
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	
	// Create an adaptive worker pool
	pool := NewAdaptiveWorkerPool(ctx, 5, 20, 100)
	defer pool.Shutdown()
	
	// Start a goroutine to collect results
	go func() {
		for result := range pool.Results() {
			if result.Err != nil {
				fmt.Printf("Job %d failed: %v (took %v)\n", 
					result.JobID, result.Err, result.Latency)
			} else {
				fmt.Printf("Job %d completed: %v (took %v)\n", 
					result.JobID, result.Output, result.Latency)
			}
		}
	}()
	
	// Submit jobs in waves to demonstrate scaling
	for wave := 0; wave < 3; wave++ {
		fmt.Printf("\n--- Starting job wave %d ---\n", wave+1)
		
		// Submit a batch of jobs
		jobCount := 50 + wave*25
		for i := 0; i < jobCount; i++ {
			// Create jobs with variable processing times
			duration := time.Duration(50+rand.Intn(200)) * time.Millisecond
			job := Job{
				ID:       wave*1000 + i,
				Payload:  fmt.Sprintf("Job data %d", i),
				Duration: duration,
			}
			
			if err := pool.Submit(ctx, job); err != nil {
				fmt.Printf("Failed to submit job: %v\n", err)
			}
		}
		
		// Wait between waves
		time.Sleep(2 * time.Second)
		
		// Print current metrics
		metrics := pool.GetMetrics()
		fmt.Printf("\nPool metrics after wave %d:\n", wave+1)
		fmt.Printf("- Total jobs: %d\n", metrics.totalJobs)
		fmt.Printf("- Completed jobs: %d\n", metrics.completedJobs)
		fmt.Printf("- Failed jobs: %d\n", metrics.failedJobs)
		fmt.Printf("- Average latency: %v\n", time.Duration(metrics.totalLatency))
		fmt.Printf("- Queue high water: %d\n", metrics.queueHighWater)
		fmt.Printf("- Current workers: %d\n", atomic.LoadInt32(&pool.workerCount))
		fmt.Printf("- Active workers: %d\n", atomic.LoadInt32(&pool.activeWorkers))
	}
	
	// Wait for all jobs to complete
	time.Sleep(1 * time.Second)
	
	// Final metrics
	metrics := pool.GetMetrics()
	fmt.Printf("\nFinal pool metrics:\n")
	fmt.Printf("- Total jobs: %d\n", metrics.totalJobs)
	fmt.Printf("- Completed jobs: %d\n", metrics.completedJobs)
	fmt.Printf("- Failed jobs: %d\n", metrics.failedJobs)
	fmt.Printf("- Average latency: %v\n", time.Duration(metrics.totalLatency))
	fmt.Printf("- Queue high water: %d\n", metrics.queueHighWater)
}

This advanced worker pool pattern is ideal for distributed systems because it:

  • Automatically scales based on workload
  • Efficiently manages resources
  • Provides detailed metrics for monitoring
  • Handles backpressure through queue management
  • Gracefully recovers from failures
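
Before reaching for adaptive scaling, keep the simpler baseline in mind: a fixed-size pool built from goroutines, a jobs channel, and a WaitGroup is often sufficient, and the adaptive pool above is essentially this plus an autoscaler. A minimal, self-contained sketch (the job payloads and pool size are illustrative):

package main

import (
	"fmt"
	"sync"
)

// fixedPool starts exactly n workers that drain jobs and write to results
func fixedPool(n int, jobs <-chan int, results chan<- string) {
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func(id int) {
			defer wg.Done()
			for j := range jobs {
				results <- fmt.Sprintf("worker %d processed job %d", id, j)
			}
		}(i)
	}
	// Close results only after every worker has finished
	go func() {
		wg.Wait()
		close(results)
	}()
}

func main() {
	jobs := make(chan int)
	results := make(chan string)
	
	fixedPool(4, jobs, results)
	
	go func() {
		defer close(jobs)
		for j := 0; j < 10; j++ {
			jobs <- j
		}
	}()
	
	for r := range results {
		fmt.Println(r)
	}
}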

Multi-Stage Pipeline Pattern

Pipelines allow you to process data through multiple stages efficiently:

package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// DataItem represents a piece of data flowing through the pipeline
type DataItem struct {
	ID       int
	Data     interface{}
	Metadata map[string]interface{}
	Error    error
}

// PipelineStage represents a processing stage in the pipeline
type PipelineStage func(ctx context.Context, in <-chan DataItem) <-chan DataItem

// Pipeline represents a multi-stage data processing pipeline
type Pipeline struct {
	stages []PipelineStage
}

// NewPipeline creates a new data processing pipeline
func NewPipeline(stages ...PipelineStage) *Pipeline {
	return &Pipeline{
		stages: stages,
	}
}

// Execute runs data through the pipeline
func (p *Pipeline) Execute(ctx context.Context, source <-chan DataItem) <-chan DataItem {
	// Start with the source channel
	current := source
	
	// Apply each stage in sequence
	for _, stage := range p.stages {
		current = stage(ctx, current)
	}
	
	return current
}

// Source creates a source channel for the pipeline
func Source(ctx context.Context, items []DataItem) <-chan DataItem {
	out := make(chan DataItem)
	
	go func() {
		defer close(out)
		
		for _, item := range items {
			select {
			case out <- item:
				// Item sent successfully
			case <-ctx.Done():
				return
			}
		}
	}()
	
	return out
}

// Example pipeline stages

// Validate checks if data items are valid
func Validate(ctx context.Context, in <-chan DataItem) <-chan DataItem {
	out := make(chan DataItem)
	
	go func() {
		defer close(out)
		
		for item := range in {
			// Skip already failed items
			if item.Error != nil {
				select {
				case out <- item:
				case <-ctx.Done():
					return
				}
				continue
			}
			
			// Perform validation
			if item.Data == nil {
				item.Error = fmt.Errorf("invalid data: nil value")
			}
			
			// Forward the item
			select {
			case out <- item:
			case <-ctx.Done():
				return
			}
		}
	}()
	
	return out
}

// Transform applies a transformation to data items
func Transform(ctx context.Context, in <-chan DataItem) <-chan DataItem {
	out := make(chan DataItem)
	
	go func() {
		defer close(out)
		
		for item := range in {
			// Skip already failed items
			if item.Error != nil {
				select {
				case out <- item:
				case <-ctx.Done():
					return
				}
				continue
			}
			
			// Apply transformation
			switch v := item.Data.(type) {
			case string:
				item.Data = fmt.Sprintf("Transformed: %s", v)
			case int:
				item.Data = v * 2
			default:
				item.Error = fmt.Errorf("unsupported data type")
			}
			
			// Add metadata
			if item.Metadata == nil {
				item.Metadata = make(map[string]interface{})
			}
			item.Metadata["transformed_at"] = time.Now()
			
			// Forward the item
			select {
			case out <- item:
			case <-ctx.Done():
				return
			}
		}
	}()
	
	return out
}

// Enrich adds additional data to items
func Enrich(ctx context.Context, in <-chan DataItem) <-chan DataItem {
	out := make(chan DataItem)
	
	go func() {
		defer close(out)
		
		for item := range in {
			// Skip already failed items
			if item.Error != nil {
				select {
				case out <- item:
				case <-ctx.Done():
					return
				}
				continue
			}
			
			// Add enrichment data
			if item.Metadata == nil {
				item.Metadata = make(map[string]interface{})
			}
			item.Metadata["enriched"] = true
			item.Metadata["enriched_at"] = time.Now()
			
			// Forward the item
			select {
			case out <- item:
			case <-ctx.Done():
				return
			}
		}
	}()
	
	return out
}

// ParallelStage processes items in parallel
func ParallelStage(workers int, processor func(DataItem) DataItem) PipelineStage {
	return func(ctx context.Context, in <-chan DataItem) <-chan DataItem {
		out := make(chan DataItem)
		
		// Start a fixed number of workers
		var wg sync.WaitGroup
		wg.Add(workers)
		
		for i := 0; i < workers; i++ {
			go func(workerID int) {
				defer wg.Done()
				
				for item := range in {
					// Skip already failed items
					if item.Error != nil {
						select {
						case out <- item:
						case <-ctx.Done():
							return
						}
						continue
					}
					
					// Process the item
					processedItem := processor(item)
					
					// Add worker metadata
					if processedItem.Metadata == nil {
						processedItem.Metadata = make(map[string]interface{})
					}
					processedItem.Metadata["worker_id"] = workerID
					
					// Forward the processed item
					select {
					case out <- processedItem:
					case <-ctx.Done():
						return
					}
				}
			}(i)
		}
		
		// Close the output channel when all workers are done
		go func() {
			wg.Wait()
			close(out)
		}()
		
		return out
	}
}

func main() {
	// Create a context with timeout
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	
	// Create sample data
	data := []DataItem{
		{ID: 1, Data: "item 1"},
		{ID: 2, Data: "item 2"},
		{ID: 3, Data: "item 3"},
		{ID: 4, Data: "item 4"},
		{ID: 5, Data: nil}, // This will fail validation
		{ID: 6, Data: "item 6"},
		{ID: 7, Data: 42}, // Different type
		{ID: 8, Data: "item 8"},
	}
	
	// Create a pipeline
	pipeline := NewPipeline(
		Validate,
		Transform,
		ParallelStage(3, func(item DataItem) DataItem {
			// Simulate processing time
			time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
			return item
		}),
		Enrich,
	)
	
	// Create a source channel
	source := Source(ctx, data)
	
	// Execute the pipeline
	results := pipeline.Execute(ctx, source)
	
	// Collect and print results
	for result := range results {
		if result.Error != nil {
			fmt.Printf("Item %d failed: %v\n", result.ID, result.Error)
		} else {
			fmt.Printf("Item %d processed: %v with metadata: %v\n", 
				result.ID, result.Data, result.Metadata)
		}
	}
}

This pipeline pattern is valuable for distributed systems because it:

  • Separates processing logic into discrete, reusable stages
  • Handles errors gracefully at each stage
  • Enables parallel processing where appropriate
  • Maintains context and metadata throughout the processing flow
  • Provides backpressure through channel buffering
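
The stages above hand items over on unbuffered channels, so every hand-off is fully synchronous. Giving a stage's output a small buffer lets a fast stage run slightly ahead of a slower one while still applying backpressure once the buffer fills. The following sketch is meant to slot into the pipeline program above and reuses its DataItem and PipelineStage types (the buffer size is illustrative):

// Buffered wraps an existing stage so that its output has a small buffer,
// smoothing out bursts while still blocking the producer once the buffer fills
func Buffered(size int, stage PipelineStage) PipelineStage {
	return func(ctx context.Context, in <-chan DataItem) <-chan DataItem {
		out := make(chan DataItem, size)
		
		go func() {
			defer close(out)
			for item := range stage(ctx, in) {
				select {
				case out <- item:
				case <-ctx.Done():
					return
				}
			}
		}()
		
		return out
	}
}

It composes like any other stage, for example NewPipeline(Validate, Buffered(8, Transform), Enrich).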

Distributed Coordination Patterns

In distributed systems, coordinating activities across multiple nodes is a common challenge. Go’s concurrency primitives can be extended to implement sophisticated coordination patterns.

Distributed Mutex with etcd

This example demonstrates implementing a distributed mutex using etcd:

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
)

// DistributedMutex wraps etcd's mutex for distributed locking
type DistributedMutex struct {
	client    *clientv3.Client
	session   *concurrency.Session
	mutex     *concurrency.Mutex
	lockPath  string
	nodeID    string
	isLocked  bool
	lockCount int
}

// NewDistributedMutex creates a new distributed mutex
func NewDistributedMutex(endpoints []string, lockPath string) (*DistributedMutex, error) {
	// Create etcd client
	client, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to create etcd client: %w", err)
	}

	// Generate a unique node ID
	hostname, err := os.Hostname()
	if err != nil {
		hostname = "unknown-host"
	}
	nodeID := fmt.Sprintf("%s-%d", hostname, time.Now().UnixNano())

	// Create a session with keep-alive
	session, err := concurrency.NewSession(client, concurrency.WithTTL(10))
	if err != nil {
		client.Close()
		return nil, fmt.Errorf("failed to create etcd session: %w", err)
	}

	// Create the mutex
	mutex := concurrency.NewMutex(session, lockPath)

	return &DistributedMutex{
		client:   client,
		session:  session,
		mutex:    mutex,
		lockPath: lockPath,
		nodeID:   nodeID,
		isLocked: false,
	}, nil
}

// Lock acquires the distributed mutex
func (dm *DistributedMutex) Lock(ctx context.Context) error {
	if dm.isLocked {
		dm.lockCount++
		return nil // Already locked by this instance
	}

	// Try to acquire the lock
	if err := dm.mutex.Lock(ctx); err != nil {
		return fmt.Errorf("failed to acquire lock: %w", err)
	}

	dm.isLocked = true
	dm.lockCount = 1
	log.Printf("Node %s acquired lock on %s", dm.nodeID, dm.lockPath)
	return nil
}

// Unlock releases the distributed mutex
func (dm *DistributedMutex) Unlock(ctx context.Context) error {
	if !dm.isLocked {
		return fmt.Errorf("mutex is not locked")
	}

	dm.lockCount--
	if dm.lockCount > 0 {
		return nil // Still held by other operations
	}

	// Release the lock
	if err := dm.mutex.Unlock(ctx); err != nil {
		return fmt.Errorf("failed to release lock: %w", err)
	}

	dm.isLocked = false
	log.Printf("Node %s released lock on %s", dm.nodeID, dm.lockPath)
	return nil
}

// Close releases resources
func (dm *DistributedMutex) Close() {
	if dm.isLocked {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		_ = dm.Unlock(ctx)
	}
	dm.session.Close()
	dm.client.Close()
}

// IsLocked returns whether this instance holds the lock
func (dm *DistributedMutex) IsLocked() bool {
	return dm.isLocked
}

// GetLockOwner returns the current owner of the lock
func (dm *DistributedMutex) GetLockOwner(ctx context.Context) (string, error) {
	resp, err := dm.client.Get(ctx, dm.lockPath, clientv3.WithPrefix())
	if err != nil {
		return "", fmt.Errorf("failed to get lock info: %w", err)
	}

	if len(resp.Kvs) == 0 {
		return "", nil // No lock owner
	}

	return string(resp.Kvs[0].Value), nil
}

func main() {
	// Connect to etcd
	endpoints := []string{"localhost:2379"}
	lockPath := "/locks/my-critical-section"

	// Create the distributed mutex
	mutex, err := NewDistributedMutex(endpoints, lockPath)
	if err != nil {
		log.Fatalf("Failed to create distributed mutex: %v", err)
	}
	defer mutex.Close()

	// Create a context with timeout
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// Try to acquire the lock
	log.Printf("Attempting to acquire lock...")
	if err := mutex.Lock(ctx); err != nil {
		log.Fatalf("Failed to acquire lock: %v", err)
	}

	// Simulate doing work while holding the lock
	log.Printf("Lock acquired! Performing critical section work...")
	time.Sleep(5 * time.Second)

	// Release the lock
	if err := mutex.Unlock(ctx); err != nil {
		log.Fatalf("Failed to release lock: %v", err)
	}
	log.Printf("Lock released")
}
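
In practice the critical section is usually wrapped in a small helper so that the lock is always released, even when the protected work returns early with an error. A sketch, assuming the DistributedMutex type defined above (the 30-second bound is illustrative):

// withLock runs fn while holding the distributed mutex and releases the lock
// on every path out of the function
func withLock(dm *DistributedMutex, fn func(ctx context.Context) error) error {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	
	if err := dm.Lock(ctx); err != nil {
		return fmt.Errorf("acquiring lock: %w", err)
	}
	defer func() {
		if err := dm.Unlock(ctx); err != nil {
			log.Printf("releasing lock: %v", err)
		}
	}()
	
	return fn(ctx)
}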

Leader Election Pattern

Leader election is essential for coordinating distributed systems:

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
)

// LeaderElection manages the leader election process
type LeaderElection struct {
	client     *clientv3.Client
	session    *concurrency.Session
	election   *concurrency.Election
	nodeID     string
	electionID string
	isLeader   bool
	leaderCh   chan bool
	stopCh     chan struct{}
}

// NewLeaderElection creates a new leader election instance
func NewLeaderElection(endpoints []string, electionID string) (*LeaderElection, error) {
	// Create etcd client
	client, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to create etcd client: %w", err)
	}

	// Generate a unique node ID
	hostname, err := os.Hostname()
	if err != nil {
		hostname = "unknown-host"
	}
	nodeID := fmt.Sprintf("%s-%d", hostname, time.Now().UnixNano())

	// Create a session with keep-alive
	session, err := concurrency.NewSession(client, concurrency.WithTTL(10))
	if err != nil {
		client.Close()
		return nil, fmt.Errorf("failed to create etcd session: %w", err)
	}

	// Create the election
	election := concurrency.NewElection(session, electionID)

	return &LeaderElection{
		client:     client,
		session:    session,
		election:   election,
		nodeID:     nodeID,
		electionID: electionID,
		isLeader:   false,
		leaderCh:   make(chan bool, 1),
		stopCh:     make(chan struct{}),
	}, nil
}

// Campaign starts the leader election process
func (le *LeaderElection) Campaign(ctx context.Context) error {
	// Start campaigning for leadership
	if err := le.election.Campaign(ctx, le.nodeID); err != nil {
		return fmt.Errorf("failed to campaign: %w", err)
	}

	le.isLeader = true
	le.leaderCh <- true
	log.Printf("Node %s became leader for %s", le.nodeID, le.electionID)
	return nil
}

// Resign gives up leadership
func (le *LeaderElection) Resign(ctx context.Context) error {
	if !le.isLeader {
		return nil // Not the leader
	}

	if err := le.election.Resign(ctx); err != nil {
		return fmt.Errorf("failed to resign: %w", err)
	}

	le.isLeader = false
	le.leaderCh <- false
	log.Printf("Node %s resigned leadership for %s", le.nodeID, le.electionID)
	return nil
}

// IsLeader returns whether this node is the leader
func (le *LeaderElection) IsLeader() bool {
	return le.isLeader
}

// LeaderChanges returns a channel that receives leadership change notifications
func (le *LeaderElection) LeaderChanges() <-chan bool {
	return le.leaderCh
}

// WatchLeader watches for leader changes
func (le *LeaderElection) WatchLeader(ctx context.Context) {
	go func() {
		defer close(le.leaderCh)

		for {
			select {
			case <-le.stopCh:
				return
			case <-ctx.Done():
				return
			default:
				// Get the current leader
				resp, err := le.election.Leader(ctx)
				if err != nil {
					log.Printf("Error getting leader: %v", err)
					time.Sleep(1 * time.Second)
					continue
				}

				currentLeader := string(resp.Kvs[0].Value)
				isLeader := currentLeader == le.nodeID

				// If leadership status changed, notify
				if isLeader != le.isLeader {
					le.isLeader = isLeader
					le.leaderCh <- isLeader
					if isLeader {
						log.Printf("Node %s became leader for %s", le.nodeID, le.electionID)
					} else {
						log.Printf("Node %s lost leadership for %s", le.nodeID, le.electionID)
					}
				}

				// Watch for changes
				watchCh := le.client.Watch(ctx, string(resp.Kvs[0].Key))
				select {
				case <-watchCh:
					// Leader changed, loop and check again
				case <-le.stopCh:
					return
				case <-ctx.Done():
					return
				}
			}
		}
	}()
}

// Close releases resources
func (le *LeaderElection) Close() {
	close(le.stopCh)
	if le.isLeader {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		_ = le.Resign(ctx)
	}
	le.session.Close()
	le.client.Close()
}

// GetCurrentLeader returns the current leader's ID
func (le *LeaderElection) GetCurrentLeader(ctx context.Context) (string, error) {
	resp, err := le.election.Leader(ctx)
	if err != nil {
		return "", fmt.Errorf("failed to get leader: %w", err)
	}

	if len(resp.Kvs) == 0 {
		return "", nil // No leader
	}

	return string(resp.Kvs[0].Value), nil
}

func main() {
	// Connect to etcd
	endpoints := []string{"localhost:2379"}
	electionID := "/elections/my-service-leader"

	// Create the leader election
	election, err := NewLeaderElection(endpoints, electionID)
	if err != nil {
		log.Fatalf("Failed to create leader election: %v", err)
	}
	defer election.Close()

	// Create a context with timeout
	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	// Start watching for leader changes
	election.WatchLeader(ctx)

	// Start campaigning for leadership
	log.Printf("Node %s starting campaign...", election.nodeID)
	go func() {
		if err := election.Campaign(ctx); err != nil {
			log.Printf("Campaign error: %v", err)
		}
	}()

	// Handle leadership changes
	for isLeader := range election.LeaderChanges() {
		if isLeader {
			log.Printf("I am now the leader! Starting leader tasks...")
			// Perform leader-specific work
			time.Sleep(10 * time.Second)
			
			// Simulate stepping down
			log.Printf("Resigning leadership...")
			if err := election.Resign(ctx); err != nil {
				log.Printf("Error resigning: %v", err)
			}
		} else {
			log.Printf("I am now a follower. Waiting for leadership...")
			// Perform follower-specific work
			
			// After some time, campaign again
			time.Sleep(5 * time.Second)
			log.Printf("Starting new campaign...")
			go func() {
				if err := election.Campaign(ctx); err != nil {
					log.Printf("Campaign error: %v", err)
				}
			}()
		}
	}
}
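
One subtlety the example glosses over: if this node's etcd session expires (for example during a network partition), its leadership key disappears and leader-only work must stop immediately. The concurrency session exposes a Done channel for exactly this case. Below is a sketch of a leader loop that watches it, assuming the LeaderElection type defined above (doLeaderWork is an illustrative placeholder):

// runAsLeader performs leader-only work until the context is canceled or the
// underlying etcd session expires
func (le *LeaderElection) runAsLeader(ctx context.Context, doLeaderWork func(context.Context) error) error {
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()
	
	for {
		select {
		case <-le.session.Done():
			// The lease behind our leadership key is gone; another node may
			// already have been elected, so stop immediately
			return fmt.Errorf("etcd session expired, leadership lost")
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			if err := doLeaderWork(ctx); err != nil {
				return err
			}
		}
	}
}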

Distributed Semaphore

A distributed semaphore allows limiting concurrent access across multiple nodes:

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"sync"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
)

// DistributedSemaphore implements a semaphore that works across multiple nodes
type DistributedSemaphore struct {
	client    *clientv3.Client
	session   *concurrency.Session
	semPath   string
	nodeID    string
	count     int
	resources map[string]struct{}
	mu        sync.Mutex
}

// NewDistributedSemaphore creates a new distributed semaphore
func NewDistributedSemaphore(endpoints []string, semPath string, count int) (*DistributedSemaphore, error) {
	// Create etcd client
	client, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to create etcd client: %w", err)
	}

	// Generate a unique node ID
	hostname, err := os.Hostname()
	if err != nil {
		hostname = "unknown-host"
	}
	nodeID := fmt.Sprintf("%s-%d", hostname, time.Now().UnixNano())

	// Create a session with keep-alive
	session, err := concurrency.NewSession(client, concurrency.WithTTL(10))
	if err != nil {
		client.Close()
		return nil, fmt.Errorf("failed to create etcd session: %w", err)
	}

	return &DistributedSemaphore{
		client:    client,
		session:   session,
		semPath:   semPath,
		nodeID:    nodeID,
		count:     count,
		resources: make(map[string]struct{}),
	}, nil
}

// Acquire attempts to acquire a resource from the semaphore
func (ds *DistributedSemaphore) Acquire(ctx context.Context) (string, error) {
	ds.mu.Lock()
	defer ds.mu.Unlock()

	// Generate a unique resource ID
	resourceID := fmt.Sprintf("%s/%s-%d", ds.semPath, ds.nodeID, time.Now().UnixNano())

	// Try to create the resource key with a lease
	_, err := ds.client.Put(ctx, resourceID, ds.nodeID, clientv3.WithLease(ds.session.Lease()))
	if err != nil {
		return "", fmt.Errorf("failed to create resource: %w", err)
	}

	// Get all resources to check if we're within the limit
	resp, err := ds.client.Get(ctx, ds.semPath, clientv3.WithPrefix())
	if err != nil {
		// Try to clean up
		_, _ = ds.client.Delete(ctx, resourceID)
		return "", fmt.Errorf("failed to get resources: %w", err)
	}

	// Check if we're within the semaphore limit
	if len(resp.Kvs) > ds.count {
		// We need to release our resource and wait
		_, _ = ds.client.Delete(ctx, resourceID)
		return "", fmt.Errorf("semaphore limit reached")
	}

	// We successfully acquired a resource
	ds.resources[resourceID] = struct{}{}
	log.Printf("Node %s acquired resource %s", ds.nodeID, resourceID)
	return resourceID, nil
}

// Release releases a previously acquired resource
func (ds *DistributedSemaphore) Release(ctx context.Context, resourceID string) error {
	ds.mu.Lock()
	defer ds.mu.Unlock()

	// Check if we own this resource
	if _, exists := ds.resources[resourceID]; !exists {
		return fmt.Errorf("resource not owned by this semaphore instance")
	}

	// Delete the resource key
	_, err := ds.client.Delete(ctx, resourceID)
	if err != nil {
		return fmt.Errorf("failed to delete resource: %w", err)
	}

	// Remove from our local tracking
	delete(ds.resources, resourceID)
	log.Printf("Node %s released resource %s", ds.nodeID, resourceID)
	return nil
}

// TryAcquireWithTimeout attempts to acquire a resource with a timeout
func (ds *DistributedSemaphore) TryAcquireWithTimeout(ctx context.Context, timeout time.Duration) (string, error) {
	timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	// Try to acquire immediately first
	resourceID, err := ds.Acquire(ctx)
	if err == nil {
		return resourceID, nil
	}

	// If that fails, retry with exponential backoff
	backoff := 50 * time.Millisecond
	maxBackoff := 1 * time.Second

	for {
		select {
		case <-timeoutCtx.Done():
			return "", fmt.Errorf("timeout waiting for semaphore: %w", timeoutCtx.Err())
		case <-time.After(backoff):
			// Try again
			resourceID, err := ds.Acquire(ctx)
			if err == nil {
				return resourceID, nil
			}

			// Increase backoff for next attempt
			backoff *= 2
			if backoff > maxBackoff {
				backoff = maxBackoff
			}
		}
	}
}

// Close releases all resources and cleans up
func (ds *DistributedSemaphore) Close() {
	ds.mu.Lock()
	defer ds.mu.Unlock()

	// Release all acquired resources
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	for resourceID := range ds.resources {
		_, _ = ds.client.Delete(ctx, resourceID)
		delete(ds.resources, resourceID)
	}

	ds.session.Close()
	ds.client.Close()
}

// GetAvailable returns the number of available resources
func (ds *DistributedSemaphore) GetAvailable(ctx context.Context) (int, error) {
	resp, err := ds.client.Get(ctx, ds.semPath, clientv3.WithPrefix())
	if err != nil {
		return 0, fmt.Errorf("failed to get resources: %w", err)
	}

	return ds.count - len(resp.Kvs), nil
}

func main() {
	// Connect to etcd
	endpoints := []string{"localhost:2379"}
	semPath := "/semaphores/connection-limit"
	maxConnections := 3

	// Create the distributed semaphore
	sem, err := NewDistributedSemaphore(endpoints, semPath, maxConnections)
	if err != nil {
		log.Fatalf("Failed to create distributed semaphore: %v", err)
	}
	defer sem.Close()

	// Create a context with timeout
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// Simulate multiple workers trying to acquire resources
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()

			log.Printf("Worker %d trying to acquire resource...", workerID)
			resourceID, err := sem.TryAcquireWithTimeout(ctx, 5*time.Second)
			if err != nil {
				log.Printf("Worker %d failed to acquire resource: %v", workerID, err)
				return
			}

			log.Printf("Worker %d acquired resource %s", workerID, resourceID)

			// Simulate doing work
			time.Sleep(2 * time.Second)

			// Release the resource
			if err := sem.Release(ctx, resourceID); err != nil {
				log.Printf("Worker %d failed to release resource: %v", workerID, err)
			} else {
				log.Printf("Worker %d released resource %s", workerID, resourceID)
			}
		}(i)
	}

	// Wait for all workers to finish
	wg.Wait()
}
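
When the concurrency limit only needs to hold within a single process, the etcd machinery above is unnecessary: the golang.org/x/sync/semaphore package provides a weighted semaphore with the same acquire/release shape. A minimal sketch, assuming the golang.org/x/sync module is available (the limit of 3 is illustrative):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"golang.org/x/sync/semaphore"
)

func main() {
	ctx := context.Background()
	sem := semaphore.NewWeighted(3) // at most 3 concurrent holders
	var wg sync.WaitGroup
	
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(worker int) {
			defer wg.Done()
			
			if err := sem.Acquire(ctx, 1); err != nil {
				fmt.Printf("worker %d could not acquire a slot: %v\n", worker, err)
				return
			}
			defer sem.Release(1)
			
			fmt.Printf("worker %d holds a slot\n", worker)
			time.Sleep(200 * time.Millisecond) // simulated work
		}(i)
	}
	
	wg.Wait()
}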

Error Handling and Recovery Patterns

Robust error handling is crucial for distributed systems where failures are common.

Circuit Breaker Pattern

The circuit breaker pattern prevents cascading failures in distributed systems:

package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"
)

// CircuitBreakerState represents the state of the circuit breaker
type CircuitBreakerState int

const (
	StateClosed CircuitBreakerState = iota // Normal operation, requests pass through
	StateOpen                              // Circuit is open, requests fail fast
	StateHalfOpen                          // Testing if the service is healthy again
)

// CircuitBreaker implements the circuit breaker pattern
type CircuitBreaker struct {
	name                   string
	state                  CircuitBreakerState
	failureThreshold       int64
	successThreshold       int64
	resetTimeout           time.Duration
	failureCount           int64
	successCount           int64
	lastStateChange        time.Time
	mutex                  sync.RWMutex
	onStateChange          func(name string, from, to CircuitBreakerState)
	consecutiveFailures    int64
	consecutiveSuccesses   int64
	totalRequests          int64
	totalSuccesses         int64
	totalFailures          int64
	totalTimeouts          int64
	totalShortCircuits     int64
	cumulativeResponseTime int64 // in nanoseconds
}

// CircuitBreakerOption defines a function that configures a CircuitBreaker
type CircuitBreakerOption func(*CircuitBreaker)

// WithFailureThreshold sets the threshold for failures before opening the circuit
func WithFailureThreshold(threshold int64) CircuitBreakerOption {
	return func(cb *CircuitBreaker) {
		cb.failureThreshold = threshold
	}
}

// WithSuccessThreshold sets the threshold for successes before closing the circuit
func WithSuccessThreshold(threshold int64) CircuitBreakerOption {
	return func(cb *CircuitBreaker) {
		cb.successThreshold = threshold
	}
}

// WithResetTimeout sets the timeout before trying to close the circuit again
func WithResetTimeout(timeout time.Duration) CircuitBreakerOption {
	return func(cb *CircuitBreaker) {
		cb.resetTimeout = timeout
	}
}

// WithOnStateChange sets a callback for state changes
func WithOnStateChange(callback func(name string, from, to CircuitBreakerState)) CircuitBreakerOption {
	return func(cb *CircuitBreaker) {
		cb.onStateChange = callback
	}
}

// NewCircuitBreaker creates a new circuit breaker
func NewCircuitBreaker(name string, options ...CircuitBreakerOption) *CircuitBreaker {
	cb := &CircuitBreaker{
		name:             name,
		state:            StateClosed,
		failureThreshold: 5,
		successThreshold: 3,
		resetTimeout:     10 * time.Second,
		lastStateChange:  time.Now(),
	}

	// Apply options
	for _, option := range options {
		option(cb)
	}

	return cb
}

// Execute runs the given function with circuit breaker protection
func (cb *CircuitBreaker) Execute(ctx context.Context, fn func() (interface{}, error)) (interface{}, error) {
	// Check if the circuit is open
	if !cb.allowRequest() {
		atomic.AddInt64(&cb.totalShortCircuits, 1)
		return nil, errors.New("circuit breaker is open")
	}

	// Track metrics
	atomic.AddInt64(&cb.totalRequests, 1)
	startTime := time.Now()

	// Execute the protected function
	result, err := fn()

	// Update metrics based on the result
	latency := time.Since(startTime)
	atomic.AddInt64(&cb.cumulativeResponseTime, int64(latency))

	// Check for timeout
	if errors.Is(err, context.DeadlineExceeded) {
		atomic.AddInt64(&cb.totalTimeouts, 1)
		cb.recordFailure()
		return nil, err
	}

	// Record success or failure
	if err != nil {
		atomic.AddInt64(&cb.totalFailures, 1)
		cb.recordFailure()
		return nil, err
	}

	atomic.AddInt64(&cb.totalSuccesses, 1)
	cb.recordSuccess()
	return result, nil
}

// allowRequest checks if a request should be allowed based on the circuit state
func (cb *CircuitBreaker) allowRequest() bool {
	cb.mutex.RLock()
	state := cb.state
	lastChange := cb.lastStateChange
	cb.mutex.RUnlock()

	switch state {
	case StateClosed:
		return true
	case StateOpen:
		// Check if it's time to try again
		if time.Since(lastChange) > cb.resetTimeout {
			// Transition to half-open under the write lock, re-checking the
			// state in case another goroutine got there first
			cb.mutex.Lock()
			if cb.state == StateOpen {
				cb.transitionState(StateHalfOpen)
			}
			cb.mutex.Unlock()
			return true
		}
		return false
	case StateHalfOpen:
		// In half-open state, allow a limited number of probe requests
		return atomic.LoadInt64(&cb.successCount)+atomic.LoadInt64(&cb.failureCount) < cb.successThreshold
	default:
		return true
	}
}

// recordSuccess records a successful request
func (cb *CircuitBreaker) recordSuccess() {
	cb.mutex.Lock()
	defer cb.mutex.Unlock()

	atomic.AddInt64(&cb.successCount, 1)
	atomic.StoreInt64(&cb.failureCount, 0)
	atomic.AddInt64(&cb.consecutiveSuccesses, 1)
	atomic.StoreInt64(&cb.consecutiveFailures, 0)

	// If we're in half-open state and have enough successes, close the circuit
	if cb.state == StateHalfOpen && atomic.LoadInt64(&cb.successCount) >= cb.successThreshold {
		cb.transitionState(StateClosed)
	}
}

// recordFailure records a failed request
func (cb *CircuitBreaker) recordFailure() {
	cb.mutex.Lock()
	defer cb.mutex.Unlock()

	atomic.AddInt64(&cb.failureCount, 1)
	atomic.StoreInt64(&cb.successCount, 0)
	atomic.AddInt64(&cb.consecutiveFailures, 1)
	atomic.StoreInt64(&cb.consecutiveSuccesses, 0)

	// If we have too many failures, open the circuit
	if (cb.state == StateClosed && atomic.LoadInt64(&cb.failureCount) >= cb.failureThreshold) ||
		(cb.state == StateHalfOpen && atomic.LoadInt64(&cb.failureCount) > 0) {
		cb.transitionState(StateOpen)
	}
}

// transitionState changes the state of the circuit breaker
func (cb *CircuitBreaker) transitionState(newState CircuitBreakerState) {
	oldState := cb.state
	cb.state = newState
	cb.lastStateChange = time.Now()

	// Reset counters
	atomic.StoreInt64(&cb.failureCount, 0)
	atomic.StoreInt64(&cb.successCount, 0)

	// Notify state change if callback is set
	if cb.onStateChange != nil {
		cb.onStateChange(cb.name, oldState, newState)
	}

	log.Printf("Circuit breaker %s state changed from %v to %v", cb.name, oldState, newState)
}

// GetState returns the current state of the circuit breaker
func (cb *CircuitBreaker) GetState() CircuitBreakerState {
	cb.mutex.RLock()
	defer cb.mutex.RUnlock()
	return cb.state
}

// GetMetrics returns the current metrics of the circuit breaker
func (cb *CircuitBreaker) GetMetrics() map[string]interface{} {
	return map[string]interface{}{
		"state":                  cb.GetState(),
		"total_requests":         atomic.LoadInt64(&cb.totalRequests),
		"total_successes":        atomic.LoadInt64(&cb.totalSuccesses),
		"total_failures":         atomic.LoadInt64(&cb.totalFailures),
		"total_timeouts":         atomic.LoadInt64(&cb.totalTimeouts),
		"total_short_circuits":   atomic.LoadInt64(&cb.totalShortCircuits),
		"consecutive_successes":  atomic.LoadInt64(&cb.consecutiveSuccesses),
		"consecutive_failures":   atomic.LoadInt64(&cb.consecutiveFailures),
		"average_response_time":  time.Duration(atomic.LoadInt64(&cb.cumulativeResponseTime) / max(1, atomic.LoadInt64(&cb.totalRequests))),
		"last_state_change_ago":  time.Since(cb.lastStateChange),
	}
}

// Helper function for max of two int64s
func max(a, b int64) int64 {
	if a > b {
		return a
	}
	return b
}

// simulateService simulates a remote service with variable reliability
func simulateService(ctx context.Context, serviceID string, failureRate int, latency time.Duration) (string, error) {
	// Simulate random failures
	if rand.Intn(100) < failureRate {
		return "", fmt.Errorf("service %s failed", serviceID)
	}

	// Simulate processing time
	select {
	case <-time.After(latency):
		return fmt.Sprintf("Response from %s", serviceID), nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

func main() {
	// Seed random number generator
	rand.Seed(time.Now().UnixNano())

	// Create a circuit breaker
	cb := NewCircuitBreaker("example-service",
		WithFailureThreshold(3),
		WithSuccessThreshold(2),
		WithResetTimeout(5*time.Second),
		WithOnStateChange(func(name string, from, to CircuitBreakerState) {
			fmt.Printf("Circuit breaker %s changed from %v to %v\n", name, from, to)
		}),
	)

	// Simulate a series of requests
	for i := 0; i < 20; i++ {
		fmt.Printf("\nRequest %d:\n", i+1)

		// Create a context with timeout
		ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)

		// Execute the request through the circuit breaker
		result, err := cb.Execute(ctx, func() (interface{}, error) {
			// Simulate different failure rates and latencies based on the iteration
			var failureRate, latencyMs int

			if i < 5 {
				// First few requests are successful
				failureRate = 0
				latencyMs = 100
			} else if i < 10 {
				// Next few requests have high failure rate
				failureRate = 80
				latencyMs = 300
			} else if i < 15 {
				// Then we simulate a recovery
				failureRate = 0
				latencyMs = 100
			} else {
				// Finally, simulate timeouts
				failureRate = 0
				latencyMs = 1500 // This will exceed our timeout
			}

			return simulateService(ctx, "example-service", failureRate, time.Duration(latencyMs)*time.Millisecond)
		})

		cancel() // Always cancel the context

		// Print the result
		if err != nil {
			fmt.Printf("Error: %v\n", err)
		} else {
			fmt.Printf("Success: %v\n", result)
		}

		// Print current metrics
		metrics := cb.GetMetrics()
		fmt.Printf("Circuit state: %v\n", metrics["state"])
		fmt.Printf("Success/Failure: %d/%d\n", metrics["total_successes"], metrics["total_failures"])

		// Wait a bit between requests
		time.Sleep(500 * time.Millisecond)
	}
}

This circuit breaker pattern is essential for distributed systems because it:

  • Prevents cascading failures across services
  • Fails fast when a service is unhealthy
  • Allows for graceful recovery
  • Provides detailed metrics for monitoring
  • Implements sophisticated state management
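
A circuit breaker is usually paired with bounded retries: back off exponentially between attempts, add jitter so many clients do not retry in lockstep, and give up after a few tries. Below is a sketch that wraps the CircuitBreaker defined above (the attempt count and base delay are illustrative):

// callWithRetry retries a call through the circuit breaker with jittered
// exponential backoff, giving up after a fixed number of attempts
func callWithRetry(ctx context.Context, cb *CircuitBreaker, fn func() (interface{}, error)) (interface{}, error) {
	const maxAttempts = 3
	backoff := 100 * time.Millisecond
	
	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		result, err := cb.Execute(ctx, fn)
		if err == nil {
			return result, nil
		}
		lastErr = err
		if attempt == maxAttempts {
			break
		}
		
		// Jittered exponential backoff before the next attempt
		sleep := backoff + time.Duration(rand.Int63n(int64(backoff)))
		select {
		case <-time.After(sleep):
		case <-ctx.Done():
			return nil, ctx.Err()
		}
		backoff *= 2
	}
	return nil, fmt.Errorf("all %d attempts failed: %w", maxAttempts, lastErr)
}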

Graceful Shutdown Pattern

Proper shutdown handling is crucial for maintaining data integrity:

package main

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

// GracefulShutdown manages the graceful shutdown process
type GracefulShutdown struct {
	timeout       time.Duration
	shutdownFuncs []ShutdownFunc
	wg            sync.WaitGroup
	shutdownCh    chan struct{}
	doneCh        chan struct{}
}

// ShutdownFunc represents a function to be called during shutdown
type ShutdownFunc func(ctx context.Context) error

// NewGracefulShutdown creates a new graceful shutdown manager
func NewGracefulShutdown(timeout time.Duration) *GracefulShutdown {
	return &GracefulShutdown{
		timeout:    timeout,
		shutdownCh: make(chan struct{}),
		doneCh:     make(chan struct{}),
	}
}

// AddShutdownFunc adds a function to be called during shutdown
func (gs *GracefulShutdown) AddShutdownFunc(name string, fn ShutdownFunc) {
	gs.shutdownFuncs = append(gs.shutdownFuncs, func(ctx context.Context) error {
		log.Printf("Shutting down %s...", name)
		err := fn(ctx)
		if err != nil {
			log.Printf("Error shutting down %s: %v", name, err)
			return err
		}
		log.Printf("%s shutdown complete", name)
		return nil
	})
}

// Start begins listening for shutdown signals
func (gs *GracefulShutdown) Start() {
	signalCh := make(chan os.Signal, 1)
	signal.Notify(signalCh, syscall.SIGINT, syscall.SIGTERM)

	go func() {
		select {
		case sig := <-signalCh:
			log.Printf("Received signal: %v", sig)
			gs.Shutdown()
		case <-gs.shutdownCh:
			// Shutdown triggered programmatically
		}
	}()
}

// Shutdown initiates the graceful shutdown process
func (gs *GracefulShutdown) Shutdown() {
	// Ensure we only shut down once
	select {
	case <-gs.shutdownCh:
		// Already shutting down
		return
	default:
		close(gs.shutdownCh)
	}

	log.Println("Starting graceful shutdown...")

	// Create a context with timeout
	ctx, cancel := context.WithTimeout(context.Background(), gs.timeout)
	defer cancel()

	// Execute all shutdown functions
	for _, fn := range gs.shutdownFuncs {
		gs.wg.Add(1)
		go func(shutdownFn ShutdownFunc) {
			defer gs.wg.Done()
			_ = shutdownFn(ctx)
		}(fn)
	}

	// Wait for all shutdown functions to complete or timeout
	shutdownComplete := make(chan struct{})
	go func() {
		gs.wg.Wait()
		close(shutdownComplete)
	}()

	select {
	case <-shutdownComplete:
		log.Println("Graceful shutdown completed successfully")
	case <-ctx.Done():
		log.Println("Graceful shutdown timed out")
	}

	close(gs.doneCh)
}

// Wait blocks until shutdown is complete
func (gs *GracefulShutdown) Wait() {
	<-gs.doneCh
}

// IsShuttingDown returns whether shutdown has been initiated
func (gs *GracefulShutdown) IsShuttingDown() bool {
	select {
	case <-gs.shutdownCh:
		return true
	default:
		return false
	}
}

// Example HTTP server with graceful shutdown
type Server struct {
	server   *http.Server
	shutdown *GracefulShutdown
}

// NewServer creates a new HTTP server with graceful shutdown
func NewServer(addr string, shutdown *GracefulShutdown) *Server {
	server := &http.Server{
		Addr: addr,
		Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			// Check if we're shutting down
			if shutdown.IsShuttingDown() {
				w.WriteHeader(http.StatusServiceUnavailable)
				w.Write([]byte("Server is shutting down"))
				return
			}

			// Normal request handling
			fmt.Fprintf(w, "Hello, World!")
		}),
	}

	s := &Server{
		server:   server,
		shutdown: shutdown,
	}

	// Register shutdown handler
	shutdown.AddShutdownFunc("http-server", func(ctx context.Context) error {
		return server.Shutdown(ctx)
	})

	return s
}

// Start starts the HTTP server
func (s *Server) Start() error {
	log.Printf("Starting HTTP server on %s", s.server.Addr)
	return s.server.ListenAndServe()
}

// Example worker pool with graceful shutdown
type WorkerPool struct {
	jobCh    chan string
	shutdown *GracefulShutdown
	wg       sync.WaitGroup
}

// NewWorkerPool creates a new worker pool with graceful shutdown
func NewWorkerPool(workerCount int, jobBufferSize int, shutdown *GracefulShutdown) *WorkerPool {
	wp := &WorkerPool{
		jobCh:    make(chan string, jobBufferSize),
		shutdown: shutdown,
	}

	// Start workers
	for i := 0; i < workerCount; i++ {
		wp.wg.Add(1)
		go wp.worker(i)
	}

	// Register shutdown handler
	shutdown.AddShutdownFunc("worker-pool", func(ctx context.Context) error {
		log.Println("Closing job channel...")
		close(wp.jobCh)

		// Wait for all workers to finish
		doneCh := make(chan struct{})
		go func() {
			wp.wg.Wait()
			close(doneCh)
		}()

		select {
		case <-doneCh:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	})

	return wp
}

// worker processes jobs
func (wp *WorkerPool) worker(id int) {
	defer wp.wg.Done()
	log.Printf("Worker %d started", id)

	for job := range wp.jobCh {
		// Check if we're shutting down
		if wp.shutdown.IsShuttingDown() {
			log.Printf("Worker %d processing final jobs before shutdown", id)
		}

		// Process the job
		log.Printf("Worker %d processing job: %s", id, job)
		time.Sleep(100 * time.Millisecond) // Simulate work
	}

	log.Printf("Worker %d stopped", id)
}

// SubmitJob submits a job to the worker pool
func (wp *WorkerPool) SubmitJob(job string) error {
	if wp.shutdown.IsShuttingDown() {
		return fmt.Errorf("worker pool is shutting down")
	}

	select {
	case wp.jobCh <- job:
		return nil
	default:
		return fmt.Errorf("job queue is full")
	}
}

func main() {
	// Create a graceful shutdown manager with 5 second timeout
	shutdown := NewGracefulShutdown(5 * time.Second)
	shutdown.Start()

	// Create and start HTTP server
	server := NewServer(":8080", shutdown)
	go func() {
		if err := server.Start(); err != nil && err != http.ErrServerClosed {
			log.Fatalf("HTTP server error: %v", err)
		}
	}()

	// Create worker pool
	workerPool := NewWorkerPool(3, 10, shutdown)

	// Submit some jobs
	go func() {
		for i := 0; i < 20; i++ {
			job := fmt.Sprintf("Job %d", i)
			if err := workerPool.SubmitJob(job); err != nil {
				log.Printf("Failed to submit job: %v", err)
			}
			time.Sleep(200 * time.Millisecond)
		}
	}()

	// Wait for shutdown signal
	log.Println("Server is running. Press Ctrl+C to shutdown.")
	shutdown.Wait()
	log.Println("Server exited")
}

This graceful shutdown pattern is valuable for distributed systems because it:

  • Ensures in-flight operations complete before shutdown
  • Prevents data loss during service termination
  • Provides a clean shutdown sequence for dependent components
  • Implements timeout handling for stuck operations
  • Allows for health check integration during shutdown (see the readiness-probe sketch after this list)
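
For example, a readiness endpoint can report the shutdown state so that load balancers stop routing new traffic while in-flight work drains. The sketch below assumes it lives alongside the GracefulShutdown type from the example above; the registerHealthCheck name and /healthz path are illustrative, not part of the original code:

// registerHealthCheck wires a readiness probe to the shutdown manager so that
// upstream load balancers drain traffic away once shutdown begins.
func registerHealthCheck(mux *http.ServeMux, shutdown *GracefulShutdown) {
	mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		if shutdown.IsShuttingDown() {
			w.WriteHeader(http.StatusServiceUnavailable)
			w.Write([]byte("shutting down"))
			return
		}
		w.WriteHeader(http.StatusOK)
		w.Write([]byte("ok"))
	})
}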

Performance Optimization and Monitoring

Optimizing and monitoring concurrent code is essential for distributed systems.

Goroutine Leak Detection

Detecting and preventing goroutine leaks is crucial for long-running services:

package main

import (
	"context"
	"fmt"
	"log"
	"runtime"
	"sync"
	"time"
)

// LeakDetector monitors goroutine count and detects potential leaks
type LeakDetector struct {
	interval       time.Duration
	threshold      float64
	baselineCount  int
	previousCount  int
	samples        []int
	sampleSize     int
	stopCh         chan struct{}
	mu             sync.Mutex
	alertThreshold int
	onLeak         func(count int, samples []int)
}

// NewLeakDetector creates a new goroutine leak detector
func NewLeakDetector(interval time.Duration, threshold float64, sampleSize int) *LeakDetector {
	return &LeakDetector{
		interval:       interval,
		threshold:      threshold,
		samples:        make([]int, 0, sampleSize),
		sampleSize:     sampleSize,
		stopCh:         make(chan struct{}),
		alertThreshold: 1000, // Alert if more than 1000 goroutines over baseline
		onLeak: func(count int, samples []int) {
			log.Printf("WARNING: Potential goroutine leak detected! Current count: %d", count)
		},
	}
}

// Start begins monitoring for goroutine leaks
func (ld *LeakDetector) Start() {
	// Capture the baseline after a short warmup
	time.Sleep(100 * time.Millisecond)
	ld.baselineCount = runtime.NumGoroutine()
	ld.previousCount = ld.baselineCount

	log.Printf("Leak detector started with baseline of %d goroutines", ld.baselineCount)

	go func() {
		ticker := time.NewTicker(ld.interval)
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				ld.checkForLeaks()
			case <-ld.stopCh:
				return
			}
		}
	}()
}

// Stop stops the leak detector
func (ld *LeakDetector) Stop() {
	close(ld.stopCh)
}

// SetAlertThreshold sets the threshold for leak alerts
func (ld *LeakDetector) SetAlertThreshold(threshold int) {
	ld.mu.Lock()
	defer ld.mu.Unlock()
	ld.alertThreshold = threshold
}

// SetOnLeakDetected sets the callback for leak detection
func (ld *LeakDetector) SetOnLeakDetected(callback func(count int, samples []int)) {
	ld.mu.Lock()
	defer ld.mu.Unlock()
	ld.onLeak = callback
}

// checkForLeaks checks if there's a potential goroutine leak
func (ld *LeakDetector) checkForLeaks() {
	ld.mu.Lock()
	defer ld.mu.Unlock()

	currentCount := runtime.NumGoroutine()
	
	// Add to samples
	ld.samples = append(ld.samples, currentCount)
	if len(ld.samples) > ld.sampleSize {
		ld.samples = ld.samples[1:]
	}
	
	// Check for significant increase
	if currentCount > ld.baselineCount+ld.alertThreshold {
		// Check if the count is consistently increasing
		if currentCount > ld.previousCount {
			// Calculate growth rate
			growthRate := float64(currentCount-ld.previousCount) / float64(ld.previousCount)
			
			if growthRate > ld.threshold {
				if ld.onLeak != nil {
					ld.onLeak(currentCount, ld.samples)
				}
			}
		}
	}
	
	ld.previousCount = currentCount
}

// GetStats returns current goroutine statistics
func (ld *LeakDetector) GetStats() map[string]interface{} {
	ld.mu.Lock()
	defer ld.mu.Unlock()
	
	return map[string]interface{}{
		"current_count":   runtime.NumGoroutine(),
		"baseline_count":  ld.baselineCount,
		"previous_count":  ld.previousCount,
		"samples":         ld.samples,
		"alert_threshold": ld.alertThreshold,
	}
}

// simulateLeakingGoroutines creates goroutines that never terminate
func simulateLeakingGoroutines(count int) {
	for i := 0; i < count; i++ {
		go func(id int) {
			log.Printf("Leaking goroutine %d started", id)
			select {} // This goroutine will never terminate
		}(i)
	}
}

// simulateTemporaryGoroutines creates goroutines that terminate after a delay
func simulateTemporaryGoroutines(count int, duration time.Duration) {
	for i := 0; i < count; i++ {
		go func(id int) {
			log.Printf("Temporary goroutine %d started", id)
			time.Sleep(duration)
			log.Printf("Temporary goroutine %d finished", id)
		}(i)
	}
}

// dumpStacks prints all goroutine stacks for debugging
func dumpStacks() {
	buf := make([]byte, 1<<20)
	stackLen := runtime.Stack(buf, true)
	log.Printf("=== GOROUTINE DUMP ===\n%s\n=== END DUMP ===", buf[:stackLen])
}

func main() {
	// Create a leak detector
	detector := NewLeakDetector(500*time.Millisecond, 0.05, 10)
	detector.SetAlertThreshold(120) // Lower the default alert threshold so the simulated leaks below can trigger detection
	detector.SetOnLeakDetected(func(count int, samples []int) {
		log.Printf("LEAK DETECTED: %d goroutines running (baseline: %d)", 
			count, detector.baselineCount)
		log.Printf("Recent samples: %v", samples)
		dumpStacks() // Dump stacks for debugging
	})
	
	// Start the detector
	detector.Start()
	defer detector.Stop()
	
	// Print initial stats
	log.Printf("Initial goroutine count: %d", runtime.NumGoroutine())
	
	// Simulate normal goroutine usage
	log.Println("Creating temporary goroutines...")
	simulateTemporaryGoroutines(100, 2*time.Second)
	
	// Wait a bit
	time.Sleep(3 * time.Second)
	log.Printf("Goroutine count after temporary spike: %d", runtime.NumGoroutine())
	
	// Simulate a leak
	log.Println("Simulating a goroutine leak...")
	simulateLeakingGoroutines(50)
	
	// Wait for detection
	time.Sleep(2 * time.Second)
	
	// Create more leaks to trigger detection
	log.Println("Creating more leaking goroutines...")
	simulateLeakingGoroutines(100)
	
	// Wait for detection
	time.Sleep(5 * time.Second)
	
	// Print final stats
	stats := detector.GetStats()
	log.Printf("Final stats: %+v", stats)
}
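
Runtime monitoring catches leaks in production, but many leaks can be caught earlier, in tests. The sketch below uses the third-party go.uber.org/goleak package (an assumption; it is not part of the example above) to fail any test that leaves goroutines behind:

package worker_test

import (
	"testing"

	"go.uber.org/goleak"
)

// TestMain fails the entire test binary if any test leaves goroutines running.
func TestMain(m *testing.M) {
	goleak.VerifyTestMain(m)
}

// TestWorkerStops verifies that a single test leaves no stray goroutines behind.
func TestWorkerStops(t *testing.T) {
	defer goleak.VerifyNone(t)

	done := make(chan struct{})
	go func() {
		<-done // a well-behaved goroutine exits when signaled
	}()
	close(done)
}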

Contention Profiling

Identifying and resolving lock contention is essential for performance:

package main

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"net/http"
	_ "net/http/pprof" // Import for profiling
	"os"
	"runtime"
	"sync"
	"time"
)

// SharedResource simulates a resource with different locking strategies
type SharedResource struct {
	name        string
	value       int
	mutex       sync.Mutex
	rwMutex     sync.RWMutex
	accessLog   []string
	logMutex    sync.Mutex
	accessCount int64
}

// UpdateWithMutex updates the resource using a standard mutex
func (r *SharedResource) UpdateWithMutex(id int, val int) {
	r.mutex.Lock()
	defer r.mutex.Unlock()
	
	// Simulate some work
	time.Sleep(time.Duration(1+rand.Intn(5)) * time.Millisecond)
	
	r.value += val
	r.logAccess(fmt.Sprintf("Writer %d updated value to %d", id, r.value))
}

// ReadWithMutex reads the resource using a standard mutex
func (r *SharedResource) ReadWithMutex(id int) int {
	r.mutex.Lock()
	defer r.mutex.Unlock()
	
	// Simulate some work
	time.Sleep(time.Duration(1+rand.Intn(2)) * time.Millisecond)
	
	r.logAccess(fmt.Sprintf("Reader %d read value %d", id, r.value))
	return r.value
}

// UpdateWithRWMutex updates the resource using a read-write mutex
func (r *SharedResource) UpdateWithRWMutex(id int, val int) {
	r.rwMutex.Lock()
	defer r.rwMutex.Unlock()
	
	// Simulate some work
	time.Sleep(time.Duration(1+rand.Intn(5)) * time.Millisecond)
	
	r.value += val
	r.logAccess(fmt.Sprintf("Writer %d updated value to %d (RW)", id, r.value))
}

// ReadWithRWMutex reads the resource using a read-write mutex
func (r *SharedResource) ReadWithRWMutex(id int) int {
	r.rwMutex.RLock()
	defer r.rwMutex.RUnlock()
	
	// Simulate some work
	time.Sleep(time.Duration(1+rand.Intn(2)) * time.Millisecond)
	
	r.logAccess(fmt.Sprintf("Reader %d read value %d (RW)", id, r.value))
	return r.value
}

// logAccess logs an access to the resource
func (r *SharedResource) logAccess(msg string) {
	r.logMutex.Lock()
	defer r.logMutex.Unlock()
	
	r.accessLog = append(r.accessLog, msg)
	if len(r.accessLog) > 100 {
		r.accessLog = r.accessLog[1:]
	}
	r.accessCount++
}

// GetStats returns statistics about the resource
func (r *SharedResource) GetStats() map[string]interface{} {
	r.logMutex.Lock()
	defer r.logMutex.Unlock()
	
	return map[string]interface{}{
		"name":         r.name,
		"value":        r.value,
		"access_count": r.accessCount,
		"recent_logs":  r.accessLog[max(0, len(r.accessLog)-5):],
	}
}

// Helper function for max of two ints
func max(a, b int) int {
	if a > b {
		return a
	}
	return b
}

// runContentionTest runs a test with different concurrency levels
func runContentionTest(ctx context.Context, useRWMutex bool, readers, writers int) {
	resource := &SharedResource{
		name: fmt.Sprintf("Resource-%v-%d-%d", useRWMutex, readers, writers),
	}
	
	log.Printf("Starting contention test with %d readers and %d writers (RWMutex: %v)",
		readers, writers, useRWMutex)
	
	// Use a test-scoped context so each test's goroutines are stopped before the next test starts
	testCtx, cancel := context.WithCancel(ctx)
	defer cancel()
	
	// Start writers
	var wg sync.WaitGroup
	for i := 0; i < writers; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			
			for {
				select {
				case <-testCtx.Done():
					return
				default:
					// Update with random value
					if useRWMutex {
						resource.UpdateWithRWMutex(id, rand.Intn(10))
					} else {
						resource.UpdateWithMutex(id, rand.Intn(10))
					}
					
					time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
				}
			}
		}(i)
	}
	
	// Start readers
	for i := 0; i < readers; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			
			for {
				select {
				case <-testCtx.Done():
					return
				default:
					// Read value
					if useRWMutex {
						_ = resource.ReadWithRWMutex(id)
					} else {
						_ = resource.ReadWithMutex(id)
					}
					
					time.Sleep(time.Duration(rand.Intn(5)) * time.Millisecond)
				}
			}
		}(i)
	}
	
	// Run for a fixed duration, then stop this test's goroutines and wait for them to exit
	time.Sleep(5 * time.Second)
	cancel()
	wg.Wait()
	
	// Print stats
	stats := resource.GetStats()
	log.Printf("Test results for %s:", resource.name)
	log.Printf("- Final value: %d", stats["value"])
	log.Printf("- Access count: %d", stats["access_count"])
	log.Printf("- Recent accesses: %v", stats["recent_logs"])
}

func main() {
	// Start pprof server for profiling
	go func() {
		log.Println("Starting pprof server on :6060")
		log.Println(http.ListenAndServe("localhost:6060", nil))
	}()
	
	// Set GOMAXPROCS to use all CPUs
	runtime.GOMAXPROCS(runtime.NumCPU())
	log.Printf("Running with GOMAXPROCS=%d", runtime.GOMAXPROCS(0))
	
	// Enable mutex and block profiling so /debug/pprof/mutex and /debug/pprof/block report data
	runtime.SetMutexProfileFraction(1)
	runtime.SetBlockProfileRate(1)
	
	// Seed random number generator
	rand.Seed(time.Now().UnixNano())
	
	// Create a context with cancel
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	
	// Run tests with different configurations
	log.Println("=== Starting contention tests ===")
	
	// Test 1: Few readers, few writers with standard mutex
	runContentionTest(ctx, false, 5, 5)
	
	// Test 2: Few readers, few writers with RWMutex
	runContentionTest(ctx, true, 5, 5)
	
	// Test 3: Many readers, few writers with standard mutex
	runContentionTest(ctx, false, 50, 5)
	
	// Test 4: Many readers, few writers with RWMutex
	runContentionTest(ctx, true, 50, 5)
	
	log.Println("=== Contention tests complete ===")
	log.Println("To view profiling data, run: go tool pprof http://localhost:6060/debug/pprof/profile")
	log.Println("To view mutex contention: go tool pprof http://localhost:6060/debug/pprof/mutex")
	
	// Keep running for a while to allow profiling
	log.Println("Press Ctrl+C to exit")
	select {}
}

This contention profiling approach is valuable for distributed systems because it:

  • Identifies performance bottlenecks in concurrent code
  • Helps optimize lock usage for better throughput (see the sharding sketch after this list)
  • Provides insights into resource contention
  • Enables data-driven decisions about synchronization strategies
  • Integrates with Go’s built-in profiling tools
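
When profiling points at a single hot lock, a common remedy is to shard the protected state so that goroutines usually contend on different locks. Here is a minimal sketch of a sharded counter; the shard count of 16 and the FNV-based key hashing are illustrative choices, not a prescription:

package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

const shardCount = 16

// shard holds one slice of the key space behind its own lock.
type shard struct {
	mu     sync.Mutex
	counts map[string]int64
}

// ShardedCounter spreads updates across independently locked shards so that
// concurrent writers rarely block each other.
type ShardedCounter struct {
	shards [shardCount]shard
}

// NewShardedCounter initializes every shard's map.
func NewShardedCounter() *ShardedCounter {
	c := &ShardedCounter{}
	for i := range c.shards {
		c.shards[i].counts = make(map[string]int64)
	}
	return c
}

// Incr increments the counter for key, locking only the shard that owns it.
func (c *ShardedCounter) Incr(key string) {
	h := fnv.New32a()
	h.Write([]byte(key))
	s := &c.shards[h.Sum32()%shardCount]

	s.mu.Lock()
	s.counts[key]++
	s.mu.Unlock()
}

func main() {
	c := NewShardedCounter()

	var wg sync.WaitGroup
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			c.Incr(fmt.Sprintf("key-%d", n%32))
		}(i)
	}
	wg.Wait()

	fmt.Println("increments complete")
}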

Production Best Practices

When deploying concurrent Go code in production distributed systems, following these best practices can help ensure reliability and performance.

Context Propagation

Always propagate context through your entire call chain to ensure proper cancellation and timeout handling:

package main

import (
	"context"
	"fmt"
	"log"
	"time"
)

// Service represents a component in a distributed system
type Service struct {
	name           string
	dependencies   []*Service
	processingTime time.Duration
}

// NewService creates a new service with dependencies
func NewService(name string, processingTime time.Duration, dependencies ...*Service) *Service {
	return &Service{
		name:           name,
		dependencies:   dependencies,
		processingTime: processingTime,
	}
}

// Process handles a request, propagating context to dependencies
func (s *Service) Process(ctx context.Context, requestID string) (string, error) {
	// Check if context is already canceled
	if ctx.Err() != nil {
		return "", ctx.Err()
	}
	
	log.Printf("[%s] Processing request %s", s.name, requestID)
	
	// Call dependencies first
	for _, dep := range s.dependencies {
		// Create a child context with timeout for the dependency call
		depCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
		
		result, err := dep.Process(depCtx, requestID)
		cancel() // Release the child context immediately; a deferred cancel inside the loop would accumulate until Process returns
		if err != nil {
			log.Printf("[%s] Dependency %s failed: %v", s.name, dep.name, err)
			return "", fmt.Errorf("dependency %s failed: %w", dep.name, err)
		}
		
		log.Printf("[%s] Dependency %s returned: %s", s.name, dep.name, result)
	}
	
	// Simulate processing time
	select {
	case <-time.After(s.processingTime):
		// Processing completed successfully
	case <-ctx.Done():
		// Context was canceled
		return "", ctx.Err()
	}
	
	response := fmt.Sprintf("Response from %s for request %s", s.name, requestID)
	log.Printf("[%s] Completed request %s", s.name, requestID)
	return response, nil
}

func main() {
	// Create a service dependency graph
	serviceD := NewService("ServiceD", 100*time.Millisecond)
	serviceE := NewService("ServiceE", 150*time.Millisecond)
	serviceB := NewService("ServiceB", 200*time.Millisecond, serviceD, serviceE)
	serviceC := NewService("ServiceC", 150*time.Millisecond, serviceE)
	serviceA := NewService("ServiceA", 100*time.Millisecond, serviceB, serviceC)
	
	// Create a root context with timeout
	ctx, cancel := context.WithTimeout(context.Background(), 800*time.Millisecond)
	defer cancel()
	
	// Generate a request ID for this call chain
	requestID := fmt.Sprintf("req-%d", time.Now().UnixNano())
	
	// Process the request
	log.Printf("Starting request %s with 800ms timeout", requestID)
	result, err := serviceA.Process(ctx, requestID)
	
	if err != nil {
		log.Printf("Request failed: %v", err)
	} else {
		log.Printf("Request succeeded: %s", result)
	}
	
	// Try another request with insufficient timeout
	log.Println("\nStarting another request with insufficient timeout")
	ctx2, cancel2 := context.WithTimeout(context.Background(), 300*time.Millisecond)
	defer cancel2()
	
	requestID2 := fmt.Sprintf("req-%d", time.Now().UnixNano())
	result2, err2 := serviceA.Process(ctx2, requestID2)
	
	if err2 != nil {
		log.Printf("Request failed as expected: %v", err2)
	} else {
		log.Printf("Request succeeded unexpectedly: %s", result2)
	}
}
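
The example above passes the request ID as an explicit parameter. In production code it is also common to carry request-scoped metadata such as IDs in the context itself, using an unexported key type to avoid collisions between packages. A minimal sketch (the key and helper names are illustrative):

package main

import (
	"context"
	"fmt"
)

// ctxKey is unexported so context keys defined here cannot collide with keys
// defined by other packages.
type ctxKey string

const requestIDKey ctxKey = "request-id"

// WithRequestID returns a child context carrying the request ID.
func WithRequestID(ctx context.Context, id string) context.Context {
	return context.WithValue(ctx, requestIDKey, id)
}

// RequestIDFrom extracts the request ID from the context, if present.
func RequestIDFrom(ctx context.Context) (string, bool) {
	id, ok := ctx.Value(requestIDKey).(string)
	return id, ok
}

func main() {
	ctx := WithRequestID(context.Background(), "req-42")
	if id, ok := RequestIDFrom(ctx); ok {
		fmt.Println("handling", id)
	}
}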

Bounded Concurrency

Always limit concurrency to prevent resource exhaustion:

package main

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"sync"
	"time"
)

// BoundedExecutor limits the number of concurrent operations
type BoundedExecutor struct {
	semaphore chan struct{}
	timeout   time.Duration
}

// NewBoundedExecutor creates a new bounded executor
func NewBoundedExecutor(maxConcurrent int, timeout time.Duration) *BoundedExecutor {
	return &BoundedExecutor{
		semaphore: make(chan struct{}, maxConcurrent),
		timeout:   timeout,
	}
}

// Execute runs the given function with bounded concurrency
func (e *BoundedExecutor) Execute(ctx context.Context, fn func() (interface{}, error)) (interface{}, error) {
	// Create a context with timeout
	execCtx, cancel := context.WithTimeout(ctx, e.timeout)
	defer cancel()
	
	// Try to acquire a semaphore slot
	select {
	case e.semaphore <- struct{}{}:
		// Acquired a slot
		defer func() { <-e.semaphore }()
	case <-execCtx.Done():
		// Couldn't acquire a slot within timeout
		return nil, fmt.Errorf("operation rejected: %w", execCtx.Err())
	}
	
	// Execute the function
	return fn()
}

// simulateOperation simulates a remote operation with variable latency
func simulateOperation(id int) (interface{}, error) {
	// Simulate random latency
	latency := 50 + rand.Intn(200)
	time.Sleep(time.Duration(latency) * time.Millisecond)
	
	// Simulate occasional failures
	if rand.Intn(10) == 0 {
		return nil, fmt.Errorf("operation %d failed", id)
	}
	
	return fmt.Sprintf("Result from operation %d", id), nil
}

func main() {
	// Seed random number generator
	rand.Seed(time.Now().UnixNano())
	
	// Create a bounded executor
	executor := NewBoundedExecutor(5, 500*time.Millisecond)
	
	// Create a context with timeout
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	
	// Track metrics
	var (
		totalOps      int
		successfulOps int
		rejectedOps   int
		failedOps     int
		mu            sync.Mutex
		wg            sync.WaitGroup
	)
	
	// Launch a bunch of operations
	for i := 0; i < 50; i++ {
		wg.Add(1)
		go func(opID int) {
			defer wg.Done()
			
			mu.Lock()
			totalOps++
			mu.Unlock()
			
			// Execute with bounded concurrency
			result, err := executor.Execute(ctx, func() (interface{}, error) {
				return simulateOperation(opID)
			})
			
			mu.Lock()
			defer mu.Unlock()
			
			if err != nil {
				if err.Error() == "operation rejected: context deadline exceeded" {
					rejectedOps++
					log.Printf("Operation %d rejected: %v", opID, err)
				} else {
					failedOps++
					log.Printf("Operation %d failed: %v", opID, err)
				}
			} else {
				successfulOps++
				log.Printf("Operation %d succeeded: %v", opID, result)
			}
		}(i)
		
		// Add some delay between operations
		time.Sleep(50 * time.Millisecond)
	}
	
	// Wait for all operations to complete
	wg.Wait()
	
	// Print summary
	log.Printf("\nSummary:")
	log.Printf("- Total operations: %d", totalOps)
	log.Printf("- Successful: %d", successfulOps)
	log.Printf("- Rejected: %d", rejectedOps)
	log.Printf("- Failed: %d", failedOps)
}
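
The channel-based semaphore above is often all you need. When you want weighted slots or blocking, context-aware acquisition, the golang.org/x/sync/semaphore package provides a similar primitive; the following is a minimal sketch assuming that module is available:

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/semaphore"
)

func main() {
	ctx := context.Background()
	sem := semaphore.NewWeighted(5) // at most 5 operations in flight

	for i := 0; i < 20; i++ {
		// Acquire blocks until a slot is free or the context is canceled.
		if err := sem.Acquire(ctx, 1); err != nil {
			fmt.Printf("operation %d rejected: %v\n", i, err)
			continue
		}

		go func(id int) {
			defer sem.Release(1)
			time.Sleep(100 * time.Millisecond) // simulate work
			fmt.Printf("operation %d done\n", id)
		}(i)
	}

	// Acquiring the full weight waits for every in-flight operation to finish.
	_ = sem.Acquire(ctx, 5)
	fmt.Println("all operations complete")
}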

Graceful Degradation

Design systems to degrade gracefully under load:

package main

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"
)

// ServiceLevel represents different service quality levels
type ServiceLevel int

const (
	FullService ServiceLevel = iota
	ReducedService
	MinimalService
	EmergencyService
)

// LoadManager monitors system load and adjusts service levels
type LoadManager struct {
	currentLevel     ServiceLevel
	cpuLoad          int64
	memoryUsage      int64
	requestRate      int64
	errorRate        int64
	thresholds       map[ServiceLevel]map[string]int64
	mu               sync.RWMutex
	onLevelChange    func(from, to ServiceLevel)
	degradationRules map[ServiceLevel]func()
}

// NewLoadManager creates a new load manager
func NewLoadManager() *LoadManager {
	lm := &LoadManager{
		currentLevel: FullService,
		thresholds: map[ServiceLevel]map[string]int64{
			ReducedService: {
				"cpu":         70,  // 70% CPU
				"memory":      80,  // 80% memory
				"requestRate": 1000, // 1000 req/sec
				"errorRate":   5,   // 5% errors
			},
			MinimalService: {
				"cpu":         85,
				"memory":      90,
				"requestRate": 2000,
				"errorRate":   10,
			},
			EmergencyService: {
				"cpu":         95,
				"memory":      95,
				"requestRate": 3000,
				"errorRate":   20,
			},
		},
		degradationRules: make(map[ServiceLevel]func()),
	}
	
	// Set default level change handler
	lm.onLevelChange = func(from, to ServiceLevel) {
		log.Printf("Service level changed from %v to %v", from, to)
	}
	
	return lm
}

// SetOnLevelChange sets the callback for service level changes
func (lm *LoadManager) SetOnLevelChange(callback func(from, to ServiceLevel)) {
	lm.mu.Lock()
	defer lm.mu.Unlock()
	lm.onLevelChange = callback
}

// SetDegradationRule sets the function to call when degrading to a specific level
func (lm *LoadManager) SetDegradationRule(level ServiceLevel, rule func()) {
	lm.mu.Lock()
	defer lm.mu.Unlock()
	lm.degradationRules[level] = rule
}

// UpdateMetrics updates the load metrics
func (lm *LoadManager) UpdateMetrics(cpu, memory, requestRate, errorRate int64) {
	lm.mu.Lock()
	defer lm.mu.Unlock()
	
	lm.cpuLoad = cpu
	lm.memoryUsage = memory
	lm.requestRate = requestRate
	lm.errorRate = errorRate
	
	// Check if we need to change service level
	lm.adjustServiceLevel()
}

// adjustServiceLevel changes the service level based on current metrics
func (lm *LoadManager) adjustServiceLevel() {
	// Determine appropriate service level
	var newLevel ServiceLevel
	
	if lm.cpuLoad >= lm.thresholds[EmergencyService]["cpu"] ||
		lm.memoryUsage >= lm.thresholds[EmergencyService]["memory"] ||
		lm.requestRate >= lm.thresholds[EmergencyService]["requestRate"] ||
		lm.errorRate >= lm.thresholds[EmergencyService]["errorRate"] {
		newLevel = EmergencyService
	} else if lm.cpuLoad >= lm.thresholds[MinimalService]["cpu"] ||
		lm.memoryUsage >= lm.thresholds[MinimalService]["memory"] ||
		lm.requestRate >= lm.thresholds[MinimalService]["requestRate"] ||
		lm.errorRate >= lm.thresholds[MinimalService]["errorRate"] {
		newLevel = MinimalService
	} else if lm.cpuLoad >= lm.thresholds[ReducedService]["cpu"] ||
		lm.memoryUsage >= lm.thresholds[ReducedService]["memory"] ||
		lm.requestRate >= lm.thresholds[ReducedService]["requestRate"] ||
		lm.errorRate >= lm.thresholds[ReducedService]["errorRate"] {
		newLevel = ReducedService
	} else {
		newLevel = FullService
	}
	
	// If level changed, notify and apply degradation rules
	if newLevel != lm.currentLevel {
		oldLevel := lm.currentLevel
		lm.currentLevel = newLevel
		
		// Notify about level change
		if lm.onLevelChange != nil {
			lm.onLevelChange(oldLevel, newLevel)
		}
		
		// Apply degradation rule if available
		if rule, exists := lm.degradationRules[newLevel]; exists && rule != nil {
			rule()
		}
	}
}

// GetCurrentLevel returns the current service level
func (lm *LoadManager) GetCurrentLevel() ServiceLevel {
	lm.mu.RLock()
	defer lm.mu.RUnlock()
	return lm.currentLevel
}

// GetMetrics returns the current metrics
func (lm *LoadManager) GetMetrics() map[string]int64 {
	lm.mu.RLock()
	defer lm.mu.RUnlock()
	
	return map[string]int64{
		"cpu":         lm.cpuLoad,
		"memory":      lm.memoryUsage,
		"requestRate": lm.requestRate,
		"errorRate":   lm.errorRate,
	}
}

// DegradableService demonstrates graceful degradation.
// It reuses the BoundedExecutor type from the bounded concurrency example above.
type DegradableService struct {
	loadManager    *LoadManager
	workerPool     *BoundedExecutor
	cacheEnabled   bool
	retryEnabled   bool
	featureFlags   map[string]bool
	requestCounter int64
	errorCounter   int64
}

// NewDegradableService creates a new service with degradation capabilities
func NewDegradableService() *DegradableService {
	loadManager := NewLoadManager()
	service := &DegradableService{
		loadManager:  loadManager,
		workerPool:   NewBoundedExecutor(20, 1*time.Second),
		cacheEnabled: true,
		retryEnabled: true,
		featureFlags: map[string]bool{
			"analytics":     true,
			"notifications": true,
			"recommendations": true,
			"fullHistory":   true,
		},
	}
	
	// Configure degradation rules
	loadManager.SetDegradationRule(ReducedService, func() {
		log.Println("Applying REDUCED service level:")
		log.Println("- Disabling analytics")
		log.Println("- Reducing worker pool to 15")
		
		service.featureFlags["analytics"] = false
		service.workerPool = NewBoundedExecutor(15, 800*time.Millisecond)
	})
	
	loadManager.SetDegradationRule(MinimalService, func() {
		log.Println("Applying MINIMAL service level:")
		log.Println("- Disabling recommendations")
		log.Println("- Disabling notifications")
		log.Println("- Reducing worker pool to 10")
		log.Println("- Shortening timeouts")
		
		service.featureFlags["recommendations"] = false
		service.featureFlags["notifications"] = false
		service.workerPool = NewBoundedExecutor(10, 500*time.Millisecond)
	})
	
	loadManager.SetDegradationRule(EmergencyService, func() {
		log.Println("Applying EMERGENCY service level:")
		log.Println("- Disabling full history")
		log.Println("- Disabling retries")
		log.Println("- Reducing worker pool to 5")
		log.Println("- Shortening timeouts further")
		
		service.featureFlags["fullHistory"] = false
		service.retryEnabled = false
		service.workerPool = NewBoundedExecutor(5, 300*time.Millisecond)
	})
	
	return service
}

// HandleRequest processes a request with graceful degradation
func (s *DegradableService) HandleRequest(ctx context.Context, requestID string) (string, error) {
	// Increment request counter
	atomic.AddInt64(&s.requestCounter, 1)
	
	// Get current service level
	level := s.loadManager.GetCurrentLevel()
	
	// Execute with bounded concurrency
	result, err := s.workerPool.Execute(ctx, func() (interface{}, error) {
		// Simulate processing based on service level
		switch level {
		case FullService:
			// Full processing
			time.Sleep(100 * time.Millisecond)
			if s.featureFlags["analytics"] {
				// Do analytics processing
				time.Sleep(50 * time.Millisecond)
			}
			if s.featureFlags["recommendations"] {
				// Generate recommendations
				time.Sleep(50 * time.Millisecond)
			}
		case ReducedService:
			// Skip some processing
			time.Sleep(80 * time.Millisecond)
		case MinimalService:
			// Minimal processing
			time.Sleep(50 * time.Millisecond)
		case EmergencyService:
			// Critical path only
			time.Sleep(30 * time.Millisecond)
		}
		
		// Simulate occasional failures
		if rand.Intn(100) < 5 {
			atomic.AddInt64(&s.errorCounter, 1)
			return nil, fmt.Errorf("processing error")
		}
		
		return fmt.Sprintf("Response for %s (service level: %v)", requestID, level), nil
	})
	
	if err != nil {
		// Handle error based on service level
		if s.retryEnabled && level < MinimalService {
			// Retry once for less severe degradation levels
			log.Printf("Retrying request %s after error: %v", requestID, err)
			
			result, err = s.workerPool.Execute(ctx, func() (interface{}, error) {
				time.Sleep(50 * time.Millisecond)
				if rand.Intn(100) < 5 {
					atomic.AddInt64(&s.errorCounter, 1)
					return nil, fmt.Errorf("retry failed")
				}
				return fmt.Sprintf("Retry response for %s (service level: %v)", requestID, level), nil
			})
		}
		
		if err != nil {
			atomic.AddInt64(&s.errorCounter, 1)
			return "", err
		}
	}
	
	return result.(string), nil
}

// SimulateLoad generates synthetic load for the service
func (s *DegradableService) SimulateLoad(ctx context.Context, duration time.Duration) {
	// Reset counters
	atomic.StoreInt64(&s.requestCounter, 0)
	atomic.StoreInt64(&s.errorCounter, 0)
	
	// Start time
	startTime := time.Now()
	endTime := startTime.Add(duration)
	
	// Launch load generator
	var wg sync.WaitGroup
	
	// Start with low load
	go func() {
		rate := int64(50) // requests per second
		
		for time.Now().Before(endTime) {
			// Calculate how many requests to send this second
			currentRate := atomic.LoadInt64(&rate)
			interval := time.Second / time.Duration(currentRate)
			
			for i := 0; i < int(currentRate); i++ {
				wg.Add(1)
				go func(reqID string) {
					defer wg.Done()
					
					reqCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
					defer cancel()
					
					_, _ = s.HandleRequest(reqCtx, reqID)
				}(fmt.Sprintf("req-%d", time.Now().UnixNano()))
				
				time.Sleep(interval)
			}
			
			// Increase load over time
			elapsed := time.Since(startTime)
			progress := float64(elapsed) / float64(duration)
			
			// Simulate a load curve that peaks in the middle
			if progress < 0.5 {
				// Ramp up to peak
				newRate := 50 + int64(progress*2*1950) // Max 2000 req/sec at peak
				atomic.StoreInt64(&rate, newRate)
			} else {
				// Ramp down from peak
				newRate := 50 + int64((1-progress)*2*1950)
				atomic.StoreInt64(&rate, newRate)
			}
			
			// Update load metrics every second
			cpuLoad := 30 + int64(progress*70) // Simulate CPU increasing with load
			if progress > 0.5 {
				cpuLoad = 30 + int64((1-progress)*140)
			}
			
			memoryUsage := 40 + int64(progress*55) // Memory grows and stays high
			
			// Calculate request rate (requests per second)
			requestRate := atomic.LoadInt64(&s.requestCounter)
			atomic.StoreInt64(&s.requestCounter, 0)
			
			// Calculate error rate (percentage)
			errorCount := atomic.LoadInt64(&s.errorCounter)
			atomic.StoreInt64(&s.errorCounter, 0)
			
			var errorRate int64
			if requestRate > 0 {
				errorRate = (errorCount * 100) / requestRate
			}
			
			// Update load manager
			s.loadManager.UpdateMetrics(cpuLoad, memoryUsage, requestRate, errorRate)
			
			// Log current status
			level := s.loadManager.GetCurrentLevel()
			metrics := s.loadManager.GetMetrics()
			log.Printf("Load: CPU %d%%, Mem %d%%, Rate %d req/s, Errors %d%%, Level %v",
				metrics["cpu"], metrics["memory"], metrics["requestRate"], 
				metrics["errorRate"], level)
			
			// Wait for next second
			time.Sleep(1 * time.Second)
		}
	}()
	
	// Wait for load test to complete
	time.Sleep(duration)
	wg.Wait()
	
	log.Println("Load test completed")
}

func main() {
	// Seed random number generator
	rand.Seed(time.Now().UnixNano())
	
	// Create a degradable service
	service := NewDegradableService()
	
	// Create a context
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	
	// Run a load test
	log.Println("Starting load test with graceful degradation...")
	service.SimulateLoad(ctx, 30*time.Second)
}

Observability Integration

Always instrument your concurrent code for proper observability:

package main

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"net/http"
	"os"
	"runtime"
	"sync"
	"sync/atomic"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// Metrics represents a collection of Prometheus metrics
type Metrics struct {
	requestCounter   *prometheus.CounterVec
	requestDuration  *prometheus.HistogramVec
	goroutineGauge   prometheus.Gauge
	workerPoolSize   *prometheus.GaugeVec
	queueDepth       *prometheus.GaugeVec
	errorCounter     *prometheus.CounterVec
	inFlightRequests *prometheus.GaugeVec
}

// NewMetrics creates and registers Prometheus metrics
func NewMetrics(reg prometheus.Registerer) *Metrics {
	m := &Metrics{
		requestCounter: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Name: "requests_total",
				Help: "Total number of requests processed",
			},
			[]string{"service", "endpoint", "status"},
		),
		requestDuration: prometheus.NewHistogramVec(
			prometheus.HistogramOpts{
				Name:    "request_duration_seconds",
				Help:    "Request duration in seconds",
				Buckets: prometheus.ExponentialBuckets(0.001, 2, 10), // 1ms to ~1s
			},
			[]string{"service", "endpoint"},
		),
		goroutineGauge: prometheus.NewGauge(
			prometheus.GaugeOpts{
				Name: "goroutines_total",
				Help: "Current number of goroutines",
			},
		),
		workerPoolSize: prometheus.NewGaugeVec(
			prometheus.GaugeOpts{
				Name: "worker_pool_size",
				Help: "Current size of worker pools",
			},
			[]string{"pool"},
		),
		queueDepth: prometheus.NewGaugeVec(
			prometheus.GaugeOpts{
				Name: "queue_depth",
				Help: "Current depth of work queues",
			},
			[]string{"queue"},
		),
		errorCounter: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Name: "errors_total",
				Help: "Total number of errors",
			},
			[]string{"service", "type"},
		),
		inFlightRequests: prometheus.NewGaugeVec(
			prometheus.GaugeOpts{
				Name: "in_flight_requests",
				Help: "Current number of in-flight requests",
			},
			[]string{"service"},
		),
	}

	// Register all metrics
	reg.MustRegister(
		m.requestCounter,
		m.requestDuration,
		m.goroutineGauge,
		m.workerPoolSize,
		m.queueDepth,
		m.errorCounter,
		m.inFlightRequests,
	)

	// Start goroutine collector
	go func() {
		for {
			m.goroutineGauge.Set(float64(runtime.NumGoroutine()))
			time.Sleep(1 * time.Second)
		}
	}()

	return m
}

// InstrumentedWorkerPool is a worker pool with metrics
type InstrumentedWorkerPool struct {
	name       string
	workers    int
	queue      chan Job
	metrics    *Metrics
	wg         sync.WaitGroup
	shutdown   chan struct{}
	processing int32
}

// Job represents a unit of work
type Job struct {
	ID       string
	Handler  func(ctx context.Context) (interface{}, error)
	Priority int
}

// NewInstrumentedWorkerPool creates a new worker pool with metrics
func NewInstrumentedWorkerPool(name string, workers, queueSize int, metrics *Metrics) *InstrumentedWorkerPool {
	pool := &InstrumentedWorkerPool{
		name:     name,
		workers:  workers,
		queue:    make(chan Job, queueSize),
		metrics:  metrics,
		shutdown: make(chan struct{}),
	}

	// Update initial metrics
	metrics.workerPoolSize.WithLabelValues(name).Set(float64(workers))
	metrics.queueDepth.WithLabelValues(name).Set(0)

	// Start workers
	for i := 0; i < workers; i++ {
		pool.wg.Add(1)
		go pool.worker()
	}

	return pool
}

// worker processes jobs from the queue
func (p *InstrumentedWorkerPool) worker() {
	defer p.wg.Done()

	for {
		select {
		case job, ok := <-p.queue:
			if !ok {
				return // Channel closed
			}

			// Update metrics
			atomic.AddInt32(&p.processing, 1)
			p.metrics.inFlightRequests.WithLabelValues(p.name).Inc()

			// Process the job with tracing
			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			startTime := time.Now()

			result, err := job.Handler(ctx)
			duration := time.Since(startTime)

			// Update metrics based on result
			p.metrics.requestDuration.WithLabelValues(p.name, job.ID).Observe(duration.Seconds())

			if err != nil {
				p.metrics.errorCounter.WithLabelValues(p.name, "job_error").Inc()
				p.metrics.requestCounter.WithLabelValues(p.name, job.ID, "error").Inc()
				log.Printf("[%s] Job %s failed: %v", p.name, job.ID, err)
			} else {
				p.metrics.requestCounter.WithLabelValues(p.name, job.ID, "success").Inc()
				log.Printf("[%s] Job %s succeeded: %v", p.name, job.ID, result)
			}

			// Update in-flight metrics
			atomic.AddInt32(&p.processing, -1)
			p.metrics.inFlightRequests.WithLabelValues(p.name).Dec()

			// Update queue depth metric
			p.metrics.queueDepth.WithLabelValues(p.name).Set(float64(len(p.queue)))

			cancel() // Always cancel the context

		case <-p.shutdown:
			return
		}
	}
}

// Submit adds a job to the worker pool
func (p *InstrumentedWorkerPool) Submit(job Job) error {
	select {
	case p.queue <- job:
		// Update queue depth metric
		p.metrics.queueDepth.WithLabelValues(p.name).Set(float64(len(p.queue)))
		return nil
	default:
		p.metrics.errorCounter.WithLabelValues(p.name, "queue_full").Inc()
		return fmt.Errorf("queue is full")
	}
}

// Shutdown stops the worker pool
func (p *InstrumentedWorkerPool) Shutdown() {
	close(p.shutdown)
	close(p.queue)
	p.wg.Wait()

	// Update metrics
	p.metrics.workerPoolSize.WithLabelValues(p.name).Set(0)
	p.metrics.queueDepth.WithLabelValues(p.name).Set(0)
	p.metrics.inFlightRequests.WithLabelValues(p.name).Set(0)
}

// GetMetrics returns current metrics for the pool
func (p *InstrumentedWorkerPool) GetMetrics() map[string]interface{} {
	return map[string]interface{}{
		"workers":    p.workers,
		"queue_size": len(p.queue),
		"processing": atomic.LoadInt32(&p.processing),
	}
}

func main() {
	// Create a Prometheus registry
	reg := prometheus.NewRegistry()

	// Create metrics
	metrics := NewMetrics(reg)

	// Start HTTP server for Prometheus metrics
	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	go func() {
		log.Println("Starting metrics server on :8080")
		if err := http.ListenAndServe(":8080", nil); err != nil {
			log.Printf("Metrics server error: %v", err)
		}
	}()

	// Create worker pools
	highPriorityPool := NewInstrumentedWorkerPool("high-priority", 5, 10, metrics)
	lowPriorityPool := NewInstrumentedWorkerPool("low-priority", 3, 20, metrics)

	// Create a context
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Submit jobs to the pools
	for i := 0; i < 20; i++ {
		// Create jobs with different priorities
		highPriorityJob := Job{
			ID: fmt.Sprintf("high-%d", i),
			Handler: func(ctx context.Context) (interface{}, error) {
				// Simulate work
				time.Sleep(100 * time.Millisecond)
				
				// Simulate occasional errors
				if rand.Intn(10) == 0 {
					return nil, fmt.Errorf("high priority job failed")
				}
				
				return "high priority result", nil
			},
			Priority: 2,
		}
		
		lowPriorityJob := Job{
			ID: fmt.Sprintf("low-%d", i),
			Handler: func(ctx context.Context) (interface{}, error) {
				// Simulate work
				time.Sleep(200 * time.Millisecond)
				
				// Simulate occasional errors
				if rand.Intn(5) == 0 {
					return nil, fmt.Errorf("low priority job failed")
				}
				
				return "low priority result", nil
			},
			Priority: 1,
		}
		
		// Submit jobs
		if err := highPriorityPool.Submit(highPriorityJob); err != nil {
			log.Printf("Failed to submit high priority job: %v", err)
		}
		
		if err := lowPriorityPool.Submit(lowPriorityJob); err != nil {
			log.Printf("Failed to submit low priority job: %v", err)
		}
		
		// Add some delay between submissions
		time.Sleep(50 * time.Millisecond)
	}
	
	// Wait for jobs to complete
	time.Sleep(5 * time.Second)
	
	// Print metrics
	log.Printf("High priority pool metrics: %v", highPriorityPool.GetMetrics())
	log.Printf("Low priority pool metrics: %v", lowPriorityPool.GetMetrics())
	
	// Shutdown pools
	highPriorityPool.Shutdown()
	lowPriorityPool.Shutdown()
	
	log.Println("Worker pools shut down")
}

This observability integration approach is essential for distributed systems because it:

  • Provides real-time visibility into system behavior
  • Enables detection of performance bottlenecks
  • Facilitates capacity planning and scaling decisions
  • Helps identify and diagnose issues quickly
  • Supports data-driven optimization of concurrent code

Looking Ahead

Go’s concurrency primitives provide a solid foundation for building distributed systems, but mastering advanced patterns is essential for creating robust, scalable applications that can handle the complexities of distributed environments.

Throughout this guide, we’ve explored sophisticated concurrency patterns that address common challenges in distributed systems:

  1. Advanced Channel Patterns like fan-out/fan-in, multiplexing, and timed operations help manage complex data flows and handle variable latency in distributed environments.

  2. Worker Pool and Pipeline Patterns enable efficient processing of large workloads with adaptive scaling and staged processing, essential for handling variable load in distributed systems.

  3. Distributed Coordination Patterns such as distributed mutexes, leader election, and semaphores provide mechanisms for synchronizing activities across multiple nodes, a fundamental requirement for maintaining consistency in distributed systems.

  4. Error Handling and Recovery Patterns like circuit breakers and graceful shutdown ensure resilience in the face of failures, preventing cascading issues that can bring down entire systems.

  5. Performance Optimization and Monitoring techniques help identify bottlenecks and ensure efficient resource utilization, critical for maintaining performance at scale.

  6. Production Best Practices including context propagation, bounded concurrency, graceful degradation, and observability integration provide a framework for deploying concurrent code in production environments.

By applying these patterns appropriately, you can build distributed systems in Go that are not only concurrent but also resilient, maintainable, and performant. Remember that the key to successful concurrent programming in distributed systems is not just understanding these patterns individually, but knowing when and how to combine them to address the specific challenges of your application.

As distributed systems continue to evolve, these foundational patterns will remain relevant, providing a toolkit for solving complex coordination and synchronization problems in Go. The true art lies in selecting the right patterns for your specific use case and implementing them with careful attention to error handling, performance, and observability.

Andrew

Andrew is a visionary software engineer and DevOps expert with a proven track record of delivering cutting-edge solutions that drive innovation at Ataiva.com. As a leader on numerous high-profile projects, Andrew brings his exceptional technical expertise and collaborative leadership skills to the table, fostering a culture of agility and excellence within the team. With a passion for architecting scalable systems, automating workflows, and empowering teams, Andrew is a sought-after authority in the field of software development and DevOps.
