diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index 9c3471978..9ad814c43 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -1821,7 +1821,7 @@ public async Task SendTaskOrchestrationMessageAsync(TaskMessage message) await this.SendTaskOrchestrationMessageInternalAsync(EmptySourceInstance, controlQueue, message); } - Task SendTaskOrchestrationMessageInternalAsync( + internal Task SendTaskOrchestrationMessageInternalAsync( OrchestrationInstance sourceInstance, ControlQueue controlQueue, TaskMessage message) diff --git a/src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs b/src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs index d30a1b40f..ae3e355d6 100644 --- a/src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs +++ b/src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs @@ -164,18 +164,6 @@ public bool IsOutOfOrderMessage(MessageData message) return false; } - if (this.LastCheckpointTime > message.TaskMessage.Event.Timestamp) - { - // LastCheckpointTime represents the time at which the most recent history checkpoint completed. - // The checkpoint is written to the history table only *after* all queue messages are sent. - // A message is out of order when its timestamp *preceeds* the most recent checkpoint timestamp. - // In this case, we see that the checkpoint came *after* the message, so there is no out-of-order - // concern. Note that this logic only applies for messages sent by orchestrations to themselves. - // The next check considers the other cases (activities, sub-orchestrations, etc.). - // Orchestration checkpoint time information was added only after v1.6.4. - return false; - } - if (Utils.TryGetTaskScheduledId(message.TaskMessage.Event, out int taskScheduledId)) { // This message is a response to a task. Search the history to make sure that we've recorded the fact that @@ -193,7 +181,7 @@ public bool IsOutOfOrderMessage(MessageData message) var requestId = ((EventRaisedEvent)message.TaskMessage.Event).Name; if (requestId != null) { - HistoryEvent mostRecentTaskEvent = this.RuntimeState.Events.FirstOrDefault(e => e.EventType == EventType.EventSent && FindRequestId(((EventSentEvent)e).Input)?.ToString() == requestId); + HistoryEvent mostRecentTaskEvent = this.RuntimeState.Events.LastOrDefault(e => e.EventType == EventType.EventSent && FindRequestId(((EventSentEvent)e).Input)?.ToString() == requestId); if (mostRecentTaskEvent != null) { return false; diff --git a/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs b/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs index a2872df75..854aa3007 100644 --- a/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs +++ b/test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs @@ -643,281 +643,6 @@ await TestHelpers.WaitFor( } } - /// - /// Confirm that: - /// 1. If is true, and a worker attempts to update the instance table with a stale - /// etag upon completing a work item, a SessionAbortedException is thrown which wraps the inner DurableTaskStorageException, which has the correct status code - /// (precondition failed). - /// The specific scenario tested is if the worker stalled after updating the history table but before updating the instance table. When it attempts to update - /// the instance table with a stale etag, it will fail. - /// 2. If is false for the above scenario, then the call to update the instance table - /// will go through, and the instance table will be updated with a "stale" status. - /// - /// - /// Since it is impossible to force stalling, we simulate the above scenario by manually updating the instance table before the worker - /// attempts to complete the work item. The history table update will go through, but the instance table update will fail since "another worker - /// has since updated" the instance table. - /// - /// The value to use for - /// - [DataTestMethod] - [DataRow(true)] - [DataRow(false)] - public async Task WorkerAttemptingToUpdateInstanceTableAfterStalling(bool useInstanceEtag) - { - AzureStorageOrchestrationService service = null; - try - { - var orchestrationInstance = new OrchestrationInstance - { - InstanceId = "instance_id", - ExecutionId = "execution_id", - }; - - ExecutionStartedEvent startedEvent = new(-1, string.Empty) - { - Name = "orchestration", - Version = string.Empty, - OrchestrationInstance = orchestrationInstance, - ScheduledStartTime = DateTime.UtcNow, - }; - - var settings = new AzureStorageOrchestrationServiceSettings - { - PartitionCount = 1, - StorageAccountClientProvider = new StorageAccountClientProvider(TestHelpers.GetTestStorageAccountConnectionString()), - TaskHubName = TestHelpers.GetTestTaskHubName(), - ExtendedSessionsEnabled = false, - UseInstanceTableEtag = useInstanceEtag - }; - this.SetPartitionManagerType(settings, PartitionManagerType.V2Safe); - - service = new AzureStorageOrchestrationService(settings); - await service.CreateAsync(); - await service.StartAsync(); - - // Create the orchestration and get the first work item and start "working" on it - await service.CreateTaskOrchestrationAsync( - new TaskMessage() - { - OrchestrationInstance = orchestrationInstance, - Event = startedEvent - }); - var workItem = await service.LockNextTaskOrchestrationWorkItemAsync( - TimeSpan.FromMinutes(5), - CancellationToken.None); - var runtimeState = workItem.OrchestrationRuntimeState; - runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); - runtimeState.AddEvent(startedEvent); - runtimeState.AddEvent(new TaskScheduledEvent(0)); - runtimeState.AddEvent(new OrchestratorCompletedEvent(-1)); - - AzureStorageClient azureStorageClient = new AzureStorageClient(settings); - - // Now manually update the instance to have status "Completed" - Table instanceTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.InstanceTableName); - TableEntity entity = new(orchestrationInstance.InstanceId, "") - { - ["RuntimeStatus"] = OrchestrationStatus.Completed.ToString("G"), - }; - await instanceTable.MergeEntityAsync(entity, Azure.ETag.All); - - if (useInstanceEtag) - { - // Confirm an exception is thrown due to the etag mismatch for the instance table when the worker attempts to complete the work item - SessionAbortedException exception = await Assert.ThrowsExceptionAsync(async () => - await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List(), new List(), new List(), null, null) - ); - Assert.IsInstanceOfType(exception.InnerException, typeof(DurableTaskStorageException)); - DurableTaskStorageException dtse = (DurableTaskStorageException)exception.InnerException; - Assert.AreEqual((int)HttpStatusCode.PreconditionFailed, dtse.HttpStatusCode); - } - else - { - await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List(), new List(), new List(), null, null); - - var queryCondition = new OrchestrationInstanceStatusQueryCondition - { - InstanceId = "instance_id", - FetchInput = false, - }; - - ODataCondition odata = queryCondition.ToOData(); - OrchestrationInstanceStatus instanceTableEntity = await instanceTable - .ExecuteQueryAsync(odata.Filter, 1, odata.Select, CancellationToken.None) - .FirstOrDefaultAsync(); - - // Confirm the instance table was updated with a "stale" status - Assert.IsNotNull(instanceTableEntity); - Assert.AreEqual(OrchestrationStatus.Running.ToString(), instanceTableEntity.RuntimeStatus); - } - } - finally - { - await service?.StopAsync(isForced: true); - } - } - - /// - /// Confirm that: - /// 1. If is true, and a worker attempts to update the instance table with a stale - /// etag upon completing a work item for a suborchestration, a SessionAbortedException is thrown which wraps the inner DurableTaskStorageException, which has - /// the correct status code (conflict). - /// The specific scenario tested is if the worker stalled after updating the history table but before updating the instance table for the first work item - /// for a suborchestration. When it attempts to insert a new entity into the instance table for the suborchestration (since for a suborchestration, - /// the instance entity is only created upon completion of the first work item), it will fail. - /// 2. If is false for the above scenario, then the call to update the instance table - /// will go through, and the instance table will be updated with a "stale" status. - /// - /// - /// Since it is impossible to force stalling, we simulate the above scenario by manually updating the instance table before the worker - /// attempts to complete the work item. The history table update will go through, but the instance table update will fail since "another worker - /// has since updated" the instance table. - /// - /// The value to use for - /// - [DataTestMethod] - [DataRow(true)] - [DataRow(false)] - public async Task WorkerAttemptingToUpdateInstanceTableAfterStallingForSubOrchestration(bool useInstanceEtag) - { - AzureStorageOrchestrationService service = null; - try - { - var orchestrationInstance = new OrchestrationInstance - { - InstanceId = "instance_id", - ExecutionId = "execution_id", - }; - - ExecutionStartedEvent startedEvent = new(-1, string.Empty) - { - Name = "orchestration", - Version = string.Empty, - OrchestrationInstance = orchestrationInstance, - ScheduledStartTime = DateTime.UtcNow, - }; - - var settings = new AzureStorageOrchestrationServiceSettings - { - PartitionCount = 1, - StorageAccountClientProvider = new StorageAccountClientProvider(TestHelpers.GetTestStorageAccountConnectionString()), - TaskHubName = TestHelpers.GetTestTaskHubName(), - ExtendedSessionsEnabled = false, - UseInstanceTableEtag = useInstanceEtag - }; - this.SetPartitionManagerType(settings, PartitionManagerType.V2Safe); - - service = new AzureStorageOrchestrationService(settings); - await service.CreateAsync(); - await service.StartAsync(); - - // Create the orchestration and get the first work item and start "working" on it - await service.CreateTaskOrchestrationAsync( - new TaskMessage() - { - OrchestrationInstance = orchestrationInstance, - Event = startedEvent - }); - var workItem = await service.LockNextTaskOrchestrationWorkItemAsync( - TimeSpan.FromMinutes(5), - CancellationToken.None); - var runtimeState = workItem.OrchestrationRuntimeState; - runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); - runtimeState.AddEvent(startedEvent); - runtimeState.AddEvent(new SubOrchestrationInstanceCreatedEvent(0) - { - Name = "suborchestration", - InstanceId = "sub_instance_id" - }); - runtimeState.AddEvent(new OrchestratorCompletedEvent(-1)); - - // Create the task message to start the suborchestration - var subOrchestrationExecutionStartedEvent = new ExecutionStartedEvent(-1, string.Empty) - { - OrchestrationInstance = new OrchestrationInstance - { - InstanceId = "sub_instance_id", - ExecutionId = Guid.NewGuid().ToString("N") - }, - ParentInstance = new ParentInstance - { - OrchestrationInstance = runtimeState.OrchestrationInstance, - Name = runtimeState.Name, - Version = runtimeState.Version, - TaskScheduleId = 0, - }, - Name = "suborchestration" - }; - List orchestratorMessages = - new() { - new TaskMessage() - { - OrchestrationInstance = subOrchestrationExecutionStartedEvent.OrchestrationInstance, - Event = subOrchestrationExecutionStartedEvent, - } - }; - - // Complete the first work item, which will send the execution started message for the suborchestration - await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List(), orchestratorMessages, new List(), null, null); - - // Now get the work item for the suborchestration and "work" on it - workItem = await service.LockNextTaskOrchestrationWorkItemAsync( - TimeSpan.FromMinutes(5), - CancellationToken.None); - runtimeState = workItem.OrchestrationRuntimeState; - runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); - runtimeState.AddEvent(subOrchestrationExecutionStartedEvent); - runtimeState.AddEvent(new TaskScheduledEvent(0)); - runtimeState.AddEvent(new OrchestratorCompletedEvent(-1)); - - AzureStorageClient azureStorageClient = new(settings); - Table instanceTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.InstanceTableName); - // Now manually update the suborchestration to have status "Completed" - TableEntity entity = new("sub_instance_id", "") - { - ["RuntimeStatus"] = OrchestrationStatus.Completed.ToString("G"), - }; - await instanceTable.InsertEntityAsync(entity); - - if (useInstanceEtag) - { - // Confirm an exception is thrown because the worker attempts to insert a new entity for the suborchestration into the instance table - // when one already exists - SessionAbortedException exception = await Assert.ThrowsExceptionAsync(async () => - await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List(), new List(), new List(), null, null) - ); - Assert.IsInstanceOfType(exception.InnerException, typeof(DurableTaskStorageException)); - DurableTaskStorageException dtse = (DurableTaskStorageException)exception.InnerException; - Assert.AreEqual((int)HttpStatusCode.Conflict, dtse.HttpStatusCode); - } - else - { - await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List(), new List(), new List(), null, null); - - var queryCondition = new OrchestrationInstanceStatusQueryCondition - { - InstanceId = "sub_instance_id", - FetchInput = false, - }; - - ODataCondition odata = queryCondition.ToOData(); - OrchestrationInstanceStatus instanceTableEntity = await instanceTable - .ExecuteQueryAsync(odata.Filter, 1, odata.Select, CancellationToken.None) - .FirstOrDefaultAsync(); - - // Confirm the instance table was updated with a "stale" status - Assert.IsNotNull(instanceTableEntity); - Assert.AreEqual(OrchestrationStatus.Running.ToString(), instanceTableEntity.RuntimeStatus); - } - - } - finally - { - await service?.StopAsync(isForced: true); - } - } - [TestMethod] public async Task MonitorIdleTaskHubDisconnected() { diff --git a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs index 502e5fab6..3a48ee5ff 100644 --- a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs +++ b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs @@ -13,15 +13,6 @@ namespace DurableTask.AzureStorage.Tests { - using System; - using System.Collections.Generic; - using System.Diagnostics; - using System.IO; - using System.Linq; - using System.Runtime.Serialization; - using System.Text; - using System.Threading; - using System.Threading.Tasks; using Azure.Data.Tables; using Azure.Storage.Blobs; using Azure.Storage.Blobs.Models; @@ -36,6 +27,16 @@ namespace DurableTask.AzureStorage.Tests using Moq; using Newtonsoft.Json; using Newtonsoft.Json.Linq; + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.IO; + using System.Linq; + using System.Net; + using System.Runtime.Serialization; + using System.Text; + using System.Threading; + using System.Threading.Tasks; #if !NET48 using OpenTelemetry; using OpenTelemetry.Trace; @@ -3143,6 +3144,532 @@ public async Task OrchestrationRejectsWithVersionMismatch() } } + /// + /// Confirm that: + /// 1. If is true, and a worker attempts to update the instance table with a stale + /// etag upon completing a work item, a SessionAbortedException is thrown which wraps the inner DurableTaskStorageException, which has the correct status code + /// (precondition failed). + /// The specific scenario tested is if the worker stalled after updating the history table but before updating the instance table. When it attempts to update + /// the instance table with a stale etag, it will fail. + /// 2. If is false for the above scenario, then the call to update the instance table + /// will go through, and the instance table will be updated with a "stale" status. + /// + /// + /// Since it is impossible to force stalling, we simulate the above scenario by manually updating the instance table before the worker + /// attempts to complete the work item. The history table update will go through, but the instance table update will fail since "another worker + /// has since updated" the instance table. + /// + /// The value to use for + /// + [DataTestMethod] + [DataRow(true)] + [DataRow(false)] + public async Task WorkerAttemptingToUpdateInstanceTableAfterStalling(bool useInstanceEtag) + { + AzureStorageOrchestrationService service = null; + try + { + var orchestrationInstance = new OrchestrationInstance + { + InstanceId = "instance_id", + ExecutionId = "execution_id", + }; + + ExecutionStartedEvent startedEvent = new(-1, string.Empty) + { + Name = "orchestration", + Version = string.Empty, + OrchestrationInstance = orchestrationInstance, + ScheduledStartTime = DateTime.UtcNow, + }; + + var settings = new AzureStorageOrchestrationServiceSettings + { + PartitionCount = 1, + StorageAccountClientProvider = new StorageAccountClientProvider(TestHelpers.GetTestStorageAccountConnectionString()), + TaskHubName = TestHelpers.GetTestTaskHubName(), + ExtendedSessionsEnabled = false, + UseInstanceTableEtag = useInstanceEtag + }; + + service = new AzureStorageOrchestrationService(settings); + await service.CreateAsync(); + await service.StartAsync(); + + // Create the orchestration and get the first work item and start "working" on it + await service.CreateTaskOrchestrationAsync( + new TaskMessage() + { + OrchestrationInstance = orchestrationInstance, + Event = startedEvent + }); + var workItem = await service.LockNextTaskOrchestrationWorkItemAsync( + TimeSpan.FromMinutes(5), + CancellationToken.None); + var runtimeState = workItem.OrchestrationRuntimeState; + runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); + runtimeState.AddEvent(startedEvent); + runtimeState.AddEvent(new TaskScheduledEvent(0)); + runtimeState.AddEvent(new OrchestratorCompletedEvent(-1)); + + AzureStorageClient azureStorageClient = new AzureStorageClient(settings); + + // Now manually update the instance to have status "Completed" + Table instanceTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.InstanceTableName); + TableEntity entity = new(orchestrationInstance.InstanceId, "") + { + ["RuntimeStatus"] = OrchestrationStatus.Completed.ToString("G"), + }; + await instanceTable.MergeEntityAsync(entity, Azure.ETag.All); + + if (useInstanceEtag) + { + // Confirm an exception is thrown due to the etag mismatch for the instance table when the worker attempts to complete the work item + SessionAbortedException exception = await Assert.ThrowsExceptionAsync(async () => + await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List(), new List(), new List(), null, null) + ); + Assert.IsInstanceOfType(exception.InnerException, typeof(DurableTaskStorageException)); + DurableTaskStorageException dtse = (DurableTaskStorageException)exception.InnerException; + Assert.AreEqual((int)HttpStatusCode.PreconditionFailed, dtse.HttpStatusCode); + } + else + { + await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List(), new List(), new List(), null, null); + + var queryCondition = new OrchestrationInstanceStatusQueryCondition + { + InstanceId = "instance_id", + FetchInput = false, + }; + + ODataCondition odata = queryCondition.ToOData(); + OrchestrationInstanceStatus instanceTableEntity = await instanceTable + .ExecuteQueryAsync(odata.Filter, 1, odata.Select, CancellationToken.None) + .FirstOrDefaultAsync(); + + // Confirm the instance table was updated with a "stale" status + Assert.IsNotNull(instanceTableEntity); + Assert.AreEqual(OrchestrationStatus.Running.ToString(), instanceTableEntity.RuntimeStatus); + } + } + finally + { + await service?.StopAsync(isForced: true); + } + } + + /// + /// Confirm that: + /// 1. If is true, and a worker attempts to update the instance table with a stale + /// etag upon completing a work item for a suborchestration, a SessionAbortedException is thrown which wraps the inner DurableTaskStorageException, which has + /// the correct status code (conflict). + /// The specific scenario tested is if the worker stalled after updating the history table but before updating the instance table for the first work item + /// for a suborchestration. When it attempts to insert a new entity into the instance table for the suborchestration (since for a suborchestration, + /// the instance entity is only created upon completion of the first work item), it will fail. + /// 2. If is false for the above scenario, then the call to update the instance table + /// will go through, and the instance table will be updated with a "stale" status. + /// + /// + /// Since it is impossible to force stalling, we simulate the above scenario by manually updating the instance table before the worker + /// attempts to complete the work item. The history table update will go through, but the instance table update will fail since "another worker + /// has since updated" the instance table. + /// + /// The value to use for + /// + [DataTestMethod] + [DataRow(true)] + [DataRow(false)] + public async Task WorkerAttemptingToUpdateInstanceTableAfterStallingForSubOrchestration(bool useInstanceEtag) + { + AzureStorageOrchestrationService service = null; + try + { + var orchestrationInstance = new OrchestrationInstance + { + InstanceId = "instance_id", + ExecutionId = "execution_id", + }; + + ExecutionStartedEvent startedEvent = new(-1, string.Empty) + { + Name = "orchestration", + Version = string.Empty, + OrchestrationInstance = orchestrationInstance, + ScheduledStartTime = DateTime.UtcNow, + }; + + var settings = new AzureStorageOrchestrationServiceSettings + { + PartitionCount = 1, + StorageAccountClientProvider = new StorageAccountClientProvider(TestHelpers.GetTestStorageAccountConnectionString()), + TaskHubName = TestHelpers.GetTestTaskHubName(), + ExtendedSessionsEnabled = false, + UseInstanceTableEtag = useInstanceEtag + }; + + service = new AzureStorageOrchestrationService(settings); + await service.CreateAsync(); + await service.StartAsync(); + + // Create the orchestration and get the first work item and start "working" on it + await service.CreateTaskOrchestrationAsync( + new TaskMessage() + { + OrchestrationInstance = orchestrationInstance, + Event = startedEvent + }); + var workItem = await service.LockNextTaskOrchestrationWorkItemAsync( + TimeSpan.FromMinutes(5), + CancellationToken.None); + var runtimeState = workItem.OrchestrationRuntimeState; + runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); + runtimeState.AddEvent(startedEvent); + runtimeState.AddEvent(new SubOrchestrationInstanceCreatedEvent(0) + { + Name = "suborchestration", + InstanceId = "sub_instance_id" + }); + runtimeState.AddEvent(new OrchestratorCompletedEvent(-1)); + + // Create the task message to start the suborchestration + var subOrchestrationExecutionStartedEvent = new ExecutionStartedEvent(-1, string.Empty) + { + OrchestrationInstance = new OrchestrationInstance + { + InstanceId = "sub_instance_id", + ExecutionId = Guid.NewGuid().ToString("N") + }, + ParentInstance = new ParentInstance + { + OrchestrationInstance = runtimeState.OrchestrationInstance, + Name = runtimeState.Name, + Version = runtimeState.Version, + TaskScheduleId = 0, + }, + Name = "suborchestration" + }; + List orchestratorMessages = + new() { + new TaskMessage() + { + OrchestrationInstance = subOrchestrationExecutionStartedEvent.OrchestrationInstance, + Event = subOrchestrationExecutionStartedEvent, + } + }; + + // Complete the first work item, which will send the execution started message for the suborchestration + await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List(), orchestratorMessages, new List(), null, null); + + // Now get the work item for the suborchestration and "work" on it + workItem = await service.LockNextTaskOrchestrationWorkItemAsync( + TimeSpan.FromMinutes(5), + CancellationToken.None); + runtimeState = workItem.OrchestrationRuntimeState; + runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); + runtimeState.AddEvent(subOrchestrationExecutionStartedEvent); + runtimeState.AddEvent(new TaskScheduledEvent(0)); + runtimeState.AddEvent(new OrchestratorCompletedEvent(-1)); + + AzureStorageClient azureStorageClient = new(settings); + Table instanceTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.InstanceTableName); + // Now manually update the suborchestration to have status "Completed" + TableEntity entity = new("sub_instance_id", "") + { + ["RuntimeStatus"] = OrchestrationStatus.Completed.ToString("G"), + }; + await instanceTable.InsertEntityAsync(entity); + + if (useInstanceEtag) + { + // Confirm an exception is thrown because the worker attempts to insert a new entity for the suborchestration into the instance table + // when one already exists + SessionAbortedException exception = await Assert.ThrowsExceptionAsync(async () => + await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List(), new List(), new List(), null, null) + ); + Assert.IsInstanceOfType(exception.InnerException, typeof(DurableTaskStorageException)); + DurableTaskStorageException dtse = (DurableTaskStorageException)exception.InnerException; + Assert.AreEqual((int)HttpStatusCode.Conflict, dtse.HttpStatusCode); + } + else + { + await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List(), new List(), new List(), null, null); + + var queryCondition = new OrchestrationInstanceStatusQueryCondition + { + InstanceId = "sub_instance_id", + FetchInput = false, + }; + + ODataCondition odata = queryCondition.ToOData(); + OrchestrationInstanceStatus instanceTableEntity = await instanceTable + .ExecuteQueryAsync(odata.Filter, 1, odata.Select, CancellationToken.None) + .FirstOrDefaultAsync(); + + // Confirm the instance table was updated with a "stale" status + Assert.IsNotNull(instanceTableEntity); + Assert.AreEqual(OrchestrationStatus.Running.ToString(), instanceTableEntity.RuntimeStatus); + } + + } + finally + { + await service?.StopAsync(isForced: true); + } + } + + [DataTestMethod] + [DataRow(true)] + [DataRow(false)] + public async Task WorkerAttemptingToDequeueMessageForNonExistentInstance(bool extendedSessionsEnabled) + { + AzureStorageOrchestrationService service = null; + try + { + var settings = new AzureStorageOrchestrationServiceSettings + { + PartitionCount = 1, + StorageAccountClientProvider = new StorageAccountClientProvider(TestHelpers.GetTestStorageAccountConnectionString()), + TaskHubName = TestHelpers.GetTestTaskHubName(), + ExtendedSessionsEnabled = extendedSessionsEnabled, + }; + + service = new AzureStorageOrchestrationService(settings); + await service.CreateAsync(); + await service.StartAsync(); + + await service.SendTaskOrchestrationMessageAsync( + new TaskMessage + { + OrchestrationInstance = new OrchestrationInstance + { + InstanceId = "instance_id", + ExecutionId = "execution_id", + }, + Event = new TaskCompletedEvent(-1, 0, string.Empty) + { + Timestamp = DateTime.UtcNow - TimeSpan.FromMinutes(1), + } + }); + + for (int i = 0; i < 6; i++) + { + var workItem = await service.LockNextTaskOrchestrationWorkItemAsync( + TimeSpan.FromMinutes(1), + CancellationToken.None); + Assert.IsNull(workItem); + } + + // On the last attempt, the message should have been deleted since we have exceeded the maximum abandonment count + // for a message to a nonexistent instance (5) + Assert.IsNull(await service.OwnedControlQueues.Single().InnerQueue.PeekMessageAsync()); + } + finally + { + await service?.StopAsync(isForced: true); + } + } + + [DataTestMethod] + [DataRow(true, true)] + [DataRow(false, true)] + [DataRow(true, false)] + [DataRow(false, false)] + public async Task WorkerAttemptingToDequeueMessageWithNoTaskScheduledInHistory(bool extendedSessionsEnabled, bool addTaskScheduledEvent) + { + AzureStorageOrchestrationService service = null; + try + { + var orchestrationInstance = new OrchestrationInstance + { + InstanceId = Guid.NewGuid().ToString(), + ExecutionId = Guid.NewGuid().ToString(), + }; + + ExecutionStartedEvent startedEvent = new(-1, string.Empty) + { + Name = "orchestration", + Version = string.Empty, + OrchestrationInstance = orchestrationInstance, + ScheduledStartTime = DateTime.UtcNow, + }; + + var settings = new AzureStorageOrchestrationServiceSettings + { + PartitionCount = 1, + StorageAccountClientProvider = new StorageAccountClientProvider(TestHelpers.GetTestStorageAccountConnectionString()), + TaskHubName = TestHelpers.GetTestTaskHubName(), + ExtendedSessionsEnabled = extendedSessionsEnabled + }; + + service = new AzureStorageOrchestrationService(settings); + await service.CreateAsync(); + await service.StartAsync(); + + // Create the orchestration and get the first work item and start "working" on it + await service.CreateTaskOrchestrationAsync( + new TaskMessage() + { + OrchestrationInstance = orchestrationInstance, + Event = startedEvent + }); + var workItem = await service.LockNextTaskOrchestrationWorkItemAsync( + TimeSpan.FromMinutes(5), + CancellationToken.None); + var runtimeState = workItem.OrchestrationRuntimeState; + runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); + runtimeState.AddEvent(startedEvent); + if (addTaskScheduledEvent) + { + runtimeState.AddEvent(new TaskScheduledEvent(0)); + } + runtimeState.AddEvent(new OrchestratorCompletedEvent(-1)); + + await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List(), new List(), new List(), null, null); + + // Necessary to force a new work item to be generated for the next message + await service.ReleaseTaskOrchestrationWorkItemAsync(workItem); + + // Send a task completed for a different task scheduled ID, message should be abandoned + await service.SendTaskOrchestrationMessageAsync( + new TaskMessage + { + OrchestrationInstance = orchestrationInstance, + Event = new TaskCompletedEvent(-1, 1, string.Empty) + }); + workItem = await service.LockNextTaskOrchestrationWorkItemAsync( + TimeSpan.FromMinutes(1), + CancellationToken.None); + Assert.IsNull(workItem); + + if (addTaskScheduledEvent) + { + // Send a task completed for the same task scheduled ID, this should work + await service.SendTaskOrchestrationMessageAsync( + new TaskMessage + { + OrchestrationInstance = orchestrationInstance, + Event = new TaskCompletedEvent(-1, 0, string.Empty) + }); + workItem = await service.LockNextTaskOrchestrationWorkItemAsync( + TimeSpan.FromMinutes(1), + CancellationToken.None); + Assert.IsNotNull(workItem); + } + } + finally + { + await service?.StopAsync(isForced: true); + } + } + + [DataTestMethod] + [DataRow(true, true)] + [DataRow(false, true)] + [DataRow(true, false)] + [DataRow(false, false)] + public async Task WorkerAttemptingToDequeueMessageWithNoEventSentInHistory(bool extendedSessionsEnabled, bool addEventSentEvent) + { + AzureStorageOrchestrationService service = null; + try + { + var orchestrationInstance = new OrchestrationInstance + { + InstanceId = Guid.NewGuid().ToString(), + ExecutionId = Guid.NewGuid().ToString(), + }; + + ExecutionStartedEvent startedEvent = new(-1, string.Empty) + { + Name = "orchestration", + Version = string.Empty, + OrchestrationInstance = orchestrationInstance, + ScheduledStartTime = DateTime.UtcNow, + }; + + var settings = new AzureStorageOrchestrationServiceSettings + { + PartitionCount = 1, + StorageAccountClientProvider = new StorageAccountClientProvider(TestHelpers.GetTestStorageAccountConnectionString()), + TaskHubName = TestHelpers.GetTestTaskHubName(), + ExtendedSessionsEnabled = extendedSessionsEnabled + }; + + service = new AzureStorageOrchestrationService(settings); + await service.CreateAsync(); + await service.StartAsync(); + + // Create the orchestration and get the first work item and start "working" on it + await service.CreateTaskOrchestrationAsync( + new TaskMessage() + { + OrchestrationInstance = orchestrationInstance, + Event = startedEvent + }); + var workItem = await service.LockNextTaskOrchestrationWorkItemAsync( + TimeSpan.FromMinutes(5), + CancellationToken.None); + var runtimeState = workItem.OrchestrationRuntimeState; + runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); + runtimeState.AddEvent(startedEvent); + string requestId = Guid.NewGuid().ToString(); + if (addEventSentEvent) + { + runtimeState.AddEvent(new EventSentEvent(-1) + { + Input = $"{{ \"id\": \"{requestId}\" }}" + }); + } + runtimeState.AddEvent(new OrchestratorCompletedEvent(-1)); + + await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List(), new List(), new List(), null, null); + + // Necessary to force a new work item to be generated for the next message + await service.ReleaseTaskOrchestrationWorkItemAsync(workItem); + + // Send an event raised for a different request ID, message should be abandoned + await service.SendTaskOrchestrationMessageInternalAsync( + sourceInstance: new OrchestrationInstance() + { + InstanceId = "@test@myEntity" + }, + controlQueue: service.OwnedControlQueues.Single(), + new TaskMessage + { + OrchestrationInstance = orchestrationInstance, + Event = new EventRaisedEvent(-1, string.Empty) + { + Name = Guid.NewGuid().ToString() + } + }); + workItem = await service.LockNextTaskOrchestrationWorkItemAsync( + TimeSpan.FromMinutes(1), + CancellationToken.None); + Assert.IsNull(workItem); + + if (addEventSentEvent) + { + // Send an event raised for the same request ID, this should work + await service.SendTaskOrchestrationMessageAsync( + new TaskMessage + { + OrchestrationInstance = orchestrationInstance, + Event = new EventRaisedEvent(-1, string.Empty) + { + Name = requestId + } + }); + workItem = await service.LockNextTaskOrchestrationWorkItemAsync( + TimeSpan.FromMinutes(1), + CancellationToken.None); + Assert.IsNotNull(workItem); + } + } + finally + { + await service?.StopAsync(isForced: true); + } + } + #if !NET48 /// /// End-to-end test which validates a simple orchestrator function that calls an activity function