-
Notifications
You must be signed in to change notification settings - Fork 659
Expand file tree
/
Copy pathPopularityTransferDataClient.cs
More file actions
145 lines (122 loc) · 6.57 KB
/
PopularityTransferDataClient.cs
File metadata and controls
145 lines (122 loc) · 6.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Diagnostics;
using System.IO;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using NuGetGallery;
namespace NuGet.Services.AzureSearch.AuxiliaryFiles
{
/// <summary>
/// Reads and replaces the "latest indexed" popularity transfers auxiliary file. The file is a single JSON blob
/// named <c>popularity-transfers/popularity-transfers.v1.json</c> under the normalized storage path, inside the
/// container named by the <c>StorageContainer</c> setting of <see cref="AzureSearchConfiguration"/>.
/// </summary>
public class PopularityTransferDataClient : IPopularityTransferDataClient
{
    private const string JsonContentType = "application/json";

    private static readonly JsonSerializer Serializer = new JsonSerializer();

    private readonly ICloudBlobClient _cloudBlobClient;
    private readonly IOptionsSnapshot<AzureSearchConfiguration> _options;
    private readonly IAzureSearchTelemetryService _telemetryService;
    private readonly ILogger<PopularityTransferDataClient> _logger;
    private readonly Lazy<ICloudBlobContainer> _lazyContainer;

    /// <summary>
    /// Initializes the client. The blob container reference is resolved lazily, on first use, from the
    /// configured storage container name.
    /// </summary>
    /// <param name="cloudBlobClient">The blob client used to get the container reference.</param>
    /// <param name="options">The configuration providing the storage container and storage path.</param>
    /// <param name="telemetryService">The telemetry sink for read and replace operations.</param>
    /// <param name="logger">The logger.</param>
    /// <exception cref="ArgumentNullException">Thrown if any argument is null.</exception>
    public PopularityTransferDataClient(
        ICloudBlobClient cloudBlobClient,
        IOptionsSnapshot<AzureSearchConfiguration> options,
        IAzureSearchTelemetryService telemetryService,
        ILogger<PopularityTransferDataClient> logger)
    {
        _cloudBlobClient = cloudBlobClient ?? throw new ArgumentNullException(nameof(cloudBlobClient));
        _options = options ?? throw new ArgumentNullException(nameof(options));
        _telemetryService = telemetryService ?? throw new ArgumentNullException(nameof(telemetryService));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));

        _lazyContainer = new Lazy<ICloudBlobContainer>(
            () => _cloudBlobClient.GetContainerReference(_options.Value.StorageContainer));
    }

    private ICloudBlobContainer Container => _lazyContainer.Value;

    /// <summary>
    /// Reads the latest indexed popularity transfers blob.
    /// </summary>
    /// <param name="accessCondition">The access condition passed to the blob read.</param>
    /// <param name="stringCache">The cache used to dedupe every "from" and "to" package ID that is read.</param>
    /// <returns>
    /// A result flagged as modified, with the parsed data and blob metadata, when the blob was read. When the
    /// read throws <see cref="CloudBlobNotModifiedException"/>, a result flagged as not modified with null data
    /// and null metadata.
    /// </returns>
    /// <exception cref="ArgumentNullException">Thrown if <paramref name="stringCache"/> is null.</exception>
    public async Task<AuxiliaryFileResult<PopularityTransferData>> ReadLatestIndexedAsync(
        IAccessCondition accessCondition,
        StringCache stringCache)
    {
        // Fail fast: otherwise a null cache only surfaces as a NullReferenceException from inside the
        // per-transfer callback, and only when the blob actually contains transfers.
        if (stringCache == null)
        {
            throw new ArgumentNullException(nameof(stringCache));
        }

        var stopwatch = Stopwatch.StartNew();
        var blobName = GetLatestIndexedBlobName();
        var blobReference = Container.GetBlobReference(blobName);

        _logger.LogInformation("Reading the latest indexed popularity transfers from {BlobName}.", blobName);

        bool modified;
        PopularityTransferData data;
        AuxiliaryFileMetadata metadata;
        try
        {
            using (var stream = await blobReference.OpenReadAsync(accessCondition))
            {
                // Allocate the result only once the blob has been opened, so the not-modified path does not
                // create an instance just to throw it away.
                var loaded = new PopularityTransferData();
                ReadStream(
                    stream,
                    (fromId, toId) => loaded.AddTransfer(stringCache.Dedupe(fromId), stringCache.Dedupe(toId)));

                data = loaded;
                modified = true;

                // The load duration is captured here, before the stream is disposed and telemetry is emitted.
                metadata = new AuxiliaryFileMetadata(
                    lastModified: new DateTimeOffset(blobReference.LastModifiedUtc, TimeSpan.Zero),
                    loadDuration: stopwatch.Elapsed,
                    fileSize: blobReference.Properties.Length,
                    etag: blobReference.ETag);
            }
        }
        catch (CloudBlobNotModifiedException)
        {
            _logger.LogInformation("The blob {BlobName} has not changed.", blobName);
            modified = false;
            data = null;
            metadata = null;
        }

        stopwatch.Stop();
        _telemetryService.TrackReadLatestIndexedPopularityTransfers(data?.Count, modified, stopwatch.Elapsed);

        return new AuxiliaryFileResult<PopularityTransferData>(
            modified,
            data,
            metadata);
    }

    /// <summary>
    /// Serializes <paramref name="newData"/> as JSON and writes it over the latest indexed popularity
    /// transfers blob.
    /// </summary>
    /// <param name="newData">The popularity transfers to write.</param>
    /// <param name="accessCondition">The access condition passed to the blob write.</param>
    /// <exception cref="ArgumentNullException">Thrown if <paramref name="newData"/> is null.</exception>
    public async Task ReplaceLatestIndexedAsync(
        PopularityTransferData newData,
        IAccessCondition accessCondition)
    {
        // Validate before touching newData.Count so callers get a named argument error rather than a
        // NullReferenceException.
        if (newData == null)
        {
            throw new ArgumentNullException(nameof(newData));
        }

        using (_telemetryService.TrackReplaceLatestIndexedPopularityTransfers(newData.Count))
        {
            var blobName = GetLatestIndexedBlobName();
            _logger.LogInformation("Replacing the latest indexed popularity transfers from {BlobName}.", blobName);

            var blobReference = Container.GetBlobReference(blobName);

            using (var stream = await blobReference.OpenWriteAsync(accessCondition, JsonContentType))
            using (var streamWriter = new StreamWriter(stream))
            using (var jsonTextWriter = new JsonTextWriter(streamWriter))
            {
                // NOTE(review): the content type is already passed to OpenWriteAsync above, so this assignment
                // looks redundant; it is preserved as-is. Confirm against the blob wrapper before removing.
                blobReference.Properties.ContentType = JsonContentType;

                // NOTE(review): presumably the serialized shape matches what ReadStream expects (an object of
                // string arrays); that depends on how PopularityTransferData serializes. Verify.
                Serializer.Serialize(jsonTextWriter, newData);
            }
        }
    }

    /// <summary>
    /// Streams the JSON document token by token and reports each transfer through <paramref name="add"/>.
    /// The expected shape is one object whose property names are "from" package IDs and whose values are
    /// arrays of "to" package ID strings, e.g. <c>{ "A": [ "B", "C" ] }</c>. Any other shape, or any token
    /// after the closing brace, fails a <c>Guard.Assert</c>.
    /// </summary>
    /// <param name="stream">The stream containing the JSON document. It is disposed along with the readers.</param>
    /// <param name="add">The callback invoked once per (from, to) pair, in document order.</param>
    private static void ReadStream(Stream stream, Action<string, string> add)
    {
        using (var textReader = new StreamReader(stream))
        using (var jsonReader = new JsonTextReader(textReader))
        {
            Guard.Assert(jsonReader.Read(), "The blob should be readable.");
            Guard.Assert(jsonReader.TokenType == JsonToken.StartObject, "The first token should be the start of an object.");
            Guard.Assert(jsonReader.Read(), "There should be a second token.");

            // Each iteration consumes one property: its name, then its array of strings.
            while (jsonReader.TokenType == JsonToken.PropertyName)
            {
                var fromId = (string)jsonReader.Value;

                Guard.Assert(jsonReader.Read(), "There should be a token after the property name.");
                Guard.Assert(jsonReader.TokenType == JsonToken.StartArray, "The token after the property name should be the start of an array.");
                Guard.Assert(jsonReader.Read(), "There should be a token after the start of the transfer array.");

                // An empty array is allowed: the loop body is skipped and the end-of-array check still passes.
                while (jsonReader.TokenType == JsonToken.String)
                {
                    add(fromId, (string)jsonReader.Value);
                    Guard.Assert(jsonReader.Read(), "There should be a token after the 'to' package ID.");
                }

                Guard.Assert(jsonReader.TokenType == JsonToken.EndArray, "The token after reading the array should be the end of an array.");
                Guard.Assert(jsonReader.Read(), "There should be a token after the end of the array.");
            }

            Guard.Assert(jsonReader.TokenType == JsonToken.EndObject, "The last token should be the end of an object.");
            Guard.Assert(!jsonReader.Read(), "There should be no token after the end of the object.");
        }
    }

    /// <summary>
    /// Builds the blob name of the latest indexed popularity transfers file: the normalized storage path
    /// from configuration followed by a fixed, versioned relative path.
    /// </summary>
    private string GetLatestIndexedBlobName()
    {
        return $"{_options.Value.NormalizeStoragePath()}popularity-transfers/popularity-transfers.v1.json";
    }
}
}