diff --git a/edge_mining/adapters/domain/energy/cli/commands.py b/edge_mining/adapters/domain/energy/cli/commands.py index 78d23a6..6a044de 100644 --- a/edge_mining/adapters/domain/energy/cli/commands.py +++ b/edge_mining/adapters/domain/energy/cli/commands.py @@ -32,6 +32,8 @@ from edge_mining.shared.interfaces.config import EnergyMonitorConfig from edge_mining.shared.logging.port import LoggerPort +from edge_mining.adapters.utils import run_async_func + def select_energy_source_type() -> Optional[EnergySourceType]: """Select an energy source type from the list.""" @@ -263,15 +265,17 @@ def handle_add_energy_source(configuration_service: ConfigurationServiceInterfac ) try: - added: EnergySource = configuration_service.create_energy_source( - name=new_energy_source.name, - source_type=new_energy_source.type, - nominal_power_max=new_energy_source.nominal_power_max, - storage=new_energy_source.storage, - grid=new_energy_source.grid, - external_source=new_energy_source.external_source, - energy_monitor_id=new_energy_source.energy_monitor_id, - forecast_provider_id=new_energy_source.forecast_provider_id, + added: EnergySource = run_async_func( + configuration_service.create_energy_source( + name=new_energy_source.name, + source_type=new_energy_source.type, + nominal_power_max=new_energy_source.nominal_power_max, + storage=new_energy_source.storage, + grid=new_energy_source.grid, + external_source=new_energy_source.external_source, + energy_monitor_id=new_energy_source.energy_monitor_id, + forecast_provider_id=new_energy_source.forecast_provider_id, + ) ) click.echo( click.style( @@ -591,16 +595,18 @@ def update_single_energy_source( new_energy_source.forecast_provider_id = forecast_provider.id try: - updated: EnergySource = configuration_service.update_energy_source( - source_id=new_energy_source.id, - name=new_energy_source.name, - source_type=new_energy_source.type, - nominal_power_max=new_energy_source.nominal_power_max, - storage=new_energy_source.storage, - grid=new_energy_source.grid, - external_source=new_energy_source.external_source, - energy_monitor_id=new_energy_source.energy_monitor_id, - forecast_provider_id=new_energy_source.forecast_provider_id, + updated: EnergySource = run_async_func( + configuration_service.update_energy_source( + source_id=new_energy_source.id, + name=new_energy_source.name, + source_type=new_energy_source.type, + nominal_power_max=new_energy_source.nominal_power_max, + storage=new_energy_source.storage, + grid=new_energy_source.grid, + external_source=new_energy_source.external_source, + energy_monitor_id=new_energy_source.energy_monitor_id, + forecast_provider_id=new_energy_source.forecast_provider_id, + ) ) click.echo( click.style( @@ -634,9 +640,11 @@ def assign_energy_monitor_to_energy_source( return None try: - updated_energy_source = configuration_service.set_energy_monitor_to_energy_source( - energy_source_id=energy_source.id, - energy_monitor_id=energy_monitor.id, + updated_energy_source = run_async_func( + configuration_service.set_energy_monitor_to_energy_source( + energy_source_id=energy_source.id, + energy_monitor_id=energy_monitor.id, + ) ) click.echo( click.style( @@ -672,7 +680,7 @@ def delete_single_energy_source( return False try: - removed_energy_source = configuration_service.remove_energy_source(energy_source.id) + removed_energy_source = run_async_func(configuration_service.remove_energy_source(energy_source.id)) logger.debug(f"Energy Source {removed_energy_source.name} deleted successfully.") click.echo( click.style( @@ -702,9 +710,11 @@ def assign_forecast_provider_to_energy_source( click.echo(click.style("No forecast provider selected. Aborting assignment.", fg="red")) return None try: - updated_energy_source = configuration_service.set_forecast_provider_to_energy_source( - energy_source_id=energy_source.id, - forecast_provider_id=forecast_provider.id, + updated_energy_source = run_async_func( + configuration_service.set_forecast_provider_to_energy_source( + energy_source_id=energy_source.id, + forecast_provider_id=forecast_provider.id, + ) ) click.echo( click.style( @@ -1093,11 +1103,13 @@ def handle_add_energy_monitor( added: Optional[EnergyMonitor] = None try: - added = configuration_service.create_energy_monitor( - name=new_energy_monitor.name, - adapter_type=new_energy_monitor.adapter_type, - config=new_energy_monitor.config, - external_service_id=new_energy_monitor.external_service_id, + added = run_async_func( + configuration_service.create_energy_monitor( + name=new_energy_monitor.name, + adapter_type=new_energy_monitor.adapter_type, + config=new_energy_monitor.config, + external_service_id=new_energy_monitor.external_service_id, + ) ) click.echo( click.style( @@ -1381,11 +1393,13 @@ def update_single_energy_monitor( return None try: - updated_monitor: EnergyMonitor = configuration_service.update_energy_monitor( - monitor_id=new_energy_monitor.id, - name=new_energy_monitor.name, - config=new_energy_monitor.config, - external_service_id=new_energy_monitor.external_service_id, + updated_monitor: EnergyMonitor = run_async_func( + configuration_service.update_energy_monitor( + monitor_id=new_energy_monitor.id, + name=new_energy_monitor.name, + config=new_energy_monitor.config, + external_service_id=new_energy_monitor.external_service_id, + ) ) logger.debug(f"Energy Monitor {updated_monitor.name} updated successfully.") click.echo( @@ -1422,7 +1436,7 @@ def delete_single_energy_monitor( return False try: - removed_energy_monitor = configuration_service.remove_energy_monitor(monitor.id) + removed_energy_monitor = run_async_func(configuration_service.remove_energy_monitor(monitor.id)) logger.debug(f"Energy Monitor {removed_energy_monitor.name} deleted successfully.") click.echo( click.style( diff --git a/edge_mining/adapters/domain/energy/fast_api/router.py b/edge_mining/adapters/domain/energy/fast_api/router.py index d605e23..5b8bc0d 100644 --- a/edge_mining/adapters/domain/energy/fast_api/router.py +++ b/edge_mining/adapters/domain/energy/fast_api/router.py @@ -65,7 +65,7 @@ async def add_energy_source( energy_source_to_add: EnergySource = energy_source_data.to_model() # Add the energy source - created_source = config_service.create_energy_source( + created_source = await config_service.create_energy_source( name=energy_source_to_add.name, source_type=energy_source_to_add.type, nominal_power_max=energy_source_to_add.nominal_power_max, @@ -130,7 +130,7 @@ async def update_energy_source( raise EnergySourceNotFoundError(f"Energy source with id {source_id} not found") # Update the energy source - updated_source = config_service.update_energy_source( + updated_source = await config_service.update_energy_source( source_id=source_id, name=energy_source_update.name or "", source_type=energy_source_update.type, @@ -162,7 +162,7 @@ async def delete_energy_source( ) -> EnergySourceSchema: """Remove an energy source.""" try: - deleted_source = config_service.remove_energy_source(source_id) + deleted_source = await config_service.remove_energy_source(source_id) response = EnergySourceSchema.from_model(deleted_source) @@ -207,7 +207,7 @@ async def add_energy_monitor( raise EnergyMonitorConfigurationError("Energy monitor configuration should be set") # Add the energy monitor - created_monitor = config_service.create_energy_monitor( + created_monitor = await config_service.create_energy_monitor( name=energy_monitor_to_add.name, adapter_type=energy_monitor_to_add.adapter_type, config=energy_monitor_to_add.config, @@ -334,7 +334,7 @@ async def update_energy_monitor( external_service_id = EntityId(uuid.UUID(energy_monitor_update.external_service_id)) # Update the energy monitor - updated_monitor = config_service.update_energy_monitor( + updated_monitor = await config_service.update_energy_monitor( monitor_id=monitor_id, name=energy_monitor_update.name or "", config=cast(EnergyMonitorConfig, configuration), @@ -357,7 +357,7 @@ async def delete_energy_monitor( ) -> EnergyMonitorSchema: """Remove an energy monitor.""" try: - deleted_monitor = config_service.remove_energy_monitor(monitor_id) + deleted_monitor = await config_service.remove_energy_monitor(monitor_id) response = EnergyMonitorSchema.from_model(deleted_monitor) diff --git a/edge_mining/adapters/domain/forecast/cli/commands.py b/edge_mining/adapters/domain/forecast/cli/commands.py index 7c43653..627eee5 100644 --- a/edge_mining/adapters/domain/forecast/cli/commands.py +++ b/edge_mining/adapters/domain/forecast/cli/commands.py @@ -23,6 +23,8 @@ from edge_mining.shared.interfaces.config import ForecastProviderConfig from edge_mining.shared.logging.port import LoggerPort +from edge_mining.adapters.utils import run_async_func + def select_forecast_provider_adapter() -> Optional[ForecastProviderAdapter]: """Select a forecast provider adapter from the available options.""" @@ -163,11 +165,13 @@ def handle_add_forecast_provider( added: Optional[ForecastProvider] = None try: - added = configuration_service.create_forecast_provider( - name=name, - adapter_type=adapter_type, - config=config, - external_service_id=external_service_id, + added = run_async_func( + configuration_service.create_forecast_provider( + name=name, + adapter_type=adapter_type, + config=config, + external_service_id=external_service_id, + ) ) click.echo( click.style( @@ -431,12 +435,14 @@ def update_single_forecast_provider( ) try: - updated: ForecastProvider = configuration_service.update_forecast_provider( - provider_id=forecast_provider.id, - name=name, - adapter_type=forecast_provider.adapter_type, - config=config, - external_service_id=external_service_id, + updated: ForecastProvider = run_async_func( + configuration_service.update_forecast_provider( + provider_id=forecast_provider.id, + name=name, + adapter_type=forecast_provider.adapter_type, + config=config, + external_service_id=external_service_id, + ) ) logger.debug(f"Forecast Provider '{updated.name}' updated successfully.") click.echo( @@ -470,7 +476,7 @@ def delete_single_forecast_provider( return False try: - configuration_service.remove_forecast_provider(forecast_provider.id) + run_async_func(configuration_service.remove_forecast_provider(forecast_provider.id)) logger.debug(f"Forecast Provider '{forecast_provider.name}' deleted successfully.") click.echo( click.style( diff --git a/edge_mining/adapters/domain/forecast/fast_api/router.py b/edge_mining/adapters/domain/forecast/fast_api/router.py index 86f02af..2bb6d1e 100644 --- a/edge_mining/adapters/domain/forecast/fast_api/router.py +++ b/edge_mining/adapters/domain/forecast/fast_api/router.py @@ -63,7 +63,7 @@ async def add_forecast_provider( raise ForecastProviderConfigurationError("Forecast provider configuration should be set") # Add the forecast provider - created_provider = config_service.create_forecast_provider( + created_provider = await config_service.create_forecast_provider( name=forecast_provider_to_add.name, adapter_type=forecast_provider_to_add.adapter_type, config=forecast_provider_to_add.config, @@ -190,7 +190,7 @@ async def update_forecast_provider( external_service_id = EntityId(uuid.UUID(forecast_provider_update.external_service_id)) # Update the forecast provider - updated_provider = config_service.update_forecast_provider( + updated_provider = await config_service.update_forecast_provider( provider_id=provider_id, name=forecast_provider_update.name or "", adapter_type=forecast_provider.adapter_type, @@ -214,7 +214,7 @@ async def delete_forecast_provider( ) -> ForecastProviderSchema: """Remove a forecast provider.""" try: - deleted_provider = config_service.remove_forecast_provider(provider_id) + deleted_provider = await config_service.remove_forecast_provider(provider_id) response = ForecastProviderSchema.from_model(deleted_provider) diff --git a/edge_mining/adapters/domain/miner/cli/commands.py b/edge_mining/adapters/domain/miner/cli/commands.py index cb048b5..e193619 100644 --- a/edge_mining/adapters/domain/miner/cli/commands.py +++ b/edge_mining/adapters/domain/miner/cli/commands.py @@ -86,12 +86,14 @@ def handle_add_miner(configuration_service: ConfigurationServiceInterface, logge ) try: - added = configuration_service.add_miner( - name=new_miner.name, - model=new_miner.model, - hash_rate_max=new_miner.hash_rate_max, - power_consumption_max=new_miner.power_consumption_max, - controller_id=new_miner.controller_id, + added = run_async_func( + configuration_service.add_miner( + name=new_miner.name, + model=new_miner.model, + hash_rate_max=new_miner.hash_rate_max, + power_consumption_max=new_miner.power_consumption_max, + controller_id=new_miner.controller_id, + ) ) click.echo( click.style( @@ -293,13 +295,15 @@ def update_single_miner( hash_rate_max = HashRate(value=hash_rate, unit=hash_rate_unit) try: - updated = configuration_service.update_miner( - miner_id=selected_miner.id, - name=name, - model=model if model else None, - hash_rate_max=hash_rate_max, - power_consumption_max=Watts(power_consumption), - controller_id=EntityId(controller_id) if controller_id else None, + updated = run_async_func( + configuration_service.update_miner( + miner_id=selected_miner.id, + name=name, + model=model if model else None, + hash_rate_max=hash_rate_max, + power_consumption_max=Watts(power_consumption), + controller_id=EntityId(controller_id) if controller_id else None, + ) ) click.echo( click.style( @@ -333,7 +337,7 @@ def delete_single_miner( click.echo(click.style("Deletion cancelled.", fg="yellow")) return False try: - removed_miner = configuration_service.remove_miner(miner_id=selected_miner.id) + removed_miner = run_async_func(configuration_service.remove_miner(miner_id=selected_miner.id)) logger.info(f"Miner '{removed_miner.name}' (ID: {removed_miner.id}) successfully removed.") click.echo( click.style( @@ -366,14 +370,16 @@ def assign_controller_to_miner( try: selected_miner.controller_id = controller.id - updated_miner = configuration_service.update_miner( - miner_id=selected_miner.id, - name=selected_miner.name, - model=selected_miner.model, - hash_rate_max=selected_miner.hash_rate_max, - power_consumption_max=selected_miner.power_consumption_max, - controller_id=selected_miner.controller_id, - active=selected_miner.active, + updated_miner = run_async_func( + configuration_service.update_miner( + miner_id=selected_miner.id, + name=selected_miner.name, + model=selected_miner.model, + hash_rate_max=selected_miner.hash_rate_max, + power_consumption_max=selected_miner.power_consumption_max, + controller_id=selected_miner.controller_id, + active=selected_miner.active, + ) ) click.echo( click.style( @@ -573,7 +579,7 @@ def manage_single_miner_menu( if choice == "1": try: - miner = configuration_service.activate_miner(miner.id) + miner = run_async_func(configuration_service.activate_miner(miner.id)) logger.info(f"Miner {miner.name} activated successfully.") except Exception as e: logger.error(f"Error activating miner: {e}") @@ -585,7 +591,7 @@ def manage_single_miner_menu( elif choice == "2": try: - miner = configuration_service.deactivate_miner(miner.id) + miner = run_async_func(configuration_service.deactivate_miner(miner.id)) logger.info(f"Miner {miner.name} deactivated successfully.") except Exception as e: logger.error(f"Error deactivating miner: {e}") @@ -916,11 +922,13 @@ def handle_add_miner_controller( return None try: - added_controller = configuration_service.add_miner_controller( - name=new_controller.name, - adapter=new_controller.adapter_type, - config=new_controller.config, - external_service_id=new_controller.external_service_id, + added_controller = run_async_func( + configuration_service.add_miner_controller( + name=new_controller.name, + adapter=new_controller.adapter_type, + config=new_controller.config, + external_service_id=new_controller.external_service_id, + ) ) click.echo( click.style( @@ -1045,8 +1053,10 @@ def update_single_miner_controller( return None try: - updated_controller = configuration_service.update_miner_controller( - controller_id=controller.id, name=name, config=config, external_service_id=external_service_id + updated_controller = run_async_func( + configuration_service.update_miner_controller( + controller_id=controller.id, name=name, config=config, external_service_id=external_service_id + ) ) logger.info(f"Miner Controller '{updated_controller.name}' (ID: {updated_controller.id}) successfully updated.") except Exception as e: @@ -1080,7 +1090,7 @@ def delete_single_miner_controller( return False try: - removed_controller = configuration_service.remove_miner_controller(controller_id=controller.id) + removed_controller = run_async_func(configuration_service.remove_miner_controller(controller_id=controller.id)) logger.info(f"Miner Controller '{removed_controller.name}' (ID: {removed_controller.id}) successfully removed.") except Exception as e: logger.error(f"Error removing miner controller: {e}") diff --git a/edge_mining/adapters/domain/miner/fast_api/router.py b/edge_mining/adapters/domain/miner/fast_api/router.py index 2236f37..4bc6acf 100644 --- a/edge_mining/adapters/domain/miner/fast_api/router.py +++ b/edge_mining/adapters/domain/miner/fast_api/router.py @@ -93,7 +93,7 @@ async def add_miner( try: miner_to_add: Miner = miner_schema.to_model() - new_miner = config_service.add_miner( + new_miner = await config_service.add_miner( name=miner_to_add.name, model=miner_to_add.model, hash_rate_max=miner_to_add.hash_rate_max, @@ -126,7 +126,7 @@ async def update_miner( Watts(miner_update.power_consumption_max) if miner_update.power_consumption_max is not None else None ) - miner_updated = config_service.update_miner( + miner_updated = await config_service.update_miner( miner_id=miner.id, name=miner_update.name or "", model=miner_update.model, @@ -152,7 +152,7 @@ async def remove_miner( ) -> MinerSchema: """Remove a miner.""" try: - deleted_miner = config_service.remove_miner(miner_id) + deleted_miner = await config_service.remove_miner(miner_id) response = MinerSchema.from_model(deleted_miner) @@ -302,7 +302,7 @@ async def activate_miner( ) -> MinerSchema: """Activate a miner.""" try: - miner = config_service.activate_miner(miner_id) + miner = await config_service.activate_miner(miner_id) response = MinerSchema.from_model(miner) @@ -320,7 +320,7 @@ async def deactivate_miner( ) -> MinerSchema: """Deactivate a miner.""" try: - miner = config_service.deactivate_miner(miner_id) + miner = await config_service.deactivate_miner(miner_id) response = MinerSchema.from_model(miner) @@ -344,7 +344,7 @@ async def set_miner_controller( if miner is None: raise MinerNotFoundError(f"Miner with ID {miner_id} not found") - config_service.set_miner_controller(miner_id, controller_id) + await config_service.set_miner_controller(miner_id, controller_id) response = MinerSchema.from_model(miner) @@ -388,7 +388,7 @@ async def add_miner_controller( if controller_to_add.config is None: raise MinerControllerConfigurationError("Miner controller configuration should be set") - new_controller = config_service.add_miner_controller( + new_controller = await config_service.add_miner_controller( name=controller_to_add.name, adapter=controller_to_add.adapter_type, config=controller_to_add.config, @@ -538,7 +538,7 @@ async def update_miner_controller( if controller_update.external_service_id: external_service_id = EntityId(uuid.UUID(controller_update.external_service_id)) - updated_controller = config_service.update_miner_controller( + updated_controller = await config_service.update_miner_controller( controller_id=controller.id, name=controller_update.name or "", config=cast(MinerControllerConfig, configuration), @@ -561,7 +561,7 @@ async def remove_miner_controller( ) -> MinerControllerSchema: """Remove a miner controller.""" try: - deleted_controller = config_service.remove_miner_controller(controller_id) + deleted_controller = await config_service.remove_miner_controller(controller_id) response = MinerControllerSchema.from_model(deleted_controller) diff --git a/edge_mining/adapters/domain/notification/cli/commands.py b/edge_mining/adapters/domain/notification/cli/commands.py index a526c11..5449458 100644 --- a/edge_mining/adapters/domain/notification/cli/commands.py +++ b/edge_mining/adapters/domain/notification/cli/commands.py @@ -28,6 +28,8 @@ from edge_mining.shared.interfaces.config import NotificationConfig from edge_mining.shared.logging.port import LoggerPort +from edge_mining.adapters.utils import run_async_func + def select_notifier_adapter() -> Optional[NotificationAdapter]: """Select a notifier adapter type from the list.""" @@ -149,11 +151,13 @@ def handle_add_notifier(configuration_service: ConfigurationServiceInterface, lo added: Optional[Notifier] = None try: - added = configuration_service.add_notifier( - name=new_notifier.name, - adapter_type=new_notifier.adapter_type, - config=new_notifier.config, - external_service_id=new_notifier.external_service_id, + added = run_async_func( + configuration_service.add_notifier( + name=new_notifier.name, + adapter_type=new_notifier.adapter_type, + config=new_notifier.config, + external_service_id=new_notifier.external_service_id, + ) ) click.echo( click.style( @@ -545,11 +549,13 @@ def update_single_notifier( return None try: - updated_notifier = configuration_service.update_notifier( - notifier_id=new_notifier.id, - name=new_notifier.name, - config=new_notifier.config, - external_service_id=new_notifier.external_service_id, + updated_notifier = run_async_func( + configuration_service.update_notifier( + notifier_id=new_notifier.id, + name=new_notifier.name, + config=new_notifier.config, + external_service_id=new_notifier.external_service_id, + ) ) click.echo( click.style( @@ -585,7 +591,7 @@ def delete_single_notifier( return False try: - removed = configuration_service.remove_notifier(notifier_id=notifier.id) + removed = run_async_func(configuration_service.remove_notifier(notifier_id=notifier.id)) logger.info(f"Notifier '{removed.name}' (ID: {removed.id}) successfully removed.") click.echo( click.style( diff --git a/edge_mining/adapters/domain/notification/fast_api/router.py b/edge_mining/adapters/domain/notification/fast_api/router.py index 4b25119..8ad5864 100644 --- a/edge_mining/adapters/domain/notification/fast_api/router.py +++ b/edge_mining/adapters/domain/notification/fast_api/router.py @@ -149,7 +149,7 @@ async def add_notifier( if notifier_to_add.config is None: raise NotifierConfigurationError("Notifier configuration should be set") - new_notifier = config_service.add_notifier( + new_notifier = await config_service.add_notifier( name=notifier_to_add.name, adapter_type=notifier_to_add.adapter_type, config=notifier_to_add.config, @@ -193,7 +193,7 @@ async def update_notifier( if notifier_update.external_service_id: external_service_id = EntityId(uuid.UUID(notifier_update.external_service_id)) - updated_notifier = config_service.update_notifier( + updated_notifier = await config_service.update_notifier( notifier_id=notifier.id, name=notifier_update.name or "", config=cast(NotificationConfig, configuration), @@ -216,7 +216,7 @@ async def remove_notifier( ) -> NotifierSchema: """Remove a notifier.""" try: - deleted_notifier = config_service.remove_notifier(notifier_id) + deleted_notifier = await config_service.remove_notifier(notifier_id) response = NotifierSchema.from_model(deleted_notifier) diff --git a/edge_mining/adapters/domain/optimization_unit/cli/commands.py b/edge_mining/adapters/domain/optimization_unit/cli/commands.py index 7b6c8c9..0bb34b6 100644 --- a/edge_mining/adapters/domain/optimization_unit/cli/commands.py +++ b/edge_mining/adapters/domain/optimization_unit/cli/commands.py @@ -20,6 +20,8 @@ from edge_mining.domain.policy.aggregate_roots import OptimizationPolicy from edge_mining.shared.logging.port import LoggerPort +from edge_mining.adapters.utils import run_async_func + def handle_add_optimization_unit(configuration_service: ConfigurationServiceInterface, logger: LoggerPort): """Menu to add a new optimization unit.""" @@ -80,15 +82,17 @@ def handle_add_optimization_unit(configuration_service: ConfigurationServiceInte target_miner_ids = [m.id for m in selected_miners] if selected_miners else [] notifier_ids = [n.id for n in selected_notifiers] if selected_notifiers else [] - created = configuration_service.create_optimization_unit( - name=name, - description=description if description else None, - energy_source_id=selected_energy_source.id if selected_energy_source else None, - target_miner_ids=target_miner_ids, - policy_id=selected_policy.id if selected_policy else None, - home_forecast_provider_id=home_forecast_provider_id, - performance_tracker_id=performance_tracker_id, - notifier_ids=notifier_ids, + created = run_async_func( + configuration_service.create_optimization_unit( + name=name, + description=description if description else None, + energy_source_id=selected_energy_source.id if selected_energy_source else None, + target_miner_ids=target_miner_ids, + policy_id=selected_policy.id if selected_policy else None, + home_forecast_provider_id=home_forecast_provider_id, + performance_tracker_id=performance_tracker_id, + notifier_ids=notifier_ids, + ) ) if not created: raise ValueError("Failed to create the optimization unit.") @@ -233,7 +237,7 @@ def handle_activate_optimization_unit( return try: - configuration_service.activate_optimization_unit(optimization_unit.id) + run_async_func(configuration_service.activate_optimization_unit(optimization_unit.id)) click.echo(click.style("Optimization unit activated successfully.", fg="green")) except Exception as e: click.echo(click.style(f"Error activating optimization unit: {e}", fg="red")) @@ -252,7 +256,7 @@ def handle_deactivate_optimization_unit( return try: - configuration_service.deactivate_optimization_unit(optimization_unit.id) + run_async_func(configuration_service.deactivate_optimization_unit(optimization_unit.id)) click.echo(click.style("Optimization unit deactivated successfully.", fg="green")) except Exception as e: click.echo(click.style(f"Error deactivating optimization unit: {e}", fg="red")) @@ -340,17 +344,19 @@ def update_optimization_unit( # Home forecast provider and performance tracker updates will be implemented in the next release try: - updated = configuration_service.update_optimization_unit( - unit_id=optimization_unit.id, - name=new_optimization_unit.name, - description=new_optimization_unit.description, - energy_source_id=new_optimization_unit.energy_source_id, - target_miner_ids=new_optimization_unit.target_miner_ids, - policy_id=new_optimization_unit.policy_id, - home_forecast_provider_id=new_optimization_unit.home_forecast_provider_id, - performance_tracker_id=new_optimization_unit.performance_tracker_id, - notifier_ids=new_optimization_unit.notifier_ids, - is_enabled=new_optimization_unit.is_enabled, + updated = run_async_func( + configuration_service.update_optimization_unit( + unit_id=optimization_unit.id, + name=new_optimization_unit.name, + description=new_optimization_unit.description, + energy_source_id=new_optimization_unit.energy_source_id, + target_miner_ids=new_optimization_unit.target_miner_ids, + policy_id=new_optimization_unit.policy_id, + home_forecast_provider_id=new_optimization_unit.home_forecast_provider_id, + performance_tracker_id=new_optimization_unit.performance_tracker_id, + notifier_ids=new_optimization_unit.notifier_ids, + is_enabled=new_optimization_unit.is_enabled, + ) ) click.echo( click.style( @@ -385,7 +391,7 @@ def delete_single_optimization_unit( return False try: - configuration_service.remove_optimization_unit(optimization_unit.id) + run_async_func(configuration_service.remove_optimization_unit(optimization_unit.id)) click.echo(click.style("Optimization unit deleted successfully.", fg="green")) return True except Exception as e: @@ -408,8 +414,10 @@ def manage_assign_energy_source( return try: - configuration_service.assign_energy_source_to_optimization_unit( - unit_id=optimization_unit.id, energy_source_id=selected_energy_source.id + run_async_func( + configuration_service.assign_energy_source_to_optimization_unit( + unit_id=optimization_unit.id, energy_source_id=selected_energy_source.id + ) ) click.echo(click.style(f"Energy source '{selected_energy_source.name}' assigned successfully.", fg="green")) except Exception as e: @@ -432,8 +440,10 @@ def manage_assign_optimization_policy( return try: - configuration_service.assign_policy_to_optimization_unit( - unit_id=optimization_unit.id, policy_id=selected_policy.id + run_async_func( + configuration_service.assign_policy_to_optimization_unit( + unit_id=optimization_unit.id, policy_id=selected_policy.id + ) ) click.echo(click.style(f"Optimization policy '{selected_policy.name}' assigned successfully.", fg="green")) except Exception as e: @@ -461,8 +471,10 @@ def manage_assign_target_miners( try: target_miner_ids = [m.id for m in selected_miners] - configuration_service.assign_miners_to_optimization_unit( - unit_id=optimization_unit.id, miner_ids=target_miner_ids + run_async_func( + configuration_service.assign_miners_to_optimization_unit( + unit_id=optimization_unit.id, miner_ids=target_miner_ids + ) ) click.echo(click.style(f"{len(selected_miners)} Target miners assigned successfully.", fg="green")) except Exception as e: @@ -504,7 +516,11 @@ def manage_add_target_miner( return try: - configuration_service.add_miner_to_optimization_unit(unit_id=optimization_unit.id, miner_id=selected_miner.id) + run_async_func( + configuration_service.add_miner_to_optimization_unit( + unit_id=optimization_unit.id, miner_id=selected_miner.id + ) + ) click.echo(click.style(f"Target miner '{selected_miner.name}' added successfully.", fg="green")) except Exception as e: logger.error(f"Error adding target miner: {e}") @@ -540,8 +556,10 @@ def manage_remove_target_miner( selected_miner = selected_miner[0] try: - configuration_service.remove_miner_from_optimization_unit( - unit_id=optimization_unit.id, miner_id=selected_miner.id + run_async_func( + configuration_service.remove_miner_from_optimization_unit( + unit_id=optimization_unit.id, miner_id=selected_miner.id + ) ) click.echo(click.style(f"Target miner '{selected_miner.name}' removed successfully.", fg="green")) except Exception as e: @@ -569,8 +587,10 @@ def manage_assign_notifiers( try: notifier_ids = [n.id for n in selected_notifiers] if selected_notifiers else [] - configuration_service.assign_notifiers_to_optimization_unit( - unit_id=optimization_unit.id, notifier_ids=notifier_ids + run_async_func( + configuration_service.assign_notifiers_to_optimization_unit( + unit_id=optimization_unit.id, notifier_ids=notifier_ids + ) ) click.echo(click.style(f"{len(notifier_ids)} Notifiers assigned successfully.", fg="green")) except Exception as e: @@ -602,8 +622,10 @@ def manage_add_notifier( selected_notifier = selected_notifier[0] try: - configuration_service.add_notifier_to_optimization_unit( - unit_id=optimization_unit.id, notifier_id=selected_notifier.id + run_async_func( + configuration_service.add_notifier_to_optimization_unit( + unit_id=optimization_unit.id, notifier_id=selected_notifier.id + ) ) click.echo(click.style(f"Notifier '{selected_notifier.name}' added successfully.", fg="green")) except Exception as e: @@ -641,8 +663,10 @@ def manage_remove_notifier( return try: - configuration_service.remove_notifier_from_optimization_unit( - unit_id=optimization_unit.id, notifier_id=selected_notifier.id + run_async_func( + configuration_service.remove_notifier_from_optimization_unit( + unit_id=optimization_unit.id, notifier_id=selected_notifier.id + ) ) click.echo(click.style(f"Notifier '{selected_notifier.name}' removed successfully.", fg="green")) except Exception as e: diff --git a/edge_mining/adapters/domain/optimization_unit/fast_api/router.py b/edge_mining/adapters/domain/optimization_unit/fast_api/router.py index f9989ad..dba5fad 100644 --- a/edge_mining/adapters/domain/optimization_unit/fast_api/router.py +++ b/edge_mining/adapters/domain/optimization_unit/fast_api/router.py @@ -56,7 +56,7 @@ async def add_optimization_unit( optimization_unit_to_add: EnergyOptimizationUnit = optimization_unit_data.to_model() # Add the optimization unit - created_unit = config_service.create_optimization_unit( + created_unit = await config_service.create_optimization_unit( name=optimization_unit_to_add.name, description=optimization_unit_to_add.description, policy_id=optimization_unit_to_add.policy_id, @@ -140,7 +140,7 @@ async def update_optimization_unit( notifier_ids = [EntityId(uuid.UUID(notifier_id)) for notifier_id in optimization_unit_update.notifier_ids] # Update the optimization unit - updated_unit = config_service.update_optimization_unit( + updated_unit = await config_service.update_optimization_unit( unit_id=unit_id, name=optimization_unit_update.name or "", description=optimization_unit_update.description, @@ -168,7 +168,7 @@ async def delete_optimization_unit( ) -> EnergyOptimizationUnitSchema: """Remove an optimization unit.""" try: - deleted_unit = config_service.remove_optimization_unit(unit_id) + deleted_unit = await config_service.remove_optimization_unit(unit_id) response = EnergyOptimizationUnitSchema.from_model(deleted_unit) @@ -191,7 +191,7 @@ async def enable_optimization_unit( if optimization_unit is None: raise OptimizationUnitNotFoundError(f"Optimization Unit with ID {unit_id} not found") - enabled_unit = config_service.activate_optimization_unit(unit_id) + enabled_unit = await config_service.activate_optimization_unit(unit_id) response = EnergyOptimizationUnitSchema.from_model(enabled_unit) @@ -216,7 +216,7 @@ async def disable_optimization_unit( if optimization_unit is None: raise OptimizationUnitNotFoundError(f"Optimization Unit with ID {unit_id} not found") - disabled_unit = config_service.deactivate_optimization_unit(unit_id) + disabled_unit = await config_service.deactivate_optimization_unit(unit_id) response = EnergyOptimizationUnitSchema.from_model(disabled_unit) @@ -240,7 +240,7 @@ async def assign_energy_source( if optimization_unit is None: raise OptimizationUnitNotFoundError(f"Optimization Unit with ID {unit_id} not found") - updated_unit = config_service.assign_energy_source_to_optimization_unit(unit_id, energy_source_id) + updated_unit = await config_service.assign_energy_source_to_optimization_unit(unit_id, energy_source_id) response = EnergyOptimizationUnitSchema.from_model(updated_unit) @@ -264,7 +264,7 @@ async def assign_policy( if optimization_unit is None: raise OptimizationUnitNotFoundError(f"Optimization Unit with ID {unit_id} not found") - updated_unit = config_service.assign_policy_to_optimization_unit(unit_id, policy_id) + updated_unit = await config_service.assign_policy_to_optimization_unit(unit_id, policy_id) response = EnergyOptimizationUnitSchema.from_model(updated_unit) @@ -288,7 +288,7 @@ async def assign_miners( if optimization_unit is None: raise OptimizationUnitNotFoundError(f"Optimization Unit with ID {unit_id} not found") - updated_unit = config_service.assign_miners_to_optimization_unit(unit_id, miner_ids) + updated_unit = await config_service.assign_miners_to_optimization_unit(unit_id, miner_ids) response = EnergyOptimizationUnitSchema.from_model(updated_unit) @@ -312,7 +312,7 @@ async def add_target_miner( if optimization_unit is None: raise OptimizationUnitNotFoundError(f"Optimization Unit with ID {unit_id} not found") - updated_unit = config_service.add_miner_to_optimization_unit(unit_id, miner_id) + updated_unit = await config_service.add_miner_to_optimization_unit(unit_id, miner_id) response = EnergyOptimizationUnitSchema.from_model(updated_unit) @@ -336,7 +336,7 @@ async def remove_target_miner( if optimization_unit is None: raise OptimizationUnitNotFoundError(f"Optimization Unit with ID {unit_id} not found") - updated_unit = config_service.remove_miner_from_optimization_unit(unit_id, miner_id) + updated_unit = await config_service.remove_miner_from_optimization_unit(unit_id, miner_id) response = EnergyOptimizationUnitSchema.from_model(updated_unit) @@ -360,7 +360,7 @@ async def assign_notifiers( if optimization_unit is None: raise OptimizationUnitNotFoundError(f"Optimization Unit with ID {unit_id} not found") - updated_unit = config_service.assign_notifiers_to_optimization_unit(unit_id, notifier_ids) + updated_unit = await config_service.assign_notifiers_to_optimization_unit(unit_id, notifier_ids) response = EnergyOptimizationUnitSchema.from_model(updated_unit) @@ -384,7 +384,7 @@ async def add_notifier( if optimization_unit is None: raise OptimizationUnitNotFoundError(f"Optimization Unit with ID {unit_id} not found") - updated_unit = config_service.add_notifier_to_optimization_unit(unit_id, notifier_id) + updated_unit = await config_service.add_notifier_to_optimization_unit(unit_id, notifier_id) response = EnergyOptimizationUnitSchema.from_model(updated_unit) @@ -408,7 +408,7 @@ async def remove_notifier( if optimization_unit is None: raise OptimizationUnitNotFoundError(f"Optimization Unit with ID {unit_id} not found") - updated_unit = config_service.remove_notifier_from_optimization_unit(unit_id, notifier_id) + updated_unit = await config_service.remove_notifier_from_optimization_unit(unit_id, notifier_id) response = EnergyOptimizationUnitSchema.from_model(updated_unit) diff --git a/edge_mining/adapters/domain/policy/cli/commands.py b/edge_mining/adapters/domain/policy/cli/commands.py index f8f0a96..18f8237 100644 --- a/edge_mining/adapters/domain/policy/cli/commands.py +++ b/edge_mining/adapters/domain/policy/cli/commands.py @@ -18,6 +18,8 @@ from edge_mining.domain.policy.exceptions import PolicyError, PolicyNotFoundError from edge_mining.shared.logging.port import LoggerPort +from edge_mining.adapters.utils import run_async_func + def select_mining_decision() -> Optional[MiningDecision]: """Select a mining decision from the available options.""" @@ -106,7 +108,7 @@ def handle_add_optimization_policy(configuration_service: ConfigurationServiceIn description: str = click.prompt("Description (optional)", type=str, default="") try: - new_policy = configuration_service.create_policy(name=name, description=description) + new_policy = run_async_func(configuration_service.create_policy(name=name, description=description)) if not new_policy: click.echo(click.style("Failed to create optimization policy.", fg="red")) @@ -146,13 +148,15 @@ def handle_add_optimization_policy(configuration_service: ConfigurationServiceIn conditions = create_rule_conditions() if conditions: - configuration_service.add_rule_to_policy( - policy_id=new_policy.id, - rule_type=RuleType.START, - name=rule_name, - priority=rule_priority, - conditions=conditions, - description=rule_description, + run_async_func( + configuration_service.add_rule_to_policy( + policy_id=new_policy.id, + rule_type=RuleType.START, + name=rule_name, + priority=rule_priority, + conditions=conditions, + description=rule_description, + ) ) click.echo(click.style(f"Start rule '{rule_name}' added!", fg="green")) @@ -171,13 +175,15 @@ def handle_add_optimization_policy(configuration_service: ConfigurationServiceIn conditions = create_rule_conditions() if conditions: - configuration_service.add_rule_to_policy( - policy_id=new_policy.id, - rule_type=RuleType.STOP, - name=rule_name, - priority=rule_priority, - conditions=conditions, - description=rule_description, + run_async_func( + configuration_service.add_rule_to_policy( + policy_id=new_policy.id, + rule_type=RuleType.STOP, + name=rule_name, + priority=rule_priority, + conditions=conditions, + description=rule_description, + ) ) click.echo(click.style(f"Stop rule '{rule_name}' added!", fg="green")) @@ -370,7 +376,7 @@ def delete_single_optimization_policy( return False try: - configuration_service.delete_policy(policy.id) + run_async_func(configuration_service.delete_policy(policy.id)) click.echo(click.style("Optimization policy deleted successfully!", fg="green")) return True except (PolicyError, PolicyNotFoundError) as e: @@ -416,7 +422,7 @@ def manage_single_optimization_policy_menu( return "q" elif choice == "2": try: - configuration_service.sort_policy_rules(policy.id) + run_async_func(configuration_service.sort_policy_rules(policy.id)) click.echo( click.style( f"Rules of Policy '{policy.name}' sorted by priority.", @@ -520,13 +526,15 @@ def add_rule_to_policy( return try: - configuration_service.add_rule_to_policy( - policy_id=policy.id, - rule_type=rule_type, - name=rule_name, - priority=rule_priority, - conditions=conditions, - description=rule_description, + run_async_func( + configuration_service.add_rule_to_policy( + policy_id=policy.id, + rule_type=rule_type, + name=rule_name, + priority=rule_priority, + conditions=conditions, + description=rule_description, + ) ) click.echo( click.style( @@ -591,14 +599,16 @@ def edit_policy_rule( click.echo(click.style("No conditions specified. Rule not updated.", fg="red")) return - configuration_service.update_policy_rule( - policy_id=policy.id, - rule_id=selected_rule.id, - name=new_name, - priority=new_rule_priority, - conditions=new_conditions, - description=new_description, - enabled=rule_enabled, + run_async_func( + configuration_service.update_policy_rule( + policy_id=policy.id, + rule_id=selected_rule.id, + name=new_name, + priority=new_rule_priority, + conditions=new_conditions, + description=new_description, + enabled=rule_enabled, + ) ) click.echo(click.style(f"Rule '{new_name}' updated successfully!", fg="green")) @@ -646,7 +656,7 @@ def delete_policy_rule( ): return - configuration_service.delete_policy_rule(policy_id=policy.id, rule_id=selected_rule.id) + run_async_func(configuration_service.delete_policy_rule(policy_id=policy.id, rule_id=selected_rule.id)) click.echo( click.style( f"Rule '{selected_rule.name}' deleted successfully!", diff --git a/edge_mining/adapters/domain/policy/fast_api/router.py b/edge_mining/adapters/domain/policy/fast_api/router.py index 3a78565..8349051 100644 --- a/edge_mining/adapters/domain/policy/fast_api/router.py +++ b/edge_mining/adapters/domain/policy/fast_api/router.py @@ -63,14 +63,14 @@ async def add_policy( policy_to_add: OptimizationPolicy = policy_schema.to_model() # Create policy using configuration service - new_policy = config_service.create_policy( + new_policy = await config_service.create_policy( name=policy_to_add.name, description=policy_to_add.description or "", ) if policy_to_add.start_rules: for rule in policy_to_add.start_rules: - config_service.add_rule_to_policy( + await config_service.add_rule_to_policy( policy_id=new_policy.id, rule_type=RuleType.START, name=rule.name, @@ -80,7 +80,7 @@ async def add_policy( ) if policy_to_add.stop_rules: for rule in policy_to_add.stop_rules: - config_service.add_rule_to_policy( + await config_service.add_rule_to_policy( policy_id=new_policy.id, rule_type=RuleType.STOP, name=rule.name, @@ -130,7 +130,7 @@ async def update_policy( raise HTTPException(status_code=404, detail="Policy not found") # Update policy fields - updated_policy = config_service.update_policy( + updated_policy = await config_service.update_policy( policy_id=policy_id, name=policy_update.name or existing_policy.name, description=policy_update.description or existing_policy.description or "", @@ -159,7 +159,7 @@ async def delete_policy( raise HTTPException(status_code=404, detail="Policy not found") # Delete policy - config_service.delete_policy(policy_id) + await config_service.delete_policy(policy_id) return OptimizationPolicySchema.from_model(policy) @@ -243,7 +243,7 @@ async def add_rule_to_policy( try: rule_to_add: AutomationRule = rule_schema.to_model() - new_rule = config_service.add_rule_to_policy( + new_rule = await config_service.add_rule_to_policy( policy_id=policy_id, rule_type=rule_type, name=rule_to_add.name, @@ -309,7 +309,7 @@ async def enable_policy_rule( ) -> AutomationRuleSchema: """Enable a specific rule for a policy""" try: - rule: AutomationRule = config_service.enable_policy_rule(policy_id, rule_id) + rule: AutomationRule = await config_service.enable_policy_rule(policy_id, rule_id) return AutomationRuleSchema.from_model(rule) except PolicyNotFoundError as e: @@ -328,7 +328,7 @@ async def disable_policy_rule( ) -> AutomationRuleSchema: """Disable a specific rule for a policy""" try: - rule: AutomationRule = config_service.disable_policy_rule(policy_id, rule_id) + rule: AutomationRule = await config_service.disable_policy_rule(policy_id, rule_id) return AutomationRuleSchema.from_model(rule) except PolicyNotFoundError as e: @@ -357,7 +357,7 @@ async def update_policy_rule( if rule_schema.conditions is not None: conditions = rule_schema.conditions.to_model() - updated_rule = config_service.update_policy_rule( + updated_rule = await config_service.update_policy_rule( policy_id=policy_id, rule_id=rule_id, name=rule_schema.name or existing_rule.name, @@ -392,7 +392,7 @@ async def delete_policy_rule( if not rule: raise HTTPException(status_code=404, detail="Rule not found") - deleted_rule = config_service.delete_policy_rule(policy_id, rule_id) + deleted_rule = await config_service.delete_policy_rule(policy_id, rule_id) return AutomationRuleSchema.from_model(deleted_rule) except PolicyNotFoundError as e: diff --git a/edge_mining/adapters/infrastructure/cli/commands.py b/edge_mining/adapters/infrastructure/cli/commands.py index 279663e..da194a4 100644 --- a/edge_mining/adapters/infrastructure/cli/commands.py +++ b/edge_mining/adapters/infrastructure/cli/commands.py @@ -14,6 +14,8 @@ from edge_mining.domain.miner.value_objects import HashRate from edge_mining.shared.infrastructure import Services +from edge_mining.adapters.utils import run_async_func + @cli.command("run-evaluation") @click.pass_context @@ -83,15 +85,17 @@ def create_optimization_unit( EntityId(cast(UUID, performance_tracker_id_str)) if performance_tracker_id_str else None ) - created = configuration_service.create_optimization_unit( - name=name, - description=description, - energy_source_id=energy_source_id, - target_miner_ids=target_miner_ids, - policy_id=policy_id, - home_forecast_provider_id=home_forecast_provider_id, - performance_tracker_id=performance_tracker_id, - notifier_ids=notifier_ids, + created = run_async_func( + configuration_service.create_optimization_unit( + name=name, + description=description, + energy_source_id=energy_source_id, + target_miner_ids=target_miner_ids, + policy_id=policy_id, + home_forecast_provider_id=home_forecast_provider_id, + performance_tracker_id=performance_tracker_id, + notifier_ids=notifier_ids, + ) ) if not created: click.echo("Error: Optimization Unit creation failed.", err=True) @@ -148,11 +152,13 @@ def add_miner( hash_rate = HashRate(value=float(hash_rate_str), unit=hash_rate_unit_str) power_consumption = Watts(float(power_consumption_str)) - added = configuration_service.add_miner( - name=name, - hash_rate_max=hash_rate, - power_consumption_max=power_consumption, - controller_id=controller_id, + added = run_async_func( + configuration_service.add_miner( + name=name, + hash_rate_max=hash_rate, + power_consumption_max=power_consumption, + controller_id=controller_id, + ) ) click.echo(f"Miner '{added.name}' ({added.id}) added successfully.") except Exception as e: @@ -188,7 +194,7 @@ def remove_miner(ctx: click.Context, miner_id_str: str): miner_id = EntityId(cast(UUID, miner_id_str)) try: - configuration_service.remove_miner(miner_id=miner_id) + run_async_func(configuration_service.remove_miner(miner_id=miner_id)) click.echo(f"Miner {miner_id} removed.") except Exception as e: click.echo(f"Error removing miner: {e}", err=True) @@ -214,7 +220,7 @@ def create_policy(ctx: click.Context, name: str, description: str): return try: - created = configuration_service.create_policy(name=name, description=description) + created = run_async_func(configuration_service.create_policy(name=name, description=description)) click.echo(f"Optimization Policy '{created.name}' ({created.description}) created successfully.") except Exception as e: click.echo(f"Error adding miner: {e}", err=True) diff --git a/edge_mining/adapters/infrastructure/event_bus/__init__.py b/edge_mining/adapters/infrastructure/event_bus/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/edge_mining/adapters/infrastructure/event_bus/in_memory_event_bus.py b/edge_mining/adapters/infrastructure/event_bus/in_memory_event_bus.py new file mode 100644 index 0000000..18982f8 --- /dev/null +++ b/edge_mining/adapters/infrastructure/event_bus/in_memory_event_bus.py @@ -0,0 +1,58 @@ +"""In-memory implementation of the event bus.""" + +import asyncio +from collections import defaultdict +from typing import Callable, Type + +from edge_mining.application.interfaces import EventBusInterface +from edge_mining.domain.common import DomainEvent +from edge_mining.shared.logging.port import LoggerPort + + +class InMemoryEventBus(EventBusInterface): + """In-memory implementation of the event bus with blocking/fire-and-forget support.""" + + def __init__(self, logger: LoggerPort) -> None: + self._logger = logger + # dict[Type[DomainEvent], list[tuple[Callable, bool]]] + # bool = is_blocking + self._handlers: dict[Type[DomainEvent], list[tuple[Callable, bool]]] = defaultdict(list) + + def subscribe( + self, + event_type: Type[DomainEvent], + handler: Callable, + blocking: bool = True, + ) -> None: + self._handlers[event_type].append((handler, blocking)) + self._logger.debug( + f"EventBus: subscribed {handler.__qualname__} to " f"{event_type.__name__} (blocking={blocking})" + ) + + async def publish(self, event: DomainEvent) -> None: + handlers = self._handlers.get(type(event), []) + + if not handlers: + return + + self._logger.debug( + f"EventBus: publishing {event.event_type} " f"(id={event.event_id[:8]}..., handlers={len(handlers)})" + ) + + # 1. Blocking handlers — the publisher WAITS, exceptions are propagated + for handler, is_blocking in handlers: + if is_blocking: + await handler(event) + + # 2. Fire-and-forget handlers — the publisher DOES NOT wait, exceptions are caught + for handler, is_blocking in handlers: + if not is_blocking: + asyncio.create_task(self._safe_execute(handler, event)) + + async def _safe_execute(self, handler: Callable, event: DomainEvent) -> None: + try: + await handler(event) + except Exception as e: + self._logger.warning( + f"EventBus: fire-and-forget handler {handler.__qualname__} " f"failed for {event.event_type}: {e}" + ) diff --git a/edge_mining/adapters/infrastructure/external_services/cli/commands.py b/edge_mining/adapters/infrastructure/external_services/cli/commands.py index 6298cf9..b54b328 100644 --- a/edge_mining/adapters/infrastructure/external_services/cli/commands.py +++ b/edge_mining/adapters/infrastructure/external_services/cli/commands.py @@ -23,6 +23,8 @@ from edge_mining.shared.interfaces.config import ExternalServiceConfig from edge_mining.shared.logging.port import LoggerPort +from edge_mining.adapters.utils import run_async_func + def select_external_service_type() -> Optional[ExternalServiceAdapter]: """Prompt user to select an external service adapter type.""" @@ -89,8 +91,8 @@ def handle_add_external_service( return None try: - created_service = configuration_service.create_external_service( - name=name, adapter_type=adapter_type, config=config + created_service = run_async_func( + configuration_service.create_external_service(name=name, adapter_type=adapter_type, config=config) ) click.echo( click.style( @@ -261,8 +263,8 @@ def update_single_external_service( return None try: - updated_external_service = configuration_service.update_external_service( - service_id=service.id, name=name, config=config + updated_external_service = run_async_func( + configuration_service.update_external_service(service_id=service.id, name=name, config=config) ) except Exception as e: logger.error(f"Error updating external service: {e}") @@ -295,7 +297,7 @@ def delete_single_external_service( return False try: - removed_external_service = configuration_service.remove_external_service(service.id) + removed_external_service = run_async_func(configuration_service.remove_external_service(service.id)) click.echo( click.style( f"External Service '{removed_external_service.name}' successfully deleted.", diff --git a/edge_mining/adapters/infrastructure/external_services/fast_api/router.py b/edge_mining/adapters/infrastructure/external_services/fast_api/router.py index a12ef4f..dc304b5 100644 --- a/edge_mining/adapters/infrastructure/external_services/fast_api/router.py +++ b/edge_mining/adapters/infrastructure/external_services/fast_api/router.py @@ -66,7 +66,7 @@ async def add_external_service( raise ExternalServiceConfigurationError("External service configuration should be set") # Add the external service - created_service = config_service.create_external_service( + created_service = await config_service.create_external_service( name=external_service_to_add.name, adapter_type=external_service_to_add.adapter_type, config=external_service_to_add.config, @@ -166,7 +166,7 @@ async def update_external_service( configuration = ExternalServiceConfig.from_dict(external_service_update.config) # Update the external service - updated_service = config_service.update_external_service( + updated_service = await config_service.update_external_service( service_id=service_id, name=external_service_update.name or "", config=cast(ExternalServiceConfig, configuration), @@ -188,7 +188,7 @@ async def delete_external_service( ) -> ExternalServiceSchema: """Remove an external service.""" try: - deleted_service = config_service.remove_external_service(service_id) + deleted_service = await config_service.remove_external_service(service_id) response = ExternalServiceSchema.from_model(deleted_service) diff --git a/edge_mining/application/events/__init__.py b/edge_mining/application/events/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/edge_mining/application/events/configuration_events.py b/edge_mining/application/events/configuration_events.py new file mode 100644 index 0000000..9708679 --- /dev/null +++ b/edge_mining/application/events/configuration_events.py @@ -0,0 +1,39 @@ +"""Configuration domain events.""" + +from dataclasses import dataclass +from typing import Optional +from enum import Enum + +from edge_mining.domain.common import DomainEvent, EntityId + + +class ConfigurationUpdatedEventType(Enum): + """Enum for the different types of configuration updates.""" + + ENERGY_MONITOR = "energy_monitor" + MINER_CONTROLLER = "miner_controller" + NOTIFIER = "notifier" + EXTERNAL_SERVICE = "external_service" + UNKNOWN = "" + + +class ConfigurationAction(Enum): + """Enum for the possible actions on a configuration entity.""" + + CREATED = "created" + UPDATED = "updated" + REMOVED = "removed" + UNKNOWN = "" + + +@dataclass +class ConfigurationUpdatedEvent(DomainEvent): + """Event emitted when a configuration is created, updated, or removed. + + Generic application event: does not carry the modified entity's data, + but only the information needed to invalidate the adapters' cache. + """ + + entity_type: ConfigurationUpdatedEventType = ConfigurationUpdatedEventType.UNKNOWN + entity_id: Optional[EntityId] = None + action: ConfigurationAction = ConfigurationAction.UNKNOWN diff --git a/edge_mining/application/interfaces.py b/edge_mining/application/interfaces.py index c866f43..0bc0a1f 100644 --- a/edge_mining/application/interfaces.py +++ b/edge_mining/application/interfaces.py @@ -2,9 +2,10 @@ from abc import ABC, abstractmethod from datetime import datetime -from typing import Any, Dict, List, Optional +from typing import Any, Callable, Dict, List, Optional, Type from edge_mining.domain.common import EntityId, Watts +from edge_mining.domain.common import DomainEvent from edge_mining.domain.energy.common import EnergyMonitorAdapter, EnergySourceType from edge_mining.domain.energy.entities import EnergyMonitor, EnergySource from edge_mining.domain.energy.ports import EnergyMonitorPort @@ -151,7 +152,7 @@ class ConfigurationServiceInterface(ABC): # --- Miner Management --- @abstractmethod - def add_miner( + async def add_miner( self, name: str, model: Optional[str] = None, @@ -172,11 +173,11 @@ def list_miners(self) -> List[Miner]: """List all miners in the system.""" @abstractmethod - def remove_miner(self, miner_id: EntityId) -> Miner: + async def remove_miner(self, miner_id: EntityId) -> Miner: """Remove a miner from the system.""" @abstractmethod - def update_miner( + async def update_miner( self, miner_id: EntityId, name: str, @@ -189,11 +190,11 @@ def update_miner( """Update a miner in the system.""" @abstractmethod - def activate_miner(self, miner_id: EntityId) -> Miner: + async def activate_miner(self, miner_id: EntityId) -> Miner: """Activate a miner in the system.""" @abstractmethod - def deactivate_miner(self, miner_id: EntityId) -> Miner: + async def deactivate_miner(self, miner_id: EntityId) -> Miner: """Deactivate a miner in the system.""" @abstractmethod @@ -205,7 +206,7 @@ def check_miner(self, miner: Miner) -> bool: """Check if a miner is valid and can be used.""" @abstractmethod - def add_miner_controller( + async def add_miner_controller( self, name: str, adapter: MinerControllerAdapter, @@ -223,15 +224,15 @@ def list_miner_controllers(self) -> List[MinerController]: """List all miner controllers in the system.""" @abstractmethod - def unlink_miner_controller(self, miner_controller_id: EntityId) -> None: + async def unlink_miner_controller(self, miner_controller_id: EntityId) -> None: """Unlink a miner controller from all miners.""" @abstractmethod - def remove_miner_controller(self, controller_id: EntityId) -> MinerController: + async def remove_miner_controller(self, controller_id: EntityId) -> MinerController: """Remove a miner controller from the system.""" @abstractmethod - def update_miner_controller( + async def update_miner_controller( self, controller_id: EntityId, name: str, @@ -245,7 +246,7 @@ def update_miner_controller( """ @abstractmethod - def set_miner_controller(self, controller_id: EntityId, miner_id: EntityId) -> None: + async def set_miner_controller(self, controller_id: EntityId, miner_id: EntityId) -> None: """Set a miner controller to a miner.""" @abstractmethod @@ -266,7 +267,7 @@ def get_miner_controller_external_service_adapter( # --- Notifier Management --- @abstractmethod - def add_notifier( + async def add_notifier( self, name: str, adapter_type: NotificationAdapter, @@ -284,11 +285,11 @@ def list_notifiers(self) -> List[Notifier]: """List all notifiers in the system.""" @abstractmethod - def remove_notifier(self, notifier_id: EntityId) -> Notifier: + async def remove_notifier(self, notifier_id: EntityId) -> Notifier: """Remove a notifier from the system.""" @abstractmethod - def update_notifier( + async def update_notifier( self, notifier_id: EntityId, name: str, @@ -313,7 +314,7 @@ def get_notifier_external_service_adapter( # --- Policy Management --- @abstractmethod - def create_policy(self, name: str, description: str = "") -> OptimizationPolicy: + async def create_policy(self, name: str, description: str = "") -> OptimizationPolicy: """Create a new policy.""" @abstractmethod @@ -325,7 +326,7 @@ def list_policies(self) -> List[OptimizationPolicy]: """List all policies in the system.""" @abstractmethod - def add_rule_to_policy( + async def add_rule_to_policy( self, policy_id: EntityId, rule_type: RuleType, @@ -345,7 +346,7 @@ def get_policy_rule(self, policy_id: EntityId, rule_id: EntityId) -> Optional[Au """Get a rule by its ID.""" @abstractmethod - def update_policy_rule( + async def update_policy_rule( self, policy_id: EntityId, rule_id: EntityId, @@ -358,19 +359,19 @@ def update_policy_rule( """Update a rule in a policy.""" @abstractmethod - def delete_policy_rule(self, policy_id: EntityId, rule_id: EntityId) -> AutomationRule: + async def delete_policy_rule(self, policy_id: EntityId, rule_id: EntityId) -> AutomationRule: """Delete a rule from a policy.""" @abstractmethod - def enable_policy_rule(self, policy_id: EntityId, rule_id: EntityId) -> AutomationRule: + async def enable_policy_rule(self, policy_id: EntityId, rule_id: EntityId) -> AutomationRule: """Set a rule as enabled.""" @abstractmethod - def disable_policy_rule(self, policy_id: EntityId, rule_id: EntityId) -> AutomationRule: + async def disable_policy_rule(self, policy_id: EntityId, rule_id: EntityId) -> AutomationRule: """Set a rule as disabled.""" @abstractmethod - def delete_policy(self, policy_id: EntityId) -> Optional[OptimizationPolicy]: + async def delete_policy(self, policy_id: EntityId) -> Optional[OptimizationPolicy]: """Delete a policy from the system.""" @abstractmethod @@ -378,7 +379,7 @@ def check_policy(self, policy_id: EntityId) -> bool: """Check if a policy is valid and can be used.""" @abstractmethod - def update_policy( + async def update_policy( self, policy_id: EntityId, name: str, @@ -387,7 +388,7 @@ def update_policy( """Update a policy in the system.""" @abstractmethod - def sort_policy_rules(self, policy_id: EntityId) -> None: + async def sort_policy_rules(self, policy_id: EntityId) -> None: """Sort the rules of a policy by priority.""" @abstractmethod @@ -404,7 +405,7 @@ def validate_rule_conditions(self, conditions: Dict) -> tuple[bool, List[str], L # --- Optimization Unit Management --- @abstractmethod - def create_optimization_unit( + async def create_optimization_unit( self, name: str, description: Optional[str] = None, @@ -439,11 +440,11 @@ def filter_optimization_units( """Filter optimization units based on various criteria.""" @abstractmethod - def remove_optimization_unit(self, unit_id: EntityId) -> EnergyOptimizationUnit: + async def remove_optimization_unit(self, unit_id: EntityId) -> EnergyOptimizationUnit: """Remove an optimization unit from the system.""" @abstractmethod - def update_optimization_unit( + async def update_optimization_unit( self, unit_id: EntityId, name: str, @@ -459,61 +460,67 @@ def update_optimization_unit( """Update an optimization unit in the system.""" @abstractmethod - def activate_optimization_unit(self, unit_id: EntityId) -> EnergyOptimizationUnit: + async def activate_optimization_unit(self, unit_id: EntityId) -> EnergyOptimizationUnit: """Activate an optimization unit in the system.""" @abstractmethod - def deactivate_optimization_unit(self, unit_id: EntityId) -> EnergyOptimizationUnit: + async def deactivate_optimization_unit(self, unit_id: EntityId) -> EnergyOptimizationUnit: """Deactivate an optimization unit in the system.""" @abstractmethod - def assign_miners_to_optimization_unit( + async def assign_miners_to_optimization_unit( self, unit_id: EntityId, miner_ids: List[EntityId] ) -> EnergyOptimizationUnit: """Assign target miners to an optimization unit.""" @abstractmethod - def add_miner_to_optimization_unit(self, unit_id: EntityId, miner_id: EntityId) -> EnergyOptimizationUnit: + async def add_miner_to_optimization_unit(self, unit_id: EntityId, miner_id: EntityId) -> EnergyOptimizationUnit: """Add a miner to an optimization unit.""" @abstractmethod - def remove_miner_from_optimization_unit(self, unit_id: EntityId, miner_id: EntityId) -> EnergyOptimizationUnit: + async def remove_miner_from_optimization_unit( + self, unit_id: EntityId, miner_id: EntityId + ) -> EnergyOptimizationUnit: """Remove a miner from an optimization unit.""" @abstractmethod - def assign_policy_to_optimization_unit(self, unit_id: EntityId, policy_id: EntityId) -> EnergyOptimizationUnit: + async def assign_policy_to_optimization_unit( + self, unit_id: EntityId, policy_id: EntityId + ) -> EnergyOptimizationUnit: """Assign a policy to an optimization unit.""" @abstractmethod - def assign_energy_source_to_optimization_unit( + async def assign_energy_source_to_optimization_unit( self, unit_id: EntityId, energy_source_id: EntityId ) -> EnergyOptimizationUnit: """Assign an energy source to an optimization unit.""" @abstractmethod - def assign_home_forecast_provider_to_optimization_unit( + async def assign_home_forecast_provider_to_optimization_unit( self, unit_id: EntityId, home_forecast_provider_id: EntityId ) -> EnergyOptimizationUnit: """Assign a home forecast provider to an optimization unit.""" @abstractmethod - def assign_performance_tracker_to_optimization_unit( + async def assign_performance_tracker_to_optimization_unit( self, unit_id: EntityId, performance_tracker_id: EntityId ) -> EnergyOptimizationUnit: """Assign a performance tracker to an optimization unit.""" @abstractmethod - def assign_notifiers_to_optimization_unit( + async def assign_notifiers_to_optimization_unit( self, unit_id: EntityId, notifier_ids: List[EntityId] ) -> EnergyOptimizationUnit: """Assign notifiers to an optimization unit.""" @abstractmethod - def add_notifier_to_optimization_unit(self, unit_id: EntityId, notifier_id: EntityId) -> EnergyOptimizationUnit: + async def add_notifier_to_optimization_unit( + self, unit_id: EntityId, notifier_id: EntityId + ) -> EnergyOptimizationUnit: """Add a notifier to an optimization unit.""" @abstractmethod - def remove_notifier_from_optimization_unit( + async def remove_notifier_from_optimization_unit( self, unit_id: EntityId, notifier_id: EntityId ) -> EnergyOptimizationUnit: """Remove a notifier from an optimization unit.""" @@ -524,7 +531,7 @@ def check_optimization_unit(self, optimization_unit: EnergyOptimizationUnit, str # --- External Service Management --- @abstractmethod - def create_external_service( + async def create_external_service( self, name: str, adapter_type: ExternalServiceAdapter, @@ -545,15 +552,15 @@ def get_entities_by_external_service(self, service_id: EntityId) -> ExternalServ """Get entities associated with this external service""" @abstractmethod - def unlink_external_service(self, service_id: EntityId) -> None: + async def unlink_external_service(self, service_id: EntityId) -> None: """Remove the association of an external service from all entities.""" @abstractmethod - def remove_external_service(self, service_id: EntityId) -> ExternalService: + async def remove_external_service(self, service_id: EntityId) -> ExternalService: """Remove an external service from the system.""" @abstractmethod - def update_external_service( + async def update_external_service( self, service_id: EntityId, name: str, @@ -576,7 +583,7 @@ def get_external_service_config_by_type( # --- Energy Source Management --- @abstractmethod - def create_energy_source( + async def create_energy_source( self, name: str, source_type: EnergySourceType, @@ -598,11 +605,11 @@ def list_energy_sources(self) -> List[EnergySource]: """List all energy sources in the system.""" @abstractmethod - def remove_energy_source(self, source_id: EntityId) -> EnergySource: + async def remove_energy_source(self, source_id: EntityId) -> EnergySource: """Remove an energy source from the system.""" @abstractmethod - def update_energy_source( + async def update_energy_source( self, source_id: EntityId, name: str, @@ -621,7 +628,7 @@ def check_energy_source(self, energy_source: EnergySource) -> bool: """Check if an energy source is valid and can be used.""" @abstractmethod - def create_energy_monitor( + async def create_energy_monitor( self, name: str, adapter_type: EnergyMonitorAdapter, @@ -639,15 +646,15 @@ def list_energy_monitors(self) -> List[EnergyMonitor]: """List all energy monitors in the system.""" @abstractmethod - def unlink_energy_monitor(self, monitor_id: EntityId) -> None: + async def unlink_energy_monitor(self, monitor_id: EntityId) -> None: """Unlink an energy monitor from all associated energy sources.""" @abstractmethod - def remove_energy_monitor(self, monitor_id: EntityId) -> EnergyMonitor: + async def remove_energy_monitor(self, monitor_id: EntityId) -> EnergyMonitor: """Remove an energy monitor from the system.""" @abstractmethod - def update_energy_monitor( + async def update_energy_monitor( self, monitor_id: EntityId, name: str, @@ -657,13 +664,13 @@ def update_energy_monitor( """Update an energy monitor in the system.""" @abstractmethod - def set_energy_monitor_to_energy_source( + async def set_energy_monitor_to_energy_source( self, energy_source_id: EntityId, energy_monitor_id: EntityId ) -> EnergySource: """Set an energy monitor to an energy source.""" @abstractmethod - def set_forecast_provider_to_energy_source( + async def set_forecast_provider_to_energy_source( self, energy_source_id: EntityId, forecast_provider_id: EntityId ) -> EnergySource: """Set a forecast provider to an energy source.""" @@ -694,7 +701,7 @@ def get_energy_monitor_external_service_adapter( # --- Forecast Provider Management --- @abstractmethod - def create_forecast_provider( + async def create_forecast_provider( self, name: str, adapter_type: ForecastProviderAdapter, @@ -712,11 +719,11 @@ def list_forecast_providers(self) -> List[ForecastProvider]: """List all forecast providers in the system.""" @abstractmethod - def remove_forecast_provider(self, provider_id: EntityId) -> ForecastProvider: + async def remove_forecast_provider(self, provider_id: EntityId) -> ForecastProvider: """Remove a forecast provider from the system.""" @abstractmethod - def update_forecast_provider( + async def update_forecast_provider( self, provider_id: EntityId, name: str, @@ -748,7 +755,7 @@ def get_all_settings(self) -> dict: """Get all settings.""" @abstractmethod - def update_setting(self, key: str, value: Any) -> None: + async def update_setting(self, key: str, value: Any) -> None: """Update a setting.""" @@ -758,3 +765,29 @@ class SunFactoryInterface(ABC): @abstractmethod def create_sun_for_date(self, for_date: datetime = datetime.now()) -> Sun: """Create a Sun object for a specific date.""" + + +class EventBusInterface(ABC): + """Application interface for the domain event bus.""" + + @abstractmethod + async def publish(self, event: DomainEvent) -> None: + """Publish an event. Blocking handlers are executed before returning.""" + ... + + @abstractmethod + def subscribe( + self, + event_type: Type[DomainEvent], + handler: Callable, + blocking: bool = True, + ) -> None: + """Register a handler for a specific event type. + + Args: + event_type: The class of the event to listen for. + handler: Async coroutine that receives the event. + blocking: If True, the publisher waits for the handler to complete. + If False, the handler is executed in fire-and-forget mode. + """ + ... diff --git a/edge_mining/application/ports/__init__.py b/edge_mining/application/ports/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/edge_mining/application/ports/event_bus.py b/edge_mining/application/ports/event_bus.py new file mode 100644 index 0000000..4ff413c --- /dev/null +++ b/edge_mining/application/ports/event_bus.py @@ -0,0 +1,32 @@ +"""Application port for the domain event bus.""" + +from abc import ABC, abstractmethod +from typing import Callable, Type + +from edge_mining.domain.common import DomainEvent + + +class EventBus(ABC): + """Application port for the domain event bus.""" + + @abstractmethod + async def publish(self, event: DomainEvent) -> None: + """Publish an event. Blocking handlers are executed before returning.""" + ... + + @abstractmethod + def subscribe( + self, + event_type: Type[DomainEvent], + handler: Callable, + blocking: bool = True, + ) -> None: + """Register a handler for a specific event type. + + Args: + event_type: The class of the event to listen for. + handler: Async coroutine that receives the event. + blocking: If True, the publisher waits for the handler to complete. + If False, the handler is executed in fire-and-forget mode. + """ + ... diff --git a/edge_mining/application/services/adapter_service.py b/edge_mining/application/services/adapter_service.py index 4ca7f7b..6d1920b 100644 --- a/edge_mining/application/services/adapter_service.py +++ b/edge_mining/application/services/adapter_service.py @@ -19,7 +19,8 @@ from edge_mining.adapters.domain.performance.trackers.dummy import DummyMiningPerformanceTracker from edge_mining.adapters.infrastructure.homeassistant.homeassistant_api import ServiceHomeAssistantAPIFactory from edge_mining.adapters.infrastructure.rule_engine.factory import RuleEngineFactory -from edge_mining.application.interfaces import AdapterServiceInterface +from edge_mining.application.events.configuration_events import ConfigurationUpdatedEvent, ConfigurationUpdatedEventType +from edge_mining.application.interfaces import AdapterServiceInterface, EventBusInterface from edge_mining.domain.common import EntityId from edge_mining.domain.energy.common import EnergyMonitorAdapter from edge_mining.domain.energy.entities import EnergyMonitor, EnergySource @@ -67,6 +68,7 @@ def __init__( mining_performance_tracker_repo: MiningPerformanceTrackerRepository, home_forecast_provider_repo: HomeForecastProviderRepository, external_service_repo: ExternalServiceRepository, + event_bus: EventBusInterface, logger: Optional[LoggerPort] = None, ): self.energy_monitor_repo = energy_monitor_repo @@ -95,6 +97,16 @@ def __init__( self.logger = logger + self._subscribe_events(event_bus) + + def _subscribe_events(self, event_bus: EventBusInterface) -> None: + """Register all event subscriptions for this service.""" + event_bus.subscribe( + ConfigurationUpdatedEvent, + self.on_configuration_updated, + blocking=True, + ) + def _initialize_external_service(self, external_service: ExternalService) -> Optional[ExternalServicePort]: """Initialize an external service""" # If the external service already exists, we use it @@ -753,3 +765,19 @@ def remove_service(self, external_service_id: EntityId): else: if self.logger: self.logger.warning(f"No external service found with ID {external_service_id} to remove.") + + async def on_configuration_updated(self, event: ConfigurationUpdatedEvent) -> None: + """Handler for cache invalidation when a configuration changes.""" + if self.logger: + self.logger.debug(f"Cache invalidation: {event.entity_type} " f"{event.entity_id} ({event.action})") + + if event.entity_id is None: + return + + if event.entity_type == ConfigurationUpdatedEventType.EXTERNAL_SERVICE: + # Invalidate the external service AND all adapters that may depend on it + self._service_cache.pop(event.entity_id, None) + self._instance_cache.clear() + else: + # Invalidate the specific adapter + self._instance_cache.pop(event.entity_id, None) diff --git a/edge_mining/application/services/configuration_service.py b/edge_mining/application/services/configuration_service.py index 59a8639..b5ae20a 100644 --- a/edge_mining/application/services/configuration_service.py +++ b/edge_mining/application/services/configuration_service.py @@ -4,7 +4,12 @@ from typing import Any, Dict, List, Optional -from edge_mining.application.interfaces import ConfigurationServiceInterface +from edge_mining.application.events.configuration_events import ( + ConfigurationAction, + ConfigurationUpdatedEvent, + ConfigurationUpdatedEventType, +) +from edge_mining.application.interfaces import ConfigurationServiceInterface, EventBusInterface from edge_mining.domain.common import EntityId, Watts from edge_mining.domain.energy.common import EnergyMonitorAdapter, EnergySourceType from edge_mining.domain.energy.entities import EnergyMonitor, EnergySource @@ -96,7 +101,7 @@ class ConfigurationService(ConfigurationServiceInterface): """Handles configuration of miners, policies, and system settings.""" - def __init__(self, persistence_settings: PersistenceSettings, logger: LoggerPort): + def __init__(self, persistence_settings: PersistenceSettings, event_bus: EventBusInterface, logger: LoggerPort): # Domains self.external_service_repo: ExternalServiceRepository = persistence_settings.external_service_repo self.energy_source_repo: EnergySourceRepository = persistence_settings.energy_source_repo @@ -116,10 +121,11 @@ def __init__(self, persistence_settings: PersistenceSettings, logger: LoggerPort self.settings_repo: SettingsRepository = persistence_settings.settings_repo # Infrastructure + self._event_bus = event_bus self.logger = logger # --- External Service Management --- - def create_external_service( + async def create_external_service( self, name: str, adapter_type: ExternalServiceAdapter, @@ -134,6 +140,14 @@ def create_external_service( self.external_service_repo.add(external_service) + await self._event_bus.publish( + ConfigurationUpdatedEvent( + entity_type=ConfigurationUpdatedEventType.EXTERNAL_SERVICE, + entity_id=external_service.id, + action=ConfigurationAction.CREATED, + ) + ) + return external_service def get_external_service(self, service_id: EntityId) -> Optional[ExternalService]: @@ -168,7 +182,7 @@ def get_entities_by_external_service(self, service_id: EntityId) -> ExternalServ ) return external_service_linked_entities - def unlink_external_service(self, service_id: EntityId) -> None: + async def unlink_external_service(self, service_id: EntityId) -> None: """Remove the association of an external service from all entities.""" self.logger.debug(f"Unlinking external service {service_id}") @@ -215,7 +229,7 @@ def unlink_external_service(self, service_id: EntityId) -> None: notifier.external_service_id = None self.notifier_repo.update(notifier) - def remove_external_service(self, service_id: EntityId) -> ExternalService: + async def remove_external_service(self, service_id: EntityId) -> ExternalService: """Remove an external service from the system.""" self.logger.debug(f"Removing external service {service_id}") @@ -225,13 +239,21 @@ def remove_external_service(self, service_id: EntityId) -> ExternalService: raise ExternalServiceNotFoundError(f"External Service with ID {service_id} not found.") # Unlink the external service from all associated entities before removal - self.unlink_external_service(service_id) + await self.unlink_external_service(service_id) self.external_service_repo.remove(service_id) + await self._event_bus.publish( + ConfigurationUpdatedEvent( + entity_type=ConfigurationUpdatedEventType.EXTERNAL_SERVICE, + entity_id=service_id, + action=ConfigurationAction.REMOVED, + ) + ) + return external_service - def update_external_service( + async def update_external_service( self, service_id: EntityId, name: str, @@ -256,6 +278,14 @@ def update_external_service( self.external_service_repo.update(external_service) + await self._event_bus.publish( + ConfigurationUpdatedEvent( + entity_type=ConfigurationUpdatedEventType.EXTERNAL_SERVICE, + entity_id=service_id, + action=ConfigurationAction.UPDATED, + ) + ) + return external_service def check_external_service(self, external_service: ExternalService) -> bool: @@ -288,7 +318,7 @@ def get_external_service_config_by_type( return EXTERNAL_SERVICE_CONFIG_TYPE_MAP.get(adapter_type, None) # --- Energy Source Management --- - def create_energy_source( + async def create_energy_source( self, name: str, source_type: EnergySourceType, @@ -332,7 +362,7 @@ def list_energy_sources(self) -> List[EnergySource]: """List all energy sources in the system.""" return self.energy_source_repo.get_all() - def remove_energy_source(self, source_id: EntityId) -> EnergySource: + async def remove_energy_source(self, source_id: EntityId) -> EnergySource: """Remove an energy source from the system.""" self.logger.debug(f"Removing energy source {source_id}") @@ -345,7 +375,7 @@ def remove_energy_source(self, source_id: EntityId) -> EnergySource: return energy_source - def update_energy_source( + async def update_energy_source( self, source_id: EntityId, name: str, @@ -433,7 +463,7 @@ def check_energy_source(self, energy_source: EnergySource) -> bool: self.logger.debug(f"Energy Source {energy_source.id} ({energy_source.name}) is valid.") return True - def create_energy_monitor( + async def create_energy_monitor( self, name: str, adapter_type: EnergyMonitorAdapter, @@ -454,6 +484,14 @@ def create_energy_monitor( self.energy_monitor_repo.add(energy_monitor) + await self._event_bus.publish( + ConfigurationUpdatedEvent( + entity_type=ConfigurationUpdatedEventType.ENERGY_MONITOR, + entity_id=energy_monitor.id, + action=ConfigurationAction.CREATED, + ) + ) + return energy_monitor def get_energy_monitor(self, monitor_id: EntityId) -> Optional[EnergyMonitor]: @@ -469,7 +507,7 @@ def list_energy_monitors(self) -> List[EnergyMonitor]: """List all energy monitors in the system.""" return self.energy_monitor_repo.get_all() - def unlink_energy_monitor(self, monitor_id: EntityId) -> None: + async def unlink_energy_monitor(self, monitor_id: EntityId) -> None: """Unlink an energy monitor from all associated energy sources.""" self.logger.debug(f"Unlinking energy monitor {monitor_id}") @@ -482,7 +520,7 @@ def unlink_energy_monitor(self, monitor_id: EntityId) -> None: source.energy_monitor_id = None self.energy_source_repo.update(source) - def remove_energy_monitor(self, monitor_id: EntityId) -> EnergyMonitor: + async def remove_energy_monitor(self, monitor_id: EntityId) -> EnergyMonitor: """Remove an energy monitor from the system.""" energy_monitor = self.energy_monitor_repo.get_by_id(monitor_id) @@ -491,13 +529,21 @@ def remove_energy_monitor(self, monitor_id: EntityId) -> EnergyMonitor: raise EnergyMonitorNotFoundError(f"Energy Monitor with ID {monitor_id} not found.") # Unlink the energy monitor from all associated energy sources before delete - self.unlink_energy_monitor(monitor_id) + await self.unlink_energy_monitor(monitor_id) self.energy_monitor_repo.remove(monitor_id) + await self._event_bus.publish( + ConfigurationUpdatedEvent( + entity_type=ConfigurationUpdatedEventType.ENERGY_MONITOR, + entity_id=monitor_id, + action=ConfigurationAction.REMOVED, + ) + ) + return energy_monitor - def update_energy_monitor( + async def update_energy_monitor( self, monitor_id: EntityId, name: str, @@ -529,9 +575,17 @@ def update_energy_monitor( self.energy_monitor_repo.update(energy_monitor) + await self._event_bus.publish( + ConfigurationUpdatedEvent( + entity_type=ConfigurationUpdatedEventType.ENERGY_MONITOR, + entity_id=monitor_id, + action=ConfigurationAction.UPDATED, + ) + ) + return energy_monitor - def set_energy_monitor_to_energy_source( + async def set_energy_monitor_to_energy_source( self, energy_source_id: EntityId, energy_monitor_id: EntityId ) -> EnergySource: """Set an energy monitor to an energy source.""" @@ -553,7 +607,7 @@ def set_energy_monitor_to_energy_source( return energy_source - def set_forecast_provider_to_energy_source( + async def set_forecast_provider_to_energy_source( self, energy_source_id: EntityId, forecast_provider_id: EntityId ) -> EnergySource: """Set a forecast provider to an energy source.""" @@ -646,7 +700,7 @@ def get_energy_monitor_external_service_adapter( return ENERGY_MONITOR_TYPE_EXTERNAL_SERVICE_MAP.get(adapter_type, None) # --- Forecast Provider Management --- - def create_forecast_provider( + async def create_forecast_provider( self, name: str, adapter_type: ForecastProviderAdapter, @@ -667,6 +721,14 @@ def create_forecast_provider( self.forecast_provider_repo.add(forecast_provider) + await self._event_bus.publish( + ConfigurationUpdatedEvent( + entity_type=ConfigurationUpdatedEventType.FORECAST_PROVIDER, + entity_id=forecast_provider.id, + action=ConfigurationAction.CREATED, + ) + ) + return forecast_provider def get_forecast_provider(self, provider_id: EntityId) -> Optional[ForecastProvider]: @@ -682,7 +744,7 @@ def list_forecast_providers(self) -> List[ForecastProvider]: """List all forecast providers in the system.""" return self.forecast_provider_repo.get_all() - def remove_forecast_provider(self, provider_id: EntityId) -> ForecastProvider: + async def remove_forecast_provider(self, provider_id: EntityId) -> ForecastProvider: """Remove a forecast provider from the system.""" self.logger.debug(f"Removing forecast provider {provider_id}") @@ -693,9 +755,17 @@ def remove_forecast_provider(self, provider_id: EntityId) -> ForecastProvider: self.forecast_provider_repo.remove(provider_id) + await self._event_bus.publish( + ConfigurationUpdatedEvent( + entity_type=ConfigurationUpdatedEventType.FORECAST_PROVIDER, + entity_id=provider_id, + action=ConfigurationAction.REMOVED, + ) + ) + return forecast_provider - def update_forecast_provider( + async def update_forecast_provider( self, provider_id: EntityId, name: str, @@ -720,6 +790,14 @@ def update_forecast_provider( self.forecast_provider_repo.update(forecast_provider) + await self._event_bus.publish( + ConfigurationUpdatedEvent( + entity_type=ConfigurationUpdatedEventType.FORECAST_PROVIDER, + entity_id=provider_id, + action=ConfigurationAction.UPDATED, + ) + ) + return forecast_provider def check_forecast_provider(self, provider: ForecastProvider) -> bool: @@ -775,7 +853,7 @@ def get_forecast_provider_external_service_adapter( return FORECAST_PROVIDER_TYPE_EXTERNAL_SERVICE_MAP.get(adapter_type, None) # --- Optimization Unit Management --- - def create_optimization_unit( + async def create_optimization_unit( self, name: str, description: Optional[str] = None, @@ -848,7 +926,7 @@ def filter_optimization_units( eous = [eou for eou in eous if set(eou.notifier_ids).intersection(filter_by_notifiers)] return eous - def remove_optimization_unit(self, unit_id: EntityId) -> EnergyOptimizationUnit: + async def remove_optimization_unit(self, unit_id: EntityId) -> EnergyOptimizationUnit: """Remove an optimization unit from the system.""" self.logger.info(f"Removing optimization unit {unit_id}") @@ -861,7 +939,7 @@ def remove_optimization_unit(self, unit_id: EntityId) -> EnergyOptimizationUnit: return optimization_unit - def update_optimization_unit( + async def update_optimization_unit( self, unit_id: EntityId, name: str, @@ -911,7 +989,7 @@ def update_optimization_unit( return optimization_unit - def activate_optimization_unit(self, unit_id: EntityId) -> EnergyOptimizationUnit: + async def activate_optimization_unit(self, unit_id: EntityId) -> EnergyOptimizationUnit: """Activate an optimization unit in the system.""" self.logger.info(f"Activating optimization unit {unit_id}") @@ -934,7 +1012,7 @@ def activate_optimization_unit(self, unit_id: EntityId) -> EnergyOptimizationUni return optimization_unit - def deactivate_optimization_unit(self, unit_id: EntityId) -> EnergyOptimizationUnit: + async def deactivate_optimization_unit(self, unit_id: EntityId) -> EnergyOptimizationUnit: """Deactivate an optimization unit in the system.""" self.logger.info(f"Deactivating optimization unit {unit_id}") @@ -949,7 +1027,7 @@ def deactivate_optimization_unit(self, unit_id: EntityId) -> EnergyOptimizationU return optimization_unit - def assign_miners_to_optimization_unit( + async def assign_miners_to_optimization_unit( self, unit_id: EntityId, miner_ids: List[EntityId] ) -> EnergyOptimizationUnit: """Assign target miners to an optimization unit.""" @@ -968,7 +1046,7 @@ def assign_miners_to_optimization_unit( return optimization_unit - def add_miner_to_optimization_unit(self, unit_id: EntityId, miner_id: EntityId) -> EnergyOptimizationUnit: + async def add_miner_to_optimization_unit(self, unit_id: EntityId, miner_id: EntityId) -> EnergyOptimizationUnit: """Add a miner to an optimization unit.""" self.logger.info(f"Adding miner {miner_id} to optimization unit {unit_id}") @@ -988,7 +1066,9 @@ def add_miner_to_optimization_unit(self, unit_id: EntityId, miner_id: EntityId) return optimization_unit - def remove_miner_from_optimization_unit(self, unit_id: EntityId, miner_id: EntityId) -> EnergyOptimizationUnit: + async def remove_miner_from_optimization_unit( + self, unit_id: EntityId, miner_id: EntityId + ) -> EnergyOptimizationUnit: """Remove a miner from an optimization unit.""" self.logger.info(f"Removing miner {miner_id} from optimization unit {unit_id}") @@ -1007,7 +1087,9 @@ def remove_miner_from_optimization_unit(self, unit_id: EntityId, miner_id: Entit return optimization_unit - def assign_policy_to_optimization_unit(self, unit_id: EntityId, policy_id: EntityId) -> EnergyOptimizationUnit: + async def assign_policy_to_optimization_unit( + self, unit_id: EntityId, policy_id: EntityId + ) -> EnergyOptimizationUnit: """Assign a policy to an optimization unit.""" self.logger.info(f"Assigning policy {policy_id} to optimization unit {unit_id}") @@ -1022,7 +1104,7 @@ def assign_policy_to_optimization_unit(self, unit_id: EntityId, policy_id: Entit return optimization_unit - def assign_energy_source_to_optimization_unit( + async def assign_energy_source_to_optimization_unit( self, unit_id: EntityId, energy_source_id: EntityId ) -> EnergyOptimizationUnit: """Assign an energy source to an optimization unit.""" @@ -1039,7 +1121,7 @@ def assign_energy_source_to_optimization_unit( return optimization_unit - def assign_home_forecast_provider_to_optimization_unit( + async def assign_home_forecast_provider_to_optimization_unit( self, unit_id: EntityId, home_forecast_provider_id: EntityId ) -> EnergyOptimizationUnit: """Assign a home forecast provider to an optimization unit.""" @@ -1056,7 +1138,7 @@ def assign_home_forecast_provider_to_optimization_unit( return optimization_unit - def assign_performance_tracker_to_optimization_unit( + async def assign_performance_tracker_to_optimization_unit( self, unit_id: EntityId, performance_tracker_id: EntityId ) -> EnergyOptimizationUnit: """Assign a performance tracker to an optimization unit.""" @@ -1073,7 +1155,7 @@ def assign_performance_tracker_to_optimization_unit( return optimization_unit - def assign_notifiers_to_optimization_unit( + async def assign_notifiers_to_optimization_unit( self, unit_id: EntityId, notifier_ids: List[EntityId] ) -> EnergyOptimizationUnit: """Assign notifiers to an optimization unit.""" @@ -1092,7 +1174,9 @@ def assign_notifiers_to_optimization_unit( return optimization_unit - def add_notifier_to_optimization_unit(self, unit_id: EntityId, notifier_id: EntityId) -> EnergyOptimizationUnit: + async def add_notifier_to_optimization_unit( + self, unit_id: EntityId, notifier_id: EntityId + ) -> EnergyOptimizationUnit: """Add a notifier to an optimization unit.""" self.logger.info(f"Adding notifier {notifier_id} to optimization unit {unit_id}") @@ -1111,7 +1195,7 @@ def add_notifier_to_optimization_unit(self, unit_id: EntityId, notifier_id: Enti return optimization_unit - def remove_notifier_from_optimization_unit( + async def remove_notifier_from_optimization_unit( self, unit_id: EntityId, notifier_id: EntityId ) -> EnergyOptimizationUnit: """Remove a notifier from an optimization unit.""" @@ -1206,7 +1290,7 @@ def check_optimization_unit(self, optimization_unit: EnergyOptimizationUnit, str return True # --- Miner Management --- - def add_miner( + async def add_miner( self, name: str, model: Optional[str] = None, @@ -1254,7 +1338,7 @@ def list_miners(self) -> List[Miner]: """List all miners in the system.""" return self.miner_repo.get_all() - def remove_miner(self, miner_id: EntityId) -> Miner: + async def remove_miner(self, miner_id: EntityId) -> Miner: """Remove a miner from the system.""" self.logger.info(f"Removing miner {miner_id}") @@ -1267,7 +1351,7 @@ def remove_miner(self, miner_id: EntityId) -> Miner: return miner - def update_miner( + async def update_miner( self, miner_id: EntityId, name: str, @@ -1297,7 +1381,7 @@ def update_miner( return miner - def activate_miner(self, miner_id: EntityId) -> Miner: + async def activate_miner(self, miner_id: EntityId) -> Miner: """Activate a miner in the system.""" self.logger.info(f"Activating miner {miner_id}") @@ -1312,7 +1396,7 @@ def activate_miner(self, miner_id: EntityId) -> Miner: return miner - def deactivate_miner(self, miner_id: EntityId) -> Miner: + async def deactivate_miner(self, miner_id: EntityId) -> Miner: """Deactivate a miner in the system.""" self.logger.info(f"Deactivating miner {miner_id}") @@ -1352,7 +1436,7 @@ def check_miner(self, miner: Miner) -> bool: self.logger.debug(f"Miner {miner.id} ({miner.name}) is valid.") return True - def add_miner_controller( + async def add_miner_controller( self, name: str, adapter: MinerControllerAdapter, @@ -1372,6 +1456,14 @@ def add_miner_controller( self.check_miner_controller(controller) self.miner_controller_repo.add(controller) + await self._event_bus.publish( + ConfigurationUpdatedEvent( + entity_type=ConfigurationUpdatedEventType.MINER_CONTROLLER, + entity_id=controller.id, + action=ConfigurationAction.CREATED, + ) + ) + return controller def get_miner_controller(self, controller_id: EntityId) -> Optional[MinerController]: @@ -1387,7 +1479,7 @@ def list_miner_controllers(self) -> List[MinerController]: """List all miner controllers in the system.""" return self.miner_controller_repo.get_all() - def unlink_miner_controller(self, miner_controller_id: EntityId) -> None: + async def unlink_miner_controller(self, miner_controller_id: EntityId) -> None: """Unlink a miner controller from all miners.""" self.logger.info(f"Unlinking controller {miner_controller_id} from all miners") @@ -1398,7 +1490,7 @@ def unlink_miner_controller(self, miner_controller_id: EntityId) -> None: miner.controller_id = None self.miner_repo.update(miner) - def remove_miner_controller(self, controller_id: EntityId) -> MinerController: + async def remove_miner_controller(self, controller_id: EntityId) -> MinerController: """Remove a miner controller from the system.""" self.logger.info(f"Removing miner controller {controller_id}") @@ -1408,13 +1500,21 @@ def remove_miner_controller(self, controller_id: EntityId) -> MinerController: raise MinerControllerNotFoundError(f"Controller with ID {controller_id} not found.") # Unlink the controller from all miners before removal - self.unlink_miner_controller(controller_id) + await self.unlink_miner_controller(controller_id) self.miner_controller_repo.remove(controller_id) + await self._event_bus.publish( + ConfigurationUpdatedEvent( + entity_type=ConfigurationUpdatedEventType.MINER_CONTROLLER, + entity_id=controller_id, + action=ConfigurationAction.REMOVED, + ) + ) + return controller - def update_miner_controller( + async def update_miner_controller( self, controller_id: EntityId, name: str, @@ -1450,9 +1550,17 @@ def update_miner_controller( self.miner_controller_repo.update(controller) + await self._event_bus.publish( + ConfigurationUpdatedEvent( + entity_type=ConfigurationUpdatedEventType.MINER_CONTROLLER, + entity_id=controller_id, + action=ConfigurationAction.UPDATED, + ) + ) + return controller - def set_miner_controller(self, controller_id: EntityId, miner_id: EntityId) -> None: + async def set_miner_controller(self, controller_id: EntityId, miner_id: EntityId) -> None: """Set a miner controller to a miner.""" self.logger.info(f"Adding controller {controller_id} to miner {miner_id}") @@ -1504,7 +1612,7 @@ def get_miner_controller_external_service_adapter( return MINER_CONTROLLER_TYPE_EXTERNAL_SERVICE_MAP.get(adapter_type, None) # --- Notifier Management --- - def add_notifier( + async def add_notifier( self, name: str, adapter_type: NotificationAdapter, @@ -1525,6 +1633,14 @@ def add_notifier( self.notifier_repo.add(notifier) + await self._event_bus.publish( + ConfigurationUpdatedEvent( + entity_type=ConfigurationUpdatedEventType.NOTIFIER, + entity_id=notifier.id, + action=ConfigurationAction.CREATED, + ) + ) + return notifier def get_notifier(self, notifier_id: EntityId) -> Optional[Notifier]: @@ -1538,7 +1654,7 @@ def list_notifiers(self) -> List[Notifier]: """List all notifiers in the system.""" return self.notifier_repo.get_all() - def remove_notifier(self, notifier_id: EntityId) -> Notifier: + async def remove_notifier(self, notifier_id: EntityId) -> Notifier: """Remove a notifier from the system.""" self.logger.debug(f"Removing notifier {notifier_id}") @@ -1547,9 +1663,18 @@ def remove_notifier(self, notifier_id: EntityId) -> Notifier: raise NotifierNotFoundError(f"Notifier with ID {notifier_id} not found.") self.notifier_repo.remove(notifier_id) + + await self._event_bus.publish( + ConfigurationUpdatedEvent( + entity_type=ConfigurationUpdatedEventType.NOTIFIER, + entity_id=notifier_id, + action=ConfigurationAction.REMOVED, + ) + ) + return notifier - def update_notifier( + async def update_notifier( self, notifier_id: EntityId, name: str, @@ -1570,6 +1695,14 @@ def update_notifier( self.check_notifier(notifier) self.notifier_repo.update(notifier) + await self._event_bus.publish( + ConfigurationUpdatedEvent( + entity_type=ConfigurationUpdatedEventType.NOTIFIER, + entity_id=notifier_id, + action=ConfigurationAction.UPDATED, + ) + ) + return notifier def check_notifier(self, notifier: Notifier) -> bool: @@ -1625,7 +1758,7 @@ def get_notifier_external_service_adapter( return NOTIFIER_TYPE_EXTERNAL_SERVICE_MAP.get(adapter_type, None) # --- Policy Management --- - def create_policy(self, name: str, description: str = "") -> OptimizationPolicy: + async def create_policy(self, name: str, description: str = "") -> OptimizationPolicy: """Create a new policy.""" self.logger.info(f"Creating policy '{name}'") @@ -1648,7 +1781,7 @@ def list_policies(self) -> List[OptimizationPolicy]: """List all policies in the system.""" return self.policy_repo.get_all() - def add_rule_to_policy( + async def add_rule_to_policy( self, policy_id: EntityId, rule_type: RuleType, @@ -1708,7 +1841,7 @@ def get_policy_rule(self, policy_id: EntityId, rule_id: EntityId) -> Optional[Au raise RuleNotFoundError(f"Rule with ID {rule_id} not found in policy {policy_id}.") - def update_policy_rule( + async def update_policy_rule( self, policy_id: EntityId, rule_id: EntityId, @@ -1742,7 +1875,7 @@ def update_policy_rule( raise PolicyError(f"Rule with ID {rule_id} not found in policy {policy_id}.") - def delete_policy_rule(self, policy_id: EntityId, rule_id: EntityId) -> AutomationRule: + async def delete_policy_rule(self, policy_id: EntityId, rule_id: EntityId) -> AutomationRule: """Delete a rule from a policy.""" policy = self.policy_repo.get_by_id(policy_id) @@ -1763,7 +1896,7 @@ def delete_policy_rule(self, policy_id: EntityId, rule_id: EntityId) -> Automati return rule raise PolicyError(f"Rule with ID {rule_id} not found in policy {policy_id}.") - def enable_policy_rule(self, policy_id: EntityId, rule_id: EntityId) -> AutomationRule: + async def enable_policy_rule(self, policy_id: EntityId, rule_id: EntityId) -> AutomationRule: """Set a rule as enabled.""" self.logger.info(f"Setting rule {rule_id} of policy {policy_id} as active.") @@ -1788,7 +1921,7 @@ def enable_policy_rule(self, policy_id: EntityId, rule_id: EntityId) -> Automati return rule - def disable_policy_rule(self, policy_id: EntityId, rule_id: EntityId) -> AutomationRule: + async def disable_policy_rule(self, policy_id: EntityId, rule_id: EntityId) -> AutomationRule: """Set a rule as disabled.""" self.logger.info(f"Setting rule {rule_id} of policy {policy_id} as disabled.") @@ -1813,7 +1946,7 @@ def disable_policy_rule(self, policy_id: EntityId, rule_id: EntityId) -> Automat return rule - def delete_policy(self, policy_id: EntityId) -> Optional[OptimizationPolicy]: + async def delete_policy(self, policy_id: EntityId) -> Optional[OptimizationPolicy]: """Delete a policy from the system.""" self.logger.info(f"Deleting policy {policy_id}") @@ -1858,7 +1991,7 @@ def check_policy(self, policy_id: EntityId) -> bool: self.logger.debug(f"Policy {policy.id} ({policy.name}) is valid.") return True - def update_policy( + async def update_policy( self, policy_id: EntityId, name: str, @@ -1880,7 +2013,7 @@ def update_policy( return policy - def sort_policy_rules(self, policy_id: EntityId) -> None: + async def sort_policy_rules(self, policy_id: EntityId) -> None: """Sort the rules of a policy by priority.""" policy = self.policy_repo.get_by_id(policy_id) @@ -1915,7 +2048,7 @@ def get_all_settings(self) -> Dict[str, Any]: settings: Optional[SystemSettings] = self.settings_repo.get_settings(user_id) return settings.settings if settings else {} - def update_setting(self, key: str, value: Any) -> None: + async def update_setting(self, key: str, value: Any) -> None: """Update a setting.""" user_id: UserId = UserId("global_settings") settings = self.settings_repo.get_settings(user_id) diff --git a/edge_mining/bootstrap.py b/edge_mining/bootstrap.py index 5ea1743..6f2741e 100644 --- a/edge_mining/bootstrap.py +++ b/edge_mining/bootstrap.py @@ -65,6 +65,7 @@ ) from edge_mining.adapters.infrastructure.persistence.sqlalchemy.base import BaseSQLAlchemyRepository from edge_mining.adapters.infrastructure.persistence.sqlite import BaseSqliteRepository +from edge_mining.adapters.infrastructure.event_bus.in_memory_event_bus import InMemoryEventBus from edge_mining.adapters.infrastructure.sun.factories import AstralSunFactory from edge_mining.application.interfaces import SunFactoryInterface from edge_mining.application.services.adapter_service import AdapterService @@ -285,6 +286,9 @@ def configure_dependencies(logger: LoggerPort, settings: AppSettings) -> Service logger.debug("Instantiating application services...") + # --- Event Bus --- + event_bus = InMemoryEventBus(logger) + adapter_service = AdapterService( energy_monitor_repo=persistence_settings.energy_monitor_repo, miner_controller_repo=persistence_settings.miner_controller_repo, @@ -293,6 +297,7 @@ def configure_dependencies(logger: LoggerPort, settings: AppSettings) -> Service home_forecast_provider_repo=persistence_settings.home_forecast_provider_repo, mining_performance_tracker_repo=persistence_settings.mining_performance_tracker_repo, external_service_repo=persistence_settings.external_service_repo, + event_bus=event_bus, logger=logger, ) @@ -312,13 +317,18 @@ def configure_dependencies(logger: LoggerPort, settings: AppSettings) -> Service logger=logger, ) - config_service = ConfigurationService(persistence_settings=persistence_settings, logger=logger) + config_service = ConfigurationService( + persistence_settings=persistence_settings, + event_bus=event_bus, + logger=logger, + ) services = Services( adapter_service=adapter_service, optimization_service=optimization_service, miner_action_service=miner_action_service, configuration_service=config_service, + event_bus=event_bus, ) logger.debug("Dependency configuration complete.") diff --git a/edge_mining/domain/common.py b/edge_mining/domain/common.py index 7a0e5a6..c06f001 100644 --- a/edge_mining/domain/common.py +++ b/edge_mining/domain/common.py @@ -2,9 +2,9 @@ import uuid from dataclasses import dataclass, field -from datetime import datetime +from datetime import datetime, timezone from enum import Enum -from typing import NewType, Tuple +from typing import Any, NewType, Tuple # Example Value Objects using NewType for stronger typing Watts = NewType("Watts", float) @@ -41,3 +41,25 @@ class AdapterType(Enum): """Base class for adapter types.""" pass # Base class for adapter types if needed + + +@dataclass +class DomainEvent: + """Base class for all domain events.""" + + event_id: str = field(default_factory=lambda: str(uuid.uuid4())) + occurred_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) + + def to_dict(self) -> dict[str, Any]: + """Serialization for WebSocket/logging. Override in subtypes if needed.""" + from dataclasses import asdict + + result = asdict(self) + result["occurred_at"] = self.occurred_at.isoformat() + result["event_type"] = self.event_type + return result + + @property + def event_type(self) -> str: + """Event type derived from class name. Useful for serialization.""" + return self.__class__.__name__ diff --git a/edge_mining/shared/infrastructure.py b/edge_mining/shared/infrastructure.py index ae55c67..cd461e6 100644 --- a/edge_mining/shared/infrastructure.py +++ b/edge_mining/shared/infrastructure.py @@ -7,6 +7,7 @@ MinerActionServiceInterface, AdapterServiceInterface, ConfigurationServiceInterface, + EventBusInterface, OptimizationServiceInterface, ) from edge_mining.domain.energy.ports import ( @@ -61,3 +62,4 @@ class Services: optimization_service: OptimizationServiceInterface miner_action_service: MinerActionServiceInterface configuration_service: ConfigurationServiceInterface + event_bus: EventBusInterface diff --git a/tests/unit/adapters/infrastructure/test_in_memory_event_bus.py b/tests/unit/adapters/infrastructure/test_in_memory_event_bus.py new file mode 100644 index 0000000..069aee5 --- /dev/null +++ b/tests/unit/adapters/infrastructure/test_in_memory_event_bus.py @@ -0,0 +1,166 @@ +"""Unit tests for InMemoryEventBus.""" + +import asyncio +import unittest +from dataclasses import dataclass +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from edge_mining.adapters.infrastructure.event_bus.in_memory_event_bus import InMemoryEventBus +from edge_mining.domain.common import DomainEvent + + +@dataclass +class EventA(DomainEvent): + value: str = "" + + +@dataclass +class EventB(DomainEvent): + value: int = 0 + + +@pytest.fixture +def logger(): + mock = MagicMock() + mock.debug = MagicMock() + mock.warning = MagicMock() + return mock + + +@pytest.fixture +def bus(logger): + return InMemoryEventBus(logger) + + +@pytest.mark.asyncio +async def test_subscribe_and_publish(bus): + handler = AsyncMock(__qualname__="test_handler") + bus.subscribe(EventA, handler, blocking=True) + + event = EventA(value="hello") + await bus.publish(event) + + handler.assert_awaited_once_with(event) + + +@pytest.mark.asyncio +async def test_publish_no_handlers(bus): + """Publishing an event with no subscribers should not fail.""" + await bus.publish(EventA(value="ignored")) + + +@pytest.mark.asyncio +async def test_blocking_handler_awaited(bus): + """Blocking handler is awaited before publish returns.""" + call_order = [] + + async def handler(event): + call_order.append("handler") + + bus.subscribe(EventA, handler, blocking=True) + await bus.publish(EventA(value="test")) + call_order.append("after_publish") + + assert call_order == ["handler", "after_publish"] + + +@pytest.mark.asyncio +async def test_fire_and_forget_handler(bus): + """Fire-and-forget handler runs after publish returns.""" + completed = asyncio.Event() + + async def handler(event): + completed.set() + + bus.subscribe(EventA, handler, blocking=False) + await bus.publish(EventA(value="test")) + + # Give the fire-and-forget task a chance to run + await asyncio.wait_for(completed.wait(), timeout=1.0) + assert completed.is_set() + + +@pytest.mark.asyncio +async def test_blocking_before_fire_and_forget(bus): + """Blocking handlers execute before fire-and-forget handlers.""" + order = [] + + async def blocking_handler(event): + order.append("blocking") + + async def ff_handler(event): + order.append("fire_and_forget") + + bus.subscribe(EventA, blocking_handler, blocking=True) + bus.subscribe(EventA, ff_handler, blocking=False) + + await bus.publish(EventA(value="test")) + # Let fire-and-forget tasks complete + await asyncio.sleep(0.05) + + assert order == ["blocking", "fire_and_forget"] + + +@pytest.mark.asyncio +async def test_blocking_handler_exception_propagates(bus): + """Exceptions from blocking handlers propagate to the publisher.""" + + async def failing_handler(event): + raise ValueError("handler failed") + + bus.subscribe(EventA, failing_handler, blocking=True) + + with pytest.raises(ValueError, match="handler failed"): + await bus.publish(EventA(value="test")) + + +@pytest.mark.asyncio +async def test_fire_and_forget_handler_exception_caught(bus, logger): + """Exceptions from fire-and-forget handlers are caught and logged.""" + completed = asyncio.Event() + + async def failing_handler(event): + completed.set() + raise ValueError("ff handler failed") + + bus.subscribe(EventA, failing_handler, blocking=False) + await bus.publish(EventA(value="test")) + + await asyncio.wait_for(completed.wait(), timeout=1.0) + await asyncio.sleep(0.05) # Let _safe_execute finish + + logger.warning.assert_called_once() + assert "ff handler failed" in logger.warning.call_args[0][0] + + +@pytest.mark.asyncio +async def test_multiple_handlers(bus): + """Multiple handlers for the same event type are all invoked.""" + handler1 = AsyncMock(__qualname__="handler1") + handler2 = AsyncMock(__qualname__="handler2") + + bus.subscribe(EventA, handler1, blocking=True) + bus.subscribe(EventA, handler2, blocking=True) + + event = EventA(value="multi") + await bus.publish(event) + + handler1.assert_awaited_once_with(event) + handler2.assert_awaited_once_with(event) + + +@pytest.mark.asyncio +async def test_handler_receives_only_subscribed_type(bus): + """Handlers only receive events of the type they subscribed to.""" + handler_a = AsyncMock(__qualname__="handler_a") + handler_b = AsyncMock(__qualname__="handler_b") + + bus.subscribe(EventA, handler_a, blocking=True) + bus.subscribe(EventB, handler_b, blocking=True) + + await bus.publish(EventA(value="a")) + + handler_a.assert_awaited_once() + handler_b.assert_not_awaited() diff --git a/tests/unit/application/__init__.py b/tests/unit/application/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/application/events/__init__.py b/tests/unit/application/events/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/application/events/test_configuration_events.py b/tests/unit/application/events/test_configuration_events.py new file mode 100644 index 0000000..525e659 --- /dev/null +++ b/tests/unit/application/events/test_configuration_events.py @@ -0,0 +1,59 @@ +"""Unit tests for ConfigurationUpdatedEvent.""" + +import unittest +import uuid + +from edge_mining.application.events.configuration_events import ConfigurationAction, ConfigurationUpdatedEvent, ConfigurationUpdatedEventType +from edge_mining.domain.common import DomainEvent, EntityId + + +class TestConfigurationUpdatedEvent(unittest.TestCase): + """Test cases for ConfigurationUpdatedEvent.""" + + def test_inherits_from_domain_event(self): + event = ConfigurationUpdatedEvent() + self.assertIsInstance(event, DomainEvent) + + def test_creation_with_properties(self): + entity_id = EntityId(uuid.uuid4()) + event = ConfigurationUpdatedEvent( + entity_type=ConfigurationUpdatedEventType.ENERGY_MONITOR, + entity_id=entity_id, + action=ConfigurationAction.CREATED, + ) + self.assertEqual(event.entity_type, ConfigurationUpdatedEventType.ENERGY_MONITOR) + self.assertEqual(event.entity_id, entity_id) + self.assertEqual(event.action, ConfigurationAction.CREATED) + + def test_has_event_id_and_occurred_at(self): + event = ConfigurationUpdatedEvent() + self.assertIsNotNone(event.event_id) + self.assertIsNotNone(event.occurred_at) + + def test_event_type_property(self): + event = ConfigurationUpdatedEvent() + self.assertEqual(event.event_type, "ConfigurationUpdatedEvent") + + def test_to_dict_includes_all_fields(self): + entity_id = EntityId(uuid.uuid4()) + event = ConfigurationUpdatedEvent( + entity_type=ConfigurationUpdatedEventType.NOTIFIER, + entity_id=entity_id, + action=ConfigurationAction.REMOVED, + ) + result = event.to_dict() + self.assertEqual(result["entity_type"], ConfigurationUpdatedEventType.NOTIFIER) + self.assertEqual(result["action"], ConfigurationAction.REMOVED) + self.assertEqual(result["event_type"], "ConfigurationUpdatedEvent") + self.assertIn("event_id", result) + self.assertIn("occurred_at", result) + + def test_defaults(self): + event = ConfigurationUpdatedEvent() + self.assertEqual(event.entity_type, ConfigurationUpdatedEventType.UNKNOWN) + self.assertIsNone(event.entity_id) + self.assertEqual(event.action, ConfigurationAction.UNKNOWN) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/unit/application/services/__init__.py b/tests/unit/application/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/application/services/test_configuration_event_flow.py b/tests/unit/application/services/test_configuration_event_flow.py new file mode 100644 index 0000000..cce89ea --- /dev/null +++ b/tests/unit/application/services/test_configuration_event_flow.py @@ -0,0 +1,267 @@ +"""Integration tests for ConfigurationService → EventBus → AdapterService flow.""" + +import uuid +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from edge_mining.adapters.infrastructure.event_bus.in_memory_event_bus import InMemoryEventBus +from edge_mining.application.events.configuration_events import ConfigurationAction, ConfigurationUpdatedEvent, ConfigurationUpdatedEventType +from edge_mining.application.services.adapter_service import AdapterService +from edge_mining.application.services.configuration_service import ConfigurationService +from edge_mining.domain.common import EntityId +from edge_mining.shared.infrastructure import PersistenceSettings + + +def make_entity_id(): + return EntityId(uuid.uuid4()) + + +@pytest.fixture +def logger(): + mock = MagicMock() + mock.debug = MagicMock() + mock.info = MagicMock() + mock.warning = MagicMock() + mock.error = MagicMock() + return mock + + +@pytest.fixture +def mock_event_bus(): + bus = AsyncMock() + bus.publish = AsyncMock() + return bus + + +@pytest.fixture +def mock_persistence(): + """Create a mock PersistenceSettings with all repos.""" + ps = MagicMock(spec=PersistenceSettings) + + # Each repo mock needs get_by_id, add, update, remove, get_all + for repo_name in [ + "external_service_repo", "energy_source_repo", "energy_monitor_repo", + "miner_repo", "miner_controller_repo", "policy_repo", + "optimization_unit_repo", "forecast_provider_repo", + "home_forecast_provider_repo", "mining_performance_tracker_repo", + "notifier_repo", "settings_repo", + ]: + repo = MagicMock() + repo.get_all.return_value = [] + repo.get_by_id.return_value = None + setattr(ps, repo_name, repo) + + return ps + + +@pytest.fixture +def config_service(mock_persistence, mock_event_bus, logger): + return ConfigurationService( + persistence_settings=mock_persistence, + event_bus=mock_event_bus, + logger=logger, + ) + + +# --- Test that ConfigurationService publishes events --- + +@pytest.mark.asyncio +async def test_create_external_service_publishes_event(config_service, mock_event_bus): + """Creating an external service should publish a ConfigurationUpdatedEvent.""" + from edge_mining.shared.external_services.common import ExternalServiceAdapter + + mock_config = MagicMock() + mock_config.is_valid.return_value = True + + service = await config_service.create_external_service( + name="Test HA", + adapter_type=ExternalServiceAdapter.HOME_ASSISTANT_API, + config=mock_config, + ) + + mock_event_bus.publish.assert_awaited_once() + event = mock_event_bus.publish.call_args[0][0] + assert isinstance(event, ConfigurationUpdatedEvent) + assert event.entity_type == ConfigurationUpdatedEventType.EXTERNAL_SERVICE + assert event.action == ConfigurationAction.CREATED + assert event.entity_id == service.id + + +@pytest.mark.asyncio +async def test_create_energy_monitor_publishes_event(config_service, mock_event_bus): + """Creating an energy monitor should publish a ConfigurationUpdatedEvent.""" + from edge_mining.domain.energy.common import EnergyMonitorAdapter + + mock_config = MagicMock() + mock_config.is_valid.return_value = True + + monitor = await config_service.create_energy_monitor( + name="Solar Monitor", + adapter_type=EnergyMonitorAdapter.DUMMY_SOLAR, + config=mock_config, + ) + + mock_event_bus.publish.assert_awaited_once() + event = mock_event_bus.publish.call_args[0][0] + assert isinstance(event, ConfigurationUpdatedEvent) + assert event.entity_type == ConfigurationUpdatedEventType.ENERGY_MONITOR + assert event.action == ConfigurationAction.CREATED + + +@pytest.mark.asyncio +async def test_update_notifier_publishes_event(config_service, mock_event_bus, mock_persistence): + """Updating a notifier should publish a ConfigurationUpdatedEvent.""" + from edge_mining.domain.notification.common import NotificationAdapter + from edge_mining.domain.notification.entities import Notifier + + notifier_id = make_entity_id() + existing = Notifier( + id=notifier_id, + name="Old", + adapter_type=NotificationAdapter.DUMMY, + config=MagicMock(), + ) + mock_persistence.notifier_repo.get_by_id.return_value = existing + + mock_config = MagicMock() + mock_config.is_valid.return_value = True + + await config_service.update_notifier( + notifier_id=notifier_id, + name="New", + config=mock_config, + ) + + mock_event_bus.publish.assert_awaited_once() + event = mock_event_bus.publish.call_args[0][0] + assert event.entity_type == ConfigurationUpdatedEventType.NOTIFIER + assert event.action == ConfigurationAction.UPDATED + + +@pytest.mark.asyncio +async def test_remove_miner_controller_publishes_event(config_service, mock_event_bus, mock_persistence): + """Removing a miner controller should publish a ConfigurationUpdatedEvent.""" + from edge_mining.domain.miner.common import MinerControllerAdapter + from edge_mining.domain.miner.entities import MinerController + + ctrl_id = make_entity_id() + existing = MinerController( + id=ctrl_id, + name="Ctrl", + adapter_type=MinerControllerAdapter.DUMMY, + config=MagicMock(), + ) + mock_persistence.miner_controller_repo.get_by_id.return_value = existing + mock_persistence.miner_repo.get_by_controller_id.return_value = [] + + await config_service.remove_miner_controller(controller_id=ctrl_id) + + mock_event_bus.publish.assert_awaited_once() + event = mock_event_bus.publish.call_args[0][0] + assert event.entity_type == ConfigurationUpdatedEventType.MINER_CONTROLLER + assert event.action == ConfigurationAction.REMOVED + assert event.entity_id == ctrl_id + + +# --- Test end-to-end flow with real InMemoryEventBus --- + +@pytest.mark.asyncio +async def test_end_to_end_cache_invalidation(mock_persistence, logger): + """End-to-end: creating an energy monitor triggers cache invalidation in AdapterService.""" + from edge_mining.domain.energy.common import EnergyMonitorAdapter + + event_bus = InMemoryEventBus(logger) + + adapter_service = AdapterService( + energy_monitor_repo=mock_persistence.energy_monitor_repo, + miner_controller_repo=mock_persistence.miner_controller_repo, + notifier_repo=mock_persistence.notifier_repo, + forecast_provider_repo=mock_persistence.forecast_provider_repo, + home_forecast_provider_repo=mock_persistence.home_forecast_provider_repo, + mining_performance_tracker_repo=mock_persistence.mining_performance_tracker_repo, + external_service_repo=mock_persistence.external_service_repo, + event_bus=event_bus, + logger=logger, + ) + + config_service = ConfigurationService( + persistence_settings=mock_persistence, + event_bus=event_bus, + logger=logger, + ) + + # Pre-populate adapter cache with a fake entry + fake_id = make_entity_id() + adapter_service._instance_cache[fake_id] = MagicMock() + assert fake_id in adapter_service._instance_cache + + mock_config = MagicMock() + mock_config.is_valid.return_value = True + + # Create an energy monitor - this should trigger cache invalidation + monitor = await config_service.create_energy_monitor( + name="Test Monitor", + adapter_type=EnergyMonitorAdapter.DUMMY_SOLAR, + config=mock_config, + ) + + # The monitor's own ID should have been popped (even though it was just created, + # the handler tries to pop it from instance_cache) + # The fake_id should still be there since it's a different entity + assert fake_id in adapter_service._instance_cache + + +@pytest.mark.asyncio +async def test_external_service_update_clears_all_instance_cache(mock_persistence, logger): + """Updating an external service should clear the entire instance cache.""" + from edge_mining.shared.external_services.common import ExternalServiceAdapter + from edge_mining.shared.external_services.entities import ExternalService + + event_bus = InMemoryEventBus(logger) + + adapter_service = AdapterService( + energy_monitor_repo=mock_persistence.energy_monitor_repo, + miner_controller_repo=mock_persistence.miner_controller_repo, + notifier_repo=mock_persistence.notifier_repo, + forecast_provider_repo=mock_persistence.forecast_provider_repo, + home_forecast_provider_repo=mock_persistence.home_forecast_provider_repo, + mining_performance_tracker_repo=mock_persistence.mining_performance_tracker_repo, + external_service_repo=mock_persistence.external_service_repo, + event_bus=event_bus, + logger=logger, + ) + + config_service = ConfigurationService( + persistence_settings=mock_persistence, + event_bus=event_bus, + logger=logger, + ) + + # Pre-populate caches + svc_id = make_entity_id() + adapter_id = make_entity_id() + adapter_service._service_cache[svc_id] = MagicMock() + adapter_service._instance_cache[adapter_id] = MagicMock() + + # Stub the repo to return existing service + mock_config = MagicMock() + mock_config.is_valid.return_value = True + existing = ExternalService( + id=svc_id, + name="HA", + adapter_type=ExternalServiceAdapter.HOME_ASSISTANT_API, + config=mock_config, + ) + mock_persistence.external_service_repo.get_by_id.return_value = existing + + await config_service.update_external_service( + service_id=svc_id, + name="HA Updated", + config=mock_config, + ) + + # Service cache should have the entry removed + assert svc_id not in adapter_service._service_cache + # Instance cache should be fully cleared (conservative approach for external_service) + assert len(adapter_service._instance_cache) == 0 diff --git a/tests/unit/domain/test_events.py b/tests/unit/domain/test_events.py new file mode 100644 index 0000000..86bc097 --- /dev/null +++ b/tests/unit/domain/test_events.py @@ -0,0 +1,55 @@ +"""Unit tests for DomainEvent base class.""" + +import unittest +from dataclasses import dataclass +from datetime import datetime, timezone + +from edge_mining.domain.common import DomainEvent + + +@dataclass +class SampleEvent(DomainEvent): + """Concrete event for testing.""" + + name: str = "" + + +class TestDomainEvent(unittest.TestCase): + """Test cases for DomainEvent base class.""" + + def test_event_id_auto_generated(self): + event = SampleEvent(name="test") + self.assertIsInstance(event.event_id, str) + self.assertTrue(len(event.event_id) > 0) + + def test_event_id_unique(self): + event1 = SampleEvent(name="a") + event2 = SampleEvent(name="b") + self.assertNotEqual(event1.event_id, event2.event_id) + + def test_occurred_at_auto_generated_utc(self): + event = SampleEvent(name="test") + self.assertIsInstance(event.occurred_at, datetime) + self.assertEqual(event.occurred_at.tzinfo, timezone.utc) + + def test_event_type_returns_class_name(self): + event = SampleEvent(name="test") + self.assertEqual(event.event_type, "SampleEvent") + + def test_to_dict_serializes_correctly(self): + event = SampleEvent(name="test") + result = event.to_dict() + self.assertEqual(result["name"], "test") + self.assertEqual(result["event_type"], "SampleEvent") + self.assertIsInstance(result["occurred_at"], str) + self.assertIn("event_id", result) + + def test_to_dict_datetime_is_iso_format(self): + event = SampleEvent(name="test") + result = event.to_dict() + # Should be parseable as ISO + datetime.fromisoformat(result["occurred_at"]) + + +if __name__ == "__main__": + unittest.main()