-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathTaskStateManager.cs
More file actions
153 lines (126 loc) · 5.71 KB
/
Copy pathTaskStateManager.cs
File metadata and controls
153 lines (126 loc) · 5.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
using System.Diagnostics;
using AsyncKeyedLock;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Options;
namespace TaskTurnstile;
internal sealed class TaskStateManager(ITaskStateStore store, TaskTurnstileOptions options) : ITaskStateManager
{
private readonly AsyncKeyedLocker<string> _locker = new();
private readonly IMemoryCache _localCache = new MemoryCache(Options.Create(new MemoryCacheOptions()));
private const int DefaultPollIntervalMs = 250;
public Task<bool> IsRunningAsync(object taskKey, CancellationToken cancellationToken = default) =>
store.IsRunningAsync(TaskKeyConverter.ToKey(taskKey), cancellationToken);
public Task<bool> CanStartAsync(object taskKey, CancellationToken cancellationToken = default) =>
CanStartCoreAsync(TaskKeyConverter.ToKey(taskKey), cancellationToken);
private async Task<bool> CanStartCoreAsync(string key, CancellationToken cancellationToken)
{
if (!await store.IsRunningAsync(key, cancellationToken))
return true;
return await store.IsExpiredAsync(key, cancellationToken);
}
public async Task RunAsync(object taskKey, Func<CancellationToken, Task> work, TimeSpan? maxRuntime = null, CancellationToken cancellationToken = default)
{
var key = TaskKeyConverter.ToKey(taskKey);
await WaitAndStartAsync(key, maxRuntime ?? options.DefaultMaxRuntime, DefaultPollIntervalMs, null, cancellationToken);
try
{
await work(cancellationToken);
}
finally
{
await StopAsync(key);
}
}
public async Task<bool> TryRunAsync(object taskKey, Func<CancellationToken, Task> work, TimeSpan? maxRuntime = null, CancellationToken cancellationToken = default)
{
var key = TaskKeyConverter.ToKey(taskKey);
if (!await StartCoreAsync(key, maxRuntime, cancellationToken))
return false;
try
{
await work(cancellationToken);
return true;
}
finally
{
await StopAsync(key);
}
}
public async Task<TryRunResult<T>> TryRunAsync<T>(object taskKey, Func<CancellationToken, Task<T>> work, TimeSpan? maxRuntime = null, CancellationToken cancellationToken = default)
{
var key = TaskKeyConverter.ToKey(taskKey);
if (!await StartCoreAsync(key, maxRuntime, cancellationToken))
return TryRunResult<T>.Skipped;
try
{
var value = await work(cancellationToken);
return TryRunResult<T>.Ran(value);
}
finally
{
await StopAsync(key);
}
}
public Task WaitAsync(object taskKey, CancellationToken cancellationToken = default) =>
WaitAndStartAsync(TaskKeyConverter.ToKey(taskKey), options.DefaultMaxRuntime, DefaultPollIntervalMs, null, cancellationToken);
public Task WaitAsync(object taskKey, int pollIntervalMs, int? maxWaitMs = null, CancellationToken cancellationToken = default) =>
WaitAndStartAsync(TaskKeyConverter.ToKey(taskKey), options.DefaultMaxRuntime, pollIntervalMs, maxWaitMs, cancellationToken);
private async Task WaitAndStartAsync(string key, TimeSpan? maxRuntime, int pollIntervalMs, int? maxWaitMs, CancellationToken cancellationToken)
{
var stopwatch = Stopwatch.StartNew();
while (true)
{
cancellationToken.ThrowIfCancellationRequested();
// Fast path: if this instance started the task and it hasn't expired locally,
// skip the backing store query entirely.
if (_localCache.TryGetValue(key, out _))
{
if (maxWaitMs.HasValue && stopwatch.ElapsedMilliseconds >= maxWaitMs.Value)
throw new TimeoutException($"Task '{key}' did not become available within {maxWaitMs.Value}ms.");
await Task.Delay(pollIntervalMs, cancellationToken);
continue;
}
if (await StartCoreAsync(key, maxRuntime, cancellationToken))
return;
if (maxWaitMs.HasValue && stopwatch.ElapsedMilliseconds >= maxWaitMs.Value)
throw new TimeoutException($"Task '{key}' did not become available within {maxWaitMs.Value}ms.");
await Task.Delay(pollIntervalMs, cancellationToken);
}
}
public async Task<bool> StartAsync(object taskKey, TimeSpan? maxRuntime = null, CancellationToken cancellationToken = default) =>
await StartCoreAsync(TaskKeyConverter.ToKey(taskKey), maxRuntime, cancellationToken);
private async Task<bool> StartCoreAsync(string key, TimeSpan? maxRuntime, CancellationToken cancellationToken)
{
using (await _locker.LockAsync(key, cancellationToken))
{
if (!await CanStartCoreAsync(key, cancellationToken))
return false;
await store.SetRunningAsync(key, maxRuntime ?? options.DefaultMaxRuntime, cancellationToken);
var effectiveRuntime = maxRuntime ?? options.DefaultMaxRuntime;
if (effectiveRuntime.HasValue)
_localCache.Set(key, true, absoluteExpirationRelativeToNow: effectiveRuntime.Value);
else
_localCache.Set(key, true);
return true;
}
}
public async Task<bool> TryStopAsync(object taskKey)
{
var key = TaskKeyConverter.ToKey(taskKey);
try
{
_localCache.Remove(key);
await store.SetStoppedAsync(key, CancellationToken.None);
return true;
}
catch
{
return false;
}
}
private Task StopAsync(string key)
{
_localCache.Remove(key);
return store.SetStoppedAsync(key, CancellationToken.None);
}
}