Added input file mask processing with parallel splitting tasks.

This commit is contained in:
Alexander Shabarshov 2026-05-13 20:42:00 +01:00
parent d0aade4fca
commit 1dac08d21a
7 changed files with 350 additions and 228 deletions

View File

@ -17,7 +17,7 @@ public sealed class CameraController
private readonly int _cropWidth; private readonly int _cropWidth;
private readonly int _cropHeight; private readonly int _cropHeight;
private readonly KalmanTracker _kalman; private readonly KalmanTracker _kalman;
private readonly CommandLine _cmd; private readonly SingleJob _cmd;
private int _dropoutCounter; private int _dropoutCounter;
// --- Dropout tolerance --- // --- Dropout tolerance ---
@ -41,7 +41,7 @@ public sealed class CameraController
int cropWidth, int cropWidth,
int cropHeight, int cropHeight,
KalmanTracker kalman, KalmanTracker kalman,
CommandLine cmd SingleJob cmd
) )
{ {
_videoWidth = videoWidth; _videoWidth = videoWidth;

View File

@ -5,123 +5,22 @@ using System.Text;
namespace splitter; namespace splitter;
public sealed class CommandLine public class SingleJob
{ {
public string InputFile { get; private init; } public string InputFile { get; set; } = null!;
public string OutputFolder { get; private init; } public string OutputFolder { get; set; } = null!;
public (int width, int height)? Crop { get; private init; } public (int width, int height)? Crop { get; set; }
public Point2f? GravitateTo { get; private init; } public Point2f? GravitateTo { get; set; }
public string? Mask { get; private init; } public string? Mask { get; set; }
public bool Debug { get; private init; } public bool Debug { get; set; }
public string? Detect { get; private init; } public string? Detect { get; set; }
public double? OverrideTargetDuration { get; private init; } public double? OverrideTargetDuration { get; set; }
public string[] Passthrough { get; private init; } = Array.Empty<string>(); public string[] Passthrough { get; set; } = [];
public bool PlainText { get; private init; } public bool PlainText { get; set; }
public bool EstimateOnly { get; private init; } public bool EstimateOnly { get; set; }
public bool ForceFixed { get; private init; } public bool ForceFixed { get; set; }
public bool SingleThreaded { get; private init; } public bool SingleThreaded { get; set; }
public Dictionary<string, string> Parameters { get; } = []; public Dictionary<string, string> Parameters { get; set; } = [];
public bool IsValid => !string.IsNullOrEmpty(InputFile) && !string.IsNullOrEmpty(OutputFolder);
public CommandLine(string[] args)
{
InputFile = "";
OutputFolder = "";
if (args.Length == 0 || args.Contains("--help"))
{
PrintHelp();
return;
}
// Extract passthrough parameters after "--"
var passthroughIndex = Array.IndexOf(args, "--");
if (passthroughIndex >= 0)
{
if (passthroughIndex < args.Length - 1)
Passthrough = args.Skip(passthroughIndex + 1).ToArray();
args = args.Take(passthroughIndex).ToArray();
}
if (args.Length < 2)
{
Console.WriteLine("Missing required parameters.");
PrintHelp();
return;
}
InputFile = args[0];
OutputFolder = args[1];
foreach (var arg in args.Skip(2))
{
if (arg.StartsWith("--mask="))
{
Mask = arg.Substring("--mask=".Length);
}
else if (arg.StartsWith("--detect="))
{
Detect = arg.Substring("--detect=".Length).ToLowerInvariant();
}
else if (arg.StartsWith("--crop="))
{
Crop = ParseCrop(arg.Substring("--crop=".Length));
}
else if (arg == "--crop")
{
Crop = ParseCrop("");
}
else if (arg == "--text")
{
PlainText = true;
}
else if (arg == "--debug")
{
Debug = true;
}
else if (arg == "--single-thread")
{
SingleThreaded = true;
}
else if (arg.StartsWith("--gravitate="))
{
var val = arg.Substring("--gravitate=".Length);
GravitateTo = ParseGravitate(val);
}
else if (arg.StartsWith("--duration="))
{
var dur = arg.Substring("--duration=".Length);
OverrideTargetDuration = ParseDuration(dur);
if (OverrideTargetDuration <= 0)
{
Console.WriteLine($"Invalid --duration value: {dur}");
return;
}
}
else if (arg.StartsWith("-p:", StringComparison.Ordinal))
{
var spec = arg.Substring("-p:".Length);
if (!TryParseParameter(spec, out var key, out var value))
{
Console.WriteLine($"Invalid -p parameter: {spec}");
return;
}
Parameters[key] = value;
}
else if (arg == "--estimate")
{
EstimateOnly = true;
}
else if (arg == "--force")
{
ForceFixed = true;
}
}
}
public void Override<T>(ref T member, string name) public void Override<T>(ref T member, string name)
{ {
@ -145,6 +44,138 @@ public sealed class CommandLine
} }
} }
}
public sealed class CommandLine
{
public SingleJob Master { get; } = new SingleJob();
public SingleJob[] Jobs { get; }
public bool IsValid => !string.IsNullOrEmpty(Master.InputFile) && !string.IsNullOrEmpty(Master.OutputFolder) && Jobs.Length > 0;
public CommandLine(string[] args)
{
Master.InputFile = "";
Master.OutputFolder = "";
Jobs = [];
if (args.Length == 0 || args.Contains("--help"))
{
PrintHelp();
return;
}
// Extract passthrough parameters after "--"
var passthroughIndex = Array.IndexOf(args, "--");
if (passthroughIndex >= 0)
{
if (passthroughIndex < args.Length - 1)
Master.Passthrough = args.Skip(passthroughIndex + 1).ToArray();
args = args.Take(passthroughIndex).ToArray();
}
if (args.Length < 1)
{
Console.WriteLine("Missing required parameters.");
PrintHelp();
return;
}
Master.InputFile = args[0];
var hasOutputFolder = args.Length > 1 && !args[1].StartsWith("-");
if (hasOutputFolder)
Master.OutputFolder = args[1];
else
Master.OutputFolder = Path.Combine(Path.GetDirectoryName(Master.InputFile) ?? Directory.GetCurrentDirectory(), "Splitter");
foreach (var arg in args.Skip(hasOutputFolder ? 2 : 1))
{
if (arg.StartsWith("--mask="))
{
Master.Mask = arg.Substring("--mask=".Length);
}
else if (arg.StartsWith("--detect="))
{
Master.Detect = arg.Substring("--detect=".Length).ToLowerInvariant();
}
else if (arg.StartsWith("--crop="))
{
Master.Crop = ParseCrop(arg.Substring("--crop=".Length));
}
else if (arg == "--crop")
{
Master.Crop = ParseCrop("");
}
else if (arg == "--text")
{
Master.PlainText = true;
}
else if (arg == "--debug")
{
Master.Debug = true;
}
else if (arg == "--single-thread")
{
Master.SingleThreaded = true;
}
else if (arg.StartsWith("--gravitate="))
{
var val = arg.Substring("--gravitate=".Length);
Master.GravitateTo = ParseGravitate(val);
}
else if (arg.StartsWith("--duration="))
{
var dur = arg.Substring("--duration=".Length);
Master.OverrideTargetDuration = ParseDuration(dur);
if (Master.OverrideTargetDuration <= 0)
{
Console.WriteLine($"Invalid --duration value: {dur}");
return;
}
}
else if (arg.StartsWith("-p:", StringComparison.Ordinal))
{
var spec = arg.Substring("-p:".Length);
if (!TryParseParameter(spec, out var key, out var value))
{
Console.WriteLine($"Invalid -p parameter: {spec}");
return;
}
Master.Parameters[key] = value;
}
else if (arg == "--estimate")
{
Master.EstimateOnly = true;
}
else if (arg == "--force")
{
Master.ForceFixed = true;
}
}
var files = FileMaskExpander.Expand(Master.InputFile);
Jobs = files.Select(x => new SingleJob
{
InputFile = x,
OutputFolder = Master.OutputFolder,
Crop = Master.Crop,
GravitateTo = Master.GravitateTo,
Mask = Master.Mask,
Debug = Master.Debug,
Detect = Master.Detect,
OverrideTargetDuration = Master.OverrideTargetDuration,
Passthrough = Master.Passthrough,
PlainText = Master.PlainText,
EstimateOnly = Master.EstimateOnly,
ForceFixed = Master.ForceFixed,
SingleThreaded = Master.SingleThreaded,
Parameters = new Dictionary<string, string>(Master.Parameters)
}).ToArray();
}
private static bool TryParseParameter(string spec, out string key, out string value) private static bool TryParseParameter(string spec, out string key, out string value)
{ {
key = ""; key = "";
@ -299,6 +330,8 @@ Options:
Passthrough: Passthrough:
Anything after -- is passed directly to ffmpeg. Anything after -- is passed directly to ffmpeg.
input.mp4 can be a file mask, e.g. ""videos/*.mp4"". Output files will be named based on the input filename and the --mask pattern if provided.
Examples: Examples:
splitter vertical-video.mp4 out/ splitter vertical-video.mp4 out/
splitter vertical-video.mp4 out/ --duration=90s splitter vertical-video.mp4 out/ --duration=90s

22
FileMaskExpander.cs Normal file
View File

@ -0,0 +1,22 @@
namespace splitter;
public static class FileMaskExpander
{
public static string[] Expand(string input)
{
// If no mask, return the single full path
if (!HasMask(input))
return new[] { Path.GetFullPath(input) };
string directory = Path.GetDirectoryName(input) ?? Directory.GetCurrentDirectory();
string pattern = Path.GetFileName(input);
if (string.IsNullOrEmpty(directory))
directory = Directory.GetCurrentDirectory();
return Directory.GetFiles(directory, pattern, SearchOption.TopDirectoryOnly);
}
private static bool HasMask(string path)
=> path.IndexOfAny(['*', '?']) >= 0;
}

View File

@ -6,11 +6,11 @@
}, },
"Prod": { "Prod": {
"commandName": "Project", "commandName": "Project",
"commandLineArgs": "\"C:\\Users\\uncls\\Pictures\\2026\\2026 - Secret Rule\\20260426_212004.mp4\" \"C:\\Users\\uncls\\Pictures\\2026\\2026 - Secret Rule\\Shorts\" --crop --detect=body" "commandLineArgs": "\"C:\\Users\\uncls\\Pictures\\2026\\2026 - Secret Rule\\20260426_212*.mp4\" --crop --detect=body"
}, },
"Debug": { "Debug": {
"commandName": "Project", "commandName": "Project",
"commandLineArgs": "\"C:\\Users\\uncls\\Pictures\\2026\\2026 - Secret Rule\\20260426_212004.mp4\" \"C:\\Users\\uncls\\Pictures\\2026\\2026 - Secret Rule\\Shorts\" --crop --detect=body --debug --single-thread --text" "commandLineArgs": "\"C:\\Users\\uncls\\Pictures\\2026\\2026 - Secret Rule\\20260426_212004.mp4\" --crop --detect=body --debug --single-thread --text"
} }
} }
} }

View File

@ -131,10 +131,17 @@ public sealed class SpectreConsoleLogger : ILogger, IDisposable
.StartAsync(async ctx => .StartAsync(async ctx =>
{ {
while (!token.IsCancellationRequested) while (!token.IsCancellationRequested)
{
try
{ {
ctx.UpdateTarget(BuildRoot()); ctx.UpdateTarget(BuildRoot());
await Task.Delay(100, token); await Task.Delay(100, token);
} }
catch ( Exception ex )
{
break;
}
}
}); });
} }
@ -178,13 +185,13 @@ public sealed class SpectreConsoleLogger : ILogger, IDisposable
var layout = new Layout("root") var layout = new Layout("root")
.SplitRows( .SplitRows(
new Layout("progress") { Size = Math.Max(3, numberOfProcessesSnapshot + 2) }, new Layout("progress") { Size = Math.Max(3, numberOfProcessesSnapshot + 2) },
new Layout("log"), new Layout("log")
new Layout("buttons") { Size = 3 } //new Layout("buttons") { Size = 3 }
); );
layout["progress"].Update(BuildProgressPanel(progressSnapshot)); layout["progress"].Update(BuildProgressPanel(progressSnapshot));
layout["log"].Update(BuildLogPanel(logSnapshot)); layout["log"].Update(BuildLogPanel(logSnapshot));
layout["buttons"].Update(BuildButtonsPanel()); //layout["buttons"].Update(BuildButtonsPanel());
return layout; return layout;
} }

View File

@ -9,19 +9,17 @@ namespace splitter;
public class TrackingSplitter : LoggingBase, ISegmentProcessor, IDisposable public class TrackingSplitter : LoggingBase, ISegmentProcessor, IDisposable
{ {
private readonly int _segmentNo;
private readonly int _cropWidth; private readonly int _cropWidth;
private readonly int _cropHeight; private readonly int _cropHeight;
private readonly bool _debugOverlay; private readonly bool _debugOverlay;
private readonly bool _plainText; private readonly bool _plainText;
private readonly IObjectDetector _detector; private readonly IObjectDetector _detector;
private readonly CommandLine _cmd; private readonly SingleJob _cmd;
public TrackingSplitter(int segmentNo, int cropWidth, int cropHeight, bool debugOverlay, bool plainText, IObjectDetector detector, CommandLine cmd, ILogger logger) public TrackingSplitter(int segmentNo, int cropWidth, int cropHeight, bool debugOverlay, bool plainText, IObjectDetector detector, SingleJob cmd, ILogger logger)
: base(logger, segmentNo) : base(logger, segmentNo)
{ {
_segmentNo = segmentNo;
_cropWidth = cropWidth; _cropWidth = cropWidth;
_cropHeight = cropHeight; _cropHeight = cropHeight;
_debugOverlay = debugOverlay; _debugOverlay = debugOverlay;
@ -40,7 +38,10 @@ public class TrackingSplitter : LoggingBase, ISegmentProcessor, IDisposable
{ {
using var capture = new VideoCapture(inputFile); using var capture = new VideoCapture(inputFile);
if (!capture.IsOpened()) if (!capture.IsOpened())
throw new Exception("Cannot open video"); {
LogError($"{Path.GetFileName(inputFile)}: Cannot open video");
return;
}
var name = Path.GetFileNameWithoutExtension(outputFile); var name = Path.GetFileNameWithoutExtension(outputFile);
var skip = TimeSpan.FromSeconds(start); var skip = TimeSpan.FromSeconds(start);
@ -56,7 +57,7 @@ public class TrackingSplitter : LoggingBase, ISegmentProcessor, IDisposable
var originalCropWidth = _cropWidth; var originalCropWidth = _cropWidth;
var originalCropHeight = _cropHeight; var originalCropHeight = _cropHeight;
LogInfo($"[TrackingSplitter] skip={skip}, duration={duration}, fps={fps}, totalFrames={totalFrames}"); LogInfo($"{Path.GetFileName(outputFile)}:: [TrackingSplitter] skip={skip}, duration={duration}, fps={fps}, totalFrames={totalFrames}");
var encWidth = _debugOverlay ? videoWidth : originalCropWidth; var encWidth = _debugOverlay ? videoWidth : originalCropWidth;
var encHeight = _debugOverlay ? videoHeight : originalCropHeight; var encHeight = _debugOverlay ? videoHeight : originalCropHeight;
@ -170,9 +171,9 @@ public class TrackingSplitter : LoggingBase, ISegmentProcessor, IDisposable
ClearProgress(); ClearProgress();
if (ffmpeg.ExitCode != 0) if (ffmpeg.ExitCode != 0)
LogError($"Segment {name} FFmpeg encoding failed"); LogError($"{Path.GetFileName(outputFile)}: Segment {name} FFmpeg encoding failed");
else else
LogInfo($"Segment {name} processing completed"); LogInfo($"{Path.GetFileName(outputFile)}: Segment {name} processing completed");
} }
private (Rect box, Point2f center)? SelectTrackedObject( private (Rect box, Point2f center)? SelectTrackedObject(

View File

@ -1,3 +1,4 @@
using System.Collections.Concurrent;
using System.Diagnostics; using System.Diagnostics;
using System.Globalization; using System.Globalization;
using System.Text; using System.Text;
@ -7,6 +8,17 @@ using splitter;
static class Program static class Program
{ {
private static ILogger _logger = null!; private static ILogger _logger = null!;
private record SingleTask(
SingleJob Job,
string OutputFileName,
int SegmentIndex,
int TotalSegments,
double SegmentStart,
double SegmentLength,
Func<int, ISegmentProcessor> ProcessorFactory
);
static async Task<int> Main(string[] args) static async Task<int> Main(string[] args)
{ {
Task? uiTask = null; Task? uiTask = null;
@ -15,30 +27,44 @@ static class Program
if ( !cmd.IsValid) if ( !cmd.IsValid)
return -1; return -1;
if (cmd.PlainText) CancellationTokenSource? cts = null;
if (cmd.Master.PlainText)
{ {
_logger = new TextLogger(); _logger = new TextLogger();
} }
else else
{ {
Console.SetBufferSize(Console.WindowWidth, Console.BufferHeight);
var logger = new SpectreConsoleLogger var logger = new SpectreConsoleLogger
{ {
Title = "Splitter", Title = "Splitter",
NumberOfProcesses = cmd.SingleThreaded ? 1 : Math.Max(1, Environment.ProcessorCount / 2) NumberOfProcesses = cmd.Master.SingleThreaded ? 1 : Math.Max(1, Environment.ProcessorCount / 2) + 1
}; };
_logger = logger; _logger = logger;
using var cts = new CancellationTokenSource(); cts = new CancellationTokenSource();
uiTask = logger.RunAsync(cts.Token); uiTask = logger.RunAsync(cts.Token);
} }
var success = await ProcessAll(cmd); var allJobs = new List<SingleTask>();
foreach ( var job in cmd.Jobs )
{
var jobs = await GenerateJobs(cmd, job);
allJobs.AddRange(jobs);
}
if ( allJobs.Count == 0)
{
if ( !cmd.Master.EstimateOnly)
LogWarn("No valid jobs to process.");
return 0;
}
var success = await ProcessJobs(cmd, allJobs);
if (uiTask != null) if (uiTask != null)
{ {
if ( cts != null )
await cts.CancelAsync();
await uiTask; await uiTask;
} }
if (_logger is IDisposable disposable) if (_logger is IDisposable disposable)
@ -47,34 +73,35 @@ static class Program
return success ? 1 : 0; return success ? 1 : 0;
} }
private static async Task<bool> ProcessAll(CommandLine cmd) private static async Task<List<SingleTask>> GenerateJobs(CommandLine cmd, SingleJob job)
{ {
if (!File.Exists(cmd.InputFile)) var baseName = Path.GetFileNameWithoutExtension(job.InputFile);
if (!File.Exists(job.InputFile))
{ {
LogError("Input file not found."); LogError($"{baseName}: Input file not found.");
return false; return [];
} }
if (!Directory.Exists(cmd.OutputFolder)) if (!Directory.Exists(job.OutputFolder))
Directory.CreateDirectory(cmd.OutputFolder); Directory.CreateDirectory(job.OutputFolder);
var baseName = Path.GetFileNameWithoutExtension(cmd.InputFile); job.Mask ??= $"{baseName}_seg%03d.mp4";
var outputMask = cmd.Mask ?? $"{baseName}_Seg%03d.mp4"; LogInfo($"{baseName}: Reading duration via ffprobe...");
LogInfo("Reading duration via ffprobe...");
var duration = GetDuration(cmd.InputFile); var duration = GetDuration(job.InputFile);
if (duration <= 0) if (duration <= 0)
{ {
LogError("Could not read duration."); LogError($"{baseName}: Could not read duration.");
return false; return [];
} }
var target = cmd.OverrideTargetDuration ?? 58.0; var target = job.OverrideTargetDuration ?? 58.0;
int segments; int segments;
double segmentLength; double segmentLength;
if (cmd.ForceFixed) if (job.ForceFixed)
{ {
// Fixed chunk size, last one may be shorter // Fixed chunk size, last one may be shorter
segments = (int)Math.Ceiling(duration / target); segments = (int)Math.Ceiling(duration / target);
@ -87,63 +114,82 @@ static class Program
segmentLength = duration / segments; segmentLength = duration / segments;
} }
if (cmd.EstimateOnly) if (cmd.Master.EstimateOnly)
{ {
LogInfo("=== ESTIMATE MODE ==="); LogInfo("=== ESTIMATE MODE ===");
LogInfo($"Total duration: {duration:F2}s"); LogInfo($"{baseName}: Total duration: {duration:F2}s");
LogInfo($"Target duration: {target:F2}s"); LogInfo($"{baseName}: Target duration: {target:F2}s");
LogInfo($"Segments: {segments}"); LogInfo($"{baseName}: Segments: {segments}");
LogInfo(cmd.ForceFixed LogInfo(job.ForceFixed
? $"Fixed segment length: {segmentLength:F2}s (last may be shorter)" ? $"{baseName}: Fixed segment length: {segmentLength:F2}s (last may be shorter)"
: $"Equalized segment length: {segmentLength:F2}s"); : $"{baseName}: Equalized segment length: {segmentLength:F2}s");
return false; return [];
} }
LogInfo($"Duration: {duration:F2}s"); LogInfo($"{baseName}: Duration: {duration:F2}s");
LogInfo($"Segments: {segments}"); LogInfo($"{baseName}: Segments: {segments}");
LogInfo($"Equal segment length: {segmentLength:F3}s"); LogInfo($"{baseName}: Equal segment length: {segmentLength:F3}s");
Func<int, ISegmentProcessor> processorFactory; Func<int, ISegmentProcessor> processorFactory;
if (cmd.Crop != null) if (job.Crop != null)
{ {
processorFactory = i => processorFactory = i =>
{ {
IObjectDetector detector = cmd.Detect switch IObjectDetector detector = job.Detect switch
{ {
"face" => new UltraFaceDetector(_logger), "face" => new UltraFaceDetector(_logger),
"body" => new YoloOnnxObjectDetector(_logger), "body" => new YoloOnnxObjectDetector(_logger),
_ => throw new InvalidOperationException($"Unknown detector: {cmd.Detect}") _ => throw new InvalidOperationException($"Unknown detector: {job.Detect}")
}; };
return new TrackingSplitter(i, cmd.Crop.Value.width, cmd.Crop.Value.height, cmd.Debug, cmd.PlainText, detector, cmd, _logger); return new TrackingSplitter(i, job.Crop.Value.width, job.Crop.Value.height, cmd.Master.Debug, cmd.Master.PlainText, detector, job, _logger);
}; };
} }
else else
{ {
processorFactory = i => new SimpleSplitter(i, _logger); processorFactory = i => new SimpleSplitter(i, _logger);
} }
if (cmd.SingleThreaded)
var jobs = Enumerable.Range(0, segments)
.Select(i => new SingleTask
(
Job : job,
OutputFileName : BuildOutputFileName(job.OutputFolder, job.Mask, i),
SegmentIndex : i,
TotalSegments : segments,
SegmentStart : i * segmentLength,
SegmentLength : (i == segments - 1)
? Math.Max(0.1, duration - i * segmentLength)
: segmentLength,
ProcessorFactory : processorFactory
)
)
.ToList();
return jobs;
}
private static async Task<bool> ProcessJobs(CommandLine cmd, List<SingleTask> tasks)
{
if (cmd.Master.SingleThreaded)
{ {
LogInfo("Starting single-threaded splitting..."); LogInfo("Starting single-threaded splitting...");
await RunSingleThreaded(processorFactory, cmd.InputFile, cmd.OutputFolder, outputMask, duration, segments, segmentLength, cmd.Passthrough); await RunSingleThreaded(tasks);
} }
else else
{ {
LogInfo("Starting multi-threaded splitting..."); LogInfo("Starting multi-threaded splitting...");
await RunMultiThreaded(processorFactory, cmd.InputFile, cmd.OutputFolder, outputMask, duration, segments, segmentLength, cmd.Passthrough); await RunMultiThreaded(tasks);
} }
LogInfo("Done."); LogInfo("Done.");
return true; return true;
} }
private static void LogInfo(string message) private static void LogInfo(string message) => _logger.LogInfo(message);
=> _logger.LogInfo(message); private static void LogWarn(string message) => _logger.LogWarn(message);
private static void LogError(string message) => _logger.LogError(message);
private static void LogWarn(string message) private static void LogProgress(double progress, TimeSpan eta, double speed) => _logger.DrawProgress("Total", 0, progress, eta, speed);
=> _logger.LogWarn(message);
private static void LogError(string message)
=> _logger.LogError(message);
// ----------------------------- // -----------------------------
// ffprobe // ffprobe
@ -175,43 +221,59 @@ static class Program
// Multi-threaded splitting // Multi-threaded splitting
// ----------------------------- // -----------------------------
static async Task RunMultiThreaded( static async Task RunMultiThreaded(List<SingleTask> jobs)
Func<int, ISegmentProcessor> processorFactory,
string inputFile,
string outputFolder,
string mask,
double totalDuration,
int segments,
double segmentLength,
string[] passthrough)
{ {
var jobs = Enumerable.Range(0, segments) LogProgress(0.0, TimeSpan.Zero, 0.0);
.Select(i => new
{
Index = i,
Start = i * segmentLength,
Length = (i == segments - 1)
? Math.Max(0.1, totalDuration - i * segmentLength)
: segmentLength
})
.ToList();
var maxDegree = Math.Max(1, Environment.ProcessorCount / 2); var maxDegree = Math.Max(1, Environment.ProcessorCount / 2);
using var sem = new SemaphoreSlim(maxDegree); using var sem = new SemaphoreSlim(maxDegree);
var tasks = new List<Task>(); var tasks = new List<Task>();
// Slot pool: 0..maxDegree-1
var freeSlots = new ConcurrentQueue<int>(Enumerable.Range(0, maxDegree));
var totalSegments = jobs.Count;
var processedSegments = 0;
var totalDuration = jobs.Sum(j => j.SegmentLength);
var sw = Stopwatch.StartNew();
foreach (var job in jobs) foreach (var job in jobs)
{ {
await sem.WaitAsync(); await sem.WaitAsync();
tasks.Add(Task.Run(async () => tasks.Add(Task.Run(async () =>
{ {
int slot = -1;
try try
{ {
await ProcessSegment(processorFactory, inputFile, outputFolder, mask, passthrough, job.Index, job.Start, job.Length); // Acquire a slot ID
while (!freeSlots.TryDequeue(out slot))
await Task.Yield();
await ProcessSegment(
job.ProcessorFactory,
job.Job.InputFile,
job.OutputFileName,
job.Job.Passthrough,
slot + 1, // <-- slot instead of SegmentIndex (+1 for totals)
job.SegmentStart,
job.SegmentLength
);
var processed = Interlocked.Increment(ref processedSegments);
var elapsed = sw.Elapsed;
var eta = TimeSpan.FromTicks(elapsed.Ticks * (totalSegments - processed) / processed);
var speed = (processed * totalDuration) / elapsed.TotalSeconds;
LogProgress((double)processed / totalSegments, eta, speed);
} }
finally finally
{ {
// Return slot to pool
if (slot >= 0)
freeSlots.Enqueue(slot);
sem.Release(); sem.Release();
} }
})); }));
@ -220,41 +282,34 @@ static class Program
await Task.WhenAll(tasks); await Task.WhenAll(tasks);
} }
static async Task RunSingleThreaded(
Func<int, ISegmentProcessor> processorFactory,
string inputFile,
string outputFolder,
string mask,
double totalDuration,
int segments,
double segmentLength,
string[] passthrough)
{
var jobs = Enumerable.Range(0, segments)
.Select(i => new
{
Index = i,
Start = i * segmentLength,
Length = (i == segments - 1)
? Math.Max(0.1, totalDuration - i * segmentLength)
: segmentLength
})
.ToList();
// -----------------------------
// Single-threaded splitting
// -----------------------------
static async Task RunSingleThreaded(List<SingleTask> jobs)
{
foreach (var job in jobs) foreach (var job in jobs)
{ {
await ProcessSegment(processorFactory, inputFile, outputFolder, mask, passthrough, job.Index, job.Start, job.Length); await ProcessSegment(
job.ProcessorFactory,
job.Job.InputFile,
job.OutputFileName,
job.Job.Passthrough,
job.SegmentIndex,
job.SegmentStart,
job.SegmentLength
);
} }
} }
private static async Task ProcessSegment(Func<int, ISegmentProcessor> processorFactory, string inputFile, string outputFolder, string mask, string[] passthrough, int index, double start, double length) private static async Task ProcessSegment(Func<int, ISegmentProcessor> processorFactory, string inputFile, string outputFileName, string[] passthrough, int index, double start, double length)
{ {
var outputFile = BuildOutputFileName(outputFolder, mask, index);
var processor = processorFactory(index); var processor = processorFactory(index);
try try
{ {
await processor.ProcessSegment(inputFile, outputFile, start, length, passthrough); await processor.ProcessSegment(inputFile, outputFileName, start, length, passthrough);
} }
finally finally
{ {
@ -271,6 +326,10 @@ static class Program
{ {
fileName = string.Format(mask.Replace("%03d", "{0:000}"), index); fileName = string.Format(mask.Replace("%03d", "{0:000}"), index);
} }
else if (mask.Contains("%02d"))
{
fileName = string.Format(mask.Replace("%02d", "{0:00}"), index);
}
else if (mask.Contains("%d")) else if (mask.Contains("%d"))
{ {
fileName = string.Format(mask.Replace("%d", "{0}"), index); fileName = string.Format(mask.Replace("%d", "{0}"), index);