Files
diachron/master/runexpress.go
Michael Wolf 2f5ef7c267 Add automatic restart for crashed worker processes
Workers are now monitored and automatically restarted when they crash.
The worker pool validates addresses before returning them to skip stale
entries from crashed workers.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-25 17:55:43 -06:00

246 lines
4.8 KiB
Go

package main
import (
"fmt"
"io"
"log"
"os"
"os/exec"
"sync"
"syscall"
"time"
)
// worker tracks the state of a single express worker process.
//
// The process is reaped by a dedicated waiter goroutine (spawned in start),
// which is the ONLY caller of cmd.Wait — os/exec requires Wait to be called
// exactly once per started command. All other goroutines observe the exit
// through the exited channel.
type worker struct {
	port     int
	addr     string        // listen address derived from port ("127.0.0.1:<port>")
	cmd      *exec.Cmd     // currently running process; nil while down or restarting
	exited   chan struct{} // closed once the current process has been reaped
	exitErr  error         // result of cmd.Wait; valid only after exited is closed
	stopping bool          // set by stopWorker to suppress the auto-restart loop
	mu       sync.Mutex    // guards cmd, exited, exitErr, stopping
}

// runExpress builds the express app, runs numProcesses worker processes on
// consecutive ports starting at basePort, and rebuilds/restarts the workers
// whenever a file change arrives on changes. Crashed workers are removed
// from pool and automatically restarted; intentional shutdowns (during a
// rebuild) are not restarted. Blocks until changes is closed.
func runExpress(changes <-chan FileChange, numProcesses int, basePort int, pool *WorkerPool) {
	var currentWorkers []*worker
	var mu sync.Mutex // guards currentWorkers across debounced rebuild callbacks

	// startWorker launches one worker on the given port plus a monitor
	// goroutine that restarts it whenever it dies unexpectedly.
	// Returns nil if the initial launch fails.
	startWorker := func(port int) *worker {
		w := &worker{
			port: port,
			addr: fmt.Sprintf("127.0.0.1:%d", port),
		}
		// start spawns the worker process and its waiter goroutine, and
		// returns the exit-notification channel, or nil if the spawn failed
		// or the worker is being stopped. Caller must NOT hold w.mu.
		start := func() chan struct{} {
			cmd := exec.Command("../express/run.sh", "--listen", w.addr)
			cmd.Stdout = os.Stdout
			cmd.Stderr = os.Stderr

			w.mu.Lock()
			if w.stopping {
				// stopWorker won the race: do not spawn a process that
				// nothing would ever terminate.
				w.mu.Unlock()
				return nil
			}
			if err := cmd.Start(); err != nil {
				w.mu.Unlock()
				log.Printf("[express:%d] Failed to start: %v", port, err)
				return nil
			}
			exited := make(chan struct{})
			w.cmd = cmd
			w.exited = exited
			w.mu.Unlock()
			log.Printf("[express:%d] Started (pid %d)", port, cmd.Process.Pid)

			// Waiter: sole owner of cmd.Wait for this process. Records the
			// exit error, then signals everyone via the closed channel.
			go func() {
				err := cmd.Wait()
				w.mu.Lock()
				w.exitErr = err
				w.mu.Unlock()
				close(exited)
			}()
			return exited
		}

		exited := start()
		if exited == nil {
			return nil
		}

		// Monitor: restart on crash until stopWorker flags w.stopping.
		go func() {
			for {
				if exited != nil {
					// Block until the current process exits.
					<-exited
				} else {
					// Previous restart attempt failed; pause before retrying.
					time.Sleep(time.Second)
				}

				w.mu.Lock()
				if w.stopping {
					// Intentional shutdown: stopWorker does logging/cleanup.
					w.mu.Unlock()
					return
				}
				err := w.exitErr
				w.cmd = nil
				w.mu.Unlock()

				if exited != nil {
					// Worker crashed - remove from pool, back off, restart.
					pool.RemoveWorker(w.addr)
					if err != nil {
						log.Printf("[express:%d] Process crashed: %v, restarting...", w.port, err)
					} else {
						log.Printf("[express:%d] Process exited unexpectedly, restarting...", w.port)
					}
					time.Sleep(time.Second)
				}

				// start itself re-checks w.stopping under the lock, so a
				// stop racing with this restart cannot leak a process.
				exited = start()
				if exited != nil {
					pool.AddWorker(w.addr)
				}
				// If start failed, exited is nil and the next iteration retries.
			}
		}()
		return w
	}

	// stopWorker intentionally shuts a worker down (used during rebuilds).
	// It sets w.stopping so the monitor goroutine will not restart it, sends
	// SIGTERM, and force-kills after 5s if the process has not exited.
	stopWorker := func(w *worker) {
		if w == nil {
			return
		}
		w.mu.Lock()
		w.stopping = true
		cmd := w.cmd
		exited := w.exited
		w.mu.Unlock()
		if cmd == nil || cmd.Process == nil {
			return
		}
		pid := cmd.Process.Pid
		log.Printf("[express:%d] Stopping (pid %d)", w.port, pid)
		cmd.Process.Signal(syscall.SIGTERM)
		// Wait briefly for graceful shutdown. The waiter goroutine owns
		// cmd.Wait and closes exited once the process is reaped — we must
		// not call Wait a second time here.
		select {
		case <-exited:
			log.Printf("[express:%d] Stopped gracefully (pid %d)", w.port, pid)
		case <-time.After(5 * time.Second):
			log.Printf("[express:%d] Force killing (pid %d)", w.port, pid)
			cmd.Process.Kill()
		}
	}

	// stopAllWorkers stops every worker in the slice, sequentially.
	stopAllWorkers := func(workers []*worker) {
		for _, w := range workers {
			stopWorker(w)
		}
	}

	// startAllWorkers launches numProcesses workers on consecutive ports and
	// replaces the pool's address list with the ones that started.
	startAllWorkers := func() []*worker {
		workers := make([]*worker, 0, numProcesses)
		addresses := make([]string, 0, numProcesses)
		for i := 0; i < numProcesses; i++ {
			port := basePort + i
			w := startWorker(port)
			if w != nil {
				workers = append(workers, w)
				addresses = append(addresses, w.addr)
			}
		}
		// Update the worker pool with new worker addresses.
		pool.SetWorkers(addresses)
		return workers
	}

	// runBuild runs the ncc build script, streaming its output to our own
	// stdout/stderr, and reports whether the build succeeded.
	runBuild := func() bool {
		log.Printf("[build] Starting ncc build...")
		cmd := exec.Command("../express/build.sh")
		stdout, err := cmd.StdoutPipe()
		if err != nil {
			log.Printf("[build] Failed to attach stdout: %v", err)
			return false
		}
		stderr, err := cmd.StderrPipe()
		if err != nil {
			log.Printf("[build] Failed to attach stderr: %v", err)
			return false
		}
		if err := cmd.Start(); err != nil {
			log.Printf("[build] Failed to start: %v", err)
			return false
		}
		// Copy output while the build runs.
		go io.Copy(os.Stdout, stdout)
		go io.Copy(os.Stderr, stderr)
		if err := cmd.Wait(); err != nil {
			log.Printf("[build] Failed: %v", err)
			return false
		}
		log.Printf("[build] Success")
		return true
	}

	// Debounce timer: rapid bursts of file changes trigger a single rebuild.
	var debounceTimer *time.Timer
	const debounceDelay = 100 * time.Millisecond

	// Initial build and start.
	log.Printf("[master] Initial build...")
	if runBuild() {
		currentWorkers = startAllWorkers()
	} else {
		log.Printf("[master] Initial build failed")
	}

	for change := range changes {
		log.Printf("[watch] %s: %s", change.Operation, change.Path)
		// Reset debounce timer so only the last change in a burst fires.
		if debounceTimer != nil {
			debounceTimer.Stop()
		}
		debounceTimer = time.AfterFunc(debounceDelay, func() {
			if !runBuild() {
				log.Printf("[master] Build failed, keeping current workers")
				return
			}
			mu.Lock()
			defer mu.Unlock()
			// Stop all old workers (this sets stopping=true to prevent auto-restart)
			stopAllWorkers(currentWorkers)
			// Start all new workers
			currentWorkers = startAllWorkers()
		})
	}
}