Skip to content

Commit 7e57816

Browse files
authored
Mooncake log ingestion options to not write headers and add original filename column (#10256)
* Option for not writing the output file header. * Option to add sourceFilename column
1 parent 7e0e5a6 commit 7e57816

6 files changed

Lines changed: 189 additions & 45 deletions

File tree

src/Stats.AzureCdnLogs.Common/Collect/Collector.cs

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) .NET Foundation. All rights reserved.
1+
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
33

44
using System;
@@ -25,6 +25,8 @@ public abstract class Collector
2525
protected ILogSource _source;
2626
protected ILogDestination _destination;
2727
protected readonly ILogger<Collector> _logger;
28+
protected readonly bool _writeHeader;
29+
protected readonly bool _addSourceFilenameColumn;
2830

2931
/// <summary>
3032
/// Used by UnitTests
@@ -38,11 +40,18 @@ public Collector()
3840
/// <param name="source">The source of the Collector.</param>
3941
/// <param name="destination">The destination for the collector.</param>
4042
/// <param name="logger">The logger.</param>
41-
public Collector(ILogSource source, ILogDestination destination, ILogger<Collector> logger)
43+
public Collector(
44+
ILogSource source,
45+
ILogDestination destination,
46+
ILogger<Collector> logger,
47+
bool writeHeader,
48+
bool addSourceFilenameColumn)
4249
{
4350
_source = source;
4451
_destination = destination;
4552
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
53+
_writeHeader = writeHeader;
54+
_addSourceFilenameColumn = addSourceFilenameColumn;
4655
}
4756

4857
/// <summary>
@@ -126,9 +135,25 @@ private void AddException(ConcurrentBag<Exception> exceptions, Exception e, stri
126135
/// A method to transform each line from the input stream before writing it to the output stream. It is useful for example to modify the schema of each line.
127136
/// </summary>
128137
/// <param name="line">A line from the input stream.</param>
138+
/// <remarks>This method receives only <paramref name="line"/>; the name of the file the line was read from is not a parameter here.</remarks>
129139
/// <returns>The transformed line.</returns>
130140
public abstract OutputLogLine TransformRawLogLine(string line);
131141

142+
/// <summary>
143+
/// A method that returns the file header to put into the output file (if enabled).
144+
/// </summary>
145+
/// <returns>The header line.</returns>
146+
public string GetOutputFileHeader()
147+
{
148+
if (!_addSourceFilenameColumn)
149+
{
150+
return OutputLogLine.Header;
151+
}
152+
153+
return $"{OutputLogLine.Header} sourceFilename";
154+
}
155+
156+
132157
/// <summary>
133158
/// A method to validate the stream integrity before data transfer.
134159
/// </summary>
@@ -151,7 +176,10 @@ protected void ProcessLogStream(Stream sourceStream, Stream targetStream, string
151176
using (var sourceStreamReader = new StreamReader(sourceStream))
152177
using (var targetStreamWriter = new StreamWriter(targetStream))
153178
{
154-
targetStreamWriter.WriteLine(OutputLogLine.Header);
179+
if (_writeHeader)
180+
{
181+
targetStreamWriter.WriteLine(GetOutputFileHeader());
182+
}
155183

156184
while (!sourceStreamReader.EndOfStream)
157185
{
@@ -279,7 +307,15 @@ private string GetParsedModifiedLogEntry(int lineNumber, string rawLogEntry, str
279307
stringBuilder.Append(spaceCharacter);
280308

281309
// x-ec_custom-1
282-
stringBuilder.AppendLine((parsedEntry.CustomField ?? dashCharacter));
310+
stringBuilder.Append((parsedEntry.CustomField ?? dashCharacter));
311+
312+
if (_addSourceFilenameColumn)
313+
{
314+
stringBuilder.Append(spaceCharacter);
315+
stringBuilder.Append(OutputLogLine.Quote(filename));
316+
}
317+
318+
stringBuilder.AppendLine();
283319

284320
return stringBuilder.ToString();
285321
}

src/Stats.AzureCdnLogs.Common/Collect/OutputLogLine.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) .NET Foundation. All rights reserved.
1+
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
33

44
namespace Stats.AzureCdnLogs.Common.Collect
@@ -79,15 +79,15 @@ public OutputLogLine(string timestamp,
7979

8080
public static string Header
8181
{
82-
get { return "#Fields: timestamp time-taken c-ip filesize s-ip s-port sc-status sc-bytes cs-method cs-uri-stem - rs-duration rs-bytes c-referrer c-user-agent customer-id x-ec_custom-1\n"; }
82+
get { return "#Fields: timestamp time-taken c-ip filesize s-ip s-port sc-status sc-bytes cs-method cs-uri-stem - rs-duration rs-bytes c-referrer c-user-agent customer-id x-ec_custom-1"; }
8383
}
8484

8585
public override string ToString()
8686
{
8787
return $"{TimeStamp} {TimeTaken} {CIp} {FileSize} {SIp} {SPort} {ScStatus} {ScBytes} {CsMethod} {CsUriStem} - {RsDuration} {RsBytes} {CReferrer} {Quote(CUserAgent)} {CustomerId} {Quote(XEc_Custom_1)}";
8888
}
8989

90-
private static string Quote(string input)
90+
public static string Quote(string input)
9191
{
9292
if (input.StartsWith("\"") && input.EndsWith("\""))
9393
{

src/Stats.CollectAzureChinaCDNLogs/ChinaStatsCollector.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) .NET Foundation. All rights reserved.
1+
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
33

44
using System;
@@ -35,7 +35,13 @@ private enum ChinaLogHeaderFields
3535
sip = 11
3636
}
3737

38-
public ChinaStatsCollector(ILogSource source, ILogDestination destination, ILogger<ChinaStatsCollector> logger) : base(source, destination, logger)
38+
public ChinaStatsCollector(
39+
ILogSource source,
40+
ILogDestination destination,
41+
ILogger<ChinaStatsCollector> logger,
42+
bool writeHeader,
43+
bool addSourceFilenameColumn)
44+
: base(source, destination, logger, writeHeader, addSourceFilenameColumn)
3945
{}
4046

4147
public override OutputLogLine TransformRawLogLine(string line)

src/Stats.CollectAzureChinaCDNLogs/Configuration/CollectAzureChinaCdnLogsConfiguration.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) .NET Foundation. All rights reserved.
1+
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
33

44
namespace Stats.CollectAzureChinaCDNLogs
@@ -16,5 +16,8 @@ public class CollectAzureChinaCdnLogsConfiguration
1616
public string DestinationFilePrefix { get; set; }
1717

1818
public int? ExecutionTimeoutInSeconds { get; set; }
19+
20+
public bool WriteOutputHeader { get; set; } = true;
21+
public bool AddSourceFilenameColumn { get; set; } = false;
1922
}
2023
}

src/Stats.CollectAzureChinaCDNLogs/Job.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) .NET Foundation. All rights reserved.
1+
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
33

44
using System;
@@ -53,7 +53,12 @@ public void InitializeJobConfiguration(IServiceProvider serviceProvider)
5353
_configuration.AzureContainerNameDestination,
5454
serviceProvider.GetRequiredService<ILogger<AzureStatsLogDestination>>());
5555

56-
_chinaCollector = new ChinaStatsCollector(source, dest, serviceProvider.GetRequiredService<ILogger<ChinaStatsCollector>>());
56+
_chinaCollector = new ChinaStatsCollector(
57+
source,
58+
dest,
59+
serviceProvider.GetRequiredService<ILogger<ChinaStatsCollector>>(),
60+
_configuration.WriteOutputHeader,
61+
_configuration.AddSourceFilenameColumn);
5762
}
5863

5964
public override async Task Run()

tests/Tests.Stats.CollectAzureChinaCDNLogs/ChinaCollectorTests.cs

Lines changed: 127 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) .NET Foundation. All rights reserved.
1+
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
33

44
using System;
@@ -35,7 +35,9 @@ public void TransformRawLogLine(string input, string expectedOutput)
3535
var collector = new ChinaStatsCollector(
3636
Mock.Of<ILogSource>(),
3737
Mock.Of<ILogDestination>(),
38-
Mock.Of<ILogger<ChinaStatsCollector>>());
38+
Mock.Of<ILogger<ChinaStatsCollector>>(),
39+
writeHeader: true,
40+
addSourceFilenameColumn: false);
3941

4042
var transformedInput = collector.TransformRawLogLine(input);
4143
string output = transformedInput == null ? null : transformedInput.ToString();
@@ -51,7 +53,9 @@ public void CdnLogEntryParserIntegration(string input)
5153
var collector = new ChinaStatsCollector(
5254
Mock.Of<ILogSource>(),
5355
Mock.Of<ILogDestination>(),
54-
Mock.Of<ILogger<ChinaStatsCollector>>());
56+
Mock.Of<ILogger<ChinaStatsCollector>>(),
57+
writeHeader: true,
58+
addSourceFilenameColumn: false);
5559

5660
var transformedInput = collector.TransformRawLogLine(input);
5761
if (transformedInput == null)
@@ -74,7 +78,9 @@ public void SkipsMalformedTimestamps()
7478
var collector = new ChinaStatsCollector(
7579
Mock.Of<ILogSource>(),
7680
Mock.Of<ILogDestination>(),
77-
Mock.Of<ILogger<ChinaStatsCollector>>());
81+
Mock.Of<ILogger<ChinaStatsCollector>>(),
82+
writeHeader: true,
83+
addSourceFilenameColumn: false);
7884

7985
OutputLogLine transformedInput = null;
8086

@@ -121,54 +127,146 @@ public void SkipsMalformedTimestamps()
121127
public async Task SkipsLinesWithMalformedColumns(string data, int expectedOutputLines)
122128
{
123129
const string header = "c-ip, timestamp, cs-method, cs-uri-stem, http-ver, sc-status, sc-bytes, c-referer, c-user-agent, rs-duration(ms), hit-miss, s-ip\n";
124-
var sourceUri = new Uri("https://example.com/log1");
130+
Mock<ILogSource> sourceMock = SetupSource(header + data);
125131

126-
var sourceMock = new Mock<ILogSource>();
127-
sourceMock
128-
.Setup(s => s.GetFilesAsync(It.IsAny<int>(), It.IsAny<CancellationToken>(), It.IsAny<string>()))
129-
.ReturnsAsync((IEnumerable<Uri>)new List<Uri> { sourceUri });
132+
var writeSucceeded = false;
133+
var outputStream = new MemoryStream();
134+
Mock<ILogDestination> destinationMock = SetupDestination(outputStream, () => writeSucceeded = true);
130135

131-
sourceMock
132-
.Setup(s => s.TakeLockAsync(sourceUri, It.IsAny<CancellationToken>()))
133-
.ReturnsAsync(new AzureBlobLockResult(new Microsoft.WindowsAzure.Storage.Blob.CloudBlob(sourceUri), true, "foo", CancellationToken.None));
136+
var collector = new ChinaStatsCollector(
137+
sourceMock.Object,
138+
destinationMock.Object,
139+
Mock.Of<ILogger<ChinaStatsCollector>>(),
140+
writeHeader: true,
141+
addSourceFilenameColumn: false);
134142

135-
sourceMock
136-
.Setup(s => s.OpenReadAsync(sourceUri, It.IsAny<ContentType>(), It.IsAny<CancellationToken>()))
137-
.ReturnsAsync(() => new MemoryStream(Encoding.UTF8.GetBytes(header + data)));
143+
await collector.TryProcessAsync(
144+
maxFileCount: 10,
145+
fileNameTransform: s => s,
146+
sourceContentType: ContentType.Text,
147+
destinationContentType: ContentType.Text,
148+
CancellationToken.None);
138149

139-
var destinationMock = new Mock<ILogDestination>();
150+
string[] outputLines = null;
151+
152+
outputLines = GetStreamLines(outputStream);
153+
154+
Assert.True(writeSucceeded);
155+
Assert.NotEmpty(outputLines);
156+
Assert.Equal("#Fields: timestamp time-taken c-ip filesize s-ip s-port sc-status sc-bytes cs-method cs-uri-stem - rs-duration rs-bytes c-referrer c-user-agent customer-id x-ec_custom-1", outputLines[0]);
157+
Assert.True(expectedOutputLines <= outputLines.Length - 1); // excluding header
158+
}
159+
160+
[Fact]
161+
public async Task DoesNotWriteHeaderIfConfigured()
162+
{
163+
const string header = "c-ip, timestamp, cs-method, cs-uri-stem, http-ver, sc-status, sc-bytes, c-referer, c-user-agent, rs-duration(ms), hit-miss, s-ip\n";
164+
const string data = "1.2.3.4,4/6/2019 4:00:20 PM +00:00,GET,\"/v3-flatcontainer/microsoft.codeanalysis.common/1.2.2/microsoft.codeanalysis.common.1.2.2.nupkg\",HTTPS,200,2044843,\"NULL\",\"NuGet VS VSIX/4.7.0 (Microsoft Windows NT 10.0.17134.0, VS Enterprise/15.0)\",796,MISS,4.3.2.1";
165+
166+
Mock<ILogSource> sourceMock = SetupSource(header + data);
167+
var writeSucceeded = false;
140168
var outputStream = new MemoryStream();
169+
Mock<ILogDestination> destinationMock = SetupDestination(outputStream, () => writeSucceeded = true);
170+
171+
var collector = new ChinaStatsCollector(
172+
sourceMock.Object,
173+
destinationMock.Object,
174+
Mock.Of<ILogger<ChinaStatsCollector>>(),
175+
writeHeader: false,
176+
addSourceFilenameColumn: false);
177+
178+
await collector.TryProcessAsync(
179+
maxFileCount: 10,
180+
fileNameTransform: s => s,
181+
sourceContentType: ContentType.Text,
182+
destinationContentType: ContentType.Text,
183+
CancellationToken.None);
184+
185+
string[] outputLines = null;
186+
187+
outputLines = GetStreamLines(outputStream);
188+
Assert.True(writeSucceeded);
189+
Assert.Single(outputLines);
190+
Assert.False(outputLines[0].StartsWith("#Fields"));
191+
}
192+
193+
[Fact]
194+
public async Task WritesSourceFilenameColumn()
195+
{
196+
const string header = "c-ip, timestamp, cs-method, cs-uri-stem, http-ver, sc-status, sc-bytes, c-referer, c-user-agent, rs-duration(ms), hit-miss, s-ip\n";
197+
const string data = "1.2.3.4,4/6/2019 4:00:20 PM +00:00,GET,\"/v3-flatcontainer/microsoft.codeanalysis.common/1.2.2/microsoft.codeanalysis.common.1.2.2.nupkg\",HTTPS,200,2044843,\"NULL\",\"NuGet VS VSIX/4.7.0 (Microsoft Windows NT 10.0.17134.0, VS Enterprise/15.0)\",796,MISS,4.3.2.1";
198+
199+
Mock<ILogSource> sourceMock = SetupSource(header + data);
141200
var writeSucceeded = false;
201+
var outputStream = new MemoryStream();
202+
Mock<ILogDestination> destinationMock = SetupDestination(outputStream, () => writeSucceeded = true);
203+
204+
var collector = new ChinaStatsCollector(
205+
sourceMock.Object,
206+
destinationMock.Object,
207+
Mock.Of<ILogger<ChinaStatsCollector>>(),
208+
writeHeader: true,
209+
addSourceFilenameColumn: true);
210+
211+
await collector.TryProcessAsync(
212+
maxFileCount: 10,
213+
fileNameTransform: s => s,
214+
sourceContentType: ContentType.Text,
215+
destinationContentType: ContentType.Text,
216+
CancellationToken.None);
217+
218+
string[] outputLines = null;
219+
220+
outputLines = GetStreamLines(outputStream);
221+
Assert.True(writeSucceeded);
222+
Assert.Equal(2, outputLines.Length);
223+
Assert.EndsWith("sourceFilename", outputLines[0]);
224+
Assert.EndsWith("log1", outputLines[1]);
225+
}
226+
227+
private static Mock<ILogDestination> SetupDestination(MemoryStream outputStream, Action onSuccess)
228+
{
229+
var destinationMock = new Mock<ILogDestination>();
142230
destinationMock
143231
.Setup(d => d.TryWriteAsync(It.IsAny<Stream>(), It.IsAny<Action<Stream, Stream>>(), It.IsAny<string>(), It.IsAny<ContentType>(), It.IsAny<CancellationToken>()))
144232
.ReturnsAsync((Stream inputStream, Action<Stream, Stream> writeAction, string destinationFileName, ContentType destinationContentType, CancellationToken token) =>
145233
{
146234
try
147235
{
148236
writeAction(inputStream, outputStream);
149-
writeSucceeded = true;
237+
onSuccess();
150238
}
151239
catch (Exception ex)
152240
{
153241
return new AsyncOperationResult(false, ex);
154242
}
155243
return new AsyncOperationResult(true, null);
156244
});
245+
return destinationMock;
246+
}
157247

158-
var collector = new ChinaStatsCollector(
159-
sourceMock.Object,
160-
destinationMock.Object,
161-
Mock.Of<ILogger<ChinaStatsCollector>>());
248+
private static Mock<ILogSource> SetupSource(string content)
249+
{
250+
var sourceUri = new Uri("https://example.com/log1");
162251

163-
await collector.TryProcessAsync(
164-
maxFileCount: 10,
165-
fileNameTransform: s => s,
166-
sourceContentType: ContentType.Text,
167-
destinationContentType: ContentType.Text,
168-
CancellationToken.None);
252+
var sourceMock = new Mock<ILogSource>();
253+
sourceMock
254+
.Setup(s => s.GetFilesAsync(It.IsAny<int>(), It.IsAny<CancellationToken>(), It.IsAny<string>()))
255+
.ReturnsAsync((IEnumerable<Uri>)new List<Uri> { sourceUri });
169256

170-
string[] outputLines = null;
257+
sourceMock
258+
.Setup(s => s.TakeLockAsync(sourceUri, It.IsAny<CancellationToken>()))
259+
.ReturnsAsync(new AzureBlobLockResult(new Microsoft.WindowsAzure.Storage.Blob.CloudBlob(sourceUri), true, "foo", CancellationToken.None));
171260

261+
sourceMock
262+
.Setup(s => s.OpenReadAsync(sourceUri, It.IsAny<ContentType>(), It.IsAny<CancellationToken>()))
263+
.ReturnsAsync(() => new MemoryStream(Encoding.UTF8.GetBytes(content)));
264+
return sourceMock;
265+
}
266+
267+
private static string[] GetStreamLines(MemoryStream outputStream)
268+
{
269+
string[] outputLines;
172270
// need to reopen closed stream
173271
var outputBuffer = outputStream.ToArray();
174272
outputStream = new MemoryStream(outputBuffer);
@@ -177,11 +275,7 @@ await collector.TryProcessAsync(
177275
outputLines = streamReader.ReadToEnd().Split(new[] { '\r', '\n' }, StringSplitOptions.RemoveEmptyEntries);
178276
}
179277

180-
Assert.True(writeSucceeded);
181-
Assert.NotEmpty(outputLines);
182-
Assert.Equal("#Fields: timestamp time-taken c-ip filesize s-ip s-port sc-status sc-bytes cs-method cs-uri-stem - rs-duration rs-bytes c-referrer c-user-agent customer-id x-ec_custom-1", outputLines[0]);
183-
Assert.True(expectedOutputLines <= outputLines.Length - 1); // excluding header
278+
return outputLines;
184279
}
185280
}
186281
}
187-

0 commit comments

Comments
 (0)