Skip to content

Commit 30702d2

Browse files
authored
[ST] 2. Catalog2Dnx: Cursor value and updates (#10696)
* Add * fix test * update * update * update * update * update * update * update * update * update
1 parent 821fac2 commit 30702d2

14 files changed

Lines changed: 452 additions & 25 deletions

src/Catalog/Dnx/DnxConstants.cs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
1-
// Copyright (c) .NET Foundation. All rights reserved.
1+
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
33

4+
using System;
45
using System.Collections.Generic;
56
using NuGet.Services.Metadata.Catalog.Persistence;
67

78
namespace NuGet.Services.Metadata.Catalog.Dnx
89
{
9-
internal static class DnxConstants
10+
public static class DnxConstants
1011
{
1112
internal const string ApplicationOctetStreamContentType = "application/octet-stream";
1213
internal const string DefaultCacheControl = "max-age=120";
@@ -16,5 +17,13 @@ internal static class DnxConstants
1617
{ StorageConstants.CacheControl, DefaultCacheControl },
1718
{ StorageConstants.ContentType, ApplicationOctetStreamContentType }
1819
};
20+
21+
// Cache Duration of Package Version Index (at CDN)
22+
public static readonly TimeSpan CacheDurationOfPackageVersionIndex = TimeSpan.FromSeconds(60);
23+
24+
// Front Cursor with Updates
25+
// (MaxNumberOfUpdatesToKeepOfFrontCursor - 1) * MinIntervalBetweenTwoUpdatesOfFrontCursor > CacheDurationOfPackageVersionIndex
26+
public const int MaxNumberOfUpdatesToKeepOfFrontCursor = 31;
27+
public static readonly TimeSpan MinIntervalBetweenTwoUpdatesOfFrontCursor = TimeSpan.FromSeconds(60);
1928
}
20-
}
29+
}

src/Catalog/DurableCursor.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ namespace NuGet.Services.Metadata.Catalog
1111
{
1212
public class DurableCursor : ReadWriteCursor
1313
{
14-
Uri _address;
15-
Persistence.Storage _storage;
16-
DateTime _defaultValue;
14+
protected Uri _address;
15+
protected Persistence.Storage _storage;
16+
protected DateTime _defaultValue;
1717

1818
public DurableCursor(Uri address, Persistence.Storage storage, DateTime defaultValue)
1919
{
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.Collections.Generic;
6+
using System.Diagnostics;
7+
using System.Linq;
8+
using System.Threading;
9+
using System.Threading.Tasks;
10+
using Microsoft.Extensions.Logging;
11+
using Newtonsoft.Json;
12+
using NuGet.Services.Metadata.Catalog.Persistence;
13+
14+
namespace NuGet.Services.Metadata.Catalog
15+
{
16+
public class DurableCursorWithUpdates : DurableCursor
17+
{
18+
private readonly ILogger _logger;
19+
20+
private readonly int _maxNumberOfUpdatesToKeep;
21+
private readonly TimeSpan _minIntervalBetweenTwoUpdates;
22+
23+
public DurableCursorWithUpdates(Uri address, Persistence.Storage storage, DateTime defaultValue, ILogger logger,
24+
int maxNumberOfUpdatesToKeep, TimeSpan minIntervalBetweenTwoUpdates)
25+
: base(address, storage, defaultValue)
26+
{
27+
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
28+
29+
if (maxNumberOfUpdatesToKeep < 0)
30+
{
31+
throw new ArgumentOutOfRangeException(nameof(maxNumberOfUpdatesToKeep), $"{nameof(maxNumberOfUpdatesToKeep)} must be equal or larger than 0.");
32+
}
33+
34+
if (minIntervalBetweenTwoUpdates < TimeSpan.Zero)
35+
{
36+
throw new ArgumentOutOfRangeException(nameof(minIntervalBetweenTwoUpdates), $"{nameof(minIntervalBetweenTwoUpdates)} must be equal or larger than 0.");
37+
}
38+
39+
_maxNumberOfUpdatesToKeep = maxNumberOfUpdatesToKeep;
40+
_minIntervalBetweenTwoUpdates = minIntervalBetweenTwoUpdates;
41+
}
42+
43+
public override async Task SaveAsync(CancellationToken cancellationToken)
44+
{
45+
var cursorValueWithUpdates = new CursorValueWithUpdates();
46+
47+
var storageContent = await _storage.LoadStringStorageContentAsync(_address, cancellationToken);
48+
if (storageContent != null && storageContent.Content != null)
49+
{
50+
cursorValueWithUpdates = JsonConvert.DeserializeObject<CursorValueWithUpdates>(storageContent.Content, CursorValueWithUpdates.SerializerSettings);
51+
52+
_logger.LogInformation("Read the cursor value: {CursorValue} with the count of updates: {CursorUpdatesCount}, at {Address}.",
53+
cursorValueWithUpdates.Value, cursorValueWithUpdates.Updates.Count, _address.AbsoluteUri);
54+
}
55+
56+
cursorValueWithUpdates.Value = Value.ToString("O");
57+
if (storageContent != null)
58+
{
59+
cursorValueWithUpdates.Updates = GetUpdates(cursorValueWithUpdates, storageContent.StorageDateTimeInUtc);
60+
}
61+
62+
var content = new StringStorageContent(JsonConvert.SerializeObject(cursorValueWithUpdates, CursorValueWithUpdates.SerializerSettings),
63+
"application/json", Constants.NoStoreCacheControl);
64+
65+
await _storage.SaveAsync(_address, content, cancellationToken);
66+
67+
_logger.LogInformation("Updated the cursor value: {CursorValue} with the count of updates: {CursorUpdatesCount}, at {Address}.",
68+
cursorValueWithUpdates.Value, cursorValueWithUpdates.Updates.Count, _address.AbsoluteUri);
69+
}
70+
71+
private IList<CursorValueUpdate> GetUpdates(CursorValueWithUpdates cursorValueWithUpdates, DateTime? storageDateTimeInUtc)
72+
{
73+
if (!storageDateTimeInUtc.HasValue)
74+
{
75+
throw new ArgumentNullException(nameof(storageDateTimeInUtc));
76+
}
77+
78+
var updates = cursorValueWithUpdates.Updates.OrderByDescending(u => u.TimeStamp).ToList();
79+
if (updates.Count == 0 || (updates.Count > 0 && (storageDateTimeInUtc.Value - updates.First().TimeStamp >= _minIntervalBetweenTwoUpdates)))
80+
{
81+
var update = new CursorValueUpdate(storageDateTimeInUtc.Value, cursorValueWithUpdates.Value);
82+
updates.Insert(0, update);
83+
84+
_logger.LogInformation("Added the cursor update with timeStamp: {TimeStamp} and value: {UpdateValue}.",
85+
update.TimeStamp.ToString(CursorValueWithUpdates.SerializerSettings.DateFormatString), update.Value);
86+
}
87+
88+
while (updates.Count > 0 && updates.Count > _maxNumberOfUpdatesToKeep)
89+
{
90+
var update = updates[updates.Count - 1];
91+
92+
_logger.LogInformation("Deleted the cursor update with timeStamp: {TimeStamp} and value: {UpdateValue}.",
93+
update.TimeStamp.ToString(CursorValueWithUpdates.SerializerSettings.DateFormatString), update.Value);
94+
95+
updates.RemoveAt(updates.Count - 1);
96+
}
97+
98+
return updates;
99+
}
100+
}
101+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.Collections.Generic;
6+
using Newtonsoft.Json;
7+
8+
namespace NuGet.Services.Metadata.Catalog
9+
{
10+
public class CursorValueWithUpdates
11+
{
12+
public readonly static JsonSerializerSettings SerializerSettings = new JsonSerializerSettings
13+
{
14+
DateFormatString = "O"
15+
};
16+
17+
[JsonProperty("value")]
18+
public string Value { get; set; }
19+
[JsonProperty("updates")]
20+
public IList<CursorValueUpdate> Updates { get; set; } = new List<CursorValueUpdate>();
21+
}
22+
23+
public class CursorValueUpdate
24+
{
25+
public CursorValueUpdate(DateTime timeStamp, string value)
26+
{
27+
TimeStamp = timeStamp;
28+
Value = value;
29+
}
30+
31+
[JsonProperty("timeStamp")]
32+
public DateTime TimeStamp { get; set; }
33+
[JsonProperty("value")]
34+
public string Value { get; set; }
35+
}
36+
}

src/Catalog/Persistence/AzureStorage.cs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -376,9 +376,11 @@ protected override async Task<StorageContent> OnLoadAsync(Uri resourceUri, Cance
376376
Response<BlobProperties> properties = await blobClient.GetPropertiesAsync(cancellationToken: cancellationToken);
377377

378378
string content;
379+
DateTimeOffset? storageDateTimeOffset;
379380
using (var originalStream = new MemoryStream())
380381
{
381-
await blobClient.DownloadToAsync(originalStream, cancellationToken);
382+
Response response = await blobClient.DownloadToAsync(originalStream, cancellationToken);
383+
storageDateTimeOffset = response.Headers.Date;
382384

383385
originalStream.Seek(0, SeekOrigin.Begin);
384386

@@ -401,7 +403,14 @@ protected override async Task<StorageContent> OnLoadAsync(Uri resourceUri, Cance
401403
}
402404
}
403405

404-
return new StringStorageContentWithETag(content, properties.Value.ETag.ToString());
406+
if (storageDateTimeOffset.HasValue)
407+
{
408+
return new StringStorageContentWithETag(content, storageDateTimeOffset.Value.UtcDateTime, properties.Value.ETag.ToString());
409+
}
410+
else
411+
{
412+
return new StringStorageContentWithETag(content, properties.Value.ETag.ToString());
413+
}
405414
}
406415
catch (RequestFailedException ex) when (ex.Status == (int)HttpStatusCode.NotFound)
407416
{

src/Catalog/Persistence/Storage.cs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -138,19 +138,34 @@ public async Task DeleteAsync(Uri resourceUri, CancellationToken cancellationTok
138138
TraceExecutionTime(nameof(DeleteAsync), resourceUri, stopwatch.ElapsedMilliseconds);
139139
}
140140

141-
public async Task<string> LoadStringAsync(Uri resourceUri, CancellationToken cancellationToken)
141+
public virtual async Task<string> LoadStringAsync(Uri resourceUri, CancellationToken cancellationToken)
142142
{
143-
StorageContent content = await LoadAsync(resourceUri, cancellationToken);
144-
if (content == null)
143+
StringStorageContent stringStorageContent = await LoadStringStorageContentAsync(resourceUri, cancellationToken);
144+
if (stringStorageContent == null)
145145
{
146146
return null;
147147
}
148148
else
149149
{
150-
using (Stream stream = content.GetContentStream())
150+
return stringStorageContent.Content;
151+
}
152+
}
153+
154+
public virtual async Task<StringStorageContent> LoadStringStorageContentAsync(Uri resourceUri, CancellationToken cancellationToken)
155+
{
156+
StorageContent storageContent = await LoadAsync(resourceUri, cancellationToken);
157+
if (storageContent == null)
158+
{
159+
return null;
160+
}
161+
else
162+
{
163+
using (Stream stream = storageContent.GetContentStream())
151164
{
152165
StreamReader reader = new StreamReader(stream);
153-
return await reader.ReadToEndAsync();
166+
string content = await reader.ReadToEndAsync();
167+
168+
return new StringStorageContent(content, storageContent.StorageDateTimeInUtc, contentType: storageContent.ContentType, cacheControl: storageContent.CacheControl);
154169
}
155170
}
156171
}

src/Catalog/Persistence/StorageContent.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,19 @@
1-
// Copyright (c) .NET Foundation. All rights reserved.
1+
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
using System;
35
using System.IO;
46

57
namespace NuGet.Services.Metadata.Catalog.Persistence
68
{
79
public abstract class StorageContent
810
{
11+
public DateTime? StorageDateTimeInUtc
12+
{
13+
get;
14+
set;
15+
}
16+
917
public string ContentType
1018
{
1119
get;

src/Catalog/Persistence/StringStorageContent.cs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
// Copyright (c) .NET Foundation. All rights reserved.
1+
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
33

4+
using System;
45
using System.IO;
56

67
namespace NuGet.Services.Metadata.Catalog.Persistence
@@ -14,6 +15,14 @@ public StringStorageContent(string content, string contentType = "", string cach
1415
CacheControl = cacheControl;
1516
}
1617

18+
public StringStorageContent(string content, DateTime? storageDateTimeInUtc, string contentType = "", string cacheControl = "")
19+
{
20+
Content = content;
21+
StorageDateTimeInUtc = storageDateTimeInUtc;
22+
ContentType = contentType;
23+
CacheControl = cacheControl;
24+
}
25+
1726
public string Content
1827
{
1928
get;
Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,27 @@
1-
// Copyright (c) .NET Foundation. All rights reserved.
1+
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
33

4+
using System;
5+
46
namespace NuGet.Services.Metadata.Catalog.Persistence
57
{
68
/// <summary>
79
/// Used by <see cref="AzureStorage"/> to expose the ETag associated with a <see cref="Storage.LoadAsync(System.Uri, System.Threading.CancellationToken)"/> operation.
810
/// </summary>
911
public class StringStorageContentWithETag : StringStorageContent
1012
{
11-
public StringStorageContentWithETag(
12-
string content,
13-
string eTag,
14-
string contentType = "",
15-
string cacheControl = "")
13+
public StringStorageContentWithETag(string content, string eTag, string contentType = "", string cacheControl = "")
1614
: base(content, contentType, cacheControl)
1715
{
1816
ETag = eTag;
1917
}
2018

19+
public StringStorageContentWithETag(string content, DateTime? storageDateTimeInUtc, string eTag, string contentType = "", string cacheControl = "")
20+
: base(content, storageDateTimeInUtc, contentType, cacheControl)
21+
{
22+
ETag = eTag;
23+
}
24+
2125
public string ETag { get; }
2226
}
2327
}

src/Ng/Jobs/Catalog2DnxJob.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@ protected override void Init(IDictionary<string, string> arguments, Cancellation
101101
httpClientTimeout);
102102

103103
var storage = storageFactory.Create();
104-
_front = new DurableCursor(storage.ResolveUri("cursor.json"), storage, MemoryCursor.MinValue);
104+
_front = new DurableCursorWithUpdates(storage.ResolveUri("cursor.json"), storage, MemoryCursor.MinValue, Logger,
105+
DnxConstants.MaxNumberOfUpdatesToKeepOfFrontCursor, DnxConstants.MinIntervalBetweenTwoUpdatesOfFrontCursor);
105106
_back = MemoryCursor.CreateMax();
106107

107108
_destination = storageFactory.BaseAddress;

0 commit comments

Comments
 (0)