Files
diachron/master/workerpool.go
Michael Wolf 2f5ef7c267 Add automatic restart for crashed worker processes
Workers are now monitored and automatically restarted when they crash.
The worker pool validates addresses before returning them to skip stale
entries from crashed workers.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-25 17:55:43 -06:00

135 lines
3.1 KiB
Go

package main
import (
"log"
"sync"
)
// WorkerPool manages a pool of worker addresses and tracks their availability.
// Each worker can only handle one request at a time.
//
// Invariant: workers is the source of truth for which addresses are live;
// available may temporarily hold stale addresses (from before a crash or
// rebuild), which Acquire filters out by re-checking membership in workers.
type WorkerPool struct {
	mu        sync.Mutex  // guards workers, available, and closed
	workers   []string    // current set of live worker addresses
	available chan string // buffered queue of addresses ready to take a request
	closed    bool        // NOTE(review): never read or written anywhere in this file — confirm external use or remove
}
// NewWorkerPool creates a new empty worker pool.
// The availability queue is buffered so that releasing a worker never
// blocks the caller.
func NewWorkerPool() *WorkerPool {
	pool := &WorkerPool{}
	pool.available = make(chan string, 100) // buffered to avoid blocking
	return pool
}
// SetWorkers updates the pool with a new set of worker addresses.
// Called when workers are started or restarted after a rebuild.
//
// The available channel is drained and refilled in place rather than closed
// and replaced, for two reasons:
//   - close(p.available) would wake a concurrently blocked Acquire with
//     ok == false, making its caller believe the pool is permanently closed
//     during a routine rebuild;
//   - re-assigning p.available would race with the unsynchronized receive
//     in Acquire (write under mutex vs. read without it).
func (p *WorkerPool) SetWorkers(addrs []string) {
	p.mu.Lock()
	defer p.mu.Unlock()

	// Drain stale addresses left over from the previous worker set.
	// Non-blocking receives only; the channel is never closed.
drain:
	for {
		select {
		case <-p.available:
		default:
			break drain
		}
	}

	// Record the new authoritative worker set (defensive copy).
	p.workers = make([]string, len(addrs))
	copy(p.workers, addrs)

	// Refill the queue. Non-blocking sends so a worker set larger than the
	// channel capacity cannot block us while we hold the mutex.
	for _, addr := range addrs {
		select {
		case p.available <- addr:
		default:
			log.Printf("[pool] Available channel full, not queueing %s", addr)
		}
	}
	log.Printf("[pool] Updated workers: %v", addrs)
}
// Acquire blocks until a worker is available and returns its address.
// It returns ok == false only when the availability channel has been closed.
// Because the queue may hold addresses of workers that have since crashed or
// been replaced, each address is validated against the current worker set
// before being returned; stale entries are skipped.
func (p *WorkerPool) Acquire() (string, bool) {
	for {
		// Snapshot the channel under the mutex: SetWorkers re-assigns
		// p.available, and an unsynchronized read of the field here would
		// be a data race with that assignment.
		p.mu.Lock()
		ch := p.available
		p.mu.Unlock()

		addr, ok := <-ch
		if !ok {
			return "", false
		}

		// Validate worker is still in current set (might have crashed).
		p.mu.Lock()
		valid := false
		for _, w := range p.workers {
			if w == addr {
				valid = true
				break
			}
		}
		p.mu.Unlock()

		if valid {
			log.Printf("[pool] Acquired worker %s", addr)
			return addr, true
		}
		// Worker was removed; drop this entry and wait for the next one.
		log.Printf("[pool] Skipping stale worker %s", addr)
	}
}
// Release returns addr to the availability queue after it has finished
// handling a request. Addresses that are no longer part of the current
// worker set (typically queued before a rebuild) are silently dropped.
func (p *WorkerPool) Release(addr string) {
	p.mu.Lock()
	defer p.mu.Unlock()

	// Only re-queue addresses that belong to the current worker set.
	known := false
	for _, w := range p.workers {
		if w == addr {
			known = true
			break
		}
	}
	if !known {
		// Stale address from before a rebuild; nothing to do.
		return
	}

	select {
	case p.available <- addr:
		log.Printf("[pool] Released worker %s", addr)
	default:
		// Channel full, worker may have been removed.
	}
}
// RemoveWorker removes addr from the pool (e.g., when its process crashes)
// so it receives no further requests. A copy of the address may still sit
// in the availability queue; Acquire's membership check filters it out.
func (p *WorkerPool) RemoveWorker(addr string) {
	p.mu.Lock()
	defer p.mu.Unlock()

	// Rebuild the worker list without the removed address.
	remaining := make([]string, 0, len(p.workers))
	for _, w := range p.workers {
		if w == addr {
			continue
		}
		remaining = append(remaining, w)
	}
	p.workers = remaining
	log.Printf("[pool] Removed worker %s, remaining: %v", addr, p.workers)
}
// AddWorker registers addr with the pool and queues it for requests.
// Adding an address that is already present is a no-op.
func (p *WorkerPool) AddWorker(addr string) {
	p.mu.Lock()
	defer p.mu.Unlock()

	// Skip duplicates: the address may already be tracked.
	exists := false
	for _, w := range p.workers {
		if w == addr {
			exists = true
			break
		}
	}
	if exists {
		return
	}

	p.workers = append(p.workers, addr)

	// Queue the new worker without blocking; the buffer may be full.
	select {
	case p.available <- addr:
		log.Printf("[pool] Added worker %s", addr)
	default:
		log.Printf("[pool] Added worker %s (channel full)", addr)
	}
}