aboutsummaryrefslogtreecommitdiff
path: root/scripts/ethosumonitor/inputs.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/ethosumonitor/inputs.py')
-rw-r--r--scripts/ethosumonitor/inputs.py173
1 files changed, 173 insertions, 0 deletions
diff --git a/scripts/ethosumonitor/inputs.py b/scripts/ethosumonitor/inputs.py
new file mode 100644
index 0000000..60fb8ed
--- /dev/null
+++ b/scripts/ethosumonitor/inputs.py
@@ -0,0 +1,173 @@
+#
+# SPDX-FileCopyrightText: Copyright 2022 Arm Limited and/or its affiliates <open-source-office@arm.com>
+#
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the License); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from .elf import *
+from .types import *
+import json
+import mmap
+import os
+from sys import stderr
+
+class InputInterface:
+ def readEventRecord(self) -> EventRecord_t:
+ ...
+
+class InputFile(InputInterface):
+ def __init__(self, fname):
+ self.file = open(fname, 'rb')
+
+ def readEventRecord(self):
+ data = self.file.read(EventRecord_t.SIZE)
+ if len(data) == 0:
+ raise EOFError
+
+ yield data
+
+class InputRingBuffer(InputInterface):
+ def __init__(self, elfFile):
+ from elftools.elf.elffile import ELFFile
+
+ with open(elfFile, 'rb') as f:
+ elf = ELFFile(f)
+ symbol = elfFindSymbol(elf, 'EventRecorderInfo')
+ elfInfo = EventRecorderInfo_t(elfGetData(elf, symbol.entry.st_value, symbol.entry.st_size))
+ memInfo = EventRecorderInfo_t(self.read(symbol.entry.st_value, symbol.entry.st_size))
+
+ # Validate EventRecorder info
+ if elfInfo.protocolType != memInfo.protocolType or \
+ elfInfo.protocolVersion != memInfo.protocolVersion or \
+ elfInfo.eventBuffer != memInfo.eventBuffer or \
+ elfInfo.eventStatus != memInfo.eventStatus:
+ raise Exception(f'EventRecorder info mismatch. elf={elfInfo}, mem={memInfo}')
+
+ self.info = elfInfo
+ status = EventStatus_t(self.read(self.info.eventStatus, EventStatus_t.SIZE))
+ self.timestamp = status.tsLast
+ self.recordIndex = status.recordIndex
+ self.overflow = 0
+
+ def readEventRecord(self):
+ # Read status and use timestamp to detect if there are new samples
+ status = EventStatus_t(self.read(self.info.eventStatus, EventStatus_t.SIZE))
+ if self.timestamp == status.tsLast:
+ return None
+
+ self.timestamp = status.tsLast
+
+ # Detect firmware reset
+ if self.recordIndex > status.recordIndex:
+ self.recordIndex = 0
+
+ # Detect of recordIndex has overflowed the ring buffer
+ if status.recordIndex - self.recordIndex > self.info.recordCount:
+ stderr.write('Warning: Ring buffer overflow\n')
+ self.overflow = self.overflow + 1
+ self.recordIndex = status.recordIndex
+
+ # Generate data for each event record
+ for i in range(self.recordIndex, status.recordIndex):
+ i = i % self.info.recordCount
+ yield self.read(self.info.eventBuffer + EventRecord_t.SIZE * i, EventRecord_t.SIZE)
+
+ self.recordIndex = status.recordIndex
+
+ def read(self, address, size) -> bytearray:
+ ...
+
+class InputDAPLink(InputRingBuffer):
+ def __init__(self, elfFile):
+ self._open()
+ super().__init__(elfFile)
+ self.target.reset()
+
+ def _open(self):
+ from pyocd.core.helpers import ConnectHelper
+
+ self.session = ConnectHelper.session_with_chosen_probe()
+ self.board = self.session.board
+ self.target = self.board.target
+
+ self.session.open()
+
+ def read(self, address, size):
+ from pyocd.core.exceptions import Error
+
+ for i in range(1000):
+ try:
+ return bytearray(self.target.read_memory_block8(address, size))
+ except Error:
+ pass
+
+class InputMem(InputRingBuffer):
+ def __init__(self, elfFile, jsonFile):
+ with open(jsonFile, 'r') as f:
+ jsonDoc = json.loads(f.read())
+
+ self.memoryMap = []
+ for memoryMap in jsonDoc['memoryMap']:
+ host = int(memoryMap['host'], 16)
+ device = int(memoryMap['device'], 16)
+ size = int(memoryMap['size'], 16)
+ self.memoryMap.append(DevMemDevice(host, device, size))
+
+ super().__init__(elfFile)
+
+ def read(self, device, size):
+ for memoryMap in self.memoryMap:
+ data = memoryMap.read(device, size)
+ if data:
+ return data
+
+ stderr.write(f'Warning: No mapping found for device address {hex(device)} size {size}.\n')
+ return None
+
+class DevMem:
+ def __init__(self, address, size):
+ self.base_address = address & ~(mmap.PAGESIZE - 1)
+ self.offset = address - self.base_address
+ self.size = size + self.offset
+
+ self.fd = os.open('/dev/mem', os.O_RDWR | os.O_SYNC)
+ self.mem = mmap.mmap(self.fd, self.size, mmap.MAP_SHARED, mmap.PROT_READ,
+ offset=self.base_address)
+
+ def __del__(self):
+ os.close(self.fd)
+
+ def read(self, offset, size):
+ self.mem.seek(self.offset + offset)
+
+ data = bytearray(size)
+ for i in range(size):
+ data[i] = self.mem.read_byte()
+
+ return data
+
+class DevMemDevice(DevMem):
+ def __init__(self, host, device, size):
+ super().__init__(host, size)
+
+ self.device = device
+ self.size = size
+
+ def read(self, device, size):
+ offset = device - self.device
+ if offset < 0 or (offset + size) > self.size:
+ return None
+
+ return super().read(offset, size)