Source code for neurio.devices.virtual.canaan.kendryte

#!/user/bin/env python

Author: Simon Narduzzi
Copyright: CSEM, 2022
Creation: 21.09.22
Description: K210 deployment pipeline
import datetime
from typing import Tuple, Optional

import sys


from neurio.devices.device import Device
import os
from tqdm import tqdm
import numpy as np
import json
from PIL import Image
from neurio.converters.kendryte_utils import convert_to_kmodel, simulate
from neurio.converters.tflite_utils import keras_to_tflite
from neurio.helpers import NpEncoder
import tensorflow as tf
from neurio.exceptions import InvalidImageRangeError
from neurio.benchmarking.profiler import Profiler

python_version = sys.version_info
PYTHON_VERSION = "{}.{}.{}".format(python_version.major, python_version.minor, python_version.micro)

[docs]class K210Virtual(Device): def __init__(self, port: any = "virtual", name: str = "K210", log_dir: str = None, **kwargs): super().__init__(port, name, log_dir, **kwargs) if self.port != "virtual": raise Exception("K210VirtualDevice only supports virtual ports") self.tools_options = {"nncase_version": NNCASE_VERSION, "python_version": PYTHON_VERSION} # self.image_format = kwargs.get("image_format", "jpeg") # works well on deployment self.image_format = kwargs.get("image_format", "bmp") self.verbose = kwargs.get("verbose", 0) # generate logdir based on current date and time if None if self.log_dir is None: self.log_dir = os.path.join(os.getcwd(),"%Y-%m-%d_%H-%M-%S")) os.makedirs(self.log_dir, exist_ok=True) self.output_dir = os.path.join(self.log_dir, "output") os.makedirs(self.output_dir, exist_ok=True) self.profiler = Profiler()
[docs] def is_alive(self, timeout: int = 20) -> bool: return True
def __prepare_model__(self, model: tf.keras.models.Model, **kwargs): """ Prepare the model for deployment. This includes converting it to a tflite model and then to a kmodel. :param model: Model to be prepared :param kwargs: Other parameters """ self.profiler["model"]["model_name"] = self.profiler["model"]["framework"] = "tensorflow" self.profiler["model"]["model_datetime"] ="%a %b %d %H:%M:%S %Y") self.models_dir = os.path.join(self.log_dir, "models") os.makedirs(self.models_dir, exist_ok=True) self.calibration_dir = os.path.join(self.log_dir, "calibration") os.makedirs(self.calibration_dir, exist_ok=True) # save calibration directory passed in kwargs if "calibration_data" in kwargs: for data in kwargs["calibration_data"]: img, label = data, "{}.bmp".format(label))) tflite_model = keras_to_tflite(model, tflite_path=os.path.join(self.models_dir, + ".tflite")) compilation_options = {"target": "k210", "dataset": os.path.abspath(self.calibration_dir)} kmodel_file = convert_to_kmodel(tflite_model, options=compilation_options, tool_options={"nncase_version": NNCASE_VERSION, "python_version": PYTHON_VERSION}, verbose=self.verbose) self.profiler["model"]["compile_datetime"] ="%a %b %d %H:%M:%S %Y") self.profiler["model"]["parameters"] = model.count_params() self.kmodel_path = os.path.abspath(kmodel_file) # get input and output shapes of the model and store in profiler self.input_shapes = model.input_shape self.output_shapes = model.output_shape self.profiler["model"]["inputs"] = [{"name":, "shape": i.shape} for i in model.inputs] self.profiler["model"]["outputs"] = [{"name":, "shape": i.shape} for i in model.outputs] # TODO #self.profiler["model"]["macc"] = 2*model.count_params() # complete profiler with device details self.__populate_profiler__() def __generate_inference_code__(self): pass def __deploy_model__(self): pass def __deploy_code__(self): pass def __prepare_data__(self, input_x, **kwargs): def __check_image_range__(image): """ Checks if the range of an image corresponds to its format. :param: image_path (str): Path to the input image file. :return: bool: True if the range is correct, False otherwise. """ # Get the format of the image image_format = image.format # Define the expected range based on the image format expected_range = { "JPEG": (0, 255), # JPEG format uses 8-bit per channel (0-255) "PNG": (0, 255), # PNG format also uses 8-bit per channel (0-255) "BMP": (0, 255), None: (0, 255), # None format typically uses 8-bit per channel (0-255) "PPM": (0, 255), # PPM format also uses 8-bit per channel (0-255), } # Check if the pixel values are within the expected range min_value, max_value = expected_range.get(image_format, (0, 0)) p_min = min(image.getdata()) p_max = max(image.getdata()) # case 1-channel image if isinstance(p_min, int) and p_min >= min_value and p_max <= max_value: return True # case 3-channel image elif p_min[0] >= min_value and p_max[0] <= max_value and p_min[1] >= min_value and p_max[1] <= max_value and \ p_min[2] >= min_value and p_max[2] <= max_value: return True else: return False def __preprocess_image__(img): """ Preprocess an image by converting it to the appropriate format and range. :param img: image to preprocess :return: the preprocessed image """ # check if im has dimension channel dimension of 1. If yes, remove dimension. if len(img.shape) == 3 and img.shape[2] == 1: img = np.squeeze(img, axis=2) # Create a PIL image from the input NumPy array image = Image.fromarray(img) # Check if the image range corresponds to its format if not __check_image_range__(image): raise InvalidImageRangeError("The image range does not correspond to its format.") # Get the number of channels in the image, and determine the appropriate mode based on the number of channels num_channels = img.shape[2] if len(img.shape) == 3 else 1 if num_channels == 1: # Grayscale image mode = "L" elif num_channels == 3: # RGB image mode = "RGB" elif num_channels == 4: # RGBA image mode = "RGBA" else: if self.verbose > 0: print("Unsupported number of channels: {}".format(num_channels)) return False # Convert the image to the determined mode image = image.convert(mode) # Save the image as BMP return image if self.verbose > 0: print("Saving dataset") # preprocess data preprocessed_data = [] for i in tqdm(range(len(input_x))): data_x = input_x[i] img = __preprocess_image__(data_x) preprocessed_data.append(img) return preprocessed_data def __subsample_data__(self, input_x, batch_size: int, batch_index: int): """ Subsample a batch of data from the dataset. :param data: the dataset :param batch_size: the batch size :param batch_index: the index of the batch :return: the batch of data """ start_index = batch_index * batch_size end_index = start_index + batch_size return input_x[start_index:end_index] def __transfer_data_to_memory__(self, input_x): """ Transfer the data to the RAM, by setting the value of self.input_data :param input_x: the input data """ self.input_data = input_x def __run_inference__(self, profile: bool = True): load_times = [] preprocess_times = [] inference_times = [] readout_times = [] y_pred = [] # load data data = self.input_data imgs = data imgs = [np.array(x) for x in imgs] imgs = [np.expand_dims(x, axis=0) for x in imgs] imgs = np.asarray(imgs, dtype=np.float32) imgs = np.asarray(imgs, dtype=np.uint8) outputs, times = simulate(self.kmodel_path, imgs, return_times=True) y_pred += outputs inference_times += times["inference_ms"] preprocess_times += times["preprocess_ms"] load_times += times["load_ms"] readout_times += times["readout_ms"] results = { "output": y_pred, "inference_ms": inference_times, "preprocess_ms": preprocess_times, "load_model_ms": load_times, "readout_ms": readout_times, } # save results in json json_filename = os.path.join(self.output_dir, "results.json") with open(json_filename, "w") as f: json.dump(results, f, cls=NpEncoder) def __read_inference_results__(self): json_filename = os.path.join(self.output_dir, "results.json") with open(json_filename, "r") as f: results = json.load(f) temp_profiler = Profiler(self.profiler.json_data) # get copy of current profiler temp_profiler["inference"]["batch_size"] = len(self.input_data) temp_profiler["inference"]["inference_time"] = results["inference_ms"] temp_profiler["inference"]["preprocess_time"] = results["preprocess_ms"] temp_profiler["inference"]["model_load_time"] = results["load_model_ms"] temp_profiler["inference"]["readout_time"] = results["readout_ms"] return results["output"], temp_profiler def __populate_profiler__(self): # setting profiler info self.profiler["device"]["name"] = "K210 Emulator" self.profiler["device"]["port"] = "virtual" self.profiler["device"]["dev_type"] = "k210" self.profiler["device"]["system"] = "maixpy 0.6.2" self.profiler["device"]["cpu_clock"] = 400000000 self.profiler["device"]["kpu_clock"] = 400000000 self.profiler["device"]["attrs"] = [ "fpu", "kpu", # TODO complete ] self.profiler["runtime"]["name"] = "Custom MaixPy 0.1" self.profiler["runtime"]["version"] = [0, 1] self.profiler["runtime"]["tools_version"] = { "tensorflow": [int(x) if x.isnumeric() else x for x in tf.__version__.split(".")], "maixpy": [0, 6, 2], # TODO automatically get from board "ncc": [int(x) if x.isnumeric() else x for x in NNCASE_VERSION.split(".")], "kmodel": "v5" if NNCASE_VERSION.split(".")[0] == "1" else "v4", "tflite": [int(x) if x.isnumeric() else x for x in tf.__version__.split(".")], } def __str__(self): return str({ "port": self.port, "name":, "log_dir": self.log_dir, })