Skip to content

Commit 5e0677a

Browse files
authored
[ST] Blob Tool 2: Update blob properties (#10759)
* Add * update * Update * Update
1 parent 770af00 commit 5e0677a

6 files changed

Lines changed: 205 additions & 6 deletions

File tree

src/UpdateBlobProperties/Collector.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ public async IAsyncEnumerable<IList<PackageInfo>> GetPagesOfPackageInfosAsync(in
4141
var pis = await _blobInfo.GetPageOfPackageInfosToUpdateBlobsAsync(_packageRepo, pageStartKey, maxKey, maxPageSize);
4242
if (pis.Count > 0)
4343
{
44-
_logger.LogInformation("Loaded page: {pageIndex} of package infos from DB. The page starts from key: {startPackageKey} and ends at key: {endPackageKey}.",
45-
pageIndex, pis.First().Key, pis.Last().Key);
44+
_logger.LogInformation("Loaded page: {pageIndex} of {pageSize} package infos from DB. The page starts from key: {startPackageKey} and ends at key: {endPackageKey}.",
45+
pageIndex, pis.Count, pis.First().Key, pis.Last().Key);
4646

4747
pageStartKey = pis.Last().Key + 1;
4848
pageIndex++;
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
7+
namespace UpdateBlobProperties
8+
{
9+
/// <summary>
/// Contract for a component that updates the blob properties associated with a package.
/// </summary>
public interface IUpdater
10+
{
11+
/// <summary>
/// Asynchronously updates the blob properties for the given package.
/// </summary>
/// <param name="packageInfo">The package whose blob should be updated.</param>
/// <param name="token">Token used to cancel the operation.</param>
/// <returns>A task that completes when the update attempt has finished.</returns>
Task UpdateBlobPropertiesAsync(PackageInfo packageInfo, CancellationToken token);
12+
}
13+
}

src/UpdateBlobProperties/Job.cs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,10 @@ protected override void ConfigureJobServices(IServiceCollection services, IConfi
4747
services.AddTransient<BlobInfo, BlobInfoOfPackageVersionIndexInFlatContainer>();
4848

4949
services.Configure<UpdateBlobPropertiesConfiguration>(configurationRoot.GetSection(UpdateBlobPropertiesConfigurationSectionName));
50+
services.AddTransient<IEntityRepository<Package>, EntityRepository<Package>>();
5051
services.AddTransient<IProcessor, Processor>();
5152
services.AddTransient<ICollector, Collector>();
52-
services.AddTransient<IEntityRepository<Package>, EntityRepository<Package>>();
53+
services.AddTransient<IUpdater, Updater>();
5354

5455
services.AddTransient(s =>
5556
{
@@ -61,6 +62,30 @@ protected override void ConfigureJobServices(IServiceCollection services, IConfi
6162

6263
return new Cursor(new Uri(address, "cursor.json"), fileStorageFactory.Create(), defaultValue: 0);
6364
});
65+
66+
services.AddSingleton(s =>
67+
{
68+
var blobServiceClientFactory = StorageAccountHelper.CreateBlobServiceClientFactory(
69+
s.GetService<IOptions<StorageMsiConfiguration>>().Value,
70+
s.GetService<IOptions<UpdateBlobPropertiesConfiguration>>().Value.StorageConnectionString);
71+
72+
var clientOptions = new BlobClientOptions
73+
{
74+
Retry =
75+
{
76+
Mode = Azure.Core.RetryMode.Exponential,
77+
Delay = TimeSpan.FromSeconds(2),
78+
MaxRetries = 6,
79+
NetworkTimeout = TimeSpan.FromMinutes(1)
80+
}
81+
};
82+
83+
var blobInfo = s.GetService<BlobInfo>();
84+
var blobServiceClient = blobServiceClientFactory.GetBlobServiceClient(clientOptions);
85+
var blobContainerClient = blobServiceClient.GetBlobContainerClient(blobInfo.GetContainerName());
86+
87+
return blobContainerClient;
88+
});
6489
}
6590
}
6691
}

src/UpdateBlobProperties/Processor.cs

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,39 +14,79 @@ namespace UpdateBlobProperties
1414
/// <summary>
/// Walks pages of package infos between the persisted cursor and the configured maximum key,
/// updates the blob properties of every package in a page with a bounded number of concurrent
/// workers, and advances the cursor after each fully processed page.
/// </summary>
public class Processor : IProcessor
{
    private readonly ICollector _collector;
    private readonly IUpdater _updater;
    private readonly IOptionsSnapshot<UpdateBlobPropertiesConfiguration> _configuration;
    private readonly Cursor _cursor;
    private readonly ILogger<Processor> _logger;

    /// <summary>
    /// Creates a processor from its collaborators.
    /// </summary>
    /// <param name="collector">Source of pages of package infos.</param>
    /// <param name="updater">Component that updates the blob properties of a single package.</param>
    /// <param name="configuration">Job configuration (max key, max degree of parallelism).</param>
    /// <param name="cursor">Persisted marker holding the highest key processed so far.</param>
    /// <param name="logger">Logger for progress messages.</param>
    public Processor(ICollector collector,
        IUpdater updater,
        IOptionsSnapshot<UpdateBlobPropertiesConfiguration> configuration,
        Cursor cursor,
        ILogger<Processor> logger)
    {
        _collector = collector;
        _updater = updater;
        _configuration = configuration;
        _cursor = cursor;
        _logger = logger;
    }

    /// <summary>
    /// Processes every page of package infos with keys in (cursor value, MaxKey], saving the
    /// cursor after each page.
    /// </summary>
    /// <param name="token">Token used to cancel the run.</param>
    /// <exception cref="System.InvalidOperationException">
    /// Thrown when the configured MaxDegreeOfParallelism is less than 1.
    /// </exception>
    public async Task ExecuteAsync(CancellationToken token)
    {
        // Linked source so that one failing worker can stop its siblings without
        // cancelling the caller's token.
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(token);

        await _cursor.Load(token);

        _logger.LogInformation("Loaded the cursor value. ProcessedMaxKey: {cursorValue}.", _cursor.Value);

        var minKey = _cursor.Value + 1;
        var maxKey = _configuration.Value.MaxKey;
        var maxDegreeOfParallelism = _configuration.Value.MaxDegreeOfParallelism;

        if (minKey > maxKey)
        {
            _logger.LogInformation("All keys have been processed.");

            return;
        }

        // With zero workers no blob would be touched, yet the cursor below would still be
        // advanced past every page. Fail fast instead of silently skipping all the work.
        if (maxDegreeOfParallelism < 1)
        {
            throw new System.InvalidOperationException(
                $"{nameof(UpdateBlobPropertiesConfiguration.MaxDegreeOfParallelism)} must be at least 1, but was {maxDegreeOfParallelism}.");
        }

        _logger.LogInformation("Processing pages with minKey: {minKey} and maxKey: {maxKey}. Max degree of parallelism is {maxDegreeOfParallelism}", minKey, maxKey, maxDegreeOfParallelism);

        await foreach (IList<PackageInfo> packageInfos in _collector.GetPagesOfPackageInfosAsync(minKey: minKey, maxKey: maxKey))
        {
            // Shared work queue: each worker takes items until the bag is empty.
            var packageInfosToProcess = new ConcurrentBag<PackageInfo>(packageInfos);

            var tasks = Enumerable
                .Range(0, maxDegreeOfParallelism)
                .Select(x => ProcessPackageInfosAsync(packageInfosToProcess, cts.Token))
                .ToList();

            while (tasks.Count > 0)
            {
                var completedTask = await Task.WhenAny(tasks);
                if (completedTask.IsFaulted || completedTask.IsCanceled)
                {
                    // Stop the remaining workers, then surface the failure. The cursor is
                    // not saved, so this page is retried on the next run.
                    cts.Cancel();
                    await completedTask;
                }

                tasks.Remove(completedTask);
            }

            // Every worker finished without failure or cancellation: the page is done.
            _cursor.Value = packageInfos.Last().Key;
            await _cursor.Save(token);

            _logger.LogInformation("Saved the cursor value. ProcessedMaxKey: {cursorValue}.", _cursor.Value);
        }
    }

    /// <summary>
    /// Worker loop: takes package infos from the shared bag and updates their blob properties
    /// until the bag is empty or cancellation is requested.
    /// </summary>
    /// <param name="packageInfos">Shared bag of package infos still to be processed.</param>
    /// <param name="token">Token used to stop the worker.</param>
    /// <exception cref="System.OperationCanceledException">
    /// Thrown when cancellation was requested, so that the caller does not treat a
    /// partially processed page as complete.
    /// </exception>
    private async Task ProcessPackageInfosAsync(ConcurrentBag<PackageInfo> packageInfos, CancellationToken token)
    {
        // Check the token before taking an item so that a taken item is always handed to the updater.
        while (!token.IsCancellationRequested && packageInfos.TryTake(out var packageInfo))
        {
            await _updater.UpdateBlobPropertiesAsync(packageInfo, token);
        }

        // A cancelled worker must not look like a successful one; otherwise the cursor
        // could be advanced past packages that were never processed.
        token.ThrowIfCancellationRequested();
    }
}
5292
}

src/UpdateBlobProperties/UpdateBlobPropertiesConfiguration.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,6 @@ public class UpdateBlobPropertiesConfiguration
88
public string StorageConnectionString { get; set; }
99
public int MaxPageSize { get; set; }
1010
public int MaxKey { get; set; }
11+
public int MaxDegreeOfParallelism { get; set; }
1112
}
1213
}
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.Collections.Generic;
6+
using System.Net;
7+
using System.Threading;
8+
using System.Threading.Tasks;
9+
using Microsoft.Extensions.Logging;
10+
using Azure;
11+
using Azure.Storage.Blobs;
12+
using Azure.Storage.Blobs.Models;
13+
14+
namespace UpdateBlobProperties
15+
{
16+
/// <summary>
/// Updates the HTTP header properties of a package's blob, using the blob's ETag for
/// optimistic concurrency and retrying when the blob changes between the read and the write.
/// </summary>
public class Updater : IUpdater
{
    // Maximum number of read-then-update attempts per blob.
    private const int MaxRetries = 3;

    private readonly BlobContainerClient _blobContainerClient;
    private readonly BlobInfo _blobInfo;
    private readonly ILogger<Updater> _logger;

    /// <summary>
    /// Creates an updater from its collaborators.
    /// </summary>
    /// <param name="blobContainerClient">Client for the container holding the blobs.</param>
    /// <param name="blobInfo">Provides the blob name for a package and the desired property values.</param>
    /// <param name="logger">Logger for per-blob outcome messages.</param>
    public Updater(BlobContainerClient blobContainerClient, BlobInfo blobInfo, ILogger<Updater> logger)
    {
        _blobContainerClient = blobContainerClient;
        _blobInfo = blobInfo;
        _logger = logger;
    }

    /// <summary>
    /// Reads the blob's current properties and, when they differ from the desired values,
    /// writes the updated headers conditioned on the ETag that was read.
    /// Returns without error when the properties already match or the blob does not exist.
    /// </summary>
    /// <param name="packageInfo">The package whose blob should be updated.</param>
    /// <param name="token">Token used to cancel the operation.</param>
    /// <exception cref="InvalidOperationException">
    /// Thrown when the blob kept changing between read and write for all <see cref="MaxRetries"/> attempts.
    /// </exception>
    public async Task UpdateBlobPropertiesAsync(PackageInfo packageInfo, CancellationToken token)
    {
        var blobName = _blobInfo.GetBlobName(packageInfo);
        var blobClient = _blobContainerClient.GetBlobClient(blobName);

        // Retries for concurrency control (Blob client is enabled with the retry policy)
        for (var attempt = 1; attempt <= MaxRetries; attempt++)
        {
            try
            {
                var response = await blobClient.GetPropertiesAsync(cancellationToken: token);
                if (!TryGetHeadersWhenBlobPropertiesNotMatched(response.Value, _blobInfo.GetUpdatedBlobProperties(), out var blobHttpHeaders))
                {
                    _logger.LogInformation("Blob properties of Package Id: {packageId} and Version: {packageVersion} (Blob Uri: {blobUri}) are matched.",
                        packageInfo.Id, packageInfo.Version, blobClient.Uri);

                    return;
                }

                if (await UpdateAsync(blobClient, blobHttpHeaders, response.Value.ETag, token))
                {
                    _logger.LogInformation("Blob properties of Package Id: {packageId} and Version: {packageVersion} (Blob Uri: {blobUri}) are updated successfully.",
                        packageInfo.Id, packageInfo.Version, blobClient.Uri);

                    return;
                }

                // ETag precondition failed: someone else modified the blob, so read it again.
                _logger.LogInformation("The blob of Package Id: {packageId} and Version: {packageVersion} (Blob Uri: {blobUri}) has been updated since the last read.",
                    packageInfo.Id, packageInfo.Version, blobClient.Uri);
            }
            catch (RequestFailedException e) when (e.Status == (int)HttpStatusCode.NotFound)
            {
                // A missing blob is not an error for this job; there is nothing to update.
                _logger.LogInformation("The blob of Package Id: {packageId} and Version: {packageVersion} (Blob Uri: {blobUri}) does not exist.",
                    packageInfo.Id, packageInfo.Version, blobClient.Uri);

                return;
            }

            // Linear back-off (5s, 10s, ...) between attempts; no wait after the last one.
            if (attempt < MaxRetries)
            {
                await Task.Delay(TimeSpan.FromSeconds(attempt * 5), token);
            }
        }

        throw new InvalidOperationException($"Failed to update blob properties of Package Id: {packageInfo.Id} and Version: {packageInfo.Version} (Blob Uri: {blobClient.Uri}) after {MaxRetries} retries.");
    }

    /// <summary>
    /// Writes the given headers to the blob, conditioned on the blob still having the given ETag.
    /// </summary>
    /// <returns>
    /// <c>true</c> when the headers were written; <c>false</c> when the ETag precondition failed.
    /// </returns>
    private async Task<bool> UpdateAsync(BlobClient blobClient, BlobHttpHeaders blobHttpHeaders, ETag eTag, CancellationToken token)
    {
        try
        {
            await blobClient.SetHttpHeadersAsync(blobHttpHeaders, new BlobRequestConditions() { IfMatch = eTag }, cancellationToken: token);

            return true;
        }
        catch (RequestFailedException e) when (e.Status == (int)HttpStatusCode.PreconditionFailed)
        {
            return false;
        }
    }

    /// <summary>
    /// Builds the headers to write: a copy of the blob's current headers with any desired
    /// value that differs applied on top. Only CacheControl is compared.
    /// </summary>
    /// <param name="blobProperties">The blob's current properties.</param>
    /// <param name="updatedBlobProperties">Desired property values keyed by property name.</param>
    /// <param name="blobHttpHeaders">The full set of headers to write back.</param>
    /// <returns><c>true</c> when at least one property differs and an update is needed.</returns>
    private bool TryGetHeadersWhenBlobPropertiesNotMatched(BlobProperties blobProperties, IDictionary<string, string> updatedBlobProperties,
        out BlobHttpHeaders blobHttpHeaders)
    {
        // Start from the current values so that the write carries every existing header forward.
        blobHttpHeaders = new BlobHttpHeaders
        {
            CacheControl = blobProperties.CacheControl,
            ContentType = blobProperties.ContentType,
            ContentHash = blobProperties.ContentHash,
            ContentEncoding = blobProperties.ContentEncoding,
            ContentLanguage = blobProperties.ContentLanguage,
            ContentDisposition = blobProperties.ContentDisposition,
        };

        var notMatched = false;
        if (updatedBlobProperties.TryGetValue(nameof(BlobProperties.CacheControl), out var value) &&
            blobProperties.CacheControl != value)
        {
            blobHttpHeaders.CacheControl = value;
            notMatched = true;
        }

        return notMatched;
    }
}
120+
}

0 commit comments

Comments
 (0)