Skip to content
This repository was archived by the owner on Jul 30, 2024. It is now read-only.

Commit caae4e2

Browse files
authored
Hardening the orchestrator shutdown process. (#292)
1 parent aa86424 commit caae4e2

10 files changed

Lines changed: 211 additions & 25 deletions
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
using System.Threading;
5+
6+
namespace NuGet.Services.Validation.Orchestrator
7+
{
8+
/// <summary>
9+
/// Provides the ability to send shutdown notification
10+
/// </summary>
11+
public interface IShutdownNotificationProvider
12+
{
13+
/// <summary>
14+
/// Sends a message that shutdown was initiated
15+
/// </summary>
16+
void NotifyShutdownInitiated();
17+
18+
/// <summary>
19+
/// Cancellation token the gets signaled about initiated shutdown
20+
/// </summary>
21+
CancellationToken Token { get; }
22+
}
23+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
using System.Threading;
5+
6+
namespace NuGet.Services.Validation.Orchestrator
7+
{
8+
/// <summary>
9+
/// DI helper that wraps <see cref="CancellationToken"/>
10+
/// </summary>
11+
public interface IShutdownNotificationTokenProvider
12+
{
13+
14+
/// <summary>
15+
/// Cancellation token the gets signaled about initiated shutdown
16+
/// </summary>
17+
CancellationToken Token { get; }
18+
}
19+
}

src/NuGet.Services.Validation.Orchestrator/Job.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,10 @@ private void ConfigureJobServices(IServiceCollection services, IConfigurationRoo
149149
services.AddTransient<ConfigurationValidator>();
150150
services.AddTransient<OrchestrationRunner>();
151151

152+
services.AddSingleton<IShutdownNotificationProvider, ShutdownNotificationProvider>();
153+
services.AddSingleton<IShutdownNotificationTokenProvider>(serviceProvider =>
154+
new ShutdownNotificationTokenProvider(serviceProvider.GetRequiredService<IShutdownNotificationProvider>().Token));
155+
152156
services.AddScoped<NuGetGallery.IEntitiesContext>(serviceProvider =>
153157
new NuGetGallery.EntitiesContext(
154158
serviceProvider.GetRequiredService<IOptionsSnapshot<GalleryDbConfiguration>>().Value.ConnectionString,

src/NuGet.Services.Validation.Orchestrator/NuGet.Services.Validation.Orchestrator.csproj

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@
5050
<Compile Include="EmailConfiguration.cs" />
5151
<Compile Include="Error.cs" />
5252
<Compile Include="IMessageService.cs" />
53+
<Compile Include="IShutdownNotificationProvider.cs" />
54+
<Compile Include="IShutdownNotificationTokenProvider.cs" />
5355
<Compile Include="IValidationOutcomeProcessor.cs" />
5456
<Compile Include="IValidationSetProcessor.cs" />
5557
<Compile Include="IValidationSetProvider.cs" />
@@ -71,6 +73,8 @@
7173
<Compile Include="Program.cs" />
7274
<Compile Include="Properties\AssemblyInfo.cs" />
7375
<Compile Include="Properties\AssemblyInfo.*.cs" />
76+
<Compile Include="ShutdownNotificationProvider.cs" />
77+
<Compile Include="ShutdownNotificationTokenProvider.cs" />
7478
<Compile Include="SmtpConfiguration.cs" />
7579
<Compile Include="ValidationConfiguration.cs" />
7680
<Compile Include="ValidationConfigurationItem.cs" />

src/NuGet.Services.Validation.Orchestrator/OrchestrationRunner.cs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,19 @@ public class OrchestrationRunner
2121

2222
private readonly ISubscriptionProcessor<PackageValidationMessageData> _subscriptionProcessor;
2323
private readonly OrchestrationRunnerConfiguration _configuration;
24+
private readonly IShutdownNotificationProvider _shutdownNotificationProvider;
2425
private readonly ILogger<OrchestrationRunner> _logger;
2526

2627
public OrchestrationRunner(
2728
ISubscriptionProcessor<PackageValidationMessageData> subscriptionProcessor,
2829
IOptionsSnapshot<OrchestrationRunnerConfiguration> configurationAccessor,
30+
IShutdownNotificationProvider shutdownNotificationProvider,
2931
ILogger<OrchestrationRunner> logger)
3032
{
3133
_subscriptionProcessor = subscriptionProcessor ?? throw new ArgumentNullException(nameof(subscriptionProcessor));
3234
configurationAccessor = configurationAccessor ?? throw new ArgumentNullException(nameof(configurationAccessor));
3335
_configuration = configurationAccessor.Value ?? throw new ArgumentException("Value property cannot be null", nameof(configurationAccessor));
36+
_shutdownNotificationProvider = shutdownNotificationProvider ?? throw new ArgumentNullException(nameof(shutdownNotificationProvider));
3437
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
3538
}
3639

@@ -42,7 +45,14 @@ public async Task RunOrchestrationAsync()
4245
await Task.Delay(_configuration.ProcessRecycleInterval);
4346

4447
_logger.LogInformation("Recycling the process...");
45-
await _subscriptionProcessor.StartShutdownAsync();
48+
_shutdownNotificationProvider.NotifyShutdownInitiated();
49+
var shutdownTask = _subscriptionProcessor.StartShutdownAsync();
50+
// make sure we don't block on waiting shutdownTask to finish
51+
if (await Task.WhenAny(shutdownTask, Task.Delay(_configuration.ShutdownWaitInterval)) != shutdownTask )
52+
{
53+
_logger.LogWarning("Failed to wait for shutdown initiation task to finish. Waited for {ShutdownWaitInterval}. Will proceed with task termination",
54+
_configuration.ShutdownWaitInterval);
55+
}
4656

4757
DateTimeOffset waitStart = DateTimeOffset.Now;
4858

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
using System.Threading;
5+
6+
namespace NuGet.Services.Validation.Orchestrator
7+
{
8+
public class ShutdownNotificationProvider : IShutdownNotificationProvider
9+
{
10+
private CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
11+
12+
public CancellationToken Token => _cancellationTokenSource.Token;
13+
14+
public void NotifyShutdownInitiated()
15+
{
16+
_cancellationTokenSource.Cancel();
17+
}
18+
}
19+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
using System.Threading;
5+
6+
namespace NuGet.Services.Validation.Orchestrator
7+
{
8+
public class ShutdownNotificationTokenProvider : IShutdownNotificationTokenProvider
9+
{
10+
public ShutdownNotificationTokenProvider(CancellationToken cancellationToken)
11+
{
12+
Token = cancellationToken;
13+
}
14+
15+
public CancellationToken Token { get; }
16+
}
17+
}

src/NuGet.Services.Validation.Orchestrator/ValidationMessageHandler.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,24 +15,37 @@ public class ValidationMessageHandler : IMessageHandler<PackageValidationMessage
1515
private readonly IValidationSetProvider _validationSetProvider;
1616
private readonly IValidationSetProcessor _validationSetProcessor;
1717
private readonly IValidationOutcomeProcessor _validationOutcomeProcessor;
18+
private readonly IShutdownNotificationTokenProvider _shutdownNotificationTokenProvider;
1819
private readonly ILogger<ValidationMessageHandler> _logger;
1920

2021
public ValidationMessageHandler(
2122
ICorePackageService galleryPackageService,
2223
IValidationSetProvider validationSetProvider,
2324
IValidationSetProcessor validationSetProcessor,
2425
IValidationOutcomeProcessor validationOutcomeProcessor,
26+
IShutdownNotificationTokenProvider shutdownNotificationTokenProvider,
2527
ILogger<ValidationMessageHandler> logger)
2628
{
2729
_galleryPackageService = galleryPackageService ?? throw new ArgumentNullException(nameof(galleryPackageService));
2830
_validationSetProvider = validationSetProvider ?? throw new ArgumentNullException(nameof(validationSetProvider));
2931
_validationSetProcessor = validationSetProcessor ?? throw new ArgumentNullException(nameof(validationSetProcessor));
3032
_validationOutcomeProcessor = validationOutcomeProcessor ?? throw new ArgumentNullException(nameof(validationOutcomeProcessor));
33+
_shutdownNotificationTokenProvider = shutdownNotificationTokenProvider ?? throw new ArgumentNullException(nameof(shutdownNotificationTokenProvider));
34+
if (shutdownNotificationTokenProvider.Token == null)
35+
{
36+
throw new ArgumentException($"{nameof(shutdownNotificationTokenProvider.Token)} property cannot be null", nameof(shutdownNotificationTokenProvider));
37+
}
3138
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
3239
}
3340

3441
public async Task<bool> HandleAsync(PackageValidationMessageData message)
3542
{
43+
if (_shutdownNotificationTokenProvider.Token.IsCancellationRequested)
44+
{
45+
_logger.LogInformation("Service shutdown was requested, will not process new message");
46+
return false;
47+
}
48+
3649
var package = _galleryPackageService.FindPackageByIdAndVersionStrict(message.PackageId, message.PackageVersion);
3750

3851
if (package == null)

tests/NuGet.Services.Validation.Orchestrator.Tests/OrchestrationRunnerFacts.cs

Lines changed: 71 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -16,69 +16,116 @@ public class OrchestrationRunnerFacts
1616
[Fact]
1717
public async Task StartsMessageProcessing()
1818
{
19-
var subscriptionProcessor = new Mock<ISubscriptionProcessor<PackageValidationMessageData>>();
20-
var loggerMock = new Mock<ILogger<OrchestrationRunner>>();
21-
var optionsAccessorMock = CreateOptionsAccessorMock(TimeSpan.Zero, TimeSpan.Zero);
22-
23-
var runner = new OrchestrationRunner(subscriptionProcessor.Object, optionsAccessorMock.Object, loggerMock.Object);
19+
var runner = CreateRunner();
2420
await runner.RunOrchestrationAsync();
2521

26-
subscriptionProcessor.Verify(o => o.Start(), Times.Once());
22+
SubscriptionProcessorMock.Verify(o => o.Start(), Times.Once());
2723
}
2824

2925
[Fact]
3026
public async Task ShutsDownMessageProcessing()
3127
{
32-
var orchestratorMock = new Mock<ISubscriptionProcessor<PackageValidationMessageData>>();
33-
var loggerMock = new Mock<ILogger<OrchestrationRunner>>();
34-
var optionsAccessorMock = CreateOptionsAccessorMock(TimeSpan.Zero, TimeSpan.Zero);
35-
3628
var startCalled = false;
37-
orchestratorMock
29+
SubscriptionProcessorMock
3830
.Setup(o => o.Start())
3931
.Callback(() => startCalled = true);
4032

41-
orchestratorMock
33+
SubscriptionProcessorMock
4234
.Setup(o => o.StartShutdownAsync())
4335
.Callback(() => Assert.True(startCalled))
4436
.Returns(Task.FromResult(0));
45-
var runner = new OrchestrationRunner(orchestratorMock.Object, optionsAccessorMock.Object, loggerMock.Object);
37+
var runner = CreateRunner();
4638
await runner.RunOrchestrationAsync();
4739

48-
orchestratorMock.Verify(o => o.StartShutdownAsync(), Times.Once());
40+
SubscriptionProcessorMock.Verify(o => o.StartShutdownAsync(), Times.Once());
4941
}
5042

5143
[Fact(Skip = "Flaky test. Won't run it as part of CI.")]
5244
public async Task WaitsOrchestratorToShutDown()
5345
{
54-
var orchestratorMock = new Mock<ISubscriptionProcessor<PackageValidationMessageData>>();
55-
var loggerMock = new Mock<ILogger<OrchestrationRunner>>();
56-
var optionsAccessorMock = CreateOptionsAccessorMock(TimeSpan.Zero, TimeSpan.FromSeconds(3));
46+
SetupOptionsAccessorMock(TimeSpan.Zero, TimeSpan.FromSeconds(3));
5747

5848
int numberOfRequestsInProgress = 2;
59-
orchestratorMock
49+
SubscriptionProcessorMock
6050
.SetupGet(o => o.NumberOfMessagesInProgress)
6151
.Returns(() => numberOfRequestsInProgress--);
6252

63-
var runner = new OrchestrationRunner(orchestratorMock.Object, optionsAccessorMock.Object, loggerMock.Object);
53+
var runner = CreateRunner();
6454
await runner.RunOrchestrationAsync();
6555

66-
orchestratorMock.Verify(o => o.NumberOfMessagesInProgress, Times.Exactly(3));
56+
SubscriptionProcessorMock.Verify(o => o.NumberOfMessagesInProgress, Times.AtLeast(3));
6757
}
6858

69-
private static Mock<IOptionsSnapshot<OrchestrationRunnerConfiguration>> CreateOptionsAccessorMock(
59+
[Fact]
60+
public async Task SendsShutdownNotificationAfterRunningValidations()
61+
{
62+
bool startedProcessing = false;
63+
SubscriptionProcessorMock
64+
.Setup(sp => sp.Start())
65+
.Callback(() => startedProcessing = true);
66+
67+
ShutdownNotificationProviderMock
68+
.Setup(snp => snp.NotifyShutdownInitiated())
69+
.Callback(() => Assert.True(startedProcessing));
70+
71+
var runner = CreateRunner();
72+
await runner.RunOrchestrationAsync();
73+
74+
ShutdownNotificationProviderMock
75+
.Verify(snp => snp.NotifyShutdownInitiated());
76+
}
77+
78+
[Fact(Skip = "Time based test, will not run in CI")]
79+
public async Task DoesNotBlockOnWaitingForSubscriptionProcessorToInitiateShutdown()
80+
{
81+
SubscriptionProcessorMock
82+
.Setup(sp => sp.StartShutdownAsync())
83+
.Returns(Task.Delay(TimeSpan.FromDays(1))); // veeeery long shutdown
84+
85+
var runner = CreateRunner();
86+
var orchestrationTask = runner.RunOrchestrationAsync();
87+
var delayTask = Task.Delay(TimeSpan.FromSeconds(5));
88+
89+
var didNotWait = await Task.WhenAny(orchestrationTask, delayTask) == orchestrationTask;
90+
91+
Assert.True(didNotWait);
92+
SubscriptionProcessorMock
93+
.Verify(sp => sp.StartShutdownAsync(), Times.AtLeastOnce());
94+
}
95+
96+
private Mock<IOptionsSnapshot<OrchestrationRunnerConfiguration>> SetupOptionsAccessorMock(
7097
TimeSpan processRecycleInterval,
7198
TimeSpan shutdownWaitInterval)
7299
{
73-
var optionsAccessorMock = new Mock<IOptionsSnapshot<OrchestrationRunnerConfiguration>>();
74-
optionsAccessorMock
100+
OrchestrationRunnerConfigurationAccessorMock = new Mock<IOptionsSnapshot<OrchestrationRunnerConfiguration>>();
101+
OrchestrationRunnerConfigurationAccessorMock
75102
.SetupGet(o => o.Value)
76103
.Returns(new OrchestrationRunnerConfiguration
77104
{
78105
ProcessRecycleInterval = processRecycleInterval,
79106
ShutdownWaitInterval = shutdownWaitInterval
80107
});
81-
return optionsAccessorMock;
108+
return OrchestrationRunnerConfigurationAccessorMock;
82109
}
110+
111+
private OrchestrationRunner CreateRunner()
112+
=> new OrchestrationRunner(
113+
SubscriptionProcessorMock.Object,
114+
OrchestrationRunnerConfigurationAccessorMock.Object,
115+
ShutdownNotificationProviderMock.Object,
116+
LoggerMock.Object);
117+
118+
public OrchestrationRunnerFacts()
119+
{
120+
SubscriptionProcessorMock = new Mock<ISubscriptionProcessor<PackageValidationMessageData>>();
121+
LoggerMock = new Mock<ILogger<OrchestrationRunner>>();
122+
ShutdownNotificationProviderMock = new Mock<IShutdownNotificationProvider>();
123+
SetupOptionsAccessorMock(TimeSpan.Zero, TimeSpan.Zero);
124+
}
125+
126+
private Mock<ISubscriptionProcessor<PackageValidationMessageData>> SubscriptionProcessorMock { get; }
127+
private Mock<ILogger<OrchestrationRunner>> LoggerMock { get; }
128+
private Mock<IShutdownNotificationProvider> ShutdownNotificationProviderMock { get; }
129+
private Mock<IOptionsSnapshot<OrchestrationRunnerConfiguration>> OrchestrationRunnerConfigurationAccessorMock { get; set; }
83130
}
84131
}

tests/NuGet.Services.Validation.Orchestrator.Tests/ValidationMessageHandlerFacts.cs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
33

44
using System;
5+
using System.Threading;
56
using System.Threading.Tasks;
67
using Microsoft.Extensions.Logging;
78
using Moq;
@@ -87,6 +88,29 @@ public async Task CallsProcessValidationOutcome()
8788
ValidationOutcomeProcessorMock
8889
.Verify(vop => vop.ProcessValidationOutcomeAsync(ValidationSet, Package));
8990
}
91+
92+
[Fact]
93+
public async Task AbandonsMessageProcessingIfShutdownIsInProgress()
94+
{
95+
var cancellationTokenSource = new CancellationTokenSource();
96+
ShutdownNotificationTokenProviderMock
97+
.SetupGet(x => x.Token)
98+
.Returns(cancellationTokenSource.Token);
99+
cancellationTokenSource.Cancel();
100+
101+
var handler = CreateHandler();
102+
var result = await handler.HandleAsync(MessageData);
103+
104+
Assert.False(result);
105+
CorePackageServiceMock
106+
.Verify(cps => cps.FindPackageByIdAndVersionStrict(It.IsAny<string>(), It.IsAny<string>()), Times.Never());
107+
ValidationSetProviderMock
108+
.Verify(vsp => vsp.GetOrCreateValidationSetAsync(It.IsAny<Guid>(), It.IsAny<Package>()), Times.Never());
109+
ValidationSetProcessorMock
110+
.Verify(vsp => vsp.ProcessValidationsAsync(It.IsAny<PackageValidationSet>(), It.IsAny<Package>()), Times.Never());
111+
ValidationOutcomeProcessorMock
112+
.Verify(vop => vop.ProcessValidationOutcomeAsync(It.IsAny<PackageValidationSet>(), It.IsAny<Package>()), Times.Never());
113+
}
90114
}
91115

92116
public class ValidationMessageHandlerFactsBase
@@ -95,6 +119,7 @@ public class ValidationMessageHandlerFactsBase
95119
protected Mock<IValidationSetProvider> ValidationSetProviderMock { get; }
96120
protected Mock<IValidationSetProcessor> ValidationSetProcessorMock { get; }
97121
protected Mock<IValidationOutcomeProcessor> ValidationOutcomeProcessorMock { get; }
122+
protected Mock<IShutdownNotificationTokenProvider> ShutdownNotificationTokenProviderMock { get; }
98123
protected Mock<ILogger<ValidationMessageHandler>> LoggerMock { get; }
99124

100125
public ValidationMessageHandlerFactsBase(MockBehavior mockBehavior)
@@ -103,6 +128,10 @@ public ValidationMessageHandlerFactsBase(MockBehavior mockBehavior)
103128
ValidationSetProviderMock = new Mock<IValidationSetProvider>(mockBehavior);
104129
ValidationSetProcessorMock = new Mock<IValidationSetProcessor>(mockBehavior);
105130
ValidationOutcomeProcessorMock = new Mock<IValidationOutcomeProcessor>(mockBehavior);
131+
ShutdownNotificationTokenProviderMock = new Mock<IShutdownNotificationTokenProvider>(mockBehavior);
132+
ShutdownNotificationTokenProviderMock
133+
.SetupGet(x => x.Token)
134+
.Returns(CancellationToken.None);
106135
LoggerMock = new Mock<ILogger<ValidationMessageHandler>>(); // we generally don't care about how logger is called, so it's loose all the time
107136
}
108137

@@ -113,6 +142,7 @@ public ValidationMessageHandler CreateHandler()
113142
ValidationSetProviderMock.Object,
114143
ValidationSetProcessorMock.Object,
115144
ValidationOutcomeProcessorMock.Object,
145+
ShutdownNotificationTokenProviderMock.Object,
116146
LoggerMock.Object);
117147
}
118148
}

0 commit comments

Comments
 (0)