diff --git a/backend/src/routes/settingsRoutes.js b/backend/src/routes/settingsRoutes.js index 9231aba..be5a48c 100644 --- a/backend/src/routes/settingsRoutes.js +++ b/backend/src/routes/settingsRoutes.js @@ -2,6 +2,7 @@ const express = require('express'); const asyncHandler = require('../middleware/asyncHandler'); const settingsService = require('../services/settingsService'); const scriptService = require('../services/scriptService'); +const scriptChainService = require('../services/scriptChainService'); const notificationService = require('../services/notificationService'); const pipelineService = require('../services/pipelineService'); const wsService = require('../services/websocketService'); @@ -104,6 +105,59 @@ router.post( }) ); +router.get( + '/script-chains', + asyncHandler(async (req, res) => { + logger.debug('get:settings:script-chains', { reqId: req.reqId }); + const chains = await scriptChainService.listChains(); + res.json({ chains }); + }) +); + +router.post( + '/script-chains', + asyncHandler(async (req, res) => { + const payload = req.body || {}; + logger.info('post:settings:script-chains:create', { reqId: req.reqId, name: payload?.name }); + const chain = await scriptChainService.createChain(payload); + wsService.broadcast('SETTINGS_SCRIPT_CHAINS_UPDATED', { action: 'created', id: chain.id }); + res.status(201).json({ chain }); + }) +); + +router.get( + '/script-chains/:id', + asyncHandler(async (req, res) => { + const chainId = Number(req.params.id); + logger.debug('get:settings:script-chains:one', { reqId: req.reqId, chainId }); + const chain = await scriptChainService.getChainById(chainId); + res.json({ chain }); + }) +); + +router.put( + '/script-chains/:id', + asyncHandler(async (req, res) => { + const chainId = Number(req.params.id); + const payload = req.body || {}; + logger.info('put:settings:script-chains:update', { reqId: req.reqId, chainId, name: payload?.name }); + const chain = await scriptChainService.updateChain(chainId, payload); + wsService.broadcast('SETTINGS_SCRIPT_CHAINS_UPDATED', { action: 'updated', id: chain.id }); + res.json({ chain }); + }) +); + +router.delete( + '/script-chains/:id', + asyncHandler(async (req, res) => { + const chainId = Number(req.params.id); + logger.info('delete:settings:script-chains', { reqId: req.reqId, chainId }); + const removed = await scriptChainService.deleteChain(chainId); + wsService.broadcast('SETTINGS_SCRIPT_CHAINS_UPDATED', { action: 'deleted', id: removed.id }); + res.json({ removed }); + }) +); + router.put( '/:key', asyncHandler(async (req, res) => { diff --git a/backend/src/services/historyService.js b/backend/src/services/historyService.js index 880d91a..a793e91 100644 --- a/backend/src/services/historyService.js +++ b/backend/src/services/historyService.js @@ -607,6 +607,22 @@ class HistoryService { })); } + async getRunningEncodeJobs() { + const db = await getDb(); + const rows = await db.all( + ` + SELECT * + FROM jobs + WHERE status = 'ENCODING' + ORDER BY updated_at ASC, id ASC + ` + ); + return rows.map((job) => ({ + ...enrichJobRow(job), + log_count: hasProcessLogFile(job.id) ? 1 : 0 + })); + } + async getJobWithLogs(jobId, options = {}) { const db = await getDb(); const job = await db.get('SELECT * FROM jobs WHERE id = ?', [jobId]); diff --git a/backend/src/services/pipelineService.js b/backend/src/services/pipelineService.js index 596f1e1..f03134b 100644 --- a/backend/src/services/pipelineService.js +++ b/backend/src/services/pipelineService.js @@ -6,6 +6,7 @@ const settingsService = require('./settingsService'); const historyService = require('./historyService'); const omdbService = require('./omdbService'); const scriptService = require('./scriptService'); +const scriptChainService = require('./scriptChainService'); const wsService = require('./websocketService'); const diskDetectionService = require('./diskDetectionService'); const notificationService = require('./notificationService'); @@ -2045,6 +2046,7 @@ class PipelineService extends EventEmitter { this.activeProcess = null; this.activeProcesses = new Map(); this.cancelRequestedByJob = new Set(); + this.jobProgress = new Map(); this.lastPersistAt = 0; this.lastProgressKey = null; this.queueEntries = []; @@ -2126,8 +2128,13 @@ class PipelineService extends EventEmitter { } getSnapshot() { + const jobProgress = {}; + for (const [id, data] of this.jobProgress) { + jobProgress[id] = data; + } return { ...this.snapshot, + jobProgress, queue: this.lastQueueSnapshot }; } @@ -2173,6 +2180,7 @@ class PipelineService extends EventEmitter { async getQueueSnapshot() { const maxParallelJobs = await this.getMaxParallelJobs(); const runningJobs = await historyService.getRunningJobs(); + const runningEncodeCount = runningJobs.filter((job) => job.status === 'ENCODING').length; const queuedJobIds = this.queueEntries.map((entry) => Number(entry.jobId)).filter((id) => Number.isFinite(id) && id > 0); const queuedRows = queuedJobIds.length > 0 ? await historyService.getJobsByIds(queuedJobIds) @@ -2181,7 +2189,7 @@ class PipelineService extends EventEmitter { const queue = { maxParallelJobs, - runningCount: runningJobs.length, + runningCount: runningEncodeCount, runningJobs: runningJobs.map((job) => ({ jobId: Number(job.id), title: job.title || job.detected_title || `Job #${job.id}`, @@ -2272,8 +2280,8 @@ class PipelineService extends EventEmitter { } const maxParallelJobs = await this.getMaxParallelJobs(); - const runningJobs = await historyService.getRunningJobs(); - const shouldQueue = this.queueEntries.length > 0 || runningJobs.length >= maxParallelJobs; + const runningEncodeJobs = await historyService.getRunningEncodeJobs(); + const shouldQueue = this.queueEntries.length > 0 || runningEncodeJobs.length >= maxParallelJobs; if (!shouldQueue) { const result = await startNow(); await this.emitQueueChanged(); @@ -2345,8 +2353,8 @@ class PipelineService extends EventEmitter { try { while (this.queueEntries.length > 0) { const maxParallelJobs = await this.getMaxParallelJobs(); - const runningJobs = await historyService.getRunningJobs(); - if (runningJobs.length >= maxParallelJobs) { + const runningEncodeJobs = await historyService.getRunningEncodeJobs(); + if (runningEncodeJobs.length >= maxParallelJobs) { break; } @@ -2473,6 +2481,7 @@ class PipelineService extends EventEmitter { async setState(state, patch = {}) { const previous = this.snapshot.state; + const previousActiveJobId = this.snapshot.activeJobId; this.snapshot = { ...this.snapshot, state, @@ -2482,6 +2491,20 @@ class PipelineService extends EventEmitter { statusText: patch.statusText !== undefined ? patch.statusText : this.snapshot.statusText, context: patch.context !== undefined ? patch.context : this.snapshot.context }; + + // Keep per-job progress map in sync when a job starts or finishes. + if (patch.activeJobId != null) { + this.jobProgress.set(Number(patch.activeJobId), { + state, + progress: patch.progress ?? 0, + eta: patch.eta ?? null, + statusText: patch.statusText ?? null + }); + } else if (patch.activeJobId === null && previousActiveJobId != null) { + // Job slot cleared – remove the finished job's live entry so it falls + // back to DB data in the frontend. + this.jobProgress.delete(Number(previousActiveJobId)); + } logger.info('state:changed', { from: previous, to: state, @@ -2531,34 +2554,53 @@ class PipelineService extends EventEmitter { ); } - async updateProgress(stage, percent, eta, statusText) { - this.snapshot = { - ...this.snapshot, - state: stage, - progress: percent ?? this.snapshot.progress, - eta: eta ?? this.snapshot.eta, - statusText: statusText ?? this.snapshot.statusText - }; + async updateProgress(stage, percent, eta, statusText, jobIdOverride = null) { + const effectiveJobId = jobIdOverride != null ? Number(jobIdOverride) : this.snapshot.activeJobId; + const effectiveProgress = percent ?? this.snapshot.progress; + const effectiveEta = eta ?? this.snapshot.eta; + const effectiveStatusText = statusText ?? this.snapshot.statusText; - await this.persistSnapshot(false); - const rounded = Number((this.snapshot.progress || 0).toFixed(2)); - const key = `${stage}:${rounded}`; + // Update per-job progress so concurrent jobs don't overwrite each other. + if (effectiveJobId != null) { + this.jobProgress.set(effectiveJobId, { + state: stage, + progress: effectiveProgress, + eta: effectiveEta, + statusText: effectiveStatusText + }); + } + + // Only update the global snapshot fields when this update belongs to the + // currently active job (avoids the snapshot jumping between parallel jobs). + if (effectiveJobId === this.snapshot.activeJobId || effectiveJobId == null) { + this.snapshot = { + ...this.snapshot, + state: stage, + progress: effectiveProgress, + eta: effectiveEta, + statusText: effectiveStatusText + }; + await this.persistSnapshot(false); + } + + const rounded = Number((effectiveProgress || 0).toFixed(2)); + const key = `${effectiveJobId}:${stage}:${rounded}`; if (key !== this.lastProgressKey) { this.lastProgressKey = key; logger.debug('progress:update', { stage, - activeJobId: this.snapshot.activeJobId, + activeJobId: effectiveJobId, progress: rounded, - eta: this.snapshot.eta, - statusText: this.snapshot.statusText + eta: effectiveEta, + statusText: effectiveStatusText }); } wsService.broadcast('PIPELINE_PROGRESS', { state: stage, - activeJobId: this.snapshot.activeJobId, - progress: this.snapshot.progress, - eta: this.snapshot.eta, - statusText: this.snapshot.statusText + activeJobId: effectiveJobId, + progress: effectiveProgress, + eta: effectiveEta, + statusText: effectiveStatusText }); } @@ -4156,6 +4198,14 @@ class PipelineService extends EventEmitter { const isReadyToEncode = preloadedJob.status === 'READY_TO_ENCODE' || preloadedJob.last_state === 'READY_TO_ENCODE'; if (isReadyToEncode) { + // Check whether this confirmed job will rip first (pre_rip mode) or encode directly. + // Pre-rip jobs bypass the encode queue because the next step is a rip, not an encode. + const jobEncodePlan = this.safeParseJson(preloadedJob.encode_plan_json); + const jobMode = String(jobEncodePlan?.mode || '').trim().toLowerCase(); + const willRipFirst = jobMode === 'pre_rip' || Boolean(jobEncodePlan?.preRip); + if (willRipFirst) { + return this.startPreparedJob(jobId, { ...options, immediate: true }); + } return this.enqueueOrStartAction( QUEUE_ACTIONS.START_PREPARED, jobId, @@ -4179,11 +4229,8 @@ class PipelineService extends EventEmitter { } if (!hasUsableRawInput) { - return this.enqueueOrStartAction( - QUEUE_ACTIONS.START_PREPARED, - jobId, - () => this.startPreparedJob(jobId, { ...options, immediate: true }) - ); + // No raw input yet → will rip from disc. Bypass the encode queue entirely. + return this.startPreparedJob(jobId, { ...options, immediate: true }); } return this.startPreparedJob(jobId, { ...options, immediate: true, preloadedJob }); @@ -4397,6 +4444,28 @@ class PipelineService extends EventEmitter { const selectedPostEncodeScripts = await scriptService.resolveScriptsByIds(selectedPostEncodeScriptIds, { strict: true }); + + const normalizeChainIdList = (raw) => { + const list = Array.isArray(raw) ? raw : []; + return list.map(Number).filter((id) => Number.isFinite(id) && id > 0).map(Math.trunc); + }; + + const hasExplicitPreScriptSelection = options?.selectedPreEncodeScriptIds !== undefined; + const selectedPreEncodeScriptIds = hasExplicitPreScriptSelection + ? normalizeScriptIdList(options?.selectedPreEncodeScriptIds || []) + : normalizeScriptIdList(planForConfirm?.preEncodeScriptIds || encodePlan?.preEncodeScriptIds || []); + const selectedPreEncodeScripts = await scriptService.resolveScriptsByIds(selectedPreEncodeScriptIds, { strict: true }); + + const hasExplicitPostChainSelection = options?.selectedPostEncodeChainIds !== undefined; + const selectedPostEncodeChainIds = hasExplicitPostChainSelection + ? normalizeChainIdList(options?.selectedPostEncodeChainIds || []) + : normalizeChainIdList(planForConfirm?.postEncodeChainIds || encodePlan?.postEncodeChainIds || []); + + const hasExplicitPreChainSelection = options?.selectedPreEncodeChainIds !== undefined; + const selectedPreEncodeChainIds = hasExplicitPreChainSelection + ? normalizeChainIdList(options?.selectedPreEncodeChainIds || []) + : normalizeChainIdList(planForConfirm?.preEncodeChainIds || encodePlan?.preEncodeChainIds || []); + const confirmedMode = String(planForConfirm?.mode || encodePlan?.mode || 'rip').trim().toLowerCase(); const isPreRipMode = confirmedMode === 'pre_rip' || Boolean(planForConfirm?.preRip); @@ -4413,6 +4482,13 @@ class PipelineService extends EventEmitter { id: Number(item.id), name: item.name })), + preEncodeScriptIds: selectedPreEncodeScripts.map((item) => Number(item.id)), + preEncodeScripts: selectedPreEncodeScripts.map((item) => ({ + id: Number(item.id), + name: item.name + })), + postEncodeChainIds: selectedPostEncodeChainIds, + preEncodeChainIds: selectedPreEncodeChainIds, reviewConfirmed: true, reviewConfirmedAt: nowIso() }; @@ -4434,7 +4510,10 @@ class PipelineService extends EventEmitter { `Mediainfo-Prüfung bestätigt.${isPreRipMode ? ' Backup/Rip darf gestartet werden.' : ' Encode darf gestartet werden.'}${confirmedPlan.encodeInputTitleId ? ` Gewählter Titel #${confirmedPlan.encodeInputTitleId}.` : ''}` + ` Audio-Spuren: ${trackSelectionResult.audioTrackIds.length > 0 ? trackSelectionResult.audioTrackIds.join(',') : 'none'}.` + ` Subtitle-Spuren: ${trackSelectionResult.subtitleTrackIds.length > 0 ? trackSelectionResult.subtitleTrackIds.join(',') : 'none'}.` + + ` Pre-Encode-Scripte: ${selectedPreEncodeScripts.length > 0 ? selectedPreEncodeScripts.map((item) => item.name).join(' -> ') : 'none'}.` + + ` Pre-Encode-Ketten: ${selectedPreEncodeChainIds.length > 0 ? selectedPreEncodeChainIds.join(',') : 'none'}.` + ` Post-Encode-Scripte: ${selectedPostEncodeScripts.length > 0 ? selectedPostEncodeScripts.map((item) => item.name).join(' -> ') : 'none'}.` + + ` Post-Encode-Ketten: ${selectedPostEncodeChainIds.length > 0 ? selectedPostEncodeChainIds.join(',') : 'none'}.` ); if (!skipPipelineStateUpdate) { @@ -4817,9 +4896,138 @@ class PipelineService extends EventEmitter { return enrichedReview; } + async runEncodeChains(jobId, chainIds, context = {}, phase = 'post') { + const ids = Array.isArray(chainIds) ? chainIds.map(Number).filter((id) => Number.isFinite(id) && id > 0) : []; + if (ids.length === 0) { + return { configured: 0, succeeded: 0, failed: 0, results: [] }; + } + const results = []; + let succeeded = 0; + let failed = 0; + for (const chainId of ids) { + await historyService.appendLog(jobId, 'SYSTEM', `${phase === 'pre' ? 'Pre' : 'Post'}-Encode Kette startet (ID ${chainId})...`); + try { + const chainResult = await scriptChainService.executeChain(chainId, { + ...context, + source: phase === 'pre' ? 'pre_encode_chain' : 'post_encode_chain' + }, { + appendLog: (src, msg) => historyService.appendLog(jobId, src, msg) + }); + if (chainResult.aborted || chainResult.failed > 0) { + failed += 1; + await historyService.appendLog(jobId, 'ERROR', `${phase === 'pre' ? 'Pre' : 'Post'}-Encode Kette "${chainResult.chainName}" fehlgeschlagen.`); + } else { + succeeded += 1; + await historyService.appendLog(jobId, 'SYSTEM', `${phase === 'pre' ? 'Pre' : 'Post'}-Encode Kette "${chainResult.chainName}" erfolgreich.`); + } + results.push({ chainId, ...chainResult }); + } catch (error) { + failed += 1; + results.push({ chainId, success: false, error: error.message }); + await historyService.appendLog(jobId, 'ERROR', `${phase === 'pre' ? 'Pre' : 'Post'}-Encode Kette ${chainId} Fehler: ${error.message}`); + logger.warn(`encode:${phase}-chain:failed`, { jobId, chainId, error: errorToMeta(error) }); + } + } + return { configured: ids.length, succeeded, failed, results }; + } + + async runPreEncodeScripts(jobId, encodePlan, context = {}) { + const scriptIds = normalizeScriptIdList(encodePlan?.preEncodeScriptIds || []); + const chainIds = Array.isArray(encodePlan?.preEncodeChainIds) ? encodePlan.preEncodeChainIds : []; + if (scriptIds.length === 0 && chainIds.length === 0) { + return { configured: 0, attempted: 0, succeeded: 0, failed: 0, skipped: 0, results: [] }; + } + + const scripts = await scriptService.resolveScriptsByIds(scriptIds, { strict: false }); + const scriptById = new Map(scripts.map((item) => [Number(item.id), item])); + const results = []; + let succeeded = 0; + let failed = 0; + let skipped = 0; + let aborted = false; + + for (let index = 0; index < scriptIds.length; index += 1) { + const scriptId = scriptIds[index]; + const script = scriptById.get(Number(scriptId)); + if (!script) { + failed += 1; + aborted = true; + results.push({ scriptId, scriptName: null, status: 'ERROR', error: 'missing' }); + await historyService.appendLog(jobId, 'SYSTEM', `Pre-Encode Skript #${scriptId} nicht gefunden. Kette abgebrochen.`); + break; + } + await historyService.appendLog(jobId, 'SYSTEM', `Pre-Encode Skript startet (${index + 1}/${scriptIds.length}): ${script.name}`); + let prepared = null; + try { + prepared = await scriptService.createExecutableScriptFile(script, { + source: 'pre_encode', + mode: context?.mode || null, + jobId, + jobTitle: context?.jobTitle || null, + inputPath: context?.inputPath || null, + outputPath: context?.outputPath || null, + rawPath: context?.rawPath || null + }); + const runInfo = await this.runCommand({ + jobId, + stage: 'ENCODING', + source: 'PRE_ENCODE_SCRIPT', + cmd: prepared.cmd, + args: prepared.args, + argsForLog: prepared.argsForLog + }); + succeeded += 1; + results.push({ scriptId: script.id, scriptName: script.name, status: 'SUCCESS', runInfo }); + await historyService.appendLog(jobId, 'SYSTEM', `Pre-Encode Skript erfolgreich: ${script.name}`); + } catch (error) { + failed += 1; + aborted = true; + results.push({ scriptId: script.id, scriptName: script.name, status: 'ERROR', error: error?.message || 'unknown' }); + await historyService.appendLog(jobId, 'SYSTEM', `Pre-Encode Skript fehlgeschlagen: ${script.name} (${error?.message || 'unknown'})`); + logger.warn('encode:pre-script:failed', { jobId, scriptId: script.id, error: errorToMeta(error) }); + break; + } finally { + if (prepared?.cleanup) { + await prepared.cleanup(); + } + } + } + + if (!aborted && chainIds.length > 0) { + const chainResult = await this.runEncodeChains(jobId, chainIds, context, 'pre'); + if (chainResult.failed > 0) { + aborted = true; + failed += chainResult.failed; + } + succeeded += chainResult.succeeded; + results.push(...chainResult.results); + } + + if (aborted) { + const pendingScripts = scriptIds.slice(results.filter((r) => r.scriptId != null).length); + for (const pendingId of pendingScripts) { + const s = scriptById.get(Number(pendingId)); + skipped += 1; + results.push({ scriptId: Number(pendingId), scriptName: s?.name || null, status: 'SKIPPED_ABORTED' }); + } + throw Object.assign(new Error('Pre-Encode Skripte fehlgeschlagen - Encode wird nicht gestartet.'), { statusCode: 500, preEncodeFailed: true }); + } + + return { + configured: scriptIds.length + chainIds.length, + attempted: scriptIds.length - skipped + chainIds.length, + succeeded, + failed, + skipped, + aborted, + results + }; + } + async runPostEncodeScripts(jobId, encodePlan, context = {}) { const scriptIds = normalizeScriptIdList(encodePlan?.postEncodeScriptIds || []); - if (scriptIds.length === 0) { + const chainIds = Array.isArray(encodePlan?.postEncodeChainIds) ? encodePlan.postEncodeChainIds : []; + if (scriptIds.length === 0 && chainIds.length === 0) { return { configured: 0, attempted: 0, @@ -4957,9 +5165,24 @@ class PipelineService extends EventEmitter { }); } + if (!aborted && chainIds.length > 0) { + const chainResult = await this.runEncodeChains(jobId, chainIds, context, 'post'); + if (chainResult.failed > 0) { + aborted = true; + failed += chainResult.failed; + abortReason = `Post-Encode Kette fehlgeschlagen`; + void this.notifyPushover('job_error', { + title: 'Ripster - Post-Encode Kettenfehler', + message: `${context?.jobTitle || `Job #${jobId}`}: Eine Post-Encode Kette ist fehlgeschlagen.` + }); + } + succeeded += chainResult.succeeded; + results.push(...chainResult.results); + } + return { - configured: scriptIds.length, - attempted: scriptIds.length - skipped, + configured: scriptIds.length + chainIds.length, + attempted: scriptIds.length - skipped + chainIds.length, succeeded, failed, skipped, @@ -5060,6 +5283,29 @@ class PipelineService extends EventEmitter { }); } + const preEncodeContext = { + mode, + jobId, + jobTitle: job.title || job.detected_title || null, + inputPath, + rawPath: job.raw_path || null + }; + const preScriptIds = normalizeScriptIdList(encodePlan?.preEncodeScriptIds || []); + const preChainIds = Array.isArray(encodePlan?.preEncodeChainIds) ? encodePlan.preEncodeChainIds : []; + if (preScriptIds.length > 0 || preChainIds.length > 0) { + await historyService.appendLog(jobId, 'SYSTEM', 'Pre-Encode Skripte/Ketten werden ausgeführt...'); + try { + await this.runPreEncodeScripts(jobId, encodePlan, preEncodeContext); + } catch (preError) { + if (preError.preEncodeFailed) { + await this.failJob(jobId, 'ENCODING', preError); + throw preError; + } + throw preError; + } + await historyService.appendLog(jobId, 'SYSTEM', 'Pre-Encode Skripte/Ketten abgeschlossen.'); + } + try { const trackSelection = extractHandBrakeTrackSelectionFromPlan(encodePlan, inputPath); let handBrakeTitleId = null; @@ -5524,11 +5770,8 @@ class PipelineService extends EventEmitter { async retry(jobId, options = {}) { const immediate = Boolean(options?.immediate); if (!immediate) { - return this.enqueueOrStartAction( - QUEUE_ACTIONS.RETRY, - jobId, - () => this.retry(jobId, { ...options, immediate: true }) - ); + // Retry always starts a rip → bypass the encode queue entirely. + return this.retry(jobId, { ...options, immediate: true }); } this.ensureNotBusy('retry', jobId); @@ -6000,19 +6243,13 @@ class PipelineService extends EventEmitter { runInfo.lastProgress = progress.percent; runInfo.eta = progress.eta || runInfo.eta; const statusText = composeStatusText(stage, progress.percent, runInfo.lastDetail); - void this.updateProgress(stage, progress.percent, progress.eta, statusText); + void this.updateProgress(stage, progress.percent, progress.eta, statusText, normalizedJobId); } else if (detail) { - const statusText = composeStatusText( - stage, - Number(this.snapshot.progress || 0), - runInfo.lastDetail - ); - void this.updateProgress( - stage, - Number(this.snapshot.progress || 0), - this.snapshot.eta, - statusText - ); + const jobEntry = this.jobProgress.get(Number(normalizedJobId)); + const currentProgress = jobEntry?.progress ?? Number(this.snapshot.progress || 0); + const currentEta = jobEntry?.eta ?? this.snapshot.eta; + const statusText = composeStatusText(stage, currentProgress, runInfo.lastDetail); + void this.updateProgress(stage, currentProgress, currentEta, statusText, normalizedJobId); } } }; diff --git a/backend/src/services/scriptChainService.js b/backend/src/services/scriptChainService.js new file mode 100644 index 0000000..7c6df6b --- /dev/null +++ b/backend/src/services/scriptChainService.js @@ -0,0 +1,407 @@ +const { spawn } = require('child_process'); +const { getDb } = require('../db/database'); +const logger = require('./logger').child('SCRIPT_CHAINS'); +const { errorToMeta } = require('../utils/errorMeta'); + +const CHAIN_NAME_MAX_LENGTH = 120; +const STEP_TYPE_SCRIPT = 'script'; +const STEP_TYPE_WAIT = 'wait'; +const VALID_STEP_TYPES = new Set([STEP_TYPE_SCRIPT, STEP_TYPE_WAIT]); + +function normalizeChainId(rawValue) { + const value = Number(rawValue); + if (!Number.isFinite(value) || value <= 0) { + return null; + } + return Math.trunc(value); +} + +function createValidationError(message, details = null) { + const error = new Error(message); + error.statusCode = 400; + if (details) { + error.details = details; + } + return error; +} + +function mapChainRow(row, steps = []) { + if (!row) { + return null; + } + return { + id: Number(row.id), + name: String(row.name || ''), + steps: steps.map(mapStepRow), + createdAt: row.created_at, + updatedAt: row.updated_at + }; +} + +function mapStepRow(row) { + if (!row) { + return null; + } + return { + id: Number(row.id), + position: Number(row.position), + stepType: String(row.step_type || ''), + scriptId: row.script_id != null ? Number(row.script_id) : null, + scriptName: row.script_name != null ? String(row.script_name) : null, + waitSeconds: row.wait_seconds != null ? Number(row.wait_seconds) : null + }; +} + +function validateSteps(rawSteps) { + const steps = Array.isArray(rawSteps) ? rawSteps : []; + const errors = []; + const normalized = []; + + for (let i = 0; i < steps.length; i++) { + const step = steps[i] && typeof steps[i] === 'object' ? steps[i] : {}; + const stepType = String(step.stepType || step.step_type || '').trim(); + + if (!VALID_STEP_TYPES.has(stepType)) { + errors.push({ field: `steps[${i}].stepType`, message: `Ungültiger Schritt-Typ: '${stepType}'. Erlaubt: script, wait.` }); + continue; + } + + if (stepType === STEP_TYPE_SCRIPT) { + const scriptId = Number(step.scriptId ?? step.script_id); + if (!Number.isFinite(scriptId) || scriptId <= 0) { + errors.push({ field: `steps[${i}].scriptId`, message: 'scriptId fehlt oder ist ungültig.' }); + continue; + } + normalized.push({ stepType, scriptId: Math.trunc(scriptId), waitSeconds: null }); + } else if (stepType === STEP_TYPE_WAIT) { + const waitSeconds = Number(step.waitSeconds ?? step.wait_seconds); + if (!Number.isFinite(waitSeconds) || waitSeconds < 1 || waitSeconds > 3600) { + errors.push({ field: `steps[${i}].waitSeconds`, message: 'waitSeconds muss zwischen 1 und 3600 liegen.' }); + continue; + } + normalized.push({ stepType, scriptId: null, waitSeconds: Math.round(waitSeconds) }); + } + } + + if (errors.length > 0) { + throw createValidationError('Ungültige Schritte in der Skriptkette.', errors); + } + + return normalized; +} + +async function getStepsForChain(db, chainId) { + return db.all( + ` + SELECT + s.id, + s.chain_id, + s.position, + s.step_type, + s.script_id, + s.wait_seconds, + sc.name AS script_name + FROM script_chain_steps s + LEFT JOIN scripts sc ON sc.id = s.script_id + WHERE s.chain_id = ? + ORDER BY s.position ASC, s.id ASC + `, + [chainId] + ); +} + +class ScriptChainService { + async listChains() { + const db = await getDb(); + const rows = await db.all( + ` + SELECT id, name, created_at, updated_at + FROM script_chains + ORDER BY LOWER(name) ASC, id ASC + ` + ); + + if (rows.length === 0) { + return []; + } + + const chainIds = rows.map((row) => Number(row.id)); + const placeholders = chainIds.map(() => '?').join(', '); + const stepRows = await db.all( + ` + SELECT + s.id, + s.chain_id, + s.position, + s.step_type, + s.script_id, + s.wait_seconds, + sc.name AS script_name + FROM script_chain_steps s + LEFT JOIN scripts sc ON sc.id = s.script_id + WHERE s.chain_id IN (${placeholders}) + ORDER BY s.chain_id ASC, s.position ASC, s.id ASC + `, + chainIds + ); + + const stepsByChain = new Map(); + for (const step of stepRows) { + const cid = Number(step.chain_id); + if (!stepsByChain.has(cid)) { + stepsByChain.set(cid, []); + } + stepsByChain.get(cid).push(step); + } + + return rows.map((row) => mapChainRow(row, stepsByChain.get(Number(row.id)) || [])); + } + + async getChainById(chainId) { + const normalizedId = normalizeChainId(chainId); + if (!normalizedId) { + throw createValidationError('Ungültige chainId.'); + } + const db = await getDb(); + const row = await db.get( + `SELECT id, name, created_at, updated_at FROM script_chains WHERE id = ?`, + [normalizedId] + ); + if (!row) { + const error = new Error(`Skriptkette #${normalizedId} wurde nicht gefunden.`); + error.statusCode = 404; + throw error; + } + const steps = await getStepsForChain(db, normalizedId); + return mapChainRow(row, steps); + } + + async getChainsByIds(rawIds = []) { + const ids = Array.isArray(rawIds) + ? rawIds.map(normalizeChainId).filter(Boolean) + : []; + if (ids.length === 0) { + return []; + } + const db = await getDb(); + const placeholders = ids.map(() => '?').join(', '); + const rows = await db.all( + `SELECT id, name, created_at, updated_at FROM script_chains WHERE id IN (${placeholders})`, + ids + ); + const stepRows = await db.all( + ` + SELECT + s.id, s.chain_id, s.position, s.step_type, s.script_id, s.wait_seconds, + sc.name AS script_name + FROM script_chain_steps s + LEFT JOIN scripts sc ON sc.id = s.script_id + WHERE s.chain_id IN (${placeholders}) + ORDER BY s.chain_id ASC, s.position ASC, s.id ASC + `, + ids + ); + const stepsByChain = new Map(); + for (const step of stepRows) { + const cid = Number(step.chain_id); + if (!stepsByChain.has(cid)) { + stepsByChain.set(cid, []); + } + stepsByChain.get(cid).push(step); + } + const byId = new Map(rows.map((row) => [ + Number(row.id), + mapChainRow(row, stepsByChain.get(Number(row.id)) || []) + ])); + return ids.map((id) => byId.get(id)).filter(Boolean); + } + + async createChain(payload = {}) { + const body = payload && typeof payload === 'object' ? payload : {}; + const name = String(body.name || '').trim(); + if (!name) { + throw createValidationError('Skriptkettenname darf nicht leer sein.', [{ field: 'name', message: 'Name darf nicht leer sein.' }]); + } + if (name.length > CHAIN_NAME_MAX_LENGTH) { + throw createValidationError('Skriptkettenname zu lang.', [{ field: 'name', message: `Maximal ${CHAIN_NAME_MAX_LENGTH} Zeichen.` }]); + } + const steps = validateSteps(body.steps); + + const db = await getDb(); + try { + const result = await db.run( + `INSERT INTO script_chains (name, created_at, updated_at) VALUES (?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)`, + [name] + ); + const chainId = result.lastID; + await this._saveSteps(db, chainId, steps); + return this.getChainById(chainId); + } catch (error) { + if (String(error?.message || '').includes('UNIQUE constraint failed')) { + throw createValidationError(`Skriptkettenname "${name}" existiert bereits.`, [{ field: 'name', message: 'Name muss eindeutig sein.' }]); + } + throw error; + } + } + + async updateChain(chainId, payload = {}) { + const normalizedId = normalizeChainId(chainId); + if (!normalizedId) { + throw createValidationError('Ungültige chainId.'); + } + const body = payload && typeof payload === 'object' ? payload : {}; + const name = String(body.name || '').trim(); + if (!name) { + throw createValidationError('Skriptkettenname darf nicht leer sein.', [{ field: 'name', message: 'Name darf nicht leer sein.' }]); + } + if (name.length > CHAIN_NAME_MAX_LENGTH) { + throw createValidationError('Skriptkettenname zu lang.', [{ field: 'name', message: `Maximal ${CHAIN_NAME_MAX_LENGTH} Zeichen.` }]); + } + const steps = validateSteps(body.steps); + + await this.getChainById(normalizedId); + + const db = await getDb(); + try { + await db.run( + `UPDATE script_chains SET name = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?`, + [name, normalizedId] + ); + await db.run(`DELETE FROM script_chain_steps WHERE chain_id = ?`, [normalizedId]); + await this._saveSteps(db, normalizedId, steps); + return this.getChainById(normalizedId); + } catch (error) { + if (String(error?.message || '').includes('UNIQUE constraint failed')) { + throw createValidationError(`Skriptkettenname "${name}" existiert bereits.`, [{ field: 'name', message: 'Name muss eindeutig sein.' }]); + } + throw error; + } + } + + async deleteChain(chainId) { + const normalizedId = normalizeChainId(chainId); + if (!normalizedId) { + throw createValidationError('Ungültige chainId.'); + } + const existing = await this.getChainById(normalizedId); + const db = await getDb(); + await db.run(`DELETE FROM script_chains WHERE id = ?`, [normalizedId]); + return existing; + } + + async _saveSteps(db, chainId, steps) { + for (let i = 0; i < steps.length; i++) { + const step = steps[i]; + await db.run( + ` + INSERT INTO script_chain_steps (chain_id, position, step_type, script_id, wait_seconds, created_at) + VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP) + `, + [chainId, i + 1, step.stepType, step.scriptId ?? null, step.waitSeconds ?? null] + ); + } + } + + async executeChain(chainId, context = {}, { appendLog = null } = {}) { + const chain = await this.getChainById(chainId); + logger.info('chain:execute:start', { chainId, chainName: chain.name, steps: chain.steps.length }); + + const results = []; + + for (const step of chain.steps) { + if (step.stepType === STEP_TYPE_WAIT) { + const seconds = Math.max(1, Number(step.waitSeconds || 1)); + logger.info('chain:step:wait', { chainId, seconds }); + if (typeof appendLog === 'function') { + await appendLog('SYSTEM', `Kette "${chain.name}" - Warte ${seconds} Sekunde(n)...`); + } + await new Promise((resolve) => setTimeout(resolve, seconds * 1000)); + results.push({ stepType: 'wait', waitSeconds: seconds, success: true }); + } else if (step.stepType === STEP_TYPE_SCRIPT) { + if (!step.scriptId) { + logger.warn('chain:step:script-missing', { chainId, stepId: step.id }); + results.push({ stepType: 'script', scriptId: null, success: false, skipped: true, reason: 'scriptId fehlt' }); + continue; + } + + const scriptService = require('./scriptService'); + let script; + try { + script = await scriptService.getScriptById(step.scriptId); + } catch (error) { + logger.warn('chain:step:script-not-found', { chainId, scriptId: step.scriptId, error: errorToMeta(error) }); + results.push({ stepType: 'script', scriptId: step.scriptId, success: false, skipped: true, reason: 'Skript nicht gefunden' }); + continue; + } + + if (typeof appendLog === 'function') { + await appendLog('SYSTEM', `Kette "${chain.name}" - Skript: ${script.name}`); + } + + let prepared = null; + try { + prepared = await scriptService.createExecutableScriptFile(script, { + ...context, + scriptId: script.id, + scriptName: script.name, + source: context?.source || 'chain' + }); + const run = await new Promise((resolve, reject) => { + const child = spawn(prepared.cmd, prepared.args, { + env: process.env, + stdio: ['ignore', 'pipe', 'pipe'] + }); + let stdout = ''; + let stderr = ''; + child.stdout?.on('data', (chunk) => { stdout += String(chunk); }); + child.stderr?.on('data', (chunk) => { stderr += String(chunk); }); + child.on('error', reject); + child.on('close', (code) => resolve({ code, stdout, stderr })); + }); + + const success = run.code === 0; + logger.info('chain:step:script-done', { chainId, scriptId: script.id, exitCode: run.code, success }); + if (typeof appendLog === 'function') { + await appendLog( + success ? 'SYSTEM' : 'ERROR', + `Kette "${chain.name}" - Skript "${script.name}": ${success ? 'OK' : `Fehler (Exit ${run.code})`}` + ); + } + results.push({ stepType: 'script', scriptId: script.id, scriptName: script.name, success, exitCode: run.code }); + + if (!success) { + logger.warn('chain:step:script-failed', { chainId, scriptId: script.id, exitCode: run.code }); + break; + } + } catch (error) { + logger.error('chain:step:script-error', { chainId, scriptId: step.scriptId, error: errorToMeta(error) }); + if (typeof appendLog === 'function') { + await appendLog('ERROR', `Kette "${chain.name}" - Skript-Fehler: ${error.message}`); + } + results.push({ stepType: 'script', scriptId: step.scriptId, success: false, error: error.message }); + break; + } finally { + if (prepared?.cleanup) { + await prepared.cleanup(); + } + } + } + } + + const succeeded = results.filter((r) => r.success).length; + const failed = results.filter((r) => !r.success && !r.skipped).length; + logger.info('chain:execute:done', { chainId, steps: results.length, succeeded, failed }); + + return { + chainId, + chainName: chain.name, + steps: results.length, + succeeded, + failed, + aborted: failed > 0, + results + }; + } +} + +module.exports = new ScriptChainService(); diff --git a/backend/src/utils/encodePlan.js b/backend/src/utils/encodePlan.js index 023958e..d00ac13 100644 --- a/backend/src/utils/encodePlan.js +++ b/backend/src/utils/encodePlan.js @@ -699,17 +699,24 @@ function resolveAudioEncoderAction(track, encoderToken, copyMask, fallbackEncode const normalizedMask = Array.isArray(copyMask) ? copyMask : []; let canCopy = false; + let effectiveCodec = sourceCodec; if (explicitCopyCodec) { canCopy = Boolean(sourceCodec && sourceCodec === explicitCopyCodec); } else if (sourceCodec && normalizedMask.length > 0) { canCopy = normalizedMask.includes(sourceCodec); + // DTS-HD MA contains an embedded DTS core track. When dtshd is not in + // the copy mask but dts is, HandBrake will extract and copy the DTS core. + if (!canCopy && sourceCodec === 'dtshd' && normalizedMask.includes('dts')) { + canCopy = true; + effectiveCodec = 'dts'; + } } if (canCopy) { return { type: 'copy', encoder: normalizedToken, - label: `Copy (${sourceCodec || track?.format || 'Quelle'})` + label: `Copy (${effectiveCodec || track?.format || 'Quelle'})` }; } diff --git a/db/schema.sql b/db/schema.sql index b9fe622..e78d4cb 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -62,6 +62,29 @@ CREATE TABLE scripts ( CREATE INDEX idx_scripts_name ON scripts(name); +CREATE TABLE script_chains ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL UNIQUE, + created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE INDEX idx_script_chains_name ON script_chains(name); + +CREATE TABLE script_chain_steps ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + chain_id INTEGER NOT NULL, + position INTEGER NOT NULL, + step_type TEXT NOT NULL, + script_id INTEGER, + wait_seconds INTEGER, + created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (chain_id) REFERENCES script_chains(id) ON DELETE CASCADE, + FOREIGN KEY (script_id) REFERENCES scripts(id) ON DELETE SET NULL +); + +CREATE INDEX idx_script_chain_steps_chain ON script_chain_steps(chain_id, position); + CREATE TABLE pipeline_state ( id INTEGER PRIMARY KEY CHECK (id = 1), state TEXT NOT NULL, diff --git a/deploy-ripster.sh b/deploy-ripster.sh index 6bcccae..56aa6ee 100755 --- a/deploy-ripster.sh +++ b/deploy-ripster.sh @@ -10,6 +10,7 @@ LOCAL_PATH="$(cd -- "$(dirname "${BASH_SOURCE[0]}")" && pwd)" REMOTE_TARGET="${REMOTE_USER}@${REMOTE_HOST}:${REMOTE_PATH}" SSH_OPTS="-o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -o ConnectTimeout=10" DATA_RELATIVE_DIR="backend/data/***" +DATA_DIR="backend/data" if ! command -v sshpass >/dev/null 2>&1; then echo "sshpass ist nicht installiert. Bitte installieren, z. B.: sudo apt-get install -y sshpass" @@ -25,7 +26,7 @@ echo "Pruefe SSH-Verbindung zu ${REMOTE_USER}@${REMOTE_HOST} ..." sshpass -p "$SSH_PASSWORD" ssh $SSH_OPTS "${REMOTE_USER}@${REMOTE_HOST}" "echo connected" >/dev/null echo "Stelle sicher, dass Remote-Ordner ${REMOTE_PATH} existiert ..." -sshpass -p "$SSH_PASSWORD" ssh $SSH_OPTS "${REMOTE_USER}@${REMOTE_HOST}" "set -euo pipefail; mkdir -p '${REMOTE_PATH}'" +sshpass -p "$SSH_PASSWORD" ssh $SSH_OPTS "${REMOTE_USER}@${REMOTE_HOST}" "set -euo pipefail; mkdir -p '${REMOTE_PATH}' '${REMOTE_PATH}/${DATA_DIR}'" echo "Uebertrage lokalen Ordner ${LOCAL_PATH} nach ${REMOTE_TARGET} ..." echo "backend/data wird weder uebertragen noch auf dem Ziel geloescht: ${DATA_RELATIVE_DIR}" @@ -36,4 +37,10 @@ sshpass -p "$SSH_PASSWORD" rsync -az --progress --delete \ -e "ssh $SSH_OPTS" \ "${LOCAL_PATH}/" "${REMOTE_TARGET}/" -echo "Fertig: ${LOCAL_PATH} wurde nach ${REMOTE_TARGET} uebertragen (backend/data ausgenommen)." +echo "Hole ${DATA_DIR} nach dem Deploy vom Zielserver auf den Quellserver ..." +mkdir -p "${LOCAL_PATH}/${DATA_DIR}" +sshpass -p "$SSH_PASSWORD" rsync -az --progress \ + -e "ssh $SSH_OPTS" \ + "${REMOTE_TARGET}/${DATA_DIR}/" "${LOCAL_PATH}/${DATA_DIR}/" + +echo "Fertig: ${LOCAL_PATH} wurde nach ${REMOTE_TARGET} uebertragen und ${DATA_DIR} wurde vom Zielserver auf den Quellserver geholt." diff --git a/frontend/src/App.jsx b/frontend/src/App.jsx index 5c5ddfe..07ac146 100644 --- a/frontend/src/App.jsx +++ b/frontend/src/App.jsx @@ -31,10 +31,31 @@ function App() { } if (message.type === 'PIPELINE_PROGRESS') { - setPipeline((prev) => ({ - ...prev, - ...message.payload - })); + const payload = message.payload; + const progressJobId = payload?.activeJobId; + setPipeline((prev) => { + const next = { ...prev }; + // Update per-job progress map so concurrent jobs don't overwrite each other. + if (progressJobId != null) { + next.jobProgress = { + ...(prev?.jobProgress || {}), + [progressJobId]: { + state: payload.state, + progress: payload.progress, + eta: payload.eta, + statusText: payload.statusText + } + }; + } + // Update global snapshot fields only for the primary active job. + if (progressJobId === prev?.activeJobId || progressJobId == null) { + next.state = payload.state ?? prev?.state; + next.progress = payload.progress ?? prev?.progress; + next.eta = payload.eta ?? prev?.eta; + next.statusText = payload.statusText ?? prev?.statusText; + } + return next; + }); } if (message.type === 'PIPELINE_QUEUE_CHANGED') { diff --git a/frontend/src/api/client.js b/frontend/src/api/client.js index 4e8e40c..b1f647b 100644 --- a/frontend/src/api/client.js +++ b/frontend/src/api/client.js @@ -64,6 +64,26 @@ export const api = { method: 'POST' }); }, + getScriptChains() { + return request('/settings/script-chains'); + }, + createScriptChain(payload = {}) { + return request('/settings/script-chains', { + method: 'POST', + body: JSON.stringify(payload) + }); + }, + updateScriptChain(chainId, payload = {}) { + return request(`/settings/script-chains/${encodeURIComponent(chainId)}`, { + method: 'PUT', + body: JSON.stringify(payload) + }); + }, + deleteScriptChain(chainId) { + return request(`/settings/script-chains/${encodeURIComponent(chainId)}`, { + method: 'DELETE' + }); + }, updateSetting(key, value) { return request(`/settings/${encodeURIComponent(key)}`, { method: 'PUT', diff --git a/frontend/src/components/MediaInfoReviewPanel.jsx b/frontend/src/components/MediaInfoReviewPanel.jsx index 237c3b4..c8d2f61 100644 --- a/frontend/src/components/MediaInfoReviewPanel.jsx +++ b/frontend/src/components/MediaInfoReviewPanel.jsx @@ -486,14 +486,21 @@ function resolveAudioEncoderPreviewLabel(track, encoderToken, copyMask, fallback : []; let canCopy = false; + let effectiveCodec = sourceCodec; if (explicitCopyCodec) { canCopy = Boolean(sourceCodec && sourceCodec === explicitCopyCodec); } else if (sourceCodec && normalizedCopyMask.length > 0) { canCopy = normalizedCopyMask.includes(sourceCodec); + // DTS-HD MA contains an embedded DTS core. When dtshd is not in the copy + // mask but dts is, HandBrake will extract and copy the DTS core layer. + if (!canCopy && sourceCodec === 'dtshd' && normalizedCopyMask.includes('dts')) { + canCopy = true; + effectiveCodec = 'dts'; + } } if (canCopy) { - return `Copy (${sourceCodec || track?.format || 'Quelle'})`; + return `Copy (${effectiveCodec || track?.format || 'Quelle'})`; } const fallback = String(fallbackEncoder || DEFAULT_AUDIO_FALLBACK_PREVIEW).trim().toLowerCase() || DEFAULT_AUDIO_FALLBACK_PREVIEW; @@ -684,7 +691,21 @@ export default function MediaInfoReviewPanel({ onAddPostEncodeScript = null, onChangePostEncodeScript = null, onRemovePostEncodeScript = null, - onReorderPostEncodeScript = null + onReorderPostEncodeScript = null, + availablePreScripts = [], + selectedPreEncodeScriptIds = [], + allowPreScriptSelection = false, + onAddPreEncodeScript = null, + onChangePreEncodeScript = null, + onRemovePreEncodeScript = null, + availableChains = [], + selectedPreEncodeChainIds = [], + selectedPostEncodeChainIds = [], + allowChainSelection = false, + onAddPreEncodeChain = null, + onRemovePreEncodeChain = null, + onAddPostEncodeChain = null, + onRemovePostEncodeChain = null }) { if (!review) { return
Keine Mediainfo-Daten vorhanden.
; @@ -759,6 +780,136 @@ export default function MediaInfoReviewPanel({ ) : null} + {/* Pre-Encode Scripts */} + {(allowPreScriptSelection || normalizeScriptIdList(selectedPreEncodeScriptIds).length > 0) ? ( +Lade Skriptketten...
+ ) : chains.length === 0 ? ( +Keine Skriptketten vorhanden.
+ ) : ( +