diff --git a/src/corva/handlers.py b/src/corva/handlers.py index 3e748af4..c7ae5e0a 100644 --- a/src/corva/handlers.py +++ b/src/corva/handlers.py @@ -3,6 +3,7 @@ import itertools import logging import sys +import time import warnings from typing import ( Any, @@ -70,66 +71,73 @@ def base_handler( ) -> Callable[[Any, Any], List[Any]]: @functools.wraps(func) def wrapper(aws_event: Any, aws_context: Any) -> List[Any]: - with LoggingContext( - aws_request_id=aws_context.aws_request_id, - asset_id=None, - app_connection_id=None, - handler=logging.StreamHandler(stream=sys.stdout), - user_handler=handler, - logger=CORVA_LOGGER, - ) as logging_ctx: - # Verify either current call from app_decorator or not - # for instance from partial rerun merge - ( - raw_custom_event_type, - custom_handler, - ) = _get_custom_event_type_by_raw_aws_event(aws_event) - is_direct_app_call: bool = not custom_handler - data_transformation_type = raw_custom_event_type or raw_event_type - if merge_events: - aws_event = _merge_events(aws_event, data_transformation_type) - - if ( - is_direct_app_call - and data_transformation_type not in GENERIC_APP_EVENT_TYPES - ): - CORVA_LOGGER.warning( - f"Handler for {data_transformation_type.__name__!r} " - f"event not found. Skipping..." - ) - return [] - - if is_direct_app_call: - # Means current app call is not RawPartialRerunMergeEvent or similar - validate_app_type_context(aws_event, raw_event_type) + start_time = time.time() + try: + with LoggingContext( + aws_request_id=aws_context.aws_request_id, + asset_id=None, + app_connection_id=None, + handler=logging.StreamHandler(stream=sys.stdout), + user_handler=handler, + logger=CORVA_LOGGER, + ) as logging_ctx: + # Verify either current call from app_decorator or not + # for instance from partial rerun merge + ( + raw_custom_event_type, + custom_handler, + ) = _get_custom_event_type_by_raw_aws_event(aws_event) + is_direct_app_call: bool = not custom_handler + data_transformation_type = raw_custom_event_type or raw_event_type + if merge_events: + aws_event = _merge_events(aws_event, data_transformation_type) + + if ( + is_direct_app_call + and data_transformation_type not in GENERIC_APP_EVENT_TYPES + ): + CORVA_LOGGER.warning( + f"Handler for {data_transformation_type.__name__!r} " + f"event not found. Skipping..." + ) + return [] - try: - context = CorvaContext.from_aws( - aws_event=aws_event, aws_context=aws_context - ) + if is_direct_app_call: + # Means current app call is not RawPartialRerunMergeEvent or similar + validate_app_type_context(aws_event, raw_event_type) - redis_client = redis.Redis.from_url( - url=SETTINGS.CACHE_URL, decode_responses=True, max_connections=1 - ) - raw_events = data_transformation_type.from_raw_event(event=aws_event) - specific_callable = custom_handler or func - - results = [ - specific_callable( - raw_event, - context.api_key, - context.aws_request_id, - logging_ctx, - redis_client, + try: + context = CorvaContext.from_aws( + aws_event=aws_event, aws_context=aws_context ) - for raw_event in raw_events - ] - - return results - except Exception: - CORVA_LOGGER.exception("The app failed to execute.") - raise + redis_client = redis.Redis.from_url( + url=SETTINGS.CACHE_URL, decode_responses=True, max_connections=1 + ) + raw_events = data_transformation_type.from_raw_event(event=aws_event) + specific_callable = custom_handler or func + + results = [ + specific_callable( + raw_event, + context.api_key, + context.aws_request_id, + logging_ctx, + redis_client, + ) + for raw_event in raw_events + ] + + return results + + except Exception: + CORVA_LOGGER.exception("The app failed to execute.") + raise + finally: + CORVA_LOGGER.info( + "Total data processing time: %d ms", + int((time.time() - start_time) * 1000), + ) return wrapper