diff options
Diffstat (limited to 'scripts/py/vsi/vsi_video_server.py')
-rw-r--r-- | scripts/py/vsi/vsi_video_server.py | 447 |
1 files changed, 447 insertions, 0 deletions
diff --git a/scripts/py/vsi/vsi_video_server.py b/scripts/py/vsi/vsi_video_server.py new file mode 100644 index 0000000..f98b2ac --- /dev/null +++ b/scripts/py/vsi/vsi_video_server.py @@ -0,0 +1,447 @@ +#!/usr/bin/env python3 +# SPDX-FileCopyrightText: Copyright 2024 Arm Limited and/or its affiliates <open-source-office@arm.com> +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import ipaddress +import logging +import os +from multiprocessing.connection import Listener + +import cv2 +import numpy as np + +## Set verbosity level +verbosity = logging.ERROR + +# [debugging] Verbosity settings +level = { 10: "DEBUG", 20: "INFO", 30: "WARNING", 40: "ERROR" } +logging.basicConfig(format='VSI Server: [%(levelname)s]\t%(message)s', level = verbosity) +logging.info("Verbosity level is set to " + level[verbosity]) + +# Default Server configuration +default_address = ('127.0.0.1', 6000) +default_authkey = 'vsi_video' + +# Supported file extensions +video_file_extensions = ('wmv', 'avi', 'mp4') +image_file_extensions = ('bmp', 'png', 'jpg') +video_fourcc = {'wmv' : 'WMV1', 'avi' : 'MJPG', 'mp4' : 'mp4v'} + +# Mode Input/Output +MODE_IO_Msk = 1<<0 +MODE_Input = 0<<0 +MODE_Output = 1<<0 + +class VideoServer: + def __init__(self, address, authkey): + # Server commands + self.SET_FILENAME = 1 + self.STREAM_CONFIGURE = 2 + self.STREAM_ENABLE = 3 + self.STREAM_DISABLE = 4 + self.FRAME_READ = 5 + self.FRAME_WRITE = 6 + self.CLOSE_SERVER = 7 + # Color space + self.GRAYSCALE8 = 1 + self.RGB888 = 2 + self.BGR565 = 3 + self.YUV420 = 4 + self.NV12 = 5 + self.NV21 = 6 + # Variables + self.listener = Listener(address, authkey=authkey.encode('utf-8')) + self.filename = "" + self.mode = None + self.active = False + self.video = True + self.stream = None + self.frame_ratio = 0 + self.frame_drop = 0 + self.frame_index = 0 + self.eos = False + # Stream configuration + self.resolution = (None, None) + self.color_format = None + self.frame_rate = None + + # Set filename + def _setFilename(self, base_dir, filename, mode): + filename_valid = False + + if self.active: + return filename_valid + + self.filename = "" + self.frame_index = 0 + + file_extension = str(filename).split('.')[-1].lower() + + if file_extension in video_file_extensions: + self.video = True + else: + self.video = False + + file_path = os.path.join(base_dir, filename) + logging.debug(f"File path: {file_path}") + + if (mode & MODE_IO_Msk) == MODE_Input: + self.mode = MODE_Input + if os.path.isfile(file_path): + if file_extension in (video_file_extensions + image_file_extensions): + self.filename = file_path + filename_valid = True + else: + self.mode = MODE_Output + if file_extension in (video_file_extensions + image_file_extensions): + if os.path.isfile(file_path): + os.remove(file_path) + self.filename = file_path + filename_valid = True + + return filename_valid + + # Configure video stream + def _configureStream(self, frame_width, frame_height, color_format, frame_rate): + if (frame_width == 0 or frame_height == 0 or frame_rate == 0): + return False + + self.resolution = (frame_width, frame_height) + self.color_format = color_format + self.frame_rate = frame_rate + + return True + + # Enable video stream + def _enableStream(self, mode): + if self.active: + return + + self.eos = False + self.frame_ratio = 0 + self.frame_drop = 0 + + if self.stream is not None: + self.stream.release() + self.stream = None + + if self.filename == "": + self.video = True + if (mode & MODE_IO_Msk) == MODE_Input: + # Device mode: camera + self.mode = MODE_Input + else: + # Device mode: display + self.mode = MODE_Output + + if self.video: + if self.mode == MODE_Input: + if self.filename == "": + self.stream = cv2.VideoCapture(0) + if not self.stream.isOpened(): + logging.error("Failed to open Camera interface") + return + else: + self.stream = cv2.VideoCapture(self.filename) + self.stream.set(cv2.CAP_PROP_POS_FRAMES, self.frame_index) + video_fps = self.stream.get(cv2.CAP_PROP_FPS) + if video_fps > self.frame_rate: + self.frame_ratio = video_fps / self.frame_rate + logging.debug(f"Frame ratio: {self.frame_ratio}") + else: + if self.filename != "": + extension = str(self.filename).split('.')[-1].lower() + fourcc = cv2.VideoWriter_fourcc(*f'{video_fourcc[extension]}') + + if os.path.isfile(self.filename) and (self.frame_index != 0): + tmp_filename = f'{self.filename.rstrip(f".{extension}")}_tmp.{extension}' + os.rename(self.filename, tmp_filename) + cap = cv2.VideoCapture(tmp_filename) + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + self.resolution = (width, height) + self.frame_rate = cap.get(cv2.CAP_PROP_FPS) + self.stream = cv2.VideoWriter(self.filename, fourcc, self.frame_rate, self.resolution) + + while cap.isOpened(): + ret, frame = cap.read() + if not ret: + cap.release() + os.remove(tmp_filename) + break + self.stream.write(frame) + del frame + + else: + self.stream = cv2.VideoWriter(self.filename, fourcc, self.frame_rate, self.resolution) + + self.active = True + logging.info("Stream enabled") + + # Disable Video Server + def _disableStream(self): + self.active = False + if self.stream is not None: + if self.mode == MODE_Input: + self.frame_index = self.stream.get(cv2.CAP_PROP_POS_FRAMES) + self.stream.release() + self.stream = None + logging.info("Stream disabled") + + # Resize frame to requested resolution in pixels + def __resizeFrame(self, frame, resolution): + frame_h = frame.shape[0] + frame_w = frame.shape[1] + + # Calculate requested aspect ratio (width/height): + crop_aspect_ratio = resolution[0] / resolution[1] + + if crop_aspect_ratio != (frame_w / frame_h): + # Crop into image with resize aspect ratio + crop_w = int(frame_h * crop_aspect_ratio) + crop_h = int(frame_w / crop_aspect_ratio) + + if crop_w > frame_w: + # Crop top and bottom part of the image + top = (frame_h - crop_h) // 2 + bottom = top + crop_h + frame = frame[top : bottom, 0 : frame_w] + elif crop_h > frame_h: + # Crop left and right side of the image`` + left = (frame_w - crop_w) // 2 + right = left + crop_w + frame = frame[0 : frame_h, left : right] + else: + # Crop to the center of the image + left = (frame_w - crop_w) // 2 + right = left + crop_w + top = (frame_h - crop_h) // 2 + bottom = top + crop_h + frame = frame[top : bottom, left : right] + logging.debug(f"Frame cropped from ({frame_w}, {frame_h}) to ({frame.shape[1]}, {frame.shape[0]})") + + logging.debug(f"Resize frame from ({frame.shape[1]}, {frame.shape[0]}) to ({resolution[0]}, {resolution[1]})") + try: + frame = cv2.resize(frame, resolution) + except Exception as e: + logging.error(f"Error in resizeFrame(): {e}") + + return frame + + # Change color space of a frame from BGR to selected profile + def __changeColorSpace(self, frame, color_space): + color_format = None + + # Default OpenCV color profile: BGR + if self.mode == MODE_Input: + if color_space == self.GRAYSCALE8: + color_format = cv2.COLOR_BGR2GRAY + elif color_space == self.RGB888: + color_format = cv2.COLOR_BGR2RGB + elif color_space == self.BGR565: + color_format = cv2.COLOR_BGR2BGR565 + elif color_space == self.YUV420: + color_format = cv2.COLOR_BGR2YUV_I420 + elif color_space == self.NV12: + frame = self.__changeColorSpace(frame, self.YUV420) + color_format = cv2.COLOR_YUV2RGB_NV12 + elif color_space == self.NV21: + frame = self.__changeColorSpace(frame, self.YUV420) + color_format = cv2.COLOR_YUV2RGB_NV21 + + else: + if color_space == self.GRAYSCALE8: + color_format = cv2.COLOR_GRAY2BGR + elif color_space == self.RGB888: + color_format = cv2.COLOR_RGB2BGR + elif color_space == self.BGR565: + color_format = cv2.COLOR_BGR5652BGR + elif color_space == self.YUV420: + color_format = cv2.COLOR_YUV2BGR_I420 + elif color_space == self.NV12: + color_format = cv2.COLOR_YUV2BGR_I420 + elif color_space == self.NV21: + color_format = cv2.COLOR_YUV2BGR_I420 + + if color_format != None: + logging.debug(f"Change color space to {color_format}") + try: + frame = cv2.cvtColor(frame, color_format) + except Exception as e: + logging.error(f"Error in changeColorSpace(): {e}") + + return frame + + # Read frame from source + def _readFrame(self): + frame = bytearray() + + if not self.active: + return frame + + if self.eos: + return frame + + if self.video: + if self.frame_ratio > 1: + _, tmp_frame = self.stream.read() + self.frame_drop += (self.frame_ratio - 1) + if self.frame_drop > 1: + logging.debug(f"Frames to drop: {self.frame_drop}") + drop = int(self.frame_drop // 1) + for i in range(drop): + _, _ = self.stream.read() + logging.debug(f"Frames dropped: {drop}") + self.frame_drop -= drop + logging.debug(f"Frames left to drop: {self.frame_drop}") + else: + _, tmp_frame = self.stream.read() + if tmp_frame is None: + self.eos = True + logging.debug("End of stream.") + else: + tmp_frame = cv2.imread(self.filename) + self.eos = True + logging.debug("End of stream.") + + if tmp_frame is not None: + tmp_frame = self.__resizeFrame(tmp_frame, self.resolution) + tmp_frame = self.__changeColorSpace(tmp_frame, self.color_format) + frame = bytearray(tmp_frame.tobytes()) + + return frame + + # Write frame to destination + def _writeFrame(self, frame): + if not self.active: + return + + try: + decoded_frame = np.frombuffer(frame, dtype=np.uint8) + decoded_frame = decoded_frame.reshape((self.resolution[0], self.resolution[1], 3)) + bgr_frame = self.__changeColorSpace(decoded_frame, self.RGB888) + + if self.filename == "": + cv2.imshow(self.filename, bgr_frame) + cv2.waitKey(10) + else: + if self.video: + self.stream.write(np.uint8(bgr_frame)) + self.frame_index += 1 + else: + cv2.imwrite(self.filename, bgr_frame) + except Exception: + pass + + # Run Video Server + def run(self): + logging.info("Video server started") + + try: + conn = self.listener.accept() + logging.info(f'Connection accepted {self.listener.address}') + except Exception: + logging.error("Connection not accepted") + return + + while True: + try: + recv = conn.recv() + except EOFError: + return + + cmd = recv[0] # Command + payload = recv[1:] # Payload + + if cmd == self.SET_FILENAME: + logging.info("Set filename called") + filename_valid = self._setFilename(payload[0], payload[1], payload[2]) + conn.send(filename_valid) + + elif cmd == self.STREAM_CONFIGURE: + logging.info("Stream configure called") + configuration_valid = self._configureStream(payload[0], payload[1], payload[2], payload[3]) + conn.send(configuration_valid) + + elif cmd == self.STREAM_ENABLE: + logging.info("Enable stream called") + self._enableStream(payload[0]) + conn.send(self.active) + + elif cmd == self.STREAM_DISABLE: + logging.info("Disable stream called") + self._disableStream() + conn.send(self.active) + + elif cmd == self.FRAME_READ: + logging.info("Read frame called") + frame = self._readFrame() + conn.send_bytes(frame) + conn.send(self.eos) + + elif cmd == self.FRAME_WRITE: + logging.info("Write frame called") + frame = conn.recv_bytes() + self._writeFrame(frame) + + elif cmd == self.CLOSE_SERVER: + logging.info("Close server connection") + self.stop() + + # Stop Video Server + def stop(self): + self._disableStream() + if (self.mode == MODE_Output) and (self.filename == ""): + try: + cv2.destroyAllWindows() + except Exception: + pass + self.listener.close() + logging.info("Video server stopped") + + +# Validate IP address +def ip(ip): + try: + _ = ipaddress.ip_address(ip) + return ip + except: + raise argparse.ArgumentTypeError(f"Invalid IP address: {ip}!") + +def parse_arguments(): + formatter = lambda prog: argparse.HelpFormatter(prog, max_help_position=41) + parser = argparse.ArgumentParser(formatter_class=formatter, description="VSI Video Server") + + parser_optional = parser.add_argument_group("optional") + parser_optional.add_argument("--ip", dest="ip", metavar="<IP>", + help=f"Server IP address (default: {default_address[0]})", + type=ip, default=default_address[0]) + parser_optional.add_argument("--port", dest="port", metavar="<TCP Port>", + help=f"TCP port (default: {default_address[1]})", + type=int, default=default_address[1]) + parser_optional.add_argument("--authkey", dest="authkey", metavar="<Auth Key>", + help=f"Authorization key (default: {default_authkey})", + type=str, default=default_authkey) + + return parser.parse_args() + +if __name__ == '__main__': + args = parse_arguments() + Server = VideoServer((args.ip, args.port), args.authkey) + try: + Server.run() + except KeyboardInterrupt: + Server.stop() |