Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,15 @@ public Task<ulong> GetMessageCountFromOffset(string endpointId, long offset)
return sequentialStore.GetCountFromOffset(offset);
}

/// <summary>
/// Requests a cleanup pass by forwarding the call to the messages cleaner's
/// own TriggerCleanup method. No other work is done here.
/// </summary>
public void TriggerCleanup() => this.messagesCleaner.TriggerCleanup();

public void Dispose()
{
this.Dispose(true);
Expand Down Expand Up @@ -230,6 +239,16 @@ public void Dispose()
// Not disposing the cleanup task, in case it is not completed yet.
}

/// <summary>
/// Requests a cleanup pass outside the regular schedule and logs that the
/// request was made. Calls EnsureCleanupTask first and then emits the
/// CleanupTriggeredByConnectionRecovery log event.
/// </summary>
/// <remarks>
/// NOTE(review): EnsureCleanupTask is guarded by a check that the cleanup task
/// is null or completed, so this call presumably has no effect while a cleanup
/// task is still running. Confirm that this matches the "immediate" intent.
/// </remarks>
public void TriggerCleanup()
{
// No state object is supplied for this manual invocation.
this.EnsureCleanupTask(null);
// The event is logged after the call above, regardless of what it did.
Events.CleanupTriggeredByConnectionRecovery();
}

void EnsureCleanupTask(object state)
{
if (this.cleanupTask == null || this.cleanupTask.IsCompleted)
Expand Down Expand Up @@ -309,6 +328,10 @@ private async Task CleanQueue(bool checkEntireQueueOnCleanup)
Events.CleanupCheckpointState(messageQueueId, checkpointData);
int cleanupEntityStoreCount = 0;

// Track orphaned messages for observability
int orphanedMessageCount = 0;
DateTime earliestOrphanedMessageTime = DateTime.MaxValue;

async Task<bool> DeleteMessageCallback(long offset, MessageRef messageRef)
{
var expiry = messageRef.TimeStamp + messageRef.TimeToLive;
Expand All @@ -317,6 +340,16 @@ async Task<bool> DeleteMessageCallback(long offset, MessageRef messageRef)
return false;
}

// Detect orphaned messages (expired but can't clean due to offset gap)
if (offset > checkpointData.Offset && expiry <= DateTime.UtcNow)
{
orphanedMessageCount++;
if (messageRef.TimeStamp < earliestOrphanedMessageTime)
{
earliestOrphanedMessageTime = messageRef.TimeStamp;
}
}

var message = await this.TryDecrementRefCountUpdate(messageRef.EdgeMessageId, messageQueueId);

await message.ForEachAsync(async msg =>
Expand Down Expand Up @@ -378,6 +411,13 @@ await message.ForEachAsync(async msg =>
totalCleanupCount += cleanupCount;
totalCleanupStoreCount += cleanupEntityStoreCount;
Events.CleanupCompleted(messageQueueId, cleanupCount, cleanupEntityStoreCount, totalCleanupCount, totalCleanupStoreCount);

// Log orphaned messages for observability
if (orphanedMessageCount > 0)
{
TimeSpan oldestOrphanAge = DateTime.UtcNow - earliestOrphanedMessageTime;
Events.OrphanedMessagesDetected(messageQueueId, orphanedMessageCount, checkpointData.Offset, oldestOrphanAge);
}
}
catch (Exception ex)
{
Expand Down Expand Up @@ -418,7 +458,9 @@ enum EventIds
MessageAdded,
ErrorGettingMessagesBatch,
CreatedCleanupProcessor,
ErrorUpdatingMessageForEndpoint
ErrorUpdatingMessageForEndpoint,
CleanupTriggeredByConnectionRecovery,
OrphanedMessagesDetected
}

public static void MessageStoreCreated()
Expand All @@ -441,6 +483,11 @@ public static void CleanupTaskInitialized()
Log.LogInformation((int)EventIds.CleanupTaskStarted, "Started task to cleanup processed and stale messages");
}

/// <summary>
/// Writes an information-level log entry stating that a cleanup was requested
/// because the cloud connection recovered.
/// </summary>
public static void CleanupTriggeredByConnectionRecovery() =>
    Log.LogInformation(
        (int)EventIds.CleanupTriggeredByConnectionRecovery,
        "Triggering cleanup due to cloud connection recovery to retry pending checkpoint commits");

public static void ErrorCleaningMessagesForEndpoint(Exception ex, string endpointId)
{
Log.LogWarning((int)EventIds.ErrorCleaningMessagesForEndpoint, ex, Invariant($"Error cleaning up messages for endpoint {endpointId}"));
Expand All @@ -462,6 +509,11 @@ public static void CleanupCompleted(string endpointId, int queueMessagesCount, i
Log.LogDebug((int)EventIds.CleanupCompleted, Invariant($"Total messages cleaned up from queue for endpoint {endpointId} = {totalQueueMessagesCount}, and total messages cleaned up for message store = {totalStoreMessagesCount}."));
}

/// <summary>
/// Writes a warning-level log entry reporting orphaned messages for an endpoint.
/// The text is built with the invariant culture.
/// </summary>
/// <param name="endpointId">Endpoint named in the log entry.</param>
/// <param name="orphanedCount">Number of orphaned messages reported.</param>
/// <param name="checkpointOffset">Checkpoint offset reported alongside the count.</param>
/// <param name="oldestAge">Age of the oldest orphaned message; logged in seconds with one decimal place.</param>
public static void OrphanedMessagesDetected(string endpointId, int orphanedCount, long checkpointOffset, TimeSpan oldestAge)
{
    // Build the text first so the logging call itself stays short.
    string warningText = Invariant($"Detected {orphanedCount} orphaned message(s) in endpoint {endpointId}. Checkpoint offset={checkpointOffset}, oldest message age={oldestAge.TotalSeconds:F1}s. Messages are stuck in store because checkpoint has not advanced. This indicates a potential message acknowledgment failure during network disruption. Checkpoint retries or the cleanup trigger on connection recovery should resolve this.");

    Log.LogWarning((int)EventIds.OrphanedMessagesDetected, warningText);
}

public static void ErrorGettingMessagesBatch(string entityName, Exception ex)
{
Log.LogWarning((int)EventIds.ErrorGettingMessagesBatch, ex, $"Error getting next batch for endpoint {entityName}.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,7 @@ protected override void Load(ContainerBuilder builder)
var connectionManagerTask = c.Resolve<Task<IConnectionManager>>();
var subscriptionProcessorTask = c.Resolve<Task<ISubscriptionProcessor>>();
var deviceScopeIdentitiesCacheTask = c.Resolve<Task<IDeviceScopeIdentitiesCache>>();
var messageStoreTask = this.isStoreAndForwardEnabled ? c.Resolve<Task<IMessageStore>>() : null;
Router router = await routerTask;
ITwinManager twinManager = await twinManagerTask;
IConnectionManager connectionManager = await connectionManagerTask;
Expand All @@ -539,6 +540,18 @@ protected override void Load(ContainerBuilder builder)
invokeMethodHandler,
subscriptionProcessor,
deviceScopeIdentitiesCache);

// Subscribe MessageStore to connection recovery events
// to trigger cleanup when cloud connection is restored
if (messageStoreTask != null)
{
IMessageStore messageStore = await messageStoreTask;
connectionManager.CloudConnectionEstablished += (sender, identity) =>
{
messageStore.TriggerCleanup();
};
}

return hub;
})
.As<Task<IEdgeHub>>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,11 @@ public interface IMessageStore : IDisposable
/// Returns the number of messages in the store from a offset
/// </summary>
Task<ulong> GetMessageCountFromOffset(string endpointId, long offset);

/// <summary>
/// Requests a cleanup pass over the store outside of the regular schedule.
/// </summary>
/// <remarks>
/// NOTE(review): intended to be invoked when the cloud connection is restored.
/// Whether a cleanup actually starts immediately, and whether it affects pending
/// checkpoint commits, depends on the implementation - confirm against it.
/// </remarks>
void TriggerCleanup();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -386,20 +386,47 @@ static async Task EnterCheckpointingAsync(EndpointExecutorFsm thisPtr)
try
{
Preconditions.CheckNotNull(thisPtr.currentCheckpointCommand);
using (var cts = new CancellationTokenSource(thisPtr.config.Timeout))
ISinkResult<IMessage> result = thisPtr.currentCheckpointCommand.Result;

if (result.Succeeded.Any() || result.InvalidDetailsList.Any())
{
ISinkResult<IMessage> result = thisPtr.currentCheckpointCommand.Result;
ICollection<IMessage> toCheckpoint = result.InvalidDetailsList.Count > 0
? result.Succeeded.Concat(result.InvalidDetailsList.Select(i => i.Item)).ToList()
: result.Succeeded;
ICollection<IMessage> remaining = result.Failed;

if (result.Succeeded.Any() || result.InvalidDetailsList.Any())
Events.Checkpoint(thisPtr, result);

// Attempt checkpoint commit with retry logic
const int MaxCommitRetries = 3;
Exception lastException = null;

for (int attempt = 1; attempt <= MaxCommitRetries; attempt++)
{
ICollection<IMessage> toCheckpoint = result.InvalidDetailsList.Count > 0
? result.Succeeded.Concat(result.InvalidDetailsList.Select(i => i.Item)).ToList()
: result.Succeeded;
ICollection<IMessage> remaining = result.Failed;

Events.Checkpoint(thisPtr, result);
await thisPtr.Checkpointer.CommitAsync(toCheckpoint, remaining, Option.None<DateTime>(), thisPtr.unhealthySince, cts.Token);
Events.CheckpointSuccess(thisPtr, result);
try
{
using (var cts = new CancellationTokenSource(thisPtr.config.Timeout))
{
await thisPtr.Checkpointer.CommitAsync(toCheckpoint, remaining, Option.None<DateTime>(), thisPtr.unhealthySince, cts.Token);
Events.CheckpointSuccess(thisPtr, result);
break; // Success, exit retry loop
}
}
catch (Exception ex) when (attempt < MaxCommitRetries)
{
lastException = ex;
Events.CheckpointCommitRetry(thisPtr, attempt, ex);

// Exponential backoff: 100ms, 200ms, 400ms
int delayMs = 100 * (int)Math.Pow(2, attempt - 1);
await Task.Delay(delayMs);
}
catch (Exception ex) when (attempt == MaxCommitRetries)
{
lastException = ex;
Events.CheckpointCommitFailed(thisPtr, MaxCommitRetries, ex);
throw;
}
}
}

Expand Down Expand Up @@ -589,7 +616,9 @@ enum EventIds
UpdateEndpoint,
UpdateEndpointSuccess,
UpdateEndpointFailure,
CheckRetryInnerException
CheckRetryInnerException,
CheckpointCommitRetry,
CheckpointCommitFailed
}

public static void StateEnter(EndpointExecutorFsm fsm)
Expand Down Expand Up @@ -722,6 +751,28 @@ public static void CheckpointFailure(EndpointExecutorFsm fsm, Exception ex)
GetContextString(fsm));
}

/// <summary>
/// Writes a warning-level log entry for a checkpoint commit attempt that failed
/// and is about to be retried.
/// </summary>
/// <param name="fsm">State machine whose checkpointer offset and context string are included in the entry.</param>
/// <param name="attempt">Number of the attempt that failed.</param>
/// <param name="ex">Exception attached to the log entry.</param>
public static void CheckpointCommitRetry(EndpointExecutorFsm fsm, int attempt, Exception ex)
{
    const string Template = "[CheckpointCommitRetry] Checkpoint commit attempt {0} failed, retrying. CheckpointOffset: {1}, {2}";

    // Read the values in the same order in which they appear in the template.
    var checkpointOffset = fsm.Status.CheckpointerStatus.Offset;
    var context = GetContextString(fsm);

    Log.LogWarning((int)EventIds.CheckpointCommitRetry, ex, Template, attempt, checkpointOffset, context);
}

/// <summary>
/// Writes an error-level log entry for a checkpoint commit that failed on every attempt.
/// </summary>
/// <param name="fsm">State machine whose checkpointer offset and context string are included in the entry.</param>
/// <param name="maxAttempts">Number of attempts reported in the entry.</param>
/// <param name="ex">Exception attached to the log entry.</param>
public static void CheckpointCommitFailed(EndpointExecutorFsm fsm, int maxAttempts, Exception ex)
{
    const string Template = "[CheckpointCommitFailed] Checkpoint commit failed after {0} attempts. CheckpointOffset: {1}, {2}";

    // Read the values in the same order in which they appear in the template.
    var checkpointOffset = fsm.Status.CheckpointerStatus.Offset;
    var context = GetContextString(fsm);

    Log.LogError((int)EventIds.CheckpointCommitFailed, ex, Template, maxAttempts, checkpointOffset, context);
}

public static void CheckRetryInnerException(Exception ex, bool retry)
{
Log.LogDebug((int)EventIds.CheckRetryInnerException, ex, $"[CheckRetryInnerException] Decision to retry exception of type {ex.GetType()} is {retry}");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,117 @@ public async Task MessageStoreAddRemoveEndpointTest(bool checkEntireQueueOnClean
}
}

[Theory]
[InlineData(false)]
[InlineData(true)]
public async Task TestTriggerCleanupWakesCleanupTask(bool checkEntireQueueOnCleanup)
{
    const string EndpointId = "endpoint1";

    // Arrange: a store with ttlSecs of 1 and a cleanup interval of 3600, so the
    // test relies on the explicit trigger rather than on the scheduled cleanup.
    var storeSetup = await this.GetMessageStore(checkEntireQueueOnCleanup, ttlSecs: 1, messageCleanupIntervalSecs: 3600);
    using (IMessageStore messageStore = storeSetup.Item1)
    {
        await messageStore.AddEndpoint(EndpointId);

        IMessage message = this.GetMessage(1);
        await messageStore.Add(EndpointId, message, 1);

        // Let the message outlive its time-to-live.
        await Task.Delay(1500);

        // Act: request cleanup explicitly, then give it a moment to run.
        messageStore.TriggerCleanup();
        await Task.Delay(500);

        // Assert: nothing is left to read for the endpoint.
        IMessageIterator iterator = messageStore.GetMessageIterator(EndpointId);
        IEnumerable<IMessage> remaining = await iterator.GetNext(100);
        IList<IMessage> remainingList = remaining as IList<IMessage> ?? remaining.ToList();

        Assert.Empty(remainingList);
    }
}

[Theory]
[InlineData(false)]
[InlineData(true)]
public async Task TestOrphanedMessageDetection(bool checkEntireQueueOnCleanup)
{
// Exercises a cleanup pass over expired messages while the checkpoint is never
// advanced by the test. Runs once for each value of checkEntireQueueOnCleanup.
var result = await this.GetMessageStore(checkEntireQueueOnCleanup, ttlSecs: 1);
using (IMessageStore messageStore = result.Item1)
{
await messageStore.AddEndpoint("endpoint1");

// Add three messages to the endpoint; the store was created with ttlSecs: 1.
for (int i = 0; i < 3; i++)
{
IMessage message = this.GetMessage(i);
await messageStore.Add("endpoint1", message, 1);
}

// Wait 1.5 seconds so the messages outlive their time-to-live.
await Task.Delay(1500);

// The test never writes a checkpoint for "endpoint1", so the messages are
// expired while the checkpoint has not moved. This is the condition the
// OrphanedMessagesDetected log event is meant to report.

// Request a cleanup pass explicitly.
messageStore.TriggerCleanup();

// Give the cleanup half a second to run.
await Task.Delay(500);

// NOTE(review): messageStore has already been dereferenced above, so this
// assertion cannot fail. The test therefore only shows that nothing threw;
// it does not verify that orphaned messages were detected or logged.
// Consider asserting on an observable effect instead.
Assert.NotNull(messageStore);
}
}

[Theory]
[InlineData(false)]
[InlineData(true)]
public async Task TestNoOrphanWhenCheckpointAdvances(bool checkEntireQueueOnCleanup)
{
    const string EndpointId = "endpoint1";

    // Arrange: a store created with ttlSecs of 1, plus its checkpoint store.
    var storeSetup = await this.GetMessageStore(checkEntireQueueOnCleanup, ttlSecs: 1);
    using (IMessageStore messageStore = storeSetup.Item1)
    {
        ICheckpointStore checkpointStore = storeSetup.Item2;
        await messageStore.AddEndpoint(EndpointId);

        // Three messages for the endpoint.
        foreach (int index in Enumerable.Range(0, 3))
        {
            IMessage message = this.GetMessage(index);
            await messageStore.Add(EndpointId, message, 1);
        }

        // Let the messages outlive their time-to-live.
        await Task.Delay(1500);

        // Record a checkpoint offset of 100 for the endpoint before cleaning up.
        await checkpointStore.SetCheckpointDataAsync(EndpointId, new CheckpointData(100L), CancellationToken.None);

        // Act: request cleanup explicitly, then give it a moment to run.
        messageStore.TriggerCleanup();
        await Task.Delay(500);

        // Assert: nothing is left to read for the endpoint.
        IMessageIterator iterator = messageStore.GetMessageIterator(EndpointId);
        IEnumerable<IMessage> remaining = await iterator.GetNext(100);
        IList<IMessage> remainingList = remaining as IList<IMessage> ?? remaining.ToList();

        Assert.Empty(remainingList);
    }
}

[Fact]
public void MessageWrapperRoundtripTest()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,8 @@ public Task RemoveEndpoint(string endpointId)

// Not implemented here; every call throws NotImplementedException.
public void SetTimeToLive(TimeSpan timeToLive)
{
    throw new NotImplementedException();
}

// Not implemented here; every call throws NotImplementedException.
public void TriggerCleanup()
{
    throw new NotImplementedException();
}

// Returns the Queue list of the queue object that GetQueue yields for the endpoint.
public List<IMessage> GetReceivedMessagesForEndpoint(string endpointId)
{
    var endpointQueue = this.GetQueue(endpointId);
    return endpointQueue.Queue;
}

// Always reports a count of zero, whatever endpoint and offset are passed.
public Task<ulong> GetMessageCountFromOffset(string endpointId, long offset)
{
    return Task.FromResult(0ul);
}
Expand Down
Loading
Loading