final dev

This commit is contained in:
2026-03-11 12:52:21 +00:00
parent f565f83aea
commit 40fa30b532

View File

@@ -8968,12 +8968,97 @@ class PipelineService extends EventEmitter {
}; };
} }
const buildForcedCancelError = (message) => {
const reason = String(message || 'Vom Benutzer hart abgebrochen.').trim() || 'Vom Benutzer hart abgebrochen.';
const endedAt = nowIso();
const error = new Error(reason);
error.statusCode = 409;
error.runInfo = {
source: 'USER_CANCEL',
stage: this.snapshot.state || null,
cmd: null,
args: [],
startedAt: endedAt,
endedAt,
durationMs: 0,
status: 'CANCELLED',
exitCode: null,
stdoutLines: 0,
stderrLines: 0,
lastProgress: 0,
eta: null,
lastDetail: null,
highlights: []
};
return error;
};
const forceFinalizeCancelledJob = async (reason, stageHint = null) => {
const rawStage = String(stageHint || this.snapshot.state || '').trim().toUpperCase();
const effectiveStage = RUNNING_STATES.has(rawStage)
? rawStage
: (
RUNNING_STATES.has(String(this.snapshot.state || '').trim().toUpperCase())
? String(this.snapshot.state || '').trim().toUpperCase()
: 'ENCODING'
);
try {
await historyService.appendLog(normalizedJobId, 'USER_ACTION', reason);
} catch (_error) {
// continue with force-cancel even if logging failed
}
try {
await this.failJob(normalizedJobId, effectiveStage, buildForcedCancelError(reason));
} catch (forceError) {
logger.error('cancel:force-finalize:failed', {
jobId: normalizedJobId,
stage: effectiveStage,
reason,
error: errorToMeta(forceError)
});
const fallbackJob = await historyService.getJobById(normalizedJobId);
await historyService.updateJob(normalizedJobId, {
status: 'CANCELLED',
last_state: 'CANCELLED',
end_time: nowIso(),
error_message: reason
});
await this.setState('CANCELLED', {
activeJobId: normalizedJobId,
progress: this.snapshot.progress,
eta: null,
statusText: reason,
context: {
jobId: normalizedJobId,
rawPath: fallbackJob?.raw_path || null,
error: reason,
canRestartReviewFromRaw: Boolean(fallbackJob?.raw_path)
}
});
} finally {
this.cancelRequestedByJob.delete(normalizedJobId);
this.activeProcesses.delete(normalizedJobId);
this.syncPrimaryActiveProcess();
}
return {
cancelled: true,
queuedOnly: false,
forced: true,
jobId: normalizedJobId
};
};
const runningJob = await historyService.getJobById(normalizedJobId);
const runningStatus = String(
runningJob?.status
|| runningJob?.last_state
|| this.snapshot.state
|| ''
).trim().toUpperCase();
const processHandle = this.activeProcesses.get(normalizedJobId) || null; const processHandle = this.activeProcesses.get(normalizedJobId) || null;
if (!processHandle) { if (!processHandle) {
const runningJob = await historyService.getJobById(normalizedJobId); if (runningStatus === 'READY_TO_ENCODE') {
const status = String(runningJob?.status || '').trim().toUpperCase();
if (status === 'READY_TO_ENCODE') {
// Kein laufender Prozess Job direkt abbrechen // Kein laufender Prozess Job direkt abbrechen
await historyService.updateJob(normalizedJobId, { await historyService.updateJob(normalizedJobId, {
status: 'CANCELLED', status: 'CANCELLED',
@@ -8997,19 +9082,11 @@ class PipelineService extends EventEmitter {
return { cancelled: true, queuedOnly: false, jobId: normalizedJobId }; return { cancelled: true, queuedOnly: false, jobId: normalizedJobId };
} }
if (['ANALYZING', 'RIPPING', 'MEDIAINFO_CHECK', 'ENCODING'].includes(status)) { if (RUNNING_STATES.has(runningStatus)) {
this.cancelRequestedByJob.add(normalizedJobId); return forceFinalizeCancelledJob(
await historyService.appendLog( `Abbruch erzwungen: kein aktiver Prozess-Handle gefunden (Status ${runningStatus}).`,
normalizedJobId, runningStatus
'USER_ACTION',
'Abbruch angefordert. Wird beim nächsten Prozessschritt angewendet.'
); );
return {
cancelled: true,
queuedOnly: false,
pending: true,
jobId: normalizedJobId
};
} }
const error = new Error(`Kein laufender Prozess für Job #${normalizedJobId} zum Abbrechen.`); const error = new Error(`Kein laufender Prozess für Job #${normalizedJobId} zum Abbrechen.`);
@@ -9020,15 +9097,64 @@ class PipelineService extends EventEmitter {
logger.warn('cancel:requested', { logger.warn('cancel:requested', {
state: this.snapshot.state, state: this.snapshot.state,
activeJobId: this.snapshot.activeJobId, activeJobId: this.snapshot.activeJobId,
requestedJobId: normalizedJobId requestedJobId: normalizedJobId,
pid: processHandle?.child?.pid || null
}); });
this.cancelRequestedByJob.add(normalizedJobId); this.cancelRequestedByJob.add(normalizedJobId);
processHandle.cancel(); processHandle.cancel();
return { try {
cancelled: true, await historyService.appendLog(
queuedOnly: false, normalizedJobId,
jobId: normalizedJobId 'USER_ACTION',
}; `Abbruch angefordert (hard-cancel). Status=${runningStatus || '-'}.`
);
} catch (_error) {
// keep hard-cancel flow even if logging fails
}
const settleResult = await Promise.race([
Promise.resolve(processHandle.promise)
.then(() => 'settled')
.catch(() => 'settled'),
new Promise((resolve) => setTimeout(() => resolve('timeout'), 2200))
]);
const stillActive = this.activeProcesses.has(normalizedJobId);
if (settleResult === 'settled' && !stillActive) {
return {
cancelled: true,
queuedOnly: false,
jobId: normalizedJobId
};
}
logger.error('cancel:hard-timeout', {
jobId: normalizedJobId,
runningStatus,
settleResult,
stillActive,
pid: processHandle?.child?.pid || null
});
try {
processHandle.cancel();
} catch (_error) {
// ignore second cancel errors
}
const childPid = Number(processHandle?.child?.pid);
if (Number.isFinite(childPid) && childPid > 0) {
try { process.kill(-childPid, 'SIGKILL'); } catch (_error) { /* noop */ }
try { process.kill(childPid, 'SIGKILL'); } catch (_error) { /* noop */ }
}
try {
processHandle?.child?.kill?.('SIGKILL');
} catch (_error) {
// noop
}
this.activeProcesses.delete(normalizedJobId);
this.syncPrimaryActiveProcess();
return forceFinalizeCancelledJob(
`Abbruch erzwungen: Prozess reagierte nicht rechtzeitig auf Kill-Signal (Status ${runningStatus || '-'}).`,
runningStatus
);
} }
async runCommand({ async runCommand({