DEEP-20 Prometheus connection/query manager added

Andrey Shabarshov 2023-08-07 22:14:07 +01:00
parent bfb3de9331
commit b36552c9a1
5 changed files with 247 additions and 50 deletions

View File

@@ -9,6 +9,7 @@
@inject ITrainedModelStorageService TrainedModelService
@inject ILogger<ModelCard> Logger
@inject IMLProcessorFactory MlProcessorFactory
@inject IPrometheusWatcher Watcher
<style>
.card {
@@ -112,60 +113,15 @@
return;
}
// use automatic step value to always request 500 elements
var seconds = (endDate - startDate).TotalSeconds / 500.0;
if (seconds < 1.0)
seconds = 1.0;
var step = TimeSpan.FromSeconds(seconds);
var data = Watcher.GetData(_modelDefinition!.DataSource.Queries);
var tasks = _modelDefinition!.DataSource.Queries
.Select(x => Prometheus.RangeQuery(x.Query, startDate, endDate, step, TimeSpan.FromSeconds(2)))
.ToArray();
try
if (data == null)
{
await Task.WhenAll(tasks);
}
catch (Exception e)
{
await ShowError(e.Message);
_prediction = new Prediction { PredictedLabel = "Gathering..." };
return;
}
var data = new List<TimeSeriesDataSet>();
foreach (var (res, def) in tasks.Select((x, i) => (x.Result, _modelDefinition.DataSource.Queries[i])))
{
if (res.Status != StatusType.Success)
{
Logger.LogError(res.Error ?? "Error");
return;
}
if (res.ResultType != ResultTypeType.Matrix)
{
Logger.LogError($"Got {res.ResultType}, but Matrix expected for {def.Query}");
return;
}
var m = res.AsMatrix().Result;
if (m == null || m.Length != 1)
{
Logger.LogError($"No data returned for {def.Query}");
return;
}
data.Add(
new()
{
Name = def.Query,
Color = def.Color,
Data = m[0].Values!.ToList()
}
);
}
_prediction = await _mlProcessor!.Predict(Model, _modelDefinition, data);
_prediction = await _mlProcessor!.Predict(Model, _modelDefinition, data.Data);
_updated = DateTime.Now;
await InvokeAsync(StateHasChanged);
}

View File

@@ -11,5 +11,5 @@ public class TimeSeriesDataSet
{
public string Name { get; init; } = "Value";
public string Color { get; init; } = "";
public IReadOnlyCollection<TimeSeries> Data { get; init; } = Array.Empty<TimeSeries>();
public List<TimeSeries> Data { get; init; } = new List<TimeSeries>();
}

View File

@@ -5,6 +5,7 @@ using DeepTrace.Services;
using DeepTrace.ML;
using ApexCharts;
using log4net;
using Microsoft.Extensions.DependencyInjection;
var builder = WebApplication.CreateBuilder(args);
@@ -30,6 +31,8 @@ builder.Services
.AddSingleton<ITrainedModelStorageService, TrainedModelStorageService>()
.AddSingleton<IEstimatorBuilder, EstimatorBuilder>()
.AddSingleton<IMLProcessorFactory, MLProcessorFactory>()
.AddHostedService<PrometheusWatcher>()
.AddSingleton<IPrometheusWatcher>(_ => PrometheusWatcher.Instance)
;
var app = builder.Build();

View File

@@ -0,0 +1,23 @@
using DeepTrace.Data;
using PrometheusAPI;
namespace DeepTrace.Services;
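/// <summary>
/// Snapshot of the time-series data gathered for a set of queries over a single time window.
/// </summary>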
public class CoherentDataSet
{
public DateTime StartTimeUtc { get; set; } = DateTime.MinValue;
public DateTime EndTimeUtc { get; set; } = DateTime.MinValue;
/// <summary>
/// Map: query text to position within Data.
/// </summary>
public Dictionary<string, int> QueryPosition { get; set; } = new();
public List<TimeSeriesDataSet> Data { get; } = new();
}
public interface IPrometheusWatcher
{
/// <summary>
/// The order of results within Data is the same as the order of the source queries.
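/// Returns null until data has been gathered for every requested query.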
/// </summary>
CoherentDataSet? GetData(List<DataSourceQuery> queries);
}

View File

@@ -0,0 +1,215 @@
using DeepTrace.Data;
using PrometheusAPI;
namespace DeepTrace.Services;
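/// <summary>
/// Background service that periodically runs the currently active Prometheus queries
/// and keeps the latest results available as a coherent snapshot.
/// </summary>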
public class PrometheusWatcher : IPrometheusWatcher, IHostedService
{
private readonly ILogger _logger;
private readonly PrometheusClient _prometheus;
private CoherentDataSet? _currentSnapshot;
private readonly object _lock = new();
private bool _stopLoop = false;
private readonly TimeSpan _timeout = TimeSpan.FromSeconds(5);
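// queries not requested through GetData within this period stop being polled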
private readonly TimeSpan _inactiveQueryTimeout = TimeSpan.FromSeconds(20);
private readonly Dictionary<string, (DateTime LastAccessUtc, DataSourceQuery Query)> _activeQueries = new();
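// the watcher is registered both as a hosted service and as IPrometheusWatcher (see Program.cs),
// so the most recently constructed instance is exposed through the static Instance property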
private static PrometheusWatcher? _instance;
public static PrometheusWatcher Instance => _instance!;
public PrometheusWatcher(ILogger<PrometheusWatcher> logger, PrometheusClient prometheus)
{
_logger = logger;
_prometheus = prometheus;
_instance = this;
}
public CoherentDataSet? GetData(List<DataSourceQuery> queries)
{
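// register (or refresh) the requested queries so the background loop keeps polling them,
// then take the latest snapshot under the lock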
CoherentDataSet? currentSnapshot = null;
lock (_lock)
{
foreach (var query in queries)
{
if (!_activeQueries.ContainsKey(query.Query))
_logger.LogInformation($"Monitoring added for query: {query.Query}");
_activeQueries[query.Query] = (LastAccessUtc: DateTime.UtcNow, Query: query);
}
currentSnapshot = _currentSnapshot;
}
if (currentSnapshot == null)
return null;
var res = new CoherentDataSet
{
StartTimeUtc = currentSnapshot.StartTimeUtc,
EndTimeUtc = currentSnapshot.EndTimeUtc,
QueryPosition = queries.Select((x, i) => (x.Query, i)).ToDictionary(x => x.Query, x => x.i)
};
foreach (var query in queries)
{
if (!currentSnapshot.QueryPosition.TryGetValue(query.Query, out var pos)
|| pos < 0
|| pos >= currentSnapshot.Data.Count
)
{
return null;
}
res.Data.Add(currentSnapshot.Data[pos]);
}
return res;
}
public Task StartAsync(CancellationToken cancellationToken)
{
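// run the background gathering loop until StopAsync sets _stopLoop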
Task.Run(UpdateLoop, cancellationToken);
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
lock (_lock)
{
_stopLoop = true;
}
return Task.CompletedTask;
}
private async Task UpdateLoop()
{
while(!_stopLoop)
{
var iterationStart = DateTime.UtcNow;
RemoveInactiveQueries();
await UpdateIteration();
// keep a steady gathering interval: wait only for the remainder of this iteration's time slot
var timeout = _timeout - (DateTime.UtcNow - iterationStart);
if (timeout < TimeSpan.Zero)
timeout = TimeSpan.Zero;
await Task.Delay(timeout);
}
}
private void RemoveInactiveQueries()
{
lock(_lock)
{
var toRemove = new List<string>();
var expired = DateTime.UtcNow - _inactiveQueryTimeout;
foreach( var (_,(lastAccess,query)) in _activeQueries )
{
if ( lastAccess < expired )
{
toRemove.Add(query.Query);
}
}
foreach (var query in toRemove)
{
_activeQueries.Remove(query);
_logger.LogInformation($"Monitoring removed for query: {query}");
}
}
}
private async Task UpdateIteration()
{
DateTime endDate = DateTime.UtcNow;
DateTime startDate = endDate - _timeout;
// use automatic step value to always request 500 elements
var seconds = (endDate - startDate).TotalSeconds / 500.0;
if (seconds < 1.0)
seconds = 1.0;
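// with the fixed 5-second window above this is 0.01, so the step always clamps to the 1-second minimum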
var step = TimeSpan.FromSeconds(seconds);
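// start one range query per active query and remember which task belongs to which query text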
var tasks = _activeQueries
.Select(x =>
(
x.Value.Query.Query,
Task: _prometheus.RangeQuery(
x.Value.Query.Query,
startDate,
endDate,
step,
TimeSpan.FromSeconds(2)
)
)
)
.ToDictionary(x => x.Task, x => x.Query );
var newSet = new CoherentDataSet
{
StartTimeUtc = startDate,
EndTimeUtc = endDate
};
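// collect the results as the individual range queries complete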
while (tasks.Any())
{
Task<InstantQueryResponse>? task = null;
try
{
task = await Task.WhenAny(tasks.Select(x => x.Key));
if ( task != null)
{
var res = task.Result;
var query = tasks[task];
if (res?.Status != StatusType.Success)
{
_logger.LogError(res?.Error ?? "Error");
}
else if (res.ResultType != ResultTypeType.Matrix)
{
_logger.LogError($"Got {res.ResultType}, but Matrix expected for {query}");
}
else
{
var m = res.AsMatrix().Result;
if (m == null || m.Length != 1)
{
_logger.LogWarning($"No data returned for {query}");
}
else
{
newSet.QueryPosition[query] = newSet.Data.Count;
var timeSeries = new TimeSeriesDataSet
{
Name = query
};
timeSeries.Data.AddRange(m[0].Values!.ToList());
newSet.Data.Add(timeSeries);
}
}
}
}
catch (Exception e)
{
_logger.LogError(e.Message);
}
if ( task != null )
{
tasks.Remove(task);
}
}
lock (_lock)
{
_currentSnapshot = newSet;
}
}
}