This repository was archived by the owner on Mar 31, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathKustoIngestionMessageProcessor.cs
More file actions
348 lines (298 loc) · 16.6 KB
/
KustoIngestionMessageProcessor.cs
File metadata and controls
348 lines (298 loc) · 16.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using NuGet.Insights.Kusto;
namespace NuGet.Insights.Worker.KustoIngestion
{
public class KustoIngestionMessageProcessor : IMessageProcessor<KustoIngestionMessage>
{
// Storage-backed state for the ingestion, its containers and their blob records.
private readonly KustoIngestionStorageService _storageService;
private readonly CsvRecordContainers _csvRecordContainers;

// Queueing and recovery of the per-container work messages.
private readonly IMessageEnqueuer _messageEnqueuer;
private readonly FanOutRecoveryService _fanOutRecoveryService;

// Kusto access: admin commands and validation of the ingested data.
private readonly CachingKustoClientFactory _kustoClientFactory;
private readonly KustoDataValidator _kustoDataValidator;

// Cross-cutting concerns: metrics, settings and logging.
private readonly ITelemetryClient _telemetryClient;
private readonly IOptions<NuGetInsightsWorkerSettings> _options;
private readonly ILogger<KustoIngestionMessageProcessor> _logger;

/// <summary>
/// Creates the processor. Every dependency is stored as-is in the matching private field;
/// no other work is performed here.
/// </summary>
public KustoIngestionMessageProcessor(
    KustoIngestionStorageService storageService,
    CsvRecordContainers csvRecordContainers,
    IMessageEnqueuer messageEnqueuer,
    CachingKustoClientFactory kustoClientFactory,
    KustoDataValidator kustoDataValidator,
    FanOutRecoveryService fanOutRecoveryService,
    ITelemetryClient telemetryClient,
    IOptions<NuGetInsightsWorkerSettings> options,
    ILogger<KustoIngestionMessageProcessor> logger)
{
    // Storage-backed state
    _storageService = storageService;
    _csvRecordContainers = csvRecordContainers;

    // Queueing and recovery
    _messageEnqueuer = messageEnqueuer;
    _fanOutRecoveryService = fanOutRecoveryService;

    // Kusto access
    _kustoClientFactory = kustoClientFactory;
    _kustoDataValidator = kustoDataValidator;

    // Cross-cutting concerns
    _telemetryClient = telemetryClient;
    _options = options;
    _logger = logger;
}
/// <summary>
/// Drives a Kusto ingestion through its state machine. Each <c>if</c> block below handles one
/// <see cref="KustoIngestionState"/> value, stores the next state through the storage service and
/// then falls through to the following block, so a single invocation advances as far as it can.
/// The method returns early after re-enqueueing <paramref name="message"/> whenever it has to
/// wait for container work to finish or for a retry delay to elapse.
/// </summary>
/// <param name="message">
/// The message identifying the ingestion. Its <c>AttemptCount</c> is mutated and used to compute
/// the delay applied when the message is re-enqueued.
/// </param>
/// <param name="dequeueCount">Not used by this method.</param>
/// <exception cref="InvalidOperationException">
/// Thrown when at least one container is in the failed state and the ingestion attempt count has
/// reached the configured <c>KustoIngestionMaxAttempts</c>.
/// </exception>
public async Task ProcessAsync(KustoIngestionMessage message, long dequeueCount)
{
    var ingestion = await _storageService.GetIngestionAsync(message.IngestionId);
    if (ingestion is null)
    {
        // The ingestion record is not visible yet. Retry a bounded number of times before giving up.
        if (message.AttemptCount < 10)
        {
            _logger.LogTransientWarning("After {AttemptCount} attempts, the Kusto ingestion {IngestionId} should have already been created. Trying again.",
                message.AttemptCount,
                message.IngestionId);
            message.AttemptCount++;
            await _messageEnqueuer.EnqueueAsync(new[] { message }, StorageUtility.GetMessageDelay(message.AttemptCount));
        }
        else
        {
            _logger.LogTransientWarning("After {AttemptCount} attempts, the Kusto ingestion {IngestionId} should have already been created. Giving up.",
                message.AttemptCount,
                message.IngestionId);
        }

        return;
    }

    // Created: record the start time and first attempt, then begin expanding.
    if (ingestion.State == KustoIngestionState.Created)
    {
        _logger.LogInformation("The Kusto ingestion is starting.");
        ingestion.Started = DateTimeOffset.UtcNow;
        ingestion.State = KustoIngestionState.Expanding;
        ingestion.AttemptCount = 1;
        await _storageService.ReplaceIngestionAsync(ingestion);
    }

    // Expanding: initialize the child table and add the containers named by the CSV record containers.
    if (ingestion.State == KustoIngestionState.Expanding)
    {
        await _storageService.InitializeChildTableAsync(ingestion.StorageSuffix);
        await _storageService.AddContainersAsync(ingestion, _csvRecordContainers.ContainerNames);
        ingestion.State = KustoIngestionState.Enqueueing;
        await _storageService.ReplaceIngestionAsync(ingestion);
    }

    // Retrying: reset the failed containers so that they can be enqueued again.
    if (ingestion.State == KustoIngestionState.Retrying)
    {
        var containers = await _storageService.GetContainersAsync(ingestion);

        // Move the failed containers to the retrying state. This allows us to delete the blob records without
        // having the containers accidentally move to the completed state when all blob records are gone.
        var failedContainers = containers.Where(x => x.State == KustoContainerIngestionState.Failed).ToList();
        if (failedContainers.Count > 0)
        {
            foreach (var container in failedContainers)
            {
                container.State = KustoContainerIngestionState.Retrying;
            }

            await _storageService.ReplaceContainersAsync(failedContainers);
        }

        // Move the retrying containers to the created state after cleaning up any blob records (i.e. the
        // failed/timed out ones that caused the retry). This filter runs after the mutation above, so it
        // includes the containers that were just moved out of the failed state.
        var retryingContainers = containers.Where(x => x.State == KustoContainerIngestionState.Retrying).ToList();
        foreach (var container in retryingContainers)
        {
            _telemetryClient.TrackMetric(
                nameof(KustoIngestionMessageProcessor) + ".RetryingContainer.ElapsedMs",
                (DateTimeOffset.UtcNow - container.Started.Value).TotalMilliseconds,
                new Dictionary<string, string> { { "ContainerName", container.ContainerName } });

            container.State = KustoContainerIngestionState.Created;
            var blobs = await _storageService.GetBlobsAsync(container);
            await _storageService.DeleteBlobsAsync(blobs);
        }

        await _storageService.ReplaceContainersAsync(retryingContainers);

        ingestion.State = KustoIngestionState.Enqueueing;
        await _storageService.ReplaceIngestionAsync(ingestion);
    }

    // Enqueueing: send one message per container that is in the created state.
    if (ingestion.State == KustoIngestionState.Enqueueing)
    {
        var containers = await _storageService.GetContainersAsync(ingestion);
        var createdContainers = containers.Where(x => x.State == KustoContainerIngestionState.Created).ToList();
        await EnqueueAsync(createdContainers);

        ingestion.State = KustoIngestionState.Working;
        await _storageService.ReplaceIngestionAsync(ingestion);
    }

    // Requeueing: entered from the Working step below when the fan-out recovery service asks for a
    // requeue. Unstarted container work is enqueued again before going back to Working.
    if (ingestion.State == KustoIngestionState.Requeueing)
    {
        await _fanOutRecoveryService.EnqueueUnstartedWorkAsync(
            x => _storageService.GetUnstartedCcontainersAsync(ingestion, x),
            EnqueueAsync,
            metricStepName: $"{nameof(KustoIngestionEntity)}.Container");

        ingestion.State = KustoIngestionState.Working;
        await _storageService.ReplaceIngestionAsync(ingestion);
    }

    // Working: wait for the containers, then decide between retrying, failing and validating.
    if (ingestion.State == KustoIngestionState.Working)
    {
        var containers = await _storageService.GetContainersAsync(ingestion);
        var createdCount = containers.Count(x => x.State == KustoContainerIngestionState.Created);
        var incompleteCount = containers.Count(x => x.State != KustoContainerIngestionState.Complete && x.State != KustoContainerIngestionState.Failed);
        var errorCount = containers.Count(x => x.State == KustoContainerIngestionState.Failed);

        if (containers.Count == 0)
        {
            // Nothing to ingest, so there is nothing to validate or swap.
            ingestion.State = KustoIngestionState.Finalizing;
            await _storageService.ReplaceIngestionAsync(ingestion);
        }
        else if (incompleteCount > 0)
        {
            if (createdCount > 0
                && await _fanOutRecoveryService.ShouldRequeueAsync(ingestion.Timestamp.Value, typeof(KustoContainerIngestionMessage)))
            {
                // Hand over to the Requeueing step on the next invocation. Assigning Working here would
                // leave the state unchanged and make the Requeueing step unreachable, so containers that
                // never started would not be enqueued again.
                ingestion.State = KustoIngestionState.Requeueing;
                message.AttemptCount = 0;
                await _storageService.ReplaceIngestionAsync(ingestion);
            }

            _logger.LogInformation("There are {Count} containers still being ingested into Kusto.", incompleteCount);
            message.AttemptCount++;
            await _messageEnqueuer.EnqueueAsync(new[] { message }, StorageUtility.GetMessageDelay(message.AttemptCount));
            return;
        }
        else if (errorCount > 0)
        {
            _logger.LogWarning("There are {Count} containers that could not be fully ingested into Kusto.", errorCount);

            if (ingestion.AttemptCount >= _options.Value.KustoIngestionMaxAttempts)
            {
                throw new InvalidOperationException($"At least one container failed to be ingested into Kusto, after {ingestion.AttemptCount} attempts.");
            }

            _telemetryClient.TrackMetric(
                nameof(KustoIngestionMessageProcessor) + ".Retrying.ElapsedMs",
                (DateTimeOffset.UtcNow - ingestion.Started.Value).TotalMilliseconds);

            ingestion.AttemptCount++;
            ingestion.State = KustoIngestionState.Retrying;
            await _storageService.ReplaceIngestionAsync(ingestion);

            // Delay the message based on the ingestion attempt count. If we're encountering recurring failures
            // from Kusto, it's wise to slow down and try again a bit later.
            await _messageEnqueuer.EnqueueAsync(new[] { message }, StorageUtility.GetMessageDelay(
                ingestion.AttemptCount - 1, // First retry should be immediate
                factor: 60, // Delay by one more minute each attempt
                maxSeconds: 600)); // Only wait for up to 10 minutes between attempts
            return;
        }
        else
        {
            // Every container is complete.
            ingestion.State = KustoIngestionState.Validating;
            await _storageService.ReplaceIngestionAsync(ingestion);
        }
    }

    // Validating: run the validator up to the configured number of attempts, pausing 10 seconds
    // between attempts.
    if (ingestion.State == KustoIngestionState.Validating)
    {
        var validationAttempts = 0;
        bool valid;
        do
        {
            if (validationAttempts > 0)
            {
                _telemetryClient.TrackMetric(
                    nameof(KustoIngestionMessageProcessor) + ".RetryingValidation.ElapsedMs",
                    (DateTimeOffset.UtcNow - ingestion.Started.Value).TotalMilliseconds);

                await Task.Delay(TimeSpan.FromSeconds(10));
            }

            valid = await _kustoDataValidator.ValidateAsync();
            validationAttempts++;
        }
        while (!valid && validationAttempts < _options.Value.KustoValidationMaxAttempts);

        if (!valid)
        {
            _telemetryClient.TrackMetric(
                nameof(KustoIngestionMessageProcessor) + ".FailedValidation.ElapsedMs",
                (DateTimeOffset.UtcNow - ingestion.Started.Value).TotalMilliseconds);

            _logger.LogWarning("The Kusto validation failed.");
            await CleanUpAndSetTerminalStateAsync(ingestion, KustoIngestionState.FailedValidation);
            return;
        }

        ingestion.State = KustoIngestionState.ConfirmingNewerData;
        await _storageService.ReplaceIngestionAsync(ingestion);
    }

    // ConfirmingNewerData: only swap tables when the validator does not report the ingested data as older.
    if (ingestion.State == KustoIngestionState.ConfirmingNewerData)
    {
        var newer = await _kustoDataValidator.IsIngestedDataNewerAsync();
        if (newer == false)
        {
            _logger.LogWarning("The ingested data is older than the existing data. No table swap will occur.");
            await DropTempTablesAsync(ingestion);

            ingestion.State = KustoIngestionState.Finalizing;
            await _storageService.ReplaceIngestionAsync(ingestion);
        }
        else
        {
            ingestion.State = KustoIngestionState.SwappingTables;
            await _storageService.ReplaceIngestionAsync(ingestion);
        }
    }

    // SwappingTables: the "old" tables are dropped before the rename that moves "final" to "old".
    if (ingestion.State == KustoIngestionState.SwappingTables)
    {
        await DropOldTablesAsync(ingestion);
        await SwapIngestedTablesAsync(ingestion);

        ingestion.State = KustoIngestionState.DroppingOldTables;
        await _storageService.ReplaceIngestionAsync(ingestion);
    }

    // DroppingOldTables: drop the "old" tables left behind by the swap.
    if (ingestion.State == KustoIngestionState.DroppingOldTables)
    {
        await DropOldTablesAsync(ingestion);

        ingestion.State = KustoIngestionState.Finalizing;
        await _storageService.ReplaceIngestionAsync(ingestion);
    }

    // Finalizing: emit the completion metric and move to the terminal Complete state.
    if (ingestion.State == KustoIngestionState.Finalizing)
    {
        _telemetryClient.TrackMetric(
            nameof(KustoIngestionMessageProcessor) + ".Complete.ElapsedMs",
            (DateTimeOffset.UtcNow - ingestion.Started.Value).TotalMilliseconds);

        _logger.LogInformation("The Kusto ingestion is complete.");
        await CleanUpAndSetTerminalStateAsync(ingestion, KustoIngestionState.Complete);
    }
}
/// <summary>
/// Enqueues one <see cref="KustoContainerIngestionMessage"/> for each of the provided containers,
/// copying the storage suffix and container name onto the message.
/// </summary>
/// <param name="containers">The containers to enqueue work for.</param>
private async Task EnqueueAsync(IReadOnlyList<KustoContainerIngestion> containers)
{
    var containerMessages =
        from container in containers
        select new KustoContainerIngestionMessage
        {
            StorageSuffix = container.StorageSuffix,
            ContainerName = container.ContainerName,
        };

    await _messageEnqueuer.EnqueueAsync(containerMessages.ToList());
}
/// <summary>
/// Deletes the ingestion's child table and the old ingestion records, then stamps the ingestion
/// with a completion time and the given terminal state and stores it.
/// </summary>
/// <param name="ingestion">The ingestion being finished.</param>
/// <param name="terminalState">The state to store on the ingestion.</param>
private async Task CleanUpAndSetTerminalStateAsync(KustoIngestionEntity ingestion, KustoIngestionState terminalState)
{
    // Storage clean-up happens before the terminal state is written.
    var storageSuffix = ingestion.StorageSuffix;
    await _storageService.DeleteChildTableAsync(storageSuffix);

    var currentIngestionId = ingestion.IngestionId;
    await _storageService.DeleteOldIngestionsAsync(currentIngestionId);

    // The completion time is captured only after the clean-up calls have finished.
    ingestion.Completed = DateTimeOffset.UtcNow;
    ingestion.State = terminalState;

    await _storageService.ReplaceIngestionAsync(ingestion);
}
/// <summary>
/// Issues a single <c>.rename tables</c> command that, for every container of the ingestion,
/// renames the final table to the old table name and the temp table to the final table name.
/// </summary>
/// <param name="ingestion">The ingestion whose containers determine the tables to rename.</param>
private async Task SwapIngestedTablesAsync(KustoIngestionEntity ingestion)
{
    var containers = await _storageService.GetContainersAsync(ingestion);

    var renames = new List<string>();
    foreach (var ingested in containers)
    {
        var name = ingested.ContainerName;
        var oldTable = _csvRecordContainers.GetOldKustoTableName(name);
        var finalTable = _csvRecordContainers.GetKustoTableName(name);
        var tempTable = _csvRecordContainers.GetTempKustoTableName(name);

        renames.AddRange(new[]
        {
            // The final table may not exist if this is the very first ingestion.
            $"{oldTable} = {finalTable} ifexists",

            // The temp table may not exist if this swap step succeeded in Kusto but failed with a
            // transient error, causing a retry.
            $"{finalTable} = {tempTable} ifexists",
        });
    }

    var renameList = string.Join(", ", renames);
    await ExecuteKustoCommandAsync($".rename tables {renameList}");
}
/// <summary>
/// Issues a single <c>.drop tables ... ifexists</c> command covering the temp table of every
/// container of the ingestion.
/// </summary>
/// <param name="ingestion">The ingestion whose containers determine the tables to drop.</param>
private async Task DropTempTablesAsync(KustoIngestionEntity ingestion)
{
    var containers = await _storageService.GetContainersAsync(ingestion);

    var tempTableNames =
        from container in containers
        select _csvRecordContainers.GetTempKustoTableName(container.ContainerName);

    var tableList = string.Join(", ", tempTableNames);
    await ExecuteKustoCommandAsync($".drop tables ({tableList}) ifexists");
}
/// <summary>
/// Issues a single <c>.drop tables ... ifexists</c> command covering the old table of every
/// container of the ingestion.
/// </summary>
/// <param name="ingestion">The ingestion whose containers determine the tables to drop.</param>
private async Task DropOldTablesAsync(KustoIngestionEntity ingestion)
{
    var containers = await _storageService.GetContainersAsync(ingestion);

    var oldTableNames =
        from container in containers
        select _csvRecordContainers.GetOldKustoTableName(container.ContainerName);

    var tableList = string.Join(", ", oldTableNames);
    await ExecuteKustoCommandAsync($".drop tables ({tableList}) ifexists");
}
/// <summary>
/// Logs the given command and executes it as a control command against the configured Kusto
/// database. The result of the command is disposed without being read.
/// </summary>
/// <param name="command">The Kusto control command text.</param>
private async Task ExecuteKustoCommandAsync(string command)
{
    _logger.LogInformation("Executing Kusto command: {Command}", command);

    var client = await _kustoClientFactory.GetAdminClientAsync();
    var databaseName = _options.Value.KustoDatabaseName;

    // Only the side effect of the command matters; the result is disposed when the method exits.
    using var response = await client.ExecuteControlCommandAsync(databaseName, command);
}
}
}