Source code for neurio.devices.physical.st.stm32

#!/user/bin/env python

"""
Author: Simon Narduzzi
Email: simon.narduzzi@csem.ch
Copyright: CSEM, 2023
Creation: 05.01.2024
Description: TODO
"""
import warnings
import tensorflow as tf
from neurio.devices.device import Device
import os
import sys
import time

skeleton_projects = {
    "STM32L4R9": {"project": "STM32L4R9",
                  "project_name": "STM32L4R9",
                  "project_build": "STM32L4R9"},
    'NUCLEO-H723ZG': {"project": "NUCLEO-H723ZG",
                      "project_name": "NUCLEO-H723ZG",
                      "project_build": "NUCLEO-H723ZG"},
}

ASSETS_DIR = os.path.join(os.path.dirname(__file__), "assets")



[docs]class STM32(Device): def __init__(self, port: any = "serial", device_identifier: str = "STM32", name: str = "STM32Base", log_dir: str = None, **kwargs): super().__init__(port, name, log_dir, **kwargs) self.verbose = kwargs.get("verbose", 0) self.device_identifier = device_identifier self.code_dir = os.path.join(self.log_dir, "c_project") os.makedirs(self.code_dir, exist_ok=True) # check for env variables self.__check__env__() def __check__env__(self): # check STM32CubeProgrammer is downlaoded if os.environ["STM32CUBEPROGRAMER_CLI_PATH"] is None: raise Exception( "STM32CubeProgramer path not found, please set the STM32CUBEPROGRAMER_PATH environment variable to the path of STM32CubeProgramer") if os.environ["STM32CUBEIDE_PATH"] is None: raise Exception( "STM32CubeIDE path not found, please set the STM32CUBEIDE_PATH environment variable to the path of STM32CubeIDE") if os.environ["X_CUBE_AI_PATH"] is None: raise Exception( "X-CUBE-AI path not found, please set the X_CUBE_AI_PATH environment variable to the path of X-CUBE-AI") self.programmer_cli_path = os.environ["STM32CUBEPROGRAMER_CLI_PATH"] self.x_cube_ai_path = os.environ["X_CUBE_AI_PATH"] self.stm32ai_exec = os.path.join(self.x_cube_ai_path, "Utilities", "mac", "stm32ai") self.stm32ai_running_lib_path = os.path.join(self.x_cube_ai_path, "scripts", "ai_runner") self.cube_ide_path = os.environ["STM32CUBEIDE_PATH"] sys.path.append(self.stm32ai_running_lib_path) def __prepare_model__(self, model, **kwargs): self.original_model = model #self.model_path = os.path.join(self.log_dir, "original_model.h5") #self.original_model.save(self.model_path) # convert to tflite: self.tflite_model_path = os.path.join(self.log_dir, "tflite_model.tflite") converter = tf.lite.TFLiteConverter.from_keras_model(model) tflite_model = converter.convert() with open(self.tflite_model_path, "wb") as f: f.write(tflite_model) self.model_path = self.tflite_model_path def __generate_network_code__(self): # generate code for the network if self.verbose > 0: print("Generating model c files: {}".format(self.model_path)) compression = None command = " ".join( ["{} generate".format(self.stm32ai_exec), '-m', '{}'.format(self.model_path), '-o', '{}/{}'.format(ASSETS_DIR, skeleton_projects[self.device_identifier]["project"])]) if compression is not None: command += " ".join([' -c', '{}'.format(compression)]) if self.verbose > 0: print(command) os.system(command) def __compile_project__(self): # clean all print("Building elf...") command = " ".join([self.cube_ide_path, "-nosplash -application org.eclipse.cdt.managedbuilder.core.headlessbuild", '-data', '{}'.format(ASSETS_DIR), '-importAll', '{}/{}'.format(ASSETS_DIR, skeleton_projects[self.device_identifier]["project"]), '-cleanBuild', '{}'.format(skeleton_projects[self.device_identifier]["project_name"])]) print(command) os.system(command) def __generate_inference_code__(self): # code is already generated in assets. We need to select the right one # based on the device self.project = skeleton_projects[self.device_identifier]["project"] self.project_name = skeleton_projects[self.device_identifier]["project_name"] self.project_build = skeleton_projects[self.device_identifier]["project_build"] self.project_path = os.path.join(ASSETS_DIR, self.project) # generate code for the network self.__generate_network_code__() # compile the entire project self.__compile_project__() def __deploy_model__(self): print("Model is already deployed within the code") pass def __deploy_code__(self): # deploy the code if self.verbose > 0: print("Flashing...") command = " ".join([self.programmer_cli_path, '-c', 'port=SWD', '-w', '{}/{}/Debug/{}.elf'.format(ASSETS_DIR, skeleton_projects[self.device_identifier]["project_build"], skeleton_projects[self.device_identifier]["project_name"])] ) if self.verbose > 0: print(command) os.system(command) time.sleep(2) self.__reset_device__() def __reset_device__(self): command = " ".join([self.programmer_cli_path, '-c', 'port=SWD', 'mode=POWERDOWN', '-s', '0x08000000']) os.system(command) time.sleep(2) command = " ".join([self.programmer_cli_path, '-c', 'port=SWD', '-rst', '-s', '-hardRst']) os.system(command) if self.verbose > 0: print("Device reset") time.sleep(2) self.__connect_runner() def __prepare_data__(self, input_x, **kwargs): # default: convert to float32 input_x_processed = input_x.astype("float32") return input_x_processed def __connect_runner(self): try: from stm_ai_runner import AiRunner # imported from X-CUBE-AI path except ImportError: raise Exception( "stm_ai_runner not found, please set the X_CUBE_AI_PATH environment variable to the path of X-CUBE-AI") runner = AiRunner(debug=True) # Connect to device if not runner.connect(self.port): raise RuntimeError('runtime "{}" is not connected'.format(self.port)) print(runner, flush=True) runner.summary() self.runner = runner def __transfer_data_to_memory__(self, input_x): # Initialize Ai Runner warnings.warn("Data is transfered during inference, no need to transfer it before") self.inputs = input_x def __run_inference__(self, profile: bool = True): if self.runner is None: raise Exception("Runner is not initialized") # load the dataset inputs = self.inputs predictions, profiler = self.runner.invoke(inputs) self.tmp_predictions = predictions self.tmp_profiler = profiler def __read_inference_results__(self): warnings.warn("Data is send back by AiRunner after inference, no need to transfer it after") return self.tmp_predictions, self.tmp_profiler
[docs] def is_alive(self, timeout: int = 20) -> bool: pass
def __str__(self): pass
[docs]class STM32L4R9(STM32): def __init__(self, port: any = "serial", name: str = "STM32L4R9I-DISCO", log_dir: str = None, **kwargs): super().__init__(port=port, device_identifier="STM32L4R9", name=name, log_dir=log_dir)
[docs]class NUCLEOH723ZG(STM32): def __init__(self, port: any = "serial", name: str = "NUCLEO-H723ZG", log_dir: str = None, **kwargs): super().__init__(port=port, device_identifier="NUCLEO-H723ZG", name=name, log_dir=log_dir)