Callbacks added on post image processing.

This commit is contained in:
Alexander Shabarshov 2026-06-22 08:01:44 +01:00
parent 36b9343269
commit 3457aa6816
8 changed files with 38 additions and 34 deletions

View File

@ -68,10 +68,7 @@ public partial class JobViewModel : ObservableObject
: "";
public override string ToString() => $"{FileName}: {TextDesc}";
public ObservableCollection<ParameterEntry> ParametersList { get; }
= new();
public ObservableCollection<ParameterEntry> ParametersList { get; } = new();
public ObservableCollection<Segment> Segments { get; } = new();
public string CropText

View File

@ -72,7 +72,7 @@ public partial class MainViewModel : ViewModelBase
jobs.AddRange(fileJobs);
}
await _processor.ProcessJobs(jobs, jobs.First().Job.Enhance, _cancellationTokenSource.Token);
await _processor.ProcessJobs(jobs, jobs.First().Job.Enhance, null, _cancellationTokenSource.Token);
}
catch (Exception ex)
{

View File

@ -3,5 +3,5 @@
public interface IJobProcessor
{
Task<List<SingleTask>> GenerateJobs(SingleJob job, bool estimateOnly, IReadOnlyCollection<Segment> predefinedSegments, CancellationToken token);
Task<bool> ProcessJobs(List<SingleTask> tasks, bool singleThreaded, CancellationToken token);
Task<bool> ProcessJobs(List<SingleTask> tasks, bool singleThreaded, Action<SingleTask, FrameProcessingResult>? onFrameProcessed, CancellationToken token);
}

View File

@ -105,18 +105,18 @@ public class JobProcessor(ILogger logger) : LoggingBase(logger, 0), IJobProcesso
).ToList();
}
public async Task<bool> ProcessJobs(List<SingleTask> tasks, bool singleThreaded, CancellationToken token)
public async Task<bool> ProcessJobs(List<SingleTask> tasks, bool singleThreaded, Action<SingleTask, FrameProcessingResult>? onFrameProcessed, CancellationToken token)
{
if (singleThreaded)
{
LogInfo("Starting single-threaded splitting...");
await RunSingleThreaded(tasks, token);
await RunSingleThreaded(tasks, onFrameProcessed, token);
}
else
{
LogInfo("Starting multi-threaded splitting...");
await RunMultiThreaded(tasks, token);
await RunMultiThreaded(tasks, onFrameProcessed, token);
}
LogInfo("Done.");
@ -133,7 +133,7 @@ public class JobProcessor(ILogger logger) : LoggingBase(logger, 0), IJobProcesso
// Multi-threaded splitting
// -----------------------------
private async Task RunMultiThreaded(List<SingleTask> jobs, CancellationToken token)
private async Task RunMultiThreaded(List<SingleTask> jobs, Action<SingleTask,FrameProcessingResult>? onFrameProcessed, CancellationToken token)
{
LogProgress(0.0, TimeSpan.Zero, 0.0);
@ -167,7 +167,7 @@ public class JobProcessor(ILogger logger) : LoggingBase(logger, 0), IJobProcesso
return;
await Task.Yield();
}
await ProcessSegment(job, slot + 1, token);
await ProcessSegment(job, slot + 1, onFrameProcessed, token);
var processed = Interlocked.Increment(ref processedSegments);
var elapsed = sw.Elapsed;
@ -194,21 +194,21 @@ public class JobProcessor(ILogger logger) : LoggingBase(logger, 0), IJobProcesso
// Single-threaded splitting
// -----------------------------
private async Task RunSingleThreaded(List<SingleTask> jobs, CancellationToken token)
private async Task RunSingleThreaded(List<SingleTask> jobs, Action<SingleTask, FrameProcessingResult>? onFrameProcessed, CancellationToken token)
{
foreach (var job in jobs)
{
await ProcessSegment(job, 0, token);
await ProcessSegment(job, 0, onFrameProcessed, token);
}
}
private async Task ProcessSegment(SingleTask t, int slot, CancellationToken token)
private async Task ProcessSegment(SingleTask t, int slot, Action<SingleTask, FrameProcessingResult>? onFrameProcessed, CancellationToken token)
{
var processor = t.ProcessorFactory(slot);
try
{
await processor.ProcessSegment(t, token);
await processor.ProcessSegment(t, x=>onFrameProcessed?.Invoke(t, x), token);
}
finally
{

View File

@ -58,12 +58,12 @@ public sealed class SimpleSplitter : LoggingBase, ISegmentProcessor
// GetNextProcessedFrame
// ============================================================
public Mat? GetNextProcessedFrame(IFrameProcessingState processorState, CancellationToken token)
public FrameProcessingResult GetNextProcessedFrame(IFrameProcessingState processorState, CancellationToken token)
{
var state = (State)processorState;
if (state.DecodeStdout == null)
return null;
return new FrameProcessingResult(null, [], null);
// SimpleSplitter does not modify frames; it only copies or rotates.
// For preview, we decode raw frames and return them as-is.
@ -76,12 +76,12 @@ public sealed class SimpleSplitter : LoggingBase, ISegmentProcessor
var buffer = new byte[bytes];
var read = state.DecodeStdout.Read(buffer, 0, bytes);
if (read != bytes)
return null;
return new FrameProcessingResult(null, [], null);
var mat = new Mat(h, w, MatType.CV_8UC3);
System.Runtime.InteropServices.Marshal.Copy(buffer, 0, mat.Data, bytes);
return mat;
return new FrameProcessingResult(mat, [], null);
}
// ============================================================
@ -111,7 +111,7 @@ public sealed class SimpleSplitter : LoggingBase, ISegmentProcessor
// ProcessSegment (now uses preview API)
// ============================================================
public async Task ProcessSegment(SingleTask job, CancellationToken token)
public async Task ProcessSegment(SingleTask job, Action<FrameProcessingResult>? onFrameProcessed, CancellationToken token)
{
var state = (State)InitSegment(job, token);
@ -125,7 +125,8 @@ public sealed class SimpleSplitter : LoggingBase, ISegmentProcessor
{
token.ThrowIfCancellationRequested();
var frame = GetNextProcessedFrame(state, token);
var res = GetNextProcessedFrame(state, token);
var frame = res.Image;
if (frame == null)
break;
@ -135,6 +136,8 @@ public sealed class SimpleSplitter : LoggingBase, ISegmentProcessor
System.Runtime.InteropServices.Marshal.Copy(frame.Data, buffer, 0, bytes);
encodeStdin.Write(buffer, 0, bytes);
onFrameProcessed?.Invoke(res);
frame.Dispose();
}

View File

@ -103,17 +103,17 @@ public sealed class TrackingSplitter : LoggingBase, ISegmentProcessor
// GetNextProcessedFrame
// ------------------------------------------------------------
public Mat? GetNextProcessedFrame(
public FrameProcessingResult GetNextProcessedFrame(
IFrameProcessingState processorState,
CancellationToken token)
{
var state = (FrameProcessingState)processorState;
if (state.DecodeStdout == null)
return null;
return new FrameProcessingResult(null, [], null);
if (!TryReadNextFrame(state.DecodeStdout, state, token))
return null;
return new FrameProcessingResult(null, [], null);
return ProcessFrame(state.FrameMat, state, state.Job, token);
}
@ -150,7 +150,7 @@ public sealed class TrackingSplitter : LoggingBase, ISegmentProcessor
// PROCESSSEGMENT (full pipeline)
// ============================================================
public async Task ProcessSegment(SingleTask job, CancellationToken token)
public async Task ProcessSegment(SingleTask job, Action<FrameProcessingResult>? onFrameProcessed, CancellationToken token)
{
var name = Path.GetFileNameWithoutExtension(job.OutputFileName);
var fps = job.Info.Fps;
@ -180,12 +180,14 @@ public sealed class TrackingSplitter : LoggingBase, ISegmentProcessor
token.ThrowIfCancellationRequested();
var frame = GetNextProcessedFrame(state, token);
if (frame == null)
if (frame.Image == null)
break;
frameIndex++;
EncodeFrame(frame, state, encodeStdin);
EncodeFrame(frame.Image, state, encodeStdin);
onFrameProcessed?.Invoke(frame);
var elapsed = DateTime.UtcNow - startTime;
var progress = totalFrames > 0 ? (double)frameIndex / totalFrames : 0.0;
@ -272,7 +274,7 @@ public sealed class TrackingSplitter : LoggingBase, ISegmentProcessor
return true;
}
private Mat ProcessFrame(
private FrameProcessingResult ProcessFrame(
Mat inputFrame,
FrameProcessingState state,
SingleTask job,
@ -298,12 +300,12 @@ public sealed class TrackingSplitter : LoggingBase, ISegmentProcessor
if (state.Enhancer != null)
{
if (state.Enhancer.TryProcessFrame(state.OutMat, out var enhanced, token))
return enhanced;
return new FrameProcessingResult(enhanced, objects, primary);
return state.OutMat;
return new FrameProcessingResult(state.OutMat, objects, primary);
}
return state.OutMat;
return new FrameProcessingResult(state.OutMat, objects, primary);
}
private void EncodeFrame(

View File

@ -4,11 +4,13 @@ public interface IFrameProcessingState
{
}
public sealed record FrameProcessingResult(Mat? Image, List<DetectedPerson> Detected, DetectedPerson? Primary);
public interface ISegmentProcessor
{
IFrameProcessingState InitSegment(SingleTask job, CancellationToken token);
Mat? GetNextProcessedFrame( IFrameProcessingState processorState, CancellationToken token);
FrameProcessingResult GetNextProcessedFrame(IFrameProcessingState processorState, CancellationToken token);
void FinishSegment(IFrameProcessingState processorState);
Task ProcessSegment( SingleTask job, CancellationToken token);
Task ProcessSegment( SingleTask job, Action<FrameProcessingResult>? onFrameProcessed, CancellationToken token);
}

View File

@ -49,7 +49,7 @@ static partial class Program
return 0;
}
var success = await processor.ProcessJobs(allJobs, cmd.Master.SingleThreaded, CancellationToken.None);
var success = await processor.ProcessJobs(allJobs, cmd.Master.SingleThreaded, null, CancellationToken.None);
if (uiTask != null)
{
if ( cts != null )