diff --git a/main.py b/main.py index dd51fd5..7b783f6 100644 --- a/main.py +++ b/main.py @@ -288,7 +288,7 @@ async def cli_loop(sk, pk, chain, mempool, network): # Main entry point # ────────────────────────────────────────────── -async def run_node(port: int, connect_to: str | None, fund: int, datadir: str | None): +async def run_node(port: int, host: str, connect_to: str | None, fund: int, datadir: str | None): """Boot the node, optionally connect to a peer, then enter the CLI.""" sk, pk = create_wallet() @@ -326,7 +326,7 @@ async def on_peer_connected(writer): await writer.drain() logger.info("🔄 Sent state sync to new peer") - network.set_on_peer_connected(on_peer_connected) + network.register_on_peer_connected(on_peer_connected) await network.start(port=port, host=host) @@ -373,7 +373,7 @@ def main(): ) try: - asyncio.run(run_node(args.port, args.connect, args.fund, args.datadir)) + asyncio.run(run_node(args.port, args.host, args.connect, args.fund, args.datadir)) except KeyboardInterrupt: print("\nNode shut down.") diff --git a/minichain/p2p.py b/minichain/p2p.py index ee52d7d..3271598 100644 --- a/minichain/p2p.py +++ b/minichain/p2p.py @@ -43,7 +43,19 @@ def register_handler(self, handler_callback): raise ValueError("handler_callback must be callable") self._handler_callback = handler_callback - async def start(self, port: int = 9000): + def register_on_peer_connected(self, handler_callback): + if not callable(handler_callback): + raise ValueError("handler_callback must be callable") + self._on_peer_connected = handler_callback + + async def _notify_peer_connected(self, writer, error_message): + if self._on_peer_connected: + try: + await self._on_peer_connected(writer) + except Exception: + logger.exception(error_message) + + async def start(self, port: int = 9000, host: str = "127.0.0.1"): """Start listening for incoming peer connections on the given port.""" self._port = port self._server = await asyncio.start_server( @@ -80,11 +92,7 @@ async def connect_to_peer(self, host: str, port: int) -> bool: self._listen_to_peer(reader, writer, f"{host}:{port}") ) self._listen_tasks.append(task) - if self._on_peer_connected: - try: - await self._on_peer_connected(writer) - except Exception: - logger.exception("Network: Error during outbound peer sync") + await self._notify_peer_connected(writer, "Network: Error during outbound peer sync") logger.info("Network: Connected to peer %s:%d", host, port) return True except Exception as exc: @@ -103,11 +111,7 @@ async def _handle_incoming( self._peers.append((reader, writer)) task = asyncio.create_task(self._listen_to_peer(reader, writer, addr)) self._listen_tasks.append(task) - if self._on_peer_connected: - try: - await self._on_peer_connected(writer) - except Exception: - logger.exception("Network: Error during peer sync") + await self._notify_peer_connected(writer, "Network: Error during peer sync") def _validate_transaction_payload(self, payload): if not isinstance(payload, dict): @@ -206,7 +210,10 @@ def _validate_block_payload(self, payload): def _validate_message(self, message): if not isinstance(message, dict): return False - if set(message) != {"type", "data"}: + required_fields = {"type", "data"} + if not required_fields.issubset(set(message)): + return False + if not set(message).issubset(required_fields): return False msg_type = message.get("type") @@ -265,9 +272,6 @@ async def _listen_to_peer( except (json.JSONDecodeError, UnicodeDecodeError): logger.warning("Network: Malformed message from %s", addr) continue - if isinstance(data, dict): - data["_peer_addr"] = addr - if not self._validate_message(data): logger.warning("Network: Invalid message schema from %s", addr) continue @@ -278,6 +282,7 @@ async def _listen_to_peer( logger.info("Network: Duplicate %s ignored from %s", msg_type, addr) continue self._mark_seen(msg_type, payload) + data["_peer_addr"] = addr if self._handler_callback: try: