From ce846dd9346e14e7491e103b2eac54ebdb15dcf1 Mon Sep 17 00:00:00 2001 From: Cbampeta Date: Mon, 16 Mar 2026 00:10:14 +0100 Subject: [PATCH 1/5] fix: fixinig some error and logic in launching ia --- src/high_level/src/high_level/backend.py | 37 ++++++++++++++---------- src/high_level/src/high_level/server.py | 7 +++-- src/high_level/src/programs/car.py | 3 +- 3 files changed, 27 insertions(+), 20 deletions(-) diff --git a/src/high_level/src/high_level/backend.py b/src/high_level/src/high_level/backend.py index 646f9a0..f2859c7 100644 --- a/src/high_level/src/high_level/backend.py +++ b/src/high_level/src/high_level/backend.py @@ -273,22 +273,27 @@ async def start_ai_with_model(req: Request) -> dict[str, Any]: programs = getattr(self.server, "programs", []) ai_prog = next( - (p for p in programs if type(p).__name__ == "AIProgram"), None + ( + i + for i in range(len(programs)) + if type(programs[i]).__name__ == "AIProgram" + ), + None, ) if ai_prog is None: raise HTTPException(status_code=404, detail="AI program not found") - ai_prog.start(model_give=model) + self.server.start_process(ai_prog, attr=model) return {"status": "ok", "model": model} @self.app.get("/api/programs") - def programs(): + def programs() -> List[Dict[str, Any]]: return self._list_programs() @self.app.post("/api/programs/{prog_id}/toggle") - def toggle_program(prog_id: int): + def toggle_program(prog_id: int) -> dict[str, Any]: programs = getattr(self.server, "programs", []) if not isinstance(programs, list) or not (0 <= prog_id < len(programs)): raise HTTPException(status_code=404, detail="Unknown program id") @@ -302,7 +307,7 @@ def toggle_program(prog_id: int): } @self.app.post("/api/programs/{prog_id}/start") - def start_program(prog_id: int): + def start_program(prog_id: int) -> dict[str, Any]: programs = getattr(self.server, "programs", []) if not isinstance(programs, list) or not (0 <= prog_id < len(programs)): raise HTTPException(status_code=404, detail="Unknown program id") @@ -315,7 +320,7 @@ def start_program(prog_id: int): return {"status": "ok", "program_id": prog_id} @self.app.post("/api/programs/{prog_id}/kill") - def kill_program(prog_id: int): + def kill_program(prog_id: int) -> dict[str, Any]: programs = getattr(self.server, "programs", []) if not isinstance(programs, list) or not (0 <= prog_id < len(programs)): raise HTTPException(status_code=404, detail="Unknown program id") @@ -329,19 +334,19 @@ def kill_program(prog_id: int): return {"status": "ok", "program_id": prog_id} @self.app.get("/api/stream/camera") - def camera_stream(): + def camera_stream() -> dict[str, str]: # give the URL. return {"url": self._camera_stream_url()} @self.app.get("/api/lidar") - def lidar_snapshot(): + def lidar_snapshot() -> dict[str, Any]: data = self._get_lidar_ranges() if data is None: raise HTTPException(status_code=503, detail="Lidar not available") return data @self.app.get("/api/lidar_init") - def lidar_init(): + def lidar_init() -> dict[str, Any]: lidar = self._lidar() if not lidar: raise HTTPException(status_code=503, detail="Lidar not available") @@ -362,7 +367,7 @@ def lidar_init(): } @self.app.websocket("/api/lidar/ws") - async def lidar_ws(ws: WebSocket): + async def lidar_ws(ws: WebSocket) -> None: await ws.accept() self.logger.info("Lidar WS client connected") @@ -389,7 +394,7 @@ async def telemetry_ws(ws: WebSocket): self.logger.info("Telemetry WS client disconnected") @self.app.post("/api/camera_matrix/start") - def start_camera_matrix(): + def start_camera_matrix() -> dict[str, str]: if self._camera_matrix_running: return {"status": "already_running"} @@ -403,7 +408,7 @@ def start_camera_matrix(): return {"status": "started"} @self.app.post("/api/camera_matrix/stop") - def stop_camera_matrix(): + def stop_camera_matrix() -> dict[str, str]: self._camera_matrix_running = False return {"status": "stopped"} @@ -431,7 +436,7 @@ async def camera_matrix_ws(ws: WebSocket): # ---------------------------- # Program interface # ---------------------------- - def start(self): + def start(self) -> None: if self.running: return self.running = True @@ -442,7 +447,7 @@ def start(self): ) self._uvicorn_server = uvicorn.Server(config) - def _run(): + def _run() -> None: try: self.logger.info("BackendAPI starting on %s:%d", self.host, self.port) assert self._uvicorn_server is not None @@ -455,7 +460,7 @@ def _run(): self._thread = threading.Thread(target=_run, daemon=True) self._thread.start() - def kill(self): + def kill(self) -> None: if not self.running: return self.logger.info("BackendAPI stopping...") @@ -477,6 +482,6 @@ def kill(self): self.running = False self.logger.info("BackendAPI stopped") - def display(self): + def display(self) -> str: name = self.__class__.__name__ return f"{name}\n(running)" if self.running else name diff --git a/src/high_level/src/high_level/server.py b/src/high_level/src/high_level/server.py index 6a3a01d..7886390 100644 --- a/src/high_level/src/high_level/server.py +++ b/src/high_level/src/high_level/server.py @@ -199,7 +199,7 @@ def bouton_entre(self, num=None): # Processus # --------------------------------------------------------------------------------------------------- - def start_process(self, number_program): + def start_process(self, number_program, attr=None) -> None: """Starts the program referenced by its number: if it is a program that controls the car, it kills the old program that was controlling, otherwise the program is started or stopped depending on whether it was already running or stopped before""" @@ -224,7 +224,10 @@ def start_process(self, number_program): elif self.programs[number_program].controls_car: self.programs[self.last_program_control].kill() - self.programs[number_program].start() + if attr is not None: + self.programs[number_program].start(attr) + else: + self.programs[number_program].start() self.log.warning( "Car control changed: %s -> %s", type(self.programs[self.last_program_control]).__name__, diff --git a/src/high_level/src/programs/car.py b/src/high_level/src/programs/car.py index ac99214..5093eca 100644 --- a/src/high_level/src/programs/car.py +++ b/src/high_level/src/programs/car.py @@ -92,7 +92,7 @@ def __init__(self, driving_strategy, server, model) -> None: # Initialize AI session try: - self.ai_session = ort.InferenceSession(MODEL_PATH) + self.ai_session = ort.InferenceSession(MODEL_PATH + "/" + model) self.log.info("AI session initialized successfully") except Exception as e: self.log.error(f"Error initializing AI session: {e}") @@ -219,7 +219,6 @@ def run(self) -> None: try: if self.GR86 is not None: self.GR86.main() - print("lolooibiiuib : " + self.running.__str__()) except Exception as e: self.log.error(f"AI error: {e}") self.running = False From ec8bf74d718d93795a540a9a1a848a017cade216 Mon Sep 17 00:00:00 2001 From: Nikopol Date: Thu, 19 Mar 2026 15:58:15 +0100 Subject: [PATCH 2/5] feat: adding backward function in the ai script --- src/high_level/src/programs/car.py | 44 ++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/src/high_level/src/programs/car.py b/src/high_level/src/programs/car.py index 5093eca..597cdcb 100644 --- a/src/high_level/src/programs/car.py +++ b/src/high_level/src/programs/car.py @@ -26,6 +26,24 @@ from .program import Program from .utils import Driver +def too_close(lidar, dir): + R = 0.83 + length = len(lidar) + straight = lidar[length // 2] + if dir: + nearest = min(lidar[length // 2 :]) + else: + nearest = min(lidar[: length // 2]) + + cos = nearest / straight + + # I don't know why sometimes this happens + if cos < -1 or cos > 1: + return True + + theta = np.arccos(cos) + L = R * (1 - np.sin(theta)) + return nearest < L class Border_zone: ZONE1 = [0, 370] @@ -38,6 +56,7 @@ def __init__(self, server) -> None: self.log = logging.getLogger(__name__) self.server = server self.crashed = False + self.state = 0 # Load reference lidar contour once try: self.reference_lidar = np.load( @@ -131,6 +150,24 @@ def turn_around(self) -> None: if self.server.camera_red_or_green.is_reverse: self.turn_around() + def back(self): + # if wall on "dir": turn to "dir" and reverse until able to move forward (wall distance to verify) + lidar, cam = self.lidar.r_distance, self.camera.camera_matrix + S = sum(cam) + dir = S > 0 + if dir: + self.direction = 18 + if too_close(lidar, dir): + self.target_speed = -2 + else: + self.state = 0 + else: + self.direction = -18 + if too_close(lidar, dir): + self.target_speed = -2 + else: + self.state = 0 + def main(self) -> None: # retrieve lidar data. We only take the first 1080 values and ignore the last one for simplicity for the ai if self.camera is None or self.lidar is None: @@ -149,6 +186,13 @@ def main(self) -> None: lidar_data_ai, self.camera.camera_matrix() ) # the ai takes distances in meters not in mm self.log.debug(f"Min Lidar: {min(lidar_data)}, Max Lidar: {max(lidar_data)}") + + if self.server.crash_car.crashed: + self.state = 1 + + if self.state == 1: + self.back() + """ if self.camera.is_running_in_reversed(): self.reverse_count += 1 From baf0b326dc5e956982192cfe0737e862099d1381 Mon Sep 17 00:00:00 2001 From: Cbampeta Date: Thu, 19 Mar 2026 16:21:27 +0100 Subject: [PATCH 3/5] feat test de fou --- src/high_level/src/high_level/server.py | 2 + src/high_level/src/programs/__init__.py | 2 + src/high_level/src/programs/car.py | 6 +- src/high_level/src/programs/test_recule.py | 88 ++++++++++++++++++++++ 4 files changed, 96 insertions(+), 2 deletions(-) create mode 100644 src/high_level/src/programs/test_recule.py diff --git a/src/high_level/src/high_level/server.py b/src/high_level/src/high_level/server.py index 7886390..680938d 100644 --- a/src/high_level/src/high_level/server.py +++ b/src/high_level/src/high_level/server.py @@ -22,6 +22,7 @@ from programs.remote_control import RemoteControl from programs.ssh_program import SshProgram from programs.utils.ssh import check_ssh_connections +from programs import Test_recule from .backend import BackendAPI @@ -66,6 +67,7 @@ def __init__(self): # ProgramStreamCamera(self), BackendAPI(self, host="0.0.0.0", port=8001, site_dir=SITE_DIR_BACKEND), Poweroff(), + Test_recule(self), ] self.initialization_module = self.programs[ 1 diff --git a/src/high_level/src/programs/__init__.py b/src/high_level/src/programs/__init__.py index 25c282b..d62317b 100644 --- a/src/high_level/src/programs/__init__.py +++ b/src/high_level/src/programs/__init__.py @@ -5,6 +5,7 @@ from .program import Program # in first because other programs depend on it from .remote_control import RemoteControl from .ssh_program import SshProgram +from .test_recule import Test_recule __all__ = [ "Program", @@ -19,4 +20,5 @@ "StreamOutput", "frame_buffer", "CrashCar", + "Test_recule", ] diff --git a/src/high_level/src/programs/car.py b/src/high_level/src/programs/car.py index 597cdcb..6ed2ede 100644 --- a/src/high_level/src/programs/car.py +++ b/src/high_level/src/programs/car.py @@ -26,6 +26,7 @@ from .program import Program from .utils import Driver + def too_close(lidar, dir): R = 0.83 length = len(lidar) @@ -45,6 +46,7 @@ def too_close(lidar, dir): L = R * (1 - np.sin(theta)) return nearest < L + class Border_zone: ZONE1 = [0, 370] ZONE2 = [371, 750] @@ -167,7 +169,7 @@ def back(self): self.target_speed = -2 else: self.state = 0 - + def main(self) -> None: # retrieve lidar data. We only take the first 1080 values and ignore the last one for simplicity for the ai if self.camera is None or self.lidar is None: @@ -189,7 +191,7 @@ def main(self) -> None: if self.server.crash_car.crashed: self.state = 1 - + if self.state == 1: self.back() diff --git a/src/high_level/src/programs/test_recule.py b/src/high_level/src/programs/test_recule.py new file mode 100644 index 0000000..2f2f4cc --- /dev/null +++ b/src/high_level/src/programs/test_recule.py @@ -0,0 +1,88 @@ +from programs import Program +import numpy as np +import logging +import threading + + +def too_close(lidar, dir): + R = 0.83 + length = len(lidar) + straight = lidar[length // 2] + if dir: + nearest = min(lidar[length // 2 :]) + else: + nearest = min(lidar[: length // 2]) + + cos = nearest / straight + + # I don't know why sometimes this happens + if cos < -1 or cos > 1: + return True + + theta = np.arccos(cos) + L = R * (1 - np.sin(theta)) + return nearest < L + + +class Test_recule(Program): + def __init__(self, server): + super().__init__() + self.server = server + self.controls_car = True + self.speed = 0 + self.dir = 0 + self.state = 0 + self.running = False + + @property + def target_speed(self) -> float: + return self.speed + + @property + def direction(self): + return self.dir + + @property + def lidar(self): + return self.server.lidar + + @property + def camera(self): + return self.server.camera + + def back(self): + # if wall on "dir": turn to "dir" and reverse until able to move forward (wall distance to verify) + lidar, cam = self.lidar.r_distance, self.camera.camera_matrix + S = sum(cam) + dir = S > 0 + if dir: + self.dir = 18 + if too_close(lidar, dir): + self.speed = -2 + else: + self.state = 0 + else: + self.dir = -18 + if too_close(lidar, dir): + self.speed = -2 + else: + self.state = 0 + + def main(self) -> None: + while self.running: + if self.server.crash_car.crashed: + self.state = 1 + + if self.state == 1: + self.back() + else: + self.dir = 0 + self.speed = 0 + + def start(self): + self.running = False + threading.Thread(target=self.main, daemon=True).start() + + def kill(self): + self.running = True + self.speed = 0 From f0d4be819de539328b957853e3d2e33a9a2a8511 Mon Sep 17 00:00:00 2001 From: Cbampeta Date: Fri, 20 Mar 2026 18:30:32 +0100 Subject: [PATCH 4/5] fix: some error and improve --- src/high_level/scripts/min_lidar.py | 2 +- src/high_level/src/drivers/master_i2c.py | 2 +- src/high_level/src/programs/camera_proxy.py | 2 +- src/high_level/src/programs/car.py | 7 ++- src/high_level/src/programs/test_recule.py | 59 ++++++++++++--------- 5 files changed, 41 insertions(+), 31 deletions(-) diff --git a/src/high_level/scripts/min_lidar.py b/src/high_level/scripts/min_lidar.py index 0de5db9..f5a847c 100644 --- a/src/high_level/scripts/min_lidar.py +++ b/src/high_level/scripts/min_lidar.py @@ -5,7 +5,7 @@ import numpy as np -from src.driver.lidar import Lidar +from src.drivers.lidar import Lidar IP = "192.168.0.10" PORT = 10940 diff --git a/src/high_level/src/drivers/master_i2c.py b/src/high_level/src/drivers/master_i2c.py index 7b2d2b0..3309f50 100644 --- a/src/high_level/src/drivers/master_i2c.py +++ b/src/high_level/src/drivers/master_i2c.py @@ -48,7 +48,7 @@ def start_send(self): float(self.serveur.direction), ) self.bus.write_i2c_block_data(SLAVE_ADDRESS, 0, list(data)) - time.sleep(1e-4) # Short delay to prevent overwhelming the bus + time.sleep(1e-3) # Short delay to prevent overwhelming the bus except Exception as e: self.log.error("I2C write error: %s", e, exc_info=True) time.sleep(I2C_SLEEP_ERROR_LOOP) diff --git a/src/high_level/src/programs/camera_proxy.py b/src/high_level/src/programs/camera_proxy.py index a14a3ed..bb8abcc 100644 --- a/src/high_level/src/programs/camera_proxy.py +++ b/src/high_level/src/programs/camera_proxy.py @@ -17,7 +17,7 @@ DEBUG_DIR_wayfinding = "Debug_Wayfinding" # Directory for wayfinding debug images COLOUR_KEY = {"green": 1, "red": -1, "none": 0} COLOR_THRESHOLD = 20 # Threshold for color intensity difference -Y_OFFSET = -80 # Offset for the y-axis in the image +Y_OFFSET = -40 # Offset for the y-axis in the image class CameraProxy: diff --git a/src/high_level/src/programs/car.py b/src/high_level/src/programs/car.py index 6ed2ede..dc851be 100644 --- a/src/high_level/src/programs/car.py +++ b/src/high_level/src/programs/car.py @@ -59,6 +59,7 @@ def __init__(self, server) -> None: self.server = server self.crashed = False self.state = 0 + self.deadzone = 20 # Load reference lidar contour once try: self.reference_lidar = np.load( @@ -89,7 +90,9 @@ def has_Crashed(self) -> bool: self.crashed = False else: # Points that are inside the vehicle contour - penetration_mask = (current > 0) & (current < self.reference_lidar) + penetration_mask = (current > 0) & ( + current < self.reference_lidar + self.deadzone + ) penetration_count = np.sum(penetration_mask) @@ -271,7 +274,7 @@ def run(self) -> None: raise def initializeai(self, model: str) -> None: - self.driver = Driver(128, 128) + self.driver = Driver(1, 1024) self.driver.load_model(model) # self.GR86 = Car(self.driver.ai, self.server, model) diff --git a/src/high_level/src/programs/test_recule.py b/src/high_level/src/programs/test_recule.py index 2f2f4cc..f8af8fd 100644 --- a/src/high_level/src/programs/test_recule.py +++ b/src/high_level/src/programs/test_recule.py @@ -2,31 +2,13 @@ import numpy as np import logging import threading - - -def too_close(lidar, dir): - R = 0.83 - length = len(lidar) - straight = lidar[length // 2] - if dir: - nearest = min(lidar[length // 2 :]) - else: - nearest = min(lidar[: length // 2]) - - cos = nearest / straight - - # I don't know why sometimes this happens - if cos < -1 or cos > 1: - return True - - theta = np.arccos(cos) - L = R * (1 - np.sin(theta)) - return nearest < L +import time class Test_recule(Program): def __init__(self, server): super().__init__() + self.log = logging.getLogger(__name__) self.server = server self.controls_car = True self.speed = 0 @@ -50,27 +32,51 @@ def lidar(self): def camera(self): return self.server.camera + def too_close(self, lidar, dir): + R = 0.83 + length = len(lidar) + straight = lidar[length // 2] + lidar2 = [10000 if p == 0 else p for p in lidar] + if dir: + nearest = min(lidar2[length // 2 : 300]) + else: + nearest = min(lidar2[300 : length // 2]) + + cos = nearest / straight + + # I don't know why sometimes this happens + if cos < -1 or cos > 1: + return True + + theta = np.arccos(cos) + L = R * (1 - np.sin(theta)) + self.log.error(f"L:{L}") + return nearest < L * 1000 + def back(self): # if wall on "dir": turn to "dir" and reverse until able to move forward (wall distance to verify) - lidar, cam = self.lidar.r_distance, self.camera.camera_matrix + lidar, cam = self.lidar.r_distance, self.camera.camera_matrix() S = sum(cam) dir = S > 0 if dir: self.dir = 18 - if too_close(lidar, dir): - self.speed = -2 + if self.too_close(lidar, dir): + self.speed = -6 else: self.state = 0 + self.log.info("crashed finish") else: self.dir = -18 - if too_close(lidar, dir): - self.speed = -2 + if self.too_close(lidar, dir): + self.speed = -6 else: self.state = 0 + self.log.info("crashed finish") def main(self) -> None: while self.running: if self.server.crash_car.crashed: + self.log.info("crash_car.crashed") self.state = 1 if self.state == 1: @@ -78,9 +84,10 @@ def main(self) -> None: else: self.dir = 0 self.speed = 0 + time.sleep(0.01) def start(self): - self.running = False + self.running = True threading.Thread(target=self.main, daemon=True).start() def kill(self): From 321eb896ae6f9ca27431cddf58317eacc3563070 Mon Sep 17 00:00:00 2001 From: Cbampeta Date: Fri, 20 Mar 2026 19:01:27 +0100 Subject: [PATCH 5/5] fix: some code for chosing how to run the ai --- src/high_level/src/programs/car.py | 27 ++++++++++---------- src/high_level/src/programs/utils/driver.py | 28 ++++++++++++++++++--- 2 files changed, 37 insertions(+), 18 deletions(-) diff --git a/src/high_level/src/programs/car.py b/src/high_level/src/programs/car.py index dc851be..1d58a55 100644 --- a/src/high_level/src/programs/car.py +++ b/src/high_level/src/programs/car.py @@ -5,7 +5,6 @@ from typing import Optional import numpy as np -import onnxruntime as ort from drivers import Lidar from drivers.camera import Camera @@ -107,21 +106,12 @@ def has_Crashed(self) -> bool: class Car: - def __init__(self, driving_strategy, server, model) -> None: + def __init__(self, driving_strategy, server) -> None: self.log = logging.getLogger(__name__) self.target_speed = 0 # Speed in millimeters per second self.direction = 0 # Steering angle in degrees self.server = server self.reverse_count = 0 - - # Initialize AI session - try: - self.ai_session = ort.InferenceSession(MODEL_PATH + "/" + model) - self.log.info("AI session initialized successfully") - except Exception as e: - self.log.error(f"Error initializing AI session: {e}") - raise - self.driving = driving_strategy self.log.info("Car initialization complete") @@ -277,9 +267,18 @@ def initializeai(self, model: str) -> None: self.driver = Driver(1, 1024) self.driver.load_model(model) - # self.GR86 = Car(self.driver.ai, self.server, model) - self.GR86 = Car(self.driver.omniscent, self.server, model) - # self.GR86 = Car(self.driver.simple_minded, self.server, model) + nb_inputs = self.driver.get_nb_inputs() + + if nb_inputs == 1: + self.log.info("Model uses 1 input -> selecting lidar driver") + driving_strategy = self.driver.ai + elif nb_inputs == 2: + self.log.info("Model uses 2 inputs -> selecting lidar + camera driver") + driving_strategy = self.driver.omniscent + else: + raise ValueError(f"Unsupported number of model inputs: {nb_inputs}") + + self.GR86 = Car(driving_strategy, self.server) def start(self, model_give: Optional[str] = None) -> None: diff --git a/src/high_level/src/programs/utils/driver.py b/src/high_level/src/programs/utils/driver.py index f910dae..cce18f6 100644 --- a/src/high_level/src/programs/utils/driver.py +++ b/src/high_level/src/programs/utils/driver.py @@ -1,6 +1,6 @@ import logging import math -from typing import Tuple, cast +from typing import Any, Tuple, cast import numpy as np import onnxruntime as ort @@ -25,6 +25,8 @@ def __init__(self, context_size=0, horizontal_size=0): self._loaded = False self.ai_session: InferenceSession self.context: np.ndarray + self.input_infos = None + self.nb_inputs = 0 if self.log.isEnabledFor(logging.DEBUG): self.fig, self.ax = plt.subplots(4, 1, figsize=(10, 8)) @@ -63,16 +65,33 @@ def load_model(self, model: str): return self.log.info("Loading AI model...") self.ai_session = ort.InferenceSession(MODEL_PATH + "/" + model) + self.input_infos = self.ai_session.get_inputs() + self.nb_inputs = len(self.input_infos) + + for i, info in enumerate(self.input_infos): + self.log.info( + f"Input {i}: name={info.name}, shape={info.shape}, type={info.type}" + ) + self.context = np.zeros( - [2, self.context_size, self.horizontal_size], dtype=np.float32 - ) + [2, self.context_size, self.horizontal_size], dtype=np.float32) + + self._loaded = True + self.log.info(f"AI model loaded with {self.nb_inputs} input(s)") + # self.context = np.zeros( + # [2, self.context_size, self.horizontal_size], dtype=np.float32 + # ) self._loaded = True self.log.info("AI model loaded") - + def get_nb_inputs(self) -> int: + if not self._loaded: + raise RuntimeError("Driver not initialized (AI model not loaded)") + return self.nb_inputs def reset(self): self.context = np.zeros( [2, self.context_size, self.horizontal_size], dtype=np.float32 ) + pass def omniscent( self, lidar_data: np.ndarray, camera_data: np.ndarray @@ -229,3 +248,4 @@ def farthest_distants(self, lidar_data: np.ndarray) -> Tuple[float, float]: steering_angle = 0.0 return steering_angle, speed + def \ No newline at end of file