./kaisetsu-app/src/lib/process-job.ts
import { ThinkingLevel } from '@google/genai';
import { execSync } from 'child_process';
import { readFileSync } from 'fs';
import { mkdir, mkdtemp, readFile, rm, stat } from 'fs/promises';
import { tmpdir } from 'os';
import { basename, extname, join } from 'path';
import { GEMINI_MODEL, WHISPER_BACKEND, OPENAI_API_KEY } from './config';
import { genai, prepareVideo, VideoRef } from './genai';
import { storage } from './storage';
// --- Interfaces ---
/** One recognized word with its timing, as produced by Whisper word-level timestamps. */
interface WhisperWord {
  /** Word start, in seconds from the beginning of the transcribed audio. */
  start: number;
  /** Word end, in seconds from the beginning of the transcribed audio. */
  end: number;
  /** The recognized text of the word. */
  word: string;
  /** Recognition confidence; the OpenAI API path fills this with a constant 1. */
  probability: number;
}
/** A Whisper transcript segment; `words` is present when word timestamps were requested. */
interface WhisperSegment {
  /** Segment text (may carry leading whitespace; callers trim it). */
  text: string;
  /** Segment start, in seconds from the beginning of the transcribed audio. */
  start: number;
  /** Segment end, in seconds; runWhisper may pull this back to the last word's end. */
  end: number;
  /** Word-level timings belonging to this segment, if available. */
  words?: WhisperWord[];
}
/** Parameters a job was submitted with, as loaded from job storage. */
interface JobInput {
  /** Prompt genre key; getPrompts maps unknown values to "variety". */
  genre?: string;
  /** Start timecode of the source video, e.g. "09;59;45;00" (see parseTcOffset). */
  tcOffset: string;
  /** NOTE(review): presumably toggles the narration-generation step — confirm with the caller. */
  generateNarration: boolean;
  /** MIME type of the uploaded video. */
  mimeType: string;
  /** Storage URI of the uploaded video (GCS per the field name). */
  gcsUri: string;
}
/** Progress record persisted per job and rewritten by updateJobStatus. */
interface JobStatus {
  /** ISO-8601 timestamp of when the record was first created. */
  createdAt: string;
  /** ISO-8601 timestamp of the most recent update. */
  updatedAt: string;
  /** Overall lifecycle state of the job. */
  status: 'processing' | 'completed' | 'error';
  /** Name of the current pipeline step ("unknown" before any step reports). */
  step: string;
  /** Progress value; NOTE(review): units/range (e.g. 0-100) are not visible here. */
  progress: number;
  /** Error message, when status is "error". */
  error?: string;
}
// --- Prompt Loading ---
/** Root folder that holds one sub-folder of prompt templates per genre. */
const PROMPTS_DIR = join(process.cwd(), 'prompts');

/**
 * Genre key -> prompt sub-folder under PROMPTS_DIR.
 * News/documentary reuses the variety prompts rather than having its own set.
 */
const GENRE_PROMPT_DIR: Record<string, string> = {
  'variety': 'variety',
  'anime': 'anime',
  'news': 'variety', // news/documentary -> same prompts as variety
};
/**
 * Load the three prompt templates for a genre from disk (synchronously).
 *
 * Reads `1onseimeta.txt` (audio meta), `2eizoumeta.txt` (visual meta) and
 * `3genkouseisei.txt` (narration) from PROMPTS_DIR/<genre folder>, trimming each.
 * Unknown genres fall back to the "variety" folder.
 *
 * @param genre - Genre key from the job input; defaults to "variety".
 * @returns The trimmed template texts keyed by pipeline step.
 * @throws If any of the three files cannot be read.
 */
function getPrompts(genre: string = 'variety') {
  // Own-property lookup only: a plain index would also match inherited keys
  // such as "constructor" or "__proto__" (genre is job input), yielding a
  // non-string that makes join() throw instead of falling back to variety.
  const mapped = Object.prototype.hasOwnProperty.call(GENRE_PROMPT_DIR, genre)
    ? GENRE_PROMPT_DIR[genre]
    : undefined;
  const dir = join(PROMPTS_DIR, mapped || 'variety');
  const audioMeta = readFileSync(join(dir, '1onseimeta.txt'), 'utf-8').trim();
  const visualMeta = readFileSync(join(dir, '2eizoumeta.txt'), 'utf-8').trim();
  const narration = readFileSync(join(dir, '3genkouseisei.txt'), 'utf-8').trim();
  console.log(`[prompts] Loaded from ${dir} (genre:${genre} audio:${audioMeta.length} visual:${visualMeta.length} narration:${narration.length} chars)`);
  return { audioMeta, visualMeta, narration };
}
// --- TC Helpers ---
/**
 * Parse the job's start timecode "HH;MM;SS;FF" into seconds, counting frames as 1/30 s.
 *
 * Accepts ":" as well as ";" separators, matching tcToAbsoluteSeconds. Malformed
 * or missing input falls back to 09;59;45;00 (35985 s).
 *
 * @param tc - Timecode string from the job input.
 * @returns Offset in seconds.
 */
function parseTcOffset(tc: string): number {
  const DEFAULT_OFFSET_SECONDS = 35985; // 09;59;45;00
  // `?? ''` guards job input that was stored without a tcOffset.
  const parts = (tc ?? '').replace(/:/g, ';').split(';').map(Number);
  if (parts.length !== 4 || parts.some((p) => Number.isNaN(p))) {
    return DEFAULT_OFFSET_SECONDS;
  }
  return parts[0] * 3600 + parts[1] * 60 + parts[2] + parts[3] / 30;
}
/**
 * Format a position as an absolute "HH;MM;SS;FF" timecode at 30 frames per second.
 *
 * Every field is derived from a single absolute frame count, so a fractional
 * offset (a start TC with non-zero frames) carries correctly into the
 * frames/seconds fields. Previously frames came from `relativeSeconds` alone.
 * NOTE(review): frames are treated as exactly 1/30 s (non-drop-frame) even
 * though ";" separators usually denote 29.97 drop-frame — confirm intended.
 *
 * @param relativeSeconds - Seconds from the start of the media.
 * @param offsetSeconds - Absolute seconds of the media's first frame.
 * @returns Zero-padded timecode string.
 */
function secondsToTimecode(relativeSeconds: number, offsetSeconds: number): string {
  const FPS = 30;
  // The tiny epsilon absorbs float error for values that are not exactly
  // representable (e.g. 0.1 s), which would otherwise floor one frame short.
  const totalFrames = Math.floor((relativeSeconds + offsetSeconds) * FPS + 1e-6);
  const frames = totalFrames % FPS;
  const totalSeconds = (totalFrames - frames) / FPS;
  const hrs = Math.floor(totalSeconds / 3600);
  const mins = Math.floor((totalSeconds % 3600) / 60);
  const secs = totalSeconds % 60;
  const pad = (n: number): string => n.toString().padStart(2, '0');
  return `${pad(hrs)};${pad(mins)};${pad(secs)};${pad(frames)}`;
}
// Parse TC string (handles both ; and : separators) into absolute seconds
/**
 * Parse an "HH;MM;SS;FF" (or "HH:MM:SS:FF") timecode into absolute seconds at 30 fps.
 *
 * Returns 0 for anything that is not four numeric fields, including
 * non-numeric fields. Those previously produced NaN, which then leaked through
 * timecodeToSeconds. The string guard covers LLM JSON blocks, which are cast
 * to AudioBlock without validation.
 *
 * @param tc - Timecode text.
 * @returns Absolute seconds, or 0 when unparseable.
 */
function tcToAbsoluteSeconds(tc: string): number {
  if (typeof tc !== 'string') return 0;
  const parts = tc.replace(/:/g, ';').split(';').map(Number);
  if (parts.length !== 4 || parts.some((p) => Number.isNaN(p))) return 0;
  return parts[0] * 3600 + parts[1] * 60 + parts[2] + parts[3] / 30;
}
/**
 * Convert an absolute timecode into seconds relative to the media start.
 * Positions before the offset are clamped to 0.
 */
function timecodeToSeconds(tc: string, offsetSeconds: number): number {
  const absolute = tcToAbsoluteSeconds(tc);
  return Math.max(absolute - offsetSeconds, 0);
}
// --- Storage Helpers ---
/** Load the submitted parameters for `jobId` from job storage. */
async function getJobInput(jobId: string): Promise<JobInput> {
  const input = await storage.getJobInput<JobInput>(jobId);
  return input;
}
/**
 * Merge `update` into the stored status for `jobId` and save it, always
 * refreshing `updatedAt`. When no status exists yet, a placeholder
 * (processing / "unknown" / 0) is used as the base.
 * NOTE(review): this is read-then-write, so concurrent callers may overwrite
 * each other's fields — confirm there is a single writer per job.
 */
async function updateJobStatus(jobId: string, update: Partial<JobStatus>): Promise<void> {
  const stored = await storage.getJobStatus<JobStatus>(jobId);
  const base = stored ?? {
    status: 'processing' as const,
    step: 'unknown',
    progress: 0,
    createdAt: new Date().toISOString(),
    updatedAt: new Date().toISOString(),
  };
  const merged = { ...base, ...update, updatedAt: new Date().toISOString() };
  await storage.saveJobStatus(jobId, merged);
}
/** Persist the final result payload for `jobId` via job storage. */
async function saveJobResult(jobId: string, result: Record<string, unknown>): Promise<void> {
  const pending = storage.saveJobResult(jobId, result);
  await pending;
}
// --- Whisper ---
/** Output of runWhisper: word-trimmed segments plus the detected stream offset. */
interface WhisperResult {
  /**
   * Seconds from ffprobe: audio-stream start minus video-stream start
   * (positive = audio starts later); falls back to the container start_time, then 0.
   */
  videoStartTime: number;
  /** Transcript segments. */
  segments: WhisperSegment[];
}
/** Raw transcript shape returned by any Whisper backend. */
interface WhisperApiResult {
  /** Transcript segments, optionally with word timings. */
  segments: WhisperSegment[];
}
/**
 * Transcribe an audio file with the configured Whisper backend.
 *
 * WHISPER_BACKEND "openai" delegates to the hosted API. "mlx" runs the
 * `mlx_whisper` CLI, and any other value runs the `whisper` CLI. Both CLIs use
 * large-v3-turbo, Japanese, JSON output and word timestamps, writing into `tmpDir`.
 *
 * @param audioPath - Audio file to transcribe.
 * @param tmpDir - Directory the CLI writes its JSON output into.
 * @returns The parsed transcript JSON.
 * @throws If the CLI fails, exceeds its 10-minute timeout, or its JSON cannot be read/parsed.
 */
async function transcribeWithWhisper(audioPath: string, tmpDir: string): Promise<WhisperApiResult> {
  if (WHISPER_BACKEND === 'openai') {
    return transcribeWithOpenAI(audioPath);
  }
  const cmd = WHISPER_BACKEND === 'mlx'
    ? `mlx_whisper "${audioPath}" --model mlx-community/whisper-large-v3-turbo --language ja --output-format json --word-timestamps True --output-dir "${tmpDir}"`
    : `whisper "${audioPath}" --model large-v3-turbo --language ja --output_format json --word_timestamps True --output_dir "${tmpDir}"`;
  execSync(cmd, { stdio: 'pipe', timeout: 600000 });
  // The CLIs name their output after the input's stem (audio.mp3 -> audio.json).
  // Derive it from audioPath rather than hard-coding "audio.json".
  const outputPath = join(tmpDir, `${basename(audioPath, extname(audioPath))}.json`);
  const rawJson = readFileSync(outputPath, 'utf-8');
  // Python's json module writes NaN / Infinity / -Infinity for non-finite
  // floats; JSON.parse rejects all three, so map them to null.
  const sanitized = rawJson.replace(/:\s*-?(?:NaN|Infinity)\b/g, ': null');
  return JSON.parse(sanitized);
}
/**
 * Attach OpenAI word timestamps to segments, giving each word to exactly one segment.
 *
 * A word is eligible for a segment if it starts no earlier than 0.01 s before
 * the segment and ends no later than 0.2 s after it. Adjacent windows overlap,
 * so a boundary word used to be copied into both segments. That made the
 * earlier segment's "last word" belong to the next one and defeated the
 * word-level end trim. The latest-starting eligible segment now wins; words
 * eligible for none are dropped, as before. `probability` is set to 1 because
 * the API does not report it.
 */
function assignWordsToSegments(
  rawSegments: Array<{ start: number; end: number; text: string }>,
  rawWords: Array<{ word: string; start: number; end: number }>,
): WhisperSegment[] {
  const buckets: WhisperWord[][] = rawSegments.map(() => []);
  for (const w of rawWords) {
    let owner = -1;
    rawSegments.forEach((seg, i) => {
      const fits = w.start >= seg.start - 0.01 && w.end <= seg.end + 0.2;
      if (fits && (owner === -1 || seg.start >= rawSegments[owner].start)) {
        owner = i;
      }
    });
    if (owner !== -1) {
      buckets[owner].push({ word: w.word, start: w.start, end: w.end, probability: 1 });
    }
  }
  return rawSegments.map((seg, i) => ({ ...seg, words: buckets[i] }));
}

/**
 * Transcribe an MP3 with OpenAI's hosted `whisper-1` model (Japanese), requesting
 * segment- and word-level timestamps.
 *
 * @param audioPath - Local audio file; must not exceed 25 MB (the API limit).
 * @returns Segments with their words attached.
 * @throws If OPENAI_API_KEY is unset, the file is too large, or the API returns a non-2xx status.
 */
async function transcribeWithOpenAI(audioPath: string): Promise<WhisperApiResult> {
  if (!OPENAI_API_KEY) {
    throw new Error('OPENAI_API_KEY is not set (required for WHISPER_BACKEND=openai)');
  }
  const { size } = await stat(audioPath);
  const MAX_BYTES = 25 * 1024 * 1024;
  if (size > MAX_BYTES) {
    throw new Error(
      `Audio file ${(size / 1024 / 1024).toFixed(1)}MB exceeds OpenAI API's 25MB limit. ` +
      `Use WHISPER_BACKEND=mlx or chunk the input.`
    );
  }
  const audioBuffer = await readFile(audioPath);
  const form = new FormData();
  form.append('file', new Blob([audioBuffer], { type: 'audio/mpeg' }), 'audio.mp3');
  form.append('model', 'whisper-1');
  form.append('language', 'ja');
  form.append('response_format', 'verbose_json');
  form.append('timestamp_granularities[]', 'segment');
  form.append('timestamp_granularities[]', 'word');
  const response = await fetch('https://api.openai.com/v1/audio/transcriptions', {
    method: 'POST',
    headers: { Authorization: `Bearer ${OPENAI_API_KEY}` },
    body: form,
  });
  if (!response.ok) {
    const body = await response.text();
    throw new Error(`OpenAI Whisper API failed: ${response.status} ${body}`);
  }
  const data = (await response.json()) as {
    segments?: Array<{ start: number; end: number; text: string }>;
    words?: Array<{ word: string; start: number; end: number }>;
  };
  return { segments: assignWordsToSegments(data.segments ?? [], data.words ?? []) };
}
/**
 * Return the audio-stream start minus the video-stream start, in seconds
 * (positive = audio starts later). If ffprobe fails, fall back to the
 * container-level start_time, and to 0 if that also fails.
 */
function probeAudioVideoOffset(videoPath: string): number {
  const probe = (entries: string): string =>
    execSync(`ffprobe -v error ${entries} -of csv=p=0 "${videoPath}"`, {
      encoding: 'utf-8',
      timeout: 10000,
    }).trim();
  try {
    const vStart = parseFloat(probe('-show_entries stream=start_time -select_streams v:0')) || 0;
    const aStart = parseFloat(probe('-show_entries stream=start_time -select_streams a:0')) || 0;
    const offset = aStart - vStart; // positive = audio starts later than video
    console.log(`[ffprobe] Video stream PTS: ${vStart}s, Audio stream PTS: ${aStart}s, offset: ${offset}s`);
    return offset;
  } catch {
    // NOTE(review): the fallback returns the container start_time, a different
    // quantity from the stream-to-stream offset above — confirm this is intended.
    try {
      const formatStart = parseFloat(probe('-show_entries format=start_time')) || 0;
      console.log(`[ffprobe] Format start_time: ${formatStart}s`);
      return formatStart;
    } catch {
      console.log('[ffprobe] Could not detect PTS offset, assuming 0');
      return 0;
    }
  }
}

/**
 * Pull each segment's end back to 0.2 s after its last word when that is earlier.
 * Whisper often extends segment.end up to the next speech start, swallowing
 * silence/music gaps. Segments without word timings are returned unchanged.
 */
function trimSegmentsToLastWord(segments: WhisperSegment[]): WhisperSegment[] {
  let trimCount = 0;
  const trimmed = segments.map((seg) => {
    const words = seg.words;
    if (words && words.length > 0) {
      const wordEnd = words[words.length - 1].end + 0.2; // 0.2s buffer after last word
      if (wordEnd < seg.end) {
        trimCount++;
        return { ...seg, end: wordEnd };
      }
    }
    return seg;
  });
  if (trimCount > 0) {
    console.log(`[whisper] Word-level trim applied to ${trimCount}/${trimmed.length} segments`);
  }
  return trimmed;
}

/**
 * Transcribe a video's audio track with Whisper.
 *
 * Copies the video into a private temp directory, probes the audio/video start
 * offset with ffprobe, and extracts 16 kHz mono MP3 audio with ffmpeg. It then
 * runs the configured Whisper backend and tightens segment ends using word
 * timings. The temp directory is removed afterwards, whether or not a step failed.
 *
 * @param videoUri - Location understood by `storage.downloadVideoToFile`.
 * @returns Trimmed segments plus the detected offset in seconds.
 * @throws If the download, audio extraction, or transcription fails.
 */
async function runWhisper(videoUri: string): Promise<WhisperResult> {
  // mkdtemp adds a random suffix. A Date.now()-only name could be shared by two
  // jobs started in the same millisecond, and one job's cleanup would then
  // delete the other's files.
  const tmpDir = await mkdtemp(join(tmpdir(), 'whisper-'));
  const videoPath = join(tmpDir, 'video.mp4');
  const audioPath = join(tmpDir, 'audio.mp3');
  try {
    // Materialize video to local disk (GCS: download; local: copy)
    await storage.downloadVideoToFile(videoUri, videoPath);
    const videoStartTime = probeAudioVideoOffset(videoPath);
    // Extract audio with ffmpeg
    execSync(
      `ffmpeg -i "${videoPath}" -vn -acodec libmp3lame -ar 16000 -ac 1 "${audioPath}" -y`,
      { stdio: 'pipe', timeout: 120000 }
    );
    // Run Whisper (with word-level timestamps for precise end-of-speech detection)
    const result = await transcribeWithWhisper(audioPath, tmpDir);
    const segments = trimSegmentsToLastWord(result.segments ?? []);
    return { segments, videoStartTime };
  } finally {
    // Best-effort, portable cleanup; a failure here must not mask the real outcome.
    await rm(tmpDir, { recursive: true, force: true }).catch(() => undefined);
  }
}
// --- Whisper to Pre-Generated Meta ---
/**
 * Render Whisper segments as CSV-style audio meta lines (header + rows).
 *
 * Each segment becomes a "speech" row with its trimmed text. A "silence" row is
 * inserted before a segment whenever more than 0.3 s passes since the previous
 * segment's end (or since 0 for the first segment). Times are absolute timecodes.
 */
function whisperToPreGeneratedMeta(segments: WhisperSegment[], offsetSeconds: number): string {
  const tc = (seconds: number): string => secondsToTimecode(seconds, offsetSeconds);
  const rows = ['開始タイムコード, 終了タイムコード, タイプ, 話者, 内容'];
  let cursor = 0;
  segments.forEach((seg) => {
    const gap = seg.start - cursor;
    if (gap > 0.3) {
      rows.push(`${tc(cursor)}, ${tc(seg.start)}, silence, ,`);
    }
    rows.push(`${tc(seg.start)}, ${tc(seg.end)}, speech, , ${seg.text.trim()}`);
    cursor = seg.end;
  });
  return rows.join('\n');
}
// --- Flexible Parsers (JSON + CSV) ---
/** One row of audio meta, parsed from an LLM response (JSON or CSV). */
interface AudioBlock {
  /** Block type; lower-cased by the CSV parser (e.g. "speech", "silence"). */
  type: string;
  /** Start timecode; the CSV parser normalizes separators to ";". */
  startTC: string;
  /** End timecode; the CSV parser normalizes separators to ";". */
  endTC: string;
  /** Speaker label, if any. */
  speaker?: string;
  /** Free-text content of the block, if any. */
  content?: string;
  /** NOTE(review): not populated by the visible CSV parser; presumably from JSON responses. */
  bgmLevel?: string;
}
/** One narration line parsed from an LLM response. */
interface NarrationEntry {
  /** Timecode of the line. */
  tc: string;
  /** Speaker label (contains "ナレーション" for entries from the CSV path). */
  speaker: string;
  /** Narration text. */
  text: string;
}
/**
 * Parse JSON from an LLM reply, returning null on any failure.
 * If the reply contains a ``` fenced block (optionally tagged json), only the
 * first such block is parsed; otherwise the whole trimmed text is.
 * NOTE(review): the parsed value is returned unvalidated, so it may be an
 * array or primitive despite the declared type.
 */
function tryParseJson(text: string): Record<string, unknown> | null {
  try {
    const fenced = /```(?:json)?\s*\n?([\s\S]*?)\n?\s*```/.exec(text);
    const candidate = fenced === null ? text.trim() : fenced[1].trim();
    return JSON.parse(candidate);
  } catch {
    return null;
  }
}
/**
 * Remove markdown code fences (optionally tagged csv/plaintext/text) and
 * stand-alone "Plaintext" label lines, then trim the result.
 */
function stripCodeBlock(text: string): string {
  let cleaned = text;
  cleaned = cleaned.replace(/```(?:csv|plaintext|text)?\s*\n?/gi, ''); // fences with optional tag
  cleaned = cleaned.replace(/```/g, ''); // any stray fence markers
  cleaned = cleaned.replace(/^Plaintext\s*$/gim, ''); // label lines some models emit
  return cleaned.trim();
}
/** Strict "HH;MM;SS;FF" / "HH:MM:SS:FF" timecode, used to tell data rows from header rows. */
const STRICT_TC_RE = /^\d{1,2}[;:]\d{2}[;:]\d{2}[;:]\d{2}$/;

/**
 * Parse one CSV line of audio meta ("開始TC, 終了TC, タイプ, 話者, 内容").
 *
 * Returns null for comment lines, header lines and anything that does not start
 * with a timecode. Header detection by keyword (タイムコード / タイプ) only
 * applies when the first field is not a strict timecode. Previously any line
 * containing those words was dropped, which discarded real transcript rows
 * such as "好きなタイプは？".
 */
function parseAudioMetaCsvLine(line: string): AudioBlock | null {
  if (line.startsWith('#')) return null;
  const parts = line.split(',').map(s => s.trim());
  const looksLikeHeader = line.includes('タイムコード') || line.includes('タイプ');
  if (looksLikeHeader && !STRICT_TC_RE.test(parts[0])) return null;
  if (parts.length < 3) return null;
  if (!(parts[0].includes(';') || /^\d{2}:\d{2}:\d{2}:\d{2}$/.test(parts[0]))) return null;
  return {
    startTC: parts[0].replace(/:/g, ';'),
    endTC: parts[1].replace(/:/g, ';'),
    type: parts[2].toLowerCase(),
    speaker: parts[3] || '',
    // Content may itself contain commas; re-join everything after the 4th field.
    content: parts.slice(4).join(',').trim(),
  };
}

/**
 * Parse the LLM's audio-meta reply into blocks.
 * A JSON object with a `blocks` array is returned as-is (unvalidated); otherwise
 * the reply is treated as CSV, one block per timecode-led line.
 */
function parseAudioMetaResponse(raw: string): AudioBlock[] {
  // Try JSON first
  const json = tryParseJson(raw);
  if (json && Array.isArray(json.blocks)) {
    return json.blocks as AudioBlock[];
  }
  // Fall back to CSV: 開始TC, 終了TC, タイプ, 話者, 内容
  const blocks: AudioBlock[] = [];
  for (const line of stripCodeBlock(raw).split('\n')) {
    if (!line.trim()) continue;
    const block = parseAudioMetaCsvLine(line);
    if (block) blocks.push(block);
  }
  return blocks;
}
/**
 * Parse the LLM's narration reply into narration entries.
 *
 * A JSON object with a `narrations` array is returned as-is (unvalidated).
 * Otherwise the reply is read as CSV ("TC, 話者, 発話内容"): header, comment,
 * separator and note lines are skipped. Only timecode-led rows whose speaker
 * (with any 【候補】 marker removed) contains "ナレーション" are kept.
 */
function parseNarrationResponse(raw: string): NarrationEntry[] {
  const json = tryParseJson(raw);
  if (json && Array.isArray(json.narrations)) {
    return json.narrations as NarrationEntry[];
  }
  const isTimecode = (field: string): boolean =>
    field.includes(';') || /^\d{2}:\d{2}:\d{2}:\d{2}$/.test(field);
  const isNoise = (line: string): boolean =>
    (line.includes('TC') && line.includes('話者')) ||
    line.startsWith('#') ||
    line.startsWith('---') ||
    line.startsWith('コード') ||
    line.startsWith('※');

  return stripCodeBlock(raw)
    .split('\n')
    .filter((line) => line.trim() && !isNoise(line))
    .flatMap((line): NarrationEntry[] => {
      const fields = line.split(',').map((f) => f.trim());
      if (fields.length < 3 || !isTimecode(fields[0])) return [];
      const speaker = fields[1].replace('【候補】', '').trim();
      if (!speaker.includes('ナレーション')) return [];
      return [{
        tc: fields[0].replace(/:/g, ';'),
        speaker,
        text: fields.slice(2).join(',').trim(),
      }];
    });
}
// --- Whisper Hallucination Detection ---
function isWhisperHallucinated(segments: WhisperSegment[]): boolean {
const textSegments = segments.filter(s => s.text.trim().length > 0);
if (textSegments.length < 3) return true; // Too few segments = unreliable
// Count text frequency
const textCounts = new Map<string, number>();
for (const seg of textSegments) {
const text = seg.text.trim();
textCounts.set(text, (textCounts.get(text) || 0) + 1);
}
// If the most common text appears in >40% of segments, it's hallucinated
const maxCount = Math.max(...textCounts.values());
const ratio = maxCount / textSegments.length;
if (ratio > 0.4) {