Diffstat (limited to 'verif/conformance/tosa_verif_conformance_generator.py')
-rw-r--r--  verif/conformance/tosa_verif_conformance_generator.py  676
1 file changed, 676 insertions, 0 deletions
diff --git a/verif/conformance/tosa_verif_conformance_generator.py b/verif/conformance/tosa_verif_conformance_generator.py
new file mode 100644
index 0000000..836c639
--- /dev/null
+++ b/verif/conformance/tosa_verif_conformance_generator.py
@@ -0,0 +1,676 @@
+#!/usr/bin/env python3
+# Copyright (c) 2021-2022, ARM Limited.
+# SPDX-License-Identifier: Apache-2.0
+"""Build conformance tests.
+
+Steps:
+- Tests are generated for specific input shapes (or pre-chosen tests) using the
+  settings in the .json files.
+- Tests are selected to give good coverage.
+- Tests are run on the reference model to produce the correct output files.
+- Tests are converted into JSON format and saved to the desired output directory.
+"""
+import argparse
+import json
+import logging
+import multiprocessing as mp
+import os
+import shlex
+import shutil
+import subprocess
+from functools import partial
+from itertools import tee
+from pathlib import Path
+
+from conformance.test_select import Operator
+from convert2conformance.convert2conformance import main as c2c_main
+from distutils.dir_util import copy_tree
+
+logging.basicConfig()
+logger = logging.getLogger("tosa_verif_conformance_generator")
+
+# Configuration for each TOSA profile
+PROFILE_OPS_INFO = {
+ "base": {
+ "operator_test_params": "tosa_base_profile_ops_info.json",
+ "framework_tests": "tosa_base_profile_framework_ops_info.json",
+ "exclude_types": ["float"],
+ }
+}
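+# Illustrative shape of an operator entry in the ops info JSON files (the real
+# schema lives in the files listed above); keys read by this script include:
+# {
+#     "<operator>": {
+#         "group": "<output sub-directory>",
+#         "generator_args": [["--target-dtype", "int32"], ...],
+#         "no_negative_tests": false
+#     }
+# }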
+
+LOCATION_REF_MODEL_BINARY = Path("build/reference_model/tosa_reference_model")
+
+
+class GenConformanceError(Exception):
+ """Generation error reporting exception."""
+
+ pass
+
+
+def _run_sh_command(args, cwd, full_cmd):
+ """Run an external command and capture stdout/stderr."""
+ # Quote the command line for printing
+ full_cmd_esc = [shlex.quote(x) for x in full_cmd]
+ if args.capture_output:
+ logger.debug(f"Command: {full_cmd_esc}")
+
+ rc = subprocess.run(
+ full_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=cwd
+ )
+
+ if args.capture_output:
+ stdout = rc.stdout.decode("utf-8")
+ logger.debug(f"stdout: \n{stdout}")
+ if rc.returncode != 0:
+ raise Exception(
+ "Error running command: {}.\n{}".format(
+ " ".join(full_cmd_esc), rc.stderr.decode("utf-8")
+ )
+ )
+ return (rc.stdout, rc.stderr)
+
+
+def build_op_tests(args, operator, test_params):
+ """Build tests for a given operator.
+
+    Builds a set of tests based on the parameters defined in test_params.
+
+    Returns the operator build directory.
+ """
+ assert operator in test_params
+
+ build_tests_cmd = "tosa_verif_build_tests"
+ op_build_dir = args.build_dir
+
+ ref_cmd_base = [
+ build_tests_cmd,
+ "--filter",
+ operator,
+ "-o",
+ str(op_build_dir),
+ "--seed",
+ "42",
+ ]
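+    # Each final command looks like (illustrative):
+    #   tosa_verif_build_tests --filter <operator> -o <build_dir> --seed 42 [generator args...]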
+
+ ref_cmds = []
+
+ if args.test_type in ["positive", "both"]:
+ # Append extra parameters and run test generator for each set of parameters.
+ for arglist in test_params[operator]["generator_args"]:
+ ref_cmd_pos_test = ref_cmd_base.copy()
+ ref_cmd_pos_test.extend(arglist)
+ ref_cmds.append(ref_cmd_pos_test)
+
+ if args.test_type in ["negative", "both"]:
+ ref_cmd_neg_test = ref_cmd_base.copy()
+ ref_cmd_neg_test.extend(["--test-type", "negative"])
+ ref_cmds.append(ref_cmd_neg_test)
+
+ logger.debug(f"Creating {operator} tests with {len(ref_cmds)} parameter(s)")
+ error = False
+ for i, cmd in enumerate(ref_cmds):
+ try:
+ _run_sh_command(args, args.ref_model_dir.absolute(), cmd)
+ logger.info(
+ f"{operator} test batch {(i+1)}/{len(ref_cmds)} created successfully"
+ )
+ except Exception as e:
+ logger.error(
+ f"{operator} test batch {(i+1)}/{len(ref_cmds)} unsuccessful, skipping"
+ )
+ logger.error(f" build_op_tests error: {e} ")
+ error = True
+ if error:
+        raise GenConformanceError()
+
+ return op_build_dir
+
+
+def _check_to_include_test(profile, test_name, exclude_negative_tests=False):
+ """Check test name for exclusions, return False to indicate excluded."""
+ excludes = ["ERRORIF"] if exclude_negative_tests else []
+ excludes.extend(PROFILE_OPS_INFO[profile]["exclude_types"])
+
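+    # e.g. for the "base" profile any test name containing "_float_" is excluded,
+    # plus names containing "_ERRORIF_" when negative tests are excluded (illustrative)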
+ for exclusion in excludes:
+ if f"_{exclusion}_" in test_name:
+ return False
+ return True
+
+
+def _get_all_tests_list(
+ profile, test_root_dir, operator, exclude_negative_tests=False, include_all=False
+):
+ """Create test list based on tests in the test_dir."""
+ test_dir = test_root_dir / operator
+ if not test_dir.is_dir():
+ # Tests are split into multiple dirs, for example: conv2d_1x1, conv2d_3x3
+ test_dir = test_root_dir
+ directories = [
+ tdir for tdir in test_dir.glob("*") if tdir.name.startswith(operator)
+ ]
+ else:
+ directories = [test_dir]
+
+ tests = []
+ for tdir in directories:
+ tests.extend(
+ [
+ test
+ for test in tdir.glob("*")
+ if include_all
+ or _check_to_include_test(profile, test.name, exclude_negative_tests)
+ ]
+ )
+ return tests
+
+
+def generate_results(args, operator, op_build_dir, tests=None):
+ """Run tests on reference model and save result to the test directory."""
+ num_cores = args.num_cores
+ run_tests_cmd = "tosa_verif_run_tests"
+
+ ref_model_path = args.ref_model_dir / LOCATION_REF_MODEL_BINARY
+    ref_cmd_base = [
+ run_tests_cmd,
+ "--ref-model-path",
+ str(ref_model_path.absolute()),
+ "-j",
+ str(num_cores),
+ "-v",
+ "-t",
+ ]
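+    # Expands to one command per test (illustrative):
+    #   tosa_verif_run_tests --ref-model-path <ref_model> -j <num_cores> -v -t <test_dir>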
+ ref_cmds = []
+
+ if not tests:
+ # Do not need to run ERRORIF tests as they don't have result files
+ tests = _get_all_tests_list(
+ args.profile, op_build_dir, operator, exclude_negative_tests=True
+ )
+
+ for test in tests:
+ ref_cmd = ref_cmd_base.copy()
+ ref_cmd.append(str(test))
+ ref_cmds.append(ref_cmd)
+
+ fail_string = "UNEXPECTED_FAILURE"
+ failed_counter = 0
+
+ job_pool = mp.Pool(args.num_cores)
+ sh_partial = partial(_run_sh_command, args, args.ref_model_dir.absolute())
+ pool_results = job_pool.map(sh_partial, ref_cmds)
+ job_pool.close()
+ job_pool.join()
+
+    # Use the output captured from _run_sh_command to work out if each test passed.
+ for i, rc in enumerate(pool_results):
+ if fail_string in str(rc[0]):
+ logger.error(f"Test {i+1}/{len(ref_cmds)}: {ref_cmds[i][-1]} failed.")
+ failed_counter += 1
+ else:
+ logger.info(f"Test {i+1}/{len(ref_cmds)}: {ref_cmds[i][-1]} passed.")
+
+ logger.info(f"{len(ref_cmds)-failed_counter}/{len(ref_cmds)} tests passed")
+ logger.info("Ran tests on model and saved results of passing tests")
+
+
+def convert_tests(
+ args,
+ operator,
+ op_build_dir,
+ output_dir,
+ tests=None,
+ group=None,
+ trim_op_subdir=False,
+):
+ """Convert tests to JSON and save to output directory."""
+ ref_model_dir = args.ref_model_dir
+
+ if group:
+ output_dir = output_dir / group
+
+ ref_cmd_base = ["--ref-model-directory", str(ref_model_dir)]
+ if args.framework_schema:
+ ref_cmd_base.extend(["--framework-schema", str(args.framework_schema)])
+ ref_cmd_base.append("--output-directory")
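+    # c2c_main is invoked with an argv-style list per test (illustrative, schema flag optional):
+    #   ["--ref-model-directory", "<ref_model_dir>", "--output-directory", "<out_dir>", "<test_dir>"]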
+
+ ref_cmds = []
+
+ if not tests:
+ tests = _get_all_tests_list(args.profile, op_build_dir, operator)
+ logger.info(f"Converting all {args.profile} profile tests")
+
+    # Controls whether the tests keep their operator sub-directory in the output path
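+    # e.g. <build_dir>/add/add_test_x -> <output_dir>/add/add_test_x (default),
+    #      or -> <output_dir>/add_test_x when trim_op_subdir is set (illustrative paths)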
+ output_dir_relative_pos = -1 if trim_op_subdir else -2
+ for test in tests:
+ logger.info(f"Test chosen: {test}")
+ ref_cmd = ref_cmd_base.copy()
+ full_output_directory = output_dir / test.relative_to(
+ *test.parts[:output_dir_relative_pos]
+ )
+ ref_cmd.append(str(full_output_directory))
+ ref_cmd.append(str(test))
+ ref_cmds.append(ref_cmd)
+
+ if len(ref_cmds) == 0:
+ logger.warning("No tests found. Nothing to convert")
+ return
+
+ job_pool = mp.Pool(args.num_cores)
+
+ pool_results = job_pool.map(c2c_main, ref_cmds)
+ job_pool.close()
+ job_pool.join()
+
+ failed_counter = 0
+ for i, result in enumerate(pool_results):
+ if result != 0:
+ logger.error(
+ f"test {i+1}/{len(ref_cmds)}: {ref_cmds[i][-1]} failed to convert."
+ )
+ failed_counter += 1
+ else:
+ logger.info(f"test {i+1}/{len(ref_cmds)}: {ref_cmds[i][-1]} converted")
+ logger.info(
+ f"{len(ref_cmds)-failed_counter}/{len(ref_cmds)} tests successfully converted"
+ )
+
+ if failed_counter > 0:
+ logger.error(f"Stopping due to {failed_counter} test conversion errors")
+        raise GenConformanceError()
+
+ logger.info("Converted tests to JSON and saved to output directory")
+
+ return output_dir
+
+
+def get_op_tests_selection(args, operator, op_build_dir, test_params, negative=False):
+ """Use test picker to get subsection of tests generated."""
+ assert operator in test_params
+ try:
+ op_params = test_params[operator]
+ op = Operator.registry[operator](
+ op_build_dir,
+ op_params,
+ negative,
+ exclude_types=PROFILE_OPS_INFO[args.profile]["exclude_types"],
+ )
+ except KeyError:
+ logger.error(f"{operator} operator is not supported by test_select")
+        raise GenConformanceError()
+
+ return op.select_tests()
+
+
+def check_op_tests(args, operator, output_dir):
+ """Move test folders than contain files larger than 30MB to new directory."""
+    destination_dir = Path(str(args.output_dir) + "_large_files")
+
+ tests = _get_all_tests_list(args.profile, output_dir, operator, include_all=True)
+ if not tests:
+ logger.error(
+ f"Couldn't find any tests to size check for {operator} in {output_dir}"
+ )
+        raise GenConformanceError()
+
+ for tdir in tests:
+ move_dir = False
+        test_files = list(tdir.glob("*"))
+ for file in test_files:
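+            # st_size is in bytes; convert to MiB so any file over 30MB flags the test folder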
+ file_size = os.stat(file).st_size / 1024**2
+ if file_size > 30:
+ move_dir = True
+
+ if move_dir:
+ move_destination = destination_dir / tdir.relative_to(output_dir)
+ logger.warning(
+ f"{tdir.relative_to(output_dir)} contains files that are too large (>30MB), test moved to new folder: {destination_dir}"
+ )
+
+ if move_destination.is_dir():
+ logger.warning(
+ f"{move_destination} directory already exists, deleting existing."
+ )
+ shutil.rmtree(str(move_destination))
+ shutil.move(str(tdir), move_destination)
+
+
+def copy_rename_framework_tests(args, operator, test_picks):
+ """Copy framework tests into new folder and rename them if needed.
+
+ The tests are renamed to match the framework operator names if an
+ alternate name has been used instead.
+ """
+ framework_tests_dir = args.framework_tests_dir
+ new_tests_dir = args.build_dir / "frameworks" / operator
+ os.makedirs(new_tests_dir, exist_ok=True)
+
+ # Get the framework tests operator name
+ if "alternate_names" in test_picks[operator]:
+ alternate_names = test_picks[operator]["alternate_names"]
+ else:
+ alternate_names = [operator]
+
+ # Get the alternate named test directories for the operator
+ for alt_name in alternate_names:
+ test_prefix = f"test_{alt_name}"
+ test_dirs = list(framework_tests_dir.glob(f"{test_prefix}_*"))
+
+ # Copy tests to new directory and rename to match framework operator names
+ # - if there is just 1 alternate name, replace the full test prefix
+ # test_add_... -> add_...
+ # - if there are multiple alternate names, just replace the "test"
+ # test_concatv2_... -> concatenation_concatv2_...
+ old_prefix = test_prefix if len(alternate_names) == 1 else "test"
+
+ for tdir in test_dirs:
+ new_test_name = tdir.name.replace(old_prefix, operator)
+ copy_destination = new_tests_dir / new_test_name
+ logger.debug(f"copying test folder {tdir} to {copy_destination}")
+ copy_tree(str(tdir), str(copy_destination))
+
+ logger.info(f"Copied and renamed {len(test_dirs)} framework test folders")
+ return new_tests_dir.parent
+
+
+def get_framework_tests_selection(args, operator, test_picks, op_build_dir):
+ """Get the list of pre-chosen tests with relative paths."""
+ try:
+ tests = test_picks[operator]["tests"]
+ except KeyError:
+ logger.error(f"Framework test selection not defined for {operator} operator")
+        raise GenConformanceError()
+
+ test_paths = [op_build_dir / operator / test for test in tests]
+ return test_paths
+
+
+def parse_args(argv=None):
+ """Parse the arguments."""
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ "--operators",
+ type=str,
+ nargs="*",
+ help="The operator(s) to create tests for, if not supplied all tests will be created",
+ )
+ parser.add_argument(
+ "--ref-model-directory",
+ dest="ref_model_dir",
+ type=Path,
+ required=True,
+ help="Reference Model directory (must be pre-built)",
+ )
+ script_dir = Path(__file__).parent.absolute()
+ parser.add_argument(
+ "--test-param-json-directory",
+ dest="param_json_dir",
+ type=Path,
+ default=script_dir,
+ help="Test parameters (ops info) JSON file directory",
+ )
+ parser.add_argument(
+ "--convert-all-tests",
+ action="store_true",
+ help="Converts all tests instead of those picked by test_select",
+ )
+ parser.add_argument(
+ "--keep-large-files",
+ action="store_true",
+ help="Keeps tests that contain files larger than 30MB in output directory",
+ )
+ parser.add_argument(
+ "--capture-output",
+ action="store_true",
+ help="Prints output of running sh commands",
+ )
+ parser.add_argument(
+ "--build-directory",
+ dest="build_dir",
+ type=Path,
+ default=Path.cwd() / "conformance_build",
+ help="Temporary build directory for files created during this process (default is conformance_build)",
+ )
+ parser.add_argument(
+ "--output-directory",
+ dest="output_dir",
+ type=Path,
+ default=Path.cwd() / "conformance",
+ help="Output directory (default is conformance)",
+ )
+ parser.add_argument(
+ "-j",
+ dest="num_cores",
+ type=int,
+ default=6,
+ help="Number of simultaneous jobs to split the tasks into for multiprocessing",
+ )
+ parser.add_argument(
+ "-v",
+ dest="verbosity",
+ action="count",
+ default=0,
+ help="Verbosity (can be used multiple times for more details)",
+ )
+ parser.add_argument(
+ "--unit-tests",
+ dest="unit_tests",
+ choices=["operator", "framework", "both"],
+ default="operator",
+ type=str,
+ help="Which unit tests are produced: operator, framework, or both",
+ )
+ parser.add_argument(
+ "--test-type",
+ dest="test_type",
+ choices=["positive", "negative", "both"],
+ default="both",
+ type=str,
+ help="Type of tests produced: positive, negative, or both",
+ )
+ profiles = list(PROFILE_OPS_INFO.keys())
+ parser.add_argument(
+ "--profile",
+ dest="profile",
+ choices=profiles,
+ default=profiles[0],
+ type=str,
+ help="TOSA profile",
+ )
+ parser.add_argument(
+ "--framework-tests-directory",
+ dest="framework_tests_dir",
+ type=Path,
+ default=Path.cwd() / "tests",
+ help="The pre-built framework tests directory (default is tests)",
+ )
+ parser.add_argument(
+ "--framework-schema",
+ dest="framework_schema",
+ type=Path,
+ help="Framework flatbuffers schema needed to convert framework models",
+ )
+ args = parser.parse_args(argv)
+
+ return args
+
+
+def main():
+ args = parse_args()
+
+ if not args.ref_model_dir.is_dir():
+ logger.error(
+ f"Missing or invalid reference model directory: {args.ref_model_dir}"
+ )
+ return 2
+ else:
+ ref_model = args.ref_model_dir / LOCATION_REF_MODEL_BINARY
+ if not ref_model.is_file():
+ logger.error(
+ f"{LOCATION_REF_MODEL_BINARY} not found in {args.ref_model_dir}\nHave you built the reference model?"
+ )
+ return 2
+ if args.unit_tests in ["framework", "both"]:
+ if not args.framework_schema:
+ logger.error(
+ "Need to supply location of Framework flatbuffers schema via --framework-schema"
+ )
+ return 2
+ if not args.framework_tests_dir.is_dir():
+ logger.error(
+ f"Missing or invalid framework tests directory: {args.framework_tests_dir}"
+ )
+ return 2
+
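+    # Map the -v count to a log level: default WARNING, -v INFO, -vv (or more) DEBUG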
+ loglevels = (logging.WARNING, logging.INFO, logging.DEBUG)
+ loglevel = loglevels[min(args.verbosity, len(loglevels) - 1)]
+ logger.setLevel(loglevel)
+ # Set other loggers the same
+ logging.getLogger("test_select").setLevel(loglevel)
+ logging.getLogger("convert2conformance").setLevel(loglevel)
+
+ print(f"Creating conformance tests for TOSA {args.profile} profile")
+ print(f"Output directory: {args.output_dir}")
+
+ args.build_dir = args.build_dir.resolve()
+ logger.debug(f"Creating build directory: {args.build_dir}")
+ args.build_dir.mkdir(parents=True, exist_ok=True)
+
+ try:
+ # Framework unit tests
+ if args.unit_tests in ["framework", "both"]:
+ logger.debug("Creating FRAMEWORK unit tests")
+ test_picks_file = (
+ args.param_json_dir / PROFILE_OPS_INFO[args.profile]["framework_tests"]
+ )
+ try:
+ with open(test_picks_file, "r") as fd:
+ test_picks = json.load(fd)
+ except Exception as e:
+ logger.error(
+ f"Couldn't load framework tests info - {test_picks_file}: {e}"
+ )
+ return 1
+
+ operators = args.operators
+ if not operators:
+ # Create tests for all the operators
+ operators = list(test_picks.keys())
+
+ root_output_dir = args.output_dir / "frameworks" / "tflite" / "operators"
+ for op in operators:
+ if op not in test_picks:
+ logger.warning(
+ f"Framework op {op} not found in {test_picks_file} - skipping"
+ )
+ continue
+
+ logger.debug(f"Copying and renaming {op}")
+ framework_test_dir = copy_rename_framework_tests(args, op, test_picks)
+ if args.convert_all_tests:
+ logger.debug("Running and converting all framework tests")
+ convert_tests(
+ args,
+ op,
+ framework_test_dir,
+ root_output_dir,
+ trim_op_subdir=True,
+ )
+ else:
+ framework_tests = get_framework_tests_selection(
+ args, op, test_picks, framework_test_dir
+ )
+ convert_tests(
+ args,
+ op,
+ framework_test_dir,
+ root_output_dir,
+ tests=framework_tests,
+ trim_op_subdir=True,
+ )
+
+ # Operator unit tests
+ if args.unit_tests in ["operator", "both"]:
+ logger.debug("Creating OPERATOR unit tests")
+ test_params_file = (
+ args.param_json_dir
+ / PROFILE_OPS_INFO[args.profile]["operator_test_params"]
+ )
+ try:
+ with open(test_params_file, "r") as fd:
+ test_params = json.load(fd)
+ except Exception as e:
+ logger.error(
+ f"Couldn't load operator test params - {test_params_file}: {e}"
+ )
+ return 1
+
+ operators = args.operators
+ if not operators:
+ # Create tests for all the operators
+ operators = list(test_params.keys())
+
+ for op in operators:
+ if op not in test_params:
+ logger.warning(
+ f"{op} operator parameters not found in {test_params_file} - skipping"
+ )
+ continue
+
+ if (
+ args.test_type == "negative"
+ and "no_negative_tests" in test_params[op]
+ and test_params[op]["no_negative_tests"]
+ ):
+ logger.warning(f"No negative tests for {op}")
+ continue
+
+ op_build_dir = build_op_tests(args, op, test_params)
+
+ operator_group = test_params[op]["group"]
+ root_output_dir = args.output_dir / "operators"
+ if args.convert_all_tests:
+ logger.debug(f"Running and converting all {op} tests")
+ generate_results(args, op, op_build_dir)
+ output_dir = convert_tests(
+ args, op, op_build_dir, root_output_dir, group=operator_group
+ )
+ else:
+ if args.test_type in ["positive", "both"]:
+ tests_gen1, tests_gen2 = tee(
+ get_op_tests_selection(args, op, op_build_dir, test_params)
+ )
+ generate_results(args, op, op_build_dir, tests_gen1)
+ output_dir = convert_tests(
+ args,
+ op,
+ op_build_dir,
+ root_output_dir,
+ tests=tests_gen2,
+ group=operator_group,
+ )
+ if args.test_type in ["negative", "both"] and (
+ "no_negative_tests" not in test_params[op]
+ or not test_params[op]["no_negative_tests"]
+ ):
+ negative_tests = get_op_tests_selection(
+ args, op, op_build_dir, test_params, negative=True
+ )
+ output_dir = convert_tests(
+ args,
+ op,
+ op_build_dir,
+ root_output_dir,
+ tests=negative_tests,
+ group=operator_group,
+ )
+ if not args.keep_large_files:
+ check_op_tests(args, op, output_dir)
+ except GenConformanceError:
+ return 1
+
+ return 0
+
+
+if __name__ == "__main__":
+ exit(main())