diff options
author | Jeremy Johnson <jeremy.johnson@arm.com> | 2021-12-15 17:14:56 +0000 |
---|---|---|
committer | Jeremy Johnson <jeremy.johnson@arm.com> | 2022-01-06 11:40:12 +0000 |
commit | be1a9408eb53871d96a022f59664f016926a8cf4 (patch) | |
tree | 458e8a389c0c909fc6008dfb4cc577e1b0a895e5 /verif | |
parent | 2ec3494060ffdafec072fe1b2099a8177b8eca6a (diff) | |
download | reference_model-be1a9408eb53871d96a022f59664f016926a8cf4.tar.gz |
Update tosa_verif_run_ref
Rename to tosa_verif_run_tests to match build_tests
Improve output and system under test support
Improve xunit support
Add results checker
Add utilities json2numpy and json2fbbin
Add set of python tests
Update README.md
Signed-off-by: Jeremy Johnson <jeremy.johnson@arm.com>
Change-Id: Ia09f8e6fd126579b3ba1c1cda95c1326802417ca
Diffstat (limited to 'verif')
-rw-r--r-- | verif/checker/__init__.py | 3 | ||||
-rw-r--r-- | verif/checker/tosa_result_checker.py | 187 | ||||
-rw-r--r-- | verif/runner/run_command.py | 61 | ||||
-rw-r--r-- | verif/runner/tosa_ref_run.py | 78 | ||||
-rw-r--r-- | verif/runner/tosa_refmodel_sut_run.py | 73 | ||||
-rw-r--r-- | verif/runner/tosa_test_runner.py | 212 | ||||
-rw-r--r-- | verif/runner/tosa_verif_run_ref.py | 267 | ||||
-rw-r--r-- | verif/runner/tosa_verif_run_tests.py | 375 | ||||
-rwxr-xr-x | verif/tests/mock_flatc.py | 51 | ||||
-rw-r--r-- | verif/tests/test_json2numpy.py | 142 | ||||
-rw-r--r-- | verif/tests/test_tosa_result_checker.py | 197 | ||||
-rw-r--r-- | verif/tests/test_tosa_run_tests_args.py | 68 | ||||
-rw-r--r-- | verif/tests/test_tosa_run_tests_mocksut.py | 241 | ||||
-rw-r--r-- | verif/tests/test_tosa_run_tests_runshcmd.py | 54 | ||||
-rw-r--r-- | verif/tests/tosa_dummy_sut_run.py | 20 | ||||
-rw-r--r-- | verif/tests/tosa_mock_sut_run.py | 118 |
16 files changed, 1757 insertions, 390 deletions
diff --git a/verif/checker/__init__.py b/verif/checker/__init__.py new file mode 100644 index 0000000..39e9ecc --- /dev/null +++ b/verif/checker/__init__.py @@ -0,0 +1,3 @@ +"""Namespace.""" +# Copyright (c) 2021-2022 Arm Limited. +# SPDX-License-Identifier: Apache-2.0 diff --git a/verif/checker/tosa_result_checker.py b/verif/checker/tosa_result_checker.py new file mode 100644 index 0000000..3a15de9 --- /dev/null +++ b/verif/checker/tosa_result_checker.py @@ -0,0 +1,187 @@ +"""TOSA result checker script.""" +# Copyright (c) 2020-2022, ARM Limited. +# SPDX-License-Identifier: Apache-2.0 +import argparse +import os +from enum import Enum +from enum import IntEnum +from enum import unique +from pathlib import Path + +import numpy as np + +################################## +no_color_printing = False + + +@unique +class LogColors(Enum): + """Shell escape sequence colors for logging.""" + + NONE = "\u001b[0m" + GREEN = "\u001b[32;1m" + RED = "\u001b[31;1m" + YELLOW = "\u001b[33;1m" + BOLD_WHITE = "\u001b[1m" + + +def print_color(color, msg): + """Print color status messages if enabled.""" + if no_color_printing: + print(msg) + else: + print("{}{}{}".format(color.value, msg, LogColors.NONE.value)) + + +@unique +class TestResult(IntEnum): + """Test result values.""" + + # Note: PASS must be 0 for command line return success + PASS = 0 + MISSING_FILE = 1 + INCORRECT_FORMAT = 2 + MISMATCH = 3 + INTERNAL_ERROR = 4 + + +TestResultErrorStr = [ + "", + "Missing file", + "Incorrect format", + "Mismatch", + "Internal error", +] +################################## + + +def test_check( + reference, result, test_name="test", quantize_tolerance=0, float_tolerance=1e-3 +): + """Check if the result is the same as the expected reference.""" + if not os.path.isfile(reference): + print_color(LogColors.RED, "Reference MISSING FILE {}".format(test_name)) + msg = "Missing reference file: {}".format(reference) + return (TestResult.MISSING_FILE, 0.0, msg) + if not os.path.isfile(result): + 
print_color(LogColors.RED, "Results MISSING FILE {}".format(test_name)) + msg = "Missing result file: {}".format(result) + return (TestResult.MISSING_FILE, 0.0, msg) + + try: + test_result = np.load(result) + except Exception as e: + print_color(LogColors.RED, "Results INCORRECT FORMAT {}".format(test_name)) + msg = "Incorrect numpy format of {}\nnumpy.load exception: {}".format(result, e) + return (TestResult.INCORRECT_FORMAT, 0.0, msg) + try: + reference_result = np.load(reference) + except Exception as e: + print_color(LogColors.RED, "Reference INCORRECT FORMAT {}".format(test_name)) + msg = "Incorrect numpy format of {}\nnumpy.load exception: {}".format( + reference, e + ) + return (TestResult.INCORRECT_FORMAT, 0.0, msg) + + # Type comparison + if test_result.dtype != reference_result.dtype: + print_color(LogColors.RED, "Results TYPE MISMATCH {}".format(test_name)) + msg = "Mismatch results type: Expected {}, got {}".format( + reference_result.dtype, test_result.dtype + ) + return (TestResult.MISMATCH, 0.0, msg) + + # Size comparison + # Size = 1 tensors can be equivalently represented as having rank 0 or rank + # >= 0, allow that special case + test_result = np.squeeze(test_result) + reference_result = np.squeeze(reference_result) + + if np.shape(test_result) != np.shape(reference_result): + print_color(LogColors.RED, "Results MISCOMPARE {}".format(test_name)) + msg = "Shapes mismatch: Reference {} vs {}".format( + np.shape(test_result), np.shape(reference_result) + ) + return (TestResult.MISMATCH, 0.0, msg) + + # for quantized test, allow +-(quantize_tolerance) error + if reference_result.dtype == np.int32 or reference_result.dtype == np.int64: + + if np.all(np.absolute(reference_result - test_result) <= quantize_tolerance): + print_color(LogColors.GREEN, "Results PASS {}".format(test_name)) + return (TestResult.PASS, 0.0, "") + else: + tolerance = quantize_tolerance + 1 + while not np.all( + np.absolute(reference_result - test_result) <= quantize_tolerance + 
): + tolerance = tolerance + 1 + if tolerance > 10: + break + + if tolerance > 10: + msg = "Integer result does not match and is greater than 10 difference" + else: + msg = ( + "Integer result does not match but is within {} difference".format( + tolerance + ) + ) + # Fall-through to below to add failure values + + elif reference_result.dtype == bool: + assert test_result.dtype == bool + # All boolean values must match, xor will show up differences + test = np.array_equal(reference_result, test_result) + if np.all(test): + print_color(LogColors.GREEN, "Results PASS {}".format(test_name)) + return (TestResult.PASS, 0.0, "") + msg = "Boolean result does not match" + tolerance = 0.0 + # Fall-through to below to add failure values + + elif reference_result.dtype == np.float32: + tolerance = float_tolerance + if np.allclose(reference_result, test_result, atol=tolerance, equal_nan=True): + print_color(LogColors.GREEN, "Results PASS {}".format(test_name)) + return (TestResult.PASS, tolerance, "") + msg = "Float result does not match within tolerance of {}".format(tolerance) + # Fall-through to below to add failure values + + else: + print_color(LogColors.RED, "Results UNSUPPORTED TYPE {}".format(test_name)) + msg = "Unsupported results type: {}".format(reference_result.dtype) + return (TestResult.MISMATCH, 0.0, msg) + + # Fall-through for mismatch failure to add values to msg + print_color(LogColors.RED, "Results MISCOMPARE {}".format(test_name)) + np.set_printoptions(threshold=128) + msg = "{}\ntest_result: {}\n{}".format(msg, test_result.shape, test_result) + msg = "{}\nreference_result: {}\n{}".format( + msg, reference_result.shape, reference_result + ) + return (TestResult.MISMATCH, tolerance, msg) + + +def main(argv=None): + """Check that the supplied reference and result files are the same.""" + parser = argparse.ArgumentParser() + parser.add_argument( + "reference_path", type=Path, help="the path to the reference file to test" + ) + parser.add_argument( + 
"result_path", type=Path, help="the path to the result file to test" + ) + args = parser.parse_args(argv) + ref_path = args.reference_path + res_path = args.result_path + + result, tolerance, msg = test_check(ref_path, res_path) + if result != TestResult.PASS: + print(msg) + + return result + + +if __name__ == "__main__": + exit(main()) diff --git a/verif/runner/run_command.py b/verif/runner/run_command.py new file mode 100644 index 0000000..eef5a76 --- /dev/null +++ b/verif/runner/run_command.py @@ -0,0 +1,61 @@ +"""Shell command runner function.""" +# Copyright (c) 2020-2022, ARM Limited. +# SPDX-License-Identifier: Apache-2.0 +import shlex +import subprocess + + +class RunShCommandError(Exception): + """Exception raised for errors running the shell command. + + Attributes: + return_code - non-zero return code from running command + full_cmd_esc - command and arguments list (pre-escaped) + stderr - (optional) - standard error output + """ + + def __init__(self, return_code, full_cmd_esc, stderr=None, stdout=None): + """Initialize run shell command error.""" + self.return_code = return_code + self.full_cmd_esc = full_cmd_esc + self.stderr = stderr + self.stdout = stdout + self.message = "Error {} running command: {}".format( + self.return_code, " ".join(self.full_cmd_esc) + ) + if stdout: + self.message = "{}\n{}".format(self.message, self.stdout) + if stderr: + self.message = "{}\n{}".format(self.message, self.stderr) + super().__init__(self.message) + + +def run_sh_command(full_cmd, verbose=False, capture_output=False): + """Run an external shell command. 
+ + full_cmd: array containing shell command and its arguments + verbose: optional flag that enables verbose output + capture_output: optional flag to return captured stdout/stderr + """ + # Quote the command line for printing + full_cmd_esc = [shlex.quote(x) for x in full_cmd] + + if verbose: + print("### Running {}".format(" ".join(full_cmd_esc))) + + if capture_output: + rc = subprocess.run(full_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout = rc.stdout.decode("utf-8") + stderr = rc.stderr.decode("utf-8") + if verbose: + if stdout: + print(stdout, end="") + if stderr: + print(stderr, end="") + else: + stdout, stderr = None, None + rc = subprocess.run(full_cmd) + + if rc.returncode != 0: + raise RunShCommandError(rc.returncode, full_cmd_esc, stderr, stdout) + return (stdout, stderr) diff --git a/verif/runner/tosa_ref_run.py b/verif/runner/tosa_ref_run.py deleted file mode 100644 index c1d5e79..0000000 --- a/verif/runner/tosa_ref_run.py +++ /dev/null @@ -1,78 +0,0 @@ -# Copyright (c) 2020-2021, ARM Limited. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import os -import json -import shlex -import subprocess -from enum import Enum, IntEnum, unique -from runner.tosa_test_runner import TosaTestRunner, run_sh_command - - -@unique -class TosaReturnCode(IntEnum): - VALID = 0 - UNPREDICTABLE = 1 - ERROR = 2 - - -class TosaRefRunner(TosaTestRunner): - def __init__(self, args, runnerArgs, testDir): - super().__init__(args, runnerArgs, testDir) - - def runModel(self): - # Build up the TOSA reference command line - # Uses arguments from the argParser args, not the runnerArgs - args = self.args - - ref_cmd = [ - args.ref_model_path, - "-Ctest_desc={}".format(os.path.join(self.testDir, "desc.json")), - ] - - if args.ref_debug: - ref_cmd.extend(["-dALL", "-l{}".format(args.ref_debug)]) - - if args.ref_intermediates: - ref_cmd.extend(["-Ddump_intermediates=1"]) - - expectedReturnCode = self.testDesc["expected_return_code"] - - try: - rc = run_sh_command(self.args, ref_cmd) - if rc == TosaReturnCode.VALID: - if expectedReturnCode == TosaReturnCode.VALID: - result = TosaTestRunner.Result.EXPECTED_PASS - else: - result = TosaTestRunner.Result.UNEXPECTED_PASS - elif rc == TosaReturnCode.ERROR: - if expectedReturnCode == TosaReturnCode.ERROR: - result = TosaTestRunner.Result.EXPECTED_FAILURE - else: - result = TosaTestRunner.Result.UNEXPECTED_FAILURE - elif rc == TosaReturnCode.UNPREDICTABLE: - if expectedReturnCode == TosaReturnCode.UNPREDICTABLE: - result = TosaTestRunner.Result.EXPECTED_FAILURE - else: - result = TosaTestRunner.Result.UNEXPECTED_FAILURE - elif rc < 0: - # Unix signal caught (e.g., SIGABRT, SIGSEGV, SIGFPE, etc) - result = TosaTestRunner.Result.INTERNAL_ERROR - else: - raise Exception(f"Return code ({rc}) unknown.") - - except Exception as e: - raise Exception("Runtime Error when running: {}".format(" ".join(ref_cmd))) - - return result diff --git a/verif/runner/tosa_refmodel_sut_run.py b/verif/runner/tosa_refmodel_sut_run.py new file mode 100644 index 0000000..b9a9575 --- /dev/null +++ 
b/verif/runner/tosa_refmodel_sut_run.py @@ -0,0 +1,73 @@ +"""TOSA test runner module for the Reference Model.""" +# Copyright (c) 2020-2022, ARM Limited. +# SPDX-License-Identifier: Apache-2.0 +from enum import IntEnum +from enum import unique + +from runner.run_command import run_sh_command +from runner.run_command import RunShCommandError +from runner.tosa_test_runner import TosaTestRunner + + +@unique +class TosaRefReturnCode(IntEnum): + """Return codes from the Tosa Reference Model.""" + + VALID = 0 + UNPREDICTABLE = 1 + ERROR = 2 + + +class TosaSUTRunner(TosaTestRunner): + """TOSA Reference Model runner.""" + + def __init__(self, args, runnerArgs, testDir): + """Initialize using the given test details.""" + super().__init__(args, runnerArgs, testDir) + + def runTestGraph(self): + """Run the test on the reference model.""" + # Build up the TOSA reference command line + # Uses arguments from the argParser args, not the runnerArgs + args = self.args + + # Call Reference model with description file to provide all file details + cmd = [ + args.ref_model_path, + "-Coperator_fbs={}".format(args.operator_fbs), + "-Ctest_desc={}".format(self.descFile), + ] + + # Specific debug options for reference model + if args.ref_debug: + cmd.extend(["-dALL", "-l{}".format(args.ref_debug)]) + + if args.ref_intermediates: + cmd.extend(["-Ddump_intermediates=1"]) + + # Run command and interpret tosa graph result via process return codes + graphMessage = None + try: + run_sh_command(cmd, self.args.verbose, capture_output=True) + graphResult = TosaTestRunner.TosaGraphResult.TOSA_VALID + except RunShCommandError as e: + graphMessage = e.stderr + if e.return_code == TosaRefReturnCode.ERROR: + graphResult = TosaTestRunner.TosaGraphResult.TOSA_ERROR + elif e.return_code == TosaRefReturnCode.UNPREDICTABLE: + graphResult = TosaTestRunner.TosaGraphResult.TOSA_UNPREDICTABLE + else: + graphResult = TosaTestRunner.TosaGraphResult.OTHER_ERROR + if ( + self.args.verbose + or graphResult == 
TosaTestRunner.TosaGraphResult.OTHER_ERROR + ): + print(e) + + except Exception as e: + print(e) + graphMessage = str(e) + graphResult = TosaTestRunner.TosaGraphResult.OTHER_ERROR + + # Return graph result and message + return graphResult, graphMessage diff --git a/verif/runner/tosa_test_runner.py b/verif/runner/tosa_test_runner.py index e8f921d..0fd7f13 100644 --- a/verif/runner/tosa_test_runner.py +++ b/verif/runner/tosa_test_runner.py @@ -1,68 +1,190 @@ -import os - -# Copyright (c) 2020, ARM Limited. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - +"""Template test runner class for running TOSA tests.""" +# Copyright (c) 2020-2022, ARM Limited. +# SPDX-License-Identifier: Apache-2.0 import json -import shlex -import subprocess -from enum import IntEnum, unique - +from enum import IntEnum +from pathlib import Path -def run_sh_command(args, full_cmd, capture_output=False): - """Utility function to run an external command. Optionally return captured stdout/stderr""" +from checker.tosa_result_checker import LogColors +from checker.tosa_result_checker import print_color +from checker.tosa_result_checker import test_check +from json2fbbin import json2fbbin - # Quote the command line for printing - full_cmd_esc = [shlex.quote(x) for x in full_cmd] - if args.verbose: - print("### Running {}".format(" ".join(full_cmd_esc))) +class TosaTestInvalid(Exception): + """Exception raised for errors loading test description. 
- if capture_output: - rc = subprocess.run(full_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - if rc.returncode != 0: - print(rc.stdout.decode("utf-8")) - print(rc.stderr.decode("utf-8")) - raise Exception( - "Error running command: {}.\n{}".format( - " ".join(full_cmd_esc), rc.stderr.decode("utf-8") - ) - ) - return (rc.stdout, rc.stderr) - else: - rc = subprocess.run(full_cmd) + Attributes: + path - full path to missing test description file + exception = underlying exception + """ - return rc.returncode + def __init__(self, path, exception): + """Initialize test not found error.""" + self.path = path + self.exception = exception + self.message = "Invalid test, could not read test description {}: {}".format( + self.path, str(self.exception) + ) + super().__init__(self.message) class TosaTestRunner: - def __init__(self, args, runnerArgs, testDir): + """TOSA Test Runner template class for systems under test.""" + def __init__(self, args, runnerArgs, testDir): + """Initialize and load JSON meta data file.""" self.args = args self.runnerArgs = runnerArgs self.testDir = testDir + self.testName = Path(self.testDir).name + + # Check if we want to run binary and if its already converted + descFilePath = Path(testDir, "desc.json") + descBinFilePath = Path(testDir, "desc_binary.json") + if args.binary: + if descBinFilePath.is_file(): + descFilePath = descBinFilePath + + try: + # Load the json test file + with open(descFilePath, "r") as fd: + self.testDesc = json.load(fd) + except Exception as e: + raise TosaTestInvalid(str(descFilePath), e) + + # Convert to binary if needed + tosaFilePath = Path(testDir, self.testDesc["tosa_file"]) + if args.binary and tosaFilePath.suffix == ".json": + # Convert tosa JSON to binary + json2fbbin.json_to_fbbin( + Path(args.flatc_path), + Path(args.operator_fbs), + tosaFilePath, + Path(testDir), + ) + # Write new desc_binary file + self.testDesc["tosa_file"] = tosaFilePath.stem + ".tosa" + with open(descBinFilePath, "w") as fd: + 
json.dump(self.testDesc, fd, indent=2) + descFilePath = descBinFilePath + + # Set location of desc.json (or desc_binary.json) file in use + self.descFile = str(descFilePath) - # Load the json test file - with open(os.path.join(testDir, "desc.json"), "r") as fd: - self.testDesc = json.load(fd) + def skipTest(self): + """Check if the test is skipped due to test type selection.""" + expectedFailure = self.testDesc["expected_failure"] + if self.args.test_type == "negative" and not expectedFailure: + return True + elif self.args.test_type == "positive" and expectedFailure: + return True + return False - def runModel(self): + def runTestGraph(self): + """Override with function that calls system under test.""" pass + def testResult(self, tosaGraphResult, graphMessage=None): + """Work out test result based on graph result and output files.""" + expectedFailure = self.testDesc["expected_failure"] + print_result_line = True + + if tosaGraphResult == TosaTestRunner.TosaGraphResult.TOSA_VALID: + if expectedFailure: + result = TosaTestRunner.Result.UNEXPECTED_PASS + resultMessage = "Expected failure test incorrectly passed" + else: + # Work through all the results produced by the testing, assuming success + # but overriding this with any failures found + result = TosaTestRunner.Result.EXPECTED_PASS + messages = [] + for resultNum, resultFileName in enumerate(self.testDesc["ofm_file"]): + if "expected_result_file" in self.testDesc: + try: + conformanceFile = Path( + self.testDir, + self.testDesc["expected_result_file"][resultNum], + ) + except IndexError: + result = TosaTestRunner.Result.INTERNAL_ERROR + msg = "Internal error: Missing expected_result_file {} in {}".format( + resultNum, self.descFile + ) + messages.append(msg) + print(msg) + break + else: + conformanceFile = None + resultFile = Path(self.testDir, resultFileName) + + if conformanceFile: + print_result_line = False # Checker will print one for us + chkResult, tolerance, msg = test_check( + str(conformanceFile), + 
str(resultFile), + test_name=self.testName, + ) + # Change EXPECTED_PASS assumption if we have any failures + if chkResult != 0: + result = TosaTestRunner.Result.UNEXPECTED_FAILURE + messages.append(msg) + if self.args.verbose: + print(msg) + else: + # No conformance file to verify, just check results file exists + if not resultFile.is_file(): + result = TosaTestRunner.Result.UNEXPECTED_FAILURE + msg = "Results file is missing: {}".format(resultFile) + messages.append(msg) + print(msg) + + if resultFile.is_file(): + # Move the resultFile to allow subsequent system under + # tests to create them and to test they have been created + resultFile = resultFile.rename( + resultFile.with_suffix( + ".{}{}".format(self.__module__, resultFile.suffix) + ) + ) + + resultMessage = "\n".join(messages) if len(messages) > 0 else None + else: + if ( + expectedFailure + and tosaGraphResult == TosaTestRunner.TosaGraphResult.TOSA_ERROR + ): + result = TosaTestRunner.Result.EXPECTED_FAILURE + resultMessage = None + else: + result = TosaTestRunner.Result.UNEXPECTED_FAILURE + resultMessage = graphMessage + + if print_result_line: + if ( + result == TosaTestRunner.Result.EXPECTED_FAILURE + or result == TosaTestRunner.Result.EXPECTED_PASS + ): + print_color(LogColors.GREEN, "Results PASS {}".format(self.testName)) + else: + print_color(LogColors.RED, "Results FAIL {}".format(self.testName)) + + return result, resultMessage + class Result(IntEnum): + """Test result codes.""" + EXPECTED_PASS = 0 EXPECTED_FAILURE = 1 UNEXPECTED_PASS = 2 UNEXPECTED_FAILURE = 3 INTERNAL_ERROR = 4 + SKIPPED = 5 + + class TosaGraphResult(IntEnum): + """The tosa_graph_result codes.""" + + TOSA_VALID = 0 + TOSA_UNPREDICTABLE = 1 + TOSA_ERROR = 2 + OTHER_ERROR = 3 diff --git a/verif/runner/tosa_verif_run_ref.py b/verif/runner/tosa_verif_run_ref.py deleted file mode 100644 index 626819f..0000000 --- a/verif/runner/tosa_verif_run_ref.py +++ /dev/null @@ -1,267 +0,0 @@ -# Copyright (c) 2020-2021, ARM Limited. 
-# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import argparse -import sys -import re -import os -import subprocess -import shlex -import json -import glob -import math -import queue -import threading -import traceback -import importlib - - -from enum import IntEnum, Enum, unique -from datetime import datetime - -from xunit import xunit - -from runner.tosa_test_runner import TosaTestRunner - -no_color_printing = False -# from run_tf_unit_test import LogColors, print_color, run_sh_command - - -def parseArgs(): - - parser = argparse.ArgumentParser() - parser.add_argument( - "-t", - "--test", - dest="test", - type=str, - nargs="+", - help="Test(s) to run", - required=True, - ) - parser.add_argument( - "--seed", - dest="random_seed", - default=42, - type=int, - help="Random seed for test generation", - ) - parser.add_argument( - "--ref-model-path", - dest="ref_model_path", - default="build/reference_model/tosa_reference_model", - type=str, - help="Path to reference model executable", - ) - parser.add_argument( - "--ref-debug", - dest="ref_debug", - default="", - type=str, - help="Reference debug flag (low, med, high)", - ) - parser.add_argument( - "--ref-intermediates", - dest="ref_intermediates", - default=0, - type=int, - help="Reference model dumps intermediate tensors", - ) - parser.add_argument( - "-v", "--verbose", dest="verbose", action="count", help="Verbose operation" - ) - parser.add_argument( - "-j", "--jobs", dest="jobs", type=int, default=1, help="Number 
of parallel jobs" - ) - parser.add_argument( - "--sut-module", - "-s", - dest="sut_module", - type=str, - nargs="+", - default=["runner.tosa_ref_run"], - help="System under test module to load (derives from TosaTestRunner). May be repeated", - ) - parser.add_argument( - "--sut-module-args", - dest="sut_module_args", - type=str, - nargs="+", - default=[], - help="System under test module arguments. Use sutmodulename:argvalue to pass an argument. May be repeated.", - ) - parser.add_argument( - "--xunit-file", - dest="xunit_file", - type=str, - default="result.xml", - help="XUnit output file", - ) - - args = parser.parse_args() - - # Autodetect CPU count - if args.jobs <= 0: - args.jobs = os.cpu_count() - - return args - - -def workerThread(task_queue, runnerList, args, result_queue): - while True: - try: - test = task_queue.get(block=False) - except queue.Empty: - break - - if test is None: - break - - msg = "" - start_time = datetime.now() - try: - - for runnerModule, runnerArgs in runnerList: - if args.verbose: - print( - "Running runner {} with test {}".format( - runnerModule.__name__, test - ) - ) - runner = runnerModule.TosaRefRunner(args, runnerArgs, test) - try: - rc = runner.runModel() - except Exception as e: - rc = TosaTestRunner.Result.INTERNAL_ERROR - print(f"runner.runModel Exception: {e}") - print( - "".join( - traceback.format_exception( - etype=type(e), value=e, tb=e.__traceback__ - ) - ) - ) - except Exception as e: - print("Internal regression error: {}".format(e)) - print( - "".join( - traceback.format_exception( - etype=type(e), value=e, tb=e.__traceback__ - ) - ) - ) - rc = TosaTestRunner.Result.INTERNAL_ERROR - - end_time = datetime.now() - - result_queue.put((test, rc, msg, end_time - start_time)) - task_queue.task_done() - - return True - - -def loadRefModules(args): - # Returns a tuple of (runner_module, [argument list]) - runnerList = [] - for r in args.sut_module: - if args.verbose: - print("Loading module {}".format(r)) - - runner = 
importlib.import_module(r) - - # Look for arguments associated with this runner - runnerArgPrefix = "{}:".format(r) - runnerArgList = [] - for a in args.sut_module_args: - if a.startswith(runnerArgPrefix): - runnerArgList.append(a[len(runnerArgPrefix) :]) - runnerList.append((runner, runnerArgList)) - - return runnerList - - -def main(): - args = parseArgs() - - runnerList = loadRefModules(args) - - threads = [] - taskQueue = queue.Queue() - resultQueue = queue.Queue() - - for t in args.test: - taskQueue.put((t)) - - print("Running {} tests ".format(taskQueue.qsize())) - - for i in range(args.jobs): - t = threading.Thread( - target=workerThread, args=(taskQueue, runnerList, args, resultQueue) - ) - t.setDaemon(True) - t.start() - threads.append(t) - - taskQueue.join() - - resultList = [] - results = [0] * len(TosaTestRunner.Result) - - while True: - try: - test, rc, msg, time_delta = resultQueue.get(block=False) - except queue.Empty: - break - - resultList.append((test, rc, msg, time_delta)) - results[rc] = results[rc] + 1 - - xunit_result = xunit.xunit_results("Regressions") - xunit_suite = xunit_result.create_suite("Unit tests") - - # Sort by test name - for test, rc, msg, time_delta in sorted(resultList, key=lambda tup: tup[0]): - test_name = test - xt = xunit.xunit_test(test_name, "reference") - - xt.time = str( - float(time_delta.seconds) + (float(time_delta.microseconds) * 1e-6) - ) - - if ( - rc == TosaTestRunner.Result.EXPECTED_PASS - or rc == TosaTestRunner.Result.EXPECTED_FAILURE - ): - if args.verbose: - print("{} {}".format(rc.name, test_name)) - else: - xt.failed(msg) - print("{} {}".format(rc.name, test_name)) - - xunit_suite.tests.append(xt) - resultQueue.task_done() - - xunit_result.write_results(args.xunit_file) - - print("Totals: ", end="") - for result in TosaTestRunner.Result: - print("{} {}, ".format(results[result], result.name.lower()), end="") - print() - - return 0 - - -if __name__ == "__main__": - exit(main()) diff --git 
"""TOSA verification runner script."""
# Copyright (c) 2020-2022, ARM Limited.
# SPDX-License-Identifier: Apache-2.0
# NOTE(review): this module also relies on the project imports
# (json2numpy, runner.tosa_test_runner, xunit) declared at the top of the
# file; they are referenced at call time only.
import argparse
import glob
import importlib
import os
import queue
import threading
import traceback
from datetime import datetime
from pathlib import Path

# SUT module used by default: the TOSA reference model runner.
TOSA_REFMODEL_RUNNER = "runner.tosa_refmodel_sut_run"
# Cap on the size of a single failure message stored in the xunit file.
MAX_XUNIT_TEST_MESSAGE = 1000


def parseArgs(argv):
    """Parse the command line arguments and return the settings namespace."""
    parser = argparse.ArgumentParser()
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument(
        "-t",
        "--test",
        dest="test",
        type=str,
        nargs="+",
        help="Test(s) to run",
    )
    group.add_argument(
        "-T",
        "--test-list",
        dest="test_list_file",
        type=Path,
        help="File containing list of tests to run (one per line)",
    )
    parser.add_argument(
        "--operator-fbs",
        dest="operator_fbs",
        default="conformance_tests/third_party/serialization_lib/schema/tosa.fbs",
        type=str,
        help="flat buffer syntax file",
    )
    parser.add_argument(
        "--ref-model-path",
        dest="ref_model_path",
        default="reference_model/build/reference_model/tosa_reference_model",
        type=str,
        help="Path to reference model executable",
    )
    parser.add_argument(
        "--flatc-path",
        dest="flatc_path",
        default="reference_model/build/thirdparty/serialization_lib/third_party/flatbuffers/flatc",
        type=str,
        help="Path to flatc compiler executable",
    )
    parser.add_argument(
        "--ref-debug",
        dest="ref_debug",
        default="",
        type=str,
        help="Reference debug flag (low, med, high)",
    )
    parser.add_argument(
        "--ref-intermediates",
        dest="ref_intermediates",
        default=0,
        type=int,
        help="Reference model dumps intermediate tensors",
    )
    parser.add_argument(
        "-b",
        "--binary",
        dest="binary",
        action="store_true",
        help="Convert to using binary flatbuffers instead of JSON",
    )
    parser.add_argument(
        "-v", "--verbose", dest="verbose", action="count", help="Verbose operation"
    )
    parser.add_argument(
        "-j", "--jobs", dest="jobs", type=int, default=1, help="Number of parallel jobs"
    )
    parser.add_argument(
        "--sut-module",
        "-s",
        dest="sut_module",
        type=str,
        nargs="+",
        default=[TOSA_REFMODEL_RUNNER],
        help="System under test module to load (derives from TosaTestRunner). May be repeated",
    )
    parser.add_argument(
        "--sut-module-args",
        dest="sut_module_args",
        type=str,
        nargs="+",
        default=[],
        help="System under test module arguments. Use sutmodulename:argvalue to pass an argument. May be repeated.",
    )
    parser.add_argument(
        "--xunit-file",
        dest="xunit_file",
        type=str,
        default="result.xml",
        help="XUnit output file",
    )
    parser.add_argument(
        "--test-type",
        dest="test_type",
        type=str,
        default="both",
        choices=["positive", "negative", "both"],
        help="Filter tests based on expected failure status (positive, negative or both)",
    )

    args = parser.parse_args(argv)

    # Autodetect CPU count when no positive job count is given
    if args.jobs <= 0:
        args.jobs = os.cpu_count()

    return args


# JSON files whose names start with these prefixes are test metadata,
# not numpy tensor data, so they must not be converted.
EXCLUSION_PREFIX = ["test", "model", "desc"]


def convert2Numpy(testDir):
    """Convert all the JSON numpy files in testDir back into binary numpy."""
    for json_path in glob.glob(os.path.join(testDir, "*.json")):
        name = os.path.basename(json_path)
        if any(name.startswith(exclude) for exclude in EXCLUSION_PREFIX):
            continue
        # debug print("Converting " + json_path)
        json2numpy.json_to_npy(Path(json_path))


def workerThread(task_queue, runnerList, args, result_queue):
    """Worker thread that runs each queued test against every SUT runner."""
    while True:
        try:
            test = task_queue.get(block=False)
        except queue.Empty:
            break

        if test is None:
            break

        msg = ""
        converted = False
        for runnerModule, runnerArgs in runnerList:
            try:
                start_time = datetime.now()
                # Set up system under test runner
                runnerName = runnerModule.__name__
                runner = runnerModule.TosaSUTRunner(args, runnerArgs, test)

                if runner.skipTest():
                    msg = "Skipping non-{} test".format(args.test_type)
                    print("{} {}".format(msg, test))
                    rc = TosaTestRunner.Result.SKIPPED
                else:
                    # Convert JSON data files into numpy format on first pass
                    if not converted:
                        convert2Numpy(test)
                        converted = True

                    if args.verbose:
                        print("Running runner {} with test {}".format(runnerName, test))
                    try:
                        grc, gmsg = runner.runTestGraph()
                        rc, msg = runner.testResult(grc, gmsg)
                    except Exception as e:
                        msg = "System Under Test error: {}".format(e)
                        print(msg)
                        # The etype=/value=/tb= keywords were removed from
                        # traceback.format_exception in Python 3.10 - the
                        # positional form works on all supported versions.
                        print(
                            "".join(
                                traceback.format_exception(type(e), e, e.__traceback__)
                            )
                        )
                        rc = TosaTestRunner.Result.INTERNAL_ERROR
            except Exception as e:
                msg = "Internal error: {}".format(e)
                print(msg)
                if not isinstance(e, TosaTestInvalid):
                    # Show stack trace on unexpected exceptions
                    print(
                        "".join(
                            traceback.format_exception(type(e), e, e.__traceback__)
                        )
                    )
                rc = TosaTestRunner.Result.INTERNAL_ERROR
            finally:
                end_time = datetime.now()
                result_queue.put((runnerName, test, rc, msg, end_time - start_time))

        task_queue.task_done()

    return True


def loadSUTRunnerModules(args):
    """Load in the system under test modules.

    Returns a list of tuples of (runner_module, [argument list])
    """
    runnerList = []
    # De-duplicate while preserving command line order - list(set(...))
    # gave a non-deterministic suite order in the results file.
    for r in dict.fromkeys(args.sut_module):
        if args.verbose:
            print("Loading module {}".format(r))

        runner = importlib.import_module(r)

        # Collect any arguments prefixed with this runner's module name
        runnerArgPrefix = "{}:".format(r)
        runnerArgList = [
            a[len(runnerArgPrefix):]
            for a in args.sut_module_args
            if a.startswith(runnerArgPrefix)
        ]
        runnerList.append((runner, runnerArgList))

    return runnerList


def createXUnitResults(xunitFile, runnerList, resultLists, verbose):
    """Create the xunit results file from the per-runner result lists."""
    xunit_result = xunit.xunit_results()

    for runnerModule, _ in runnerList:
        # Create one test suite per system under test (runner)
        runner = runnerModule.__name__
        xunit_suite = xunit_result.create_suite(runner)

        # Sort by test name
        for test, rc, msg, time_delta in sorted(
            resultLists[runner], key=lambda tup: tup[0]
        ):
            xt = xunit.xunit_test(test, runner)

            # total_seconds() also accounts for deltas of a day or more
            xt.time = str(time_delta.total_seconds())

            testMsg = rc.name if not msg else "{}: {}".format(rc.name, msg)

            if rc in (
                TosaTestRunner.Result.EXPECTED_PASS,
                TosaTestRunner.Result.EXPECTED_FAILURE,
            ):
                if verbose:
                    print("{} {} ({})".format(rc.name, test, runner))
            elif rc == TosaTestRunner.Result.SKIPPED:
                xt.skipped()
                if verbose:
                    print("{} {} ({})".format(rc.name, test, runner))
            else:
                xt.failed(testMsg)
                print("{} {} ({})".format(rc.name, test, runner))

            xunit_suite.tests.append(xt)

    xunit_result.write_results(xunitFile)


def main(argv=None):
    """Start worker threads to do the testing and output the results."""
    args = parseArgs(argv)

    if TOSA_REFMODEL_RUNNER in args.sut_module and not os.path.isfile(
        args.ref_model_path
    ):
        print(
            "Argument error: Reference Model not found ({})".format(args.ref_model_path)
        )
        exit(2)

    if args.test_list_file:
        try:
            with open(args.test_list_file) as f:
                args.test = f.read().splitlines()
        except Exception as e:
            print(
                "Argument error: Cannot read list of tests in {}\n{}".format(
                    args.test_list_file, e
                )
            )
            exit(2)

    runnerList = loadSUTRunnerModules(args)

    threads = []
    taskQueue = queue.Queue()
    resultQueue = queue.Queue()

    for t in args.test:
        # Tests are directories; plain files (other than README) are warned about
        if os.path.isfile(t):
            if not os.path.basename(t) == "README":
                print("Warning: Skipping test {} as not a valid directory".format(t))
        else:
            taskQueue.put(t)

    print(
        "Running {} tests on {} system{} under test".format(
            taskQueue.qsize(), len(runnerList), "s" if len(runnerList) > 1 else ""
        )
    )

    for _ in range(args.jobs):
        t = threading.Thread(
            target=workerThread, args=(taskQueue, runnerList, args, resultQueue)
        )
        # Thread.setDaemon() is deprecated since Python 3.10 - set the attribute
        t.daemon = True
        t.start()
        threads.append(t)

    taskQueue.join()

    # Set up results lists for each system under test
    resultLists = {}
    results = {}
    for runnerModule, _ in runnerList:
        runner = runnerModule.__name__
        resultLists[runner] = []
        results[runner] = [0] * len(TosaTestRunner.Result)

    while True:
        try:
            runner, test, rc, msg, time_delta = resultQueue.get(block=False)
            resultQueue.task_done()
        except queue.Empty:
            break

        # Limit error messages to make results easier to digest
        if msg and len(msg) > MAX_XUNIT_TEST_MESSAGE:
            half = MAX_XUNIT_TEST_MESSAGE // 2
            trimmed = len(msg) - MAX_XUNIT_TEST_MESSAGE
            msg = "{} ...\nskipped {} bytes\n... {}".format(
                msg[:half], trimmed, msg[-half:]
            )
        resultLists[runner].append((test, rc, msg, time_delta))
        results[runner][rc] += 1

    createXUnitResults(args.xunit_file, runnerList, resultLists, args.verbose)

    # Print out results for each system under test
    for runnerModule, _ in runnerList:
        runner = runnerModule.__name__
        resultSummary = []
        for result in TosaTestRunner.Result:
            resultSummary.append(
                "{} {}".format(results[runner][result], result.name.lower())
            )
        print("Totals ({}): {}".format(runner, ", ".join(resultSummary)))

    return 0
{}".format( + msg[:half], trimmed, msg[-half:] + ) + resultLists[runner].append((test, rc, msg, time_delta)) + results[runner][rc] += 1 + + createXUnitResults(args.xunit_file, runnerList, resultLists, args.verbose) + + # Print out results for each system under test + for runnerModule, _ in runnerList: + runner = runnerModule.__name__ + resultSummary = [] + for result in TosaTestRunner.Result: + resultSummary.append( + "{} {}".format(results[runner][result], result.name.lower()) + ) + print("Totals ({}): {}".format(runner, ", ".join(resultSummary))) + + return 0 + + +if __name__ == "__main__": + exit(main()) diff --git a/verif/tests/mock_flatc.py b/verif/tests/mock_flatc.py new file mode 100755 index 0000000..bdee0f8 --- /dev/null +++ b/verif/tests/mock_flatc.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python3 +"""Mocked flatc compiler for testing.""" +# Copyright (c) 2021-2022, ARM Limited. +# SPDX-License-Identifier: Apache-2.0 +from pathlib import Path + + +def main(argv=None): + """Mock the required behaviour of the flatc compiler.""" + import argparse + + parser = argparse.ArgumentParser() + parser.add_argument( + "-o", + dest="output_dir", + type=Path, + help="output directory", + ) + parser.add_argument( + "--json", + action="store_true", + help="convert to JSON", + ) + parser.add_argument( + "--binary", + action="store_true", + help="convert to binary", + ) + parser.add_argument( + "--raw-binary", + action="store_true", + help="convert from raw-binary", + ) + parser.add_argument( + "path", + type=Path, + action="append", + nargs="*", + help="the path to fbs or files to convert", + ) + + args = parser.parse_args(argv) + path = args.path + if len(path) == 0: + print("ERROR: Missing fbs files and files to convert") + return 2 + return 0 + + +if __name__ == "__main__": + exit(main()) diff --git a/verif/tests/test_json2numpy.py b/verif/tests/test_json2numpy.py new file mode 100644 index 0000000..aec555c --- /dev/null +++ b/verif/tests/test_json2numpy.py @@ -0,0 +1,142 @@ 
+"""Tests for json2numpy.py.""" +# Copyright (c) 2021-2022, ARM Limited. +# SPDX-License-Identifier: Apache-2.0 +import json +import os + +import numpy as np +import pytest + +from json2numpy.json2numpy import main + + +@pytest.mark.parametrize( + "npy_filename,json_filename,data_type", + [ + ("single_num.npy", "single_num.json", np.int8), + ("multiple_num.npy", "multiple_num.json", np.int8), + ("single_num.npy", "single_num.json", np.int16), + ("multiple_num.npy", "multiple_num.json", np.int16), + ("single_num.npy", "single_num.json", np.int32), + ("multiple_num.npy", "multiple_num.json", np.int32), + ("single_num.npy", "single_num.json", np.int64), + ("multiple_num.npy", "multiple_num.json", np.int64), + ("single_num.npy", "single_num.json", np.uint8), + ("multiple_num.npy", "multiple_num.json", np.uint8), + ("single_num.npy", "single_num.json", np.uint16), + ("multiple_num.npy", "multiple_num.json", np.uint16), + ("single_num.npy", "single_num.json", np.uint32), + ("multiple_num.npy", "multiple_num.json", np.uint32), + ("single_num.npy", "single_num.json", np.uint64), + ("multiple_num.npy", "multiple_num.json", np.uint64), + ("single_num.npy", "single_num.json", np.float16), + ("multiple_num.npy", "multiple_num.json", np.float16), + ("single_num.npy", "single_num.json", np.float32), + ("multiple_num.npy", "multiple_num.json", np.float32), + ("single_num.npy", "single_num.json", np.float64), + ("multiple_num.npy", "multiple_num.json", np.float64), + ("single_num.npy", "single_num.json", bool), + ("multiple_num.npy", "multiple_num.json", bool), + ], +) +def test_json2numpy_npy_file(npy_filename, json_filename, data_type): + """Test conversion to JSON.""" + # Generate numpy data. 
+ if "single" in npy_filename: + npy_data = np.ndarray(shape=(1, 1), dtype=data_type) + elif "multiple" in npy_filename: + npy_data = np.ndarray(shape=(2, 3), dtype=data_type) + + # Get filepaths + npy_file = os.path.join(os.path.dirname(__file__), npy_filename) + json_file = os.path.join(os.path.dirname(__file__), json_filename) + + # Save npy data to file and reload it. + with open(npy_file, "wb") as f: + np.save(f, npy_data) + npy_data = np.load(npy_file) + + args = [npy_file] + """Converts npy file to json""" + assert main(args) == 0 + + json_data = json.load(open(json_file)) + assert np.dtype(json_data["type"]) == npy_data.dtype + assert np.array(json_data["data"]).shape == npy_data.shape + assert (np.array(json_data["data"]) == npy_data).all() + + # Remove files created + if os.path.exists(npy_file): + os.remove(npy_file) + if os.path.exists(json_file): + os.remove(json_file) + + +@pytest.mark.parametrize( + "npy_filename,json_filename,data_type", + [ + ("single_num.npy", "single_num.json", np.int8), + ("multiple_num.npy", "multiple_num.json", np.int8), + ("single_num.npy", "single_num.json", np.int16), + ("multiple_num.npy", "multiple_num.json", np.int16), + ("single_num.npy", "single_num.json", np.int32), + ("multiple_num.npy", "multiple_num.json", np.int32), + ("single_num.npy", "single_num.json", np.int64), + ("multiple_num.npy", "multiple_num.json", np.int64), + ("single_num.npy", "single_num.json", np.uint8), + ("multiple_num.npy", "multiple_num.json", np.uint8), + ("single_num.npy", "single_num.json", np.uint16), + ("multiple_num.npy", "multiple_num.json", np.uint16), + ("single_num.npy", "single_num.json", np.uint32), + ("multiple_num.npy", "multiple_num.json", np.uint32), + ("single_num.npy", "single_num.json", np.uint64), + ("multiple_num.npy", "multiple_num.json", np.uint64), + ("single_num.npy", "single_num.json", np.float16), + ("multiple_num.npy", "multiple_num.json", np.float16), + ("single_num.npy", "single_num.json", np.float32), + 
("multiple_num.npy", "multiple_num.json", np.float32), + ("single_num.npy", "single_num.json", np.float64), + ("multiple_num.npy", "multiple_num.json", np.float64), + ("single_num.npy", "single_num.json", bool), + ("multiple_num.npy", "multiple_num.json", bool), + ], +) +def test_json2numpy_json_file(npy_filename, json_filename, data_type): + """Test conversion to binary.""" + # Generate json data. + if "single" in npy_filename: + npy_data = np.ndarray(shape=(1, 1), dtype=data_type) + elif "multiple" in npy_filename: + npy_data = np.ndarray(shape=(2, 3), dtype=data_type) + + # Generate json dictionary + list_data = npy_data.tolist() + json_data_type = str(npy_data.dtype) + + json_data = {} + json_data["type"] = json_data_type + json_data["data"] = list_data + + # Get filepaths + npy_file = os.path.join(os.path.dirname(__file__), npy_filename) + json_file = os.path.join(os.path.dirname(__file__), json_filename) + + # Save json data to file and reload it. + with open(json_file, "w") as f: + json.dump(json_data, f) + json_data = json.load(open(json_file)) + + args = [json_file] + """Converts json file to npy""" + assert main(args) == 0 + + npy_data = np.load(npy_file) + assert np.dtype(json_data["type"]) == npy_data.dtype + assert np.array(json_data["data"]).shape == npy_data.shape + assert (np.array(json_data["data"]) == npy_data).all() + + # Remove files created + if os.path.exists(npy_file): + os.remove(npy_file) + if os.path.exists(json_file): + os.remove(json_file) diff --git a/verif/tests/test_tosa_result_checker.py b/verif/tests/test_tosa_result_checker.py new file mode 100644 index 0000000..bc8a2fc --- /dev/null +++ b/verif/tests/test_tosa_result_checker.py @@ -0,0 +1,197 @@ +"""Tests for tosa_result_checker.py.""" +# Copyright (c) 2021-2022, ARM Limited. 
# SPDX-License-Identifier: Apache-2.0
from pathlib import Path

import numpy as np
import pytest

import checker.tosa_result_checker as trc


def _create_data_file(name, npy_data):
    """Create a numpy data file next to this test file and return its path."""
    file = Path(__file__).parent / name
    with open(file, "wb") as f:
        np.save(f, npy_data)
    return file


def _create_empty_file(name):
    """Create an empty (invalid numpy format) file and return its path."""
    file = Path(__file__).parent / name
    file.touch()
    return file


def _delete_data_file(file: Path):
    """Delete a data file."""
    file.unlink()


@pytest.mark.parametrize(
    "data_type,expected",
    [
        (np.int8, trc.TestResult.MISMATCH),
        (np.int16, trc.TestResult.MISMATCH),
        (np.int32, trc.TestResult.PASS),
        (np.int64, trc.TestResult.PASS),
        (np.uint8, trc.TestResult.MISMATCH),
        (np.uint16, trc.TestResult.MISMATCH),
        (np.uint32, trc.TestResult.MISMATCH),
        (np.uint64, trc.TestResult.MISMATCH),
        (np.float16, trc.TestResult.MISMATCH),
        (np.float32, trc.TestResult.PASS),
        (np.float64, trc.TestResult.MISMATCH),
        (bool, trc.TestResult.PASS),
    ],
)
def test_supported_types(data_type, expected):
    """Check which data types are supported by the checker."""
    # Generate data
    npy_data = np.ndarray(shape=(2, 3), dtype=data_type)

    # Save identical data as reference and result files to compare.
    reference_file = _create_data_file("reference.npy", npy_data)
    result_file = _create_data_file("result.npy", npy_data)

    # main() compares reference and result npy files; returns zero on a pass.
    args = [str(reference_file), str(result_file)]
    assert trc.main(args) == expected

    # Remove files created
    _delete_data_file(reference_file)
    _delete_data_file(result_file)


@pytest.mark.parametrize(
    "data_type,expected",
    [
        (np.int32, trc.TestResult.MISMATCH),
        (np.int64, trc.TestResult.MISMATCH),
        (np.float32, trc.TestResult.MISMATCH),
        (bool, trc.TestResult.MISMATCH),
    ],
)
def test_shape_mismatch(data_type, expected):
    """Check that mismatched shapes do not pass."""
    # Generate and save differently shaped reference and result files.
    npy_data = np.ones(shape=(3, 2), dtype=data_type)
    reference_file = _create_data_file("reference.npy", npy_data)
    npy_data = np.ones(shape=(2, 3), dtype=data_type)
    result_file = _create_data_file("result.npy", npy_data)

    args = [str(reference_file), str(result_file)]
    assert trc.main(args) == expected

    # Remove files created
    _delete_data_file(reference_file)
    _delete_data_file(result_file)


@pytest.mark.parametrize(
    "data_type,expected",
    [
        (np.int32, trc.TestResult.MISMATCH),
        (np.int64, trc.TestResult.MISMATCH),
        (np.float32, trc.TestResult.MISMATCH),
        (bool, trc.TestResult.MISMATCH),
    ],
)
def test_results_mismatch(data_type, expected):
    """Check that different result values do not pass."""
    # Generate and save reference and result files with different contents.
    npy_data = np.zeros(shape=(2, 3), dtype=data_type)
    reference_file = _create_data_file("reference.npy", npy_data)
    npy_data = np.ones(shape=(2, 3), dtype=data_type)
    result_file = _create_data_file("result.npy", npy_data)

    args = [str(reference_file), str(result_file)]
    assert trc.main(args) == expected

    # Remove files created
    _delete_data_file(reference_file)
    _delete_data_file(result_file)


@pytest.mark.parametrize(
    "data_type1,data_type2,expected",
    [  # Pairwise testing of all supported types
        (np.int32, np.int64, trc.TestResult.MISMATCH),
        (bool, np.float32, trc.TestResult.MISMATCH),
    ],
)
def test_types_mismatch(data_type1, data_type2, expected):
    """Check that differently typed results do not pass."""
    # Generate and save reference and result files of different dtypes.
    npy_data = np.ones(shape=(3, 2), dtype=data_type1)
    reference_file = _create_data_file("reference.npy", npy_data)
    npy_data = np.ones(shape=(3, 2), dtype=data_type2)
    result_file = _create_data_file("result.npy", npy_data)

    args = [str(reference_file), str(result_file)]
    assert trc.main(args) == expected

    # Remove files created
    _delete_data_file(reference_file)
    _delete_data_file(result_file)


@pytest.mark.parametrize(
    "reference_exists,result_exists,expected",
    [
        (True, False, trc.TestResult.MISSING_FILE),
        (False, True, trc.TestResult.MISSING_FILE),
    ],
)
def test_missing_files(reference_exists, result_exists, expected):
    """Check that missing files are caught."""
    # Generate both files, then delete the one that should be missing.
    npy_data = np.ndarray(shape=(2, 3), dtype=bool)
    reference_file = _create_data_file("reference.npy", npy_data)
    result_file = _create_data_file("result.npy", npy_data)
    if not reference_exists:
        _delete_data_file(reference_file)
    if not result_exists:
        _delete_data_file(result_file)

    args = [str(reference_file), str(result_file)]
    assert trc.main(args) == expected

    if reference_exists:
        _delete_data_file(reference_file)
    if result_exists:
        _delete_data_file(result_file)


@pytest.mark.parametrize(
    "reference_numpy,result_numpy,expected",
    [
        (True, False, trc.TestResult.INCORRECT_FORMAT),
        (False, True, trc.TestResult.INCORRECT_FORMAT),
    ],
)
def test_incorrect_format_files(reference_numpy, result_numpy, expected):
    """Check that files with invalid numpy format are caught."""
    # Generate a valid file and an empty (invalid) file.
    npy_data = np.ndarray(shape=(2, 3), dtype=bool)
    reference_file = (
        _create_data_file("reference.npy", npy_data)
        if reference_numpy
        else _create_empty_file("empty.npy")
    )
    result_file = (
        _create_data_file("result.npy", npy_data)
        if result_numpy
        else _create_empty_file("empty.npy")
    )

    args = [str(reference_file), str(result_file)]
    assert trc.main(args) == expected

    _delete_data_file(reference_file)
    _delete_data_file(result_file)
+# SPDX-License-Identifier: Apache-2.0 +from runner.tosa_verif_run_tests import parseArgs + + +def test_args_test(): + """Test arguments - test.""" + args = ["-t", "test"] + parsed_args = parseArgs(args) + assert parsed_args.test == ["test"] + + +def test_args_ref_model_path(): + """Test arguments - ref_model_path.""" + args = ["--ref-model-path", "ref_model_path", "-t", "test"] + parsed_args = parseArgs(args) + assert parsed_args.ref_model_path == "ref_model_path" + + +def test_args_ref_debug(): + """Test arguments - ref_debug.""" + args = ["--ref-debug", "ref_debug", "-t", "test"] + parsed_args = parseArgs(args) + assert parsed_args.ref_debug == "ref_debug" + + +def test_args_ref_intermediates(): + """Test arguments - ref_intermediates.""" + args = ["--ref-intermediates", "2", "-t", "test"] + parsed_args = parseArgs(args) + assert parsed_args.ref_intermediates == 2 + + +def test_args_verbose(): + """Test arguments - ref_verbose.""" + args = ["-v", "-t", "test"] + parsed_args = parseArgs(args) + print(parsed_args.verbose) + assert parsed_args.verbose == 1 + + +def test_args_jobs(): + """Test arguments - jobs.""" + args = ["-j", "42", "-t", "test"] + parsed_args = parseArgs(args) + assert parsed_args.jobs == 42 + + +def test_args_sut_module(): + """Test arguments - sut_module.""" + args = ["--sut-module", "sut_module", "-t", "test"] + parsed_args = parseArgs(args) + assert parsed_args.sut_module == ["sut_module"] + + +def test_args_sut_module_args(): + """Test arguments - sut_module_args.""" + args = ["--sut-module-args", "sut_module_args", "-t", "test"] + parsed_args = parseArgs(args) + assert parsed_args.sut_module_args == ["sut_module_args"] + + +def test_args_xunit_file(): + """Test arguments - xunit-file.""" + args = ["--xunit-file", "xunit_file", "-t", "test"] + parsed_args = parseArgs(args) + assert parsed_args.xunit_file == "xunit_file" diff --git a/verif/tests/test_tosa_run_tests_mocksut.py b/verif/tests/test_tosa_run_tests_mocksut.py new file mode 100644 
"""Tests for tosa_verif_run_tests.py using a mock system under test."""
# Copyright (c) 2021-2022, ARM Limited.
# SPDX-License-Identifier: Apache-2.0
import json
from copy import deepcopy
from pathlib import Path
from xml.dom import minidom

import pytest

from runner.tosa_verif_run_tests import main


# Default test description written out as desc.json for each mock test
TEST_DESC = {
    "tosa_file": "pytest.json",
    "ifm_name": ["test-0", "test-1"],
    "ifm_file": ["test-0.npy", "test-1.npy"],
    "ofm_name": ["test-result-0"],
    "ofm_file": ["test-result-0.npy"],
    "expected_failure": False,
}
GRAPH_RESULT_VALID = "valid"
GRAPH_RESULT_ERROR = "error"


def _create_desc_json(json_object) -> Path:
    """Create test desc.json and return its path."""
    file = Path(__file__).parent / "desc.json"
    with open(file, "w") as fd:
        json.dump(json_object, fd, indent=2)
    return file


def _delete_desc_json(file: Path):
    """Dump (for debug) and clean up desc.json and any binary conversion copy."""
    binary_file = file.parent / "desc_binary.json"
    if binary_file.exists():
        print(binary_file.read_text())
        binary_file.unlink()
    else:
        print(file.read_text())
    file.unlink()


@pytest.fixture
def testDir():
    """Set up a mock expected-pass test; yield the test directory as a Path."""
    print("SET UP - testDir")
    file = _create_desc_json(TEST_DESC)
    yield file.parent
    print("TEAR DOWN - testDir")
    _delete_desc_json(file)


@pytest.fixture
def testDirExpectedFail():
    """Set up a mock expected-fail test; yield the test directory as a Path."""
    print("SET UP - testDirExpectedFail")
    fail = deepcopy(TEST_DESC)
    fail["expected_failure"] = True
    file = _create_desc_json(fail)
    yield file.parent
    print("TEAR DOWN - testDirExpectedFail")
    _delete_desc_json(file)


@pytest.fixture
def testDirMultiOutputs():
    """Set up a mock multiple-outputs test; yield the test directory as a Path."""
    print("SET UP - testDirMultiOutputs")
    out = deepcopy(TEST_DESC)
    out["ofm_name"].append("tr1")
    out["ofm_file"].append("test-result-1.npy")
    file = _create_desc_json(out)
    yield file.parent
    print("TEAR DOWN - testDirMultiOutputs")
    _delete_desc_json(file)


def _get_default_argv(testDir: Path, graphResult: str) -> list:
    """Create default args based on test directory and graph result."""
    return [
        "--sut-module",
        "tests.tosa_mock_sut_run",
        "--test",
        str(testDir),
        "--xunit-file",
        str(testDir / "result.xml"),
        # Must be last argument to allow easy extension with extra args
        "--sut-module-args",
        f"tests.tosa_mock_sut_run:graph={graphResult}",
    ]


def _get_xml_results(argv: list):
    """Get XML results and remove the results file."""
    resultsFile = Path(argv[argv.index("--xunit-file") + 1])
    results = minidom.parse(str(resultsFile))
    resultsFile.unlink()
    return results


def _get_xml_testsuites_from_results(results, expectedTestSuites: int):
    """Get XML testsuites from results."""
    testSuites = results.getElementsByTagName("testsuite")
    assert len(testSuites) == expectedTestSuites
    return testSuites


def _get_xml_testcases_from_results(results, expectedTestCases: int):
    """Get XML testcases from results."""
    testCases = results.getElementsByTagName("testcase")
    assert len(testCases) == expectedTestCases
    return testCases


def _get_xml_failure(argv: list):
    """Get the results and single testcase with the failure result entry if there is one."""
    results = _get_xml_results(argv)
    testCases = _get_xml_testcases_from_results(results, 1)
    fail = testCases[0].getElementsByTagName("failure")
    if fail:
        return fail[0].firstChild.data
    return None


def test_mock_sut_expected_pass(testDir: Path):
    """Run expected pass SUT test."""
    try:
        argv = _get_default_argv(testDir, GRAPH_RESULT_VALID)
        main(argv)
        fail = _get_xml_failure(argv)
    except Exception as e:
        assert False, f"Unexpected exception {e}"
    assert not fail


UNEXPECTED_PASS_PREFIX_STR = "UNEXPECTED_PASS"
UNEXPECTED_FAIL_PREFIX_STR = "UNEXPECTED_FAIL"


def test_mock_sut_unexpected_pass(testDirExpectedFail: Path):
    """Run unexpected pass SUT test."""
    try:
        argv = _get_default_argv(testDirExpectedFail, GRAPH_RESULT_VALID)
        main(argv)
        fail = _get_xml_failure(argv)
    except Exception as e:
        assert False, f"Unexpected exception {e}"
    assert fail.startswith(UNEXPECTED_PASS_PREFIX_STR)


def test_mock_sut_expected_failure(testDirExpectedFail: Path):
    """Run expected failure SUT test."""
    try:
        argv = _get_default_argv(testDirExpectedFail, GRAPH_RESULT_ERROR)
        main(argv)
        fail = _get_xml_failure(argv)
    except Exception as e:
        assert False, f"Unexpected exception {e}"
    assert not fail


def test_mock_sut_unexpected_failure(testDir: Path):
    """Run unexpected failure SUT test."""
    try:
        argv = _get_default_argv(testDir, GRAPH_RESULT_ERROR)
        main(argv)
        fail = _get_xml_failure(argv)
    except Exception as e:
        assert False, f"Unexpected exception {e}"
    assert fail.startswith(UNEXPECTED_FAIL_PREFIX_STR)


def test_mock_sut_binary_conversion(testDir: Path):
    """Run expected pass SUT test with binary flatbuffer conversion."""
    try:
        argv = _get_default_argv(testDir, GRAPH_RESULT_VALID)
        argv.extend(["--binary", "--flatc-path", str(testDir / "mock_flatc.py")])
        main(argv)
        binary_desc = testDir / "desc_binary.json"
        assert binary_desc.exists()
        fail = _get_xml_failure(argv)
    except Exception as e:
        assert False, f"Unexpected exception {e}"
    assert not fail


def test_mock_and_dummy_sut_results(testDir: Path):
    """Run two SUTs and check they both return results."""
    try:
        argv = _get_default_argv(testDir, GRAPH_RESULT_VALID)
        # Override sut-module setting with both SUTs
        argv.extend(
            ["--sut-module", "tests.tosa_dummy_sut_run", "tests.tosa_mock_sut_run"]
        )
        main(argv)
        results = _get_xml_results(argv)
        _get_xml_testsuites_from_results(results, 2)
        _get_xml_testcases_from_results(results, 2)
    except Exception as e:
        assert False, f"Unexpected exception {e}"


def test_two_mock_suts(testDir: Path):
    """Test that a duplicated SUT is ignored."""
    try:
        argv = _get_default_argv(testDir, GRAPH_RESULT_VALID)
        # Override sut-module setting with duplicated SUT
        argv.extend(
            ["--sut-module", "tests.tosa_mock_sut_run", "tests.tosa_mock_sut_run"]
        )
        main(argv)
        results = _get_xml_results(argv)
        _get_xml_testsuites_from_results(results, 1)
        _get_xml_testcases_from_results(results, 1)
    except Exception as e:
        assert False, f"Unexpected exception {e}"


def test_mock_sut_multi_outputs_expected_pass(testDirMultiOutputs: Path):
    """Run expected pass SUT test with multiple outputs."""
    try:
        argv = _get_default_argv(testDirMultiOutputs, GRAPH_RESULT_VALID)
        main(argv)
        fail = _get_xml_failure(argv)
    except Exception as e:
        assert False, f"Unexpected exception {e}"
    assert not fail


def test_mock_sut_multi_outputs_unexpected_failure(testDirMultiOutputs: Path):
    """Run SUT test which expects multiple outputs, but last one is missing."""
    try:
        argv = _get_default_argv(testDirMultiOutputs, GRAPH_RESULT_VALID)
        argv.append("tests.tosa_mock_sut_run:num_results=1")
        main(argv)
        fail = _get_xml_failure(argv)
    except Exception as e:
        assert False, f"Unexpected exception {e}"
    assert fail.startswith(UNEXPECTED_FAIL_PREFIX_STR)
+# SPDX-License-Identifier: Apache-2.0 +from runner.run_command import run_sh_command +from runner.run_command import RunShCommandError + + +def test_run_command_success(): + """Run successful command.""" + cmd = ["echo", "Hello Space Cadets"] + try: + run_sh_command(cmd) + ok = True + except RunShCommandError: + ok = False + assert ok + + +def test_run_command_fail(): + """Run unsuccessful command.""" + cmd = ["cat", "non-existant-file-432342.txt"] + try: + run_sh_command(cmd) + ok = True + except RunShCommandError as e: + assert e.return_code == 1 + ok = False + assert not ok + + +def test_run_command_fail_with_stderr(): + """Run unsuccessful command capturing output.""" + cmd = ["ls", "--unknown-option"] + try: + stdout, stderr = run_sh_command(cmd, capture_output=True) + ok = True + except RunShCommandError as e: + assert e.return_code == 2 + assert e.stderr + ok = False + assert not ok + + +def test_run_command_success_verbose_with_stdout(): + """Run successful command capturing output.""" + output = "There is no Planet B" + cmd = ["echo", output] + try: + stdout, stderr = run_sh_command(cmd, verbose=True, capture_output=True) + assert stdout == f"{output}\n" + ok = True + except RunShCommandError: + ok = False + assert ok diff --git a/verif/tests/tosa_dummy_sut_run.py b/verif/tests/tosa_dummy_sut_run.py new file mode 100644 index 0000000..fffcfa1 --- /dev/null +++ b/verif/tests/tosa_dummy_sut_run.py @@ -0,0 +1,20 @@ +"""TOSA test runner module for a dummy System Under Test (SUT).""" +# Copyright (c) 2021, ARM Limited. 
+# SPDX-License-Identifier: Apache-2.0 +from runner.tosa_test_runner import TosaTestRunner + + +class TosaSUTRunner(TosaTestRunner): + """TOSA dummy SUT runner.""" + + def __init__(self, args, runnerArgs, testDir): + """Initialize using the given test details.""" + super().__init__(args, runnerArgs, testDir) + + def runTestGraph(self): + """Nothing run as this is a dummy SUT that does nothing.""" + graphResult = TosaTestRunner.TosaGraphResult.TOSA_VALID + graphMessage = "Dummy system under test - nothing run" + + # Return graph result and message + return graphResult, graphMessage diff --git a/verif/tests/tosa_mock_sut_run.py b/verif/tests/tosa_mock_sut_run.py new file mode 100644 index 0000000..9572618 --- /dev/null +++ b/verif/tests/tosa_mock_sut_run.py @@ -0,0 +1,118 @@ +"""TOSA test runner module for a mock System Under Test (SUT).""" +# Copyright (c) 2021, ARM Limited. +# SPDX-License-Identifier: Apache-2.0 +import os + +from runner.run_command import run_sh_command +from runner.run_command import RunShCommandError +from runner.tosa_test_runner import TosaTestRunner + + +class TosaSUTRunner(TosaTestRunner): + """TOSA mock SUT runner.""" + + def __init__(self, args, runnerArgs, testDir): + """Initialize using the given test details.""" + super().__init__(args, runnerArgs, testDir) + + def runTestGraph(self): + """Run the test on a mock SUT.""" + # Read the command line sut-module-args in form arg=value + # and put them in a dictionary + # Note: On the command line (for this module) they look like: + # tests.tosa_mock_sut_run:arg=value + sutArgs = {} + for runArg in self.runnerArgs: + try: + arg, value = runArg.split("=", 1) + except ValueError: + # Argument without a value - treat it as a flag + arg = runArg + value = True + sutArgs[arg] = value + print(f"MOCK SUT: Runner argument dictionary: {sutArgs}") + + # Useful meta data and arguments + tosaFlatbufferSchema = self.args.operator_fbs + tosaSubgraphFile = self.testDesc["tosa_file"] + tosaTestDirectory = 
self.testDir + tosaTestDescFile = self.descFile + + # Expected file name for the graph results on valid graph + graphResultFiles = [] + for idx, name in enumerate(self.testDesc["ofm_name"]): + graphResultFiles.append( + "{}:{}".format(name, self.testDesc["ofm_file"][idx]) + ) + + # Build up input "tensor_name":"filename" list + tosaInputTensors = [] + for idx, name in enumerate(self.testDesc["ifm_name"]): + tosaInputTensors.append( + "{}:{}".format(name, self.testDesc["ifm_file"][idx]) + ) + + # Build up command line + cmd = [ + "echo", + f"FBS={tosaFlatbufferSchema}", + f"Path={tosaTestDirectory}", + f"Desc={tosaTestDescFile}", + f"Graph={tosaSubgraphFile}", + "Results={}".format(",".join(graphResultFiles)), + "Inputs={}".format(",".join(tosaInputTensors)), + ] + + # Run test on implementation + graphResult = None + graphMessage = None + try: + stdout, stderr = run_sh_command(cmd, verbose=True, capture_output=True) + except RunShCommandError as e: + # Return codes can be used to indicate graphResult status (see tosa_ref_run.py) + # But in this mock version we just set the result based on sutArgs below + print(f"MOCK SUT: Unexpected error {e.return_code} from command: {e}") + graphResult = TosaTestRunner.TosaGraphResult.OTHER_ERROR + graphMessage = e.stderr + + # Other mock system testing + if self.args.binary: + # Check that the mock binary conversion has happened + _, ext = os.path.splitext(tosaSubgraphFile) + if ( + os.path.basename(tosaTestDescFile) != "desc_binary.json" + and ext != ".tosa" + ): + graphResult = TosaTestRunner.TosaGraphResult.OTHER_ERROR + + # Mock up graph result based on passed arguments + if not graphResult: + try: + if sutArgs["graph"] == "valid": + graphResult = TosaTestRunner.TosaGraphResult.TOSA_VALID + # Create dummy output file(s) for passing result checker + for idx, fname in enumerate(self.testDesc["ofm_file"]): + if "num_results" in sutArgs and idx == int( + sutArgs["num_results"] + ): + # Skip writing any more to test results 
checker + break + print("Created " + fname) + fp = open(os.path.join(tosaTestDirectory, fname), "w") + fp.close() + elif sutArgs["graph"] == "error": + graphResult = TosaTestRunner.TosaGraphResult.TOSA_ERROR + graphMessage = "MOCK SUT: ERROR_IF" + elif sutArgs["graph"] == "unpredictable": + graphResult = TosaTestRunner.TosaGraphResult.TOSA_UNPREDICTABLE + graphMessage = "MOCK SUT: UNPREDICTABLE" + else: + graphResult = TosaTestRunner.TosaGraphResult.OTHER_ERROR + graphMessage = "MOCK SUT: error from system under test" + except KeyError: + graphMessage = "MOCK SUT: No graph result specified!" + print(graphMessage) + graphResult = TosaTestRunner.TosaGraphResult.OTHER_ERROR + + # Return graph result and message + return graphResult, graphMessage |