/* * dbMango * * Copyright 2025 Deutsche Bank AG * SPDX-License-Identifier: Apache-2.0 * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using log4net; using MongoDB.Bson; using MongoDB.Bson.Serialization; using MongoDB.Driver; using System.Diagnostics; using System.Reflection; using System.Runtime.CompilerServices; namespace Rms.Risk.Mango.Pivot.Core.MongoDb; internal static class FetchInfo { public static long FetchId; public static int ParallelFinds; } public abstract class MongoDbServiceBase : IMongoDbService where T : class { // ReSharper disable once StaticMemberInGenericType // ReSharper disable once InconsistentNaming protected static readonly ILog _log = LogManager.GetLogger(MethodBase.GetCurrentMethod()!.DeclaringType!); protected readonly IMongoCollection Collection; private readonly MongoDbSettings _settings; public long Count => Collection.CountDocuments(x => true); public string CollectionName => Collection.CollectionNamespace.CollectionName; protected MongoDbServiceBase( MongoDbConfigRecord config, MongoDbSettings settings, string collectionName, string? databaseInstance = null) { _settings = settings; var db = MongoDbHelper.GetDatabase(config, settings, databaseInstance); Collection = db.GetCollection(collectionName); } /// /// When inserting items into the collection, sometimes we have duplicates, so this method handles the duplicate items /// /// /// /// protected abstract int ProcessDuplicates(HashSet duplicateIds, List duplicateItems); public IEnumerable FindKeys(FilterDefinition filter, CancellationToken token = default) => RetryHelper.DoWithRetries(() => FindKeysInternal(filter), 3, TimeSpan.FromSeconds(5), logFunc: LogMethod); public Task InsertAsync(IEnumerable data, bool overrideExisting, bool suppressWarning = false, CancellationToken token = default) => RetryHelper.DoWithRetriesAsync(() => InsertAsyncInternalAsync(data, overrideExisting, suppressWarning), 3, TimeSpan.FromSeconds(5), logFunc: LogMethod, token: token); public async IAsyncEnumerable> AggregateAsync( string jsonPipeline, int maxFetchSize = -1, [EnumeratorCancellation] CancellationToken token = default) { await foreach (var doc in AggregateInternalAsync(jsonPipeline, maxFetchSize, token)) { yield return ConvertBsonToDictionary(doc); } } public IAsyncEnumerable AggregateAsyncRaw(string jsonPipeline, int maxFetchSize = -1, CancellationToken token = default) => AggregateInternalAsync(jsonPipeline, maxFetchSize, token); public Task ClearCOBAsync(DateTime cob, string? layer, string? book = null, string? root = null, CancellationToken token = default) => RetryHelper.DoWithRetriesAsync(() => ClearCobInternalAsync(cob, layer, book, root, token), 3, TimeSpan.FromSeconds(5), logFunc: LogMethod, token: token); private const int ReportEveryNDocuments = 50_000; private const int NumberOfRetries = 3; public IAsyncEnumerable FindAsync(FilterDefinition filter, bool allowRetries = true, ProjectionDefinition? projection = null, int? limit = null, CancellationToken token = default) => allowRetries ? FindAllowRetries(RenderToBsonDocument(filter).ToJson(), RenderToBsonDocument(projection)?.ToJson(), limit, token) : FindNoRetries(RenderToBsonDocument(filter).ToJson(), RenderToBsonDocument(projection)?.ToJson(), limit, token) ; public Task CountAsync(FilterDefinition filter, CancellationToken token = default) => CountNoRetries(RenderToBsonDocument(filter).ToJson(), token) ; public async Task ExplainAsync(string command, CancellationToken token) { var explain = new BsonDocument { { "explain", BsonDocument.Parse(command) } }; var res = await Collection.Database.RunCommandAsync(new BsonDocumentCommand(explain), cancellationToken: token); return res; } public static BsonDocument? RenderToBsonDocument(FilterDefinition? filter) { if (filter == null) return null; var serializerRegistry = BsonSerializer.SerializerRegistry; var documentSerializer = serializerRegistry.GetSerializer(); var args = new RenderArgs(documentSerializer, serializerRegistry); return filter.Render(args); } public static BsonDocument? RenderToBsonDocument(ProjectionDefinition? filter) { if (filter == null) return null; var serializerRegistry = BsonSerializer.SerializerRegistry; var documentSerializer = serializerRegistry.GetSerializer(); var args = new RenderArgs(documentSerializer, serializerRegistry); return filter.Render(args); } private async Task CountNoRetries(string filter, CancellationToken token = default) { var options = new CountOptions { MaxTime = _settings.MongoDbQueryTimeout }; var count = await Collection.CountDocumentsAsync(filter, options, token); return count; } public async IAsyncEnumerable FindNoRetries(string filter, string? projection = null, int? limit = null, [EnumeratorCancellation] CancellationToken token = default) { var processed = 0; var options = new FindOptions { MaxTime = _settings.MongoDbQueryTimeout, NoCursorTimeout = true, BatchSize = _settings.MongoDbQueryBatchSize, }; if (limit != null) options.Limit = limit.Value; if (projection != null) options.Projection = projection; var jsonFilter = filter .Replace("\n", " ") .Replace("\r", " ") .Replace("\t", " ") .Replace(" ", " ") ; var id = Interlocked.Increment(ref FetchInfo.FetchId); Interlocked.Increment(ref FetchInfo.ParallelFinds); try { var sw = Stopwatch.StartNew(); _log.Debug($"Id={id:000} Starting Find (no retries) Collection=\"{CollectionName}\" Concurrency={FetchInfo.ParallelFinds} Filter=\"{jsonFilter}\""); var cursor = await Collection.FindAsync(filter, options, token); if (sw.Elapsed > TimeSpan.FromSeconds(10)) _log.Debug($"Id={id:000} Slow Find (no retries) Collection=\"{CollectionName}\" Concurrency={FetchInfo.ParallelFinds} Elapsed=\"{sw.Elapsed:g}\" Filter=\"{jsonFilter}\""); sw.Restart(); var prevBatchElapsed = TimeSpan.Zero; while (true) { IEnumerable batch; try { if (!await cursor.MoveNextAsync(token)) { _log.Debug($"Id={id:000} Find complete (no retries) Collection=\"{CollectionName}\" Concurrency={FetchInfo.ParallelFinds} Docs={processed} " + $"Elapsed=\"{sw.Elapsed:g}\" DocsSec={processed / (sw.ElapsedMilliseconds / 1000.0):0.00} Filter=\"{jsonFilter}\""); yield break; } batch = cursor.Current; } catch (Exception e) { var dps = processed / (sw.ElapsedMilliseconds / 1000.0); throw new ApplicationException($"Id={id:000} Find (no retries) failed: {e.Message}. " + $"Collection=\"{CollectionName}\" Concurrency={FetchInfo.ParallelFinds} Docs={processed} Elapsed=\"{sw.Elapsed:g}\" DocsSec={dps:0.00} Filter=\"{jsonFilter}\"", e); } foreach (var bson in batch) { yield return bson; processed += 1; if ((processed % ReportEveryNDocuments) != 0) continue; var dps = ReportEveryNDocuments / ((sw.ElapsedMilliseconds - prevBatchElapsed.TotalMilliseconds) / 1000.0); _log.Debug($"Id={id:000} Fetch {ReportEveryNDocuments:N0} documents (no retries) Collection=\"{CollectionName}\" Concurrency={FetchInfo.ParallelFinds} " + $"Elapsed=\"{sw.Elapsed - prevBatchElapsed:g}\" DocsSec={dps:0.00} (processed {processed:N0} so far)"); prevBatchElapsed = sw.Elapsed; } } } finally { Interlocked.Decrement(ref FetchInfo.ParallelFinds); } } public async IAsyncEnumerable FindAllowRetries(string filter, string? projection = null, int? limit = null, [EnumeratorCancellation] CancellationToken token = default) { var processed = 0; var attempts = 0; Exception? firstException = null; var options = new FindOptions { MaxTime = _settings.MongoDbQueryTimeout, NoCursorTimeout = true, BatchSize = _settings.MongoDbQueryBatchSize, Sort = "{ _id: 1 }" }; if (limit != null) options.Limit = limit.Value; if (projection != null) options.Projection = projection; var jsonFilter = filter .Replace("\n", " ") .Replace("\r", " ") .Replace("\t", " ") .Replace(" ", " ") ; var id = Interlocked.Increment(ref FetchInfo.FetchId); Interlocked.Increment(ref FetchInfo.ParallelFinds); try { var sw = Stopwatch.StartNew(); _log.Debug($"Id={id:000} Starting Find Collection=\"{CollectionName}\" Concurrency={FetchInfo.ParallelFinds} Filter=\"{jsonFilter}\""); //var coll = Collection.Database.GetCollection(CollectionName); //default is 10, prevent the cursor expiring between calling MoveNext() //unrestricted, it appears batch sizes are ~10K when running locally //https://stackoverflow.com/questions/44248108/mongodb-error-getmore-command-failed-cursor-not-found //the cursor returns batches, so iterate through each batch while (true) { options.Skip = processed; var cursor = await Collection.FindAsync(filter, options, token); if (sw.Elapsed > TimeSpan.FromSeconds(5)) _log.Debug($"Id={id:000} Slow Find Collection=\"{CollectionName}\" Concurrency={FetchInfo.ParallelFinds} Elapsed=\"{sw.Elapsed:g}\" Filter=\"{jsonFilter}\""); sw.Restart(); var prevBatchElapsed = TimeSpan.Zero; while (true) { IEnumerable batch; try { if (!await cursor.MoveNextAsync(token)) { yield break; } batch = cursor.Current; } catch (Exception e) { attempts++; _log.Warn($"Id={id:000} Exception when calling Find Attempt={attempts}/3 Collection=\"{CollectionName}\" Concurrency={FetchInfo.ParallelFinds} Processed={processed} Filter=\"{jsonFilter}\"", e); firstException ??= e; if (attempts == NumberOfRetries) { var dps = processed / (sw.ElapsedMilliseconds / 1000.0); throw new ApplicationException($"Id={id:000} Find failed: {firstException.Message}. " + $"Collection=\"{CollectionName}\" Concurrency={FetchInfo.ParallelFinds} Docs={processed} Elapsed=\"{sw.Elapsed:g}\" DocsSec={dps:0.00} Filter=\"{jsonFilter}\"", firstException); } //pause before trying again Thread.Sleep(TimeSpan.FromSeconds(5)); break; //return back to the while loop } foreach (var bson in batch) { yield return bson; processed++; if ((processed % ReportEveryNDocuments) != 0) continue; var dps = ReportEveryNDocuments / ((sw.ElapsedMilliseconds - prevBatchElapsed.TotalMilliseconds) / 1000.0); _log.Debug($"Id={id:000} Fetch {ReportEveryNDocuments:N0} documents Collection=\"{CollectionName}\" Concurrency={FetchInfo.ParallelFinds} " + $"Elapsed=\"{sw.Elapsed - prevBatchElapsed:g}\" DocsSec={dps:0.00} (processed {processed:N0} so far)"); prevBatchElapsed = sw.Elapsed; } } } } finally { Interlocked.Decrement(ref FetchInfo.ParallelFinds); } } public void DeleteCollection() => RetryHelper.DoWithRetries(() => Collection.Database.DropCollection(Collection.CollectionNamespace.CollectionName), 3, TimeSpan.FromSeconds(5)); public void UpdateOne(FilterDefinition filter, UpdateDefinition update, CancellationToken token = default) => RetryHelper.DoWithRetries(() => Collection.UpdateOne(filter, update), 3, TimeSpan.FromSeconds(5)); public void ReplaceOne(FilterDefinition filter, T doc, CancellationToken token) => RetryHelper.DoWithRetries(() => ReplaceOne(filter, doc, token), 3, TimeSpan.FromSeconds(5)); #region private methods private IEnumerable FindKeysInternal(FilterDefinition filter) { var projection = BsonSerializer.Deserialize("{ _id : 1}"); var ids = Collection .Find(filter) .Project(projection) .ToEnumerable() .Select(doc => doc[0].AsString) .ToList(); return ids; } private async Task InsertAsyncInternalAsync(IEnumerable data, bool overrideExisting, bool suppressWarning) { var ignored = 0; var options = new InsertManyOptions { BypassDocumentValidation = true, IsOrdered = false }; var dataList = data.ToList(); if (dataList.Count == 0) return 0; try { await Collection.InsertManyAsync(dataList, options); } catch (Exception ex) { if (MongoDbHelper.IsDuplicateKeyError(ex)) { var toProcess = MongoDbHelper.ExtractDuplicateKeys(ex); if (!overrideExisting) { if ( !suppressWarning ) _log.Warn($"Duplicate entries found and ignored. DataCount={dataList.Count} DuplicateCount={toProcess.Count} DocumentClass=\"{typeof(T).Name}\" Collection=\"{Collection.CollectionNamespace.CollectionName}\""); ignored = toProcess.Count; } else ProcessDuplicates(toProcess, dataList); } else throw; } //_log.Debug($"Collection=\"{Collection.CollectionNamespace.CollectionName}\" Inserted={dataList.Count - replaced - ignored} Replaced={replaced} Ignored={ignored} of Total={dataList.Count}"); return dataList.Count - ignored; } private async Task ClearCobInternalAsync(DateTime cob, string? layer, string? book = null, string? root = null, CancellationToken token = default) { var layerName = string.IsNullOrWhiteSpace(root) ? "Layer" : root + ".Layer"; var bookName = string.IsNullOrWhiteSpace(root) ? "Book" : root + ".Book"; var cobName = string.IsNullOrWhiteSpace(root) ? "COB" : root + ".COB"; FilterDefinition filter; if (!string.IsNullOrWhiteSpace(layer) && !string.IsNullOrWhiteSpace(book)) filter = new FilterDefinitionBuilder().And( new FilterDefinitionBuilder().Eq(cobName, cob.Date), new FilterDefinitionBuilder().Eq(layerName, layer), new FilterDefinitionBuilder().Eq(bookName, book) ); else if (!string.IsNullOrWhiteSpace(layer)) filter = new FilterDefinitionBuilder().And( new FilterDefinitionBuilder().Eq(cobName, cob.Date), new FilterDefinitionBuilder().Eq(layerName, layer) ) ; else filter = !string.IsNullOrWhiteSpace(book) ? new FilterDefinitionBuilder().And( new FilterDefinitionBuilder().Eq(cobName, cob.Date), new FilterDefinitionBuilder().Eq(bookName, book) ) : new FilterDefinitionBuilder().Eq(cobName, cob.Date); var result = await Collection.DeleteManyAsync(filter, token); _log.Debug($"Collection=\"{Collection.CollectionNamespace.CollectionName}\" Deleted={result.DeletedCount} Book=\"{book}\" COB=\"{cob:yyyy-MM-dd}\" Layer=\"{layer}\""); } public static Dictionary ConvertBsonToDictionary(BsonDocument doc) { var res = doc.ToDictionary(); // for map/reduce results var value = res.TryGetValue("value", out var re) ? (Dictionary)re : null; var result = new Dictionary(StringComparer.OrdinalIgnoreCase); foreach (var data in res.Where(x => x.Key != "_id" && x.Key != "value")) { result[data.Key] = data.Value; } if (value != null) { foreach (var data in value) { result[data.Key] = data.Value; } } if (res["_id"] is not Dictionary id) return result; foreach (var data in id) { result[data.Key] = data.Value; } return result; } private async IAsyncEnumerable AggregateInternalAsync( string jsonPipeline, int maxFetchSize, [EnumeratorCancellation] CancellationToken token = default ) { var options = new AggregateOptions { BypassDocumentValidation = true, AllowDiskUse = true, }; var pipeline = BsonSerializer.Deserialize(jsonPipeline).Select(p => p.AsBsonDocument).ToList(); using var cursor = await Collection.AggregateAsync(pipeline, options, token); var docs = 0; while (await cursor.MoveNextAsync(token)) { var batch = cursor.Current; foreach (var doc in batch) yield return doc; if ( maxFetchSize > 0 && docs++ >= maxFetchSize) yield break; } } /// /// Log method for retry function, called for each iteration /// /// /// /// private void LogMethod(int iteration, int maxRetries, Exception e) { if (iteration < maxRetries - 1) _log.Warn($"MongoDB error, retrying RetriesLeft={maxRetries-iteration-1}, Collection=\"{Collection.CollectionNamespace.CollectionName}\"", e); else _log.Error($"MongoDB error, retries exhausted, Collection=\"{Collection.CollectionNamespace.CollectionName}\"", e); } /// /// Delete documents matching the filter /// /// Filter selecting a single document /// public async Task Delete(FilterDefinition filter, CancellationToken token = default) { var result = await Collection.DeleteManyAsync(filter, token); _log.Debug($"Collection=\"{Collection.CollectionNamespace.CollectionName}\" Deleted={result.DeletedCount} Filter:\n{filter.ToJson(new () {Indent = true})}"); return result.DeletedCount; } #endregion private methods }