/* * Copyright (C) 2014-2015 Nuand LLC * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include /* Only switch on the verbose debug prints in this file when we *really* want * them. Otherwise, compile them out to avoid excessive log level checks * in our data path */ #include "log.h" #ifndef ENABLE_LIBBLADERF_SYNC_LOG_VERBOSE #undef log_verbose #define log_verbose(...) #endif #include "minmax.h" #include "rel_assert.h" #include "async.h" #include "sync.h" #include "sync_worker.h" #include "metadata.h" #include "board/board.h" #include "helpers/timeout.h" #include "helpers/have_cap.h" #ifdef ENABLE_LIBBLADERF_SYNC_LOG_VERBOSE static inline void dump_buf_states(struct bladerf_sync *s) { static char *out = NULL; struct buffer_mgmt *b = &s->buf_mgmt; char *statestr = "UNKNOWN"; if (out == NULL) { out = malloc((b->num_buffers + 1) * sizeof(char)); } if (out == NULL) { log_verbose("%s: malloc failed\n"); return; } out[b->num_buffers] = '\0'; for (size_t i = 0; i < b->num_buffers; ++i) { switch (b->status[i]) { case SYNC_BUFFER_EMPTY: out[i] = '_'; break; case SYNC_BUFFER_IN_FLIGHT: out[i] = '-'; break; case SYNC_BUFFER_FULL: out[i] = '*'; break; case SYNC_BUFFER_PARTIAL: out[i] = 'o'; break; } } switch (s->state) { case SYNC_STATE_BUFFER_READY: statestr = "BUFFER_READY"; break; case SYNC_STATE_CHECK_WORKER: statestr = "CHECK_WORKER"; break; case SYNC_STATE_RESET_BUF_MGMT: statestr = "RESET_BUF_MGMT"; break; case SYNC_STATE_START_WORKER: statestr = "START_WORKER"; break; case SYNC_STATE_USING_BUFFER: statestr = "USING_BUFFER"; break; case SYNC_STATE_USING_BUFFER_META: statestr = "USING_BUFFER_META"; break; case SYNC_STATE_USING_PACKET_META: statestr = "USING_PACKET_META"; break; case SYNC_STATE_WAIT_FOR_BUFFER: statestr = "WAIT_FOR_BUFFER"; break; } log_verbose("%s: %s (%s)\n", __FUNCTION__, out, statestr); } #else #define dump_buf_states(...) #endif // ENABLE_LIBBLADERF_SYNC_LOG_VERBOSE static inline size_t samples2bytes(struct bladerf_sync *s, size_t n) { return s->stream_config.bytes_per_sample * n; } static inline unsigned int msg_per_buf(size_t msg_size, size_t buf_size, size_t bytes_per_sample) { size_t n = buf_size / (msg_size / bytes_per_sample); assert(n <= UINT_MAX); return (unsigned int) n; } static inline unsigned int samples_per_msg(size_t msg_size, size_t bytes_per_sample) { size_t n = (msg_size - METADATA_HEADER_SIZE) / bytes_per_sample; assert(n <= UINT_MAX); return (unsigned int) n; } int sync_init(struct bladerf_sync *sync, struct bladerf *dev, bladerf_channel_layout layout, bladerf_format format, unsigned int num_buffers, size_t buffer_size, size_t msg_size, unsigned int num_transfers, unsigned int stream_timeout) { int status = 0; size_t i, bytes_per_sample; if (num_transfers >= num_buffers) { return BLADERF_ERR_INVAL; } if (format == BLADERF_FORMAT_PACKET_META) { if (!have_cap_dev(dev, BLADERF_CAP_FW_SHORT_PACKET)) { log_error("Firmware does not support short packets. " "Upgrade to at least firmware version 2.4.0.\n"); return BLADERF_ERR_UNSUPPORTED; } if (!have_cap_dev(dev, BLADERF_CAP_FPGA_PACKET_META)) { log_error("FPGA does not support packet meta format. " "Upgrade to at least FPGA version 0.12.0.\n"); return BLADERF_ERR_UNSUPPORTED; } } if (format == BLADERF_FORMAT_SC8_Q7 || format == BLADERF_FORMAT_SC8_Q7_META) { if (!have_cap_dev(dev, BLADERF_CAP_FPGA_8BIT_SAMPLES)) { log_error("FPGA does not support 8bit mode. " "Upgrade to at least FPGA version 0.15.0.\n"); return BLADERF_ERR_UNSUPPORTED; } } switch (format) { case BLADERF_FORMAT_SC8_Q7: case BLADERF_FORMAT_SC8_Q7_META: bytes_per_sample = 2; break; case BLADERF_FORMAT_SC16_Q11: case BLADERF_FORMAT_SC16_Q11_META: case BLADERF_FORMAT_PACKET_META: bytes_per_sample = 4; break; default: log_debug("Invalid format value: %d\n", format); return BLADERF_ERR_INVAL; } /* bladeRF GPIF DMA requirement */ if ((bytes_per_sample * buffer_size) % 4096 != 0) { assert(!"Invalid buffer size"); return BLADERF_ERR_INVAL; } /* Deinitialize sync handle if it's initialized */ sync_deinit(sync); MUTEX_INIT(&sync->lock); switch (layout & BLADERF_DIRECTION_MASK) { case BLADERF_TX: sync->buf_mgmt.submitter = SYNC_TX_SUBMITTER_FN; break; case BLADERF_RX: sync->buf_mgmt.submitter = SYNC_TX_SUBMITTER_INVALID; break; } sync->dev = dev; sync->state = SYNC_STATE_CHECK_WORKER; sync->buf_mgmt.num_buffers = num_buffers; sync->buf_mgmt.resubmit_count = 0; sync->stream_config.layout = layout; sync->stream_config.format = format; sync->stream_config.samples_per_buffer = (unsigned int)buffer_size; sync->stream_config.num_xfers = num_transfers; sync->stream_config.timeout_ms = stream_timeout; sync->stream_config.bytes_per_sample = bytes_per_sample; sync->meta.state = SYNC_META_STATE_HEADER; sync->meta.msg_size = msg_size; sync->meta.msg_per_buf = msg_per_buf(msg_size, buffer_size, bytes_per_sample); sync->meta.samples_per_msg = samples_per_msg(msg_size, bytes_per_sample); sync->meta.samples_per_ts = (layout == BLADERF_RX_X2 || layout == BLADERF_TX_X2) ? 2:1; log_verbose("%s: Buffer size (in bytes): %u\n", __FUNCTION__, buffer_size * bytes_per_sample); log_verbose("%s: Buffer size (in samples): %u\n", __FUNCTION__, buffer_size); log_verbose("%s: Msg per buffer: %u\n", __FUNCTION__, sync->meta.msg_per_buf); log_verbose("%s: Samples per msg: %u\n", __FUNCTION__, sync->meta.samples_per_msg); MUTEX_INIT(&sync->buf_mgmt.lock); pthread_cond_init(&sync->buf_mgmt.buf_ready, NULL); sync->buf_mgmt.status = (sync_buffer_status*) malloc(num_buffers * sizeof(sync_buffer_status)); if (sync->buf_mgmt.status == NULL) { status = BLADERF_ERR_MEM; goto error; } sync->buf_mgmt.actual_lengths = (size_t *) malloc(num_buffers * sizeof(size_t)); if (sync->buf_mgmt.actual_lengths == NULL) { status = BLADERF_ERR_MEM; goto error; } switch (layout & BLADERF_DIRECTION_MASK) { case BLADERF_RX: /* When starting up an RX stream, the first 'num_transfers' * transfers will be submitted to the USB layer to grab data */ sync->buf_mgmt.prod_i = num_transfers; sync->buf_mgmt.cons_i = 0; sync->buf_mgmt.partial_off = 0; for (i = 0; i < num_buffers; i++) { if (i < num_transfers) { sync->buf_mgmt.status[i] = SYNC_BUFFER_IN_FLIGHT; } else { sync->buf_mgmt.status[i] = SYNC_BUFFER_EMPTY; } } sync->meta.msg_timestamp = 0; sync->meta.msg_flags = 0; break; case BLADERF_TX: sync->buf_mgmt.prod_i = 0; sync->buf_mgmt.cons_i = BUFFER_MGMT_INVALID_INDEX; sync->buf_mgmt.partial_off = 0; for (i = 0; i < num_buffers; i++) { sync->buf_mgmt.status[i] = SYNC_BUFFER_EMPTY; } sync->meta.msg_timestamp = 0; sync->meta.in_burst = false; sync->meta.now = false; break; } status = sync_worker_init(sync); if (status < 0) { goto error; } sync->initialized = true; return 0; error: sync_deinit(sync); return status; } void sync_deinit(struct bladerf_sync *sync) { if (sync->initialized) { if ((sync->stream_config.layout & BLADERF_DIRECTION_MASK) == BLADERF_TX) { async_submit_stream_buffer(sync->worker->stream, BLADERF_STREAM_SHUTDOWN, NULL, 0, false); } sync_worker_deinit(sync->worker, &sync->buf_mgmt.lock, &sync->buf_mgmt.buf_ready); if (sync->buf_mgmt.actual_lengths) { free(sync->buf_mgmt.actual_lengths); } /* De-allocate our buffer management resources */ if (sync->buf_mgmt.status) { MUTEX_DESTROY(&sync->buf_mgmt.lock); free(sync->buf_mgmt.status); } MUTEX_DESTROY(&sync->lock); sync->initialized = false; } } static int wait_for_buffer(struct buffer_mgmt *b, unsigned int timeout_ms, const char *dbg_name, unsigned int dbg_idx) { int status; struct timespec timeout; if (timeout_ms == 0) { log_verbose("%s: Infinite wait for buffer[%d] (status: %d).\n", dbg_name, dbg_idx, b->status[dbg_idx]); status = pthread_cond_wait(&b->buf_ready, &b->lock); } else { log_verbose("%s: Timed wait for buffer[%d] (status: %d).\n", dbg_name, dbg_idx, b->status[dbg_idx]); status = populate_abs_timeout(&timeout, timeout_ms); if (status == 0) { status = pthread_cond_timedwait(&b->buf_ready, &b->lock, &timeout); } } if (status == ETIMEDOUT) { log_error("%s: Timed out waiting for buf_ready after %d ms\n", __FUNCTION__, timeout_ms); status = BLADERF_ERR_TIMEOUT; } else if (status != 0) { status = BLADERF_ERR_UNEXPECTED; } return status; } #ifndef SYNC_WORKER_START_TIMEOUT_MS # define SYNC_WORKER_START_TIMEOUT_MS 250 #endif /* Returns # of timestamps (or time steps) left in a message */ static inline unsigned int ts_remaining(struct bladerf_sync *s) { size_t ret = s->meta.samples_per_msg / s->meta.samples_per_ts - s->meta.curr_msg_off; assert(ret <= UINT_MAX); return (unsigned int) ret; } /* Returns # of samples left in a message (SC16Q11 mode only) */ static inline unsigned int left_in_msg(struct bladerf_sync *s) { size_t ret = s->meta.samples_per_msg - s->meta.curr_msg_off; assert(ret <= UINT_MAX); return (unsigned int) ret; } static inline void advance_rx_buffer(struct buffer_mgmt *b) { log_verbose("%s: Marking buf[%u] empty.\n", __FUNCTION__, b->cons_i); b->status[b->cons_i] = SYNC_BUFFER_EMPTY; b->cons_i = (b->cons_i + 1) % b->num_buffers; } static inline unsigned int timestamp_to_msg(struct bladerf_sync *s, uint64_t t) { uint64_t m = t / s->meta.samples_per_msg; assert(m <= UINT_MAX); return (unsigned int) m; } int sync_rx(struct bladerf_sync *s, void *samples, unsigned num_samples, struct bladerf_metadata *user_meta, unsigned int timeout_ms) { struct buffer_mgmt *b; int status = 0; bool exit_early = false; bool copied_data = false; unsigned int samples_returned = 0; uint8_t *samples_dest = (uint8_t*)samples; uint8_t *buf_src = NULL; unsigned int samples_to_copy = 0; unsigned int samples_per_buffer = 0; uint64_t target_timestamp = UINT64_MAX; unsigned int pkt_len_dwords = 0; if (s == NULL || samples == NULL) { log_debug("NULL pointer passed to %s\n", __FUNCTION__); return BLADERF_ERR_INVAL; } else if (!s->initialized) { return BLADERF_ERR_INVAL; } if (num_samples % s->meta.samples_per_ts != 0) { log_debug("%s: %u samples %% %u channels != 0\n", __FUNCTION__, num_samples, s->meta.samples_per_ts); return BLADERF_ERR_INVAL; } MUTEX_LOCK(&s->lock); if (s->stream_config.format == BLADERF_FORMAT_SC16_Q11_META || s->stream_config.format == BLADERF_FORMAT_SC8_Q7_META || s->stream_config.format == BLADERF_FORMAT_PACKET_META) { if (user_meta == NULL) { log_debug("NULL metadata pointer passed to %s\n", __FUNCTION__); status = BLADERF_ERR_INVAL; goto out; } else { user_meta->status = 0; target_timestamp = user_meta->timestamp; } } b = &s->buf_mgmt; samples_per_buffer = s->stream_config.samples_per_buffer; log_verbose("%s: Requests %u samples.\n", __FUNCTION__, num_samples); while (!exit_early && samples_returned < num_samples && status == 0) { dump_buf_states(s); switch (s->state) { case SYNC_STATE_CHECK_WORKER: { int stream_error; sync_worker_state worker_state = sync_worker_get_state(s->worker, &stream_error); /* Propagate stream error back to the caller. * They can call this function again to restart the stream and * try again. */ if (stream_error != 0) { status = stream_error; } else { if (worker_state == SYNC_WORKER_STATE_IDLE) { log_debug("%s: Worker is idle. Going to reset buf " "mgmt.\n", __FUNCTION__); s->state = SYNC_STATE_RESET_BUF_MGMT; } else if (worker_state == SYNC_WORKER_STATE_RUNNING) { s->state = SYNC_STATE_WAIT_FOR_BUFFER; } else { status = BLADERF_ERR_UNEXPECTED; log_debug("%s: Unexpected worker state=%d\n", __FUNCTION__, worker_state); } } break; } case SYNC_STATE_RESET_BUF_MGMT: MUTEX_LOCK(&b->lock); /* When the RX stream starts up, it will submit the first T * transfers, so the consumer index must be reset to 0 */ b->cons_i = 0; MUTEX_UNLOCK(&b->lock); log_debug("%s: Reset buf_mgmt consumer index\n", __FUNCTION__); s->state = SYNC_STATE_START_WORKER; break; case SYNC_STATE_START_WORKER: sync_worker_submit_request(s->worker, SYNC_WORKER_START); status = sync_worker_wait_for_state( s->worker, SYNC_WORKER_STATE_RUNNING, SYNC_WORKER_START_TIMEOUT_MS); if (status == 0) { s->state = SYNC_STATE_WAIT_FOR_BUFFER; log_debug("%s: Worker is now running.\n", __FUNCTION__); } else { log_debug("%s: Failed to start worker, (%d)\n", __FUNCTION__, status); } break; case SYNC_STATE_WAIT_FOR_BUFFER: MUTEX_LOCK(&b->lock); /* Check the buffer state, as the worker may have produced one * since we last queried the status */ if (b->status[b->cons_i] == SYNC_BUFFER_FULL) { s->state = SYNC_STATE_BUFFER_READY; log_verbose("%s: buffer %u is ready to consume\n", __FUNCTION__, b->cons_i); } else { status = wait_for_buffer(b, timeout_ms, __FUNCTION__, b->cons_i); if (status == 0) { if (b->status[b->cons_i] != SYNC_BUFFER_FULL) { s->state = SYNC_STATE_CHECK_WORKER; } else { s->state = SYNC_STATE_BUFFER_READY; log_verbose("%s: buffer %u is ready to consume\n", __FUNCTION__, b->cons_i); } } } MUTEX_UNLOCK(&b->lock); break; case SYNC_STATE_BUFFER_READY: MUTEX_LOCK(&b->lock); b->status[b->cons_i] = SYNC_BUFFER_PARTIAL; b->partial_off = 0; switch (s->stream_config.format) { case BLADERF_FORMAT_SC16_Q11: case BLADERF_FORMAT_SC8_Q7: s->state = SYNC_STATE_USING_BUFFER; break; case BLADERF_FORMAT_SC16_Q11_META: case BLADERF_FORMAT_SC8_Q7_META: s->state = SYNC_STATE_USING_BUFFER_META; s->meta.curr_msg_off = 0; s->meta.msg_num = 0; break; case BLADERF_FORMAT_PACKET_META: s->state = SYNC_STATE_USING_PACKET_META; break; default: assert(!"Invalid stream format"); status = BLADERF_ERR_UNEXPECTED; } MUTEX_UNLOCK(&b->lock); break; case SYNC_STATE_USING_BUFFER: /* SC16Q11 buffers w/o metadata */ MUTEX_LOCK(&b->lock); buf_src = (uint8_t*)b->buffers[b->cons_i]; samples_to_copy = uint_min(num_samples - samples_returned, samples_per_buffer - b->partial_off); memcpy(samples_dest + samples2bytes(s, samples_returned), buf_src + samples2bytes(s, b->partial_off), samples2bytes(s, samples_to_copy)); b->partial_off += samples_to_copy; samples_returned += samples_to_copy; log_verbose("%s: Provided %u samples to caller\n", __FUNCTION__, samples_to_copy); /* We've finished consuming this buffer and can start looking * for available samples in the next buffer */ if (b->partial_off >= samples_per_buffer) { /* Check for symptom of out-of-bounds accesses */ assert(b->partial_off == samples_per_buffer); advance_rx_buffer(b); s->state = SYNC_STATE_WAIT_FOR_BUFFER; } MUTEX_UNLOCK(&b->lock); break; case SYNC_STATE_USING_BUFFER_META: /* SC16Q11 buffers w/ metadata */ MUTEX_LOCK(&b->lock); switch (s->meta.state) { case SYNC_META_STATE_HEADER: assert(s->meta.msg_num < s->meta.msg_per_buf); buf_src = (uint8_t*)b->buffers[b->cons_i]; s->meta.curr_msg = buf_src + s->meta.msg_size * s->meta.msg_num; s->meta.msg_timestamp = metadata_get_timestamp(s->meta.curr_msg); s->meta.msg_flags = metadata_get_flags(s->meta.curr_msg); user_meta->status |= s->meta.msg_flags & (BLADERF_META_FLAG_RX_HW_UNDERFLOW | BLADERF_META_FLAG_RX_HW_MINIEXP1 | BLADERF_META_FLAG_RX_HW_MINIEXP2); s->meta.curr_msg_off = 0; /* We've encountered a discontinuity and need to return * what we have so far, setting the status flags */ if (copied_data && s->meta.msg_timestamp != s->meta.curr_timestamp) { user_meta->status |= BLADERF_META_STATUS_OVERRUN; exit_early = true; log_debug("Sample discontinuity detected @ " "buffer %u, message %u: Expected t=%llu, " "got t=%llu\n", b->cons_i, s->meta.msg_num, (unsigned long long)s->meta.curr_timestamp, (unsigned long long)s->meta.msg_timestamp); } else { log_verbose("Got header for message %u: " "t_new=%u, t_old=%u\n", s->meta.msg_num, s->meta.msg_timestamp, s->meta.curr_timestamp); } s->meta.curr_timestamp = s->meta.msg_timestamp; s->meta.state = SYNC_META_STATE_SAMPLES; break; case SYNC_META_STATE_SAMPLES: if (!copied_data && (user_meta->flags & BLADERF_META_FLAG_RX_NOW) == 0 && target_timestamp < s->meta.curr_timestamp) { log_debug("Current timestamp is %llu, " "target=%llu (user=%llu)\n", (unsigned long long)s->meta.curr_timestamp, (unsigned long long)target_timestamp, (unsigned long long)user_meta->timestamp); status = BLADERF_ERR_TIME_PAST; } else if ((user_meta->flags & BLADERF_META_FLAG_RX_NOW) || target_timestamp == s->meta.curr_timestamp) { /* Copy the request amount up to the end of a * this message in the current buffer */ samples_to_copy = uint_min(num_samples - samples_returned, left_in_msg(s)); memcpy(samples_dest + samples2bytes(s, samples_returned), s->meta.curr_msg + METADATA_HEADER_SIZE + samples2bytes(s, s->meta.curr_msg_off), samples2bytes(s, samples_to_copy)); samples_returned += samples_to_copy; s->meta.curr_msg_off += samples_to_copy; if (!copied_data && (user_meta->flags & BLADERF_META_FLAG_RX_NOW)) { /* Provide the user with the timestamp at the * first returned sample when the * NOW flag has been provided */ user_meta->timestamp = s->meta.curr_timestamp; log_verbose("Updated user meta timestamp with: " "%llu\n", (unsigned long long) user_meta->timestamp); } copied_data = true; s->meta.curr_timestamp += samples_to_copy / s->meta.samples_per_ts; /* We've begun copying samples, so our target will * just keep tracking the current timestamp. */ target_timestamp = s->meta.curr_timestamp; log_verbose("After copying samples, t=%llu\n", (unsigned long long)s->meta.curr_timestamp); if (left_in_msg(s) == 0) { assert(s->meta.curr_msg_off == s->meta.samples_per_msg); s->meta.state = SYNC_META_STATE_HEADER; s->meta.msg_num++; if (s->meta.msg_num >= s->meta.msg_per_buf) { assert(s->meta.msg_num == s->meta.msg_per_buf); advance_rx_buffer(b); s->meta.msg_num = 0; s->state = SYNC_STATE_WAIT_FOR_BUFFER; } } } else { const uint64_t time_delta = target_timestamp - s->meta.curr_timestamp; uint64_t samples_left = time_delta * s->meta.samples_per_ts; uint64_t left_in_buffer = (uint64_t) s->meta.samples_per_msg * (s->meta.msg_per_buf - s->meta.msg_num); /* Account for current position in buffer */ left_in_buffer -= s->meta.curr_msg_off; if (samples_left >= left_in_buffer) { /* Discard the remainder of this buffer */ advance_rx_buffer(b); s->state = SYNC_STATE_WAIT_FOR_BUFFER; s->meta.state = SYNC_META_STATE_HEADER; log_verbose("%s: Discarding rest of buffer.\n", __FUNCTION__); } else if (time_delta <= ts_remaining(s)) { /* Fast forward within the current message */ assert(time_delta <= SIZE_MAX); s->meta.curr_msg_off += (size_t)samples_left; s->meta.curr_timestamp += time_delta; log_verbose("%s: Seeking within message (t=%llu)\n", __FUNCTION__, s->meta.curr_timestamp); } else { s->meta.state = SYNC_META_STATE_HEADER; s->meta.msg_num += timestamp_to_msg(s, samples_left); log_verbose("%s: Seeking to message %u.\n", __FUNCTION__, s->meta.msg_num); } } break; default: assert(!"Invalid state"); status = BLADERF_ERR_UNEXPECTED; } MUTEX_UNLOCK(&b->lock); break; case SYNC_STATE_USING_PACKET_META: /* Packet buffers w/ metadata */ MUTEX_LOCK(&b->lock); buf_src = (uint8_t*)b->buffers[b->cons_i]; pkt_len_dwords = metadata_get_packet_len(buf_src); if (pkt_len_dwords > 0) { samples_returned += num_samples; user_meta->actual_count = pkt_len_dwords; memcpy(samples_dest, buf_src + METADATA_HEADER_SIZE, samples2bytes(s, pkt_len_dwords)); } advance_rx_buffer(b); s->state = SYNC_STATE_WAIT_FOR_BUFFER; MUTEX_UNLOCK(&b->lock); break; } } if (user_meta && s->stream_config.format != BLADERF_FORMAT_PACKET_META) { user_meta->actual_count = samples_returned; } out: MUTEX_UNLOCK(&s->lock); return status; } /* Assumes buffer lock is held */ static int advance_tx_buffer(struct bladerf_sync *s, struct buffer_mgmt *b) { int status = 0; const unsigned int idx = b->prod_i; if (b->submitter == SYNC_TX_SUBMITTER_FN) { /* Mark buffer in flight because we're going to send it out. * This ensures that if the callback fires before this function * completes, its state will be correct. */ b->status[idx] = SYNC_BUFFER_IN_FLIGHT; /* This call may block and it results in a per-stream lock being held, * so the buffer lock must be dropped. * * A callback may occur in the meantime, but this will not touch the * status for this this buffer, or the producer index. */ MUTEX_UNLOCK(&b->lock); size_t len; if (s->stream_config.format == BLADERF_FORMAT_PACKET_META) { len = b->actual_lengths[idx]; } else { len = async_stream_buf_bytes(s->worker->stream); } status = async_submit_stream_buffer(s->worker->stream, b->buffers[idx], &len, s->stream_config.timeout_ms, true); MUTEX_LOCK(&b->lock); if (status == 0) { log_verbose("%s: buf[%u] submitted.\n", __FUNCTION__, idx); } else if (status == BLADERF_ERR_WOULD_BLOCK) { log_verbose("%s: Deferring buf[%u] submission to worker callback.\n", __FUNCTION__, idx); /* Mark this buffer as being full of data, but not in flight */ b->status[idx] = SYNC_BUFFER_FULL; /* Assign callback the duty of submitting deferred buffers, * and use buffer_mgmt.cons_i to denote which it should submit * (i.e., consume). */ b->submitter = SYNC_TX_SUBMITTER_CALLBACK; b->cons_i = idx; /* This is expected and we are handling it. Don't propagate this * status back up */ status = 0; } else { /* Unmark this as being in flight */ b->status[idx] = SYNC_BUFFER_FULL; log_debug("%s: Failed to submit buf[%u].\n", __FUNCTION__, idx); return status; } } else { /* We are not submitting this buffer; this is deffered to the worker * call back. Just update its state to being full of samples. */ b->status[idx] = SYNC_BUFFER_FULL; } /* Advance "producer" insertion index. */ b->prod_i = (idx + 1) % b->num_buffers; /* Determine our next state based upon the state of the next buffer we * want to use. */ if (b->status[b->prod_i] == SYNC_BUFFER_EMPTY) { /* Buffer is empty and ready for use */ s->state = SYNC_STATE_BUFFER_READY; } else { /* We'll have to wait on this buffer to become ready. First, we'll * verify that the worker is running. */ s->state = SYNC_STATE_CHECK_WORKER; } return status; } static inline bool timestamp_in_past(struct bladerf_metadata *user_meta, struct bladerf_sync *s) { const bool in_past = user_meta->timestamp < s->meta.curr_timestamp; if (in_past) { log_debug("Provided timestamp=%"PRIu64" is in past: current=%"PRIu64"\n", user_meta->timestamp, s->meta.curr_timestamp); } return in_past; } struct tx_options { bool flush; bool zero_pad; }; static inline int handle_tx_parameters(struct bladerf_metadata *user_meta, struct bladerf_sync *s, struct tx_options *options) { if (s->stream_config.format == BLADERF_FORMAT_SC16_Q11_META) { if (user_meta == NULL) { log_debug("NULL metadata pointer passed to %s\n", __FUNCTION__); return BLADERF_ERR_INVAL; } if (user_meta->flags & BLADERF_META_FLAG_TX_BURST_START) { bool now = user_meta->flags & BLADERF_META_FLAG_TX_NOW; if (s->meta.in_burst) { log_debug("%s: BURST_START provided while already in a burst.\n", __FUNCTION__); return BLADERF_ERR_INVAL; } else if (!now && timestamp_in_past(user_meta, s)) { return BLADERF_ERR_TIME_PAST; } s->meta.in_burst = true; if (now) { s->meta.now = true; log_verbose("%s: Starting burst \"now\"\n", __FUNCTION__); } else { s->meta.curr_timestamp = user_meta->timestamp; log_verbose("%s: Starting burst @ %llu\n", __FUNCTION__, (unsigned long long)s->meta.curr_timestamp); } if (user_meta->flags & BLADERF_META_FLAG_TX_UPDATE_TIMESTAMP) { log_debug("UPDATE_TIMESTAMP ignored; BURST_START flag was used.\n"); } } else if (user_meta->flags & BLADERF_META_FLAG_TX_NOW) { log_debug("%s: TX_NOW was specified without BURST_START.\n", __FUNCTION__); return BLADERF_ERR_INVAL; } else if (user_meta->flags & BLADERF_META_FLAG_TX_UPDATE_TIMESTAMP) { if (timestamp_in_past(user_meta, s)) { return BLADERF_ERR_TIME_PAST; } else { options->zero_pad = true; } } if (user_meta->flags & BLADERF_META_FLAG_TX_BURST_END) { if (s->meta.in_burst) { options->flush = true; } else { log_debug("%s: BURST_END provided while not in a burst.\n", __FUNCTION__); return BLADERF_ERR_INVAL; } } user_meta->status = 0; } return 0; } int sync_tx(struct bladerf_sync *s, void const *samples, unsigned int num_samples, struct bladerf_metadata *user_meta, unsigned int timeout_ms) { struct buffer_mgmt *b = NULL; int status = 0; unsigned int samples_written = 0; unsigned int samples_to_copy = 0; unsigned int samples_per_buffer = 0; uint8_t const *samples_src = (uint8_t const *)samples; uint8_t *buf_dest = NULL; struct tx_options op = { FIELD_INIT(.flush, false), FIELD_INIT(.zero_pad, false), }; log_verbose("%s: called for %u samples.\n", __FUNCTION__, num_samples); if (s == NULL || samples == NULL || !s->initialized) { return BLADERF_ERR_INVAL; } MUTEX_LOCK(&s->lock); status = handle_tx_parameters(user_meta, s, &op); if (status != 0) { goto out; } b = &s->buf_mgmt; samples_per_buffer = s->stream_config.samples_per_buffer; while (status == 0 && ((samples_written < num_samples) || op.flush)) { switch (s->state) { case SYNC_STATE_CHECK_WORKER: { int stream_error; sync_worker_state worker_state = sync_worker_get_state(s->worker, &stream_error); if (stream_error != 0) { status = stream_error; } else { if (worker_state == SYNC_WORKER_STATE_IDLE) { /* No need to reset any buffer management for TX since * the TX stream does not submit an initial set of * buffers. Therefore the RESET_BUF_MGMT state is * skipped here. */ s->state = SYNC_STATE_START_WORKER; } else { /* Worker is running - continue onto checking for and * potentially waiting for an available buffer */ s->state = SYNC_STATE_WAIT_FOR_BUFFER; } } break; } case SYNC_STATE_RESET_BUF_MGMT: assert(!"Bug"); break; case SYNC_STATE_START_WORKER: sync_worker_submit_request(s->worker, SYNC_WORKER_START); status = sync_worker_wait_for_state( s->worker, SYNC_WORKER_STATE_RUNNING, SYNC_WORKER_START_TIMEOUT_MS); if (status == 0) { s->state = SYNC_STATE_WAIT_FOR_BUFFER; log_debug("%s: Worker is now running.\n", __FUNCTION__); } break; case SYNC_STATE_WAIT_FOR_BUFFER: MUTEX_LOCK(&b->lock); /* Check the buffer state, as the worker may have consumed one * since we last queried the status */ if (b->status[b->prod_i] == SYNC_BUFFER_EMPTY) { s->state = SYNC_STATE_BUFFER_READY; } else { status = wait_for_buffer(b, timeout_ms, __FUNCTION__, b->prod_i); } MUTEX_UNLOCK(&b->lock); break; case SYNC_STATE_BUFFER_READY: MUTEX_LOCK(&b->lock); b->status[b->prod_i] = SYNC_BUFFER_PARTIAL; b->partial_off = 0; switch (s->stream_config.format) { case BLADERF_FORMAT_SC16_Q11: case BLADERF_FORMAT_SC8_Q7: s->state = SYNC_STATE_USING_BUFFER; break; case BLADERF_FORMAT_SC16_Q11_META: case BLADERF_FORMAT_SC8_Q7_META: s->state = SYNC_STATE_USING_BUFFER_META; s->meta.curr_msg_off = 0; s->meta.msg_num = 0; break; case BLADERF_FORMAT_PACKET_META: s->state = SYNC_STATE_USING_PACKET_META; s->meta.curr_msg_off = 0; s->meta.msg_num = 0; break; default: assert(!"Invalid stream format"); status = BLADERF_ERR_UNEXPECTED; } MUTEX_UNLOCK(&b->lock); break; case SYNC_STATE_USING_BUFFER: MUTEX_LOCK(&b->lock); buf_dest = (uint8_t *)b->buffers[b->prod_i]; samples_to_copy = uint_min(num_samples - samples_written, samples_per_buffer - b->partial_off); memcpy(buf_dest + samples2bytes(s, b->partial_off), samples_src + samples2bytes(s, samples_written), samples2bytes(s, samples_to_copy)); b->partial_off += samples_to_copy; samples_written += samples_to_copy; log_verbose("%s: Buffered %u samples from caller\n", __FUNCTION__, samples_to_copy); if (b->partial_off >= samples_per_buffer) { /* Check for symptom of out-of-bounds accesses */ assert(b->partial_off == samples_per_buffer); /* Submit buffer and advance to the next one */ status = advance_tx_buffer(s, b); } MUTEX_UNLOCK(&b->lock); break; case SYNC_STATE_USING_BUFFER_META: /* SC16Q11 buffers w/ metadata */ MUTEX_LOCK(&b->lock); switch (s->meta.state) { case SYNC_META_STATE_HEADER: buf_dest = (uint8_t *)b->buffers[b->prod_i]; s->meta.curr_msg = buf_dest + s->meta.msg_size * s->meta.msg_num; log_verbose("%s: Set curr_msg to: %p (buf @ %p)\n", __FUNCTION__, s->meta.curr_msg, buf_dest); s->meta.curr_msg_off = 0; if (s->meta.now) { metadata_set(s->meta.curr_msg, 0, 0); } else { metadata_set(s->meta.curr_msg, s->meta.curr_timestamp, 0); } s->meta.state = SYNC_META_STATE_SAMPLES; log_verbose("%s: Filled in header (t=%llu)\n", __FUNCTION__, (unsigned long long)s->meta.curr_timestamp); break; case SYNC_META_STATE_SAMPLES: if (op.zero_pad) { const uint64_t delta = user_meta->timestamp - s->meta.curr_timestamp; size_t to_zero; log_verbose("%s: User requested zero padding to " "t=%" PRIu64 " (%" PRIu64 " + %" PRIu64 ")\n", __FUNCTION__, user_meta->timestamp, s->meta.curr_timestamp, delta); if (delta < left_in_msg(s)) { to_zero = (size_t)delta; log_verbose("%s: Padded subset of msg " "(%" PRIu64 " samples)\n", __FUNCTION__, (uint64_t)to_zero); } else { to_zero = left_in_msg(s); log_verbose("%s: Padded remainder of msg " "(%" PRIu64 " samples)\n", __FUNCTION__, (uint64_t)to_zero); } memset(s->meta.curr_msg + METADATA_HEADER_SIZE + samples2bytes(s, s->meta.curr_msg_off), 0, samples2bytes(s, to_zero)); s->meta.curr_msg_off += to_zero; /* If we're going to supply the FPGA with a * discontinuity, it is required that the last three * samples provided be zero in order to hold the * DAC @ (0 + 0j). * * See "Figure 9: TX data interface" in the LMS6002D * data sheet for the register stages that create * this requirement. * * If we're ending a burst with < 3 zeros samples at * the end of the message, we'll need to continue * onto the next message. At this next message, * we'll either encounter the requested timestamp or * zero-fill the message to fulfil this "three zero * sample" requirement, and set the timestamp * appropriately at the following message. */ if (to_zero < 3 && left_in_msg(s) == 0) { s->meta.curr_timestamp += to_zero; log_verbose("Ended msg with < 3 zero samples. " "Padding into next message.\n"); } else { s->meta.curr_timestamp = user_meta->timestamp; op.zero_pad = false; } } samples_to_copy = uint_min( num_samples - samples_written, left_in_msg(s)); if (samples_to_copy != 0) { /* We have user data to copy into the current * message within the buffer */ memcpy(s->meta.curr_msg + METADATA_HEADER_SIZE + samples2bytes(s, s->meta.curr_msg_off), samples_src + samples2bytes(s, samples_written), samples2bytes(s, samples_to_copy)); s->meta.curr_msg_off += samples_to_copy; if (s->stream_config.layout == BLADERF_TX_X2) s->meta.curr_timestamp += samples_to_copy / 2; else s->meta.curr_timestamp += samples_to_copy; samples_written += samples_to_copy; log_verbose("%s: Copied %u samples. " "Current message offset is now: %u\n", __FUNCTION__, samples_to_copy, s->meta.curr_msg_off); } if (left_in_msg(s) != 0 && op.flush) { /* We're ending this buffer early and need to * flush the remaining samples by setting all * samples in the messages to (0 + 0j) */ const unsigned int to_zero = left_in_msg(s); const size_t off = METADATA_HEADER_SIZE + samples2bytes(s, s->meta.curr_msg_off); /* If we're here, we should have already copied * all requested data to the buffer */ assert(num_samples == samples_written); memset(s->meta.curr_msg + off, 0, samples2bytes(s, to_zero)); log_verbose( "%s: Flushed %u samples @ %u (0x%08x)\n", __FUNCTION__, to_zero, s->meta.curr_msg_off, off); s->meta.curr_msg_off += to_zero; s->meta.curr_timestamp += to_zero; } if (left_in_msg(s) == 0) { s->meta.msg_num++; s->meta.state = SYNC_META_STATE_HEADER; log_verbose("%s: Advancing to next message (%u)\n", __FUNCTION__, s->meta.msg_num); } if (s->meta.msg_num >= s->meta.msg_per_buf) { assert(s->meta.msg_num == s->meta.msg_per_buf); /* Submit buffer of samples for transmission */ status = advance_tx_buffer(s, b); s->meta.msg_num = 0; s->state = SYNC_STATE_WAIT_FOR_BUFFER; /* We want to clear the flush flag if we've written * all of our data, but keep it set if we have more * data and need wrap around to another buffer */ op.flush = op.flush && (samples_written != num_samples); } break; default: assert(!"Invalid state"); status = BLADERF_ERR_UNEXPECTED; } MUTEX_UNLOCK(&b->lock); break; case SYNC_STATE_USING_PACKET_META: /* Packet buffers w/ metadata */ MUTEX_LOCK(&b->lock); buf_dest = (uint8_t *)b->buffers[b->prod_i]; memcpy(buf_dest + METADATA_HEADER_SIZE, samples_src, num_samples*4); b->actual_lengths[b->prod_i] = samples2bytes(s, num_samples) + METADATA_HEADER_SIZE; metadata_set_packet(buf_dest, 0, 0, num_samples, 0, 0); samples_written = num_samples; status = advance_tx_buffer(s, b); s->meta.msg_num = 0; s->state = SYNC_STATE_WAIT_FOR_BUFFER; MUTEX_UNLOCK(&b->lock); break; } } if (status == 0 && s->stream_config.format == BLADERF_FORMAT_SC16_Q11_META && (user_meta->flags & BLADERF_META_FLAG_TX_BURST_END)) { s->meta.in_burst = false; s->meta.now = false; } out: MUTEX_UNLOCK(&s->lock); return status; } unsigned int sync_buf2idx(struct buffer_mgmt *b, void *addr) { unsigned int i; for (i = 0; i < b->num_buffers; i++) { if (b->buffers[i] == addr) { return i; } } assert(!"Bug: Buffer not found."); /* Assertions are intended to always remain on. If someone turned them * off, do the best we can...complain loudly and clobber a buffer */ log_critical("Bug: Buffer not found."); return 0; }