136 lines
4.3 KiB
Python
Executable File
136 lines
4.3 KiB
Python
Executable File
import socket
|
|
import numpy as np
|
|
import ctypes
|
|
import data_structures
|
|
import threading
|
|
import queue
|
|
import os
|
|
import mmap
|
|
|
|
class DataRecorder:
|
|
def __init__(self,
|
|
host="192.168.2.128",
|
|
port=1234,
|
|
packet_size=4096):
|
|
|
|
|
|
# # TESTTTT
|
|
# self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
# print('SO_RCVBUF', self.s.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF))
|
|
# self.s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 4 * 1024 * 1024)
|
|
# print('SO_RCVBUF', self.s.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF))
|
|
# self.s.settimeout(1)
|
|
# self.s.bind(("", 1234))
|
|
# data = np.arange(16, dtype=np.uint32)
|
|
# data = data.tobytes()
|
|
# self.s.sendto(data, (host, 1234))
|
|
# self.s.close()
|
|
# # TESTTTTT
|
|
|
|
|
|
|
|
# UDP Socket for High Speed Data
|
|
self.ip = host
|
|
self.port = port
|
|
self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
print('SO_RCVBUF', self.s.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF))
|
|
self.s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 4 * 1024 * 1024)
|
|
print('SO_RCVBUF', self.s.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF))
|
|
self.s.settimeout(1)
|
|
self.s.bind(("", port))
|
|
# Need to send one udp message to set IP and port info inside FPGA
|
|
data = np.arange(16, dtype=np.uint32)
|
|
data = data.tobytes()
|
|
self.s.sendto(data, (self.ip, self.port))
|
|
|
|
|
|
self.max_packet_size = packet_size
|
|
|
|
# Data Buffer
|
|
# self.buffer = bytearray(512 * 1024 * 1024)
|
|
self.buffer = mmap.mmap(-1, 512 * 1024 * 1024)
|
|
self.buffer_view = memoryview(self.buffer)
|
|
|
|
self.stop_event = threading.Event()
|
|
|
|
self.fid = None
|
|
self.write_to_disk = False
|
|
self.write_offset = 0
|
|
self.write_count = 0
|
|
self.write_queue = queue.SimpleQueue()
|
|
|
|
def start_recording(self, filename, write_to_disk=False):
|
|
|
|
self.write_to_disk = write_to_disk
|
|
|
|
if write_to_disk:
|
|
self.write_offset = 0
|
|
self.write_count = 0
|
|
self.fid = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC | os.O_DIRECT )
|
|
|
|
self.write_queue = queue.SimpleQueue()
|
|
self.write_data_thread = threading.Thread(target=self.write_data)
|
|
self.write_data_thread.start()
|
|
|
|
self.get_data_thread = threading.Thread(target=self.get_data)
|
|
|
|
self.get_data_thread.start()
|
|
|
|
def stop_recording(self):
|
|
print('Stop Thread')
|
|
self.stop_event.set()
|
|
self.get_data_thread.join()
|
|
print('Get Data Thread Joined')
|
|
if self.write_to_disk:
|
|
self.write_data_thread.join()
|
|
print('Write Data Thread Joined')
|
|
|
|
def write_data(self):
|
|
|
|
write_chunk_size = 4 * 1024 * 1024
|
|
buffer_view = memoryview(self.buffer)
|
|
|
|
print('Waiting For Data to Write')
|
|
while not self.stop_event.is_set():
|
|
|
|
try:
|
|
num_bytes = self.write_queue.get(timeout=1)
|
|
|
|
self.write_count += num_bytes
|
|
|
|
if self.write_count > write_chunk_size:
|
|
# print(self.write_offset)
|
|
# os.write(self.fid, self.buffer[self.write_offset:self.write_offset + write_chunk_size])
|
|
os.write(self.fid, buffer_view[self.write_offset:self.write_offset + write_chunk_size])
|
|
self.write_offset += write_chunk_size
|
|
self.write_count -= write_chunk_size
|
|
self.write_offset = self.write_offset % len(self.buffer)
|
|
except queue.Empty:
|
|
print('DR Queue Empty!', self.ip)
|
|
|
|
|
|
def get_data(self):
|
|
offset = 0
|
|
|
|
print('Waiting For Data From Socket')
|
|
while not self.stop_event.is_set():
|
|
|
|
try:
|
|
n = self.s.recv_into(self.buffer_view[offset:offset + self.max_packet_size])
|
|
|
|
if self.write_to_disk:
|
|
# print(n)
|
|
self.write_queue.put(n)
|
|
|
|
offset += n
|
|
if offset > len(self.buffer):
|
|
if self.port == 1234:
|
|
print('hmmm', n, offset, len(self.buffer))
|
|
offset = offset % len(self.buffer)
|
|
# print(offset)
|
|
|
|
except socket.timeout:
|
|
continue
|
|
|
|
|