cpp data recorder

This commit is contained in:
2025-05-23 06:18:23 -05:00
parent fcb291590b
commit 7f2dd0103e
12 changed files with 788 additions and 203 deletions

View File

@@ -1,6 +1,7 @@
import socket
import numpy as np
import ctypes
from ctypes import cdll
import data_structures
import threading
import queue
@@ -13,123 +14,32 @@ class DataRecorder:
port=1234,
packet_size=4096):
self.lib = cdll.LoadLibrary('../cpp/data_recorder.so')
# # TESTTTT
# self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# print('SO_RCVBUF', self.s.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF))
# self.s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 4 * 1024 * 1024)
# print('SO_RCVBUF', self.s.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF))
# self.s.settimeout(1)
# self.s.bind(("", 1234))
# data = np.arange(16, dtype=np.uint32)
# data = data.tobytes()
# self.s.sendto(data, (host, 1234))
# self.s.close()
# # TESTTTTT
self.lib.DataRecorder_new.argtypes = [ctypes.c_int32]
self.lib.DataRecorder_new.restype = ctypes.c_int64
self.lib.DataRecorder_get_recording_rate.argtypes = [ctypes.c_int64]
self.lib.DataRecorder_get_recording_rate.restype = ctypes.c_float
# self.lib.DataRecorder_get_current_recording_size.argtypes = [ctypes.c_int64]
# self.lib.DataRecorder_get_current_recording_size.restype = ctypes.c_float
# UDP Socket for High Speed Data
self.ip = host
self.port = port
self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
print('SO_RCVBUF', self.s.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF))
self.s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 4 * 1024 * 1024)
print('SO_RCVBUF', self.s.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF))
self.s.settimeout(1)
self.s.bind(("", port))
# Need to send one udp message to set IP and port info inside FPGA
data = np.arange(16, dtype=np.uint32)
data = data.tobytes()
self.s.sendto(data, (self.ip, self.port))
self.lib.DataRecorder_start_recording.argtypes = [ctypes.c_int64, ctypes.c_char_p, ctypes.c_int64, ctypes.c_int32]
self.lib.DataRecorder_start_recording.restype = ctypes.c_int32
self.lib.DataRecorder_stop_recording.argtypes = [ctypes.c_int64]
self.lib.DataRecorder_stop_recording.restype = ctypes.c_int32
self.max_packet_size = packet_size
# Data Buffer
# self.buffer = bytearray(512 * 1024 * 1024)
self.buffer = mmap.mmap(-1, 512 * 1024 * 1024)
self.buffer_view = memoryview(self.buffer)
self.stop_event = threading.Event()
self.fid = None
self.write_to_disk = False
self.write_offset = 0
self.write_count = 0
self.write_queue = queue.SimpleQueue()
self.obj = self.lib.DataRecorder_new(port)
def start_recording(self, filename, write_to_disk=False):
filename = filename.encode('utf-8')
filename = ctypes.c_char_p(filename)
self.lib.DataRecorder_start_recording(self.obj, filename, -1, 1)
self.write_to_disk = write_to_disk
if write_to_disk:
self.write_offset = 0
self.write_count = 0
self.fid = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC | os.O_DIRECT )
self.write_queue = queue.SimpleQueue()
self.write_data_thread = threading.Thread(target=self.write_data)
self.write_data_thread.start()
self.get_data_thread = threading.Thread(target=self.get_data)
self.get_data_thread.start()
def stop_recording(self):
print('Stop Thread')
self.stop_event.set()
self.get_data_thread.join()
print('Get Data Thread Joined')
if self.write_to_disk:
self.write_data_thread.join()
print('Write Data Thread Joined')
def write_data(self):
write_chunk_size = 4 * 1024 * 1024
buffer_view = memoryview(self.buffer)
print('Waiting For Data to Write')
while not self.stop_event.is_set():
try:
num_bytes = self.write_queue.get(timeout=1)
self.write_count += num_bytes
if self.write_count > write_chunk_size:
# print(self.write_offset)
# os.write(self.fid, self.buffer[self.write_offset:self.write_offset + write_chunk_size])
os.write(self.fid, buffer_view[self.write_offset:self.write_offset + write_chunk_size])
self.write_offset += write_chunk_size
self.write_count -= write_chunk_size
self.write_offset = self.write_offset % len(self.buffer)
except queue.Empty:
print('DR Queue Empty!', self.ip)
def get_data(self):
offset = 0
print('Waiting For Data From Socket')
while not self.stop_event.is_set():
try:
n = self.s.recv_into(self.buffer_view[offset:offset + self.max_packet_size])
if self.write_to_disk:
# print(n)
self.write_queue.put(n)
offset += n
if offset > len(self.buffer):
if self.port == 1234:
print('hmmm', n, offset, len(self.buffer))
offset = offset % len(self.buffer)
# print(offset)
except socket.timeout:
continue
self.lib.DataRecorder_stop_recording(self.obj)