Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -1821,7 +1821,7 @@ public async Task SendTaskOrchestrationMessageAsync(TaskMessage message)
await this.SendTaskOrchestrationMessageInternalAsync(EmptySourceInstance, controlQueue, message);
}

Task<MessageData> SendTaskOrchestrationMessageInternalAsync(
internal Task<MessageData> SendTaskOrchestrationMessageInternalAsync(
OrchestrationInstance sourceInstance,
ControlQueue controlQueue,
TaskMessage message)
Expand Down
14 changes: 1 addition & 13 deletions src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -164,18 +164,6 @@ public bool IsOutOfOrderMessage(MessageData message)
return false;
}

if (this.LastCheckpointTime > message.TaskMessage.Event.Timestamp)
{
// LastCheckpointTime represents the time at which the most recent history checkpoint completed.
// The checkpoint is written to the history table only *after* all queue messages are sent.
// A message is out of order when its timestamp *preceeds* the most recent checkpoint timestamp.
// In this case, we see that the checkpoint came *after* the message, so there is no out-of-order
// concern. Note that this logic only applies for messages sent by orchestrations to themselves.
// The next check considers the other cases (activities, sub-orchestrations, etc.).
// Orchestration checkpoint time information was added only after v1.6.4.
return false;
}

if (Utils.TryGetTaskScheduledId(message.TaskMessage.Event, out int taskScheduledId))
{
// This message is a response to a task. Search the history to make sure that we've recorded the fact that
Expand All @@ -193,7 +181,7 @@ public bool IsOutOfOrderMessage(MessageData message)
var requestId = ((EventRaisedEvent)message.TaskMessage.Event).Name;
if (requestId != null)
{
HistoryEvent mostRecentTaskEvent = this.RuntimeState.Events.FirstOrDefault(e => e.EventType == EventType.EventSent && FindRequestId(((EventSentEvent)e).Input)?.ToString() == requestId);
HistoryEvent mostRecentTaskEvent = this.RuntimeState.Events.LastOrDefault(e => e.EventType == EventType.EventSent && FindRequestId(((EventSentEvent)e).Input)?.ToString() == requestId);
if (mostRecentTaskEvent != null)
{
return false;
Expand Down
275 changes: 0 additions & 275 deletions test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -643,281 +643,6 @@ await TestHelpers.WaitFor(
}
}

/// <summary>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I incorrectly added these tests to this class in earlier PRs. They always should have been in the AzureStorageScenarioTests class so I took this opportunity to move them there

/// Confirm that:
/// 1. If <see cref="AzureStorageOrchestrationServiceSettings.UseInstanceTableEtag"/> is true, and a worker attempts to update the instance table with a stale
/// etag upon completing a work item, a SessionAbortedException is thrown which wraps the inner DurableTaskStorageException, which has the correct status code
/// (precondition failed).
/// The specific scenario tested is if the worker stalled after updating the history table but before updating the instance table. When it attempts to update
/// the instance table with a stale etag, it will fail.
/// 2. If <see cref="AzureStorageOrchestrationServiceSettings.UseInstanceTableEtag"/> is false for the above scenario, then the call to update the instance table
/// will go through, and the instance table will be updated with a "stale" status.
/// </summary>
/// <remarks>
/// Since it is impossible to force stalling, we simulate the above scenario by manually updating the instance table before the worker
/// attempts to complete the work item. The history table update will go through, but the instance table update will fail since "another worker
/// has since updated" the instance table.
/// </remarks>
/// <param name="useInstanceEtag">The value to use for <see cref="AzureStorageOrchestrationServiceSettings.UseInstanceTableEtag"/></param>
/// <returns></returns>
[DataTestMethod]
[DataRow(true)]
[DataRow(false)]
public async Task WorkerAttemptingToUpdateInstanceTableAfterStalling(bool useInstanceEtag)
{
AzureStorageOrchestrationService service = null;
try
{
var orchestrationInstance = new OrchestrationInstance
{
InstanceId = "instance_id",
ExecutionId = "execution_id",
};

ExecutionStartedEvent startedEvent = new(-1, string.Empty)
{
Name = "orchestration",
Version = string.Empty,
OrchestrationInstance = orchestrationInstance,
ScheduledStartTime = DateTime.UtcNow,
};

var settings = new AzureStorageOrchestrationServiceSettings
{
PartitionCount = 1,
StorageAccountClientProvider = new StorageAccountClientProvider(TestHelpers.GetTestStorageAccountConnectionString()),
TaskHubName = TestHelpers.GetTestTaskHubName(),
ExtendedSessionsEnabled = false,
UseInstanceTableEtag = useInstanceEtag
};
this.SetPartitionManagerType(settings, PartitionManagerType.V2Safe);

service = new AzureStorageOrchestrationService(settings);
await service.CreateAsync();
await service.StartAsync();

// Create the orchestration and get the first work item and start "working" on it
await service.CreateTaskOrchestrationAsync(
new TaskMessage()
{
OrchestrationInstance = orchestrationInstance,
Event = startedEvent
});
var workItem = await service.LockNextTaskOrchestrationWorkItemAsync(
TimeSpan.FromMinutes(5),
CancellationToken.None);
var runtimeState = workItem.OrchestrationRuntimeState;
runtimeState.AddEvent(new OrchestratorStartedEvent(-1));
runtimeState.AddEvent(startedEvent);
runtimeState.AddEvent(new TaskScheduledEvent(0));
runtimeState.AddEvent(new OrchestratorCompletedEvent(-1));

AzureStorageClient azureStorageClient = new AzureStorageClient(settings);

// Now manually update the instance to have status "Completed"
Table instanceTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.InstanceTableName);
TableEntity entity = new(orchestrationInstance.InstanceId, "")
{
["RuntimeStatus"] = OrchestrationStatus.Completed.ToString("G"),
};
await instanceTable.MergeEntityAsync(entity, Azure.ETag.All);

if (useInstanceEtag)
{
// Confirm an exception is thrown due to the etag mismatch for the instance table when the worker attempts to complete the work item
SessionAbortedException exception = await Assert.ThrowsExceptionAsync<SessionAbortedException>(async () =>
await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List<TaskMessage>(), new List<TaskMessage>(), new List<TaskMessage>(), null, null)
);
Assert.IsInstanceOfType(exception.InnerException, typeof(DurableTaskStorageException));
DurableTaskStorageException dtse = (DurableTaskStorageException)exception.InnerException;
Assert.AreEqual((int)HttpStatusCode.PreconditionFailed, dtse.HttpStatusCode);
}
else
{
await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List<TaskMessage>(), new List<TaskMessage>(), new List<TaskMessage>(), null, null);

var queryCondition = new OrchestrationInstanceStatusQueryCondition
{
InstanceId = "instance_id",
FetchInput = false,
};

ODataCondition odata = queryCondition.ToOData();
OrchestrationInstanceStatus instanceTableEntity = await instanceTable
.ExecuteQueryAsync<OrchestrationInstanceStatus>(odata.Filter, 1, odata.Select, CancellationToken.None)
.FirstOrDefaultAsync();

// Confirm the instance table was updated with a "stale" status
Assert.IsNotNull(instanceTableEntity);
Assert.AreEqual(OrchestrationStatus.Running.ToString(), instanceTableEntity.RuntimeStatus);
}
}
finally
{
await service?.StopAsync(isForced: true);
}
}

/// <summary>
/// Confirm that:
/// 1. If <see cref="AzureStorageOrchestrationServiceSettings.UseInstanceTableEtag"/> is true, and a worker attempts to update the instance table with a stale
/// etag upon completing a work item for a suborchestration, a SessionAbortedException is thrown which wraps the inner DurableTaskStorageException, which has
/// the correct status code (conflict).
/// The specific scenario tested is if the worker stalled after updating the history table but before updating the instance table for the first work item
/// for a suborchestration. When it attempts to insert a new entity into the instance table for the suborchestration (since for a suborchestration,
/// the instance entity is only created upon completion of the first work item), it will fail.
/// 2. If <see cref="AzureStorageOrchestrationServiceSettings.UseInstanceTableEtag"/> is false for the above scenario, then the call to update the instance table
/// will go through, and the instance table will be updated with a "stale" status.
/// </summary>
/// <remarks>
/// Since it is impossible to force stalling, we simulate the above scenario by manually updating the instance table before the worker
/// attempts to complete the work item. The history table update will go through, but the instance table update will fail since "another worker
/// has since updated" the instance table.
/// </remarks>
/// <param name="useInstanceEtag">The value to use for <see cref="AzureStorageOrchestrationServiceSettings.UseInstanceTableEtag"/></param>
/// <returns></returns>
[DataTestMethod]
[DataRow(true)]
[DataRow(false)]
public async Task WorkerAttemptingToUpdateInstanceTableAfterStallingForSubOrchestration(bool useInstanceEtag)
{
AzureStorageOrchestrationService service = null;
try
{
var orchestrationInstance = new OrchestrationInstance
{
InstanceId = "instance_id",
ExecutionId = "execution_id",
};

ExecutionStartedEvent startedEvent = new(-1, string.Empty)
{
Name = "orchestration",
Version = string.Empty,
OrchestrationInstance = orchestrationInstance,
ScheduledStartTime = DateTime.UtcNow,
};

var settings = new AzureStorageOrchestrationServiceSettings
{
PartitionCount = 1,
StorageAccountClientProvider = new StorageAccountClientProvider(TestHelpers.GetTestStorageAccountConnectionString()),
TaskHubName = TestHelpers.GetTestTaskHubName(),
ExtendedSessionsEnabled = false,
UseInstanceTableEtag = useInstanceEtag
};
this.SetPartitionManagerType(settings, PartitionManagerType.V2Safe);

service = new AzureStorageOrchestrationService(settings);
await service.CreateAsync();
await service.StartAsync();

// Create the orchestration and get the first work item and start "working" on it
await service.CreateTaskOrchestrationAsync(
new TaskMessage()
{
OrchestrationInstance = orchestrationInstance,
Event = startedEvent
});
var workItem = await service.LockNextTaskOrchestrationWorkItemAsync(
TimeSpan.FromMinutes(5),
CancellationToken.None);
var runtimeState = workItem.OrchestrationRuntimeState;
runtimeState.AddEvent(new OrchestratorStartedEvent(-1));
runtimeState.AddEvent(startedEvent);
runtimeState.AddEvent(new SubOrchestrationInstanceCreatedEvent(0)
{
Name = "suborchestration",
InstanceId = "sub_instance_id"
});
runtimeState.AddEvent(new OrchestratorCompletedEvent(-1));

// Create the task message to start the suborchestration
var subOrchestrationExecutionStartedEvent = new ExecutionStartedEvent(-1, string.Empty)
{
OrchestrationInstance = new OrchestrationInstance
{
InstanceId = "sub_instance_id",
ExecutionId = Guid.NewGuid().ToString("N")
},
ParentInstance = new ParentInstance
{
OrchestrationInstance = runtimeState.OrchestrationInstance,
Name = runtimeState.Name,
Version = runtimeState.Version,
TaskScheduleId = 0,
},
Name = "suborchestration"
};
List<TaskMessage> orchestratorMessages =
new() {
new TaskMessage()
{
OrchestrationInstance = subOrchestrationExecutionStartedEvent.OrchestrationInstance,
Event = subOrchestrationExecutionStartedEvent,
}
};

// Complete the first work item, which will send the execution started message for the suborchestration
await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List<TaskMessage>(), orchestratorMessages, new List<TaskMessage>(), null, null);

// Now get the work item for the suborchestration and "work" on it
workItem = await service.LockNextTaskOrchestrationWorkItemAsync(
TimeSpan.FromMinutes(5),
CancellationToken.None);
runtimeState = workItem.OrchestrationRuntimeState;
runtimeState.AddEvent(new OrchestratorStartedEvent(-1));
runtimeState.AddEvent(subOrchestrationExecutionStartedEvent);
runtimeState.AddEvent(new TaskScheduledEvent(0));
runtimeState.AddEvent(new OrchestratorCompletedEvent(-1));

AzureStorageClient azureStorageClient = new(settings);
Table instanceTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.InstanceTableName);
// Now manually update the suborchestration to have status "Completed"
TableEntity entity = new("sub_instance_id", "")
{
["RuntimeStatus"] = OrchestrationStatus.Completed.ToString("G"),
};
await instanceTable.InsertEntityAsync(entity);

if (useInstanceEtag)
{
// Confirm an exception is thrown because the worker attempts to insert a new entity for the suborchestration into the instance table
// when one already exists
SessionAbortedException exception = await Assert.ThrowsExceptionAsync<SessionAbortedException>(async () =>
await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List<TaskMessage>(), new List<TaskMessage>(), new List<TaskMessage>(), null, null)
);
Assert.IsInstanceOfType(exception.InnerException, typeof(DurableTaskStorageException));
DurableTaskStorageException dtse = (DurableTaskStorageException)exception.InnerException;
Assert.AreEqual((int)HttpStatusCode.Conflict, dtse.HttpStatusCode);
}
else
{
await service.CompleteTaskOrchestrationWorkItemAsync(workItem, runtimeState, new List<TaskMessage>(), new List<TaskMessage>(), new List<TaskMessage>(), null, null);

var queryCondition = new OrchestrationInstanceStatusQueryCondition
{
InstanceId = "sub_instance_id",
FetchInput = false,
};

ODataCondition odata = queryCondition.ToOData();
OrchestrationInstanceStatus instanceTableEntity = await instanceTable
.ExecuteQueryAsync<OrchestrationInstanceStatus>(odata.Filter, 1, odata.Select, CancellationToken.None)
.FirstOrDefaultAsync();

// Confirm the instance table was updated with a "stale" status
Assert.IsNotNull(instanceTableEntity);
Assert.AreEqual(OrchestrationStatus.Running.ToString(), instanceTableEntity.RuntimeStatus);
}

}
finally
{
await service?.StopAsync(isForced: true);
}
}

[TestMethod]
public async Task MonitorIdleTaskHubDisconnected()
{
Expand Down
Loading
Loading