Compare commits
6 Commits
bee6938a67
...
03980e114b
| Author | SHA1 | Date | |
|---|---|---|---|
| 03980e114b | |||
| 539717efda | |||
| 8be88bb696 | |||
| ab74695f4c | |||
| dc5a70ba33 | |||
| 4adf6cf358 |
35
TODO.md
35
TODO.md
@@ -3,19 +3,6 @@
|
|||||||
- [ ] Add unit tests all over the place.
|
- [ ] Add unit tests all over the place.
|
||||||
- ⚠️ Huge task - needs breakdown before starting
|
- ⚠️ Huge task - needs breakdown before starting
|
||||||
|
|
||||||
- [ ] Add logging service
|
|
||||||
- New golang program, in the same directory as master
|
|
||||||
- Intended to be started by master
|
|
||||||
- Listens to a port specified command line arg
|
|
||||||
- Accepts POSTed (or possibly PUT) json messages, currently in a
|
|
||||||
to-be-defined format. We will work on this format later.
|
|
||||||
- Keeps the most recent N messages in memory. N can be a fairly large
|
|
||||||
number; let's start by assuming 1 million.
|
|
||||||
|
|
||||||
- [ ] Log to logging service from the express backend
|
|
||||||
- Fill out types and functions in `express/logging.ts`
|
|
||||||
|
|
||||||
|
|
||||||
- [ ] Create initial docker-compose.yml file for local development
|
- [ ] Create initial docker-compose.yml file for local development
|
||||||
- include most recent stable postgres
|
- include most recent stable postgres
|
||||||
|
|
||||||
@@ -34,6 +21,13 @@
|
|||||||
|
|
||||||
## medium importance
|
## medium importance
|
||||||
|
|
||||||
|
- [ ] Add a log viewer
|
||||||
|
- with queries
|
||||||
|
- convert to logfmt and is there a viewer UI we could pull in and use
|
||||||
|
instead?
|
||||||
|
|
||||||
|
- [ ] figure out and add logging to disk
|
||||||
|
|
||||||
- [ ] Add email verification
|
- [ ] Add email verification
|
||||||
|
|
||||||
- [ ] Update check script:
|
- [ ] Update check script:
|
||||||
@@ -51,6 +45,10 @@
|
|||||||
|
|
||||||
## low importance
|
## low importance
|
||||||
|
|
||||||
|
- [ ] add a prometheus-style `/metrics` endpoint to master
|
||||||
|
- [ ] create a metrics server analogous to the logging server
|
||||||
|
- accept various stats from the workers (TBD)
|
||||||
|
|
||||||
- [ ] move `master-bin` into a subdir like `master/cmd` or whatever is
|
- [ ] move `master-bin` into a subdir like `master/cmd` or whatever is
|
||||||
idiomatic for golang programs; adapt `master` wrapper shell script
|
idiomatic for golang programs; adapt `master` wrapper shell script
|
||||||
accordingly
|
accordingly
|
||||||
@@ -83,3 +81,14 @@
|
|||||||
- [x] Add wrapper script to run master program (so that various assumptions related
|
- [x] Add wrapper script to run master program (so that various assumptions related
|
||||||
to relative paths are safer)
|
to relative paths are safer)
|
||||||
|
|
||||||
|
- [x] Add logging service
|
||||||
|
- New golang program, in the same directory as master
|
||||||
|
- Intended to be started by master
|
||||||
|
- Listens to a port specified command line arg
|
||||||
|
- Accepts POSTed (or possibly PUT) json messages, currently in a
|
||||||
|
to-be-defined format. We will work on this format later.
|
||||||
|
- Keeps the most recent N messages in memory. N can be a fairly large
|
||||||
|
number; let's start by assuming 1 million.
|
||||||
|
|
||||||
|
- [x] Log to logging service from the express backend
|
||||||
|
- Fill out types and functions in `express/logging.ts`
|
||||||
|
|||||||
@@ -32,15 +32,39 @@ type FilterArgument = {
|
|||||||
match?: (string | RegExp)[];
|
match?: (string | RegExp)[];
|
||||||
};
|
};
|
||||||
|
|
||||||
const log = (_message: Message) => {
|
const loggerUrl = `http://${cli.logAddress.host}:${cli.logAddress.port}`;
|
||||||
// WRITEME
|
|
||||||
console.log(
|
const log = (message: Message) => {
|
||||||
`will POST a message to ${cli.logAddress.host}:${cli.logAddress.port}`,
|
const payload = {
|
||||||
);
|
timestamp: message.timestamp ?? Date.now(),
|
||||||
|
source: message.source,
|
||||||
|
text: message.text,
|
||||||
|
};
|
||||||
|
|
||||||
|
fetch(`${loggerUrl}/log`, {
|
||||||
|
method: "POST",
|
||||||
|
headers: { "Content-Type": "application/json" },
|
||||||
|
body: JSON.stringify(payload),
|
||||||
|
}).catch((err) => {
|
||||||
|
console.error("[logging] Failed to send log:", err.message);
|
||||||
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
const getLogs = (filter: FilterArgument) => {
|
const getLogs = async (filter: FilterArgument): Promise<Message[]> => {
|
||||||
// WRITEME
|
const params = new URLSearchParams();
|
||||||
|
if (filter.limit) {
|
||||||
|
params.set("limit", String(filter.limit));
|
||||||
|
}
|
||||||
|
if (filter.before) {
|
||||||
|
- params.set("before", String(filter.before));
|
||||||
|
}
|
||||||
|
if (filter.after) {
|
||||||
|
params.set("after", String(filter.after));
|
||||||
|
}
|
||||||
|
|
||||||
|
const url = `${loggerUrl}/logs?${params.toString()}`;
|
||||||
|
const response = await fetch(url);
|
||||||
|
return response.json();
|
||||||
};
|
};
|
||||||
|
|
||||||
// FIXME: there's scope for more specialized functions although they
|
// FIXME: there's scope for more specialized functions although they
|
||||||
|
|||||||
@@ -15,12 +15,14 @@
|
|||||||
"dependencies": {
|
"dependencies": {
|
||||||
"@ianvs/prettier-plugin-sort-imports": "^4.7.0",
|
"@ianvs/prettier-plugin-sort-imports": "^4.7.0",
|
||||||
"@types/node": "^24.10.1",
|
"@types/node": "^24.10.1",
|
||||||
|
"@types/nunjucks": "^3.2.6",
|
||||||
"@vercel/ncc": "^0.38.4",
|
"@vercel/ncc": "^0.38.4",
|
||||||
"express": "^5.1.0",
|
"express": "^5.1.0",
|
||||||
"nodemon": "^3.1.11",
|
"nodemon": "^3.1.11",
|
||||||
"nunjucks": "^3.2.4",
|
"nunjucks": "^3.2.4",
|
||||||
"path-to-regexp": "^8.3.0",
|
"path-to-regexp": "^8.3.0",
|
||||||
"prettier": "^3.6.2",
|
"prettier": "^3.6.2",
|
||||||
|
"ts-luxon": "^6.2.0",
|
||||||
"ts-node": "^10.9.2",
|
"ts-node": "^10.9.2",
|
||||||
"tsx": "^4.20.6",
|
"tsx": "^4.20.6",
|
||||||
"typescript": "^5.9.3",
|
"typescript": "^5.9.3",
|
||||||
|
|||||||
17
express/pnpm-lock.yaml
generated
17
express/pnpm-lock.yaml
generated
@@ -14,6 +14,9 @@ importers:
|
|||||||
'@types/node':
|
'@types/node':
|
||||||
specifier: ^24.10.1
|
specifier: ^24.10.1
|
||||||
version: 24.10.1
|
version: 24.10.1
|
||||||
|
'@types/nunjucks':
|
||||||
|
specifier: ^3.2.6
|
||||||
|
version: 3.2.6
|
||||||
'@vercel/ncc':
|
'@vercel/ncc':
|
||||||
specifier: ^0.38.4
|
specifier: ^0.38.4
|
||||||
version: 0.38.4
|
version: 0.38.4
|
||||||
@@ -32,6 +35,9 @@ importers:
|
|||||||
prettier:
|
prettier:
|
||||||
specifier: ^3.6.2
|
specifier: ^3.6.2
|
||||||
version: 3.6.2
|
version: 3.6.2
|
||||||
|
ts-luxon:
|
||||||
|
specifier: ^6.2.0
|
||||||
|
version: 6.2.0
|
||||||
ts-node:
|
ts-node:
|
||||||
specifier: ^10.9.2
|
specifier: ^10.9.2
|
||||||
version: 10.9.2(@types/node@24.10.1)(typescript@5.9.3)
|
version: 10.9.2(@types/node@24.10.1)(typescript@5.9.3)
|
||||||
@@ -371,6 +377,9 @@ packages:
|
|||||||
'@types/node@24.10.1':
|
'@types/node@24.10.1':
|
||||||
resolution: {integrity: sha512-GNWcUTRBgIRJD5zj+Tq0fKOJ5XZajIiBroOF0yvj2bSU1WvNdYS/dn9UxwsujGW4JX06dnHyjV2y9rRaybH0iQ==}
|
resolution: {integrity: sha512-GNWcUTRBgIRJD5zj+Tq0fKOJ5XZajIiBroOF0yvj2bSU1WvNdYS/dn9UxwsujGW4JX06dnHyjV2y9rRaybH0iQ==}
|
||||||
|
|
||||||
|
'@types/nunjucks@3.2.6':
|
||||||
|
resolution: {integrity: sha512-pHiGtf83na1nCzliuAdq8GowYiXvH5l931xZ0YEHaLMNFgynpEqx+IPStlu7UaDkehfvl01e4x/9Tpwhy7Ue3w==}
|
||||||
|
|
||||||
'@types/qs@6.14.0':
|
'@types/qs@6.14.0':
|
||||||
resolution: {integrity: sha512-eOunJqu0K1923aExK6y8p6fsihYEn/BYuQ4g0CxAAgFc4b/ZLN4CrsRZ55srTdqoiLzU2B2evC+apEIxprEzkQ==}
|
resolution: {integrity: sha512-eOunJqu0K1923aExK6y8p6fsihYEn/BYuQ4g0CxAAgFc4b/ZLN4CrsRZ55srTdqoiLzU2B2evC+apEIxprEzkQ==}
|
||||||
|
|
||||||
@@ -814,6 +823,10 @@ packages:
|
|||||||
resolution: {integrity: sha512-r0eojU4bI8MnHr8c5bNo7lJDdI2qXlWWJk6a9EAFG7vbhTjElYhBVS3/miuE0uOuoLdb8Mc/rVfsmm6eo5o9GA==}
|
resolution: {integrity: sha512-r0eojU4bI8MnHr8c5bNo7lJDdI2qXlWWJk6a9EAFG7vbhTjElYhBVS3/miuE0uOuoLdb8Mc/rVfsmm6eo5o9GA==}
|
||||||
hasBin: true
|
hasBin: true
|
||||||
|
|
||||||
|
ts-luxon@6.2.0:
|
||||||
|
resolution: {integrity: sha512-4I1tkW6gtydyLnUUIvfezBl5B3smurkgKmHdMOYI2g9Fn3Zg1lGJdhsCXu2VNl95CYbW2+SoNtStcf1CKOcQjw==}
|
||||||
|
engines: {node: '>=18'}
|
||||||
|
|
||||||
ts-node@10.9.2:
|
ts-node@10.9.2:
|
||||||
resolution: {integrity: sha512-f0FFpIdcHgn8zcPSbf1dRevwt047YMnaiJM3u2w2RewrB+fob/zePZcrOyQoLMMO7aBIddLcQIEK5dYjkLnGrQ==}
|
resolution: {integrity: sha512-f0FFpIdcHgn8zcPSbf1dRevwt047YMnaiJM3u2w2RewrB+fob/zePZcrOyQoLMMO7aBIddLcQIEK5dYjkLnGrQ==}
|
||||||
hasBin: true
|
hasBin: true
|
||||||
@@ -1103,6 +1116,8 @@ snapshots:
|
|||||||
dependencies:
|
dependencies:
|
||||||
undici-types: 7.16.0
|
undici-types: 7.16.0
|
||||||
|
|
||||||
|
'@types/nunjucks@3.2.6': {}
|
||||||
|
|
||||||
'@types/qs@6.14.0': {}
|
'@types/qs@6.14.0': {}
|
||||||
|
|
||||||
'@types/range-parser@1.2.7': {}
|
'@types/range-parser@1.2.7': {}
|
||||||
@@ -1588,6 +1603,8 @@ snapshots:
|
|||||||
|
|
||||||
touch@3.1.1: {}
|
touch@3.1.1: {}
|
||||||
|
|
||||||
|
ts-luxon@6.2.0: {}
|
||||||
|
|
||||||
ts-node@10.9.2(@types/node@24.10.1)(typescript@5.9.3):
|
ts-node@10.9.2(@types/node@24.10.1)(typescript@5.9.3):
|
||||||
dependencies:
|
dependencies:
|
||||||
'@cspotcode/source-map-support': 0.8.1
|
'@cspotcode/source-map-support': 0.8.1
|
||||||
|
|||||||
@@ -1,5 +1,7 @@
|
|||||||
/// <reference lib="dom" />
|
/// <reference lib="dom" />
|
||||||
|
|
||||||
|
import nunjucks from "nunjucks";
|
||||||
|
import { DateTime } from "ts-luxon";
|
||||||
import { contentTypes } from "./content-types";
|
import { contentTypes } from "./content-types";
|
||||||
import { multiHandler } from "./handlers";
|
import { multiHandler } from "./handlers";
|
||||||
import { HttpCode, httpCodes } from "./http-codes";
|
import { HttpCode, httpCodes } from "./http-codes";
|
||||||
@@ -72,6 +74,29 @@ const routes: Route[] = [
|
|||||||
};
|
};
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
path: "/time",
|
||||||
|
methods: ["GET"],
|
||||||
|
handler: async (_req): Promise<Result> => {
|
||||||
|
const now = DateTime.now();
|
||||||
|
const template = `
|
||||||
|
<html>
|
||||||
|
<head></head>
|
||||||
|
<body>
|
||||||
|
{{ now }}
|
||||||
|
</body>
|
||||||
|
</html>
|
||||||
|
`;
|
||||||
|
|
||||||
|
const result = nunjucks.renderString(template, { now });
|
||||||
|
|
||||||
|
return {
|
||||||
|
code: httpCodes.success.OK,
|
||||||
|
contentType: contentTypes.text.html,
|
||||||
|
result,
|
||||||
|
};
|
||||||
|
},
|
||||||
|
},
|
||||||
];
|
];
|
||||||
|
|
||||||
export { routes };
|
export { routes };
|
||||||
|
|||||||
1
logger/.gitignore
vendored
Normal file
1
logger/.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
|||||||
|
logger-bin
|
||||||
3
logger/go.mod
Normal file
3
logger/go.mod
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
module philologue.net/diachron/logger-bin
|
||||||
|
|
||||||
|
go 1.23.3
|
||||||
70
logger/main.go
Normal file
70
logger/main.go
Normal file
@@ -0,0 +1,70 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"net/http"
|
||||||
|
"strconv"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
port := flag.Int("port", 8085, "port to listen on")
|
||||||
|
capacity := flag.Int("capacity", 1000000, "max messages to store")
|
||||||
|
|
||||||
|
flag.Parse()
|
||||||
|
|
||||||
|
store := NewLogStore(*capacity)
|
||||||
|
|
||||||
|
http.HandleFunc("POST /log", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
var msg Message
|
||||||
|
if err := json.NewDecoder(r.Body).Decode(&msg); err != nil {
|
||||||
|
http.Error(w, "invalid JSON", http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
store.Add(msg)
|
||||||
|
w.WriteHeader(http.StatusCreated)
|
||||||
|
})
|
||||||
|
|
||||||
|
http.HandleFunc("GET /logs", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
params := FilterParams{}
|
||||||
|
|
||||||
|
if limit := r.URL.Query().Get("limit"); limit != "" {
|
||||||
|
if n, err := strconv.Atoi(limit); err == nil {
|
||||||
|
params.Limit = n
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if before := r.URL.Query().Get("before"); before != "" {
|
||||||
|
if ts, err := strconv.ParseInt(before, 10, 64); err == nil {
|
||||||
|
params.Before = ts
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if after := r.URL.Query().Get("after"); after != "" {
|
||||||
|
if ts, err := strconv.ParseInt(after, 10, 64); err == nil {
|
||||||
|
params.After = ts
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
messages := store.GetFiltered(params)
|
||||||
|
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
json.NewEncoder(w).Encode(messages)
|
||||||
|
})
|
||||||
|
|
||||||
|
http.HandleFunc("GET /status", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
status := map[string]any{
|
||||||
|
"count": store.Count(),
|
||||||
|
"capacity": *capacity,
|
||||||
|
}
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
json.NewEncoder(w).Encode(status)
|
||||||
|
})
|
||||||
|
|
||||||
|
listenAddr := fmt.Sprintf(":%d", *port)
|
||||||
|
log.Printf("[logger] Listening on %s (capacity: %d)", listenAddr, *capacity)
|
||||||
|
if err := http.ListenAndServe(listenAddr, nil); err != nil {
|
||||||
|
log.Fatalf("[logger] Failed to start: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
126
logger/store.go
Normal file
126
logger/store.go
Normal file
@@ -0,0 +1,126 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Message represents a log entry from the express backend
|
||||||
|
type Message struct {
|
||||||
|
Timestamp int64 `json:"timestamp"`
|
||||||
|
Source string `json:"source"` // "logging" | "diagnostic" | "user"
|
||||||
|
Text []string `json:"text"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// LogStore is a thread-safe ring buffer for log messages
|
||||||
|
type LogStore struct {
|
||||||
|
mu sync.RWMutex
|
||||||
|
messages []Message
|
||||||
|
head int // next write position
|
||||||
|
full bool // whether buffer has wrapped
|
||||||
|
capacity int
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewLogStore creates a new log store with the given capacity
|
||||||
|
func NewLogStore(capacity int) *LogStore {
|
||||||
|
return &LogStore{
|
||||||
|
messages: make([]Message, capacity),
|
||||||
|
capacity: capacity,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add inserts a new message into the store
|
||||||
|
func (s *LogStore) Add(msg Message) {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
s.messages[s.head] = msg
|
||||||
|
s.head++
|
||||||
|
if s.head >= s.capacity {
|
||||||
|
s.head = 0
|
||||||
|
s.full = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Count returns the number of messages in the store
|
||||||
|
func (s *LogStore) Count() int {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
|
||||||
|
if s.full {
|
||||||
|
return s.capacity
|
||||||
|
}
|
||||||
|
return s.head
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetRecent returns the most recent n messages, newest first
|
||||||
|
func (s *LogStore) GetRecent(n int) []Message {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
|
||||||
|
count := s.Count()
|
||||||
|
if n > count {
|
||||||
|
n = count
|
||||||
|
}
|
||||||
|
if n == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
result := make([]Message, n)
|
||||||
|
pos := s.head - 1
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
if pos < 0 {
|
||||||
|
pos = s.capacity - 1
|
||||||
|
}
|
||||||
|
result[i] = s.messages[pos]
|
||||||
|
pos--
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
// Filter parameters for retrieving logs
|
||||||
|
type FilterParams struct {
|
||||||
|
Limit int // max messages to return (0 = default 100)
|
||||||
|
Before int64 // only messages before this timestamp
|
||||||
|
After int64 // only messages after this timestamp
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetFiltered returns messages matching the filter criteria
|
||||||
|
func (s *LogStore) GetFiltered(params FilterParams) []Message {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
|
||||||
|
limit := params.Limit
|
||||||
|
if limit <= 0 {
|
||||||
|
limit = 100
|
||||||
|
}
|
||||||
|
|
||||||
|
count := s.Count()
|
||||||
|
if count == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
result := make([]Message, 0, limit)
|
||||||
|
pos := s.head - 1
|
||||||
|
|
||||||
|
for i := 0; i < count && len(result) < limit; i++ {
|
||||||
|
if pos < 0 {
|
||||||
|
pos = s.capacity - 1
|
||||||
|
}
|
||||||
|
msg := s.messages[pos]
|
||||||
|
|
||||||
|
// Apply filters
|
||||||
|
if params.Before > 0 && msg.Timestamp >= params.Before {
|
||||||
|
pos--
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if params.After > 0 && msg.Timestamp <= params.After {
|
||||||
|
pos--
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
result = append(result, msg)
|
||||||
|
pos--
|
||||||
|
}
|
||||||
|
|
||||||
|
return result
|
||||||
|
}
|
||||||
@@ -13,16 +13,22 @@ func main() {
|
|||||||
workers := flag.Int("workers", 1, "number of worker processes")
|
workers := flag.Int("workers", 1, "number of worker processes")
|
||||||
basePort := flag.Int("base-port", 3000, "base port for worker processes")
|
basePort := flag.Int("base-port", 3000, "base port for worker processes")
|
||||||
listenPort := flag.Int("port", 8080, "port for the reverse proxy to listen on")
|
listenPort := flag.Int("port", 8080, "port for the reverse proxy to listen on")
|
||||||
|
loggerPort := flag.Int("logger-port", 8085, "port for the logger service")
|
||||||
|
loggerCapacity := flag.Int("logger-capacity", 1000000, "max messages for logger to store")
|
||||||
|
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
// Create worker pool
|
|
||||||
pool := NewWorkerPool()
|
|
||||||
|
|
||||||
// Setup signal handling
|
// Setup signal handling
|
||||||
sigCh := make(chan os.Signal, 1)
|
sigCh := make(chan os.Signal, 1)
|
||||||
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
|
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
|
||||||
|
|
||||||
|
// Start and manage the logger process
|
||||||
|
stopLogger := startLogger(*loggerPort, *loggerCapacity)
|
||||||
|
defer stopLogger()
|
||||||
|
|
||||||
|
// Create worker pool
|
||||||
|
pool := NewWorkerPool()
|
||||||
|
|
||||||
fileChanges := make(chan FileChange, 10)
|
fileChanges := make(chan FileChange, 10)
|
||||||
|
|
||||||
go watchFiles(*watchDir, fileChanges)
|
go watchFiles(*watchDir, fileChanges)
|
||||||
|
|||||||
106
master/runlogger.go
Normal file
106
master/runlogger.go
Normal file
@@ -0,0 +1,106 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"os/exec"
|
||||||
|
"strconv"
|
||||||
|
"sync"
|
||||||
|
"syscall"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// startLogger starts the logger process and returns a function to stop it.
|
||||||
|
// It automatically restarts the logger if it crashes.
|
||||||
|
func startLogger(port int, capacity int) func() {
|
||||||
|
var mu sync.Mutex
|
||||||
|
var cmd *exec.Cmd
|
||||||
|
var stopping bool
|
||||||
|
|
||||||
|
portStr := strconv.Itoa(port)
|
||||||
|
capacityStr := strconv.Itoa(capacity)
|
||||||
|
|
||||||
|
start := func() *exec.Cmd {
|
||||||
|
c := exec.Command("../logger/logger", "--port", portStr, "--capacity", capacityStr)
|
||||||
|
c.Stdout = os.Stdout
|
||||||
|
c.Stderr = os.Stderr
|
||||||
|
|
||||||
|
if err := c.Start(); err != nil {
|
||||||
|
log.Printf("[logger] Failed to start: %v", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("[logger] Started (pid %d) on port %s", c.Process.Pid, portStr)
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start initial logger
|
||||||
|
cmd = start()
|
||||||
|
|
||||||
|
// Monitor and restart on crash
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
mu.Lock()
|
||||||
|
currentCmd := cmd
|
||||||
|
mu.Unlock()
|
||||||
|
|
||||||
|
if currentCmd == nil {
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
mu.Lock()
|
||||||
|
if !stopping {
|
||||||
|
cmd = start()
|
||||||
|
}
|
||||||
|
mu.Unlock()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
err := currentCmd.Wait()
|
||||||
|
|
||||||
|
mu.Lock()
|
||||||
|
if stopping {
|
||||||
|
mu.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("[logger] Process exited: %v, restarting...", err)
|
||||||
|
} else {
|
||||||
|
log.Printf("[logger] Process exited normally, restarting...")
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
cmd = start()
|
||||||
|
mu.Unlock()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Return stop function
|
||||||
|
return func() {
|
||||||
|
mu.Lock()
|
||||||
|
defer mu.Unlock()
|
||||||
|
|
||||||
|
stopping = true
|
||||||
|
|
||||||
|
if cmd == nil || cmd.Process == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("[logger] Stopping (pid %d)", cmd.Process.Pid)
|
||||||
|
cmd.Process.Signal(syscall.SIGTERM)
|
||||||
|
|
||||||
|
// Wait briefly for graceful shutdown
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
cmd.Wait()
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
log.Printf("[logger] Stopped gracefully")
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
log.Printf("[logger] Force killing")
|
||||||
|
cmd.Process.Kill()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user