From 40fa30b532bc401029557630f1aa3dad7260f219 Mon Sep 17 00:00:00 2001 From: mboehmlaender Date: Wed, 11 Mar 2026 12:52:21 +0000 Subject: [PATCH] final dev --- backend/src/services/pipelineService.js | 170 +++++++++++++++++++++--- 1 file changed, 148 insertions(+), 22 deletions(-) diff --git a/backend/src/services/pipelineService.js b/backend/src/services/pipelineService.js index 166dfb2..792cafa 100644 --- a/backend/src/services/pipelineService.js +++ b/backend/src/services/pipelineService.js @@ -8968,12 +8968,97 @@ class PipelineService extends EventEmitter { }; } + const buildForcedCancelError = (message) => { + const reason = String(message || 'Vom Benutzer hart abgebrochen.').trim() || 'Vom Benutzer hart abgebrochen.'; + const endedAt = nowIso(); + const error = new Error(reason); + error.statusCode = 409; + error.runInfo = { + source: 'USER_CANCEL', + stage: this.snapshot.state || null, + cmd: null, + args: [], + startedAt: endedAt, + endedAt, + durationMs: 0, + status: 'CANCELLED', + exitCode: null, + stdoutLines: 0, + stderrLines: 0, + lastProgress: 0, + eta: null, + lastDetail: null, + highlights: [] + }; + return error; + }; + + const forceFinalizeCancelledJob = async (reason, stageHint = null) => { + const rawStage = String(stageHint || this.snapshot.state || '').trim().toUpperCase(); + const effectiveStage = RUNNING_STATES.has(rawStage) + ? rawStage + : ( + RUNNING_STATES.has(String(this.snapshot.state || '').trim().toUpperCase()) + ? String(this.snapshot.state || '').trim().toUpperCase() + : 'ENCODING' + ); + try { + await historyService.appendLog(normalizedJobId, 'USER_ACTION', reason); + } catch (_error) { + // continue with force-cancel even if logging failed + } + try { + await this.failJob(normalizedJobId, effectiveStage, buildForcedCancelError(reason)); + } catch (forceError) { + logger.error('cancel:force-finalize:failed', { + jobId: normalizedJobId, + stage: effectiveStage, + reason, + error: errorToMeta(forceError) + }); + const fallbackJob = await historyService.getJobById(normalizedJobId); + await historyService.updateJob(normalizedJobId, { + status: 'CANCELLED', + last_state: 'CANCELLED', + end_time: nowIso(), + error_message: reason + }); + await this.setState('CANCELLED', { + activeJobId: normalizedJobId, + progress: this.snapshot.progress, + eta: null, + statusText: reason, + context: { + jobId: normalizedJobId, + rawPath: fallbackJob?.raw_path || null, + error: reason, + canRestartReviewFromRaw: Boolean(fallbackJob?.raw_path) + } + }); + } finally { + this.cancelRequestedByJob.delete(normalizedJobId); + this.activeProcesses.delete(normalizedJobId); + this.syncPrimaryActiveProcess(); + } + return { + cancelled: true, + queuedOnly: false, + forced: true, + jobId: normalizedJobId + }; + }; + + const runningJob = await historyService.getJobById(normalizedJobId); + const runningStatus = String( + runningJob?.status + || runningJob?.last_state + || this.snapshot.state + || '' + ).trim().toUpperCase(); + const processHandle = this.activeProcesses.get(normalizedJobId) || null; if (!processHandle) { - const runningJob = await historyService.getJobById(normalizedJobId); - const status = String(runningJob?.status || '').trim().toUpperCase(); - - if (status === 'READY_TO_ENCODE') { + if (runningStatus === 'READY_TO_ENCODE') { // Kein laufender Prozess – Job direkt abbrechen await historyService.updateJob(normalizedJobId, { status: 'CANCELLED', @@ -8997,19 +9082,11 @@ class PipelineService extends EventEmitter { return { cancelled: true, queuedOnly: false, jobId: normalizedJobId }; } - if (['ANALYZING', 'RIPPING', 'MEDIAINFO_CHECK', 'ENCODING'].includes(status)) { - this.cancelRequestedByJob.add(normalizedJobId); - await historyService.appendLog( - normalizedJobId, - 'USER_ACTION', - 'Abbruch angefordert. Wird beim nächsten Prozessschritt angewendet.' + if (RUNNING_STATES.has(runningStatus)) { + return forceFinalizeCancelledJob( + `Abbruch erzwungen: kein aktiver Prozess-Handle gefunden (Status ${runningStatus}).`, + runningStatus ); - return { - cancelled: true, - queuedOnly: false, - pending: true, - jobId: normalizedJobId - }; } const error = new Error(`Kein laufender Prozess für Job #${normalizedJobId} zum Abbrechen.`); @@ -9020,15 +9097,64 @@ class PipelineService extends EventEmitter { logger.warn('cancel:requested', { state: this.snapshot.state, activeJobId: this.snapshot.activeJobId, - requestedJobId: normalizedJobId + requestedJobId: normalizedJobId, + pid: processHandle?.child?.pid || null }); this.cancelRequestedByJob.add(normalizedJobId); processHandle.cancel(); - return { - cancelled: true, - queuedOnly: false, - jobId: normalizedJobId - }; + try { + await historyService.appendLog( + normalizedJobId, + 'USER_ACTION', + `Abbruch angefordert (hard-cancel). Status=${runningStatus || '-'}.` + ); + } catch (_error) { + // keep hard-cancel flow even if logging fails + } + + const settleResult = await Promise.race([ + Promise.resolve(processHandle.promise) + .then(() => 'settled') + .catch(() => 'settled'), + new Promise((resolve) => setTimeout(() => resolve('timeout'), 2200)) + ]); + const stillActive = this.activeProcesses.has(normalizedJobId); + if (settleResult === 'settled' && !stillActive) { + return { + cancelled: true, + queuedOnly: false, + jobId: normalizedJobId + }; + } + + logger.error('cancel:hard-timeout', { + jobId: normalizedJobId, + runningStatus, + settleResult, + stillActive, + pid: processHandle?.child?.pid || null + }); + try { + processHandle.cancel(); + } catch (_error) { + // ignore second cancel errors + } + const childPid = Number(processHandle?.child?.pid); + if (Number.isFinite(childPid) && childPid > 0) { + try { process.kill(-childPid, 'SIGKILL'); } catch (_error) { /* noop */ } + try { process.kill(childPid, 'SIGKILL'); } catch (_error) { /* noop */ } + } + try { + processHandle?.child?.kill?.('SIGKILL'); + } catch (_error) { + // noop + } + this.activeProcesses.delete(normalizedJobId); + this.syncPrimaryActiveProcess(); + return forceFinalizeCancelledJob( + `Abbruch erzwungen: Prozess reagierte nicht rechtzeitig auf Kill-Signal (Status ${runningStatus || '-'}).`, + runningStatus + ); } async runCommand({