Compare commits
6 Commits
bee6938a67
...
03980e114b
| Author | SHA1 | Date | |
|---|---|---|---|
| 03980e114b | |||
| 539717efda | |||
| 8be88bb696 | |||
| ab74695f4c | |||
| dc5a70ba33 | |||
| 4adf6cf358 |
35
TODO.md
35
TODO.md
@@ -3,19 +3,6 @@
|
||||
- [ ] Add unit tests all over the place.
|
||||
- ⚠️ Huge task - needs breakdown before starting
|
||||
|
||||
- [ ] Add logging service
|
||||
- New golang program, in the same directory as master
|
||||
- Intended to be started by master
|
||||
- Listens to a port specified command line arg
|
||||
- Accepts POSTed (or possibly PUT) json messages, currently in a
|
||||
to-be-defined format. We will work on this format later.
|
||||
- Keeps the most recent N messages in memory. N can be a fairly large
|
||||
number; let's start by assuming 1 million.
|
||||
|
||||
- [ ] Log to logging service from the express backend
|
||||
- Fill out types and functions in `express/logging.ts`
|
||||
|
||||
|
||||
- [ ] Create initial docker-compose.yml file for local development
|
||||
- include most recent stable postgres
|
||||
|
||||
@@ -34,6 +21,13 @@
|
||||
|
||||
## medium importance
|
||||
|
||||
- [ ] Add a log viewer
|
||||
- with queries
|
||||
- convert to logfmt and is there a viewer UI we could pull in and use
|
||||
instead?
|
||||
|
||||
- [ ] figure out and add logging to disk
|
||||
|
||||
- [ ] Add email verification
|
||||
|
||||
- [ ] Update check script:
|
||||
@@ -51,6 +45,10 @@
|
||||
|
||||
## low importance
|
||||
|
||||
- [ ] add a prometheus-style `/metrics` endpoint to master
|
||||
- [ ] create a metrics server analogous to the logging server
|
||||
- accept various stats from the workers (TBD)
|
||||
|
||||
- [ ] move `master-bin` into a subdir like `master/cmd` or whatever is
|
||||
idiomatic for golang programs; adapt `master` wrapper shell script
|
||||
accordingly
|
||||
@@ -83,3 +81,14 @@
|
||||
- [x] Add wrapper script to run master program (so that various assumptions related
|
||||
to relative paths are safer)
|
||||
|
||||
- [x] Add logging service
|
||||
- New golang program, in the same directory as master
|
||||
- Intended to be started by master
|
||||
- Listens to a port specified command line arg
|
||||
- Accepts POSTed (or possibly PUT) json messages, currently in a
|
||||
to-be-defined format. We will work on this format later.
|
||||
- Keeps the most recent N messages in memory. N can be a fairly large
|
||||
number; let's start by assuming 1 million.
|
||||
|
||||
- [x] Log to logging service from the express backend
|
||||
- Fill out types and functions in `express/logging.ts`
|
||||
|
||||
@@ -32,15 +32,39 @@ type FilterArgument = {
|
||||
match?: (string | RegExp)[];
|
||||
};
|
||||
|
||||
const log = (_message: Message) => {
|
||||
// WRITEME
|
||||
console.log(
|
||||
`will POST a message to ${cli.logAddress.host}:${cli.logAddress.port}`,
|
||||
);
|
||||
const loggerUrl = `http://${cli.logAddress.host}:${cli.logAddress.port}`;
|
||||
|
||||
const log = (message: Message) => {
|
||||
const payload = {
|
||||
timestamp: message.timestamp ?? Date.now(),
|
||||
source: message.source,
|
||||
text: message.text,
|
||||
};
|
||||
|
||||
const getLogs = (filter: FilterArgument) => {
|
||||
// WRITEME
|
||||
fetch(`${loggerUrl}/log`, {
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: JSON.stringify(payload),
|
||||
}).catch((err) => {
|
||||
console.error("[logging] Failed to send log:", err.message);
|
||||
});
|
||||
};
|
||||
|
||||
const getLogs = async (filter: FilterArgument): Promise<Message[]> => {
|
||||
const params = new URLSearchParams();
|
||||
if (filter.limit) {
|
||||
params.set("limit", String(filter.limit));
|
||||
}
|
||||
if (filter.before) {
|
||||
- params.set("before", String(filter.before));
|
||||
}
|
||||
if (filter.after) {
|
||||
params.set("after", String(filter.after));
|
||||
}
|
||||
|
||||
const url = `${loggerUrl}/logs?${params.toString()}`;
|
||||
const response = await fetch(url);
|
||||
return response.json();
|
||||
};
|
||||
|
||||
// FIXME: there's scope for more specialized functions although they
|
||||
|
||||
@@ -15,12 +15,14 @@
|
||||
"dependencies": {
|
||||
"@ianvs/prettier-plugin-sort-imports": "^4.7.0",
|
||||
"@types/node": "^24.10.1",
|
||||
"@types/nunjucks": "^3.2.6",
|
||||
"@vercel/ncc": "^0.38.4",
|
||||
"express": "^5.1.0",
|
||||
"nodemon": "^3.1.11",
|
||||
"nunjucks": "^3.2.4",
|
||||
"path-to-regexp": "^8.3.0",
|
||||
"prettier": "^3.6.2",
|
||||
"ts-luxon": "^6.2.0",
|
||||
"ts-node": "^10.9.2",
|
||||
"tsx": "^4.20.6",
|
||||
"typescript": "^5.9.3",
|
||||
|
||||
17
express/pnpm-lock.yaml
generated
17
express/pnpm-lock.yaml
generated
@@ -14,6 +14,9 @@ importers:
|
||||
'@types/node':
|
||||
specifier: ^24.10.1
|
||||
version: 24.10.1
|
||||
'@types/nunjucks':
|
||||
specifier: ^3.2.6
|
||||
version: 3.2.6
|
||||
'@vercel/ncc':
|
||||
specifier: ^0.38.4
|
||||
version: 0.38.4
|
||||
@@ -32,6 +35,9 @@ importers:
|
||||
prettier:
|
||||
specifier: ^3.6.2
|
||||
version: 3.6.2
|
||||
ts-luxon:
|
||||
specifier: ^6.2.0
|
||||
version: 6.2.0
|
||||
ts-node:
|
||||
specifier: ^10.9.2
|
||||
version: 10.9.2(@types/node@24.10.1)(typescript@5.9.3)
|
||||
@@ -371,6 +377,9 @@ packages:
|
||||
'@types/node@24.10.1':
|
||||
resolution: {integrity: sha512-GNWcUTRBgIRJD5zj+Tq0fKOJ5XZajIiBroOF0yvj2bSU1WvNdYS/dn9UxwsujGW4JX06dnHyjV2y9rRaybH0iQ==}
|
||||
|
||||
'@types/nunjucks@3.2.6':
|
||||
resolution: {integrity: sha512-pHiGtf83na1nCzliuAdq8GowYiXvH5l931xZ0YEHaLMNFgynpEqx+IPStlu7UaDkehfvl01e4x/9Tpwhy7Ue3w==}
|
||||
|
||||
'@types/qs@6.14.0':
|
||||
resolution: {integrity: sha512-eOunJqu0K1923aExK6y8p6fsihYEn/BYuQ4g0CxAAgFc4b/ZLN4CrsRZ55srTdqoiLzU2B2evC+apEIxprEzkQ==}
|
||||
|
||||
@@ -814,6 +823,10 @@ packages:
|
||||
resolution: {integrity: sha512-r0eojU4bI8MnHr8c5bNo7lJDdI2qXlWWJk6a9EAFG7vbhTjElYhBVS3/miuE0uOuoLdb8Mc/rVfsmm6eo5o9GA==}
|
||||
hasBin: true
|
||||
|
||||
ts-luxon@6.2.0:
|
||||
resolution: {integrity: sha512-4I1tkW6gtydyLnUUIvfezBl5B3smurkgKmHdMOYI2g9Fn3Zg1lGJdhsCXu2VNl95CYbW2+SoNtStcf1CKOcQjw==}
|
||||
engines: {node: '>=18'}
|
||||
|
||||
ts-node@10.9.2:
|
||||
resolution: {integrity: sha512-f0FFpIdcHgn8zcPSbf1dRevwt047YMnaiJM3u2w2RewrB+fob/zePZcrOyQoLMMO7aBIddLcQIEK5dYjkLnGrQ==}
|
||||
hasBin: true
|
||||
@@ -1103,6 +1116,8 @@ snapshots:
|
||||
dependencies:
|
||||
undici-types: 7.16.0
|
||||
|
||||
'@types/nunjucks@3.2.6': {}
|
||||
|
||||
'@types/qs@6.14.0': {}
|
||||
|
||||
'@types/range-parser@1.2.7': {}
|
||||
@@ -1588,6 +1603,8 @@ snapshots:
|
||||
|
||||
touch@3.1.1: {}
|
||||
|
||||
ts-luxon@6.2.0: {}
|
||||
|
||||
ts-node@10.9.2(@types/node@24.10.1)(typescript@5.9.3):
|
||||
dependencies:
|
||||
'@cspotcode/source-map-support': 0.8.1
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
/// <reference lib="dom" />
|
||||
|
||||
import nunjucks from "nunjucks";
|
||||
import { DateTime } from "ts-luxon";
|
||||
import { contentTypes } from "./content-types";
|
||||
import { multiHandler } from "./handlers";
|
||||
import { HttpCode, httpCodes } from "./http-codes";
|
||||
@@ -72,6 +74,29 @@ const routes: Route[] = [
|
||||
};
|
||||
},
|
||||
},
|
||||
{
|
||||
path: "/time",
|
||||
methods: ["GET"],
|
||||
handler: async (_req): Promise<Result> => {
|
||||
const now = DateTime.now();
|
||||
const template = `
|
||||
<html>
|
||||
<head></head>
|
||||
<body>
|
||||
{{ now }}
|
||||
</body>
|
||||
</html>
|
||||
`;
|
||||
|
||||
const result = nunjucks.renderString(template, { now });
|
||||
|
||||
return {
|
||||
code: httpCodes.success.OK,
|
||||
contentType: contentTypes.text.html,
|
||||
result,
|
||||
};
|
||||
},
|
||||
},
|
||||
];
|
||||
|
||||
export { routes };
|
||||
|
||||
1
logger/.gitignore
vendored
Normal file
1
logger/.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
||||
logger-bin
|
||||
3
logger/go.mod
Normal file
3
logger/go.mod
Normal file
@@ -0,0 +1,3 @@
|
||||
module philologue.net/diachron/logger-bin
|
||||
|
||||
go 1.23.3
|
||||
70
logger/main.go
Normal file
70
logger/main.go
Normal file
@@ -0,0 +1,70 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
func main() {
|
||||
port := flag.Int("port", 8085, "port to listen on")
|
||||
capacity := flag.Int("capacity", 1000000, "max messages to store")
|
||||
|
||||
flag.Parse()
|
||||
|
||||
store := NewLogStore(*capacity)
|
||||
|
||||
http.HandleFunc("POST /log", func(w http.ResponseWriter, r *http.Request) {
|
||||
var msg Message
|
||||
if err := json.NewDecoder(r.Body).Decode(&msg); err != nil {
|
||||
http.Error(w, "invalid JSON", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
store.Add(msg)
|
||||
w.WriteHeader(http.StatusCreated)
|
||||
})
|
||||
|
||||
http.HandleFunc("GET /logs", func(w http.ResponseWriter, r *http.Request) {
|
||||
params := FilterParams{}
|
||||
|
||||
if limit := r.URL.Query().Get("limit"); limit != "" {
|
||||
if n, err := strconv.Atoi(limit); err == nil {
|
||||
params.Limit = n
|
||||
}
|
||||
}
|
||||
if before := r.URL.Query().Get("before"); before != "" {
|
||||
if ts, err := strconv.ParseInt(before, 10, 64); err == nil {
|
||||
params.Before = ts
|
||||
}
|
||||
}
|
||||
if after := r.URL.Query().Get("after"); after != "" {
|
||||
if ts, err := strconv.ParseInt(after, 10, 64); err == nil {
|
||||
params.After = ts
|
||||
}
|
||||
}
|
||||
|
||||
messages := store.GetFiltered(params)
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(messages)
|
||||
})
|
||||
|
||||
http.HandleFunc("GET /status", func(w http.ResponseWriter, r *http.Request) {
|
||||
status := map[string]any{
|
||||
"count": store.Count(),
|
||||
"capacity": *capacity,
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(status)
|
||||
})
|
||||
|
||||
listenAddr := fmt.Sprintf(":%d", *port)
|
||||
log.Printf("[logger] Listening on %s (capacity: %d)", listenAddr, *capacity)
|
||||
if err := http.ListenAndServe(listenAddr, nil); err != nil {
|
||||
log.Fatalf("[logger] Failed to start: %v", err)
|
||||
}
|
||||
}
|
||||
126
logger/store.go
Normal file
126
logger/store.go
Normal file
@@ -0,0 +1,126 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Message represents a log entry from the express backend
|
||||
type Message struct {
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
Source string `json:"source"` // "logging" | "diagnostic" | "user"
|
||||
Text []string `json:"text"`
|
||||
}
|
||||
|
||||
// LogStore is a thread-safe ring buffer for log messages
|
||||
type LogStore struct {
|
||||
mu sync.RWMutex
|
||||
messages []Message
|
||||
head int // next write position
|
||||
full bool // whether buffer has wrapped
|
||||
capacity int
|
||||
}
|
||||
|
||||
// NewLogStore creates a new log store with the given capacity
|
||||
func NewLogStore(capacity int) *LogStore {
|
||||
return &LogStore{
|
||||
messages: make([]Message, capacity),
|
||||
capacity: capacity,
|
||||
}
|
||||
}
|
||||
|
||||
// Add inserts a new message into the store
|
||||
func (s *LogStore) Add(msg Message) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
s.messages[s.head] = msg
|
||||
s.head++
|
||||
if s.head >= s.capacity {
|
||||
s.head = 0
|
||||
s.full = true
|
||||
}
|
||||
}
|
||||
|
||||
// Count returns the number of messages in the store
|
||||
func (s *LogStore) Count() int {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
if s.full {
|
||||
return s.capacity
|
||||
}
|
||||
return s.head
|
||||
}
|
||||
|
||||
// GetRecent returns the most recent n messages, newest first
|
||||
func (s *LogStore) GetRecent(n int) []Message {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
count := s.Count()
|
||||
if n > count {
|
||||
n = count
|
||||
}
|
||||
if n == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
result := make([]Message, n)
|
||||
pos := s.head - 1
|
||||
for i := 0; i < n; i++ {
|
||||
if pos < 0 {
|
||||
pos = s.capacity - 1
|
||||
}
|
||||
result[i] = s.messages[pos]
|
||||
pos--
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// Filter parameters for retrieving logs
|
||||
type FilterParams struct {
|
||||
Limit int // max messages to return (0 = default 100)
|
||||
Before int64 // only messages before this timestamp
|
||||
After int64 // only messages after this timestamp
|
||||
}
|
||||
|
||||
// GetFiltered returns messages matching the filter criteria
|
||||
func (s *LogStore) GetFiltered(params FilterParams) []Message {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
limit := params.Limit
|
||||
if limit <= 0 {
|
||||
limit = 100
|
||||
}
|
||||
|
||||
count := s.Count()
|
||||
if count == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
result := make([]Message, 0, limit)
|
||||
pos := s.head - 1
|
||||
|
||||
for i := 0; i < count && len(result) < limit; i++ {
|
||||
if pos < 0 {
|
||||
pos = s.capacity - 1
|
||||
}
|
||||
msg := s.messages[pos]
|
||||
|
||||
// Apply filters
|
||||
if params.Before > 0 && msg.Timestamp >= params.Before {
|
||||
pos--
|
||||
continue
|
||||
}
|
||||
if params.After > 0 && msg.Timestamp <= params.After {
|
||||
pos--
|
||||
continue
|
||||
}
|
||||
|
||||
result = append(result, msg)
|
||||
pos--
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
@@ -13,16 +13,22 @@ func main() {
|
||||
workers := flag.Int("workers", 1, "number of worker processes")
|
||||
basePort := flag.Int("base-port", 3000, "base port for worker processes")
|
||||
listenPort := flag.Int("port", 8080, "port for the reverse proxy to listen on")
|
||||
loggerPort := flag.Int("logger-port", 8085, "port for the logger service")
|
||||
loggerCapacity := flag.Int("logger-capacity", 1000000, "max messages for logger to store")
|
||||
|
||||
flag.Parse()
|
||||
|
||||
// Create worker pool
|
||||
pool := NewWorkerPool()
|
||||
|
||||
// Setup signal handling
|
||||
sigCh := make(chan os.Signal, 1)
|
||||
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
|
||||
|
||||
// Start and manage the logger process
|
||||
stopLogger := startLogger(*loggerPort, *loggerCapacity)
|
||||
defer stopLogger()
|
||||
|
||||
// Create worker pool
|
||||
pool := NewWorkerPool()
|
||||
|
||||
fileChanges := make(chan FileChange, 10)
|
||||
|
||||
go watchFiles(*watchDir, fileChanges)
|
||||
|
||||
106
master/runlogger.go
Normal file
106
master/runlogger.go
Normal file
@@ -0,0 +1,106 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"log"
|
||||
"os"
|
||||
"os/exec"
|
||||
"strconv"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
)
|
||||
|
||||
// startLogger starts the logger process and returns a function to stop it.
|
||||
// It automatically restarts the logger if it crashes.
|
||||
func startLogger(port int, capacity int) func() {
|
||||
var mu sync.Mutex
|
||||
var cmd *exec.Cmd
|
||||
var stopping bool
|
||||
|
||||
portStr := strconv.Itoa(port)
|
||||
capacityStr := strconv.Itoa(capacity)
|
||||
|
||||
start := func() *exec.Cmd {
|
||||
c := exec.Command("../logger/logger", "--port", portStr, "--capacity", capacityStr)
|
||||
c.Stdout = os.Stdout
|
||||
c.Stderr = os.Stderr
|
||||
|
||||
if err := c.Start(); err != nil {
|
||||
log.Printf("[logger] Failed to start: %v", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Printf("[logger] Started (pid %d) on port %s", c.Process.Pid, portStr)
|
||||
return c
|
||||
}
|
||||
|
||||
// Start initial logger
|
||||
cmd = start()
|
||||
|
||||
// Monitor and restart on crash
|
||||
go func() {
|
||||
for {
|
||||
mu.Lock()
|
||||
currentCmd := cmd
|
||||
mu.Unlock()
|
||||
|
||||
if currentCmd == nil {
|
||||
time.Sleep(time.Second)
|
||||
mu.Lock()
|
||||
if !stopping {
|
||||
cmd = start()
|
||||
}
|
||||
mu.Unlock()
|
||||
continue
|
||||
}
|
||||
|
||||
err := currentCmd.Wait()
|
||||
|
||||
mu.Lock()
|
||||
if stopping {
|
||||
mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
log.Printf("[logger] Process exited: %v, restarting...", err)
|
||||
} else {
|
||||
log.Printf("[logger] Process exited normally, restarting...")
|
||||
}
|
||||
|
||||
time.Sleep(time.Second)
|
||||
cmd = start()
|
||||
mu.Unlock()
|
||||
}
|
||||
}()
|
||||
|
||||
// Return stop function
|
||||
return func() {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
|
||||
stopping = true
|
||||
|
||||
if cmd == nil || cmd.Process == nil {
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("[logger] Stopping (pid %d)", cmd.Process.Pid)
|
||||
cmd.Process.Signal(syscall.SIGTERM)
|
||||
|
||||
// Wait briefly for graceful shutdown
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
cmd.Wait()
|
||||
close(done)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
log.Printf("[logger] Stopped gracefully")
|
||||
case <-time.After(5 * time.Second):
|
||||
log.Printf("[logger] Force killing")
|
||||
cmd.Process.Kill()
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user