Skip to content

Commit 6b94da5

Browse files
authored
AFD log ingestion for China pipeline (#10659)
* Option to rename output file. * Preserving paths in source storage. * Better filename generation for sourceFilename column. * Not actually logging "was already present" when destination blob was not present.
1 parent b69c471 commit 6b94da5

11 files changed

Lines changed: 222 additions & 94 deletions

File tree

src/Stats.AzureCdnLogs.Common/AzureHelpers/AzureBlobLeaseManager.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using System.Threading.Tasks;
88
using Azure;
99
using Azure.Storage.Blobs;
10+
using Azure.Storage.Blobs.Models;
1011
using Azure.Storage.Blobs.Specialized;
1112
using Microsoft.Extensions.Logging;
1213
using NuGet.Services.Storage;
@@ -45,8 +46,9 @@ public async Task<AzureBlobLockResult> AcquireLease(BlobClient blob, Cancellatio
4546
{
4647
var leaseClient = blob.GetBlobLeaseClient();
4748
var leaseResponse = await leaseClient.AcquireAsync(TimeSpan.FromSeconds(MaxRenewPeriodInSeconds));
49+
BlobProperties properties = await blob.GetPropertiesAsync();
4850
string leaseId = leaseResponse.Value.LeaseId;
49-
var lockResult = new AzureBlobLockResult(blob, lockIsTaken: true, leaseId, token);
51+
var lockResult = new AzureBlobLockResult(blob, properties, lockIsTaken: true, leaseId, token);
5052
BlobClient leasedBlob = lockResult.Blob;
5153
// Start a task that will renew the lease until the token is cancelled or the Release method is invoked
5254
_ = Task.Run(async () =>

src/Stats.AzureCdnLogs.Common/AzureHelpers/AzureBlobLockResult.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System;
55
using System.Threading;
66
using Azure.Storage.Blobs;
7+
using Azure.Storage.Blobs.Models;
78

89
namespace Stats.AzureCdnLogs.Common
910
{
@@ -15,19 +16,27 @@ public class AzureBlobLockResult : IDisposable
1516

1617
public BlobClient Blob { get; }
1718

19+
public BlobProperties BlobProperties { get; }
20+
1821
/// <summary>
1922
/// It will be cancelled when the renew task could not renew the lease.
2023
/// Operations can listen to this cancellation to stop execution once the lease could not be renewed.
2124
/// </summary>
2225
public CancellationTokenSource BlobOperationToken { get; }
2326

2427
public AzureBlobLockResult(BlobClient blob, bool lockIsTaken, string leaseId, CancellationToken linkToken)
28+
:this(blob, blobProperties: null, lockIsTaken, leaseId, linkToken)
29+
{
30+
}
31+
32+
public AzureBlobLockResult(BlobClient blob, BlobProperties blobProperties, bool lockIsTaken, string leaseId, CancellationToken linkToken)
2533
{
2634
Blob = blob ?? throw new ArgumentNullException(nameof(blob));
2735
LockIsTaken = lockIsTaken;
2836
BlobOperationToken = CancellationTokenSource.CreateLinkedTokenSource(linkToken);
2937
// null is acceptable
3038
LeaseId = leaseId;
39+
BlobProperties = blobProperties;
3140
}
3241

3342
public static AzureBlobLockResult FailedLockResult(BlobClient blob)

src/Stats.AzureCdnLogs.Common/Collect/AzureStatsLogDestination.cs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public AzureStatsLogDestination(BlobServiceClient blobServiceClient, string cont
5555
public async Task<AsyncOperationResult> TryWriteAsync(Stream inputStream, Action<Stream, Stream> writeAction, string destinationFileName, ContentType destinationContentType, CancellationToken token)
5656
{
5757
_logger.LogInformation("WriteAsync: Start to write to {DestinationFileName}. ContentType is {ContentType}.",
58-
$"{_blobContainerClient.Uri}{destinationFileName}",
58+
$"{_blobContainerClient.Uri}/{destinationFileName}",
5959
destinationContentType);
6060
if (token.IsCancellationRequested)
6161
{
@@ -91,15 +91,18 @@ public async Task<AsyncOperationResult> TryWriteAsync(Stream inputStream, Action
9191
writeAction(inputStream, resultStream);
9292
}
9393

94-
if(!(await blobClient.ExistsAsync(token)))
94+
if (!(await blobClient.ExistsAsync(token)))
9595
{
9696
await resultStream.FlushAsync();
9797
_logger.LogInformation("WriteAsync: End write to {DestinationFileName}", destinationFileName);
9898
return new AsyncOperationResult(true, null);
9999
}
100-
100+
101+
}
102+
else
103+
{
104+
_logger.LogInformation("WriteAsync: The destination file {DestinationFileName}, was already present.", destinationFileName);
101105
}
102-
_logger.LogInformation("WriteAsync: The destination file {DestinationFileName}, was already present.", destinationFileName);
103106
return new AsyncOperationResult(false, null);
104107
}
105108
catch (Exception exception)

src/Stats.AzureCdnLogs.Common/Collect/AzureStatsLogSource.cs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public AzureStatsLogSource(BlobServiceClient blobServiceClient,
5252
if (containerName == null)
5353
throw new ArgumentNullException(nameof(containerName));
5454
else
55-
throw new ArgumentException(nameof(containerName));
55+
throw new ArgumentException("Empty container name", nameof(containerName));
5656
}
5757
_container = _blobServiceClient.GetBlobContainerClient(containerName);
5858

@@ -245,9 +245,8 @@ private async Task<BlobClient> GetBlobAsync(Uri blobUri)
245245
{
246246
var blobUriBuilder = new BlobUriBuilder(blobUri);
247247
var containerClient = _blobServiceClient.GetBlobContainerClient(blobUriBuilder.BlobContainerName);
248-
var _blobClient = containerClient.GetBlobClient(blobUriBuilder.BlobName);
249-
var properties = await _blobClient.GetPropertiesAsync();
250-
return _blobClient;
248+
var blobClient = containerClient.GetBlobClient(blobUriBuilder.BlobName);
249+
return blobClient;
251250
}
252251
catch (Exception)
253252
{
@@ -331,7 +330,14 @@ private async Task<bool> TryCopyInternalAsync(BlobClient sourceBlob,
331330

332331
private string GetBlobNameFromUri(Uri blobUri)
333332
{
334-
return blobUri.Segments.LastOrDefault();
333+
var path = blobUri.AbsolutePath.TrimStart('/');
334+
var firstSlash = path.IndexOf('/'); // blob storage paths start with container name which we need to remove
335+
if (firstSlash >= 0)
336+
{
337+
return path.Substring(firstSlash + 1);
338+
}
339+
340+
return path;
335341
}
336342

337343
private async Task<BlobContainerClient> CreateContainerAsync(string containerName)

src/Stats.AzureCdnLogs.Common/Collect/Collector.cs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
using System.Text;
1111
using System.Threading;
1212
using System.Threading.Tasks;
13+
using Azure.Storage.Blobs;
14+
using Azure.Storage.Blobs.Models;
1315
using Microsoft.Extensions.Logging;
1416

1517
namespace Stats.AzureCdnLogs.Common.Collect
@@ -64,7 +66,7 @@ public Collector(
6466
/// <param name="destinationContentType">The <see cref="Stats.AzureCdnLogs.Common.Collect.ContentType" for the destination file./></param>
6567
/// <param name="token">A <see cref="System.Threading.CancellationToken"/> to be used for cancelling the operation.</param>
6668
/// <returns>A collection of exceptions if any.</returns>
67-
public virtual async Task<AggregateException> TryProcessAsync(int maxFileCount, Func<string,string> fileNameTransform, ContentType sourceContentType, ContentType destinationContentType, CancellationToken token)
69+
public virtual async Task<AggregateException> TryProcessAsync(int maxFileCount, Func<string, DateTimeOffset,string> fileNameTransform, ContentType sourceContentType, ContentType destinationContentType, CancellationToken token)
6870
{
6971
ConcurrentBag<Exception> exceptions = new ConcurrentBag<Exception>();
7072
var files = (await _source.GetFilesAsync(maxFileCount, token)).ToArray();
@@ -73,12 +75,13 @@ public virtual async Task<AggregateException> TryProcessAsync(int maxFileCount,
7375
return exceptions.Count() > 0 ? new AggregateException(exceptions.ToArray()) : null;
7476
}
7577

76-
private async Task TryProcessBlobAsync(Uri file, Func<string, string> fileNameTransform, ContentType sourceContentType, ContentType destinationContentType, ConcurrentBag<Exception> exceptions, CancellationToken token)
78+
private async Task TryProcessBlobAsync(Uri file, Func<string, DateTimeOffset, string> fileNameTransform, ContentType sourceContentType, ContentType destinationContentType, ConcurrentBag<Exception> exceptions, CancellationToken token)
7779
{
7880
_logger.LogInformation("TryProcessAsync: {File} ", file.AbsoluteUri);
7981
if (token.IsCancellationRequested)
8082
{
8183
_logger.LogInformation("TryProcessAsync: The operation was cancelled.");
84+
return;
8285
}
8386
try
8487
{
@@ -92,11 +95,16 @@ private async Task TryProcessBlobAsync(Uri file, Func<string, string> fileNameTr
9295
{
9396
var blobToDeadLetter = !await VerifyStreamInternalAsync(file, sourceContentType, blobOperationToken);
9497
var filename = file.Segments.LastOrDefault();
98+
var filenameForData = filename;
99+
if (file.Segments.Any(s => s.StartsWith("y=", StringComparison.OrdinalIgnoreCase) || s.StartsWith("y%3D", StringComparison.OrdinalIgnoreCase)))
100+
{
101+
filenameForData = string.Join("", file.Segments.SkipWhile(s => !s.StartsWith("y=", StringComparison.OrdinalIgnoreCase) && !s.StartsWith("y%3D", StringComparison.OrdinalIgnoreCase)));
102+
}
95103
// If verification passed continue with the rest of the action
96104
// If not just move the blob to deadletter
97105
if (!blobToDeadLetter)
98106
{
99-
var writeOperationResult = await _destination.TryWriteAsync(inputStream, (i, o) => ProcessLogStream(i, o, filename), fileNameTransform(file.Segments.Last()), destinationContentType, blobOperationToken);
107+
var writeOperationResult = await _destination.TryWriteAsync(inputStream, (i, o) => ProcessLogStream(i, o, filenameForData), fileNameTransform(filename, lockResult.BlobProperties.LastModified), destinationContentType, blobOperationToken);
100108
blobToDeadLetter = writeOperationResult.OperationException != null;
101109
}
102110
await _source.TryCleanAsync(lockResult, onError: blobToDeadLetter, token: blobOperationToken);
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
using System;
5+
6+
namespace Stats.CollectAzureChinaCDNLogs
7+
{
8+
public class AfdLogLine
9+
{
10+
public DateTime Time { get; set; }
11+
public AfdProperties Properties { get; set; }
12+
}
13+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
namespace Stats.CollectAzureChinaCDNLogs
5+
{
6+
public class AfdProperties
7+
{
8+
public string HttpMethod { get; set; }
9+
public string HttpVersion { get; set; }
10+
public string RequestUri { get; set; }
11+
public string Sni { get; set; }
12+
public string ResponseBytes { get; set; }
13+
public string UserAgent { get; set; }
14+
public string ClientIp { get; set; }
15+
public string ClientPort { get; set; }
16+
public string SocketIp { get; set; }
17+
public string TimeToFirstByte { get; set; }
18+
public string TimeTaken { get; set; }
19+
public string RequestProtocol { get; set; }
20+
public string SecurityProtocol { get; set; }
21+
public string[] RulesEngineMatchNames { get; set; }
22+
public string HttpStatusCode { get; set; }
23+
public string EdgeActionsStatusCode { get; set; }
24+
public string RoxyConnectStatusCode { get; set; }
25+
public string HttpStatusDetails { get; set; }
26+
public string Pop { get; set; }
27+
public string CacheStatus { get; set; }
28+
public string ErrorInfo { get; set; }
29+
public string Result { get; set; }
30+
public string Endpoint { get; set; }
31+
public string RoutingRuleName { get; set; }
32+
public string ClientJA4FingerPrint { get; set; }
33+
public string HostName { get; set; }
34+
public string OriginUrl { get; set; }
35+
public string OriginIp { get; set; }
36+
public string OriginName { get; set; }
37+
public string OriginCryptProtocol { get; set; }
38+
public string OriginCryptCipher { get; set; }
39+
public string Referer { get; set; }
40+
public string ClientCountry { get; set; }
41+
public string Domain { get; set; }
42+
public string SecurityCipher { get; set; }
43+
public string SecurityCurves { get; set; }
44+
}
45+
}

0 commit comments

Comments
 (0)