Source code for neurio.devices.physical.canaan.kendryte

#!/usr/bin/env python

"""
Author: Simon Narduzzi
Copyright: CSEM, 2022
Creation: 21.09.22
Description: K210 deployment pipeline
"""
import datetime
from typing import Tuple, Optional

import sys

from neurio.common import Prediction, Profiler
from neurio.devices.device import Device
import os
from tqdm import tqdm
import numpy as np
import json
from PIL import Image
from neurio.converters.kendryte_utils import convert_to_kmodel
from neurio.converters.tflite_utils import keras_to_tflite
from neurio.exceptions import InvalidImageRangeError
import tensorflow as tf
import time
import pexpect

# Version of the running interpreter, rendered as "major.minor.micro".
python_version = sys.version_info
PYTHON_VERSION = ".".join(str(part) for part in python_version[:3])

class K210(Device):
    """
    Device wrapper that deploys a Keras model on a K210 board and runs inference on it.

    The model is converted to tflite and then to a kmodel, MicroPython scripts are generated from
    skeleton files, and everything is copied to the device through an ``rshell`` session driven
    with ``pexpect``.
    """

    def __init__(self, port: any, name: str = "k210", log_dir: str = None, **kwargs):
        """
        Open the rshell session and create the local directory for the generated code.

        :param port: port on which the device is connected (passed to ``rshell -p``)
        :param name: name of the device
        :param log_dir: local directory used for logs, models, data and results
        :param kwargs: optional ``verbose`` (default 0) and ``device_storage_location`` (default "/sd");
            all keyword arguments are also forwarded to the parent constructor
        """
        super().__init__(port, name, log_dir, **kwargs)
        self.original_model = None
        self.kmodel = None
        self.tflite_model = None
        self.verbose = kwargs.get("verbose", 0)
        self.device_storage_location = kwargs.get("device_storage_location", "/sd")
        # Define the attribute before connecting so that it exists even if the connection fails.
        self.rshell_session = None
        self.rshell_session = self.__create_rshell_session__(port)
        self.code_dir = os.path.join(self.log_dir, "code")
        os.makedirs(self.code_dir, exist_ok=True)

    @staticmethod
    def _to_pil_image(img):
        """Return ``img`` unchanged if it is a PIL image, otherwise build one with ``Image.fromarray``."""
        if isinstance(img, Image.Image):
            return img
        return Image.fromarray(np.asarray(img))

    def __prepare_model__(self, model: tf.keras.models.Model, **kwargs):
        """
        Prepare the model for deployment. This includes converting it to a tflite model and then to a kmodel.

        :param model: Model to be prepared
        :param kwargs: Other parameters; ``calibration_data`` (iterable of ``(image, label)`` pairs), when
            given, is written to the calibration directory handed to the kmodel converter
        """
        self.models_dir = os.path.join(self.log_dir, "models")
        os.makedirs(self.models_dir, exist_ok=True)
        self.calibration_dir = os.path.join(self.log_dir, "calibration")
        os.makedirs(self.calibration_dir, exist_ok=True)

        # save calibration data passed in kwargs
        if "calibration_data" in kwargs:
            for data in kwargs["calibration_data"]:
                img, label = data
                # NOTE(review): the save call was lost from the available source; reconstructed as
                # "store each sample as <label>.bmp in the calibration directory" - confirm.
                calibration_file = os.path.join(self.calibration_dir, "{}.bmp".format(label))
                self._to_pil_image(img).save(calibration_file)

        # NOTE(review): the file stem was lost from the available source; model.name is assumed - confirm.
        tflite_path = os.path.join(self.models_dir, model.name + ".tflite")
        tflite_model = keras_to_tflite(model, tflite_path=tflite_path)

        compilation_options = {"target": "k210", "dataset": os.path.abspath(self.calibration_dir)}
        # NOTE(review): NNCASE_VERSION is not defined in the visible part of this module; it must be
        # provided at module level for this call to work.
        kmodel_file = convert_to_kmodel(tflite_model,
                                        options=compilation_options,
                                        tool_options={"nncase_version": NNCASE_VERSION,
                                                      "python_version": PYTHON_VERSION},
                                        verbose=self.verbose)
        self.kmodel_path = os.path.abspath(kmodel_file)

        # get input and output shapes of the model
        self.input_shapes = model.input_shape
        self.output_shapes = model.output_shape

    def __generate_inference_code__(self):
        """
        Generate the inference code for the device. The scripts are built from the skeleton files shipped
        next to this module and written to the "code" directory of the logs.
        """
        # edit to fit data and model
        dir_path = os.path.dirname(os.path.realpath(__file__))
        skeleton_dir = os.path.join(dir_path, "assets", "k210", "skeletons", "v{}".format(NNCASE_VERSION))

        # NOTE(review): the script filenames were lost from the available source. "benchmark.py" is
        # inferred from the `import benchmark` issued on the device, "prepare.py" from the
        # `prepare_content` variable - confirm both against the skeleton directory.
        with open(os.path.join(skeleton_dir, "benchmark.py"), "r") as f:
            file = f.read()

        # NOTE(review): input_shapes comes from model.input_shape; for a batched Keras input, index 0 is
        # the batch dimension - verify that indices 0 and 1 are the intended width and height.
        model_path_on_device = "{}/{}".format(self.device_storage_location, os.path.basename(self.kmodel_path))
        substitutions = [
            ("DEBUG = True", "DEBUG = {}".format(str(self.verbose).title())),
            ("IMG_WIDTH = 224", "IMG_WIDTH = {}".format(self.input_shapes[0])),
            ("IMG_HEIGHT = 224", "IMG_HEIGHT = {}".format(self.input_shapes[1])),
            ("KPU_OUTPUT = [0, 1, 1, 5]", "KPU_OUTPUT = {}".format(self.output_shapes)),
            ('SD_RESULT_FILE = "/sd/results.txt"',
             'SD_RESULT_FILE = "{}"'.format("{}/results.txt".format(self.device_storage_location))),
            ('MODEL_PATH = "/sd/test.kmodel"', 'MODEL_PATH = "{}"'.format(model_path_on_device)),
        ]
        for placeholder, value in substitutions:
            file = file.replace(placeholder, value)

        # write the customised benchmark script, and copy the prepare script unchanged
        with open(os.path.join(self.code_dir, "benchmark.py"), "w") as f:
            f.write(file)
        with open(os.path.join(skeleton_dir, "prepare.py"), "r") as p:
            prepare_content = p.read()
        with open(os.path.join(self.code_dir, "prepare.py"), "w") as f:
            f.write(prepare_content)

    def __deploy_model__(self):
        """
        Deploy the model on the device by copying it to the device storage location (usually /sd).
        The model is uploaded via rshell, and the upload time is estimated from the size of the model
        and a fixed upload speed. This can take a while, depending on the size of the model.
        """
        model_size = os.path.getsize(self.kmodel_path)  # bytes
        upload_speed = 14  # KB/s
        estimated_upload_time = model_size / 1024 / upload_speed
        if self.verbose > 0:
            print("Size of model: {} MB".format(model_size / 1024 / 1024))
            print("Upload speed: {} KB/s".format(upload_speed))
            print("Estimated upload time: {} s".format(estimated_upload_time))

        upload_start_time = time.time()
        # Ten times the estimate, but never below the default command timeout: a very small model
        # would otherwise yield a timeout of 0 seconds.
        upload_timeout = max(20, int(10 * estimated_upload_time))
        self.__execute_command_sync__("cp {} {}/".format(self.kmodel_path, self.device_storage_location),
                                      timeout=upload_timeout)
        if self.verbose > 0:
            print("Upload done in {} s".format(time.time() - upload_start_time))

    def __deploy_code__(self):
        """
        Deploy the inference code on the device by copying it to the device storage location (usually /sd).
        Two MicroPython scripts are uploaded via rshell: one that sets the device up for inference and one
        that runs the inference.
        """
        # NOTE(review): filenames reconstructed (see __generate_inference_code__) - confirm.
        for script in ("benchmark.py", "prepare.py"):
            self.__execute_command_sync__(
                "cp {} {}/".format(os.path.join(self.code_dir, script), self.device_storage_location), 120)

    def __prepare_data__(self, input_x, **kwargs):
        """
        Convert every sample of ``input_x`` into a PIL image.

        :param input_x: sequence of NumPy image arrays
        :param kwargs: unused
        :return: list with one entry per sample (a PIL image, or False for an unsupported channel count)
        :raises InvalidImageRangeError: if the pixel values of a sample are outside the expected range
        """

        def __check_image_range__(image):
            """
            Checks if the range of an image corresponds to its format.

            :param image: the PIL image to check
            :return: bool: True if the range is correct, False otherwise.
            """
            # Define the expected range based on the image format
            expected_range = {
                "JPEG": (0, 255),  # JPEG format uses 8-bit per channel (0-255)
                "PNG": (0, 255),  # PNG format also uses 8-bit per channel (0-255)
                "BMP": (0, 255),
                None: (0, 255),  # images without a format (e.g. created from an array)
                "PPM": (0, 255),  # PPM format also uses 8-bit per channel (0-255),
            }
            min_value, max_value = expected_range.get(image.format, (0, 0))

            # getextrema() returns (min, max) for single-band images and one (min, max) pair per band
            # otherwise; normalise to a sequence of pairs so every band is checked the same way.
            extrema = image.getextrema()
            if len(image.getbands()) == 1:
                extrema = (extrema,)
            return all(low >= min_value and high <= max_value for low, high in extrema)

        def __preprocess_image__(img):
            """
            Preprocess an image by converting it to the appropriate format and range.

            :param img: image to preprocess
            :return: the preprocessed image, or False if the number of channels is not supported
            """
            # if the image has a channel dimension of size 1, remove it
            if len(img.shape) == 3 and img.shape[2] == 1:
                img = np.squeeze(img, axis=2)

            # Create a PIL image from the input NumPy array
            image = Image.fromarray(img)

            # Check if the image range corresponds to its format
            if not __check_image_range__(image):
                raise InvalidImageRangeError("The image range does not correspond to its format.")

            # Determine the PIL mode from the number of channels
            num_channels = img.shape[2] if len(img.shape) == 3 else 1
            modes = {1: "L", 3: "RGB", 4: "RGBA"}  # grayscale, RGB, RGBA
            if num_channels not in modes:
                if self.verbose > 0:
                    print("Unsupported number of channels: {}".format(num_channels))
                return False

            return image.convert(modes[num_channels])

        if self.verbose > 0:
            print("Saving dataset")

        # preprocess data
        preprocessed_data = []
        for data_x in tqdm(input_x):
            preprocessed_data.append(__preprocess_image__(data_x))
        return preprocessed_data

    def __transfer_data_to_memory__(self, input_x):
        """
        Transfer the data to the device memory.

        :param input_x: the data to transfer
        """
        # clean data folder
        self.__execute_command_sync__('rm -rf {}/data/*\n'.format(self.device_storage_location), timeout=20)
        # create data folder
        self.__execute_command_sync__('mkdir {}/data\n'.format(self.device_storage_location), timeout=20)
        # clean results file
        self.__execute_command_sync__('rm -rf {}/results.txt\n'.format(self.device_storage_location), timeout=20)

        # create local folder to store images
        self.data_dir = os.path.join(self.log_dir, "data")
        os.makedirs(self.data_dir, exist_ok=True)

        if self.verbose > 0 and len(input_x) > 0:
            print("Uploading data to SD card")

        json_data = {}
        for i, img in enumerate(input_x):
            # save image as BMP
            img_filename = os.path.join(self.data_dir, "{}.bmp".format(i))
            # NOTE(review): the save call was lost from the available source; reconstructed from the
            # "save image as BMP" comment - confirm.
            self._to_pil_image(img).save(img_filename)

            # upload the saved sample to the "data" folder in the memory of the device
            destination = "{}/data/{}".format(self.device_storage_location, os.path.basename(img_filename))
            self.__execute_command_sync__('cp {} {}\n'.format(img_filename, destination), timeout=20)

            # remember the filename for the json index
            json_data[i] = os.path.basename(img_filename)

        # upload the json index too
        json_filename = os.path.join(self.log_dir, "data/img_list.json")
        with open(json_filename, "w") as f:
            json.dump(json_data, f)
        device_json_filename = "{}/inference.json".format(self.device_storage_location)
        self.__execute_command_sync__('cp {} {}\n'.format(json_filename, device_json_filename), timeout=20)

    def __log_session_output__(self):
        """Print what the rshell session received before the last match, when verbose."""
        if self.verbose > 0:
            print(self.rshell_session.before.decode().strip())

    def __run_inference__(self, profile: bool = True):
        """
        Triggers the inference on the device.

        :param profile: Has no effect in K210, profiling always occur.
        :raises MemoryError: if the device reports a memory error while running the benchmark
        """
        session = self.rshell_session

        # Open Micropython REPL
        if self.verbose > 0:
            print("[RSHELL] {}".format("repl"))
        session.sendline("repl")
        session.expect("MicroPython", timeout=15)
        self.__log_session_output__()
        time.sleep(0.5)
        session.expect(">>>", timeout=15)
        self.__log_session_output__()

        # import scripts
        if self.verbose > 0:
            print("[RSHELL] {}".format("import benchmark"))
        session.sendline("import benchmark")
        session.expect(">", timeout=15)
        self.__log_session_output__()
        time.sleep(0.5)

        # run inference
        if self.verbose > 0:
            print("[RSHELL] {}".format("benchmark.main()"))
        session.sendline("benchmark.main()")
        index = session.expect(["Done", "memory", "MemoryError"], timeout=20 * 60)
        self.__log_session_output__()

        # check if a problem occurred
        if index == 0:  # if no problem
            if self.verbose > 0:
                print("Done")
            session.expect("MicroPython", timeout=20)  # wait for restart
            self.__log_session_output__()
            session.expect("information.", timeout=20)
            time.sleep(0.5)

            # exit REPL
            self.__log_session_output__()
            session.sendcontrol("X")  # ctrl+x
            expected_patterns = ['/[^>]*>', '/sd>[^>]*', '/flash>[^>]*']
            pattern_index = session.expect(expected_patterns, timeout=20)
            self.__log_session_output__()
            time.sleep(0.1)

            # wait for reset
            if pattern_index in (0, 1, 2):
                if self.verbose > 0:
                    print("Machine reset.")
                time.sleep(0.1)

            session.sendline("ls {}".format(self.device_storage_location))
            # NOTE(review): the first pattern is empty in the available source (probably a filename
            # lost in extraction); an empty pattern matches immediately - confirm the intended value.
            session.expect(["", pexpect.EOF], timeout=120)
            self.__log_session_output__()
            # Keep `before` as bytes: the other methods call .decode() on it.
            session.before = b""
            print("\033[0m")
            time.sleep(1.0)

        # Memory error due to KPU
        if index == 1:
            self.__reset_device__()
            if self.verbose > 0:
                print("KPU Memory error: could not load kmodel in KPU.")
            raise MemoryError("Out of memory")

        # Memory error in MicroPython
        if index == 2:
            self.__reset_device__()
            if self.verbose > 0:
                print("Memory error during inference step on MicroPython.")
            raise MemoryError("Memory error during inference")

    # Helper methods
    def __read_inference_results__(self):
        """
        Read results from devices, by downloading the results file from the device and parsing it.

        :return: tuple of (predictions, profiler)
        """

        def text_to_json(text):
            """Turn the concatenated dict dump written by the device into a JSON array string."""
            data = "[" + text
            data = data.replace("(", "'(")
            data = data.replace(")", ")'")
            data = data.replace("}{", "},{")
            data = data + "]"
            data = data.replace("'", '"')
            return data

        def _read_result_file(filepath: str) -> any:
            """Load a ``.txt`` (device dump) or ``.json`` result file and return the parsed data."""
            with open(filepath, "r") as f:
                if filepath.endswith(".txt"):
                    text_data = f.read()
                    json_data = text_to_json(text_data)
                    # NOTE(review): the pattern below is reproduced as found in the available source and
                    # may have been altered by extraction - confirm the token that should map to "nan".
                    json_data = json_data.replace("//./////", "\"nan\"")
                    data = json.loads(json_data)
                elif filepath.endswith(".json"):
                    data = json.load(f)
                else:
                    raise Exception("File extension not known")
            return data

        inference_times = []
        preprocess_times = []
        load_times = []
        y_pred = []

        self.__download_results__()
        result_file = os.path.join(self.results_dir, "results.txt")
        predicted_json = _read_result_file(result_file)

        # sort by input index (the file stem of each sample is its index)
        predicted_json = sorted(predicted_json, key=lambda x: int(x['filename'].split('/')[-1].split('.')[0]))

        for r_dict in predicted_json:
            output = r_dict["output"]
            # outputs containing "nan" are kept as-is instead of being converted to an array
            y_p = np.asarray(output) if "nan" not in output else output

            # store
            inference_times.append(r_dict["inference_ms"])
            load_times.append(r_dict["load_ms"])
            preprocess_times.append(r_dict["preprocess_ms"])
            y_pred.append(y_p)

        # predictions and Profiler
        temp_profiler = Profiler()
        temp_profiler.inference_times = inference_times
        temp_profiler.load_times = load_times
        temp_profiler.preprocess_times = preprocess_times

        return y_pred, temp_profiler

    def __download_results__(self):
        """
        Download the results of the inference from the device to the local storage.
        """
        self.results_dir = os.path.join(self.log_dir, "results")
        os.makedirs(self.results_dir, exist_ok=True)

        cmd = "ls /flash"
        if self.verbose > 0:
            print("[RSHELL] {}".format(cmd))
        self.rshell_session.sendline(cmd)
        self.rshell_session.expect("freq.conf", timeout=120)
        if self.verbose > 0:
            print(self.rshell_session.before.decode().strip())

        cmd = "cp {}/results.txt {}/".format(self.device_storage_location, self.results_dir)
        if self.verbose > 0:
            print("[RSHELL] {}".format(cmd))
        self.rshell_session.sendline(cmd)
        self.rshell_session.expect("Copying", timeout=10)
        if self.verbose > 0:
            print(self.rshell_session.before.decode().strip())
        self.rshell_session.expect(">", timeout=10)
        if self.verbose > 0:
            print(self.rshell_session.before.decode().strip())

    def __reset_device__(self):
        """
        Reset the device by sending a reset command to the device.
        """
        if self.verbose > 0:
            print("====>>>>> RESETTING DEVICE")
            print(self.rshell_session.before.decode().strip())

        # check if in repl mode: inside the REPL, "ls" is not a Python name and produces a NameError
        in_repl = True
        try:
            cmd = "ls"
            if self.verbose > 0:
                print("[RSHELL] {}".format(cmd))
            self.rshell_session.sendline(cmd)
            self.rshell_session.expect("NameError", timeout=2)
            if self.verbose > 0:
                print(self.rshell_session.before.decode().strip())
        except pexpect.exceptions.TIMEOUT:
            in_repl = False

        if in_repl:
            # send reset and quit repl
            for cmd in ("import machine", "machine.reset()"):
                if self.verbose > 0:
                    print("[RSHELL] {}".format(cmd))
                self.rshell_session.sendline(cmd)
                self.rshell_session.expect(">", timeout=10)
                if self.verbose > 0:
                    print(self.rshell_session.before.decode().strip())
            time.sleep(1.0)

            self.rshell_session.sendcontrol("X")  # ctrl+x
            self.rshell_session.expect(">", timeout=10)
            if self.verbose > 0:
                print(self.rshell_session.before.decode().strip())
            time.sleep(1.0)

        if self.verbose > 0:
            print("Device reset.")

    def __create_rshell_session__(self, port):
        """
        Create a rshell session to the device.

        :param port: the port to which the device is connected
        :return: a pexpect session from the rshell package, that can be used as pipe to send commands to the device
        :raises ConnectionError: if rshell does not report a connection on ``port``
        """
        command = "rshell -p {}".format(port)
        child = pexpect.spawn(command)
        try:
            child.expect("connected", timeout=20)
        except Exception as e:
            # keep the original pexpect error as the cause for easier debugging
            raise ConnectionError("Device not available at {}".format(port)) from e
        if self.verbose > 0:
            print(child.before.decode().strip())
            print("Connecting...")
        child.expect("Retrieving sysname ...", timeout=10)
        if self.verbose > 0:
            print(child.before.decode().strip())
        child.expect("CanMV", timeout=10)
        if self.verbose > 0:
            print("Connected. ")
        child.expect(">", timeout=10)
        output = child.before.decode().strip()
        if self.verbose > 0:
            print("Output:", output)
        return child

    def __execute_command_sync__(self, cmd: str, timeout=20):
        """
        Execute a command on the device and wait for the result.

        :param cmd: the command line to send through rshell
        :param timeout: the timeout for the command, in seconds
        :return: the output of the command
        """
        # The session is already an rshell prompt: drop a leading "rshell -p <port>" and the "-f" flag.
        stripped_command = cmd.replace("rshell -p {}".format(self.port), "")
        # NOTE(review): this removes every "-f" substring, including inside paths - confirm intent.
        stripped_command = stripped_command.replace("-f", "").strip()

        if len(stripped_command) <= 1:
            if self.verbose > 0:
                print("Skipping empty command")
            return ""

        if self.verbose > 0:
            print("[RSHELL] {}".format(stripped_command))
        self.rshell_session.sendline(stripped_command)
        self.rshell_session.expect(">", timeout=timeout)
        out = self.rshell_session.before.decode().strip()
        if self.verbose > 0:
            print(out)
        return out

    def is_alive(self, timeout: int = 20) -> bool:
        """
        Placeholder: not implemented for this device, currently returns None.

        :param timeout: unused
        """
        pass

    def __str__(self):
        """Return a short description of the device (a ``__str__`` returning None raises TypeError)."""
        return "{}(port={})".format(type(self).__name__, getattr(self, "port", None))