This commit is contained in:
commit
421043c9b5
67 changed files with 3455 additions and 0 deletions
183
src/lib/database/migrations.ts
Normal file
183
src/lib/database/migrations.ts
Normal file
|
@ -0,0 +1,183 @@
|
|||
import { readFile, readdir } from "node:fs/promises";
|
||||
import { resolve } from "node:path";
|
||||
import { echo } from "@atums/echo";
|
||||
import { environment } from "#environment/config";
|
||||
import { migrationsPath } from "#environment/constants";
|
||||
import { noFileLog } from "#index";
|
||||
import { cassandra } from "#lib/database";
|
||||
|
||||
import type { SqlMigration } from "#types/config";
|
||||
|
||||
class MigrationRunner {
|
||||
private migrations: SqlMigration[] = [];
|
||||
|
||||
async loadMigrations(): Promise<void> {
|
||||
try {
|
||||
const upPath = resolve(migrationsPath, "up");
|
||||
const downPath = resolve(migrationsPath, "down");
|
||||
|
||||
const upFiles = await readdir(upPath);
|
||||
const sqlFiles = upFiles.filter((file) => file.endsWith(".sql")).sort();
|
||||
|
||||
for (const sqlFile of sqlFiles) {
|
||||
try {
|
||||
const baseName = sqlFile.replace(".sql", "");
|
||||
const parts = baseName.split("_");
|
||||
const id = parts[0];
|
||||
const nameParts = parts.slice(1);
|
||||
const name = nameParts.join("_") || "migration";
|
||||
|
||||
if (!id || id.trim() === "") {
|
||||
noFileLog.debug(
|
||||
`Skipping migration file with invalid ID: ${sqlFile}`,
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
const upSql = await readFile(resolve(upPath, sqlFile), "utf-8");
|
||||
let downSql: string | undefined;
|
||||
|
||||
try {
|
||||
downSql = await readFile(resolve(downPath, sqlFile), "utf-8");
|
||||
} catch {
|
||||
// down is optional
|
||||
}
|
||||
|
||||
this.migrations.push({
|
||||
id,
|
||||
name,
|
||||
upSql: upSql.trim(),
|
||||
...(downSql && { downSql: downSql.trim() }),
|
||||
});
|
||||
} catch (error) {
|
||||
echo.error({
|
||||
message: `Failed to load migration ${sqlFile}:`,
|
||||
error,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
noFileLog.debug(`Loaded ${this.migrations.length} migrations`);
|
||||
} catch (error) {
|
||||
noFileLog.debug({
|
||||
message: "No migrations directory found or error reading:",
|
||||
error,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private async createMigrationsTable(): Promise<void> {
|
||||
const query = `
|
||||
CREATE TABLE IF NOT EXISTS schema_migrations (
|
||||
id TEXT PRIMARY KEY,
|
||||
name TEXT,
|
||||
executed_at TIMESTAMP,
|
||||
checksum TEXT
|
||||
)
|
||||
`;
|
||||
await cassandra.execute(query);
|
||||
noFileLog.debug("Schema migrations table ready");
|
||||
}
|
||||
|
||||
private async getExecutedMigrations(): Promise<Set<string>> {
|
||||
try {
|
||||
const result = (await cassandra.execute(
|
||||
"SELECT id FROM schema_migrations",
|
||||
)) as { rows: Array<{ id: string }> };
|
||||
return new Set(result.rows.map((row) => row.id));
|
||||
} catch (error) {
|
||||
noFileLog.debug({
|
||||
message: "Could not fetch executed migrations:",
|
||||
error,
|
||||
});
|
||||
return new Set();
|
||||
}
|
||||
}
|
||||
|
||||
private async markMigrationExecuted(migration: SqlMigration): Promise<void> {
|
||||
const query = `
|
||||
INSERT INTO schema_migrations (id, name, executed_at, checksum)
|
||||
VALUES (?, ?, ?, ?)
|
||||
`;
|
||||
const checksum = this.generateChecksum(migration.upSql);
|
||||
await cassandra.execute(query, [
|
||||
migration.id,
|
||||
migration.name,
|
||||
new Date(),
|
||||
checksum,
|
||||
]);
|
||||
}
|
||||
|
||||
private generateChecksum(input: string): string {
|
||||
let hash = 0;
|
||||
for (let i = 0; i < input.length; i++) {
|
||||
const char = input.charCodeAt(i);
|
||||
hash = (hash << 5) - hash + char;
|
||||
hash = hash & hash;
|
||||
}
|
||||
return hash.toString(16);
|
||||
}
|
||||
|
||||
private async executeSql(sql: string): Promise<void> {
|
||||
const statements = sql
|
||||
.split(";")
|
||||
.map((stmt) => stmt.trim())
|
||||
.filter((stmt) => stmt.length > 0);
|
||||
|
||||
for (const statement of statements) {
|
||||
if (statement.trim()) {
|
||||
await cassandra.execute(statement);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async runMigrations(): Promise<void> {
|
||||
if (this.migrations.length === 0) {
|
||||
noFileLog.debug("No migrations to run");
|
||||
return;
|
||||
}
|
||||
await this.createMigrationsTable();
|
||||
const executedMigrations = await this.getExecutedMigrations();
|
||||
const pendingMigrations = this.migrations.filter(
|
||||
(migration) => !executedMigrations.has(migration.id),
|
||||
);
|
||||
if (pendingMigrations.length === 0) {
|
||||
noFileLog.debug("All migrations are up to date");
|
||||
return;
|
||||
}
|
||||
noFileLog.debug(
|
||||
`Running ${pendingMigrations.length} pending migrations...`,
|
||||
);
|
||||
for (const migration of pendingMigrations) {
|
||||
try {
|
||||
noFileLog.debug(
|
||||
`Running migration: ${migration.id} - ${migration.name}`,
|
||||
);
|
||||
await this.executeSql(migration.upSql);
|
||||
await this.markMigrationExecuted(migration);
|
||||
noFileLog.debug(`Migration ${migration.id} completed`);
|
||||
} catch (error) {
|
||||
echo.error({
|
||||
message: `Failed to run migration ${migration.id}:`,
|
||||
error,
|
||||
});
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
noFileLog.debug("All migrations completed successfully");
|
||||
}
|
||||
|
||||
async initialize(): Promise<void> {
|
||||
await cassandra.connect({
|
||||
withKeyspace: false,
|
||||
logging: environment.development,
|
||||
});
|
||||
await cassandra.createKeyspaceIfNotExists();
|
||||
await cassandra.shutdown(!environment.development);
|
||||
await cassandra.connect({ withKeyspace: true });
|
||||
await this.loadMigrations();
|
||||
await this.runMigrations();
|
||||
}
|
||||
}
|
||||
|
||||
export const migrationRunner = new MigrationRunner();
|
Loading…
Add table
Add a link
Reference in a new issue