Skip to content

Commit 758ad66

Browse files
authored
[ST] Blob Tool 1: Get pages of packages to process (#10750)
1 parent 422c215 commit 758ad66

13 files changed

Lines changed: 441 additions & 0 deletions

NuGet.Jobs.sln

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "NuGet.Services.Messaging.Em
243243
EndProject
244244
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "NuGet.Services.Validation.Issues", "src\NuGet.Services.Validation.Issues\NuGet.Services.Validation.Issues.csproj", "{9B159802-2A27-42EC-A357-06AAEFA70892}"
245245
EndProject
246+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "UpdateBlobProperties", "src\UpdateBlobProperties\UpdateBlobProperties.csproj", "{6F2096D0-BA21-42D1-9C49-36038C51128F}"
247+
EndProject
246248
Global
247249
GlobalSection(SolutionConfigurationPlatforms) = preSolution
248250
Debug|Any CPU = Debug|Any CPU
@@ -631,6 +633,10 @@ Global
631633
{9B159802-2A27-42EC-A357-06AAEFA70892}.Debug|Any CPU.Build.0 = Debug|Any CPU
632634
{9B159802-2A27-42EC-A357-06AAEFA70892}.Release|Any CPU.ActiveCfg = Release|Any CPU
633635
{9B159802-2A27-42EC-A357-06AAEFA70892}.Release|Any CPU.Build.0 = Release|Any CPU
636+
{6F2096D0-BA21-42D1-9C49-36038C51128F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
637+
{6F2096D0-BA21-42D1-9C49-36038C51128F}.Debug|Any CPU.Build.0 = Debug|Any CPU
638+
{6F2096D0-BA21-42D1-9C49-36038C51128F}.Release|Any CPU.ActiveCfg = Release|Any CPU
639+
{6F2096D0-BA21-42D1-9C49-36038C51128F}.Release|Any CPU.Build.0 = Release|Any CPU
634640
EndGlobalSection
635641
GlobalSection(SolutionProperties) = preSolution
636642
HideSolutionNode = FALSE
@@ -733,6 +739,7 @@ Global
733739
{63214B67-733D-4E9D-8249-25EEB2908FBE} = {54773F65-1227-4B35-8991-9E9C1CDACCF3}
734740
{9F3DD471-DBEE-4680-B49D-5AA451E59C4C} = {54773F65-1227-4B35-8991-9E9C1CDACCF3}
735741
{9B159802-2A27-42EC-A357-06AAEFA70892} = {54773F65-1227-4B35-8991-9E9C1CDACCF3}
742+
{6F2096D0-BA21-42D1-9C49-36038C51128F} = {523E282D-A2FE-49C2-A782-0EEC435E765E}
736743
EndGlobalSection
737744
GlobalSection(ExtensibilityGlobals) = postSolution
738745
SolutionGuid = {284A7AC3-FB43-4F1F-9C9C-2AF0E1F46C2B}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.Collections.Generic;
6+
using System.Threading.Tasks;
7+
using NuGet.Services.Entities;
8+
using NuGetGallery;
9+
10+
namespace UpdateBlobProperties
11+
{
12+
public abstract class BlobInfo
13+
{
14+
public abstract string GetContainerName();
15+
16+
public abstract string GetBlobName(PackageInfo packageInfo);
17+
18+
public abstract IDictionary<string, string> GetUpdatedBlobProperties();
19+
20+
public abstract Func<IEntityRepository<Package>, int, int, int, Task<List<PackageInfo>>> GetPageOfPackageInfosToUpdateBlobsAsync
21+
{
22+
get;
23+
}
24+
}
25+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.Collections.Generic;
6+
using System.Threading.Tasks;
7+
using Azure.Storage.Blobs.Models;
8+
using NuGet.Services.Entities;
9+
using NuGetGallery;
10+
11+
namespace UpdateBlobProperties
12+
{
13+
public class BlobInfoOfPackageVersionIndexInFlatContainer : BlobInfo
14+
{
15+
public override string GetContainerName()
16+
{
17+
return "v3-flatcontainer";
18+
}
19+
20+
public override string GetBlobName(PackageInfo packageInfo)
21+
{
22+
if (packageInfo == null)
23+
{
24+
throw new ArgumentNullException(nameof(packageInfo));
25+
}
26+
27+
if (string.IsNullOrWhiteSpace(packageInfo.Id))
28+
{
29+
throw new ArgumentException($"Invalid package Id with null or whitespace. Package Key: {packageInfo.Key}.");
30+
}
31+
32+
return $"/{packageInfo.Id.ToLowerInvariant()}/index.json";
33+
}
34+
35+
public override IDictionary<string, string> GetUpdatedBlobProperties()
36+
{
37+
return new Dictionary<string, string>()
38+
{
39+
{ nameof(BlobHttpHeaders.CacheControl), "max-age=10" }
40+
};
41+
}
42+
43+
public override Func<IEntityRepository<Package>, int, int, int, Task<List<PackageInfo>>> GetPageOfPackageInfosToUpdateBlobsAsync
44+
{
45+
get
46+
{
47+
return PackageInfo.GetPageOfPackageInfosWithIdOrderedByPackageRegistrationKeyAsync;
48+
}
49+
}
50+
}
51+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
using System.Collections.Generic;
5+
using System.Linq;
6+
using Microsoft.Extensions.Logging;
7+
using Microsoft.Extensions.Options;
8+
using NuGet.Services.Entities;
9+
using NuGetGallery;
10+
11+
namespace UpdateBlobProperties
12+
{
13+
public class Collector : ICollector
14+
{
15+
private readonly BlobInfo _blobInfo;
16+
private readonly IEntityRepository<Package> _packageRepo;
17+
private readonly IOptionsSnapshot<UpdateBlobPropertiesConfiguration> _configuration;
18+
private readonly ILogger<Collector> _logger;
19+
20+
public Collector(BlobInfo blobInfo,
21+
IEntityRepository<Package> packageRepo,
22+
IOptionsSnapshot<UpdateBlobPropertiesConfiguration> configuration,
23+
ILogger<Collector> logger)
24+
{
25+
_blobInfo = blobInfo;
26+
_packageRepo = packageRepo;
27+
_configuration = configuration;
28+
_logger = logger;
29+
}
30+
31+
public async IAsyncEnumerable<IList<PackageInfo>> GetPagesOfPackageInfosAsync(int minKey, int maxKey)
32+
{
33+
int maxPageSize = _configuration.Value.MaxPageSize;
34+
35+
int pageIndex = 1;
36+
int pageStartKey = minKey;
37+
while (pageStartKey <= maxKey)
38+
{
39+
_logger.LogInformation("Loading page: {pageIndex} of package infos from DB. The max size of each page is {maxPageSize}.", pageIndex, maxPageSize);
40+
41+
var pis = await _blobInfo.GetPageOfPackageInfosToUpdateBlobsAsync(_packageRepo, pageStartKey, maxKey, maxPageSize);
42+
if (pis.Count > 0)
43+
{
44+
_logger.LogInformation("Loaded page: {pageIndex} of package infos from DB. The page starts from key: {startPackageKey} and ends at key: {endPackageKey}.",
45+
pageIndex, pis.First().Key, pis.Last().Key);
46+
47+
pageStartKey = pis.Last().Key + 1;
48+
pageIndex++;
49+
50+
yield return pis;
51+
}
52+
else
53+
{
54+
_logger.LogInformation("No more pages to load from DB.");
55+
56+
yield break;
57+
}
58+
}
59+
}
60+
}
61+
}

src/UpdateBlobProperties/Cursor.cs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
using Newtonsoft.Json.Linq;
8+
using NuGet.Services.Cursor;
9+
using NuGet.Services.Storage;
10+
11+
namespace UpdateBlobProperties
12+
{
13+
public class Cursor : ReadWriteCursor<int>
14+
{
15+
private readonly Uri _address;
16+
private readonly Storage _storage;
17+
private readonly int _defaultValue;
18+
19+
public Cursor(Uri address, Storage storage, int defaultValue)
20+
{
21+
_address = address ?? throw new ArgumentNullException(nameof(address));
22+
_storage = storage ?? throw new ArgumentNullException(nameof(storage));
23+
_defaultValue = defaultValue;
24+
}
25+
26+
public override async Task Save(CancellationToken cancellationToken)
27+
{
28+
JObject obj = new JObject { { "ProcessedMaxKey", Value } };
29+
StorageContent content = new StringStorageContent(obj.ToString());
30+
31+
await _storage.Save(_address, content, overwrite: true, cancellationToken: cancellationToken);
32+
}
33+
34+
public override async Task Load(CancellationToken cancellationToken)
35+
{
36+
string json = await _storage.LoadString(_address, cancellationToken);
37+
if (json == null)
38+
{
39+
Value = _defaultValue;
40+
41+
return;
42+
}
43+
44+
JObject obj = JObject.Parse(json);
45+
Value = obj["ProcessedMaxKey"].ToObject<int>();
46+
}
47+
}
48+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
using System.Collections.Generic;
5+
6+
namespace UpdateBlobProperties
7+
{
8+
public interface ICollector
9+
{
10+
IAsyncEnumerable<IList<PackageInfo>> GetPagesOfPackageInfosAsync(int minKey, int maxKey);
11+
}
12+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
7+
namespace UpdateBlobProperties
8+
{
9+
public interface IProcessor
10+
{
11+
Task ExecuteAsync(CancellationToken token);
12+
}
13+
}

src/UpdateBlobProperties/Job.cs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.Collections.Generic;
6+
using System.ComponentModel.Design;
7+
using System.IO;
8+
using System.Threading;
9+
using System.Threading.Tasks;
10+
using Microsoft.Extensions.Configuration;
11+
using Microsoft.Extensions.DependencyInjection;
12+
using Microsoft.Extensions.Logging;
13+
using Microsoft.Extensions.Options;
14+
using Autofac;
15+
using Azure.Storage.Blobs;
16+
using NuGet.Jobs;
17+
using NuGet.Jobs.Validation;
18+
using NuGet.Services.Entities;
19+
using NuGet.Services.Storage;
20+
using NuGetGallery;
21+
22+
namespace UpdateBlobProperties
23+
{
24+
public class Job : ValidationJobBase
25+
{
26+
private const string UpdateBlobPropertiesConfigurationSectionName = "UpdateBlobProperties";
27+
28+
public override void Init(IServiceContainer serviceContainer, IDictionary<string, string> jobArgsDictionary)
29+
{
30+
base.Init(serviceContainer, jobArgsDictionary);
31+
}
32+
33+
public override async Task Run()
34+
{
35+
var processor = _serviceProvider.GetRequiredService<IProcessor>();
36+
37+
await processor.ExecuteAsync(CancellationToken.None);
38+
}
39+
40+
protected override void ConfigureAutofacServices(ContainerBuilder containerBuilder, IConfigurationRoot configurationRoot)
41+
{
42+
}
43+
44+
protected override void ConfigureJobServices(IServiceCollection services, IConfigurationRoot configurationRoot)
45+
{
46+
// Update blob properties of package version index in FlatContainer
47+
services.AddTransient<BlobInfo, BlobInfoOfPackageVersionIndexInFlatContainer>();
48+
49+
services.Configure<UpdateBlobPropertiesConfiguration>(configurationRoot.GetSection(UpdateBlobPropertiesConfigurationSectionName));
50+
services.AddTransient<IProcessor, Processor>();
51+
services.AddTransient<ICollector, Collector>();
52+
services.AddTransient<IEntityRepository<Package>, EntityRepository<Package>>();
53+
54+
services.AddTransient(s =>
55+
{
56+
var address = new Uri("http://localhost");
57+
var fileStorageFactory = new FileStorageFactory(
58+
address,
59+
Directory.GetCurrentDirectory(),
60+
s.GetRequiredService<ILogger<FileStorage>>());
61+
62+
return new Cursor(new Uri(address, "cursor.json"), fileStorageFactory.Create(), defaultValue: 0);
63+
});
64+
}
65+
}
66+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.Collections.Generic;
6+
using System.Data.Entity;
7+
using System.Linq;
8+
using System.Threading.Tasks;
9+
using NuGet.Services.Entities;
10+
using NuGetGallery;
11+
12+
namespace UpdateBlobProperties
13+
{
14+
public class PackageInfo
15+
{
16+
public int Key { get; }
17+
public string Id { get; }
18+
public string Version { get; }
19+
20+
public PackageInfo(int key, string id)
21+
{
22+
Key = key;
23+
Id = id;
24+
}
25+
26+
public PackageInfo(int key, string id, string version)
27+
: this(key, id)
28+
{
29+
Version = version;
30+
}
31+
32+
public static Func<IEntityRepository<Package>, int, int, int, Task<List<PackageInfo>>>
33+
GetPageOfPackageInfosWithIdOrderedByPackageRegistrationKeyAsync = async (packageRepository, pageStartKey, maxKey, maxPageSize) =>
34+
{
35+
var prs = await packageRepository.GetAll()
36+
.Include(p => p.PackageRegistration)
37+
.Where(p => p.PackageRegistration.Key >= pageStartKey && p.PackageRegistration.Key <= maxKey)
38+
.Select(p => new { p.PackageRegistration.Key, p.PackageRegistration.Id })
39+
.Distinct()
40+
.OrderBy(pr => pr.Key)
41+
.Take(maxPageSize)
42+
.ToListAsync();
43+
44+
var pis = prs.Select(pr => new PackageInfo(pr.Key, pr.Id))
45+
.OrderBy(pr => pr.Key)
46+
.ToList();
47+
48+
return pis;
49+
};
50+
51+
public static Func<IEntityRepository<Package>, int, int, int, Task<List<PackageInfo>>>
52+
GetPageOfPackageInfosWithIdAndVersionOrderedByPackageKeyAsync = async (packageRepository, pageStartKey, maxKey, maxPageSize) =>
53+
{
54+
var ps = await packageRepository.GetAll()
55+
.Include(p => p.PackageRegistration)
56+
.Where(p => p.Key >= pageStartKey && p.Key <= maxKey)
57+
.OrderBy(p => p.Key)
58+
.Take(maxPageSize)
59+
.ToListAsync();
60+
61+
var pis = ps.Select(p => new PackageInfo(p.Key, p.PackageRegistration.Id, p.NormalizedVersion))
62+
.OrderBy(p => p.Key)
63+
.ToList();
64+
65+
return pis;
66+
};
67+
}
68+
}

0 commit comments

Comments
 (0)