diff --git a/cpp/.vscode/settings.json b/cpp/.vscode/settings.json new file mode 100644 index 0000000..6e89031 --- /dev/null +++ b/cpp/.vscode/settings.json @@ -0,0 +1,57 @@ +{ + "files.associations": { + "array": "cpp", + "atomic": "cpp", + "bit": "cpp", + "cctype": "cpp", + "clocale": "cpp", + "cmath": "cpp", + "codecvt": "cpp", + "compare": "cpp", + "complex": "cpp", + "concepts": "cpp", + "cstdarg": "cpp", + "cstddef": "cpp", + "cstdint": "cpp", + "cstdio": "cpp", + "cstdlib": "cpp", + "ctime": "cpp", + "cwchar": "cpp", + "cwctype": "cpp", + "deque": "cpp", + "string": "cpp", + "unordered_map": "cpp", + "vector": "cpp", + "exception": "cpp", + "algorithm": "cpp", + "functional": "cpp", + "iterator": "cpp", + "memory": "cpp", + "memory_resource": "cpp", + "numeric": "cpp", + "optional": "cpp", + "random": "cpp", + "ratio": "cpp", + "string_view": "cpp", + "system_error": "cpp", + "tuple": "cpp", + "type_traits": "cpp", + "utility": "cpp", + "initializer_list": "cpp", + "iomanip": "cpp", + "iosfwd": "cpp", + "iostream": "cpp", + "istream": "cpp", + "limits": "cpp", + "new": "cpp", + "numbers": "cpp", + "ostream": "cpp", + "semaphore": "cpp", + "sstream": "cpp", + "stdexcept": "cpp", + "stop_token": "cpp", + "streambuf": "cpp", + "thread": "cpp", + "typeinfo": "cpp" + } +} \ No newline at end of file diff --git a/cpp/Makefile b/cpp/Makefile new file mode 100755 index 0000000..ac6bf41 --- /dev/null +++ b/cpp/Makefile @@ -0,0 +1,25 @@ +CC = g++ +FLAGS = -std=c++17 -pthread +# CFLAGS = +CFLAGS = -fPIC +LDFLAGS = -shared +# LDFLAGS = -L/usr/lib/aarch64-linux-gnu +DEBUGFLAGS = -O0 +RELEASEFLAGS = -O2 + +all: test_data_recorder + +PROGS=test_data_recorder library +all: $(PROGS) + +library: data_recorder.o + $(CC) $(LDFLAGS) $(CFLAGS) -o data_recorder.so $^ + +test_data_recorder: data_recorder.o test_data_recorder.o + $(CC) -pthread -o $@ $^ + +clean: + rm -f $(PROGS) *.o *.a *.d + +%.o: %.cpp + $(CC) -c $(FLAGS) $(CFLAGS) -o $@ $< diff --git a/cpp/data_recorder.cpp b/cpp/data_recorder.cpp new file mode 100755 index 0000000..322181c --- /dev/null +++ b/cpp/data_recorder.cpp @@ -0,0 +1,346 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "data_recorder.h" + +double timespec_to_double(struct timespec t) +{ + return t.tv_sec + t.tv_nsec * 1e-9; +} + +double timespec_sub(struct timespec t1, struct timespec t2) +{ + // Get seconds part + t1.tv_sec -= t2.tv_sec; + + // Nanoseconds, need to check for negative condition + t1.tv_nsec -= t2.tv_nsec; + if (t1.tv_nsec >= 1000000000) { + t1.tv_sec++; + t1.tv_nsec -= 1000000000; + } else if (t1.tv_nsec < 0) { + t1.tv_sec--; + t1.tv_nsec += 1000000000; + } + + return timespec_to_double(t1); +} + +DataRecorder::DataRecorder(int port_in) { + printf("Data Recorder\n"); + + port = port_in; + + recording_active = false; + allocate_memory(); + + return; +} + +void DataRecorder::allocate_memory() { + // Memory to buffer data into + data_buffer = (char *)aligned_alloc(4096, OVERALL_BUFFER_SIZE); + memset(data_buffer, 0, OVERALL_BUFFER_SIZE); + sem_init(&buffer_ready_sem, 0, 0); + + recording_rate = 0; + + total_bytes = 0; +} + +DataRecorder::~DataRecorder() { + printf("~Data Recorder\n"); + free(data_buffer); + + return; +} + +int DataRecorder::start_recording(const char* filename, long int max_bytes, int save_to_disk) { + + recording_active = true; + + int ret; + + while (true) { + if (ret = sem_trywait(&buffer_ready_sem) == -1) { + printf("Semaphore Cleared\n"); + break; + } + printf("Semaphore Was Still Available!!!!\n"); + } + + // Make sure old thread is done + if (recorder.joinable()) { + printf("Thread was still joinable!!!!\n"); + DataRecorder::stop_recording(); + } + + exit_thread = false; + + // Open Output file + char* file; + asprintf(&file, "%s", filename); + printf("Opening File: %s", file); + out_fd = open(file, O_WRONLY | O_CREAT | O_TRUNC | O_DIRECT, 0666); + if(out_fd < 0) + { + printf("- FAILED!!!\n"); + return -1; + } + printf(" - SUCCESS FD: %d\n", out_fd); + + // Check how much space is left on disk + struct statvfs fiData; + char *dirc; + char *dname; + dirc = strdup(filename); + dname = dirname(dirc); + printf("Dir Name: %s, %s\n", filename, dname); + if(statvfs(dname, &fiData) < 0) { + printf("Failed to check disk space\n"); + return -1; + } + + long int disk_bytes_free = fiData.f_bsize * fiData.f_bavail; + printf("Disk Space Left: %f GB\n", (float)disk_bytes_free / 1e9); + + // Limit max file size to lesser of specified max bytes and the remaining disk space. + // If max_bytes is passed in as -1, then the only limit is the disk space + printf("Requested Bytes: %ld \n", max_bytes); + if (max_bytes < 0) + { + max_bytes = disk_bytes_free; + } + else + { + max_bytes = std::min(max_bytes, disk_bytes_free); + } + + printf("Max File Size: %f GB\n", (float)max_bytes / 1e9); + + printf("Start Recording, Max Bytes %ld\n", max_bytes); + recorder = std::thread(&DataRecorder::get_data, this, save_to_disk); + + sleep(1); + + printf("Recording Started\n"); + + return 1; +} + +int DataRecorder::stop_recording() { + exit_thread = true; + // Wait for recorder thread to finish + if (recorder.joinable()) { + recorder.join(); + } + printf("Thread Joined!\n"); + recording_active = false; + return 1; +} + +float DataRecorder::get_recording_rate() { + return recording_rate; +} + +uint64_t DataRecorder::get_filesize() { + return total_bytes; +} + +void DataRecorder::write_data() { + printf("Opening Write Data Thread\n"); + + struct timespec ts; + int timeout = 1; + int ret; + int buffer_ind = 0; + + while (!exit_thread.load()) { + + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += timeout; + + if (ret = sem_timedwait(&buffer_ready_sem, &ts) == -1) { + // Error + if (errno == ETIMEDOUT) { + // printf("Writer sem timeout\n"); + } + else { + printf("sem_wait error %d\n", errno); + } + continue; + } + + // A chunk is ready write it out + int cnt = write(out_fd, &(data_buffer[buffer_ind]), WRITE_CHUNK_SIZE); + if (cnt < 0) + { + printf("File write error!\n"); + } + + buffer_ind += WRITE_CHUNK_SIZE; + buffer_ind = buffer_ind % OVERALL_BUFFER_SIZE; + } + + int sem_value; + sem_getvalue(&buffer_ready_sem, &sem_value); + + printf("Exiting Write Data Thread %d\n", sem_value); +} + +void DataRecorder::get_data(int save_to_disk) { + + // Start wirte to disk thread + if (save_to_disk) { + writer = std::thread(&DataRecorder::write_data, this); + } + + struct timespec ts_now; + struct timespec ts_last_print; + struct timespec ts_begin; + + total_bytes = 0; + long int bytes_since_last_update = 0; + + // For timing info + clock_gettime(CLOCK_MONOTONIC, &ts_begin); + ts_last_print = ts_begin; + double last_print_time = 0; + double last_irq_elapsed = 0; + double print_period = 1; + + // Open Socket Connection + int sock; + if ((sock = socket(AF_INET, SOCK_DGRAM, 0)) < 0) + { + printf("Socket creation error\n"); + } + + struct timeval tv; + tv.tv_sec = 2; + tv.tv_usec = 0; + setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof tv); + + struct sockaddr_in serv_addr; + serv_addr.sin_family = AF_INET; + serv_addr.sin_port = htons(port); + + if (inet_pton(AF_INET, "0.0.0.0", &serv_addr.sin_addr) <= 0) { + printf("Invalid IP Address\n"); + } + + if (bind(sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) == -1) { + printf("Bind Failed\n"); + } + + // Set the receive buffer size + int recv_buf_size = 4 * 1024 * 1024; + if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &recv_buf_size, sizeof(recv_buf_size)) < 0) { + printf("setsockopt(SO_RCVBUF)"); + } + + // Optional: Verify the buffer size was set + int actual_size; + socklen_t optlen = sizeof(actual_size); + if (getsockopt(sock, SOL_SOCKET, SO_RCVBUF, &actual_size, &optlen) < 0) { + printf("getsockopt(SO_RCVBUF)"); + } else { + printf("Receive buffer size set to: %d bytes\n", actual_size); + } + + printf("Waiting for data\n"); + uint8_t read_buf[BUFFER_SIZE]; + uint32_t buffer_ind = 0; + uint32_t write_cnt = 0; + + while (!exit_thread.load()) { + // socklen_t len; + // struct sockaddr_in servaddr; + // int read_count = recvfrom(sock, read_buf, BUFFER_SIZE, 0, (struct sockaddr *) &servaddr, &len); + int read_count = recv(sock, read_buf, BUFFER_SIZE, 0); + // Validate sender IP address here??? + + bytes_since_last_update += read_count; + clock_gettime(CLOCK_MONOTONIC, &ts_now); + + if (read_count > 0) { + + // if (read_count != 4096) + // printf("recv %d\n", read_count); + // Handle case were data wraps the buffer + if ((buffer_ind + read_count) > OVERALL_BUFFER_SIZE) { + int tail_bytes = OVERALL_BUFFER_SIZE - buffer_ind; + read_count -= tail_bytes; + memcpy(data_buffer + buffer_ind, read_buf, tail_bytes); + buffer_ind = 0; + memcpy(data_buffer + buffer_ind, read_buf + tail_bytes, read_count); + buffer_ind += read_count; + } else { + memcpy(data_buffer + buffer_ind, read_buf, read_count); + buffer_ind += read_count; + buffer_ind = buffer_ind % OVERALL_BUFFER_SIZE; + } + + if (save_to_disk) { + write_cnt += read_count; + if (write_cnt >= WRITE_CHUNK_SIZE) { + write_cnt -= WRITE_CHUNK_SIZE; + if (sem_post(&buffer_ready_sem) == -1) { + printf("sem_post error\n"); + } + } + } + } + + if (timespec_sub(ts_now, ts_last_print) > 1) { + double elapsed = timespec_sub(ts_now, ts_begin); + double rate = (double)total_bytes / elapsed; + elapsed = timespec_sub(ts_now, ts_last_print); + double rate_last = (double)bytes_since_last_update / elapsed; + clock_gettime(CLOCK_MONOTONIC, &ts_last_print); + bytes_since_last_update = 0; + printf("Data Rate (MB/s) %0.2f, Data Rate last update (MB/s) %0.2f, Total Recorded (MB) %0.2f\n", rate/1e6, rate_last/1e6, total_bytes/1e6); + + recording_rate = rate; + } + + } + // Make sure write thread is done before closing file handle + if (writer.joinable()) { + writer.join(); + } + + close(sock); + close(out_fd); + + recording_rate = 0; + + printf("get_data exiting\n"); + +} + +// C Externs for Python C-Types +extern "C" { + DataRecorder* DataRecorder_new(int port){ return new DataRecorder(port); } + + int DataRecorder_start_recording(DataRecorder* recorder, char* filename, long int max_bytes, int save_to_disk){ + return recorder->start_recording(filename, max_bytes, save_to_disk); } + + int DataRecorder_stop_recording(DataRecorder* recorder){ + return recorder->stop_recording(); } + + float DataRecorder_get_recording_rate(DataRecorder* recorder){ + return recorder->get_recording_rate(); } +} \ No newline at end of file diff --git a/cpp/data_recorder.h b/cpp/data_recorder.h new file mode 100755 index 0000000..fb4ec73 --- /dev/null +++ b/cpp/data_recorder.h @@ -0,0 +1,99 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#define NUM_TEMPERATURES 11 + +#define WSRDMA_DMA_INIT 0 +#define WSRDMA_DMA_CLEAR 1 +#define WSRDMA_DMA_START 2 +#define WSRDMA_DMA_STOP 3 +#define WSRDMA_SET_NUM_BUFS 4 +#define WSRDMA_SET_NUM_BYTES 5 +#define WSRDMA_GET_NUM_BUFS 6 +#define WSRDMA_GET_NUM_BYTES 7 +#define WSRDMA_GET_FREE_BUFS 8 + +typedef struct { + unsigned int cmd; + unsigned int offset; + unsigned int value; +} wsrpcie_ioctl_t; + +// #define BUFFER_SIZE (4 * 1024 * 1024) +#define BUFFER_SIZE 8192 +#define NUM_BUFFERS 32768 +#define OVERALL_BUFFER_SIZE (BUFFER_SIZE * NUM_BUFFERS) +#define MAX_SAMPLES_PER_PULSE 32768 +#define WRITE_CHUNK_SIZE (16 * 1024 * 1024) + +struct Hdr { + uint32_t sync0; + uint32_t sync1; + uint16_t commanded_beam_az; + uint16_t commanded_beam_el; + uint32_t type; + + uint16_t crp_index; + uint16_t event_cntr; + uint16_t num_samples; + uint8_t out_sel; + uint8_t beam_pointing_mode; + + uint16_t desired_beam_az; + uint16_t desired_beam_el; + uint32_t start_sample; + + uint64_t event_cntr1; + uint64_t refclk_cntr; + uint64_t pps_frac_cntr; + uint64_t pps_cntr; + + double crp[3]; + + uint32_t msg_type; + uint32_t msg_length; + +}; + +class DataRecorder { + public: + DataRecorder(int port); + DataRecorder(std::string filename, long int pulse_ind_in); + ~DataRecorder(); + + void get_data(int save_to_disk); + void write_data(); + int start_recording(const char* filename, long int max_bytes, int save_to_disk); + int stop_recording(); + float get_recording_rate(); + uint64_t get_filesize(); + + int num_samples; + bool recording_active; + + private: + int pulse_size_bytes; + int iq_size_bytes; + long int pulse_ind; + + void allocate_memory(); + + std::thread recorder; + std::thread writer; + sem_t buffer_ready_sem; + int out_fd; + char * data_buffer; + std::atomic exit_thread; + + float recording_rate; + uint64_t total_bytes; + int port; + +}; \ No newline at end of file diff --git a/cpp/test.bin b/cpp/test.bin new file mode 100644 index 0000000..9a1310f Binary files /dev/null and b/cpp/test.bin differ diff --git a/cpp/test_data_recorder b/cpp/test_data_recorder new file mode 100755 index 0000000..ec0163d Binary files /dev/null and b/cpp/test_data_recorder differ diff --git a/cpp/test_data_recorder.cpp b/cpp/test_data_recorder.cpp new file mode 100755 index 0000000..5e6db05 --- /dev/null +++ b/cpp/test_data_recorder.cpp @@ -0,0 +1,14 @@ +#include "data_recorder.h" +int main() { + + // Instantiate the class + DataRecorder data_recorder(1234); + + + data_recorder.start_recording("test.bin", -1, 1); + + std::this_thread::sleep_for(std::chrono::milliseconds(20000)); + + data_recorder.stop_recording(); + +} \ No newline at end of file diff --git a/python/data_recorder.py b/python/data_recorder.py index 009a2cb..09e20e7 100755 --- a/python/data_recorder.py +++ b/python/data_recorder.py @@ -1,6 +1,7 @@ import socket import numpy as np import ctypes +from ctypes import cdll import data_structures import threading import queue @@ -13,123 +14,32 @@ class DataRecorder: port=1234, packet_size=4096): + self.lib = cdll.LoadLibrary('../cpp/data_recorder.so') - # # TESTTTT - # self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - # print('SO_RCVBUF', self.s.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)) - # self.s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 4 * 1024 * 1024) - # print('SO_RCVBUF', self.s.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)) - # self.s.settimeout(1) - # self.s.bind(("", 1234)) - # data = np.arange(16, dtype=np.uint32) - # data = data.tobytes() - # self.s.sendto(data, (host, 1234)) - # self.s.close() - # # TESTTTTT + self.lib.DataRecorder_new.argtypes = [ctypes.c_int32] + self.lib.DataRecorder_new.restype = ctypes.c_int64 + self.lib.DataRecorder_get_recording_rate.argtypes = [ctypes.c_int64] + self.lib.DataRecorder_get_recording_rate.restype = ctypes.c_float + # self.lib.DataRecorder_get_current_recording_size.argtypes = [ctypes.c_int64] + # self.lib.DataRecorder_get_current_recording_size.restype = ctypes.c_float - # UDP Socket for High Speed Data - self.ip = host - self.port = port - self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - print('SO_RCVBUF', self.s.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)) - self.s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 4 * 1024 * 1024) - print('SO_RCVBUF', self.s.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)) - self.s.settimeout(1) - self.s.bind(("", port)) - # Need to send one udp message to set IP and port info inside FPGA - data = np.arange(16, dtype=np.uint32) - data = data.tobytes() - self.s.sendto(data, (self.ip, self.port)) + self.lib.DataRecorder_start_recording.argtypes = [ctypes.c_int64, ctypes.c_char_p, ctypes.c_int64, ctypes.c_int32] + self.lib.DataRecorder_start_recording.restype = ctypes.c_int32 + self.lib.DataRecorder_stop_recording.argtypes = [ctypes.c_int64] + self.lib.DataRecorder_stop_recording.restype = ctypes.c_int32 - self.max_packet_size = packet_size - - # Data Buffer - # self.buffer = bytearray(512 * 1024 * 1024) - self.buffer = mmap.mmap(-1, 512 * 1024 * 1024) - self.buffer_view = memoryview(self.buffer) - - self.stop_event = threading.Event() - - self.fid = None - self.write_to_disk = False - self.write_offset = 0 - self.write_count = 0 - self.write_queue = queue.SimpleQueue() + self.obj = self.lib.DataRecorder_new(port) def start_recording(self, filename, write_to_disk=False): + filename = filename.encode('utf-8') + filename = ctypes.c_char_p(filename) + self.lib.DataRecorder_start_recording(self.obj, filename, -1, 1) - self.write_to_disk = write_to_disk - - if write_to_disk: - self.write_offset = 0 - self.write_count = 0 - self.fid = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC | os.O_DIRECT ) - - self.write_queue = queue.SimpleQueue() - self.write_data_thread = threading.Thread(target=self.write_data) - self.write_data_thread.start() - - self.get_data_thread = threading.Thread(target=self.get_data) - - self.get_data_thread.start() def stop_recording(self): - print('Stop Thread') - self.stop_event.set() - self.get_data_thread.join() - print('Get Data Thread Joined') - if self.write_to_disk: - self.write_data_thread.join() - print('Write Data Thread Joined') - - def write_data(self): - - write_chunk_size = 4 * 1024 * 1024 - buffer_view = memoryview(self.buffer) - - print('Waiting For Data to Write') - while not self.stop_event.is_set(): - - try: - num_bytes = self.write_queue.get(timeout=1) - - self.write_count += num_bytes - - if self.write_count > write_chunk_size: - # print(self.write_offset) - # os.write(self.fid, self.buffer[self.write_offset:self.write_offset + write_chunk_size]) - os.write(self.fid, buffer_view[self.write_offset:self.write_offset + write_chunk_size]) - self.write_offset += write_chunk_size - self.write_count -= write_chunk_size - self.write_offset = self.write_offset % len(self.buffer) - except queue.Empty: - print('DR Queue Empty!', self.ip) - - - def get_data(self): - offset = 0 - - print('Waiting For Data From Socket') - while not self.stop_event.is_set(): - - try: - n = self.s.recv_into(self.buffer_view[offset:offset + self.max_packet_size]) - - if self.write_to_disk: - # print(n) - self.write_queue.put(n) - - offset += n - if offset > len(self.buffer): - if self.port == 1234: - print('hmmm', n, offset, len(self.buffer)) - offset = offset % len(self.buffer) - # print(offset) - - except socket.timeout: - continue + self.lib.DataRecorder_stop_recording(self.obj) diff --git a/python/data_recorder.py.python_version b/python/data_recorder.py.python_version new file mode 100755 index 0000000..009a2cb --- /dev/null +++ b/python/data_recorder.py.python_version @@ -0,0 +1,135 @@ +import socket +import numpy as np +import ctypes +import data_structures +import threading +import queue +import os +import mmap + +class DataRecorder: + def __init__(self, + host="192.168.2.128", + port=1234, + packet_size=4096): + + + # # TESTTTT + # self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + # print('SO_RCVBUF', self.s.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)) + # self.s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 4 * 1024 * 1024) + # print('SO_RCVBUF', self.s.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)) + # self.s.settimeout(1) + # self.s.bind(("", 1234)) + # data = np.arange(16, dtype=np.uint32) + # data = data.tobytes() + # self.s.sendto(data, (host, 1234)) + # self.s.close() + # # TESTTTTT + + + + # UDP Socket for High Speed Data + self.ip = host + self.port = port + self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + print('SO_RCVBUF', self.s.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)) + self.s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 4 * 1024 * 1024) + print('SO_RCVBUF', self.s.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)) + self.s.settimeout(1) + self.s.bind(("", port)) + # Need to send one udp message to set IP and port info inside FPGA + data = np.arange(16, dtype=np.uint32) + data = data.tobytes() + self.s.sendto(data, (self.ip, self.port)) + + + self.max_packet_size = packet_size + + # Data Buffer + # self.buffer = bytearray(512 * 1024 * 1024) + self.buffer = mmap.mmap(-1, 512 * 1024 * 1024) + self.buffer_view = memoryview(self.buffer) + + self.stop_event = threading.Event() + + self.fid = None + self.write_to_disk = False + self.write_offset = 0 + self.write_count = 0 + self.write_queue = queue.SimpleQueue() + + def start_recording(self, filename, write_to_disk=False): + + self.write_to_disk = write_to_disk + + if write_to_disk: + self.write_offset = 0 + self.write_count = 0 + self.fid = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC | os.O_DIRECT ) + + self.write_queue = queue.SimpleQueue() + self.write_data_thread = threading.Thread(target=self.write_data) + self.write_data_thread.start() + + self.get_data_thread = threading.Thread(target=self.get_data) + + self.get_data_thread.start() + + def stop_recording(self): + print('Stop Thread') + self.stop_event.set() + self.get_data_thread.join() + print('Get Data Thread Joined') + if self.write_to_disk: + self.write_data_thread.join() + print('Write Data Thread Joined') + + def write_data(self): + + write_chunk_size = 4 * 1024 * 1024 + buffer_view = memoryview(self.buffer) + + print('Waiting For Data to Write') + while not self.stop_event.is_set(): + + try: + num_bytes = self.write_queue.get(timeout=1) + + self.write_count += num_bytes + + if self.write_count > write_chunk_size: + # print(self.write_offset) + # os.write(self.fid, self.buffer[self.write_offset:self.write_offset + write_chunk_size]) + os.write(self.fid, buffer_view[self.write_offset:self.write_offset + write_chunk_size]) + self.write_offset += write_chunk_size + self.write_count -= write_chunk_size + self.write_offset = self.write_offset % len(self.buffer) + except queue.Empty: + print('DR Queue Empty!', self.ip) + + + def get_data(self): + offset = 0 + + print('Waiting For Data From Socket') + while not self.stop_event.is_set(): + + try: + n = self.s.recv_into(self.buffer_view[offset:offset + self.max_packet_size]) + + if self.write_to_disk: + # print(n) + self.write_queue.put(n) + + offset += n + if offset > len(self.buffer): + if self.port == 1234: + print('hmmm', n, offset, len(self.buffer)) + offset = offset % len(self.buffer) + # print(offset) + + except socket.timeout: + continue + + diff --git a/python/radar_manager.py b/python/radar_manager.py index ae5639e..92520ac 100755 --- a/python/radar_manager.py +++ b/python/radar_manager.py @@ -370,8 +370,8 @@ class RadarManager: rf_atten = [1, 2, 3, 4, 5, 6] self.setup_rf_attenuators(rf_atten) - adc_nco = 5e9 % f_adc - dac_nco = 5.001e9 % f_dac + adc_nco = 1e9 % f_adc + dac_nco = 1.001e9 % f_dac # adc_nco = 2e9 # adc_nyquist_zone = np.floor(adc_nco / (f_adc / 2)) diff --git a/python/read_data_file.py b/python/read_data_file.py index c82b185..68b697d 100755 --- a/python/read_data_file.py +++ b/python/read_data_file.py @@ -3,12 +3,8 @@ import os.path import time import numpy as np from matplotlib import pyplot as plt -import socket import data_structures -import radar_manager -from data_recorder import DataRecorder - def db20(x): return 20*np.log10(np.abs(x)) @@ -48,7 +44,6 @@ def main(): header = data_structures.CpiHeader.from_buffer_copy(header) fid.seek(-ctypes.sizeof(data_structures.CpiHeader), 1) - # CPI Parameters (timing values are in clk ticks) num_pulses = header.num_pulses num_samples = header.num_samples @@ -93,18 +88,19 @@ def main(): plt.figure() plt.plot(np.diff(cpi_times)) - plt.ylim([0, .02]) + plt.plot(np.diff(pps_frac)) + # plt.ylim([0, .04]) - plt.figure() - plt.plot(iq.T.real, '.-') - plt.plot(iq.T.imag, '--.') - plt.grid() - - plt.figure() - plt.imshow(db20n(iq), aspect='auto', interpolation='nearest', vmin=vmin, vmax=vmax) - plt.ylabel('Pulse Count') - plt.xlabel('Sample Count') - plt.colorbar() + # plt.figure() + # plt.plot(iq.T.real, '.-') + # plt.plot(iq.T.imag, '--.') + # plt.grid() + # + # plt.figure() + # plt.imshow(db20n(iq), aspect='auto', interpolation='nearest', vmin=vmin, vmax=vmax) + # plt.ylabel('Pulse Count') + # plt.xlabel('Sample Count') + # plt.colorbar() plt.show() diff --git a/python/test_cpi.py b/python/test_cpi.py index f135805..b25f880 100755 --- a/python/test_cpi.py +++ b/python/test_cpi.py @@ -45,13 +45,16 @@ def main(): # CPI Parameters (timing values are in clk ticks) num_pulses = 128 # Should be multiple of udp packet size, currently 4096 bytes, or 1024 samples - num_samples = 5000 + num_samples = 16384 start_sample = 2000 tx_num_samples = 1024 tx_start_sample = start_sample - pri = int(.0004 * clk) + prf = 8000 + pri = int(1/prf * clk) + pri -= (pri % 3) + # pri = int(.0001 * clk) print(pri) - inter_cpi = 50 + inter_cpi = 2000 tx_lo_offset = 10e6 rx_lo_offset = 0 @@ -63,9 +66,9 @@ def main(): recorder0 = DataRecorder("192.168.2.128", 1234, packet_size=radar.packet_size) - recorder1 = DataRecorder("192.168.3.128", 1235, packet_size=radar.packet_size) + # recorder1 = DataRecorder("192.168.3.128", 1235, packet_size=radar.packet_size) recorder0.start_recording('test0.bin', True) - recorder1.start_recording('test1.bin', True) + # recorder1.start_recording('test1.bin', True) radar.configure_cpi(pri, inter_cpi, num_pulses, num_samples, start_sample, tx_num_samples, tx_start_sample, rx_lo_offset, tx_lo_offset) @@ -73,83 +76,83 @@ def main(): print('Start Running') radar.start_running() # Let it run for a bit - time.sleep(2) + time.sleep(60) # Stop running radar.stop_running() # Stop the data recorder recorder0.stop_recording() - recorder1.stop_recording() - - # Parse some data - - # Find header, recording buffer could have wrapped depending on data rate and how long we ran for - recorders = [recorder0, recorder1] - for recorder in recorders: - headers = [] - offset = 0 - plot_recorder = recorder - hdr_sync = False - while not hdr_sync: - data = plot_recorder.buffer[offset:offset + 4] - sync_word = np.frombuffer(data, dtype=np.uint32)[0] - if sync_word == 0xAABBCCDD: - hdr_sync = True - print('Header found at offset', offset) - else: - offset += 4 - - num_cpi = 1 - for i in range(num_cpi): - # Get Header - data = plot_recorder.buffer[offset:offset + ctypes.sizeof(data_structures.CpiHeader)] - offset += ctypes.sizeof(data_structures.CpiHeader) - headers.append(data_structures.CpiHeader.from_buffer_copy(data)) - num_pulses = headers[i].num_pulses - num_samples = headers[i].num_samples - - # Get CPI - data_size = num_pulses * num_samples * 4 - data = plot_recorder.buffer[offset:offset + data_size] - offset += data_size - - # Check some header fields - cpi_times = np.array([x.system_time for x in headers]) / 187.5e6 - pps_frac = np.array([x.pps_frac_sec for x in headers]) / 187.5e6 - pps_sec = np.array([x.pps_sec for x in headers]) - utc_time = pps_sec + pps_frac - print(pri, inter_cpi, num_pulses * pri + inter_cpi) - print(cpi_times - cpi_times[0]) - print(pps_frac) - print(pps_sec - pps_sec[0]) - - # Plot last CPI - data2 = np.frombuffer(data, dtype=np.int16) - i = data2[0::2] - q = data2[1::2] - iq = i + 1j * q - iq = iq.reshape(-1, num_samples) - iq = iq + 1e-15 - - vmin = -60 - vmax = 0 - - fid, axs = plt.subplots(3) - axs[0].plot(iq.T.real, '-') - axs[0].plot(iq.T.imag, '--') - axs[0].grid() - - # axs[1].imshow(db20n(iq), aspect='auto', interpolation='nearest', vmin=vmin, vmax=vmax) - axs[1].imshow(iq.real, aspect='auto', interpolation='nearest') - axs[1].set_ylabel('Pulse Count') - axs[1].set_xlabel('Sample Count') - - iq_freq = np.fft.fftshift(np.fft.fft(iq, axis=1), axes=1) - freq_axis = (np.arange(num_samples)/num_samples - 0.5) * radar_manager.BASEBAND_SAMPLE_RATE / 1e6 - axs[2].plot(freq_axis, db20n(iq_freq.T)) - axs[2].grid() - - - plt.show() + # recorder1.stop_recording() + # + # # Parse some data + # + # # Find header, recording buffer could have wrapped depending on data rate and how long we ran for + # recorders = [recorder0, recorder1] + # for recorder in recorders: + # headers = [] + # offset = 0 + # plot_recorder = recorder + # hdr_sync = False + # while not hdr_sync: + # data = plot_recorder.buffer[offset:offset + 4] + # sync_word = np.frombuffer(data, dtype=np.uint32)[0] + # if sync_word == 0xAABBCCDD: + # hdr_sync = True + # print('Header found at offset', offset) + # else: + # offset += 4 + # + # num_cpi = 1 + # for i in range(num_cpi): + # # Get Header + # data = plot_recorder.buffer[offset:offset + ctypes.sizeof(data_structures.CpiHeader)] + # offset += ctypes.sizeof(data_structures.CpiHeader) + # headers.append(data_structures.CpiHeader.from_buffer_copy(data)) + # num_pulses = headers[i].num_pulses + # num_samples = headers[i].num_samples + # + # # Get CPI + # data_size = num_pulses * num_samples * 4 + # data = plot_recorder.buffer[offset:offset + data_size] + # offset += data_size + # + # # Check some header fields + # cpi_times = np.array([x.system_time for x in headers]) / 187.5e6 + # pps_frac = np.array([x.pps_frac_sec for x in headers]) / 187.5e6 + # pps_sec = np.array([x.pps_sec for x in headers]) + # utc_time = pps_sec + pps_frac + # print(pri, inter_cpi, num_pulses * pri + inter_cpi) + # print(cpi_times - cpi_times[0]) + # print(pps_frac) + # print(pps_sec - pps_sec[0]) + # + # # Plot last CPI + # data2 = np.frombuffer(data, dtype=np.int16) + # i = data2[0::2] + # q = data2[1::2] + # iq = i + 1j * q + # iq = iq.reshape(-1, num_samples) + # iq = iq + 1e-15 + # + # vmin = -60 + # vmax = 0 + # + # fid, axs = plt.subplots(3) + # axs[0].plot(iq.T.real, '-') + # axs[0].plot(iq.T.imag, '--') + # axs[0].grid() + # + # # axs[1].imshow(db20n(iq), aspect='auto', interpolation='nearest', vmin=vmin, vmax=vmax) + # axs[1].imshow(iq.real, aspect='auto', interpolation='nearest') + # axs[1].set_ylabel('Pulse Count') + # axs[1].set_xlabel('Sample Count') + # + # iq_freq = np.fft.fftshift(np.fft.fft(iq, axis=1), axes=1) + # freq_axis = (np.arange(num_samples)/num_samples - 0.5) * radar_manager.BASEBAND_SAMPLE_RATE / 1e6 + # axs[2].plot(freq_axis, db20n(iq_freq.T)) + # axs[2].grid() + # + # + # plt.show() if __name__ == '__main__':