// database.ts // PostgreSQL database access with Kysely query builder and simple migrations import * as fs from "node:fs"; import * as path from "node:path"; import { type Generated, Kysely, PostgresDialect, type Selectable, sql, } from "kysely"; import { Pool } from "pg"; import type { AuthStore, CreateSessionData, CreateUserData, } from "./auth/store"; import { generateToken, hashToken } from "./auth/token"; import type { SessionData, TokenId } from "./auth/types"; import type { Domain } from "./types"; import { AuthenticatedUser, type User, type UserId } from "./user"; // Connection configuration (supports environment variable overrides) const connectionConfig = { host: process.env.DB_HOST ?? "localhost", port: Number(process.env.DB_PORT ?? 5432), user: process.env.DB_USER ?? "diachron", password: process.env.DB_PASSWORD ?? "diachron", database: process.env.DB_NAME ?? "diachron", }; // Database schema types for Kysely // Generated marks columns with database defaults (optional on insert) interface UsersTable { id: string; status: Generated; display_name: string | null; created_at: Generated; updated_at: Generated; } interface UserEmailsTable { id: string; user_id: string; email: string; normalized_email: string; is_primary: Generated; is_verified: Generated; created_at: Generated; verified_at: Date | null; revoked_at: Date | null; } interface UserCredentialsTable { id: string; user_id: string; credential_type: Generated; password_hash: string | null; created_at: Generated; updated_at: Generated; } interface SessionsTable { id: Generated; token_hash: string; user_id: string; user_email_id: string | null; token_type: string; auth_method: string; created_at: Generated; expires_at: Date; revoked_at: Date | null; ip_address: string | null; user_agent: string | null; is_used: Generated; } interface Database { users: UsersTable; user_emails: UserEmailsTable; user_credentials: UserCredentialsTable; sessions: SessionsTable; } // Create the connection pool const pool = new Pool(connectionConfig); // Create the Kysely instance const db = new Kysely({ dialect: new PostgresDialect({ pool }), }); // Raw pool access for when you need it const rawPool = pool; // Execute raw SQL (for when Kysely doesn't fit) async function raw( query: string, params: unknown[] = [], ): Promise { const result = await pool.query(query, params); return result.rows as T[]; } // ============================================================================ // Migrations // ============================================================================ // Migration file naming convention: // yyyy-mm-dd_ss_description.sql // e.g., 2025-01-15_01_initial.sql, 2025-01-15_02_add_users.sql // // Migrations directory: express/migrations/ const FRAMEWORK_MIGRATIONS_DIR = path.join(__dirname, "diachron/migrations"); const APP_MIGRATIONS_DIR = path.join(__dirname, "migrations"); const MIGRATIONS_TABLE = "_migrations"; interface MigrationRecord { id: number; name: string; applied_at: Date; } // Ensure migrations table exists async function ensureMigrationsTable(): Promise { await pool.query(` CREATE TABLE IF NOT EXISTS ${MIGRATIONS_TABLE} ( id SERIAL PRIMARY KEY, name TEXT NOT NULL UNIQUE, applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ) `); } // Get list of applied migrations async function getAppliedMigrations(): Promise { const result = await pool.query( `SELECT name FROM ${MIGRATIONS_TABLE} ORDER BY name`, ); return result.rows.map((r) => r.name); } // Get pending migration files function getMigrationFiles(kind: Domain): string[] { const dir = kind === "fw" ? FRAMEWORK_MIGRATIONS_DIR : APP_MIGRATIONS_DIR; if (!fs.existsSync(dir)) { return []; } const root = __dirname; const mm = fs .readdirSync(dir) .filter((f) => f.endsWith(".sql")) .filter((f) => /^\d{4}-\d{2}-\d{2}_\d{2}-/.test(f)) .map((f) => `${dir}/${f}`) .map((f) => f.replace(`${root}/`, "")) .sort(); return mm; } // Run a single migration async function runMigration(filename: string): Promise { // const filepath = path.join(MIGRATIONS_DIR, filename); const filepath = filename; const content = fs.readFileSync(filepath, "utf-8"); process.stdout.write(` Migration: ${filename}...`); // Run migration in a transaction const client = await pool.connect(); try { await client.query("BEGIN"); await client.query(content); await client.query( `INSERT INTO ${MIGRATIONS_TABLE} (name) VALUES ($1)`, [filename], ); await client.query("COMMIT"); console.log(" ✓"); } catch (err) { console.log(" ✗"); const message = err instanceof Error ? err.message : String(err); console.error(` Error: ${message}`); await client.query("ROLLBACK"); throw err; } finally { client.release(); } } function getAllMigrationFiles() { const fw_files = getMigrationFiles("fw"); const app_files = getMigrationFiles("app"); const all = [...fw_files, ...app_files]; return all; } // Run all pending migrations async function migrate(): Promise { await ensureMigrationsTable(); const applied = new Set(await getAppliedMigrations()); const all = getAllMigrationFiles(); const pending = all.filter((all) => !applied.has(all)); if (pending.length === 0) { console.log("No pending migrations"); return; } console.log(`Applying ${pending.length} migration(s):`); for (const file of pending) { await runMigration(file); } } // List migration status async function migrationStatus(): Promise<{ applied: string[]; pending: string[]; }> { await ensureMigrationsTable(); const applied = new Set(await getAppliedMigrations()); const ff = getAllMigrationFiles(); return { applied: ff.filter((ff) => applied.has(ff)), pending: ff.filter((ff) => !applied.has(ff)), }; } // ============================================================================ // PostgresAuthStore - Database-backed authentication storage // ============================================================================ class PostgresAuthStore implements AuthStore { // Session operations async createSession( data: CreateSessionData, ): Promise<{ token: string; session: SessionData }> { const token = generateToken(); const tokenHash = hashToken(token); const row = await db .insertInto("sessions") .values({ token_hash: tokenHash, user_id: data.userId, token_type: data.tokenType, auth_method: data.authMethod, expires_at: data.expiresAt, user_agent: data.userAgent ?? null, ip_address: data.ipAddress ?? null, }) .returningAll() .executeTakeFirstOrThrow(); const session: SessionData = { tokenId: row.token_hash, userId: row.user_id, tokenType: row.token_type as SessionData["tokenType"], authMethod: row.auth_method as SessionData["authMethod"], createdAt: row.created_at, expiresAt: row.expires_at, userAgent: row.user_agent ?? undefined, ipAddress: row.ip_address ?? undefined, isUsed: row.is_used ?? undefined, }; return { token, session }; } async getSession(tokenId: TokenId): Promise { const row = await db .selectFrom("sessions") .selectAll() .where("token_hash", "=", tokenId) .where("expires_at", ">", new Date()) .where("revoked_at", "is", null) .executeTakeFirst(); if (!row) { return null; } return { tokenId: row.token_hash, userId: row.user_id, tokenType: row.token_type as SessionData["tokenType"], authMethod: row.auth_method as SessionData["authMethod"], createdAt: row.created_at, expiresAt: row.expires_at, userAgent: row.user_agent ?? undefined, ipAddress: row.ip_address ?? undefined, isUsed: row.is_used ?? undefined, }; } async updateLastUsed(_tokenId: TokenId): Promise { // The new schema doesn't have last_used_at column // This is now a no-op; session activity tracking could be added later } async deleteSession(tokenId: TokenId): Promise { // Soft delete by setting revoked_at await db .updateTable("sessions") .set({ revoked_at: new Date() }) .where("token_hash", "=", tokenId) .execute(); } async deleteUserSessions(userId: UserId): Promise { const result = await db .updateTable("sessions") .set({ revoked_at: new Date() }) .where("user_id", "=", userId) .where("revoked_at", "is", null) .executeTakeFirst(); return Number(result.numUpdatedRows); } // User operations async getUserByEmail(email: string): Promise { // Find user through user_emails table const normalizedEmail = email.toLowerCase().trim(); const row = await db .selectFrom("user_emails") .innerJoin("users", "users.id", "user_emails.user_id") .select([ "users.id", "users.status", "users.display_name", "users.created_at", "users.updated_at", "user_emails.email", ]) .where("user_emails.normalized_email", "=", normalizedEmail) .where("user_emails.revoked_at", "is", null) .executeTakeFirst(); if (!row) { return null; } return this.rowToUser(row); } async getUserById(userId: UserId): Promise { // Get user with their primary email const row = await db .selectFrom("users") .leftJoin("user_emails", (join) => join .onRef("user_emails.user_id", "=", "users.id") .on("user_emails.is_primary", "=", true) .on("user_emails.revoked_at", "is", null), ) .select([ "users.id", "users.status", "users.display_name", "users.created_at", "users.updated_at", "user_emails.email", ]) .where("users.id", "=", userId) .executeTakeFirst(); if (!row) { return null; } return this.rowToUser(row); } async createUser(data: CreateUserData): Promise { const userId = crypto.randomUUID(); const emailId = crypto.randomUUID(); const credentialId = crypto.randomUUID(); const now = new Date(); const normalizedEmail = data.email.toLowerCase().trim(); // Create user record await db .insertInto("users") .values({ id: userId, display_name: data.displayName ?? null, status: "pending", created_at: now, updated_at: now, }) .execute(); // Create user_email record await db .insertInto("user_emails") .values({ id: emailId, user_id: userId, email: data.email, normalized_email: normalizedEmail, is_primary: true, is_verified: false, created_at: now, }) .execute(); // Create user_credential record await db .insertInto("user_credentials") .values({ id: credentialId, user_id: userId, credential_type: "password", password_hash: data.passwordHash, created_at: now, updated_at: now, }) .execute(); return new AuthenticatedUser({ id: userId, email: data.email, displayName: data.displayName, status: "pending", roles: [], permissions: [], createdAt: now, updatedAt: now, }); } async getUserPasswordHash(userId: UserId): Promise { const row = await db .selectFrom("user_credentials") .select("password_hash") .where("user_id", "=", userId) .where("credential_type", "=", "password") .executeTakeFirst(); return row?.password_hash ?? null; } async setUserPassword(userId: UserId, passwordHash: string): Promise { const now = new Date(); // Try to update existing credential const result = await db .updateTable("user_credentials") .set({ password_hash: passwordHash, updated_at: now }) .where("user_id", "=", userId) .where("credential_type", "=", "password") .executeTakeFirst(); // If no existing credential, create one if (Number(result.numUpdatedRows) === 0) { await db .insertInto("user_credentials") .values({ id: crypto.randomUUID(), user_id: userId, credential_type: "password", password_hash: passwordHash, created_at: now, updated_at: now, }) .execute(); } // Update user's updated_at await db .updateTable("users") .set({ updated_at: now }) .where("id", "=", userId) .execute(); } async updateUserEmailVerified(userId: UserId): Promise { const now = new Date(); // Update user_emails to mark as verified await db .updateTable("user_emails") .set({ is_verified: true, verified_at: now, }) .where("user_id", "=", userId) .where("is_primary", "=", true) .execute(); // Update user status to active await db .updateTable("users") .set({ status: "active", updated_at: now, }) .where("id", "=", userId) .execute(); } // Helper to convert database row to User object private rowToUser(row: { id: string; status: string; display_name: string | null; created_at: Date; updated_at: Date; email: string | null; }): User { return new AuthenticatedUser({ id: row.id, email: row.email ?? "unknown@example.com", displayName: row.display_name ?? undefined, status: row.status as "active" | "suspended" | "pending", roles: [], // TODO: query from RBAC tables permissions: [], // TODO: query from RBAC tables createdAt: row.created_at, updatedAt: row.updated_at, }); } } // ============================================================================ // Exports // ============================================================================ export { db, raw, rawPool, pool, migrate, migrationStatus, connectionConfig, PostgresAuthStore, type Database, };