updates
This commit is contained in:
@@ -196,8 +196,7 @@ void DataRecorder::write_data(int ch_ind) {
|
||||
|
||||
int ret;
|
||||
|
||||
int write_chunk_size = BUFFER_SIZE;
|
||||
|
||||
int write_chunk_size = 4 * 1024 * 1024;
|
||||
int bytes_avail = 0;
|
||||
int write_ind = 0;
|
||||
int sem_value;
|
||||
@@ -218,41 +217,43 @@ void DataRecorder::write_data(int ch_ind) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (num_bytes_to_write[ch_ind][buffer_ind] != BUFFER_SIZE) {
|
||||
printf("hmmmmmmmmm %d", num_bytes_to_write[ch_ind][buffer_ind]);
|
||||
bytes_avail += num_bytes_to_write[ch_ind][buffer_ind];
|
||||
buffer_ind++;
|
||||
buffer_ind = buffer_ind % NUM_BUFFERS;
|
||||
|
||||
// Detect buffer warp condition and write data
|
||||
bool wrap = false;
|
||||
if ((write_ind + bytes_avail) > (OVERALL_BUFFER_SIZE - BUFFER_SIZE)) {
|
||||
// printf("write bla %d, %d\n", write_ind, bytes_avail);
|
||||
wrap = true;
|
||||
}
|
||||
|
||||
bytes_avail += num_bytes_to_write[ch_ind][buffer_ind];
|
||||
|
||||
if (bytes_avail >= write_chunk_size) {
|
||||
sem_getvalue(&buffer_ready_sem[ch_ind], &sem_value);
|
||||
if ((bytes_avail >= write_chunk_size) || wrap) {
|
||||
int cnt = write(out_fd[ch_ind], &(data_buffer[ch_ind][write_ind]), bytes_avail);
|
||||
if (cnt < 0)
|
||||
{
|
||||
printf("File write error!\n");
|
||||
printf("File write error! %d, %d, %d\n", cnt, write_ind, bytes_avail);
|
||||
return;
|
||||
}
|
||||
total_bytes[ch_ind] += cnt;
|
||||
write_ind += bytes_avail;
|
||||
bytes_avail = 0;
|
||||
write_ind = write_ind % OVERALL_BUFFER_SIZE;
|
||||
|
||||
if (wrap) {
|
||||
write_ind = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
buffer_ind++;
|
||||
buffer_ind = buffer_ind % NUM_BUFFERS;
|
||||
|
||||
// int cnt = write(out_fd[ch_ind], &(data_buffer[ch_ind][buffer_ind*BUFFER_SIZE]), num_bytes_to_write[ch_ind][buffer_ind]);
|
||||
// if (cnt < 0)
|
||||
// {
|
||||
// printf("File write error! %d, %d, %d, %p\n", ch_ind, buffer_ind, errno, &(data_buffer[ch_ind][buffer_ind*BUFFER_SIZE]));
|
||||
// }
|
||||
// total_bytes[ch_ind] += cnt;
|
||||
// buffer_ind++;
|
||||
// buffer_ind = buffer_ind % NUM_USERSPACE_BUFFERS;
|
||||
if (bytes_avail > 0) {
|
||||
int cnt = write(out_fd[ch_ind], &(data_buffer[ch_ind][write_ind]), bytes_avail);
|
||||
total_bytes[ch_ind] += cnt;
|
||||
printf("Writing last chunk, Total Recorded (MB) %0.2f\n", total_bytes[ch_ind] / 1e6);
|
||||
}
|
||||
|
||||
sem_getvalue(&buffer_ready_sem[ch_ind], &sem_value);
|
||||
|
||||
printf("Exiting Write Data Thread %d, %d\n", ch_ind, sem_value);
|
||||
// printf("Exiting Write Data Thread %d, %d\n", ch_ind, sem_value);
|
||||
}
|
||||
|
||||
void DataRecorder::set_validate_cnt_data(bool enable, uint32_t pri, uint32_t inter, uint32_t count) {
|
||||
@@ -307,6 +308,7 @@ void DataRecorder::get_data(int ch_ind, int save_to_disk) {
|
||||
|
||||
total_bytes[ch_ind] = 0;
|
||||
long int bytes_since_last_update = 0;
|
||||
long int total_bytes_received = 0;
|
||||
|
||||
// For timing info
|
||||
clock_gettime(CLOCK_MONOTONIC, &ts_begin);
|
||||
@@ -316,32 +318,34 @@ void DataRecorder::get_data(int ch_ind, int save_to_disk) {
|
||||
double print_period = 1;
|
||||
|
||||
printf("Waiting for data\n");
|
||||
uint32_t buffer_ind = 0;
|
||||
uint32_t buffer_offset = 0;
|
||||
uint32_t sem_ind = 0;
|
||||
uint32_t write_cnt = 0;
|
||||
bool init_cnt = true;
|
||||
uint64_t current_cnt;
|
||||
uint64_t pulse_cnt = 0;
|
||||
|
||||
while (!exit_thread.load()) {
|
||||
int read_count = read(fd, &(data_buffer[ch_ind][buffer_ind*BUFFER_SIZE]), BUFFER_SIZE);
|
||||
// reference to current data buffer
|
||||
char * data_buf = &(data_buffer[ch_ind][buffer_ind*BUFFER_SIZE]);
|
||||
int read_count = read(fd, &(data_buffer[ch_ind][buffer_offset]), BUFFER_SIZE);
|
||||
bytes_since_last_update += read_count;
|
||||
total_bytes_received += read_count;
|
||||
clock_gettime(CLOCK_MONOTONIC, &ts_now);
|
||||
|
||||
if (read_count) {
|
||||
// printf("read count %d\n", read_count);
|
||||
|
||||
if (save_to_disk) {
|
||||
num_bytes_to_write[ch_ind][buffer_ind] = read_count;
|
||||
num_bytes_to_write[ch_ind][sem_ind] = read_count;
|
||||
if (sem_post(&buffer_ready_sem[ch_ind]) == -1) {
|
||||
printf("sem_post error\n");
|
||||
}
|
||||
sem_ind++;
|
||||
sem_ind = sem_ind % NUM_BUFFERS;
|
||||
}
|
||||
|
||||
if (validate_cnt_data) {
|
||||
|
||||
uint64_t * data = (uint64_t *)&(data_buffer[ch_ind][buffer_ind*BUFFER_SIZE]);
|
||||
uint64_t * data = (uint64_t *)&(data_buffer[ch_ind][buffer_offset]);
|
||||
int num_samp = read_count / (sizeof(uint64_t) * 2);
|
||||
if (init_cnt) {
|
||||
current_cnt = data[0];
|
||||
@@ -366,8 +370,11 @@ void DataRecorder::get_data(int ch_ind, int save_to_disk) {
|
||||
}
|
||||
}
|
||||
|
||||
buffer_ind++;
|
||||
buffer_ind = buffer_ind % NUM_BUFFERS;
|
||||
buffer_offset += read_count;
|
||||
if (buffer_offset > (OVERALL_BUFFER_SIZE - BUFFER_SIZE)) {
|
||||
// printf("reset recv buffer - %d\n", buffer_offset);
|
||||
buffer_offset = 0;
|
||||
}
|
||||
} else {
|
||||
// Small sleep if no data is available, without this each thread will peg a CPU core
|
||||
usleep(1);
|
||||
@@ -380,7 +387,7 @@ void DataRecorder::get_data(int ch_ind, int save_to_disk) {
|
||||
double rate_last = (double)bytes_since_last_update / elapsed;
|
||||
clock_gettime(CLOCK_MONOTONIC, &ts_last_print);
|
||||
bytes_since_last_update = 0;
|
||||
printf("Ch %d, Data Rate (MB/s) %0.2f, Data Rate last update (MB/s) %0.2f, Total Recorded (MB) %0.2f\n", ch_ind, rate/1e6, rate_last/1e6, total_bytes[ch_ind]/1e6);
|
||||
printf("Ch %d, Data Rate (MB/s) %0.2f, Data Rate last update (MB/s) %0.2f, Total Recorded (MB) %0.2f, Total Received (MB) %0.2f\n", ch_ind, rate/1e6, rate_last/1e6, total_bytes[ch_ind]/1e6, total_bytes_received/1e6);
|
||||
|
||||
recording_rate[ch_ind] = rate;
|
||||
}
|
||||
@@ -403,9 +410,6 @@ void DataRecorder::get_data(int ch_ind, int save_to_disk) {
|
||||
if (writer[ch_ind].joinable()) {
|
||||
writer[ch_ind].join();
|
||||
}
|
||||
|
||||
// Want to write out that last little bit of data if we didn't make it to a full chunk
|
||||
|
||||
|
||||
close(fd);
|
||||
close(out_fd[ch_ind]);
|
||||
|
||||
Reference in New Issue
Block a user