Diffstat (limited to 'scripts/py/vsi/vsi_video_server.py')
-rw-r--r--  scripts/py/vsi/vsi_video_server.py  447
1 file changed, 447 insertions(+), 0 deletions(-)
diff --git a/scripts/py/vsi/vsi_video_server.py b/scripts/py/vsi/vsi_video_server.py
new file mode 100644
index 0000000..f98b2ac
--- /dev/null
+++ b/scripts/py/vsi/vsi_video_server.py
@@ -0,0 +1,447 @@
+#!/usr/bin/env python3
+# SPDX-FileCopyrightText: Copyright 2024 Arm Limited and/or its affiliates <open-source-office@arm.com>
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+import ipaddress
+import logging
+import os
+from multiprocessing.connection import Listener
+
+import cv2
+import numpy as np
+
+# Set verbosity level
+verbosity = logging.ERROR
+
+# [debugging] Verbosity settings
+level = {10: "DEBUG", 20: "INFO", 30: "WARNING", 40: "ERROR"}
+logging.basicConfig(format='VSI Server: [%(levelname)s]\t%(message)s', level=verbosity)
+logging.info(f"Verbosity level is set to {level[verbosity]}")
+
+# Default Server configuration
+default_address = ('127.0.0.1', 6000)
+default_authkey = 'vsi_video'
+
+# Supported file extensions
+video_file_extensions = ('wmv', 'avi', 'mp4')
+image_file_extensions = ('bmp', 'png', 'jpg')
+video_fourcc = {'wmv' : 'WMV1', 'avi' : 'MJPG', 'mp4' : 'mp4v'}
+
+# Mode Input/Output
+MODE_IO_Msk = 1<<0
+MODE_Input = 0<<0
+MODE_Output = 1<<0
+
+class VideoServer:
+ def __init__(self, address, authkey):
+ # Server commands
+ self.SET_FILENAME = 1
+ self.STREAM_CONFIGURE = 2
+ self.STREAM_ENABLE = 3
+ self.STREAM_DISABLE = 4
+ self.FRAME_READ = 5
+ self.FRAME_WRITE = 6
+ self.CLOSE_SERVER = 7
+ # Color space
+ self.GRAYSCALE8 = 1
+ self.RGB888 = 2
+ self.BGR565 = 3
+ self.YUV420 = 4
+ self.NV12 = 5
+ self.NV21 = 6
+ # Variables
+ self.listener = Listener(address, authkey=authkey.encode('utf-8'))
+ self.filename = ""
+ self.mode = None
+ self.active = False
+ self.video = True
+ self.stream = None
+ self.frame_ratio = 0
+ self.frame_drop = 0
+ self.frame_index = 0
+ self.eos = False
+ # Stream configuration
+ self.resolution = (None, None)
+ self.color_format = None
+ self.frame_rate = None
+
+ # Set filename
+ def _setFilename(self, base_dir, filename, mode):
+ filename_valid = False
+
+ if self.active:
+ return filename_valid
+
+ self.filename = ""
+ self.frame_index = 0
+
+ file_extension = str(filename).split('.')[-1].lower()
+
+ if file_extension in video_file_extensions:
+ self.video = True
+ else:
+ self.video = False
+
+ file_path = os.path.join(base_dir, filename)
+ logging.debug(f"File path: {file_path}")
+
+ if (mode & MODE_IO_Msk) == MODE_Input:
+ self.mode = MODE_Input
+ if os.path.isfile(file_path):
+ if file_extension in (video_file_extensions + image_file_extensions):
+ self.filename = file_path
+ filename_valid = True
+ else:
+ self.mode = MODE_Output
+ if file_extension in (video_file_extensions + image_file_extensions):
+ if os.path.isfile(file_path):
+ os.remove(file_path)
+ self.filename = file_path
+ filename_valid = True
+
+ return filename_valid
+
+ # Configure video stream
+ def _configureStream(self, frame_width, frame_height, color_format, frame_rate):
+ if (frame_width == 0 or frame_height == 0 or frame_rate == 0):
+ return False
+
+ self.resolution = (frame_width, frame_height)
+ self.color_format = color_format
+ self.frame_rate = frame_rate
+
+ return True
+
+ # Enable video stream
+ def _enableStream(self, mode):
+ if self.active:
+ return
+
+ self.eos = False
+ self.frame_ratio = 0
+ self.frame_drop = 0
+
+ if self.stream is not None:
+ self.stream.release()
+ self.stream = None
+
+ if self.filename == "":
+ self.video = True
+ if (mode & MODE_IO_Msk) == MODE_Input:
+ # Device mode: camera
+ self.mode = MODE_Input
+ else:
+ # Device mode: display
+ self.mode = MODE_Output
+
+ if self.video:
+ if self.mode == MODE_Input:
+ if self.filename == "":
+ self.stream = cv2.VideoCapture(0)
+ if not self.stream.isOpened():
+ logging.error("Failed to open Camera interface")
+ return
+ else:
+ self.stream = cv2.VideoCapture(self.filename)
+ self.stream.set(cv2.CAP_PROP_POS_FRAMES, self.frame_index)
+ video_fps = self.stream.get(cv2.CAP_PROP_FPS)
+ if video_fps > self.frame_rate:
+ self.frame_ratio = video_fps / self.frame_rate
+ logging.debug(f"Frame ratio: {self.frame_ratio}")
+ else:
+ if self.filename != "":
+ extension = str(self.filename).split('.')[-1].lower()
+                fourcc = cv2.VideoWriter_fourcc(*video_fourcc[extension])
+
+ if os.path.isfile(self.filename) and (self.frame_index != 0):
+                    # str.rstrip() strips a character set, not a suffix; use os.path.splitext() instead
+                    tmp_filename = f'{os.path.splitext(self.filename)[0]}_tmp.{extension}'
+ os.rename(self.filename, tmp_filename)
+ cap = cv2.VideoCapture(tmp_filename)
+ width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
+ height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
+ self.resolution = (width, height)
+ self.frame_rate = cap.get(cv2.CAP_PROP_FPS)
+ self.stream = cv2.VideoWriter(self.filename, fourcc, self.frame_rate, self.resolution)
+
+ while cap.isOpened():
+ ret, frame = cap.read()
+ if not ret:
+ cap.release()
+ os.remove(tmp_filename)
+ break
+ self.stream.write(frame)
+ del frame
+
+ else:
+ self.stream = cv2.VideoWriter(self.filename, fourcc, self.frame_rate, self.resolution)
+
+ self.active = True
+ logging.info("Stream enabled")
+
+ # Disable Video Server
+ def _disableStream(self):
+ self.active = False
+ if self.stream is not None:
+ if self.mode == MODE_Input:
+ self.frame_index = self.stream.get(cv2.CAP_PROP_POS_FRAMES)
+ self.stream.release()
+ self.stream = None
+ logging.info("Stream disabled")
+
+ # Resize frame to requested resolution in pixels
+ def __resizeFrame(self, frame, resolution):
+ frame_h = frame.shape[0]
+ frame_w = frame.shape[1]
+
+ # Calculate requested aspect ratio (width/height):
+ crop_aspect_ratio = resolution[0] / resolution[1]
+
+ if crop_aspect_ratio != (frame_w / frame_h):
+ # Crop into image with resize aspect ratio
+ crop_w = int(frame_h * crop_aspect_ratio)
+ crop_h = int(frame_w / crop_aspect_ratio)
+
+ if crop_w > frame_w:
+ # Crop top and bottom part of the image
+ top = (frame_h - crop_h) // 2
+ bottom = top + crop_h
+ frame = frame[top : bottom, 0 : frame_w]
+ elif crop_h > frame_h:
+                # Crop left and right side of the image
+ left = (frame_w - crop_w) // 2
+ right = left + crop_w
+ frame = frame[0 : frame_h, left : right]
+ else:
+ # Crop to the center of the image
+ left = (frame_w - crop_w) // 2
+ right = left + crop_w
+ top = (frame_h - crop_h) // 2
+ bottom = top + crop_h
+ frame = frame[top : bottom, left : right]
+ logging.debug(f"Frame cropped from ({frame_w}, {frame_h}) to ({frame.shape[1]}, {frame.shape[0]})")
+
+ logging.debug(f"Resize frame from ({frame.shape[1]}, {frame.shape[0]}) to ({resolution[0]}, {resolution[1]})")
+ try:
+ frame = cv2.resize(frame, resolution)
+ except Exception as e:
+ logging.error(f"Error in resizeFrame(): {e}")
+
+ return frame
+
+ # Change color space of a frame from BGR to selected profile
+ def __changeColorSpace(self, frame, color_space):
+ color_format = None
+
+ # Default OpenCV color profile: BGR
+ if self.mode == MODE_Input:
+ if color_space == self.GRAYSCALE8:
+ color_format = cv2.COLOR_BGR2GRAY
+ elif color_space == self.RGB888:
+ color_format = cv2.COLOR_BGR2RGB
+ elif color_space == self.BGR565:
+ color_format = cv2.COLOR_BGR2BGR565
+ elif color_space == self.YUV420:
+ color_format = cv2.COLOR_BGR2YUV_I420
+ elif color_space == self.NV12:
+ frame = self.__changeColorSpace(frame, self.YUV420)
+ color_format = cv2.COLOR_YUV2RGB_NV12
+ elif color_space == self.NV21:
+ frame = self.__changeColorSpace(frame, self.YUV420)
+ color_format = cv2.COLOR_YUV2RGB_NV21
+
+ else:
+ if color_space == self.GRAYSCALE8:
+ color_format = cv2.COLOR_GRAY2BGR
+ elif color_space == self.RGB888:
+ color_format = cv2.COLOR_RGB2BGR
+ elif color_space == self.BGR565:
+ color_format = cv2.COLOR_BGR5652BGR
+ elif color_space == self.YUV420:
+ color_format = cv2.COLOR_YUV2BGR_I420
+ elif color_space == self.NV12:
+ color_format = cv2.COLOR_YUV2BGR_I420
+ elif color_space == self.NV21:
+ color_format = cv2.COLOR_YUV2BGR_I420
+
+        if color_format is not None:
+ logging.debug(f"Change color space to {color_format}")
+ try:
+ frame = cv2.cvtColor(frame, color_format)
+ except Exception as e:
+ logging.error(f"Error in changeColorSpace(): {e}")
+
+ return frame
+
+ # Read frame from source
+ def _readFrame(self):
+ frame = bytearray()
+
+ if not self.active:
+ return frame
+
+ if self.eos:
+ return frame
+
+ if self.video:
+ if self.frame_ratio > 1:
+ _, tmp_frame = self.stream.read()
+ self.frame_drop += (self.frame_ratio - 1)
+ if self.frame_drop > 1:
+ logging.debug(f"Frames to drop: {self.frame_drop}")
+ drop = int(self.frame_drop // 1)
+ for i in range(drop):
+ _, _ = self.stream.read()
+ logging.debug(f"Frames dropped: {drop}")
+ self.frame_drop -= drop
+ logging.debug(f"Frames left to drop: {self.frame_drop}")
+ else:
+ _, tmp_frame = self.stream.read()
+ if tmp_frame is None:
+ self.eos = True
+ logging.debug("End of stream.")
+ else:
+ tmp_frame = cv2.imread(self.filename)
+ self.eos = True
+ logging.debug("End of stream.")
+
+ if tmp_frame is not None:
+ tmp_frame = self.__resizeFrame(tmp_frame, self.resolution)
+ tmp_frame = self.__changeColorSpace(tmp_frame, self.color_format)
+ frame = bytearray(tmp_frame.tobytes())
+
+ return frame
+
+ # Write frame to destination
+ def _writeFrame(self, frame):
+ if not self.active:
+ return
+
+ try:
+ decoded_frame = np.frombuffer(frame, dtype=np.uint8)
+            # numpy shape is (rows, cols, channels), i.e. (height, width, 3); resolution is (width, height)
+            decoded_frame = decoded_frame.reshape((self.resolution[1], self.resolution[0], 3))
+ bgr_frame = self.__changeColorSpace(decoded_frame, self.RGB888)
+
+ if self.filename == "":
+ cv2.imshow(self.filename, bgr_frame)
+ cv2.waitKey(10)
+ else:
+ if self.video:
+ self.stream.write(np.uint8(bgr_frame))
+ self.frame_index += 1
+ else:
+ cv2.imwrite(self.filename, bgr_frame)
+        except Exception as e:
+            logging.error(f"Error in writeFrame(): {e}")
+
+ # Run Video Server
+ def run(self):
+ logging.info("Video server started")
+
+ try:
+ conn = self.listener.accept()
+ logging.info(f'Connection accepted {self.listener.address}')
+ except Exception:
+ logging.error("Connection not accepted")
+ return
+
+ while True:
+ try:
+ recv = conn.recv()
+ except EOFError:
+ return
+
+ cmd = recv[0] # Command
+ payload = recv[1:] # Payload
+
+ if cmd == self.SET_FILENAME:
+ logging.info("Set filename called")
+ filename_valid = self._setFilename(payload[0], payload[1], payload[2])
+ conn.send(filename_valid)
+
+ elif cmd == self.STREAM_CONFIGURE:
+ logging.info("Stream configure called")
+ configuration_valid = self._configureStream(payload[0], payload[1], payload[2], payload[3])
+ conn.send(configuration_valid)
+
+ elif cmd == self.STREAM_ENABLE:
+ logging.info("Enable stream called")
+ self._enableStream(payload[0])
+ conn.send(self.active)
+
+ elif cmd == self.STREAM_DISABLE:
+ logging.info("Disable stream called")
+ self._disableStream()
+ conn.send(self.active)
+
+ elif cmd == self.FRAME_READ:
+ logging.info("Read frame called")
+ frame = self._readFrame()
+ conn.send_bytes(frame)
+ conn.send(self.eos)
+
+ elif cmd == self.FRAME_WRITE:
+ logging.info("Write frame called")
+ frame = conn.recv_bytes()
+ self._writeFrame(frame)
+
+ elif cmd == self.CLOSE_SERVER:
+ logging.info("Close server connection")
+ self.stop()
+
+ # Stop Video Server
+ def stop(self):
+ self._disableStream()
+ if (self.mode == MODE_Output) and (self.filename == ""):
+ try:
+ cv2.destroyAllWindows()
+ except Exception:
+ pass
+ self.listener.close()
+ logging.info("Video server stopped")
+
+
+# Validate IP address
+def ip(ip):
+ try:
+ _ = ipaddress.ip_address(ip)
+ return ip
+    except ValueError:
+        raise argparse.ArgumentTypeError(f"Invalid IP address: {ip}!")
+
+def parse_arguments():
+ formatter = lambda prog: argparse.HelpFormatter(prog, max_help_position=41)
+ parser = argparse.ArgumentParser(formatter_class=formatter, description="VSI Video Server")
+
+ parser_optional = parser.add_argument_group("optional")
+ parser_optional.add_argument("--ip", dest="ip", metavar="<IP>",
+ help=f"Server IP address (default: {default_address[0]})",
+ type=ip, default=default_address[0])
+ parser_optional.add_argument("--port", dest="port", metavar="<TCP Port>",
+ help=f"TCP port (default: {default_address[1]})",
+ type=int, default=default_address[1])
+ parser_optional.add_argument("--authkey", dest="authkey", metavar="<Auth Key>",
+ help=f"Authorization key (default: {default_authkey})",
+ type=str, default=default_authkey)
+
+ return parser.parse_args()
+
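+# Example invocation (flags and defaults as defined in parse_arguments(); the values shown are placeholders):
+#   python3 vsi_video_server.py --ip 127.0.0.1 --port 6000 --authkey vsi_video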
+if __name__ == '__main__':
+    args = parse_arguments()
+    server = VideoServer((args.ip, args.port), args.authkey)
+    try:
+        server.run()
+    except KeyboardInterrupt:
+        server.stop()
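
A minimal client-side sketch (not part of the patch above), assuming the server is running with its default address and authkey; the base directory, file name, and stream parameters below are placeholder values. The command codes mirror the constants defined in VideoServer.__init__:

    from multiprocessing.connection import Client

    # Connect using the server's default address and authkey (authkey must be bytes)
    conn = Client(('127.0.0.1', 6000), authkey=b'vsi_video')

    # SET_FILENAME (1): payload is [base_dir, filename, mode]; MODE_Input == 0
    conn.send([1, '/tmp', 'sample.mp4', 0])      # placeholder directory and file
    print('filename valid:', conn.recv())

    # STREAM_CONFIGURE (2): payload is [width, height, color_format, frame_rate]; RGB888 == 2
    conn.send([2, 320, 240, 2, 30])
    print('configuration valid:', conn.recv())

    # STREAM_ENABLE (3): payload is [mode]; reply is the 'active' flag
    conn.send([3, 0])
    print('stream active:', conn.recv())

    # FRAME_READ (5): reply is the raw frame bytes followed by the end-of-stream flag
    conn.send([5])
    frame = conn.recv_bytes()
    eos = conn.recv()
    print('frame bytes:', len(frame), 'eos:', eos)

    # STREAM_DISABLE (4), then CLOSE_SERVER (7)
    conn.send([4])
    print('stream active:', conn.recv())
    conn.send([7])
    conn.close()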