#!/usr/bin/env python3 # Copyright (c) 2021-2022, ARM Limited. # SPDX-License-Identifier: Apache-2.0 """Build conformance tests. Steps: - Specific input shapes (or tests) are specified and produced by using the settings in the .json files. - Tests are selected to produce a good coverage. - Tests are run on the reference model to produce the correct output files. - Tests are converted into JSON format and saved to desired output directory. """ import argparse import json import logging import multiprocessing as mp import os import shlex import shutil import subprocess from functools import partial from itertools import tee from pathlib import Path from conformance.test_select import Operator from convert2conformance.convert2conformance import main as c2c_main from distutils.dir_util import copy_tree logging.basicConfig() logger = logging.getLogger("tosa_verif_conformance_generator") # Configuration for each TOSA profile PROFILE_OPS_INFO = { "tosa-bi": { "operator_test_params": "tosa_base_profile_ops_info.json", "framework_tests": "tosa_base_profile_framework_ops_info.json", "exclude_types": [], }, "tosa-mi": { # Note: This is just the extra tests not in the base profile! "operator_test_params": "tosa_main_profile_ops_info.json", "framework_tests": "tosa_main_profile_framework_ops_info.json", "exclude_types": [], }, } PROFILES_ALL = "all" LOCATION_REF_MODEL_BINARY = Path("build/reference_model/tosa_reference_model") DEFAULT_SEED = 42 class GenConformanceError(Exception): """Generation error reporting exception.""" pass def _run_sh_command(args, cwd, full_cmd): """Run an external command and capture stdout/stderr.""" # Quote the command line for printing full_cmd_esc = [shlex.quote(x) for x in full_cmd] if args.capture_output: logger.debug(f"Command: {full_cmd_esc}") rc = subprocess.run( full_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=cwd ) if args.capture_output: stdout = rc.stdout.decode("utf-8") logger.debug(f"stdout: \n{stdout}") if rc.returncode != 0: raise Exception( "Error running command: {}.\n{}".format( " ".join(full_cmd_esc), rc.stderr.decode("utf-8") ) ) return (rc.stdout, rc.stderr) def build_op_tests(args, operator, test_params): """Build tests for a given operator. Builds a set of tests based on the parameters defined in test_params Returns operator output directory """ assert operator in test_params build_tests_cmd = "tosa_verif_build_tests" op_build_dir = args.build_dir ref_cmd_base = [ build_tests_cmd, "--filter", operator, "-o", str(op_build_dir), "--seed", str(args.random_seed), ] ref_cmds = [] if args.test_type in ["positive", "both"]: # Append extra parameters and run test generator for each set of parameters. for arglist in test_params[operator]["generator_args"]: ref_cmd_pos_test = ref_cmd_base.copy() ref_cmd_pos_test.extend(["--test-type", "positive"]) ref_cmd_pos_test.extend(arglist) ref_cmds.append(ref_cmd_pos_test) if args.test_type in ["negative", "both"]: # Get target-dtypes options only to limit tests to those needed target_dtypes_args = [] for arglist in test_params[operator]["generator_args"]: idx = 0 while idx < len(arglist): if arglist[idx] == "--target-dtype": if arglist[idx + 1] not in target_dtypes_args: target_dtypes_args.extend(arglist[idx : idx + 2]) idx += 1 # skip over option (and then argument below) idx += 1 ref_cmd_neg_test = ref_cmd_base.copy() ref_cmd_neg_test.extend(["--test-type", "negative"]) # Limit sizes of negative tests ref_cmd_neg_test.extend(["--tensor-dim-range", "1,16"]) ref_cmd_neg_test.extend(target_dtypes_args) ref_cmds.append(ref_cmd_neg_test) logger.debug(f"Creating {operator} tests with {len(ref_cmds)} parameter(s)") error = False for i, cmd in enumerate(ref_cmds): try: _run_sh_command(args, args.ref_model_dir.absolute(), cmd) logger.info( f"{operator} test batch {(i+1)}/{len(ref_cmds)} created successfully" ) except Exception as e: logger.error( f"{operator} test batch {(i+1)}/{len(ref_cmds)} unsuccessful, skipping" ) logger.error(f" build_op_tests error: {e} ") error = True if error: raise (GenConformanceError()) return op_build_dir def _check_to_include_test(profile, test_name, exclude_negative_tests=False): """Check test name for exclusions, return False to indicate excluded.""" excludes = ["ERRORIF"] if exclude_negative_tests else [] excludes.extend(PROFILE_OPS_INFO[profile]["exclude_types"]) for exclusion in excludes: if f"_{exclusion}_" in test_name: return False return True def _get_all_tests_list( profile, test_root_dir, operator, exclude_negative_tests=False, include_all=False ): """Create test list based on tests in the test_dir.""" test_dir = test_root_dir / operator if not test_dir.is_dir(): # Tests are split into multiple dirs, for example: conv2d_1x1, conv2d_3x3 test_dir = test_root_dir directories = [ tdir for tdir in test_dir.glob("*") if tdir.name.startswith(operator) ] else: directories = [test_dir] tests = [] for tdir in directories: tests.extend( [ test for test in tdir.glob("*") if include_all or _check_to_include_test(profile, test.name, exclude_negative_tests) ] ) return tests def generate_results(args, profile, operator, op_build_dir, tests=None): """Run tests on reference model and save result to the test directory.""" num_cores = args.num_cores run_tests_cmd = "tosa_verif_run_tests" ref_model_path = args.ref_model_dir / LOCATION_REF_MODEL_BINARY ref_cmd_base = ref_cmd = [ run_tests_cmd, "--ref-model-path", str(ref_model_path.absolute()), "-j", str(num_cores), "-v", "-t", ] ref_cmds = [] if not tests: # Do not need to run ERRORIF tests as they don't have result files tests = _get_all_tests_list( profile, op_build_dir, operator, exclude_negative_tests=True ) for test in tests: ref_cmd = ref_cmd_base.copy() ref_cmd.append(str(test)) ref_cmds.append(ref_cmd) fail_string = "UNEXPECTED_FAILURE" failed_counter = 0 job_pool = mp.Pool(args.num_cores) sh_partial = partial(_run_sh_command, args, args.ref_model_dir.absolute()) pool_results = job_pool.map(sh_partial, ref_cmds) job_pool.close() job_pool.join() # Use captured output for run_sh_command to work out if test passed. for i, rc in enumerate(pool_results): if fail_string in str(rc[0]): logger.error(f"Test {i+1}/{len(ref_cmds)}: {ref_cmds[i][-1]} failed.") failed_counter += 1 else: logger.info(f"Test {i+1}/{len(ref_cmds)}: {ref_cmds[i][-1]} passed.") logger.info(f"{len(ref_cmds)-failed_counter}/{len(ref_cmds)} tests passed") logger.info("Ran tests on model and saved results of passing tests") def convert_tests( args, profile, operator, op_build_dir, output_dir, op_profiles_list, tests=None, group=None, trim_op_subdir=False, ): """Convert tests to JSON and save to output directory.""" ref_model_dir = args.ref_model_dir if group: output_dir = output_dir / group ref_cmd_base = ["--ref-model-directory", str(ref_model_dir)] # This op maybe in more than one profile - e.g. tosa_bi and tosa_mi # even if we are only producing tests for tosa_mi for op_profile in op_profiles_list: ref_cmd_base.extend(["--profile", op_profile]) if args.framework_schema: ref_cmd_base.extend(["--framework-schema", str(args.framework_schema)]) ref_cmd_base.append("--output-directory") ref_cmds = [] if not tests: tests = _get_all_tests_list(profile, op_build_dir, operator) logger.info(f"Converting all {profile} profile tests") # Controls if we copy the tests in their operator sub-directory or not output_dir_relative_pos = -1 if trim_op_subdir else -2 for test in tests: logger.info(f"Test chosen: {test}") ref_cmd = ref_cmd_base.copy() full_output_directory = output_dir / test.relative_to( *test.parts[:output_dir_relative_pos] ) ref_cmd.append(str(full_output_directory)) ref_cmd.append(str(test)) ref_cmds.append(ref_cmd) if len(ref_cmds) == 0: logger.warning("No tests found. Nothing to convert") return job_pool = mp.Pool(args.num_cores) pool_results = job_pool.map(c2c_main, ref_cmds) job_pool.close() job_pool.join() failed_counter = 0 for i, result in enumerate(pool_results): if result != 0: logger.error( f"test {i+1}/{len(ref_cmds)}: {ref_cmds[i][-1]} failed to convert." ) failed_counter += 1 else: logger.info(f"test {i+1}/{len(ref_cmds)}: {ref_cmds[i][-1]} converted") logger.info( f"{len(ref_cmds)-failed_counter}/{len(ref_cmds)} tests successfully converted" ) if failed_counter > 0: logger.error(f"Stopping due to {failed_counter} test conversion errors") raise (GenConformanceError()) logger.info("Converted tests to JSON and saved to output directory") return output_dir def get_op_tests_selection( args, profile, operator, op_build_dir, test_params, negative=False ): """Use test picker to get subsection of tests generated.""" assert operator in test_params logger.info("Choosing {} tests".format(("negative" if negative else "positive"))) try: op_params = test_params[operator] op = Operator.registry[operator]( op_build_dir, op_params, negative, exclude_types=PROFILE_OPS_INFO[profile]["exclude_types"], ) except KeyError: logger.error(f"{operator} operator is not supported by test_select") raise (GenConformanceError()) return op.select_tests() def check_op_tests(args, profile, operator, output_dir): """Move test folders than contain files larger than 30MB to new directory.""" destination_dir = str(args.output_dir) + "_large_files" tests = _get_all_tests_list(profile, output_dir, operator, include_all=True) if not tests: logger.error( f"Couldn't find any tests to size check for {operator} in {output_dir}" ) raise (GenConformanceError()) for tdir in tests: move_dir = False test_files = [file for file in tdir.glob("*")] for file in test_files: file_size = os.stat(file).st_size / 1024**2 if file_size > 30: move_dir = True if move_dir: move_destination = destination_dir / tdir.relative_to(output_dir) logger.warning( f"{tdir.relative_to(output_dir)} contains files that are too large (>30MB), test moved to new folder: {destination_dir}" ) if move_destination.is_dir(): logger.warning( f"{move_destination} directory already exists, deleting existing." ) shutil.rmtree(str(move_destination)) shutil.move(str(tdir), move_destination) def copy_rename_framework_tests(args, operator, test_picks): """Copy framework tests into new folder and rename them if needed. The tests are renamed to match the framework operator names if an alternate name has been used instead. """ framework_tests_dir = args.framework_tests_dir new_tests_dir = args.build_dir / "frameworks" / operator os.makedirs(new_tests_dir, exist_ok=True) # Get the framework tests operator name if "alternate_names" in test_picks[operator]: alternate_names = test_picks[operator]["alternate_names"] else: alternate_names = [operator] # Get the alternate named test directories for the operator for alt_name in alternate_names: test_prefix = f"test_{alt_name}" test_dirs = list(framework_tests_dir.glob(f"{test_prefix}_*")) # Copy tests to new directory and rename to match framework operator names # - if there is just 1 alternate name, replace the full test prefix # test_add_... -> add_... # - if there are multiple alternate names, just replace the "test" # test_concatv2_... -> concatenation_concatv2_... old_prefix = test_prefix if len(alternate_names) == 1 else "test" for tdir in test_dirs: new_test_name = tdir.name.replace(old_prefix, operator) copy_destination = new_tests_dir / new_test_name logger.debug(f"copying test folder {tdir} to {copy_destination}") copy_tree(str(tdir), str(copy_destination)) logger.info(f"Copied and renamed {len(test_dirs)} framework test folders") return new_tests_dir.parent def get_framework_tests_selection(args, operator, test_picks, op_build_dir): """Get the list of pre-chosen tests with relative paths.""" try: tests = test_picks[operator]["tests"] except KeyError: logger.error(f"Framework test selection not defined for {operator} operator") raise (GenConformanceError()) test_paths = [op_build_dir / operator / test for test in tests] return test_paths def parse_args(argv=None): """Parse the arguments.""" parser = argparse.ArgumentParser() profiles = list(PROFILE_OPS_INFO.keys()) profiles.append(PROFILES_ALL) parser.add_argument( "--profile", dest="profile", choices=profiles, default=profiles[0], type=str, help=f"TOSA profile (default is {profiles[0]})", ) parser.add_argument( "--operators", type=str, nargs="*", help="The operator(s) to create tests for, if not supplied all tests will be created", ) parser.add_argument( "--unit-tests", dest="unit_tests", choices=["operator", "framework", "both"], default="operator", type=str, help="Which unit tests are produced (default is operator)", ) parser.add_argument( "--test-type", dest="test_type", choices=["positive", "negative", "both"], default="both", type=str, help="Type of tests produced (default is both)", ) parser.add_argument( "--ref-model-directory", dest="ref_model_dir", type=Path, required=True, help="Reference Model directory (must be pre-built)", ) parser.add_argument( "--seed", dest="random_seed", default=DEFAULT_SEED, type=int, help="Random test seed", ) parser.add_argument( "--framework-tests-directory", dest="framework_tests_dir", type=Path, default=Path.cwd() / "tests", help="The pre-built framework tests directory (default is tests)", ) parser.add_argument( "--framework-schema", dest="framework_schema", type=Path, help="Framework flatbuffers schema needed to convert framework models", ) parser.add_argument( "--build-directory", dest="build_dir", type=Path, default=Path.cwd() / "conformance_build", help="Temporary build directory for files created during this process (default is conformance_build)", ) parser.add_argument( "--output-directory", dest="output_dir", type=Path, default=Path.cwd() / "conformance", help="Output directory (default is conformance)", ) script_dir = Path(__file__).parent.absolute() parser.add_argument( "--test-param-json-directory", dest="param_json_dir", type=Path, default=script_dir, help=f"Test parameters (ops info) JSON file directory (default is {script_dir})", ) parser.add_argument( "--convert-all-tests", action="store_true", help="Converts all tests instead of those picked by test_select", ) parser.add_argument( "--keep-large-files", action="store_true", help="Keeps tests that contain files larger than 30MB in output directory", ) parser.add_argument( "--capture-output", action="store_true", help="Prints output of running sh commands", ) parser.add_argument( "-j", dest="num_cores", type=int, default=6, help="Number of simultaneous jobs to split the tasks into for multiprocessing", ) parser.add_argument( "-v", dest="verbosity", action="count", default=0, help="Verbosity (can be used multiple times for more details)", ) args = parser.parse_args(argv) return args def main(): args = parse_args() if not args.ref_model_dir.is_dir(): logger.error( f"Missing or invalid reference model directory: {args.ref_model_dir}" ) return 2 else: ref_model = args.ref_model_dir / LOCATION_REF_MODEL_BINARY if not ref_model.is_file(): logger.error( f"{LOCATION_REF_MODEL_BINARY} not found in {args.ref_model_dir}\nHave you built the reference model?" ) return 2 if args.unit_tests in ["framework", "both"]: if not args.framework_schema: logger.error( "Need to supply location of Framework flatbuffers schema via --framework-schema" ) return 2 if not args.framework_tests_dir.is_dir(): logger.error( f"Missing or invalid framework tests directory: {args.framework_tests_dir}" ) return 2 loglevels = (logging.WARNING, logging.INFO, logging.DEBUG) loglevel = loglevels[min(args.verbosity, len(loglevels) - 1)] logger.setLevel(loglevel) # Set other loggers the same logging.getLogger("test_select").setLevel(loglevel) logging.getLogger("convert2conformance").setLevel(loglevel) print(f"Output directory: {args.output_dir}") if args.random_seed != DEFAULT_SEED: logger.warning( "Random test seed changed from default, tests will not match official conformance" ) args.build_dir = args.build_dir.resolve() logger.debug(f"Creating build directory: {args.build_dir}") args.build_dir.mkdir(parents=True, exist_ok=True) # TODO: For tosa-mi should really generate tosa-bi profile as well # - for now leave it as subset instead of as superset (for testing) if args.profile == PROFILES_ALL: profiles = list(PROFILE_OPS_INFO.keys()) else: profiles = [args.profile] try: for profile in profiles: print(f"Creating conformance tests for TOSA {profile} profile") # Framework unit tests if args.unit_tests in ["framework", "both"]: logger.debug("Creating FRAMEWORK unit tests") test_picks_file = ( args.param_json_dir / PROFILE_OPS_INFO[profile]["framework_tests"] ) try: with open(test_picks_file, "r") as fd: test_picks = json.load(fd) except Exception as e: logger.error( f"Couldn't load framework tests info - {test_picks_file}: {e}" ) return 1 operators = args.operators if not operators: # Create tests for all the operators operators = list(test_picks.keys()) root_output_dir = ( args.output_dir / "frameworks" / "tflite" / "operators" ) for op in operators: logger.info(f"FRAMEWORK OP: {op}") if op not in test_picks: logger.warning( f"Framework op {op} not found in {test_picks_file} - skipping" ) continue op_profiles_list = test_picks[op]["profile"] if ( args.profile != PROFILES_ALL and args.profile not in op_profiles_list ): # Skip this operator as not part of the profile chosen logger.debug(f"Skipping {op} as not part of {args.profile}") continue logger.debug(f"Copying and renaming {op}") framework_test_dir = copy_rename_framework_tests( args, op, test_picks ) if args.convert_all_tests: logger.debug("Running and converting all framework tests") framework_tests = None # Don't select any else: logger.debug("Running and converting selected framework tests") framework_tests = get_framework_tests_selection( args, op, test_picks, framework_test_dir ) convert_tests( args, profile, op, framework_test_dir, root_output_dir, op_profiles_list, tests=framework_tests, trim_op_subdir=True, ) # Operator unit tests if args.unit_tests in ["operator", "both"]: logger.debug("Creating OPERATOR unit tests") test_params_file = ( args.param_json_dir / PROFILE_OPS_INFO[profile]["operator_test_params"] ) try: with open(test_params_file, "r") as fd: test_params = json.load(fd) except Exception as e: logger.error( f"Couldn't load operator test params - {test_params_file}: {e}" ) return 1 operators = args.operators if not operators: # Create tests for all the operators operators = list(test_params.keys()) for op in operators: logger.info(f"OPERATOR: {op}") if op not in test_params: logger.warning( f"{op} operator parameters not found in {test_params_file} - skipping" ) continue if ( args.test_type == "negative" and "no_negative_tests" in test_params[op] and test_params[op]["no_negative_tests"] ): logger.warning(f"No negative tests for {op}") continue op_profiles_list = test_params[op]["profile"] if ( args.profile != PROFILES_ALL and args.profile not in op_profiles_list ): # Skip this operator as not part of the profile chosen logger.debug(f"Skipping {op} as not part of {args.profile}") continue op_build_dir = build_op_tests(args, op, test_params) operator_group = test_params[op]["group"] root_output_dir = args.output_dir / "operators" if args.convert_all_tests: logger.debug(f"Running and converting all {op} tests") generate_results(args, profile, op, op_build_dir) operator_test_list = None else: logger.debug(f"Running and converting selection of {op} tests") if args.test_type in ["positive", "both"]: tests_gen, tests_gen2 = tee( get_op_tests_selection( args, profile, op, op_build_dir, test_params ) ) generate_results(args, profile, op, op_build_dir, tests_gen) operator_test_list = list(tests_gen2) else: operator_test_list = [] if args.test_type in ["negative", "both"] and ( "no_negative_tests" not in test_params[op] or not test_params[op]["no_negative_tests"] ): operator_test_list.extend( get_op_tests_selection( args, profile, op, op_build_dir, test_params, negative=True, ) ) output_dir = convert_tests( args, profile, op, op_build_dir, root_output_dir, op_profiles_list, tests=operator_test_list, group=operator_group, ) if not args.keep_large_files: check_op_tests(args, profile, op, output_dir) except GenConformanceError: return 1 return 0 if __name__ == "__main__": exit(main())