Files
diachron/master/runexpress.go
2026-02-02 18:31:03 -05:00

246 lines
4.8 KiB
Go

package main
import (
"fmt"
"io"
"log"
"os"
"os/exec"
"sync"
"syscall"
"time"
)
// worker tracks the state of a single express worker process.
//
// port and addr are fixed when the worker is created. The remaining fields
// change as the process is (re)started and stopped, and are guarded by mu.
type worker struct {
	port int
	addr string

	mu       sync.Mutex
	cmd      *exec.Cmd     // running process, or nil while a restart is pending
	exited   chan struct{} // closed by the monitor goroutine once cmd has been reaped
	stopping bool          // set by an intentional stop; suppresses further restarts
}

// runExpress builds the backend with ../backend/build.sh, then runs
// numProcesses workers (../backend/run.sh --listen 127.0.0.1:PORT) on ports
// basePort, basePort+1, ... and hands their addresses to pool.SetWorkers.
//
// Each FileChange received on changes (re)arms a 100ms debounce timer. When
// the timer fires, the backend is rebuilt and, if the build succeeds, every
// worker is stopped and replaced. Builds and worker swaps are serialized by a
// mutex, so two rebuilds never run at the same time.
//
// A worker process that exits without being stopped is removed from pool,
// relaunched (retrying once per second), and re-added to pool once running.
//
// runExpress returns when changes is closed. It does not stop running workers
// or cancel a rebuild that is already scheduled.
func runExpress(changes <-chan FileChange, numProcesses int, basePort int, pool *WorkerPool) {
	// mu serializes rebuilds (build + worker swap) and guards currentWorkers.
	var (
		mu             sync.Mutex
		currentWorkers []*worker
	)

	// startWorker launches a worker process on port and spawns a monitor
	// goroutine that relaunches it whenever it exits without being stopped.
	// It returns nil (and monitors nothing) if the first launch fails.
	startWorker := func(port int) *worker {
		w := &worker{
			port: port,
			addr: fmt.Sprintf("127.0.0.1:%d", port),
		}

		isStopping := func() bool {
			w.mu.Lock()
			defer w.mu.Unlock()
			return w.stopping
		}

		// launch spawns a new process and publishes it in w.cmd / w.exited.
		// The stopping check and the publication happen under one lock, so a
		// concurrent stopWorker either sees the new process (and stops it) or
		// prevents it from being spawned at all. It returns nil values when
		// the worker is stopping or the process could not be started.
		launch := func() (*exec.Cmd, chan struct{}) {
			w.mu.Lock()
			defer w.mu.Unlock()
			if w.stopping {
				return nil, nil
			}
			cmd := exec.Command("../backend/run.sh", "--listen", w.addr)
			cmd.Stdout = os.Stdout
			cmd.Stderr = os.Stderr
			if err := cmd.Start(); err != nil {
				log.Printf("[express:%d] Failed to start: %v", port, err)
				return nil, nil
			}
			exited := make(chan struct{})
			w.cmd, w.exited = cmd, exited
			log.Printf("[express:%d] Started (pid %d)", port, cmd.Process.Pid)
			return cmd, exited
		}

		cmd, exited := launch()
		if cmd == nil {
			return nil
		}

		// Monitor and restart on crash. This goroutine is the ONLY caller of
		// cmd.Wait (exec.Cmd.Wait must not be called more than once);
		// stopWorker learns about the exit through the exited channel.
		go func() {
			for {
				err := cmd.Wait()
				close(exited)

				w.mu.Lock()
				w.cmd, w.exited = nil, nil
				stopping := w.stopping
				w.mu.Unlock()
				if stopping {
					return
				}

				// Worker crashed - remove from pool and restart
				pool.RemoveWorker(w.addr)
				if err != nil {
					log.Printf("[express:%d] Process crashed: %v, restarting...", w.port, err)
				} else {
					log.Printf("[express:%d] Process exited unexpectedly, restarting...", w.port)
				}

				// Retry once per second until a launch succeeds or the worker
				// is intentionally stopped.
				for {
					time.Sleep(time.Second)
					if isStopping() {
						return
					}
					if cmd, exited = launch(); cmd != nil {
						break
					}
				}
				// Registered before looping back to Wait, so this always
				// happens before a concurrent stopWorker can return.
				pool.AddWorker(w.addr)
			}
		}()
		return w
	}

	// stopWorker intentionally shuts w down. It marks w as stopping so the
	// monitor will not relaunch it, sends SIGTERM, and escalates to SIGKILL
	// after 5 seconds. It returns only after the monitor has reaped the
	// process, so a replacement can try to bind the same port. A nil w is
	// ignored.
	stopWorker := func(w *worker) {
		if w == nil {
			return
		}
		w.mu.Lock()
		w.stopping = true
		cmd, exited := w.cmd, w.exited
		w.mu.Unlock()
		if cmd == nil || cmd.Process == nil {
			// No process is running (a relaunch was pending). The monitor
			// will see stopping and exit on its own.
			return
		}
		pid := cmd.Process.Pid
		log.Printf("[express:%d] Stopping (pid %d)", w.port, pid)
		// Best-effort: an error here means the process is already gone, in
		// which case exited is (or soon will be) closed by the monitor.
		_ = cmd.Process.Signal(syscall.SIGTERM)
		select {
		case <-exited:
			log.Printf("[express:%d] Stopped gracefully (pid %d)", w.port, pid)
		case <-time.After(5 * time.Second):
			log.Printf("[express:%d] Force killing (pid %d)", w.port, pid)
			_ = cmd.Process.Kill()
			<-exited
		}
	}

	// stopAllWorkers stops every worker in parallel and waits for all of them,
	// so a restart costs at most one grace period rather than one per worker.
	stopAllWorkers := func(workers []*worker) {
		var wg sync.WaitGroup
		for _, w := range workers {
			wg.Add(1)
			go func(w *worker) {
				defer wg.Done()
				stopWorker(w)
			}(w)
		}
		wg.Wait()
	}

	// startAllWorkers launches numProcesses workers on consecutive ports and
	// replaces the pool's address list with those that started successfully.
	startAllWorkers := func() []*worker {
		n := numProcesses
		if n < 0 {
			// A negative capacity would make make() panic.
			n = 0
		}
		workers := make([]*worker, 0, n)
		addresses := make([]string, 0, n)
		for i := 0; i < n; i++ {
			if w := startWorker(basePort + i); w != nil {
				workers = append(workers, w)
				addresses = append(addresses, w.addr)
			}
		}
		// Update the worker pool with new worker addresses
		pool.SetWorkers(addresses)
		return workers
	}

	// runBuild runs ../backend/build.sh, streaming its output to this
	// process's stdout/stderr, and reports whether it exited successfully.
	runBuild := func() bool {
		log.Printf("[build] Starting ncc build...")
		cmd := exec.Command("../backend/build.sh")
		stdout, err := cmd.StdoutPipe()
		if err != nil {
			log.Printf("[build] Failed to start: %v", err)
			return false
		}
		stderr, err := cmd.StderrPipe()
		if err != nil {
			log.Printf("[build] Failed to start: %v", err)
			return false
		}
		if err := cmd.Start(); err != nil {
			log.Printf("[build] Failed to start: %v", err)
			return false
		}
		// Copy output. Per os/exec, every read from the pipes must finish
		// before Wait is called, because Wait closes them; otherwise the tail
		// of the build output can be lost.
		var copying sync.WaitGroup
		copying.Add(2)
		go func() {
			defer copying.Done()
			io.Copy(os.Stdout, stdout)
		}()
		go func() {
			defer copying.Done()
			io.Copy(os.Stderr, stderr)
		}()
		copying.Wait()
		if err := cmd.Wait(); err != nil {
			log.Printf("[build] Failed: %v", err)
			return false
		}
		log.Printf("[build] Success")
		return true
	}

	// rebuild runs on a timer goroutine. It holds mu for the whole build and
	// swap, so a rebuild triggered while another is in progress waits instead
	// of running build.sh concurrently.
	rebuild := func() {
		mu.Lock()
		defer mu.Unlock()
		if !runBuild() {
			log.Printf("[master] Build failed, keeping current workers")
			return
		}
		// Stop all old workers (this sets stopping=true to prevent auto-restart)
		stopAllWorkers(currentWorkers)
		// Start all new workers
		currentWorkers = startAllWorkers()
	}

	// Debounce timer
	var debounceTimer *time.Timer
	const debounceDelay = 100 * time.Millisecond

	// Initial build and start
	log.Printf("[master] Initial build...")
	mu.Lock()
	if runBuild() {
		currentWorkers = startAllWorkers()
	} else {
		log.Printf("[master] Initial build failed")
	}
	mu.Unlock()

	for change := range changes {
		log.Printf("[watch] %s: %s", change.Operation, change.Path)
		// Reset debounce timer. Stop does not interrupt a callback that is
		// already running; rebuild's locking handles that case.
		if debounceTimer != nil {
			debounceTimer.Stop()
		}
		debounceTimer = time.AfterFunc(debounceDelay, rebuild)
	}
}