Files
castelion_radar_alinx_kintex/python/data_recorder.py

132 lines
4.1 KiB
Python
Executable File

import socket
import numpy as np
import ctypes
import data_structures
import threading
import queue
import os
import mmap
class DataRecorder:
    """Receive UDP datagrams into a ring buffer and optionally stream it to disk.

    One thread (``get_data``) receives datagrams back-to-back into an anonymous
    ``mmap`` used as a ring buffer.  When disk recording is enabled, a second
    thread (``write_data``) is told how many bytes arrived through
    ``write_queue`` and writes the ring to the output file in
    ``write_chunk_size`` pieces.

    NOTE(review): nothing stops the receiver from lapping the writer; if the
    disk falls more than one ring behind, unwritten data is overwritten.
    """

    def __init__(self,
                 host="192.168.2.128",
                 port=1234,
                 packet_size=4096,
                 buffer_size=512 * 1024 * 1024,
                 write_chunk_size=4 * 1024 * 1024):
        """Open and bind the UDP socket, announce ourselves, allocate the ring.

        Args:
            host: Address of the data source; the announce datagram is sent
                to ``(host, port)``.
            port: Local port to bind and remote port to announce to.
            packet_size: Largest datagram, in bytes, accepted per receive.
            buffer_size: Size of the ring buffer in bytes.
            write_chunk_size: Bytes written to disk per ``os.write`` call.
                The output file is opened with ``O_DIRECT``, so this is
                presumably expected to be a multiple of the device block
                size -- confirm for the target disk.

        Raises:
            ValueError: If the sizes are not positive, the ring is smaller
                than one packet, or the ring is not a whole number of chunks.
            OSError: If the socket cannot be configured, bound or used.
        """
        if packet_size <= 0 or write_chunk_size <= 0:
            raise ValueError("packet_size and write_chunk_size must be positive")
        if buffer_size < packet_size:
            raise ValueError("buffer_size must be at least packet_size")
        if buffer_size % write_chunk_size:
            # Guarantees a chunk read from the ring never straddles its end.
            raise ValueError("buffer_size must be a multiple of write_chunk_size")

        # UDP Socket for High Speed Data
        self.ip = host
        self.port = port
        self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            # Printed before and after so the effective receive-buffer size
            # is visible in the log.
            print('SO_RCVBUF', self.s.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF))
            self.s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 4 * 1024 * 1024)
            print('SO_RCVBUF', self.s.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF))
            self.s.settimeout(1)
            self.s.bind(("", port))
            # Need to send one udp message to set IP and port info inside FPGA
            hello = np.arange(16, dtype=np.uint32).tobytes()
            self.s.sendto(hello, (self.ip, self.port))
        except OSError:
            # Do not leak the descriptor when setup fails half-way.
            self.s.close()
            raise

        self.max_packet_size = packet_size
        self.write_chunk_size = write_chunk_size

        # Data Buffer
        self.buffer = mmap.mmap(-1, buffer_size)
        self.buffer_view = memoryview(self.buffer)

        self.stop_event = threading.Event()
        self.fid = None
        self.write_to_disk = False
        self.write_offset = 0
        self.write_count = 0
        self.write_queue = queue.SimpleQueue()
        self.get_data_thread = None
        self.write_data_thread = None

    def start_recording(self, filename, write_to_disk=False):
        """Start the receive thread and, if requested, the disk-writer thread.

        Args:
            filename: Output path; only used when ``write_to_disk`` is true.
                The file is created/truncated and opened with ``O_DIRECT``.
            write_to_disk: When true, received bytes are also written to
                ``filename``.

        Raises:
            OSError: If the output file cannot be opened.
        """
        # A previous stop_recording() leaves the event set; without clearing
        # it the new threads would exit immediately.
        self.stop_event.clear()
        self.write_to_disk = write_to_disk
        if write_to_disk:
            self.write_offset = 0
            self.write_count = 0
            self.fid = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC | os.O_DIRECT)
            self.write_queue = queue.SimpleQueue()
            self.write_data_thread = threading.Thread(target=self.write_data)
            self.write_data_thread.start()
        self.get_data_thread = threading.Thread(target=self.get_data)
        self.get_data_thread.start()

    def stop_recording(self):
        """Stop both threads, wait for them, and close the output file.

        Safe to call when no recording is running.
        """
        print('Stop Thread')
        self.stop_event.set()
        try:
            if self.get_data_thread is not None:
                self.get_data_thread.join()
                self.get_data_thread = None
                print('Get Data Thread Joined')
            if self.write_to_disk and self.write_data_thread is not None:
                self.write_data_thread.join()
                self.write_data_thread = None
                print('Write Data Thread Joined')
        finally:
            if self.fid is not None:
                os.close(self.fid)
                self.fid = None

    def close(self):
        """Release the socket and the ring buffer.

        Call only after ``stop_recording()``; the mmap cannot be closed while
        a thread still holds a view of it.
        """
        self.s.close()
        self.buffer_view.release()
        self.buffer.close()

    def write_data(self):
        """Writer-thread body: copy received bytes from the ring to ``self.fid``.

        Runs until ``stop_event`` is set, then drains whatever is still
        queued and flushes the final partial chunk so nothing received before
        the stop request is lost.
        """
        print('Waiting For Data to Write')
        with memoryview(self.buffer) as ring:
            while not self.stop_event.is_set():
                try:
                    num_bytes = self.write_queue.get(timeout=1)
                except queue.Empty:
                    print('DR Queue Empty!', self.ip)
                    continue
                self.write_count += num_bytes
                self._write_full_chunks(ring)

            # Account for byte counts queued before the stop was noticed.
            while True:
                try:
                    self.write_count += self.write_queue.get_nowait()
                except queue.Empty:
                    break
            self._write_full_chunks(ring)
            self._write_tail(ring)

    def _write_full_chunks(self, ring):
        """Write every complete chunk currently pending in the ring."""
        chunk = self.write_chunk_size
        while self.write_count >= chunk:
            start = self.write_offset
            self._write_all(self.fid, ring[start:start + chunk])
            self.write_offset = (start + chunk) % len(ring)
            self.write_count -= chunk

    def _write_tail(self, ring):
        """Write the final partial chunk (fewer than ``write_chunk_size`` bytes)."""
        tail = self.write_count
        if tail <= 0:
            return
        # The tail length is arbitrary, which O_DIRECT writes do not allow,
        # so fall back to buffered I/O for this last write.
        self._disable_direct_io(self.fid)
        start = self.write_offset
        self._write_all(self.fid, ring[start:start + tail])
        self.write_offset = (start + tail) % len(ring)
        self.write_count = 0

    @staticmethod
    def _disable_direct_io(fd):
        """Clear ``O_DIRECT`` on ``fd`` if the platform has it and it is set."""
        direct = getattr(os, "O_DIRECT", 0)
        if not direct:
            return
        import fcntl  # POSIX-only; imported lazily like the O_DIRECT lookup
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        if flags & direct:
            fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~direct)

    @staticmethod
    def _write_all(fd, data):
        """Write all of ``data`` to ``fd``, retrying after short writes."""
        while len(data):
            written = os.write(fd, data)
            data = data[written:]

    def get_data(self):
        """Receiver-thread body: store datagrams back-to-back in the ring.

        Each receive accepts at most ``max_packet_size`` bytes.  When disk
        recording is on, the size of every datagram is pushed to
        ``write_queue`` for the writer thread.
        """
        ring = self.buffer_view
        ring_size = len(ring)
        packet_size = self.max_packet_size
        # Bounce buffer for the rare datagram that straddles the ring end.
        scratch = bytearray(packet_size)
        scratch_view = memoryview(scratch)
        offset = 0
        print('Waiting For Data From Socket')
        while not self.stop_event.is_set():
            room = ring_size - offset
            try:
                if room >= packet_size:
                    n = self.s.recv_into(ring[offset:offset + packet_size])
                else:
                    # Receiving straight into the clipped slice would
                    # truncate the datagram; receive it whole, then split it
                    # across the wrap point.
                    n = self.s.recv_into(scratch)
                    head = min(n, room)
                    ring[offset:offset + head] = scratch_view[:head]
                    ring[:n - head] = scratch_view[head:n]
            except socket.timeout:
                continue
            if self.write_to_disk:
                self.write_queue.put(n)
            offset = (offset + n) % ring_size