/*
 * dbMango
 *
 * Copyright 2025 Deutsche Bank AG
 * SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
using log4net;
using MongoDB.Bson;
using MongoDB.Bson.Serialization.Attributes;
using MongoDB.Driver;
using Rms.Risk.Mango.Pivot.Core.MongoDb;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Globalization;
using System.Reflection;
using System.Security.Cryptography;
using System.Text;
using System.Text.RegularExpressions;

namespace Rms.Risk.Mango.Pivot.Core.Models;

/// <summary>
/// Implementation of IPivotTableDataSource that goes direct to the mongo database.
/// </summary>
/// <remarks>
/// Per-collection metadata (field map, lookups, calculated fields, pivot definitions and the
/// cached field mapping) is read from a companion collection named "{collection}-Meta".
/// Pivot results are cached in the collection named by <see cref="CacheCollectionName"/>.
/// </remarks>
public class MongoDbDataSource : IPivotTableDataSource, IPivotTableDataSourceMetaProvider
{
    private static readonly ILog _log = LogManager.GetLogger(MethodBase.GetCurrentMethod()!.DeclaringType!);

    // Process-wide tunables. They are mutable statics, so a change affects every instance.
    // ReSharper disable InconsistentNaming

    /// <summary>Lifetime, in hours, of a cached pivot result (used to set ExpireAt when caching).</summary>
    public static int PivotCacheTTLHours = 1;

    /// <summary>How long the list built by GetAllMeta is reused before it is rebuilt.</summary>
    public static TimeSpan MetaCacheTTL = TimeSpan.FromMinutes(30);

    /// <summary>Hard cap on the number of rows FetchPivotData reads before truncating.</summary>
    public static int PivotMaxReturnedRows = 500_000;

    /// <summary>Value exposed through the Prefix property.</summary>
    public static string SourcePrefix = "Forge";

    /// <summary>Name of the collection that stores cached pivot results.</summary>
    public static string CacheCollectionName = "dbMango-Pivot-Cache";

    // ReSharper restore InconsistentNaming

    /// <summary>Results with more rows than this are not written to the pivot cache.</summary>
    private const int MaxCachedRows = 25000;

    private readonly MongoDbConfigRecord _config;
    private readonly MongoDbSettings _settings;

    // Database name; the constructor falls back to config.MongoDbDatabase when none is given.
    private readonly string _databaseInstance;

    // Created lazily by the Database property; InitAsync resets it to null when the connection test fails.
    private IMongoDatabase?
_database;

    // Guards lazy creation of _database in the Database property.
    private readonly Lock _syncObject = new();

    // Result list returned by GetAllMeta and the time after which it is rebuilt.
    private readonly List _allMeta = [];
    private DateTime _allMetaValidUntil = DateTime.MinValue;

    /// <summary>
    /// Default user name: passed to PreloadCollections by GetAllMeta and used by GetPivotsAsync
    /// when no explicit user name is supplied.
    /// </summary>
    public string User { get; set; } = "";

    // Serialises InitAsync so only one caller at a time (re)builds a field mapping.
    // NOTE: a SemaphoreSlim with count 1 is not reentrant - nothing reached from InitAsync may wait on it again.
    readonly SemaphoreSlim _threadLock = new(1);

    /// <summary>
    /// Database handle, created on first use through MongoDbHelper.GetDatabase using double-checked locking.
    /// </summary>
    internal IMongoDatabase Database
    {
        get
        {
            if (_database != null)
                return _database;

            lock (_syncObject)
            {
                Thread.MemoryBarrier();
                if (_database != null)
                    return _database;
                return _database = MongoDbHelper.GetDatabase(_config, _settings, _databaseInstance);
            }
        }
    }

    /// <summary>Identifier of this source, derived from the config record and the database name.</summary>
    public string SourceId => _config!.GetKey(_databaseInstance);

    public string Prefix => SourcePrefix;

    public override string ToString() => SourceId;

    // Field mapping per collection name; filled by InitAsync and consulted by nearly every query.
    private readonly ConcurrentDictionary _fieldMap = new();

    /// <summary>
    /// Creates a data source for one database.
    /// </summary>
    /// <param name="config">Connection configuration; config.Check() is called immediately (presumably validates it).</param>
    /// <param name="settings">Driver settings passed to MongoDbHelper.GetDatabase.</param>
    /// <param name="databaseInstance">Database name; when null, config.MongoDbDatabase is used.</param>
    public MongoDbDataSource(MongoDbConfigRecord config, MongoDbSettings settings, string? databaseInstance)
    {
        _config = config;
        _settings = settings;
        _databaseInstance = databaseInstance ?? config.MongoDbDatabase;
        _config.Check();
    }

    /// <summary>
    /// Ensures the field mapping for <paramref name="collectionName"/> is present in _fieldMap.
    /// It first tries the "CachedFieldMapping" document (unless <paramref name="skipCache"/> is set).
    /// Otherwise it rebuilds the mapping from the FieldsMap, Lookups and CalculatedFields metadata
    /// and stores it back as the cached mapping.
    /// </summary>
    private async Task InitAsync(string collectionName, bool skipCache, CancellationToken token = default )
    {
        //multiple callers call this method, so ensure only 1 at a time passes through this logic
        await _threadLock.WaitAsync(token);
        try
        {
            // Force test connection, and reset _database if disconnected
            if (!MongoDbHelper.IsConnected(_database, true))
                _database = null;

            if (_fieldMap.ContainsKey(collectionName) && !skipCache)
                return;

            var sw = Stopwatch.StartNew();
            _log.Debug($"Loading field map for Collection=\"{collectionName}\" MongoDb=\"{_config?.MongoDbDatabase}\" URL=\"{_config?.MongoDbUrl}\"");

            if (await LoadCachedFieldMappingAsync(collectionName, skipCache, token))
            {
                sw.Stop();
                _log.Info($"Loaded cached field map for Collection=\"{collectionName}\" FieldDefs={_fieldMap[collectionName].Count} CalcFields={_fieldMap[collectionName].CalculatedFields.Count()} Lookups={_fieldMap[collectionName].Data.Lookups.Count} MongoDb=\"{_config?.MongoDbDatabase}\" URL=\"{_config?.MongoDbUrl}\"" +
                          $"\tElapsed=\"{sw.Elapsed}\""
                );
                return;
            }

            //no cached field mappings found, so
// create it and store it in the db
            var swUpdateFieldMappings = Stopwatch.StartNew();
            await UpdateFieldMappings(collectionName, skipCache, token);
            swUpdateFieldMappings.Stop();

            var swUpdateLookups = Stopwatch.StartNew();
            var lookups = await UpdateLookups(collectionName, token);
            swUpdateLookups.Stop();

            var swUpdateCalculatedFields = Stopwatch.StartNew();
            await UpdateCalculatedFields(collectionName, token);
            swUpdateCalculatedFields.Stop();

            await CacheFieldMappingAsync(collectionName, token);

            sw.Stop();
            _log.Info($"Created field map for Collection=\"{collectionName}\", FieldDefs={_fieldMap[collectionName].Count}, CalcFields={_fieldMap[collectionName].CalculatedFields.Count()}, Lookups={lookups}, MongoDb=\"{_config?.MongoDbDatabase}\", URL=\"{_config?.MongoDbUrl}\",\n" +
                      $"ElapsedMs=\"{sw.Elapsed.TotalMilliseconds}\",\n" +
                      $"UpdateFieldMappingsMs=\"{swUpdateFieldMappings.Elapsed.TotalMilliseconds}\",\n" +
                      $"UpdateLookupsMs=\"{swUpdateLookups.Elapsed.TotalMilliseconds}\",\n" +
                      $"UpdateCalculatedFieldsMs=\"{swUpdateCalculatedFields.Elapsed.TotalMilliseconds}\"\n"
            );
        }
        finally
        {
            _threadLock.Release();
        }
    }

    /// <summary>
    /// Returns the collection metadata list filled by PreloadCollections, reusing it for MetaCacheTTL.
    /// </summary>
    /// <param name="force">Discard this instance's list and clear PivotMetaCache before reloading.</param>
    /// <param name="token">Cancellation token.</param>
    public async Task> GetAllMeta(bool force = false, CancellationToken token = default)
    {
        if ( force )
        {
            _allMeta.Clear();
            PivotMetaCache.Clear();
        }

        // UTC keeps the TTL exact across daylight-saving transitions (local time can jump back an hour).
        if (DateTime.UtcNow > _allMetaValidUntil )
            _allMeta.Clear();

        if ( _allMeta.Count > 0 )
            return _allMeta;

        await this.PreloadCollections(_allMeta, User, token);
        _allMetaValidUntil = DateTime.UtcNow + MetaCacheTTL;
        return _allMeta;
    }

    /// <summary>
    /// Loads the "CachedFieldMapping" document from "{collection}-Meta" into _fieldMap.
    /// </summary>
    /// <returns>
    /// true when a document whose CachedAt date equals today's UTC date was found and installed;
    /// false when <paramref name="skipCache"/> is set, no document exists, or it is from an earlier day.
    /// </returns>
    private async Task LoadCachedFieldMappingAsync(string collectionName, bool skipCache, CancellationToken token = default )
    {
        if ( skipCache )
            return false;

        var coll = GetCollectionWithRetries(collectionName+"-Meta");

        // Dispose the cursor so any server-side cursor is released, including on the early returns below.
        using var cursor = await coll.FindAsync("{ _id : \"CachedFieldMapping\"}", cancellationToken: token);
        while ( await cursor.MoveNextAsync(token) )
        {
            var doc = cursor.Current.FirstOrDefault();
            if ( doc == null )
                return false;

            if ( doc.CachedAt.Date != DateTime.UtcNow.Date ) // cache is too old
                return false;
doc.PostLoad();
            _fieldMap[collectionName] = new( doc.UseMapping ) { Data = doc };
            return true;
        }

        return false;
    }

    /// <summary>
    /// Upserts the current field mapping for <paramref name="collectionName"/> as the
    /// "CachedFieldMapping" document in "{collection}-Meta".
    /// </summary>
    public async Task CacheFieldMappingAsync(string collectionName, CancellationToken token = default)
    {
        // PreSave runs on a clone, presumably so the in-memory mapping is left untouched.
        var doc = _fieldMap[collectionName].Data.Clone();
        doc.PreSave();
        var coll = GetCollectionWithRetries(collectionName + "-Meta");
        await coll.ReplaceOneAsync( "{_id : \"CachedFieldMapping\"}", doc, new ReplaceOptions {IsUpsert = true}, token);
    }

    /// <summary>
    /// Reads the "Lookups" document from "{collection}-Meta" and registers each entry
    /// (names starting with '_' are skipped) with the collection's field mapping.
    /// In the stored JSON '@' stands for '$' and '#' for '.'; both are translated back.
    /// Surrounding whitespace and '[' / ']' are trimmed from the stage text.
    /// </summary>
    /// <returns>Number of lookups successfully registered.</returns>
    private async Task UpdateLookups(string collectionName, CancellationToken token = default)
    {
        var coll = GetCollectionWithRetries($"{collectionName }-Meta");

        // Async single-document read. The previous synchronous FirstOrDefault() on the cursor
        // blocked a thread-pool thread for the network round-trip and ignored the token.
        var doc = await coll.Find(new BsonDocument { { "_id", "Lookups" } })
            .FirstOrDefaultAsync(cancellationToken: token);
        if (doc == null)
            return 0;

        var count = 0;
        foreach (var elem in doc.Elements.Where(x => !x.Name.StartsWith('_')).Select(x => x.Name))
        {
            try
            {
                var stages = doc[elem]
                        .ToJson()
                        .Replace( "@", "$" )
                        .Replace( "#", "." )
                        .TrimStart(' ', '\t', '\r', '\n', '[')
                        .TrimEnd( ' ', '\t', '\r', '\n', ']')
                    ;
                _fieldMap[collectionName].AddLookup( elem, stages );
                count += 1;
            }
            catch (Exception ex)
            {
                // A malformed lookup must not stop the rest of the mapping from loading,
                // but it should not disappear silently either.
                _log.Warn($"Skipping invalid lookup Lookup=\"{elem}\" Collection=\"{collectionName}\"", ex);
            }
        }

        return count;
    }

    /// <summary>
    /// Reads the "CalculatedFields" document from "{collection}-Meta" and registers every entry
    /// (names starting with '_' are skipped) that has a non-empty Formula.
    /// Each entry may also carry DrillDown, LookupDef and AggregationOperator (default "$sum").
    /// </summary>
    private async Task UpdateCalculatedFields(string collectionName, CancellationToken token = default)
    {
        var coll = GetCollectionWithRetries($"{collectionName}-Meta");
        var doc = await coll.Find(new BsonDocument { { "_id", "CalculatedFields" } })
            .FirstOrDefaultAsync(cancellationToken: token);
        if (doc == null)
            return;

        foreach (var elem in doc.Elements.Where(x => !x.Name.StartsWith('_')).Select(x => x.Name))
        {
            try
            {
                var e = doc[elem].AsBsonDocument;

                // Formula: '@' in the stored JSON stands for '$'.
                var formula = e.GetValue("Formula", null)?.ToJson()
                        .Replace("@", "$")
                    ;
                if (string.IsNullOrWhiteSpace(formula))
                    continue;

                // DrillDown: '@' -> '$' and '#' -> '.'; empty string when absent.
                var drill = e.GetValue("DrillDown", null)?.ToJson()
                        .Replace("@", "$")
                        .Replace("#", ".")
                    ?? ""
                    ;

                var lookupDef = e.GetValue("LookupDef", null)?.AsBsonArray
                    ;

                var aggregationOperator = e.GetValue("AggregationOperator", null)?.AsString ??
"$sum";
                _fieldMap[collectionName].AddCalculatedField(elem, formula, drill, lookupDef?.Values.Select( x => x.AsString ).ToArray(), aggregationOperator);
            }
            catch (Exception ex)
            {
                // Best-effort: one malformed calculated field must not block the rest, but log it.
                _log.Warn($"Skipping invalid calculated field Field=\"{elem}\" Collection=\"{collectionName}\"", ex);
            }
        }
    }

    /// <summary>
    /// Builds the field mapping for <paramref name="collectionName"/> from the "FieldsMap" document in
    /// "{collection}-Meta" and stores it in _fieldMap. It then adds any fields that appear in sampled
    /// data but are not declared in the map.
    /// </summary>
    /// <remarks>
    /// "_UseIds" (bool) selects whether the mapping uses field ids. Every other non-underscore entry is
    /// a sub-document with "Type", "Purpose" and an optional "Id".
    /// When no FieldsMap document exists, an empty non-id mapping is stored and sampling is skipped.
    /// </remarks>
    private async Task UpdateFieldMappings(string collectionName, bool skipCache, CancellationToken token = default )
    {
        var coll = GetCollectionWithRetries($"{collectionName}-Meta");
        var doc = await coll.Find(new BsonDocument { { "_id", "FieldsMap" } })
            .FirstOrDefaultAsync(cancellationToken: token);
        if (doc == null)
        {
            _fieldMap[collectionName] = new(false);
            return;
        }

        var useIds = doc.Contains("_UseIds") ? doc["_UseIds"] : null;
        var res = new FieldMapping(useIds?.AsBoolean ?? false);

        foreach (var elem in doc.Elements.Where(x => !x.Name.StartsWith('_')).Select(x => x.Name))
        {
            var d = doc[elem].AsBsonDocument;
            res[elem] = new()
            {
                Id = d.Contains("Id") ? d["Id"].ToInt32() : 0,
                Type = ReinterpretType( d["Type"].ToString()! ),
                Purpose = Convert(d["Purpose"].ToString()!)
            };
        }

        // Publish before sampling. UpdateMissingFieldMappings reads this entry, and the COB lookup it
        // triggers goes through GetFieldMapping, which would otherwise re-enter InitAsync.
        _fieldMap[collectionName] = res;

        // add fields missing in the mapping but present in the data set
        await UpdateMissingFieldMappings(collectionName, skipCache, token);
    }

    /// <summary>
    /// Maps a FieldsMap "Purpose" string (case-insensitive) to a <see cref="PivotFieldPurpose"/>;
    /// anything unrecognised is treated as Data.
    /// </summary>
    private static PivotFieldPurpose Convert( string s ) =>
        // Invariant lower-casing: culture-sensitive ToLower() turns "Info" into "ınfo" under tr-TR,
        // which silently reclassified Info fields as Data.
        s.ToLowerInvariant() switch
        {
            "key"           => PivotFieldPurpose.Key,
            "primary key 1" => PivotFieldPurpose.PrimaryKey1,
            "primary key 2" => PivotFieldPurpose.PrimaryKey2,
            "info"          => PivotFieldPurpose.Info,
            "hidden"        => PivotFieldPurpose.Hidden,
            _               => PivotFieldPurpose.Data
        };

    /// <summary>
    /// For mappings that do not use field ids, samples up to 16 documents and adds mappings for fields
    /// that are not declared yet. When COB dates are known, the sample is restricted to the last one.
    /// Errors from the sampling aggregation are ignored.
    /// </summary>
    private async Task UpdateMissingFieldMappings(string collectionName, bool skipCache, CancellationToken token = default )
    {
        if ( _fieldMap[collectionName].UseMapping ) // can't pick up missing fields
            return;

        // collection does not use field ids.
// pick up missing fields
        var coll = GetCollectionWithRetries(collectionName);

        // Relies on _fieldMap[collectionName] already being set (UpdateFieldMappings does this first).
        // GetCobDatesAsync may call GetFieldMapping, which calls InitAsync when the mapping is missing,
        // and InitAsync waits on _threadLock, which our caller already holds.
        var cobs = await GetCobDatesAsync(collectionName, skipCache, token);

        // Restrict the sample to one COB when any are known.
        // The raw "COB" name is used; this path only runs when the mapping does not use field ids.
        // NOTE(review): assumes cobs[^1] is the most recent COB - confirm GetCobDatesAsync returns them sorted ascending.
        var filter = "";
        if ( cobs.Length > 0 )
            filter = $"{{ $match : {{ \"COB\" : ISODate(\"{cobs[^1]}\") }} }}, ";
        var json = "["+filter+"{ $sample: { size: 16 } }]";
        var pipeline = MongoDB.Bson.Serialization.BsonSerializer.Deserialize(json).Select(p => p.AsBsonDocument).ToList();

        try
        {
            var options = new AggregateOptions
            {
                BypassDocumentValidation = true,
                //AllowDiskUse = true
            };

            using var cursor = await coll.AggregateAsync( pipeline, options, token);
            while ( await cursor.MoveNextAsync(token) )
            {
                var batch = cursor.Current;
                foreach ( var doc in batch )
                {
                    UpdateMissingFieldMappings(collectionName, doc );
                }
            }
        }
        catch ( Exception )
        {
            // ignore
        }
    }

    /// <summary>
    /// Walks <paramref name="doc"/> and adds a mapping for every field that is not mapped yet.
    /// Sub-documents are walked recursively using dotted names ("parent.child").
    /// Skipped: elements named "_id", null values, unsupported BSON types and anything under a Hidden field.
    /// </summary>
    /// <param name="collectionName">Collection whose mapping is updated (created empty if absent).</param>
    /// <param name="doc">Sample document.</param>
    /// <param name="prefix">Dotted path of <paramref name="doc"/> within the top-level document; "" at the top.</param>
    public void UpdateMissingFieldMappings( string collectionName, BsonDocument doc, string prefix = "")
    {
        foreach ( var element in doc.Elements )
        {
            if ( element.Name == "_id" )
                continue;

            var name = prefix == "" ?
element.Name : $"{prefix}.{element.Name}" ;

            if ( !_fieldMap.ContainsKey(collectionName))
                _fieldMap[collectionName] = new(false);

            if (_fieldMap[collectionName].ContainsKey(name) )
                continue;

            // Skip anything nested under a field that is mapped as Hidden.
            var shouldIgnore = _fieldMap[collectionName].Fields
                .Any( x => x.Value.Purpose == PivotFieldPurpose.Hidden && name.IndexOf( x.Key + ".", StringComparison.Ordinal ) == 0 );
            if ( shouldIgnore )
                continue;

            var val = element.Value;
            Type t;
            // BSON type -> CLR type of the new mapping; sub-documents recurse instead of being mapped.
            // NOTE(review): Decimal128 (and arrays, ObjectIds, ...) fall into default and are never mapped,
            // although ReinterpretType and IsData support decimal - confirm this is intended.
            switch ( val.BsonType )
            {
                case BsonType.Double    : t = typeof(double); break;
                case BsonType.String    : t = typeof(string); break;
                case BsonType.Document  : UpdateMissingFieldMappings(collectionName, val.AsBsonDocument, name ); continue;
                case BsonType.Boolean   : t = typeof(bool); break;
                case BsonType.DateTime  : t = typeof(DateTime); break;
                case BsonType.Null      : continue; // this can be an object
                case BsonType.Symbol    : t = typeof(string); break;
                case BsonType.Int32     : t = typeof(int); break;
                case BsonType.Timestamp : t = typeof(DateTime); break;
                case BsonType.Int64     : t = typeof(long); break;
                default                 : continue;
            }

            UpdateMappingIfFieldIsMissing( collectionName, name, t);
        }
    }

    /// <summary>
    /// Adds a mapping for <paramref name="name"/> with CLR type <paramref name="t"/>, unless it is already
    /// mapped or lies under a Hidden field. Numeric types (double, decimal, int, long) become Data fields;
    /// everything else becomes Info. If the collection has no mapping yet, an empty one (no field ids)
    /// is created first.
    /// </summary>
    public void UpdateMappingIfFieldIsMissing( string collectionName, string name, Type t )
    {
        if ( !_fieldMap.ContainsKey(collectionName))
            _fieldMap[collectionName] = new(false);

        if (_fieldMap[collectionName].ContainsKey(name) )
            return;

        var shouldIgnore = _fieldMap[collectionName].Fields
            .Any( x => x.Value.Purpose == PivotFieldPurpose.Hidden && name.IndexOf( x.Key + ".", StringComparison.Ordinal ) == 0 );
        if (shouldIgnore)
            return;

        var isNumber = t == typeof(double)
                       || t == typeof(decimal)
                       || t == typeof(int)
                       || t == typeof(long)
            ;

        _fieldMap[collectionName][name] = new()
        {
            Id = 0,
            Purpose = isNumber ?
PivotFieldPurpose.Data : PivotFieldPurpose.Info,
            Type = t
        };
    }

    /// <summary>
    /// Resolves a FieldsMap "Type" string. It is tried first as a CLR type name via Type.GetType
    /// (e.g. "System.String"), then as one of the short aliases double/string/int32/int64/decimal/date.
    /// Unknown names fall back to double.
    /// </summary>
    private static Type ReinterpretType( string type )
    {
        var t = Type.GetType( type );
        if ( t != null )
            return t;
        switch ( type )
        {
            case "double": return typeof(double);
            case "string": return typeof(string);
            case "int32": return typeof(int);
            case "int64": return typeof(long);
            case "decimal": return typeof(decimal);
            case "date": return typeof(DateTime);
            default: return typeof(double);
        }
    }

    /// <summary>
    /// Lists the collections of the database, sorted by name.
    /// </summary>
    /// <remarks>
    /// CollectionType.All: returns every collection name, including the "-Meta" collections.
    /// The OfType filter keeps only string names (they should all be strings already).
    ///
    /// CollectionType.NoMeta: returns only regular collections that do NOT have a corresponding
    /// metadata collection (a collection with the same name plus "-Meta").
    ///
    /// CollectionType.HaveMeta: returns only regular collections that DO have a corresponding
    /// metadata collection.
    ///
    /// NoMeta and HaveMeta never return the "-Meta" collections themselves. All name matching,
    /// including the "-Meta" suffix, is case-insensitive.
    /// </remarks>
    /// <exception cref="ArgumentOutOfRangeException">For any other <paramref name="includeMeta"/> value.</exception>
    public async Task GetCollectionsAsync(CollectionType includeMeta = CollectionType.All, CancellationToken token = default)
    {
        var cursor = await Database.ListCollectionsAsync(cancellationToken: token);
        var list = await cursor.ToListAsync(cancellationToken: token);
        var coll = list.Select( x => x["name"].ToString() ).OfType().ToHashSet(StringComparer.OrdinalIgnoreCase);

        var res = includeMeta switch
        {
            CollectionType.All => coll.OrderBy(x => x).ToArray(),
            CollectionType.NoMeta => coll.Where(x => IsNotMeta(x) && !HaveMeta(coll,x)).OrderBy(x => x).ToArray(),
            CollectionType.HaveMeta => coll.Where(x => IsNotMeta(x) && HaveMeta(coll,x)).OrderBy(x => x).ToArray(),
            _ => throw new ArgumentOutOfRangeException(nameof(includeMeta), includeMeta, null)
        };

        return res;

        // true for non-blank names that do not end with "-Meta" (case-insensitive)
        static bool IsNotMeta(string?
name) => !string.IsNullOrWhiteSpace(name) && !name.EndsWith("-Meta", StringComparison.OrdinalIgnoreCase);

        // true when "{collectionName}-Meta" exists in the (case-insensitive) set
        static bool HaveMeta(HashSet allCollections, string? collectionName) => allCollections.Contains(collectionName + "-Meta");
    }

    /// <summary>
    /// Returns the non-data (key) fields of the collection, ordered by their mapping Id.
    /// When the mapping is empty, a fixed default list sorted alphabetically is returned.
    /// </summary>
    public async Task GetKeyFieldsAsync(string collectionName, CancellationToken token = default)
    {
        var fields = await GetFieldMapping(collectionName, token);
        if ( fields?.Count > 0 )
            return fields.Fields
                    .Where( x => !IsData( x.Value ) )
                    .OrderBy( x => x.Value.Id )
                    .Select( x => x.Key )
                    .ToArray()
                ;

        return new[] { "Department", "Location", "Book", "Currency", "TradeType", "SystemId", "Ver", "Expiry Date", "COB" }
            .OrderBy( x => x )
            .ToArray();
    }

    /// <summary>
    /// Returns the already-loaded mapping for the collection without touching the database.
    /// If none is loaded, a new empty (non-id) mapping is returned; it is NOT stored.
    /// </summary>
    public FieldMapping GetFieldMapping(string collectionName)
        => !_fieldMap.TryGetValue(collectionName, out var fields) ? new(false) : fields;

    /// <summary>
    /// Returns the mapping for the collection, loading it through InitAsync on first use.
    /// </summary>
    /// <exception cref="ApplicationException">If no mapping exists even after initialisation.</exception>
    public async Task GetFieldMapping(string collectionName, CancellationToken token)
    {
        if ( !_fieldMap.TryGetValue(collectionName, out var fields))
        {
            await InitAsync(collectionName, false, token);
            _fieldMap.TryGetValue(collectionName, out fields);
        }

        return fields ??
throw new ApplicationException($"Mapping for Collection=\"{collectionName}\" is not found");
    }

    /// <summary>
    /// A field counts as data when its purpose is not Key, PrimaryKey1, PrimaryKey2 or Info
    /// and its type is numeric (double, decimal, int or long).
    /// Non-numeric fields are therefore never data, whatever their purpose.
    /// </summary>
    private static bool IsData( SingleFieldMapping x )
    {
        if ( x.Purpose == PivotFieldPurpose.Key
             || x.Purpose == PivotFieldPurpose.PrimaryKey1
             || x.Purpose == PivotFieldPurpose.PrimaryKey2
             || x.Purpose == PivotFieldPurpose.Info
           )
        {
            return false;
        }

        return x.Type == typeof(double)
               || x.Type == typeof(decimal)
               || x.Type == typeof(int)
               || x.Type == typeof(long)
            ;
    }

    /// <summary>
    /// Returns the fields whose purpose equals <paramref name="keyLevel"/>, ordered by mapping Id.
    /// Falls back to the sorted list { "SystemId", "Ver" } when the mapping is empty.
    /// </summary>
    public async Task GetDrilldownKeyFieldsAsync(string collectionName,PivotFieldPurpose keyLevel, CancellationToken token = default)
    {
        var fields = await GetFieldMapping(collectionName, token);
        if ( fields.Count > 0 )
            return fields.Fields
                .Where( x => x.Value.Purpose == keyLevel )
                .OrderBy( x => x.Value.Id )
                .Select( x => x.Key )
                .ToArray();

        return new[] { "SystemId", "Ver" }
            .OrderBy( x => x )
            .ToArray();
    }

    /// <summary>
    /// Delegates to FieldMapping.GetDrilldown for field <paramref name="name"/> of the collection.
    /// </summary>
    public async Task GetDrilldownAsync(string collectionName, string name, string nullValue = "\"\"", bool equals = false, CancellationToken token = default)
    {
        var fields = await GetFieldMapping(collectionName, token);
        return fields.GetDrilldown(name, nullValue, equals );
    }

    /// <summary>
    /// Returns mapped and calculated field names, sorted. "_id", "id" and every key field reported
    /// by GetKeyFieldsAsync are excluded.
    /// </summary>
    public async Task GetDataFieldsAsync(string collectionName, CancellationToken token = default)
    {
        var fields = await GetFieldMapping(collectionName, token);
        // Defensive only: GetFieldMapping throws rather than returning null.
        if ( fields == null )
            return [];

        var fieldNames = fields.FieldNames;
        var calculatedFields = fields.CalculatedFields;

        var all = fieldNames.Concat(calculatedFields).ToList();
        all.Sort();

        var keys = new HashSet( await GetKeyFieldsAsync(collectionName, token) );

        return all.Where( x => x != "_id" && x != "id" && !keys.Contains( x ) ).ToArray();
    }

    /// <summary>
    /// Queries the distinct values of the mapped "COB" field and formats them as "yyyy-MM-dd" strings.
    /// Documents without a COB are ignored.
    /// </summary>
    private async Task InternalGetCobDatesAsync(string collectionName, CancellationToken token = default)
    {
        var coll = GetCollectionWithRetries(collectionName);
        var emptyFilter = new FilterDefinitionBuilder().Empty;

        // using DateTime?
// in case we have a document without COB
        // in this case Null can't be converted to DateTime and MongoDB driver throws an exception
        var fields = await GetFieldMapping(collectionName, token);
        var docs = await (await coll.DistinctAsync( fields.MapField( "COB" ), emptyFilter, cancellationToken: token))
            .ToListAsync(cancellationToken: token);

        // Callers (GetDepartmentsAsync, UpdateMissingFieldMappings) treat cobs[^1] as the latest COB,
        // but the server does not guarantee the order of distinct values. COBs that carry a time of day
        // can also collapse onto the same date string. So: de-duplicate, then sort ascending
        // ("yyyy-MM-dd" sorts chronologically under an ordinal comparison).
        var cobs = docs
            .Where(x => x != null)
            .Select( x => x!.Value.ToString( "yyyy-MM-dd" ) )
            .Distinct()
            .OrderBy(x => x, StringComparer.Ordinal)
            .ToArray();
        return cobs;
    }

    /// <summary>
    /// Returns the COB dates ("yyyy-MM-dd") available in the collection.
    /// </summary>
    /// <remarks>
    /// The cached list is trusted while it contains today, yesterday or (on a Monday) last Friday, in
    /// local time. Otherwise, or when <paramref name="force"/> is set, the dates are re-read from the
    /// collection and the cache is refreshed.
    /// </remarks>
    public async Task GetCobDatesAsync(string collectionName, bool force = false, CancellationToken token = default)
    {
        var cobs = force
            ? []
            : await this.LoadCachedCobDatesAsync(collectionName, token)
            ;

        // contains today or yesterday or last Friday if today is Monday
        if ( cobs.Contains( DateTime.Now.ToString( "yyyy-MM-dd" ) )
             || cobs.Contains( (DateTime.Now - TimeSpan.FromDays( 1 )).ToString( "yyyy-MM-dd" ) )
             || (DateTime.Now.DayOfWeek == DayOfWeek.Monday && cobs.Contains( (DateTime.Now - TimeSpan.FromDays( 3 )).ToString( "yyyy-MM-dd" ) ))
           )
        {
            return cobs;
        }

        cobs = await InternalGetCobDatesAsync(collectionName, token);
        await this.CacheCobDates(collectionName, cobs, token);

        return cobs;
    }

    /// <summary>
    /// Returns the departments present on the latest COB (cobs[^1]), merged with the cached list.
    /// A still-valid, non-empty cache is returned without querying; no COBs means an empty result.
    /// </summary>
    public async Task GetDepartmentsAsync(string collectionName, CancellationToken token = default)
    {
        var cob = await GetCobDatesAsync(collectionName, token: token);
        if ( cob.Length == 0 )
            return [];

        var (stillValid,cached) = await this.LoadCachedDepartmentsAsync(collectionName, token);
        if (stillValid && cached.Length > 0)
            return cached;

        var coll = GetCollectionWithRetries(collectionName);
        var fields = await GetFieldMapping(collectionName, token);
        var filter = new FilterDefinitionBuilder().Eq(fields.MapField("COB"), DateTime.ParseExact(cob[^1], "yyyy-MM-dd", null, DateTimeStyles.AssumeUniversal));
        var docs = await (await coll.DistinctAsync(fields.MapField("Department"), filter, cancellationToken: token)).ToListAsync(cancellationToken: token);

        var departments = docs.Concat(cached).Distinct().OrderBy(x => x).ToArray();
        await this.CacheDepartments(collectionName, departments, token);
return departments;
    }

    /// <summary>
    /// Returns the distinct (desk, department) pairs found in the collection, using the desk cache while
    /// it is still valid. Pairs where either value is missing or empty are dropped.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the $group stage uses the raw "Department"/"Desk" names rather than fields.MapField(...)
    /// as GetDepartmentsAsync does - confirm this is correct for collections that use field ids.
    /// Also, an aggregation error is swallowed and whatever was collected (possibly nothing) is still cached.
    /// </remarks>
    public async Task<(string, string)[]> GetDesksWithDepartmentAsync(string collectionName, CancellationToken token = default)
    {
        var (stillValid,cached) = await this.LoadCachedDesksAsync(collectionName, token);
        if (stillValid)
            return cached;

        var res = new List<(string, string)>();

        var coll = GetCollectionWithRetries(collectionName);
        try
        {
            var json = "[{ \"$group\": { \"_id\": { Department: \"$Department\", Desk: \"$Desk\" } } }]";
            var pipeline = MongoDB.Bson.Serialization.BsonSerializer.Deserialize(json)
                .Select(p => p.AsBsonDocument).ToList();
            var options = new AggregateOptions
            {
                BypassDocumentValidation = true,
                //AllowDiskUse = true
            };

            using var cursor = await coll.AggregateAsync(pipeline, options, token);
            while ( await cursor.MoveNextAsync(token) )
            {
                var batch = cursor.Current;
                foreach ( var doc in batch )
                {
                    // Each result is { _id: { Department, Desk } }.
                    foreach ( var element in doc.Elements )
                    {
                        var bsonValue = element.Value.AsBsonDocument;
                        var desk = bsonValue.GetValue("Desk", null)?.ToString();
                        var department = bsonValue.GetValue("Department", null)?.ToString();
                        if (!string.IsNullOrEmpty(desk) && !string.IsNullOrEmpty(department))
                            res.Add((desk, department));
                    }
                }
            }
        }
        catch
        {
            //do nothing
        }

        var desks = res.ToArray();
        await this.CacheDesks(collectionName, desks, token);
        return desks;
    }

    /// <summary>
    /// Runs a pivot over the collection. The result is served from the pivot cache unless
    /// <paramref name="skipCache"/> is set, and fresh non-empty results are written back to the cache.
    /// Empty results are returned as ArrayBasedPivotData.NoData.
    /// </summary>
    /// <param name="maxFetchSize">Row limit for this call; values &lt;= 0 leave only PivotMaxReturnedRows in effect.</param>
    public async Task PivotAsync( string collectionName, PivotDefinition def, FilterExpressionTree.ExpressionGroup? extraFilter, bool skipCache, string? userName, int maxFetchSize = -1, CancellationToken token = default )
    {
        var sw = new Stopwatch();
        sw.Start();

        ArrayBasedPivotData? data;

        // preload field mapping. we'll need it in GetQueryText
        // (GetQueryText relies on the synchronous GetFieldMapping, which only sees already-loaded mappings,
        // and the cache key is a hash of that query text)
        _ = await GetFieldMapping(collectionName, token);

        if ( !skipCache )
        {
            data = await GetCachedResultAsync(collectionName, def, extraFilter, GetQueryText, token);
            if ( data != null )
            {
                sw.Stop();
                _log.Info( $"Loaded from cache. User=\"{userName}\" Pivot=\"{def.Name}\" PivotType={def.PivotType} Collection=\"{collectionName}\" ExtraFilter=\"{extraFilter}\" executed in Elapsed=\"{sw.Elapsed}\"" );
                return data.Count == 0
                    ? ArrayBasedPivotData.NoData
                    : data
                    ;
            }
        }

        try
        {
            data = await AggregationPivotAsync(collectionName, def, extraFilter, maxFetchSize, token);
            if ( data is { Count: > 0, Headers.Count: > 0 } )
                await CacheResultsAsync(collectionName, data, def, extraFilter, GetQueryText, token);
            return data.Count == 0
                ? ArrayBasedPivotData.NoData
                : data
                ;
        }
        finally
        {
            sw.Stop();
#pragma warning disable CS8604 // Possible null reference argument.
            _log.Info( $"Executed. User=\"{userName}\" Pivot=\"{def.Name}\" PivotType={def.PivotType} Collection=\"{collectionName}\" ExtraFilter=\"{extraFilter}\" executed in Elapsed=\"{sw.Elapsed}\"" );
#pragma warning restore CS8604 // Possible null reference argument.
        }
    }

    /// <summary>
    /// Looks up a cached pivot result by its cache key (see GetCacheKey) and applies the definition's
    /// column order. Any error is treated as a cache miss and returns null.
    /// </summary>
    public async Task GetCachedResultAsync(string collectionName, PivotDefinition def, FilterExpressionTree.ExpressionGroup? extraFilter, Func getQueryText, CancellationToken token = default )
    {
        try
        {
            var id = GetCacheKey(collectionName, def, extraFilter, getQueryText );
            var coll = GetCollectionWithRetries( CacheCollectionName );
            var c = coll.Find( new BsonDocument {{"_id", id}} );
            var doc = await c.FirstOrDefaultAsync(cancellationToken: token);
            doc?.ReorderColumns( def.ColumnsOrder );
            return doc;
        }
        catch ( Exception )
        {
            // ignore
            return null;
        }
    }

    /// <summary>
    /// Builds the cache key "{collection}-{pivot name}-{MD5 of the query text}" (upper-case hex, no dashes).
    /// MD5 serves only as a fingerprint of the generated pipeline, not for security.
    /// </summary>
    private static string GetCacheKey(string collectionName, PivotDefinition def, FilterExpressionTree.ExpressionGroup? extraFilter, Func getQueryText)
    {
        var text = getQueryText(collectionName, def, extraFilter );
        using var hash = MD5.Create();
        hash.Initialize();
        var bytes = Encoding.UTF8.GetBytes( text );
        hash.TransformFinalBlock( bytes, 0, bytes.Length );
        var name = $"{collectionName}-{def.Name}-{BitConverter.ToString( hash.Hash!
).Replace( "-", "" )}";
        return name;
    }

    /// <summary>
    /// Stores a pivot result in the cache collection under its cache key, replacing any previous entry.
    /// </summary>
    /// <remarks>
    /// data.Id is always set to the cache key, even when the result is not cached. Empty results and
    /// results with more than MaxCachedRows rows are not cached. Cached entries get
    /// ExpireAt = now + PivotCacheTTLHours. Caching is best-effort: failures are logged, never propagated.
    /// </remarks>
    public async Task CacheResultsAsync(string collectionName, ArrayBasedPivotData data, PivotDefinition def, FilterExpressionTree.ExpressionGroup? extraFilter, Func getQueryText, CancellationToken token = default )
    {
        var id = GetCacheKey(collectionName, def, extraFilter, getQueryText);
        data.Id = id;

        if ( data.Count == 0 || data.Count > MaxCachedRows )
            return;

        try
        {
            data.ExpireAt = DateTime.UtcNow + TimeSpan.FromHours( PivotCacheTTLHours );
            var coll = GetCollectionWithRetries(CacheCollectionName);

            // Typed filter instead of a hand-built JSON string. The id embeds the pivot name, and a '"' or
            // '\' in it produced an invalid or different filter. The stale entry was then never deleted, the
            // insert failed with a duplicate key, and the old result kept being served.
            var filter = new BsonDocument { { "_id", id } };

            // One upsert replaces the previous delete + insert pair, which was not atomic and could lose
            // to a concurrent writer with a duplicate-key error.
            await coll.ReplaceOneAsync(
                filter,
                data,
                new ReplaceOptions { IsUpsert = true, BypassDocumentValidation = true },
                token);
        }
        catch (Exception ex)
        {
            _log.Warn($"Failed to cache pivot result Id=\"{id}\" Collection=\"{collectionName}\"", ex);
        }
    }

    /// <summary>
    /// Returns the generated pipeline text for the definition and filter (the same generator PivotAsync uses).
    /// </summary>
    public Task GetQueryTextAsync(string collectionName, PivotDefinition def, FilterExpressionTree.ExpressionGroup? extraFilter, CancellationToken token = default )
        => Task.FromResult( GetQueryText(collectionName, def, extraFilter ) );

    // Also the input of GetCacheKey: changing its output changes every pivot cache key.
    private string GetQueryText(string collectionName, PivotDefinition def, FilterExpressionTree.ExpressionGroup? extraFilter)
        => GetPivotPipelineText(collectionName, def, extraFilter );
    //"""
    // db.getCollection("[COLLECTION]").aggregate(
    // [TEXT]
    // )
    // """
    // .Replace( "[COLLECTION]", collectionName)
    // .Replace( "[TEXT]", GetPivotPipelineText(collectionName, def, extraFilter ) );

    /// <summary>
    /// Generates the aggregation pipeline JSON for a pivot. Lookups needed by calculated data fields are
    /// appended to BeforeGrouping, comments are stripped and field names are mapped.
    /// If any step throws, it falls back to the unmapped JSON, and finally to "".
    /// </summary>
    private string GetPivotPipelineText(string collectionName, PivotDefinition def, FilterExpressionTree.ExpressionGroup?
extraFilter )
    {
        try
        {
            var fields = GetFieldMapping(collectionName);
            def = def.Clone();

            // Append the $lookup stages required by calculated data fields ahead of the grouping.
            foreach ( var f in def.DataFields
                         .Where( x => fields.IsCalculated( x ) && !string.IsNullOrWhiteSpace( fields.GetLookup( x ) ) )
                         .Select( x => Tuple.Create( x, fields.GetLookup( x ) ) )
                    )
            {
                if ( !string.IsNullOrWhiteSpace( def.BeforeGrouping ) )
                    def.BeforeGrouping += ",\n";
                def.BeforeGrouping += f.Item2; // lookup def
            }

            var json = def.ToJson(extraFilter, GetAllFields(collectionName), x => fields.GetAggregationOperator( x ) );
            json = RemoveComments(json);

            // Round-trip through BSON so field names can be mapped, then pretty-print.
            var pipeline = MongoDB.Bson.Serialization.BsonSerializer.Deserialize( json ).Select( p => p.AsBsonDocument ).ToList();
            fields.MapAllFields( pipeline );
            json = pipeline.ToJson( new() {Indent = true, OutputMode = MongoDB.Bson.IO.JsonOutputMode.RelaxedExtendedJson} );

            return json;
        }
        catch
        {
            try
            {
                var json = def.ToJson( extraFilter, GetAllFields(collectionName),x => _fieldMap[collectionName].GetAggregationOperator( x ) );
                json = RemoveComments(json);
                return json;
            }
            catch
            {
                return "";
            }
        }
    }

    /// <summary>
    /// Returns field name -> CLR type for the loaded mapping of the collection. The dictionary is empty
    /// when no mapping is loaded or the mapping has no fields.
    /// </summary>
    private Dictionary GetAllFields(string collectionName)
    {
        if ( !_fieldMap.TryGetValue( collectionName, out var mapping ) || mapping.Count == 0 )
            return [];

        var res = new Dictionary();
        foreach ( var field in mapping.Fields )
        {
            res[field.Key] = field.Value.Type;
        }

        return res;
    }

    /// <summary>
    /// Strips /* block */ and // line comments from pipeline text, leaving string literals intact.
    /// The input is returned unchanged when it contains no comment markers or when the regex throws.
    /// </summary>
    private static string RemoveComments(string input)
    {
        // quick check
        if (input.IndexOf("//", StringComparison.Ordinal) < 0 && input.IndexOf("/*", StringComparison.Ordinal) < 0)
            return input;

        //https://stackoverflow.com/questions/3524317/regex-to-strip-line-comments-from-c-sharp/3524689#3524689
        const string blockComments = @"/\*(.*?)\*/";
        // A line comment ends at a newline OR at the end of the input. Previously a trailing comment
        // without a final newline was not matched and stayed in the text handed to the JSON parser.
        const string lineComments = @"//(.*?)(\r?\n|$)";
        // Matches "..." literals so comment markers inside them (e.g. "http://host") are preserved.
        // The previous single-line raw literal (""" "...." """) kept its padding spaces as part of the
        // pattern, so only strings surrounded by spaces were protected.
        const string strings = @"""((\\[^\n]|[^""\n])*)""";
        const string verbatimStrings = """@("[^"]*")+""";
        try
        {
            var noComments = Regex.Replace(input,
                blockComments + "|" + lineComments + "|" + strings + "|" + verbatimStrings,
                me =>
                {
                    if (me.Value.StartsWith("/*") || me.Value.StartsWith("//"))
                        return
me.Value.StartsWith("//") ? Environment.NewLine : "";
                    // Keep the literal strings
                    return me.Value;
                },
                RegexOptions.Singleline);
            return noComments;
        }
        catch (Exception ex)
        {
            _log.Error("Can't remove comments from this pipeline:\n"+input, ex);
            return input;
        }
    }

    /// <summary>
    /// Generates the pivot pipeline and runs it against the collection with disk use allowed, then
    /// returns the fetched rows. Side effect: sets def.AllowDiskUsage = true on the caller's definition.
    /// </summary>
    private async Task AggregationPivotAsync( string collectionName, PivotDefinition def, FilterExpressionTree.ExpressionGroup? extraFilter, int maxFetchSize = -1, CancellationToken token = default )
    {
        var coll = GetCollectionWithRetries( collectionName );

        def.AllowDiskUsage = true;
        var options = new AggregateOptions
        {
            BypassDocumentValidation = true,
            AllowDiskUse = true
        };

        // preload field mapping. we'll need it in GetPivotPipelineText
        _ = await GetFieldMapping(collectionName, token);

        var json = GetPivotPipelineText(collectionName, def, extraFilter);
        Debug.WriteLine(json);

        var pipeline = MongoDB.Bson.Serialization.BsonSerializer.Deserialize(json)
            .Select(p => p.AsBsonDocument)
            .ToList();

        using var cursor = await coll.AggregateAsync( pipeline, options, token);

        var pd = await FetchPivotData( collectionName, def, cursor, extraFilter, maxFetchSize, false, token);
        return pd;
    }

    /// <summary>
    /// Exposes a cursor as an async stream, one document at a time. The cursor is not disposed here.
    /// </summary>
    /// <param name="asyncCursor">Cursor to drain.</param>
    /// <param name="token">
    /// Cancels further batch fetches. It is marked [EnumeratorCancellation] so a token given to
    /// GetAsyncEnumerator (as FetchPivotData does) reaches MoveNextAsync. Previously that token was
    /// dropped, and a cancelled pivot kept pulling batches until the cursor or the row limit ran out.
    /// </param>
    public static async IAsyncEnumerable ToAsyncEnumerable(IAsyncCursor asyncCursor,
        [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken token = default)
    {
        while (await asyncCursor.MoveNextAsync(token))
        {
            foreach (var current in asyncCursor.Current)
            {
                yield return current;
            }
        }
    }

    /// <summary>
    /// Reads the cursor into pivot data via the static FetchPivotData and applies the definition's column order.
    /// </summary>
    private async Task FetchPivotData(string collectionName, PivotDefinition def, IAsyncCursor cursor, FilterExpressionTree.ExpressionGroup? extraFilter, int maxFetchSize = -1, bool includeId = false, CancellationToken token = default )
    {
        var (pd, _) = await FetchPivotData( collectionName, def.Name, _fieldMap, ToAsyncEnumerable(cursor), extraFilter, false, maxFetchSize, includeId, token);
        pd.ReorderColumns( def.ColumnsOrder );
        return pd;
    }

    /// <summary>
    /// Converts result documents into an ArrayBasedPivotData table.
    /// </summary>
    /// <remarks>
    /// Handles plain documents, map/reduce-shaped "{ _id: {...}, value: {...} }" results and multi-row
    /// "value.rows" results. Columns are discovered as documents are read, and their names are un-mapped
    /// through <paramref name="fieldMap"/>. Reading stops after PivotMaxReturnedRows rows, or after
    /// <paramref name="maxFetchSize"/> rows when that is positive.
    /// </remarks>
    /// <returns>The table and, when <paramref name="needBson"/> is true, the raw documents read.</returns>
    public static async Task<(ArrayBasedPivotData, List)> FetchPivotData( string collectionName, string pivotName, ConcurrentDictionary fieldMap, IAsyncEnumerable batch, FilterExpressionTree.ExpressionGroup?
extraFilter, bool needBson = false, int maxFetchSize = -1, bool includeId = false, CancellationToken token = default)
    {
        var sw = new Stopwatch();
        sw.Start();

        var pd = new ArrayBasedPivotData( Array.Empty() );
        var headers = new List>(); // display text / column in the data set
        var headersSet = new HashSet(); // for speed

        if (includeId)
        {
            headers.Add(Tuple.Create("_id", "_id"));
            headersSet.Add("_id");
            pd.AddHeader("_id");
        }

        List documents = [];
        var docs = 0;

        await using var enumerator = batch.GetAsyncEnumerator(token);
        while ( await enumerator.MoveNextAsync() )
        {
            var doc = enumerator.Current;
            if ( doc.IsBsonNull )
                continue;

            //{ _id: wateva, values: {Book:a, tenor:1w, value:2}}
            //{ _id: wateva, values: {rows : { anyId1: {Book:a, tenor:1w, value:2}, anyid2: {Book:a, tenor:1y, value:1}} }} }

            if (needBson)
                documents.Add(doc);

            var res = doc.ToDictionary();

            // Documents need not carry an _id (e.g. after a $project that excludes it). The previous
            // res["_id"] indexer threw KeyNotFoundException for them and failed the whole fetch.
            var id = res.TryGetValue("_id", out var rawId) ? rawId as Dictionary : null;

            // for map/reduce results
            var value = ( res.TryGetValue("value", out var re) ? (Dictionary)re : null ) ?? [];
            // Skip map/reduce rows that have neither a document _id nor any values.
            if ( id == null && value.Count == 0 && res.Count == 2 && res.ContainsKey("_id") && res.ContainsKey("value") )
                continue;

            //The columns are sparse, so we have to process every row, looking for new/additional columns to add
            UpdateHeaders(collectionName, res, headersSet, headers, pd, fieldMap);

            if (value.TryGetValue("rows", out var value1))
            {
                var rows = (Dictionary)value1;
                //multirow response, this is for optimising large Rho/Vega aggregations
                //{ _id: wateva, values: {rows : { anyId1: {Book:a, tenor:1w, value:2}, anyid2: {Book:a, tenor:1y, value:1}} }} }
                foreach (var row in rows)
                {
                    var rowId = new Dictionary { { "_id", row.Key } };
                    var rowValues = (Dictionary)row.Value;
                    pd.Add(headers
                        .Select(x => x.Item2)
                        .Select(x => rowId.TryGetValue(x, out var val) || rowValues.TryGetValue(x, out val) ? val : null));
                }
            }
            else
            {
                var objects = headers
                    .Select(x => x.Item2)
                    .Select(x => res.TryGetValue(x, out var val) || id != null && id.TryGetValue(x, out val) || value.TryGetValue(x, out val) ?
val : null);
                pd.Add(objects);
            }

            docs++;
            // NOTE(review): this also fires when the result has exactly the limit number of rows, and the
            // message names PivotMaxReturnedRows even when maxFetchSize was the limit that was hit.
            if (docs >= PivotMaxReturnedRows || ( maxFetchSize > 0 && docs >= maxFetchSize) )
            {
                _log.Warn($"Data truncated at PivotMaxReturnedRows. Pivot=\"{pivotName}\", Collection=\"{collectionName}\", ExtraFilter=\"{extraFilter}\" Rows={docs}");
                break;
            }
        }

        // No columns at all: return a one-cell "No results" table instead of an empty one.
        if ( pd.Headers.Count == 0 )
        {
            _log.Warn( $"No results fetched for Pivot=\"{pivotName}\"" );
            pd = new(["Message"]);
            pd.Add(["No results"]);
        }

        sw.Stop();
        _log.Debug($"Data fetched. Pivot=\"{pivotName}\", Collection=\"{collectionName}\", ExtraFilter=\"{extraFilter}\" Rows={pd.Count}, Cols={pd.Headers.Count}, ElapsedMs={sw.Elapsed.TotalMilliseconds}");

        return (pd, documents);
    }

    /// <summary>
    /// Adds every column of <paramref name="document"/> not seen yet to <paramref name="headers"/>,
    /// <paramref name="headersSet"/> and <paramref name="pd"/>. Each header pairs the un-mapped (display)
    /// name with the raw key used to read values from the document.
    /// </summary>
    /// <remarks>
    /// Keys inside a dictionary "_id" and inside "value" become columns; under "value.rows" the keys of
    /// each row become columns. A scalar "_id" or a scalar "value" adds no column.
    /// NOTE(review): fieldMap must contain <paramref name="collectionName"/> (the indexer throws otherwise),
    /// and "rows" is matched case-insensitively here but case-sensitively in FetchPivotData.
    /// </remarks>
    private static void UpdateHeaders( string collectionName, Dictionary document, ISet headersSet, ICollection> headers, ArrayBasedPivotData pd, ConcurrentDictionary fieldMap )
    {
        foreach (var kvp in document)
        {
            switch (kvp.Key)
            {
                case "_id":
                    if (kvp.Value is IDictionary kvpIDict)
                    {
                        foreach (var key in kvpIDict.Keys)
                        {
                            var unmapped = fieldMap[collectionName].UnmapField(key);
                            if (headersSet.Contains(unmapped))
                                continue;
                            pd.AddHeader(unmapped); // new header!
                            headers.Add(Tuple.Create(unmapped, key));
                            headersSet.Add(unmapped);
                        }
                    }
                    break;

                case "value":
                    if (kvp.Value is Dictionary kvpDict)
                    {
                        foreach (var (key, value) in kvpDict)
                        {
                            if (key.ToUpper() == "ROWS")
                            {
                                //this structure allows us to return multiple rows from a single map/reduce
                                //{ _id: wateva, values : {rows : { anyId1: {Book:a, tenor:1w, value:2}, anyid2: {Book:a, tenor:1y, value:1}} }} }
                                foreach (var inner in value as Dictionary ?? [])
                                foreach (var innerInner in inner.Value as Dictionary ?? [])
                                {
                                    var unmapped = fieldMap[collectionName].UnmapField(innerInner.Key);
                                    if (headersSet.Contains(unmapped))
                                        continue;
                                    pd.AddHeader(unmapped); // new header!
headers.Add(Tuple.Create(unmapped, innerInner.Key)); headersSet.Add(unmapped); } } else { var unmapped = fieldMap[collectionName].UnmapField(key); if (headersSet.Contains(unmapped)) continue; pd.AddHeader(unmapped); // new header! headers.Add(Tuple.Create(unmapped, key)); headersSet.Add(unmapped); } } } break; default: { var unmapped = fieldMap[collectionName].UnmapField(kvp.Key); if (headersSet.Contains(unmapped)) continue; pd.AddHeader(unmapped); // new header! headers.Add(Tuple.Create(unmapped, kvp.Key)); headersSet.Add(unmapped); } break; } } } public async Task> GetPivotsAsync(string collectionName, IPivotTableDataSource.PivotType pivotType, string? userName = null, CancellationToken token = default) { userName = string.IsNullOrWhiteSpace(userName) ? User : userName; var coll = GetCollectionWithRetries($"{collectionName}-Meta"); //get all the predefined pivots var standardPivots = (await coll.Find(x => x.Id.Equals("PredefinedPivots")).FirstOrDefaultAsync(cancellationToken: token))?.Pivots ?? []; //get all user pivots var allUserPivots = await coll.Find(x => x.Id.StartsWith("PredefinedPivots-")).ToListAsync(cancellationToken: token) ?? []; //find the specific users private pivots from the set of user pivots var userPivots = (allUserPivots.FirstOrDefault(doc => doc.Id.Equals( $"PredefinedPivots-{userName}", StringComparison.OrdinalIgnoreCase)))?.Pivots ?? []; // fix accidental saves where a user pivot is saved into the global section foreach (var def in standardPivots.Where(def => def.Group == PivotDefinition.UserPivotsGroup)) { def.Group = PivotDefinition.PredefinedPivotsGroup; } // Change group for user pivots to match the user's name foreach (var pivots in allUserPivots) { var name = pivots.Id.StartsWith("PredefinedPivots-") ? pivots.Id["PredefinedPivots-".Length..] 
: string.Empty; if (string.IsNullOrEmpty(name)) continue; foreach (var def in pivots.Pivots) def.Group = $"User pivots - {name}"; } var res = pivotType switch { IPivotTableDataSource.PivotType.Predefined => standardPivots, IPivotTableDataSource.PivotType.User => userPivots, IPivotTableDataSource.PivotType.UserAndPredefined => standardPivots.Concat(userPivots), IPivotTableDataSource.PivotType.All => standardPivots.Concat(allUserPivots.SelectMany(x => x.Pivots)), _ => throw new ArgumentOutOfRangeException(nameof(pivotType), pivotType, null) }; var result = res .OrderBy(x => (x.Group, x.Name)) .ToList() ; return result; } public async Task UpdatePredefinedPivotsAsync(string collectionName, IEnumerable pivots, bool predefined = false, string? userName = null, CancellationToken token = default ) { var coll = GetCollectionWithRetries($"{collectionName}-Meta"); PivotMetaCache.Reset(this, collectionName); userName = string.IsNullOrWhiteSpace(userName) ? Environment.UserName : userName; var id = predefined ? "PredefinedPivots" : $"PredefinedPivots-{userName}" ; var doc = await coll.Find( new BsonDocument {{"_id", id}} ) .FirstOrDefaultAsync(cancellationToken: token) ?? 
new PivotDefinitions { Id = id, Pivots = [] } ; foreach (var p in pivots) { var existing = doc.Pivots.FirstOrDefault(x => x.Name == p.Name && x.Group == p.Group); if (existing == null) { p.Owner = userName; doc.Pivots.Add(p); } else { // retain order for easier git diffs var idx = doc.Pivots.IndexOf(existing); p.Owner = userName; doc.Pivots[idx] = p; } } var options = new ReplaceOptions { IsUpsert = true }; await coll.ReplaceOneAsync( new BsonDocument {{"_id", id}}, doc, options, token); } // ReSharper disable MemberCanBePrivate.Local // ReSharper disable UnusedAutoPropertyAccessor.Local // ReSharper disable CollectionNeverUpdated.Local // ReSharper disable AutoPropertyCanBeMadeGetOnly.Local // ReSharper disable UnusedMember.Local [BsonIgnoreExtraElements] // ReSharper disable once ClassNeverInstantiated.Local private class InternalPivotColumnDescriptor { public override string ToString() => $"{NameRegex} {Background} {Format}"; public string NameRegex { get; set; } = ""; public string? Background { get; set; } public string? AlternateBackground { get; set; } public string? Format { get; set; } public PivotColumnDescriptor ToPivotColumnDescriptor() => new() { NameRegexString = NameRegex, // ReSharper disable PossibleNullReferenceException Background = ColorConverter.ConvertFromString( Background ?? "White" ), AlternateBackground = ColorConverter.ConvertFromString( AlternateBackground ?? "White" ), // ReSharper restore PossibleNullReferenceException Format = Format ?? 
"" }; } [BsonIgnoreExtraElements] // ReSharper disable once ClassNeverInstantiated.Local private class InternalPivotColumnDescriptors { [BsonId] public string Id { get; set; } = ""; public List Fields { get; set; } = []; } // ReSharper restore UnusedMember.Local // ReSharper restore AutoPropertyCanBeMadeGetOnly.Local // ReSharper restore CollectionNeverUpdated.Local // ReSharper restore UnusedAutoPropertyAccessor.Local // ReSharper restore MemberCanBePrivate.Local public async Task GetColumnDescriptorsAsync(string collectionName, CancellationToken token = default) { var coll = GetCollectionWithRetries($"{collectionName}-Meta"); var descriptors = await coll.Find( new BsonDocument {{"_id", "FieldDescriptors"}} ).FirstOrDefaultAsync(cancellationToken: token); if ( descriptors == null ) { coll = GetCollectionWithRetries("Global-Meta"); descriptors = await coll.Find( new BsonDocument {{"_id", "FieldDescriptors"}} ).FirstOrDefaultAsync(cancellationToken: token); } return descriptors?.Fields.Select( x => x.ToPivotColumnDescriptor() ).ToArray() ?? []; } public Dictionary GetFieldTypes(string collectionName) { var fields = _fieldMap[collectionName]; var data = fields.Fields .Select( x => new KeyValuePair( x.Key, x.Value.Type ) ) .Concat( fields.CalculatedFields.Where( x => !fields.ContainsKey( x ) ).Select( x => new KeyValuePair( x, typeof(double) ) ) ); var res = new Dictionary(StringComparer.OrdinalIgnoreCase); foreach (var item in data) { if (res.ContainsKey(item.Key)) continue; res[item.Key] = new() { Name = item.Key, Purpose = GetPurpose(collectionName, item.Key), Type = GetFieldType(collectionName, item.Key) }; } return res; } private Type GetFieldType(string collectionName, string key ) => !_fieldMap[collectionName].TryGetValue( key, out var m ) ? typeof(double) : m?.Type ?? typeof(object); private PivotFieldPurpose GetPurpose(string collectionName, string key ) => !_fieldMap[collectionName].TryGetValue( key, out var m ) ? PivotFieldPurpose.Data : m?.Purpose ?? 
PivotFieldPurpose.Data; [BsonIgnoreExtraElements] private class WindowStateDesc { [BsonId] public string Id { get; set; } = ""; public int Width { get; set; } public int Height { get; set; } public int Left { get; set; } public int Top { get; set; } public string? WindowState { get; set; } public bool AutoRunEnabled { get; set; } public bool ExpandTable { get; set; } public bool ExpandDataFields { get; set; } public string []? SelectedDepartments { get; set; } public bool ShowCobRange { get; set; } public string? DefaultCollection { get; set; } } /// /// /// Get a single document /// /// /// Primary key fields in no particular order /// Extra $match stage /// /// Json string public Task GetDocumentAsync(string collectionName, KeyValuePair [] keys, FilterExpressionTree.ExpressionGroup? extraFilter, CancellationToken token = default ) { var filter = new FilterExpressionTree.ExpressionGroup(); filter.Children.AddRange( keys.Select( x => new FilterExpressionTree.FieldExpression() { Field = x.Key, Argument = x.Value.ToString() ?? "", Condition = FilterExpressionTree.FieldConditionType.EqualTo } ) ); if ( extraFilter != null ) filter.Children.AddRange( extraFilter.Children ); return GetDocumentAsync(collectionName, filter, token); } /// public async Task GetDocumentAsync(string collectionName, FilterExpressionTree.ExpressionGroup extraFilter, CancellationToken token = default ) { var filter = BsonDocument.Parse( extraFilter.ToJson(GetAllFields(collectionName)) ); var coll = GetCollectionWithRetries(collectionName); var res = await (await coll.FindAsync( filter, cancellationToken: token)).FirstOrDefaultAsync(cancellationToken: token); return res?.ToJson(new() { Indent = true }) ?? 
$"Not found: {extraFilter}"; } public async Task DeletePivotAsync(string collectionName, string pivotName, string groupName, string userName, CancellationToken token = default) { _log.Debug($"Deleting Pivot=\"{pivotName}\" from Group=\"{groupName}\" for User=\"{userName}\"..."); if (groupName != PivotDefinition.UserPivotsGroup) throw new ApplicationException($"You can only delete from \"{PivotDefinition.UserPivotsGroup}\" group"); var coll = GetCollectionWithRetries($"{collectionName}-Meta"); userName = string.IsNullOrWhiteSpace(userName) ? Environment.UserName : userName; var id = $"PredefinedPivots-{userName}"; var doc = await coll.Find( new BsonDocument {{"_id", id}} ) .FirstOrDefaultAsync(cancellationToken: token) ?? new PivotDefinitions { Id = id, Pivots = [] } ; var existing = doc.Pivots.FirstOrDefault(x => !x.IsPredefined && x.Name == pivotName) ?? throw new ApplicationException($"Pivot=\"{pivotName}\" from Group=\"{groupName}\" was NOT deleted for User=\"{userName}\" as it was not found") ; // retain order for easier git diffs var idx = doc.Pivots.IndexOf(existing); doc.Pivots.RemoveAt(idx); var options = new ReplaceOptions { IsUpsert = true }; await coll.ReplaceOneAsync( new BsonDocument {{"_id", id}}, doc, options, token); _log.Info($"Pivot=\"{pivotName}\" from Group=\"{groupName}\" deleted for User=\"{userName}\""); } internal IMongoCollection GetCollectionWithRetries(string name) where T: class => RetryHelper.DoWithRetries(() => { if (_database == null || !MongoDbHelper.IsConnected(_database)) _database = null; return Database.GetCollection(name); }, 3, TimeSpan.FromSeconds(5), logFunc: LogMethod); /// /// Log method for retry function, called for each iteration /// /// /// /// private static void LogMethod(int iteration, int maxRetries, Exception e) { if (iteration < maxRetries - 1) _log.Warn($"MongoDB error, retrying RetriesLeft={maxRetries-iteration-1}\"", e); else _log.Error("MongoDB error, retries exhausted", e); } }