Add automatic restart for crashed worker processes

Workers are now monitored and automatically restarted when they crash.
The worker pool validates addresses before returning them to skip stale
entries from crashed workers.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-25 17:55:43 -06:00
parent bcd71f2801
commit 2f5ef7c267
2 changed files with 191 additions and 47 deletions

View File

@@ -11,45 +11,131 @@ import (
"time" "time"
) )
// worker tracks the state of a single express worker process
type worker struct {
port int
addr string
cmd *exec.Cmd
stopping bool
mu sync.Mutex
}
func runExpress(changes <-chan FileChange, numProcesses int, basePort int, pool *WorkerPool) { func runExpress(changes <-chan FileChange, numProcesses int, basePort int, pool *WorkerPool) {
var currentProcesses []*exec.Cmd var currentWorkers []*worker
var mu sync.Mutex var mu sync.Mutex
// Helper to start an express process on a specific port // Helper to start a worker and monitor it for crashes
startExpress := func(port int) *exec.Cmd { startWorker := func(port int) *worker {
listenAddr := fmt.Sprintf("127.0.0.1:%d", port) w := &worker{
cmd := exec.Command("../express/run.sh", "--listen", listenAddr) port: port,
addr: fmt.Sprintf("127.0.0.1:%d", port),
}
// start spawns the worker process. Caller must NOT hold w.mu.
start := func() bool {
cmd := exec.Command("../express/run.sh", "--listen", w.addr)
cmd.Stdout = os.Stdout cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr cmd.Stderr = os.Stderr
if err := cmd.Start(); err != nil { if err := cmd.Start(); err != nil {
log.Printf("[express:%d] Failed to start: %v", port, err) log.Printf("[express:%d] Failed to start: %v", port, err)
return false
}
w.mu.Lock()
w.cmd = cmd
w.mu.Unlock()
log.Printf("[express:%d] Started (pid %d)", port, cmd.Process.Pid)
return true
}
if !start() {
return nil return nil
} }
log.Printf("[express:%d] Started (pid %d)", port, cmd.Process.Pid) // Monitor and restart on crash
go func() {
for {
w.mu.Lock()
currentCmd := w.cmd
stopping := w.stopping
w.mu.Unlock()
if stopping {
return
}
if currentCmd == nil {
time.Sleep(time.Second)
w.mu.Lock()
if w.stopping {
w.mu.Unlock()
return
}
w.mu.Unlock()
if start() {
pool.AddWorker(w.addr)
}
continue
}
err := currentCmd.Wait()
w.mu.Lock()
if w.stopping {
w.mu.Unlock()
return
}
w.cmd = nil
w.mu.Unlock()
// Worker crashed - remove from pool and restart
pool.RemoveWorker(w.addr)
// Monitor the process in background
go func(p int) {
err := cmd.Wait()
if err != nil { if err != nil {
log.Printf("[express:%d] Process exited: %v", p, err) log.Printf("[express:%d] Process crashed: %v, restarting...", w.port, err)
} else { } else {
log.Printf("[express:%d] Process exited normally", p) log.Printf("[express:%d] Process exited unexpectedly, restarting...", w.port)
}
}(port)
return cmd
} }
// Helper to stop an express process time.Sleep(time.Second)
stopExpress := func(cmd *exec.Cmd) {
w.mu.Lock()
if w.stopping {
w.mu.Unlock()
return
}
w.mu.Unlock()
if start() {
pool.AddWorker(w.addr)
}
// If start failed, cmd is still nil and next iteration will retry
}
}()
return w
}
// Helper to stop a worker (for intentional shutdown)
stopWorker := func(w *worker) {
if w == nil {
return
}
w.mu.Lock()
w.stopping = true
cmd := w.cmd
w.mu.Unlock()
if cmd == nil || cmd.Process == nil { if cmd == nil || cmd.Process == nil {
return return
} }
pid := cmd.Process.Pid pid := cmd.Process.Pid
log.Printf("[express] Stopping (pid %d)", pid) log.Printf("[express:%d] Stopping (pid %d)", w.port, pid)
cmd.Process.Signal(syscall.SIGTERM) cmd.Process.Signal(syscall.SIGTERM)
// Wait briefly for graceful shutdown // Wait briefly for graceful shutdown
@@ -61,36 +147,35 @@ func runExpress(changes <-chan FileChange, numProcesses int, basePort int, pool
select { select {
case <-done: case <-done:
log.Printf("[express] Stopped gracefully (pid %d)", pid) log.Printf("[express:%d] Stopped gracefully (pid %d)", w.port, pid)
case <-time.After(5 * time.Second): case <-time.After(5 * time.Second):
log.Printf("[express] Force killing (pid %d)", pid) log.Printf("[express:%d] Force killing (pid %d)", w.port, pid)
cmd.Process.Kill() cmd.Process.Kill()
} }
} }
// Helper to stop all express processes // Helper to stop all workers
stopAllExpress := func(processes []*exec.Cmd) { stopAllWorkers := func(workers []*worker) {
for _, cmd := range processes { for _, w := range workers {
stopExpress(cmd) stopWorker(w)
} }
} }
// Helper to start all express processes and update the worker pool // Helper to start all workers and update the worker pool
startAllExpress := func() []*exec.Cmd { startAllWorkers := func() []*worker {
processes := make([]*exec.Cmd, 0, numProcesses) workers := make([]*worker, 0, numProcesses)
addresses := make([]string, 0, numProcesses) addresses := make([]string, 0, numProcesses)
for i := 0; i < numProcesses; i++ { for i := 0; i < numProcesses; i++ {
port := basePort + i port := basePort + i
addr := fmt.Sprintf("127.0.0.1:%d", port) w := startWorker(port)
cmd := startExpress(port) if w != nil {
if cmd != nil { workers = append(workers, w)
processes = append(processes, cmd) addresses = append(addresses, w.addr)
addresses = append(addresses, addr)
} }
} }
// Update the worker pool with new worker addresses // Update the worker pool with new worker addresses
pool.SetWorkers(addresses) pool.SetWorkers(addresses)
return processes return workers
} }
// Helper to run the build // Helper to run the build
@@ -128,7 +213,7 @@ func runExpress(changes <-chan FileChange, numProcesses int, basePort int, pool
// Initial build and start // Initial build and start
log.Printf("[master] Initial build...") log.Printf("[master] Initial build...")
if runBuild() { if runBuild() {
currentProcesses = startAllExpress() currentWorkers = startAllWorkers()
} else { } else {
log.Printf("[master] Initial build failed") log.Printf("[master] Initial build failed")
} }
@@ -143,18 +228,18 @@ func runExpress(changes <-chan FileChange, numProcesses int, basePort int, pool
debounceTimer = time.AfterFunc(debounceDelay, func() { debounceTimer = time.AfterFunc(debounceDelay, func() {
if !runBuild() { if !runBuild() {
log.Printf("[master] Build failed, keeping current processes") log.Printf("[master] Build failed, keeping current workers")
return return
} }
mu.Lock() mu.Lock()
defer mu.Unlock() defer mu.Unlock()
// Stop all old processes // Stop all old workers (this sets stopping=true to prevent auto-restart)
stopAllExpress(currentProcesses) stopAllWorkers(currentWorkers)
// Start all new processes // Start all new workers
currentProcesses = startAllExpress() currentWorkers = startAllWorkers()
}) })
} }
} }

View File

@@ -46,12 +46,32 @@ func (p *WorkerPool) SetWorkers(addrs []string) {
} }
// Acquire blocks until a worker is available and returns its address. // Acquire blocks until a worker is available and returns its address.
// Validates that the worker is still in the current set before returning.
func (p *WorkerPool) Acquire() (string, bool) { func (p *WorkerPool) Acquire() (string, bool) {
for {
addr, ok := <-p.available addr, ok := <-p.available
if ok { if !ok {
log.Printf("[pool] Acquired worker %s", addr) return "", false
}
// Validate worker is still in current set (might have crashed)
p.mu.Lock()
valid := false
for _, w := range p.workers {
if w == addr {
valid = true
break
}
}
p.mu.Unlock()
if valid {
log.Printf("[pool] Acquired worker %s", addr)
return addr, true
}
log.Printf("[pool] Skipping stale worker %s", addr)
// Worker was removed, try next one
} }
return addr, ok
} }
// Release marks a worker as available again after it finishes handling a request. // Release marks a worker as available again after it finishes handling a request.
@@ -73,3 +93,42 @@ func (p *WorkerPool) Release(addr string) {
} }
// Worker not in current set (probably from before a rebuild), ignore // Worker not in current set (probably from before a rebuild), ignore
} }
// RemoveWorker removes a worker from the pool (e.g., when it crashes).
// The worker will no longer receive requests.
func (p *WorkerPool) RemoveWorker(addr string) {
p.mu.Lock()
defer p.mu.Unlock()
// Remove from workers slice
newWorkers := make([]string, 0, len(p.workers))
for _, w := range p.workers {
if w != addr {
newWorkers = append(newWorkers, w)
}
}
p.workers = newWorkers
log.Printf("[pool] Removed worker %s, remaining: %v", addr, p.workers)
}
// AddWorker adds a worker to the pool and makes it available for requests.
func (p *WorkerPool) AddWorker(addr string) {
p.mu.Lock()
defer p.mu.Unlock()
// Check if already in pool
for _, w := range p.workers {
if w == addr {
return
}
}
p.workers = append(p.workers, addr)
select {
case p.available <- addr:
log.Printf("[pool] Added worker %s", addr)
default:
log.Printf("[pool] Added worker %s (channel full)", addr)
}
}