summaryrefslogtreecommitdiff
path: root/scripts/py/vsi
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/py/vsi')
-rw-r--r--scripts/py/vsi/arm_vsi4.py207
-rw-r--r--scripts/py/vsi/arm_vsi5.py207
-rw-r--r--scripts/py/vsi/arm_vsi6.py207
-rw-r--r--scripts/py/vsi/arm_vsi7.py207
-rw-r--r--scripts/py/vsi/vsi_video.py461
-rw-r--r--scripts/py/vsi/vsi_video_server.py447
6 files changed, 1736 insertions, 0 deletions
diff --git a/scripts/py/vsi/arm_vsi4.py b/scripts/py/vsi/arm_vsi4.py
new file mode 100644
index 0000000..c903ab2
--- /dev/null
+++ b/scripts/py/vsi/arm_vsi4.py
@@ -0,0 +1,207 @@
+#!/usr/bin/env python3
+# SPDX-FileCopyrightText: Copyright 2024 Arm Limited and/or its affiliates <open-source-office@arm.com>
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import vsi_video
+
+## Set verbosity level
+#verbosity = logging.DEBUG
+verbosity = logging.ERROR
+
+# [debugging] Verbosity settings
+level = { 10: "DEBUG", 20: "INFO", 30: "WARNING", 40: "ERROR" }
+logging.basicConfig(format='Py: VSI4: [%(levelname)s]\t%(message)s', level = verbosity)
+logging.info("Verbosity level is set to " + level[verbosity])
+
+
+# Video Server configuration
+server_address = ('127.0.0.1', 6000)
+server_authkey = 'vsi_video'
+
+
+# IRQ registers
+IRQ_Status = 0
+
+# Timer registers
+Timer_Control = 0
+Timer_Interval = 0
+
+# Timer Control register definitions
+Timer_Control_Run_Msk = 1<<0
+Timer_Control_Periodic_Msk = 1<<1
+Timer_Control_Trig_IRQ_Msk = 1<<2
+Timer_Control_Trig_DMA_Msk = 1<<3
+
+# DMA registers
+DMA_Control = 0
+
+# DMA Control register definitions
+DMA_Control_Enable_Msk = 1<<0
+DMA_Control_Direction_Msk = 1<<1
+DMA_Control_Direction_P2M = 0<<1
+DMA_Control_Direction_M2P = 1<<1
+
+# User registers
+Regs = [0] * 64
+
+# Data buffer
+Data = bytearray()
+
+
+## Initialize
+# @return None
+def init():
+ logging.info("Python function init() called")
+ vsi_video.init(server_address, server_authkey)
+
+
+## Read interrupt request (the VSI IRQ Status Register)
+# @return value value read (32-bit)
+def rdIRQ():
+ global IRQ_Status
+ logging.info("Python function rdIRQ() called")
+
+ value = IRQ_Status
+ logging.debug("Read interrupt request: {}".format(value))
+
+ return value
+
+
+## Write interrupt request (the VSI IRQ Status Register)
+# @param value value to write (32-bit)
+# @return value value written (32-bit)
+def wrIRQ(value):
+ global IRQ_Status
+ logging.info("Python function wrIRQ() called")
+
+ value = vsi_video.wrIRQ(IRQ_Status, value)
+ IRQ_Status = value
+ logging.debug("Write interrupt request: {}".format(value))
+
+ return value
+
+
+## Write Timer registers (the VSI Timer Registers)
+# @param index Timer register index (zero based)
+# @param value value to write (32-bit)
+# @return value value written (32-bit)
+def wrTimer(index, value):
+ global Timer_Control, Timer_Interval
+ logging.info("Python function wrTimer() called")
+
+ if index == 0:
+ Timer_Control = value
+ logging.debug("Write Timer_Control: {}".format(value))
+ elif index == 1:
+ Timer_Interval = value
+ logging.debug("Write Timer_Interval: {}".format(value))
+
+ return value
+
+
+## Timer event (called at Timer Overflow)
+# @return None
+def timerEvent():
+ global IRQ_Status
+
+ logging.info("Python function timerEvent() called")
+
+ IRQ_Status = vsi_video.timerEvent(IRQ_Status)
+
+
+## Write DMA registers (the VSI DMA Registers)
+# @param index DMA register index (zero based)
+# @param value value to write (32-bit)
+# @return value value written (32-bit)
+def wrDMA(index, value):
+ global DMA_Control
+ logging.info("Python function wrDMA() called")
+
+ if index == 0:
+ DMA_Control = value
+ logging.debug("Write DMA_Control: {}".format(value))
+
+ return value
+
+
+## Read data from peripheral for DMA P2M transfer (VSI DMA)
+# @param size size of data to read (in bytes, multiple of 4)
+# @return data data read (bytearray)
+def rdDataDMA(size):
+ global Data
+ logging.info("Python function rdDataDMA() called")
+
+ Data = vsi_video.rdDataDMA(size)
+
+ n = min(len(Data), size)
+ data = bytearray(size)
+ data[0:n] = Data[0:n]
+ logging.debug("Read data ({} bytes)".format(size))
+
+ return data
+
+
+## Write data to peripheral for DMA M2P transfer (VSI DMA)
+# @param data data to write (bytearray)
+# @param size size of data to write (in bytes, multiple of 4)
+# @return None
+def wrDataDMA(data, size):
+ global Data
+ logging.info("Python function wrDataDMA() called")
+
+ Data = data
+ logging.debug("Write data ({} bytes)".format(size))
+
+ vsi_video.wrDataDMA(data, size)
+
+ return
+
+
+## Read user registers (the VSI User Registers)
+# @param index user register index (zero based)
+# @return value value read (32-bit)
+def rdRegs(index):
+ global Regs
+ logging.info("Python function rdRegs() called")
+
+ if index <= vsi_video.REG_IDX_MAX:
+ Regs[index] = vsi_video.rdRegs(index)
+
+ value = Regs[index]
+ logging.debug("Read user register at index {}: {}".format(index, value))
+
+ return value
+
+
+## Write user registers (the VSI User Registers)
+# @param index user register index (zero based)
+# @param value value to write (32-bit)
+# @return value value written (32-bit)
+def wrRegs(index, value):
+ global Regs
+ logging.info("Python function wrRegs() called")
+
+ if index <= vsi_video.REG_IDX_MAX:
+ value = vsi_video.wrRegs(index, value)
+
+ Regs[index] = value
+ logging.debug("Write user register at index {}: {}".format(index, value))
+
+ return value
+
+
+## @}
+
diff --git a/scripts/py/vsi/arm_vsi5.py b/scripts/py/vsi/arm_vsi5.py
new file mode 100644
index 0000000..8056096
--- /dev/null
+++ b/scripts/py/vsi/arm_vsi5.py
@@ -0,0 +1,207 @@
+#!/usr/bin/env python3
+# SPDX-FileCopyrightText: Copyright 2024 Arm Limited and/or its affiliates <open-source-office@arm.com>
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import vsi_video
+
+## Set verbosity level
+#verbosity = logging.DEBUG
+verbosity = logging.ERROR
+
+# [debugging] Verbosity settings
+level = { 10: "DEBUG", 20: "INFO", 30: "WARNING", 40: "ERROR" }
+logging.basicConfig(format='Py: VSI5: [%(levelname)s]\t%(message)s', level = verbosity)
+logging.info("Verbosity level is set to " + level[verbosity])
+
+
+# Video Server configuration
+server_address = ('127.0.0.1', 6001)
+server_authkey = 'vsi_video'
+
+
+# IRQ registers
+IRQ_Status = 0
+
+# Timer registers
+Timer_Control = 0
+Timer_Interval = 0
+
+# Timer Control register definitions
+Timer_Control_Run_Msk = 1<<0
+Timer_Control_Periodic_Msk = 1<<1
+Timer_Control_Trig_IRQ_Msk = 1<<2
+Timer_Control_Trig_DMA_Msk = 1<<3
+
+# DMA registers
+DMA_Control = 0
+
+# DMA Control register definitions
+DMA_Control_Enable_Msk = 1<<0
+DMA_Control_Direction_Msk = 1<<1
+DMA_Control_Direction_P2M = 0<<1
+DMA_Control_Direction_M2P = 1<<1
+
+# User registers
+Regs = [0] * 64
+
+# Data buffer
+Data = bytearray()
+
+
+## Initialize
+# @return None
+def init():
+ logging.info("Python function init() called")
+ vsi_video.init(server_address, server_authkey)
+
+
+## Read interrupt request (the VSI IRQ Status Register)
+# @return value value read (32-bit)
+def rdIRQ():
+ global IRQ_Status
+ logging.info("Python function rdIRQ() called")
+
+ value = IRQ_Status
+ logging.debug("Read interrupt request: {}".format(value))
+
+ return value
+
+
+## Write interrupt request (the VSI IRQ Status Register)
+# @param value value to write (32-bit)
+# @return value value written (32-bit)
+def wrIRQ(value):
+ global IRQ_Status
+ logging.info("Python function wrIRQ() called")
+
+ value = vsi_video.wrIRQ(IRQ_Status, value)
+ IRQ_Status = value
+ logging.debug("Write interrupt request: {}".format(value))
+
+ return value
+
+
+## Write Timer registers (the VSI Timer Registers)
+# @param index Timer register index (zero based)
+# @param value value to write (32-bit)
+# @return value value written (32-bit)
+def wrTimer(index, value):
+ global Timer_Control, Timer_Interval
+ logging.info("Python function wrTimer() called")
+
+ if index == 0:
+ Timer_Control = value
+ logging.debug("Write Timer_Control: {}".format(value))
+ elif index == 1:
+ Timer_Interval = value
+ logging.debug("Write Timer_Interval: {}".format(value))
+
+ return value
+
+
+## Timer event (called at Timer Overflow)
+# @return None
+def timerEvent():
+ global IRQ_Status
+
+ logging.info("Python function timerEvent() called")
+
+ IRQ_Status = vsi_video.timerEvent(IRQ_Status)
+
+
+## Write DMA registers (the VSI DMA Registers)
+# @param index DMA register index (zero based)
+# @param value value to write (32-bit)
+# @return value value written (32-bit)
+def wrDMA(index, value):
+ global DMA_Control
+ logging.info("Python function wrDMA() called")
+
+ if index == 0:
+ DMA_Control = value
+ logging.debug("Write DMA_Control: {}".format(value))
+
+ return value
+
+
+## Read data from peripheral for DMA P2M transfer (VSI DMA)
+# @param size size of data to read (in bytes, multiple of 4)
+# @return data data read (bytearray)
+def rdDataDMA(size):
+ global Data
+ logging.info("Python function rdDataDMA() called")
+
+ Data = vsi_video.rdDataDMA(size)
+
+ n = min(len(Data), size)
+ data = bytearray(size)
+ data[0:n] = Data[0:n]
+ logging.debug("Read data ({} bytes)".format(size))
+
+ return data
+
+
+## Write data to peripheral for DMA M2P transfer (VSI DMA)
+# @param data data to write (bytearray)
+# @param size size of data to write (in bytes, multiple of 4)
+# @return None
+def wrDataDMA(data, size):
+ global Data
+ logging.info("Python function wrDataDMA() called")
+
+ Data = data
+ logging.debug("Write data ({} bytes)".format(size))
+
+ vsi_video.wrDataDMA(data, size)
+
+ return
+
+
+## Read user registers (the VSI User Registers)
+# @param index user register index (zero based)
+# @return value value read (32-bit)
+def rdRegs(index):
+ global Regs
+ logging.info("Python function rdRegs() called")
+
+ if index <= vsi_video.REG_IDX_MAX:
+ Regs[index] = vsi_video.rdRegs(index)
+
+ value = Regs[index]
+ logging.debug("Read user register at index {}: {}".format(index, value))
+
+ return value
+
+
+## Write user registers (the VSI User Registers)
+# @param index user register index (zero based)
+# @param value value to write (32-bit)
+# @return value value written (32-bit)
+def wrRegs(index, value):
+ global Regs
+ logging.info("Python function wrRegs() called")
+
+ if index <= vsi_video.REG_IDX_MAX:
+ value = vsi_video.wrRegs(index, value)
+
+ Regs[index] = value
+ logging.debug("Write user register at index {}: {}".format(index, value))
+
+ return value
+
+
+## @}
+
diff --git a/scripts/py/vsi/arm_vsi6.py b/scripts/py/vsi/arm_vsi6.py
new file mode 100644
index 0000000..3d71562
--- /dev/null
+++ b/scripts/py/vsi/arm_vsi6.py
@@ -0,0 +1,207 @@
+#!/usr/bin/env python3
+# SPDX-FileCopyrightText: Copyright 2024 Arm Limited and/or its affiliates <open-source-office@arm.com>
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import vsi_video
+
+## Set verbosity level
+#verbosity = logging.DEBUG
+verbosity = logging.ERROR
+
+# [debugging] Verbosity settings
+level = { 10: "DEBUG", 20: "INFO", 30: "WARNING", 40: "ERROR" }
+logging.basicConfig(format='Py: VSI6: [%(levelname)s]\t%(message)s', level = verbosity)
+logging.info("Verbosity level is set to " + level[verbosity])
+
+
+# Video Server configuration
+server_address = ('127.0.0.1', 6002)
+server_authkey = 'vsi_video'
+
+
+# IRQ registers
+IRQ_Status = 0
+
+# Timer registers
+Timer_Control = 0
+Timer_Interval = 0
+
+# Timer Control register definitions
+Timer_Control_Run_Msk = 1<<0
+Timer_Control_Periodic_Msk = 1<<1
+Timer_Control_Trig_IRQ_Msk = 1<<2
+Timer_Control_Trig_DMA_Msk = 1<<3
+
+# DMA registers
+DMA_Control = 0
+
+# DMA Control register definitions
+DMA_Control_Enable_Msk = 1<<0
+DMA_Control_Direction_Msk = 1<<1
+DMA_Control_Direction_P2M = 0<<1
+DMA_Control_Direction_M2P = 1<<1
+
+# User registers
+Regs = [0] * 64
+
+# Data buffer
+Data = bytearray()
+
+
+## Initialize
+# @return None
+def init():
+ logging.info("Python function init() called")
+ vsi_video.init(server_address, server_authkey)
+
+
+## Read interrupt request (the VSI IRQ Status Register)
+# @return value value read (32-bit)
+def rdIRQ():
+ global IRQ_Status
+ logging.info("Python function rdIRQ() called")
+
+ value = IRQ_Status
+ logging.debug("Read interrupt request: {}".format(value))
+
+ return value
+
+
+## Write interrupt request (the VSI IRQ Status Register)
+# @param value value to write (32-bit)
+# @return value value written (32-bit)
+def wrIRQ(value):
+ global IRQ_Status
+ logging.info("Python function wrIRQ() called")
+
+ value = vsi_video.wrIRQ(IRQ_Status, value)
+ IRQ_Status = value
+ logging.debug("Write interrupt request: {}".format(value))
+
+ return value
+
+
+## Write Timer registers (the VSI Timer Registers)
+# @param index Timer register index (zero based)
+# @param value value to write (32-bit)
+# @return value value written (32-bit)
+def wrTimer(index, value):
+ global Timer_Control, Timer_Interval
+ logging.info("Python function wrTimer() called")
+
+ if index == 0:
+ Timer_Control = value
+ logging.debug("Write Timer_Control: {}".format(value))
+ elif index == 1:
+ Timer_Interval = value
+ logging.debug("Write Timer_Interval: {}".format(value))
+
+ return value
+
+
+## Timer event (called at Timer Overflow)
+# @return None
+def timerEvent():
+ global IRQ_Status
+
+ logging.info("Python function timerEvent() called")
+
+ IRQ_Status = vsi_video.timerEvent(IRQ_Status)
+
+
+## Write DMA registers (the VSI DMA Registers)
+# @param index DMA register index (zero based)
+# @param value value to write (32-bit)
+# @return value value written (32-bit)
+def wrDMA(index, value):
+ global DMA_Control
+ logging.info("Python function wrDMA() called")
+
+ if index == 0:
+ DMA_Control = value
+ logging.debug("Write DMA_Control: {}".format(value))
+
+ return value
+
+
+## Read data from peripheral for DMA P2M transfer (VSI DMA)
+# @param size size of data to read (in bytes, multiple of 4)
+# @return data data read (bytearray)
+def rdDataDMA(size):
+ global Data
+ logging.info("Python function rdDataDMA() called")
+
+ Data = vsi_video.rdDataDMA(size)
+
+ n = min(len(Data), size)
+ data = bytearray(size)
+ data[0:n] = Data[0:n]
+ logging.debug("Read data ({} bytes)".format(size))
+
+ return data
+
+
+## Write data to peripheral for DMA M2P transfer (VSI DMA)
+# @param data data to write (bytearray)
+# @param size size of data to write (in bytes, multiple of 4)
+# @return None
+def wrDataDMA(data, size):
+ global Data
+ logging.info("Python function wrDataDMA() called")
+
+ Data = data
+ logging.debug("Write data ({} bytes)".format(size))
+
+ vsi_video.wrDataDMA(data, size)
+
+ return
+
+
+## Read user registers (the VSI User Registers)
+# @param index user register index (zero based)
+# @return value value read (32-bit)
+def rdRegs(index):
+ global Regs
+ logging.info("Python function rdRegs() called")
+
+ if index <= vsi_video.REG_IDX_MAX:
+ Regs[index] = vsi_video.rdRegs(index)
+
+ value = Regs[index]
+ logging.debug("Read user register at index {}: {}".format(index, value))
+
+ return value
+
+
+## Write user registers (the VSI User Registers)
+# @param index user register index (zero based)
+# @param value value to write (32-bit)
+# @return value value written (32-bit)
+def wrRegs(index, value):
+ global Regs
+ logging.info("Python function wrRegs() called")
+
+ if index <= vsi_video.REG_IDX_MAX:
+ value = vsi_video.wrRegs(index, value)
+
+ Regs[index] = value
+ logging.debug("Write user register at index {}: {}".format(index, value))
+
+ return value
+
+
+## @}
+
diff --git a/scripts/py/vsi/arm_vsi7.py b/scripts/py/vsi/arm_vsi7.py
new file mode 100644
index 0000000..892433c
--- /dev/null
+++ b/scripts/py/vsi/arm_vsi7.py
@@ -0,0 +1,207 @@
+#!/usr/bin/env python3
+# SPDX-FileCopyrightText: Copyright 2024 Arm Limited and/or its affiliates <open-source-office@arm.com>
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import vsi_video
+
+## Set verbosity level
+#verbosity = logging.DEBUG
+verbosity = logging.ERROR
+
+# [debugging] Verbosity settings
+level = { 10: "DEBUG", 20: "INFO", 30: "WARNING", 40: "ERROR" }
+logging.basicConfig(format='Py: VSI7: [%(levelname)s]\t%(message)s', level = verbosity)
+logging.info("Verbosity level is set to " + level[verbosity])
+
+
+# Video Server configuration
+server_address = ('127.0.0.1', 6003)
+server_authkey = 'vsi_video'
+
+
+# IRQ registers
+IRQ_Status = 0
+
+# Timer registers
+Timer_Control = 0
+Timer_Interval = 0
+
+# Timer Control register definitions
+Timer_Control_Run_Msk = 1<<0
+Timer_Control_Periodic_Msk = 1<<1
+Timer_Control_Trig_IRQ_Msk = 1<<2
+Timer_Control_Trig_DMA_Msk = 1<<3
+
+# DMA registers
+DMA_Control = 0
+
+# DMA Control register definitions
+DMA_Control_Enable_Msk = 1<<0
+DMA_Control_Direction_Msk = 1<<1
+DMA_Control_Direction_P2M = 0<<1
+DMA_Control_Direction_M2P = 1<<1
+
+# User registers
+Regs = [0] * 64
+
+# Data buffer
+Data = bytearray()
+
+
+## Initialize
+# @return None
+def init():
+ logging.info("Python function init() called")
+ vsi_video.init(server_address, server_authkey)
+
+
+## Read interrupt request (the VSI IRQ Status Register)
+# @return value value read (32-bit)
+def rdIRQ():
+ global IRQ_Status
+ logging.info("Python function rdIRQ() called")
+
+ value = IRQ_Status
+ logging.debug("Read interrupt request: {}".format(value))
+
+ return value
+
+
+## Write interrupt request (the VSI IRQ Status Register)
+# @param value value to write (32-bit)
+# @return value value written (32-bit)
+def wrIRQ(value):
+ global IRQ_Status
+ logging.info("Python function wrIRQ() called")
+
+ value = vsi_video.wrIRQ(IRQ_Status, value)
+ IRQ_Status = value
+ logging.debug("Write interrupt request: {}".format(value))
+
+ return value
+
+
+## Write Timer registers (the VSI Timer Registers)
+# @param index Timer register index (zero based)
+# @param value value to write (32-bit)
+# @return value value written (32-bit)
+def wrTimer(index, value):
+ global Timer_Control, Timer_Interval
+ logging.info("Python function wrTimer() called")
+
+ if index == 0:
+ Timer_Control = value
+ logging.debug("Write Timer_Control: {}".format(value))
+ elif index == 1:
+ Timer_Interval = value
+ logging.debug("Write Timer_Interval: {}".format(value))
+
+ return value
+
+
+## Timer event (called at Timer Overflow)
+# @return None
+def timerEvent():
+ global IRQ_Status
+
+ logging.info("Python function timerEvent() called")
+
+ IRQ_Status = vsi_video.timerEvent(IRQ_Status)
+
+
+## Write DMA registers (the VSI DMA Registers)
+# @param index DMA register index (zero based)
+# @param value value to write (32-bit)
+# @return value value written (32-bit)
+def wrDMA(index, value):
+ global DMA_Control
+ logging.info("Python function wrDMA() called")
+
+ if index == 0:
+ DMA_Control = value
+ logging.debug("Write DMA_Control: {}".format(value))
+
+ return value
+
+
+## Read data from peripheral for DMA P2M transfer (VSI DMA)
+# @param size size of data to read (in bytes, multiple of 4)
+# @return data data read (bytearray)
+def rdDataDMA(size):
+ global Data
+ logging.info("Python function rdDataDMA() called")
+
+ Data = vsi_video.rdDataDMA(size)
+
+ n = min(len(Data), size)
+ data = bytearray(size)
+ data[0:n] = Data[0:n]
+ logging.debug("Read data ({} bytes)".format(size))
+
+ return data
+
+
+## Write data to peripheral for DMA M2P transfer (VSI DMA)
+# @param data data to write (bytearray)
+# @param size size of data to write (in bytes, multiple of 4)
+# @return None
+def wrDataDMA(data, size):
+ global Data
+ logging.info("Python function wrDataDMA() called")
+
+ Data = data
+ logging.debug("Write data ({} bytes)".format(size))
+
+ vsi_video.wrDataDMA(data, size)
+
+ return
+
+
+## Read user registers (the VSI User Registers)
+# @param index user register index (zero based)
+# @return value value read (32-bit)
+def rdRegs(index):
+ global Regs
+ logging.info("Python function rdRegs() called")
+
+ if index <= vsi_video.REG_IDX_MAX:
+ Regs[index] = vsi_video.rdRegs(index)
+
+ value = Regs[index]
+ logging.debug("Read user register at index {}: {}".format(index, value))
+
+ return value
+
+
+## Write user registers (the VSI User Registers)
+# @param index user register index (zero based)
+# @param value value to write (32-bit)
+# @return value value written (32-bit)
+def wrRegs(index, value):
+ global Regs
+ logging.info("Python function wrRegs() called")
+
+ if index <= vsi_video.REG_IDX_MAX:
+ value = vsi_video.wrRegs(index, value)
+
+ Regs[index] = value
+ logging.debug("Write user register at index {}: {}".format(index, value))
+
+ return value
+
+
+## @}
+
diff --git a/scripts/py/vsi/vsi_video.py b/scripts/py/vsi/vsi_video.py
new file mode 100644
index 0000000..88f44fb
--- /dev/null
+++ b/scripts/py/vsi/vsi_video.py
@@ -0,0 +1,461 @@
+#!/usr/bin/env python3
+# SPDX-FileCopyrightText: Copyright 2024 Arm Limited and/or its affiliates <open-source-office@arm.com>
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+import atexit
+import logging
+import subprocess
+from multiprocessing.connection import Client, Connection
+from os import path, getcwd
+from os import name as os_name
+
+
+class VideoClient:
+ def __init__(self):
+ # Server commands
+ self.SET_FILENAME = 1
+ self.STREAM_CONFIGURE = 2
+ self.STREAM_ENABLE = 3
+ self.STREAM_DISABLE = 4
+ self.FRAME_READ = 5
+ self.FRAME_WRITE = 6
+ self.CLOSE_SERVER = 7
+ # Color space
+ self.GRAYSCALE8 = 1
+ self.RGB888 = 2
+ self.BGR565 = 3
+ self.YUV420 = 4
+ self.NV12 = 5
+ self.NV21 = 6
+ # Variables
+ self.conn = None
+
+ def connectToServer(self, address, authkey):
+ for _ in range(50):
+ try:
+ self.conn = Client(address, authkey=authkey.encode('utf-8'))
+ if isinstance(self.conn, Connection):
+ break
+ else:
+ self.conn = None
+ except Exception:
+ self.conn = None
+ time.sleep(0.01)
+
+ def setFilename(self, filename, mode):
+ self.conn.send([self.SET_FILENAME, getcwd(), filename, mode])
+ filename_valid = self.conn.recv()
+
+ return filename_valid
+
+ def configureStream(self, frame_width, frame_height, color_format, frame_rate):
+ self.conn.send([self.STREAM_CONFIGURE, frame_width, frame_height, color_format, frame_rate])
+ configuration_valid = self.conn.recv()
+
+ return configuration_valid
+
+ def enableStream(self, mode):
+ self.conn.send([self.STREAM_ENABLE, mode])
+ stream_active = self.conn.recv()
+
+ return stream_active
+
+ def disableStream(self):
+ self.conn.send([self.STREAM_DISABLE])
+ stream_active = self.conn.recv()
+
+ return stream_active
+
+ def readFrame(self):
+ self.conn.send([self.FRAME_READ])
+ data = self.conn.recv_bytes()
+ eos = self.conn.recv()
+
+ return data, eos
+
+ def writeFrame(self, data):
+ self.conn.send([self.FRAME_WRITE])
+ self.conn.send_bytes(data)
+
+ def closeServer(self):
+ try:
+ if isinstance(self.conn, Connection):
+ self.conn.send([self.CLOSE_SERVER])
+ self.conn.close()
+ except Exception as e:
+ logging.error(f'Exception occurred on cleanup: {e}')
+
+
+# User registers
+REG_IDX_MAX = 12 # Maximum user register index used in VSI
+MODE = 0 # Regs[0] // Mode: 0=Input, 1=Output
+CONTROL = 0 # Regs[1] // Control: enable, flush
+STATUS = 0 # Regs[2] // Status: active, buf_empty, buf_full, overflow, underflow, eos
+FILENAME_LEN = 0 # Regs[3] // Filename length
+FILENAME_CHAR = 0 # Regs[4] // Filename character
+FILENAME_VALID = 0 # Regs[5] // Filename valid flag
+FRAME_WIDTH = 300 # Regs[6] // Requested frame width
+FRAME_HEIGHT = 300 # Regs[7] // Requested frame height
+COLOR_FORMAT = 0 # Regs[8] // Color format
+FRAME_RATE = 0 # Regs[9] // Frame rate
+FRAME_INDEX = 0 # Regs[10] // Frame index
+FRAME_COUNT = 0 # Regs[11] // Frame count
+FRAME_COUNT_MAX = 0 # Regs[12] // Frame count maximum
+
+# MODE register definitions
+MODE_IO_Msk = 1<<0
+MODE_Input = 0<<0
+MODE_Output = 1<<0
+
+# CONTROL register definitions
+CONTROL_ENABLE_Msk = 1<<0
+CONTROL_CONTINUOS_Msk = 1<<1
+CONTROL_BUF_FLUSH_Msk = 1<<2
+
+# STATUS register definitions
+STATUS_ACTIVE_Msk = 1<<0
+STATUS_BUF_EMPTY_Msk = 1<<1
+STATUS_BUF_FULL_Msk = 1<<2
+STATUS_OVERFLOW_Msk = 1<<3
+STATUS_UNDERFLOW_Msk = 1<<4
+STATUS_EOS_Msk = 1<<5
+
+# IRQ Status register definitions
+IRQ_Status_FRAME_Msk = 1<<0
+IRQ_Status_OVERFLOW_Msk = 1<<1
+IRQ_Status_UNDERFLOW_Msk = 1<<2
+IRQ_Status_EOS_Msk = 1<<3
+
+# Variables
+Video = VideoClient()
+Filename = ""
+FilenameIdx = 0
+
+
+# Close VSI Video Server on exit
+def cleanup():
+ Video.closeServer()
+
+
+# Client connection to VSI Video Server
+def init(address, authkey):
+ global FILENAME_VALID
+
+ base_dir = path.dirname(__file__)
+ server_path = path.join(base_dir, 'vsi_video_server.py')
+
+ logging.info("Start video server")
+ if path.isfile(server_path):
+ # Start Video Server
+ if os_name == 'nt':
+ py_cmd = 'python'
+ else:
+ py_cmd = 'python3'
+ cmd = f"{py_cmd} {server_path} " \
+ f"--ip {address[0]} " \
+ f"--port {address[1]} " \
+ f"--authkey {authkey}"
+ subprocess.Popen(cmd, shell=True)
+ # Connect to Video Server
+ Video.connectToServer(address, authkey)
+ if Video.conn == None:
+ logging.error("Server not connected")
+
+ else:
+ logging.error(f"Server script not found: {server_path}")
+
+ # Register clean-up function
+ atexit.register(cleanup)
+
+
+## Flush Stream buffer
+def flushBuffer():
+ global STATUS, FRAME_INDEX, FRAME_COUNT
+
+ STATUS |= STATUS_BUF_EMPTY_Msk
+ STATUS &= ~STATUS_BUF_FULL_Msk
+
+ FRAME_INDEX = 0
+ FRAME_COUNT = 0
+
+
+## VSI IRQ Status register
+# @param IRQ_Status IRQ status register to update
+# @param value status bits to clear
+# @return IRQ_Status return updated register
+def wrIRQ(IRQ_Status, value):
+ IRQ_Status_Clear = IRQ_Status & ~value
+ IRQ_Status &= ~IRQ_Status_Clear
+
+ return IRQ_Status
+
+
+## Timer Event
+# @param IRQ_Status IRQ status register to update
+# @return IRQ_Status return updated register
+def timerEvent(IRQ_Status):
+
+ IRQ_Status |= IRQ_Status_FRAME_Msk
+
+ if (STATUS & STATUS_OVERFLOW_Msk) != 0:
+ IRQ_Status |= IRQ_Status_OVERFLOW_Msk
+
+ if (STATUS & STATUS_UNDERFLOW_Msk) != 0:
+ IRQ_Status |= IRQ_Status_UNDERFLOW_Msk
+
+ if (STATUS & STATUS_EOS_Msk) != 0:
+ IRQ_Status |= IRQ_Status_EOS_Msk
+
+ if (CONTROL & CONTROL_CONTINUOS_Msk) == 0:
+ wrCONTROL(CONTROL & ~(CONTROL_ENABLE_Msk | CONTROL_CONTINUOS_Msk))
+
+ return IRQ_Status
+
+
+## Read data from peripheral for DMA P2M transfer (VSI DMA)
+# @param size size of data to read (in bytes, multiple of 4)
+# @return data data read (bytearray)
+def rdDataDMA(size):
+ global STATUS, FRAME_COUNT
+
+ if (STATUS & STATUS_ACTIVE_Msk) != 0:
+
+ if Video.conn != None:
+ data, eos = Video.readFrame()
+ if eos:
+ STATUS |= STATUS_EOS_Msk
+ if FRAME_COUNT < FRAME_COUNT_MAX:
+ FRAME_COUNT += 1
+ else:
+ STATUS |= STATUS_OVERFLOW_Msk
+ if FRAME_COUNT == FRAME_COUNT_MAX:
+ STATUS |= STATUS_BUF_FULL_Msk
+ STATUS &= ~STATUS_BUF_EMPTY_Msk
+ else:
+ data = bytearray()
+
+ else:
+ data = bytearray()
+
+ return data
+
+
+## Write data to peripheral for DMA M2P transfer (VSI DMA)
+# @param data data to write (bytearray)
+# @param size size of data to write (in bytes, multiple of 4)
+def wrDataDMA(data, size):
+ global STATUS, FRAME_COUNT
+
+ if (STATUS & STATUS_ACTIVE_Msk) != 0:
+
+ if Video.conn != None:
+ Video.writeFrame(data)
+ if FRAME_COUNT > 0:
+ FRAME_COUNT -= 1
+ else:
+ STATUS |= STATUS_UNDERFLOW_Msk
+ if FRAME_COUNT == 0:
+ STATUS |= STATUS_BUF_EMPTY_Msk
+ STATUS &= ~STATUS_BUF_FULL_Msk
+
+
+## Write CONTROL register (user register)
+# @param value value to write (32-bit)
+def wrCONTROL(value):
+ global CONTROL, STATUS
+
+ if ((value ^ CONTROL) & CONTROL_ENABLE_Msk) != 0:
+ STATUS &= ~STATUS_ACTIVE_Msk
+ if (value & CONTROL_ENABLE_Msk) != 0:
+ logging.info("Start video stream")
+ if Video.conn != None:
+ logging.info("Configure video stream")
+ configuration_valid = Video.configureStream(FRAME_WIDTH, FRAME_HEIGHT, COLOR_FORMAT, FRAME_RATE)
+ if configuration_valid:
+ logging.info("Enable video stream")
+ server_active = Video.enableStream(MODE)
+ if server_active:
+ STATUS |= STATUS_ACTIVE_Msk
+ STATUS &= ~(STATUS_OVERFLOW_Msk | STATUS_UNDERFLOW_Msk | STATUS_EOS_Msk)
+ else:
+ logging.error("Enable video stream failed")
+ else:
+ logging.error("Configure video stream failed")
+ else:
+ logging.error("Server not connected")
+ else:
+ logging.info("Stop video stream")
+ if Video.conn != None:
+ logging.info("Disable video stream")
+ Video.disableStream()
+ else:
+ logging.error("Server not connected")
+
+ if (value & CONTROL_BUF_FLUSH_Msk) != 0:
+ value &= ~CONTROL_BUF_FLUSH_Msk
+ flushBuffer()
+
+ CONTROL = value
+
+
+## Read STATUS register (user register)
+# @return status current STATUS User register (32-bit)
+def rdSTATUS():
+ global STATUS
+
+ status = STATUS
+ STATUS &= ~(STATUS_OVERFLOW_Msk | STATUS_UNDERFLOW_Msk | STATUS_EOS_Msk)
+
+ return status
+
+
+## Write FILENAME_LEN register (user register)
+# @param value value to write (32-bit)
+def wrFILENAME_LEN(value):
+ global STATUS, FILENAME_LEN, FILENAME_VALID, Filename, FilenameIdx
+
+ logging.info("Set new source name length and reset filename and valid flag")
+ FilenameIdx = 0
+ Filename = ""
+ FILENAME_VALID = 0
+ FILENAME_LEN = value
+
+
+## Write FILENAME_CHAR register (user register)
+# @param value value to write (32-bit)
+def wrFILENAME_CHAR(value):
+ global FILENAME_VALID, Filename, FilenameIdx
+
+ if FilenameIdx < FILENAME_LEN:
+ logging.info(f"Append {value} to filename")
+ Filename += f"{value}"
+ FilenameIdx += 1
+ logging.debug(f"Received {FilenameIdx} of {FILENAME_LEN} characters")
+
+ if FilenameIdx == FILENAME_LEN:
+ logging.info("Check if file exists on Server side and set VALID flag")
+ logging.debug(f"Filename: {Filename}")
+
+ if Video.conn != None:
+ FILENAME_VALID = Video.setFilename(Filename, MODE)
+ else:
+ logging.error("Server not connected")
+
+ logging.debug(f"Filename VALID: {FILENAME_VALID}")
+
+
+## Write FRAME_INDEX register (user register)
+# @param value value to write (32-bit)
+# @return value value written (32-bit)
+def wrFRAME_INDEX(value):
+ global STATUS, FRAME_INDEX, FRAME_COUNT
+
+ FRAME_INDEX += 1
+ if FRAME_INDEX == FRAME_COUNT_MAX:
+ FRAME_INDEX = 0
+
+ if (MODE & MODE_IO_Msk) == MODE_Input:
+ # Input
+ if FRAME_COUNT > 0:
+ FRAME_COUNT -= 1
+ if FRAME_COUNT == 0:
+ STATUS |= STATUS_BUF_EMPTY_Msk
+ STATUS &= ~STATUS_BUF_FULL_Msk
+ else:
+ # Output
+ if FRAME_COUNT < FRAME_COUNT_MAX:
+ FRAME_COUNT += 1
+ if FRAME_COUNT == FRAME_COUNT_MAX:
+ STATUS |= STATUS_BUF_FULL_Msk
+ STATUS &= ~STATUS_BUF_EMPTY_Msk
+
+ return FRAME_INDEX
+
+
+## Read user registers (the VSI User Registers)
+# @param index user register index (zero based)
+# @return value value read (32-bit)
+def rdRegs(index):
+ value = 0
+
+ if index == 0:
+ value = MODE
+ elif index == 1:
+ value = CONTROL
+ elif index == 2:
+ value = rdSTATUS()
+ elif index == 3:
+ value = FILENAME_LEN
+ elif index == 4:
+ value = FILENAME_CHAR
+ elif index == 5:
+ value = FILENAME_VALID
+ elif index == 6:
+ value = FRAME_WIDTH
+ elif index == 7:
+ value = FRAME_HEIGHT
+ elif index == 8:
+ value = COLOR_FORMAT
+ elif index == 9:
+ value = FRAME_RATE
+ elif index == 10:
+ value = FRAME_INDEX
+ elif index == 11:
+ value = FRAME_COUNT
+ elif index == 12:
+ value = FRAME_COUNT_MAX
+
+ return value
+
+
+## Write user registers (the VSI User Registers)
+# @param index user register index (zero based)
+# @param value value to write (32-bit)
+# @return value value written (32-bit)
+def wrRegs(index, value):
+ global MODE, FRAME_WIDTH, FRAME_HEIGHT, COLOR_FORMAT, FRAME_RATE, FRAME_COUNT_MAX
+
+ if index == 0:
+ MODE = value
+ elif index == 1:
+ wrCONTROL(value)
+ elif index == 2:
+ value = STATUS
+ elif index == 3:
+ wrFILENAME_LEN(value)
+ elif index == 4:
+ wrFILENAME_CHAR(chr(value))
+ elif index == 5:
+ value = FILENAME_VALID
+ elif index == 6:
+ if value != 0:
+ FRAME_WIDTH = value
+ elif index == 7:
+ if value != 0:
+ FRAME_HEIGHT = value
+ elif index == 8:
+ COLOR_FORMAT = value
+ elif index == 9:
+ FRAME_RATE = value
+ elif index == 10:
+ value = wrFRAME_INDEX(value)
+ elif index == 11:
+ value = FRAME_COUNT
+ elif index == 12:
+ FRAME_COUNT_MAX = value
+ flushBuffer()
+
+ return value
diff --git a/scripts/py/vsi/vsi_video_server.py b/scripts/py/vsi/vsi_video_server.py
new file mode 100644
index 0000000..f98b2ac
--- /dev/null
+++ b/scripts/py/vsi/vsi_video_server.py
@@ -0,0 +1,447 @@
+#!/usr/bin/env python3
+# SPDX-FileCopyrightText: Copyright 2024 Arm Limited and/or its affiliates <open-source-office@arm.com>
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+import ipaddress
+import logging
+import os
+from multiprocessing.connection import Listener
+
+import cv2
+import numpy as np
+
+## Set verbosity level
+verbosity = logging.ERROR
+
+# [debugging] Verbosity settings
+level = { 10: "DEBUG", 20: "INFO", 30: "WARNING", 40: "ERROR" }
+logging.basicConfig(format='VSI Server: [%(levelname)s]\t%(message)s', level = verbosity)
+logging.info("Verbosity level is set to " + level[verbosity])
+
+# Default Server configuration
+default_address = ('127.0.0.1', 6000)
+default_authkey = 'vsi_video'
+
+# Supported file extensions
+video_file_extensions = ('wmv', 'avi', 'mp4')
+image_file_extensions = ('bmp', 'png', 'jpg')
+video_fourcc = {'wmv' : 'WMV1', 'avi' : 'MJPG', 'mp4' : 'mp4v'}
+
+# Mode Input/Output
+MODE_IO_Msk = 1<<0
+MODE_Input = 0<<0
+MODE_Output = 1<<0
+
+class VideoServer:
+ def __init__(self, address, authkey):
+ # Server commands
+ self.SET_FILENAME = 1
+ self.STREAM_CONFIGURE = 2
+ self.STREAM_ENABLE = 3
+ self.STREAM_DISABLE = 4
+ self.FRAME_READ = 5
+ self.FRAME_WRITE = 6
+ self.CLOSE_SERVER = 7
+ # Color space
+ self.GRAYSCALE8 = 1
+ self.RGB888 = 2
+ self.BGR565 = 3
+ self.YUV420 = 4
+ self.NV12 = 5
+ self.NV21 = 6
+ # Variables
+ self.listener = Listener(address, authkey=authkey.encode('utf-8'))
+ self.filename = ""
+ self.mode = None
+ self.active = False
+ self.video = True
+ self.stream = None
+ self.frame_ratio = 0
+ self.frame_drop = 0
+ self.frame_index = 0
+ self.eos = False
+ # Stream configuration
+ self.resolution = (None, None)
+ self.color_format = None
+ self.frame_rate = None
+
+ # Set filename
+ def _setFilename(self, base_dir, filename, mode):
+ filename_valid = False
+
+ if self.active:
+ return filename_valid
+
+ self.filename = ""
+ self.frame_index = 0
+
+ file_extension = str(filename).split('.')[-1].lower()
+
+ if file_extension in video_file_extensions:
+ self.video = True
+ else:
+ self.video = False
+
+ file_path = os.path.join(base_dir, filename)
+ logging.debug(f"File path: {file_path}")
+
+ if (mode & MODE_IO_Msk) == MODE_Input:
+ self.mode = MODE_Input
+ if os.path.isfile(file_path):
+ if file_extension in (video_file_extensions + image_file_extensions):
+ self.filename = file_path
+ filename_valid = True
+ else:
+ self.mode = MODE_Output
+ if file_extension in (video_file_extensions + image_file_extensions):
+ if os.path.isfile(file_path):
+ os.remove(file_path)
+ self.filename = file_path
+ filename_valid = True
+
+ return filename_valid
+
+ # Configure video stream
+ def _configureStream(self, frame_width, frame_height, color_format, frame_rate):
+ if (frame_width == 0 or frame_height == 0 or frame_rate == 0):
+ return False
+
+ self.resolution = (frame_width, frame_height)
+ self.color_format = color_format
+ self.frame_rate = frame_rate
+
+ return True
+
+ # Enable video stream
+ def _enableStream(self, mode):
+ if self.active:
+ return
+
+ self.eos = False
+ self.frame_ratio = 0
+ self.frame_drop = 0
+
+ if self.stream is not None:
+ self.stream.release()
+ self.stream = None
+
+ if self.filename == "":
+ self.video = True
+ if (mode & MODE_IO_Msk) == MODE_Input:
+ # Device mode: camera
+ self.mode = MODE_Input
+ else:
+ # Device mode: display
+ self.mode = MODE_Output
+
+ if self.video:
+ if self.mode == MODE_Input:
+ if self.filename == "":
+ self.stream = cv2.VideoCapture(0)
+ if not self.stream.isOpened():
+ logging.error("Failed to open Camera interface")
+ return
+ else:
+ self.stream = cv2.VideoCapture(self.filename)
+ self.stream.set(cv2.CAP_PROP_POS_FRAMES, self.frame_index)
+ video_fps = self.stream.get(cv2.CAP_PROP_FPS)
+ if video_fps > self.frame_rate:
+ self.frame_ratio = video_fps / self.frame_rate
+ logging.debug(f"Frame ratio: {self.frame_ratio}")
+ else:
+ if self.filename != "":
+ extension = str(self.filename).split('.')[-1].lower()
+ fourcc = cv2.VideoWriter_fourcc(*f'{video_fourcc[extension]}')
+
+ if os.path.isfile(self.filename) and (self.frame_index != 0):
+ tmp_filename = f'{self.filename.rstrip(f".{extension}")}_tmp.{extension}'
+ os.rename(self.filename, tmp_filename)
+ cap = cv2.VideoCapture(tmp_filename)
+ width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
+ height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
+ self.resolution = (width, height)
+ self.frame_rate = cap.get(cv2.CAP_PROP_FPS)
+ self.stream = cv2.VideoWriter(self.filename, fourcc, self.frame_rate, self.resolution)
+
+ while cap.isOpened():
+ ret, frame = cap.read()
+ if not ret:
+ cap.release()
+ os.remove(tmp_filename)
+ break
+ self.stream.write(frame)
+ del frame
+
+ else:
+ self.stream = cv2.VideoWriter(self.filename, fourcc, self.frame_rate, self.resolution)
+
+ self.active = True
+ logging.info("Stream enabled")
+
+ # Disable Video Server
+ def _disableStream(self):
+ self.active = False
+ if self.stream is not None:
+ if self.mode == MODE_Input:
+ self.frame_index = self.stream.get(cv2.CAP_PROP_POS_FRAMES)
+ self.stream.release()
+ self.stream = None
+ logging.info("Stream disabled")
+
+ # Resize frame to requested resolution in pixels
+ def __resizeFrame(self, frame, resolution):
+ frame_h = frame.shape[0]
+ frame_w = frame.shape[1]
+
+ # Calculate requested aspect ratio (width/height):
+ crop_aspect_ratio = resolution[0] / resolution[1]
+
+ if crop_aspect_ratio != (frame_w / frame_h):
+ # Crop into image with resize aspect ratio
+ crop_w = int(frame_h * crop_aspect_ratio)
+ crop_h = int(frame_w / crop_aspect_ratio)
+
+ if crop_w > frame_w:
+ # Crop top and bottom part of the image
+ top = (frame_h - crop_h) // 2
+ bottom = top + crop_h
+ frame = frame[top : bottom, 0 : frame_w]
+ elif crop_h > frame_h:
+ # Crop left and right side of the image``
+ left = (frame_w - crop_w) // 2
+ right = left + crop_w
+ frame = frame[0 : frame_h, left : right]
+ else:
+ # Crop to the center of the image
+ left = (frame_w - crop_w) // 2
+ right = left + crop_w
+ top = (frame_h - crop_h) // 2
+ bottom = top + crop_h
+ frame = frame[top : bottom, left : right]
+ logging.debug(f"Frame cropped from ({frame_w}, {frame_h}) to ({frame.shape[1]}, {frame.shape[0]})")
+
+ logging.debug(f"Resize frame from ({frame.shape[1]}, {frame.shape[0]}) to ({resolution[0]}, {resolution[1]})")
+ try:
+ frame = cv2.resize(frame, resolution)
+ except Exception as e:
+ logging.error(f"Error in resizeFrame(): {e}")
+
+ return frame
+
+ # Change color space of a frame from BGR to selected profile
+ def __changeColorSpace(self, frame, color_space):
+ color_format = None
+
+ # Default OpenCV color profile: BGR
+ if self.mode == MODE_Input:
+ if color_space == self.GRAYSCALE8:
+ color_format = cv2.COLOR_BGR2GRAY
+ elif color_space == self.RGB888:
+ color_format = cv2.COLOR_BGR2RGB
+ elif color_space == self.BGR565:
+ color_format = cv2.COLOR_BGR2BGR565
+ elif color_space == self.YUV420:
+ color_format = cv2.COLOR_BGR2YUV_I420
+ elif color_space == self.NV12:
+ frame = self.__changeColorSpace(frame, self.YUV420)
+ color_format = cv2.COLOR_YUV2RGB_NV12
+ elif color_space == self.NV21:
+ frame = self.__changeColorSpace(frame, self.YUV420)
+ color_format = cv2.COLOR_YUV2RGB_NV21
+
+ else:
+ if color_space == self.GRAYSCALE8:
+ color_format = cv2.COLOR_GRAY2BGR
+ elif color_space == self.RGB888:
+ color_format = cv2.COLOR_RGB2BGR
+ elif color_space == self.BGR565:
+ color_format = cv2.COLOR_BGR5652BGR
+ elif color_space == self.YUV420:
+ color_format = cv2.COLOR_YUV2BGR_I420
+ elif color_space == self.NV12:
+ color_format = cv2.COLOR_YUV2BGR_I420
+ elif color_space == self.NV21:
+ color_format = cv2.COLOR_YUV2BGR_I420
+
+ if color_format != None:
+ logging.debug(f"Change color space to {color_format}")
+ try:
+ frame = cv2.cvtColor(frame, color_format)
+ except Exception as e:
+ logging.error(f"Error in changeColorSpace(): {e}")
+
+ return frame
+
+ # Read frame from source
+ def _readFrame(self):
+ frame = bytearray()
+
+ if not self.active:
+ return frame
+
+ if self.eos:
+ return frame
+
+ if self.video:
+ if self.frame_ratio > 1:
+ _, tmp_frame = self.stream.read()
+ self.frame_drop += (self.frame_ratio - 1)
+ if self.frame_drop > 1:
+ logging.debug(f"Frames to drop: {self.frame_drop}")
+ drop = int(self.frame_drop // 1)
+ for i in range(drop):
+ _, _ = self.stream.read()
+ logging.debug(f"Frames dropped: {drop}")
+ self.frame_drop -= drop
+ logging.debug(f"Frames left to drop: {self.frame_drop}")
+ else:
+ _, tmp_frame = self.stream.read()
+ if tmp_frame is None:
+ self.eos = True
+ logging.debug("End of stream.")
+ else:
+ tmp_frame = cv2.imread(self.filename)
+ self.eos = True
+ logging.debug("End of stream.")
+
+ if tmp_frame is not None:
+ tmp_frame = self.__resizeFrame(tmp_frame, self.resolution)
+ tmp_frame = self.__changeColorSpace(tmp_frame, self.color_format)
+ frame = bytearray(tmp_frame.tobytes())
+
+ return frame
+
+ # Write frame to destination
+ def _writeFrame(self, frame):
+ if not self.active:
+ return
+
+ try:
+ decoded_frame = np.frombuffer(frame, dtype=np.uint8)
+ decoded_frame = decoded_frame.reshape((self.resolution[0], self.resolution[1], 3))
+ bgr_frame = self.__changeColorSpace(decoded_frame, self.RGB888)
+
+ if self.filename == "":
+ cv2.imshow(self.filename, bgr_frame)
+ cv2.waitKey(10)
+ else:
+ if self.video:
+ self.stream.write(np.uint8(bgr_frame))
+ self.frame_index += 1
+ else:
+ cv2.imwrite(self.filename, bgr_frame)
+ except Exception:
+ pass
+
+ # Run Video Server
+ def run(self):
+ logging.info("Video server started")
+
+ try:
+ conn = self.listener.accept()
+ logging.info(f'Connection accepted {self.listener.address}')
+ except Exception:
+ logging.error("Connection not accepted")
+ return
+
+ while True:
+ try:
+ recv = conn.recv()
+ except EOFError:
+ return
+
+ cmd = recv[0] # Command
+ payload = recv[1:] # Payload
+
+ if cmd == self.SET_FILENAME:
+ logging.info("Set filename called")
+ filename_valid = self._setFilename(payload[0], payload[1], payload[2])
+ conn.send(filename_valid)
+
+ elif cmd == self.STREAM_CONFIGURE:
+ logging.info("Stream configure called")
+ configuration_valid = self._configureStream(payload[0], payload[1], payload[2], payload[3])
+ conn.send(configuration_valid)
+
+ elif cmd == self.STREAM_ENABLE:
+ logging.info("Enable stream called")
+ self._enableStream(payload[0])
+ conn.send(self.active)
+
+ elif cmd == self.STREAM_DISABLE:
+ logging.info("Disable stream called")
+ self._disableStream()
+ conn.send(self.active)
+
+ elif cmd == self.FRAME_READ:
+ logging.info("Read frame called")
+ frame = self._readFrame()
+ conn.send_bytes(frame)
+ conn.send(self.eos)
+
+ elif cmd == self.FRAME_WRITE:
+ logging.info("Write frame called")
+ frame = conn.recv_bytes()
+ self._writeFrame(frame)
+
+ elif cmd == self.CLOSE_SERVER:
+ logging.info("Close server connection")
+ self.stop()
+
+ # Stop Video Server
+ def stop(self):
+ self._disableStream()
+ if (self.mode == MODE_Output) and (self.filename == ""):
+ try:
+ cv2.destroyAllWindows()
+ except Exception:
+ pass
+ self.listener.close()
+ logging.info("Video server stopped")
+
+
+# Validate IP address
+def ip(ip):
+ try:
+ _ = ipaddress.ip_address(ip)
+ return ip
+ except:
+ raise argparse.ArgumentTypeError(f"Invalid IP address: {ip}!")
+
+def parse_arguments():
+ formatter = lambda prog: argparse.HelpFormatter(prog, max_help_position=41)
+ parser = argparse.ArgumentParser(formatter_class=formatter, description="VSI Video Server")
+
+ parser_optional = parser.add_argument_group("optional")
+ parser_optional.add_argument("--ip", dest="ip", metavar="<IP>",
+ help=f"Server IP address (default: {default_address[0]})",
+ type=ip, default=default_address[0])
+ parser_optional.add_argument("--port", dest="port", metavar="<TCP Port>",
+ help=f"TCP port (default: {default_address[1]})",
+ type=int, default=default_address[1])
+ parser_optional.add_argument("--authkey", dest="authkey", metavar="<Auth Key>",
+ help=f"Authorization key (default: {default_authkey})",
+ type=str, default=default_authkey)
+
+ return parser.parse_args()
+
+if __name__ == '__main__':
+ args = parse_arguments()
+ Server = VideoServer((args.ip, args.port), args.authkey)
+ try:
+ Server.run()
+ except KeyboardInterrupt:
+ Server.stop()