#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "data_recorder.h" double timespec_to_double(struct timespec t) { return t.tv_sec + t.tv_nsec * 1e-9; } double timespec_sub(struct timespec t1, struct timespec t2) { // Get seconds part t1.tv_sec -= t2.tv_sec; // Nanoseconds, need to check for negative condition t1.tv_nsec -= t2.tv_nsec; if (t1.tv_nsec >= 1000000000) { t1.tv_sec++; t1.tv_nsec -= 1000000000; } else if (t1.tv_nsec < 0) { t1.tv_sec--; t1.tv_nsec += 1000000000; } return timespec_to_double(t1); } timespec timespec_add(struct timespec t1, struct timespec t2) { // Get seconds part t1.tv_sec += t2.tv_sec; // Nanoseconds, need to check for negative condition t1.tv_nsec += t2.tv_nsec; if (t1.tv_nsec >= 1000000000) { t1.tv_sec++; t1.tv_nsec -= 1000000000; } return t1; } DataRecorder::DataRecorder() { printf("Data Recorder\n"); recording_active = false; allocate_memory(); // Prep access to PL registers plfd = open("/dev/wsrpl0", O_RDWR); if (plfd < 0){ printf("Failed to open %s\n", "/dev/wsrpl0"); } plmmap = (uint32_t *)mmap(NULL, 0x1000000, PROT_WRITE | PROT_READ, MAP_SHARED, plfd, 0); if (plmmap < (uint32_t *)0){ printf("Failed to mmap %s\n", "/dev/wsrpl0"); close(plfd); } validate_cnt_data = false; return; } void DataRecorder::allocate_memory() { for (int ch_ind = 0; ch_ind < NUM_DMA_CH; ch_ind++) { // Memory to buffer data into data_buffer[ch_ind] = (char *)aligned_alloc(4096, OVERALL_BUFFER_SIZE); memset(data_buffer[ch_ind], 0, OVERALL_BUFFER_SIZE); sem_init(&buffer_ready_sem[ch_ind], 0, 0); recording_rate[ch_ind] = 0; total_bytes[ch_ind] = 0; } } void DataRecorder::write_reg(uint32_t addr, uint32_t data) { plmmap[addr >> 2] = data; } uint32_t DataRecorder::read_reg(uint32_t addr) { return plmmap[addr >> 2]; } DataRecorder::~DataRecorder() { printf("~Data Recorder\n"); for (int ch_ind = 0; ch_ind < NUM_DMA_CH; ch_ind++) { free(data_buffer[ch_ind]); sem_destroy(&buffer_ready_sem[ch_ind]); } return; } int DataRecorder::start_recording(const char* filename, int save_to_disk) { recording_active = true; int ret; for (int ch_ind = 0; ch_ind < NUM_DMA_CH; ch_ind++) { while (true) { if (ret = sem_trywait(&buffer_ready_sem[ch_ind]) == -1) { printf("Ch %d Semaphore Cleared\n", ch_ind); break; } printf("Ch %d Semaphore Was Still Available!!!!\n", ch_ind); } } // Make sure old thread is done for (int ch_ind = 0; ch_ind < NUM_DMA_CH; ch_ind++) { if (recorder[ch_ind].joinable()) { printf("Thread was still joinable!!!! %d\n", ch_ind); DataRecorder::stop_recording(); } } exit_thread = false; // Open Output files for (int i = 0; i < NUM_DMA_CH; i++) { char* file; asprintf(&file, "%s_%d", filename, i); printf("Opening File: %s", file); out_fd[i] = open(file, O_WRONLY | O_CREAT | O_TRUNC | O_DIRECT, 0666); if(out_fd[i] < 0) { printf("- FAILED!!!\n"); return -1; } printf(" - SUCCESS FD: %d\n", out_fd[i]); } for (int ch_ind = 0; ch_ind < NUM_DMA_CH; ch_ind++) { recorder[ch_ind] = std::thread(&DataRecorder::get_data, this, ch_ind, save_to_disk); } sleep(1); printf("Recording Started\n"); return 1; } int DataRecorder::stop_recording() { exit_thread = true; // Wait for recorder thread to finish for (int ch_ind = 0; ch_ind < NUM_DMA_CH; ch_ind++) { if (recorder[ch_ind].joinable()) { recorder[ch_ind].join(); } } printf("Thread Joined!\n"); recording_active = false; return 1; } std::vector DataRecorder::get_recording_rate() { std::vector rate; for (int ch_ind = 0; ch_ind < NUM_DMA_CH; ch_ind++) { rate.push_back(recording_rate[ch_ind]); } return rate; } std::vector DataRecorder::get_filesize() { std::vector bytes; for (int ch_ind = 0; ch_ind < NUM_DMA_CH; ch_ind++) { bytes.push_back(total_bytes[ch_ind]); } return bytes; } void DataRecorder::write_data(int ch_ind) { printf("Opening Write Data Thread\n"); int buffer_ind = 0; struct timespec ts; struct timespec timeout; timeout.tv_sec = 0; timeout.tv_nsec = 0.1e9; int ret; int write_chunk_size = BUFFER_SIZE; int bytes_avail = 0; int write_ind = 0; int sem_value; while (!exit_thread.load()) { clock_gettime(CLOCK_REALTIME, &ts); ts = timespec_add(ts, timeout); if (ret = sem_timedwait(&buffer_ready_sem[ch_ind], &ts) == -1) { // Error if (errno == ETIMEDOUT) { // printf("Writer sem timeout\n"); } else { printf("sem_wait error %d\n", errno); } continue; } if (num_bytes_to_write[ch_ind][buffer_ind] != BUFFER_SIZE) { printf("hmmmmmmmmm %d", num_bytes_to_write[ch_ind][buffer_ind]); } bytes_avail += num_bytes_to_write[ch_ind][buffer_ind]; if (bytes_avail >= write_chunk_size) { sem_getvalue(&buffer_ready_sem[ch_ind], &sem_value); int cnt = write(out_fd[ch_ind], &(data_buffer[ch_ind][write_ind]), bytes_avail); if (cnt < 0) { printf("File write error!\n"); } total_bytes[ch_ind] += cnt; write_ind += bytes_avail; bytes_avail = 0; write_ind = write_ind % OVERALL_BUFFER_SIZE; } buffer_ind++; buffer_ind = buffer_ind % NUM_BUFFERS; // int cnt = write(out_fd[ch_ind], &(data_buffer[ch_ind][buffer_ind*BUFFER_SIZE]), num_bytes_to_write[ch_ind][buffer_ind]); // if (cnt < 0) // { // printf("File write error! %d, %d, %d, %p\n", ch_ind, buffer_ind, errno, &(data_buffer[ch_ind][buffer_ind*BUFFER_SIZE])); // } // total_bytes[ch_ind] += cnt; // buffer_ind++; // buffer_ind = buffer_ind % NUM_USERSPACE_BUFFERS; } sem_getvalue(&buffer_ready_sem[ch_ind], &sem_value); printf("Exiting Write Data Thread %d, %d\n", ch_ind, sem_value); } void DataRecorder::set_validate_cnt_data(bool enable, uint32_t pri, uint32_t inter, uint32_t count) { validate_cnt_data = enable; cnt_pri = pri; cnt_num_pulse = count; cnt_inter_pri = inter; } void DataRecorder::get_data(int ch_ind, int save_to_disk) { char* file; asprintf(&file, "/dev/wsrdma%d", ch_ind); // Start write to disk thread if (save_to_disk) { writer[ch_ind] = std::thread(&DataRecorder::write_data, this, ch_ind); } int fd = open(file, O_RDWR); if (fd < 0) { printf(" open failed\n"); return; } wsrpcie_ioctl_t wsrpcie_ioctl; wsrpcie_ioctl.cmd = WSRDMA_SET_NUM_BYTES; wsrpcie_ioctl.offset = 0; wsrpcie_ioctl.value = BUFFER_SIZE; ioctl(fd, 0, &wsrpcie_ioctl); wsrpcie_ioctl.cmd = WSRDMA_SET_NUM_BUFS; wsrpcie_ioctl.offset = 0; wsrpcie_ioctl.value = NUM_BUFFERS; ioctl(fd, 0, &wsrpcie_ioctl); wsrpcie_ioctl.cmd = WSRDMA_DMA_INIT; wsrpcie_ioctl.offset = 0; wsrpcie_ioctl.value = 0; ioctl(fd, 0, &wsrpcie_ioctl); wsrpcie_ioctl.cmd = WSRDMA_DMA_START; wsrpcie_ioctl.offset = 0; wsrpcie_ioctl.value = 0; ioctl(fd, 0, &wsrpcie_ioctl); struct timespec ts_now; struct timespec ts_last_print; struct timespec ts_begin; total_bytes[ch_ind] = 0; long int bytes_since_last_update = 0; // For timing info clock_gettime(CLOCK_MONOTONIC, &ts_begin); ts_last_print = ts_begin; double last_print_time = 0; double last_irq_elapsed = 0; double print_period = 1; printf("Waiting for data\n"); uint32_t buffer_ind = 0; uint32_t write_cnt = 0; bool init_cnt = true; uint64_t current_cnt; uint64_t pulse_cnt = 0; while (!exit_thread.load()) { int read_count = read(fd, &(data_buffer[ch_ind][buffer_ind*BUFFER_SIZE]), BUFFER_SIZE); // reference to current data buffer char * data_buf = &(data_buffer[ch_ind][buffer_ind*BUFFER_SIZE]); bytes_since_last_update += read_count; clock_gettime(CLOCK_MONOTONIC, &ts_now); if (read_count) { // printf("read count %d\n", read_count); if (save_to_disk) { num_bytes_to_write[ch_ind][buffer_ind] = read_count; if (sem_post(&buffer_ready_sem[ch_ind]) == -1) { printf("sem_post error\n"); } } if (validate_cnt_data) { uint64_t * data = (uint64_t *)&(data_buffer[ch_ind][buffer_ind*BUFFER_SIZE]); int num_samp = read_count / (sizeof(uint64_t) * 2); if (init_cnt) { current_cnt = data[0]; init_cnt = false; } uint64_t first_sample_cnt = data[0]; for (int i = 0; i < num_samp; i++) { if (data[i*2] != current_cnt) { printf("Data Mismatch!!! CH %d, Got 0x%lx, Expected 0x%lx, Diff %lu, Pulse %lu\n", ch_ind, data[i*2], current_cnt, data[i*2] - current_cnt, pulse_cnt); break; } current_cnt++; } // count data is determined from a freerunning counter in the FPGA, so there will be expected gaps in count values between // pulses. Set the current count to the next expected value at the end of a pulse current_cnt = first_sample_cnt + cnt_pri; pulse_cnt++; // Handle inter cpi delay for count data if ((pulse_cnt % cnt_num_pulse) == 0) { current_cnt += cnt_inter_pri; } } buffer_ind++; buffer_ind = buffer_ind % NUM_BUFFERS; } else { // Small sleep if no data is available, without this each thread will peg a CPU core usleep(1); } if (timespec_sub(ts_now, ts_last_print) > 1) { double elapsed = timespec_sub(ts_now, ts_begin); double rate = (double)total_bytes[ch_ind] / elapsed; elapsed = timespec_sub(ts_now, ts_last_print); double rate_last = (double)bytes_since_last_update / elapsed; clock_gettime(CLOCK_MONOTONIC, &ts_last_print); bytes_since_last_update = 0; printf("Ch %d, Data Rate (MB/s) %0.2f, Data Rate last update (MB/s) %0.2f, Total Recorded (MB) %0.2f\n", ch_ind, rate/1e6, rate_last/1e6, total_bytes[ch_ind]/1e6); recording_rate[ch_ind] = rate; } } wsrpcie_ioctl.cmd = WSRDMA_DMA_STOP; wsrpcie_ioctl.offset = 0; wsrpcie_ioctl.value = 0; ioctl(fd, 0, &wsrpcie_ioctl); wsrpcie_ioctl.cmd = WSRDMA_DMA_CLEAR; wsrpcie_ioctl.offset = 0; wsrpcie_ioctl.value = 0; ioctl(fd, 0, &wsrpcie_ioctl); // Make sure write thread is done before closing file handle if (writer[ch_ind].joinable()) { writer[ch_ind].join(); } // Want to write out that last little bit of data if we didn't make it to a full chunk close(fd); close(out_fd[ch_ind]); recording_rate[ch_ind] = 0; printf("DMA %s exiting\n", file); }