diff options
Diffstat (limited to 'Radio/HW/BladeRF/src/streaming')
-rw-r--r-- | Radio/HW/BladeRF/src/streaming/async.c | 294 | ||||
-rw-r--r-- | Radio/HW/BladeRF/src/streaming/async.h | 111 | ||||
-rw-r--r-- | Radio/HW/BladeRF/src/streaming/format.h | 109 | ||||
-rw-r--r-- | Radio/HW/BladeRF/src/streaming/metadata.h | 180 | ||||
-rw-r--r-- | Radio/HW/BladeRF/src/streaming/sync.c | 1339 | ||||
-rw-r--r-- | Radio/HW/BladeRF/src/streaming/sync.h | 193 | ||||
-rw-r--r-- | Radio/HW/BladeRF/src/streaming/sync_worker.c | 532 | ||||
-rw-r--r-- | Radio/HW/BladeRF/src/streaming/sync_worker.h | 130 |
8 files changed, 2888 insertions, 0 deletions
diff --git a/Radio/HW/BladeRF/src/streaming/async.c b/Radio/HW/BladeRF/src/streaming/async.c new file mode 100644 index 0000000..cffa9d2 --- /dev/null +++ b/Radio/HW/BladeRF/src/streaming/async.c @@ -0,0 +1,294 @@ +/* + * This file is part of the bladeRF project: + * http://www.github.com/nuand/bladeRF + * + * Copyright (C) 2014 Nuand LLC + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include <stdint.h> +#include <stdlib.h> +#include <errno.h> + +#include "log.h" + +#include "backend/usb/usb.h" + +#include "async.h" +#include "board/board.h" +#include "helpers/timeout.h" +#include "helpers/have_cap.h" + +int async_init_stream(struct bladerf_stream **stream, + struct bladerf *dev, + bladerf_stream_cb callback, + void ***buffers, + size_t num_buffers, + bladerf_format format, + size_t samples_per_buffer, + size_t num_transfers, + void *user_data) +{ + struct bladerf_stream *lstream; + size_t buffer_size_bytes; + size_t i; + int status = 0; + + if (num_transfers > num_buffers) { + log_error("num_transfers must be <= num_buffers\n"); + return BLADERF_ERR_INVAL; + } + + if (samples_per_buffer < 1024 || samples_per_buffer % 1024 != 0) { + log_error("samples_per_buffer must be multiples of 1024\n"); + return BLADERF_ERR_INVAL; + } + + /* Create a stream and populate it with the appropriate information */ + lstream = malloc(sizeof(struct bladerf_stream)); + + if (!lstream) { + return BLADERF_ERR_MEM; + } + + MUTEX_INIT(&lstream->lock); + + if (pthread_cond_init(&lstream->can_submit_buffer, NULL) != 0) { + free(lstream); + return BLADERF_ERR_UNEXPECTED; + } + + if (pthread_cond_init(&lstream->stream_started, NULL) != 0) { + free(lstream); + return BLADERF_ERR_UNEXPECTED; + } + + lstream->dev = dev; + lstream->error_code = 0; + lstream->state = STREAM_IDLE; + lstream->samples_per_buffer = samples_per_buffer; + lstream->num_buffers = num_buffers; + lstream->format = format; + lstream->transfer_timeout = BULK_TIMEOUT_MS; + lstream->cb = callback; + lstream->user_data = user_data; + lstream->buffers = NULL; + + if (format == BLADERF_FORMAT_PACKET_META) { + if (!have_cap_dev(dev, BLADERF_CAP_FW_SHORT_PACKET)) { + log_error("Firmware does not support short packets. " + "Upgrade to at least firmware version 2.4.0."); + return BLADERF_ERR_UNSUPPORTED; + } + + if (!have_cap_dev(dev, BLADERF_CAP_FPGA_PACKET_META)) { + log_error("FPGA does not support packet meta format. " + "Upgrade to at least FPGA version 0.12.0 ."); + return BLADERF_ERR_UNSUPPORTED; + } + } + + if (format == BLADERF_FORMAT_SC8_Q7 || format == BLADERF_FORMAT_SC8_Q7_META) { + if (!have_cap_dev(dev, BLADERF_CAP_FPGA_8BIT_SAMPLES)) { + log_error("FPGA does not support 8bit mode. " + "Upgrade to at least FPGA version 0.15.0.\n"); + return BLADERF_ERR_UNSUPPORTED; + } + } + + switch(format) { + case BLADERF_FORMAT_SC8_Q7: + case BLADERF_FORMAT_SC8_Q7_META: + buffer_size_bytes = sc8q7_to_bytes(samples_per_buffer); + break; + + case BLADERF_FORMAT_SC16_Q11: + case BLADERF_FORMAT_SC16_Q11_META: + buffer_size_bytes = sc16q11_to_bytes(samples_per_buffer); + break; + + case BLADERF_FORMAT_PACKET_META: + buffer_size_bytes = samples_per_buffer; + break; + + default: + status = BLADERF_ERR_INVAL; + break; + } + + if (!status) { + lstream->buffers = calloc(num_buffers, sizeof(lstream->buffers[0])); + if (lstream->buffers) { + for (i = 0; i < num_buffers && !status; i++) { + lstream->buffers[i] = calloc(1, buffer_size_bytes); + if (!lstream->buffers[i]) { + status = BLADERF_ERR_MEM; + } + } + } else { + status = BLADERF_ERR_MEM; + } + } + + /* Clean up everything we've allocated if we hit any errors */ + if (status) { + + if (lstream->buffers) { + for (i = 0; i < num_buffers; i++) { + free(lstream->buffers[i]); + } + + free(lstream->buffers); + } + + free(lstream); + } else { + /* Perform any backend-specific stream initialization */ + status = dev->backend->init_stream(lstream, num_transfers); + + if (status < 0) { + async_deinit_stream(lstream); + *stream = NULL; + } else { + /* Update the caller's pointers */ + *stream = lstream; + + if (buffers) { + *buffers = lstream->buffers; + } + } + } + + return status; +} + +int async_set_transfer_timeout(struct bladerf_stream *stream, + unsigned int transfer_timeout_ms) +{ + MUTEX_LOCK(&stream->lock); + stream->transfer_timeout = transfer_timeout_ms; + MUTEX_UNLOCK(&stream->lock); + + return 0; +} + +int async_get_transfer_timeout(struct bladerf_stream *stream, + unsigned int *transfer_timeout_ms) +{ + MUTEX_LOCK(&stream->lock); + *transfer_timeout_ms = stream->transfer_timeout; + MUTEX_UNLOCK(&stream->lock); + + return 0; +} + +int async_run_stream(struct bladerf_stream *stream, bladerf_channel_layout layout) +{ + int status; + struct bladerf *dev = stream->dev; + + MUTEX_LOCK(&stream->lock); + stream->layout = layout; + stream->state = STREAM_RUNNING; + pthread_cond_signal(&stream->stream_started); + MUTEX_UNLOCK(&stream->lock); + + status = dev->backend->stream(stream, layout); + + /* Backend return value takes precedence over stream error status */ + return status == 0 ? stream->error_code : status; +} + +int async_submit_stream_buffer(struct bladerf_stream *stream, + void *buffer, size_t *length, + unsigned int timeout_ms, + bool nonblock) +{ + int status = 0; + struct timespec timeout_abs; + + MUTEX_LOCK(&stream->lock); + + if (buffer != BLADERF_STREAM_SHUTDOWN) { + if (stream->state != STREAM_RUNNING && timeout_ms != 0) { + status = populate_abs_timeout(&timeout_abs, timeout_ms); + if (status != 0) { + log_debug("Failed to populate timeout value\n"); + goto error; + } + } + + while (stream->state != STREAM_RUNNING) { + log_debug("Buffer submitted while stream's not running. " + "Waiting for stream to start.\n"); + + if (timeout_ms == 0) { + status = pthread_cond_wait(&stream->stream_started, + &stream->lock); + } else { + status = pthread_cond_timedwait(&stream->stream_started, + &stream->lock, &timeout_abs); + } + + if (status == ETIMEDOUT) { + status = BLADERF_ERR_TIMEOUT; + log_debug("%s: %u ms timeout expired", + __FUNCTION__, timeout_ms); + goto error; + } else if (status != 0) { + status = BLADERF_ERR_UNEXPECTED; + goto error; + } + } + } + + status = stream->dev->backend->submit_stream_buffer(stream, buffer, + length, timeout_ms, nonblock); + +error: + MUTEX_UNLOCK(&stream->lock); + return status; +} + +void async_deinit_stream(struct bladerf_stream *stream) +{ + size_t i; + + if (!stream) { + log_debug("%s called with NULL stream\n", __FUNCTION__); + return; + } + + while(stream->state != STREAM_DONE && stream->state != STREAM_IDLE) { + log_verbose( "Stream not done...\n" ); + usleep(1000000); + } + + /* Free up the backend data */ + stream->dev->backend->deinit_stream(stream); + + /* Free up the buffers */ + for (i = 0; i < stream->num_buffers; i++) { + free(stream->buffers[i]); + } + + /* Free up the pointer to the buffers */ + free(stream->buffers); + + /* Free up the stream itself */ + free(stream); +} + diff --git a/Radio/HW/BladeRF/src/streaming/async.h b/Radio/HW/BladeRF/src/streaming/async.h new file mode 100644 index 0000000..b0fd3b8 --- /dev/null +++ b/Radio/HW/BladeRF/src/streaming/async.h @@ -0,0 +1,111 @@ +/* + * This file is part of the bladeRF project: + * http://www.github.com/nuand/bladeRF + * + * Copyright (C) 2014 Nuand LLC + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef STREAMING_ASYNC_H_ +#define STREAMING_ASYNC_H_ + +#include <pthread.h> + +#include <libbladeRF.h> + +#include "thread.h" + +#include "format.h" + +typedef enum { + STREAM_IDLE, /* Idle and initialized */ + STREAM_RUNNING, /* Currently running */ + STREAM_SHUTTING_DOWN, /* Currently tearing down. + * See bladerf_stream->error_code to determine + * whether or not the shutdown was a clean exit + * or due to an error. */ + STREAM_DONE /* Done and deallocated */ +} bladerf_stream_state; + +struct bladerf_stream { + /* These items are configured in async_init_stream() and should only be + * read (NOT MODIFIED) during the execution of the stream */ + struct bladerf *dev; + bladerf_channel_layout layout; + bladerf_format format; + unsigned int transfer_timeout; + bladerf_stream_cb cb; + void *user_data; + size_t samples_per_buffer; + size_t num_buffers; + void **buffers; + + MUTEX lock; + + /* The following items must be accessed atomically */ + int error_code; + bladerf_stream_state state; + pthread_cond_t can_submit_buffer; + pthread_cond_t stream_started; + void *backend_data; +}; + +/* Get the number of bytes per stream buffer */ +static inline size_t async_stream_buf_bytes(struct bladerf_stream *s) +{ + if (s->format == BLADERF_FORMAT_PACKET_META) + return s->samples_per_buffer; + return samples_to_bytes(s->format, s->samples_per_buffer); +} + +int async_init_stream(struct bladerf_stream **stream, + struct bladerf *dev, + bladerf_stream_cb callback, + void ***buffers, + size_t num_buffers, + bladerf_format format, + size_t buffer_size, + size_t num_transfers, + void *user_data); + +/* Set the transfer timeout. This acquires stream->lock. */ +int async_set_transfer_timeout(struct bladerf_stream *stream, + unsigned int transfer_timeout_ms); + +/* Get the transfer timeout. This acquires stream->lock. */ +int async_get_transfer_timeout(struct bladerf_stream *stream, + unsigned int *transfer_timeout_ms); + +/* Backend code is responsible for acquiring stream->lock in their callbacks */ +int async_run_stream(struct bladerf_stream *stream, + bladerf_channel_layout layout); + + +/* This function WILL acquire stream->lock before calling backend code. + * + * If nonblock=true and no transfers are available, this function shall return + * BLADERF_ERR_WOULD_BLOCK. + */ +int async_submit_stream_buffer(struct bladerf_stream *stream, + void *buffer, + size_t *length, + unsigned int timeout_ms, + bool nonblock); + + +void async_deinit_stream(struct bladerf_stream *stream); + +#endif diff --git a/Radio/HW/BladeRF/src/streaming/format.h b/Radio/HW/BladeRF/src/streaming/format.h new file mode 100644 index 0000000..95cf5da --- /dev/null +++ b/Radio/HW/BladeRF/src/streaming/format.h @@ -0,0 +1,109 @@ +/* + * This file is part of the bladeRF project: + * http://www.github.com/nuand/bladeRF + * + * Copyright (C) 2014 Nuand LLC + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef STREAMING_FORMAT_H_ +#define STREAMING_FORMAT_H_ + +#include "rel_assert.h" + +/* + * Convert SC8Q8 samples to bytes + */ +static inline size_t sc8q7_to_bytes(size_t n_samples) +{ + const size_t sample_size = 2 * sizeof(int8_t); + assert(n_samples <= (SIZE_MAX / sample_size)); + return n_samples * sample_size; +} + +/* + * Convert bytes to SC8Q8 samples + */ +static inline size_t bytes_to_sc8q7(size_t n_bytes) +{ + const size_t sample_size = 2 * sizeof(int8_t); + assert((n_bytes % sample_size) == 0); + return n_bytes / sample_size; +} + +/* + * Convert SC16Q11 samples to bytes + */ +static inline size_t sc16q11_to_bytes(size_t n_samples) +{ + const size_t sample_size = 2 * sizeof(int16_t); + assert(n_samples <= (SIZE_MAX / sample_size)); + return n_samples * sample_size; +} + +/* + * Convert bytes to SC16Q11 samples + */ +static inline size_t bytes_to_sc16q11(size_t n_bytes) +{ + const size_t sample_size = 2 * sizeof(int16_t); + assert((n_bytes % sample_size) == 0); + return n_bytes / sample_size; +} + +/* Covert samples to bytes based upon the provided format */ +static inline size_t samples_to_bytes(bladerf_format format, size_t n) +{ + switch (format) { + case BLADERF_FORMAT_SC8_Q7: + case BLADERF_FORMAT_SC8_Q7_META: + return sc8q7_to_bytes(n); + + case BLADERF_FORMAT_SC16_Q11: + case BLADERF_FORMAT_SC16_Q11_META: + return sc16q11_to_bytes(n); + + case BLADERF_FORMAT_PACKET_META: + return n*4; + + default: + assert(!"Invalid format"); + return 0; + } +} + +/* Convert bytes to samples based upon the provided format */ +static inline size_t bytes_to_samples(bladerf_format format, size_t n) +{ + switch (format) { + case BLADERF_FORMAT_SC8_Q7: + case BLADERF_FORMAT_SC8_Q7_META: + return bytes_to_sc8q7(n); + + case BLADERF_FORMAT_SC16_Q11: + case BLADERF_FORMAT_SC16_Q11_META: + return bytes_to_sc16q11(n); + + case BLADERF_FORMAT_PACKET_META: + return (n+3)/4; + + default: + assert(!"Invalid format"); + return 0; + } +} + +#endif diff --git a/Radio/HW/BladeRF/src/streaming/metadata.h b/Radio/HW/BladeRF/src/streaming/metadata.h new file mode 100644 index 0000000..e268559 --- /dev/null +++ b/Radio/HW/BladeRF/src/streaming/metadata.h @@ -0,0 +1,180 @@ +/* + * Copyright (C) 2014 Nuand LLC + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef STREAMING_METADATA_H_ +#define STREAMING_METADATA_H_ + +/* + * Metadata layout + * ~~~~~~~~~~~~~~~~~~~~~~~ + * + * The FPGA handles data in units of "messages." These messages are + * 1024 or 2048 bytes for USB 2.0 (Hi-Speed) or USB 3.0 (SuperSpeed), + * respectively. + * + * The first 16 bytes of the message form a header, which includes metadata + * for the samples within the message. This header is shown below: + * + * +-----------------+ + * 0x00 | Packet length | 2 bytes, Little-endian uint16_t + * +-----------------+ + * 0x02 | Packet flags | 1 byte + * +-----------------+ + * 0x03 | Packet core ID | 1 byte + * +-----------------+ + * 0x04 | Timestamp | 8 bytes, Little-endian uint64_t + * +-----------------+ + * 0x0c | Flags | 4 bytes, Little-endian uint32_t + * +-----------------+ + * + * The term "buffer" is used to describe a block of of data received from or + * sent to the device. The size of a "buffer" (in bytes) is always a multiple + * of the size of a "message." Said another way, a buffer will always evenly + * divide into multiple messages. Messages are *not* fragmented across + * consecutive buffers. + * + * +-----------------+ <-. <-. + * | header | | | + * +-----------------+ | | + * | | | | + * | samples | | | + * | | | | + * +-----------------+ | <-+---- message + * | header | | + * +-----------------+ | + * | | | + * | samples | | + * | | | + * +-----------------+ | + * | header | | + * +-----------------+ | + * | | | + * | samples | | + * | | | + * +-----------------+ | + * | header | | + * +-----------------+ | + * | | | + * | samples | | + * | | | + * +-----------------+ <-+---------- buffer + * + * + * When intentionally transmitting discontinuous groups of samples (such + * as bursts), it is important that the last two samples within a message + * be (0 + 0j). Otherwise, the DAC will not properly hold its output + * at (0 + 0j) for the duration of the discontinuity. + */ + +/* Components of the metadata header */ +#define METADATA_RESV_SIZE (sizeof(uint32_t)) +#define METADATA_TIMESTAMP_SIZE (sizeof(uint64_t)) +#define METADATA_FLAGS_SIZE (sizeof(uint32_t)) +#define METADATA_PACKET_LEN_SIZE (sizeof(uint16_t)) +#define METADATA_PACKET_CORE_SIZE (sizeof(uint8_t)) +#define METADATA_PACKET_FLAGS_SIZE (sizeof(uint8_t)) + +#define METADATA_RESV_OFFSET 0 +#define METADATA_PACKET_LEN_OFFSET 0 +#define METADATA_PACKET_FLAGS_OFFSET 2 +#define METADATA_PACKET_CORE_OFFSET 3 +#define METADATA_TIMESTAMP_OFFSET (METADATA_RESV_SIZE) +#define METADATA_FLAGS_OFFSET \ + (METADATA_TIMESTAMP_OFFSET + METADATA_TIMESTAMP_SIZE) + +#define METADATA_HEADER_SIZE (METADATA_FLAGS_OFFSET + METADATA_FLAGS_SIZE) + +static inline uint64_t metadata_get_timestamp(const uint8_t *header) +{ + uint64_t ret; + assert(sizeof(ret) == METADATA_TIMESTAMP_SIZE); + memcpy(&ret, &header[METADATA_TIMESTAMP_OFFSET], METADATA_TIMESTAMP_SIZE); + + ret = LE64_TO_HOST(ret); + + return ret; +} + +static inline uint32_t metadata_get_flags(const uint8_t *header) +{ + uint32_t ret; + assert(sizeof(ret) == METADATA_FLAGS_SIZE); + memcpy(&ret, &header[METADATA_FLAGS_OFFSET], METADATA_FLAGS_SIZE); + return LE32_TO_HOST(ret); +} + +static inline uint16_t metadata_get_packet_len(const uint8_t *header) +{ + uint16_t ret; + assert(sizeof(ret) == METADATA_PACKET_LEN_SIZE); + memcpy(&ret, &header[METADATA_PACKET_LEN_OFFSET], METADATA_PACKET_LEN_SIZE); + return LE16_TO_HOST(ret); +} + +static inline uint8_t metadata_get_packet_core(const uint8_t *header) +{ + uint8_t ret; + assert(sizeof(ret) == METADATA_PACKET_CORE_SIZE); + memcpy(&ret, &header[METADATA_PACKET_CORE_OFFSET], METADATA_PACKET_CORE_SIZE); + return ret; +} + +static inline uint8_t metadata_get_packet_flags(const uint8_t *header) +{ + uint8_t ret; + assert(sizeof(ret) == METADATA_PACKET_FLAGS_SIZE); + memcpy(&ret, &header[METADATA_PACKET_FLAGS_OFFSET], METADATA_PACKET_FLAGS_SIZE); + return ret; +} + +static inline void metadata_set_packet(uint8_t *header, + uint64_t timestamp, + uint32_t flags, + uint16_t length, + uint8_t core, + uint8_t pkt_flags) +{ + timestamp = HOST_TO_LE64(timestamp); + + flags = HOST_TO_LE32(flags); + + length = HOST_TO_LE16(length); + + assert(sizeof(timestamp) == METADATA_TIMESTAMP_SIZE); + assert(sizeof(flags) == METADATA_FLAGS_SIZE); + + memset(&header[METADATA_RESV_OFFSET], 0, METADATA_RESV_SIZE); + + memcpy(&header[METADATA_PACKET_LEN_OFFSET], &length, METADATA_PACKET_LEN_SIZE); + memcpy(&header[METADATA_PACKET_CORE_OFFSET], &core, METADATA_PACKET_CORE_SIZE); + memcpy(&header[METADATA_PACKET_FLAGS_OFFSET], &pkt_flags, METADATA_PACKET_FLAGS_SIZE); + + memcpy(&header[METADATA_TIMESTAMP_OFFSET], ×tamp, + METADATA_TIMESTAMP_SIZE); + + memcpy(&header[METADATA_FLAGS_OFFSET], &flags, METADATA_FLAGS_SIZE); +} + +static inline void metadata_set(uint8_t *header, + uint64_t timestamp, + uint32_t flags) +{ + metadata_set_packet(header, timestamp, flags, 0, 0, 0); +} + +#endif diff --git a/Radio/HW/BladeRF/src/streaming/sync.c b/Radio/HW/BladeRF/src/streaming/sync.c new file mode 100644 index 0000000..feef101 --- /dev/null +++ b/Radio/HW/BladeRF/src/streaming/sync.c @@ -0,0 +1,1339 @@ +/* + * Copyright (C) 2014-2015 Nuand LLC + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include <string.h> +#include <errno.h> +#include <inttypes.h> + +/* Only switch on the verbose debug prints in this file when we *really* want + * them. Otherwise, compile them out to avoid excessive log level checks + * in our data path */ +#include "log.h" +#ifndef ENABLE_LIBBLADERF_SYNC_LOG_VERBOSE +#undef log_verbose +#define log_verbose(...) +#endif +#include "minmax.h" +#include "rel_assert.h" + +#include "async.h" +#include "sync.h" +#include "sync_worker.h" +#include "metadata.h" + +#include "board/board.h" +#include "helpers/timeout.h" +#include "helpers/have_cap.h" + +#ifdef ENABLE_LIBBLADERF_SYNC_LOG_VERBOSE +static inline void dump_buf_states(struct bladerf_sync *s) +{ + static char *out = NULL; + struct buffer_mgmt *b = &s->buf_mgmt; + char *statestr = "UNKNOWN"; + + if (out == NULL) { + out = malloc((b->num_buffers + 1) * sizeof(char)); + } + + if (out == NULL) { + log_verbose("%s: malloc failed\n"); + return; + } + + out[b->num_buffers] = '\0'; + + for (size_t i = 0; i < b->num_buffers; ++i) { + switch (b->status[i]) { + case SYNC_BUFFER_EMPTY: + out[i] = '_'; + break; + case SYNC_BUFFER_IN_FLIGHT: + out[i] = '-'; + break; + case SYNC_BUFFER_FULL: + out[i] = '*'; + break; + case SYNC_BUFFER_PARTIAL: + out[i] = 'o'; + break; + } + } + + switch (s->state) { + case SYNC_STATE_BUFFER_READY: + statestr = "BUFFER_READY"; + break; + case SYNC_STATE_CHECK_WORKER: + statestr = "CHECK_WORKER"; + break; + case SYNC_STATE_RESET_BUF_MGMT: + statestr = "RESET_BUF_MGMT"; + break; + case SYNC_STATE_START_WORKER: + statestr = "START_WORKER"; + break; + case SYNC_STATE_USING_BUFFER: + statestr = "USING_BUFFER"; + break; + case SYNC_STATE_USING_BUFFER_META: + statestr = "USING_BUFFER_META"; + break; + case SYNC_STATE_USING_PACKET_META: + statestr = "USING_PACKET_META"; + break; + case SYNC_STATE_WAIT_FOR_BUFFER: + statestr = "WAIT_FOR_BUFFER"; + break; + } + + log_verbose("%s: %s (%s)\n", __FUNCTION__, out, statestr); +} +#else +#define dump_buf_states(...) +#endif // ENABLE_LIBBLADERF_SYNC_LOG_VERBOSE + +static inline size_t samples2bytes(struct bladerf_sync *s, size_t n) { + return s->stream_config.bytes_per_sample * n; +} + +static inline unsigned int msg_per_buf(size_t msg_size, size_t buf_size, + size_t bytes_per_sample) +{ + size_t n = buf_size / (msg_size / bytes_per_sample); + assert(n <= UINT_MAX); + return (unsigned int) n; +} + +static inline unsigned int samples_per_msg(size_t msg_size, + size_t bytes_per_sample) +{ + size_t n = (msg_size - METADATA_HEADER_SIZE) / bytes_per_sample; + assert(n <= UINT_MAX); + return (unsigned int) n; +} + +int sync_init(struct bladerf_sync *sync, + struct bladerf *dev, + bladerf_channel_layout layout, + bladerf_format format, + unsigned int num_buffers, + size_t buffer_size, + size_t msg_size, + unsigned int num_transfers, + unsigned int stream_timeout) + +{ + int status = 0; + size_t i, bytes_per_sample; + + if (num_transfers >= num_buffers) { + return BLADERF_ERR_INVAL; + } + + if (format == BLADERF_FORMAT_PACKET_META) { + if (!have_cap_dev(dev, BLADERF_CAP_FW_SHORT_PACKET)) { + log_error("Firmware does not support short packets. " + "Upgrade to at least firmware version 2.4.0.\n"); + return BLADERF_ERR_UNSUPPORTED; + } + + if (!have_cap_dev(dev, BLADERF_CAP_FPGA_PACKET_META)) { + log_error("FPGA does not support packet meta format. " + "Upgrade to at least FPGA version 0.12.0.\n"); + return BLADERF_ERR_UNSUPPORTED; + } + } + + if (format == BLADERF_FORMAT_SC8_Q7 || format == BLADERF_FORMAT_SC8_Q7_META) { + if (!have_cap_dev(dev, BLADERF_CAP_FPGA_8BIT_SAMPLES)) { + log_error("FPGA does not support 8bit mode. " + "Upgrade to at least FPGA version 0.15.0.\n"); + return BLADERF_ERR_UNSUPPORTED; + } + } + + switch (format) { + case BLADERF_FORMAT_SC8_Q7: + case BLADERF_FORMAT_SC8_Q7_META: + bytes_per_sample = 2; + break; + + case BLADERF_FORMAT_SC16_Q11: + case BLADERF_FORMAT_SC16_Q11_META: + case BLADERF_FORMAT_PACKET_META: + bytes_per_sample = 4; + break; + + default: + log_debug("Invalid format value: %d\n", format); + return BLADERF_ERR_INVAL; + } + + /* bladeRF GPIF DMA requirement */ + if ((bytes_per_sample * buffer_size) % 4096 != 0) { + assert(!"Invalid buffer size"); + return BLADERF_ERR_INVAL; + } + + /* Deinitialize sync handle if it's initialized */ + sync_deinit(sync); + + MUTEX_INIT(&sync->lock); + + switch (layout & BLADERF_DIRECTION_MASK) { + case BLADERF_TX: + sync->buf_mgmt.submitter = SYNC_TX_SUBMITTER_FN; + break; + case BLADERF_RX: + sync->buf_mgmt.submitter = SYNC_TX_SUBMITTER_INVALID; + break; + } + + sync->dev = dev; + sync->state = SYNC_STATE_CHECK_WORKER; + + sync->buf_mgmt.num_buffers = num_buffers; + sync->buf_mgmt.resubmit_count = 0; + + sync->stream_config.layout = layout; + sync->stream_config.format = format; + sync->stream_config.samples_per_buffer = (unsigned int)buffer_size; + sync->stream_config.num_xfers = num_transfers; + sync->stream_config.timeout_ms = stream_timeout; + sync->stream_config.bytes_per_sample = bytes_per_sample; + + sync->meta.state = SYNC_META_STATE_HEADER; + sync->meta.msg_size = msg_size; + sync->meta.msg_per_buf = msg_per_buf(msg_size, buffer_size, bytes_per_sample); + sync->meta.samples_per_msg = samples_per_msg(msg_size, bytes_per_sample); + sync->meta.samples_per_ts = (layout == BLADERF_RX_X2 || layout == BLADERF_TX_X2) ? 2:1; + + log_verbose("%s: Buffer size (in bytes): %u\n", + __FUNCTION__, buffer_size * bytes_per_sample); + + log_verbose("%s: Buffer size (in samples): %u\n", + __FUNCTION__, buffer_size); + + log_verbose("%s: Msg per buffer: %u\n", + __FUNCTION__, sync->meta.msg_per_buf); + + log_verbose("%s: Samples per msg: %u\n", + __FUNCTION__, sync->meta.samples_per_msg); + + MUTEX_INIT(&sync->buf_mgmt.lock); + pthread_cond_init(&sync->buf_mgmt.buf_ready, NULL); + + sync->buf_mgmt.status = (sync_buffer_status*) malloc(num_buffers * sizeof(sync_buffer_status)); + if (sync->buf_mgmt.status == NULL) { + status = BLADERF_ERR_MEM; + goto error; + } + + sync->buf_mgmt.actual_lengths = (size_t *) malloc(num_buffers * sizeof(size_t)); + if (sync->buf_mgmt.actual_lengths == NULL) { + status = BLADERF_ERR_MEM; + goto error; + } + + switch (layout & BLADERF_DIRECTION_MASK) { + case BLADERF_RX: + /* When starting up an RX stream, the first 'num_transfers' + * transfers will be submitted to the USB layer to grab data */ + sync->buf_mgmt.prod_i = num_transfers; + sync->buf_mgmt.cons_i = 0; + sync->buf_mgmt.partial_off = 0; + + for (i = 0; i < num_buffers; i++) { + if (i < num_transfers) { + sync->buf_mgmt.status[i] = SYNC_BUFFER_IN_FLIGHT; + } else { + sync->buf_mgmt.status[i] = SYNC_BUFFER_EMPTY; + } + } + + sync->meta.msg_timestamp = 0; + sync->meta.msg_flags = 0; + + break; + + case BLADERF_TX: + sync->buf_mgmt.prod_i = 0; + sync->buf_mgmt.cons_i = BUFFER_MGMT_INVALID_INDEX; + sync->buf_mgmt.partial_off = 0; + + for (i = 0; i < num_buffers; i++) { + sync->buf_mgmt.status[i] = SYNC_BUFFER_EMPTY; + } + + sync->meta.msg_timestamp = 0; + sync->meta.in_burst = false; + sync->meta.now = false; + break; + } + + status = sync_worker_init(sync); + if (status < 0) { + goto error; + } + + sync->initialized = true; + + return 0; + +error: + sync_deinit(sync); + return status; +} + +void sync_deinit(struct bladerf_sync *sync) +{ + if (sync->initialized) { + if ((sync->stream_config.layout & BLADERF_DIRECTION_MASK) == BLADERF_TX) { + async_submit_stream_buffer(sync->worker->stream, + BLADERF_STREAM_SHUTDOWN, NULL, 0, false); + } + + sync_worker_deinit(sync->worker, &sync->buf_mgmt.lock, + &sync->buf_mgmt.buf_ready); + + if (sync->buf_mgmt.actual_lengths) { + free(sync->buf_mgmt.actual_lengths); + } + /* De-allocate our buffer management resources */ + if (sync->buf_mgmt.status) { + MUTEX_DESTROY(&sync->buf_mgmt.lock); + free(sync->buf_mgmt.status); + } + + MUTEX_DESTROY(&sync->lock); + + sync->initialized = false; + } +} + +static int wait_for_buffer(struct buffer_mgmt *b, + unsigned int timeout_ms, + const char *dbg_name, + unsigned int dbg_idx) +{ + int status; + struct timespec timeout; + + if (timeout_ms == 0) { + log_verbose("%s: Infinite wait for buffer[%d] (status: %d).\n", + dbg_name, dbg_idx, b->status[dbg_idx]); + status = pthread_cond_wait(&b->buf_ready, &b->lock); + } else { + log_verbose("%s: Timed wait for buffer[%d] (status: %d).\n", dbg_name, + dbg_idx, b->status[dbg_idx]); + status = populate_abs_timeout(&timeout, timeout_ms); + if (status == 0) { + status = pthread_cond_timedwait(&b->buf_ready, &b->lock, &timeout); + } + } + + if (status == ETIMEDOUT) { + log_error("%s: Timed out waiting for buf_ready after %d ms\n", + __FUNCTION__, timeout_ms); + status = BLADERF_ERR_TIMEOUT; + } else if (status != 0) { + status = BLADERF_ERR_UNEXPECTED; + } + + return status; +} + +#ifndef SYNC_WORKER_START_TIMEOUT_MS +# define SYNC_WORKER_START_TIMEOUT_MS 250 +#endif + +/* Returns # of timestamps (or time steps) left in a message */ +static inline unsigned int ts_remaining(struct bladerf_sync *s) +{ + size_t ret = s->meta.samples_per_msg / s->meta.samples_per_ts - s->meta.curr_msg_off; + assert(ret <= UINT_MAX); + + return (unsigned int) ret; +} + +/* Returns # of samples left in a message (SC16Q11 mode only) */ +static inline unsigned int left_in_msg(struct bladerf_sync *s) +{ + size_t ret = s->meta.samples_per_msg - s->meta.curr_msg_off; + assert(ret <= UINT_MAX); + + return (unsigned int) ret; +} + +static inline void advance_rx_buffer(struct buffer_mgmt *b) +{ + log_verbose("%s: Marking buf[%u] empty.\n", __FUNCTION__, b->cons_i); + + b->status[b->cons_i] = SYNC_BUFFER_EMPTY; + b->cons_i = (b->cons_i + 1) % b->num_buffers; +} + +static inline unsigned int timestamp_to_msg(struct bladerf_sync *s, uint64_t t) +{ + uint64_t m = t / s->meta.samples_per_msg; + assert(m <= UINT_MAX); + return (unsigned int) m; +} + +int sync_rx(struct bladerf_sync *s, void *samples, unsigned num_samples, + struct bladerf_metadata *user_meta, unsigned int timeout_ms) +{ + struct buffer_mgmt *b; + + int status = 0; + bool exit_early = false; + bool copied_data = false; + unsigned int samples_returned = 0; + uint8_t *samples_dest = (uint8_t*)samples; + uint8_t *buf_src = NULL; + unsigned int samples_to_copy = 0; + unsigned int samples_per_buffer = 0; + uint64_t target_timestamp = UINT64_MAX; + unsigned int pkt_len_dwords = 0; + + if (s == NULL || samples == NULL) { + log_debug("NULL pointer passed to %s\n", __FUNCTION__); + return BLADERF_ERR_INVAL; + } else if (!s->initialized) { + return BLADERF_ERR_INVAL; + } + + if (num_samples % s->meta.samples_per_ts != 0) { + log_debug("%s: %u samples %% %u channels != 0\n", + __FUNCTION__, num_samples, s->meta.samples_per_ts); + return BLADERF_ERR_INVAL; + } + + MUTEX_LOCK(&s->lock); + + if (s->stream_config.format == BLADERF_FORMAT_SC16_Q11_META || + s->stream_config.format == BLADERF_FORMAT_SC8_Q7_META || + s->stream_config.format == BLADERF_FORMAT_PACKET_META) { + if (user_meta == NULL) { + log_debug("NULL metadata pointer passed to %s\n", __FUNCTION__); + status = BLADERF_ERR_INVAL; + goto out; + } else { + user_meta->status = 0; + target_timestamp = user_meta->timestamp; + } + } + + b = &s->buf_mgmt; + samples_per_buffer = s->stream_config.samples_per_buffer; + + log_verbose("%s: Requests %u samples.\n", __FUNCTION__, num_samples); + + while (!exit_early && samples_returned < num_samples && status == 0) { + dump_buf_states(s); + + switch (s->state) { + case SYNC_STATE_CHECK_WORKER: { + int stream_error; + sync_worker_state worker_state = + sync_worker_get_state(s->worker, &stream_error); + + /* Propagate stream error back to the caller. + * They can call this function again to restart the stream and + * try again. + */ + if (stream_error != 0) { + status = stream_error; + } else { + if (worker_state == SYNC_WORKER_STATE_IDLE) { + log_debug("%s: Worker is idle. Going to reset buf " + "mgmt.\n", __FUNCTION__); + s->state = SYNC_STATE_RESET_BUF_MGMT; + } else if (worker_state == SYNC_WORKER_STATE_RUNNING) { + s->state = SYNC_STATE_WAIT_FOR_BUFFER; + } else { + status = BLADERF_ERR_UNEXPECTED; + log_debug("%s: Unexpected worker state=%d\n", + __FUNCTION__, worker_state); + } + } + + break; + } + + case SYNC_STATE_RESET_BUF_MGMT: + MUTEX_LOCK(&b->lock); + /* When the RX stream starts up, it will submit the first T + * transfers, so the consumer index must be reset to 0 */ + b->cons_i = 0; + MUTEX_UNLOCK(&b->lock); + log_debug("%s: Reset buf_mgmt consumer index\n", __FUNCTION__); + s->state = SYNC_STATE_START_WORKER; + break; + + + case SYNC_STATE_START_WORKER: + sync_worker_submit_request(s->worker, SYNC_WORKER_START); + + status = sync_worker_wait_for_state( + s->worker, + SYNC_WORKER_STATE_RUNNING, + SYNC_WORKER_START_TIMEOUT_MS); + + if (status == 0) { + s->state = SYNC_STATE_WAIT_FOR_BUFFER; + log_debug("%s: Worker is now running.\n", __FUNCTION__); + } else { + log_debug("%s: Failed to start worker, (%d)\n", + __FUNCTION__, status); + } + break; + + case SYNC_STATE_WAIT_FOR_BUFFER: + MUTEX_LOCK(&b->lock); + + /* Check the buffer state, as the worker may have produced one + * since we last queried the status */ + if (b->status[b->cons_i] == SYNC_BUFFER_FULL) { + s->state = SYNC_STATE_BUFFER_READY; + log_verbose("%s: buffer %u is ready to consume\n", + __FUNCTION__, b->cons_i); + } else { + status = wait_for_buffer(b, timeout_ms, + __FUNCTION__, b->cons_i); + + if (status == 0) { + if (b->status[b->cons_i] != SYNC_BUFFER_FULL) { + s->state = SYNC_STATE_CHECK_WORKER; + } else { + s->state = SYNC_STATE_BUFFER_READY; + log_verbose("%s: buffer %u is ready to consume\n", + __FUNCTION__, b->cons_i); + } + } + } + + MUTEX_UNLOCK(&b->lock); + break; + + case SYNC_STATE_BUFFER_READY: + MUTEX_LOCK(&b->lock); + b->status[b->cons_i] = SYNC_BUFFER_PARTIAL; + b->partial_off = 0; + + switch (s->stream_config.format) { + case BLADERF_FORMAT_SC16_Q11: + case BLADERF_FORMAT_SC8_Q7: + s->state = SYNC_STATE_USING_BUFFER; + break; + + case BLADERF_FORMAT_SC16_Q11_META: + case BLADERF_FORMAT_SC8_Q7_META: + s->state = SYNC_STATE_USING_BUFFER_META; + s->meta.curr_msg_off = 0; + s->meta.msg_num = 0; + break; + + case BLADERF_FORMAT_PACKET_META: + s->state = SYNC_STATE_USING_PACKET_META; + break; + + default: + assert(!"Invalid stream format"); + status = BLADERF_ERR_UNEXPECTED; + } + + MUTEX_UNLOCK(&b->lock); + break; + + case SYNC_STATE_USING_BUFFER: /* SC16Q11 buffers w/o metadata */ + MUTEX_LOCK(&b->lock); + + buf_src = (uint8_t*)b->buffers[b->cons_i]; + + samples_to_copy = uint_min(num_samples - samples_returned, + samples_per_buffer - b->partial_off); + + memcpy(samples_dest + samples2bytes(s, samples_returned), + buf_src + samples2bytes(s, b->partial_off), + samples2bytes(s, samples_to_copy)); + + b->partial_off += samples_to_copy; + samples_returned += samples_to_copy; + + log_verbose("%s: Provided %u samples to caller\n", + __FUNCTION__, samples_to_copy); + + /* We've finished consuming this buffer and can start looking + * for available samples in the next buffer */ + if (b->partial_off >= samples_per_buffer) { + + /* Check for symptom of out-of-bounds accesses */ + assert(b->partial_off == samples_per_buffer); + + advance_rx_buffer(b); + s->state = SYNC_STATE_WAIT_FOR_BUFFER; + } + + MUTEX_UNLOCK(&b->lock); + break; + + + case SYNC_STATE_USING_BUFFER_META: /* SC16Q11 buffers w/ metadata */ + MUTEX_LOCK(&b->lock); + + switch (s->meta.state) { + case SYNC_META_STATE_HEADER: + + assert(s->meta.msg_num < s->meta.msg_per_buf); + + buf_src = (uint8_t*)b->buffers[b->cons_i]; + + s->meta.curr_msg = + buf_src + s->meta.msg_size * s->meta.msg_num; + + s->meta.msg_timestamp = + metadata_get_timestamp(s->meta.curr_msg); + + s->meta.msg_flags = + metadata_get_flags(s->meta.curr_msg); + + user_meta->status |= s->meta.msg_flags & + (BLADERF_META_FLAG_RX_HW_UNDERFLOW | + BLADERF_META_FLAG_RX_HW_MINIEXP1 | + BLADERF_META_FLAG_RX_HW_MINIEXP2); + + s->meta.curr_msg_off = 0; + + /* We've encountered a discontinuity and need to return + * what we have so far, setting the status flags */ + if (copied_data && + s->meta.msg_timestamp != s->meta.curr_timestamp) { + + user_meta->status |= BLADERF_META_STATUS_OVERRUN; + exit_early = true; + log_debug("Sample discontinuity detected @ " + "buffer %u, message %u: Expected t=%llu, " + "got t=%llu\n", + b->cons_i, s->meta.msg_num, + (unsigned long long)s->meta.curr_timestamp, + (unsigned long long)s->meta.msg_timestamp); + + } else { + log_verbose("Got header for message %u: " + "t_new=%u, t_old=%u\n", + s->meta.msg_num, + s->meta.msg_timestamp, + s->meta.curr_timestamp); + } + + s->meta.curr_timestamp = s->meta.msg_timestamp; + s->meta.state = SYNC_META_STATE_SAMPLES; + break; + + case SYNC_META_STATE_SAMPLES: + if (!copied_data && + (user_meta->flags & BLADERF_META_FLAG_RX_NOW) == 0 && + target_timestamp < s->meta.curr_timestamp) { + + log_debug("Current timestamp is %llu, " + "target=%llu (user=%llu)\n", + (unsigned long long)s->meta.curr_timestamp, + (unsigned long long)target_timestamp, + (unsigned long long)user_meta->timestamp); + + status = BLADERF_ERR_TIME_PAST; + } else if ((user_meta->flags & BLADERF_META_FLAG_RX_NOW) || + target_timestamp == s->meta.curr_timestamp) { + + /* Copy the request amount up to the end of a + * this message in the current buffer */ + samples_to_copy = + uint_min(num_samples - samples_returned, + left_in_msg(s)); + + memcpy(samples_dest + samples2bytes(s, samples_returned), + s->meta.curr_msg + + METADATA_HEADER_SIZE + + samples2bytes(s, s->meta.curr_msg_off), + samples2bytes(s, samples_to_copy)); + + samples_returned += samples_to_copy; + s->meta.curr_msg_off += samples_to_copy; + + if (!copied_data && + (user_meta->flags & BLADERF_META_FLAG_RX_NOW)) { + + /* Provide the user with the timestamp at the + * first returned sample when the + * NOW flag has been provided */ + user_meta->timestamp = s->meta.curr_timestamp; + log_verbose("Updated user meta timestamp with: " + "%llu\n", (unsigned long long) + user_meta->timestamp); + } + + copied_data = true; + + s->meta.curr_timestamp += samples_to_copy / s->meta.samples_per_ts; + + /* We've begun copying samples, so our target will + * just keep tracking the current timestamp. */ + target_timestamp = s->meta.curr_timestamp; + + log_verbose("After copying samples, t=%llu\n", + (unsigned long long)s->meta.curr_timestamp); + + if (left_in_msg(s) == 0) { + assert(s->meta.curr_msg_off == s->meta.samples_per_msg); + + s->meta.state = SYNC_META_STATE_HEADER; + s->meta.msg_num++; + + if (s->meta.msg_num >= s->meta.msg_per_buf) { + assert(s->meta.msg_num == s->meta.msg_per_buf); + advance_rx_buffer(b); + s->meta.msg_num = 0; + s->state = SYNC_STATE_WAIT_FOR_BUFFER; + } + } + + } else { + const uint64_t time_delta = target_timestamp - s->meta.curr_timestamp; + uint64_t samples_left = time_delta * s->meta.samples_per_ts; + uint64_t left_in_buffer = + (uint64_t) s->meta.samples_per_msg * + (s->meta.msg_per_buf - s->meta.msg_num); + + /* Account for current position in buffer */ + left_in_buffer -= s->meta.curr_msg_off; + + if (samples_left >= left_in_buffer) { + /* Discard the remainder of this buffer */ + advance_rx_buffer(b); + s->state = SYNC_STATE_WAIT_FOR_BUFFER; + s->meta.state = SYNC_META_STATE_HEADER; + + log_verbose("%s: Discarding rest of buffer.\n", + __FUNCTION__); + + } else if (time_delta <= ts_remaining(s)) { + /* Fast forward within the current message */ + assert(time_delta <= SIZE_MAX); + + s->meta.curr_msg_off += (size_t)samples_left; + s->meta.curr_timestamp += time_delta; + + log_verbose("%s: Seeking within message (t=%llu)\n", + __FUNCTION__, + s->meta.curr_timestamp); + } else { + s->meta.state = SYNC_META_STATE_HEADER; + s->meta.msg_num += timestamp_to_msg(s, samples_left); + + log_verbose("%s: Seeking to message %u.\n", + __FUNCTION__, s->meta.msg_num); + } + } + break; + + default: + assert(!"Invalid state"); + status = BLADERF_ERR_UNEXPECTED; + } + + MUTEX_UNLOCK(&b->lock); + break; + + case SYNC_STATE_USING_PACKET_META: /* Packet buffers w/ metadata */ + MUTEX_LOCK(&b->lock); + + buf_src = (uint8_t*)b->buffers[b->cons_i]; + + pkt_len_dwords = metadata_get_packet_len(buf_src); + + if (pkt_len_dwords > 0) { + samples_returned += num_samples; + user_meta->actual_count = pkt_len_dwords; + memcpy(samples_dest, buf_src + METADATA_HEADER_SIZE, samples2bytes(s, pkt_len_dwords)); + } + + advance_rx_buffer(b); + s->state = SYNC_STATE_WAIT_FOR_BUFFER; + MUTEX_UNLOCK(&b->lock); + break; + + + } + } + + if (user_meta && s->stream_config.format != BLADERF_FORMAT_PACKET_META) { + user_meta->actual_count = samples_returned; + } + +out: + MUTEX_UNLOCK(&s->lock); + + return status; +} + +/* Assumes buffer lock is held */ +static int advance_tx_buffer(struct bladerf_sync *s, struct buffer_mgmt *b) +{ + int status = 0; + const unsigned int idx = b->prod_i; + + if (b->submitter == SYNC_TX_SUBMITTER_FN) { + /* Mark buffer in flight because we're going to send it out. + * This ensures that if the callback fires before this function + * completes, its state will be correct. */ + b->status[idx] = SYNC_BUFFER_IN_FLIGHT; + + /* This call may block and it results in a per-stream lock being held, + * so the buffer lock must be dropped. + * + * A callback may occur in the meantime, but this will not touch the + * status for this this buffer, or the producer index. + */ + MUTEX_UNLOCK(&b->lock); + size_t len; + if (s->stream_config.format == BLADERF_FORMAT_PACKET_META) { + len = b->actual_lengths[idx]; + } else { + len = async_stream_buf_bytes(s->worker->stream); + } + status = async_submit_stream_buffer(s->worker->stream, + b->buffers[idx], + &len, + s->stream_config.timeout_ms, + true); + MUTEX_LOCK(&b->lock); + + if (status == 0) { + log_verbose("%s: buf[%u] submitted.\n", + __FUNCTION__, idx); + + } else if (status == BLADERF_ERR_WOULD_BLOCK) { + log_verbose("%s: Deferring buf[%u] submission to worker callback.\n", + __FUNCTION__, idx); + + /* Mark this buffer as being full of data, but not in flight */ + b->status[idx] = SYNC_BUFFER_FULL; + + /* Assign callback the duty of submitting deferred buffers, + * and use buffer_mgmt.cons_i to denote which it should submit + * (i.e., consume). */ + b->submitter = SYNC_TX_SUBMITTER_CALLBACK; + b->cons_i = idx; + + /* This is expected and we are handling it. Don't propagate this + * status back up */ + status = 0; + } else { + /* Unmark this as being in flight */ + b->status[idx] = SYNC_BUFFER_FULL; + + log_debug("%s: Failed to submit buf[%u].\n", __FUNCTION__, idx); + return status; + } + } else { + /* We are not submitting this buffer; this is deffered to the worker + * call back. Just update its state to being full of samples. */ + b->status[idx] = SYNC_BUFFER_FULL; + } + + /* Advance "producer" insertion index. */ + b->prod_i = (idx + 1) % b->num_buffers; + + /* Determine our next state based upon the state of the next buffer we + * want to use. */ + if (b->status[b->prod_i] == SYNC_BUFFER_EMPTY) { + /* Buffer is empty and ready for use */ + s->state = SYNC_STATE_BUFFER_READY; + } else { + /* We'll have to wait on this buffer to become ready. First, we'll + * verify that the worker is running. */ + s->state = SYNC_STATE_CHECK_WORKER; + } + + return status; +} + +static inline bool timestamp_in_past(struct bladerf_metadata *user_meta, + struct bladerf_sync *s) +{ + const bool in_past = user_meta->timestamp < s->meta.curr_timestamp; + + if (in_past) { + log_debug("Provided timestamp=%"PRIu64" is in past: current=%"PRIu64"\n", + user_meta->timestamp, s->meta.curr_timestamp); + } + + return in_past; +} + +struct tx_options { + bool flush; + bool zero_pad; +}; + +static inline int handle_tx_parameters(struct bladerf_metadata *user_meta, + struct bladerf_sync *s, + struct tx_options *options) +{ + if (s->stream_config.format == BLADERF_FORMAT_SC16_Q11_META) { + if (user_meta == NULL) { + log_debug("NULL metadata pointer passed to %s\n", __FUNCTION__); + return BLADERF_ERR_INVAL; + } + + if (user_meta->flags & BLADERF_META_FLAG_TX_BURST_START) { + bool now = user_meta->flags & BLADERF_META_FLAG_TX_NOW; + + if (s->meta.in_burst) { + log_debug("%s: BURST_START provided while already in a burst.\n", + __FUNCTION__); + return BLADERF_ERR_INVAL; + } else if (!now && timestamp_in_past(user_meta, s)) { + return BLADERF_ERR_TIME_PAST; + } + + s->meta.in_burst = true; + if (now) { + s->meta.now = true; + log_verbose("%s: Starting burst \"now\"\n", __FUNCTION__); + } else { + s->meta.curr_timestamp = user_meta->timestamp; + log_verbose("%s: Starting burst @ %llu\n", __FUNCTION__, + (unsigned long long)s->meta.curr_timestamp); + } + + if (user_meta->flags & BLADERF_META_FLAG_TX_UPDATE_TIMESTAMP) { + log_debug("UPDATE_TIMESTAMP ignored; BURST_START flag was used.\n"); + } + + } else if (user_meta->flags & BLADERF_META_FLAG_TX_NOW) { + log_debug("%s: TX_NOW was specified without BURST_START.\n", + __FUNCTION__); + return BLADERF_ERR_INVAL; + } else if (user_meta->flags & BLADERF_META_FLAG_TX_UPDATE_TIMESTAMP) { + if (timestamp_in_past(user_meta, s)) { + return BLADERF_ERR_TIME_PAST; + } else { + options->zero_pad = true; + } + } + + if (user_meta->flags & BLADERF_META_FLAG_TX_BURST_END) { + if (s->meta.in_burst) { + options->flush = true; + } else { + log_debug("%s: BURST_END provided while not in a burst.\n", + __FUNCTION__); + return BLADERF_ERR_INVAL; + } + } + + user_meta->status = 0; + } + + return 0; +} + +int sync_tx(struct bladerf_sync *s, + void const *samples, + unsigned int num_samples, + struct bladerf_metadata *user_meta, + unsigned int timeout_ms) +{ + struct buffer_mgmt *b = NULL; + + int status = 0; + unsigned int samples_written = 0; + unsigned int samples_to_copy = 0; + unsigned int samples_per_buffer = 0; + uint8_t const *samples_src = (uint8_t const *)samples; + uint8_t *buf_dest = NULL; + struct tx_options op = { + FIELD_INIT(.flush, false), FIELD_INIT(.zero_pad, false), + }; + + log_verbose("%s: called for %u samples.\n", __FUNCTION__, num_samples); + + if (s == NULL || samples == NULL || !s->initialized) { + return BLADERF_ERR_INVAL; + } + + MUTEX_LOCK(&s->lock); + + status = handle_tx_parameters(user_meta, s, &op); + if (status != 0) { + goto out; + } + + b = &s->buf_mgmt; + samples_per_buffer = s->stream_config.samples_per_buffer; + + while (status == 0 && ((samples_written < num_samples) || op.flush)) { + switch (s->state) { + case SYNC_STATE_CHECK_WORKER: { + int stream_error; + sync_worker_state worker_state = + sync_worker_get_state(s->worker, &stream_error); + + if (stream_error != 0) { + status = stream_error; + } else { + if (worker_state == SYNC_WORKER_STATE_IDLE) { + /* No need to reset any buffer management for TX since + * the TX stream does not submit an initial set of + * buffers. Therefore the RESET_BUF_MGMT state is + * skipped here. */ + s->state = SYNC_STATE_START_WORKER; + } else { + /* Worker is running - continue onto checking for and + * potentially waiting for an available buffer */ + s->state = SYNC_STATE_WAIT_FOR_BUFFER; + } + } + break; + } + + case SYNC_STATE_RESET_BUF_MGMT: + assert(!"Bug"); + break; + + case SYNC_STATE_START_WORKER: + sync_worker_submit_request(s->worker, SYNC_WORKER_START); + + status = sync_worker_wait_for_state( + s->worker, SYNC_WORKER_STATE_RUNNING, + SYNC_WORKER_START_TIMEOUT_MS); + + if (status == 0) { + s->state = SYNC_STATE_WAIT_FOR_BUFFER; + log_debug("%s: Worker is now running.\n", __FUNCTION__); + } + break; + + case SYNC_STATE_WAIT_FOR_BUFFER: + MUTEX_LOCK(&b->lock); + + /* Check the buffer state, as the worker may have consumed one + * since we last queried the status */ + if (b->status[b->prod_i] == SYNC_BUFFER_EMPTY) { + s->state = SYNC_STATE_BUFFER_READY; + } else { + status = + wait_for_buffer(b, timeout_ms, __FUNCTION__, b->prod_i); + } + + MUTEX_UNLOCK(&b->lock); + break; + + case SYNC_STATE_BUFFER_READY: + MUTEX_LOCK(&b->lock); + b->status[b->prod_i] = SYNC_BUFFER_PARTIAL; + b->partial_off = 0; + + switch (s->stream_config.format) { + case BLADERF_FORMAT_SC16_Q11: + case BLADERF_FORMAT_SC8_Q7: + s->state = SYNC_STATE_USING_BUFFER; + break; + + case BLADERF_FORMAT_SC16_Q11_META: + case BLADERF_FORMAT_SC8_Q7_META: + s->state = SYNC_STATE_USING_BUFFER_META; + s->meta.curr_msg_off = 0; + s->meta.msg_num = 0; + break; + + case BLADERF_FORMAT_PACKET_META: + s->state = SYNC_STATE_USING_PACKET_META; + s->meta.curr_msg_off = 0; + s->meta.msg_num = 0; + break; + + default: + assert(!"Invalid stream format"); + status = BLADERF_ERR_UNEXPECTED; + } + + MUTEX_UNLOCK(&b->lock); + break; + + + case SYNC_STATE_USING_BUFFER: + MUTEX_LOCK(&b->lock); + + buf_dest = (uint8_t *)b->buffers[b->prod_i]; + samples_to_copy = uint_min(num_samples - samples_written, + samples_per_buffer - b->partial_off); + + memcpy(buf_dest + samples2bytes(s, b->partial_off), + samples_src + samples2bytes(s, samples_written), + samples2bytes(s, samples_to_copy)); + + b->partial_off += samples_to_copy; + samples_written += samples_to_copy; + + log_verbose("%s: Buffered %u samples from caller\n", + __FUNCTION__, samples_to_copy); + + if (b->partial_off >= samples_per_buffer) { + /* Check for symptom of out-of-bounds accesses */ + assert(b->partial_off == samples_per_buffer); + + /* Submit buffer and advance to the next one */ + status = advance_tx_buffer(s, b); + } + + MUTEX_UNLOCK(&b->lock); + break; + + case SYNC_STATE_USING_BUFFER_META: /* SC16Q11 buffers w/ metadata */ + MUTEX_LOCK(&b->lock); + + switch (s->meta.state) { + case SYNC_META_STATE_HEADER: + buf_dest = (uint8_t *)b->buffers[b->prod_i]; + + s->meta.curr_msg = + buf_dest + s->meta.msg_size * s->meta.msg_num; + + log_verbose("%s: Set curr_msg to: %p (buf @ %p)\n", + __FUNCTION__, s->meta.curr_msg, buf_dest); + + s->meta.curr_msg_off = 0; + + if (s->meta.now) { + metadata_set(s->meta.curr_msg, 0, 0); + } else { + metadata_set(s->meta.curr_msg, + s->meta.curr_timestamp, 0); + } + + s->meta.state = SYNC_META_STATE_SAMPLES; + + log_verbose("%s: Filled in header (t=%llu)\n", + __FUNCTION__, + (unsigned long long)s->meta.curr_timestamp); + break; + + case SYNC_META_STATE_SAMPLES: + if (op.zero_pad) { + const uint64_t delta = + user_meta->timestamp - s->meta.curr_timestamp; + + size_t to_zero; + + log_verbose("%s: User requested zero padding to " + "t=%" PRIu64 " (%" PRIu64 " + %" PRIu64 + ")\n", + __FUNCTION__, user_meta->timestamp, + s->meta.curr_timestamp, delta); + + if (delta < left_in_msg(s)) { + to_zero = (size_t)delta; + + log_verbose("%s: Padded subset of msg " + "(%" PRIu64 " samples)\n", + __FUNCTION__, (uint64_t)to_zero); + } else { + to_zero = left_in_msg(s); + + log_verbose("%s: Padded remainder of msg " + "(%" PRIu64 " samples)\n", + __FUNCTION__, (uint64_t)to_zero); + } + + memset(s->meta.curr_msg + METADATA_HEADER_SIZE + + samples2bytes(s, s->meta.curr_msg_off), + 0, samples2bytes(s, to_zero)); + + s->meta.curr_msg_off += to_zero; + + /* If we're going to supply the FPGA with a + * discontinuity, it is required that the last three + * samples provided be zero in order to hold the + * DAC @ (0 + 0j). + * + * See "Figure 9: TX data interface" in the LMS6002D + * data sheet for the register stages that create + * this requirement. + * + * If we're ending a burst with < 3 zeros samples at + * the end of the message, we'll need to continue + * onto the next message. At this next message, + * we'll either encounter the requested timestamp or + * zero-fill the message to fulfil this "three zero + * sample" requirement, and set the timestamp + * appropriately at the following message. + */ + if (to_zero < 3 && left_in_msg(s) == 0) { + s->meta.curr_timestamp += to_zero; + log_verbose("Ended msg with < 3 zero samples. " + "Padding into next message.\n"); + } else { + s->meta.curr_timestamp = user_meta->timestamp; + op.zero_pad = false; + } + } + + samples_to_copy = uint_min( + num_samples - samples_written, left_in_msg(s)); + + if (samples_to_copy != 0) { + /* We have user data to copy into the current + * message within the buffer */ + memcpy(s->meta.curr_msg + METADATA_HEADER_SIZE + + samples2bytes(s, s->meta.curr_msg_off), + samples_src + + samples2bytes(s, samples_written), + samples2bytes(s, samples_to_copy)); + + s->meta.curr_msg_off += samples_to_copy; + if (s->stream_config.layout == BLADERF_TX_X2) + s->meta.curr_timestamp += samples_to_copy / 2; + else + s->meta.curr_timestamp += samples_to_copy; + + samples_written += samples_to_copy; + + log_verbose("%s: Copied %u samples. " + "Current message offset is now: %u\n", + __FUNCTION__, samples_to_copy, + s->meta.curr_msg_off); + } + + if (left_in_msg(s) != 0 && op.flush) { + /* We're ending this buffer early and need to + * flush the remaining samples by setting all + * samples in the messages to (0 + 0j) */ + const unsigned int to_zero = left_in_msg(s); + + const size_t off = + METADATA_HEADER_SIZE + + samples2bytes(s, s->meta.curr_msg_off); + + /* If we're here, we should have already copied + * all requested data to the buffer */ + assert(num_samples == samples_written); + + memset(s->meta.curr_msg + off, 0, + samples2bytes(s, to_zero)); + + log_verbose( + "%s: Flushed %u samples @ %u (0x%08x)\n", + __FUNCTION__, to_zero, s->meta.curr_msg_off, + off); + + s->meta.curr_msg_off += to_zero; + s->meta.curr_timestamp += to_zero; + } + + if (left_in_msg(s) == 0) { + s->meta.msg_num++; + s->meta.state = SYNC_META_STATE_HEADER; + + log_verbose("%s: Advancing to next message (%u)\n", + __FUNCTION__, s->meta.msg_num); + } + + if (s->meta.msg_num >= s->meta.msg_per_buf) { + assert(s->meta.msg_num == s->meta.msg_per_buf); + + /* Submit buffer of samples for transmission */ + status = advance_tx_buffer(s, b); + + s->meta.msg_num = 0; + s->state = SYNC_STATE_WAIT_FOR_BUFFER; + + /* We want to clear the flush flag if we've written + * all of our data, but keep it set if we have more + * data and need wrap around to another buffer */ + op.flush = + op.flush && (samples_written != num_samples); + } + + break; + + default: + assert(!"Invalid state"); + status = BLADERF_ERR_UNEXPECTED; + } + + MUTEX_UNLOCK(&b->lock); + break; + + case SYNC_STATE_USING_PACKET_META: /* Packet buffers w/ metadata */ + MUTEX_LOCK(&b->lock); + + buf_dest = (uint8_t *)b->buffers[b->prod_i]; + + memcpy(buf_dest + METADATA_HEADER_SIZE, samples_src, num_samples*4); + + b->actual_lengths[b->prod_i] = samples2bytes(s, num_samples) + METADATA_HEADER_SIZE; + + metadata_set_packet(buf_dest, 0, 0, num_samples, 0, 0); + + samples_written = num_samples; + + status = advance_tx_buffer(s, b); + + s->meta.msg_num = 0; + s->state = SYNC_STATE_WAIT_FOR_BUFFER; + + MUTEX_UNLOCK(&b->lock); + break; + + } + } + + if (status == 0 && + s->stream_config.format == BLADERF_FORMAT_SC16_Q11_META && + (user_meta->flags & BLADERF_META_FLAG_TX_BURST_END)) { + s->meta.in_burst = false; + s->meta.now = false; + } + +out: + MUTEX_UNLOCK(&s->lock); + + return status; +} + +unsigned int sync_buf2idx(struct buffer_mgmt *b, void *addr) +{ + unsigned int i; + + for (i = 0; i < b->num_buffers; i++) { + if (b->buffers[i] == addr) { + return i; + } + } + + assert(!"Bug: Buffer not found."); + + /* Assertions are intended to always remain on. If someone turned them + * off, do the best we can...complain loudly and clobber a buffer */ + log_critical("Bug: Buffer not found."); + return 0; +} diff --git a/Radio/HW/BladeRF/src/streaming/sync.h b/Radio/HW/BladeRF/src/streaming/sync.h new file mode 100644 index 0000000..8977791 --- /dev/null +++ b/Radio/HW/BladeRF/src/streaming/sync.h @@ -0,0 +1,193 @@ +/* + * This file is part of the bladeRF project: + * http://www.github.com/nuand/bladeRF + * + * Copyright (C) 2014 Nuand LLC + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef STREAMING_SYNC_H_ +#define STREAMING_SYNC_H_ + +#include <limits.h> +#include <pthread.h> + +#include <libbladeRF.h> + +#include "thread.h" + +/* These parameters are only written during sync_init */ +struct stream_config { + bladerf_format format; + bladerf_channel_layout layout; + + unsigned int samples_per_buffer; + unsigned int num_xfers; + unsigned int timeout_ms; + + size_t bytes_per_sample; +}; + +typedef enum { + SYNC_BUFFER_EMPTY = 0, /**< Buffer contains no data */ + SYNC_BUFFER_PARTIAL, /**< sync_rx/tx is currently emptying/filling */ + SYNC_BUFFER_FULL, /**< Buffer is full of data */ + SYNC_BUFFER_IN_FLIGHT, /**< Currently being transferred */ +} sync_buffer_status; + +typedef enum { + SYNC_META_STATE_HEADER, /**< Extract the metadata header */ + SYNC_META_STATE_SAMPLES, /**< Process samples */ +} sync_meta_state; + +typedef enum { + /** Invalid selection */ + SYNC_TX_SUBMITTER_INVALID = -1, + + /** sync_tx() is repsonsible for submitting buffers for async transfer */ + SYNC_TX_SUBMITTER_FN, + + /** The TX worker callbacks should be returning buffers for submission */ + SYNC_TX_SUBMITTER_CALLBACK +} sync_tx_submitter; + +#define BUFFER_MGMT_INVALID_INDEX (UINT_MAX) + +struct buffer_mgmt { + sync_buffer_status *status; + size_t *actual_lengths; + + void **buffers; + unsigned int num_buffers; + + unsigned int prod_i; /**< Producer index - next buffer to fill */ + unsigned int cons_i; /**< Consumer index - next buffer to empty */ + unsigned int partial_off; /**< Current index into partial buffer */ + + /* In the event of a SW RX overrun, this count is used to determine + * how many more transfers should be considered invalid and require + * resubmission */ + unsigned int resubmit_count; + + /* Applicable to TX only. Denotes which context is responsible for + * submitting full buffers to the underlying async system */ + sync_tx_submitter submitter; + + + MUTEX lock; + pthread_cond_t buf_ready; /**< Buffer produced by RX callback, or + * buffer emptied by TX callback */ +}; + +/* State of API-side sync interface */ +typedef enum { + SYNC_STATE_CHECK_WORKER, + SYNC_STATE_RESET_BUF_MGMT, + SYNC_STATE_START_WORKER, + SYNC_STATE_WAIT_FOR_BUFFER, + SYNC_STATE_BUFFER_READY, + SYNC_STATE_USING_BUFFER, + SYNC_STATE_USING_PACKET_META, + SYNC_STATE_USING_BUFFER_META +} sync_state; + +struct sync_meta { + sync_meta_state state; /* State of metadata processing */ + + uint8_t *curr_msg; /* Points to current message in the buffer */ + size_t curr_msg_off; /* Offset into current message (samples), + * ignoring the 4-samples worth of metadata */ + size_t msg_size; /* Size of data message */ + unsigned int msg_per_buf; /* Number of data messages per buffer */ + unsigned int msg_num; /* Which message within the buffer are we in? + * Range is: 0 to msg_per_buf */ + unsigned int samples_per_msg; /* Number of samples within a message */ + unsigned int samples_per_ts; /* Number of samples within a timestamp */ + + union { + /* Used only for RX */ + struct { + uint64_t + msg_timestamp; /* Timestamp contained in the current message */ + uint32_t msg_flags; /* Flags for the current message */ + }; + + /* Used only for TX */ + struct { + bool in_burst; + bool now; + }; + }; + + uint64_t curr_timestamp; /* Timestamp at the sample we've + * consumed up to */ +}; + +struct bladerf_sync { + MUTEX lock; + struct bladerf *dev; + bool initialized; + sync_state state; + struct buffer_mgmt buf_mgmt; + struct stream_config stream_config; + struct sync_worker *worker; + struct sync_meta meta; +}; + +/** + * Create and initialize as synchronous interface handle for the specified + * device and direction. If the synchronous handle is already initialized, this + * call will first deinitialize it. + * + * The associated stream will be started at the first RX or TX call + * + * @return 0 or BLADERF_ERR_* value on failure + */ +int sync_init(struct bladerf_sync *sync, + struct bladerf *dev, + bladerf_channel_layout layout, + bladerf_format format, + unsigned int num_buffers, + size_t buffer_size, + size_t msg_size, + unsigned int num_transfers, + unsigned int stream_timeout); + +/** + * Deinitialize the sync handle. This tears down and deallocates the underlying + * asynchronous stream. + * + * @param[inout] sync Handle to deinitialize. + */ +void sync_deinit(struct bladerf_sync *sync); + +int sync_rx(struct bladerf_sync *sync, + void *samples, + unsigned int num_samples, + struct bladerf_metadata *metadata, + unsigned int timeout_ms); + +int sync_tx(struct bladerf_sync *sync, + void const *samples, + unsigned int num_samples, + struct bladerf_metadata *metadata, + unsigned int timeout_ms); + +unsigned int sync_buf2idx(struct buffer_mgmt *b, void *addr); + +void *sync_idx2buf(struct buffer_mgmt *b, unsigned int idx); + +#endif diff --git a/Radio/HW/BladeRF/src/streaming/sync_worker.c b/Radio/HW/BladeRF/src/streaming/sync_worker.c new file mode 100644 index 0000000..b2ec806 --- /dev/null +++ b/Radio/HW/BladeRF/src/streaming/sync_worker.c @@ -0,0 +1,532 @@ +/* + * Copyright (C) 2014-2015 Nuand LLC + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include <errno.h> +#include <stdlib.h> +#include <string.h> + +/* Only switch on the verbose debug prints in this file when we *really* want + * them. Otherwise, compile them out to avoid excessive log level checks + * in our data path */ +#include "log.h" +#ifndef ENABLE_LIBBLADERF_SYNC_LOG_VERBOSE +#undef log_verbose +#define log_verbose(...) +#endif +#include "rel_assert.h" +#include "conversions.h" +#include "minmax.h" + +#include "async.h" +#include "sync.h" +#include "sync_worker.h" + +#include "board/board.h" +#include "backend/usb/usb.h" + +#define worker2str(s) (direction2str(s->stream_config.layout & BLADERF_DIRECTION_MASK)) + +void *sync_worker_task(void *arg); + +static void *rx_callback(struct bladerf *dev, + struct bladerf_stream *stream, + struct bladerf_metadata *meta, + void *samples, + size_t num_samples, + void *user_data) +{ + unsigned int requests; /* Pending requests */ + unsigned int next_idx; + unsigned int samples_idx; + void *next_buf = NULL; /* Next buffer to submit for reception */ + + struct bladerf_sync *s = (struct bladerf_sync *)user_data; + struct sync_worker *w = s->worker; + struct buffer_mgmt *b = &s->buf_mgmt; + + /* Check if the caller has requested us to shut down. We'll keep the + * SHUTDOWN bit set through our transition into the IDLE state so we + * can act on it there. */ + MUTEX_LOCK(&w->request_lock); + requests = w->requests; + MUTEX_UNLOCK(&w->request_lock); + + if (requests & SYNC_WORKER_STOP) { + log_verbose("%s worker: Got STOP request upon entering callback. " + "Ending stream.\n", worker2str(s)); + return NULL; + } + + MUTEX_LOCK(&b->lock); + + /* Get the index of the buffer that was just filled */ + samples_idx = sync_buf2idx(b, samples); + + if (b->resubmit_count == 0) { + if (b->status[b->prod_i] == SYNC_BUFFER_EMPTY) { + + /* This buffer is now ready for the consumer */ + b->status[samples_idx] = SYNC_BUFFER_FULL; + b->actual_lengths[samples_idx] = num_samples; + pthread_cond_signal(&b->buf_ready); + + /* Update the state of the buffer being submitted next */ + next_idx = b->prod_i; + b->status[next_idx] = SYNC_BUFFER_IN_FLIGHT; + next_buf = b->buffers[next_idx]; + + /* Advance to the next buffer for the next callback */ + b->prod_i = (next_idx + 1) % b->num_buffers; + + log_verbose("%s worker: buf[%u] = full, buf[%u] = in_flight\n", + worker2str(s), samples_idx, next_idx); + + } else { + /* TODO propagate back the RX Overrun to the sync_rx() caller */ + log_debug("RX overrun @ buffer %u\r\n", samples_idx); + + next_buf = samples; + b->resubmit_count = s->stream_config.num_xfers - 1; + } + } else { + /* We're still recovering from an overrun at this point. Just + * turn around and resubmit this buffer */ + next_buf = samples; + b->resubmit_count--; + log_verbose("Resubmitting buffer %u (%u resubmissions left)\r\n", + samples_idx, b->resubmit_count); + } + + + MUTEX_UNLOCK(&b->lock); + return next_buf; +} + +static void *tx_callback(struct bladerf *dev, + struct bladerf_stream *stream, + struct bladerf_metadata *meta, + void *samples, + size_t num_samples, + void *user_data) +{ + unsigned int requests; /* Pending requests */ + unsigned int completed_idx; /* Index of completed buffer */ + + struct bladerf_sync *s = (struct bladerf_sync *)user_data; + struct sync_worker *w = s->worker; + struct buffer_mgmt *b = &s->buf_mgmt; + + void *ret = BLADERF_STREAM_NO_DATA; + + /* Check if the caller has requested us to shut down. We'll keep the + * SHUTDOWN bit set through our transition into the IDLE state so we + * can act on it there. */ + MUTEX_LOCK(&w->request_lock); + requests = w->requests; + MUTEX_UNLOCK(&w->request_lock); + + if (requests & SYNC_WORKER_STOP) { + log_verbose("%s worker: Got STOP request upon entering callback. " + "Ending stream.\r\n", worker2str(s)); + return NULL; + } + + /* The initial set of callbacks will do not provide us with any + * completed sample buffers */ + if (samples != NULL) { + MUTEX_LOCK(&b->lock); + + /* Mark the completed buffer as being empty */ + completed_idx = sync_buf2idx(b, samples); + assert(b->status[completed_idx] == SYNC_BUFFER_IN_FLIGHT); + b->status[completed_idx] = SYNC_BUFFER_EMPTY; + pthread_cond_signal(&b->buf_ready); + + /* If the callback is assigned to be the submitter, there are + * buffers pending submission */ + if (b->submitter == SYNC_TX_SUBMITTER_CALLBACK) { + assert(b->cons_i != BUFFER_MGMT_INVALID_INDEX); + if (b->status[b->cons_i] == SYNC_BUFFER_FULL) { + /* This buffer is ready to ship out ("consume") */ + log_verbose("%s: Submitting deferred buf[%u]\n", + __FUNCTION__, b->cons_i); + + ret = b->buffers[b->cons_i]; + /* This is actually # of 32bit DWORDs for PACKET_META */ + meta->actual_count = b->actual_lengths[b->cons_i]; + b->status[b->cons_i] = SYNC_BUFFER_IN_FLIGHT; + b->cons_i = (b->cons_i + 1) % b->num_buffers; + } else { + log_verbose("%s: No deferred buffer available. " + "Assigning submitter=FN\n", __FUNCTION__); + + b->submitter = SYNC_TX_SUBMITTER_FN; + b->cons_i = BUFFER_MGMT_INVALID_INDEX; + } + } + + MUTEX_UNLOCK(&b->lock); + + log_verbose("%s worker: Buffer %u emptied.\r\n", + worker2str(s), completed_idx); + } + + return ret; +} + +int sync_worker_init(struct bladerf_sync *s) +{ + int status = 0; + s->worker = (struct sync_worker *)calloc(1, sizeof(*s->worker)); + + if (s->worker == NULL) { + status = BLADERF_ERR_MEM; + goto worker_init_out; + } + + s->worker->state = SYNC_WORKER_STATE_STARTUP; + s->worker->err_code = 0; + + s->worker->cb = + (s->stream_config.layout & BLADERF_DIRECTION_MASK) == BLADERF_RX + ? rx_callback + : tx_callback; + + status = async_init_stream( + &s->worker->stream, s->dev, s->worker->cb, &s->buf_mgmt.buffers, + s->buf_mgmt.num_buffers, s->stream_config.format, + s->stream_config.samples_per_buffer, s->stream_config.num_xfers, s); + + if (status != 0) { + log_debug("%s worker: Failed to init stream: %s\n", worker2str(s), + bladerf_strerror(status)); + goto worker_init_out; + } + + status = async_set_transfer_timeout( + s->worker->stream, + uint_max(s->stream_config.timeout_ms, BULK_TIMEOUT_MS)); + if (status != 0) { + log_debug("%s worker: Failed to set transfer timeout: %s\n", + worker2str(s), bladerf_strerror(status)); + goto worker_init_out; + } + + MUTEX_INIT(&s->worker->state_lock); + MUTEX_INIT(&s->worker->request_lock); + + status = pthread_cond_init(&s->worker->state_changed, NULL); + if (status != 0) { + log_debug("%s worker: pthread_cond_init(state_changed) failed: %d\n", + worker2str(s), status); + status = BLADERF_ERR_UNEXPECTED; + goto worker_init_out; + } + + status = pthread_cond_init(&s->worker->requests_pending, NULL); + if (status != 0) { + log_debug("%s worker: pthread_cond_init(requests_pending) failed: %d\n", + worker2str(s), status); + status = BLADERF_ERR_UNEXPECTED; + goto worker_init_out; + } + + status = pthread_create(&s->worker->thread, NULL, sync_worker_task, s); + if (status != 0) { + log_debug("%s worker: pthread_create failed: %d\n", worker2str(s), + status); + status = BLADERF_ERR_UNEXPECTED; + goto worker_init_out; + } + + /* Wait until the worker thread has initialized and is ready to go */ + status = + sync_worker_wait_for_state(s->worker, SYNC_WORKER_STATE_IDLE, 1000); + if (status != 0) { + log_debug("%s worker: sync_worker_wait_for_state failed: %d\n", + worker2str(s), status); + status = BLADERF_ERR_TIMEOUT; + goto worker_init_out; + } + +worker_init_out: + if (status != 0) { + free(s->worker); + s->worker = NULL; + } + + return status; +} + +void sync_worker_deinit(struct sync_worker *w, + pthread_mutex_t *lock, pthread_cond_t *cond) +{ + int status; + + if (w == NULL) { + log_debug("%s called with NULL ptr\n", __FUNCTION__); + return; + } + + log_verbose("%s: Requesting worker %p to stop...\n", __FUNCTION__, w); + + sync_worker_submit_request(w, SYNC_WORKER_STOP); + + if (lock != NULL && cond != NULL) { + MUTEX_LOCK(lock); + pthread_cond_signal(cond); + MUTEX_UNLOCK(lock); + } + + status = sync_worker_wait_for_state(w, SYNC_WORKER_STATE_STOPPED, 3000); + + if (status != 0) { + log_warning("Timed out while stopping worker. Canceling thread.\n"); + pthread_cancel(w->thread); + } + + pthread_join(w->thread, NULL); + log_verbose("%s: Worker joined.\n", __FUNCTION__); + + async_deinit_stream(w->stream); + + free(w); +} + +void sync_worker_submit_request(struct sync_worker *w, unsigned int request) +{ + MUTEX_LOCK(&w->request_lock); + w->requests |= request; + pthread_cond_signal(&w->requests_pending); + MUTEX_UNLOCK(&w->request_lock); +} + +int sync_worker_wait_for_state(struct sync_worker *w, sync_worker_state state, + unsigned int timeout_ms) +{ + int status = 0; + struct timespec timeout_abs; + const int nsec_per_sec = 1000 * 1000 * 1000; + + if (timeout_ms != 0) { + const unsigned int timeout_sec = timeout_ms / 1000; + + status = clock_gettime(CLOCK_REALTIME, &timeout_abs); + if (status != 0) { + return BLADERF_ERR_UNEXPECTED; + } + + timeout_abs.tv_sec += timeout_sec; + timeout_abs.tv_nsec += (timeout_ms % 1000) * 1000 * 1000; + + if (timeout_abs.tv_nsec >= nsec_per_sec) { + timeout_abs.tv_sec += timeout_abs.tv_nsec / nsec_per_sec; + timeout_abs.tv_nsec %= nsec_per_sec; + } + + MUTEX_LOCK(&w->state_lock); + status = 0; + while (w->state != state && status == 0) { + status = pthread_cond_timedwait(&w->state_changed, + &w->state_lock, + &timeout_abs); + } + MUTEX_UNLOCK(&w->state_lock); + + } else { + MUTEX_LOCK(&w->state_lock); + while (w->state != state) { + log_verbose(": Waiting for state change, current = %d\n", w->state); + status = pthread_cond_wait(&w->state_changed, + &w->state_lock); + } + MUTEX_UNLOCK(&w->state_lock); + } + + if (status != 0) { + log_debug("%s: Wait on state change failed: %s\n", + __FUNCTION__, strerror(status)); + + if (status == ETIMEDOUT) { + status = BLADERF_ERR_TIMEOUT; + } else { + status = BLADERF_ERR_UNEXPECTED; + } + } + + return status; +} + +sync_worker_state sync_worker_get_state(struct sync_worker *w, + int *err_code) +{ + sync_worker_state ret; + + MUTEX_LOCK(&w->state_lock); + ret = w->state; + if (err_code) { + *err_code = w->err_code; + w->err_code = 0; + } + MUTEX_UNLOCK(&w->state_lock); + + return ret; +} + +static void set_state(struct sync_worker *w, sync_worker_state state) +{ + MUTEX_LOCK(&w->state_lock); + w->state = state; + pthread_cond_signal(&w->state_changed); + MUTEX_UNLOCK(&w->state_lock); +} + + +static sync_worker_state exec_idle_state(struct bladerf_sync *s) +{ + sync_worker_state next_state = SYNC_WORKER_STATE_IDLE; + unsigned int requests; + unsigned int i; + + MUTEX_LOCK(&s->worker->request_lock); + + while (s->worker->requests == 0) { + log_verbose("%s worker: Waiting for pending requests\n", worker2str(s)); + + pthread_cond_wait(&s->worker->requests_pending, + &s->worker->request_lock); + } + + requests = s->worker->requests; + s->worker->requests = 0; + MUTEX_UNLOCK(&s->worker->request_lock); + + if (requests & SYNC_WORKER_STOP) { + log_verbose("%s worker: Got request to stop\n", worker2str(s)); + + next_state = SYNC_WORKER_STATE_SHUTTING_DOWN; + + } else if (requests & SYNC_WORKER_START) { + log_verbose("%s worker: Got request to start\n", worker2str(s)); + MUTEX_LOCK(&s->buf_mgmt.lock); + + if ((s->stream_config.layout & BLADERF_DIRECTION_MASK) == BLADERF_TX) { + /* If we've previously timed out on a stream, we'll likely have some + * stale buffers marked "in-flight" that have since been cancelled. */ + for (i = 0; i < s->buf_mgmt.num_buffers; i++) { + if (s->buf_mgmt.status[i] == SYNC_BUFFER_IN_FLIGHT) { + s->buf_mgmt.status[i] = SYNC_BUFFER_EMPTY; + } + } + + pthread_cond_signal(&s->buf_mgmt.buf_ready); + } else { + s->buf_mgmt.prod_i = s->stream_config.num_xfers; + + for (i = 0; i < s->buf_mgmt.num_buffers; i++) { + if (i < s->stream_config.num_xfers) { + s->buf_mgmt.status[i] = SYNC_BUFFER_IN_FLIGHT; + } else if (s->buf_mgmt.status[i] == SYNC_BUFFER_IN_FLIGHT) { + s->buf_mgmt.status[i] = SYNC_BUFFER_EMPTY; + } + } + } + + MUTEX_UNLOCK(&s->buf_mgmt.lock); + + next_state = SYNC_WORKER_STATE_RUNNING; + } else { + log_warning("Invalid request value encountered: 0x%08X\n", + s->worker->requests); + } + + return next_state; +} + +static void exec_running_state(struct bladerf_sync *s) +{ + int status; + + status = async_run_stream(s->worker->stream, s->stream_config.layout); + + log_verbose("%s worker: stream ended with: %s\n", + worker2str(s), bladerf_strerror(status)); + + /* Save off the result of running the stream so we can report what + * happened to the API caller */ + MUTEX_LOCK(&s->worker->state_lock); + s->worker->err_code = status; + MUTEX_UNLOCK(&s->worker->state_lock); + + /* Wake the API-side if an error occurred, so that it can propagate + * the stream error code back to the API caller */ + if (status != 0) { + MUTEX_LOCK(&s->buf_mgmt.lock); + pthread_cond_signal(&s->buf_mgmt.buf_ready); + MUTEX_UNLOCK(&s->buf_mgmt.lock); + } +} + +void *sync_worker_task(void *arg) +{ + sync_worker_state state = SYNC_WORKER_STATE_IDLE; + struct bladerf_sync *s = (struct bladerf_sync *)arg; + + log_verbose("%s worker: task started\n", worker2str(s)); + set_state(s->worker, state); + log_verbose("%s worker: task state set\n", worker2str(s)); + + while (state != SYNC_WORKER_STATE_STOPPED) { + + switch (state) { + case SYNC_WORKER_STATE_STARTUP: + assert(!"Worker in unexpected state, shutting down. (STARTUP)"); + set_state(s->worker, SYNC_WORKER_STATE_SHUTTING_DOWN); + break; + + case SYNC_WORKER_STATE_IDLE: + state = exec_idle_state(s); + set_state(s->worker, state); + break; + + case SYNC_WORKER_STATE_RUNNING: + exec_running_state(s); + state = SYNC_WORKER_STATE_IDLE; + set_state(s->worker, state); + break; + + case SYNC_WORKER_STATE_SHUTTING_DOWN: + log_verbose("%s worker: Shutting down...\n", worker2str(s)); + + state = SYNC_WORKER_STATE_STOPPED; + set_state(s->worker, state); + break; + + case SYNC_WORKER_STATE_STOPPED: + assert(!"Worker in unexpected state: STOPPED"); + break; + + default: + assert(!"Worker in unexpected state, shutting down. (UNKNOWN)"); + set_state(s->worker, SYNC_WORKER_STATE_SHUTTING_DOWN); + break; + } + } + + return NULL; +} diff --git a/Radio/HW/BladeRF/src/streaming/sync_worker.h b/Radio/HW/BladeRF/src/streaming/sync_worker.h new file mode 100644 index 0000000..35d5075 --- /dev/null +++ b/Radio/HW/BladeRF/src/streaming/sync_worker.h @@ -0,0 +1,130 @@ +/* + * Copyright (C) 2014 Nuand LLC + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef STREAMING_SYNC_WORKER_H_ +#define STREAMING_SYNC_WORKER_H_ + +#include "host_config.h" +#include "sync.h" +#include <libbladeRF.h> +#include <pthread.h> + +#if BLADERF_OS_WINDOWS || BLADERF_OS_OSX +#include "clock_gettime.h" +#else +#include <time.h> +#endif + +/* Worker lifetime: + * + * STARTUP --+--> IDLE --> RUNNING --+--> SHUTTING_DOWN --> STOPPED + * ^----------------------/ + */ + +/* Request flags */ +#define SYNC_WORKER_START (1 << 0) +#define SYNC_WORKER_STOP (1 << 1) + +typedef enum { + SYNC_WORKER_STATE_STARTUP, + SYNC_WORKER_STATE_IDLE, + SYNC_WORKER_STATE_RUNNING, + SYNC_WORKER_STATE_SHUTTING_DOWN, + SYNC_WORKER_STATE_STOPPED +} sync_worker_state; + +struct sync_worker { + pthread_t thread; + + struct bladerf_stream *stream; + bladerf_stream_cb cb; + + /* These items should be accessed while holding state_lock */ + sync_worker_state state; + int err_code; + MUTEX state_lock; + pthread_cond_t state_changed; /* Worker thread uses this to inform a + * waiting main thread about a state + * change */ + + /* The requests lock should always be acquired AFTER + * the sync->buf_mgmt.lock + */ + unsigned int requests; + pthread_cond_t requests_pending; + MUTEX request_lock; +}; + +/** + * Create a launch a worker thread. It will enter the IDLE state upon + * executing. + * + * @param s Sync handle containing worker to initialize + * + * @return 0 on success, BLADERF_ERR_* on failure + */ +int sync_worker_init(struct bladerf_sync *s); + +/** + * Shutdown and deinitialize + * + * @param w Worker to deinitialize + * @param[in] lock Acquired to signal `cond` if non-NULL + * @param[in] cond If non-NULL, this is signaled after requesting the + * worker to shut down, waking a potentially blocked + * workers. + */ +void sync_worker_deinit(struct sync_worker *w, + pthread_mutex_t *lock, + pthread_cond_t *cond); + +/** + * Wait for state change with optional timeout + * + * @param w Worker to wait for + * @param[in] state State to wait for + * @param[in] timeout_ms Timeout in ms. 0 implies "wait forever" + * + * @return 0 on success, BLADERF_ERR_TIMEOUT on timeout, BLADERF_ERR_UNKNOWN on + * other errors + */ +int sync_worker_wait_for_state(struct sync_worker *w, + sync_worker_state state, + unsigned int timeout_ms); + +/** + * Get the worker's current state. + * + * @param w Worker to query + * @param[out] err_code Stream error code (libbladeRF error code value). + * Querying this value will reset the interal error + * code value. + * + * @return Worker's current state + */ +sync_worker_state sync_worker_get_state(struct sync_worker *w, int *err_code); + +/** + * Submit a request to the worker task + * + * @param w Worker to send request to + * @param[in] request Bitmask of requests to submit + */ +void sync_worker_submit_request(struct sync_worker *w, unsigned int request); + +#endif |