Fixxes + Skriptketten

This commit is contained in:
2026-03-05 20:46:39 +00:00
parent 6836892907
commit afca677b1c
14 changed files with 1785 additions and 68 deletions

View File

@@ -6,6 +6,7 @@ const settingsService = require('./settingsService');
const historyService = require('./historyService');
const omdbService = require('./omdbService');
const scriptService = require('./scriptService');
const scriptChainService = require('./scriptChainService');
const wsService = require('./websocketService');
const diskDetectionService = require('./diskDetectionService');
const notificationService = require('./notificationService');
@@ -2045,6 +2046,7 @@ class PipelineService extends EventEmitter {
this.activeProcess = null;
this.activeProcesses = new Map();
this.cancelRequestedByJob = new Set();
this.jobProgress = new Map();
this.lastPersistAt = 0;
this.lastProgressKey = null;
this.queueEntries = [];
@@ -2126,8 +2128,13 @@ class PipelineService extends EventEmitter {
}
getSnapshot() {
const jobProgress = {};
for (const [id, data] of this.jobProgress) {
jobProgress[id] = data;
}
return {
...this.snapshot,
jobProgress,
queue: this.lastQueueSnapshot
};
}
@@ -2173,6 +2180,7 @@ class PipelineService extends EventEmitter {
async getQueueSnapshot() {
const maxParallelJobs = await this.getMaxParallelJobs();
const runningJobs = await historyService.getRunningJobs();
const runningEncodeCount = runningJobs.filter((job) => job.status === 'ENCODING').length;
const queuedJobIds = this.queueEntries.map((entry) => Number(entry.jobId)).filter((id) => Number.isFinite(id) && id > 0);
const queuedRows = queuedJobIds.length > 0
? await historyService.getJobsByIds(queuedJobIds)
@@ -2181,7 +2189,7 @@ class PipelineService extends EventEmitter {
const queue = {
maxParallelJobs,
runningCount: runningJobs.length,
runningCount: runningEncodeCount,
runningJobs: runningJobs.map((job) => ({
jobId: Number(job.id),
title: job.title || job.detected_title || `Job #${job.id}`,
@@ -2272,8 +2280,8 @@ class PipelineService extends EventEmitter {
}
const maxParallelJobs = await this.getMaxParallelJobs();
const runningJobs = await historyService.getRunningJobs();
const shouldQueue = this.queueEntries.length > 0 || runningJobs.length >= maxParallelJobs;
const runningEncodeJobs = await historyService.getRunningEncodeJobs();
const shouldQueue = this.queueEntries.length > 0 || runningEncodeJobs.length >= maxParallelJobs;
if (!shouldQueue) {
const result = await startNow();
await this.emitQueueChanged();
@@ -2345,8 +2353,8 @@ class PipelineService extends EventEmitter {
try {
while (this.queueEntries.length > 0) {
const maxParallelJobs = await this.getMaxParallelJobs();
const runningJobs = await historyService.getRunningJobs();
if (runningJobs.length >= maxParallelJobs) {
const runningEncodeJobs = await historyService.getRunningEncodeJobs();
if (runningEncodeJobs.length >= maxParallelJobs) {
break;
}
@@ -2473,6 +2481,7 @@ class PipelineService extends EventEmitter {
async setState(state, patch = {}) {
const previous = this.snapshot.state;
const previousActiveJobId = this.snapshot.activeJobId;
this.snapshot = {
...this.snapshot,
state,
@@ -2482,6 +2491,20 @@ class PipelineService extends EventEmitter {
statusText: patch.statusText !== undefined ? patch.statusText : this.snapshot.statusText,
context: patch.context !== undefined ? patch.context : this.snapshot.context
};
// Keep per-job progress map in sync when a job starts or finishes.
if (patch.activeJobId != null) {
this.jobProgress.set(Number(patch.activeJobId), {
state,
progress: patch.progress ?? 0,
eta: patch.eta ?? null,
statusText: patch.statusText ?? null
});
} else if (patch.activeJobId === null && previousActiveJobId != null) {
// Job slot cleared remove the finished job's live entry so it falls
// back to DB data in the frontend.
this.jobProgress.delete(Number(previousActiveJobId));
}
logger.info('state:changed', {
from: previous,
to: state,
@@ -2531,34 +2554,53 @@ class PipelineService extends EventEmitter {
);
}
async updateProgress(stage, percent, eta, statusText) {
this.snapshot = {
...this.snapshot,
state: stage,
progress: percent ?? this.snapshot.progress,
eta: eta ?? this.snapshot.eta,
statusText: statusText ?? this.snapshot.statusText
};
async updateProgress(stage, percent, eta, statusText, jobIdOverride = null) {
const effectiveJobId = jobIdOverride != null ? Number(jobIdOverride) : this.snapshot.activeJobId;
const effectiveProgress = percent ?? this.snapshot.progress;
const effectiveEta = eta ?? this.snapshot.eta;
const effectiveStatusText = statusText ?? this.snapshot.statusText;
await this.persistSnapshot(false);
const rounded = Number((this.snapshot.progress || 0).toFixed(2));
const key = `${stage}:${rounded}`;
// Update per-job progress so concurrent jobs don't overwrite each other.
if (effectiveJobId != null) {
this.jobProgress.set(effectiveJobId, {
state: stage,
progress: effectiveProgress,
eta: effectiveEta,
statusText: effectiveStatusText
});
}
// Only update the global snapshot fields when this update belongs to the
// currently active job (avoids the snapshot jumping between parallel jobs).
if (effectiveJobId === this.snapshot.activeJobId || effectiveJobId == null) {
this.snapshot = {
...this.snapshot,
state: stage,
progress: effectiveProgress,
eta: effectiveEta,
statusText: effectiveStatusText
};
await this.persistSnapshot(false);
}
const rounded = Number((effectiveProgress || 0).toFixed(2));
const key = `${effectiveJobId}:${stage}:${rounded}`;
if (key !== this.lastProgressKey) {
this.lastProgressKey = key;
logger.debug('progress:update', {
stage,
activeJobId: this.snapshot.activeJobId,
activeJobId: effectiveJobId,
progress: rounded,
eta: this.snapshot.eta,
statusText: this.snapshot.statusText
eta: effectiveEta,
statusText: effectiveStatusText
});
}
wsService.broadcast('PIPELINE_PROGRESS', {
state: stage,
activeJobId: this.snapshot.activeJobId,
progress: this.snapshot.progress,
eta: this.snapshot.eta,
statusText: this.snapshot.statusText
activeJobId: effectiveJobId,
progress: effectiveProgress,
eta: effectiveEta,
statusText: effectiveStatusText
});
}
@@ -4156,6 +4198,14 @@ class PipelineService extends EventEmitter {
const isReadyToEncode = preloadedJob.status === 'READY_TO_ENCODE' || preloadedJob.last_state === 'READY_TO_ENCODE';
if (isReadyToEncode) {
// Check whether this confirmed job will rip first (pre_rip mode) or encode directly.
// Pre-rip jobs bypass the encode queue because the next step is a rip, not an encode.
const jobEncodePlan = this.safeParseJson(preloadedJob.encode_plan_json);
const jobMode = String(jobEncodePlan?.mode || '').trim().toLowerCase();
const willRipFirst = jobMode === 'pre_rip' || Boolean(jobEncodePlan?.preRip);
if (willRipFirst) {
return this.startPreparedJob(jobId, { ...options, immediate: true });
}
return this.enqueueOrStartAction(
QUEUE_ACTIONS.START_PREPARED,
jobId,
@@ -4179,11 +4229,8 @@ class PipelineService extends EventEmitter {
}
if (!hasUsableRawInput) {
return this.enqueueOrStartAction(
QUEUE_ACTIONS.START_PREPARED,
jobId,
() => this.startPreparedJob(jobId, { ...options, immediate: true })
);
// No raw input yet → will rip from disc. Bypass the encode queue entirely.
return this.startPreparedJob(jobId, { ...options, immediate: true });
}
return this.startPreparedJob(jobId, { ...options, immediate: true, preloadedJob });
@@ -4397,6 +4444,28 @@ class PipelineService extends EventEmitter {
const selectedPostEncodeScripts = await scriptService.resolveScriptsByIds(selectedPostEncodeScriptIds, {
strict: true
});
const normalizeChainIdList = (raw) => {
const list = Array.isArray(raw) ? raw : [];
return list.map(Number).filter((id) => Number.isFinite(id) && id > 0).map(Math.trunc);
};
const hasExplicitPreScriptSelection = options?.selectedPreEncodeScriptIds !== undefined;
const selectedPreEncodeScriptIds = hasExplicitPreScriptSelection
? normalizeScriptIdList(options?.selectedPreEncodeScriptIds || [])
: normalizeScriptIdList(planForConfirm?.preEncodeScriptIds || encodePlan?.preEncodeScriptIds || []);
const selectedPreEncodeScripts = await scriptService.resolveScriptsByIds(selectedPreEncodeScriptIds, { strict: true });
const hasExplicitPostChainSelection = options?.selectedPostEncodeChainIds !== undefined;
const selectedPostEncodeChainIds = hasExplicitPostChainSelection
? normalizeChainIdList(options?.selectedPostEncodeChainIds || [])
: normalizeChainIdList(planForConfirm?.postEncodeChainIds || encodePlan?.postEncodeChainIds || []);
const hasExplicitPreChainSelection = options?.selectedPreEncodeChainIds !== undefined;
const selectedPreEncodeChainIds = hasExplicitPreChainSelection
? normalizeChainIdList(options?.selectedPreEncodeChainIds || [])
: normalizeChainIdList(planForConfirm?.preEncodeChainIds || encodePlan?.preEncodeChainIds || []);
const confirmedMode = String(planForConfirm?.mode || encodePlan?.mode || 'rip').trim().toLowerCase();
const isPreRipMode = confirmedMode === 'pre_rip' || Boolean(planForConfirm?.preRip);
@@ -4413,6 +4482,13 @@ class PipelineService extends EventEmitter {
id: Number(item.id),
name: item.name
})),
preEncodeScriptIds: selectedPreEncodeScripts.map((item) => Number(item.id)),
preEncodeScripts: selectedPreEncodeScripts.map((item) => ({
id: Number(item.id),
name: item.name
})),
postEncodeChainIds: selectedPostEncodeChainIds,
preEncodeChainIds: selectedPreEncodeChainIds,
reviewConfirmed: true,
reviewConfirmedAt: nowIso()
};
@@ -4434,7 +4510,10 @@ class PipelineService extends EventEmitter {
`Mediainfo-Prüfung bestätigt.${isPreRipMode ? ' Backup/Rip darf gestartet werden.' : ' Encode darf gestartet werden.'}${confirmedPlan.encodeInputTitleId ? ` Gewählter Titel #${confirmedPlan.encodeInputTitleId}.` : ''}`
+ ` Audio-Spuren: ${trackSelectionResult.audioTrackIds.length > 0 ? trackSelectionResult.audioTrackIds.join(',') : 'none'}.`
+ ` Subtitle-Spuren: ${trackSelectionResult.subtitleTrackIds.length > 0 ? trackSelectionResult.subtitleTrackIds.join(',') : 'none'}.`
+ ` Pre-Encode-Scripte: ${selectedPreEncodeScripts.length > 0 ? selectedPreEncodeScripts.map((item) => item.name).join(' -> ') : 'none'}.`
+ ` Pre-Encode-Ketten: ${selectedPreEncodeChainIds.length > 0 ? selectedPreEncodeChainIds.join(',') : 'none'}.`
+ ` Post-Encode-Scripte: ${selectedPostEncodeScripts.length > 0 ? selectedPostEncodeScripts.map((item) => item.name).join(' -> ') : 'none'}.`
+ ` Post-Encode-Ketten: ${selectedPostEncodeChainIds.length > 0 ? selectedPostEncodeChainIds.join(',') : 'none'}.`
);
if (!skipPipelineStateUpdate) {
@@ -4817,9 +4896,138 @@ class PipelineService extends EventEmitter {
return enrichedReview;
}
async runEncodeChains(jobId, chainIds, context = {}, phase = 'post') {
const ids = Array.isArray(chainIds) ? chainIds.map(Number).filter((id) => Number.isFinite(id) && id > 0) : [];
if (ids.length === 0) {
return { configured: 0, succeeded: 0, failed: 0, results: [] };
}
const results = [];
let succeeded = 0;
let failed = 0;
for (const chainId of ids) {
await historyService.appendLog(jobId, 'SYSTEM', `${phase === 'pre' ? 'Pre' : 'Post'}-Encode Kette startet (ID ${chainId})...`);
try {
const chainResult = await scriptChainService.executeChain(chainId, {
...context,
source: phase === 'pre' ? 'pre_encode_chain' : 'post_encode_chain'
}, {
appendLog: (src, msg) => historyService.appendLog(jobId, src, msg)
});
if (chainResult.aborted || chainResult.failed > 0) {
failed += 1;
await historyService.appendLog(jobId, 'ERROR', `${phase === 'pre' ? 'Pre' : 'Post'}-Encode Kette "${chainResult.chainName}" fehlgeschlagen.`);
} else {
succeeded += 1;
await historyService.appendLog(jobId, 'SYSTEM', `${phase === 'pre' ? 'Pre' : 'Post'}-Encode Kette "${chainResult.chainName}" erfolgreich.`);
}
results.push({ chainId, ...chainResult });
} catch (error) {
failed += 1;
results.push({ chainId, success: false, error: error.message });
await historyService.appendLog(jobId, 'ERROR', `${phase === 'pre' ? 'Pre' : 'Post'}-Encode Kette ${chainId} Fehler: ${error.message}`);
logger.warn(`encode:${phase}-chain:failed`, { jobId, chainId, error: errorToMeta(error) });
}
}
return { configured: ids.length, succeeded, failed, results };
}
async runPreEncodeScripts(jobId, encodePlan, context = {}) {
const scriptIds = normalizeScriptIdList(encodePlan?.preEncodeScriptIds || []);
const chainIds = Array.isArray(encodePlan?.preEncodeChainIds) ? encodePlan.preEncodeChainIds : [];
if (scriptIds.length === 0 && chainIds.length === 0) {
return { configured: 0, attempted: 0, succeeded: 0, failed: 0, skipped: 0, results: [] };
}
const scripts = await scriptService.resolveScriptsByIds(scriptIds, { strict: false });
const scriptById = new Map(scripts.map((item) => [Number(item.id), item]));
const results = [];
let succeeded = 0;
let failed = 0;
let skipped = 0;
let aborted = false;
for (let index = 0; index < scriptIds.length; index += 1) {
const scriptId = scriptIds[index];
const script = scriptById.get(Number(scriptId));
if (!script) {
failed += 1;
aborted = true;
results.push({ scriptId, scriptName: null, status: 'ERROR', error: 'missing' });
await historyService.appendLog(jobId, 'SYSTEM', `Pre-Encode Skript #${scriptId} nicht gefunden. Kette abgebrochen.`);
break;
}
await historyService.appendLog(jobId, 'SYSTEM', `Pre-Encode Skript startet (${index + 1}/${scriptIds.length}): ${script.name}`);
let prepared = null;
try {
prepared = await scriptService.createExecutableScriptFile(script, {
source: 'pre_encode',
mode: context?.mode || null,
jobId,
jobTitle: context?.jobTitle || null,
inputPath: context?.inputPath || null,
outputPath: context?.outputPath || null,
rawPath: context?.rawPath || null
});
const runInfo = await this.runCommand({
jobId,
stage: 'ENCODING',
source: 'PRE_ENCODE_SCRIPT',
cmd: prepared.cmd,
args: prepared.args,
argsForLog: prepared.argsForLog
});
succeeded += 1;
results.push({ scriptId: script.id, scriptName: script.name, status: 'SUCCESS', runInfo });
await historyService.appendLog(jobId, 'SYSTEM', `Pre-Encode Skript erfolgreich: ${script.name}`);
} catch (error) {
failed += 1;
aborted = true;
results.push({ scriptId: script.id, scriptName: script.name, status: 'ERROR', error: error?.message || 'unknown' });
await historyService.appendLog(jobId, 'SYSTEM', `Pre-Encode Skript fehlgeschlagen: ${script.name} (${error?.message || 'unknown'})`);
logger.warn('encode:pre-script:failed', { jobId, scriptId: script.id, error: errorToMeta(error) });
break;
} finally {
if (prepared?.cleanup) {
await prepared.cleanup();
}
}
}
if (!aborted && chainIds.length > 0) {
const chainResult = await this.runEncodeChains(jobId, chainIds, context, 'pre');
if (chainResult.failed > 0) {
aborted = true;
failed += chainResult.failed;
}
succeeded += chainResult.succeeded;
results.push(...chainResult.results);
}
if (aborted) {
const pendingScripts = scriptIds.slice(results.filter((r) => r.scriptId != null).length);
for (const pendingId of pendingScripts) {
const s = scriptById.get(Number(pendingId));
skipped += 1;
results.push({ scriptId: Number(pendingId), scriptName: s?.name || null, status: 'SKIPPED_ABORTED' });
}
throw Object.assign(new Error('Pre-Encode Skripte fehlgeschlagen - Encode wird nicht gestartet.'), { statusCode: 500, preEncodeFailed: true });
}
return {
configured: scriptIds.length + chainIds.length,
attempted: scriptIds.length - skipped + chainIds.length,
succeeded,
failed,
skipped,
aborted,
results
};
}
async runPostEncodeScripts(jobId, encodePlan, context = {}) {
const scriptIds = normalizeScriptIdList(encodePlan?.postEncodeScriptIds || []);
if (scriptIds.length === 0) {
const chainIds = Array.isArray(encodePlan?.postEncodeChainIds) ? encodePlan.postEncodeChainIds : [];
if (scriptIds.length === 0 && chainIds.length === 0) {
return {
configured: 0,
attempted: 0,
@@ -4957,9 +5165,24 @@ class PipelineService extends EventEmitter {
});
}
if (!aborted && chainIds.length > 0) {
const chainResult = await this.runEncodeChains(jobId, chainIds, context, 'post');
if (chainResult.failed > 0) {
aborted = true;
failed += chainResult.failed;
abortReason = `Post-Encode Kette fehlgeschlagen`;
void this.notifyPushover('job_error', {
title: 'Ripster - Post-Encode Kettenfehler',
message: `${context?.jobTitle || `Job #${jobId}`}: Eine Post-Encode Kette ist fehlgeschlagen.`
});
}
succeeded += chainResult.succeeded;
results.push(...chainResult.results);
}
return {
configured: scriptIds.length,
attempted: scriptIds.length - skipped,
configured: scriptIds.length + chainIds.length,
attempted: scriptIds.length - skipped + chainIds.length,
succeeded,
failed,
skipped,
@@ -5060,6 +5283,29 @@ class PipelineService extends EventEmitter {
});
}
const preEncodeContext = {
mode,
jobId,
jobTitle: job.title || job.detected_title || null,
inputPath,
rawPath: job.raw_path || null
};
const preScriptIds = normalizeScriptIdList(encodePlan?.preEncodeScriptIds || []);
const preChainIds = Array.isArray(encodePlan?.preEncodeChainIds) ? encodePlan.preEncodeChainIds : [];
if (preScriptIds.length > 0 || preChainIds.length > 0) {
await historyService.appendLog(jobId, 'SYSTEM', 'Pre-Encode Skripte/Ketten werden ausgeführt...');
try {
await this.runPreEncodeScripts(jobId, encodePlan, preEncodeContext);
} catch (preError) {
if (preError.preEncodeFailed) {
await this.failJob(jobId, 'ENCODING', preError);
throw preError;
}
throw preError;
}
await historyService.appendLog(jobId, 'SYSTEM', 'Pre-Encode Skripte/Ketten abgeschlossen.');
}
try {
const trackSelection = extractHandBrakeTrackSelectionFromPlan(encodePlan, inputPath);
let handBrakeTitleId = null;
@@ -5524,11 +5770,8 @@ class PipelineService extends EventEmitter {
async retry(jobId, options = {}) {
const immediate = Boolean(options?.immediate);
if (!immediate) {
return this.enqueueOrStartAction(
QUEUE_ACTIONS.RETRY,
jobId,
() => this.retry(jobId, { ...options, immediate: true })
);
// Retry always starts a rip → bypass the encode queue entirely.
return this.retry(jobId, { ...options, immediate: true });
}
this.ensureNotBusy('retry', jobId);
@@ -6000,19 +6243,13 @@ class PipelineService extends EventEmitter {
runInfo.lastProgress = progress.percent;
runInfo.eta = progress.eta || runInfo.eta;
const statusText = composeStatusText(stage, progress.percent, runInfo.lastDetail);
void this.updateProgress(stage, progress.percent, progress.eta, statusText);
void this.updateProgress(stage, progress.percent, progress.eta, statusText, normalizedJobId);
} else if (detail) {
const statusText = composeStatusText(
stage,
Number(this.snapshot.progress || 0),
runInfo.lastDetail
);
void this.updateProgress(
stage,
Number(this.snapshot.progress || 0),
this.snapshot.eta,
statusText
);
const jobEntry = this.jobProgress.get(Number(normalizedJobId));
const currentProgress = jobEntry?.progress ?? Number(this.snapshot.progress || 0);
const currentEta = jobEntry?.eta ?? this.snapshot.eta;
const statusText = composeStatusText(stage, currentProgress, runInfo.lastDetail);
void this.updateProgress(stage, currentProgress, currentEta, statusText, normalizedJobId);
}
}
};