This commit is contained in:
2026-03-06 07:07:47 +00:00
parent afca677b1c
commit 3abb53fb8e
5 changed files with 693 additions and 110 deletions

View File

@@ -195,9 +195,37 @@ router.get(
// POST /queue/reorder — replace the queue order with a client-supplied sequence.
// NOTE(review): this span contained interleaved pre-/post-image diff lines; this is
// the reconstructed post-image handler.
// Accepts orderedEntryIds (new) or orderedJobIds (legacy fallback for job-only queues).
router.post(
  '/queue/reorder',
  asyncHandler(async (req, res) => {
    const orderedEntryIds = Array.isArray(req.body?.orderedEntryIds)
      ? req.body.orderedEntryIds
      : (Array.isArray(req.body?.orderedJobIds) ? req.body.orderedJobIds : []);
    logger.info('post:queue:reorder', { reqId: req.reqId, orderedEntryIds });
    // reorderQueue validates count and ID membership; throws 400 on mismatch.
    const queue = await pipelineService.reorderQueue(orderedEntryIds);
    res.json({ queue });
  })
);
// POST /queue/entry — enqueue a non-job entry (script, chain, or wait step),
// optionally inserted behind an existing entry.
router.post(
  '/queue/entry',
  asyncHandler(async (req, res) => {
    const body = req.body || {};
    logger.info('post:queue:entry', { reqId: req.reqId, type: body.type });
    // enqueueNonJobEntry validates the type and its parameters (400 on bad input).
    const result = await pipelineService.enqueueNonJobEntry(
      body.type,
      { scriptId: body.scriptId, chainId: body.chainId, waitSeconds: body.waitSeconds },
      body.insertAfterEntryId ?? null
    );
    // Return the fresh snapshot so the client can render the new order immediately.
    const queue = await pipelineService.getQueueSnapshot();
    res.json({ result, queue });
  })
);
// DELETE /queue/entry/:entryId — remove a single queue entry and return the
// refreshed queue snapshot (404 when the entry is not queued).
router.delete(
  '/queue/entry/:entryId',
  asyncHandler(async (req, res) => {
    const { entryId } = req.params;
    logger.info('delete:queue:entry', { reqId: req.reqId, entryId });
    res.json({ queue: await pipelineService.removeQueueEntry(entryId) });
  })
);

View File

@@ -2181,7 +2181,10 @@ class PipelineService extends EventEmitter {
const maxParallelJobs = await this.getMaxParallelJobs();
const runningJobs = await historyService.getRunningJobs();
const runningEncodeCount = runningJobs.filter((job) => job.status === 'ENCODING').length;
const queuedJobIds = this.queueEntries.map((entry) => Number(entry.jobId)).filter((id) => Number.isFinite(id) && id > 0);
const queuedJobIds = this.queueEntries
.filter((entry) => !entry.type || entry.type === 'job')
.map((entry) => Number(entry.jobId))
.filter((id) => Number.isFinite(id) && id > 0);
const queuedRows = queuedJobIds.length > 0
? await historyService.getJobsByIds(queuedJobIds)
: [];
@@ -2197,16 +2200,51 @@ class PipelineService extends EventEmitter {
lastState: job.last_state || null
})),
queuedJobs: this.queueEntries.map((entry, index) => {
const row = queuedById.get(Number(entry.jobId));
return {
const entryType = entry.type || 'job';
const base = {
entryId: entry.id,
position: index + 1,
type: entryType,
enqueuedAt: entry.enqueuedAt
};
if (entryType === 'script') {
return { ...base, scriptId: entry.scriptId, title: entry.scriptName || `Skript #${entry.scriptId}`, status: 'QUEUED' };
}
if (entryType === 'chain') {
return { ...base, chainId: entry.chainId, title: entry.chainName || `Kette #${entry.chainId}`, status: 'QUEUED' };
}
if (entryType === 'wait') {
return { ...base, waitSeconds: entry.waitSeconds, title: `Warten ${entry.waitSeconds}s`, status: 'QUEUED' };
}
// type === 'job'
const row = queuedById.get(Number(entry.jobId));
let hasScripts = false;
let hasChains = false;
if (row?.encode_plan_json) {
try {
const plan = JSON.parse(row.encode_plan_json);
hasScripts = Boolean(
(Array.isArray(plan?.preEncodeScriptIds) && plan.preEncodeScriptIds.length > 0)
|| (Array.isArray(plan?.postEncodeScriptIds) && plan.postEncodeScriptIds.length > 0)
);
hasChains = Boolean(
(Array.isArray(plan?.preEncodeChainIds) && plan.preEncodeChainIds.length > 0)
|| (Array.isArray(plan?.postEncodeChainIds) && plan.postEncodeChainIds.length > 0)
);
} catch (_) { /* ignore */ }
}
return {
...base,
jobId: Number(entry.jobId),
action: entry.action,
actionLabel: QUEUE_ACTION_LABELS[entry.action] || entry.action,
enqueuedAt: entry.enqueuedAt,
title: row?.title || row?.detected_title || `Job #${entry.jobId}`,
status: row?.status || null,
lastState: row?.last_state || null
lastState: row?.last_state || null,
hasScripts,
hasChains
};
}),
queuedCount: this.queueEntries.length,
@@ -2225,28 +2263,101 @@ class PipelineService extends EventEmitter {
}
}
async reorderQueue(orderedJobIds = []) {
const incoming = Array.isArray(orderedJobIds)
? orderedJobIds
.map((value) => this.normalizeQueueJobId(value))
.filter((value) => value !== null)
async reorderQueue(orderedEntryIds = []) {
const incoming = Array.isArray(orderedEntryIds)
? orderedEntryIds.map((value) => Number(value)).filter((v) => Number.isFinite(v) && v > 0)
: [];
const currentIds = this.queueEntries.map((entry) => Number(entry.jobId));
if (incoming.length !== currentIds.length) {
if (incoming.length !== this.queueEntries.length) {
const error = new Error('Queue-Reihenfolge ungültig: Anzahl passt nicht.');
error.statusCode = 400;
throw error;
}
const incomingSet = new Set(incoming.map((id) => String(id)));
if (incomingSet.size !== incoming.length || currentIds.some((id) => !incomingSet.has(String(id)))) {
const currentIdSet = new Set(this.queueEntries.map((entry) => entry.id));
const incomingSet = new Set(incoming);
if (incomingSet.size !== incoming.length || incoming.some((id) => !currentIdSet.has(id))) {
const error = new Error('Queue-Reihenfolge ungültig: IDs passen nicht zur aktuellen Queue.');
error.statusCode = 400;
throw error;
}
const byId = new Map(this.queueEntries.map((entry) => [Number(entry.jobId), entry]));
this.queueEntries = incoming.map((id) => byId.get(Number(id))).filter(Boolean);
const byEntryId = new Map(this.queueEntries.map((entry) => [entry.id, entry]));
this.queueEntries = incoming.map((id) => byEntryId.get(id)).filter(Boolean);
await this.emitQueueChanged();
return this.lastQueueSnapshot;
}
async enqueueNonJobEntry(type, params = {}, insertAfterEntryId = null) {
const validTypes = new Set(['script', 'chain', 'wait']);
if (!validTypes.has(type)) {
const error = new Error(`Unbekannter Queue-Eintragstyp: ${type}`);
error.statusCode = 400;
throw error;
}
let entry;
if (type === 'script') {
const scriptId = Number(params.scriptId);
if (!Number.isFinite(scriptId) || scriptId <= 0) {
const error = new Error('scriptId fehlt oder ist ungültig.');
error.statusCode = 400;
throw error;
}
const scriptService = require('./scriptService');
let script;
try { script = await scriptService.getScriptById(scriptId); } catch (_) { /* ignore */ }
entry = { id: this.queueEntrySeq++, type: 'script', scriptId, scriptName: script?.name || null, enqueuedAt: nowIso() };
} else if (type === 'chain') {
const chainId = Number(params.chainId);
if (!Number.isFinite(chainId) || chainId <= 0) {
const error = new Error('chainId fehlt oder ist ungültig.');
error.statusCode = 400;
throw error;
}
const scriptChainService = require('./scriptChainService');
let chain;
try { chain = await scriptChainService.getChainById(chainId); } catch (_) { /* ignore */ }
entry = { id: this.queueEntrySeq++, type: 'chain', chainId, chainName: chain?.name || null, enqueuedAt: nowIso() };
} else {
const waitSeconds = Math.round(Number(params.waitSeconds));
if (!Number.isFinite(waitSeconds) || waitSeconds < 1 || waitSeconds > 3600) {
const error = new Error('waitSeconds muss zwischen 1 und 3600 liegen.');
error.statusCode = 400;
throw error;
}
entry = { id: this.queueEntrySeq++, type: 'wait', waitSeconds, enqueuedAt: nowIso() };
}
if (insertAfterEntryId != null) {
const idx = this.queueEntries.findIndex((e) => e.id === Number(insertAfterEntryId));
if (idx >= 0) {
this.queueEntries.splice(idx + 1, 0, entry);
} else {
this.queueEntries.push(entry);
}
} else {
this.queueEntries.push(entry);
}
await this.emitQueueChanged();
void this.pumpQueue();
return { entryId: entry.id, type, position: this.queueEntries.indexOf(entry) + 1 };
}
async removeQueueEntry(entryId) {
const normalizedId = Number(entryId);
if (!Number.isFinite(normalizedId) || normalizedId <= 0) {
const error = new Error('Ungültige entryId.');
error.statusCode = 400;
throw error;
}
const idx = this.queueEntries.findIndex((e) => e.id === normalizedId);
if (idx < 0) {
const error = new Error(`Queue-Eintrag #${normalizedId} nicht gefunden.`);
error.statusCode = 404;
throw error;
}
this.queueEntries.splice(idx, 1);
await this.emitQueueChanged();
return this.lastQueueSnapshot;
}
@@ -2315,6 +2426,56 @@ class PipelineService extends EventEmitter {
};
}
/**
 * Execute a non-job queue entry ('wait' | 'script' | 'chain') to completion.
 * Best-effort by design: failures are logged but never rethrown, so the queue
 * pump keeps draining subsequent entries.
 * @param {object} entry - queue entry; relevant fields depend on entry.type
 */
async dispatchNonJobEntry(entry) {
const type = entry?.type;
logger.info('queue:non-job:dispatch', { type, entryId: entry?.id });
if (type === 'wait') {
// Clamp to at least 1s; waitSeconds was range-checked on enqueue.
const seconds = Math.max(1, Number(entry.waitSeconds || 1));
logger.info('queue:wait:start', { seconds });
await new Promise((resolve) => setTimeout(resolve, seconds * 1000));
logger.info('queue:wait:done', { seconds });
return;
}
if (type === 'script') {
const scriptService = require('./scriptService');
let script;
try { script = await scriptService.getScriptById(entry.scriptId); } catch (_) { /* ignore */ }
if (!script) {
// The script may have been deleted since it was enqueued — skip the entry.
logger.warn('queue:script:not-found', { scriptId: entry.scriptId });
return;
}
let prepared = null;
try {
prepared = await scriptService.createExecutableScriptFile(script, { source: 'queue', scriptId: script.id, scriptName: script.name });
const { spawn } = require('child_process');
// Wait for the child process to exit. A non-zero exit code is logged
// but deliberately not treated as a failure; only spawn errors reject.
await new Promise((resolve, reject) => {
const child = spawn(prepared.cmd, prepared.args, { env: process.env, stdio: 'ignore' });
child.on('error', reject);
child.on('close', (code) => {
logger.info('queue:script:done', { scriptId: script.id, exitCode: code });
resolve();
});
});
} catch (err) {
logger.error('queue:script:error', { scriptId: entry.scriptId, error: errorToMeta(err) });
} finally {
// Always remove the temporary executable file, even after an error.
if (prepared?.cleanup) await prepared.cleanup();
}
return;
}
if (type === 'chain') {
const scriptChainService = require('./scriptChainService');
try {
await scriptChainService.executeChain(entry.chainId, { source: 'queue' });
} catch (err) {
logger.error('queue:chain:error', { chainId: entry.chainId, error: errorToMeta(err) });
}
}
}
async dispatchQueuedEntry(entry) {
const action = entry?.action;
const jobId = Number(entry?.jobId);
@@ -2352,10 +2513,16 @@ class PipelineService extends EventEmitter {
this.queuePumpRunning = true;
try {
while (this.queueEntries.length > 0) {
const maxParallelJobs = await this.getMaxParallelJobs();
const runningEncodeJobs = await historyService.getRunningEncodeJobs();
if (runningEncodeJobs.length >= maxParallelJobs) {
break;
const firstEntry = this.queueEntries[0];
const isNonJob = firstEntry?.type && firstEntry.type !== 'job';
if (!isNonJob) {
// Job entries: respect the parallel encode limit.
const maxParallelJobs = await this.getMaxParallelJobs();
const runningEncodeJobs = await historyService.getRunningEncodeJobs();
if (runningEncodeJobs.length >= maxParallelJobs) {
break;
}
}
const entry = this.queueEntries.shift();
@@ -2365,6 +2532,10 @@ class PipelineService extends EventEmitter {
await this.emitQueueChanged();
try {
if (isNonJob) {
await this.dispatchNonJobEntry(entry);
continue;
}
await historyService.appendLog(
entry.jobId,
'SYSTEM',
@@ -2378,15 +2549,18 @@ class PipelineService extends EventEmitter {
break;
}
logger.error('queue:entry:failed', {
type: entry.type || 'job',
action: entry.action,
jobId: entry.jobId,
error: errorToMeta(error)
});
await historyService.appendLog(
entry.jobId,
'SYSTEM',
`Queue-Start fehlgeschlagen (${QUEUE_ACTION_LABELS[entry.action] || entry.action}): ${error.message}`
);
if (entry.jobId) {
await historyService.appendLog(
entry.jobId,
'SYSTEM',
`Queue-Start fehlgeschlagen (${QUEUE_ACTION_LABELS[entry.action] || entry.action}): ${error.message}`
);
}
}
}
} finally {
@@ -4004,6 +4178,20 @@ class PipelineService extends EventEmitter {
const posterValue = poster === undefined
? (job.poster_url || null)
: (poster || null);
// Fetch full OMDb details when selecting from OMDb with a valid IMDb ID.
let omdbJsonValue = job.omdb_json || null;
if (fromOmdb && effectiveImdbId) {
try {
const omdbFull = await omdbService.fetchByImdbId(effectiveImdbId);
if (omdbFull?.raw) {
omdbJsonValue = JSON.stringify(omdbFull.raw);
}
} catch (omdbErr) {
logger.warn('metadata:omdb-fetch-failed', { jobId, imdbId: effectiveImdbId, error: errorToMeta(omdbErr) });
}
}
const selectedMetadata = {
title: effectiveTitle,
year: effectiveYear,
@@ -4059,6 +4247,7 @@ class PipelineService extends EventEmitter {
imdb_id: effectiveImdbId,
poster_url: posterValue,
selected_from_omdb: selectedFromOmdb,
omdb_json: omdbJsonValue,
status: nextStatus,
last_state: nextStatus,
raw_path: updatedRawPath,
@@ -5541,6 +5730,17 @@ class PipelineService extends EventEmitter {
const preRipPostEncodeScriptIds = hasPreRipConfirmedSelection
? normalizeScriptIdList(preRipPlanBeforeRip?.postEncodeScriptIds || [])
: [];
const preRipPreEncodeScriptIds = hasPreRipConfirmedSelection
? normalizeScriptIdList(preRipPlanBeforeRip?.preEncodeScriptIds || [])
: [];
const preRipPostEncodeChainIds = hasPreRipConfirmedSelection
? (Array.isArray(preRipPlanBeforeRip?.postEncodeChainIds) ? preRipPlanBeforeRip.postEncodeChainIds : [])
.map(Number).filter((id) => Number.isFinite(id) && id > 0)
: [];
const preRipPreEncodeChainIds = hasPreRipConfirmedSelection
? (Array.isArray(preRipPlanBeforeRip?.preEncodeChainIds) ? preRipPlanBeforeRip.preEncodeChainIds : [])
.map(Number).filter((id) => Number.isFinite(id) && id > 0)
: [];
const playlistDecision = this.resolvePlaylistDecisionForJob(jobId, job);
const selectedTitleId = playlistDecision.selectedTitleId;
const selectedPlaylist = playlistDecision.selectedPlaylist;
@@ -5726,7 +5926,10 @@ class PipelineService extends EventEmitter {
await this.confirmEncodeReview(jobId, {
selectedEncodeTitleId: review?.encodeInputTitleId || null,
selectedTrackSelection: preRipTrackSelectionPayload || null,
selectedPostEncodeScriptIds: preRipPostEncodeScriptIds
selectedPostEncodeScriptIds: preRipPostEncodeScriptIds,
selectedPreEncodeScriptIds: preRipPreEncodeScriptIds,
selectedPostEncodeChainIds: preRipPostEncodeChainIds,
selectedPreEncodeChainIds: preRipPreEncodeChainIds
});
const autoStartResult = await this.startPreparedJob(jobId);
logger.info('rip:auto-encode-started', {