Add first cut at a pool

This commit is contained in:
2026-01-01 15:43:49 -06:00
parent 8722062f4a
commit f504576f3e
6 changed files with 232 additions and 43 deletions

View File

@@ -4,4 +4,4 @@ set -eu
DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
"$DIR"/../shims/node "$@" exec "$DIR"/../shims/node "$@"

View File

@@ -1,23 +1,33 @@
package main package main
import ( import (
// "context"
"fmt" "fmt"
"os" "os"
"os/signal" "os/signal"
// "sync" "strconv"
"syscall" "syscall"
) )
func main() { func main() {
// var program1 = os.Getenv("BUILD_COMMAND") watchedDir := os.Getenv("WATCHED_DIR")
//var program2 = os.Getenv("RUN_COMMAND")
var watchedDir = os.Getenv("WATCHED_DIR") numChildProcesses := 1
if n, err := strconv.Atoi(os.Getenv("NUM_CHILD_PROCESSES")); err == nil && n > 0 {
numChildProcesses = n
}
// Create context for graceful shutdown basePort := 3000
// ctx, cancel := context.WithCancel(context.Background()) if p, err := strconv.Atoi(os.Getenv("BASE_PORT")); err == nil && p > 0 {
//defer cancel() basePort = p
}
listenPort := 8080
if p, err := strconv.Atoi(os.Getenv("LISTEN_PORT")); err == nil && p > 0 {
listenPort = p
}
// Create worker pool
pool := NewWorkerPool()
// Setup signal handling // Setup signal handling
sigCh := make(chan os.Signal, 1) sigCh := make(chan os.Signal, 1)
@@ -27,24 +37,16 @@ func main() {
go watchFiles(watchedDir, fileChanges) go watchFiles(watchedDir, fileChanges)
go runExpress(fileChanges) go runExpress(fileChanges, numChildProcesses, basePort, pool)
// WaitGroup to track both processes // Start the reverse proxy
// var wg sync.WaitGroup listenAddr := fmt.Sprintf(":%d", listenPort)
go startProxy(listenAddr, pool)
// Start both processes
//wg.Add(2)
// go runProcess(ctx, &wg, "builder", program1)
// go runProcess(ctx, &wg, "runner", program2)
// Wait for interrupt signal // Wait for interrupt signal
<-sigCh <-sigCh
fmt.Println("\nReceived interrupt signal, shutting down...") fmt.Println("\nReceived interrupt signal, shutting down...")
// Cancel context to signal goroutines to stop
/// cancel()
// Wait for both processes to finish
// wg.Wait()
fmt.Println("All processes terminated cleanly") fmt.Println("All processes terminated cleanly")
} }

48
master/proxy.go Normal file
View File

@@ -0,0 +1,48 @@
package main
import (
"log"
"net/http"
"net/http/httputil"
"net/url"
)
// startProxy starts an HTTP reverse proxy that forwards requests to workers.
// It acquires a worker from the pool for each request and releases it when done.
func startProxy(listenAddr string, pool *WorkerPool) {
	handler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		// Check a worker out of the pool; this blocks until one is free.
		backend, ok := pool.Acquire()
		if !ok {
			http.Error(w, "Service unavailable", http.StatusServiceUnavailable)
			return
		}
		// Hand the worker back once this request is fully served.
		defer pool.Release(backend)

		// Build the target URL for the chosen worker.
		target, err := url.Parse("http://" + backend)
		if err != nil {
			log.Printf("[proxy] Failed to parse worker URL %s: %v", backend, err)
			http.Error(w, "Internal server error", http.StatusInternalServerError)
			return
		}

		rp := httputil.NewSingleHostReverseProxy(target)
		// Surface upstream failures as 502 instead of the default response.
		rp.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) {
			log.Printf("[proxy] Error proxying to %s: %v", backend, err)
			http.Error(w, "Bad gateway", http.StatusBadGateway)
		}

		log.Printf("[proxy] %s %s -> %s", req.Method, req.URL.Path, backend)
		rp.ServeHTTP(w, req)
	})

	log.Printf("[proxy] Listening on %s", listenAddr)
	if err := http.ListenAndServe(listenAddr, handler); err != nil {
		log.Fatalf("[proxy] Failed to start: %v", err)
	}
}

View File

@@ -1,6 +1,7 @@
package main package main
import ( import (
"fmt"
"io" "io"
"log" "log"
"os" "os"
@@ -10,43 +11,45 @@ import (
"time" "time"
) )
func runExpress(changes <-chan FileChange) { func runExpress(changes <-chan FileChange, numProcesses int, basePort int, pool *WorkerPool) {
var currentProcess *exec.Cmd var currentProcesses []*exec.Cmd
var mu sync.Mutex var mu sync.Mutex
// Helper to start the express process // Helper to start an express process on a specific port
startExpress := func() *exec.Cmd { startExpress := func(port int) *exec.Cmd {
cmd := exec.Command("../express/run.sh") listenAddr := fmt.Sprintf("127.0.0.1:%d", port)
cmd := exec.Command("../express/run.sh", "--listen", listenAddr)
cmd.Stdout = os.Stdout cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr cmd.Stderr = os.Stderr
if err := cmd.Start(); err != nil { if err := cmd.Start(); err != nil {
log.Printf("[express] Failed to start: %v", err) log.Printf("[express:%d] Failed to start: %v", port, err)
return nil return nil
} }
log.Printf("[express] Started (pid %d)", cmd.Process.Pid) log.Printf("[express:%d] Started (pid %d)", port, cmd.Process.Pid)
// Monitor the process in background // Monitor the process in background
go func() { go func(p int) {
err := cmd.Wait() err := cmd.Wait()
if err != nil { if err != nil {
log.Printf("[express] Process exited: %v", err) log.Printf("[express:%d] Process exited: %v", p, err)
} else { } else {
log.Printf("[express] Process exited normally") log.Printf("[express:%d] Process exited normally", p)
} }
}() }(port)
return cmd return cmd
} }
// Helper to stop the express process // Helper to stop an express process
stopExpress := func(cmd *exec.Cmd) { stopExpress := func(cmd *exec.Cmd) {
if cmd == nil || cmd.Process == nil { if cmd == nil || cmd.Process == nil {
return return
} }
log.Printf("[express] Stopping (pid %d)", cmd.Process.Pid) pid := cmd.Process.Pid
log.Printf("[express] Stopping (pid %d)", pid)
cmd.Process.Signal(syscall.SIGTERM) cmd.Process.Signal(syscall.SIGTERM)
// Wait briefly for graceful shutdown // Wait briefly for graceful shutdown
@@ -58,13 +61,38 @@ func runExpress(changes <-chan FileChange) {
select { select {
case <-done: case <-done:
log.Printf("[express] Stopped gracefully") log.Printf("[express] Stopped gracefully (pid %d)", pid)
case <-time.After(5 * time.Second): case <-time.After(5 * time.Second):
log.Printf("[express] Force killing") log.Printf("[express] Force killing (pid %d)", pid)
cmd.Process.Kill() cmd.Process.Kill()
} }
} }
// Helper to stop all express processes
stopAllExpress := func(processes []*exec.Cmd) {
for _, cmd := range processes {
stopExpress(cmd)
}
}
// Helper to start all express processes and update the worker pool
startAllExpress := func() []*exec.Cmd {
processes := make([]*exec.Cmd, 0, numProcesses)
addresses := make([]string, 0, numProcesses)
for i := 0; i < numProcesses; i++ {
port := basePort + i
addr := fmt.Sprintf("127.0.0.1:%d", port)
cmd := startExpress(port)
if cmd != nil {
processes = append(processes, cmd)
addresses = append(addresses, addr)
}
}
// Update the worker pool with new worker addresses
pool.SetWorkers(addresses)
return processes
}
// Helper to run the build // Helper to run the build
runBuild := func() bool { runBuild := func() bool {
log.Printf("[build] Starting ncc build...") log.Printf("[build] Starting ncc build...")
@@ -97,6 +125,14 @@ func runExpress(changes <-chan FileChange) {
var debounceTimer *time.Timer var debounceTimer *time.Timer
const debounceDelay = 100 * time.Millisecond const debounceDelay = 100 * time.Millisecond
// Initial build and start
log.Printf("[master] Initial build...")
if runBuild() {
currentProcesses = startAllExpress()
} else {
log.Printf("[master] Initial build failed")
}
for change := range changes { for change := range changes {
log.Printf("[watch] %s: %s", change.Operation, change.Path) log.Printf("[watch] %s: %s", change.Operation, change.Path)
@@ -107,18 +143,18 @@ func runExpress(changes <-chan FileChange) {
debounceTimer = time.AfterFunc(debounceDelay, func() { debounceTimer = time.AfterFunc(debounceDelay, func() {
if !runBuild() { if !runBuild() {
log.Printf("[master] Build failed, keeping current process") log.Printf("[master] Build failed, keeping current processes")
return return
} }
mu.Lock() mu.Lock()
defer mu.Unlock() defer mu.Unlock()
// Stop old process // Stop all old processes
stopExpress(currentProcess) stopAllExpress(currentProcesses)
// Start new process // Start all new processes
currentProcess = startExpress() currentProcesses = startAllExpress()
}) })
} }
} }

View File

@@ -1,12 +1,32 @@
package main package main
import ( import (
"github.com/fsnotify/fsnotify"
"log" "log"
"os" "os"
"path/filepath" "path/filepath"
"strings"
"github.com/fsnotify/fsnotify"
) )
// shouldIgnore returns true for paths that should not trigger rebuilds:
// build output, installed dependencies, and VCS metadata. A path matches
// either when one of these directories appears as an interior component
// or when the path itself is such a directory.
// NOTE(review): matching is on forward-slash components — assumes
// slash-separated paths (fsnotify on Unix); confirm if Windows matters.
func shouldIgnore(path string) bool {
	for _, dir := range []string{"/dist", "/node_modules", "/.git"} {
		// Interior component ("/x/dist/y") or trailing component ("/x/dist").
		if strings.Contains(path, dir+"/") || strings.HasSuffix(path, dir) {
			return true
		}
	}
	return false
}
func watchFiles(dir string, changes chan<- FileChange) { func watchFiles(dir string, changes chan<- FileChange) {
watcher, err := fsnotify.NewWatcher() watcher, err := fsnotify.NewWatcher()
if err != nil { if err != nil {
@@ -14,12 +34,15 @@ func watchFiles(dir string, changes chan<- FileChange) {
} }
defer watcher.Close() defer watcher.Close()
// Add all directories recursively // Add all directories recursively (except ignored ones)
err = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { err = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil { if err != nil {
return err return err
} }
if info.IsDir() { if info.IsDir() {
if shouldIgnore(path) {
return filepath.SkipDir
}
err = watcher.Add(path) err = watcher.Add(path)
if err != nil { if err != nil {
log.Printf("Error watching %s: %v\n", path, err) log.Printf("Error watching %s: %v\n", path, err)
@@ -38,6 +61,11 @@ func watchFiles(dir string, changes chan<- FileChange) {
return return
} }
// Skip ignored paths
if shouldIgnore(event.Name) {
continue
}
// Handle different types of events // Handle different types of events
var operation string var operation string
switch { switch {

75
master/workerpool.go Normal file
View File

@@ -0,0 +1,75 @@
package main
import (
"log"
"sync"
)
// WorkerPool manages a pool of worker addresses and tracks their availability.
// Each worker handles one request at a time: callers check an address out with
// Acquire and hand it back with Release. SetWorkers atomically replaces the
// whole set when workers are restarted after a rebuild.
type WorkerPool struct {
	mu        sync.Mutex
	workers   []string    // current generation of worker addresses
	available chan string // addresses ready to serve a request
	closed    bool        // NOTE(review): never written in this file — confirm before relying on it
}

// NewWorkerPool creates a new empty worker pool.
// Acquire blocks until SetWorkers has populated the pool.
func NewWorkerPool() *WorkerPool {
	return &WorkerPool{
		available: make(chan string, 100), // buffered to avoid blocking
	}
}

// SetWorkers updates the pool with a new set of worker addresses.
// Called when workers are started or restarted after a rebuild.
// Closing the old channel wakes any Acquire still blocked on the previous
// generation (it observes ok == false).
func (p *WorkerPool) SetWorkers(addrs []string) {
	p.mu.Lock()
	defer p.mu.Unlock()

	// Close and drain the old channel. No sender can race with this close:
	// Release only sends while holding p.mu, which we hold here.
	close(p.available)
	for range p.available {
		// drain
	}

	// Fresh channel for the new generation, populated with every address.
	p.available = make(chan string, len(addrs)+10)
	p.workers = make([]string, len(addrs))
	copy(p.workers, addrs)
	for _, addr := range addrs {
		p.available <- addr
	}
	log.Printf("[pool] Updated workers: %v", addrs)
}

// Acquire blocks until a worker is available and returns its address.
// ok is false when the generation it was waiting on has been replaced by
// SetWorkers; callers should treat that as "no worker available" and retry
// or fail the request.
func (p *WorkerPool) Acquire() (string, bool) {
	// Snapshot the channel under the lock: SetWorkers swaps p.available, and
	// reading the field unsynchronized is a data race. Receive OUTSIDE the
	// lock so a blocked Acquire cannot deadlock SetWorkers or Release.
	p.mu.Lock()
	ch := p.available
	p.mu.Unlock()

	addr, ok := <-ch
	if ok {
		log.Printf("[pool] Acquired worker %s", addr)
	}
	return addr, ok
}

// Release marks a worker as available again after it finishes handling a
// request. Addresses from an older generation (replaced by SetWorkers in
// the meantime) are silently ignored.
func (p *WorkerPool) Release(addr string) {
	p.mu.Lock()
	defer p.mu.Unlock()

	// Only release if the worker is still in the current set.
	for _, w := range p.workers {
		if w != addr {
			continue
		}
		select {
		case p.available <- addr:
			log.Printf("[pool] Released worker %s", addr)
		default:
			// Channel full; drop rather than block while holding the lock.
		}
		return
	}
	// Worker not in current set (probably from before a rebuild), ignore.
}