Files
diachron/express/database.ts
2026-01-04 15:24:29 -06:00

392 lines
11 KiB
TypeScript

// database.ts
// PostgreSQL database access with Kysely query builder and simple migrations
import * as fs from "fs";
import {
type Generated,
Kysely,
PostgresDialect,
type Selectable,
sql,
} from "kysely";
import * as path from "path";
import { Pool } from "pg";
import type {
AuthStore,
CreateSessionData,
CreateUserData,
} from "./auth/store";
import { generateToken, hashToken } from "./auth/token";
import type { SessionData, TokenId } from "./auth/types";
import { User, type UserId } from "./user";
// Connection configuration
const connectionConfig = {
host: "localhost",
port: 5432,
user: "diachron",
password: "diachron",
database: "diachron",
};
// Database schema types for Kysely
// Generated<T> marks columns with database defaults (optional on insert)
interface UsersTable {
id: string;
email: string;
password_hash: string;
display_name: string | null;
status: Generated<string>;
roles: Generated<string[]>;
permissions: Generated<string[]>;
email_verified: Generated<boolean>;
created_at: Generated<Date>;
updated_at: Generated<Date>;
}
interface SessionsTable {
token_id: string;
user_id: string;
token_type: string;
auth_method: string;
created_at: Generated<Date>;
expires_at: Date;
last_used_at: Date | null;
user_agent: string | null;
ip_address: string | null;
is_used: Generated<boolean | null>;
}
interface Database {
users: UsersTable;
sessions: SessionsTable;
}
// Create the connection pool
const pool = new Pool(connectionConfig);
// Create the Kysely instance
const db = new Kysely<Database>({
dialect: new PostgresDialect({ pool }),
});
// Raw pool access for when you need it
const rawPool = pool;
// Execute raw SQL (for when Kysely doesn't fit)
async function raw<T = unknown>(
query: string,
params: unknown[] = [],
): Promise<T[]> {
const result = await pool.query(query, params);
return result.rows as T[];
}
// ============================================================================
// Migrations
// ============================================================================
// Migration file naming convention:
// NNNN_description.sql
// e.g., 0001_initial.sql, 0002_add_users.sql
//
// Migrations directory: express/migrations/
const MIGRATIONS_DIR = path.join(__dirname, "migrations");
const MIGRATIONS_TABLE = "_migrations";
interface MigrationRecord {
id: number;
name: string;
applied_at: Date;
}
// Ensure migrations table exists
async function ensureMigrationsTable(): Promise<void> {
await pool.query(`
CREATE TABLE IF NOT EXISTS ${MIGRATIONS_TABLE} (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL UNIQUE,
applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
)
`);
}
// Get list of applied migrations
async function getAppliedMigrations(): Promise<string[]> {
const result = await pool.query<MigrationRecord>(
`SELECT name FROM ${MIGRATIONS_TABLE} ORDER BY name`,
);
return result.rows.map((r) => r.name);
}
// Get pending migration files
function getMigrationFiles(): string[] {
if (!fs.existsSync(MIGRATIONS_DIR)) {
return [];
}
return fs
.readdirSync(MIGRATIONS_DIR)
.filter((f) => f.endsWith(".sql"))
.filter((f) => /^\d{4}_/.test(f))
.sort();
}
// Run a single migration
async function runMigration(filename: string): Promise<void> {
const filepath = path.join(MIGRATIONS_DIR, filename);
const content = fs.readFileSync(filepath, "utf-8");
// Run migration in a transaction
const client = await pool.connect();
try {
await client.query("BEGIN");
await client.query(content);
await client.query(
`INSERT INTO ${MIGRATIONS_TABLE} (name) VALUES ($1)`,
[filename],
);
await client.query("COMMIT");
console.log(`Applied migration: ${filename}`);
} catch (err) {
await client.query("ROLLBACK");
throw err;
} finally {
client.release();
}
}
// Run all pending migrations
async function migrate(): Promise<void> {
await ensureMigrationsTable();
const applied = new Set(await getAppliedMigrations());
const files = getMigrationFiles();
const pending = files.filter((f) => !applied.has(f));
if (pending.length === 0) {
console.log("No pending migrations");
return;
}
console.log(`Running ${pending.length} migration(s)...`);
for (const file of pending) {
await runMigration(file);
}
console.log("Migrations complete");
}
// List migration status
async function migrationStatus(): Promise<{
applied: string[];
pending: string[];
}> {
await ensureMigrationsTable();
const applied = new Set(await getAppliedMigrations());
const files = getMigrationFiles();
return {
applied: files.filter((f) => applied.has(f)),
pending: files.filter((f) => !applied.has(f)),
};
}
// ============================================================================
// PostgresAuthStore - Database-backed authentication storage
// ============================================================================
class PostgresAuthStore implements AuthStore {
// Session operations
async createSession(
data: CreateSessionData,
): Promise<{ token: string; session: SessionData }> {
const token = generateToken();
const tokenId = hashToken(token);
const row = await db
.insertInto("sessions")
.values({
token_id: tokenId,
user_id: data.userId,
token_type: data.tokenType,
auth_method: data.authMethod,
expires_at: data.expiresAt,
user_agent: data.userAgent ?? null,
ip_address: data.ipAddress ?? null,
})
.returningAll()
.executeTakeFirstOrThrow();
const session: SessionData = {
tokenId: row.token_id,
userId: row.user_id,
tokenType: row.token_type as SessionData["tokenType"],
authMethod: row.auth_method as SessionData["authMethod"],
createdAt: row.created_at,
expiresAt: row.expires_at,
lastUsedAt: row.last_used_at ?? undefined,
userAgent: row.user_agent ?? undefined,
ipAddress: row.ip_address ?? undefined,
isUsed: row.is_used ?? undefined,
};
return { token, session };
}
async getSession(tokenId: TokenId): Promise<SessionData | null> {
const row = await db
.selectFrom("sessions")
.selectAll()
.where("token_id", "=", tokenId)
.where("expires_at", ">", new Date())
.executeTakeFirst();
if (!row) return null;
return {
tokenId: row.token_id,
userId: row.user_id,
tokenType: row.token_type as SessionData["tokenType"],
authMethod: row.auth_method as SessionData["authMethod"],
createdAt: row.created_at,
expiresAt: row.expires_at,
lastUsedAt: row.last_used_at ?? undefined,
userAgent: row.user_agent ?? undefined,
ipAddress: row.ip_address ?? undefined,
isUsed: row.is_used ?? undefined,
};
}
async updateLastUsed(tokenId: TokenId): Promise<void> {
await db
.updateTable("sessions")
.set({ last_used_at: new Date() })
.where("token_id", "=", tokenId)
.execute();
}
async deleteSession(tokenId: TokenId): Promise<void> {
await db
.deleteFrom("sessions")
.where("token_id", "=", tokenId)
.execute();
}
async deleteUserSessions(userId: UserId): Promise<number> {
const result = await db
.deleteFrom("sessions")
.where("user_id", "=", userId)
.executeTakeFirst();
return Number(result.numDeletedRows);
}
// User operations
async getUserByEmail(email: string): Promise<User | null> {
const row = await db
.selectFrom("users")
.selectAll()
.where(sql`LOWER(email)`, "=", email.toLowerCase())
.executeTakeFirst();
if (!row) return null;
return this.rowToUser(row);
}
async getUserById(userId: UserId): Promise<User | null> {
const row = await db
.selectFrom("users")
.selectAll()
.where("id", "=", userId)
.executeTakeFirst();
if (!row) return null;
return this.rowToUser(row);
}
async createUser(data: CreateUserData): Promise<User> {
const id = crypto.randomUUID();
const now = new Date();
const row = await db
.insertInto("users")
.values({
id,
email: data.email,
password_hash: data.passwordHash,
display_name: data.displayName ?? null,
status: "pending",
roles: [],
permissions: [],
email_verified: false,
created_at: now,
updated_at: now,
})
.returningAll()
.executeTakeFirstOrThrow();
return this.rowToUser(row);
}
async getUserPasswordHash(userId: UserId): Promise<string | null> {
const row = await db
.selectFrom("users")
.select("password_hash")
.where("id", "=", userId)
.executeTakeFirst();
return row?.password_hash ?? null;
}
async setUserPassword(userId: UserId, passwordHash: string): Promise<void> {
await db
.updateTable("users")
.set({ password_hash: passwordHash, updated_at: new Date() })
.where("id", "=", userId)
.execute();
}
async updateUserEmailVerified(userId: UserId): Promise<void> {
await db
.updateTable("users")
.set({
email_verified: true,
status: "active",
updated_at: new Date(),
})
.where("id", "=", userId)
.execute();
}
// Helper to convert database row to User object
private rowToUser(row: Selectable<UsersTable>): User {
return new User({
id: row.id,
email: row.email,
displayName: row.display_name ?? undefined,
status: row.status as "active" | "suspended" | "pending",
roles: row.roles,
permissions: row.permissions,
createdAt: row.created_at,
updatedAt: row.updated_at,
});
}
}
// ============================================================================
// Exports
// ============================================================================
export {
db,
raw,
rawPool,
pool,
migrate,
migrationStatus,
connectionConfig,
PostgresAuthStore,
type Database,
};