This repository was archived by the owner on Mar 31, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathWideEntityService.cs
More file actions
681 lines (595 loc) · 27.9 KB
/
WideEntityService.cs
File metadata and controls
681 lines (595 loc) · 27.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using Azure;
using Azure.Data.Tables;
using NuGet.Insights.StorageNoOpRetry;
namespace NuGet.Insights.WideEntities
{
public class WideEntityService
{
/// <summary>
/// 68% of 4 MiB. We use 68% since Base64 encoding of the binary will bring us down to 75% of 4 MiB and we'll
/// leave the remaining % for the other metadata included in the entity request. Remember, the 4 MiB considers
/// the entire request body size, not the sum total size of the entities. The request body includes things like
/// multi-part boundaries, entity identifiers, and unicode escaped '+' signs.
/// See: https://docs.microsoft.com/en-us/rest/api/storageservices/performing-entity-group-transactions#requirements-for-entity-group-transactions
/// See: https://github.com/Azure/azure-sdk-for-net/issues/19815
/// </summary>
private const int MaxTotalEntitySize = (int)(0.68 * 1024 * 1024 * 4);
/// <summary>
/// 64 KiB
/// See: https://docs.microsoft.com/en-us/rest/api/storageservices/understanding-the-table-service-data-model#property-types
/// </summary>
private const int MaxBinaryPropertySize = 64 * 1024;
/// <summary>
/// For storage emulator this is 8 segments and 3 for Azure. So we use 8.
/// </summary>
private const int MaxSegmentsPerWideEntity = 8;
/// <summary>
/// This must be smaller than <see cref="MaxTotalEntitySize"/> to allow for entity overhead and property
/// overhead per entity. This represents that maximum content length allowed in a wide entity. We use around
/// </summary>
public static readonly int MaxTotalDataSize;
private static IReadOnlyList<string> NoDataColumns = new[]
{
StorageUtility.PartitionKey,
StorageUtility.RowKey,
StorageUtility.Timestamp,
WideEntitySegment.SegmentCountPropertyName,
};
static WideEntityService()
{
// We calculate the max total data size by subtracting the largest possible entity overhead size times the
// maximum number of segments.
var calculator = new TableEntitySizeCalculator();
calculator.AddEntityOverhead();
calculator.AddPartitionKey(512); // Max partition key size = 1 KiB (512 UTF-16 characters)
calculator.AddRowKey(512); // Max row key size = 1 KiB (512 UTF-16 characters)
// Add the segment size property.
calculator.AddPropertyOverhead(1);
calculator.AddInt32Data();
// Add the client request ID property.
calculator.AddPropertyOverhead(1);
calculator.AddGuidData();
// We can have up to 16 chunks per entity.
for (var i = 0; i < WideEntitySegment.ChunkPropertyNames.Count; i++)
{
// Add the overhead for a binary property.
calculator.AddBinaryData(0);
}
MaxTotalDataSize = MaxTotalEntitySize - MaxSegmentsPerWideEntity * calculator.Size;
}
private const string ContentTooLargeMessage = "The content is too large.";
private readonly ServiceClientFactory _serviceClientFactory;
private readonly ITelemetryClient _telemetryClient;
private readonly IOptions<NuGetInsightsSettings> _options;
private readonly int _maxEntitySize;
public WideEntityService(
ServiceClientFactory serviceClientFactory,
ITelemetryClient telemetryClient,
IOptions<NuGetInsightsSettings> options)
{
_serviceClientFactory = serviceClientFactory ?? throw new ArgumentNullException(nameof(serviceClientFactory));
_telemetryClient = telemetryClient ?? throw new ArgumentNullException(nameof(telemetryClient));
_options = options ?? throw new ArgumentNullException(nameof(options));
if (options.Value.UseDevelopmentStorage)
{
// See: https://stackoverflow.com/a/65770156
_maxEntitySize = (int)(393250 * 0.99);
}
else
{
// 1 MiB
// See: https://docs.microsoft.com/en-us/rest/api/storageservices/understanding-the-table-service-data-model#property-limitations
_maxEntitySize = 1024 * 1024;
}
}
public async Task CreateTableAsync(string tableName)
{
await (await GetTableAsync(tableName)).CreateIfNotExistsAsync(retry: true);
}
public async Task DeleteTableAsync(string tableName)
{
await (await GetTableAsync(tableName)).DeleteAsync();
}
public async Task<WideEntity> RetrieveAsync(string tableName, string partitionKey, string rowKey)
{
return await RetrieveAsync(tableName, partitionKey, rowKey, includeData: true);
}
public async Task<WideEntity> RetrieveAsync(string tableName, string partitionKey, string rowKey, bool includeData)
{
if (rowKey == null)
{
throw new ArgumentNullException(nameof(rowKey));
}
var result = await RetrieveAsync(tableName, partitionKey, rowKey, rowKey, includeData);
return result.SingleOrDefault();
}
public async Task<IReadOnlyList<WideEntity>> RetrieveAsync(string tableName)
{
return await RetrieveAsync(tableName, partitionKey: null, minRowKey: null, maxRowKey: null, includeData: true);
}
public async Task<IReadOnlyList<WideEntity>> RetrieveAsync(string tableName, bool includeData)
{
return await RetrieveAsync(tableName, partitionKey: null, minRowKey: null, maxRowKey: null, includeData);
}
public async Task<IReadOnlyList<WideEntity>> RetrieveAsync(string tableName, string partitionKey)
{
return await RetrieveAsync(tableName, partitionKey, minRowKey: null, maxRowKey: null, includeData: true);
}
public async Task<IReadOnlyList<WideEntity>> RetrieveAsync(string tableName, string partitionKey, bool includeData)
{
return await RetrieveAsync(tableName, partitionKey, minRowKey: null, maxRowKey: null, includeData);
}
public async Task<IReadOnlyList<WideEntity>> RetrieveAsync(string tableName, string partitionKey, string minRowKey, string maxRowKey)
{
return await RetrieveAsync(tableName, partitionKey, minRowKey, maxRowKey, includeData: true);
}
public async Task<IReadOnlyList<WideEntity>> RetrieveAsync(string tableName, string partitionKey, string minRowKey, string maxRowKey, bool includeData)
{
return await RetrieveAsync(tableName, partitionKey, minRowKey, maxRowKey, includeData, maxPerPage: StorageUtility.MaxTakeCount)
.ToListAsync();
}
public async IAsyncEnumerable<WideEntity> RetrieveAsync(string tableName, string partitionKey, string minRowKey, string maxRowKey, bool includeData, int maxPerPage)
{
var table = await GetTableAsync(tableName);
await foreach (var entity in RetrieveAsync(tableName, table, partitionKey, minRowKey, maxRowKey, includeData, maxPerPage))
{
yield return entity;
}
}
private async IAsyncEnumerable<WideEntity> RetrieveAsync(string tableName, TableClientWithRetryContext table, string partitionKey, string minRowKey, string maxRowKey, bool includeData, int maxPerPage)
{
using var metrics = _telemetryClient.StartQueryLoopMetrics(dimension1Name: "TableName", dimension1Value: tableName);
var noRowKeys = false;
if (minRowKey == null)
{
if (maxRowKey != null)
{
throw new ArgumentException("If min row key is null, max row key must be null as well.", nameof(maxRowKey));
}
noRowKeys = true;
}
else
{
if (partitionKey == null)
{
throw new ArgumentException("If the partition key is null, the min and max row keys must be null as well.", nameof(minRowKey));
}
}
if (minRowKey != null && maxRowKey == null)
{
throw new ArgumentNullException(nameof(maxRowKey));
}
IReadOnlyList<string> selectColumns;
Expression<Func<WideEntitySegment, bool>> filter;
string singleRowKey;
if (includeData)
{
selectColumns = null;
if (partitionKey == null)
{
filter = x => true;
singleRowKey = null;
}
else if (noRowKeys)
{
filter = x => x.PartitionKey == partitionKey;
singleRowKey = null;
}
else
{
filter = x =>
x.PartitionKey == partitionKey
&& string.Compare(x.RowKey, $"{minRowKey}{WideEntitySegment.RowKeySeparator}", StringComparison.Ordinal) >= 0 // Minimum possible row key with this prefix.
&& string.Compare(x.RowKey, $"{maxRowKey}{WideEntitySegment.RowKeySeparator}{char.MaxValue}", StringComparison.Ordinal) <= 0; // Maximum possible row key with this prefix.
singleRowKey = minRowKey == maxRowKey ? minRowKey : null;
}
}
else
{
selectColumns = NoDataColumns;
if (partitionKey == null)
{
filter = x => true;
singleRowKey = null;
}
else if (noRowKeys)
{
filter = x => x.PartitionKey == partitionKey;
singleRowKey = null;
}
else if (minRowKey == maxRowKey)
{
filter = null;
singleRowKey = minRowKey;
}
else
{
filter = x =>
x.PartitionKey == partitionKey
&& string.Compare(x.RowKey, $"{minRowKey}{WideEntitySegment.RowKeySeparator}", StringComparison.Ordinal) >= 0
&& string.Compare(x.RowKey, $"{maxRowKey}{WideEntitySegment.RowKeySeparator}{WideEntitySegment.Index0Suffix}", StringComparison.Ordinal) <= 0;
singleRowKey = null;
}
}
// This is an optimization. Most usages of the WideEntityService result in only a single segment row per
// wide entity. Since queries are more expensive than point reads, try to read a single wide entity segment
// using a point read and only perform a query if there more than 1 segment.
if (singleRowKey is not null)
{
var index0RowKey = $"{singleRowKey}{WideEntitySegment.RowKeySeparator}{WideEntitySegment.Index0Suffix}";
var segment = await table.GetEntityOrNullAsync<WideEntitySegment>(partitionKey, index0RowKey, selectColumns);
if (segment is null)
{
yield break;
}
if (segment.SegmentCount == 1 || !includeData)
{
var segments = new List<WideEntitySegment> { segment };
yield return MakeWideEntity(includeData, segments);
yield break;
}
if (filter is null)
{
yield break;
}
}
var entities = QueryEntitiesAsync(table, metrics, selectColumns, filter, maxPerPage);
await foreach (var entity in DeserializeEntitiesAsync(entities, includeData))
{
yield return entity;
}
}
private static async IAsyncEnumerable<WideEntitySegment> QueryEntitiesAsync(
TableClientWithRetryContext table,
QueryLoopMetrics metrics,
IReadOnlyList<string> selectColumns,
Expression<Func<WideEntitySegment, bool>> filter,
int maxPerPage)
{
var query = table.QueryAsync(
filter,
maxPerPage: maxPerPage,
select: selectColumns);
var enumerator = query.AsPages().GetAsyncEnumerator();
while (await enumerator.MoveNextAsync(metrics))
{
var entitySegment = enumerator.Current.Values;
if (!entitySegment.Any())
{
continue;
}
foreach (var entity in entitySegment)
{
yield return entity;
}
}
}
public static async IAsyncEnumerable<WideEntity> DeserializeEntitiesAsync(IAsyncEnumerable<WideEntitySegment> segments, bool includeData)
{
string currentPartitionKey = null;
string currentRowKeyPrefix = null;
var currentSegments = new List<WideEntitySegment>();
await foreach (var entity in segments)
{
if (currentPartitionKey == null)
{
currentPartitionKey = entity.PartitionKey;
currentRowKeyPrefix = entity.RowKeyPrefix;
}
if (entity.PartitionKey == currentPartitionKey && entity.RowKeyPrefix == currentRowKeyPrefix)
{
currentSegments.Add(entity);
}
else
{
yield return MakeWideEntity(includeData, currentSegments);
currentPartitionKey = entity.PartitionKey;
currentRowKeyPrefix = entity.RowKeyPrefix;
currentSegments.Clear();
currentSegments.Add(entity);
}
}
if (currentSegments.Any())
{
yield return MakeWideEntity(includeData, currentSegments);
}
}
public async Task<IReadOnlyList<WideEntity>> ExecuteBatchAsync(string tableName, IEnumerable<WideEntityOperation> batch, bool allowBatchSplits)
{
var table = await GetTableAsync(tableName);
var tableBatch = new MutableTableTransactionalBatch(table, WideEntitySegment.ClientRequestIdPropertyName);
var segmentsList = new List<List<WideEntitySegment>>();
foreach (var operation in batch)
{
// Keep track of the number of table operations in the batch prior to adding the next wide entity
// operation, just in case we have to remove the operations for a batch split.
var previousOperationCount = tableBatch.Count;
switch (operation)
{
case WideEntityInsertOperation insert:
segmentsList.Add(await AddInsertAsync(insert.PartitionKey, insert.RowKey, insert.Content, tableBatch));
break;
case WideEntityReplaceOperation replace:
segmentsList.Add(await AddReplaceAsync(replace.Existing, replace.Content, tableBatch));
break;
case WideEntityInsertOrReplaceOperation insertOrReplace:
segmentsList.Add(await AddInsertOrReplaceAsync(tableName, insertOrReplace.PartitionKey, insertOrReplace.RowKey, insertOrReplace.Content, tableBatch));
break;
case WideEntityDeleteOperation delete:
AddDelete(delete.Existing, tableBatch);
segmentsList.Add(null);
break;
default:
throw new NotImplementedException();
}
if (allowBatchSplits
&& tableBatch.Count > previousOperationCount
&& previousOperationCount > 0
&& segmentsList.Sum(x => x != null ? x.Sum(y => y.GetEntitySize()) : 0) > MaxTotalEntitySize)
{
// Remove the table operations added by this wide entity operation since it made the batch too large.
var addedOperations = new Stack<TableTransactionOperation>();
while (tableBatch.Count > previousOperationCount)
{
addedOperations.Push(tableBatch.Last());
tableBatch.RemoveAt(tableBatch.Count - 1);
}
// Execute the batch, now that it is within the size limit.
await tableBatch.SubmitBatchAsync();
// Start a new batch with the table operations associated with this entity operation.
tableBatch.Clear();
while (addedOperations.Any())
{
tableBatch.Add(addedOperations.Pop());
}
}
}
await tableBatch.SubmitBatchIfNotEmptyAsync();
var output = new List<WideEntity>();
foreach (var segments in segmentsList)
{
if (segments == null)
{
output.Add(null);
}
else
{
output.Add(new WideEntity(segments));
}
}
return output;
}
public async Task DeleteAsync(string tableName, WideEntity existing)
{
var table = await GetTableAsync(tableName);
var batch = new MutableTableTransactionalBatch(table, WideEntitySegment.ClientRequestIdPropertyName);
AddDelete(existing, batch);
await batch.SubmitBatchAsync();
}
private static void AddDelete(WideEntity existing, MutableTableTransactionalBatch batch)
{
// Use the etag on the first entity, for optimistic concurrency.
batch.DeleteEntity(existing.PartitionKey, WideEntitySegment.GetRowKey(existing.RowKey, index: 0), existing.ETag);
for (var index = 1; index < existing.SegmentCount; index++)
{
batch.DeleteEntity(existing.PartitionKey, WideEntitySegment.GetRowKey(existing.RowKey, index), ETag.All);
}
}
public async Task<WideEntity> ReplaceAsync(string tableName, WideEntity existing, ReadOnlyMemory<byte> content)
{
var table = await GetTableAsync(tableName);
var batch = new MutableTableTransactionalBatch(table, WideEntitySegment.ClientRequestIdPropertyName);
var segments = await AddReplaceAsync(existing, content, batch);
await batch.SubmitBatchAsync();
return new WideEntity(segments);
}
private async Task<List<WideEntitySegment>> AddReplaceAsync(
WideEntity existing,
ReadOnlyMemory<byte> content,
MutableTableTransactionalBatch batch)
{
return await AddInsertOrReplaceAsync(
batch,
() => Task.FromResult(existing),
existing.PartitionKey,
existing.RowKey,
content);
}
public async Task<WideEntity> InsertAsync(string tableName, string partitionKey, string rowKey, ReadOnlyMemory<byte> content)
{
var table = await GetTableAsync(tableName);
var batch = new MutableTableTransactionalBatch(table, WideEntitySegment.ClientRequestIdPropertyName);
var segments = await AddInsertAsync(partitionKey, rowKey, content, batch);
await batch.SubmitBatchAsync();
return new WideEntity(segments);
}
private async Task<List<WideEntitySegment>> AddInsertAsync(
string partitionKey,
string rowKey,
ReadOnlyMemory<byte> content,
MutableTableTransactionalBatch batch)
{
return await AddInsertOrReplaceAsync(
batch,
() => Task.FromResult<WideEntity>(null),
partitionKey,
rowKey,
content);
}
public async Task<WideEntity> InsertOrReplaceAsync(string tableName, string partitionKey, string rowKey, ReadOnlyMemory<byte> content)
{
var table = await GetTableAsync(tableName);
var batch = new MutableTableTransactionalBatch(table, WideEntitySegment.ClientRequestIdPropertyName);
var segments = await AddInsertOrReplaceAsync(tableName, partitionKey, rowKey, content, batch);
await batch.SubmitBatchAsync();
return new WideEntity(segments);
}
private Task<List<WideEntitySegment>> AddInsertOrReplaceAsync(
string tableName,
string partitionKey,
string rowKey,
ReadOnlyMemory<byte> content,
MutableTableTransactionalBatch batch)
{
return AddInsertOrReplaceAsync(
batch,
async () => (await RetrieveAsync(
tableName,
batch.TableClient,
partitionKey,
rowKey,
rowKey,
includeData: false,
maxPerPage: StorageUtility.MaxTakeCount).ToListAsync()).SingleOrDefault(),
partitionKey,
rowKey,
content);
}
private async Task<List<WideEntitySegment>> AddInsertOrReplaceAsync(
MutableTableTransactionalBatch batch,
Func<Task<WideEntity>> getExistingAsync,
string partitionKey,
string rowKey,
ReadOnlyMemory<byte> content)
{
if (content.Length > MaxTotalDataSize)
{
throw new ArgumentException(ContentTooLargeMessage, nameof(content));
}
var segments = MakeSegments(partitionKey, rowKey, content);
var existing = await getExistingAsync();
if (existing == null)
{
foreach (var segment in segments)
{
batch.AddEntity(segment);
}
}
else
{
// Use the etag on the first entity, for optimistic concurrency.
batch.UpdateEntity(segments[0], existing.ETag, mode: TableUpdateMode.Replace);
// Blindly insert or replace the rest of the new segments, ignoring etags.
foreach (var segment in segments.Skip(1))
{
batch.UpsertEntity(segment, mode: TableUpdateMode.Replace);
}
// Delete any extra segments existing on the old entity but not on the new one.
for (var index = segments.Count; index < existing.SegmentCount; index++)
{
batch.DeleteEntity(partitionKey, WideEntitySegment.GetRowKey(existing.RowKey, index), ETag.All);
}
}
return segments;
}
private static WideEntity MakeWideEntity(bool includeData, List<WideEntitySegment> segments)
{
if (includeData)
{
return new WideEntity(segments);
}
else
{
return new WideEntity(segments[0]);
}
}
private List<WideEntitySegment> MakeSegments(string partitionKey, string rowKey, ReadOnlyMemory<byte> content)
{
var segments = new List<WideEntitySegment>();
if (content.Length == 0)
{
segments.Add(new WideEntitySegment(partitionKey, rowKey, index: 0));
}
else
{
var calculator = new TableEntitySizeCalculator();
calculator.AddEntityOverhead();
calculator.AddPartitionKey(partitionKey);
calculator.AddRowKey(rowKey.Length + 1 + 2); // 1 for the separator, 2 for the index, e.g. "~02"
var entityOverhead = calculator.Size;
calculator.Reset();
calculator.AddPropertyOverhead(1); // 1 because the property names are a single character.
var propertyOverhead = calculator.Size;
calculator.Reset();
// Now, we drain the total entity size by filling with max size entities. Each max size entity will contain
// chunks of the data.
var remainingTotalEntitySize = MaxTotalEntitySize;
var dataStart = 0;
while (dataStart < content.Length)
{
(var segment, var segmentSize) = MakeSegmentOrNull(
partitionKey,
rowKey,
index: segments.Count,
entityOverhead,
propertyOverhead,
maxEntitySize: Math.Min(_maxEntitySize, remainingTotalEntitySize),
ref dataStart,
content);
if (segment == null)
{
throw new ArgumentException(ContentTooLargeMessage, nameof(content));
}
remainingTotalEntitySize -= segmentSize;
segments.Add(segment);
}
}
segments.First().SegmentCount = segments.Count;
return segments;
}
private (WideEntitySegment Segment, int SegmentSize) MakeSegmentOrNull(
string partitionKey,
string rowKey,
int index,
int entityOverhead,
int propertyOverhead,
int maxEntitySize,
ref int dataStart,
ReadOnlyMemory<byte> data)
{
WideEntitySegment segment = null;
var remainingEntitySize = maxEntitySize - entityOverhead;
var binaryPropertyOverhead = propertyOverhead + 4;
if (index == 0)
{
// Account for the segment count property: property name plus the integer
remainingEntitySize -= propertyOverhead + 4;
}
while (dataStart < data.Length && remainingEntitySize > 0)
{
var binarySize = Math.Min(remainingEntitySize - binaryPropertyOverhead, Math.Min(data.Length - dataStart, MaxBinaryPropertySize));
if (binarySize <= 0)
{
break;
}
// Account for the chunk property name and chunk length
remainingEntitySize -= binaryPropertyOverhead;
if (segment == null)
{
segment = new WideEntitySegment(partitionKey, rowKey, index);
}
segment.AddChunk(data.Slice(dataStart, binarySize));
dataStart += binarySize;
// Account for the chunk data
remainingEntitySize -= binarySize;
}
var segmentSize = maxEntitySize - remainingEntitySize;
if (segment != null)
{
var actual = segment.GetEntitySize();
if (actual != segmentSize)
{
throw new InvalidOperationException($"The segment size calculation is incorrect. Expected: {segmentSize}. Actual: {actual}.");
}
}
return (segment, segmentSize);
}
private async Task<TableClientWithRetryContext> GetTableAsync(string tableName)
{
return (await _serviceClientFactory.GetTableServiceClientAsync(_options.Value))
.GetTableClient(tableName);
}
}
}