// worker tracks the state of a single express worker process.
type worker struct {
	port     int        // TCP port this worker listens on
	addr     string     // "127.0.0.1:<port>", the address registered with the pool
	cmd      *exec.Cmd  // running process; nil while the worker is down or restarting
	stopping bool       // set during intentional shutdown to suppress auto-restart
	mu       sync.Mutex // guards cmd and stopping
}
+ start := func() bool { + cmd := exec.Command("../express/run.sh", "--listen", w.addr) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + if err := cmd.Start(); err != nil { + log.Printf("[express:%d] Failed to start: %v", port, err) + return false + } + + w.mu.Lock() + w.cmd = cmd + w.mu.Unlock() + + log.Printf("[express:%d] Started (pid %d)", port, cmd.Process.Pid) + return true + } + + if !start() { return nil } - log.Printf("[express:%d] Started (pid %d)", port, cmd.Process.Pid) + // Monitor and restart on crash + go func() { + for { + w.mu.Lock() + currentCmd := w.cmd + stopping := w.stopping + w.mu.Unlock() - // Monitor the process in background - go func(p int) { - err := cmd.Wait() - if err != nil { - log.Printf("[express:%d] Process exited: %v", p, err) - } else { - log.Printf("[express:%d] Process exited normally", p) + if stopping { + return + } + + if currentCmd == nil { + time.Sleep(time.Second) + w.mu.Lock() + if w.stopping { + w.mu.Unlock() + return + } + w.mu.Unlock() + + if start() { + pool.AddWorker(w.addr) + } + continue + } + + err := currentCmd.Wait() + + w.mu.Lock() + if w.stopping { + w.mu.Unlock() + return + } + w.cmd = nil + w.mu.Unlock() + + // Worker crashed - remove from pool and restart + pool.RemoveWorker(w.addr) + + if err != nil { + log.Printf("[express:%d] Process crashed: %v, restarting...", w.port, err) + } else { + log.Printf("[express:%d] Process exited unexpectedly, restarting...", w.port) + } + + time.Sleep(time.Second) + + w.mu.Lock() + if w.stopping { + w.mu.Unlock() + return + } + w.mu.Unlock() + + if start() { + pool.AddWorker(w.addr) + } + // If start failed, cmd is still nil and next iteration will retry } - }(port) + }() - return cmd + return w } - // Helper to stop an express process - stopExpress := func(cmd *exec.Cmd) { + // Helper to stop a worker (for intentional shutdown) + stopWorker := func(w *worker) { + if w == nil { + return + } + + w.mu.Lock() + w.stopping = true + cmd := w.cmd + w.mu.Unlock() + if cmd 
== nil || cmd.Process == nil { return } pid := cmd.Process.Pid - log.Printf("[express] Stopping (pid %d)", pid) + log.Printf("[express:%d] Stopping (pid %d)", w.port, pid) cmd.Process.Signal(syscall.SIGTERM) // Wait briefly for graceful shutdown @@ -61,36 +147,35 @@ func runExpress(changes <-chan FileChange, numProcesses int, basePort int, pool select { case <-done: - log.Printf("[express] Stopped gracefully (pid %d)", pid) + log.Printf("[express:%d] Stopped gracefully (pid %d)", w.port, pid) case <-time.After(5 * time.Second): - log.Printf("[express] Force killing (pid %d)", pid) + log.Printf("[express:%d] Force killing (pid %d)", w.port, pid) cmd.Process.Kill() } } - // Helper to stop all express processes - stopAllExpress := func(processes []*exec.Cmd) { - for _, cmd := range processes { - stopExpress(cmd) + // Helper to stop all workers + stopAllWorkers := func(workers []*worker) { + for _, w := range workers { + stopWorker(w) } } - // Helper to start all express processes and update the worker pool - startAllExpress := func() []*exec.Cmd { - processes := make([]*exec.Cmd, 0, numProcesses) + // Helper to start all workers and update the worker pool + startAllWorkers := func() []*worker { + workers := make([]*worker, 0, numProcesses) addresses := make([]string, 0, numProcesses) for i := 0; i < numProcesses; i++ { port := basePort + i - addr := fmt.Sprintf("127.0.0.1:%d", port) - cmd := startExpress(port) - if cmd != nil { - processes = append(processes, cmd) - addresses = append(addresses, addr) + w := startWorker(port) + if w != nil { + workers = append(workers, w) + addresses = append(addresses, w.addr) } } // Update the worker pool with new worker addresses pool.SetWorkers(addresses) - return processes + return workers } // Helper to run the build @@ -128,7 +213,7 @@ func runExpress(changes <-chan FileChange, numProcesses int, basePort int, pool // Initial build and start log.Printf("[master] Initial build...") if runBuild() { - currentProcesses = 
startAllExpress() + currentWorkers = startAllWorkers() } else { log.Printf("[master] Initial build failed") } @@ -143,18 +228,18 @@ func runExpress(changes <-chan FileChange, numProcesses int, basePort int, pool debounceTimer = time.AfterFunc(debounceDelay, func() { if !runBuild() { - log.Printf("[master] Build failed, keeping current processes") + log.Printf("[master] Build failed, keeping current workers") return } mu.Lock() defer mu.Unlock() - // Stop all old processes - stopAllExpress(currentProcesses) + // Stop all old workers (this sets stopping=true to prevent auto-restart) + stopAllWorkers(currentWorkers) - // Start all new processes - currentProcesses = startAllExpress() + // Start all new workers + currentWorkers = startAllWorkers() }) } } diff --git a/master/workerpool.go b/master/workerpool.go index e35a6f7..7f07a40 100644 --- a/master/workerpool.go +++ b/master/workerpool.go @@ -46,12 +46,32 @@ func (p *WorkerPool) SetWorkers(addrs []string) { } // Acquire blocks until a worker is available and returns its address. +// Validates that the worker is still in the current set before returning. func (p *WorkerPool) Acquire() (string, bool) { - addr, ok := <-p.available - if ok { - log.Printf("[pool] Acquired worker %s", addr) + for { + addr, ok := <-p.available + if !ok { + return "", false + } + + // Validate worker is still in current set (might have crashed) + p.mu.Lock() + valid := false + for _, w := range p.workers { + if w == addr { + valid = true + break + } + } + p.mu.Unlock() + + if valid { + log.Printf("[pool] Acquired worker %s", addr) + return addr, true + } + log.Printf("[pool] Skipping stale worker %s", addr) + // Worker was removed, try next one } - return addr, ok } // Release marks a worker as available again after it finishes handling a request. 
@@ -73,3 +93,42 @@ func (p *WorkerPool) Release(addr string) { } // Worker not in current set (probably from before a rebuild), ignore } + +// RemoveWorker removes a worker from the pool (e.g., when it crashes). +// The worker will no longer receive requests. +func (p *WorkerPool) RemoveWorker(addr string) { + p.mu.Lock() + defer p.mu.Unlock() + + // Remove from workers slice + newWorkers := make([]string, 0, len(p.workers)) + for _, w := range p.workers { + if w != addr { + newWorkers = append(newWorkers, w) + } + } + p.workers = newWorkers + + log.Printf("[pool] Removed worker %s, remaining: %v", addr, p.workers) +} + +// AddWorker adds a worker to the pool and makes it available for requests. +func (p *WorkerPool) AddWorker(addr string) { + p.mu.Lock() + defer p.mu.Unlock() + + // Check if already in pool + for _, w := range p.workers { + if w == addr { + return + } + } + + p.workers = append(p.workers, addr) + select { + case p.available <- addr: + log.Printf("[pool] Added worker %s", addr) + default: + log.Printf("[pool] Added worker %s (channel full)", addr) + } +}