diff options
Diffstat (limited to 'scripts/py')
-rw-r--r-- | scripts/py/gen_rgb_cpp.py | 43 | ||||
-rw-r--r-- | scripts/py/templates/Images.cc.template | 20 | ||||
-rw-r--r-- | scripts/py/templates/Images.hpp.template | 11 | ||||
-rw-r--r-- | scripts/py/vsi/arm_vsi4.py | 207 | ||||
-rw-r--r-- | scripts/py/vsi/arm_vsi5.py | 207 | ||||
-rw-r--r-- | scripts/py/vsi/arm_vsi6.py | 207 | ||||
-rw-r--r-- | scripts/py/vsi/arm_vsi7.py | 207 | ||||
-rw-r--r-- | scripts/py/vsi/vsi_video.py | 461 | ||||
-rw-r--r-- | scripts/py/vsi/vsi_video_server.py | 447 |
9 files changed, 1796 insertions, 14 deletions
diff --git a/scripts/py/gen_rgb_cpp.py b/scripts/py/gen_rgb_cpp.py index e1c93bb..f1200e6 100644 --- a/scripts/py/gen_rgb_cpp.py +++ b/scripts/py/gen_rgb_cpp.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright 2021-2023 Arm Limited and/or its affiliates <open-source-office@arm.com> +# SPDX-FileCopyrightText: Copyright 2021-2024 Arm Limited and/or its affiliates <open-source-office@arm.com> # SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -18,6 +18,7 @@ Utility script to convert a set of RGB images in a given location into corresponding cpp files and a single hpp file referencing the vectors from the cpp files. """ +import argparse import glob import math import typing @@ -60,6 +61,13 @@ parser.add_argument( ) parser.add_argument( + "--generate_file_paths", + type=bool, + action=argparse.BooleanOptionalAction, + help="Generate an array of file paths to the images as well as the images themselves." +) + +parser.add_argument( "--license_template", type=str, help="Header template file", @@ -85,11 +93,12 @@ class ImagesParams: image_filenames: typing.List[str] -def write_hpp_file( +def write_metadata_files( images_params: ImagesParams, header_file_path: Path, cc_file_path: Path, header_template_file: str, + source_directory: str = None ): """ Write Images.hpp and Images.cc @@ -98,6 +107,7 @@ def write_hpp_file( @param header_file_path: Images.hpp path @param cc_file_path: Images.cc path @param header_template_file: Header template file name + @param source_directory: Optional source directory of images """ print(f"++ Generating {header_file_path}") hdr = GenUtils.gen_header(env, header_template_file) @@ -109,14 +119,16 @@ def write_hpp_file( .stream(common_template_header=hdr, imgs_count=images_params.num_images, img_size=image_size, - var_names=images_params.image_array_names) \ + var_names=images_params.image_array_names, + source_directory=source_directory) \ .dump(str(header_file_path)) env \ .get_template('Images.cc.template') \ .stream(common_template_header=hdr, var_names=images_params.image_array_names, - img_names=images_params.image_filenames) \ + img_names=images_params.image_filenames, + source_directory=source_directory) \ .dump(str(cc_file_path)) @@ -196,9 +208,13 @@ def main(args): image_filenames = [] image_array_names = [] - if Path(args.image_path).is_dir(): + image_path = Path(args.image_path) + + if image_path.is_dir(): + image_directory = image_path filepaths = sorted(glob.glob(str(Path(args.image_path) / '**/*.*'), recursive=True)) - elif Path(args.image_path).is_file(): + elif image_path.is_file(): + image_directory = image_path.parent filepaths = [args.image_path] else: raise OSError("Directory or file does not exist.") @@ -228,13 +244,16 @@ def main(args): # Increment image index image_idx = image_idx + 1 - header_filepath = Path(args.header_folder_path) / "InputFiles.hpp" - common_cc_filepath = Path(args.source_folder_path) / "InputFiles.cc" - - images_params = ImagesParams(image_idx, args.image_size, image_array_names, image_filenames) - if len(image_filenames) > 0: - write_hpp_file(images_params, header_filepath, common_cc_filepath, args.license_template) + images_params = ImagesParams(image_idx, args.image_size, image_array_names, image_filenames) + + write_metadata_files( + images_params, + header_file_path=Path(args.header_folder_path) / "InputFiles.hpp", + cc_file_path=Path(args.source_folder_path) / "InputFiles.cc", + header_template_file=args.license_template, + source_directory=image_directory if args.generate_file_paths else None + ) else: raise FileNotFoundError("No valid images found.") diff --git a/scripts/py/templates/Images.cc.template b/scripts/py/templates/Images.cc.template index 2620ab4..c5b051a 100644 --- a/scripts/py/templates/Images.cc.template +++ b/scripts/py/templates/Images.cc.template @@ -1,5 +1,5 @@ {# - SPDX-FileCopyrightText: Copyright 2021 Arm Limited and/or its affiliates <open-source-office@arm.com> + SPDX-FileCopyrightText: Copyright 2021, 2024 Arm Limited and/or its affiliates <open-source-office@arm.com> SPDX-License-Identifier: Apache-2.0 Licensed under the Apache License, Version 2.0 (the "License"); @@ -24,6 +24,14 @@ static const char* imgFilenames[] = { {% endfor %} }; +{% if source_directory %} +static const char* imgFilePaths[] = { +{% for name in img_names %} + "{{source_directory}}/{{name}}", +{% endfor %} +}; +{% endif %} + static const uint8_t* imgArrays[] = { {{ var_names|join(',\n ') }} }; @@ -36,6 +44,16 @@ const char* GetFilename(const uint32_t idx) return nullptr; } +{% if source_directory %} +const char* GetFilePath(const uint32_t idx) +{ + if (idx < NUMBER_OF_FILES) { + return imgFilePaths[idx]; + } + return nullptr; +} +{% endif %} + const uint8_t* GetImgArray(const uint32_t idx) { if (idx < NUMBER_OF_FILES) { diff --git a/scripts/py/templates/Images.hpp.template b/scripts/py/templates/Images.hpp.template index d39fc49..1f0a70e 100644 --- a/scripts/py/templates/Images.hpp.template +++ b/scripts/py/templates/Images.hpp.template @@ -1,5 +1,5 @@ {# - SPDX-FileCopyrightText: Copyright 2021 Arm Limited and/or its affiliates <open-source-office@arm.com> + SPDX-FileCopyrightText: Copyright 2021, 2024 Arm Limited and/or its affiliates <open-source-office@arm.com> SPDX-License-Identifier: Apache-2.0 Licensed under the Apache License, Version 2.0 (the "License"); @@ -35,6 +35,15 @@ extern const uint8_t {{var_name}}[IMAGE_DATA_SIZE]; **/ const char* GetFilename(const uint32_t idx); +{% if source_directory %} +/** + * @brief Gets the file path for the image on the local filesystem + * @param[in] idx Index of the input. + * @return const C string pointer to the file path. + **/ +const char* GetFilePath(const uint32_t idx); +{% endif %} + /** * @brief Gets the pointer to image data. * @param[in] idx Index of the input. diff --git a/scripts/py/vsi/arm_vsi4.py b/scripts/py/vsi/arm_vsi4.py new file mode 100644 index 0000000..c903ab2 --- /dev/null +++ b/scripts/py/vsi/arm_vsi4.py @@ -0,0 +1,207 @@ +#!/usr/bin/env python3 +# SPDX-FileCopyrightText: Copyright 2024 Arm Limited and/or its affiliates <open-source-office@arm.com> +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import vsi_video + +## Set verbosity level +#verbosity = logging.DEBUG +verbosity = logging.ERROR + +# [debugging] Verbosity settings +level = { 10: "DEBUG", 20: "INFO", 30: "WARNING", 40: "ERROR" } +logging.basicConfig(format='Py: VSI4: [%(levelname)s]\t%(message)s', level = verbosity) +logging.info("Verbosity level is set to " + level[verbosity]) + + +# Video Server configuration +server_address = ('127.0.0.1', 6000) +server_authkey = 'vsi_video' + + +# IRQ registers +IRQ_Status = 0 + +# Timer registers +Timer_Control = 0 +Timer_Interval = 0 + +# Timer Control register definitions +Timer_Control_Run_Msk = 1<<0 +Timer_Control_Periodic_Msk = 1<<1 +Timer_Control_Trig_IRQ_Msk = 1<<2 +Timer_Control_Trig_DMA_Msk = 1<<3 + +# DMA registers +DMA_Control = 0 + +# DMA Control register definitions +DMA_Control_Enable_Msk = 1<<0 +DMA_Control_Direction_Msk = 1<<1 +DMA_Control_Direction_P2M = 0<<1 +DMA_Control_Direction_M2P = 1<<1 + +# User registers +Regs = [0] * 64 + +# Data buffer +Data = bytearray() + + +## Initialize +# @return None +def init(): + logging.info("Python function init() called") + vsi_video.init(server_address, server_authkey) + + +## Read interrupt request (the VSI IRQ Status Register) +# @return value value read (32-bit) +def rdIRQ(): + global IRQ_Status + logging.info("Python function rdIRQ() called") + + value = IRQ_Status + logging.debug("Read interrupt request: {}".format(value)) + + return value + + +## Write interrupt request (the VSI IRQ Status Register) +# @param value value to write (32-bit) +# @return value value written (32-bit) +def wrIRQ(value): + global IRQ_Status + logging.info("Python function wrIRQ() called") + + value = vsi_video.wrIRQ(IRQ_Status, value) + IRQ_Status = value + logging.debug("Write interrupt request: {}".format(value)) + + return value + + +## Write Timer registers (the VSI Timer Registers) +# @param index Timer register index (zero based) +# @param value value to write (32-bit) +# @return value value written (32-bit) +def wrTimer(index, value): + global Timer_Control, Timer_Interval + logging.info("Python function wrTimer() called") + + if index == 0: + Timer_Control = value + logging.debug("Write Timer_Control: {}".format(value)) + elif index == 1: + Timer_Interval = value + logging.debug("Write Timer_Interval: {}".format(value)) + + return value + + +## Timer event (called at Timer Overflow) +# @return None +def timerEvent(): + global IRQ_Status + + logging.info("Python function timerEvent() called") + + IRQ_Status = vsi_video.timerEvent(IRQ_Status) + + +## Write DMA registers (the VSI DMA Registers) +# @param index DMA register index (zero based) +# @param value value to write (32-bit) +# @return value value written (32-bit) +def wrDMA(index, value): + global DMA_Control + logging.info("Python function wrDMA() called") + + if index == 0: + DMA_Control = value + logging.debug("Write DMA_Control: {}".format(value)) + + return value + + +## Read data from peripheral for DMA P2M transfer (VSI DMA) +# @param size size of data to read (in bytes, multiple of 4) +# @return data data read (bytearray) +def rdDataDMA(size): + global Data + logging.info("Python function rdDataDMA() called") + + Data = vsi_video.rdDataDMA(size) + + n = min(len(Data), size) + data = bytearray(size) + data[0:n] = Data[0:n] + logging.debug("Read data ({} bytes)".format(size)) + + return data + + +## Write data to peripheral for DMA M2P transfer (VSI DMA) +# @param data data to write (bytearray) +# @param size size of data to write (in bytes, multiple of 4) +# @return None +def wrDataDMA(data, size): + global Data + logging.info("Python function wrDataDMA() called") + + Data = data + logging.debug("Write data ({} bytes)".format(size)) + + vsi_video.wrDataDMA(data, size) + + return + + +## Read user registers (the VSI User Registers) +# @param index user register index (zero based) +# @return value value read (32-bit) +def rdRegs(index): + global Regs + logging.info("Python function rdRegs() called") + + if index <= vsi_video.REG_IDX_MAX: + Regs[index] = vsi_video.rdRegs(index) + + value = Regs[index] + logging.debug("Read user register at index {}: {}".format(index, value)) + + return value + + +## Write user registers (the VSI User Registers) +# @param index user register index (zero based) +# @param value value to write (32-bit) +# @return value value written (32-bit) +def wrRegs(index, value): + global Regs + logging.info("Python function wrRegs() called") + + if index <= vsi_video.REG_IDX_MAX: + value = vsi_video.wrRegs(index, value) + + Regs[index] = value + logging.debug("Write user register at index {}: {}".format(index, value)) + + return value + + +## @} + diff --git a/scripts/py/vsi/arm_vsi5.py b/scripts/py/vsi/arm_vsi5.py new file mode 100644 index 0000000..8056096 --- /dev/null +++ b/scripts/py/vsi/arm_vsi5.py @@ -0,0 +1,207 @@ +#!/usr/bin/env python3 +# SPDX-FileCopyrightText: Copyright 2024 Arm Limited and/or its affiliates <open-source-office@arm.com> +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import vsi_video + +## Set verbosity level +#verbosity = logging.DEBUG +verbosity = logging.ERROR + +# [debugging] Verbosity settings +level = { 10: "DEBUG", 20: "INFO", 30: "WARNING", 40: "ERROR" } +logging.basicConfig(format='Py: VSI5: [%(levelname)s]\t%(message)s', level = verbosity) +logging.info("Verbosity level is set to " + level[verbosity]) + + +# Video Server configuration +server_address = ('127.0.0.1', 6001) +server_authkey = 'vsi_video' + + +# IRQ registers +IRQ_Status = 0 + +# Timer registers +Timer_Control = 0 +Timer_Interval = 0 + +# Timer Control register definitions +Timer_Control_Run_Msk = 1<<0 +Timer_Control_Periodic_Msk = 1<<1 +Timer_Control_Trig_IRQ_Msk = 1<<2 +Timer_Control_Trig_DMA_Msk = 1<<3 + +# DMA registers +DMA_Control = 0 + +# DMA Control register definitions +DMA_Control_Enable_Msk = 1<<0 +DMA_Control_Direction_Msk = 1<<1 +DMA_Control_Direction_P2M = 0<<1 +DMA_Control_Direction_M2P = 1<<1 + +# User registers +Regs = [0] * 64 + +# Data buffer +Data = bytearray() + + +## Initialize +# @return None +def init(): + logging.info("Python function init() called") + vsi_video.init(server_address, server_authkey) + + +## Read interrupt request (the VSI IRQ Status Register) +# @return value value read (32-bit) +def rdIRQ(): + global IRQ_Status + logging.info("Python function rdIRQ() called") + + value = IRQ_Status + logging.debug("Read interrupt request: {}".format(value)) + + return value + + +## Write interrupt request (the VSI IRQ Status Register) +# @param value value to write (32-bit) +# @return value value written (32-bit) +def wrIRQ(value): + global IRQ_Status + logging.info("Python function wrIRQ() called") + + value = vsi_video.wrIRQ(IRQ_Status, value) + IRQ_Status = value + logging.debug("Write interrupt request: {}".format(value)) + + return value + + +## Write Timer registers (the VSI Timer Registers) +# @param index Timer register index (zero based) +# @param value value to write (32-bit) +# @return value value written (32-bit) +def wrTimer(index, value): + global Timer_Control, Timer_Interval + logging.info("Python function wrTimer() called") + + if index == 0: + Timer_Control = value + logging.debug("Write Timer_Control: {}".format(value)) + elif index == 1: + Timer_Interval = value + logging.debug("Write Timer_Interval: {}".format(value)) + + return value + + +## Timer event (called at Timer Overflow) +# @return None +def timerEvent(): + global IRQ_Status + + logging.info("Python function timerEvent() called") + + IRQ_Status = vsi_video.timerEvent(IRQ_Status) + + +## Write DMA registers (the VSI DMA Registers) +# @param index DMA register index (zero based) +# @param value value to write (32-bit) +# @return value value written (32-bit) +def wrDMA(index, value): + global DMA_Control + logging.info("Python function wrDMA() called") + + if index == 0: + DMA_Control = value + logging.debug("Write DMA_Control: {}".format(value)) + + return value + + +## Read data from peripheral for DMA P2M transfer (VSI DMA) +# @param size size of data to read (in bytes, multiple of 4) +# @return data data read (bytearray) +def rdDataDMA(size): + global Data + logging.info("Python function rdDataDMA() called") + + Data = vsi_video.rdDataDMA(size) + + n = min(len(Data), size) + data = bytearray(size) + data[0:n] = Data[0:n] + logging.debug("Read data ({} bytes)".format(size)) + + return data + + +## Write data to peripheral for DMA M2P transfer (VSI DMA) +# @param data data to write (bytearray) +# @param size size of data to write (in bytes, multiple of 4) +# @return None +def wrDataDMA(data, size): + global Data + logging.info("Python function wrDataDMA() called") + + Data = data + logging.debug("Write data ({} bytes)".format(size)) + + vsi_video.wrDataDMA(data, size) + + return + + +## Read user registers (the VSI User Registers) +# @param index user register index (zero based) +# @return value value read (32-bit) +def rdRegs(index): + global Regs + logging.info("Python function rdRegs() called") + + if index <= vsi_video.REG_IDX_MAX: + Regs[index] = vsi_video.rdRegs(index) + + value = Regs[index] + logging.debug("Read user register at index {}: {}".format(index, value)) + + return value + + +## Write user registers (the VSI User Registers) +# @param index user register index (zero based) +# @param value value to write (32-bit) +# @return value value written (32-bit) +def wrRegs(index, value): + global Regs + logging.info("Python function wrRegs() called") + + if index <= vsi_video.REG_IDX_MAX: + value = vsi_video.wrRegs(index, value) + + Regs[index] = value + logging.debug("Write user register at index {}: {}".format(index, value)) + + return value + + +## @} + diff --git a/scripts/py/vsi/arm_vsi6.py b/scripts/py/vsi/arm_vsi6.py new file mode 100644 index 0000000..3d71562 --- /dev/null +++ b/scripts/py/vsi/arm_vsi6.py @@ -0,0 +1,207 @@ +#!/usr/bin/env python3 +# SPDX-FileCopyrightText: Copyright 2024 Arm Limited and/or its affiliates <open-source-office@arm.com> +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import vsi_video + +## Set verbosity level +#verbosity = logging.DEBUG +verbosity = logging.ERROR + +# [debugging] Verbosity settings +level = { 10: "DEBUG", 20: "INFO", 30: "WARNING", 40: "ERROR" } +logging.basicConfig(format='Py: VSI6: [%(levelname)s]\t%(message)s', level = verbosity) +logging.info("Verbosity level is set to " + level[verbosity]) + + +# Video Server configuration +server_address = ('127.0.0.1', 6002) +server_authkey = 'vsi_video' + + +# IRQ registers +IRQ_Status = 0 + +# Timer registers +Timer_Control = 0 +Timer_Interval = 0 + +# Timer Control register definitions +Timer_Control_Run_Msk = 1<<0 +Timer_Control_Periodic_Msk = 1<<1 +Timer_Control_Trig_IRQ_Msk = 1<<2 +Timer_Control_Trig_DMA_Msk = 1<<3 + +# DMA registers +DMA_Control = 0 + +# DMA Control register definitions +DMA_Control_Enable_Msk = 1<<0 +DMA_Control_Direction_Msk = 1<<1 +DMA_Control_Direction_P2M = 0<<1 +DMA_Control_Direction_M2P = 1<<1 + +# User registers +Regs = [0] * 64 + +# Data buffer +Data = bytearray() + + +## Initialize +# @return None +def init(): + logging.info("Python function init() called") + vsi_video.init(server_address, server_authkey) + + +## Read interrupt request (the VSI IRQ Status Register) +# @return value value read (32-bit) +def rdIRQ(): + global IRQ_Status + logging.info("Python function rdIRQ() called") + + value = IRQ_Status + logging.debug("Read interrupt request: {}".format(value)) + + return value + + +## Write interrupt request (the VSI IRQ Status Register) +# @param value value to write (32-bit) +# @return value value written (32-bit) +def wrIRQ(value): + global IRQ_Status + logging.info("Python function wrIRQ() called") + + value = vsi_video.wrIRQ(IRQ_Status, value) + IRQ_Status = value + logging.debug("Write interrupt request: {}".format(value)) + + return value + + +## Write Timer registers (the VSI Timer Registers) +# @param index Timer register index (zero based) +# @param value value to write (32-bit) +# @return value value written (32-bit) +def wrTimer(index, value): + global Timer_Control, Timer_Interval + logging.info("Python function wrTimer() called") + + if index == 0: + Timer_Control = value + logging.debug("Write Timer_Control: {}".format(value)) + elif index == 1: + Timer_Interval = value + logging.debug("Write Timer_Interval: {}".format(value)) + + return value + + +## Timer event (called at Timer Overflow) +# @return None +def timerEvent(): + global IRQ_Status + + logging.info("Python function timerEvent() called") + + IRQ_Status = vsi_video.timerEvent(IRQ_Status) + + +## Write DMA registers (the VSI DMA Registers) +# @param index DMA register index (zero based) +# @param value value to write (32-bit) +# @return value value written (32-bit) +def wrDMA(index, value): + global DMA_Control + logging.info("Python function wrDMA() called") + + if index == 0: + DMA_Control = value + logging.debug("Write DMA_Control: {}".format(value)) + + return value + + +## Read data from peripheral for DMA P2M transfer (VSI DMA) +# @param size size of data to read (in bytes, multiple of 4) +# @return data data read (bytearray) +def rdDataDMA(size): + global Data + logging.info("Python function rdDataDMA() called") + + Data = vsi_video.rdDataDMA(size) + + n = min(len(Data), size) + data = bytearray(size) + data[0:n] = Data[0:n] + logging.debug("Read data ({} bytes)".format(size)) + + return data + + +## Write data to peripheral for DMA M2P transfer (VSI DMA) +# @param data data to write (bytearray) +# @param size size of data to write (in bytes, multiple of 4) +# @return None +def wrDataDMA(data, size): + global Data + logging.info("Python function wrDataDMA() called") + + Data = data + logging.debug("Write data ({} bytes)".format(size)) + + vsi_video.wrDataDMA(data, size) + + return + + +## Read user registers (the VSI User Registers) +# @param index user register index (zero based) +# @return value value read (32-bit) +def rdRegs(index): + global Regs + logging.info("Python function rdRegs() called") + + if index <= vsi_video.REG_IDX_MAX: + Regs[index] = vsi_video.rdRegs(index) + + value = Regs[index] + logging.debug("Read user register at index {}: {}".format(index, value)) + + return value + + +## Write user registers (the VSI User Registers) +# @param index user register index (zero based) +# @param value value to write (32-bit) +# @return value value written (32-bit) +def wrRegs(index, value): + global Regs + logging.info("Python function wrRegs() called") + + if index <= vsi_video.REG_IDX_MAX: + value = vsi_video.wrRegs(index, value) + + Regs[index] = value + logging.debug("Write user register at index {}: {}".format(index, value)) + + return value + + +## @} + diff --git a/scripts/py/vsi/arm_vsi7.py b/scripts/py/vsi/arm_vsi7.py new file mode 100644 index 0000000..892433c --- /dev/null +++ b/scripts/py/vsi/arm_vsi7.py @@ -0,0 +1,207 @@ +#!/usr/bin/env python3 +# SPDX-FileCopyrightText: Copyright 2024 Arm Limited and/or its affiliates <open-source-office@arm.com> +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import vsi_video + +## Set verbosity level +#verbosity = logging.DEBUG +verbosity = logging.ERROR + +# [debugging] Verbosity settings +level = { 10: "DEBUG", 20: "INFO", 30: "WARNING", 40: "ERROR" } +logging.basicConfig(format='Py: VSI7: [%(levelname)s]\t%(message)s', level = verbosity) +logging.info("Verbosity level is set to " + level[verbosity]) + + +# Video Server configuration +server_address = ('127.0.0.1', 6003) +server_authkey = 'vsi_video' + + +# IRQ registers +IRQ_Status = 0 + +# Timer registers +Timer_Control = 0 +Timer_Interval = 0 + +# Timer Control register definitions +Timer_Control_Run_Msk = 1<<0 +Timer_Control_Periodic_Msk = 1<<1 +Timer_Control_Trig_IRQ_Msk = 1<<2 +Timer_Control_Trig_DMA_Msk = 1<<3 + +# DMA registers +DMA_Control = 0 + +# DMA Control register definitions +DMA_Control_Enable_Msk = 1<<0 +DMA_Control_Direction_Msk = 1<<1 +DMA_Control_Direction_P2M = 0<<1 +DMA_Control_Direction_M2P = 1<<1 + +# User registers +Regs = [0] * 64 + +# Data buffer +Data = bytearray() + + +## Initialize +# @return None +def init(): + logging.info("Python function init() called") + vsi_video.init(server_address, server_authkey) + + +## Read interrupt request (the VSI IRQ Status Register) +# @return value value read (32-bit) +def rdIRQ(): + global IRQ_Status + logging.info("Python function rdIRQ() called") + + value = IRQ_Status + logging.debug("Read interrupt request: {}".format(value)) + + return value + + +## Write interrupt request (the VSI IRQ Status Register) +# @param value value to write (32-bit) +# @return value value written (32-bit) +def wrIRQ(value): + global IRQ_Status + logging.info("Python function wrIRQ() called") + + value = vsi_video.wrIRQ(IRQ_Status, value) + IRQ_Status = value + logging.debug("Write interrupt request: {}".format(value)) + + return value + + +## Write Timer registers (the VSI Timer Registers) +# @param index Timer register index (zero based) +# @param value value to write (32-bit) +# @return value value written (32-bit) +def wrTimer(index, value): + global Timer_Control, Timer_Interval + logging.info("Python function wrTimer() called") + + if index == 0: + Timer_Control = value + logging.debug("Write Timer_Control: {}".format(value)) + elif index == 1: + Timer_Interval = value + logging.debug("Write Timer_Interval: {}".format(value)) + + return value + + +## Timer event (called at Timer Overflow) +# @return None +def timerEvent(): + global IRQ_Status + + logging.info("Python function timerEvent() called") + + IRQ_Status = vsi_video.timerEvent(IRQ_Status) + + +## Write DMA registers (the VSI DMA Registers) +# @param index DMA register index (zero based) +# @param value value to write (32-bit) +# @return value value written (32-bit) +def wrDMA(index, value): + global DMA_Control + logging.info("Python function wrDMA() called") + + if index == 0: + DMA_Control = value + logging.debug("Write DMA_Control: {}".format(value)) + + return value + + +## Read data from peripheral for DMA P2M transfer (VSI DMA) +# @param size size of data to read (in bytes, multiple of 4) +# @return data data read (bytearray) +def rdDataDMA(size): + global Data + logging.info("Python function rdDataDMA() called") + + Data = vsi_video.rdDataDMA(size) + + n = min(len(Data), size) + data = bytearray(size) + data[0:n] = Data[0:n] + logging.debug("Read data ({} bytes)".format(size)) + + return data + + +## Write data to peripheral for DMA M2P transfer (VSI DMA) +# @param data data to write (bytearray) +# @param size size of data to write (in bytes, multiple of 4) +# @return None +def wrDataDMA(data, size): + global Data + logging.info("Python function wrDataDMA() called") + + Data = data + logging.debug("Write data ({} bytes)".format(size)) + + vsi_video.wrDataDMA(data, size) + + return + + +## Read user registers (the VSI User Registers) +# @param index user register index (zero based) +# @return value value read (32-bit) +def rdRegs(index): + global Regs + logging.info("Python function rdRegs() called") + + if index <= vsi_video.REG_IDX_MAX: + Regs[index] = vsi_video.rdRegs(index) + + value = Regs[index] + logging.debug("Read user register at index {}: {}".format(index, value)) + + return value + + +## Write user registers (the VSI User Registers) +# @param index user register index (zero based) +# @param value value to write (32-bit) +# @return value value written (32-bit) +def wrRegs(index, value): + global Regs + logging.info("Python function wrRegs() called") + + if index <= vsi_video.REG_IDX_MAX: + value = vsi_video.wrRegs(index, value) + + Regs[index] = value + logging.debug("Write user register at index {}: {}".format(index, value)) + + return value + + +## @} + diff --git a/scripts/py/vsi/vsi_video.py b/scripts/py/vsi/vsi_video.py new file mode 100644 index 0000000..88f44fb --- /dev/null +++ b/scripts/py/vsi/vsi_video.py @@ -0,0 +1,461 @@ +#!/usr/bin/env python3 +# SPDX-FileCopyrightText: Copyright 2024 Arm Limited and/or its affiliates <open-source-office@arm.com> +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +import atexit +import logging +import subprocess +from multiprocessing.connection import Client, Connection +from os import path, getcwd +from os import name as os_name + + +class VideoClient: + def __init__(self): + # Server commands + self.SET_FILENAME = 1 + self.STREAM_CONFIGURE = 2 + self.STREAM_ENABLE = 3 + self.STREAM_DISABLE = 4 + self.FRAME_READ = 5 + self.FRAME_WRITE = 6 + self.CLOSE_SERVER = 7 + # Color space + self.GRAYSCALE8 = 1 + self.RGB888 = 2 + self.BGR565 = 3 + self.YUV420 = 4 + self.NV12 = 5 + self.NV21 = 6 + # Variables + self.conn = None + + def connectToServer(self, address, authkey): + for _ in range(50): + try: + self.conn = Client(address, authkey=authkey.encode('utf-8')) + if isinstance(self.conn, Connection): + break + else: + self.conn = None + except Exception: + self.conn = None + time.sleep(0.01) + + def setFilename(self, filename, mode): + self.conn.send([self.SET_FILENAME, getcwd(), filename, mode]) + filename_valid = self.conn.recv() + + return filename_valid + + def configureStream(self, frame_width, frame_height, color_format, frame_rate): + self.conn.send([self.STREAM_CONFIGURE, frame_width, frame_height, color_format, frame_rate]) + configuration_valid = self.conn.recv() + + return configuration_valid + + def enableStream(self, mode): + self.conn.send([self.STREAM_ENABLE, mode]) + stream_active = self.conn.recv() + + return stream_active + + def disableStream(self): + self.conn.send([self.STREAM_DISABLE]) + stream_active = self.conn.recv() + + return stream_active + + def readFrame(self): + self.conn.send([self.FRAME_READ]) + data = self.conn.recv_bytes() + eos = self.conn.recv() + + return data, eos + + def writeFrame(self, data): + self.conn.send([self.FRAME_WRITE]) + self.conn.send_bytes(data) + + def closeServer(self): + try: + if isinstance(self.conn, Connection): + self.conn.send([self.CLOSE_SERVER]) + self.conn.close() + except Exception as e: + logging.error(f'Exception occurred on cleanup: {e}') + + +# User registers +REG_IDX_MAX = 12 # Maximum user register index used in VSI +MODE = 0 # Regs[0] // Mode: 0=Input, 1=Output +CONTROL = 0 # Regs[1] // Control: enable, flush +STATUS = 0 # Regs[2] // Status: active, buf_empty, buf_full, overflow, underflow, eos +FILENAME_LEN = 0 # Regs[3] // Filename length +FILENAME_CHAR = 0 # Regs[4] // Filename character +FILENAME_VALID = 0 # Regs[5] // Filename valid flag +FRAME_WIDTH = 300 # Regs[6] // Requested frame width +FRAME_HEIGHT = 300 # Regs[7] // Requested frame height +COLOR_FORMAT = 0 # Regs[8] // Color format +FRAME_RATE = 0 # Regs[9] // Frame rate +FRAME_INDEX = 0 # Regs[10] // Frame index +FRAME_COUNT = 0 # Regs[11] // Frame count +FRAME_COUNT_MAX = 0 # Regs[12] // Frame count maximum + +# MODE register definitions +MODE_IO_Msk = 1<<0 +MODE_Input = 0<<0 +MODE_Output = 1<<0 + +# CONTROL register definitions +CONTROL_ENABLE_Msk = 1<<0 +CONTROL_CONTINUOS_Msk = 1<<1 +CONTROL_BUF_FLUSH_Msk = 1<<2 + +# STATUS register definitions +STATUS_ACTIVE_Msk = 1<<0 +STATUS_BUF_EMPTY_Msk = 1<<1 +STATUS_BUF_FULL_Msk = 1<<2 +STATUS_OVERFLOW_Msk = 1<<3 +STATUS_UNDERFLOW_Msk = 1<<4 +STATUS_EOS_Msk = 1<<5 + +# IRQ Status register definitions +IRQ_Status_FRAME_Msk = 1<<0 +IRQ_Status_OVERFLOW_Msk = 1<<1 +IRQ_Status_UNDERFLOW_Msk = 1<<2 +IRQ_Status_EOS_Msk = 1<<3 + +# Variables +Video = VideoClient() +Filename = "" +FilenameIdx = 0 + + +# Close VSI Video Server on exit +def cleanup(): + Video.closeServer() + + +# Client connection to VSI Video Server +def init(address, authkey): + global FILENAME_VALID + + base_dir = path.dirname(__file__) + server_path = path.join(base_dir, 'vsi_video_server.py') + + logging.info("Start video server") + if path.isfile(server_path): + # Start Video Server + if os_name == 'nt': + py_cmd = 'python' + else: + py_cmd = 'python3' + cmd = f"{py_cmd} {server_path} " \ + f"--ip {address[0]} " \ + f"--port {address[1]} " \ + f"--authkey {authkey}" + subprocess.Popen(cmd, shell=True) + # Connect to Video Server + Video.connectToServer(address, authkey) + if Video.conn == None: + logging.error("Server not connected") + + else: + logging.error(f"Server script not found: {server_path}") + + # Register clean-up function + atexit.register(cleanup) + + +## Flush Stream buffer +def flushBuffer(): + global STATUS, FRAME_INDEX, FRAME_COUNT + + STATUS |= STATUS_BUF_EMPTY_Msk + STATUS &= ~STATUS_BUF_FULL_Msk + + FRAME_INDEX = 0 + FRAME_COUNT = 0 + + +## VSI IRQ Status register +# @param IRQ_Status IRQ status register to update +# @param value status bits to clear +# @return IRQ_Status return updated register +def wrIRQ(IRQ_Status, value): + IRQ_Status_Clear = IRQ_Status & ~value + IRQ_Status &= ~IRQ_Status_Clear + + return IRQ_Status + + +## Timer Event +# @param IRQ_Status IRQ status register to update +# @return IRQ_Status return updated register +def timerEvent(IRQ_Status): + + IRQ_Status |= IRQ_Status_FRAME_Msk + + if (STATUS & STATUS_OVERFLOW_Msk) != 0: + IRQ_Status |= IRQ_Status_OVERFLOW_Msk + + if (STATUS & STATUS_UNDERFLOW_Msk) != 0: + IRQ_Status |= IRQ_Status_UNDERFLOW_Msk + + if (STATUS & STATUS_EOS_Msk) != 0: + IRQ_Status |= IRQ_Status_EOS_Msk + + if (CONTROL & CONTROL_CONTINUOS_Msk) == 0: + wrCONTROL(CONTROL & ~(CONTROL_ENABLE_Msk | CONTROL_CONTINUOS_Msk)) + + return IRQ_Status + + +## Read data from peripheral for DMA P2M transfer (VSI DMA) +# @param size size of data to read (in bytes, multiple of 4) +# @return data data read (bytearray) +def rdDataDMA(size): + global STATUS, FRAME_COUNT + + if (STATUS & STATUS_ACTIVE_Msk) != 0: + + if Video.conn != None: + data, eos = Video.readFrame() + if eos: + STATUS |= STATUS_EOS_Msk + if FRAME_COUNT < FRAME_COUNT_MAX: + FRAME_COUNT += 1 + else: + STATUS |= STATUS_OVERFLOW_Msk + if FRAME_COUNT == FRAME_COUNT_MAX: + STATUS |= STATUS_BUF_FULL_Msk + STATUS &= ~STATUS_BUF_EMPTY_Msk + else: + data = bytearray() + + else: + data = bytearray() + + return data + + +## Write data to peripheral for DMA M2P transfer (VSI DMA) +# @param data data to write (bytearray) +# @param size size of data to write (in bytes, multiple of 4) +def wrDataDMA(data, size): + global STATUS, FRAME_COUNT + + if (STATUS & STATUS_ACTIVE_Msk) != 0: + + if Video.conn != None: + Video.writeFrame(data) + if FRAME_COUNT > 0: + FRAME_COUNT -= 1 + else: + STATUS |= STATUS_UNDERFLOW_Msk + if FRAME_COUNT == 0: + STATUS |= STATUS_BUF_EMPTY_Msk + STATUS &= ~STATUS_BUF_FULL_Msk + + +## Write CONTROL register (user register) +# @param value value to write (32-bit) +def wrCONTROL(value): + global CONTROL, STATUS + + if ((value ^ CONTROL) & CONTROL_ENABLE_Msk) != 0: + STATUS &= ~STATUS_ACTIVE_Msk + if (value & CONTROL_ENABLE_Msk) != 0: + logging.info("Start video stream") + if Video.conn != None: + logging.info("Configure video stream") + configuration_valid = Video.configureStream(FRAME_WIDTH, FRAME_HEIGHT, COLOR_FORMAT, FRAME_RATE) + if configuration_valid: + logging.info("Enable video stream") + server_active = Video.enableStream(MODE) + if server_active: + STATUS |= STATUS_ACTIVE_Msk + STATUS &= ~(STATUS_OVERFLOW_Msk | STATUS_UNDERFLOW_Msk | STATUS_EOS_Msk) + else: + logging.error("Enable video stream failed") + else: + logging.error("Configure video stream failed") + else: + logging.error("Server not connected") + else: + logging.info("Stop video stream") + if Video.conn != None: + logging.info("Disable video stream") + Video.disableStream() + else: + logging.error("Server not connected") + + if (value & CONTROL_BUF_FLUSH_Msk) != 0: + value &= ~CONTROL_BUF_FLUSH_Msk + flushBuffer() + + CONTROL = value + + +## Read STATUS register (user register) +# @return status current STATUS User register (32-bit) +def rdSTATUS(): + global STATUS + + status = STATUS + STATUS &= ~(STATUS_OVERFLOW_Msk | STATUS_UNDERFLOW_Msk | STATUS_EOS_Msk) + + return status + + +## Write FILENAME_LEN register (user register) +# @param value value to write (32-bit) +def wrFILENAME_LEN(value): + global STATUS, FILENAME_LEN, FILENAME_VALID, Filename, FilenameIdx + + logging.info("Set new source name length and reset filename and valid flag") + FilenameIdx = 0 + Filename = "" + FILENAME_VALID = 0 + FILENAME_LEN = value + + +## Write FILENAME_CHAR register (user register) +# @param value value to write (32-bit) +def wrFILENAME_CHAR(value): + global FILENAME_VALID, Filename, FilenameIdx + + if FilenameIdx < FILENAME_LEN: + logging.info(f"Append {value} to filename") + Filename += f"{value}" + FilenameIdx += 1 + logging.debug(f"Received {FilenameIdx} of {FILENAME_LEN} characters") + + if FilenameIdx == FILENAME_LEN: + logging.info("Check if file exists on Server side and set VALID flag") + logging.debug(f"Filename: {Filename}") + + if Video.conn != None: + FILENAME_VALID = Video.setFilename(Filename, MODE) + else: + logging.error("Server not connected") + + logging.debug(f"Filename VALID: {FILENAME_VALID}") + + +## Write FRAME_INDEX register (user register) +# @param value value to write (32-bit) +# @return value value written (32-bit) +def wrFRAME_INDEX(value): + global STATUS, FRAME_INDEX, FRAME_COUNT + + FRAME_INDEX += 1 + if FRAME_INDEX == FRAME_COUNT_MAX: + FRAME_INDEX = 0 + + if (MODE & MODE_IO_Msk) == MODE_Input: + # Input + if FRAME_COUNT > 0: + FRAME_COUNT -= 1 + if FRAME_COUNT == 0: + STATUS |= STATUS_BUF_EMPTY_Msk + STATUS &= ~STATUS_BUF_FULL_Msk + else: + # Output + if FRAME_COUNT < FRAME_COUNT_MAX: + FRAME_COUNT += 1 + if FRAME_COUNT == FRAME_COUNT_MAX: + STATUS |= STATUS_BUF_FULL_Msk + STATUS &= ~STATUS_BUF_EMPTY_Msk + + return FRAME_INDEX + + +## Read user registers (the VSI User Registers) +# @param index user register index (zero based) +# @return value value read (32-bit) +def rdRegs(index): + value = 0 + + if index == 0: + value = MODE + elif index == 1: + value = CONTROL + elif index == 2: + value = rdSTATUS() + elif index == 3: + value = FILENAME_LEN + elif index == 4: + value = FILENAME_CHAR + elif index == 5: + value = FILENAME_VALID + elif index == 6: + value = FRAME_WIDTH + elif index == 7: + value = FRAME_HEIGHT + elif index == 8: + value = COLOR_FORMAT + elif index == 9: + value = FRAME_RATE + elif index == 10: + value = FRAME_INDEX + elif index == 11: + value = FRAME_COUNT + elif index == 12: + value = FRAME_COUNT_MAX + + return value + + +## Write user registers (the VSI User Registers) +# @param index user register index (zero based) +# @param value value to write (32-bit) +# @return value value written (32-bit) +def wrRegs(index, value): + global MODE, FRAME_WIDTH, FRAME_HEIGHT, COLOR_FORMAT, FRAME_RATE, FRAME_COUNT_MAX + + if index == 0: + MODE = value + elif index == 1: + wrCONTROL(value) + elif index == 2: + value = STATUS + elif index == 3: + wrFILENAME_LEN(value) + elif index == 4: + wrFILENAME_CHAR(chr(value)) + elif index == 5: + value = FILENAME_VALID + elif index == 6: + if value != 0: + FRAME_WIDTH = value + elif index == 7: + if value != 0: + FRAME_HEIGHT = value + elif index == 8: + COLOR_FORMAT = value + elif index == 9: + FRAME_RATE = value + elif index == 10: + value = wrFRAME_INDEX(value) + elif index == 11: + value = FRAME_COUNT + elif index == 12: + FRAME_COUNT_MAX = value + flushBuffer() + + return value diff --git a/scripts/py/vsi/vsi_video_server.py b/scripts/py/vsi/vsi_video_server.py new file mode 100644 index 0000000..f98b2ac --- /dev/null +++ b/scripts/py/vsi/vsi_video_server.py @@ -0,0 +1,447 @@ +#!/usr/bin/env python3 +# SPDX-FileCopyrightText: Copyright 2024 Arm Limited and/or its affiliates <open-source-office@arm.com> +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import ipaddress +import logging +import os +from multiprocessing.connection import Listener + +import cv2 +import numpy as np + +## Set verbosity level +verbosity = logging.ERROR + +# [debugging] Verbosity settings +level = { 10: "DEBUG", 20: "INFO", 30: "WARNING", 40: "ERROR" } +logging.basicConfig(format='VSI Server: [%(levelname)s]\t%(message)s', level = verbosity) +logging.info("Verbosity level is set to " + level[verbosity]) + +# Default Server configuration +default_address = ('127.0.0.1', 6000) +default_authkey = 'vsi_video' + +# Supported file extensions +video_file_extensions = ('wmv', 'avi', 'mp4') +image_file_extensions = ('bmp', 'png', 'jpg') +video_fourcc = {'wmv' : 'WMV1', 'avi' : 'MJPG', 'mp4' : 'mp4v'} + +# Mode Input/Output +MODE_IO_Msk = 1<<0 +MODE_Input = 0<<0 +MODE_Output = 1<<0 + +class VideoServer: + def __init__(self, address, authkey): + # Server commands + self.SET_FILENAME = 1 + self.STREAM_CONFIGURE = 2 + self.STREAM_ENABLE = 3 + self.STREAM_DISABLE = 4 + self.FRAME_READ = 5 + self.FRAME_WRITE = 6 + self.CLOSE_SERVER = 7 + # Color space + self.GRAYSCALE8 = 1 + self.RGB888 = 2 + self.BGR565 = 3 + self.YUV420 = 4 + self.NV12 = 5 + self.NV21 = 6 + # Variables + self.listener = Listener(address, authkey=authkey.encode('utf-8')) + self.filename = "" + self.mode = None + self.active = False + self.video = True + self.stream = None + self.frame_ratio = 0 + self.frame_drop = 0 + self.frame_index = 0 + self.eos = False + # Stream configuration + self.resolution = (None, None) + self.color_format = None + self.frame_rate = None + + # Set filename + def _setFilename(self, base_dir, filename, mode): + filename_valid = False + + if self.active: + return filename_valid + + self.filename = "" + self.frame_index = 0 + + file_extension = str(filename).split('.')[-1].lower() + + if file_extension in video_file_extensions: + self.video = True + else: + self.video = False + + file_path = os.path.join(base_dir, filename) + logging.debug(f"File path: {file_path}") + + if (mode & MODE_IO_Msk) == MODE_Input: + self.mode = MODE_Input + if os.path.isfile(file_path): + if file_extension in (video_file_extensions + image_file_extensions): + self.filename = file_path + filename_valid = True + else: + self.mode = MODE_Output + if file_extension in (video_file_extensions + image_file_extensions): + if os.path.isfile(file_path): + os.remove(file_path) + self.filename = file_path + filename_valid = True + + return filename_valid + + # Configure video stream + def _configureStream(self, frame_width, frame_height, color_format, frame_rate): + if (frame_width == 0 or frame_height == 0 or frame_rate == 0): + return False + + self.resolution = (frame_width, frame_height) + self.color_format = color_format + self.frame_rate = frame_rate + + return True + + # Enable video stream + def _enableStream(self, mode): + if self.active: + return + + self.eos = False + self.frame_ratio = 0 + self.frame_drop = 0 + + if self.stream is not None: + self.stream.release() + self.stream = None + + if self.filename == "": + self.video = True + if (mode & MODE_IO_Msk) == MODE_Input: + # Device mode: camera + self.mode = MODE_Input + else: + # Device mode: display + self.mode = MODE_Output + + if self.video: + if self.mode == MODE_Input: + if self.filename == "": + self.stream = cv2.VideoCapture(0) + if not self.stream.isOpened(): + logging.error("Failed to open Camera interface") + return + else: + self.stream = cv2.VideoCapture(self.filename) + self.stream.set(cv2.CAP_PROP_POS_FRAMES, self.frame_index) + video_fps = self.stream.get(cv2.CAP_PROP_FPS) + if video_fps > self.frame_rate: + self.frame_ratio = video_fps / self.frame_rate + logging.debug(f"Frame ratio: {self.frame_ratio}") + else: + if self.filename != "": + extension = str(self.filename).split('.')[-1].lower() + fourcc = cv2.VideoWriter_fourcc(*f'{video_fourcc[extension]}') + + if os.path.isfile(self.filename) and (self.frame_index != 0): + tmp_filename = f'{self.filename.rstrip(f".{extension}")}_tmp.{extension}' + os.rename(self.filename, tmp_filename) + cap = cv2.VideoCapture(tmp_filename) + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + self.resolution = (width, height) + self.frame_rate = cap.get(cv2.CAP_PROP_FPS) + self.stream = cv2.VideoWriter(self.filename, fourcc, self.frame_rate, self.resolution) + + while cap.isOpened(): + ret, frame = cap.read() + if not ret: + cap.release() + os.remove(tmp_filename) + break + self.stream.write(frame) + del frame + + else: + self.stream = cv2.VideoWriter(self.filename, fourcc, self.frame_rate, self.resolution) + + self.active = True + logging.info("Stream enabled") + + # Disable Video Server + def _disableStream(self): + self.active = False + if self.stream is not None: + if self.mode == MODE_Input: + self.frame_index = self.stream.get(cv2.CAP_PROP_POS_FRAMES) + self.stream.release() + self.stream = None + logging.info("Stream disabled") + + # Resize frame to requested resolution in pixels + def __resizeFrame(self, frame, resolution): + frame_h = frame.shape[0] + frame_w = frame.shape[1] + + # Calculate requested aspect ratio (width/height): + crop_aspect_ratio = resolution[0] / resolution[1] + + if crop_aspect_ratio != (frame_w / frame_h): + # Crop into image with resize aspect ratio + crop_w = int(frame_h * crop_aspect_ratio) + crop_h = int(frame_w / crop_aspect_ratio) + + if crop_w > frame_w: + # Crop top and bottom part of the image + top = (frame_h - crop_h) // 2 + bottom = top + crop_h + frame = frame[top : bottom, 0 : frame_w] + elif crop_h > frame_h: + # Crop left and right side of the image`` + left = (frame_w - crop_w) // 2 + right = left + crop_w + frame = frame[0 : frame_h, left : right] + else: + # Crop to the center of the image + left = (frame_w - crop_w) // 2 + right = left + crop_w + top = (frame_h - crop_h) // 2 + bottom = top + crop_h + frame = frame[top : bottom, left : right] + logging.debug(f"Frame cropped from ({frame_w}, {frame_h}) to ({frame.shape[1]}, {frame.shape[0]})") + + logging.debug(f"Resize frame from ({frame.shape[1]}, {frame.shape[0]}) to ({resolution[0]}, {resolution[1]})") + try: + frame = cv2.resize(frame, resolution) + except Exception as e: + logging.error(f"Error in resizeFrame(): {e}") + + return frame + + # Change color space of a frame from BGR to selected profile + def __changeColorSpace(self, frame, color_space): + color_format = None + + # Default OpenCV color profile: BGR + if self.mode == MODE_Input: + if color_space == self.GRAYSCALE8: + color_format = cv2.COLOR_BGR2GRAY + elif color_space == self.RGB888: + color_format = cv2.COLOR_BGR2RGB + elif color_space == self.BGR565: + color_format = cv2.COLOR_BGR2BGR565 + elif color_space == self.YUV420: + color_format = cv2.COLOR_BGR2YUV_I420 + elif color_space == self.NV12: + frame = self.__changeColorSpace(frame, self.YUV420) + color_format = cv2.COLOR_YUV2RGB_NV12 + elif color_space == self.NV21: + frame = self.__changeColorSpace(frame, self.YUV420) + color_format = cv2.COLOR_YUV2RGB_NV21 + + else: + if color_space == self.GRAYSCALE8: + color_format = cv2.COLOR_GRAY2BGR + elif color_space == self.RGB888: + color_format = cv2.COLOR_RGB2BGR + elif color_space == self.BGR565: + color_format = cv2.COLOR_BGR5652BGR + elif color_space == self.YUV420: + color_format = cv2.COLOR_YUV2BGR_I420 + elif color_space == self.NV12: + color_format = cv2.COLOR_YUV2BGR_I420 + elif color_space == self.NV21: + color_format = cv2.COLOR_YUV2BGR_I420 + + if color_format != None: + logging.debug(f"Change color space to {color_format}") + try: + frame = cv2.cvtColor(frame, color_format) + except Exception as e: + logging.error(f"Error in changeColorSpace(): {e}") + + return frame + + # Read frame from source + def _readFrame(self): + frame = bytearray() + + if not self.active: + return frame + + if self.eos: + return frame + + if self.video: + if self.frame_ratio > 1: + _, tmp_frame = self.stream.read() + self.frame_drop += (self.frame_ratio - 1) + if self.frame_drop > 1: + logging.debug(f"Frames to drop: {self.frame_drop}") + drop = int(self.frame_drop // 1) + for i in range(drop): + _, _ = self.stream.read() + logging.debug(f"Frames dropped: {drop}") + self.frame_drop -= drop + logging.debug(f"Frames left to drop: {self.frame_drop}") + else: + _, tmp_frame = self.stream.read() + if tmp_frame is None: + self.eos = True + logging.debug("End of stream.") + else: + tmp_frame = cv2.imread(self.filename) + self.eos = True + logging.debug("End of stream.") + + if tmp_frame is not None: + tmp_frame = self.__resizeFrame(tmp_frame, self.resolution) + tmp_frame = self.__changeColorSpace(tmp_frame, self.color_format) + frame = bytearray(tmp_frame.tobytes()) + + return frame + + # Write frame to destination + def _writeFrame(self, frame): + if not self.active: + return + + try: + decoded_frame = np.frombuffer(frame, dtype=np.uint8) + decoded_frame = decoded_frame.reshape((self.resolution[0], self.resolution[1], 3)) + bgr_frame = self.__changeColorSpace(decoded_frame, self.RGB888) + + if self.filename == "": + cv2.imshow(self.filename, bgr_frame) + cv2.waitKey(10) + else: + if self.video: + self.stream.write(np.uint8(bgr_frame)) + self.frame_index += 1 + else: + cv2.imwrite(self.filename, bgr_frame) + except Exception: + pass + + # Run Video Server + def run(self): + logging.info("Video server started") + + try: + conn = self.listener.accept() + logging.info(f'Connection accepted {self.listener.address}') + except Exception: + logging.error("Connection not accepted") + return + + while True: + try: + recv = conn.recv() + except EOFError: + return + + cmd = recv[0] # Command + payload = recv[1:] # Payload + + if cmd == self.SET_FILENAME: + logging.info("Set filename called") + filename_valid = self._setFilename(payload[0], payload[1], payload[2]) + conn.send(filename_valid) + + elif cmd == self.STREAM_CONFIGURE: + logging.info("Stream configure called") + configuration_valid = self._configureStream(payload[0], payload[1], payload[2], payload[3]) + conn.send(configuration_valid) + + elif cmd == self.STREAM_ENABLE: + logging.info("Enable stream called") + self._enableStream(payload[0]) + conn.send(self.active) + + elif cmd == self.STREAM_DISABLE: + logging.info("Disable stream called") + self._disableStream() + conn.send(self.active) + + elif cmd == self.FRAME_READ: + logging.info("Read frame called") + frame = self._readFrame() + conn.send_bytes(frame) + conn.send(self.eos) + + elif cmd == self.FRAME_WRITE: + logging.info("Write frame called") + frame = conn.recv_bytes() + self._writeFrame(frame) + + elif cmd == self.CLOSE_SERVER: + logging.info("Close server connection") + self.stop() + + # Stop Video Server + def stop(self): + self._disableStream() + if (self.mode == MODE_Output) and (self.filename == ""): + try: + cv2.destroyAllWindows() + except Exception: + pass + self.listener.close() + logging.info("Video server stopped") + + +# Validate IP address +def ip(ip): + try: + _ = ipaddress.ip_address(ip) + return ip + except: + raise argparse.ArgumentTypeError(f"Invalid IP address: {ip}!") + +def parse_arguments(): + formatter = lambda prog: argparse.HelpFormatter(prog, max_help_position=41) + parser = argparse.ArgumentParser(formatter_class=formatter, description="VSI Video Server") + + parser_optional = parser.add_argument_group("optional") + parser_optional.add_argument("--ip", dest="ip", metavar="<IP>", + help=f"Server IP address (default: {default_address[0]})", + type=ip, default=default_address[0]) + parser_optional.add_argument("--port", dest="port", metavar="<TCP Port>", + help=f"TCP port (default: {default_address[1]})", + type=int, default=default_address[1]) + parser_optional.add_argument("--authkey", dest="authkey", metavar="<Auth Key>", + help=f"Authorization key (default: {default_authkey})", + type=str, default=default_authkey) + + return parser.parse_args() + +if __name__ == '__main__': + args = parse_arguments() + Server = VideoServer((args.ip, args.port), args.authkey) + try: + Server.run() + except KeyboardInterrupt: + Server.stop() |