./kaisetsu-app/src/lib/storage.ts
import { promises as fs, createReadStream, createWriteStream } from 'fs';
import { dirname, join, resolve } from 'path';
import { pipeline } from 'stream/promises';
import { STORAGE_BACKEND, LOCAL_DATA_DIR, GCP_PROJECT_ID, GCS_BUCKET_NAME, requiredGcp } from './config';
/**
 * Contract shared by the storage backends defined in this module.
 *
 * Job documents (input, status, result) are addressed by `jobId`; the
 * implementations in this file serialize them with `JSON.stringify` and
 * read them back with `JSON.parse`. The generic `T` on the getters is a
 * caller-side assertion only: the parsed value is cast, not validated.
 */
export interface StorageBackend {
/** Persists the input payload for `jobId`. */
saveJobInput(jobId: string, input: unknown): Promise<void>;
/**
 * Loads the input payload for `jobId`.
 * The implementations in this file reject with an `Error` when no input is stored.
 */
getJobInput<T = unknown>(jobId: string): Promise<T>;
/** Persists the status document for `jobId`. */
saveJobStatus(jobId: string, status: unknown): Promise<void>;
/** Loads the status document for `jobId`, or resolves to `null` when none is stored. */
getJobStatus<T = unknown>(jobId: string): Promise<T | null>;
/** Persists the result document for `jobId`. */
saveJobResult(jobId: string, result: unknown): Promise<void>;
/** Loads the result document for `jobId`, or resolves to `null` when none is stored. */
getJobResult<T = unknown>(jobId: string): Promise<T | null>;
/**
 * Stores raw video bytes and resolves to a URI identifying the stored copy
 * (`file://…` for the local backend, `gs://…` for the GCS backend).
 * `contentType` is forwarded to GCS; the local backend does not use it.
 */
saveVideo(buffer: Buffer, fileName: string, contentType: string): Promise<string>;
/** Copies the video identified by `uri` (as returned by `saveVideo`) to the local path `destPath`. */
downloadVideoToFile(uri: string, destPath: string): Promise<void>;
}
// --- Local filesystem backend ---
/**
 * Storage backend that keeps job documents and videos on the local filesystem.
 *
 * Layout under `root`:
 *   - `jobs/<jobId>/input.json`, `status.json`, `result.json`
 *   - `videos/<timestamp>-<fileName>`
 */
class LocalBackend implements StorageBackend {
  /** @param root Directory under which all job and video files are written. */
  constructor(private readonly root: string) {}

  /**
   * Builds the path of one of a job's JSON documents.
   *
   * NOTE(review): `jobId` is joined into the path unvalidated, so an ID
   * containing `..` or path separators would resolve outside `jobs/`.
   * Confirm callers only pass server-generated IDs.
   */
  private jobPath(jobId: string, file: string): string {
    return join(this.root, 'jobs', jobId, file);
  }

  /** Serializes `data` as JSON to `path`, creating parent directories as needed. */
  private async writeJson(path: string, data: unknown): Promise<void> {
    await fs.mkdir(dirname(path), { recursive: true });
    await fs.writeFile(path, JSON.stringify(data), 'utf-8');
  }

  /**
   * Reads and parses the JSON file at `path`.
   *
   * @returns The parsed value cast to `T` (not validated), or `null` when the
   *   file does not exist.
   * @throws Any other filesystem error, or a `SyntaxError` on malformed JSON.
   */
  private async readJson<T>(path: string): Promise<T | null> {
    try {
      const text = await fs.readFile(path, 'utf-8');
      return JSON.parse(text) as T;
    } catch (err: unknown) {
      // A missing file is an expected "not stored yet" state, not a failure.
      if ((err as NodeJS.ErrnoException).code === 'ENOENT') return null;
      throw err;
    }
  }

  /** Writes the job's input payload to `jobs/<jobId>/input.json`. */
  async saveJobInput(jobId: string, input: unknown): Promise<void> {
    await this.writeJson(this.jobPath(jobId, 'input.json'), input);
  }

  /**
   * Reads the job's input payload.
   *
   * @throws Error `Job input not found: <jobId>` when no input file exists.
   */
  async getJobInput<T = unknown>(jobId: string): Promise<T> {
    const data = await this.readJson<T>(this.jobPath(jobId, 'input.json'));
    // Compare against null explicitly: a stored falsy payload (0, '', false)
    // is valid input and must not be reported as missing.
    if (data === null) throw new Error(`Job input not found: ${jobId}`);
    return data;
  }

  /** Writes the job's status document to `jobs/<jobId>/status.json`. */
  async saveJobStatus(jobId: string, status: unknown): Promise<void> {
    await this.writeJson(this.jobPath(jobId, 'status.json'), status);
  }

  /** Reads the job's status document, or `null` when none has been written. */
  async getJobStatus<T = unknown>(jobId: string): Promise<T | null> {
    return this.readJson<T>(this.jobPath(jobId, 'status.json'));
  }

  /** Writes the job's result document to `jobs/<jobId>/result.json`. */
  async saveJobResult(jobId: string, result: unknown): Promise<void> {
    await this.writeJson(this.jobPath(jobId, 'result.json'), result);
  }

  /** Reads the job's result document, or `null` when none has been written. */
  async getJobResult<T = unknown>(jobId: string): Promise<T | null> {
    return this.readJson<T>(this.jobPath(jobId, 'result.json'));
  }

  /**
   * Writes the video bytes to `videos/<Date.now()>-<fileName>` under `root`.
   *
   * Only the final path segment of `fileName` is used, so a caller-supplied
   * name such as `../../x.mp4` cannot place the file outside `videos/`.
   *
   * @returns A `file://` URI holding the absolute path of the stored file.
   */
  async saveVideo(buffer: Buffer, fileName: string): Promise<string> {
    // Strip any directory components (both '/' and '\' separators).
    const safeName = fileName.split(/[\\/]/).pop() ?? '';
    const path = join(this.root, 'videos', `${Date.now()}-${safeName}`);
    await fs.mkdir(dirname(path), { recursive: true });
    await fs.writeFile(path, buffer);
    return `file://${resolve(path)}`;
  }

  /**
   * Copies the file referenced by a `file://` URI to `destPath`, creating the
   * destination's parent directories as needed.
   */
  async downloadVideoToFile(uri: string, destPath: string): Promise<void> {
    // Drop the scheme prefix; the remainder is used as a filesystem path as-is.
    const src = uri.replace(/^file:\/\//, '');
    await fs.mkdir(dirname(destPath), { recursive: true });
    await pipeline(createReadStream(src), createWriteStream(destPath));
  }
}
// --- GCS backend (production) ---
/**
 * Storage backend that keeps job documents and videos in a Google Cloud
 * Storage bucket.
 *
 * Object layout inside the bucket:
 *   - `jobs/<jobId>/input.json`, `status.json`, `result.json`
 *   - `videos/<timestamp>-<fileName>`
 */
class GcsBackend implements StorageBackend {
  private readonly storage: import('@google-cloud/storage').Storage;

  /**
   * @param bucketName Bucket that holds every object this backend reads or writes.
   * @param projectId Project ID passed to the GCS client constructor.
   */
  constructor(private readonly bucketName: string, projectId: string) {
    // Lazy require so local dev doesn't need the GCS SDK loaded unless used
    const { Storage } = require('@google-cloud/storage');
    this.storage = new Storage({ projectId });
  }

  /** Returns a handle to the object `jobs/<jobId>/<name>` in the configured bucket. */
  private jobFile(jobId: string, name: string) {
    return this.storage.bucket(this.bucketName).file(`jobs/${jobId}/${name}`);
  }

  /** Serializes `data` as JSON and uploads it to `file` with an `application/json` content type. */
  private async writeJson(file: ReturnType<GcsBackend['jobFile']>, data: unknown): Promise<void> {
    await file.save(JSON.stringify(data), { contentType: 'application/json' });
  }

  /**
   * Downloads and parses the JSON object behind `file`.
   *
   * A single `download()` call is used instead of `exists()` followed by
   * `download()`: that saves a round trip and avoids failing when the object
   * disappears between the two calls.
   *
   * @returns The parsed value cast to `T` (not validated), or `null` when the
   *   download fails with error code 404.
   * @throws Any other download error, or a `SyntaxError` on malformed JSON.
   */
  private async readJson<T>(file: ReturnType<GcsBackend['jobFile']>): Promise<T | null> {
    let contents: Buffer;
    try {
      [contents] = await file.download();
    } catch (err: unknown) {
      // A missing object is an expected "not stored yet" state, not a failure.
      if ((err as { code?: unknown } | null)?.code === 404) return null;
      throw err;
    }
    return JSON.parse(contents.toString()) as T;
  }

  /** Uploads the job's input payload to `jobs/<jobId>/input.json`. */
  async saveJobInput(jobId: string, input: unknown): Promise<void> {
    await this.writeJson(this.jobFile(jobId, 'input.json'), input);
  }

  /**
   * Downloads the job's input payload.
   *
   * @throws Error `Job input not found: <jobId>` when no input object exists.
   */
  async getJobInput<T = unknown>(jobId: string): Promise<T> {
    const data = await this.readJson<T>(this.jobFile(jobId, 'input.json'));
    // Compare against null explicitly: a stored falsy payload (0, '', false)
    // is valid input and must not be reported as missing.
    if (data === null) throw new Error(`Job input not found: ${jobId}`);
    return data;
  }

  /** Uploads the job's status document to `jobs/<jobId>/status.json`. */
  async saveJobStatus(jobId: string, status: unknown): Promise<void> {
    await this.writeJson(this.jobFile(jobId, 'status.json'), status);
  }

  /** Downloads the job's status document, or `null` when none has been written. */
  async getJobStatus<T = unknown>(jobId: string): Promise<T | null> {
    return this.readJson<T>(this.jobFile(jobId, 'status.json'));
  }

  /** Uploads the job's result document to `jobs/<jobId>/result.json`. */
  async saveJobResult(jobId: string, result: unknown): Promise<void> {
    await this.writeJson(this.jobFile(jobId, 'result.json'), result);
  }

  /** Downloads the job's result document, or `null` when none has been written. */
  async getJobResult<T = unknown>(jobId: string): Promise<T | null> {
    return this.readJson<T>(this.jobFile(jobId, 'result.json'));
  }

  /**
   * Uploads the video bytes as `videos/<Date.now()>-<fileName>` using a
   * non-resumable upload.
   *
   * @returns The `gs://<bucket>/<object>` URI of the uploaded object.
   */
  async saveVideo(buffer: Buffer, fileName: string, contentType: string): Promise<string> {
    const name = `videos/${Date.now()}-${fileName}`;
    await this.storage.bucket(this.bucketName).file(name).save(buffer, { contentType, resumable: false });
    return `gs://${this.bucketName}/${name}`;
  }

  /**
   * Downloads the object referenced by a `gs://` URI to the local `destPath`.
   *
   * Only the object path of the URI is used: whichever bucket the URI names,
   * the object is read from the configured bucket.
   */
  async downloadVideoToFile(uri: string, destPath: string): Promise<void> {
    // One anchored strip of "gs://<any-bucket>/"; the rest is the object name.
    const objectName = uri.replace(/^gs:\/\/[^/]+\//, '');
    await this.storage.bucket(this.bucketName).file(objectName).download({ destination: destPath });
  }
}
/**
 * Picks the storage implementation from configuration.
 *
 * @returns A `GcsBackend` built from `requiredGcp()` when `STORAGE_BACKEND`
 *   is `'gcs'`; otherwise a `LocalBackend` rooted at `LOCAL_DATA_DIR`
 *   resolved against the current working directory.
 */
function makeBackend(): StorageBackend {
  // Anything other than 'gcs' falls back to the local filesystem.
  if (STORAGE_BACKEND !== 'gcs') {
    const dataRoot = resolve(process.cwd(), LOCAL_DATA_DIR);
    return new LocalBackend(dataRoot);
  }

  const gcp = requiredGcp();
  const projectId = gcp.projectId;
  const bucketName = gcp.bucket;
  return new GcsBackend(bucketName, projectId);
}
/**
 * Shared storage backend for the application.
 * Created once, by calling `makeBackend()` when this module is first evaluated,
 * so any error thrown while building the backend surfaces at import time.
 */
export const storage = makeBackend();