Skip to content
This repository was archived by the owner on Jul 30, 2024. It is now read-only.

Commit 22e445e

Browse files
author
Scott Bommarito
authored
Status - add second layer of aggregation for incidents (#563)
1 parent 60a77e1 commit 22e445e

148 files changed

Lines changed: 8324 additions & 1450 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) .NET Foundation. All rights reserved.
1+
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
33

44
using System;
@@ -8,10 +8,13 @@
88
using NuGet.Services.Status.Table;
99
using StatusAggregator.Table;
1010

11-
namespace StatusAggregator
11+
namespace StatusAggregator.Collector
1212
{
1313
public class Cursor : ICursor
1414
{
15+
private readonly ITableWrapper _table;
16+
private readonly ILogger<Cursor> _logger;
17+
1518
public Cursor(
1619
ITableWrapper table,
1720
ILogger<Cursor> logger)
@@ -20,16 +23,13 @@ public Cursor(
2023
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
2124
}
2225

23-
private readonly ITableWrapper _table;
24-
25-
private readonly ILogger<Cursor> _logger;
26-
2726
public async Task<DateTime> Get(string name)
2827
{
28+
name = name ?? throw new ArgumentNullException(nameof(name));
29+
2930
using (_logger.Scope("Fetching cursor with name {CursorName}.", name))
3031
{
31-
var cursor = await _table.RetrieveAsync<CursorEntity>(
32-
CursorEntity.DefaultPartitionKey, name);
32+
var cursor = await _table.RetrieveAsync<CursorEntity>(name);
3333

3434
DateTime value;
3535
if (cursor == null)
@@ -50,11 +50,13 @@ public async Task<DateTime> Get(string name)
5050

5151
public Task Set(string name, DateTime value)
5252
{
53+
name = name ?? throw new ArgumentNullException(nameof(name));
54+
5355
using (_logger.Scope("Updating cursor with name {CursorName} to {CursorValue}.", name, value))
5456
{
5557
var cursorEntity = new CursorEntity(name, value);
5658
return _table.InsertOrReplaceAsync(cursorEntity);
5759
}
5860
}
5961
}
60-
}
62+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.Threading.Tasks;
6+
7+
namespace StatusAggregator.Collector
8+
{
9+
public class EntityCollector : IEntityCollector
10+
{
11+
private readonly ICursor _cursor;
12+
private readonly IEntityCollectorProcessor _processor;
13+
14+
public EntityCollector(
15+
ICursor cursor,
16+
IEntityCollectorProcessor processor)
17+
{
18+
_cursor = cursor ?? throw new ArgumentNullException(nameof(cursor));
19+
_processor = processor ?? throw new ArgumentNullException(nameof(processor));
20+
}
21+
22+
public string Name => _processor.Name;
23+
24+
public async Task<DateTime> FetchLatest()
25+
{
26+
var lastCursor = await _cursor.Get(Name);
27+
var nextCursor = await _processor.FetchSince(lastCursor);
28+
if (nextCursor.HasValue)
29+
{
30+
await _cursor.Set(Name, nextCursor.Value);
31+
}
32+
33+
return nextCursor ?? lastCursor;
34+
}
35+
}
36+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.Threading.Tasks;
6+
7+
namespace StatusAggregator.Collector
8+
{
9+
/// <summary>
10+
/// Persists a <see cref="DateTime"/> that represents that job's current progress by a string.
11+
/// </summary>
12+
public interface ICursor
13+
{
14+
/// <summary>
15+
/// Gets the current progress of the job by <paramref name="name"/>.
16+
/// </summary>
17+
Task<DateTime> Get(string name);
18+
19+
/// <summary>
20+
/// Saves <paramref name="value"/> as the current progress of the job by <paramref name="name"/>.
21+
/// </summary>
22+
Task Set(string name, DateTime value);
23+
}
24+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.Threading.Tasks;
6+
7+
namespace StatusAggregator.Collector
8+
{
9+
/// <summary>
10+
/// Collects new entities from a source.
11+
/// </summary>
12+
public interface IEntityCollector
13+
{
14+
/// <summary>
15+
/// A unique name for this collector.
16+
/// </summary>
17+
string Name { get; }
18+
19+
/// <summary>
20+
/// Fetches all new entities from the source and returns the newest timestamp found.
21+
/// </summary>
22+
Task<DateTime> FetchLatest();
23+
}
24+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.Threading.Tasks;
6+
7+
namespace StatusAggregator.Collector
8+
{
9+
/// <summary>
10+
/// Used by <see cref="IEntityCollector"/> to fetch the latest entities from a source.
11+
/// </summary>
12+
public interface IEntityCollectorProcessor
13+
{
14+
/// <summary>
15+
/// A unique name for this processor.
16+
/// </summary>
17+
string Name { get; }
18+
19+
/// <summary>
20+
/// Fetches all entities from the source newer than <paramref name="cursor"/> and returns the latest timestamp found.
21+
/// </summary>
22+
Task<DateTime?> FetchSince(DateTime cursor);
23+
}
24+
}

src/StatusAggregator/IncidentUpdater.cs renamed to src/StatusAggregator/Collector/IncidentEntityCollectorProcessor.cs

Lines changed: 27 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,68 +1,49 @@
1-
// Copyright (c) .NET Foundation. All rights reserved.
1+
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
33

44
using Microsoft.Extensions.Logging;
55
using NuGet.Jobs.Extensions;
66
using NuGet.Services.Incidents;
77
using NuGet.Services.Status.Table;
8+
using StatusAggregator.Factory;
89
using StatusAggregator.Parse;
9-
using StatusAggregator.Table;
1010
using System;
1111
using System.Linq;
1212
using System.Threading.Tasks;
1313

14-
namespace StatusAggregator
14+
namespace StatusAggregator.Collector
1515
{
16-
public class IncidentUpdater : IIncidentUpdater
16+
/// <summary>
17+
/// Fetches new <see cref="IncidentEntity"/>s using an <see cref="IIncidentApiClient"/>.
18+
/// </summary>
19+
public class IncidentEntityCollectorProcessor : IEntityCollectorProcessor
1720
{
18-
private readonly ITableWrapper _table;
21+
public const string IncidentsCollectorName = "incidents";
22+
1923
private readonly IAggregateIncidentParser _aggregateIncidentParser;
2024
private readonly IIncidentApiClient _incidentApiClient;
21-
private readonly IIncidentFactory _incidentFactory;
22-
private readonly ILogger<IncidentUpdater> _logger;
25+
private readonly IComponentAffectingEntityFactory<IncidentEntity> _incidentFactory;
26+
private readonly ILogger<IncidentEntityCollectorProcessor> _logger;
2327

2428
private readonly string _incidentApiTeamId;
2529

26-
public IncidentUpdater(
27-
ITableWrapper table,
30+
public IncidentEntityCollectorProcessor(
2831
IIncidentApiClient incidentApiClient,
2932
IAggregateIncidentParser aggregateIncidentParser,
30-
IIncidentFactory incidentFactory,
33+
IComponentAffectingEntityFactory<IncidentEntity> incidentFactory,
3134
StatusAggregatorConfiguration configuration,
32-
ILogger<IncidentUpdater> logger)
35+
ILogger<IncidentEntityCollectorProcessor> logger)
3336
{
34-
_table = table ?? throw new ArgumentNullException(nameof(table));
3537
_incidentApiClient = incidentApiClient ?? throw new ArgumentNullException(nameof(incidentApiClient));
3638
_aggregateIncidentParser = aggregateIncidentParser ?? throw new ArgumentNullException(nameof(aggregateIncidentParser));
3739
_incidentFactory = incidentFactory ?? throw new ArgumentNullException(nameof(incidentFactory));
3840
_incidentApiTeamId = configuration?.TeamId ?? throw new ArgumentNullException(nameof(configuration));
3941
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
4042
}
4143

42-
public async Task RefreshActiveIncidents()
43-
{
44-
using (_logger.Scope("Refreshing active incidents."))
45-
{
46-
var activeIncidentEntities = _table
47-
.CreateQuery<IncidentEntity>()
48-
.Where(i => i.PartitionKey == IncidentEntity.DefaultPartitionKey && i.IsActive)
49-
.ToList();
50-
51-
_logger.LogInformation("Refreshing {ActiveIncidentsCount} active incidents.", activeIncidentEntities.Count());
52-
foreach (var activeIncidentEntity in activeIncidentEntities)
53-
{
54-
using (_logger.Scope("Refreshing active incident '{IncidentRowKey}'.", activeIncidentEntity.RowKey))
55-
{
56-
var activeIncident = await _incidentApiClient.GetIncident(activeIncidentEntity.IncidentApiId);
57-
activeIncidentEntity.MitigationTime = activeIncident.MitigationData?.Date;
58-
_logger.LogInformation("Updated mitigation time of active incident to {MitigationTime}", activeIncidentEntity.MitigationTime);
59-
await _table.InsertOrReplaceAsync(activeIncidentEntity);
60-
}
61-
}
62-
}
63-
}
44+
public string Name => IncidentsCollectorName;
6445

65-
public async Task<DateTime?> FetchNewIncidents(DateTime cursor)
46+
public async Task<DateTime?> FetchSince(DateTime cursor)
6647
{
6748
using (_logger.Scope("Fetching all new incidents since {Cursor}.", cursor))
6849
{
@@ -73,12 +54,19 @@ public async Task RefreshActiveIncidents()
7354
.Where(i => i.CreateDate > cursor)
7455
.ToList();
7556

57+
_logger.LogInformation("Found {IncidentCount} incidents to parse.", incidents.Count);
7658
var parsedIncidents = incidents
77-
.SelectMany(i => _aggregateIncidentParser.ParseIncident(i))
59+
.SelectMany(_aggregateIncidentParser.ParseIncident)
7860
.ToList();
79-
foreach (var parsedIncident in parsedIncidents.OrderBy(i => i.CreationTime))
61+
62+
_logger.LogInformation("Parsed {ParsedIncidentCount} incidents.", parsedIncidents.Count);
63+
foreach (var parsedIncident in parsedIncidents.OrderBy(i => i.StartTime))
8064
{
81-
await _incidentFactory.CreateIncident(parsedIncident);
65+
using (_logger.Scope("Creating incident for parsed incident with ID {ParsedIncidentID} affecting {ParsedIncidentPath} at {ParsedIncidentStartTime} with status {ParsedIncidentStatus}.",
66+
parsedIncident.Id, parsedIncident.AffectedComponentPath, parsedIncident.StartTime, parsedIncident.AffectedComponentStatus))
67+
{
68+
await _incidentFactory.CreateAsync(parsedIncident);
69+
}
8270
}
8371

8472
return incidents.Any() ? incidents.Max(i => i.CreateDate) : (DateTime?)null;
@@ -97,4 +85,4 @@ private string GetRecentIncidentsQuery(DateTime cursor)
9785
return query;
9886
}
9987
}
100-
}
88+
}

src/StatusAggregator/Manual/ManualStatusChangeUpdater.cs renamed to src/StatusAggregator/Collector/ManualStatusChangeCollectorProcessor.cs

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,49 @@
1-
// Copyright (c) .NET Foundation. All rights reserved.
1+
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
33

44
using Microsoft.Extensions.Logging;
55
using NuGet.Jobs.Extensions;
66
using NuGet.Services.Status.Table.Manual;
7+
using StatusAggregator.Manual;
78
using StatusAggregator.Table;
89
using System;
910
using System.Linq;
1011
using System.Threading.Tasks;
1112

12-
namespace StatusAggregator.Manual
13+
namespace StatusAggregator.Collector
1314
{
14-
public class ManualStatusChangeUpdater : IManualStatusChangeUpdater
15+
/// <summary>
16+
/// Fetches new <see cref="ManualStatusChangeEntity"/>s using an <see cref="ITableWrapper"/>.
17+
/// </summary>
18+
public class ManualStatusChangeCollectorProcessor : IEntityCollectorProcessor
1519
{
20+
public const string ManualCollectorNamePrefix = "manual";
21+
1622
private readonly ITableWrapper _table;
1723
private readonly IManualStatusChangeHandler _handler;
18-
private readonly ILogger<ManualStatusChangeUpdater> _logger;
24+
private readonly ILogger<ManualStatusChangeCollectorProcessor> _logger;
1925

20-
public ManualStatusChangeUpdater(
26+
public ManualStatusChangeCollectorProcessor(
2127
string name,
2228
ITableWrapper table,
2329
IManualStatusChangeHandler handler,
24-
ILogger<ManualStatusChangeUpdater> logger)
30+
ILogger<ManualStatusChangeCollectorProcessor> logger)
2531
{
26-
Name = name;
32+
Name = ManualCollectorNamePrefix +
33+
(name ?? throw new ArgumentNullException(nameof(name)));
2734
_table = table ?? throw new ArgumentNullException(nameof(table));
2835
_handler = handler ?? throw new ArgumentNullException(nameof(handler));
2936
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
3037
}
3138

3239
public string Name { get; }
3340

34-
public async Task<DateTime?> ProcessNewManualChanges(DateTime cursor)
41+
public async Task<DateTime?> FetchSince(DateTime cursor)
3542
{
3643
using (_logger.Scope("Processing manual status changes."))
3744
{
3845
var manualChangesQuery = _table
39-
.CreateQuery<ManualStatusChangeEntity>()
40-
.Where(c => c.PartitionKey == ManualStatusChangeEntity.DefaultPartitionKey);
46+
.CreateQuery<ManualStatusChangeEntity>();
4147

4248
// Table storage throws on queries with DateTime values that are too low.
4349
// If we are fetching manual changes for the first time, don't filter on the timestamp.
@@ -49,7 +55,7 @@ public ManualStatusChangeUpdater(
4955
var manualChanges = manualChangesQuery.ToList();
5056

5157
_logger.LogInformation("Processing {ManualChangesCount} manual status changes.", manualChanges.Count());
52-
foreach (var manualChange in manualChanges)
58+
foreach (var manualChange in manualChanges.OrderBy(m => m.Timestamp))
5359
{
5460
await _handler.Handle(_table, manualChange);
5561
}
@@ -58,4 +64,4 @@ public ManualStatusChangeUpdater(
5864
}
5965
}
6066
}
61-
}
67+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
using System.Threading.Tasks;
5+
using Microsoft.WindowsAzure.Storage.Blob;
6+
7+
namespace StatusAggregator.Container
8+
{
9+
public class ContainerWrapper : IContainerWrapper
10+
{
11+
private readonly CloudBlobContainer _container;
12+
13+
public ContainerWrapper(CloudBlobContainer container)
14+
{
15+
_container = container;
16+
}
17+
18+
public Task CreateIfNotExistsAsync()
19+
{
20+
return _container.CreateIfNotExistsAsync();
21+
}
22+
23+
public Task SaveBlobAsync(string name, string contents)
24+
{
25+
var blob = _container.GetBlockBlobReference(name);
26+
return blob.UploadTextAsync(contents);
27+
}
28+
}
29+
}

0 commit comments

Comments
 (0)