#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "data_recorder.h" double timespec_to_double(struct timespec t) { return t.tv_sec + t.tv_nsec * 1e-9; } double timespec_sub(struct timespec t1, struct timespec t2) { // Get seconds part t1.tv_sec -= t2.tv_sec; // Nanoseconds, need to check for negative condition t1.tv_nsec -= t2.tv_nsec; if (t1.tv_nsec >= 1000000000) { t1.tv_sec++; t1.tv_nsec -= 1000000000; } else if (t1.tv_nsec < 0) { t1.tv_sec--; t1.tv_nsec += 1000000000; } return timespec_to_double(t1); } DataRecorder::DataRecorder(int port_in) { printf("Data Recorder\n"); port = port_in; recording_active = false; allocate_memory(); return; } void DataRecorder::allocate_memory() { // Memory to buffer data into data_buffer = (char *)aligned_alloc(4096, OVERALL_BUFFER_SIZE); memset(data_buffer, 0, OVERALL_BUFFER_SIZE); sem_init(&buffer_ready_sem, 0, 0); recording_rate = 0; total_bytes = 0; } DataRecorder::~DataRecorder() { printf("~Data Recorder\n"); free(data_buffer); return; } int DataRecorder::start_recording(const char* filename, long int max_bytes, int save_to_disk) { recording_active = true; int ret; while (true) { if (ret = sem_trywait(&buffer_ready_sem) == -1) { printf("Semaphore Cleared\n"); break; } printf("Semaphore Was Still Available!!!!\n"); } // Make sure old thread is done if (recorder.joinable()) { printf("Thread was still joinable!!!!\n"); DataRecorder::stop_recording(); } exit_thread = false; // Open Output file char* file; asprintf(&file, "%s", filename); printf("Opening File: %s", file); out_fd = open(file, O_WRONLY | O_CREAT | O_TRUNC | O_DIRECT, 0666); if(out_fd < 0) { printf("- FAILED!!!\n"); return -1; } printf(" - SUCCESS FD: %d\n", out_fd); // Check how much space is left on disk struct statvfs fiData; char *dirc; char *dname; dirc = strdup(filename); dname = dirname(dirc); printf("Dir Name: %s, %s\n", filename, dname); if(statvfs(dname, &fiData) < 0) { printf("Failed to check disk space\n"); return -1; } long int disk_bytes_free = fiData.f_bsize * fiData.f_bavail; printf("Disk Space Left: %f GB\n", (float)disk_bytes_free / 1e9); // Limit max file size to lesser of specified max bytes and the remaining disk space. // If max_bytes is passed in as -1, then the only limit is the disk space printf("Requested Bytes: %ld \n", max_bytes); if (max_bytes < 0) { max_bytes = disk_bytes_free; } else { max_bytes = std::min(max_bytes, disk_bytes_free); } printf("Max File Size: %f GB\n", (float)max_bytes / 1e9); printf("Start Recording, Max Bytes %ld\n", max_bytes); recorder = std::thread(&DataRecorder::get_data, this, save_to_disk); sleep(1); printf("Recording Started\n"); return 1; } int DataRecorder::stop_recording() { exit_thread = true; // Wait for recorder thread to finish if (recorder.joinable()) { recorder.join(); } printf("Thread Joined!\n"); recording_active = false; return 1; } float DataRecorder::get_recording_rate() { return recording_rate; } uint64_t DataRecorder::get_filesize() { return total_bytes; } void DataRecorder::write_data() { printf("Opening Write Data Thread\n"); struct timespec ts; int timeout = 1; int ret; int buffer_ind = 0; while (!exit_thread.load()) { clock_gettime(CLOCK_REALTIME, &ts); ts.tv_sec += timeout; if (ret = sem_timedwait(&buffer_ready_sem, &ts) == -1) { // Error if (errno == ETIMEDOUT) { // printf("Writer sem timeout\n"); } else { printf("sem_wait error %d\n", errno); } continue; } // A chunk is ready write it out int cnt = write(out_fd, &(data_buffer[buffer_ind]), WRITE_CHUNK_SIZE); total_bytes += cnt; if (cnt < 0) { printf("File write error!\n"); } buffer_ind += WRITE_CHUNK_SIZE; buffer_ind = buffer_ind % OVERALL_BUFFER_SIZE; } int sem_value; sem_getvalue(&buffer_ready_sem, &sem_value); printf("Exiting Write Data Thread %d\n", sem_value); } void DataRecorder::get_data(int save_to_disk) { // Start wirte to disk thread if (save_to_disk) { writer = std::thread(&DataRecorder::write_data, this); } struct timespec ts_now; struct timespec ts_last_print; struct timespec ts_begin; total_bytes = 0; long int bytes_since_last_update = 0; // For timing info clock_gettime(CLOCK_MONOTONIC, &ts_begin); ts_last_print = ts_begin; double last_print_time = 0; double last_irq_elapsed = 0; double print_period = 1; // Open Socket Connection int sock; if ((sock = socket(AF_INET, SOCK_DGRAM, 0)) < 0) { printf("Socket creation error\n"); } struct timeval tv; tv.tv_sec = 2; tv.tv_usec = 0; setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof tv); struct sockaddr_in serv_addr; serv_addr.sin_family = AF_INET; serv_addr.sin_port = htons(port); if (inet_pton(AF_INET, "0.0.0.0", &serv_addr.sin_addr) <= 0) { printf("Invalid IP Address\n"); } if (bind(sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) == -1) { printf("Bind Failed\n"); } // Set the receive buffer size int recv_buf_size = 4 * 1024 * 1024; if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &recv_buf_size, sizeof(recv_buf_size)) < 0) { printf("setsockopt(SO_RCVBUF)"); } // Optional: Verify the buffer size was set int actual_size; socklen_t optlen = sizeof(actual_size); if (getsockopt(sock, SOL_SOCKET, SO_RCVBUF, &actual_size, &optlen) < 0) { printf("getsockopt(SO_RCVBUF)"); } else { printf("Receive buffer size set to: %d bytes\n", actual_size); } printf("Waiting for data\n"); uint8_t read_buf[BUFFER_SIZE]; uint32_t buffer_ind = 0; uint32_t write_cnt = 0; while (!exit_thread.load()) { // socklen_t len; // struct sockaddr_in servaddr; // int read_count = recvfrom(sock, read_buf, BUFFER_SIZE, 0, (struct sockaddr *) &servaddr, &len); int read_count = recv(sock, read_buf, BUFFER_SIZE, 0); // Validate sender IP address here??? bytes_since_last_update += read_count; clock_gettime(CLOCK_MONOTONIC, &ts_now); if (read_count > 0) { // if (read_count != 4096) // printf("recv %d\n", read_count); // Handle case were data wraps the buffer if ((buffer_ind + read_count) > OVERALL_BUFFER_SIZE) { int tail_bytes = OVERALL_BUFFER_SIZE - buffer_ind; read_count -= tail_bytes; memcpy(data_buffer + buffer_ind, read_buf, tail_bytes); buffer_ind = 0; memcpy(data_buffer + buffer_ind, read_buf + tail_bytes, read_count); buffer_ind += read_count; } else { memcpy(data_buffer + buffer_ind, read_buf, read_count); buffer_ind += read_count; buffer_ind = buffer_ind % OVERALL_BUFFER_SIZE; } if (save_to_disk) { write_cnt += read_count; if (write_cnt >= WRITE_CHUNK_SIZE) { write_cnt -= WRITE_CHUNK_SIZE; if (sem_post(&buffer_ready_sem) == -1) { printf("sem_post error\n"); } } } } if (timespec_sub(ts_now, ts_last_print) > 1) { double elapsed = timespec_sub(ts_now, ts_begin); double rate = (double)total_bytes / elapsed; elapsed = timespec_sub(ts_now, ts_last_print); double rate_last = (double)bytes_since_last_update / elapsed; clock_gettime(CLOCK_MONOTONIC, &ts_last_print); bytes_since_last_update = 0; printf("Data Rate (MB/s) %0.2f, Data Rate last update (MB/s) %0.2f, Total Recorded (MB) %0.2f\n", rate/1e6, rate_last/1e6, total_bytes/1e6); recording_rate = rate; } } // Make sure write thread is done before closing file handle if (writer.joinable()) { writer.join(); } close(sock); close(out_fd); recording_rate = 0; printf("get_data exiting\n"); } // C Externs for Python C-Types extern "C" { DataRecorder* DataRecorder_new(int port){ return new DataRecorder(port); } int DataRecorder_start_recording(DataRecorder* recorder, char* filename, long int max_bytes, int save_to_disk){ return recorder->start_recording(filename, max_bytes, save_to_disk); } int DataRecorder_stop_recording(DataRecorder* recorder){ return recorder->stop_recording(); } float DataRecorder_get_recording_rate(DataRecorder* recorder){ return recorder->get_recording_rate(); } }