Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .flake8
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[flake8]
ignore = E501,C901
ignore = E501,C901,W503
exclude =
.git
*_pb2*
Expand Down
4 changes: 2 additions & 2 deletions docs/supported-patterns.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ def purchase_order_workflow(ctx: task.OrchestrationContext, order: Order):
# Orders of $1000 or more require manager approval
yield ctx.call_activity(send_approval_request, input=order)

# Approvals must be received within 24 hours or they will be canceled.
# Approvals must be received within 24 hours or they will be cancelled.
approval_event = ctx.wait_for_external_event("approval_received")
timeout_event = ctx.create_timer(timedelta(hours=24))
winner = yield task.when_any([approval_event, timeout_event])
if winner == timeout_event:
return "Canceled"
return "Cancelled"

# The order was approved
yield ctx.call_activity(place_order, input=order)
Expand Down
4 changes: 3 additions & 1 deletion durabletask-azuremanaged/durabletask/azuremanaged/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,6 @@ def __init__(self, *,
log_handler=log_handler,
log_formatter=log_formatter,
interceptors=interceptors,
concurrency_options=concurrency_options)
concurrency_options=concurrency_options,
maximum_timer_interval=None # DTS allows timers of indefinite length
)
79 changes: 72 additions & 7 deletions durabletask/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def set_custom_status(self, custom_status: Any) -> None:
pass

@abstractmethod
def create_timer(self, fire_at: Union[datetime, timedelta]) -> Task:
def create_timer(self, fire_at: Union[datetime, timedelta]) -> CancellableTask:
"""Create a Timer Task to fire after at the specified deadline.

Parameters
Expand Down Expand Up @@ -228,10 +228,10 @@ def call_sub_orchestrator(self, orchestrator: Union[Orchestrator[TInput, TOutput
"""
pass

# TOOD: Add a timeout parameter, which allows the task to be canceled if the event is
# TOOD: Add a timeout parameter, which allows the task to be cancelled if the event is
# not received within the specified timeout. This requires support for task cancellation.
@abstractmethod
def wait_for_external_event(self, name: str) -> CompletableTask:
def wait_for_external_event(self, name: str) -> CancellableTask:
"""Wait asynchronously for an event to be raised with the name `name`.

Parameters
Expand Down Expand Up @@ -324,6 +324,10 @@ class OrchestrationStateError(Exception):
pass


class TaskCancelledError(Exception):
"""Exception type for cancelled orchestration tasks."""


class Task(ABC, Generic[T]):
"""Abstract base class for asynchronous tasks in a durable orchestration."""
_result: T
Expand Down Expand Up @@ -435,6 +439,48 @@ def fail(self, message: str, details: Union[Exception, pb.TaskFailureDetails]):
self._parent.on_child_completed(self)


class CancellableTask(CompletableTask[T]):
"""A completable task that can be cancelled before it finishes."""

def __init__(self) -> None:
super().__init__()
self._is_cancelled = False
self._cancel_handler: Optional[Callable[[], None]] = None

@property
def is_cancelled(self) -> bool:
"""Returns True if the task was cancelled, False otherwise."""
return self._is_cancelled

def get_result(self) -> T:
if self._is_cancelled:
raise TaskCancelledError('The task was cancelled.')
return super().get_result()

def set_cancel_handler(self, cancel_handler: Callable[[], None]) -> None:
self._cancel_handler = cancel_handler

def cancel(self) -> bool:
"""Attempts to cancel this task.

Returns
-------
bool
True if cancellation was applied, False if the task had already completed.
"""
if self._is_complete:
return False

if self._cancel_handler is not None:
self._cancel_handler()

self._is_cancelled = True
self._is_complete = True
if self._parent is not None:
self._parent.on_child_completed(self)
return True


class RetryableTask(CompletableTask[T]):
"""A task that can be retried according to a retry policy."""

Expand Down Expand Up @@ -474,13 +520,32 @@ def compute_next_delay(self) -> Optional[timedelta]:
return None


class TimerTask(CompletableTask[T]):
class TimerTask(CancellableTask[None]):
def set_retryable_parent(self, retryable_task: RetryableTask):
self._retryable_parent = retryable_task

def complete(self, _: datetime) -> None:
super().complete(None)

def __init__(self) -> None:

class LongTimerTask(TimerTask):
def __init__(self, final_fire_at: datetime, maximum_timer_interval: timedelta):
super().__init__()
self._final_fire_at = final_fire_at
self._maximum_timer_interval = maximum_timer_interval

def set_retryable_parent(self, retryable_task: RetryableTask):
self._retryable_parent = retryable_task
def start(self, current_utc_datetime: datetime) -> datetime:
return self._get_next_fire_at(current_utc_datetime)

def complete(self, current_utc_datetime: datetime) -> Optional[datetime]:
if current_utc_datetime < self._final_fire_at:
return self._get_next_fire_at(current_utc_datetime)
return super().complete(current_utc_datetime)

def _get_next_fire_at(self, current_utc_datetime: datetime) -> datetime:
if current_utc_datetime + self._maximum_timer_interval < self._final_fire_at:
return current_utc_datetime + self._maximum_timer_interval
return self._final_fire_at


class WhenAnyTask(CompositeTask[Task]):
Expand Down
Loading
Loading