From 195f09d05f23156a68d5b0da238c42396399dd3a Mon Sep 17 00:00:00 2001 From: hqb Date: Sat, 21 Mar 2026 11:15:39 +0000 Subject: [PATCH 1/2] Fix P2P message validation and host parameter issue --- main.py | 6 +++--- minichain/p2p.py | 10 ++++++++-- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/main.py b/main.py index dd51fd5..a72605c 100644 --- a/main.py +++ b/main.py @@ -288,7 +288,7 @@ async def cli_loop(sk, pk, chain, mempool, network): # Main entry point # ────────────────────────────────────────────── -async def run_node(port: int, connect_to: str | None, fund: int, datadir: str | None): +async def run_node(port: int, host: str, connect_to: str | None, fund: int, datadir: str | None): """Boot the node, optionally connect to a peer, then enter the CLI.""" sk, pk = create_wallet() @@ -326,7 +326,7 @@ async def on_peer_connected(writer): await writer.drain() logger.info("🔄 Sent state sync to new peer") - network.set_on_peer_connected(on_peer_connected) + network._on_peer_connected = on_peer_connected await network.start(port=port, host=host) @@ -373,7 +373,7 @@ def main(): ) try: - asyncio.run(run_node(args.port, args.connect, args.fund, args.datadir)) + asyncio.run(run_node(args.port, args.host, args.connect, args.fund, args.datadir)) except KeyboardInterrupt: print("\nNode shut down.") diff --git a/minichain/p2p.py b/minichain/p2p.py index ee52d7d..5e4950a 100644 --- a/minichain/p2p.py +++ b/minichain/p2p.py @@ -43,7 +43,7 @@ def register_handler(self, handler_callback): raise ValueError("handler_callback must be callable") self._handler_callback = handler_callback - async def start(self, port: int = 9000): + async def start(self, port: int = 9000, host: str = "127.0.0.1"): """Start listening for incoming peer connections on the given port.""" self._port = port self._server = await asyncio.start_server( @@ -206,7 +206,13 @@ def _validate_block_payload(self, payload): def _validate_message(self, message): if not isinstance(message, dict): return False - if set(message) != {"type", "data"}: + # Allow _peer_addr field added by _listen_to_peer + required_fields = {"type", "data"} + if not required_fields.issubset(set(message)): + return False + # Reject messages with unexpected fields (except _peer_addr) + allowed_fields = {"type", "data", "_peer_addr"} + if not set(message).issubset(allowed_fields): return False msg_type = message.get("type") From 7b238b22408553e1c316f5e9d71add80f7407e7d Mon Sep 17 00:00:00 2001 From: wsl_ub Date: Tue, 24 Mar 2026 23:08:07 +0800 Subject: [PATCH 2/2] Fix peer callback registration and message envelope validation --- main.py | 2 +- minichain/p2p.py | 33 ++++++++++++++++----------------- 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/main.py b/main.py index a72605c..7b783f6 100644 --- a/main.py +++ b/main.py @@ -326,7 +326,7 @@ async def on_peer_connected(writer): await writer.drain() logger.info("🔄 Sent state sync to new peer") - network._on_peer_connected = on_peer_connected + network.register_on_peer_connected(on_peer_connected) await network.start(port=port, host=host) diff --git a/minichain/p2p.py b/minichain/p2p.py index 5e4950a..3271598 100644 --- a/minichain/p2p.py +++ b/minichain/p2p.py @@ -43,6 +43,18 @@ def register_handler(self, handler_callback): raise ValueError("handler_callback must be callable") self._handler_callback = handler_callback + def register_on_peer_connected(self, handler_callback): + if not callable(handler_callback): + raise ValueError("handler_callback must be callable") + self._on_peer_connected = handler_callback + + async def _notify_peer_connected(self, writer, error_message): + if self._on_peer_connected: + try: + await self._on_peer_connected(writer) + except Exception: + logger.exception(error_message) + async def start(self, port: int = 9000, host: str = "127.0.0.1"): """Start listening for incoming peer connections on the given port.""" self._port = port @@ -80,11 +92,7 @@ async def connect_to_peer(self, host: str, port: int) -> bool: self._listen_to_peer(reader, writer, f"{host}:{port}") ) self._listen_tasks.append(task) - if self._on_peer_connected: - try: - await self._on_peer_connected(writer) - except Exception: - logger.exception("Network: Error during outbound peer sync") + await self._notify_peer_connected(writer, "Network: Error during outbound peer sync") logger.info("Network: Connected to peer %s:%d", host, port) return True except Exception as exc: @@ -103,11 +111,7 @@ async def _handle_incoming( self._peers.append((reader, writer)) task = asyncio.create_task(self._listen_to_peer(reader, writer, addr)) self._listen_tasks.append(task) - if self._on_peer_connected: - try: - await self._on_peer_connected(writer) - except Exception: - logger.exception("Network: Error during peer sync") + await self._notify_peer_connected(writer, "Network: Error during peer sync") def _validate_transaction_payload(self, payload): if not isinstance(payload, dict): @@ -206,13 +210,10 @@ def _validate_block_payload(self, payload): def _validate_message(self, message): if not isinstance(message, dict): return False - # Allow _peer_addr field added by _listen_to_peer required_fields = {"type", "data"} if not required_fields.issubset(set(message)): return False - # Reject messages with unexpected fields (except _peer_addr) - allowed_fields = {"type", "data", "_peer_addr"} - if not set(message).issubset(allowed_fields): + if not set(message).issubset(required_fields): return False msg_type = message.get("type") @@ -271,9 +272,6 @@ async def _listen_to_peer( except (json.JSONDecodeError, UnicodeDecodeError): logger.warning("Network: Malformed message from %s", addr) continue - if isinstance(data, dict): - data["_peer_addr"] = addr - if not self._validate_message(data): logger.warning("Network: Invalid message schema from %s", addr) continue @@ -284,6 +282,7 @@ async def _listen_to_peer( logger.info("Network: Duplicate %s ignored from %s", msg_type, addr) continue self._mark_seen(msg_type, payload) + data["_peer_addr"] = addr if self._handler_callback: try: