188 lines
5.1 KiB
TypeScript
188 lines
5.1 KiB
TypeScript
import { readFile, readdir } from "node:fs/promises";
|
|
import { resolve } from "node:path";
|
|
import { Echo } from "@atums/echo";
|
|
import { environment } from "#environment/config";
|
|
import { migrationsPath } from "#environment/constants";
|
|
import { cassandra } from "#lib/database";
|
|
|
|
import type { SqlMigration } from "#types/config";
|
|
|
|
/**
 * Loads SQL migration scripts from disk and applies the ones that are not yet
 * recorded in the `schema_migrations` table.
 *
 * Scripts live in `<migrationsPath>/up/*.sql`, with an optional same-named
 * rollback script in `<migrationsPath>/down/`. A file name is split on `_`:
 * the first segment is the migration id, the remainder is its name.
 * Files are processed in the lexicographic order of their file names.
 */
class MigrationRunner {
	/** Migrations found by the most recent `loadMigrations()` call, in file-name order. */
	private migrations: SqlMigration[] = [];

	private static readonly logger: Echo = new Echo({
		subDirectory: "migrations",
	});
	private static readonly loggerNoFile: Echo = new Echo({ disableFile: true });

	/** File extension that marks a migration script. */
	private static readonly sqlExtension = ".sql";

	/**
	 * Reads every `*.sql` file under `<migrationsPath>/up` into `this.migrations`.
	 *
	 * Never throws: a file that cannot be loaded is logged and skipped, and a
	 * missing or unreadable `up` directory only produces a debug message.
	 * Previously loaded migrations are discarded first, so calling this method
	 * repeatedly cannot queue the same migration twice.
	 */
	async loadMigrations(): Promise<void> {
		this.migrations = [];

		try {
			const upPath = resolve(migrationsPath, "up");
			const downPath = resolve(migrationsPath, "down");

			const upFiles = await readdir(upPath);
			const sqlFiles = upFiles
				.filter((file) => file.endsWith(MigrationRunner.sqlExtension))
				.sort();

			for (const sqlFile of sqlFiles) {
				try {
					// Strip only the trailing extension; a ".sql" occurring earlier in
					// the file name must stay part of the migration name.
					const baseName = sqlFile.slice(
						0,
						-MigrationRunner.sqlExtension.length,
					);
					const [id, ...nameParts] = baseName.split("_");
					const name = nameParts.join("_") || "migration";

					if (!id || id.trim() === "") {
						MigrationRunner.loggerNoFile.debug(
							`Skipping migration file with invalid ID: ${sqlFile}`,
						);
						continue;
					}

					const upSql = await readFile(resolve(upPath, sqlFile), "utf-8");
					const downSql = await this.readOptionalFile(
						resolve(downPath, sqlFile),
					);

					this.migrations.push({
						id,
						name,
						upSql: upSql.trim(),
						// An absent or empty down script leaves the property unset.
						...(downSql && { downSql: downSql.trim() }),
					});
				} catch (error) {
					MigrationRunner.logger.error({
						message: `Failed to load migration ${sqlFile}:`,
						error,
					});
				}
			}

			MigrationRunner.loggerNoFile.debug(
				`Loaded ${this.migrations.length} migrations`,
			);
		} catch (error) {
			MigrationRunner.loggerNoFile.debug({
				message: "No migrations directory found or error reading:",
				error,
			});
		}
	}

	/**
	 * Reads a file that is allowed to be absent.
	 *
	 * @param path Absolute path of the file.
	 * @returns The file content, or `undefined` when it could not be read.
	 *   A failure other than "file does not exist" (ENOENT) is logged so that
	 *   e.g. a permission problem on a down script does not go unnoticed.
	 */
	private async readOptionalFile(path: string): Promise<string | undefined> {
		try {
			return await readFile(path, "utf-8");
		} catch (error) {
			const code = (error as { code?: unknown } | null)?.code;
			if (code !== "ENOENT") {
				MigrationRunner.logger.error({
					message: `Failed to read optional migration file ${path}:`,
					error,
				});
			}
			return undefined;
		}
	}

	/** Creates the `schema_migrations` bookkeeping table when it does not exist yet. */
	private async createMigrationsTable(): Promise<void> {
		const query = `
			CREATE TABLE IF NOT EXISTS schema_migrations (
				id TEXT PRIMARY KEY,
				name TEXT,
				executed_at TIMESTAMP,
				checksum TEXT
			)
		`;
		await cassandra.execute(query);
		MigrationRunner.loggerNoFile.debug("Schema migrations table ready");
	}

	/**
	 * Fetches the ids of all migrations recorded in `schema_migrations`.
	 *
	 * @returns The recorded ids; an empty set when the query fails.
	 */
	private async getExecutedMigrations(): Promise<Set<string>> {
		try {
			const result = (await cassandra.execute(
				"SELECT id FROM schema_migrations",
			)) as { rows: Array<{ id: string }> };
			return new Set(result.rows.map((row) => row.id));
		} catch (error) {
			// NOTE(review): falling back to an empty set makes the caller treat
			// every loaded migration as pending, so a transient read failure can
			// re-run migrations that were already applied. Kept as best-effort;
			// confirm whether this should abort instead.
			MigrationRunner.loggerNoFile.debug({
				message: "Could not fetch executed migrations:",
				error,
			});
			return new Set();
		}
	}

	/**
	 * Records a migration in `schema_migrations` together with the current time
	 * and a checksum of its up script.
	 *
	 * @param migration The migration that has just been applied.
	 */
	private async markMigrationExecuted(migration: SqlMigration): Promise<void> {
		const query = `
			INSERT INTO schema_migrations (id, name, executed_at, checksum)
			VALUES (?, ?, ?, ?)
		`;
		const checksum = this.generateChecksum(migration.upSql);
		await cassandra.execute(query, [
			migration.id,
			migration.name,
			new Date(),
			checksum,
		]);
	}

	/**
	 * Computes a 32-bit rolling hash (`hash * 31 + charCode`) over the UTF-16
	 * code units of `input`.
	 *
	 * @param input Text to hash.
	 * @returns The signed hash in base 16; negative values carry a leading `-`.
	 *   Not cryptographic.
	 */
	private generateChecksum(input: string): string {
		let hash = 0;
		for (let i = 0; i < input.length; i++) {
			// (hash << 5) - hash === hash * 31; `| 0` wraps to a signed 32-bit int.
			hash = ((hash << 5) - hash + input.charCodeAt(i)) | 0;
		}
		return hash.toString(16);
	}

	/**
	 * Splits a script into individual statements at top-level semicolons.
	 *
	 * Semicolons inside single-quoted strings, double-quoted identifiers,
	 * `$$ ... $$` blocks and comments (`--`, `//`, block comments) do not end
	 * a statement. Comments are kept inside the statement they belong to, but a
	 * fragment consisting only of whitespace and comments is dropped.
	 *
	 * @param sql Full script text.
	 * @returns Trimmed statements in source order, without the terminating `;`.
	 */
	private splitStatements(sql: string): string[] {
		const statements: string[] = [];
		let current = "";
		let hasCode = false; // true once `current` holds something besides comments/whitespace
		let pos = 0;

		// Copies sql[pos, end) into the current statement verbatim and advances.
		const consumeUntil = (end: number): void => {
			current += sql.slice(pos, end);
			pos = end;
		};

		const flush = (): void => {
			if (hasCode) {
				statements.push(current.trim());
			}
			current = "";
			hasCode = false;
		};

		while (pos < sql.length) {
			const char = sql.charAt(pos);

			if (sql.startsWith("--", pos) || sql.startsWith("//", pos)) {
				// Line comment: runs to the end of the line (or of the input).
				const newline = sql.indexOf("\n", pos);
				consumeUntil(newline === -1 ? sql.length : newline);
			} else if (sql.startsWith("/*", pos)) {
				const close = sql.indexOf("*/", pos + 2);
				consumeUntil(close === -1 ? sql.length : close + 2);
			} else if (sql.startsWith("$$", pos)) {
				const close = sql.indexOf("$$", pos + 2);
				hasCode = true;
				consumeUntil(close === -1 ? sql.length : close + 2);
			} else if (char === "'" || char === '"') {
				hasCode = true;
				consumeUntil(this.findClosingQuote(sql, pos));
			} else if (char === ";") {
				flush();
				pos += 1;
			} else {
				if (char.trim() !== "") {
					hasCode = true;
				}
				current += char;
				pos += 1;
			}
		}

		// A final statement does not need a terminating semicolon.
		flush();
		return statements;
	}

	/**
	 * Finds the end of a quoted section.
	 *
	 * @param sql Full script text.
	 * @param open Index of the opening quote character.
	 * @returns The index just past the matching closing quote, or `sql.length`
	 *   when the quote is never closed. A doubled quote character inside the
	 *   section is treated as an escaped quote, not as the end.
	 */
	private findClosingQuote(sql: string, open: number): number {
		const quote = sql.charAt(open);
		let pos = open + 1;

		while (pos < sql.length) {
			if (sql.charAt(pos) !== quote) {
				pos += 1;
			} else if (sql.charAt(pos + 1) === quote) {
				pos += 2;
			} else {
				return pos + 1;
			}
		}
		return sql.length;
	}

	/**
	 * Executes every statement of a script, one after another.
	 *
	 * @param sql Script text; statements are separated by `;`.
	 * @throws Whatever `cassandra.execute` throws; later statements are then
	 *   not executed.
	 */
	private async executeSql(sql: string): Promise<void> {
		for (const statement of this.splitStatements(sql)) {
			await cassandra.execute(statement);
		}
	}

	/**
	 * Applies all loaded migrations that are not yet recorded as executed.
	 *
	 * Migrations run sequentially in load order. Each one is recorded only after
	 * all of its statements succeeded.
	 *
	 * @throws The error of the first failing migration, after logging it;
	 *   remaining pending migrations are not attempted. Errors from creating
	 *   the bookkeeping table also propagate.
	 */
	async runMigrations(): Promise<void> {
		if (this.migrations.length === 0) {
			MigrationRunner.loggerNoFile.debug("No migrations to run");
			return;
		}

		await this.createMigrationsTable();
		const executedMigrations = await this.getExecutedMigrations();
		const pendingMigrations = this.migrations.filter(
			(migration) => !executedMigrations.has(migration.id),
		);

		if (pendingMigrations.length === 0) {
			MigrationRunner.loggerNoFile.debug("All migrations are up to date");
			return;
		}

		MigrationRunner.loggerNoFile.debug(
			`Running ${pendingMigrations.length} pending migrations...`,
		);

		for (const migration of pendingMigrations) {
			try {
				MigrationRunner.loggerNoFile.debug(
					`Running migration: ${migration.id} - ${migration.name}`,
				);
				await this.executeSql(migration.upSql);
				await this.markMigrationExecuted(migration);
				MigrationRunner.loggerNoFile.debug(
					`Migration ${migration.id} completed`,
				);
			} catch (error) {
				MigrationRunner.logger.error({
					message: `Failed to run migration ${migration.id}:`,
					error,
				});
				throw error;
			}
		}

		MigrationRunner.loggerNoFile.debug("All migrations completed successfully");
	}

	/**
	 * Full startup sequence: connects without a keyspace, creates the keyspace
	 * if needed, reconnects with the keyspace selected, then loads and runs the
	 * migrations.
	 *
	 * @throws Any error from the connection steps or from `runMigrations()`.
	 */
	async initialize(): Promise<void> {
		await cassandra.connect({
			withKeyspace: false,
			logging: environment.development,
		});
		await cassandra.createKeyspaceIfNotExists();
		// NOTE(review): the meaning of the boolean passed to shutdown() is defined
		// by the project's cassandra wrapper and is not visible from this file.
		await cassandra.shutdown(!environment.development);
		await cassandra.connect({ withKeyspace: true });
		await this.loadMigrations();
		await this.runMigrations();
	}
}
|
|
|
|
/** MigrationRunner instance created once when this module is loaded and exported for other modules to use. */
export const migrationRunner = new MigrationRunner();
|