From 17069628a7f28198652a296ac16dc83529c7eaae Mon Sep 17 00:00:00 2001 From: Richard Burton Date: Thu, 17 Mar 2022 10:54:26 +0000 Subject: MLECO-3036: Update to use Pathlib in Python scripts * Pathlib used in Python scripts over os * Bug fix for build_default.py * Minor code style updates Signed-off-by: Richard Burton Change-Id: I5fc2e582a84443c3fb79250eb711b960d63ed8fd --- build_default.py | 36 +++++----- download_dependencies.py | 44 ++++++------ scripts/py/check_update_resources_downloaded.py | 13 ++-- scripts/py/gen_audio.py | 2 + scripts/py/gen_audio_cpp.py | 33 +++++---- scripts/py/gen_default_input_cpp.py | 10 +-- scripts/py/gen_labels_cpp.py | 16 ++--- scripts/py/gen_model_cpp.py | 18 ++--- scripts/py/gen_rgb_cpp.py | 32 ++++----- scripts/py/gen_test_data_cpp.py | 26 +++---- scripts/py/rnnoise_dump_extractor.py | 11 +-- set_up_default_resources.py | 95 ++++++++++--------------- 12 files changed, 156 insertions(+), 180 deletions(-) diff --git a/build_default.py b/build_default.py index 2e95528..e37a9ad 100755 --- a/build_default.py +++ b/build_default.py @@ -22,6 +22,7 @@ import sys import threading from argparse import ArgumentDefaultsHelpFormatter from argparse import ArgumentParser +from pathlib import Path from set_up_default_resources import default_npu_config_names from set_up_default_resources import get_default_npu_config_from_name @@ -70,7 +71,7 @@ def run( npu_config_name(str) : Ethos-U NPU configuration name. See "valid_npu_config_names" """ - current_file_dir = os.path.dirname(os.path.abspath(__file__)) + current_file_dir = Path(__file__).parent.resolve() # 1. Make sure the toolchain is supported, and set the right one here supported_toolchain_ids = ["gnu", "arm"] @@ -88,6 +89,7 @@ def run( set_up_resources( run_vela_on_models=run_vela_on_models, additional_npu_config_names=[npu_config_name], + additional_requirements_file=current_file_dir / "scripts" / "py" / "requirements.txt" ) # 3. Build default configuration @@ -95,20 +97,17 @@ def run( target_platform = "mps3" target_subsystem = "sse-300" ethos_u_cfg = get_default_npu_config_from_name(npu_config_name) - build_dir = os.path.join( - current_file_dir, - f"cmake-build-{target_platform}-{target_subsystem}-{npu_config_name}-{toolchain}", - ) + build_dir = current_file_dir / f"cmake-build-{target_platform}-{target_subsystem}-{npu_config_name}-{toolchain}" + try: - os.mkdir(build_dir) + build_dir.mkdir() except FileExistsError: - # Directory already exists, clean it - for filename in os.listdir(build_dir): - filepath = os.path.join(build_dir, filename) + # Directory already exists, clean it. + for filepath in build_dir.iterdir(): try: - if os.path.isfile(filepath) or os.path.islink(filepath): - os.unlink(filepath) - elif os.path.isdir(filepath): + if filepath.is_file() or filepath.is_symlink(): + filepath.unlink() + elif filepath.is_dir(): shutil.rmtree(filepath) except Exception as e: logging.error(f"Failed to delete {filepath}. Reason: {e}") @@ -116,9 +115,8 @@ def run( logpipe = PipeLogging(logging.INFO) os.chdir(build_dir) - cmake_toolchain_file = os.path.join( - current_file_dir, "scripts", "cmake", "toolchains", toolchain_file_name - ) + cmake_toolchain_file = current_file_dir / "scripts" / "cmake" / "toolchains" / toolchain_file_name + cmake_command = ( f"cmake .. -DTARGET_PLATFORM={target_platform}" + f" -DTARGET_SUBSYSTEM={target_subsystem}" @@ -149,9 +147,9 @@ if __name__ == "__main__": "--toolchain", default="gnu", help=""" - Specify the toolchain to use (Arm or GNU). - Options are [gnu, arm]; default is gnu. - """, + Specify the toolchain to use (Arm or GNU). + Options are [gnu, arm]; default is gnu. + """, ) parser.add_argument( "--skip-download", @@ -166,7 +164,7 @@ if __name__ == "__main__": parser.add_argument( "--npu-config-name", help=f"""Arm Ethos-U configuration to build for. Choose from: - {valid_npu_config_names}""", + {valid_npu_config_names}""", default=default_npu_config_names[0], ) parser.add_argument( diff --git a/download_dependencies.py b/download_dependencies.py index c02d98f..b62c9b1 100755 --- a/download_dependencies.py +++ b/download_dependencies.py @@ -15,20 +15,20 @@ # See the License for the specific language governing permissions and # limitations under the License. -# The script does effectively the same as "git submodule update --init" command. - +"""This script does effectively the same as "git submodule update --init" command.""" import logging -import os import sys import tarfile import tempfile from urllib.request import urlopen from zipfile import ZipFile +from pathlib import Path + +TF = "https://github.com/tensorflow/tflite-micro/archive/1a0287fc5fa81fa6aa1dcfb0c5b2e01f74164393.zip" +CMSIS = "https://github.com/ARM-software/CMSIS_5/archive/9b5df640c777563919affb4e9201c96c657adbb2.zip" +ETHOS_U_CORE_DRIVER = "https://git.mlplatform.org/ml/ethos-u/ethos-u-core-driver.git/snapshot/ethos-u-core-driver-22.02.tar.gz" +ETHOS_U_CORE_PLATFORM = "https://git.mlplatform.org/ml/ethos-u/ethos-u-core-platform.git/snapshot/ethos-u-core-platform-22.02.tar.gz" -tf = "https://github.com/tensorflow/tflite-micro/archive/1a0287fc5fa81fa6aa1dcfb0c5b2e01f74164393.zip" -cmsis = "https://github.com/ARM-software/CMSIS_5/archive/9b5df640c777563919affb4e9201c96c657adbb2.zip" -ethos_u_core_driver = "https://git.mlplatform.org/ml/ethos-u/ethos-u-core-driver.git/snapshot/ethos-u-core-driver-22.02.tar.gz" -ethos_u_core_platform = "https://git.mlplatform.org/ml/ethos-u/ethos-u-core-platform.git/snapshot/ethos-u-core-platform-22.02.tar.gz" def download(url_file: str, post_process=None): with urlopen(url_file) as response, tempfile.NamedTemporaryFile() as temp: @@ -46,10 +46,10 @@ def unzip(file, to_path): archive_path.filename = archive_path.filename[archive_path.filename.find("/") + 1:] if archive_path.filename: z.extract(archive_path, to_path) - target_path = os.path.join(to_path, archive_path.filename) + target_path = to_path / archive_path.filename attr = archive_path.external_attr >> 16 if attr != 0: - os.chmod(target_path, attr) + target_path.chmod(attr) def untar(file, to_path): @@ -63,29 +63,25 @@ def untar(file, to_path): z.extract(archive_path, to_path) -def main(dependencies_path: str): +def main(dependencies_path: Path): - download(cmsis, - lambda file: unzip(file.name, - to_path=os.path.join(dependencies_path, "cmsis"))) - download(ethos_u_core_driver, - lambda file: untar(file.name, - to_path=os.path.join(dependencies_path, "core-driver"))) - download(ethos_u_core_platform, - lambda file: untar(file.name, - to_path=os.path.join(dependencies_path, "core-platform"))) - download(tf, - lambda file: unzip(file.name, - to_path=os.path.join(dependencies_path, "tensorflow"))) + download(CMSIS, + lambda file: unzip(file.name, to_path=dependencies_path / "cmsis")) + download(ETHOS_U_CORE_DRIVER, + lambda file: untar(file.name, to_path=dependencies_path / "core-driver")) + download(ETHOS_U_CORE_PLATFORM, + lambda file: untar(file.name, to_path=dependencies_path / "core-platform")) + download(TF, + lambda file: unzip(file.name, to_path=dependencies_path / "tensorflow")) if __name__ == '__main__': logging.basicConfig(filename='download_dependencies.log', level=logging.DEBUG, filemode='w') logging.getLogger().addHandler(logging.StreamHandler(sys.stdout)) - download_dir = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), "dependencies")) + download_dir = Path(__file__).parent.resolve() / "dependencies" - if os.path.isdir(download_dir): + if download_dir.is_dir(): logging.info(f'{download_dir} exists. Skipping download.') else: main(download_dir) diff --git a/scripts/py/check_update_resources_downloaded.py b/scripts/py/check_update_resources_downloaded.py index 44e9bd9..021f1b1 100644 --- a/scripts/py/check_update_resources_downloaded.py +++ b/scripts/py/check_update_resources_downloaded.py @@ -13,11 +13,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + import json -import os import sys import hashlib from argparse import ArgumentParser +from pathlib import Path def get_md5sum_for_file(filepath: str) -> str: @@ -51,11 +52,9 @@ def check_update_resources_downloaded( set_up_script_path (string): Specifies the path to set_up_default_resources.py file. """ - metadata_file_path = os.path.join( - resource_downloaded_dir, "resources_downloaded_metadata.json" - ) + metadata_file_path = Path(resource_downloaded_dir) / "resources_downloaded_metadata.json" - if os.path.isfile(metadata_file_path): + if metadata_file_path.is_file(): with open(metadata_file_path) as metadata_json: metadata_dict = json.load(metadata_json) @@ -95,8 +94,8 @@ if __name__ == "__main__": required=True) args = parser.parse_args() - # Check validity of script path - if not os.path.isfile(args.setup_script_path): + # Check validity of script path. + if not Path(args.setup_script_path).is_file(): raise ValueError(f'Invalid script path: {args.setup_script_path}') # Check the resources are downloaded as expected diff --git a/scripts/py/gen_audio.py b/scripts/py/gen_audio.py index 53ed019..e997499 100644 --- a/scripts/py/gen_audio.py +++ b/scripts/py/gen_audio.py @@ -36,6 +36,7 @@ parser.add_argument("--min_samples", type=int, help="Minimum sample number.", de parser.add_argument("-v", "--verbosity", action="store_true") args = parser.parse_args() + def main(args): audio_data, samplerate = AudioUtils.load_resample_audio_clip(args.audio_path, args.sampling_rate, @@ -44,5 +45,6 @@ def main(args): args.min_samples) sf.write(path.join(args.output_dir, path.basename(args.audio_path)), audio_data, samplerate) + if __name__ == '__main__': main(args) diff --git a/scripts/py/gen_audio_cpp.py b/scripts/py/gen_audio_cpp.py index e7155c7..b8928f0 100644 --- a/scripts/py/gen_audio_cpp.py +++ b/scripts/py/gen_audio_cpp.py @@ -21,12 +21,12 @@ from the cpp files. import datetime import glob import math -import os +from pathlib import Path +from argparse import ArgumentParser import numpy as np -from os import path -from argparse import ArgumentParser from jinja2 import Environment, FileSystemLoader + from gen_utils import AudioUtils parser = ArgumentParser() @@ -45,7 +45,7 @@ parser.add_argument("--license_template", type=str, help="Header template file", parser.add_argument("-v", "--verbosity", action="store_true") args = parser.parse_args() -env = Environment(loader=FileSystemLoader(os.path.join(os.path.dirname(__file__), 'templates')), +env = Environment(loader=FileSystemLoader(Path(__file__).parent / 'templates'), trim_blocks=True, lstrip_blocks=True) @@ -54,7 +54,7 @@ def write_hpp_file(header_filepath, cc_filepath, header_template_file, num_audio print(f"++ Generating {header_filepath}") header_template = env.get_template(header_template_file) - hdr = header_template.render(script_name=os.path.basename(__file__), + hdr = header_template.render(script_name=Path(__file__).name, gen_time=datetime.datetime.now(), year=datetime.datetime.now().year) env.get_template('AudioClips.hpp.template').stream(common_template_header=hdr, @@ -77,8 +77,8 @@ def write_individual_audio_cc_file(clip_dirpath, clip_filename, cc_filename, header_template_file, array_name, sampling_rate_value, mono_value, offset_value, duration_value, res_type_value, min_len): - print(f"++ Converting {clip_filename} to {path.basename(cc_filename)}") - audio_filepath = path.join(clip_dirpath, clip_filename) + print(f"++ Converting {clip_filename} to {Path(cc_filename).name}") + audio_filepath = Path(clip_dirpath) / clip_filename clip_data, samplerate = AudioUtils.load_resample_audio_clip(audio_filepath, sampling_rate_value, mono_value, offset_value, duration_value, @@ -90,7 +90,7 @@ def write_individual_audio_cc_file(clip_dirpath, clip_filename, np.iinfo(np.int16).max).flatten().astype(np.int16) header_template = env.get_template(header_template_file) - hdr = header_template.render(script_name=os.path.basename(__file__), + hdr = header_template.render(script_name=Path(__file__).name, gen_time=datetime.datetime.now(), file_name=clip_filename, year=datetime.datetime.now().year) @@ -114,25 +114,24 @@ def main(args): audioclip_array_names = [] header_filename = "InputFiles.hpp" common_cc_filename = "InputFiles.cc" - header_filepath = path.join(args.header_folder_path, header_filename) - common_cc_filepath = path.join(args.source_folder_path, common_cc_filename) + header_filepath = Path(args.header_folder_path) / header_filename + common_cc_filepath = Path(args.source_folder_path) / common_cc_filename - if os.path.isdir(args.audio_path): - filepaths = sorted(glob.glob(path.join(args.audio_path, '**/*.wav'), recursive=True)) - elif os.path.isfile(args.audio_path): + if Path(args.audio_path).is_dir(): + filepaths = sorted(glob.glob(str(Path(args.audio_path) / '**/*.wav'), recursive=True)) + elif Path(args.audio_path).is_file(): filepaths = [args.audio_path] else: raise OSError("Directory or file does not exist.") for filepath in filepaths: - filename = path.basename(filepath) - clip_dirpath = path.dirname(filepath) + filename = Path(filepath).name + clip_dirpath = Path(filepath).parent try: audioclip_filenames.append(filename) # Save the cc file - cc_filename = path.join(args.source_folder_path, - (filename.rsplit(".")[0]).replace(" ", "_") + ".cc") + cc_filename = Path(args.source_folder_path) / (Path(filename).stem.replace(" ", "_") + ".cc") array_name = "audio" + str(audioclip_idx) array_size = write_individual_audio_cc_file(clip_dirpath, filename, cc_filename, args.license_template, array_name, args.sampling_rate, args.mono, args.offset, diff --git a/scripts/py/gen_default_input_cpp.py b/scripts/py/gen_default_input_cpp.py index c091fd1..e42f4ae 100644 --- a/scripts/py/gen_default_input_cpp.py +++ b/scripts/py/gen_default_input_cpp.py @@ -17,9 +17,9 @@ Utility script to generate the minimum InputFiles.hpp and cpp files required by an application. """ import datetime -import os - +from pathlib import Path from argparse import ArgumentParser + from jinja2 import Environment, FileSystemLoader parser = ArgumentParser() @@ -28,7 +28,7 @@ parser.add_argument("--license_template", type=str, help="Header template file", default="header_template.txt") args = parser.parse_args() -env = Environment(loader=FileSystemLoader(os.path.join(os.path.dirname(__file__), 'templates')), +env = Environment(loader=FileSystemLoader(Path(__file__).parent / 'templates'), trim_blocks=True, lstrip_blocks=True) @@ -36,7 +36,7 @@ env = Environment(loader=FileSystemLoader(os.path.join(os.path.dirname(__file__) def write_hpp_file(header_file_path, header_template_file): print(f"++ Generating {header_file_path}") header_template = env.get_template(header_template_file) - hdr = header_template.render(script_name=os.path.basename(__file__), + hdr = header_template.render(script_name=Path(__file__).name, gen_time=datetime.datetime.now(), year=datetime.datetime.now().year) env.get_template('default.hpp.template').stream(common_template_header=hdr) \ @@ -45,7 +45,7 @@ def write_hpp_file(header_file_path, header_template_file): def main(args): header_filename = "InputFiles.hpp" - header_filepath = os.path.join(args.header_folder_path, header_filename) + header_filepath = Path(args.header_folder_path) / header_filename write_hpp_file(header_filepath, args.license_template) diff --git a/scripts/py/gen_labels_cpp.py b/scripts/py/gen_labels_cpp.py index 1be9c63..25c4bc3 100644 --- a/scripts/py/gen_labels_cpp.py +++ b/scripts/py/gen_labels_cpp.py @@ -22,8 +22,9 @@ this script to be called as part of the build framework to auto-generate the cpp file with labels that can be used in the application without modification. """ import datetime -import os +from pathlib import Path from argparse import ArgumentParser + from jinja2 import Environment, FileSystemLoader parser = ArgumentParser() @@ -42,7 +43,7 @@ parser.add_argument("--license_template", type=str, help="Header template file", args = parser.parse_args() -env = Environment(loader=FileSystemLoader(os.path.join(os.path.dirname(__file__), 'templates')), +env = Environment(loader=FileSystemLoader(Path(__file__).parent / 'templates'), trim_blocks=True, lstrip_blocks=True) @@ -57,19 +58,18 @@ def main(args): raise Exception(f"no labels found in {args.label_file}") header_template = env.get_template(args.license_template) - hdr = header_template.render(script_name=os.path.basename(__file__), + hdr = header_template.render(script_name=Path(__file__).name, gen_time=datetime.datetime.now(), - file_name=os.path.basename(args.labels_file), + file_name=Path(args.labels_file).name, year=datetime.datetime.now().year) - hpp_filename = os.path.join(args.header_folder_path, args.output_file_name + ".hpp") + hpp_filename = Path(args.header_folder_path) / (args.output_file_name + ".hpp") env.get_template('Labels.hpp.template').stream(common_template_header=hdr, - filename=(args.output_file_name).upper(), + filename=args.output_file_name.upper(), namespaces=args.namespaces) \ .dump(str(hpp_filename)) - - cc_filename = os.path.join(args.source_folder_path, args.output_file_name + ".cc") + cc_filename = Path(args.source_folder_path) / (args.output_file_name + ".cc") env.get_template('Labels.cc.template').stream(common_template_header=hdr, labels=labels, labelsSize=len(labels), diff --git a/scripts/py/gen_model_cpp.py b/scripts/py/gen_model_cpp.py index c43c93a..de71992 100644 --- a/scripts/py/gen_model_cpp.py +++ b/scripts/py/gen_model_cpp.py @@ -19,9 +19,9 @@ project directly. This should be called as part of cmake framework should the models need to be generated at configuration stage. """ import datetime -import os from argparse import ArgumentParser from pathlib import Path + from jinja2 import Environment, FileSystemLoader import binascii @@ -36,7 +36,7 @@ parser.add_argument("--license_template", type=str, help="Header template file", default="header_template.txt") args = parser.parse_args() -env = Environment(loader=FileSystemLoader(os.path.join(os.path.dirname(__file__), 'templates')), +env = Environment(loader=FileSystemLoader(Path(__file__).parent / 'templates'), trim_blocks=True, lstrip_blocks=True) @@ -70,20 +70,20 @@ def get_tflite_data(tflite_path: str) -> list: def main(args): - if not os.path.isfile(args.tflite_path): + if not Path(args.tflite_path).is_file(): raise Exception(f"{args.tflite_path} not found") # Cpp filename: - cpp_filename = Path(os.path.join(args.output_dir, os.path.basename(args.tflite_path) + ".cc")).absolute() - print(f"++ Converting {os.path.basename(args.tflite_path)} to\ - {os.path.basename(cpp_filename)}") + cpp_filename = (Path(args.output_dir) / (Path(args.tflite_path).name + ".cc")).resolve() + print(f"++ Converting {Path(args.tflite_path).name} to\ + {cpp_filename.name}") - os.makedirs(cpp_filename.parent, exist_ok=True) + cpp_filename.parent.mkdir(exist_ok=True) header_template = env.get_template(args.license_template) - hdr = header_template.render(script_name=os.path.basename(__file__), - file_name=os.path.basename(args.tflite_path), + hdr = header_template.render(script_name=Path(__file__).name, + file_name=Path(args.tflite_path).name, gen_time=datetime.datetime.now(), year=datetime.datetime.now().year) diff --git a/scripts/py/gen_rgb_cpp.py b/scripts/py/gen_rgb_cpp.py index c53fbd7..88ff81e 100644 --- a/scripts/py/gen_rgb_cpp.py +++ b/scripts/py/gen_rgb_cpp.py @@ -21,10 +21,10 @@ from the cpp files. import datetime import glob import math -import os -import numpy as np - +from pathlib import Path from argparse import ArgumentParser + +import numpy as np from PIL import Image, UnidentifiedImageError from jinja2 import Environment, FileSystemLoader @@ -37,7 +37,7 @@ parser.add_argument("--license_template", type=str, help="Header template file", default="header_template.txt") args = parser.parse_args() -env = Environment(loader=FileSystemLoader(os.path.join(os.path.dirname(__file__), 'templates')), +env = Environment(loader=FileSystemLoader(Path(__file__).parent / 'templates'), trim_blocks=True, lstrip_blocks=True) @@ -46,7 +46,7 @@ def write_hpp_file(header_file_path, cc_file_path, header_template_file, num_ima image_array_names, image_size): print(f"++ Generating {header_file_path}") header_template = env.get_template(header_template_file) - hdr = header_template.render(script_name=os.path.basename(__file__), + hdr = header_template.render(script_name=Path(__file__).name, gen_time=datetime.datetime.now(), year=datetime.datetime.now().year) env.get_template('Images.hpp.template').stream(common_template_header=hdr, @@ -63,12 +63,12 @@ def write_hpp_file(header_file_path, cc_file_path, header_template_file, num_ima def write_individual_img_cc_file(image_filename, cc_filename, header_template_file, original_image, image_size, array_name): - print(f"++ Converting {image_filename} to {os.path.basename(cc_filename)}") + print(f"++ Converting {image_filename} to {cc_filename.name}") header_template = env.get_template(header_template_file) - hdr = header_template.render(script_name=os.path.basename(__file__), + hdr = header_template.render(script_name=Path(__file__).name, gen_time=datetime.datetime.now(), - file_name=os.path.basename(image_filename), + file_name=image_filename, year=datetime.datetime.now().year) # IFM size ifm_width = image_size[0] @@ -104,16 +104,15 @@ def main(args): image_filenames = [] image_array_names = [] - - if os.path.isdir(args.image_path): - filepaths = sorted(glob.glob(os.path.join(args.image_path, '**/*.*'), recursive=True)) - elif os.path.isfile(args.image_path): + if Path(args.image_path).is_dir(): + filepaths = sorted(glob.glob(str(Path(args.image_path) / '**/*.*'), recursive=True)) + elif Path(args.image_path).is_file(): filepaths = [args.image_path] else: raise OSError("Directory or file does not exist.") for filepath in filepaths: - filename = os.path.basename(filepath) + filename = Path(filepath).name try: original_image = Image.open(filepath).convert("RGB") @@ -124,8 +123,7 @@ def main(args): image_filenames.append(filename) # Save the cc file - cc_filename = os.path.join(args.source_folder_path, - (filename.rsplit(".")[0]).replace(" ", "_") + ".cc") + cc_filename = Path(args.source_folder_path) / (Path(filename).stem.replace(" ", "_") + ".cc") array_name = "im" + str(image_idx) image_array_names.append(array_name) write_individual_img_cc_file(filename, cc_filename, args.license_template, @@ -135,9 +133,9 @@ def main(args): image_idx = image_idx + 1 header_filename = "InputFiles.hpp" - header_filepath = os.path.join(args.header_folder_path, header_filename) + header_filepath = Path(args.header_folder_path) / header_filename common_cc_filename = "InputFiles.cc" - common_cc_filepath = os.path.join(args.source_folder_path, common_cc_filename) + common_cc_filepath = Path(args.source_folder_path) / common_cc_filename if len(image_filenames) > 0: write_hpp_file(header_filepath, common_cc_filepath, args.license_template, diff --git a/scripts/py/gen_test_data_cpp.py b/scripts/py/gen_test_data_cpp.py index dae181d..f1cb30f 100644 --- a/scripts/py/gen_test_data_cpp.py +++ b/scripts/py/gen_test_data_cpp.py @@ -39,18 +39,18 @@ parser.add_argument("-v", "--verbosity", action="store_true") args = parser.parse_args() -env = Environment(loader=FileSystemLoader(os.path.join(os.path.dirname(__file__), 'templates')), +env = Environment(loader=FileSystemLoader(Path(__file__).parent / 'templates'), trim_blocks=True, lstrip_blocks=True) def write_hpp_file(header_filename, cc_file_path, header_template_file, num_ifms, num_ofms, ifm_array_names, ifm_sizes, ofm_array_names, ofm_sizes, iofm_data_type): - header_file_path = os.path.join(args.header_folder_path, header_filename) + header_file_path = Path(args.header_folder_path) / header_filename print(f"++ Generating {header_file_path}") header_template = env.get_template(header_template_file) - hdr = header_template.render(script_name=os.path.basename(__file__), + hdr = header_template.render(script_name=Path(__file__).name, gen_time=datetime.datetime.now(), year=datetime.datetime.now().year) env.get_template('TestData.hpp.template').stream(common_template_header=hdr, @@ -74,15 +74,15 @@ def write_hpp_file(header_filename, cc_file_path, header_template_file, num_ifms def write_individual_cc_file(filename, cc_filename, header_filename, header_template_file, array_name, iofm_data_type): - print(f"++ Converting {filename} to {os.path.basename(cc_filename)}") + print(f"++ Converting {filename} to {cc_filename.name}") header_template = env.get_template(header_template_file) - hdr = header_template.render(script_name=os.path.basename(__file__), + hdr = header_template.render(script_name=Path(__file__).name, gen_time=datetime.datetime.now(), - file_name=os.path.basename(filename), + file_name=filename, year=datetime.datetime.now().year) # Convert the image and write it to the cc file - fm_data = (np.load(os.path.join(args.data_folder_path, filename))).flatten() + fm_data = (np.load(Path(args.data_folder_path) / filename)).flatten() type(fm_data.dtype) hex_line_generator = (', '.join(map(hex, sub_arr)) for sub_arr in np.array_split(fm_data, math.ceil(len(fm_data) / 20))) @@ -104,8 +104,8 @@ def get_npy_vec_size(filename: str) -> int: Return: size in bytes """ - data = np.load(os.path.join(args.data_folder_path, filename)) - return (data.size * data.dtype.itemsize) + data = np.load(Path(args.data_folder_path) / filename) + return data.size * data.dtype.itemsize def main(args): @@ -126,7 +126,7 @@ def main(args): iofm_data_type = "int8_t" if ifms_count > 0: - iofm_data_type = "int8_t" if (np.load(os.path.join(args.data_folder_path, "ifm0.npy")).dtype == np.int8) else "uint8_t" + iofm_data_type = "int8_t" if (np.load(Path(args.data_folder_path) / "ifm0.npy").dtype == np.int8) else "uint8_t" ifm_sizes = [] ofm_sizes = [] @@ -136,7 +136,7 @@ def main(args): base_name = "ifm" + str(idx) filename = base_name+".npy" array_name = base_name + add_usecase_fname - cc_filename = os.path.join(args.source_folder_path, array_name + ".cc") + cc_filename = Path(args.source_folder_path) / (array_name + ".cc") ifm_array_names.append(array_name) write_individual_cc_file(filename, cc_filename, header_filename, args.license_template, array_name, iofm_data_type) ifm_sizes.append(get_npy_vec_size(filename)) @@ -146,12 +146,12 @@ def main(args): base_name = "ofm" + str(idx) filename = base_name+".npy" array_name = base_name + add_usecase_fname - cc_filename = os.path.join(args.source_folder_path, array_name + ".cc") + cc_filename = Path(args.source_folder_path) / (array_name + ".cc") ofm_array_names.append(array_name) write_individual_cc_file(filename, cc_filename, header_filename, args.license_template, array_name, iofm_data_type) ofm_sizes.append(get_npy_vec_size(filename)) - common_cc_filepath = os.path.join(args.source_folder_path, common_cc_filename) + common_cc_filepath = Path(args.source_folder_path) / common_cc_filename write_hpp_file(header_filename, common_cc_filepath, args.license_template, ifms_count, ofms_count, ifm_array_names, ifm_sizes, ofm_array_names, ofm_sizes, iofm_data_type) diff --git a/scripts/py/rnnoise_dump_extractor.py b/scripts/py/rnnoise_dump_extractor.py index 947a75a..ff79dcb 100644 --- a/scripts/py/rnnoise_dump_extractor.py +++ b/scripts/py/rnnoise_dump_extractor.py @@ -12,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - """ This script can be used with the noise reduction use case to save the dumped noise reduced audio to a wav file. @@ -20,14 +19,15 @@ the dumped noise reduced audio to a wav file. Example use: python rnnoise_dump_extractor.py --dump_file output.bin --output_dir ./denoised_wavs/ """ + import soundfile as sf import numpy as np import argparse from os import path - import struct + def extract(fp, output_dir, export_npy): while True: filename_length = struct.unpack("i", fp.read(4))[0] @@ -47,19 +47,20 @@ def extract(fp, output_dir, export_npy): if export_npy: output_file_name += ".npy" pack_format = "{}h".format(int(audio_clip_length/2)) - npdata = np.array(struct.unpack(pack_format,audio_clip)).astype(np.int16) + npdata = np.array(struct.unpack(pack_format, audio_clip)).astype(np.int16) np.save(output_file_name, npdata) print("{} written to disk".format(output_file_name)) + def main(args): extract(args.dump_file, args.output_dir, args.export_npy) + parser = argparse.ArgumentParser() parser.add_argument("--dump_file", type=argparse.FileType('rb'), help="Dump file with audio files to extract.", required=True) parser.add_argument("--output_dir", help="Output directory, Warning: Duplicated file names will be overwritten.", required=True) parser.add_argument("--export_npy", help="Export the audio buffer in NumPy format", action="store_true") args = parser.parse_args() -if __name__=="__main__": +if __name__ == "__main__": main(args) - diff --git a/set_up_default_resources.py b/set_up_default_resources.py index a4c3594..9e0bdf4 100755 --- a/set_up_default_resources.py +++ b/set_up_default_resources.py @@ -27,6 +27,7 @@ from argparse import ArgumentParser from argparse import ArgumentTypeError from collections import namedtuple from urllib.error import URLError +from pathlib import Path from scripts.py.check_update_resources_downloaded import get_md5sum_for_file @@ -278,7 +279,7 @@ def get_default_npu_config_from_name( config_name: str, arena_cache_size: int = 0 ) -> NPUConfig: """ - Gets the file suffix for the tflite file from the + Gets the file suffix for the TFLite file from the `accelerator_config` string. Parameters: @@ -309,10 +310,10 @@ def get_default_npu_config_from_name( memory_modes = ["Shared_Sram", "Dedicated_Sram"] system_configs = ["Ethos_U55_High_End_Embedded", "Ethos_U65_High_End"] memory_modes_arena = { - # For shared SRAM memory mode, we use the MPS3 SRAM size by default + # For shared SRAM memory mode, we use the MPS3 SRAM size by default. "Shared_Sram": mps3_max_sram_sz if arena_cache_size <= 0 else arena_cache_size, - # For dedicated SRAM memory mode, we do no override the arena size. This is expected to - # be defined in the vela configuration file instead. + # For dedicated SRAM memory mode, we do not override the arena size. This is expected to + # be defined in the Vela configuration file instead. "Dedicated_Sram": None if arena_cache_size <= 0 else arena_cache_size, } @@ -333,9 +334,9 @@ def get_default_npu_config_from_name( def remove_tree_dir(dir_path): try: - # Remove the full directory + # Remove the full directory. shutil.rmtree(dir_path) - # Re-create an empty one + # Re-create an empty one. os.mkdir(dir_path) except Exception as e: logging.error(f"Failed to delete {dir_path}.") @@ -343,7 +344,7 @@ def remove_tree_dir(dir_path): def set_up_resources( run_vela_on_models: bool = False, - additional_npu_config_names: list = (), + additional_npu_config_names: tuple = (), arena_cache_size: int = 0, check_clean_folder: bool = False, additional_requirements_file: str = "", @@ -365,14 +366,10 @@ def set_up_resources( additional packages need to be installed. """ - # Paths - current_file_dir = os.path.dirname(os.path.abspath(__file__)) - download_dir = os.path.abspath( - os.path.join(current_file_dir, "resources_downloaded") - ) - metadata_file_path = os.path.join( - download_dir, "resources_downloaded_metadata.json" - ) + # Paths. + current_file_dir = Path(__file__).parent.resolve() + download_dir = current_file_dir / "resources_downloaded" + metadata_file_path = download_dir / "resources_downloaded_metadata.json" metadata_dict = dict() vela_version = "3.3.0" @@ -390,17 +387,17 @@ def set_up_resources( ) setup_script_hash_verified = False - setup_script_hash = get_md5sum_for_file(os.path.abspath(__file__)) + setup_script_hash = get_md5sum_for_file(Path(__file__).resolve()) try: # 1.1 Does the download dir exist? - os.mkdir(download_dir) + download_dir.mkdir() except OSError as e: if e.errno == errno.EEXIST: logging.info("'resources_downloaded' directory exists.") # Check and clean? - if check_clean_folder and os.path.isfile(metadata_file_path): - with open(metadata_file_path) as (metadata_file): + if check_clean_folder and metadata_file_path.is_file(): + with open(metadata_file_path) as metadata_file: metadata_dict = json.load(metadata_file) vela_in_metadata = metadata_dict["ethosu_vela_version"] if vela_in_metadata != vela_version: @@ -421,15 +418,12 @@ def set_up_resources( raise # 1.2 Does the virtual environment exist? - env_python = str( - os.path.abspath(os.path.join(download_dir, "env", "bin", "python3")) - ) - env_activate = str( - os.path.abspath(os.path.join(download_dir, "env", "bin", "activate")) - ) - if not os.path.isdir(os.path.join(download_dir, "env")): + env_python = str(download_dir / "env" / "bin" / "python3") + env_activate = str(download_dir / "env" / "bin" / "activate") + + if not (download_dir / "env").is_dir(): os.chdir(download_dir) - # Create the virtual environment + # Create the virtual environment. command = "python3 -m venv env" call_command(command) commands = ["pip install --upgrade pip", "pip install --upgrade setuptools"] @@ -459,10 +453,10 @@ def set_up_resources( res_url_prefix = uc["url_prefix"] try: # Does the usecase_name download dir exist? - os.mkdir(os.path.join(download_dir, use_case_name)) + (download_dir / use_case_name).mkdir() except OSError as e: if e.errno == errno.EEXIST: - # The usecase_name download dir exist + # The usecase_name download dir exist. if check_clean_folder and not setup_script_hash_verified: for idx, metadata_uc_url_prefix in enumerate( [ @@ -473,7 +467,7 @@ def set_up_resources( ): if metadata_uc_url_prefix != res_url_prefix[idx]: logging.info(f"Removing {use_case_name} resources.") - remove_tree_dir(os.path.join(download_dir, use_case_name)) + remove_tree_dir(download_dir / use_case_name) break elif e.errno != errno.EEXIST: logging.error(f"Error creating {use_case_name} directory.") @@ -492,9 +486,7 @@ def set_up_resources( if "sub_folder" in res: try: # Does the usecase_name/sub_folder download dir exist? - os.mkdir( - os.path.join(download_dir, use_case_name, res["sub_folder"]) - ) + (download_dir / use_case_name / res["sub_folder"]).mkdir() except OSError as e: if e.errno != errno.EEXIST: logging.error( @@ -503,9 +495,9 @@ def set_up_resources( raise sub_folder = res["sub_folder"] - res_dst = os.path.join(download_dir, use_case_name, sub_folder, res_name) + res_dst = download_dir / use_case_name / sub_folder / res_name - if os.path.isfile(res_dst): + if res_dst.is_file(): logging.info(f"File {res_dst} exists, skipping download.") else: try: @@ -526,11 +518,9 @@ def set_up_resources( # Note: To avoid to run vela twice on the same model, it's supposed that # downloaded model names don't contain the 'vela' word. if run_vela_on_models is True: - config_file = os.path.join( - current_file_dir, "scripts", "vela", "default_vela.ini" - ) + config_file = current_file_dir / "scripts" / "vela" / "default_vela.ini" models = [ - os.path.join(dirpath, f) + Path(dirpath) / f for dirpath, dirnames, files in os.walk(download_dir) for f in fnmatch.filter(files, "*.tflite") if "vela" not in f @@ -552,9 +542,9 @@ def set_up_resources( optimisation_skipped = False for model in models: - output_dir = os.path.dirname(model) + output_dir = model.parent # model name after compiling with vela is an initial model name + _vela suffix - vela_optimised_model_path = str(model).replace(".tflite", "_vela.tflite") + vela_optimised_model_path = model.parent / (model.stem + "_vela.tflite") for config in npu_configs: vela_command_arena_cache_size = "" @@ -575,14 +565,12 @@ def set_up_resources( + f"{vela_command_arena_cache_size}" ) - # we want the name to include the configuration suffix. For example: vela_H128, + # We want the name to include the configuration suffix. For example: vela_H128, # vela_Y512 etc. new_suffix = "_vela_" + config.ethos_u_config_id + ".tflite" - new_vela_optimised_model_path = vela_optimised_model_path.replace( - "_vela.tflite", new_suffix - ) + new_vela_optimised_model_path = model.parent / (model.stem + new_suffix) - if os.path.isfile(new_vela_optimised_model_path): + if new_vela_optimised_model_path.is_file(): logging.info( f"File {new_vela_optimised_model_path} exists, skipping optimisation." ) @@ -591,8 +579,8 @@ def set_up_resources( call_command(vela_command) - # rename default vela model - os.rename(vela_optimised_model_path, new_vela_optimised_model_path) + # Rename default vela model. + vela_optimised_model_path.rename(new_vela_optimised_model_path) logging.info( f"Renaming {vela_optimised_model_path} to {new_vela_optimised_model_path}." ) @@ -636,19 +624,14 @@ if __name__ == "__main__": ) parser.add_argument( "--clean", - help="Clean the disctory and optimize the downloaded resources", + help="Clean the directory and optimize the downloaded resources", action="store_true", ) parser.add_argument( "--requirements-file", help="Path to requirements.txt file to install additional packages", type=str, - default=os.path.join( - os.path.dirname(os.path.abspath(__file__)), - "scripts", - "py", - "requirements.txt", - ), + default=Path(__file__).parent.resolve() / 'scripts' / 'py' / 'requirements.txt' ) args = parser.parse_args() @@ -656,7 +639,7 @@ if __name__ == "__main__": if args.arena_cache_size < 0: raise ArgumentTypeError("Arena cache size cannot not be less than 0") - if not os.path.isfile(args.requirements_file): + if not Path(args.requirements_file).is_file(): raise ArgumentTypeError(f"Invalid requirements file: {args.requirements_file}") logging.basicConfig(filename="log_build_default.log", level=logging.DEBUG) -- cgit v1.2.1