
[Bug]: callback.result() is re-executing the commands between the callback creation and subsequent callback.result() call #347

@alexiswl

Description

Expected Behavior

callback.result() shouldn't re-execute external requests once a SendDurableExecutionCallbackSuccess has come through, or this behaviour should be better documented.

Actual Behavior

I have noticed duplicate calls to start_execution for my step function, timed almost immediately after the first SendDurableExecutionCallbackSuccess comes through.

When the second step function execution (the replay) runs, it fails at the SendDurableExecutionCallbackSuccess step because the callback ID is no longer valid. The replay shouldn't be running in the first place.

Steps to Reproduce

I have provided the durable Lambda handler responsible for the issue below.

Code

Using aws-durable-execution-sdk-python 1.4.0, boto3 1.42.90, and Python 3.14:

"""
Throttle the copy jobs from an SQS queue URL

SQS queue forwards the request onto here and we don't return until the copy job step is resolved in the step function we launch.

This way we can throttle through the requests to prevent overload of the ICAv2 API

We take in one request per lambda and run a maximum of 50 lambdas simultaneously.

Once the request comes back we delete the queue from the database too

"""

# Standard library imports
import json
import typing
from os import environ

# Third-party imports
import boto3

# Durable context imports
from aws_durable_execution_sdk_python import (
    DurableContext,
    durable_execution
)
from aws_durable_execution_sdk_python.config import (
    Duration, CallbackConfig
)

if typing.TYPE_CHECKING:
    from mypy_boto3_stepfunctions.client import SFNClient
    from mypy_boto3_dynamodb.client import DynamoDBClient

# Globals
DATABASE_NAME_ENV_VAR = "DATABASE_NAME"
HANDLE_COPY_JOB_SFN_ARN_ENV_VAR = "HANDLE_COPY_JOB_SFN_ARN"


def get_dynamodb_client() -> 'DynamoDBClient':
    return boto3.client('dynamodb')


def get_sfn_client() -> 'SFNClient':
    return boto3.client('stepfunctions')


@durable_execution
def handler(event, context: DurableContext):
    """
    Expect the following inputs from the event object:
      * inputs
      * engineParameters
      * tags

    :param event:
    :param context:
    :return:
    """

    # Standard SQS event source shape: a list of records, each with a JSON-string body
    for record in event.get("Records", []):
        record_body = json.loads(record.get("body", "{}"))
        # Check if the event contains the required keys
        required_keys = ['payload']
        for key in required_keys:
            if key not in record_body:
                raise ValueError(f"Missing required key: {key}")

        # Run the durable execution callback configuration
        # Step 1: Create the callback
        callback = context.create_callback(
            name="WESRequestCallback",
            config=CallbackConfig(timeout=Duration.from_minutes(15)),
        )

        # Step 2: Run the step function asynchronously
        get_sfn_client().start_execution(
            stateMachineArn=environ[HANDLE_COPY_JOB_SFN_ARN_ENV_VAR],
            input=json.dumps({
                **record_body,
                "callbackId": callback.callback_id,
            }),
        )

        # Step 3: Wait for the result
        callback.result()

        # Step 4: Delete the task-token item for this SQS message from the DynamoDB table
        if record_body.get('taskToken') is not None:
            try:
                get_dynamodb_client().delete_item(
                    Key={
                        "id": {
                            "S": record_body['taskToken'],
                        },
                        "id_type": {
                            "S": "TASK_TOKEN_SQS"
                        }
                    },
                    TableName=environ[DATABASE_NAME_ENV_VAR]
                )
            # This will get cleaned up later if we can't delete it now for some reason
            except Exception:
                continue
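What I think is happening (a toy model, not the real SDK internals): the durable handler is replayed from the top after each suspension, and only operations recorded through the SDK (create_callback, callback.result()) are skipped on replay. The bare boto3 start_execution call sits between two recorded operations, so every replay re-runs it:

```python
# Toy replay model (an illustration, not the real SDK): completed steps are
# journaled, and on replay a journaled step returns its recorded result
# instead of re-running. A side effect made outside any journaled step, like
# the bare start_execution() call above, re-runs on every replay.
calls = []

def start_execution(arn):
    calls.append(arn)  # external side effect we only want to happen once

def run_handler(journal):
    def step(name, fn):
        if name in journal:       # replay: return the recorded result
            return journal[name]
        journal[name] = fn()      # first pass: execute and record
        return journal[name]

    step("create_callback", lambda: "callback-id-1")
    start_execution("arn:copy-job")             # NOT journaled
    step("callback.result", lambda: "SUCCESS")

journal = {}
run_handler(journal)  # first pass, before the callback resolves
run_handler(journal)  # replay after SendDurableExecutionCallbackSuccess
print(len(calls))     # 2 -- the un-journaled side effect ran twice
```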

SDK Version

1.4.0

Python Version

3.14

Is this a regression?

No

Last Working Version

1.3.0 (I'm not confident about this one)

Additional Context

Context

I want to process messages from an SQS queue with a step function, but in such a way that only five step function executions run simultaneously.
There is no built-in way to set a maximum concurrency on step function executions, so I've used a durable Lambda with an SQS event source to call the step function asynchronously from the Lambda.

The launched step function has one particular step at the start that calls an external service with some stability issues, which is why I want to restrict it to five executions at a time.
Once the step function has passed this step, it sends a callback to the Lambda to unlock it and then continues with the rest of the step function.

I have noticed that once a step function execution gets past this step, another execution launches with exactly the same inputs and then fails when it reaches the unlock step, because the second execution is just part of the durable function replay and its callback ID has already been consumed. The second step function starts milliseconds before the durable Lambda ends, which is how I know this is coming from the Lambda and not a redrive from the SQS queue.
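One mitigation worth noting, independent of the SDK-level fix below (the function name and prefix here are hypothetical): Step Functions StartExecution is idempotent on Standard workflows when you pass an explicit name, so deriving the execution name deterministically from the SQS message ID should make a replayed start_execution with identical input return the existing execution instead of launching a duplicate:

```python
import hashlib

def deterministic_execution_name(message_id: str) -> str:
    # Step Functions execution names must be unique per state machine and at
    # most 80 characters; hashing the SQS message ID gives a stable name that
    # is identical on every replay of the same record.
    return "copy-job-" + hashlib.sha256(message_id.encode()).hexdigest()[:32]

# Passed as name=... to start_execution(); a replayed call with the same name
# and the same input then returns the already-started execution rather than
# launching a second one (a different input would raise ExecutionAlreadyExists).
print(deterministic_execution_name("example-sqs-message-id"))
```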

Solution

If anyone else comes across this issue, this is what I've found to be effective:

#!/usr/bin/env python3

"""
Throttle the copy jobs from an SQS queue URL

SQS queue forwards the request onto here and we don't return until the copy job step is resolved in the step function we launch.

This way we can throttle through the requests to prevent overload of the ICAv2 API

We take in one request per lambda and run a maximum of 50 lambdas simultaneously.

Once the request comes back we delete the queue from the database too

"""


# Standard library imports
import json
import typing
from os import environ
from typing import Any, Dict

# Third-party imports
import boto3

# Durable context imports
from aws_durable_execution_sdk_python import (
    DurableContext,
    durable_execution
)
from aws_durable_execution_sdk_python.config import (
    Duration, WaitForCallbackConfig
)

from aws_durable_execution_sdk_python.retries import create_retry_strategy
from aws_durable_execution_sdk_python.types import WaitForCallbackContext

if typing.TYPE_CHECKING:
    from mypy_boto3_stepfunctions.client import SFNClient
    from mypy_boto3_dynamodb.client import DynamoDBClient

# Globals
DATABASE_NAME_ENV_VAR = "DATABASE_NAME"
HANDLE_COPY_JOB_SFN_ARN_ENV_VAR = "HANDLE_COPY_JOB_SFN_ARN"


def get_dynamodb_client() -> 'DynamoDBClient':
    return boto3.client('dynamodb')


def get_sfn_client() -> 'SFNClient':
    return boto3.client('stepfunctions')


def run_execution(sfn_input: Dict[str, Any], context: DurableContext) -> None:
    # Define the wrapper function
    def submitter(callback_id: str, callback_context: WaitForCallbackContext):
        callback_context.logger.info("Submitting copy job")
        # Step 2: Launch the copy job (asynchronously)
        sfn_object = get_sfn_client().start_execution(
            stateMachineArn=environ[HANDLE_COPY_JOB_SFN_ARN_ENV_VAR],
            input=json.dumps({
                **sfn_input,
                "callbackId": callback_id,
            }),
        )
        callback_context.logger.info(f"Submitting copy job as {sfn_object['executionArn']}")

    # Step 3: Wait here for the callback to be invoked
    context.wait_for_callback(
        submitter=submitter,
        name=None,
        config=WaitForCallbackConfig(
            timeout=Duration.from_minutes(15),
            retry_strategy=create_retry_strategy(
                config=None
            )
        ),
    )

@durable_execution
def handler(event, context: DurableContext):
    """
    Expect the following inputs from the event object:
      * inputs
      * engineParameters
      * tags

    :param event:
    :param context:
    :return:
    """

    # Standard SQS event source shape: a list of records, each with a JSON-string body
    for record in event.get("Records", []):
        record_body = json.loads(record.get("body", "{}"))
        # Check if the event contains the required keys
        required_keys = ['payload']
        for key in required_keys:
            if key not in record_body:
                raise ValueError(f"Missing required key: {key}")

        # Run the durable execution callback configuration
        run_execution(record_body, context)

        # Delete the task-token item for this SQS message from the DynamoDB table
        if record_body.get('taskToken') is not None:
            try:
                get_dynamodb_client().delete_item(
                    Key={
                        "id": {
                            "S": record_body['taskToken'],
                        },
                        "id_type": {
                            "S": "TASK_TOKEN_SQS"
                        }
                    },
                    TableName=environ[DATABASE_NAME_ENV_VAR]
                )
            # This will get cleaned up later if we can't delete it now for some reason
            except Exception:
                continue
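Why this appears to work (again a toy model of my understanding, not the SDK internals): wait_for_callback journals the submitter's completion as part of the callback step, so on replay the submitter, and the start_execution call inside it, is skipped rather than re-run:

```python
# Toy sketch: the side effect now runs *inside* the journaled step, so it
# only executes on the first pass; replays skip straight to the result.
calls = []

def start_execution(arn):
    calls.append(arn)

def wait_for_callback(journal, name, submitter):
    if name not in journal:
        submitter("callback-id-1")   # first pass only
        journal[name] = "SUBMITTED"
    return "SUCCESS"                 # resolved callback result, recorded elsewhere

def run_handler(journal):
    wait_for_callback(journal, "WESRequestCallback",
                      lambda callback_id: start_execution("arn:copy-job"))

journal = {}
run_handler(journal)  # first pass: submitter launches the step function
run_handler(journal)  # replay: submitter skipped
print(len(calls))     # 1
```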
