From fe92118797497702c314cda1a10a41a9d8387c23 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Mon, 23 Feb 2026 13:26:06 -0800 Subject: [PATCH 1/8] moved placement of session.UpdateRuntimeState call to be before we commit any outbound messages --- .../AzureStorageOrchestrationService.cs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index 38cb7fac3..060905bbb 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -1207,6 +1207,9 @@ public async Task CompleteTaskOrchestrationWorkItemAsync( continuedAsNewMessage, orchestrationState))); + // update the runtime state and execution id stored in the session + session.UpdateRuntimeState(runtimeState); + // First, add new messages into the queue. If a failure happens after this, duplicate messages will // be written after the retry, but the results of those messages are expected to be de-dup'd later. 
// This provider needs to ensure that response messages are not processed until the history a few @@ -1231,8 +1234,6 @@ await this.CommitOutboundQueueMessages( try { await this.trackingStore.UpdateStateAsync(runtimeState, workItem.OrchestrationRuntimeState, instanceId, executionId, session.ETags, session.TrackingStoreContext); - // update the runtime state and execution id stored in the session - session.UpdateRuntimeState(runtimeState); // if we deferred some messages, and the execution id of this instance has changed, redeliver them if (session.DeferredMessages.Count > 0 From 20846e8bd0c3732f9ec69f3909b6ba2a30106e45 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Tue, 24 Feb 2026 09:56:26 -0800 Subject: [PATCH 2/8] removed the unnecessary changes from the orchestration service, due to initial misdiagnosis, and changed the IsOutOfOrder logic instead --- .../AzureStorageOrchestrationService.cs | 5 +- .../Messaging/OrchestrationSession.cs | 51 ++++++++----------- 2 files changed, 22 insertions(+), 34 deletions(-) diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index 060905bbb..38cb7fac3 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -1207,9 +1207,6 @@ public async Task CompleteTaskOrchestrationWorkItemAsync( continuedAsNewMessage, orchestrationState))); - // update the runtime state and execution id stored in the session - session.UpdateRuntimeState(runtimeState); - // First, add new messages into the queue. If a failure happens after this, duplicate messages will // be written after the retry, but the results of those messages are expected to be de-dup'd later. 
// This provider needs to ensure that response messages are not processed until the history a few @@ -1234,6 +1231,8 @@ await this.CommitOutboundQueueMessages( try { await this.trackingStore.UpdateStateAsync(runtimeState, workItem.OrchestrationRuntimeState, instanceId, executionId, session.ETags, session.TrackingStoreContext); + // update the runtime state and execution id stored in the session + session.UpdateRuntimeState(runtimeState); // if we deferred some messages, and the execution id of this instance has changed, redeliver them if (session.DeferredMessages.Count > 0 diff --git a/src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs b/src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs index 1b2e4a20e..5f1c0c1e6 100644 --- a/src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs +++ b/src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs @@ -150,37 +150,29 @@ public bool IsOutOfOrderMessage(MessageData message) return false; } - if (this.IsNonexistantInstance() && message.OriginalQueueMessage.DequeueCount > 5) - { - // The first five times a message for a nonexistant instance is dequeued, give the message the benefit - // of the doubt and assume that the instance hasn't had its history table populated yet. After the - // fifth execution, ~30 seconds have passed and the most likely scenario is that this is a zombie event. - // This means the history table for the message's orchestration no longer exists, either due to an explicit - // PurgeHistory request or due to a ContinueAsNew call cleaning the old execution's history. - return false; - } - - if (this.LastCheckpointTime > message.TaskMessage.Event.Timestamp) - { - // LastCheckpointTime represents the time at which the most recent history checkpoint completed. - // The checkpoint is written to the history table only *after* all queue messages are sent. - // A message is out of order when its timestamp *preceeds* the most recent checkpoint timestamp. 
- // In this case, we see that the checkpoint came *after* the message, so there is no out-of-order - // concern. Note that this logic only applies for messages sent by orchestrations to themselves. - // The next check considers the other cases (activities, sub-orchestrations, etc.). - // Orchestration checkpoint time information was added only after v1.6.4. - return false; - } - + // The first five times a message for a nonexistant instance is dequeued, give the message the benefit + // of the doubt and assume that the instance hasn't had its history table populated yet. After the + // fifth execution, ~30 seconds have passed and the most likely scenario is that this is a zombie event. + // This means the history table for the message's orchestration no longer exists, either due to an explicit + // PurgeHistory request or due to a ContinueAsNew call cleaning the old execution's history. + bool nonExistentInstance = this.IsNonexistantInstance() && message.OriginalQueueMessage.DequeueCount <= 5; + + // LastCheckpointTime represents the time at which the most recent history checkpoint completed. + // The checkpoint is written to the history table only *after* all queue messages are sent. + // A message is out of order when its timestamp *preceeds* the most recent checkpoint timestamp. + // In this case, we see that the checkpoint came *after* the message, so there is no out-of-order + // concern. Note that this logic only applies for messages sent by orchestrations to themselves. + // The next check considers the other cases (activities, sub-orchestrations, etc.). + // Orchestration checkpoint time information was added only after v1.6.4. + bool isStaleCheckpoint = this.LastCheckpointTime <= message.TaskMessage.Event.Timestamp; + + bool triggeringTaskDoesNotExist = true; if (Utils.TryGetTaskScheduledId(message.TaskMessage.Event, out int taskScheduledId)) { // This message is a response to a task. 
Search the history to make sure that we've recorded the fact that // this task was scheduled. HistoryEvent mostRecentTaskEvent = this.RuntimeState.Events.LastOrDefault(e => e.EventId == taskScheduledId); - if (mostRecentTaskEvent != null) - { - return false; - } + triggeringTaskDoesNotExist = mostRecentTaskEvent == null; } if (message.TaskMessage.Event.EventType == EventType.EventRaised) @@ -190,15 +182,12 @@ public bool IsOutOfOrderMessage(MessageData message) if (requestId != null) { HistoryEvent mostRecentTaskEvent = this.RuntimeState.Events.FirstOrDefault(e => e.EventType == EventType.EventSent && FindRequestId(((EventSentEvent)e).Input)?.ToString() == requestId); - if (mostRecentTaskEvent != null) - { - return false; - } + triggeringTaskDoesNotExist = mostRecentTaskEvent == null; } } // The message is out of order and cannot be handled by the current session. - return true; + return nonExistentInstance || isStaleCheckpoint || triggeringTaskDoesNotExist; } Guid? FindRequestId(string input) From 3909fe7e7d6e149407c2c517bb8fb18efdb1ad4e Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Tue, 24 Feb 2026 10:24:25 -0800 Subject: [PATCH 3/8] fixed the endlessly abandoning nonexistent instances bug --- .../Messaging/OrchestrationSession.cs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs b/src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs index 5f1c0c1e6..3472b7e0e 100644 --- a/src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs +++ b/src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs @@ -157,6 +157,14 @@ public bool IsOutOfOrderMessage(MessageData message) // PurgeHistory request or due to a ContinueAsNew call cleaning the old execution's history. 
bool nonExistentInstance = this.IsNonexistantInstance() && message.OriginalQueueMessage.DequeueCount <= 5; + // If the instance does not exist, even if the message has ben dequeued > 5 times, the next check for trying + // to find the corresponding task scheduled message will always fail so we will endlessly abandon the message. + // To avoid this we return early here. + if (this.IsNonexistantInstance() && message.OriginalQueueMessage.DequeueCount > 5) + { + return false; + } + // LastCheckpointTime represents the time at which the most recent history checkpoint completed. // The checkpoint is written to the history table only *after* all queue messages are sent. // A message is out of order when its timestamp *preceeds* the most recent checkpoint timestamp. From 66fc93cde2b66445ac6cefc7e8d0a50d1ade3cfb Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Wed, 11 Mar 2026 13:12:22 -0700 Subject: [PATCH 4/8] updated implementation and added tests --- .../AzureStorageOrchestrationService.cs | 2 +- .../Messaging/OrchestrationSession.cs | 39 +- .../AzureStorageScaleTests.cs | 275 ---------- .../AzureStorageScenarioTests.cs | 519 +++++++++++++++++- 4 files changed, 526 insertions(+), 309 deletions(-) diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index 38cb7fac3..0e72cc0cf 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -1812,7 +1812,7 @@ public async Task SendTaskOrchestrationMessageAsync(TaskMessage message) await this.SendTaskOrchestrationMessageInternalAsync(EmptySourceInstance, controlQueue, message); } - Task SendTaskOrchestrationMessageInternalAsync( + internal Task SendTaskOrchestrationMessageInternalAsync( OrchestrationInstance sourceInstance, ControlQueue controlQueue, TaskMessage message) diff --git 
a/src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs b/src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs index 3472b7e0e..0aafafea2 100644 --- a/src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs +++ b/src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs @@ -150,37 +150,25 @@ public bool IsOutOfOrderMessage(MessageData message) return false; } - // The first five times a message for a nonexistant instance is dequeued, give the message the benefit - // of the doubt and assume that the instance hasn't had its history table populated yet. After the - // fifth execution, ~30 seconds have passed and the most likely scenario is that this is a zombie event. - // This means the history table for the message's orchestration no longer exists, either due to an explicit - // PurgeHistory request or due to a ContinueAsNew call cleaning the old execution's history. - bool nonExistentInstance = this.IsNonexistantInstance() && message.OriginalQueueMessage.DequeueCount <= 5; - - // If the instance does not exist, even if the message has ben dequeued > 5 times, the next check for trying - // to find the corresponding task scheduled message will always fail so we will endlessly abandon the message. - // To avoid this we return early here. if (this.IsNonexistantInstance() && message.OriginalQueueMessage.DequeueCount > 5) { + // The first five times a message for a nonexistant instance is dequeued, give the message the benefit + // of the doubt and assume that the instance hasn't had its history table populated yet. After the + // fifth execution, ~30 seconds have passed and the most likely scenario is that this is a zombie event. + // This means the history table for the message's orchestration no longer exists, either due to an explicit + // PurgeHistory request or due to a ContinueAsNew call cleaning the old execution's history. 
return false; } - // LastCheckpointTime represents the time at which the most recent history checkpoint completed. - // The checkpoint is written to the history table only *after* all queue messages are sent. - // A message is out of order when its timestamp *preceeds* the most recent checkpoint timestamp. - // In this case, we see that the checkpoint came *after* the message, so there is no out-of-order - // concern. Note that this logic only applies for messages sent by orchestrations to themselves. - // The next check considers the other cases (activities, sub-orchestrations, etc.). - // Orchestration checkpoint time information was added only after v1.6.4. - bool isStaleCheckpoint = this.LastCheckpointTime <= message.TaskMessage.Event.Timestamp; - - bool triggeringTaskDoesNotExist = true; if (Utils.TryGetTaskScheduledId(message.TaskMessage.Event, out int taskScheduledId)) { // This message is a response to a task. Search the history to make sure that we've recorded the fact that // this task was scheduled. 
HistoryEvent mostRecentTaskEvent = this.RuntimeState.Events.LastOrDefault(e => e.EventId == taskScheduledId); - triggeringTaskDoesNotExist = mostRecentTaskEvent == null; + if (mostRecentTaskEvent != null) + { + return false; + } } if (message.TaskMessage.Event.EventType == EventType.EventRaised) @@ -189,13 +177,16 @@ public bool IsOutOfOrderMessage(MessageData message) var requestId = ((EventRaisedEvent)message.TaskMessage.Event).Name; if (requestId != null) { - HistoryEvent mostRecentTaskEvent = this.RuntimeState.Events.FirstOrDefault(e => e.EventType == EventType.EventSent && FindRequestId(((EventSentEvent)e).Input)?.ToString() == requestId); - triggeringTaskDoesNotExist = mostRecentTaskEvent == null; + HistoryEvent mostRecentTaskEvent = this.RuntimeState.Events.LastOrDefault(e => e.EventType == EventType.EventSent && FindRequestId(((EventSentEvent)e).Input)?.ToString() == requestId); + if (mostRecentTaskEvent != null) + { + return false; + } } } // The message is out of order and cannot be handled by the current session. - return nonExistentInstance || isStaleCheckpoint || triggeringTaskDoesNotExist; + return true; } Guid? FindRequestId(string input) diff --git a/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs b/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs index a2872df75..854aa3007 100644 --- a/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs +++ b/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs @@ -643,281 +643,6 @@ await TestHelpers.WaitFor( } } - /// - /// Confirm that: - /// 1. If is true, and a worker attempts to update the instance table with a stale - /// etag upon completing a work item, a SessionAbortedException is thrown which wraps the inner DurableTaskStorageException, which has the correct status code - /// (precondition failed). - /// The specific scenario tested is if the worker stalled after updating the history table but before updating the instance table. 
When it attempts to update - /// the instance table with a stale etag, it will fail. - /// 2. If is false for the above scenario, then the call to update the instance table - /// will go through, and the instance table will be updated with a "stale" status. - /// - /// - /// Since it is impossible to force stalling, we simulate the above scenario by manually updating the instance table before the worker - /// attempts to complete the work item. The history table update will go through, but the instance table update will fail since "another worker - /// has since updated" the instance table. - /// - /// The value to use for - /// - [DataTestMethod] - [DataRow(true)] - [DataRow(false)] - public async Task WorkerAttemptingToUpdateInstanceTableAfterStalling(bool useInstanceEtag) - { - AzureStorageOrchestrationService service = null; - try - { - var orchestrationInstance = new OrchestrationInstance - { - InstanceId = "instance_id", - ExecutionId = "execution_id", - }; - - ExecutionStartedEvent startedEvent = new(-1, string.Empty) - { - Name = "orchestration", - Version = string.Empty, - OrchestrationInstance = orchestrationInstance, - ScheduledStartTime = DateTime.UtcNow, - }; - - var settings = new AzureStorageOrchestrationServiceSettings - { - PartitionCount = 1, - StorageAccountClientProvider = new StorageAccountClientProvider(TestHelpers.GetTestStorageAccountConnectionString()), - TaskHubName = TestHelpers.GetTestTaskHubName(), - ExtendedSessionsEnabled = false, - UseInstanceTableEtag = useInstanceEtag - }; - this.SetPartitionManagerType(settings, PartitionManagerType.V2Safe); - - service = new AzureStorageOrchestrationService(settings); - await service.CreateAsync(); - await service.StartAsync(); - - // Create the orchestration and get the first work item and start "working" on it - await service.CreateTaskOrchestrationAsync( - new TaskMessage() - { - OrchestrationInstance = orchestrationInstance, - Event = startedEvent - }); - var workItem = await 
service.LockNextTaskOrchestrationWorkItemAsync( - TimeSpan.FromMinutes(5), - CancellationToken.None); - var runtimeState = workItem.OrchestrationRuntimeState; - runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); - runtimeState.AddEvent(startedEvent); - runtimeState.AddEvent(new TaskScheduledEvent(0)); - runtimeState.AddEvent(new OrchestratorCompletedEvent(-1)); - - AzureStorageClient azureStorageClient = new AzureStorageClient(settings); - - // Now manually update the instance to have status "Completed" - Table instanceTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.InstanceTableName); - TableEntity entity = new(orchestrationInstance.InstanceId, "") - { - ["RuntimeStatus"] = OrchestrationStatus.Completed.ToString("G"), - }; - await instanceTable.MergeEntityAsync(entity, Azure.ETag.All); - - if (useInstanceEtag) - { - // Confirm an exception is thrown due to the etag mismatch for the instance table when the worker attempts to complete the work item - SessionAbortedException exception = await Assert.ThrowsExceptionAsync(async () => - await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List(), new List(), new List(), null, null) - ); - Assert.IsInstanceOfType(exception.InnerException, typeof(DurableTaskStorageException)); - DurableTaskStorageException dtse = (DurableTaskStorageException)exception.InnerException; - Assert.AreEqual((int)HttpStatusCode.PreconditionFailed, dtse.HttpStatusCode); - } - else - { - await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List(), new List(), new List(), null, null); - - var queryCondition = new OrchestrationInstanceStatusQueryCondition - { - InstanceId = "instance_id", - FetchInput = false, - }; - - ODataCondition odata = queryCondition.ToOData(); - OrchestrationInstanceStatus instanceTableEntity = await instanceTable - .ExecuteQueryAsync(odata.Filter, 1, odata.Select, CancellationToken.None) - .FirstOrDefaultAsync(); - - // Confirm the 
instance table was updated with a "stale" status - Assert.IsNotNull(instanceTableEntity); - Assert.AreEqual(OrchestrationStatus.Running.ToString(), instanceTableEntity.RuntimeStatus); - } - } - finally - { - await service?.StopAsync(isForced: true); - } - } - - /// - /// Confirm that: - /// 1. If is true, and a worker attempts to update the instance table with a stale - /// etag upon completing a work item for a suborchestration, a SessionAbortedException is thrown which wraps the inner DurableTaskStorageException, which has - /// the correct status code (conflict). - /// The specific scenario tested is if the worker stalled after updating the history table but before updating the instance table for the first work item - /// for a suborchestration. When it attempts to insert a new entity into the instance table for the suborchestration (since for a suborchestration, - /// the instance entity is only created upon completion of the first work item), it will fail. - /// 2. If is false for the above scenario, then the call to update the instance table - /// will go through, and the instance table will be updated with a "stale" status. - /// - /// - /// Since it is impossible to force stalling, we simulate the above scenario by manually updating the instance table before the worker - /// attempts to complete the work item. The history table update will go through, but the instance table update will fail since "another worker - /// has since updated" the instance table. 
- /// - /// The value to use for - /// - [DataTestMethod] - [DataRow(true)] - [DataRow(false)] - public async Task WorkerAttemptingToUpdateInstanceTableAfterStallingForSubOrchestration(bool useInstanceEtag) - { - AzureStorageOrchestrationService service = null; - try - { - var orchestrationInstance = new OrchestrationInstance - { - InstanceId = "instance_id", - ExecutionId = "execution_id", - }; - - ExecutionStartedEvent startedEvent = new(-1, string.Empty) - { - Name = "orchestration", - Version = string.Empty, - OrchestrationInstance = orchestrationInstance, - ScheduledStartTime = DateTime.UtcNow, - }; - - var settings = new AzureStorageOrchestrationServiceSettings - { - PartitionCount = 1, - StorageAccountClientProvider = new StorageAccountClientProvider(TestHelpers.GetTestStorageAccountConnectionString()), - TaskHubName = TestHelpers.GetTestTaskHubName(), - ExtendedSessionsEnabled = false, - UseInstanceTableEtag = useInstanceEtag - }; - this.SetPartitionManagerType(settings, PartitionManagerType.V2Safe); - - service = new AzureStorageOrchestrationService(settings); - await service.CreateAsync(); - await service.StartAsync(); - - // Create the orchestration and get the first work item and start "working" on it - await service.CreateTaskOrchestrationAsync( - new TaskMessage() - { - OrchestrationInstance = orchestrationInstance, - Event = startedEvent - }); - var workItem = await service.LockNextTaskOrchestrationWorkItemAsync( - TimeSpan.FromMinutes(5), - CancellationToken.None); - var runtimeState = workItem.OrchestrationRuntimeState; - runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); - runtimeState.AddEvent(startedEvent); - runtimeState.AddEvent(new SubOrchestrationInstanceCreatedEvent(0) - { - Name = "suborchestration", - InstanceId = "sub_instance_id" - }); - runtimeState.AddEvent(new OrchestratorCompletedEvent(-1)); - - // Create the task message to start the suborchestration - var subOrchestrationExecutionStartedEvent = new ExecutionStartedEvent(-1, 
string.Empty) - { - OrchestrationInstance = new OrchestrationInstance - { - InstanceId = "sub_instance_id", - ExecutionId = Guid.NewGuid().ToString("N") - }, - ParentInstance = new ParentInstance - { - OrchestrationInstance = runtimeState.OrchestrationInstance, - Name = runtimeState.Name, - Version = runtimeState.Version, - TaskScheduleId = 0, - }, - Name = "suborchestration" - }; - List orchestratorMessages = - new() { - new TaskMessage() - { - OrchestrationInstance = subOrchestrationExecutionStartedEvent.OrchestrationInstance, - Event = subOrchestrationExecutionStartedEvent, - } - }; - - // Complete the first work item, which will send the execution started message for the suborchestration - await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List(), orchestratorMessages, new List(), null, null); - - // Now get the work item for the suborchestration and "work" on it - workItem = await service.LockNextTaskOrchestrationWorkItemAsync( - TimeSpan.FromMinutes(5), - CancellationToken.None); - runtimeState = workItem.OrchestrationRuntimeState; - runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); - runtimeState.AddEvent(subOrchestrationExecutionStartedEvent); - runtimeState.AddEvent(new TaskScheduledEvent(0)); - runtimeState.AddEvent(new OrchestratorCompletedEvent(-1)); - - AzureStorageClient azureStorageClient = new(settings); - Table instanceTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.InstanceTableName); - // Now manually update the suborchestration to have status "Completed" - TableEntity entity = new("sub_instance_id", "") - { - ["RuntimeStatus"] = OrchestrationStatus.Completed.ToString("G"), - }; - await instanceTable.InsertEntityAsync(entity); - - if (useInstanceEtag) - { - // Confirm an exception is thrown because the worker attempts to insert a new entity for the suborchestration into the instance table - // when one already exists - SessionAbortedException exception = await 
Assert.ThrowsExceptionAsync(async () => - await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List(), new List(), new List(), null, null) - ); - Assert.IsInstanceOfType(exception.InnerException, typeof(DurableTaskStorageException)); - DurableTaskStorageException dtse = (DurableTaskStorageException)exception.InnerException; - Assert.AreEqual((int)HttpStatusCode.Conflict, dtse.HttpStatusCode); - } - else - { - await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List(), new List(), new List(), null, null); - - var queryCondition = new OrchestrationInstanceStatusQueryCondition - { - InstanceId = "sub_instance_id", - FetchInput = false, - }; - - ODataCondition odata = queryCondition.ToOData(); - OrchestrationInstanceStatus instanceTableEntity = await instanceTable - .ExecuteQueryAsync(odata.Filter, 1, odata.Select, CancellationToken.None) - .FirstOrDefaultAsync(); - - // Confirm the instance table was updated with a "stale" status - Assert.IsNotNull(instanceTableEntity); - Assert.AreEqual(OrchestrationStatus.Running.ToString(), instanceTableEntity.RuntimeStatus); - } - - } - finally - { - await service?.StopAsync(isForced: true); - } - } - [TestMethod] public async Task MonitorIdleTaskHubDisconnected() { diff --git a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs index 502e5fab6..e49cb497b 100644 --- a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs +++ b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs @@ -13,15 +13,6 @@ namespace DurableTask.AzureStorage.Tests { - using System; - using System.Collections.Generic; - using System.Diagnostics; - using System.IO; - using System.Linq; - using System.Runtime.Serialization; - using System.Text; - using System.Threading; - using System.Threading.Tasks; using Azure.Data.Tables; using Azure.Storage.Blobs; using Azure.Storage.Blobs.Models; @@ -36,6 +27,17 
@@ namespace DurableTask.AzureStorage.Tests using Moq; using Newtonsoft.Json; using Newtonsoft.Json.Linq; + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.IO; + using System.Linq; + using System.Net; + using System.Runtime.Serialization; + using System.Text; + using System.Threading; + using System.Threading.Tasks; + using static DurableTask.AzureStorage.Tests.AzureStorageScaleTests; #if !NET48 using OpenTelemetry; using OpenTelemetry.Trace; @@ -3143,6 +3145,505 @@ public async Task OrchestrationRejectsWithVersionMismatch() } } + /// + /// Confirm that: + /// 1. If is true, and a worker attempts to update the instance table with a stale + /// etag upon completing a work item, a SessionAbortedException is thrown which wraps the inner DurableTaskStorageException, which has the correct status code + /// (precondition failed). + /// The specific scenario tested is if the worker stalled after updating the history table but before updating the instance table. When it attempts to update + /// the instance table with a stale etag, it will fail. + /// 2. If is false for the above scenario, then the call to update the instance table + /// will go through, and the instance table will be updated with a "stale" status. + /// + /// + /// Since it is impossible to force stalling, we simulate the above scenario by manually updating the instance table before the worker + /// attempts to complete the work item. The history table update will go through, but the instance table update will fail since "another worker + /// has since updated" the instance table. 
+ /// + /// The value to use for + /// + [DataTestMethod] + [DataRow(true)] + [DataRow(false)] + public async Task WorkerAttemptingToUpdateInstanceTableAfterStalling(bool useInstanceEtag) + { + AzureStorageOrchestrationService service = null; + try + { + var orchestrationInstance = new OrchestrationInstance + { + InstanceId = "instance_id", + ExecutionId = "execution_id", + }; + + ExecutionStartedEvent startedEvent = new(-1, string.Empty) + { + Name = "orchestration", + Version = string.Empty, + OrchestrationInstance = orchestrationInstance, + ScheduledStartTime = DateTime.UtcNow, + }; + + var settings = new AzureStorageOrchestrationServiceSettings + { + PartitionCount = 1, + StorageAccountClientProvider = new StorageAccountClientProvider(TestHelpers.GetTestStorageAccountConnectionString()), + TaskHubName = TestHelpers.GetTestTaskHubName(), + ExtendedSessionsEnabled = false, + UseInstanceTableEtag = useInstanceEtag + }; + + service = new AzureStorageOrchestrationService(settings); + await service.CreateAsync(); + await service.StartAsync(); + + // Create the orchestration and get the first work item and start "working" on it + await service.CreateTaskOrchestrationAsync( + new TaskMessage() + { + OrchestrationInstance = orchestrationInstance, + Event = startedEvent + }); + var workItem = await service.LockNextTaskOrchestrationWorkItemAsync( + TimeSpan.FromMinutes(5), + CancellationToken.None); + var runtimeState = workItem.OrchestrationRuntimeState; + runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); + runtimeState.AddEvent(startedEvent); + runtimeState.AddEvent(new TaskScheduledEvent(0)); + runtimeState.AddEvent(new OrchestratorCompletedEvent(-1)); + + AzureStorageClient azureStorageClient = new AzureStorageClient(settings); + + // Now manually update the instance to have status "Completed" + Table instanceTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.InstanceTableName); + TableEntity entity = new(orchestrationInstance.InstanceId, 
"") + { + ["RuntimeStatus"] = OrchestrationStatus.Completed.ToString("G"), + }; + await instanceTable.MergeEntityAsync(entity, Azure.ETag.All); + + if (useInstanceEtag) + { + // Confirm an exception is thrown due to the etag mismatch for the instance table when the worker attempts to complete the work item + SessionAbortedException exception = await Assert.ThrowsExceptionAsync(async () => + await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List(), new List(), new List(), null, null) + ); + Assert.IsInstanceOfType(exception.InnerException, typeof(DurableTaskStorageException)); + DurableTaskStorageException dtse = (DurableTaskStorageException)exception.InnerException; + Assert.AreEqual((int)HttpStatusCode.PreconditionFailed, dtse.HttpStatusCode); + } + else + { + await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List(), new List(), new List(), null, null); + + var queryCondition = new OrchestrationInstanceStatusQueryCondition + { + InstanceId = "instance_id", + FetchInput = false, + }; + + ODataCondition odata = queryCondition.ToOData(); + OrchestrationInstanceStatus instanceTableEntity = await instanceTable + .ExecuteQueryAsync(odata.Filter, 1, odata.Select, CancellationToken.None) + .FirstOrDefaultAsync(); + + // Confirm the instance table was updated with a "stale" status + Assert.IsNotNull(instanceTableEntity); + Assert.AreEqual(OrchestrationStatus.Running.ToString(), instanceTableEntity.RuntimeStatus); + } + } + finally + { + await service?.StopAsync(isForced: true); + } + } + + /// + /// Confirm that: + /// 1. If is true, and a worker attempts to update the instance table with a stale + /// etag upon completing a work item for a suborchestration, a SessionAbortedException is thrown which wraps the inner DurableTaskStorageException, which has + /// the correct status code (conflict). 
+ /// The specific scenario tested is if the worker stalled after updating the history table but before updating the instance table for the first work item + /// for a suborchestration. When it attempts to insert a new entity into the instance table for the suborchestration (since for a suborchestration, + /// the instance entity is only created upon completion of the first work item), it will fail. + /// 2. If is false for the above scenario, then the call to update the instance table + /// will go through, and the instance table will be updated with a "stale" status. + /// + /// + /// Since it is impossible to force stalling, we simulate the above scenario by manually updating the instance table before the worker + /// attempts to complete the work item. The history table update will go through, but the instance table update will fail since "another worker + /// has since updated" the instance table. + /// + /// The value to use for + /// + [DataTestMethod] + [DataRow(true)] + [DataRow(false)] + public async Task WorkerAttemptingToUpdateInstanceTableAfterStallingForSubOrchestration(bool useInstanceEtag) + { + AzureStorageOrchestrationService service = null; + try + { + var orchestrationInstance = new OrchestrationInstance + { + InstanceId = "instance_id", + ExecutionId = "execution_id", + }; + + ExecutionStartedEvent startedEvent = new(-1, string.Empty) + { + Name = "orchestration", + Version = string.Empty, + OrchestrationInstance = orchestrationInstance, + ScheduledStartTime = DateTime.UtcNow, + }; + + var settings = new AzureStorageOrchestrationServiceSettings + { + PartitionCount = 1, + StorageAccountClientProvider = new StorageAccountClientProvider(TestHelpers.GetTestStorageAccountConnectionString()), + TaskHubName = TestHelpers.GetTestTaskHubName(), + ExtendedSessionsEnabled = false, + UseInstanceTableEtag = useInstanceEtag + }; + + service = new AzureStorageOrchestrationService(settings); + await service.CreateAsync(); + await service.StartAsync(); + + // 
Create the orchestration and get the first work item and start "working" on it + await service.CreateTaskOrchestrationAsync( + new TaskMessage() + { + OrchestrationInstance = orchestrationInstance, + Event = startedEvent + }); + var workItem = await service.LockNextTaskOrchestrationWorkItemAsync( + TimeSpan.FromMinutes(5), + CancellationToken.None); + var runtimeState = workItem.OrchestrationRuntimeState; + runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); + runtimeState.AddEvent(startedEvent); + runtimeState.AddEvent(new SubOrchestrationInstanceCreatedEvent(0) + { + Name = "suborchestration", + InstanceId = "sub_instance_id" + }); + runtimeState.AddEvent(new OrchestratorCompletedEvent(-1)); + + // Create the task message to start the suborchestration + var subOrchestrationExecutionStartedEvent = new ExecutionStartedEvent(-1, string.Empty) + { + OrchestrationInstance = new OrchestrationInstance + { + InstanceId = "sub_instance_id", + ExecutionId = Guid.NewGuid().ToString("N") + }, + ParentInstance = new ParentInstance + { + OrchestrationInstance = runtimeState.OrchestrationInstance, + Name = runtimeState.Name, + Version = runtimeState.Version, + TaskScheduleId = 0, + }, + Name = "suborchestration" + }; + List orchestratorMessages = + new() { + new TaskMessage() + { + OrchestrationInstance = subOrchestrationExecutionStartedEvent.OrchestrationInstance, + Event = subOrchestrationExecutionStartedEvent, + } + }; + + // Complete the first work item, which will send the execution started message for the suborchestration + await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List(), orchestratorMessages, new List(), null, null); + + // Now get the work item for the suborchestration and "work" on it + workItem = await service.LockNextTaskOrchestrationWorkItemAsync( + TimeSpan.FromMinutes(5), + CancellationToken.None); + runtimeState = workItem.OrchestrationRuntimeState; + runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); + 
runtimeState.AddEvent(subOrchestrationExecutionStartedEvent); + runtimeState.AddEvent(new TaskScheduledEvent(0)); + runtimeState.AddEvent(new OrchestratorCompletedEvent(-1)); + + AzureStorageClient azureStorageClient = new(settings); + Table instanceTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.InstanceTableName); + // Now manually update the suborchestration to have status "Completed" + TableEntity entity = new("sub_instance_id", "") + { + ["RuntimeStatus"] = OrchestrationStatus.Completed.ToString("G"), + }; + await instanceTable.InsertEntityAsync(entity); + + if (useInstanceEtag) + { + // Confirm an exception is thrown because the worker attempts to insert a new entity for the suborchestration into the instance table + // when one already exists + SessionAbortedException exception = await Assert.ThrowsExceptionAsync(async () => + await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List(), new List(), new List(), null, null) + ); + Assert.IsInstanceOfType(exception.InnerException, typeof(DurableTaskStorageException)); + DurableTaskStorageException dtse = (DurableTaskStorageException)exception.InnerException; + Assert.AreEqual((int)HttpStatusCode.Conflict, dtse.HttpStatusCode); + } + else + { + await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List(), new List(), new List(), null, null); + + var queryCondition = new OrchestrationInstanceStatusQueryCondition + { + InstanceId = "sub_instance_id", + FetchInput = false, + }; + + ODataCondition odata = queryCondition.ToOData(); + OrchestrationInstanceStatus instanceTableEntity = await instanceTable + .ExecuteQueryAsync(odata.Filter, 1, odata.Select, CancellationToken.None) + .FirstOrDefaultAsync(); + + // Confirm the instance table was updated with a "stale" status + Assert.IsNotNull(instanceTableEntity); + Assert.AreEqual(OrchestrationStatus.Running.ToString(), instanceTableEntity.RuntimeStatus); + } + + } + finally + { + await 
service?.StopAsync(isForced: true); + } + } + + [DataTestMethod] + [DataRow(true)] + [DataRow(false)] + public async Task WorkerAttemptingToDequeueMessageForNonExistentInstance(bool extendedSessionsEnabled) + { + AzureStorageOrchestrationService service = null; + try + { + var settings = new AzureStorageOrchestrationServiceSettings + { + PartitionCount = 1, + StorageAccountClientProvider = new StorageAccountClientProvider(TestHelpers.GetTestStorageAccountConnectionString()), + TaskHubName = TestHelpers.GetTestTaskHubName(), + ExtendedSessionsEnabled = extendedSessionsEnabled, + }; + + service = new AzureStorageOrchestrationService(settings); + await service.CreateAsync(); + await service.StartAsync(); + + var orchestrationInstance = new OrchestrationInstance + { + InstanceId = "instance_id", + ExecutionId = "execution_id", + }; + + await service.SendTaskOrchestrationMessageAsync( + new TaskMessage + { + OrchestrationInstance = new OrchestrationInstance + { + InstanceId = "instance_id", + ExecutionId = "execution_id", + }, + Event = new TaskCompletedEvent(-1, 0, string.Empty) + { + Timestamp = DateTime.UtcNow - TimeSpan.FromMinutes(1), + } + }); + + for (int i = 0; i < 6; i++) + { + var workItem = await service.LockNextTaskOrchestrationWorkItemAsync( + TimeSpan.FromMinutes(1), + CancellationToken.None); + Assert.IsNull(workItem); + } + + Assert.AreEqual(0, await service.OwnedControlQueues.Single().InnerQueue.GetApproximateMessagesCountAsync()); + + } + finally + { + await service?.StopAsync(isForced: true); + } + } + + [DataTestMethod] + [DataRow(true, true)] + [DataRow(false, true)] + [DataRow(true, false)] + [DataRow(false, false)] + public async Task WorkerAttemptingToDequeueMessageWithNoTaskScheduledInHistory(bool extendedSessionsEnabled, bool addTaskScheduledEvent) + { + AzureStorageOrchestrationService service = null; + try + { + var orchestrationInstance = new OrchestrationInstance + { + InstanceId = "instance_id", + ExecutionId = "execution_id", + }; + + 
ExecutionStartedEvent startedEvent = new(-1, string.Empty) + { + Name = "orchestration", + Version = string.Empty, + OrchestrationInstance = orchestrationInstance, + ScheduledStartTime = DateTime.UtcNow, + }; + + var settings = new AzureStorageOrchestrationServiceSettings + { + PartitionCount = 1, + StorageAccountClientProvider = new StorageAccountClientProvider(TestHelpers.GetTestStorageAccountConnectionString()), + TaskHubName = TestHelpers.GetTestTaskHubName(), + ExtendedSessionsEnabled = extendedSessionsEnabled + }; + + service = new AzureStorageOrchestrationService(settings); + await service.CreateAsync(); + await service.StartAsync(); + + // Create the orchestration and get the first work item and start "working" on it + await service.CreateTaskOrchestrationAsync( + new TaskMessage() + { + OrchestrationInstance = orchestrationInstance, + Event = startedEvent + }); + var workItem = await service.LockNextTaskOrchestrationWorkItemAsync( + TimeSpan.FromMinutes(5), + CancellationToken.None); + var runtimeState = workItem.OrchestrationRuntimeState; + runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); + runtimeState.AddEvent(startedEvent); + if (addTaskScheduledEvent) + { + runtimeState.AddEvent(new TaskScheduledEvent(0)); + } + runtimeState.AddEvent(new OrchestratorCompletedEvent(-1)); + + await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List(), new List(), new List(), null, null); + + // Necessary to force a new work item to be generated for the next message + await service.ReleaseTaskOrchestrationWorkItemAsync(workItem); + + // Send a task completed for a different task scheduled ID, messages should be abandoned + await service.SendTaskOrchestrationMessageAsync( + new TaskMessage + { + OrchestrationInstance = orchestrationInstance, + Event = new TaskCompletedEvent(-1, 1, string.Empty) + }); + workItem = await service.LockNextTaskOrchestrationWorkItemAsync( + TimeSpan.FromMinutes(1), + CancellationToken.None); + 
Assert.IsNull(workItem); + + } + finally + { + await service?.StopAsync(isForced: true); + } + } + + [DataTestMethod] + [DataRow(true, true)] + [DataRow(false, true)] + [DataRow(true, false)] + [DataRow(false, false)] + public async Task WorkerAttemptingToDequeueMessageWithNoEventSentInHistory(bool extendedSessionsEnabled, bool addEventSentEvent) + { + AzureStorageOrchestrationService service = null; + try + { + var orchestrationInstance = new OrchestrationInstance + { + InstanceId = "instance_id", + ExecutionId = "execution_id", + }; + + ExecutionStartedEvent startedEvent = new(-1, string.Empty) + { + Name = "orchestration", + Version = string.Empty, + OrchestrationInstance = orchestrationInstance, + ScheduledStartTime = DateTime.UtcNow, + }; + + var settings = new AzureStorageOrchestrationServiceSettings + { + PartitionCount = 1, + StorageAccountClientProvider = new StorageAccountClientProvider(TestHelpers.GetTestStorageAccountConnectionString()), + TaskHubName = TestHelpers.GetTestTaskHubName(), + ExtendedSessionsEnabled = extendedSessionsEnabled + }; + + service = new AzureStorageOrchestrationService(settings); + await service.CreateAsync(); + await service.StartAsync(); + + // Create the orchestration and get the first work item and start "working" on it + await service.CreateTaskOrchestrationAsync( + new TaskMessage() + { + OrchestrationInstance = orchestrationInstance, + Event = startedEvent + }); + var workItem = await service.LockNextTaskOrchestrationWorkItemAsync( + TimeSpan.FromMinutes(5), + CancellationToken.None); + var runtimeState = workItem.OrchestrationRuntimeState; + runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); + runtimeState.AddEvent(startedEvent); + if (addEventSentEvent) + { + runtimeState.AddEvent(new EventSentEvent(-1) + { + Input = $"{{ \"id\": \"{Guid.NewGuid()}\" }}" + }); + } + runtimeState.AddEvent(new OrchestratorCompletedEvent(-1)); + + await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List(), 
new List(), new List(), null, null); + + // Necessary to force a new work item to be generated for the next message + await service.ReleaseTaskOrchestrationWorkItemAsync(workItem); + + // Send a task completed for a different task scheduled ID, messages should be abandoned + await service.SendTaskOrchestrationMessageInternalAsync( + sourceInstance: new OrchestrationInstance() + { + InstanceId = "@test@myEntity" + }, + controlQueue: service.OwnedControlQueues.Single(), + new TaskMessage + { + OrchestrationInstance = orchestrationInstance, + Event = new EventRaisedEvent(-1, string.Empty) + { + Name = Guid.NewGuid().ToString() + } + }); + workItem = await service.LockNextTaskOrchestrationWorkItemAsync( + TimeSpan.FromMinutes(1), + CancellationToken.None); + Assert.IsNull(workItem); + + } + finally + { + await service?.StopAsync(isForced: true); + } + } + #if !NET48 /// /// End-to-end test which validates a simple orchestrator function that calls an activity function From b3f152294a7b9a28545324a9f369dfb97e36dba7 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Wed, 11 Mar 2026 13:48:54 -0700 Subject: [PATCH 5/8] updated the tests a bit, resolved some copilot comments --- .../AzureStorageScenarioTests.cs | 47 ++++++++++++++----- 1 file changed, 35 insertions(+), 12 deletions(-) diff --git a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs index e49cb497b..cecffc9f4 100644 --- a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs +++ b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs @@ -24,7 +24,6 @@ namespace DurableTask.AzureStorage.Tests using DurableTask.Core.Settings; using Microsoft.Practices.EnterpriseLibrary.SemanticLogging.Utility; using Microsoft.VisualStudio.TestTools.UnitTesting; - using Moq; using Newtonsoft.Json; using Newtonsoft.Json.Linq; using System; @@ -37,7 +36,6 @@ namespace DurableTask.AzureStorage.Tests using System.Text; using 
System.Threading; using System.Threading.Tasks; - using static DurableTask.AzureStorage.Tests.AzureStorageScaleTests; #if !NET48 using OpenTelemetry; using OpenTelemetry.Trace; @@ -3438,12 +3436,6 @@ public async Task WorkerAttemptingToDequeueMessageForNonExistentInstance(bool ex await service.CreateAsync(); await service.StartAsync(); - var orchestrationInstance = new OrchestrationInstance - { - InstanceId = "instance_id", - ExecutionId = "execution_id", - }; - await service.SendTaskOrchestrationMessageAsync( new TaskMessage { @@ -3467,7 +3459,6 @@ await service.SendTaskOrchestrationMessageAsync( } Assert.AreEqual(0, await service.OwnedControlQueues.Single().InnerQueue.GetApproximateMessagesCountAsync()); - } finally { @@ -3535,7 +3526,7 @@ await service.CreateTaskOrchestrationAsync( // Necessary to force a new work item to be generated for the next message await service.ReleaseTaskOrchestrationWorkItemAsync(workItem); - // Send a task completed for a different task scheduled ID, messages should be abandoned + // Send a task completed for a different task scheduled ID, message should be abandoned await service.SendTaskOrchestrationMessageAsync( new TaskMessage { @@ -3547,6 +3538,20 @@ await service.SendTaskOrchestrationMessageAsync( CancellationToken.None); Assert.IsNull(workItem); + if (addTaskScheduledEvent) + { + // Send a task completed for the same task scheduled ID, this should work + await service.SendTaskOrchestrationMessageAsync( + new TaskMessage + { + OrchestrationInstance = orchestrationInstance, + Event = new TaskCompletedEvent(-1, 0, string.Empty) + }); + workItem = await service.LockNextTaskOrchestrationWorkItemAsync( + TimeSpan.FromMinutes(1), + CancellationToken.None); + Assert.IsNotNull(workItem); + } } finally { @@ -3603,11 +3608,12 @@ await service.CreateTaskOrchestrationAsync( var runtimeState = workItem.OrchestrationRuntimeState; runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); runtimeState.AddEvent(startedEvent); + string requestId = 
Guid.NewGuid().ToString(); if (addEventSentEvent) { runtimeState.AddEvent(new EventSentEvent(-1) { - Input = $"{{ \"id\": \"{Guid.NewGuid()}\" }}" + Input = $"{{ \"id\": \"{requestId}\" }}" }); } runtimeState.AddEvent(new OrchestratorCompletedEvent(-1)); @@ -3617,7 +3623,7 @@ await service.CreateTaskOrchestrationAsync( // Necessary to force a new work item to be generated for the next message await service.ReleaseTaskOrchestrationWorkItemAsync(workItem); - // Send a task completed for a different task scheduled ID, messages should be abandoned + // Send an event raised for a different request ID, message should be abandoned await service.SendTaskOrchestrationMessageInternalAsync( sourceInstance: new OrchestrationInstance() { @@ -3637,6 +3643,23 @@ await service.SendTaskOrchestrationMessageInternalAsync( CancellationToken.None); Assert.IsNull(workItem); + if (addEventSentEvent) + { + // Send an event raised for the same request ID, this should work + await service.SendTaskOrchestrationMessageAsync( + new TaskMessage + { + OrchestrationInstance = orchestrationInstance, + Event = new EventRaisedEvent(-1, string.Empty) + { + Name = requestId + } + }); + workItem = await service.LockNextTaskOrchestrationWorkItemAsync( + TimeSpan.FromMinutes(1), + CancellationToken.None); + Assert.IsNotNull(workItem); + } } finally { From c0cf545c6d1dde2e41442d8000df5016d7d9bd65 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Wed, 11 Mar 2026 13:50:09 -0700 Subject: [PATCH 6/8] restored a usings, added a comment --- .../AzureStorageScenarioTests.cs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs index cecffc9f4..ed892a776 100644 --- a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs +++ b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs @@ -24,6 +24,7 @@ namespace DurableTask.AzureStorage.Tests using 
DurableTask.Core.Settings; using Microsoft.Practices.EnterpriseLibrary.SemanticLogging.Utility; using Microsoft.VisualStudio.TestTools.UnitTesting; + using Moq; using Newtonsoft.Json; using Newtonsoft.Json.Linq; using System; @@ -3458,6 +3459,8 @@ await service.SendTaskOrchestrationMessageAsync( Assert.IsNull(workItem); } + // On the last attempt, the message should have been deleted since we have exceeded the maximum abandonment count + // for a message to a nonexistent instance (5) Assert.AreEqual(0, await service.OwnedControlQueues.Single().InnerQueue.GetApproximateMessagesCountAsync()); } finally From 3d84e2ef150353f49ba0b4c08614105509d35e41 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Wed, 11 Mar 2026 14:12:03 -0700 Subject: [PATCH 7/8] updated a potentially flaky test --- .../DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs index ed892a776..eddb4f64c 100644 --- a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs +++ b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs @@ -3461,7 +3461,7 @@ await service.SendTaskOrchestrationMessageAsync( // On the last attempt, the message should have been deleted since we have exceeded the maximum abandonment count // for a message to a nonexistent instance (5) - Assert.AreEqual(0, await service.OwnedControlQueues.Single().InnerQueue.GetApproximateMessagesCountAsync()); + Assert.IsNull(await service.OwnedControlQueues.Single().InnerQueue.PeekMessageAsync()); } finally { From 9c803bc2e050b53ebfa92ebcfdc611eb601a8ee0 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Wed, 11 Mar 2026 14:13:19 -0700 Subject: [PATCH 8/8] changed instance/execution IDs in tests to be random --- .../AzureStorageScenarioTests.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git 
a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs index eddb4f64c..3a48ee5ff 100644 --- a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs +++ b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs @@ -3481,8 +3481,8 @@ public async Task WorkerAttemptingToDequeueMessageWithNoTaskScheduledInHistory(b { var orchestrationInstance = new OrchestrationInstance { - InstanceId = "instance_id", - ExecutionId = "execution_id", + InstanceId = Guid.NewGuid().ToString(), + ExecutionId = Guid.NewGuid().ToString(), }; ExecutionStartedEvent startedEvent = new(-1, string.Empty) @@ -3574,8 +3574,8 @@ public async Task WorkerAttemptingToDequeueMessageWithNoEventSentInHistory(bool { var orchestrationInstance = new OrchestrationInstance { - InstanceId = "instance_id", - ExecutionId = "execution_id", + InstanceId = Guid.NewGuid().ToString(), + ExecutionId = Guid.NewGuid().ToString(), }; ExecutionStartedEvent startedEvent = new(-1, string.Empty)