Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 64 additions & 56 deletions src/corva/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import itertools
import logging
import sys
import time
import warnings
from typing import (
Any,
Expand Down Expand Up @@ -70,66 +71,73 @@ def base_handler(
) -> Callable[[Any, Any], List[Any]]:
@functools.wraps(func)
def wrapper(aws_event: Any, aws_context: Any) -> List[Any]:
with LoggingContext(
aws_request_id=aws_context.aws_request_id,
asset_id=None,
app_connection_id=None,
handler=logging.StreamHandler(stream=sys.stdout),
user_handler=handler,
logger=CORVA_LOGGER,
) as logging_ctx:
# Verify either current call from app_decorator or not
# for instance from partial rerun merge
(
raw_custom_event_type,
custom_handler,
) = _get_custom_event_type_by_raw_aws_event(aws_event)
is_direct_app_call: bool = not custom_handler
data_transformation_type = raw_custom_event_type or raw_event_type
if merge_events:
aws_event = _merge_events(aws_event, data_transformation_type)

if (
is_direct_app_call
and data_transformation_type not in GENERIC_APP_EVENT_TYPES
):
CORVA_LOGGER.warning(
f"Handler for {data_transformation_type.__name__!r} "
f"event not found. Skipping..."
)
return []

if is_direct_app_call:
# Means current app call is not RawPartialRerunMergeEvent or similar
validate_app_type_context(aws_event, raw_event_type)
start_time = time.time()
try:
with LoggingContext(
aws_request_id=aws_context.aws_request_id,
asset_id=None,
app_connection_id=None,
handler=logging.StreamHandler(stream=sys.stdout),
user_handler=handler,
logger=CORVA_LOGGER,
) as logging_ctx:
# Verify either current call from app_decorator or not
# for instance from partial rerun merge
(
raw_custom_event_type,
custom_handler,
) = _get_custom_event_type_by_raw_aws_event(aws_event)
is_direct_app_call: bool = not custom_handler
data_transformation_type = raw_custom_event_type or raw_event_type
if merge_events:
aws_event = _merge_events(aws_event, data_transformation_type)

if (
is_direct_app_call
and data_transformation_type not in GENERIC_APP_EVENT_TYPES
):
CORVA_LOGGER.warning(
f"Handler for {data_transformation_type.__name__!r} "
f"event not found. Skipping..."
)
return []

try:
context = CorvaContext.from_aws(
aws_event=aws_event, aws_context=aws_context
)
if is_direct_app_call:
# Means current app call is not RawPartialRerunMergeEvent or similar
validate_app_type_context(aws_event, raw_event_type)

redis_client = redis.Redis.from_url(
url=SETTINGS.CACHE_URL, decode_responses=True, max_connections=1
)
raw_events = data_transformation_type.from_raw_event(event=aws_event)
specific_callable = custom_handler or func

results = [
specific_callable(
raw_event,
context.api_key,
context.aws_request_id,
logging_ctx,
redis_client,
try:
context = CorvaContext.from_aws(
aws_event=aws_event, aws_context=aws_context
)
for raw_event in raw_events
]

return results

except Exception:
CORVA_LOGGER.exception("The app failed to execute.")
raise
redis_client = redis.Redis.from_url(
url=SETTINGS.CACHE_URL, decode_responses=True, max_connections=1
)
raw_events = data_transformation_type.from_raw_event(event=aws_event)
specific_callable = custom_handler or func

results = [
specific_callable(
raw_event,
context.api_key,
context.aws_request_id,
logging_ctx,
redis_client,
)
for raw_event in raw_events
]

return results

except Exception:
CORVA_LOGGER.exception("The app failed to execute.")
raise
finally:
CORVA_LOGGER.info(
"Total data processing time: %d ms",
int((time.time() - start_time) * 1000),
)

return wrapper

Expand Down
Loading