diff --git a/framework/cmd.d/node b/framework/cmd.d/node index 05f3b98..c412659 100755 --- a/framework/cmd.d/node +++ b/framework/cmd.d/node @@ -4,4 +4,4 @@ set -eu DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" -"$DIR"/../shims/node "$@" +exec "$DIR"/../shims/node "$@" diff --git a/master/main.go b/master/main.go index 2b011da..2aaa94c 100644 --- a/master/main.go +++ b/master/main.go @@ -1,23 +1,33 @@ package main import ( - // "context" "fmt" "os" "os/signal" - // "sync" + "strconv" "syscall" ) func main() { - // var program1 = os.Getenv("BUILD_COMMAND") - //var program2 = os.Getenv("RUN_COMMAND") + watchedDir := os.Getenv("WATCHED_DIR") - var watchedDir = os.Getenv("WATCHED_DIR") + numChildProcesses := 1 + if n, err := strconv.Atoi(os.Getenv("NUM_CHILD_PROCESSES")); err == nil && n > 0 { + numChildProcesses = n + } - // Create context for graceful shutdown - // ctx, cancel := context.WithCancel(context.Background()) - //defer cancel() + basePort := 3000 + if p, err := strconv.Atoi(os.Getenv("BASE_PORT")); err == nil && p > 0 { + basePort = p + } + + listenPort := 8080 + if p, err := strconv.Atoi(os.Getenv("LISTEN_PORT")); err == nil && p > 0 { + listenPort = p + } + + // Create worker pool + pool := NewWorkerPool() // Setup signal handling sigCh := make(chan os.Signal, 1) @@ -27,24 +37,16 @@ func main() { go watchFiles(watchedDir, fileChanges) - go runExpress(fileChanges) + go runExpress(fileChanges, numChildProcesses, basePort, pool) - // WaitGroup to track both processes - // var wg sync.WaitGroup - - // Start both processes - //wg.Add(2) - // go runProcess(ctx, &wg, "builder", program1) - // go runProcess(ctx, &wg, "runner", program2) + // Start the reverse proxy + listenAddr := fmt.Sprintf(":%d", listenPort) + go startProxy(listenAddr, pool) // Wait for interrupt signal <-sigCh fmt.Println("\nReceived interrupt signal, shutting down...") - // Cancel context to signal goroutines to stop - /// cancel() - // Wait for both processes to finish - // wg.Wait() fmt.Println("All processes terminated cleanly") } diff --git a/master/proxy.go b/master/proxy.go new file mode 100644 index 0000000..1ee52b2 --- /dev/null +++ b/master/proxy.go @@ -0,0 +1,48 @@ +package main + +import ( + "log" + "net/http" + "net/http/httputil" + "net/url" +) + +// startProxy starts an HTTP reverse proxy that forwards requests to workers. +// It acquires a worker from the pool for each request and releases it when done. +func startProxy(listenAddr string, pool *WorkerPool) { + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Acquire a worker (blocks if none available) + workerAddr, ok := pool.Acquire() + if !ok { + http.Error(w, "Service unavailable", http.StatusServiceUnavailable) + return + } + + // Ensure we release the worker when done + defer pool.Release(workerAddr) + + // Create reverse proxy to the worker + targetURL, err := url.Parse("http://" + workerAddr) + if err != nil { + log.Printf("[proxy] Failed to parse worker URL %s: %v", workerAddr, err) + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } + + proxy := httputil.NewSingleHostReverseProxy(targetURL) + + // Custom error handler + proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) { + log.Printf("[proxy] Error proxying to %s: %v", workerAddr, err) + http.Error(w, "Bad gateway", http.StatusBadGateway) + } + + log.Printf("[proxy] %s %s -> %s", r.Method, r.URL.Path, workerAddr) + proxy.ServeHTTP(w, r) + }) + + log.Printf("[proxy] Listening on %s", listenAddr) + if err := http.ListenAndServe(listenAddr, handler); err != nil { + log.Fatalf("[proxy] Failed to start: %v", err) + } +} diff --git a/master/runexpress.go b/master/runexpress.go index f5fac71..a2333f8 100644 --- a/master/runexpress.go +++ b/master/runexpress.go @@ -1,6 +1,7 @@ package main import ( + "fmt" "io" "log" "os" @@ -10,43 +11,45 @@ import ( "time" ) -func runExpress(changes <-chan FileChange) { - var currentProcess *exec.Cmd +func runExpress(changes <-chan FileChange, numProcesses int, basePort int, pool *WorkerPool) { + var currentProcesses []*exec.Cmd var mu sync.Mutex - // Helper to start the express process - startExpress := func() *exec.Cmd { - cmd := exec.Command("../express/run.sh") + // Helper to start an express process on a specific port + startExpress := func(port int) *exec.Cmd { + listenAddr := fmt.Sprintf("127.0.0.1:%d", port) + cmd := exec.Command("../express/run.sh", "--listen", listenAddr) cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr if err := cmd.Start(); err != nil { - log.Printf("[express] Failed to start: %v", err) + log.Printf("[express:%d] Failed to start: %v", port, err) return nil } - log.Printf("[express] Started (pid %d)", cmd.Process.Pid) + log.Printf("[express:%d] Started (pid %d)", port, cmd.Process.Pid) // Monitor the process in background - go func() { + go func(p int) { err := cmd.Wait() if err != nil { - log.Printf("[express] Process exited: %v", err) + log.Printf("[express:%d] Process exited: %v", p, err) } else { - log.Printf("[express] Process exited normally") + log.Printf("[express:%d] Process exited normally", p) } - }() + }(port) return cmd } - // Helper to stop the express process + // Helper to stop an express process stopExpress := func(cmd *exec.Cmd) { if cmd == nil || cmd.Process == nil { return } - log.Printf("[express] Stopping (pid %d)", cmd.Process.Pid) + pid := cmd.Process.Pid + log.Printf("[express] Stopping (pid %d)", pid) cmd.Process.Signal(syscall.SIGTERM) // Wait briefly for graceful shutdown @@ -58,13 +61,38 @@ func runExpress(changes <-chan FileChange) { select { case <-done: - log.Printf("[express] Stopped gracefully") + log.Printf("[express] Stopped gracefully (pid %d)", pid) case <-time.After(5 * time.Second): - log.Printf("[express] Force killing") + log.Printf("[express] Force killing (pid %d)", pid) cmd.Process.Kill() } } + // Helper to stop all express processes + stopAllExpress := func(processes []*exec.Cmd) { + for _, cmd := range processes { + stopExpress(cmd) + } + } + + // Helper to start all express processes and update the worker pool + startAllExpress := func() []*exec.Cmd { + processes := make([]*exec.Cmd, 0, numProcesses) + addresses := make([]string, 0, numProcesses) + for i := 0; i < numProcesses; i++ { + port := basePort + i + addr := fmt.Sprintf("127.0.0.1:%d", port) + cmd := startExpress(port) + if cmd != nil { + processes = append(processes, cmd) + addresses = append(addresses, addr) + } + } + // Update the worker pool with new worker addresses + pool.SetWorkers(addresses) + return processes + } + // Helper to run the build runBuild := func() bool { log.Printf("[build] Starting ncc build...") @@ -97,6 +125,14 @@ func runExpress(changes <-chan FileChange) { var debounceTimer *time.Timer const debounceDelay = 100 * time.Millisecond + // Initial build and start + log.Printf("[master] Initial build...") + if runBuild() { + currentProcesses = startAllExpress() + } else { + log.Printf("[master] Initial build failed") + } + for change := range changes { log.Printf("[watch] %s: %s", change.Operation, change.Path) @@ -107,18 +143,18 @@ func runExpress(changes <-chan FileChange) { debounceTimer = time.AfterFunc(debounceDelay, func() { if !runBuild() { - log.Printf("[master] Build failed, keeping current process") + log.Printf("[master] Build failed, keeping current processes") return } mu.Lock() defer mu.Unlock() - // Stop old process - stopExpress(currentProcess) + // Stop all old processes + stopAllExpress(currentProcesses) - // Start new process - currentProcess = startExpress() + // Start all new processes + currentProcesses = startAllExpress() }) } } diff --git a/master/watchfiles.go b/master/watchfiles.go index b339def..c7d31b3 100644 --- a/master/watchfiles.go +++ b/master/watchfiles.go @@ -1,12 +1,32 @@ package main import ( - "github.com/fsnotify/fsnotify" "log" "os" "path/filepath" + "strings" + + "github.com/fsnotify/fsnotify" ) +// shouldIgnore returns true for paths that should not trigger rebuilds +func shouldIgnore(path string) bool { + // Ignore build output and dependencies + ignoreDirs := []string{"/dist/", "/node_modules/", "/.git/"} + for _, dir := range ignoreDirs { + if strings.Contains(path, dir) { + return true + } + } + // Also ignore if path ends with these directories + for _, dir := range []string{"/dist", "/node_modules", "/.git"} { + if strings.HasSuffix(path, dir) { + return true + } + } + return false +} + func watchFiles(dir string, changes chan<- FileChange) { watcher, err := fsnotify.NewWatcher() if err != nil { @@ -14,12 +34,15 @@ func watchFiles(dir string, changes chan<- FileChange) { } defer watcher.Close() - // Add all directories recursively + // Add all directories recursively (except ignored ones) err = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if info.IsDir() { + if shouldIgnore(path) { + return filepath.SkipDir + } err = watcher.Add(path) if err != nil { log.Printf("Error watching %s: %v\n", path, err) @@ -38,6 +61,11 @@ func watchFiles(dir string, changes chan<- FileChange) { return } + // Skip ignored paths + if shouldIgnore(event.Name) { + continue + } + // Handle different types of events var operation string switch { diff --git a/master/workerpool.go b/master/workerpool.go new file mode 100644 index 0000000..e35a6f7 --- /dev/null +++ b/master/workerpool.go @@ -0,0 +1,75 @@ +package main + +import ( + "log" + "sync" +) + +// WorkerPool manages a pool of worker addresses and tracks their availability. +// Each worker can only handle one request at a time. +type WorkerPool struct { + mu sync.Mutex + workers []string + available chan string + closed bool +} + +// NewWorkerPool creates a new empty worker pool. +func NewWorkerPool() *WorkerPool { + return &WorkerPool{ + available: make(chan string, 100), // buffered to avoid blocking + } +} + +// SetWorkers updates the pool with a new set of worker addresses. +// Called when workers are started or restarted after a rebuild. +func (p *WorkerPool) SetWorkers(addrs []string) { + p.mu.Lock() + defer p.mu.Unlock() + + // Drain the old available channel + close(p.available) + for range p.available { + // drain + } + + // Create new channel and populate with new workers + p.available = make(chan string, len(addrs)+10) + p.workers = make([]string, len(addrs)) + copy(p.workers, addrs) + + for _, addr := range addrs { + p.available <- addr + } + + log.Printf("[pool] Updated workers: %v", addrs) +} + +// Acquire blocks until a worker is available and returns its address. +func (p *WorkerPool) Acquire() (string, bool) { + addr, ok := <-p.available + if ok { + log.Printf("[pool] Acquired worker %s", addr) + } + return addr, ok +} + +// Release marks a worker as available again after it finishes handling a request. +func (p *WorkerPool) Release(addr string) { + p.mu.Lock() + defer p.mu.Unlock() + + // Only release if the worker is still in our current set + for _, w := range p.workers { + if w == addr { + select { + case p.available <- addr: + log.Printf("[pool] Released worker %s", addr) + default: + // Channel full, worker may have been removed + } + return + } + } + // Worker not in current set (probably from before a rebuild), ignore +}