From 37ca65a65b7fe9740f6e34c7b0ef208a122bad54 Mon Sep 17 00:00:00 2001 From: "bkiedinger@gmail.com" Date: Sat, 13 Jun 2026 17:02:22 -0500 Subject: [PATCH] updates --- cpp/data_recorder.cpp | 72 ++++++++++++++++++++------------------ cpp/data_recorder.h | 2 +- cpp/test_data_recorder.cpp | 42 ++++++++++------------ 3 files changed, 57 insertions(+), 59 deletions(-) diff --git a/cpp/data_recorder.cpp b/cpp/data_recorder.cpp index cf71bfb..2c5b3b6 100755 --- a/cpp/data_recorder.cpp +++ b/cpp/data_recorder.cpp @@ -196,8 +196,7 @@ void DataRecorder::write_data(int ch_ind) { int ret; - int write_chunk_size = BUFFER_SIZE; - + int write_chunk_size = 4 * 1024 * 1024; int bytes_avail = 0; int write_ind = 0; int sem_value; @@ -218,41 +217,43 @@ void DataRecorder::write_data(int ch_ind) { continue; } - if (num_bytes_to_write[ch_ind][buffer_ind] != BUFFER_SIZE) { - printf("hmmmmmmmmm %d", num_bytes_to_write[ch_ind][buffer_ind]); + bytes_avail += num_bytes_to_write[ch_ind][buffer_ind]; + buffer_ind++; + buffer_ind = buffer_ind % NUM_BUFFERS; + + // Detect buffer wrap condition and write data + bool wrap = false; + if ((write_ind + bytes_avail) > (OVERALL_BUFFER_SIZE - BUFFER_SIZE)) { + // printf("write bla %d, %d\n", write_ind, bytes_avail); + wrap = true; } - bytes_avail += num_bytes_to_write[ch_ind][buffer_ind]; - - if (bytes_avail >= write_chunk_size) { - sem_getvalue(&buffer_ready_sem[ch_ind], &sem_value); + if ((bytes_avail >= write_chunk_size) || wrap) { int cnt = write(out_fd[ch_ind], &(data_buffer[ch_ind][write_ind]), bytes_avail); if (cnt < 0) { - printf("File write error!\n"); + printf("File write error! 
%d, %d, %d\n", cnt, write_ind, bytes_avail); + return; } total_bytes[ch_ind] += cnt; write_ind += bytes_avail; bytes_avail = 0; - write_ind = write_ind % OVERALL_BUFFER_SIZE; + + if (wrap) { + write_ind = 0; + } } + } - buffer_ind++; - buffer_ind = buffer_ind % NUM_BUFFERS; - - // int cnt = write(out_fd[ch_ind], &(data_buffer[ch_ind][buffer_ind*BUFFER_SIZE]), num_bytes_to_write[ch_ind][buffer_ind]); - // if (cnt < 0) - // { - // printf("File write error! %d, %d, %d, %p\n", ch_ind, buffer_ind, errno, &(data_buffer[ch_ind][buffer_ind*BUFFER_SIZE])); - // } - // total_bytes[ch_ind] += cnt; - // buffer_ind++; - // buffer_ind = buffer_ind % NUM_USERSPACE_BUFFERS; + if (bytes_avail > 0) { + int cnt = write(out_fd[ch_ind], &(data_buffer[ch_ind][write_ind]), bytes_avail); + total_bytes[ch_ind] += cnt; + printf("Writing last chunk, Total Recorded (MB) %0.2f\n", total_bytes[ch_ind] / 1e6); } sem_getvalue(&buffer_ready_sem[ch_ind], &sem_value); - printf("Exiting Write Data Thread %d, %d\n", ch_ind, sem_value); + // printf("Exiting Write Data Thread %d, %d\n", ch_ind, sem_value); } void DataRecorder::set_validate_cnt_data(bool enable, uint32_t pri, uint32_t inter, uint32_t count) { @@ -307,6 +308,7 @@ void DataRecorder::get_data(int ch_ind, int save_to_disk) { total_bytes[ch_ind] = 0; long int bytes_since_last_update = 0; + long int total_bytes_received = 0; // For timing info clock_gettime(CLOCK_MONOTONIC, &ts_begin); @@ -316,32 +318,34 @@ void DataRecorder::get_data(int ch_ind, int save_to_disk) { double print_period = 1; printf("Waiting for data\n"); - uint32_t buffer_ind = 0; + uint32_t buffer_offset = 0; + uint32_t sem_ind = 0; uint32_t write_cnt = 0; bool init_cnt = true; uint64_t current_cnt; uint64_t pulse_cnt = 0; while (!exit_thread.load()) { - int read_count = read(fd, &(data_buffer[ch_ind][buffer_ind*BUFFER_SIZE]), BUFFER_SIZE); - // reference to current data buffer - char * data_buf = &(data_buffer[ch_ind][buffer_ind*BUFFER_SIZE]); + int read_count = read(fd, 
&(data_buffer[ch_ind][buffer_offset]), BUFFER_SIZE); bytes_since_last_update += read_count; + total_bytes_received += read_count; clock_gettime(CLOCK_MONOTONIC, &ts_now); if (read_count) { // printf("read count %d\n", read_count); if (save_to_disk) { - num_bytes_to_write[ch_ind][buffer_ind] = read_count; + num_bytes_to_write[ch_ind][sem_ind] = read_count; if (sem_post(&buffer_ready_sem[ch_ind]) == -1) { printf("sem_post error\n"); } + sem_ind++; + sem_ind = sem_ind % NUM_BUFFERS; } if (validate_cnt_data) { - uint64_t * data = (uint64_t *)&(data_buffer[ch_ind][buffer_ind*BUFFER_SIZE]); + uint64_t * data = (uint64_t *)&(data_buffer[ch_ind][buffer_offset]); int num_samp = read_count / (sizeof(uint64_t) * 2); if (init_cnt) { current_cnt = data[0]; @@ -366,8 +370,11 @@ void DataRecorder::get_data(int ch_ind, int save_to_disk) { } } - buffer_ind++; - buffer_ind = buffer_ind % NUM_BUFFERS; + buffer_offset += read_count; + if (buffer_offset > (OVERALL_BUFFER_SIZE - BUFFER_SIZE)) { + // printf("reset recv buffer - %d\n", buffer_offset); + buffer_offset = 0; + } } else { // Small sleep if no data is available, without this each thread will peg a CPU core usleep(1); @@ -380,7 +387,7 @@ void DataRecorder::get_data(int ch_ind, int save_to_disk) { double rate_last = (double)bytes_since_last_update / elapsed; clock_gettime(CLOCK_MONOTONIC, &ts_last_print); bytes_since_last_update = 0; - printf("Ch %d, Data Rate (MB/s) %0.2f, Data Rate last update (MB/s) %0.2f, Total Recorded (MB) %0.2f\n", ch_ind, rate/1e6, rate_last/1e6, total_bytes[ch_ind]/1e6); + printf("Ch %d, Data Rate (MB/s) %0.2f, Data Rate last update (MB/s) %0.2f, Total Recorded (MB) %0.2f, Total Received (MB) %0.2f\n", ch_ind, rate/1e6, rate_last/1e6, total_bytes[ch_ind]/1e6, total_bytes_received/1e6); recording_rate[ch_ind] = rate; } @@ -403,9 +410,6 @@ void DataRecorder::get_data(int ch_ind, int save_to_disk) { if (writer[ch_ind].joinable()) { writer[ch_ind].join(); } - - // Want to write out that last little bit of 
data if we didn't make it to a full chunk - close(fd); close(out_fd[ch_ind]); diff --git a/cpp/data_recorder.h b/cpp/data_recorder.h index b4f0d90..daaebb1 100755 --- a/cpp/data_recorder.h +++ b/cpp/data_recorder.h @@ -32,7 +32,7 @@ typedef struct { unsigned int value; } wsrpcie_ioctl_t; -#define NUM_DMA_CH 2 +#define NUM_DMA_CH 4 #define BUFFER_SIZE (4 * 1024 * 1024) // #define BUFFER_SIZE (8192) #define NUM_BUFFERS 128 diff --git a/cpp/test_data_recorder.cpp b/cpp/test_data_recorder.cpp index e71659a..05cf55c 100755 --- a/cpp/test_data_recorder.cpp +++ b/cpp/test_data_recorder.cpp @@ -1,16 +1,6 @@ -// sudo setpci -s 0000:05:00.0 0x78.w=293f -/* -https://forums.developer.nvidia.com/t/the-devctl-maxpayload-is-lower-than-devcap/319292 -Some example for how to use setpci. - -sudo setpci -s 0005:00:00.0 74.w (device capabilities register for x4) -sudo setpci -s 0005:00:00.0 78.w ( Device Control register for x4) and write value for bits 7:5 as (001b) for 256 Bytes MPS - -You have to change 0005:00:00.0 to the device you are using here. 
- - */ - #include "data_recorder.h" +#include <unistd.h> + int main() { // Instantiate the class DataRecorder dr; @@ -20,19 +10,20 @@ int main() { // Setup Timing Engine and data generator to test uint32_t n_pulses = 128; float inter_cpi = 100e-6; - float pri = 70e-6; + // float pri = 75e-6; + float pri = 200e-6; uint32_t n_samples = 16384; - // float pri = 40e-6; - // uint32_t n_samples = 4096; float cpi_time = n_pulses * pri + inter_cpi; float cpi_num_bytes = n_pulses * n_samples * 16; float expected_data_rate = cpi_num_bytes / cpi_time / 1e6; - // PCIe Gen3 x4 Therotecial Max is 4GBPS, - // 128B/120B encoding drops that to 3938 MBPS - // Assuming an MSP of 128 bytes (86.5% effeciency), that further drops to 3406 MBPS - // Currently achieving 3092 MBPS without errors which is ~90% of 3406 MBPS + // PCIe Gen3 x16 Theoretical Max is 16GBPS, + // 128B/130B encoding drops that to 15753.8 MBPS + // Assuming an MPS of 128 bytes (84.2% efficiency), that further drops to 13264 MBPS + // Assuming an MPS of 256 bytes (91.4% efficiency), that further drops to 14399 MBPS + // Assuming an MPS of 512 bytes (95.5% efficiency), that further drops to 15044 MBPS <- Threadripper box uses 512 + // Currently achieving 13836 MBPS without errors which is ~92% of 15044 MBPS printf("Expected Data Rate - %.2f MBps Per Channel, Total %.2f\n", expected_data_rate, expected_data_rate * NUM_DMA_CH); dr.write_reg(TIMING_REG_BASE + 0x0, 1); @@ -49,16 +40,19 @@ int main() { dr.set_validate_cnt_data(true, dr.read_reg(TIMING_REG_BASE + 0x4) + 1, dr.read_reg(TIMING_REG_BASE + 0x10) + 1, n_pulses); // Start listening for data - dr.start_recording("test.bin", 0); + // dr.start_recording("test.bin", 1); + dr.start_recording("/media/hptnvme/test.bin", 1); + sleep(1); // Start the timing engine so data starts flowing dr.write_reg(TIMING_REG_BASE + 0x0, 0); // Wait a while - 
std::this_thread::sleep_for(std::chrono::milliseconds(2000)); dr.write_reg(TIMING_REG_BASE + 0x0, 1); + sleep(2); + dr.stop_recording(); + printf("Expected Data Rate - %.2f MBps Per Channel, Total %.2f\n", expected_data_rate, expected_data_rate * NUM_DMA_CH); -} \ No newline at end of file +}