Skip to content
This repository was archived by the owner on Jul 30, 2024. It is now read-only.

Commit 297a5cf

Browse files
authored
[Revalidation] Improve throughput by batching revalidations (#667)
Increase the throughput of the revalidation job by batching revalidations. The revalidation job does a bunch of work before enqueueing revalidations, as it must: (1) ensure the revalidation job hasn't been killswitched; (2) verify that the ingestion pipeline is healthy; (3) verify that the desired package event rate hasn't been reached. This change amortizes that work by enqueueing revalidations in batches. Also, the job now takes into account how long it spent enqueueing revalidations when deciding how long to sleep. Addresses https://github.com/NuGet/Engineering/issues/1877
1 parent 9accd1f commit 297a5cf

24 files changed

Lines changed: 477 additions & 258 deletions

src/NuGet.Services.Revalidate/Configuration/RevalidationQueueConfiguration.cs

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,13 @@ namespace NuGet.Services.Revalidate
88
public class RevalidationQueueConfiguration
99
{
1010
/// <summary>
11-
/// If non-null, this skips revalidations of packages with more than this many versions.
11+
/// The maximum number of revalidations that should be returned by <see cref="IRevalidationQueue.NextAsync"/>.
1212
/// </summary>
13-
public int? MaximumPackageVersions { get; set; }
13+
public int MaxBatchSize { get; set; } = 64;
1414

1515
/// <summary>
16-
/// The maximum times that the <see cref="RevalidationQueue"/> should look for a revalidation
17-
/// before giving up.
18-
/// </summary>
19-
public int MaximumAttempts { get; set; } = 5;
20-
21-
/// <summary>
22-
/// The time to sleep after an initialized revalidation is deemed completed.
16+
/// If non-null, this skips revalidations of packages with more than this many versions.
2317
/// </summary>
24-
public TimeSpan SleepBetweenAttempts { get; set; } = TimeSpan.FromSeconds(5);
18+
public int? MaximumPackageVersions { get; set; }
2519
}
2620
}

src/NuGet.Services.Revalidate/NuGet.Services.Revalidate.csproj

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@
6969
<Compile Include="Initialization\PackageRevalidationInserter.cs" />
7070
<Compile Include="Services\RevalidationOperation.cs" />
7171
<Compile Include="Services\RevalidationQueue.cs" />
72-
<Compile Include="Services\RevalidationResult.cs" />
72+
<Compile Include="Services\StartRevalidationStatus.cs" />
7373
<Compile Include="Services\RevalidationService.cs" />
7474
<Compile Include="Services\RevalidationJobStateService.cs" />
7575
<Compile Include="Services\PackageRevalidationStateService.cs" />
@@ -82,6 +82,7 @@
8282
<Compile Include="Program.cs" />
8383
<Compile Include="Properties\AssemblyInfo.cs" />
8484
<Compile Include="Properties\AssemblyInfo.*.cs" />
85+
<Compile Include="Services\StartRevalidationResult.cs" />
8586
</ItemGroup>
8687
<ItemGroup>
8788
<None Include="App.config" />

src/NuGet.Services.Revalidate/Services/IPackageRevalidationStateService.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,10 @@ public interface IPackageRevalidationStateService
3434
Task<int> CountRevalidationsEnqueuedInPastHourAsync();
3535

3636
/// <summary>
37-
/// Update the package revalidation and mark is as enqueued.
37+
/// Update the package revalidations and mark them as enqueued.
3838
/// </summary>
39-
/// <param name="revalidation">The revalidation to update.</param>
40-
/// <returns>A task that completes once the revalidation has been updated.</returns>
41-
Task MarkPackageRevalidationAsEnqueuedAsync(PackageRevalidation revalidation);
39+
/// <param name="revalidations">The revalidations to update.</param>
40+
/// <returns>A task that completes once the revalidations have been updated.</returns>
41+
Task MarkPackageRevalidationsAsEnqueuedAsync(IReadOnlyList<PackageRevalidation> revalidations);
4242
}
4343
}
Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
33

4+
using System.Collections.Generic;
45
using System.Threading.Tasks;
56
using NuGet.Services.Validation;
67

@@ -9,9 +10,9 @@ namespace NuGet.Services.Revalidate
910
public interface IRevalidationQueue
1011
{
1112
/// <summary>
12-
/// Fetch the next package to revalidate.
13+
/// Fetch the next packages to revalidate.
1314
/// </summary>
14-
/// <returns>The next package to revalidate, or null if there are no packages to revalidate at this time.</returns>
15-
Task<PackageRevalidation> NextOrNullAsync();
15+
/// <returns>The next packages to revalidate, or an empty list if there are no packages to revalidate at this time.</returns>
16+
Task<IReadOnlyList<PackageRevalidation>> NextAsync();
1617
}
1718
}

src/NuGet.Services.Revalidate/Services/IRevalidationStarter.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,6 @@ public interface IRevalidationStarter
2020
/// 4. A revalidation could not be found at this time
2121
/// </summary>
2222
/// <returns>The result of the revalidation attempt.</returns>
23-
Task<RevalidationResult> StartNextRevalidationAsync();
23+
Task<StartRevalidationResult> StartNextRevalidationsAsync();
2424
}
2525
}

src/NuGet.Services.Revalidate/Services/IRevalidationThrottler.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
33

4+
using System;
45
using System.Threading.Tasks;
56

67
namespace NuGet.Services.Revalidate
@@ -16,8 +17,10 @@ public interface IRevalidationThrottler
1617
/// <summary>
1718
/// Delay the current task to achieve the desired revalidation rate.
1819
/// </summary>
20+
/// <param name="started">The number of revalidations started.</param>
21+
/// <param name="startDuration">How long it took to start the revalidations.</param>
1922
/// <returns>Delay the task to ensure the desired revalidation rate.</returns>
20-
Task DelayUntilNextRevalidationAsync();
23+
Task DelayUntilNextRevalidationAsync(int started, TimeSpan startDuration);
2124

2225
/// <summary>
2326
/// Delay the current task until when a revalidation can be retried.

src/NuGet.Services.Revalidate/Services/ITelemetryService.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
33

4+
using System;
45
using NuGet.Services.Logging;
56

67
namespace NuGet.Services.Revalidate
78
{
89
public interface ITelemetryService
910
{
11+
IDisposable TrackFindNextRevalidations();
12+
1013
DurationMetric<StartNextRevalidationOperation> TrackStartNextRevalidationOperation();
1114

1215
void TrackPackageRevalidationMarkedAsCompleted(string packageId, string normalizedVersion);

src/NuGet.Services.Revalidate/Services/PackageRevalidationStateService.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -67,21 +67,21 @@ public async Task<int> CountRevalidationsEnqueuedInPastHourAsync()
6767
.CountAsync();
6868
}
6969

70-
public async Task MarkPackageRevalidationAsEnqueuedAsync(PackageRevalidation revalidation)
70+
public async Task MarkPackageRevalidationsAsEnqueuedAsync(IReadOnlyList<PackageRevalidation> revalidations)
7171
{
7272
try
7373
{
74-
revalidation.Enqueued = DateTime.UtcNow;
74+
var enqueueTime = DateTime.UtcNow;
75+
foreach (var revalidation in revalidations)
76+
{
77+
revalidation.Enqueued = enqueueTime;
78+
}
7579

7680
await _context.SaveChangesAsync();
7781
}
7882
catch (DbUpdateConcurrencyException)
7983
{
80-
_logger.LogWarning(
81-
"Failed to update revalidation as enqueued for {PackageId} {PackageNormalizedVersion}",
82-
revalidation.PackageId,
83-
revalidation.PackageNormalizedVersion);
84-
84+
_logger.LogWarning("Failed to update revalidations as enqueued");
8585
throw;
8686
}
8787
}

src/NuGet.Services.Revalidate/Services/RevalidationOperation.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,13 @@ namespace NuGet.Services.Revalidate
66
public class StartNextRevalidationOperation
77
{
88
/// <summary>
9-
/// The result of attempting to start the next revalidation.
9+
/// The result of attempting to start the next revalidations.
1010
/// </summary>
11-
public RevalidationResult Result { get; set; }
11+
public StartRevalidationStatus Result { get; set; }
12+
13+
/// <summary>
14+
/// The number of revalidations started.
15+
/// </summary>
16+
public int Started { get; set; }
1217
}
1318
}

src/NuGet.Services.Revalidate/Services/RevalidationQueue.cs

Lines changed: 110 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
33

44
using System;
5+
using System.Collections.Generic;
56
using System.Data.Entity;
67
using System.Data.Entity.Infrastructure;
78
using System.Linq;
@@ -37,19 +38,17 @@ public RevalidationQueue(
3738
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
3839
}
3940

40-
public async Task<PackageRevalidation> NextOrNullAsync()
41+
public async Task<IReadOnlyList<PackageRevalidation>> NextAsync()
4142
{
42-
for (var i = 0; i < _config.MaximumAttempts; i++)
43+
// Find the next package to revalidate. We will skip packages if:
44+
// 1. The package has more than "MaximumPackageVersions" versions
45+
// 2. The package has already been enqueued for revalidation
46+
// 3. The package's revalidation was completed by an external factor (like manual admin revalidation)
47+
List<PackageRevalidation> next;
48+
using (_telemetry.TrackFindNextRevalidations())
4349
{
44-
_logger.LogInformation(
45-
"Attempting to find the next revalidation. Try {Attempt} of {MaxAttempts}",
46-
i + 1,
47-
_config.MaximumAttempts);
48-
49-
// Find the next package to revalidate. We will skip packages if:
50-
// 1. The package has more than "MaximumPackageVersions" versions
51-
// 2. The package has already been enqueued for revalidation
52-
// 3. The package's revalidation was completed by an external factory (like manual admin revalidation)
50+
_logger.LogInformation("Finding the next packages to revalidate...");
51+
5352
IQueryable<PackageRevalidation> query = _validationContext.PackageRevalidations;
5453

5554
if (_config.MaximumPackageVersions.HasValue)
@@ -61,82 +60,130 @@ public async Task<PackageRevalidation> NextOrNullAsync()
6160
.Any(g => g.Key == r.PackageId));
6261
}
6362

64-
var next = await query
63+
next = await query
6564
.Where(r => r.Enqueued == null)
6665
.Where(r => r.Completed == false)
6766
.OrderBy(r => r.Key)
68-
.FirstOrDefaultAsync();
67+
.Take(_config.MaxBatchSize)
68+
.ToListAsync();
69+
}
6970

70-
if (next == null)
71-
{
72-
_logger.LogWarning("Could not find any incomplete revalidations");
73-
return null;
74-
}
71+
_logger.LogInformation("Found {Revalidations} packages to revalidate", next.Count);
7572

76-
// Don't revalidate packages that already have a repository signature or that no longer exist.
77-
if (await HasRepositorySignature(next) || await IsDeleted(next))
78-
{
79-
await MarkAsCompleted(next);
80-
await Task.Delay(_config.SleepBetweenAttempts);
73+
// Return all the revalidations that aren't already completed.
74+
return await FilterCompletedRevalidationsAsync(next);
75+
}
8176

82-
continue;
83-
}
77+
private async Task<IReadOnlyList<PackageRevalidation>> FilterCompletedRevalidationsAsync(IReadOnlyList<PackageRevalidation> revalidations)
78+
{
79+
if (!revalidations.Any())
80+
{
81+
return revalidations;
82+
}
8483

85-
_logger.LogInformation(
86-
"Found revalidation for {PackageId} {PackageNormalizedVersion} after {Attempt} attempts",
87-
next.PackageId,
88-
next.PackageNormalizedVersion,
89-
i + 1);
84+
var completed = new List<PackageRevalidation>();
85+
var uncompleted = revalidations.ToDictionary(
86+
r => $"{r.PackageId}/{r.PackageNormalizedVersion}",
87+
r => r);
9088

91-
return next;
92-
}
89+
// Packages that already have a repository signature do not need to be revalidated.
90+
_logger.LogInformation("Finding revalidations that can be skipped because their packages are already repository signed...");
91+
92+
var hasRepositorySignatures = await _validationContext.PackageSigningStates
93+
.Select(s => new {
94+
IdAndVersion = s.PackageId + "/" + s.PackageNormalizedVersion,
95+
s.PackageSignatures
96+
})
97+
.Where(s => uncompleted.Keys.Contains(s.IdAndVersion))
98+
.Where(s => s.PackageSignatures.Any(sig => sig.Type == PackageSignatureType.Repository))
99+
.Select(s => s.IdAndVersion)
100+
.ToListAsync();
93101

94102
_logger.LogInformation(
95-
"Did not find any revalidations after {MaxAttempts}. Retry later...",
96-
_config.MaximumAttempts);
103+
"Found {RevalidationCount} revalidations that can be skipped because their packages are already repository signed",
104+
hasRepositorySignatures.Count);
97105

98-
return null;
99-
}
106+
foreach (var idAndVersion in hasRepositorySignatures)
107+
{
108+
completed.Add(uncompleted[idAndVersion]);
109+
uncompleted.Remove(idAndVersion);
110+
}
100111

101-
private Task<bool> HasRepositorySignature(PackageRevalidation revalidation)
102-
{
103-
return _validationContext.PackageSigningStates
104-
.Where(s => s.PackageId == revalidation.PackageId)
105-
.Where(s => s.PackageNormalizedVersion == revalidation.PackageNormalizedVersion)
106-
.Where(s => s.PackageSignatures.Any(sig => sig.Type == PackageSignatureType.Repository))
107-
.AnyAsync();
108-
}
112+
// Packages that are no longer available should not be revalidated.
113+
_logger.LogInformation("Finding revalidations' package statuses...");
109114

110-
private async Task<bool> IsDeleted(PackageRevalidation revalidation)
111-
{
112-
var packageStatus = await _galleryContext.Set<Package>()
113-
.Where(p => p.PackageRegistration.Id == revalidation.PackageId)
114-
.Where(p => p.NormalizedVersion == revalidation.PackageNormalizedVersion)
115-
.Select(p => (PackageStatus?)p.PackageStatusKey)
116-
.FirstOrDefaultAsync();
115+
var packageStatuses = await _galleryContext.Set<Package>()
116+
.Select(p => new
117+
{
118+
Identity = p.PackageRegistration.Id + "/" + p.NormalizedVersion,
119+
p.PackageStatusKey
120+
})
121+
.Where(p => uncompleted.Keys.Contains(p.Identity))
122+
.ToDictionaryAsync(
123+
p => p.Identity,
124+
p => p.PackageStatusKey);
117125

118-
return (packageStatus == null || packageStatus == PackageStatus.Deleted);
119-
}
126+
_logger.LogInformation("Found {PackageStatusCount} revalidations' package statuses", packageStatuses.Count);
127+
128+
foreach (var key in uncompleted.Keys.ToList())
129+
{
130+
// Packages that are hard deleted won't have a status.
131+
if (!packageStatuses.TryGetValue(key, out var status) || status == PackageStatus.Deleted)
132+
{
133+
completed.Add(uncompleted[key]);
134+
uncompleted.Remove(key);
135+
continue;
136+
}
137+
}
120138

121-
private async Task MarkAsCompleted(PackageRevalidation revalidation)
122-
{
123139
_logger.LogInformation(
124-
"Marking package revalidation as completed as it has a repository signature or is deleted for {PackageId} {PackageNormalizedVersion}",
125-
revalidation.PackageId,
126-
revalidation.PackageNormalizedVersion);
140+
"Found {CompletedRevalidations} revalidations that can be skipped. There are {UncompletedRevalidations} " +
141+
"revalidations remaining in this batch",
142+
completed.Count,
143+
uncompleted.Count);
127144

145+
// Update revalidations that were determined to be completed and return the remaining revalidations.
146+
if (completed.Any())
147+
{
148+
await MarkRevalidationsAsCompletedAsync(completed);
149+
}
150+
151+
return uncompleted.Values.ToList();
152+
}
153+
154+
private async Task MarkRevalidationsAsCompletedAsync(IReadOnlyList<PackageRevalidation> revalidations)
155+
{
128156
try
129157
{
130-
revalidation.Completed = true;
158+
foreach (var revalidation in revalidations)
159+
{
160+
_logger.LogInformation(
161+
"Marking package {PackageId} {PackageNormalizedVersion} revalidation as completed as the package is unavailable or the package is already repository signed...",
162+
revalidation.PackageId,
163+
revalidation.PackageNormalizedVersion);
164+
165+
revalidation.Completed = true;
166+
}
131167

132168
await _validationContext.SaveChangesAsync();
133169

134-
_telemetry.TrackPackageRevalidationMarkedAsCompleted(revalidation.PackageId, revalidation.PackageNormalizedVersion);
170+
foreach (var revalidation in revalidations)
171+
{
172+
_logger.LogInformation(
173+
"Marked package {PackageId} {PackageNormalizedVersion} revalidation as completed",
174+
revalidation.PackageId,
175+
revalidation.PackageNormalizedVersion);
176+
177+
_telemetry.TrackPackageRevalidationMarkedAsCompleted(revalidation.PackageId, revalidation.PackageNormalizedVersion);
178+
}
135179
}
136-
catch (DbUpdateConcurrencyException)
180+
catch (DbUpdateConcurrencyException e)
137181
{
138-
// Swallow concurrency exceptions. The package will be marked as completed
139-
// on the next iteration of "NextOrNullAsync".
182+
_logger.LogError(
183+
0,
184+
e,
185+
"Failed to mark package revalidations as completed. " +
186+
$"These revalidations will be marked as completed on the next iteration of {nameof(NextAsync)}...");
140187
}
141188
}
142189
}

0 commit comments

Comments
 (0)