package main import ( "log" "sync" ) // WorkerPool manages a pool of worker addresses and tracks their availability. // Each worker can only handle one request at a time. type WorkerPool struct { mu sync.Mutex workers []string available chan string closed bool } // NewWorkerPool creates a new empty worker pool. func NewWorkerPool() *WorkerPool { return &WorkerPool{ available: make(chan string, 100), // buffered to avoid blocking } } // SetWorkers updates the pool with a new set of worker addresses. // Called when workers are started or restarted after a rebuild. func (p *WorkerPool) SetWorkers(addrs []string) { p.mu.Lock() defer p.mu.Unlock() // Drain the old available channel close(p.available) for range p.available { // drain } // Create new channel and populate with new workers p.available = make(chan string, len(addrs)+10) p.workers = make([]string, len(addrs)) copy(p.workers, addrs) for _, addr := range addrs { p.available <- addr } log.Printf("[pool] Updated workers: %v", addrs) } // Acquire blocks until a worker is available and returns its address. // Validates that the worker is still in the current set before returning. func (p *WorkerPool) Acquire() (string, bool) { for { addr, ok := <-p.available if !ok { return "", false } // Validate worker is still in current set (might have crashed) p.mu.Lock() valid := false for _, w := range p.workers { if w == addr { valid = true break } } p.mu.Unlock() if valid { log.Printf("[pool] Acquired worker %s", addr) return addr, true } log.Printf("[pool] Skipping stale worker %s", addr) // Worker was removed, try next one } } // Release marks a worker as available again after it finishes handling a request. func (p *WorkerPool) Release(addr string) { p.mu.Lock() defer p.mu.Unlock() // Only release if the worker is still in our current set for _, w := range p.workers { if w == addr { select { case p.available <- addr: log.Printf("[pool] Released worker %s", addr) default: // Channel full, worker may have been removed } return } } // Worker not in current set (probably from before a rebuild), ignore } // RemoveWorker removes a worker from the pool (e.g., when it crashes). // The worker will no longer receive requests. func (p *WorkerPool) RemoveWorker(addr string) { p.mu.Lock() defer p.mu.Unlock() // Remove from workers slice newWorkers := make([]string, 0, len(p.workers)) for _, w := range p.workers { if w != addr { newWorkers = append(newWorkers, w) } } p.workers = newWorkers log.Printf("[pool] Removed worker %s, remaining: %v", addr, p.workers) } // AddWorker adds a worker to the pool and makes it available for requests. func (p *WorkerPool) AddWorker(addr string) { p.mu.Lock() defer p.mu.Unlock() // Check if already in pool for _, w := range p.workers { if w == addr { return } } p.workers = append(p.workers, addr) select { case p.available <- addr: log.Printf("[pool] Added worker %s", addr) default: log.Printf("[pool] Added worker %s (channel full)", addr) } }