In the realm of distributed systems, concurrency is both a powerful tool and a significant challenge. While Go’s concurrency primitives—goroutines and channels—provide an elegant foundation for concurrent programming, building robust distributed systems requires mastering advanced patterns that can handle the complexities of coordination across multiple services, nodes, and failure domains.
This comprehensive guide explores sophisticated concurrency patterns specifically designed for distributed systems in Go. We’ll move beyond basic goroutines and channels to examine advanced synchronization techniques, coordination mechanisms, error handling strategies, and performance optimization approaches that can help you build resilient, scalable distributed applications.
Advanced Channel Patterns
Channels are Go’s primary mechanism for communication between goroutines, but in distributed systems, we need to leverage more sophisticated patterns to handle complex coordination scenarios.
Fan-Out/Fan-In Pattern
The fan-out/fan-in pattern distributes work across multiple goroutines and then collects the results:
package main
import (
"context"
"fmt"
"sync"
"time"
)
// Task represents a unit of work to be processed
type Task struct {
ID int
Input string
Result string
Err error
}
// fanOut distributes tasks to multiple worker goroutines
func fanOut(ctx context.Context, tasks []Task, workerCount int) <-chan Task {
tasksCh := make(chan Task)
go func() {
defer close(tasksCh)
for _, task := range tasks {
select {
case tasksCh <- task:
case <-ctx.Done():
return
}
}
}()
return tasksCh
}
// processTask simulates processing a task with potential failures
func processTask(ctx context.Context, task Task) Task {
// Simulate processing time
select {
case <-time.After(100 * time.Millisecond):
task.Result = fmt.Sprintf("Processed: %s", task.Input)
return task
case <-ctx.Done():
task.Err = ctx.Err()
return task
}
}
// worker processes tasks from the input channel and sends results to the output channel
func worker(ctx context.Context, id int, tasksCh <-chan Task, resultsCh chan<- Task) {
for task := range tasksCh {
select {
case <-ctx.Done():
return
default:
task.Result = fmt.Sprintf("Worker %d processed: %s", id, task.Input)
// Simulate occasional failures
if task.ID%7 == 0 {
task.Err = fmt.Errorf("processing error on task %d", task.ID)
task.Result = ""
}
// Send the result
select {
case resultsCh <- task:
case <-ctx.Done():
return
}
}
}
}
// fanIn collects results from multiple workers into a single channel
func fanIn(ctx context.Context, workerCount int, resultsCh chan Task) <-chan Task {
multiplexedCh := make(chan Task)
var wg sync.WaitGroup
wg.Add(workerCount)
// Start a goroutine for each worker to collect results
for i := 0; i < workerCount; i++ {
go func() {
defer wg.Done()
for {
select {
case result, ok := <-resultsCh:
if !ok {
return
}
select {
case multiplexedCh <- result:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}()
}
// Close the multiplexed channel once all workers are done
go func() {
wg.Wait()
close(multiplexedCh)
}()
return multiplexedCh
}
// distributeAndProcess implements the complete fan-out/fan-in pattern
func distributeAndProcess(ctx context.Context, tasks []Task, workerCount int) []Task {
// Create a buffered channel for results to prevent blocking
resultsCh := make(chan Task, len(tasks))
// Fan out: distribute tasks to workers
tasksCh := fanOut(ctx, tasks, workerCount)
// Start workers
var wg sync.WaitGroup
wg.Add(workerCount)
for i := 0; i < workerCount; i++ {
go func(workerID int) {
defer wg.Done()
worker(ctx, workerID, tasksCh, resultsCh)
}(i)
}
// Close results channel when all workers are done
go func() {
wg.Wait()
close(resultsCh)
}()
// Fan in: collect results
collectedResults := make([]Task, 0, len(tasks))
for result := range resultsCh {
collectedResults = append(collectedResults, result)
}
return collectedResults
}
func main() {
// Create a context with timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Generate sample tasks
tasks := make([]Task, 20)
for i := 0; i < 20; i++ {
tasks[i] = Task{
ID: i,
Input: fmt.Sprintf("Task %d input", i),
}
}
// Process tasks using fan-out/fan-in pattern
results := distributeAndProcess(ctx, tasks, 5)
// Print results
fmt.Println("Results:")
successCount := 0
failureCount := 0
for _, result := range results {
if result.Err != nil {
fmt.Printf("Task %d failed: %v\n", result.ID, result.Err)
failureCount++
} else {
fmt.Printf("Task %d succeeded: %s\n", result.ID, result.Result)
successCount++
}
}
fmt.Printf("\nSummary: %d succeeded, %d failed\n", successCount, failureCount)
}
This pattern is particularly useful for distributed systems where you need to:
- Process a large number of tasks in parallel
- Handle failures gracefully
- Collect and aggregate results efficiently
- Implement backpressure mechanisms (see the sketch below)
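On the last point, the simplest backpressure mechanism in Go is a small buffered channel between producer and consumer: once the buffer fills, submission either blocks briefly or fails, so pressure propagates back to the caller instead of accumulating in an unbounded queue. Here is a minimal sketch; the submit helper, buffer size, and wait duration are illustrative choices rather than fixed prescriptions:
package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// ErrOverloaded tells the producer to slow down or shed load.
var ErrOverloaded = errors.New("queue full: apply backpressure")

// submit tries to enqueue a task, waiting only briefly before giving up so that
// pressure propagates back to the caller instead of growing an unbounded queue.
func submit(ctx context.Context, queue chan<- int, task int, wait time.Duration) error {
    select {
    case queue <- task:
        return nil
    case <-time.After(wait):
        return ErrOverloaded
    case <-ctx.Done():
        return ctx.Err()
    }
}

func main() {
    ctx := context.Background()
    queue := make(chan int, 4) // the small bound is the backpressure mechanism
    // A deliberately slow consumer.
    go func() {
        for task := range queue {
            time.Sleep(50 * time.Millisecond)
            fmt.Println("processed", task)
        }
    }()
    for i := 0; i < 20; i++ {
        if err := submit(ctx, queue, i, 10*time.Millisecond); err != nil {
            fmt.Println("rejected", i, ":", err)
        }
    }
    close(queue)
    time.Sleep(500 * time.Millisecond) // give the consumer time to drain
}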
Multiplexing with Select
In distributed systems, you often need to coordinate multiple channels with different priorities and timeouts:
package main
import (
"context"
"fmt"
"math/rand"
"time"
)
// Event represents a message in our system
type Event struct {
Source string
Type string
Payload interface{}
Time time.Time
}
// EventSource generates events from different parts of a distributed system
func EventSource(ctx context.Context, name string, interval time.Duration, priority int) <-chan Event {
ch := make(chan Event)
go func() {
defer close(ch)
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// Generate an event
event := Event{
Source: name,
Type: fmt.Sprintf("event-type-%d", rand.Intn(3)),
Payload: fmt.Sprintf("data from %s", name),
Time: time.Now(),
}
// Try to send the event
select {
case ch <- event:
// Event sent successfully
case <-ctx.Done():
return
case <-time.After(50 * time.Millisecond):
// Couldn't send within timeout, log and continue
fmt.Printf("Warning: Dropped event from %s due to backpressure\n", name)
}
case <-ctx.Done():
return
}
}
}()
return ch
}
// PriorityMultiplexer combines multiple event sources with priority handling
func PriorityMultiplexer(ctx context.Context, sources map[string]<-chan Event, priorities map[string]int) <-chan Event {
multiplexed := make(chan Event)
go func() {
defer close(multiplexed)
// Keep track of active sources
remaining := len(sources)
// Create a case for each source
for remaining > 0 {
// Find the highest priority source with available events
var highestPriority int = -1
var selectedEvent Event
var selectedSource string
for name, ch := range sources {
if ch == nil {
continue // This source is already closed
}
// Try to receive from this source with a short timeout
select {
case event, ok := <-ch:
if !ok {
// Source is closed, remove it
sources[name] = nil
remaining--
continue
}
// Check if this source has higher priority than the current selection.
// Note: an event received here but not selected is dropped in this simple
// implementation; a production multiplexer would buffer it instead.
priority := priorities[name]
if highestPriority == -1 || priority > highestPriority {
highestPriority = priority
selectedEvent = event
selectedSource = name
}
case <-time.After(1 * time.Millisecond):
// No event available from this source right now
continue
}
}
// If we found an event, try to send it
if highestPriority != -1 {
select {
case multiplexed <- selectedEvent:
fmt.Printf("Forwarded event from %s (priority %d)\n",
selectedSource, highestPriority)
case <-ctx.Done():
return
}
} else {
// No events available from any source, wait a bit
select {
case <-time.After(10 * time.Millisecond):
case <-ctx.Done():
return
}
}
}
}()
return multiplexed
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Create event sources with different priorities
sources := make(map[string]<-chan Event)
priorities := make(map[string]int)
// High priority source (critical system events)
sources["critical"] = EventSource(ctx, "critical", 500*time.Millisecond, 10)
priorities["critical"] = 10
// Medium priority source (user actions)
sources["user"] = EventSource(ctx, "user", 200*time.Millisecond, 5)
priorities["user"] = 5
// Low priority source (metrics)
sources["metrics"] = EventSource(ctx, "metrics", 100*time.Millisecond, 1)
priorities["metrics"] = 1
// Multiplex the sources with priority handling
multiplexed := PriorityMultiplexer(ctx, sources, priorities)
// Process the multiplexed events
for event := range multiplexed {
fmt.Printf("Processed: %s event from %s at %v\n",
event.Type, event.Source, event.Time.Format(time.RFC3339Nano))
}
}
This pattern enables sophisticated event handling in distributed systems by:
- Prioritizing critical events over less important ones (a simpler two-level idiom is sketched after this list)
- Implementing backpressure to prevent overwhelming consumers
- Gracefully handling source failures
- Efficiently multiplexing multiple event streams
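The multiplexer above polls every source on each pass. When only two priority levels are needed, a much simpler and widely used idiom is a nested select: drain the high-priority channel with a non-blocking receive first, and only then wait on all channels together. A minimal sketch with illustrative channel names:
package main

import (
    "fmt"
    "time"
)

func main() {
    critical := make(chan string, 8)
    normal := make(chan string, 8)
    // Fill both queues so the bias is visible.
    for i := 0; i < 3; i++ {
        critical <- fmt.Sprintf("critical-%d", i)
        normal <- fmt.Sprintf("normal-%d", i)
    }
    deadline := time.After(200 * time.Millisecond)
    for {
        // First, drain anything critical without blocking.
        select {
        case ev := <-critical:
            fmt.Println("handled", ev)
            continue
        default:
        }
        // Otherwise take whichever event arrives next (or stop at the deadline).
        select {
        case ev := <-critical:
            fmt.Println("handled", ev)
        case ev := <-normal:
            fmt.Println("handled", ev)
        case <-deadline:
            return
        }
    }
}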
Timed Channel Operations
In distributed systems, timeouts are crucial for preventing deadlocks and ensuring responsiveness:
package main
import (
"context"
"fmt"
"math/rand"
"time"
)
// Result represents the outcome of a distributed operation
type Result struct {
Value interface{}
Err error
}
// simulateDistributedOperation mimics a call to a remote service with variable latency
func simulateDistributedOperation(ctx context.Context, name string, minLatency, maxLatency time.Duration) <-chan Result {
resultCh := make(chan Result, 1) // Buffered to prevent goroutine leak
go func() {
// Calculate a random latency between min and max
latency := minLatency + time.Duration(rand.Int63n(int64(maxLatency-minLatency)))
// Simulate processing
select {
case <-time.After(latency):
// 10% chance of error
if rand.Intn(10) == 0 {
resultCh <- Result{nil, fmt.Errorf("%s operation failed", name)}
} else {
resultCh <- Result{fmt.Sprintf("%s result", name), nil}
}
case <-ctx.Done():
resultCh <- Result{nil, ctx.Err()}
}
close(resultCh)
}()
return resultCh
}
// firstResponse returns the first successful result, or the last error if all operations fail or the timeout expires
func firstResponse(ctx context.Context, timeout time.Duration, operations ...func(context.Context) <-chan Result) Result {
// Create a context with timeout
opCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
// successCh receives the first successful result; errCh collects failures
successCh := make(chan Result, 1)
errCh := make(chan Result, len(operations))
// Launch all operations
for _, op := range operations {
go func(operation func(context.Context) <-chan Result) {
result := <-operation(opCtx)
if result.Err == nil {
// Record the first success and cancel the remaining operations
select {
case successCh <- result:
cancel()
default:
// A success has already been recorded
}
return
}
// Report the failure; errCh is buffered, so this never blocks
errCh <- result
}(op)
}
// Wait for the first success, for every operation to fail, or for the timeout
var lastErr Result
for i := 0; i < len(operations); i++ {
select {
case result := <-successCh:
return result
case lastErr = <-errCh:
// Keep waiting for the remaining operations
case <-opCtx.Done():
// The context may have been canceled because an operation succeeded,
// so check for a buffered success before giving up
select {
case result := <-successCh:
return result
default:
return Result{nil, opCtx.Err()}
}
}
}
return lastErr
}
func main() {
// Seed the random number generator
rand.Seed(time.Now().UnixNano())
// Create a parent context
ctx := context.Background()
// Define operations with different latency profiles
fastButUnreliable := func(ctx context.Context) <-chan Result {
return simulateDistributedOperation(ctx, "fast-service", 50*time.Millisecond, 150*time.Millisecond)
}
mediumLatency := func(ctx context.Context) <-chan Result {
return simulateDistributedOperation(ctx, "medium-service", 100*time.Millisecond, 300*time.Millisecond)
}
slowButReliable := func(ctx context.Context) <-chan Result {
return simulateDistributedOperation(ctx, "slow-service", 200*time.Millisecond, 500*time.Millisecond)
}
// Try multiple operations with a timeout
fmt.Println("Executing distributed operations with redundancy...")
result := firstResponse(ctx, 400*time.Millisecond, fastButUnreliable, mediumLatency, slowButReliable)
if result.Err != nil {
fmt.Printf("All operations failed or timed out: %v\n", result.Err)
} else {
fmt.Printf("Operation succeeded with result: %v\n", result.Value)
}
// Demonstrate a more complex scenario with multiple attempts
fmt.Println("\nExecuting with multiple attempts...")
for attempt := 1; attempt <= 3; attempt++ {
fmt.Printf("Attempt %d...\n", attempt)
// Increase timeout with each attempt
timeout := time.Duration(attempt) * 200 * time.Millisecond
result = firstResponse(ctx, timeout, fastButUnreliable, mediumLatency, slowButReliable)
if result.Err == nil {
fmt.Printf("Success on attempt %d: %v\n", attempt, result.Value)
break
} else {
fmt.Printf("Attempt %d failed: %v\n", attempt, result.Err)
}
}
}
This pattern is essential for distributed systems where:
- Services may have variable latency
- You need to implement redundancy across multiple services
- Graceful degradation is required when services are slow or unavailable
- Timeouts must be carefully managed to prevent cascading failures (see the retry sketch after this list)
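Building on the last point, a common companion to firstResponse-style hedging is a retry loop in which each attempt gets its own timeout carved out of an overall budget, with capped exponential backoff between attempts. A minimal sketch; the attempt count, per-attempt timeout, and backoff cap are illustrative:
package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// callWithRetry gives each attempt its own timeout carved out of an overall
// budget, with capped exponential backoff between attempts, so one slow
// dependency cannot consume the caller's entire deadline.
func callWithRetry(ctx context.Context, attempts int, perAttempt time.Duration, call func(context.Context) error) error {
    backoff := 50 * time.Millisecond
    var lastErr error
    for i := 0; i < attempts; i++ {
        attemptCtx, cancel := context.WithTimeout(ctx, perAttempt)
        lastErr = call(attemptCtx)
        cancel()
        if lastErr == nil {
            return nil
        }
        // Wait before retrying, unless the overall budget is already exhausted.
        select {
        case <-time.After(backoff):
        case <-ctx.Done():
            return ctx.Err()
        }
        backoff *= 2
        if backoff > time.Second {
            backoff = time.Second
        }
    }
    return fmt.Errorf("all %d attempts failed: %w", attempts, lastErr)
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()
    calls := 0
    err := callWithRetry(ctx, 5, 200*time.Millisecond, func(ctx context.Context) error {
        calls++
        if calls < 3 {
            return errors.New("temporary failure") // the first two attempts fail
        }
        return nil
    })
    fmt.Println("calls:", calls, "err:", err)
}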
Worker Pool and Pipeline Patterns
Worker pools and pipelines are powerful patterns for processing data efficiently in distributed systems.
Advanced Worker Pool with Adaptive Scaling
This implementation adjusts the number of workers based on load:
package main
import (
"context"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
)
// Job represents a unit of work
type Job struct {
ID int
Payload interface{}
Duration time.Duration // Simulated processing time
}
// Result represents the outcome of job processing
type Result struct {
JobID int
Output interface{}
Err error
Latency time.Duration
}
// AdaptiveWorkerPool implements a worker pool that scales based on load
type AdaptiveWorkerPool struct {
jobQueue chan Job
resultQueue chan Result
workerCount int32
activeWorkers int32
maxWorkers int32
minWorkers int32
pendingJobs int32
mu sync.Mutex
stopCh chan struct{}
workerWg sync.WaitGroup
metrics *PoolMetrics
scaleInterval time.Duration
scaleThreshold float64 // Threshold for scaling (0-1)
}
// PoolMetrics tracks performance metrics for the worker pool
type PoolMetrics struct {
totalJobs int64
completedJobs int64
failedJobs int64
totalLatency int64 // in nanoseconds
queueHighWater int32
}
// NewAdaptiveWorkerPool creates a new adaptive worker pool
func NewAdaptiveWorkerPool(ctx context.Context, minWorkers, maxWorkers, queueSize int) *AdaptiveWorkerPool {
pool := &AdaptiveWorkerPool{
jobQueue: make(chan Job, queueSize),
resultQueue: make(chan Result, queueSize),
minWorkers: int32(minWorkers),
maxWorkers: int32(maxWorkers),
workerCount: 0,
activeWorkers: 0,
pendingJobs: 0,
stopCh: make(chan struct{}),
metrics: &PoolMetrics{},
scaleInterval: 500 * time.Millisecond,
scaleThreshold: 0.7, // Scale up when 70% of workers are busy
}
// Start the minimum number of workers
for i := 0; i < minWorkers; i++ {
pool.startWorker(ctx)
}
// Start the autoscaler
go pool.autoscaler(ctx)
return pool
}
// startWorker launches a new worker goroutine
func (p *AdaptiveWorkerPool) startWorker(ctx context.Context) {
p.workerWg.Add(1)
atomic.AddInt32(&p.workerCount, 1)
go func() {
defer p.workerWg.Done()
defer atomic.AddInt32(&p.workerCount, -1)
for {
select {
case job, ok := <-p.jobQueue:
if !ok {
return // Channel closed
}
// Mark worker as active
atomic.AddInt32(&p.activeWorkers, 1)
atomic.AddInt32(&p.pendingJobs, -1)
// Process the job
startTime := time.Now()
var result Result
// Simulate processing with potential failures
time.Sleep(job.Duration)
if rand.Intn(10) < 1 { // 10% failure rate
result = Result{
JobID: job.ID,
Output: nil,
Err: fmt.Errorf("processing error on job %d", job.ID),
Latency: time.Since(startTime),
}
atomic.AddInt64(&p.metrics.failedJobs, 1)
} else {
result = Result{
JobID: job.ID,
Output: fmt.Sprintf("Processed result for job %d", job.ID),
Err: nil,
Latency: time.Since(startTime),
}
}
// Update metrics
atomic.AddInt64(&p.metrics.completedJobs, 1)
atomic.AddInt64(&p.metrics.totalLatency, int64(result.Latency))
// Send the result
select {
case p.resultQueue <- result:
case <-ctx.Done():
return
}
// Mark worker as inactive
atomic.AddInt32(&p.activeWorkers, -1)
case <-p.stopCh:
return
case <-ctx.Done():
return
}
}
}()
}
// autoscaler adjusts the number of workers based on load
func (p *AdaptiveWorkerPool) autoscaler(ctx context.Context) {
ticker := time.NewTicker(p.scaleInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
p.adjustWorkerCount(ctx)
case <-p.stopCh:
return
case <-ctx.Done():
return
}
}
}
// adjustWorkerCount scales the worker pool up or down based on current load
func (p *AdaptiveWorkerPool) adjustWorkerCount(ctx context.Context) {
currentWorkers := atomic.LoadInt32(&p.workerCount)
activeWorkers := atomic.LoadInt32(&p.activeWorkers)
pendingJobs := atomic.LoadInt32(&p.pendingJobs)
// Calculate utilization
var utilization float64
if currentWorkers > 0 {
utilization = float64(activeWorkers) / float64(currentWorkers)
}
// Scale up if utilization is high and there are pending jobs
if utilization >= p.scaleThreshold && pendingJobs > 0 && currentWorkers < p.maxWorkers {
// Calculate how many workers to add (up to 25% more, at least 1)
toAdd := max(1, int(float64(currentWorkers)*0.25))
// Don't exceed max workers
if currentWorkers+int32(toAdd) > p.maxWorkers {
toAdd = int(p.maxWorkers - currentWorkers)
}
fmt.Printf("Scaling up: Adding %d workers (utilization: %.2f, pending: %d)\n",
toAdd, utilization, pendingJobs)
for i := 0; i < toAdd; i++ {
p.startWorker(ctx)
}
}
// Scale down if utilization is low and we have more than minimum workers
if utilization < p.scaleThreshold*0.5 && currentWorkers > p.minWorkers && pendingJobs == 0 {
// Calculate how many workers to remove (up to 15% fewer, at least 1)
toRemove := max(1, int(float64(currentWorkers)*0.15))
// Don't go below min workers
if currentWorkers-int32(toRemove) < p.minWorkers {
toRemove = int(currentWorkers - p.minWorkers)
}
fmt.Printf("Scaling down: Removing %d workers (utilization: %.2f)\n",
toRemove, utilization)
// Signal workers to stop; stopCh is unbuffered, so each send only succeeds
// when an idle worker is ready to receive it
signalLoop:
for i := 0; i < toRemove; i++ {
select {
case p.stopCh <- struct{}{}:
default:
// No worker is ready to receive a stop signal right now; stop trying
break signalLoop
}
}
}
}
// Submit adds a job to the pool
func (p *AdaptiveWorkerPool) Submit(ctx context.Context, job Job) error {
select {
case p.jobQueue <- job:
atomic.AddInt64(&p.metrics.totalJobs, 1)
atomic.AddInt32(&p.pendingJobs, 1)
// Update high water mark for queue
pending := atomic.LoadInt32(&p.pendingJobs)
for {
highWater := atomic.LoadInt32(&p.metrics.queueHighWater)
if pending <= highWater || atomic.CompareAndSwapInt32(&p.metrics.queueHighWater, highWater, pending) {
break
}
}
return nil
case <-ctx.Done():
return ctx.Err()
default:
return fmt.Errorf("job queue is full")
}
}
// Results returns the channel for receiving results
func (p *AdaptiveWorkerPool) Results() <-chan Result {
return p.resultQueue
}
// Shutdown gracefully shuts down the worker pool
func (p *AdaptiveWorkerPool) Shutdown() {
close(p.jobQueue)
p.workerWg.Wait()
close(p.resultQueue)
close(p.stopCh)
}
// GetMetrics returns the current pool metrics
func (p *AdaptiveWorkerPool) GetMetrics() PoolMetrics {
completed := atomic.LoadInt64(&p.metrics.completedJobs)
metrics := PoolMetrics{
totalJobs: atomic.LoadInt64(&p.metrics.totalJobs),
completedJobs: completed,
failedJobs: atomic.LoadInt64(&p.metrics.failedJobs),
queueHighWater: atomic.LoadInt32(&p.metrics.queueHighWater),
}
// Calculate average latency
if completed > 0 {
metrics.totalLatency = atomic.LoadInt64(&p.metrics.totalLatency) / completed
}
return metrics
}
// Helper function for max of two integers
func max(a, b int) int {
if a > b {
return a
}
return b
}
func main() {
// Seed random number generator
rand.Seed(time.Now().UnixNano())
// Create a context with timeout
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// Create an adaptive worker pool
pool := NewAdaptiveWorkerPool(ctx, 5, 20, 100)
defer pool.Shutdown()
// Start a goroutine to collect results
go func() {
for result := range pool.Results() {
if result.Err != nil {
fmt.Printf("Job %d failed: %v (took %v)\n",
result.JobID, result.Err, result.Latency)
} else {
fmt.Printf("Job %d completed: %v (took %v)\n",
result.JobID, result.Output, result.Latency)
}
}
}()
// Submit jobs in waves to demonstrate scaling
for wave := 0; wave < 3; wave++ {
fmt.Printf("\n--- Starting job wave %d ---\n", wave+1)
// Submit a batch of jobs
jobCount := 50 + wave*25
for i := 0; i < jobCount; i++ {
// Create jobs with variable processing times
duration := time.Duration(50+rand.Intn(200)) * time.Millisecond
job := Job{
ID: wave*1000 + i,
Payload: fmt.Sprintf("Job data %d", i),
Duration: duration,
}
if err := pool.Submit(ctx, job); err != nil {
fmt.Printf("Failed to submit job: %v\n", err)
}
}
// Wait between waves
time.Sleep(2 * time.Second)
// Print current metrics
metrics := pool.GetMetrics()
fmt.Printf("\nPool metrics after wave %d:\n", wave+1)
fmt.Printf("- Total jobs: %d\n", metrics.totalJobs)
fmt.Printf("- Completed jobs: %d\n", metrics.completedJobs)
fmt.Printf("- Failed jobs: %d\n", metrics.failedJobs)
fmt.Printf("- Average latency: %v\n", time.Duration(metrics.totalLatency))
fmt.Printf("- Queue high water: %d\n", metrics.queueHighWater)
fmt.Printf("- Current workers: %d\n", atomic.LoadInt32(&pool.workerCount))
fmt.Printf("- Active workers: %d\n", atomic.LoadInt32(&pool.activeWorkers))
}
// Wait for all jobs to complete
time.Sleep(1 * time.Second)
// Final metrics
metrics := pool.GetMetrics()
fmt.Printf("\nFinal pool metrics:\n")
fmt.Printf("- Total jobs: %d\n", metrics.totalJobs)
fmt.Printf("- Completed jobs: %d\n", metrics.completedJobs)
fmt.Printf("- Failed jobs: %d\n", metrics.failedJobs)
fmt.Printf("- Average latency: %v\n", time.Duration(metrics.totalLatency))
fmt.Printf("- Queue high water: %d\n", metrics.queueHighWater)
}
This advanced worker pool pattern is ideal for distributed systems because it:
- Automatically scales based on workload (when adaptive scaling is overkill, a simpler fixed-limit alternative is sketched after this list)
- Efficiently manages resources
- Provides detailed metrics for monitoring
- Handles backpressure through queue management
- Gracefully recovers from failures
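When a fixed concurrency limit is enough, you can avoid most of this machinery. The sketch below uses the golang.org/x/sync/errgroup package, whose SetLimit makes Go block once the limit is reached, which is itself a simple form of backpressure; the limit and job count are illustrative:
package main

import (
    "context"
    "fmt"
    "time"

    "golang.org/x/sync/errgroup"
)

func main() {
    ctx := context.Background()
    // errgroup with SetLimit gives a fixed-size pool with built-in error
    // propagation and cancellation.
    g, ctx := errgroup.WithContext(ctx)
    g.SetLimit(4)
    for i := 0; i < 12; i++ {
        id := i
        g.Go(func() error {
            select {
            case <-time.After(100 * time.Millisecond): // simulated work
                fmt.Println("finished job", id)
                return nil
            case <-ctx.Done():
                return ctx.Err()
            }
        })
    }
    if err := g.Wait(); err != nil {
        fmt.Println("pool failed:", err)
    }
}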
Multi-Stage Pipeline Pattern
Pipelines allow you to process data through multiple stages efficiently:
package main
import (
"context"
"fmt"
"math/rand"
"sync"
"time"
)
// DataItem represents a piece of data flowing through the pipeline
type DataItem struct {
ID int
Data interface{}
Metadata map[string]interface{}
Error error
}
// PipelineStage represents a processing stage in the pipeline
type PipelineStage func(ctx context.Context, in <-chan DataItem) <-chan DataItem
// Pipeline represents a multi-stage data processing pipeline
type Pipeline struct {
stages []PipelineStage
}
// NewPipeline creates a new data processing pipeline
func NewPipeline(stages ...PipelineStage) *Pipeline {
return &Pipeline{
stages: stages,
}
}
// Execute runs data through the pipeline
func (p *Pipeline) Execute(ctx context.Context, source <-chan DataItem) <-chan DataItem {
// Start with the source channel
current := source
// Apply each stage in sequence
for _, stage := range p.stages {
current = stage(ctx, current)
}
return current
}
// Source creates a source channel for the pipeline
func Source(ctx context.Context, items []DataItem) <-chan DataItem {
out := make(chan DataItem)
go func() {
defer close(out)
for _, item := range items {
select {
case out <- item:
// Item sent successfully
case <-ctx.Done():
return
}
}
}()
return out
}
// Example pipeline stages
// Validate checks if data items are valid
func Validate(ctx context.Context, in <-chan DataItem) <-chan DataItem {
out := make(chan DataItem)
go func() {
defer close(out)
for item := range in {
// Skip already failed items
if item.Error != nil {
select {
case out <- item:
case <-ctx.Done():
return
}
continue
}
// Perform validation
if item.Data == nil {
item.Error = fmt.Errorf("invalid data: nil value")
}
// Forward the item
select {
case out <- item:
case <-ctx.Done():
return
}
}
}()
return out
}
// Transform applies a transformation to data items
func Transform(ctx context.Context, in <-chan DataItem) <-chan DataItem {
out := make(chan DataItem)
go func() {
defer close(out)
for item := range in {
// Skip already failed items
if item.Error != nil {
select {
case out <- item:
case <-ctx.Done():
return
}
continue
}
// Apply transformation
switch v := item.Data.(type) {
case string:
item.Data = fmt.Sprintf("Transformed: %s", v)
case int:
item.Data = v * 2
default:
item.Error = fmt.Errorf("unsupported data type")
}
// Add metadata
if item.Metadata == nil {
item.Metadata = make(map[string]interface{})
}
item.Metadata["transformed_at"] = time.Now()
// Forward the item
select {
case out <- item:
case <-ctx.Done():
return
}
}
}()
return out
}
// Enrich adds additional data to items
func Enrich(ctx context.Context, in <-chan DataItem) <-chan DataItem {
out := make(chan DataItem)
go func() {
defer close(out)
for item := range in {
// Skip already failed items
if item.Error != nil {
select {
case out <- item:
case <-ctx.Done():
return
}
continue
}
// Add enrichment data
if item.Metadata == nil {
item.Metadata = make(map[string]interface{})
}
item.Metadata["enriched"] = true
item.Metadata["enriched_at"] = time.Now()
// Forward the item
select {
case out <- item:
case <-ctx.Done():
return
}
}
}()
return out
}
// ParallelStage processes items in parallel
func ParallelStage(workers int, processor func(DataItem) DataItem) PipelineStage {
return func(ctx context.Context, in <-chan DataItem) <-chan DataItem {
out := make(chan DataItem)
// Start a fixed number of workers
var wg sync.WaitGroup
wg.Add(workers)
for i := 0; i < workers; i++ {
go func(workerID int) {
defer wg.Done()
for item := range in {
// Skip already failed items
if item.Error != nil {
select {
case out <- item:
case <-ctx.Done():
return
}
continue
}
// Process the item
processedItem := processor(item)
// Add worker metadata
if processedItem.Metadata == nil {
processedItem.Metadata = make(map[string]interface{})
}
processedItem.Metadata["worker_id"] = workerID
// Forward the processed item
select {
case out <- processedItem:
case <-ctx.Done():
return
}
}
}(i)
}
// Close the output channel when all workers are done
go func() {
wg.Wait()
close(out)
}()
return out
}
}
func main() {
// Create a context with timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Create sample data
data := []DataItem{
{ID: 1, Data: "item 1"},
{ID: 2, Data: "item 2"},
{ID: 3, Data: "item 3"},
{ID: 4, Data: "item 4"},
{ID: 5, Data: nil}, // This will fail validation
{ID: 6, Data: "item 6"},
{ID: 7, Data: 42}, // Different type
{ID: 8, Data: "item 8"},
}
// Create a pipeline
pipeline := NewPipeline(
Validate,
Transform,
ParallelStage(3, func(item DataItem) DataItem {
// Simulate processing time
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
return item
}),
Enrich,
)
// Create a source channel
source := Source(ctx, data)
// Execute the pipeline
results := pipeline.Execute(ctx, source)
// Collect and print results
for result := range results {
if result.Error != nil {
fmt.Printf("Item %d failed: %v\n", result.ID, result.Error)
} else {
fmt.Printf("Item %d processed: %v with metadata: %v\n",
result.ID, result.Data, result.Metadata)
}
}
}
This pipeline pattern is valuable for distributed systems because it:
- Separates processing logic into discrete, reusable stages (a helper that factors out the shared boilerplate is sketched after this list)
- Handles errors gracefully at each stage
- Enables parallel processing where appropriate
- Maintains context and metadata throughout the processing flow
- Provides backpressure through channel buffering
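Because every stage above repeats the same boilerplate (skip failed items, forward with context-aware sends), it can be factored into a small adapter that turns a plain per-item function into a stage. A self-contained sketch with simplified Item and Stage types that mirror the ones above:
package main

import (
    "context"
    "fmt"
    "strings"
)

// Item and Stage mirror the DataItem/PipelineStage shapes used above.
type Item struct {
    ID    int
    Data  string
    Error error
}

type Stage func(ctx context.Context, in <-chan Item) <-chan Item

// MapStage factors out the boilerplate every stage repeats: skip failed items,
// apply a pure per-item function, and forward with context-aware sends.
func MapStage(fn func(Item) Item) Stage {
    return func(ctx context.Context, in <-chan Item) <-chan Item {
        out := make(chan Item)
        go func() {
            defer close(out)
            for item := range in {
                if item.Error == nil {
                    item = fn(item)
                }
                select {
                case out <- item:
                case <-ctx.Done():
                    return
                }
            }
        }()
        return out
    }
}

func main() {
    ctx := context.Background()
    src := make(chan Item)
    go func() {
        defer close(src)
        for i, s := range []string{"alpha", "beta", "gamma"} {
            src <- Item{ID: i, Data: s}
        }
    }()
    // Two reusable stages built from plain functions.
    upper := MapStage(func(it Item) Item { it.Data = strings.ToUpper(it.Data); return it })
    tag := MapStage(func(it Item) Item { it.Data = "processed:" + it.Data; return it })
    for it := range tag(ctx, upper(ctx, src)) {
        fmt.Printf("%d -> %s\n", it.ID, it.Data)
    }
}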
Distributed Coordination Patterns
In distributed systems, coordinating activities across multiple nodes is a common challenge. Go’s concurrency primitives can be extended to implement sophisticated coordination patterns.
Distributed Mutex with etcd
This example demonstrates implementing a distributed mutex using etcd:
package main
import (
"context"
"fmt"
"log"
"os"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
)
// DistributedMutex wraps etcd's mutex for distributed locking
type DistributedMutex struct {
client *clientv3.Client
session *concurrency.Session
mutex *concurrency.Mutex
lockPath string
nodeID string
isLocked bool
lockCount int
}
// NewDistributedMutex creates a new distributed mutex
func NewDistributedMutex(endpoints []string, lockPath string) (*DistributedMutex, error) {
// Create etcd client
client, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
return nil, fmt.Errorf("failed to create etcd client: %w", err)
}
// Generate a unique node ID
hostname, err := os.Hostname()
if err != nil {
hostname = "unknown-host"
}
nodeID := fmt.Sprintf("%s-%d", hostname, time.Now().UnixNano())
// Create a session with keep-alive
session, err := concurrency.NewSession(client, concurrency.WithTTL(10))
if err != nil {
client.Close()
return nil, fmt.Errorf("failed to create etcd session: %w", err)
}
// Create the mutex
mutex := concurrency.NewMutex(session, lockPath)
return &DistributedMutex{
client: client,
session: session,
mutex: mutex,
lockPath: lockPath,
nodeID: nodeID,
isLocked: false,
}, nil
}
// Lock acquires the distributed mutex
func (dm *DistributedMutex) Lock(ctx context.Context) error {
if dm.isLocked {
dm.lockCount++
return nil // Already locked by this instance
}
// Try to acquire the lock
if err := dm.mutex.Lock(ctx); err != nil {
return fmt.Errorf("failed to acquire lock: %w", err)
}
dm.isLocked = true
dm.lockCount = 1
log.Printf("Node %s acquired lock on %s", dm.nodeID, dm.lockPath)
return nil
}
// Unlock releases the distributed mutex
func (dm *DistributedMutex) Unlock(ctx context.Context) error {
if !dm.isLocked {
return fmt.Errorf("mutex is not locked")
}
dm.lockCount--
if dm.lockCount > 0 {
return nil // Still held by other operations
}
// Release the lock
if err := dm.mutex.Unlock(ctx); err != nil {
return fmt.Errorf("failed to release lock: %w", err)
}
dm.isLocked = false
log.Printf("Node %s released lock on %s", dm.nodeID, dm.lockPath)
return nil
}
// Close releases resources
func (dm *DistributedMutex) Close() {
if dm.isLocked {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = dm.Unlock(ctx)
}
dm.session.Close()
dm.client.Close()
}
// IsLocked returns whether this instance holds the lock
func (dm *DistributedMutex) IsLocked() bool {
return dm.isLocked
}
// GetLockOwner returns the current owner of the lock
func (dm *DistributedMutex) GetLockOwner(ctx context.Context) (string, error) {
resp, err := dm.client.Get(ctx, dm.lockPath, clientv3.WithPrefix())
if err != nil {
return "", fmt.Errorf("failed to get lock info: %w", err)
}
if len(resp.Kvs) == 0 {
return "", nil // No lock owner
}
return string(resp.Kvs[0].Value), nil
}
func main() {
// Connect to etcd
endpoints := []string{"localhost:2379"}
lockPath := "/locks/my-critical-section"
// Create the distributed mutex
mutex, err := NewDistributedMutex(endpoints, lockPath)
if err != nil {
log.Fatalf("Failed to create distributed mutex: %v", err)
}
defer mutex.Close()
// Create a context with timeout
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// Try to acquire the lock
log.Printf("Attempting to acquire lock...")
if err := mutex.Lock(ctx); err != nil {
log.Fatalf("Failed to acquire lock: %v", err)
}
// Simulate doing work while holding the lock
log.Printf("Lock acquired! Performing critical section work...")
time.Sleep(5 * time.Second)
// Release the lock
if err := mutex.Unlock(ctx); err != nil {
log.Fatalf("Failed to release lock: %v", err)
}
log.Printf("Lock released")
}
Leader Election Pattern
Leader election is essential for coordinating distributed systems:
package main
import (
"context"
"fmt"
"log"
"os"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
)
// LeaderElection manages the leader election process
type LeaderElection struct {
client *clientv3.Client
session *concurrency.Session
election *concurrency.Election
nodeID string
electionID string
isLeader bool
leaderCh chan bool
stopCh chan struct{}
}
// NewLeaderElection creates a new leader election instance
func NewLeaderElection(endpoints []string, electionID string) (*LeaderElection, error) {
// Create etcd client
client, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
return nil, fmt.Errorf("failed to create etcd client: %w", err)
}
// Generate a unique node ID
hostname, err := os.Hostname()
if err != nil {
hostname = "unknown-host"
}
nodeID := fmt.Sprintf("%s-%d", hostname, time.Now().UnixNano())
// Create a session with keep-alive
session, err := concurrency.NewSession(client, concurrency.WithTTL(10))
if err != nil {
client.Close()
return nil, fmt.Errorf("failed to create etcd session: %w", err)
}
// Create the election
election := concurrency.NewElection(session, electionID)
return &LeaderElection{
client: client,
session: session,
election: election,
nodeID: nodeID,
electionID: electionID,
isLeader: false,
leaderCh: make(chan bool, 1),
stopCh: make(chan struct{}),
}, nil
}
// Campaign starts the leader election process
func (le *LeaderElection) Campaign(ctx context.Context) error {
// Start campaigning for leadership
if err := le.election.Campaign(ctx, le.nodeID); err != nil {
return fmt.Errorf("failed to campaign: %w", err)
}
le.isLeader = true
le.leaderCh <- true
log.Printf("Node %s became leader for %s", le.nodeID, le.electionID)
return nil
}
// Resign gives up leadership
func (le *LeaderElection) Resign(ctx context.Context) error {
if !le.isLeader {
return nil // Not the leader
}
if err := le.election.Resign(ctx); err != nil {
return fmt.Errorf("failed to resign: %w", err)
}
le.isLeader = false
le.leaderCh <- false
log.Printf("Node %s resigned leadership for %s", le.nodeID, le.electionID)
return nil
}
// IsLeader returns whether this node is the leader
func (le *LeaderElection) IsLeader() bool {
return le.isLeader
}
// LeaderChanges returns a channel that receives leadership change notifications
func (le *LeaderElection) LeaderChanges() <-chan bool {
return le.leaderCh
}
// WatchLeader watches for leader changes
func (le *LeaderElection) WatchLeader(ctx context.Context) {
go func() {
defer close(le.leaderCh)
for {
select {
case <-le.stopCh:
return
case <-ctx.Done():
return
default:
// Get the current leader
resp, err := le.election.Leader(ctx)
if err != nil {
log.Printf("Error getting leader: %v", err)
time.Sleep(1 * time.Second)
continue
}
if len(resp.Kvs) == 0 {
// No leader recorded yet; check again shortly
time.Sleep(1 * time.Second)
continue
}
currentLeader := string(resp.Kvs[0].Value)
isLeader := currentLeader == le.nodeID
// If leadership status changed, notify
if isLeader != le.isLeader {
le.isLeader = isLeader
le.leaderCh <- isLeader
if isLeader {
log.Printf("Node %s became leader for %s", le.nodeID, le.electionID)
} else {
log.Printf("Node %s lost leadership for %s", le.nodeID, le.electionID)
}
}
// Watch for changes
watchCh := le.client.Watch(ctx, string(resp.Kvs[0].Key))
select {
case <-watchCh:
// Leader changed, loop and check again
case <-le.stopCh:
return
case <-ctx.Done():
return
}
}
}
}()
}
// Close releases resources
func (le *LeaderElection) Close() {
close(le.stopCh)
if le.isLeader {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = le.Resign(ctx)
}
le.session.Close()
le.client.Close()
}
// GetCurrentLeader returns the current leader's ID
func (le *LeaderElection) GetCurrentLeader(ctx context.Context) (string, error) {
resp, err := le.election.Leader(ctx)
if err != nil {
return "", fmt.Errorf("failed to get leader: %w", err)
}
if len(resp.Kvs) == 0 {
return "", nil // No leader
}
return string(resp.Kvs[0].Value), nil
}
func main() {
// Connect to etcd
endpoints := []string{"localhost:2379"}
electionID := "/elections/my-service-leader"
// Create the leader election
election, err := NewLeaderElection(endpoints, electionID)
if err != nil {
log.Fatalf("Failed to create leader election: %v", err)
}
defer election.Close()
// Create a context with timeout
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
// Start watching for leader changes
election.WatchLeader(ctx)
// Start campaigning for leadership
log.Printf("Node %s starting campaign...", election.nodeID)
go func() {
if err := election.Campaign(ctx); err != nil {
log.Printf("Campaign error: %v", err)
}
}()
// Handle leadership changes
for isLeader := range election.LeaderChanges() {
if isLeader {
log.Printf("I am now the leader! Starting leader tasks...")
// Perform leader-specific work
time.Sleep(10 * time.Second)
// Simulate stepping down
log.Printf("Resigning leadership...")
if err := election.Resign(ctx); err != nil {
log.Printf("Error resigning: %v", err)
}
} else {
log.Printf("I am now a follower. Waiting for leadership...")
// Perform follower-specific work
// After some time, campaign again
time.Sleep(5 * time.Second)
log.Printf("Starting new campaign...")
go func() {
if err := election.Campaign(ctx); err != nil {
log.Printf("Campaign error: %v", err)
}
}()
}
}
}
Distributed Semaphore
A distributed semaphore allows limiting concurrent access across multiple nodes:
package main
import (
"context"
"fmt"
"log"
"os"
"sync"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
)
// DistributedSemaphore implements a semaphore that works across multiple nodes
type DistributedSemaphore struct {
client *clientv3.Client
session *concurrency.Session
semPath string
nodeID string
count int
resources map[string]struct{}
mu sync.Mutex
}
// NewDistributedSemaphore creates a new distributed semaphore
func NewDistributedSemaphore(endpoints []string, semPath string, count int) (*DistributedSemaphore, error) {
// Create etcd client
client, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
return nil, fmt.Errorf("failed to create etcd client: %w", err)
}
// Generate a unique node ID
hostname, err := os.Hostname()
if err != nil {
hostname = "unknown-host"
}
nodeID := fmt.Sprintf("%s-%d", hostname, time.Now().UnixNano())
// Create a session with keep-alive
session, err := concurrency.NewSession(client, concurrency.WithTTL(10))
if err != nil {
client.Close()
return nil, fmt.Errorf("failed to create etcd session: %w", err)
}
return &DistributedSemaphore{
client: client,
session: session,
semPath: semPath,
nodeID: nodeID,
count: count,
resources: make(map[string]struct{}),
}, nil
}
// Acquire attempts to acquire a resource from the semaphore
func (ds *DistributedSemaphore) Acquire(ctx context.Context) (string, error) {
ds.mu.Lock()
defer ds.mu.Unlock()
// Generate a unique resource ID
resourceID := fmt.Sprintf("%s/%s-%d", ds.semPath, ds.nodeID, time.Now().UnixNano())
// Try to create the resource key with a lease
_, err := ds.client.Put(ctx, resourceID, ds.nodeID, clientv3.WithLease(ds.session.Lease()))
if err != nil {
return "", fmt.Errorf("failed to create resource: %w", err)
}
// Get all resources to check if we're within the limit
resp, err := ds.client.Get(ctx, ds.semPath, clientv3.WithPrefix())
if err != nil {
// Try to clean up
_, _ = ds.client.Delete(ctx, resourceID)
return "", fmt.Errorf("failed to get resources: %w", err)
}
// Check if we're within the semaphore limit. Because the Put and the Get are
// separate operations, concurrent acquirers can spuriously fail near the limit
// (the check never over-admits, but it may under-admit); callers should retry,
// as TryAcquireWithTimeout does below.
if len(resp.Kvs) > ds.count {
// We need to release our resource and wait
_, _ = ds.client.Delete(ctx, resourceID)
return "", fmt.Errorf("semaphore limit reached")
}
// We successfully acquired a resource
ds.resources[resourceID] = struct{}{}
log.Printf("Node %s acquired resource %s", ds.nodeID, resourceID)
return resourceID, nil
}
// Release releases a previously acquired resource
func (ds *DistributedSemaphore) Release(ctx context.Context, resourceID string) error {
ds.mu.Lock()
defer ds.mu.Unlock()
// Check if we own this resource
if _, exists := ds.resources[resourceID]; !exists {
return fmt.Errorf("resource not owned by this semaphore instance")
}
// Delete the resource key
_, err := ds.client.Delete(ctx, resourceID)
if err != nil {
return fmt.Errorf("failed to delete resource: %w", err)
}
// Remove from our local tracking
delete(ds.resources, resourceID)
log.Printf("Node %s released resource %s", ds.nodeID, resourceID)
return nil
}
// TryAcquireWithTimeout attempts to acquire a resource with a timeout
func (ds *DistributedSemaphore) TryAcquireWithTimeout(ctx context.Context, timeout time.Duration) (string, error) {
timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
// Try to acquire immediately first
resourceID, err := ds.Acquire(ctx)
if err == nil {
return resourceID, nil
}
// If that fails, retry with exponential backoff
backoff := 50 * time.Millisecond
maxBackoff := 1 * time.Second
for {
select {
case <-timeoutCtx.Done():
return "", fmt.Errorf("timeout waiting for semaphore: %w", timeoutCtx.Err())
case <-time.After(backoff):
// Try again
resourceID, err := ds.Acquire(ctx)
if err == nil {
return resourceID, nil
}
// Increase backoff for next attempt
backoff *= 2
if backoff > maxBackoff {
backoff = maxBackoff
}
}
}
}
// Close releases all resources and cleans up
func (ds *DistributedSemaphore) Close() {
ds.mu.Lock()
defer ds.mu.Unlock()
// Release all acquired resources
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
for resourceID := range ds.resources {
_, _ = ds.client.Delete(ctx, resourceID)
delete(ds.resources, resourceID)
}
ds.session.Close()
ds.client.Close()
}
// GetAvailable returns the number of available resources
func (ds *DistributedSemaphore) GetAvailable(ctx context.Context) (int, error) {
resp, err := ds.client.Get(ctx, ds.semPath, clientv3.WithPrefix())
if err != nil {
return 0, fmt.Errorf("failed to get resources: %w", err)
}
return ds.count - len(resp.Kvs), nil
}
func main() {
// Connect to etcd
endpoints := []string{"localhost:2379"}
semPath := "/semaphores/connection-limit"
maxConnections := 3
// Create the distributed semaphore
sem, err := NewDistributedSemaphore(endpoints, semPath, maxConnections)
if err != nil {
log.Fatalf("Failed to create distributed semaphore: %v", err)
}
defer sem.Close()
// Create a context with timeout
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// Simulate multiple workers trying to acquire resources
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
log.Printf("Worker %d trying to acquire resource...", workerID)
resourceID, err := sem.TryAcquireWithTimeout(ctx, 5*time.Second)
if err != nil {
log.Printf("Worker %d failed to acquire resource: %v", workerID, err)
return
}
log.Printf("Worker %d acquired resource %s", workerID, resourceID)
// Simulate doing work
time.Sleep(2 * time.Second)
// Release the resource
if err := sem.Release(ctx, resourceID); err != nil {
log.Printf("Worker %d failed to release resource: %v", workerID, err)
} else {
log.Printf("Worker %d released resource %s", workerID, resourceID)
}
}(i)
}
// Wait for all workers to finish
wg.Wait()
}
Error Handling and Recovery Patterns
Robust error handling is crucial for distributed systems where failures are common.
Circuit Breaker Pattern
The circuit breaker pattern prevents cascading failures in distributed systems:
package main
import (
"context"
"errors"
"fmt"
"log"
"math/rand"
"sync"
"sync/atomic"
"time"
)
// CircuitBreakerState represents the state of the circuit breaker
type CircuitBreakerState int
const (
StateClosed CircuitBreakerState = iota // Normal operation, requests pass through
StateOpen // Circuit is open, requests fail fast
StateHalfOpen // Testing if the service is healthy again
)
// CircuitBreaker implements the circuit breaker pattern
type CircuitBreaker struct {
name string
state CircuitBreakerState
failureThreshold int64
successThreshold int64
resetTimeout time.Duration
failureCount int64
successCount int64
lastStateChange time.Time
mutex sync.RWMutex
onStateChange func(name string, from, to CircuitBreakerState)
consecutiveFailures int64
consecutiveSuccesses int64
totalRequests int64
totalSuccesses int64
totalFailures int64
totalTimeouts int64
totalShortCircuits int64
cumulativeResponseTime int64 // in nanoseconds
}
// CircuitBreakerOption defines a function that configures a CircuitBreaker
type CircuitBreakerOption func(*CircuitBreaker)
// WithFailureThreshold sets the threshold for failures before opening the circuit
func WithFailureThreshold(threshold int64) CircuitBreakerOption {
return func(cb *CircuitBreaker) {
cb.failureThreshold = threshold
}
}
// WithSuccessThreshold sets the threshold for successes before closing the circuit
func WithSuccessThreshold(threshold int64) CircuitBreakerOption {
return func(cb *CircuitBreaker) {
cb.successThreshold = threshold
}
}
// WithResetTimeout sets the timeout before trying to close the circuit again
func WithResetTimeout(timeout time.Duration) CircuitBreakerOption {
return func(cb *CircuitBreaker) {
cb.resetTimeout = timeout
}
}
// WithOnStateChange sets a callback for state changes
func WithOnStateChange(callback func(name string, from, to CircuitBreakerState)) CircuitBreakerOption {
return func(cb *CircuitBreaker) {
cb.onStateChange = callback
}
}
// NewCircuitBreaker creates a new circuit breaker
func NewCircuitBreaker(name string, options ...CircuitBreakerOption) *CircuitBreaker {
cb := &CircuitBreaker{
name: name,
state: StateClosed,
failureThreshold: 5,
successThreshold: 3,
resetTimeout: 10 * time.Second,
lastStateChange: time.Now(),
}
// Apply options
for _, option := range options {
option(cb)
}
return cb
}
// Execute runs the given function with circuit breaker protection
func (cb *CircuitBreaker) Execute(ctx context.Context, fn func() (interface{}, error)) (interface{}, error) {
// Check if the circuit is open
if !cb.allowRequest() {
atomic.AddInt64(&cb.totalShortCircuits, 1)
return nil, errors.New("circuit breaker is open")
}
// Track metrics
atomic.AddInt64(&cb.totalRequests, 1)
startTime := time.Now()
// Execute the protected function
result, err := fn()
// Update metrics based on the result
latency := time.Since(startTime)
atomic.AddInt64(&cb.cumulativeResponseTime, int64(latency))
// Check for timeout
if errors.Is(err, context.DeadlineExceeded) {
atomic.AddInt64(&cb.totalTimeouts, 1)
cb.recordFailure()
return nil, err
}
// Record success or failure
if err != nil {
atomic.AddInt64(&cb.totalFailures, 1)
cb.recordFailure()
return nil, err
}
atomic.AddInt64(&cb.totalSuccesses, 1)
cb.recordSuccess()
return result, nil
}
// allowRequest checks if a request should be allowed based on the circuit state
func (cb *CircuitBreaker) allowRequest() bool {
cb.mutex.Lock()
defer cb.mutex.Unlock()
switch cb.state {
case StateClosed:
return true
case StateOpen:
// Check if it's time to try again
if time.Since(cb.lastStateChange) > cb.resetTimeout {
// Transition to half-open; we already hold the write lock
cb.transitionState(StateHalfOpen)
return true
}
return false
case StateHalfOpen:
// In half-open state, allow limited requests to test the service
return atomic.LoadInt64(&cb.successCount)+atomic.LoadInt64(&cb.failureCount) < cb.successThreshold
default:
return true
}
}
// recordSuccess records a successful request
func (cb *CircuitBreaker) recordSuccess() {
cb.mutex.Lock()
defer cb.mutex.Unlock()
atomic.AddInt64(&cb.successCount, 1)
atomic.StoreInt64(&cb.failureCount, 0)
atomic.AddInt64(&cb.consecutiveSuccesses, 1)
atomic.StoreInt64(&cb.consecutiveFailures, 0)
// If we're in half-open state and have enough successes, close the circuit
if cb.state == StateHalfOpen && atomic.LoadInt64(&cb.successCount) >= cb.successThreshold {
cb.transitionState(StateClosed)
}
}
// recordFailure records a failed request
func (cb *CircuitBreaker) recordFailure() {
cb.mutex.Lock()
defer cb.mutex.Unlock()
atomic.AddInt64(&cb.failureCount, 1)
atomic.StoreInt64(&cb.successCount, 0)
atomic.AddInt64(&cb.consecutiveFailures, 1)
atomic.StoreInt64(&cb.consecutiveSuccesses, 0)
// If we have too many failures, open the circuit
if (cb.state == StateClosed && atomic.LoadInt64(&cb.failureCount) >= cb.failureThreshold) ||
(cb.state == StateHalfOpen && atomic.LoadInt64(&cb.failureCount) > 0) {
cb.transitionState(StateOpen)
}
}
// transitionState changes the state of the circuit breaker
func (cb *CircuitBreaker) transitionState(newState CircuitBreakerState) {
oldState := cb.state
cb.state = newState
cb.lastStateChange = time.Now()
// Reset counters
atomic.StoreInt64(&cb.failureCount, 0)
atomic.StoreInt64(&cb.successCount, 0)
// Notify state change if callback is set
if cb.onStateChange != nil {
cb.onStateChange(cb.name, oldState, newState)
}
log.Printf("Circuit breaker %s state changed from %v to %v", cb.name, oldState, newState)
}
// GetState returns the current state of the circuit breaker
func (cb *CircuitBreaker) GetState() CircuitBreakerState {
cb.mutex.RLock()
defer cb.mutex.RUnlock()
return cb.state
}
// GetMetrics returns the current metrics of the circuit breaker
func (cb *CircuitBreaker) GetMetrics() map[string]interface{} {
return map[string]interface{}{
"state": cb.GetState(),
"total_requests": atomic.LoadInt64(&cb.totalRequests),
"total_successes": atomic.LoadInt64(&cb.totalSuccesses),
"total_failures": atomic.LoadInt64(&cb.totalFailures),
"total_timeouts": atomic.LoadInt64(&cb.totalTimeouts),
"total_short_circuits": atomic.LoadInt64(&cb.totalShortCircuits),
"consecutive_successes": atomic.LoadInt64(&cb.consecutiveSuccesses),
"consecutive_failures": atomic.LoadInt64(&cb.consecutiveFailures),
"average_response_time": time.Duration(atomic.LoadInt64(&cb.cumulativeResponseTime) / max(1, atomic.LoadInt64(&cb.totalRequests))),
"last_state_change_ago": time.Since(cb.lastStateChange),
}
}
// Helper function for max of two int64s
func max(a, b int64) int64 {
if a > b {
return a
}
return b
}
// simulateService simulates a remote service with variable reliability
func simulateService(ctx context.Context, serviceID string, failureRate int, latency time.Duration) (string, error) {
// Simulate random failures
if rand.Intn(100) < failureRate {
return "", fmt.Errorf("service %s failed", serviceID)
}
// Simulate processing time
select {
case <-time.After(latency):
return fmt.Sprintf("Response from %s", serviceID), nil
case <-ctx.Done():
return "", ctx.Err()
}
}
func main() {
// Seed random number generator
rand.Seed(time.Now().UnixNano())
// Create a circuit breaker
cb := NewCircuitBreaker("example-service",
WithFailureThreshold(3),
WithSuccessThreshold(2),
WithResetTimeout(5*time.Second),
WithOnStateChange(func(name string, from, to CircuitBreakerState) {
fmt.Printf("Circuit breaker %s changed from %v to %v\n", name, from, to)
}),
)
// Simulate a series of requests
for i := 0; i < 20; i++ {
fmt.Printf("\nRequest %d:\n", i+1)
// Create a context with timeout
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
// Execute the request through the circuit breaker
result, err := cb.Execute(ctx, func() (interface{}, error) {
// Simulate different failure rates and latencies based on the iteration
var failureRate, latencyMs int
if i < 5 {
// First few requests are successful
failureRate = 0
latencyMs = 100
} else if i < 10 {
// Next few requests have high failure rate
failureRate = 80
latencyMs = 300
} else if i < 15 {
// Then we simulate a recovery
failureRate = 0
latencyMs = 100
} else {
// Finally, simulate timeouts
failureRate = 0
latencyMs = 1500 // This will exceed our timeout
}
return simulateService(ctx, "example-service", failureRate, time.Duration(latencyMs)*time.Millisecond)
})
cancel() // Always cancel the context
// Print the result
if err != nil {
fmt.Printf("Error: %v\n", err)
} else {
fmt.Printf("Success: %v\n", result)
}
// Print current metrics
metrics := cb.GetMetrics()
fmt.Printf("Circuit state: %v\n", metrics["state"])
fmt.Printf("Success/Failure: %d/%d\n", metrics["total_successes"], metrics["total_failures"])
// Wait a bit between requests
time.Sleep(500 * time.Millisecond)
}
}
This circuit breaker pattern is essential for distributed systems because it:
- Prevents cascading failures across services
- Fails fast when a service is unhealthy (see the HTTP usage sketch after this list)
- Allows for graceful recovery
- Provides detailed metrics for monitoring
- Implements sophisticated state management
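As a usage sketch, here is how such a breaker might wrap an outbound HTTP call. It assumes the CircuitBreaker type from the listing above is available in the same package (add net/http to its imports); the URL, timeout, and status-code policy are illustrative:
// fetchStatus wraps an outbound HTTP call with the CircuitBreaker defined above.
func fetchStatus(ctx context.Context, cb *CircuitBreaker, url string) (int, error) {
    result, err := cb.Execute(ctx, func() (interface{}, error) {
        reqCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
        defer cancel()
        req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, url, nil)
        if err != nil {
            return nil, err
        }
        resp, err := http.DefaultClient.Do(req)
        if err != nil {
            return nil, err
        }
        defer resp.Body.Close()
        if resp.StatusCode >= 500 {
            // Treat server errors as failures so they count toward opening the circuit.
            return nil, fmt.Errorf("upstream returned %d", resp.StatusCode)
        }
        return resp.StatusCode, nil
    })
    if err != nil {
        return 0, err
    }
    return result.(int), nil
}
Because 5xx responses are converted into errors, repeated upstream failures eventually open the circuit, and later calls fail fast without touching the network at all.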
Graceful Shutdown Pattern
Proper shutdown handling is crucial for maintaining data integrity:
package main
import (
"context"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
// GracefulShutdown manages the graceful shutdown process
type GracefulShutdown struct {
timeout time.Duration
shutdownFuncs []ShutdownFunc
wg sync.WaitGroup
shutdownCh chan struct{}
doneCh chan struct{}
}
// ShutdownFunc represents a function to be called during shutdown
type ShutdownFunc func(ctx context.Context) error
// NewGracefulShutdown creates a new graceful shutdown manager
func NewGracefulShutdown(timeout time.Duration) *GracefulShutdown {
return &GracefulShutdown{
timeout: timeout,
shutdownCh: make(chan struct{}),
doneCh: make(chan struct{}),
}
}
// AddShutdownFunc adds a function to be called during shutdown
func (gs *GracefulShutdown) AddShutdownFunc(name string, fn ShutdownFunc) {
gs.shutdownFuncs = append(gs.shutdownFuncs, func(ctx context.Context) error {
log.Printf("Shutting down %s...", name)
err := fn(ctx)
if err != nil {
log.Printf("Error shutting down %s: %v", name, err)
return err
}
log.Printf("%s shutdown complete", name)
return nil
})
}
// Start begins listening for shutdown signals
func (gs *GracefulShutdown) Start() {
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
select {
case sig := <-signalCh:
log.Printf("Received signal: %v", sig)
gs.Shutdown()
case <-gs.shutdownCh:
// Shutdown triggered programmatically
}
}()
}
// Shutdown initiates the graceful shutdown process
func (gs *GracefulShutdown) Shutdown() {
// Ensure we only shut down once
select {
case <-gs.shutdownCh:
// Already shutting down
return
default:
close(gs.shutdownCh)
}
log.Println("Starting graceful shutdown...")
// Create a context with timeout
ctx, cancel := context.WithTimeout(context.Background(), gs.timeout)
defer cancel()
// Execute all shutdown functions
for _, fn := range gs.shutdownFuncs {
gs.wg.Add(1)
go func(shutdownFn ShutdownFunc) {
defer gs.wg.Done()
_ = shutdownFn(ctx)
}(fn)
}
// Wait for all shutdown functions to complete or timeout
shutdownComplete := make(chan struct{})
go func() {
gs.wg.Wait()
close(shutdownComplete)
}()
select {
case <-shutdownComplete:
log.Println("Graceful shutdown completed successfully")
case <-ctx.Done():
log.Println("Graceful shutdown timed out")
}
close(gs.doneCh)
}
// Wait blocks until shutdown is complete
func (gs *GracefulShutdown) Wait() {
<-gs.doneCh
}
// IsShuttingDown returns whether shutdown has been initiated
func (gs *GracefulShutdown) IsShuttingDown() bool {
select {
case <-gs.shutdownCh:
return true
default:
return false
}
}
// Example HTTP server with graceful shutdown
type Server struct {
server *http.Server
shutdown *GracefulShutdown
}
// NewServer creates a new HTTP server with graceful shutdown
func NewServer(addr string, shutdown *GracefulShutdown) *Server {
server := &http.Server{
Addr: addr,
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Check if we're shutting down
if shutdown.IsShuttingDown() {
w.WriteHeader(http.StatusServiceUnavailable)
w.Write([]byte("Server is shutting down"))
return
}
// Normal request handling
fmt.Fprintf(w, "Hello, World!")
}),
}
s := &Server{
server: server,
shutdown: shutdown,
}
// Register shutdown handler
shutdown.AddShutdownFunc("http-server", func(ctx context.Context) error {
return server.Shutdown(ctx)
})
return s
}
// Start starts the HTTP server
func (s *Server) Start() error {
log.Printf("Starting HTTP server on %s", s.server.Addr)
return s.server.ListenAndServe()
}
// Example worker pool with graceful shutdown
type WorkerPool struct {
jobCh chan string
shutdown *GracefulShutdown
wg sync.WaitGroup
}
// NewWorkerPool creates a new worker pool with graceful shutdown
func NewWorkerPool(workerCount int, jobBufferSize int, shutdown *GracefulShutdown) *WorkerPool {
wp := &WorkerPool{
jobCh: make(chan string, jobBufferSize),
shutdown: shutdown,
}
// Start workers
for i := 0; i < workerCount; i++ {
wp.wg.Add(1)
go wp.worker(i)
}
// Register shutdown handler
shutdown.AddShutdownFunc("worker-pool", func(ctx context.Context) error {
log.Println("Closing job channel...")
close(wp.jobCh)
// Wait for all workers to finish
doneCh := make(chan struct{})
go func() {
wp.wg.Wait()
close(doneCh)
}()
select {
case <-doneCh:
return nil
case <-ctx.Done():
return ctx.Err()
}
})
return wp
}
// worker processes jobs
func (wp *WorkerPool) worker(id int) {
defer wp.wg.Done()
log.Printf("Worker %d started", id)
for job := range wp.jobCh {
// Check if we're shutting down
if wp.shutdown.IsShuttingDown() {
log.Printf("Worker %d processing final jobs before shutdown", id)
}
// Process the job
log.Printf("Worker %d processing job: %s", id, job)
time.Sleep(100 * time.Millisecond) // Simulate work
}
log.Printf("Worker %d stopped", id)
}
// SubmitJob submits a job to the worker pool
func (wp *WorkerPool) SubmitJob(job string) error {
if wp.shutdown.IsShuttingDown() {
return fmt.Errorf("worker pool is shutting down")
}
select {
case wp.jobCh <- job:
return nil
default:
return fmt.Errorf("job queue is full")
}
}
func main() {
// Create a graceful shutdown manager with 5 second timeout
shutdown := NewGracefulShutdown(5 * time.Second)
shutdown.Start()
// Create and start HTTP server
server := NewServer(":8080", shutdown)
go func() {
if err := server.Start(); err != nil && err != http.ErrServerClosed {
log.Fatalf("HTTP server error: %v", err)
}
}()
// Create worker pool
workerPool := NewWorkerPool(3, 10, shutdown)
// Submit some jobs
go func() {
for i := 0; i < 20; i++ {
job := fmt.Sprintf("Job %d", i)
if err := workerPool.SubmitJob(job); err != nil {
log.Printf("Failed to submit job: %v", err)
}
time.Sleep(200 * time.Millisecond)
}
}()
// Wait for shutdown signal
log.Println("Server is running. Press Ctrl+C to shutdown.")
shutdown.Wait()
log.Println("Server exited")
}
This graceful shutdown pattern is valuable for distributed systems because it:
- Ensures in-flight operations complete before shutdown
- Prevents data loss during service termination
- Provides a clean shutdown sequence for dependent components
- Implements timeout handling for stuck operations
- Allows for health check integration during shutdown (a minimal readiness-probe sketch follows this list)
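As a concrete example of the last point, here is a minimal readiness-probe sketch built on the GracefulShutdown type from this section (the route name and wiring are assumptions, and it relies on the net/http import already present in the example): once shutdown begins, the probe returns 503 so load balancers stop routing new traffic while in-flight requests drain.
// readinessHandler reports 503 as soon as shutdown has started.
func readinessHandler(shutdown *GracefulShutdown) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        if shutdown.IsShuttingDown() {
            http.Error(w, "shutting down", http.StatusServiceUnavailable)
            return
        }
        w.WriteHeader(http.StatusOK)
        w.Write([]byte("ok"))
    }
}
// Wiring it into the example server, for instance:
// http.Handle("/healthz", readinessHandler(shutdown))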
Performance Optimization and Monitoring
Optimizing and monitoring concurrent code is essential for distributed systems.
Goroutine Leak Detection
Detecting and preventing goroutine leaks is crucial for long-running services:
package main
import (
"log"
"runtime"
"sync"
"time"
)
// LeakDetector monitors goroutine count and detects potential leaks
type LeakDetector struct {
interval time.Duration
threshold float64
baselineCount int
previousCount int
samples []int
sampleSize int
stopCh chan struct{}
mu sync.Mutex
alertThreshold int
onLeak func(count int, samples []int)
}
// NewLeakDetector creates a new goroutine leak detector
func NewLeakDetector(interval time.Duration, threshold float64, sampleSize int) *LeakDetector {
return &LeakDetector{
interval: interval,
threshold: threshold,
samples: make([]int, 0, sampleSize),
sampleSize: sampleSize,
stopCh: make(chan struct{}),
alertThreshold: 1000, // Alert if more than 1000 goroutines over baseline
onLeak: func(count int, samples []int) {
log.Printf("WARNING: Potential goroutine leak detected! Current count: %d", count)
},
}
}
// Start begins monitoring for goroutine leaks
func (ld *LeakDetector) Start() {
// Capture the baseline after a short warmup
time.Sleep(100 * time.Millisecond)
ld.baselineCount = runtime.NumGoroutine()
ld.previousCount = ld.baselineCount
log.Printf("Leak detector started with baseline of %d goroutines", ld.baselineCount)
go func() {
ticker := time.NewTicker(ld.interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
ld.checkForLeaks()
case <-ld.stopCh:
return
}
}
}()
}
// Stop stops the leak detector
func (ld *LeakDetector) Stop() {
close(ld.stopCh)
}
// SetAlertThreshold sets the threshold for leak alerts
func (ld *LeakDetector) SetAlertThreshold(threshold int) {
ld.mu.Lock()
defer ld.mu.Unlock()
ld.alertThreshold = threshold
}
// SetOnLeakDetected sets the callback for leak detection
func (ld *LeakDetector) SetOnLeakDetected(callback func(count int, samples []int)) {
ld.mu.Lock()
defer ld.mu.Unlock()
ld.onLeak = callback
}
// checkForLeaks checks if there's a potential goroutine leak
func (ld *LeakDetector) checkForLeaks() {
ld.mu.Lock()
defer ld.mu.Unlock()
currentCount := runtime.NumGoroutine()
// Add to samples
ld.samples = append(ld.samples, currentCount)
if len(ld.samples) > ld.sampleSize {
ld.samples = ld.samples[1:]
}
// Check for significant increase
if currentCount > ld.baselineCount+ld.alertThreshold {
// Check if the count is consistently increasing
if currentCount > ld.previousCount {
// Calculate growth rate
growthRate := float64(currentCount-ld.previousCount) / float64(ld.previousCount)
if growthRate > ld.threshold {
if ld.onLeak != nil {
ld.onLeak(currentCount, ld.samples)
}
}
}
}
ld.previousCount = currentCount
}
// GetStats returns current goroutine statistics
func (ld *LeakDetector) GetStats() map[string]interface{} {
ld.mu.Lock()
defer ld.mu.Unlock()
return map[string]interface{}{
"current_count": runtime.NumGoroutine(),
"baseline_count": ld.baselineCount,
"previous_count": ld.previousCount,
"samples": ld.samples,
"alert_threshold": ld.alertThreshold,
}
}
// simulateLeakingGoroutines creates goroutines that never terminate
func simulateLeakingGoroutines(count int) {
for i := 0; i < count; i++ {
go func(id int) {
log.Printf("Leaking goroutine %d started", id)
select {} // This goroutine will never terminate
}(i)
}
}
// simulateTemporaryGoroutines creates goroutines that terminate after a delay
func simulateTemporaryGoroutines(count int, duration time.Duration) {
for i := 0; i < count; i++ {
go func(id int) {
log.Printf("Temporary goroutine %d started", id)
time.Sleep(duration)
log.Printf("Temporary goroutine %d finished", id)
}(i)
}
}
// dumpStacks prints all goroutine stacks for debugging
func dumpStacks() {
buf := make([]byte, 1<<20)
stackLen := runtime.Stack(buf, true)
log.Printf("=== GOROUTINE DUMP ===\n%s\n=== END DUMP ===", buf[:stackLen])
}
func main() {
// Create a leak detector
detector := NewLeakDetector(500*time.Millisecond, 0.05, 10)
detector.SetAlertThreshold(100) // Lower the default of 1000 so this small demo can actually trigger an alert
detector.SetOnLeakDetected(func(count int, samples []int) {
log.Printf("LEAK DETECTED: %d goroutines running (baseline: %d)",
count, detector.baselineCount)
log.Printf("Recent samples: %v", samples)
dumpStacks() // Dump stacks for debugging
})
// Start the detector
detector.Start()
defer detector.Stop()
// Print initial stats
log.Printf("Initial goroutine count: %d", runtime.NumGoroutine())
// Simulate normal goroutine usage
log.Println("Creating temporary goroutines...")
simulateTemporaryGoroutines(100, 2*time.Second)
// Wait a bit
time.Sleep(3 * time.Second)
log.Printf("Goroutine count after temporary spike: %d", runtime.NumGoroutine())
// Simulate a leak
log.Println("Simulating a goroutine leak...")
simulateLeakingGoroutines(50)
// Wait for detection
time.Sleep(2 * time.Second)
// Create more leaks to trigger detection
log.Println("Creating more leaking goroutines...")
simulateLeakingGoroutines(100)
// Wait for detection
time.Sleep(5 * time.Second)
// Print final stats
stats := detector.GetStats()
log.Printf("Final stats: %+v", stats)
}
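The detector above suits long-running processes; in tests, the go.uber.org/goleak package provides a complementary check that fails a test if unexpected goroutines are still running when it ends. A brief sketch, assuming goleak is available as a dependency (the test names are hypothetical):
package main

import (
    "testing"

    "go.uber.org/goleak"
)

// TestMain fails the whole test binary if unexpected goroutines remain
// after all tests have finished.
func TestMain(m *testing.M) {
    goleak.VerifyTestMain(m)
}

// TestWorkerShutdown shows the per-test form of the same check.
func TestWorkerShutdown(t *testing.T) {
    defer goleak.VerifyNone(t)
    // ... start and stop the component under test ...
}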
Contention Profiling
Identifying and resolving lock contention is essential for performance:
package main
import (
"context"
"fmt"
"log"
"math/rand"
"net/http"
_ "net/http/pprof" // Import for profiling
"os"
"runtime"
"sync"
"time"
)
// SharedResource simulates a resource with different locking strategies
type SharedResource struct {
name string
value int
mutex sync.Mutex
rwMutex sync.RWMutex
accessLog []string
logMutex sync.Mutex
accessCount int64
}
// UpdateWithMutex updates the resource using a standard mutex
func (r *SharedResource) UpdateWithMutex(id int, val int) {
r.mutex.Lock()
defer r.mutex.Unlock()
// Simulate some work
time.Sleep(time.Duration(1+rand.Intn(5)) * time.Millisecond)
r.value += val
r.logAccess(fmt.Sprintf("Writer %d updated value to %d", id, r.value))
}
// ReadWithMutex reads the resource using a standard mutex
func (r *SharedResource) ReadWithMutex(id int) int {
r.mutex.Lock()
defer r.mutex.Unlock()
// Simulate some work
time.Sleep(time.Duration(1+rand.Intn(2)) * time.Millisecond)
r.logAccess(fmt.Sprintf("Reader %d read value %d", id, r.value))
return r.value
}
// UpdateWithRWMutex updates the resource using a read-write mutex
func (r *SharedResource) UpdateWithRWMutex(id int, val int) {
r.rwMutex.Lock()
defer r.rwMutex.Unlock()
// Simulate some work
time.Sleep(time.Duration(1+rand.Intn(5)) * time.Millisecond)
r.value += val
r.logAccess(fmt.Sprintf("Writer %d updated value to %d (RW)", id, r.value))
}
// ReadWithRWMutex reads the resource using a read-write mutex
func (r *SharedResource) ReadWithRWMutex(id int) int {
r.rwMutex.RLock()
defer r.rwMutex.RUnlock()
// Simulate some work
time.Sleep(time.Duration(1+rand.Intn(2)) * time.Millisecond)
r.logAccess(fmt.Sprintf("Reader %d read value %d (RW)", id, r.value))
return r.value
}
// logAccess logs an access to the resource
func (r *SharedResource) logAccess(msg string) {
r.logMutex.Lock()
defer r.logMutex.Unlock()
r.accessLog = append(r.accessLog, msg)
if len(r.accessLog) > 100 {
r.accessLog = r.accessLog[1:]
}
r.accessCount++
}
// GetStats returns statistics about the resource
func (r *SharedResource) GetStats() map[string]interface{} {
r.logMutex.Lock()
defer r.logMutex.Unlock()
return map[string]interface{}{
"name": r.name,
"value": r.value,
"access_count": r.accessCount,
"recent_logs": r.accessLog[max(0, len(r.accessLog)-5):],
}
}
// Helper function for max of two ints
func max(a, b int) int {
if a > b {
return a
}
return b
}
// runContentionTest runs a test with different concurrency levels
func runContentionTest(ctx context.Context, useRWMutex bool, readers, writers int) {
// Derive a per-test context so this test's reader and writer goroutines stop when the test returns
ctx, cancel := context.WithCancel(ctx)
defer cancel()
resource := &SharedResource{
name: fmt.Sprintf("Resource-%v-%d-%d", useRWMutex, readers, writers),
}
log.Printf("Starting contention test with %d readers and %d writers (RWMutex: %v)",
readers, writers, useRWMutex)
// Start writers
var wg sync.WaitGroup
for i := 0; i < writers; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
default:
// Update with a random value
if useRWMutex {
resource.UpdateWithRWMutex(id, rand.Intn(10))
} else {
resource.UpdateWithMutex(id, rand.Intn(10))
}
time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
}
}
}(i)
}
// Start readers
for i := 0; i < readers; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
default:
// Read value
if useRWMutex {
_ = resource.ReadWithRWMutex(id)
} else {
_ = resource.ReadWithMutex(id)
}
time.Sleep(time.Duration(rand.Intn(5)) * time.Millisecond)
}
}
}(i)
}
// Run for a fixed duration
time.Sleep(5 * time.Second)
// Print stats
stats := resource.GetStats()
log.Printf("Test results for %s:", resource.name)
log.Printf("- Final value: %d", stats["value"])
log.Printf("- Access count: %d", stats["access_count"])
log.Printf("- Recent accesses: %v", stats["recent_logs"])
}
func main() {
// Start pprof server for profiling
go func() {
log.Println("Starting pprof server on :6060")
log.Println(http.ListenAndServe("localhost:6060", nil))
}()
// Enable mutex and block profiling so the pprof endpoints referenced below have data to report
runtime.SetMutexProfileFraction(1)
runtime.SetBlockProfileRate(1)
// Set GOMAXPROCS to use all CPUs (the default since Go 1.5, shown here for clarity)
runtime.GOMAXPROCS(runtime.NumCPU())
log.Printf("Running with GOMAXPROCS=%d", runtime.GOMAXPROCS(0))
// Seed random number generator
rand.Seed(time.Now().UnixNano())
// Create a context with cancel
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Run tests with different configurations
log.Println("=== Starting contention tests ===")
// Test 1: Few readers, few writers with standard mutex
runContentionTest(ctx, false, 5, 5)
// Test 2: Few readers, few writers with RWMutex
runContentionTest(ctx, true, 5, 5)
// Test 3: Many readers, few writers with standard mutex
runContentionTest(ctx, false, 50, 5)
// Test 4: Many readers, few writers with RWMutex
runContentionTest(ctx, true, 50, 5)
log.Println("=== Contention tests complete ===")
log.Println("To view profiling data, run: go tool pprof http://localhost:6060/debug/pprof/profile")
log.Println("To view mutex contention: go tool pprof http://localhost:6060/debug/pprof/mutex")
// Keep running for a while to allow profiling
log.Println("Press Ctrl+C to exit")
select {}
}
This contention profiling approach is valuable for distributed systems because it:
- Identifies performance bottlenecks in concurrent code
- Helps optimize lock usage for better throughput
- Provides insights into resource contention
- Enables data-driven decisions about synchronization strategies (see the benchmark sketch after this list)
- Integrates with Go’s built-in profiling tools
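Before committing to a locking strategy, it is often worth measuring the candidates in isolation. A minimal benchmark sketch (a hypothetical contention_test.go, separate from the program above) that compares read throughput of sync.Mutex and sync.RWMutex across CPU counts:
package main

import (
    "sync"
    "testing"
)

// Run with: go test -bench=. -cpu=1,4,8
func BenchmarkMutexRead(b *testing.B) {
    var mu sync.Mutex
    value := 0
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            mu.Lock()
            _ = value
            mu.Unlock()
        }
    })
}

func BenchmarkRWMutexRead(b *testing.B) {
    var mu sync.RWMutex
    value := 0
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            mu.RLock()
            _ = value
            mu.RUnlock()
        }
    })
}
If the RWMutex variant only pulls ahead at higher CPU counts under read-heavy load, that is a useful data point before reworking production code.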
Production Best Practices
When deploying concurrent Go code in production distributed systems, following these best practices can help ensure reliability and performance.
Context Propagation
Always propagate context through your entire call chain to ensure proper cancellation and timeout handling:
package main
import (
"context"
"fmt"
"log"
"time"
)
// Service represents a component in a distributed system
type Service struct {
name string
dependencies []*Service
processingTime time.Duration
}
// NewService creates a new service with dependencies
func NewService(name string, processingTime time.Duration, dependencies ...*Service) *Service {
return &Service{
name: name,
dependencies: dependencies,
processingTime: processingTime,
}
}
// Process handles a request, propagating context to dependencies
func (s *Service) Process(ctx context.Context, requestID string) (string, error) {
// Check if context is already canceled
if ctx.Err() != nil {
return "", ctx.Err()
}
log.Printf("[%s] Processing request %s", s.name, requestID)
// Call dependencies first
for _, dep := range s.dependencies {
// Create a child context with timeout for the dependency call
depCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
result, err := dep.Process(depCtx, requestID)
cancel() // Release the child context as soon as the call returns, rather than deferring inside the loop
if err != nil {
log.Printf("[%s] Dependency %s failed: %v", s.name, dep.name, err)
return "", fmt.Errorf("dependency %s failed: %w", dep.name, err)
}
log.Printf("[%s] Dependency %s returned: %s", s.name, dep.name, result)
}
// Simulate processing time
select {
case <-time.After(s.processingTime):
// Processing completed successfully
case <-ctx.Done():
// Context was canceled
return "", ctx.Err()
}
response := fmt.Sprintf("Response from %s for request %s", s.name, requestID)
log.Printf("[%s] Completed request %s", s.name, requestID)
return response, nil
}
func main() {
// Create a service dependency graph
serviceD := NewService("ServiceD", 100*time.Millisecond)
serviceE := NewService("ServiceE", 150*time.Millisecond)
serviceB := NewService("ServiceB", 200*time.Millisecond, serviceD, serviceE)
serviceC := NewService("ServiceC", 150*time.Millisecond, serviceE)
serviceA := NewService("ServiceA", 100*time.Millisecond, serviceB, serviceC)
// Create a root context with timeout
ctx, cancel := context.WithTimeout(context.Background(), 800*time.Millisecond)
defer cancel()
// Generate a request ID for this call chain
requestID := fmt.Sprintf("req-%d", time.Now().UnixNano())
// Process the request
log.Printf("Starting request %s with 800ms timeout", requestID)
result, err := serviceA.Process(ctx, requestID)
if err != nil {
log.Printf("Request failed: %v", err)
} else {
log.Printf("Request succeeded: %s", result)
}
// Try another request with insufficient timeout
log.Println("\nStarting another request with insufficient timeout")
ctx2, cancel2 := context.WithTimeout(context.Background(), 300*time.Millisecond)
defer cancel2()
requestID2 := fmt.Sprintf("req-%d", time.Now().UnixNano())
result2, err2 := serviceA.Process(ctx2, requestID2)
if err2 != nil {
log.Printf("Request failed as expected: %v", err2)
} else {
log.Printf("Request succeeded unexpectedly: %s", result2)
}
}
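The example passes the request ID as an explicit parameter. It can also ride along inside the context, so logging and tracing code deep in the call chain can retrieve it without extra plumbing. A minimal sketch of helpers that could be added to the program above (the key and function names are assumptions):
// ctxKey is an unexported type, so this key cannot collide with keys from other packages.
type ctxKey struct{}

var requestIDKey ctxKey

// WithRequestID returns a child context carrying the request ID.
func WithRequestID(ctx context.Context, id string) context.Context {
    return context.WithValue(ctx, requestIDKey, id)
}

// RequestIDFrom extracts the request ID, or returns "unknown" if none was set.
func RequestIDFrom(ctx context.Context) string {
    if id, ok := ctx.Value(requestIDKey).(string); ok {
        return id
    }
    return "unknown"
}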
Bounded Concurrency
Always limit concurrency to prevent resource exhaustion:
package main
import (
"context"
"errors"
"fmt"
"log"
"math/rand"
"sync"
"time"
)
// BoundedExecutor limits the number of concurrent operations
type BoundedExecutor struct {
semaphore chan struct{}
timeout time.Duration
}
// NewBoundedExecutor creates a new bounded executor
func NewBoundedExecutor(maxConcurrent int, timeout time.Duration) *BoundedExecutor {
return &BoundedExecutor{
semaphore: make(chan struct{}, maxConcurrent),
timeout: timeout,
}
}
// Execute runs the given function with bounded concurrency
func (e *BoundedExecutor) Execute(ctx context.Context, fn func() (interface{}, error)) (interface{}, error) {
// Create a context with timeout
execCtx, cancel := context.WithTimeout(ctx, e.timeout)
defer cancel()
// Try to acquire a semaphore slot
select {
case e.semaphore <- struct{}{}:
// Acquired a slot
defer func() { <-e.semaphore }()
case <-execCtx.Done():
// Couldn't acquire a slot within timeout
return nil, fmt.Errorf("operation rejected: %w", execCtx.Err())
}
// Execute the function
return fn()
}
// simulateOperation simulates a remote operation with variable latency
func simulateOperation(id int) (interface{}, error) {
// Simulate random latency
latency := 50 + rand.Intn(200)
time.Sleep(time.Duration(latency) * time.Millisecond)
// Simulate occasional failures
if rand.Intn(10) == 0 {
return nil, fmt.Errorf("operation %d failed", id)
}
return fmt.Sprintf("Result from operation %d", id), nil
}
func main() {
// Seed random number generator
rand.Seed(time.Now().UnixNano())
// Create a bounded executor
executor := NewBoundedExecutor(5, 500*time.Millisecond)
// Create a context with timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Track metrics
var (
totalOps int
successfulOps int
rejectedOps int
failedOps int
mu sync.Mutex
wg sync.WaitGroup
)
// Launch a bunch of operations
for i := 0; i < 50; i++ {
wg.Add(1)
go func(opID int) {
defer wg.Done()
mu.Lock()
totalOps++
mu.Unlock()
// Execute with bounded concurrency
result, err := executor.Execute(ctx, func() (interface{}, error) {
return simulateOperation(opID)
})
mu.Lock()
defer mu.Unlock()
if err != nil {
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
rejectedOps++
log.Printf("Operation %d rejected: %v", opID, err)
} else {
failedOps++
log.Printf("Operation %d failed: %v", opID, err)
}
} else {
successfulOps++
log.Printf("Operation %d succeeded: %v", opID, result)
}
}(i)
// Add some delay between operations
time.Sleep(50 * time.Millisecond)
}
// Wait for all operations to complete
wg.Wait()
// Print summary
log.Printf("\nSummary:")
log.Printf("- Total operations: %d", totalOps)
log.Printf("- Successful: %d", successfulOps)
log.Printf("- Rejected: %d", rejectedOps)
log.Printf("- Failed: %d", failedOps)
}
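The channel-based semaphore above needs no external dependencies. If you prefer a maintained primitive, golang.org/x/sync/semaphore offers an equivalent weighted semaphore with context-aware acquisition; a brief sketch under that assumption:
package main

import (
    "context"
    "fmt"
    "time"

    "golang.org/x/sync/semaphore"
)

// sem allows at most 5 operations to run concurrently.
var sem = semaphore.NewWeighted(5)

// boundedCall waits for a slot (or gives up when ctx is done) and then runs fn.
func boundedCall(ctx context.Context, fn func() error) error {
    if err := sem.Acquire(ctx, 1); err != nil {
        return fmt.Errorf("operation rejected: %w", err)
    }
    defer sem.Release(1)
    return fn()
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    for i := 0; i < 10; i++ {
        i := i
        go func() {
            _ = boundedCall(ctx, func() error {
                fmt.Printf("operation %d running\n", i)
                time.Sleep(100 * time.Millisecond)
                return nil
            })
        }()
    }
    time.Sleep(2 * time.Second) // crude wait so the demo goroutines can finish
}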
Graceful Degradation
Design systems to degrade gracefully under load. The example below reuses the BoundedExecutor from the previous section to cap concurrent work at each service level:
package main
import (
"context"
"fmt"
"log"
"math/rand"
"sync"
"sync/atomic"
"time"
)
// ServiceLevel represents different service quality levels
type ServiceLevel int
const (
FullService ServiceLevel = iota
ReducedService
MinimalService
EmergencyService
)
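// Note: with %v, ServiceLevel values print as bare integers in the log output below.
// This small String method is an addition for readability; it is not required by
// the pattern itself.
func (sl ServiceLevel) String() string {
    switch sl {
    case FullService:
        return "FullService"
    case ReducedService:
        return "ReducedService"
    case MinimalService:
        return "MinimalService"
    case EmergencyService:
        return "EmergencyService"
    default:
        return "Unknown"
    }
}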
// LoadManager monitors system load and adjusts service levels
type LoadManager struct {
currentLevel ServiceLevel
cpuLoad int64
memoryUsage int64
requestRate int64
errorRate int64
thresholds map[ServiceLevel]map[string]int64
mu sync.RWMutex
onLevelChange func(from, to ServiceLevel)
degradationRules map[ServiceLevel]func()
}
// NewLoadManager creates a new load manager
func NewLoadManager() *LoadManager {
lm := &LoadManager{
currentLevel: FullService,
thresholds: map[ServiceLevel]map[string]int64{
ReducedService: {
"cpu": 70, // 70% CPU
"memory": 80, // 80% memory
"requestRate": 1000, // 1000 req/sec
"errorRate": 5, // 5% errors
},
MinimalService: {
"cpu": 85,
"memory": 90,
"requestRate": 2000,
"errorRate": 10,
},
EmergencyService: {
"cpu": 95,
"memory": 95,
"requestRate": 3000,
"errorRate": 20,
},
},
degradationRules: make(map[ServiceLevel]func()),
}
// Set default level change handler
lm.onLevelChange = func(from, to ServiceLevel) {
log.Printf("Service level changed from %v to %v", from, to)
}
return lm
}
// SetOnLevelChange sets the callback for service level changes
func (lm *LoadManager) SetOnLevelChange(callback func(from, to ServiceLevel)) {
lm.mu.Lock()
defer lm.mu.Unlock()
lm.onLevelChange = callback
}
// SetDegradationRule sets the function to call when degrading to a specific level
func (lm *LoadManager) SetDegradationRule(level ServiceLevel, rule func()) {
lm.mu.Lock()
defer lm.mu.Unlock()
lm.degradationRules[level] = rule
}
// UpdateMetrics updates the load metrics
func (lm *LoadManager) UpdateMetrics(cpu, memory, requestRate, errorRate int64) {
lm.mu.Lock()
defer lm.mu.Unlock()
lm.cpuLoad = cpu
lm.memoryUsage = memory
lm.requestRate = requestRate
lm.errorRate = errorRate
// Check if we need to change service level
lm.adjustServiceLevel()
}
// adjustServiceLevel changes the service level based on current metrics
func (lm *LoadManager) adjustServiceLevel() {
// Determine appropriate service level
var newLevel ServiceLevel
if lm.cpuLoad >= lm.thresholds[EmergencyService]["cpu"] ||
lm.memoryUsage >= lm.thresholds[EmergencyService]["memory"] ||
lm.requestRate >= lm.thresholds[EmergencyService]["requestRate"] ||
lm.errorRate >= lm.thresholds[EmergencyService]["errorRate"] {
newLevel = EmergencyService
} else if lm.cpuLoad >= lm.thresholds[MinimalService]["cpu"] ||
lm.memoryUsage >= lm.thresholds[MinimalService]["memory"] ||
lm.requestRate >= lm.thresholds[MinimalService]["requestRate"] ||
lm.errorRate >= lm.thresholds[MinimalService]["errorRate"] {
newLevel = MinimalService
} else if lm.cpuLoad >= lm.thresholds[ReducedService]["cpu"] ||
lm.memoryUsage >= lm.thresholds[ReducedService]["memory"] ||
lm.requestRate >= lm.thresholds[ReducedService]["requestRate"] ||
lm.errorRate >= lm.thresholds[ReducedService]["errorRate"] {
newLevel = ReducedService
} else {
newLevel = FullService
}
// If level changed, notify and apply degradation rules
if newLevel != lm.currentLevel {
oldLevel := lm.currentLevel
lm.currentLevel = newLevel
// Notify about level change
if lm.onLevelChange != nil {
lm.onLevelChange(oldLevel, newLevel)
}
// Apply degradation rule if available
if rule, exists := lm.degradationRules[newLevel]; exists && rule != nil {
rule()
}
}
}
// GetCurrentLevel returns the current service level
func (lm *LoadManager) GetCurrentLevel() ServiceLevel {
lm.mu.RLock()
defer lm.mu.RUnlock()
return lm.currentLevel
}
// GetMetrics returns the current metrics
func (lm *LoadManager) GetMetrics() map[string]int64 {
lm.mu.RLock()
defer lm.mu.RUnlock()
return map[string]int64{
"cpu": lm.cpuLoad,
"memory": lm.memoryUsage,
"requestRate": lm.requestRate,
"errorRate": lm.errorRate,
}
}
// DegradableService demonstrates graceful degradation
type DegradableService struct {
loadManager *LoadManager
workerPool *BoundedExecutor
cacheEnabled bool
retryEnabled bool
featureFlags map[string]bool
requestCounter int64
errorCounter int64
}
// NewDegradableService creates a new service with degradation capabilities
func NewDegradableService() *DegradableService {
loadManager := NewLoadManager()
service := &DegradableService{
loadManager: loadManager,
workerPool: NewBoundedExecutor(20, 1*time.Second),
cacheEnabled: true,
retryEnabled: true,
featureFlags: map[string]bool{
"analytics": true,
"notifications": true,
"recommendations": true,
"fullHistory": true,
},
}
// Configure degradation rules
loadManager.SetDegradationRule(ReducedService, func() {
log.Println("Applying REDUCED service level:")
log.Println("- Disabling analytics")
log.Println("- Reducing worker pool to 15")
service.featureFlags["analytics"] = false
service.workerPool = NewBoundedExecutor(15, 800*time.Millisecond)
})
loadManager.SetDegradationRule(MinimalService, func() {
log.Println("Applying MINIMAL service level:")
log.Println("- Disabling recommendations")
log.Println("- Disabling notifications")
log.Println("- Reducing worker pool to 10")
log.Println("- Shortening timeouts")
service.featureFlags["recommendations"] = false
service.featureFlags["notifications"] = false
service.workerPool = NewBoundedExecutor(10, 500*time.Millisecond)
})
loadManager.SetDegradationRule(EmergencyService, func() {
log.Println("Applying EMERGENCY service level:")
log.Println("- Disabling full history")
log.Println("- Disabling retries")
log.Println("- Reducing worker pool to 5")
log.Println("- Shortening timeouts further")
service.featureFlags["fullHistory"] = false
service.retryEnabled = false
service.workerPool = NewBoundedExecutor(5, 300*time.Millisecond)
})
return service
}
// HandleRequest processes a request with graceful degradation
func (s *DegradableService) HandleRequest(ctx context.Context, requestID string) (string, error) {
// Increment request counter
atomic.AddInt64(&s.requestCounter, 1)
// Get current service level
level := s.loadManager.GetCurrentLevel()
// Execute with bounded concurrency
result, err := s.workerPool.Execute(ctx, func() (interface{}, error) {
// Simulate processing based on service level
switch level {
case FullService:
// Full processing
time.Sleep(100 * time.Millisecond)
if s.featureFlags["analytics"] {
// Do analytics processing
time.Sleep(50 * time.Millisecond)
}
if s.featureFlags["recommendations"] {
// Generate recommendations
time.Sleep(50 * time.Millisecond)
}
case ReducedService:
// Skip some processing
time.Sleep(80 * time.Millisecond)
case MinimalService:
// Minimal processing
time.Sleep(50 * time.Millisecond)
case EmergencyService:
// Critical path only
time.Sleep(30 * time.Millisecond)
}
// Simulate occasional failures
if rand.Intn(100) < 5 {
atomic.AddInt64(&s.errorCounter, 1)
return nil, fmt.Errorf("processing error")
}
return fmt.Sprintf("Response for %s (service level: %v)", requestID, level), nil
})
if err != nil {
// Handle error based on service level
if s.retryEnabled && level < MinimalService {
// Retry once for less severe degradation levels
log.Printf("Retrying request %s after error: %v", requestID, err)
result, err = s.workerPool.Execute(ctx, func() (interface{}, error) {
time.Sleep(50 * time.Millisecond)
if rand.Intn(100) < 5 {
atomic.AddInt64(&s.errorCounter, 1)
return nil, fmt.Errorf("retry failed")
}
return fmt.Sprintf("Retry response for %s (service level: %v)", requestID, level), nil
})
}
if err != nil {
atomic.AddInt64(&s.errorCounter, 1)
return "", err
}
}
return result.(string), nil
}
// SimulateLoad generates synthetic load for the service
func (s *DegradableService) SimulateLoad(ctx context.Context, duration time.Duration) {
// Reset counters
atomic.StoreInt64(&s.requestCounter, 0)
atomic.StoreInt64(&s.errorCounter, 0)
// Start time
startTime := time.Now()
endTime := startTime.Add(duration)
// Launch load generator
var wg sync.WaitGroup
// Start with low load
go func() {
rate := int64(50) // requests per second
for time.Now().Before(endTime) {
// Calculate how many requests to send this second
currentRate := atomic.LoadInt64(&rate)
interval := time.Second / time.Duration(currentRate)
for i := 0; i < int(currentRate); i++ {
wg.Add(1)
go func(reqID string) {
defer wg.Done()
reqCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
_, _ = s.HandleRequest(reqCtx, reqID)
}(fmt.Sprintf("req-%d", time.Now().UnixNano()))
time.Sleep(interval)
}
// Increase load over time
elapsed := time.Since(startTime)
progress := float64(elapsed) / float64(duration)
// Simulate a load curve that peaks in the middle
if progress < 0.5 {
// Ramp up to peak
newRate := 50 + int64(progress*2*1950) // Max 2000 req/sec at peak
atomic.StoreInt64(&rate, newRate)
} else {
// Ramp down from peak
newRate := 50 + int64((1-progress)*2*1950)
atomic.StoreInt64(&rate, newRate)
}
// Update load metrics every second
cpuLoad := 30 + int64(progress*70) // Simulate CPU increasing with load
if progress > 0.5 {
cpuLoad = 30 + int64((1-progress)*140)
}
memoryUsage := 40 + int64(progress*55) // Memory grows and stays high
// Calculate request rate (requests per second)
requestRate := atomic.LoadInt64(&s.requestCounter)
atomic.StoreInt64(&s.requestCounter, 0)
// Calculate error rate (percentage)
errorCount := atomic.LoadInt64(&s.errorCounter)
atomic.StoreInt64(&s.errorCounter, 0)
var errorRate int64
if requestRate > 0 {
errorRate = (errorCount * 100) / requestRate
}
// Update load manager
s.loadManager.UpdateMetrics(cpuLoad, memoryUsage, requestRate, errorRate)
// Log current status
level := s.loadManager.GetCurrentLevel()
metrics := s.loadManager.GetMetrics()
log.Printf("Load: CPU %d%%, Mem %d%%, Rate %d req/s, Errors %d%%, Level %v",
metrics["cpu"], metrics["memory"], metrics["requestRate"],
metrics["errorRate"], level)
// Wait for next second
time.Sleep(1 * time.Second)
}
}()
// Wait for load test to complete
time.Sleep(duration)
wg.Wait()
log.Println("Load test completed")
}
func main() {
// Seed random number generator
rand.Seed(time.Now().UnixNano())
// Create a degradable service
service := NewDegradableService()
// Create a context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Run a load test
log.Println("Starting load test with graceful degradation...")
service.SimulateLoad(ctx, 30*time.Second)
}
Observability Integration
Always instrument your concurrent code for proper observability:
package main
import (
"context"
"fmt"
"log"
"math/rand"
"net/http"
"os"
"runtime"
"sync"
"sync/atomic"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// Metrics represents a collection of Prometheus metrics
type Metrics struct {
requestCounter *prometheus.CounterVec
requestDuration *prometheus.HistogramVec
goroutineGauge prometheus.Gauge
workerPoolSize *prometheus.GaugeVec
queueDepth *prometheus.GaugeVec
errorCounter *prometheus.CounterVec
inFlightRequests *prometheus.GaugeVec
}
// NewMetrics creates and registers Prometheus metrics
func NewMetrics(reg prometheus.Registerer) *Metrics {
m := &Metrics{
requestCounter: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "requests_total",
Help: "Total number of requests processed",
},
[]string{"service", "endpoint", "status"},
),
requestDuration: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "request_duration_seconds",
Help: "Request duration in seconds",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 10), // 1ms to ~0.5s
},
[]string{"service", "endpoint"},
),
goroutineGauge: prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "goroutines_total",
Help: "Current number of goroutines",
},
),
workerPoolSize: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "worker_pool_size",
Help: "Current size of worker pools",
},
[]string{"pool"},
),
queueDepth: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "queue_depth",
Help: "Current depth of work queues",
},
[]string{"queue"},
),
errorCounter: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "errors_total",
Help: "Total number of errors",
},
[]string{"service", "type"},
),
inFlightRequests: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "in_flight_requests",
Help: "Current number of in-flight requests",
},
[]string{"service"},
),
}
// Register all metrics
reg.MustRegister(
m.requestCounter,
m.requestDuration,
m.goroutineGauge,
m.workerPoolSize,
m.queueDepth,
m.errorCounter,
m.inFlightRequests,
)
// Start goroutine collector
go func() {
for {
m.goroutineGauge.Set(float64(runtime.NumGoroutine()))
time.Sleep(1 * time.Second)
}
}()
return m
}
// InstrumentedWorkerPool is a worker pool with metrics
type InstrumentedWorkerPool struct {
name string
workers int
queue chan Job
metrics *Metrics
wg sync.WaitGroup
shutdown chan struct{}
processing int32
}
// Job represents a unit of work
type Job struct {
ID string
Handler func(ctx context.Context) (interface{}, error)
Priority int
}
// NewInstrumentedWorkerPool creates a new worker pool with metrics
func NewInstrumentedWorkerPool(name string, workers, queueSize int, metrics *Metrics) *InstrumentedWorkerPool {
pool := &InstrumentedWorkerPool{
name: name,
workers: workers,
queue: make(chan Job, queueSize),
metrics: metrics,
shutdown: make(chan struct{}),
}
// Update initial metrics
metrics.workerPoolSize.WithLabelValues(name).Set(float64(workers))
metrics.queueDepth.WithLabelValues(name).Set(0)
// Start workers
for i := 0; i < workers; i++ {
pool.wg.Add(1)
go pool.worker()
}
return pool
}
// worker processes jobs from the queue
func (p *InstrumentedWorkerPool) worker() {
defer p.wg.Done()
for {
select {
case job, ok := <-p.queue:
if !ok {
return // Channel closed
}
// Update metrics
atomic.AddInt32(&p.processing, 1)
p.metrics.inFlightRequests.WithLabelValues(p.name).Inc()
// Process the job with tracing
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
startTime := time.Now()
result, err := job.Handler(ctx)
duration := time.Since(startTime)
// Update metrics based on result. Note: using the raw job ID as a label value
// creates one time series per job; in production, prefer a bounded label such as the job type.
p.metrics.requestDuration.WithLabelValues(p.name, job.ID).Observe(duration.Seconds())
if err != nil {
p.metrics.errorCounter.WithLabelValues(p.name, "job_error").Inc()
p.metrics.requestCounter.WithLabelValues(p.name, job.ID, "error").Inc()
log.Printf("[%s] Job %s failed: %v", p.name, job.ID, err)
} else {
p.metrics.requestCounter.WithLabelValues(p.name, job.ID, "success").Inc()
log.Printf("[%s] Job %s succeeded: %v", p.name, job.ID, result)
}
// Update in-flight metrics
atomic.AddInt32(&p.processing, -1)
p.metrics.inFlightRequests.WithLabelValues(p.name).Dec()
// Update queue depth metric
p.metrics.queueDepth.WithLabelValues(p.name).Set(float64(len(p.queue)))
cancel() // Always cancel the context
case <-p.shutdown:
return
}
}
}
// Submit adds a job to the worker pool
func (p *InstrumentedWorkerPool) Submit(job Job) error {
select {
case p.queue <- job:
// Update queue depth metric
p.metrics.queueDepth.WithLabelValues(p.name).Set(float64(len(p.queue)))
return nil
default:
p.metrics.errorCounter.WithLabelValues(p.name, "queue_full").Inc()
return fmt.Errorf("queue is full")
}
}
// Shutdown stops the worker pool
func (p *InstrumentedWorkerPool) Shutdown() {
close(p.shutdown)
close(p.queue)
p.wg.Wait()
// Update metrics
p.metrics.workerPoolSize.WithLabelValues(p.name).Set(0)
p.metrics.queueDepth.WithLabelValues(p.name).Set(0)
p.metrics.inFlightRequests.WithLabelValues(p.name).Set(0)
}
// GetMetrics returns current metrics for the pool
func (p *InstrumentedWorkerPool) GetMetrics() map[string]interface{} {
return map[string]interface{}{
"workers": p.workers,
"queue_size": len(p.queue),
"processing": atomic.LoadInt32(&p.processing),
}
}
func main() {
// Create a Prometheus registry
reg := prometheus.NewRegistry()
// Create metrics
metrics := NewMetrics(reg)
// Start HTTP server for Prometheus metrics
http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
go func() {
log.Println("Starting metrics server on :8080")
if err := http.ListenAndServe(":8080", nil); err != nil {
log.Printf("Metrics server error: %v", err)
}
}()
// Create worker pools
highPriorityPool := NewInstrumentedWorkerPool("high-priority", 5, 10, metrics)
lowPriorityPool := NewInstrumentedWorkerPool("low-priority", 3, 20, metrics)
// Create a context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Submit jobs to the pools
for i := 0; i < 20; i++ {
// Create jobs with different priorities
highPriorityJob := Job{
ID: fmt.Sprintf("high-%d", i),
Handler: func(ctx context.Context) (interface{}, error) {
// Simulate work
time.Sleep(100 * time.Millisecond)
// Simulate occasional errors
if rand.Intn(10) == 0 {
return nil, fmt.Errorf("high priority job failed")
}
return "high priority result", nil
},
Priority: 2,
}
lowPriorityJob := Job{
ID: fmt.Sprintf("low-%d", i),
Handler: func(ctx context.Context) (interface{}, error) {
// Simulate work
time.Sleep(200 * time.Millisecond)
// Simulate occasional errors
if rand.Intn(5) == 0 {
return nil, fmt.Errorf("low priority job failed")
}
return "low priority result", nil
},
Priority: 1,
}
// Submit jobs
if err := highPriorityPool.Submit(highPriorityJob); err != nil {
log.Printf("Failed to submit high priority job: %v", err)
}
if err := lowPriorityPool.Submit(lowPriorityJob); err != nil {
log.Printf("Failed to submit low priority job: %v", err)
}
// Add some delay between submissions
time.Sleep(50 * time.Millisecond)
}
// Wait for jobs to complete
time.Sleep(5 * time.Second)
// Print metrics
log.Printf("High priority pool metrics: %v", highPriorityPool.GetMetrics())
log.Printf("Low priority pool metrics: %v", lowPriorityPool.GetMetrics())
// Shutdown pools
highPriorityPool.Shutdown()
lowPriorityPool.Shutdown()
log.Println("Worker pools shut down")
}
This observability integration approach is essential for distributed systems because it:
- Provides real-time visibility into system behavior (a small middleware sketch follows this list)
- Enables detection of performance bottlenecks
- Facilitates capacity planning and scaling decisions
- Helps identify and diagnose issues quickly
- Supports data-driven optimization of concurrent code
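As one concrete form of that visibility, a hand-rolled middleware can time every HTTP request into a Prometheus histogram. A minimal sketch that assumes the prometheus and net/http imports from the example above (the metric and handler names are illustrative):
// instrument wraps next and records its latency under the given handler label.
func instrument(duration *prometheus.HistogramVec, name string, next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        start := time.Now()
        next.ServeHTTP(w, r)
        duration.WithLabelValues(name).Observe(time.Since(start).Seconds())
    })
}
// Usage, assuming a HistogramVec registered with a single "handler" label:
// http.Handle("/hello", instrument(httpDuration, "hello", helloHandler))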
Looking Ahead
Go’s concurrency primitives provide a solid foundation for building distributed systems, but mastering advanced patterns is essential for creating robust, scalable applications that can handle the complexities of distributed environments.
Throughout this guide, we’ve explored sophisticated concurrency patterns that address common challenges in distributed systems:
Advanced Channel Patterns like fan-out/fan-in, multiplexing, and timed operations help manage complex data flows and handle variable latency in distributed environments.
Worker Pool and Pipeline Patterns enable efficient processing of large workloads with adaptive scaling and staged processing, essential for handling variable load in distributed systems.
Distributed Coordination Patterns such as distributed mutexes, leader election, and semaphores provide mechanisms for synchronizing activities across multiple nodes, a fundamental requirement for maintaining consistency in distributed systems.
Error Handling and Recovery Patterns like circuit breakers and graceful shutdown ensure resilience in the face of failures, preventing cascading issues that can bring down entire systems.
Performance Optimization and Monitoring techniques help identify bottlenecks and ensure efficient resource utilization, critical for maintaining performance at scale.
Production Best Practices including context propagation, bounded concurrency, graceful degradation, and observability integration provide a framework for deploying concurrent code in production environments.
By applying these patterns appropriately, you can build distributed systems in Go that are not only concurrent but also resilient, maintainable, and performant. Remember that the key to successful concurrent programming in distributed systems is not just understanding these patterns individually, but knowing when and how to combine them to address the specific challenges of your application.
As distributed systems continue to evolve, these foundational patterns will remain relevant, providing a toolkit for solving complex coordination and synchronization problems in Go. The true art lies in selecting the right patterns for your specific use case and implementing them with careful attention to error handling, performance, and observability.