Workers are now monitored and automatically restarted when they crash. Before handing out an address, the worker pool validates that it still belongs to the current worker set, skipping stale entries left behind by crashed workers. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
135 lines
3.1 KiB
Go
135 lines
3.1 KiB
Go
package main
|
|
|
|
import (
|
|
"log"
|
|
"sync"
|
|
)
|
|
|
|
// WorkerPool manages a pool of worker addresses and tracks their availability.
// Each worker can only handle one request at a time.
//
// All fields are guarded by mu. Addresses flow through the available channel:
// SetWorkers/AddWorker/Release enqueue them, Acquire dequeues them.
type WorkerPool struct {
	mu        sync.Mutex  // guards workers, available, and closed
	workers   []string    // current set of worker addresses (source of truth)
	available chan string // idle worker addresses; buffered so sends under mu don't block
	closed    bool        // NOTE(review): never set anywhere in this file — dead field or a missing Close(); verify against the rest of the project
}
|
|
|
|
// NewWorkerPool creates a new empty worker pool.
|
|
func NewWorkerPool() *WorkerPool {
|
|
return &WorkerPool{
|
|
available: make(chan string, 100), // buffered to avoid blocking
|
|
}
|
|
}
|
|
|
|
// SetWorkers updates the pool with a new set of worker addresses.
|
|
// Called when workers are started or restarted after a rebuild.
|
|
func (p *WorkerPool) SetWorkers(addrs []string) {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
|
|
// Drain the old available channel
|
|
close(p.available)
|
|
for range p.available {
|
|
// drain
|
|
}
|
|
|
|
// Create new channel and populate with new workers
|
|
p.available = make(chan string, len(addrs)+10)
|
|
p.workers = make([]string, len(addrs))
|
|
copy(p.workers, addrs)
|
|
|
|
for _, addr := range addrs {
|
|
p.available <- addr
|
|
}
|
|
|
|
log.Printf("[pool] Updated workers: %v", addrs)
|
|
}
|
|
|
|
// Acquire blocks until a worker is available and returns its address.
|
|
// Validates that the worker is still in the current set before returning.
|
|
func (p *WorkerPool) Acquire() (string, bool) {
|
|
for {
|
|
addr, ok := <-p.available
|
|
if !ok {
|
|
return "", false
|
|
}
|
|
|
|
// Validate worker is still in current set (might have crashed)
|
|
p.mu.Lock()
|
|
valid := false
|
|
for _, w := range p.workers {
|
|
if w == addr {
|
|
valid = true
|
|
break
|
|
}
|
|
}
|
|
p.mu.Unlock()
|
|
|
|
if valid {
|
|
log.Printf("[pool] Acquired worker %s", addr)
|
|
return addr, true
|
|
}
|
|
log.Printf("[pool] Skipping stale worker %s", addr)
|
|
// Worker was removed, try next one
|
|
}
|
|
}
|
|
|
|
// Release marks a worker as available again after it finishes handling a request.
|
|
func (p *WorkerPool) Release(addr string) {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
|
|
// Only release if the worker is still in our current set
|
|
for _, w := range p.workers {
|
|
if w == addr {
|
|
select {
|
|
case p.available <- addr:
|
|
log.Printf("[pool] Released worker %s", addr)
|
|
default:
|
|
// Channel full, worker may have been removed
|
|
}
|
|
return
|
|
}
|
|
}
|
|
// Worker not in current set (probably from before a rebuild), ignore
|
|
}
|
|
|
|
// RemoveWorker removes a worker from the pool (e.g., when it crashes).
|
|
// The worker will no longer receive requests.
|
|
func (p *WorkerPool) RemoveWorker(addr string) {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
|
|
// Remove from workers slice
|
|
newWorkers := make([]string, 0, len(p.workers))
|
|
for _, w := range p.workers {
|
|
if w != addr {
|
|
newWorkers = append(newWorkers, w)
|
|
}
|
|
}
|
|
p.workers = newWorkers
|
|
|
|
log.Printf("[pool] Removed worker %s, remaining: %v", addr, p.workers)
|
|
}
|
|
|
|
// AddWorker adds a worker to the pool and makes it available for requests.
|
|
func (p *WorkerPool) AddWorker(addr string) {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
|
|
// Check if already in pool
|
|
for _, w := range p.workers {
|
|
if w == addr {
|
|
return
|
|
}
|
|
}
|
|
|
|
p.workers = append(p.workers, addr)
|
|
select {
|
|
case p.available <- addr:
|
|
log.Printf("[pool] Added worker %s", addr)
|
|
default:
|
|
log.Printf("[pool] Added worker %s (channel full)", addr)
|
|
}
|
|
}
|