summaryrefslogtreecommitdiff
path: root/Radio/HW/BladeRF/src/streaming
diff options
context:
space:
mode:
authorArturs Artamonovs <arturs.artamonovs@protonmail.com>2024-11-03 15:56:55 +0000
committerArturs Artamonovs <arturs.artamonovs@protonmail.com>2024-11-03 15:56:55 +0000
commitcf4444e7390365df43ecbd3d130015c1e06ef88f (patch)
tree8a6eb114135a04d5efd5af213577b4fac47532ae /Radio/HW/BladeRF/src/streaming
parentca50c0f64f1b2fce46b4cb83ed111854bac13852 (diff)
downloadPrySDR-cf4444e7390365df43ecbd3d130015c1e06ef88f.tar.gz
PrySDR-cf4444e7390365df43ecbd3d130015c1e06ef88f.zip
BladeRF library compiles
Diffstat (limited to 'Radio/HW/BladeRF/src/streaming')
-rw-r--r--Radio/HW/BladeRF/src/streaming/async.c294
-rw-r--r--Radio/HW/BladeRF/src/streaming/async.h111
-rw-r--r--Radio/HW/BladeRF/src/streaming/format.h109
-rw-r--r--Radio/HW/BladeRF/src/streaming/metadata.h180
-rw-r--r--Radio/HW/BladeRF/src/streaming/sync.c1339
-rw-r--r--Radio/HW/BladeRF/src/streaming/sync.h193
-rw-r--r--Radio/HW/BladeRF/src/streaming/sync_worker.c532
-rw-r--r--Radio/HW/BladeRF/src/streaming/sync_worker.h130
8 files changed, 2888 insertions, 0 deletions
diff --git a/Radio/HW/BladeRF/src/streaming/async.c b/Radio/HW/BladeRF/src/streaming/async.c
new file mode 100644
index 0000000..cffa9d2
--- /dev/null
+++ b/Radio/HW/BladeRF/src/streaming/async.c
@@ -0,0 +1,294 @@
+/*
+ * This file is part of the bladeRF project:
+ * http://www.github.com/nuand/bladeRF
+ *
+ * Copyright (C) 2014 Nuand LLC
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include <stdint.h>
+#include <stdlib.h>
+#include <errno.h>
+
+#include "log.h"
+
+#include "backend/usb/usb.h"
+
+#include "async.h"
+#include "board/board.h"
+#include "helpers/timeout.h"
+#include "helpers/have_cap.h"
+
+int async_init_stream(struct bladerf_stream **stream,
+ struct bladerf *dev,
+ bladerf_stream_cb callback,
+ void ***buffers,
+ size_t num_buffers,
+ bladerf_format format,
+ size_t samples_per_buffer,
+ size_t num_transfers,
+ void *user_data)
+{
+ struct bladerf_stream *lstream;
+ size_t buffer_size_bytes;
+ size_t i;
+ int status = 0;
+
+ if (num_transfers > num_buffers) {
+ log_error("num_transfers must be <= num_buffers\n");
+ return BLADERF_ERR_INVAL;
+ }
+
+ if (samples_per_buffer < 1024 || samples_per_buffer % 1024 != 0) {
+ log_error("samples_per_buffer must be multiples of 1024\n");
+ return BLADERF_ERR_INVAL;
+ }
+
+ /* Create a stream and populate it with the appropriate information */
+ lstream = malloc(sizeof(struct bladerf_stream));
+
+ if (!lstream) {
+ return BLADERF_ERR_MEM;
+ }
+
+ MUTEX_INIT(&lstream->lock);
+
+ if (pthread_cond_init(&lstream->can_submit_buffer, NULL) != 0) {
+ free(lstream);
+ return BLADERF_ERR_UNEXPECTED;
+ }
+
+ if (pthread_cond_init(&lstream->stream_started, NULL) != 0) {
+ free(lstream);
+ return BLADERF_ERR_UNEXPECTED;
+ }
+
+ lstream->dev = dev;
+ lstream->error_code = 0;
+ lstream->state = STREAM_IDLE;
+ lstream->samples_per_buffer = samples_per_buffer;
+ lstream->num_buffers = num_buffers;
+ lstream->format = format;
+ lstream->transfer_timeout = BULK_TIMEOUT_MS;
+ lstream->cb = callback;
+ lstream->user_data = user_data;
+ lstream->buffers = NULL;
+
+ if (format == BLADERF_FORMAT_PACKET_META) {
+ if (!have_cap_dev(dev, BLADERF_CAP_FW_SHORT_PACKET)) {
+ log_error("Firmware does not support short packets. "
+ "Upgrade to at least firmware version 2.4.0.");
+ return BLADERF_ERR_UNSUPPORTED;
+ }
+
+ if (!have_cap_dev(dev, BLADERF_CAP_FPGA_PACKET_META)) {
+ log_error("FPGA does not support packet meta format. "
+ "Upgrade to at least FPGA version 0.12.0 .");
+ return BLADERF_ERR_UNSUPPORTED;
+ }
+ }
+
+ if (format == BLADERF_FORMAT_SC8_Q7 || format == BLADERF_FORMAT_SC8_Q7_META) {
+ if (!have_cap_dev(dev, BLADERF_CAP_FPGA_8BIT_SAMPLES)) {
+ log_error("FPGA does not support 8bit mode. "
+ "Upgrade to at least FPGA version 0.15.0.\n");
+ return BLADERF_ERR_UNSUPPORTED;
+ }
+ }
+
+ switch(format) {
+ case BLADERF_FORMAT_SC8_Q7:
+ case BLADERF_FORMAT_SC8_Q7_META:
+ buffer_size_bytes = sc8q7_to_bytes(samples_per_buffer);
+ break;
+
+ case BLADERF_FORMAT_SC16_Q11:
+ case BLADERF_FORMAT_SC16_Q11_META:
+ buffer_size_bytes = sc16q11_to_bytes(samples_per_buffer);
+ break;
+
+ case BLADERF_FORMAT_PACKET_META:
+ buffer_size_bytes = samples_per_buffer;
+ break;
+
+ default:
+ status = BLADERF_ERR_INVAL;
+ break;
+ }
+
+ if (!status) {
+ lstream->buffers = calloc(num_buffers, sizeof(lstream->buffers[0]));
+ if (lstream->buffers) {
+ for (i = 0; i < num_buffers && !status; i++) {
+ lstream->buffers[i] = calloc(1, buffer_size_bytes);
+ if (!lstream->buffers[i]) {
+ status = BLADERF_ERR_MEM;
+ }
+ }
+ } else {
+ status = BLADERF_ERR_MEM;
+ }
+ }
+
+ /* Clean up everything we've allocated if we hit any errors */
+ if (status) {
+
+ if (lstream->buffers) {
+ for (i = 0; i < num_buffers; i++) {
+ free(lstream->buffers[i]);
+ }
+
+ free(lstream->buffers);
+ }
+
+ free(lstream);
+ } else {
+ /* Perform any backend-specific stream initialization */
+ status = dev->backend->init_stream(lstream, num_transfers);
+
+ if (status < 0) {
+ async_deinit_stream(lstream);
+ *stream = NULL;
+ } else {
+ /* Update the caller's pointers */
+ *stream = lstream;
+
+ if (buffers) {
+ *buffers = lstream->buffers;
+ }
+ }
+ }
+
+ return status;
+}
+
+int async_set_transfer_timeout(struct bladerf_stream *stream,
+ unsigned int transfer_timeout_ms)
+{
+ MUTEX_LOCK(&stream->lock);
+ stream->transfer_timeout = transfer_timeout_ms;
+ MUTEX_UNLOCK(&stream->lock);
+
+ return 0;
+}
+
+int async_get_transfer_timeout(struct bladerf_stream *stream,
+ unsigned int *transfer_timeout_ms)
+{
+ MUTEX_LOCK(&stream->lock);
+ *transfer_timeout_ms = stream->transfer_timeout;
+ MUTEX_UNLOCK(&stream->lock);
+
+ return 0;
+}
+
/*
 * Start running a stream: record the channel layout, mark the stream
 * RUNNING, wake any thread blocked in async_submit_stream_buffer() waiting
 * on stream_started, then enter the backend's stream loop.
 *
 * This call does not return until the backend's stream loop exits.
 * The backend's return value takes precedence; if it is 0, the stream's
 * recorded error_code is returned instead.
 */
int async_run_stream(struct bladerf_stream *stream, bladerf_channel_layout layout)
{
    int status;
    struct bladerf *dev = stream->dev;

    MUTEX_LOCK(&stream->lock);
    stream->layout = layout;
    stream->state = STREAM_RUNNING;
    /* Wake waiters blocked until the stream starts */
    pthread_cond_signal(&stream->stream_started);
    MUTEX_UNLOCK(&stream->lock);

    status = dev->backend->stream(stream, layout);

    /* Backend return value takes precedence over stream error status */
    return status == 0 ? stream->error_code : status;
}
+
/*
 * Submit a buffer to the stream's backend.
 *
 * If the stream is not yet running, this waits for it to start:
 * indefinitely when timeout_ms == 0, otherwise up to timeout_ms. The
 * special BLADERF_STREAM_SHUTDOWN sentinel buffer skips the wait entirely
 * so a shutdown request can always be delivered.
 *
 * Returns 0 on success, BLADERF_ERR_TIMEOUT if the stream did not start in
 * time, BLADERF_ERR_UNEXPECTED on a condvar error, or the backend's status
 * from submit_stream_buffer() (which may be BLADERF_ERR_WOULD_BLOCK when
 * nonblock is true).
 */
int async_submit_stream_buffer(struct bladerf_stream *stream,
                               void *buffer, size_t *length,
                               unsigned int timeout_ms,
                               bool nonblock)
{
    int status = 0;
    struct timespec timeout_abs;

    MUTEX_LOCK(&stream->lock);

    if (buffer != BLADERF_STREAM_SHUTDOWN) {
        /* Compute the absolute deadline once, before the wait loop */
        if (stream->state != STREAM_RUNNING && timeout_ms != 0) {
            status = populate_abs_timeout(&timeout_abs, timeout_ms);
            if (status != 0) {
                log_debug("Failed to populate timeout value\n");
                goto error;
            }
        }

        while (stream->state != STREAM_RUNNING) {
            log_debug("Buffer submitted while stream's not running. "
                      "Waiting for stream to start.\n");

            if (timeout_ms == 0) {
                status = pthread_cond_wait(&stream->stream_started,
                                           &stream->lock);
            } else {
                status = pthread_cond_timedwait(&stream->stream_started,
                                                &stream->lock, &timeout_abs);
            }

            if (status == ETIMEDOUT) {
                status = BLADERF_ERR_TIMEOUT;
                log_debug("%s: %u ms timeout expired",
                          __FUNCTION__, timeout_ms);
                goto error;
            } else if (status != 0) {
                status = BLADERF_ERR_UNEXPECTED;
                goto error;
            }
        }
    }

    /* Hand the buffer to the backend; stream->lock is held, per the
     * contract documented in async.h */
    status = stream->dev->backend->submit_stream_buffer(stream, buffer,
                                                        length, timeout_ms, nonblock);

error:
    MUTEX_UNLOCK(&stream->lock);
    return status;
}
+
/*
 * Tear down a stream created by async_init_stream().
 *
 * Polls (1 s intervals) until the stream reaches STREAM_DONE or
 * STREAM_IDLE, then releases backend state, the sample buffers, the buffer
 * array, and finally the stream handle itself. Safe to call with NULL.
 *
 * NOTE(review): stream->state is read here without holding stream->lock —
 * presumably relying on the polling loop to tolerate a stale read; confirm.
 */
void async_deinit_stream(struct bladerf_stream *stream)
{
    size_t i;

    if (!stream) {
        log_debug("%s called with NULL stream\n", __FUNCTION__);
        return;
    }

    /* Busy-wait (with 1 s sleeps) for the stream to wind down */
    while(stream->state != STREAM_DONE && stream->state != STREAM_IDLE) {
        log_verbose( "Stream not done...\n" );
        usleep(1000000);
    }

    /* Free up the backend data */
    stream->dev->backend->deinit_stream(stream);

    /* Free up the buffers */
    for (i = 0; i < stream->num_buffers; i++) {
        free(stream->buffers[i]);
    }

    /* Free up the pointer to the buffers */
    free(stream->buffers);

    /* Free up the stream itself */
    free(stream);
}
+
diff --git a/Radio/HW/BladeRF/src/streaming/async.h b/Radio/HW/BladeRF/src/streaming/async.h
new file mode 100644
index 0000000..b0fd3b8
--- /dev/null
+++ b/Radio/HW/BladeRF/src/streaming/async.h
@@ -0,0 +1,111 @@
+/*
+ * This file is part of the bladeRF project:
+ * http://www.github.com/nuand/bladeRF
+ *
+ * Copyright (C) 2014 Nuand LLC
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef STREAMING_ASYNC_H_
+#define STREAMING_ASYNC_H_
+
+#include <pthread.h>
+
+#include <libbladeRF.h>
+
+#include "thread.h"
+
+#include "format.h"
+
/* Lifecycle states of an asynchronous stream (see struct bladerf_stream) */
typedef enum {
    STREAM_IDLE,            /* Idle and initialized */
    STREAM_RUNNING,         /* Currently running */
    STREAM_SHUTTING_DOWN,   /* Currently tearing down.
                             * See bladerf_stream->error_code to determine
                             * whether or not the shutdown was a clean exit
                             * or due to an error. */
    STREAM_DONE             /* Done and deallocated */
} bladerf_stream_state;
+
/* State shared between the async API entry points and the backend's
 * stream implementation. Created by async_init_stream(). */
struct bladerf_stream {
    /* These items are configured in async_init_stream() and should only be
     * read (NOT MODIFIED) during the execution of the stream */
    struct bladerf *dev;            /* Owning device handle */
    bladerf_channel_layout layout;  /* Set by async_run_stream() */
    bladerf_format format;          /* Sample format of the buffers */
    unsigned int transfer_timeout;  /* USB transfer timeout, in ms */
    bladerf_stream_cb cb;           /* User-supplied stream callback */
    void *user_data;                /* Opaque pointer handed to cb */
    size_t samples_per_buffer;      /* Buffer size in samples; in bytes
                                     * for BLADERF_FORMAT_PACKET_META */
    size_t num_buffers;             /* Number of entries in `buffers` */
    void **buffers;                 /* Array of sample buffers */

    MUTEX lock;

    /* The following items must be accessed atomically */
    int error_code;                     /* First error seen by the stream */
    bladerf_stream_state state;
    pthread_cond_t can_submit_buffer;
    pthread_cond_t stream_started;      /* Signaled by async_run_stream() */
    void *backend_data;                 /* Backend-specific stream state */
};
+
/* Get the number of bytes per stream buffer.
 * For PACKET_META streams, samples_per_buffer already holds a byte count,
 * so it is returned directly; otherwise it is converted per the format. */
static inline size_t async_stream_buf_bytes(struct bladerf_stream *s)
{
    if (s->format == BLADERF_FORMAT_PACKET_META)
        return s->samples_per_buffer;
    return samples_to_bytes(s->format, s->samples_per_buffer);
}
+
+int async_init_stream(struct bladerf_stream **stream,
+ struct bladerf *dev,
+ bladerf_stream_cb callback,
+ void ***buffers,
+ size_t num_buffers,
+ bladerf_format format,
+ size_t buffer_size,
+ size_t num_transfers,
+ void *user_data);
+
+/* Set the transfer timeout. This acquires stream->lock. */
+int async_set_transfer_timeout(struct bladerf_stream *stream,
+ unsigned int transfer_timeout_ms);
+
+/* Get the transfer timeout. This acquires stream->lock. */
+int async_get_transfer_timeout(struct bladerf_stream *stream,
+ unsigned int *transfer_timeout_ms);
+
+/* Backend code is responsible for acquiring stream->lock in their callbacks */
+int async_run_stream(struct bladerf_stream *stream,
+ bladerf_channel_layout layout);
+
+
+/* This function WILL acquire stream->lock before calling backend code.
+ *
+ * If nonblock=true and no transfers are available, this function shall return
+ * BLADERF_ERR_WOULD_BLOCK.
+ */
+int async_submit_stream_buffer(struct bladerf_stream *stream,
+ void *buffer,
+ size_t *length,
+ unsigned int timeout_ms,
+ bool nonblock);
+
+
+void async_deinit_stream(struct bladerf_stream *stream);
+
+#endif
diff --git a/Radio/HW/BladeRF/src/streaming/format.h b/Radio/HW/BladeRF/src/streaming/format.h
new file mode 100644
index 0000000..95cf5da
--- /dev/null
+++ b/Radio/HW/BladeRF/src/streaming/format.h
@@ -0,0 +1,109 @@
+/*
+ * This file is part of the bladeRF project:
+ * http://www.github.com/nuand/bladeRF
+ *
+ * Copyright (C) 2014 Nuand LLC
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef STREAMING_FORMAT_H_
+#define STREAMING_FORMAT_H_
+
+#include "rel_assert.h"
+
+/*
+ * Convert SC8Q8 samples to bytes
+ */
/* Convert a count of SC8Q7 samples (one int8_t I + one int8_t Q each)
 * to the equivalent number of bytes. */
static inline size_t sc8q7_to_bytes(size_t n_samples)
{
    const size_t bytes_per_sample = sizeof(int8_t) * 2; /* I and Q */

    /* Guard against size_t overflow in the multiplication */
    assert(n_samples <= SIZE_MAX / bytes_per_sample);

    return bytes_per_sample * n_samples;
}
+
+/*
+ * Convert bytes to SC8Q8 samples
+ */
/* Convert a byte count to a count of SC8Q7 samples.
 * The byte count must be an exact multiple of the 2-byte sample size. */
static inline size_t bytes_to_sc8q7(size_t n_bytes)
{
    const size_t bytes_per_sample = sizeof(int8_t) * 2; /* I and Q */

    assert(n_bytes % bytes_per_sample == 0);

    return n_bytes / bytes_per_sample;
}
+
+/*
+ * Convert SC16Q11 samples to bytes
+ */
/* Convert a count of SC16Q11 samples (one int16_t I + one int16_t Q each)
 * to the equivalent number of bytes. */
static inline size_t sc16q11_to_bytes(size_t n_samples)
{
    const size_t bytes_per_sample = sizeof(int16_t) * 2; /* I and Q */

    /* Guard against size_t overflow in the multiplication */
    assert(n_samples <= SIZE_MAX / bytes_per_sample);

    return bytes_per_sample * n_samples;
}
+
+/*
+ * Convert bytes to SC16Q11 samples
+ */
/* Convert a byte count to a count of SC16Q11 samples.
 * The byte count must be an exact multiple of the 4-byte sample size. */
static inline size_t bytes_to_sc16q11(size_t n_bytes)
{
    const size_t bytes_per_sample = sizeof(int16_t) * 2; /* I and Q */

    assert(n_bytes % bytes_per_sample == 0);

    return n_bytes / bytes_per_sample;
}
+
+/* Covert samples to bytes based upon the provided format */
+static inline size_t samples_to_bytes(bladerf_format format, size_t n)
+{
+ switch (format) {
+ case BLADERF_FORMAT_SC8_Q7:
+ case BLADERF_FORMAT_SC8_Q7_META:
+ return sc8q7_to_bytes(n);
+
+ case BLADERF_FORMAT_SC16_Q11:
+ case BLADERF_FORMAT_SC16_Q11_META:
+ return sc16q11_to_bytes(n);
+
+ case BLADERF_FORMAT_PACKET_META:
+ return n*4;
+
+ default:
+ assert(!"Invalid format");
+ return 0;
+ }
+}
+
+/* Convert bytes to samples based upon the provided format */
+static inline size_t bytes_to_samples(bladerf_format format, size_t n)
+{
+ switch (format) {
+ case BLADERF_FORMAT_SC8_Q7:
+ case BLADERF_FORMAT_SC8_Q7_META:
+ return bytes_to_sc8q7(n);
+
+ case BLADERF_FORMAT_SC16_Q11:
+ case BLADERF_FORMAT_SC16_Q11_META:
+ return bytes_to_sc16q11(n);
+
+ case BLADERF_FORMAT_PACKET_META:
+ return (n+3)/4;
+
+ default:
+ assert(!"Invalid format");
+ return 0;
+ }
+}
+
+#endif
diff --git a/Radio/HW/BladeRF/src/streaming/metadata.h b/Radio/HW/BladeRF/src/streaming/metadata.h
new file mode 100644
index 0000000..e268559
--- /dev/null
+++ b/Radio/HW/BladeRF/src/streaming/metadata.h
@@ -0,0 +1,180 @@
+/*
+ * Copyright (C) 2014 Nuand LLC
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef STREAMING_METADATA_H_
+#define STREAMING_METADATA_H_
+
+/*
+ * Metadata layout
+ * ~~~~~~~~~~~~~~~~~~~~~~~
+ *
+ * The FPGA handles data in units of "messages." These messages are
+ * 1024 or 2048 bytes for USB 2.0 (Hi-Speed) or USB 3.0 (SuperSpeed),
+ * respectively.
+ *
+ * The first 16 bytes of the message form a header, which includes metadata
+ * for the samples within the message. This header is shown below:
+ *
+ * +-----------------+
+ * 0x00 | Packet length | 2 bytes, Little-endian uint16_t
+ * +-----------------+
+ * 0x02 | Packet flags | 1 byte
+ * +-----------------+
+ * 0x03 | Packet core ID | 1 byte
+ * +-----------------+
+ * 0x04 | Timestamp | 8 bytes, Little-endian uint64_t
+ * +-----------------+
+ * 0x0c | Flags | 4 bytes, Little-endian uint32_t
+ * +-----------------+
+ *
+ * The term "buffer" is used to describe a block of of data received from or
+ * sent to the device. The size of a "buffer" (in bytes) is always a multiple
+ * of the size of a "message." Said another way, a buffer will always evenly
+ * divide into multiple messages. Messages are *not* fragmented across
+ * consecutive buffers.
+ *
+ * +-----------------+ <-. <-.
+ * | header | | |
+ * +-----------------+ | |
+ * | | | |
+ * | samples | | |
+ * | | | |
+ * +-----------------+ | <-+---- message
+ * | header | |
+ * +-----------------+ |
+ * | | |
+ * | samples | |
+ * | | |
+ * +-----------------+ |
+ * | header | |
+ * +-----------------+ |
+ * | | |
+ * | samples | |
+ * | | |
+ * +-----------------+ |
+ * | header | |
+ * +-----------------+ |
+ * | | |
+ * | samples | |
+ * | | |
+ * +-----------------+ <-+---------- buffer
+ *
+ *
+ * When intentionally transmitting discontinuous groups of samples (such
+ * as bursts), it is important that the last two samples within a message
+ * be (0 + 0j). Otherwise, the DAC will not properly hold its output
+ * at (0 + 0j) for the duration of the discontinuity.
+ */
+
+/* Components of the metadata header */
+#define METADATA_RESV_SIZE (sizeof(uint32_t))
+#define METADATA_TIMESTAMP_SIZE (sizeof(uint64_t))
+#define METADATA_FLAGS_SIZE (sizeof(uint32_t))
+#define METADATA_PACKET_LEN_SIZE (sizeof(uint16_t))
+#define METADATA_PACKET_CORE_SIZE (sizeof(uint8_t))
+#define METADATA_PACKET_FLAGS_SIZE (sizeof(uint8_t))
+
+#define METADATA_RESV_OFFSET 0
+#define METADATA_PACKET_LEN_OFFSET 0
+#define METADATA_PACKET_FLAGS_OFFSET 2
+#define METADATA_PACKET_CORE_OFFSET 3
+#define METADATA_TIMESTAMP_OFFSET (METADATA_RESV_SIZE)
+#define METADATA_FLAGS_OFFSET \
+ (METADATA_TIMESTAMP_OFFSET + METADATA_TIMESTAMP_SIZE)
+
+#define METADATA_HEADER_SIZE (METADATA_FLAGS_OFFSET + METADATA_FLAGS_SIZE)
+
/* Read the 64-bit little-endian timestamp field from a metadata header.
 * memcpy is used to avoid unaligned/aliasing access to the raw buffer. */
static inline uint64_t metadata_get_timestamp(const uint8_t *header)
{
    uint64_t ret;
    assert(sizeof(ret) == METADATA_TIMESTAMP_SIZE);
    memcpy(&ret, &header[METADATA_TIMESTAMP_OFFSET], METADATA_TIMESTAMP_SIZE);

    /* Wire format is little-endian; convert to host byte order */
    ret = LE64_TO_HOST(ret);

    return ret;
}
+
/* Read the 32-bit little-endian flags field from a metadata header. */
static inline uint32_t metadata_get_flags(const uint8_t *header)
{
    uint32_t ret;
    assert(sizeof(ret) == METADATA_FLAGS_SIZE);
    memcpy(&ret, &header[METADATA_FLAGS_OFFSET], METADATA_FLAGS_SIZE);
    return LE32_TO_HOST(ret);
}
+
/* Read the 16-bit little-endian packet length field from a metadata header. */
static inline uint16_t metadata_get_packet_len(const uint8_t *header)
{
    uint16_t ret;
    assert(sizeof(ret) == METADATA_PACKET_LEN_SIZE);
    memcpy(&ret, &header[METADATA_PACKET_LEN_OFFSET], METADATA_PACKET_LEN_SIZE);
    return LE16_TO_HOST(ret);
}
+
+static inline uint8_t metadata_get_packet_core(const uint8_t *header)
+{
+ uint8_t ret;
+ assert(sizeof(ret) == METADATA_PACKET_CORE_SIZE);
+ memcpy(&ret, &header[METADATA_PACKET_CORE_OFFSET], METADATA_PACKET_CORE_SIZE);
+ return ret;
+}
+
+static inline uint8_t metadata_get_packet_flags(const uint8_t *header)
+{
+ uint8_t ret;
+ assert(sizeof(ret) == METADATA_PACKET_FLAGS_SIZE);
+ memcpy(&ret, &header[METADATA_PACKET_FLAGS_OFFSET], METADATA_PACKET_FLAGS_SIZE);
+ return ret;
+}
+
/* Populate a metadata header with packet fields.
 *
 * Multi-byte fields are converted to little-endian wire order and written
 * via memcpy to avoid unaligned stores. Note the packet length/flags/core
 * fields share the first 4 bytes with the reserved word, so the reserved
 * word is zeroed first and the packet fields are written over it. */
static inline void metadata_set_packet(uint8_t *header,
                                       uint64_t timestamp,
                                       uint32_t flags,
                                       uint16_t length,
                                       uint8_t core,
                                       uint8_t pkt_flags)
{
    timestamp = HOST_TO_LE64(timestamp);

    flags = HOST_TO_LE32(flags);

    length = HOST_TO_LE16(length);

    assert(sizeof(timestamp) == METADATA_TIMESTAMP_SIZE);
    assert(sizeof(flags) == METADATA_FLAGS_SIZE);

    /* Clear the reserved word before overlaying the packet fields */
    memset(&header[METADATA_RESV_OFFSET], 0, METADATA_RESV_SIZE);

    memcpy(&header[METADATA_PACKET_LEN_OFFSET], &length, METADATA_PACKET_LEN_SIZE);
    memcpy(&header[METADATA_PACKET_CORE_OFFSET], &core, METADATA_PACKET_CORE_SIZE);
    memcpy(&header[METADATA_PACKET_FLAGS_OFFSET], &pkt_flags, METADATA_PACKET_FLAGS_SIZE);

    memcpy(&header[METADATA_TIMESTAMP_OFFSET], &timestamp,
           METADATA_TIMESTAMP_SIZE);

    memcpy(&header[METADATA_FLAGS_OFFSET], &flags, METADATA_FLAGS_SIZE);
}
+
/* Populate a non-packet metadata header: timestamp and flags only,
 * with the packet-specific fields (length, core, packet flags) zeroed. */
static inline void metadata_set(uint8_t *header,
                                uint64_t timestamp,
                                uint32_t flags)
{
    const uint16_t pkt_len = 0;
    const uint8_t pkt_core = 0;
    const uint8_t pkt_flags = 0;

    metadata_set_packet(header, timestamp, flags, pkt_len, pkt_core, pkt_flags);
}
+
+#endif
diff --git a/Radio/HW/BladeRF/src/streaming/sync.c b/Radio/HW/BladeRF/src/streaming/sync.c
new file mode 100644
index 0000000..feef101
--- /dev/null
+++ b/Radio/HW/BladeRF/src/streaming/sync.c
@@ -0,0 +1,1339 @@
+/*
+ * Copyright (C) 2014-2015 Nuand LLC
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include <string.h>
+#include <errno.h>
+#include <inttypes.h>
+
+/* Only switch on the verbose debug prints in this file when we *really* want
+ * them. Otherwise, compile them out to avoid excessive log level checks
+ * in our data path */
+#include "log.h"
+#ifndef ENABLE_LIBBLADERF_SYNC_LOG_VERBOSE
+#undef log_verbose
+#define log_verbose(...)
+#endif
+#include "minmax.h"
+#include "rel_assert.h"
+
+#include "async.h"
+#include "sync.h"
+#include "sync_worker.h"
+#include "metadata.h"
+
+#include "board/board.h"
+#include "helpers/timeout.h"
+#include "helpers/have_cap.h"
+
+#ifdef ENABLE_LIBBLADERF_SYNC_LOG_VERBOSE
+static inline void dump_buf_states(struct bladerf_sync *s)
+{
+ static char *out = NULL;
+ struct buffer_mgmt *b = &s->buf_mgmt;
+ char *statestr = "UNKNOWN";
+
+ if (out == NULL) {
+ out = malloc((b->num_buffers + 1) * sizeof(char));
+ }
+
+ if (out == NULL) {
+ log_verbose("%s: malloc failed\n");
+ return;
+ }
+
+ out[b->num_buffers] = '\0';
+
+ for (size_t i = 0; i < b->num_buffers; ++i) {
+ switch (b->status[i]) {
+ case SYNC_BUFFER_EMPTY:
+ out[i] = '_';
+ break;
+ case SYNC_BUFFER_IN_FLIGHT:
+ out[i] = '-';
+ break;
+ case SYNC_BUFFER_FULL:
+ out[i] = '*';
+ break;
+ case SYNC_BUFFER_PARTIAL:
+ out[i] = 'o';
+ break;
+ }
+ }
+
+ switch (s->state) {
+ case SYNC_STATE_BUFFER_READY:
+ statestr = "BUFFER_READY";
+ break;
+ case SYNC_STATE_CHECK_WORKER:
+ statestr = "CHECK_WORKER";
+ break;
+ case SYNC_STATE_RESET_BUF_MGMT:
+ statestr = "RESET_BUF_MGMT";
+ break;
+ case SYNC_STATE_START_WORKER:
+ statestr = "START_WORKER";
+ break;
+ case SYNC_STATE_USING_BUFFER:
+ statestr = "USING_BUFFER";
+ break;
+ case SYNC_STATE_USING_BUFFER_META:
+ statestr = "USING_BUFFER_META";
+ break;
+ case SYNC_STATE_USING_PACKET_META:
+ statestr = "USING_PACKET_META";
+ break;
+ case SYNC_STATE_WAIT_FOR_BUFFER:
+ statestr = "WAIT_FOR_BUFFER";
+ break;
+ }
+
+ log_verbose("%s: %s (%s)\n", __FUNCTION__, out, statestr);
+}
+#else
+#define dump_buf_states(...)
+#endif // ENABLE_LIBBLADERF_SYNC_LOG_VERBOSE
+
+static inline size_t samples2bytes(struct bladerf_sync *s, size_t n) {
+ return s->stream_config.bytes_per_sample * n;
+}
+
/* Number of messages per buffer, where buf_size is in samples and
 * msg_size is in bytes (msg_size / bytes_per_sample = samples per msg). */
static inline unsigned int msg_per_buf(size_t msg_size, size_t buf_size,
                                       size_t bytes_per_sample)
{
    const size_t samples_per_message = msg_size / bytes_per_sample;
    const size_t count = buf_size / samples_per_message;

    assert(count <= UINT_MAX);
    return (unsigned int) count;
}
+
+static inline unsigned int samples_per_msg(size_t msg_size,
+ size_t bytes_per_sample)
+{
+ size_t n = (msg_size - METADATA_HEADER_SIZE) / bytes_per_sample;
+ assert(n <= UINT_MAX);
+ return (unsigned int) n;
+}
+
/*
 * Initialize a synchronous-interface handle over the async stream layer.
 *
 * Validates the format against device capabilities, sizes the buffer
 * management state for the requested direction, and starts the worker
 * thread via sync_worker_init(). buffer_size is in samples; msg_size in
 * bytes. Returns 0 on success or a BLADERF_ERR_* value; on failure all
 * partially-initialized state is released via sync_deinit().
 *
 * NOTE(review): this calls sync_deinit(sync) up front, which reads
 * sync->initialized — presumably the caller zero-initializes *sync before
 * the first call; confirm.
 */
int sync_init(struct bladerf_sync *sync,
              struct bladerf *dev,
              bladerf_channel_layout layout,
              bladerf_format format,
              unsigned int num_buffers,
              size_t buffer_size,
              size_t msg_size,
              unsigned int num_transfers,
              unsigned int stream_timeout)

{
    int status = 0;
    size_t i, bytes_per_sample;

    /* At least one buffer must remain available to the consumer */
    if (num_transfers >= num_buffers) {
        return BLADERF_ERR_INVAL;
    }

    if (format == BLADERF_FORMAT_PACKET_META) {
        if (!have_cap_dev(dev, BLADERF_CAP_FW_SHORT_PACKET)) {
            log_error("Firmware does not support short packets. "
                      "Upgrade to at least firmware version 2.4.0.\n");
            return BLADERF_ERR_UNSUPPORTED;
        }

        if (!have_cap_dev(dev, BLADERF_CAP_FPGA_PACKET_META)) {
            log_error("FPGA does not support packet meta format. "
                      "Upgrade to at least FPGA version 0.12.0.\n");
            return BLADERF_ERR_UNSUPPORTED;
        }
    }

    if (format == BLADERF_FORMAT_SC8_Q7 || format == BLADERF_FORMAT_SC8_Q7_META) {
        if (!have_cap_dev(dev, BLADERF_CAP_FPGA_8BIT_SAMPLES)) {
            log_error("FPGA does not support 8bit mode. "
                      "Upgrade to at least FPGA version 0.15.0.\n");
            return BLADERF_ERR_UNSUPPORTED;
        }
    }

    /* Bytes per sample: 2 for 8-bit I/Q, 4 for 16-bit I/Q and packet mode */
    switch (format) {
        case BLADERF_FORMAT_SC8_Q7:
        case BLADERF_FORMAT_SC8_Q7_META:
            bytes_per_sample = 2;
            break;

        case BLADERF_FORMAT_SC16_Q11:
        case BLADERF_FORMAT_SC16_Q11_META:
        case BLADERF_FORMAT_PACKET_META:
            bytes_per_sample = 4;
            break;

        default:
            log_debug("Invalid format value: %d\n", format);
            return BLADERF_ERR_INVAL;
    }

    /* bladeRF GPIF DMA requirement */
    if ((bytes_per_sample * buffer_size) % 4096 != 0) {
        assert(!"Invalid buffer size");
        return BLADERF_ERR_INVAL;
    }

    /* Deinitialize sync handle if it's initialized */
    sync_deinit(sync);

    MUTEX_INIT(&sync->lock);

    /* NOTE(review): the RX arm assigns SYNC_TX_SUBMITTER_INVALID — this
     * appears intentional (RX has no TX submitter) but the naming is
     * confusing; verify against sync_worker usage. */
    switch (layout & BLADERF_DIRECTION_MASK) {
        case BLADERF_TX:
            sync->buf_mgmt.submitter = SYNC_TX_SUBMITTER_FN;
            break;
        case BLADERF_RX:
            sync->buf_mgmt.submitter = SYNC_TX_SUBMITTER_INVALID;
            break;
    }

    sync->dev = dev;
    sync->state = SYNC_STATE_CHECK_WORKER;

    sync->buf_mgmt.num_buffers = num_buffers;
    sync->buf_mgmt.resubmit_count = 0;

    sync->stream_config.layout = layout;
    sync->stream_config.format = format;
    sync->stream_config.samples_per_buffer = (unsigned int)buffer_size;
    sync->stream_config.num_xfers = num_transfers;
    sync->stream_config.timeout_ms = stream_timeout;
    sync->stream_config.bytes_per_sample = bytes_per_sample;

    sync->meta.state = SYNC_META_STATE_HEADER;
    sync->meta.msg_size = msg_size;
    sync->meta.msg_per_buf = msg_per_buf(msg_size, buffer_size, bytes_per_sample);
    sync->meta.samples_per_msg = samples_per_msg(msg_size, bytes_per_sample);
    /* Two samples share one timestamp in dual-channel (X2) layouts */
    sync->meta.samples_per_ts = (layout == BLADERF_RX_X2 || layout == BLADERF_TX_X2) ? 2:1;

    log_verbose("%s: Buffer size (in bytes): %u\n",
                __FUNCTION__, buffer_size * bytes_per_sample);

    log_verbose("%s: Buffer size (in samples): %u\n",
                __FUNCTION__, buffer_size);

    log_verbose("%s: Msg per buffer: %u\n",
                __FUNCTION__, sync->meta.msg_per_buf);

    log_verbose("%s: Samples per msg: %u\n",
                __FUNCTION__, sync->meta.samples_per_msg);

    MUTEX_INIT(&sync->buf_mgmt.lock);
    pthread_cond_init(&sync->buf_mgmt.buf_ready, NULL);

    sync->buf_mgmt.status = (sync_buffer_status*) malloc(num_buffers * sizeof(sync_buffer_status));
    if (sync->buf_mgmt.status == NULL) {
        status = BLADERF_ERR_MEM;
        goto error;
    }

    sync->buf_mgmt.actual_lengths = (size_t *) malloc(num_buffers * sizeof(size_t));
    if (sync->buf_mgmt.actual_lengths == NULL) {
        status = BLADERF_ERR_MEM;
        goto error;
    }

    switch (layout & BLADERF_DIRECTION_MASK) {
        case BLADERF_RX:
            /* When starting up an RX stream, the first 'num_transfers'
             * transfers will be submitted to the USB layer to grab data */
            sync->buf_mgmt.prod_i = num_transfers;
            sync->buf_mgmt.cons_i = 0;
            sync->buf_mgmt.partial_off = 0;

            for (i = 0; i < num_buffers; i++) {
                if (i < num_transfers) {
                    sync->buf_mgmt.status[i] = SYNC_BUFFER_IN_FLIGHT;
                } else {
                    sync->buf_mgmt.status[i] = SYNC_BUFFER_EMPTY;
                }
            }

            sync->meta.msg_timestamp = 0;
            sync->meta.msg_flags = 0;

            break;

        case BLADERF_TX:
            /* TX starts with all buffers empty; nothing to consume yet */
            sync->buf_mgmt.prod_i = 0;
            sync->buf_mgmt.cons_i = BUFFER_MGMT_INVALID_INDEX;
            sync->buf_mgmt.partial_off = 0;

            for (i = 0; i < num_buffers; i++) {
                sync->buf_mgmt.status[i] = SYNC_BUFFER_EMPTY;
            }

            sync->meta.msg_timestamp = 0;
            sync->meta.in_burst = false;
            sync->meta.now = false;
            break;
    }

    status = sync_worker_init(sync);
    if (status < 0) {
        goto error;
    }

    sync->initialized = true;

    return 0;

error:
    sync_deinit(sync);
    return status;
}
+
/*
 * Tear down a sync handle initialized by sync_init().
 *
 * For TX streams a shutdown buffer is submitted first so the worker's
 * stream can wind down, then the worker is stopped and buffer-management
 * resources are released. A no-op if sync->initialized is false.
 */
void sync_deinit(struct bladerf_sync *sync)
{
    if (sync->initialized) {
        if ((sync->stream_config.layout & BLADERF_DIRECTION_MASK) == BLADERF_TX) {
            /* Deliver the shutdown sentinel to the TX stream */
            async_submit_stream_buffer(sync->worker->stream,
                                       BLADERF_STREAM_SHUTDOWN, NULL, 0, false);
        }

        sync_worker_deinit(sync->worker, &sync->buf_mgmt.lock,
                           &sync->buf_mgmt.buf_ready);

        if (sync->buf_mgmt.actual_lengths) {
            free(sync->buf_mgmt.actual_lengths);
        }
        /* De-allocate our buffer management resources */
        if (sync->buf_mgmt.status) {
            MUTEX_DESTROY(&sync->buf_mgmt.lock);
            free(sync->buf_mgmt.status);
        }

        MUTEX_DESTROY(&sync->lock);

        sync->initialized = false;
    }
}
+
/*
 * Block on b->buf_ready until the worker signals a buffer state change.
 *
 * Must be called with b->lock held (pthread_cond_wait releases and
 * re-acquires it). timeout_ms == 0 waits indefinitely. dbg_name/dbg_idx
 * are used only for log messages.
 *
 * Returns 0 on wakeup, BLADERF_ERR_TIMEOUT on expiry, or
 * BLADERF_ERR_UNEXPECTED on any other condvar failure.
 */
static int wait_for_buffer(struct buffer_mgmt *b,
                           unsigned int timeout_ms,
                           const char *dbg_name,
                           unsigned int dbg_idx)
{
    int status;
    struct timespec timeout;

    if (timeout_ms == 0) {
        log_verbose("%s: Infinite wait for buffer[%d] (status: %d).\n",
                    dbg_name, dbg_idx, b->status[dbg_idx]);
        status = pthread_cond_wait(&b->buf_ready, &b->lock);
    } else {
        log_verbose("%s: Timed wait for buffer[%d] (status: %d).\n", dbg_name,
                    dbg_idx, b->status[dbg_idx]);
        status = populate_abs_timeout(&timeout, timeout_ms);
        if (status == 0) {
            status = pthread_cond_timedwait(&b->buf_ready, &b->lock, &timeout);
        }
    }

    /* Map pthread/helper errors onto libbladeRF error codes */
    if (status == ETIMEDOUT) {
        log_error("%s: Timed out waiting for buf_ready after %d ms\n",
                  __FUNCTION__, timeout_ms);
        status = BLADERF_ERR_TIMEOUT;
    } else if (status != 0) {
        status = BLADERF_ERR_UNEXPECTED;
    }

    return status;
}
+
+#ifndef SYNC_WORKER_START_TIMEOUT_MS
+# define SYNC_WORKER_START_TIMEOUT_MS 250
+#endif
+
/* Returns # of timestamps (or time steps) left in a message.
 * curr_msg_off is measured in timestamp steps here; with X2 layouts
 * (samples_per_ts == 2) two samples advance one step. */
static inline unsigned int ts_remaining(struct bladerf_sync *s)
{
    size_t ret = s->meta.samples_per_msg / s->meta.samples_per_ts - s->meta.curr_msg_off;
    assert(ret <= UINT_MAX);

    return (unsigned int) ret;
}
+
+/* Returns # of samples left in a message (SC16Q11 mode only) */
+static inline unsigned int left_in_msg(struct bladerf_sync *s)
+{
+ size_t ret = s->meta.samples_per_msg - s->meta.curr_msg_off;
+ assert(ret <= UINT_MAX);
+
+ return (unsigned int) ret;
+}
+
/* Release the current RX consumer buffer back to the empty pool and
 * advance the consumer index (circularly) to the next buffer.
 * Caller is expected to hold b->lock — TODO confirm against call sites. */
static inline void advance_rx_buffer(struct buffer_mgmt *b)
{
    log_verbose("%s: Marking buf[%u] empty.\n", __FUNCTION__, b->cons_i);

    b->status[b->cons_i] = SYNC_BUFFER_EMPTY;
    b->cons_i = (b->cons_i + 1) % b->num_buffers;
}
+
+/* Convert a (sample-denominated) count into the number of whole messages
+ * it spans */
+static inline unsigned int timestamp_to_msg(struct bladerf_sync *s, uint64_t t)
+{
+    const uint64_t msg_count = t / s->meta.samples_per_msg;
+
+    assert(msg_count <= UINT_MAX);
+    return (unsigned int) msg_count;
+}
+
+/* Synchronous RX entry point.
+ *
+ * Copies num_samples samples from worker-produced stream buffers into the
+ * caller's samples area, driving the RX state machine (check/start worker,
+ * wait for a buffer, consume it) until the request is satisfied, an
+ * error/timeout occurs, or a timestamp discontinuity forces an early return
+ * (metadata formats set BLADERF_META_STATUS_OVERRUN in that case).
+ *
+ * s->lock serializes the entire call; buf_mgmt state is additionally guarded
+ * by s->buf_mgmt.lock while inspected or modified.
+ *
+ * user_meta must be non-NULL for the *_META formats; its timestamp field is
+ * the target for timed reception unless BLADERF_META_FLAG_RX_NOW is set.
+ * timeout_ms == 0 means wait indefinitely.
+ *
+ * @return 0 on success, BLADERF_ERR_* value on failure
+ */
+int sync_rx(struct bladerf_sync *s, void *samples, unsigned num_samples,
+            struct bladerf_metadata *user_meta, unsigned int timeout_ms)
+{
+    struct buffer_mgmt *b;
+
+    int status = 0;
+    bool exit_early = false;
+    bool copied_data = false;
+    unsigned int samples_returned = 0;
+    uint8_t *samples_dest = (uint8_t*)samples;
+    uint8_t *buf_src = NULL;
+    unsigned int samples_to_copy = 0;
+    unsigned int samples_per_buffer = 0;
+    uint64_t target_timestamp = UINT64_MAX;
+    unsigned int pkt_len_dwords = 0;
+
+    if (s == NULL || samples == NULL) {
+        log_debug("NULL pointer passed to %s\n", __FUNCTION__);
+        return BLADERF_ERR_INVAL;
+    } else if (!s->initialized) {
+        return BLADERF_ERR_INVAL;
+    }
+
+    /* Requests must be a whole number of time steps (e.g. an even sample
+     * count for x2 channel layouts) */
+    if (num_samples % s->meta.samples_per_ts != 0) {
+        log_debug("%s: %u samples %% %u channels != 0\n",
+                  __FUNCTION__, num_samples, s->meta.samples_per_ts);
+        return BLADERF_ERR_INVAL;
+    }
+
+    MUTEX_LOCK(&s->lock);
+
+    if (s->stream_config.format == BLADERF_FORMAT_SC16_Q11_META ||
+        s->stream_config.format == BLADERF_FORMAT_SC8_Q7_META ||
+        s->stream_config.format == BLADERF_FORMAT_PACKET_META) {
+        if (user_meta == NULL) {
+            log_debug("NULL metadata pointer passed to %s\n", __FUNCTION__);
+            status = BLADERF_ERR_INVAL;
+            goto out;
+        } else {
+            user_meta->status = 0;
+            target_timestamp = user_meta->timestamp;
+        }
+    }
+
+    b = &s->buf_mgmt;
+    samples_per_buffer = s->stream_config.samples_per_buffer;
+
+    log_verbose("%s: Requests %u samples.\n", __FUNCTION__, num_samples);
+
+    while (!exit_early && samples_returned < num_samples && status == 0) {
+        dump_buf_states(s);
+
+        switch (s->state) {
+            case SYNC_STATE_CHECK_WORKER: {
+                int stream_error;
+                sync_worker_state worker_state =
+                    sync_worker_get_state(s->worker, &stream_error);
+
+                /* Propagate stream error back to the caller.
+                 * They can call this function again to restart the stream and
+                 * try again.
+                 */
+                if (stream_error != 0) {
+                    status = stream_error;
+                } else {
+                    if (worker_state == SYNC_WORKER_STATE_IDLE) {
+                        log_debug("%s: Worker is idle. Going to reset buf "
+                                  "mgmt.\n", __FUNCTION__);
+                        s->state = SYNC_STATE_RESET_BUF_MGMT;
+                    } else if (worker_state == SYNC_WORKER_STATE_RUNNING) {
+                        s->state = SYNC_STATE_WAIT_FOR_BUFFER;
+                    } else {
+                        status = BLADERF_ERR_UNEXPECTED;
+                        log_debug("%s: Unexpected worker state=%d\n",
+                                  __FUNCTION__, worker_state);
+                    }
+                }
+
+                break;
+            }
+
+            case SYNC_STATE_RESET_BUF_MGMT:
+                MUTEX_LOCK(&b->lock);
+                /* When the RX stream starts up, it will submit the first T
+                 * transfers, so the consumer index must be reset to 0 */
+                b->cons_i = 0;
+                MUTEX_UNLOCK(&b->lock);
+                log_debug("%s: Reset buf_mgmt consumer index\n", __FUNCTION__);
+                s->state = SYNC_STATE_START_WORKER;
+                break;
+
+
+            case SYNC_STATE_START_WORKER:
+                sync_worker_submit_request(s->worker, SYNC_WORKER_START);
+
+                status = sync_worker_wait_for_state(
+                    s->worker,
+                    SYNC_WORKER_STATE_RUNNING,
+                    SYNC_WORKER_START_TIMEOUT_MS);
+
+                if (status == 0) {
+                    s->state = SYNC_STATE_WAIT_FOR_BUFFER;
+                    log_debug("%s: Worker is now running.\n", __FUNCTION__);
+                } else {
+                    log_debug("%s: Failed to start worker, (%d)\n",
+                              __FUNCTION__, status);
+                }
+                break;
+
+            case SYNC_STATE_WAIT_FOR_BUFFER:
+                MUTEX_LOCK(&b->lock);
+
+                /* Check the buffer state, as the worker may have produced one
+                 * since we last queried the status */
+                if (b->status[b->cons_i] == SYNC_BUFFER_FULL) {
+                    s->state = SYNC_STATE_BUFFER_READY;
+                    log_verbose("%s: buffer %u is ready to consume\n",
+                                __FUNCTION__, b->cons_i);
+                } else {
+                    status = wait_for_buffer(b, timeout_ms,
+                                             __FUNCTION__, b->cons_i);
+
+                    if (status == 0) {
+                        /* A wakeup does not guarantee our buffer filled;
+                         * fall back to querying the worker if it did not */
+                        if (b->status[b->cons_i] != SYNC_BUFFER_FULL) {
+                            s->state = SYNC_STATE_CHECK_WORKER;
+                        } else {
+                            s->state = SYNC_STATE_BUFFER_READY;
+                            log_verbose("%s: buffer %u is ready to consume\n",
+                                        __FUNCTION__, b->cons_i);
+                        }
+                    }
+                }
+
+                MUTEX_UNLOCK(&b->lock);
+                break;
+
+            case SYNC_STATE_BUFFER_READY:
+                MUTEX_LOCK(&b->lock);
+                b->status[b->cons_i] = SYNC_BUFFER_PARTIAL;
+                b->partial_off = 0;
+
+                switch (s->stream_config.format) {
+                    case BLADERF_FORMAT_SC16_Q11:
+                    case BLADERF_FORMAT_SC8_Q7:
+                        s->state = SYNC_STATE_USING_BUFFER;
+                        break;
+
+                    case BLADERF_FORMAT_SC16_Q11_META:
+                    case BLADERF_FORMAT_SC8_Q7_META:
+                        s->state = SYNC_STATE_USING_BUFFER_META;
+                        s->meta.curr_msg_off = 0;
+                        s->meta.msg_num = 0;
+                        break;
+
+                    case BLADERF_FORMAT_PACKET_META:
+                        s->state = SYNC_STATE_USING_PACKET_META;
+                        break;
+
+                    default:
+                        assert(!"Invalid stream format");
+                        status = BLADERF_ERR_UNEXPECTED;
+                }
+
+                MUTEX_UNLOCK(&b->lock);
+                break;
+
+            case SYNC_STATE_USING_BUFFER: /* SC16Q11 buffers w/o metadata */
+                MUTEX_LOCK(&b->lock);
+
+                buf_src = (uint8_t*)b->buffers[b->cons_i];
+
+                samples_to_copy = uint_min(num_samples - samples_returned,
+                                           samples_per_buffer - b->partial_off);
+
+                memcpy(samples_dest + samples2bytes(s, samples_returned),
+                       buf_src + samples2bytes(s, b->partial_off),
+                       samples2bytes(s, samples_to_copy));
+
+                b->partial_off += samples_to_copy;
+                samples_returned += samples_to_copy;
+
+                log_verbose("%s: Provided %u samples to caller\n",
+                            __FUNCTION__, samples_to_copy);
+
+                /* We've finished consuming this buffer and can start looking
+                 * for available samples in the next buffer */
+                if (b->partial_off >= samples_per_buffer) {
+
+                    /* Check for symptom of out-of-bounds accesses */
+                    assert(b->partial_off == samples_per_buffer);
+
+                    advance_rx_buffer(b);
+                    s->state = SYNC_STATE_WAIT_FOR_BUFFER;
+                }
+
+                MUTEX_UNLOCK(&b->lock);
+                break;
+
+
+            case SYNC_STATE_USING_BUFFER_META: /* SC16Q11 buffers w/ metadata */
+                MUTEX_LOCK(&b->lock);
+
+                switch (s->meta.state) {
+                    case SYNC_META_STATE_HEADER:
+
+                        assert(s->meta.msg_num < s->meta.msg_per_buf);
+
+                        buf_src = (uint8_t*)b->buffers[b->cons_i];
+
+                        s->meta.curr_msg =
+                            buf_src + s->meta.msg_size * s->meta.msg_num;
+
+                        s->meta.msg_timestamp =
+                            metadata_get_timestamp(s->meta.curr_msg);
+
+                        s->meta.msg_flags =
+                            metadata_get_flags(s->meta.curr_msg);
+
+                        /* Surface hardware-reported conditions to the caller */
+                        user_meta->status |= s->meta.msg_flags &
+                                             (BLADERF_META_FLAG_RX_HW_UNDERFLOW |
+                                              BLADERF_META_FLAG_RX_HW_MINIEXP1 |
+                                              BLADERF_META_FLAG_RX_HW_MINIEXP2);
+
+                        s->meta.curr_msg_off = 0;
+
+                        /* We've encountered a discontinuity and need to return
+                         * what we have so far, setting the status flags */
+                        if (copied_data &&
+                            s->meta.msg_timestamp != s->meta.curr_timestamp) {
+
+                            user_meta->status |= BLADERF_META_STATUS_OVERRUN;
+                            exit_early = true;
+                            log_debug("Sample discontinuity detected @ "
+                                      "buffer %u, message %u: Expected t=%llu, "
+                                      "got t=%llu\n",
+                                      b->cons_i, s->meta.msg_num,
+                                      (unsigned long long)s->meta.curr_timestamp,
+                                      (unsigned long long)s->meta.msg_timestamp);
+
+                        } else {
+                            log_verbose("Got header for message %u: "
+                                        "t_new=%u, t_old=%u\n",
+                                        s->meta.msg_num,
+                                        s->meta.msg_timestamp,
+                                        s->meta.curr_timestamp);
+                        }
+
+                        s->meta.curr_timestamp = s->meta.msg_timestamp;
+                        s->meta.state = SYNC_META_STATE_SAMPLES;
+                        break;
+
+                    case SYNC_META_STATE_SAMPLES:
+                        if (!copied_data &&
+                            (user_meta->flags & BLADERF_META_FLAG_RX_NOW) == 0 &&
+                            target_timestamp < s->meta.curr_timestamp) {
+
+                            log_debug("Current timestamp is %llu, "
+                                      "target=%llu (user=%llu)\n",
+                                      (unsigned long long)s->meta.curr_timestamp,
+                                      (unsigned long long)target_timestamp,
+                                      (unsigned long long)user_meta->timestamp);
+
+                            status = BLADERF_ERR_TIME_PAST;
+                        } else if ((user_meta->flags & BLADERF_META_FLAG_RX_NOW) ||
+                                   target_timestamp == s->meta.curr_timestamp) {
+
+                            /* Copy the requested amount up to the end of
+                             * this message in the current buffer */
+                            samples_to_copy =
+                                uint_min(num_samples - samples_returned,
+                                         left_in_msg(s));
+
+                            memcpy(samples_dest + samples2bytes(s, samples_returned),
+                                   s->meta.curr_msg +
+                                       METADATA_HEADER_SIZE +
+                                       samples2bytes(s, s->meta.curr_msg_off),
+                                   samples2bytes(s, samples_to_copy));
+
+                            samples_returned += samples_to_copy;
+                            s->meta.curr_msg_off += samples_to_copy;
+
+                            if (!copied_data &&
+                                (user_meta->flags & BLADERF_META_FLAG_RX_NOW)) {
+
+                                /* Provide the user with the timestamp at the
+                                 * first returned sample when the
+                                 * NOW flag has been provided */
+                                user_meta->timestamp = s->meta.curr_timestamp;
+                                log_verbose("Updated user meta timestamp with: "
+                                            "%llu\n", (unsigned long long)
+                                            user_meta->timestamp);
+                            }
+
+                            copied_data = true;
+
+                            s->meta.curr_timestamp += samples_to_copy / s->meta.samples_per_ts;
+
+                            /* We've begun copying samples, so our target will
+                             * just keep tracking the current timestamp. */
+                            target_timestamp = s->meta.curr_timestamp;
+
+                            log_verbose("After copying samples, t=%llu\n",
+                                        (unsigned long long)s->meta.curr_timestamp);
+
+                            if (left_in_msg(s) == 0) {
+                                assert(s->meta.curr_msg_off == s->meta.samples_per_msg);
+
+                                s->meta.state = SYNC_META_STATE_HEADER;
+                                s->meta.msg_num++;
+
+                                if (s->meta.msg_num >= s->meta.msg_per_buf) {
+                                    assert(s->meta.msg_num == s->meta.msg_per_buf);
+                                    advance_rx_buffer(b);
+                                    s->meta.msg_num = 0;
+                                    s->state = SYNC_STATE_WAIT_FOR_BUFFER;
+                                }
+                            }
+
+                        } else {
+                            /* Target timestamp is in the future: seek forward,
+                             * discarding whole buffers/messages where possible */
+                            const uint64_t time_delta = target_timestamp - s->meta.curr_timestamp;
+                            uint64_t samples_left = time_delta * s->meta.samples_per_ts;
+                            uint64_t left_in_buffer =
+                                (uint64_t) s->meta.samples_per_msg *
+                                (s->meta.msg_per_buf - s->meta.msg_num);
+
+                            /* Account for current position in buffer */
+                            left_in_buffer -= s->meta.curr_msg_off;
+
+                            if (samples_left >= left_in_buffer) {
+                                /* Discard the remainder of this buffer */
+                                advance_rx_buffer(b);
+                                s->state = SYNC_STATE_WAIT_FOR_BUFFER;
+                                s->meta.state = SYNC_META_STATE_HEADER;
+
+                                log_verbose("%s: Discarding rest of buffer.\n",
+                                            __FUNCTION__);
+
+                            } else if (time_delta <= ts_remaining(s)) {
+                                /* Fast forward within the current message */
+                                assert(time_delta <= SIZE_MAX);
+
+                                s->meta.curr_msg_off += (size_t)samples_left;
+                                s->meta.curr_timestamp += time_delta;
+
+                                log_verbose("%s: Seeking within message (t=%llu)\n",
+                                            __FUNCTION__,
+                                            s->meta.curr_timestamp);
+                            } else {
+                                s->meta.state = SYNC_META_STATE_HEADER;
+                                s->meta.msg_num += timestamp_to_msg(s, samples_left);
+
+                                log_verbose("%s: Seeking to message %u.\n",
+                                            __FUNCTION__, s->meta.msg_num);
+                            }
+                        }
+                        break;
+
+                    default:
+                        assert(!"Invalid state");
+                        status = BLADERF_ERR_UNEXPECTED;
+                }
+
+                MUTEX_UNLOCK(&b->lock);
+                break;
+
+            case SYNC_STATE_USING_PACKET_META: /* Packet buffers w/ metadata */
+                MUTEX_LOCK(&b->lock);
+
+                buf_src = (uint8_t*)b->buffers[b->cons_i];
+
+                pkt_len_dwords = metadata_get_packet_len(buf_src);
+
+                /* NOTE(review): one packet satisfies the entire request;
+                 * samples_returned is bumped by num_samples to end the loop,
+                 * and actual_count carries the packet length instead */
+                if (pkt_len_dwords > 0) {
+                    samples_returned += num_samples;
+                    user_meta->actual_count = pkt_len_dwords;
+                    memcpy(samples_dest, buf_src + METADATA_HEADER_SIZE, samples2bytes(s, pkt_len_dwords));
+                }
+
+                advance_rx_buffer(b);
+                s->state = SYNC_STATE_WAIT_FOR_BUFFER;
+                MUTEX_UNLOCK(&b->lock);
+                break;
+
+
+        }
+    }
+
+    if (user_meta && s->stream_config.format != BLADERF_FORMAT_PACKET_META) {
+        user_meta->actual_count = samples_returned;
+    }
+
+out:
+    MUTEX_UNLOCK(&s->lock);
+
+    return status;
+}
+
+/* Submit the just-filled TX buffer (b->prod_i) for async transmission and
+ * advance the producer index.
+ *
+ * Assumes buffer lock (b->lock) is held by the caller. The lock is dropped
+ * and re-acquired around async_submit_stream_buffer(), which may block.
+ *
+ * @return 0 on success (including the expected WOULD_BLOCK deferral case),
+ *         or a BLADERF_ERR_* value from submission
+ */
+static int advance_tx_buffer(struct bladerf_sync *s, struct buffer_mgmt *b)
+{
+    int status = 0;
+    const unsigned int idx = b->prod_i;
+
+    if (b->submitter == SYNC_TX_SUBMITTER_FN) {
+        /* Mark buffer in flight because we're going to send it out.
+         * This ensures that if the callback fires before this function
+         * completes, its state will be correct. */
+        b->status[idx] = SYNC_BUFFER_IN_FLIGHT;
+
+        /* This call may block and it results in a per-stream lock being held,
+         * so the buffer lock must be dropped.
+         *
+         * A callback may occur in the meantime, but this will not touch the
+         * status for this buffer, or the producer index.
+         */
+        MUTEX_UNLOCK(&b->lock);
+        size_t len;
+        if (s->stream_config.format == BLADERF_FORMAT_PACKET_META) {
+            len = b->actual_lengths[idx];
+        } else {
+            len = async_stream_buf_bytes(s->worker->stream);
+        }
+        status = async_submit_stream_buffer(s->worker->stream,
+                                            b->buffers[idx],
+                                            &len,
+                                            s->stream_config.timeout_ms,
+                                            true);
+        MUTEX_LOCK(&b->lock);
+
+        if (status == 0) {
+            log_verbose("%s: buf[%u] submitted.\n",
+                        __FUNCTION__, idx);
+
+        } else if (status == BLADERF_ERR_WOULD_BLOCK) {
+            log_verbose("%s: Deferring buf[%u] submission to worker callback.\n",
+                        __FUNCTION__, idx);
+
+            /* Mark this buffer as being full of data, but not in flight */
+            b->status[idx] = SYNC_BUFFER_FULL;
+
+            /* Assign callback the duty of submitting deferred buffers,
+             * and use buffer_mgmt.cons_i to denote which it should submit
+             * (i.e., consume). */
+            b->submitter = SYNC_TX_SUBMITTER_CALLBACK;
+            b->cons_i = idx;
+
+            /* This is expected and we are handling it. Don't propagate this
+             * status back up */
+            status = 0;
+        } else {
+            /* Unmark this as being in flight */
+            b->status[idx] = SYNC_BUFFER_FULL;
+
+            log_debug("%s: Failed to submit buf[%u].\n", __FUNCTION__, idx);
+            return status;
+        }
+    } else {
+        /* We are not submitting this buffer; this is deferred to the worker
+         * call back. Just update its state to being full of samples. */
+        b->status[idx] = SYNC_BUFFER_FULL;
+    }
+
+    /* Advance "producer" insertion index. */
+    b->prod_i = (idx + 1) % b->num_buffers;
+
+    /* Determine our next state based upon the state of the next buffer we
+     * want to use. */
+    if (b->status[b->prod_i] == SYNC_BUFFER_EMPTY) {
+        /* Buffer is empty and ready for use */
+        s->state = SYNC_STATE_BUFFER_READY;
+    } else {
+        /* We'll have to wait on this buffer to become ready. First, we'll
+         * verify that the worker is running. */
+        s->state = SYNC_STATE_CHECK_WORKER;
+    }
+
+    return status;
+}
+
+/* Report (and log) whether the caller-supplied timestamp has already been
+ * passed by the stream's current timestamp */
+static inline bool timestamp_in_past(struct bladerf_metadata *user_meta,
+                                     struct bladerf_sync *s)
+{
+    if (user_meta->timestamp >= s->meta.curr_timestamp) {
+        return false;
+    }
+
+    log_debug("Provided timestamp=%"PRIu64" is in past: current=%"PRIu64"\n",
+              user_meta->timestamp, s->meta.curr_timestamp);
+
+    return true;
+}
+
+/* Per-call TX behaviors derived from the caller's metadata flags
+ * (see handle_tx_parameters) */
+struct tx_options {
+    bool flush;    /* BURST_END: zero-fill the remainder of the message(s) */
+    bool zero_pad; /* UPDATE_TIMESTAMP: zero-fill up to a new timestamp */
+};
+
+/* Validate and apply the TX metadata flags supplied by the caller.
+ *
+ * Updates the burst state in s->meta (in_burst, now, curr_timestamp) and
+ * fills in options (flush on BURST_END, zero_pad on UPDATE_TIMESTAMP).
+ * Only the SC16_Q11_META format is inspected; other formats pass through.
+ *
+ * @return 0 on success, BLADERF_ERR_INVAL on inconsistent flags or NULL
+ *         metadata, BLADERF_ERR_TIME_PAST if a requested timestamp has
+ *         already elapsed
+ */
+static inline int handle_tx_parameters(struct bladerf_metadata *user_meta,
+                                       struct bladerf_sync *s,
+                                       struct tx_options *options)
+{
+    if (s->stream_config.format == BLADERF_FORMAT_SC16_Q11_META) {
+        if (user_meta == NULL) {
+            log_debug("NULL metadata pointer passed to %s\n", __FUNCTION__);
+            return BLADERF_ERR_INVAL;
+        }
+
+        if (user_meta->flags & BLADERF_META_FLAG_TX_BURST_START) {
+            bool now = user_meta->flags & BLADERF_META_FLAG_TX_NOW;
+
+            if (s->meta.in_burst) {
+                log_debug("%s: BURST_START provided while already in a burst.\n",
+                          __FUNCTION__);
+                return BLADERF_ERR_INVAL;
+            } else if (!now && timestamp_in_past(user_meta, s)) {
+                return BLADERF_ERR_TIME_PAST;
+            }
+
+            s->meta.in_burst = true;
+            if (now) {
+                s->meta.now = true;
+                log_verbose("%s: Starting burst \"now\"\n", __FUNCTION__);
+            } else {
+                s->meta.curr_timestamp = user_meta->timestamp;
+                log_verbose("%s: Starting burst @ %llu\n", __FUNCTION__,
+                            (unsigned long long)s->meta.curr_timestamp);
+            }
+
+            if (user_meta->flags & BLADERF_META_FLAG_TX_UPDATE_TIMESTAMP) {
+                log_debug("UPDATE_TIMESTAMP ignored; BURST_START flag was used.\n");
+            }
+
+        } else if (user_meta->flags & BLADERF_META_FLAG_TX_NOW) {
+            log_debug("%s: TX_NOW was specified without BURST_START.\n",
+                      __FUNCTION__);
+            return BLADERF_ERR_INVAL;
+        } else if (user_meta->flags & BLADERF_META_FLAG_TX_UPDATE_TIMESTAMP) {
+            if (timestamp_in_past(user_meta, s)) {
+                return BLADERF_ERR_TIME_PAST;
+            } else {
+                options->zero_pad = true;
+            }
+        }
+
+        user_meta->status = 0;
+    }
+
+    return 0;
+}
+
+/* Synchronous TX entry point.
+ *
+ * Buffers num_samples caller samples into stream buffers and submits each
+ * filled buffer for async transmission, driving the TX state machine
+ * (check/start worker, wait for an empty buffer, fill it). For the
+ * SC16_Q11_META format, burst semantics (BURST_START/END, TX_NOW,
+ * UPDATE_TIMESTAMP) are handled via handle_tx_parameters(); a BURST_END
+ * causes the remaining message contents to be zero-flushed.
+ *
+ * s->lock serializes the entire call; buf_mgmt state is additionally
+ * guarded by s->buf_mgmt.lock. timeout_ms == 0 means wait indefinitely.
+ *
+ * @return 0 on success, BLADERF_ERR_* value on failure
+ */
+int sync_tx(struct bladerf_sync *s,
+            void const *samples,
+            unsigned int num_samples,
+            struct bladerf_metadata *user_meta,
+            unsigned int timeout_ms)
+{
+    struct buffer_mgmt *b = NULL;
+
+    int status = 0;
+    unsigned int samples_written = 0;
+    unsigned int samples_to_copy = 0;
+    unsigned int samples_per_buffer = 0;
+    uint8_t const *samples_src = (uint8_t const *)samples;
+    uint8_t *buf_dest = NULL;
+    struct tx_options op = {
+        FIELD_INIT(.flush, false), FIELD_INIT(.zero_pad, false),
+    };
+
+    log_verbose("%s: called for %u samples.\n", __FUNCTION__, num_samples);
+
+    if (s == NULL || samples == NULL || !s->initialized) {
+        return BLADERF_ERR_INVAL;
+    }
+
+    MUTEX_LOCK(&s->lock);
+
+    status = handle_tx_parameters(user_meta, s, &op);
+    if (status != 0) {
+        goto out;
+    }
+
+    b = &s->buf_mgmt;
+    samples_per_buffer = s->stream_config.samples_per_buffer;
+
+    /* op.flush keeps the loop running past the last caller sample so the
+     * current message/buffer can be zero-filled and submitted */
+    while (status == 0 && ((samples_written < num_samples) || op.flush)) {
+        switch (s->state) {
+            case SYNC_STATE_CHECK_WORKER: {
+                int stream_error;
+                sync_worker_state worker_state =
+                    sync_worker_get_state(s->worker, &stream_error);
+
+                if (stream_error != 0) {
+                    status = stream_error;
+                } else {
+                    if (worker_state == SYNC_WORKER_STATE_IDLE) {
+                        /* No need to reset any buffer management for TX since
+                         * the TX stream does not submit an initial set of
+                         * buffers. Therefore the RESET_BUF_MGMT state is
+                         * skipped here. */
+                        s->state = SYNC_STATE_START_WORKER;
+                    } else {
+                        /* Worker is running - continue onto checking for and
+                         * potentially waiting for an available buffer */
+                        s->state = SYNC_STATE_WAIT_FOR_BUFFER;
+                    }
+                }
+                break;
+            }
+
+            case SYNC_STATE_RESET_BUF_MGMT:
+                assert(!"Bug");
+                break;
+
+            case SYNC_STATE_START_WORKER:
+                sync_worker_submit_request(s->worker, SYNC_WORKER_START);
+
+                status = sync_worker_wait_for_state(
+                    s->worker, SYNC_WORKER_STATE_RUNNING,
+                    SYNC_WORKER_START_TIMEOUT_MS);
+
+                if (status == 0) {
+                    s->state = SYNC_STATE_WAIT_FOR_BUFFER;
+                    log_debug("%s: Worker is now running.\n", __FUNCTION__);
+                }
+                break;
+
+            case SYNC_STATE_WAIT_FOR_BUFFER:
+                MUTEX_LOCK(&b->lock);
+
+                /* Check the buffer state, as the worker may have consumed one
+                 * since we last queried the status */
+                if (b->status[b->prod_i] == SYNC_BUFFER_EMPTY) {
+                    s->state = SYNC_STATE_BUFFER_READY;
+                } else {
+                    status =
+                        wait_for_buffer(b, timeout_ms, __FUNCTION__, b->prod_i);
+                }
+
+                MUTEX_UNLOCK(&b->lock);
+                break;
+
+            case SYNC_STATE_BUFFER_READY:
+                MUTEX_LOCK(&b->lock);
+                b->status[b->prod_i] = SYNC_BUFFER_PARTIAL;
+                b->partial_off = 0;
+
+                switch (s->stream_config.format) {
+                    case BLADERF_FORMAT_SC16_Q11:
+                    case BLADERF_FORMAT_SC8_Q7:
+                        s->state = SYNC_STATE_USING_BUFFER;
+                        break;
+
+                    case BLADERF_FORMAT_SC16_Q11_META:
+                    case BLADERF_FORMAT_SC8_Q7_META:
+                        s->state = SYNC_STATE_USING_BUFFER_META;
+                        s->meta.curr_msg_off = 0;
+                        s->meta.msg_num = 0;
+                        break;
+
+                    case BLADERF_FORMAT_PACKET_META:
+                        s->state = SYNC_STATE_USING_PACKET_META;
+                        s->meta.curr_msg_off = 0;
+                        s->meta.msg_num = 0;
+                        break;
+
+                    default:
+                        assert(!"Invalid stream format");
+                        status = BLADERF_ERR_UNEXPECTED;
+                }
+
+                MUTEX_UNLOCK(&b->lock);
+                break;
+
+
+            case SYNC_STATE_USING_BUFFER:
+                MUTEX_LOCK(&b->lock);
+
+                buf_dest = (uint8_t *)b->buffers[b->prod_i];
+                samples_to_copy = uint_min(num_samples - samples_written,
+                                           samples_per_buffer - b->partial_off);
+
+                memcpy(buf_dest + samples2bytes(s, b->partial_off),
+                       samples_src + samples2bytes(s, samples_written),
+                       samples2bytes(s, samples_to_copy));
+
+                b->partial_off += samples_to_copy;
+                samples_written += samples_to_copy;
+
+                log_verbose("%s: Buffered %u samples from caller\n",
+                            __FUNCTION__, samples_to_copy);
+
+                if (b->partial_off >= samples_per_buffer) {
+                    /* Check for symptom of out-of-bounds accesses */
+                    assert(b->partial_off == samples_per_buffer);
+
+                    /* Submit buffer and advance to the next one */
+                    status = advance_tx_buffer(s, b);
+                }
+
+                MUTEX_UNLOCK(&b->lock);
+                break;
+
+            case SYNC_STATE_USING_BUFFER_META: /* SC16Q11 buffers w/ metadata */
+                MUTEX_LOCK(&b->lock);
+
+                switch (s->meta.state) {
+                    case SYNC_META_STATE_HEADER:
+                        buf_dest = (uint8_t *)b->buffers[b->prod_i];
+
+                        s->meta.curr_msg =
+                            buf_dest + s->meta.msg_size * s->meta.msg_num;
+
+                        log_verbose("%s: Set curr_msg to: %p (buf @ %p)\n",
+                                    __FUNCTION__, s->meta.curr_msg, buf_dest);
+
+                        s->meta.curr_msg_off = 0;
+
+                        /* A "now" burst carries timestamp 0 in its header */
+                        if (s->meta.now) {
+                            metadata_set(s->meta.curr_msg, 0, 0);
+                        } else {
+                            metadata_set(s->meta.curr_msg,
+                                         s->meta.curr_timestamp, 0);
+                        }
+
+                        s->meta.state = SYNC_META_STATE_SAMPLES;
+
+                        log_verbose("%s: Filled in header (t=%llu)\n",
+                                    __FUNCTION__,
+                                    (unsigned long long)s->meta.curr_timestamp);
+                        break;
+
+                    case SYNC_META_STATE_SAMPLES:
+                        if (op.zero_pad) {
+                            const uint64_t delta =
+                                user_meta->timestamp - s->meta.curr_timestamp;
+
+                            size_t to_zero;
+
+                            log_verbose("%s: User requested zero padding to "
+                                        "t=%" PRIu64 " (%" PRIu64 " + %" PRIu64
+                                        ")\n",
+                                        __FUNCTION__, user_meta->timestamp,
+                                        s->meta.curr_timestamp, delta);
+
+                            if (delta < left_in_msg(s)) {
+                                to_zero = (size_t)delta;
+
+                                log_verbose("%s: Padded subset of msg "
+                                            "(%" PRIu64 " samples)\n",
+                                            __FUNCTION__, (uint64_t)to_zero);
+                            } else {
+                                to_zero = left_in_msg(s);
+
+                                log_verbose("%s: Padded remainder of msg "
+                                            "(%" PRIu64 " samples)\n",
+                                            __FUNCTION__, (uint64_t)to_zero);
+                            }
+
+                            memset(s->meta.curr_msg + METADATA_HEADER_SIZE +
+                                       samples2bytes(s, s->meta.curr_msg_off),
+                                   0, samples2bytes(s, to_zero));
+
+                            s->meta.curr_msg_off += to_zero;
+
+                            /* If we're going to supply the FPGA with a
+                             * discontinuity, it is required that the last three
+                             * samples provided be zero in order to hold the
+                             * DAC @ (0 + 0j).
+                             *
+                             * See "Figure 9: TX data interface" in the LMS6002D
+                             * data sheet for the register stages that create
+                             * this requirement.
+                             *
+                             * If we're ending a burst with < 3 zeros samples at
+                             * the end of the message, we'll need to continue
+                             * onto the next message. At this next message,
+                             * we'll either encounter the requested timestamp or
+                             * zero-fill the message to fulfil this "three zero
+                             * sample" requirement, and set the timestamp
+                             * appropriately at the following message.
+                             */
+                            if (to_zero < 3 && left_in_msg(s) == 0) {
+                                s->meta.curr_timestamp += to_zero;
+                                log_verbose("Ended msg with < 3 zero samples. "
+                                            "Padding into next message.\n");
+                            } else {
+                                s->meta.curr_timestamp = user_meta->timestamp;
+                                op.zero_pad = false;
+                            }
+                        }
+
+                        samples_to_copy = uint_min(
+                            num_samples - samples_written, left_in_msg(s));
+
+                        if (samples_to_copy != 0) {
+                            /* We have user data to copy into the current
+                             * message within the buffer */
+                            memcpy(s->meta.curr_msg + METADATA_HEADER_SIZE +
+                                       samples2bytes(s, s->meta.curr_msg_off),
+                                   samples_src +
+                                       samples2bytes(s, samples_written),
+                                   samples2bytes(s, samples_to_copy));
+
+                            s->meta.curr_msg_off += samples_to_copy;
+                            /* x2 layouts advance one timestamp per 2 samples */
+                            if (s->stream_config.layout == BLADERF_TX_X2)
+                                s->meta.curr_timestamp += samples_to_copy / 2;
+                            else
+                                s->meta.curr_timestamp += samples_to_copy;
+
+                            samples_written += samples_to_copy;
+
+                            log_verbose("%s: Copied %u samples. "
+                                        "Current message offset is now: %u\n",
+                                        __FUNCTION__, samples_to_copy,
+                                        s->meta.curr_msg_off);
+                        }
+
+                        if (left_in_msg(s) != 0 && op.flush) {
+                            /* We're ending this buffer early and need to
+                             * flush the remaining samples by setting all
+                             * samples in the messages to (0 + 0j) */
+                            const unsigned int to_zero = left_in_msg(s);
+
+                            const size_t off =
+                                METADATA_HEADER_SIZE +
+                                samples2bytes(s, s->meta.curr_msg_off);
+
+                            /* If we're here, we should have already copied
+                             * all requested data to the buffer */
+                            assert(num_samples == samples_written);
+
+                            memset(s->meta.curr_msg + off, 0,
+                                   samples2bytes(s, to_zero));
+
+                            log_verbose(
+                                "%s: Flushed %u samples @ %u (0x%08x)\n",
+                                __FUNCTION__, to_zero, s->meta.curr_msg_off,
+                                off);
+
+                            s->meta.curr_msg_off += to_zero;
+                            s->meta.curr_timestamp += to_zero;
+                        }
+
+                        if (left_in_msg(s) == 0) {
+                            s->meta.msg_num++;
+                            s->meta.state = SYNC_META_STATE_HEADER;
+
+                            log_verbose("%s: Advancing to next message (%u)\n",
+                                        __FUNCTION__, s->meta.msg_num);
+                        }
+
+                        if (s->meta.msg_num >= s->meta.msg_per_buf) {
+                            assert(s->meta.msg_num == s->meta.msg_per_buf);
+
+                            /* Submit buffer of samples for transmission */
+                            status = advance_tx_buffer(s, b);
+
+                            s->meta.msg_num = 0;
+                            s->state = SYNC_STATE_WAIT_FOR_BUFFER;
+
+                            /* We want to clear the flush flag if we've written
+                             * all of our data, but keep it set if we have more
+                             * data and need wrap around to another buffer */
+                            op.flush =
+                                op.flush && (samples_written != num_samples);
+                        }
+
+                        break;
+
+                    default:
+                        assert(!"Invalid state");
+                        status = BLADERF_ERR_UNEXPECTED;
+                }
+
+                MUTEX_UNLOCK(&b->lock);
+                break;
+
+            case SYNC_STATE_USING_PACKET_META: /* Packet buffers w/ metadata */
+                MUTEX_LOCK(&b->lock);
+
+                buf_dest = (uint8_t *)b->buffers[b->prod_i];
+
+                /* NOTE(review): payload copy assumes 4 bytes per unit
+                 * (32-bit dwords) while the recorded length below uses
+                 * samples2bytes() — confirm these agree for all formats
+                 * that can reach this state */
+                memcpy(buf_dest + METADATA_HEADER_SIZE, samples_src, num_samples*4);
+
+                b->actual_lengths[b->prod_i] = samples2bytes(s, num_samples) + METADATA_HEADER_SIZE;
+
+                metadata_set_packet(buf_dest, 0, 0, num_samples, 0, 0);
+
+                samples_written = num_samples;
+
+                status = advance_tx_buffer(s, b);
+
+                s->meta.msg_num = 0;
+                s->state = SYNC_STATE_WAIT_FOR_BUFFER;
+
+                MUTEX_UNLOCK(&b->lock);
+                break;
+
+        }
+    }
+
+    if (status == 0 &&
+        s->stream_config.format == BLADERF_FORMAT_SC16_Q11_META &&
+        (user_meta->flags & BLADERF_META_FLAG_TX_BURST_END)) {
+        s->meta.in_burst = false;
+        s->meta.now = false;
+    }
+
+out:
+    MUTEX_UNLOCK(&s->lock);
+
+    return status;
+}
+
+/* Map a stream buffer address back to its index within b->buffers[].
+ * The address must belong to one of the stream's buffers. */
+unsigned int sync_buf2idx(struct buffer_mgmt *b, void *addr)
+{
+    unsigned int idx = 0;
+
+    while (idx < b->num_buffers) {
+        if (addr == b->buffers[idx]) {
+            return idx;
+        }
+        idx++;
+    }
+
+    assert(!"Bug: Buffer not found.");
+
+    /* Assertions are intended to always remain on. If someone turned them
+     * off, do the best we can...complain loudly and clobber a buffer */
+    log_critical("Bug: Buffer not found.");
+    return 0;
+}
diff --git a/Radio/HW/BladeRF/src/streaming/sync.h b/Radio/HW/BladeRF/src/streaming/sync.h
new file mode 100644
index 0000000..8977791
--- /dev/null
+++ b/Radio/HW/BladeRF/src/streaming/sync.h
@@ -0,0 +1,193 @@
+/*
+ * This file is part of the bladeRF project:
+ * http://www.github.com/nuand/bladeRF
+ *
+ * Copyright (C) 2014 Nuand LLC
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef STREAMING_SYNC_H_
+#define STREAMING_SYNC_H_
+
+#include <limits.h>
+#include <pthread.h>
+
+#include <libbladeRF.h>
+
+#include "thread.h"
+
+/* These parameters are only written during sync_init */
+struct stream_config {
+    bladerf_format format;         /* Sample format (SC16Q11/SC8Q7/*_META/...) */
+    bladerf_channel_layout layout; /* Channel layout (direction, x1/x2) */
+
+    unsigned int samples_per_buffer; /* Size of each stream buffer, in samples */
+    unsigned int num_xfers;          /* Number of transfers */
+    unsigned int timeout_ms;         /* Timeout for async buffer submission */
+
+    size_t bytes_per_sample;         /* Size of a single sample, in bytes */
+};
+
+/* Lifecycle state of an individual stream buffer; stored per-buffer in
+ * buffer_mgmt.status[] and guarded by buffer_mgmt.lock */
+typedef enum {
+    SYNC_BUFFER_EMPTY = 0,      /**< Buffer contains no data */
+    SYNC_BUFFER_PARTIAL,        /**< sync_rx/tx is currently emptying/filling */
+    SYNC_BUFFER_FULL,           /**< Buffer is full of data */
+    SYNC_BUFFER_IN_FLIGHT,      /**< Currently being transferred */
+} sync_buffer_status;
+
+/* Which portion of a metadata-format message is being processed */
+typedef enum {
+    SYNC_META_STATE_HEADER,     /**< Extract the metadata header */
+    SYNC_META_STATE_SAMPLES,    /**< Process samples */
+} sync_meta_state;
+
+/* Denotes which context submits filled TX buffers to the async layer */
+typedef enum {
+    /** Invalid selection */
+    SYNC_TX_SUBMITTER_INVALID = -1,
+
+    /** sync_tx() is responsible for submitting buffers for async transfer */
+    SYNC_TX_SUBMITTER_FN,
+
+    /** The TX worker callbacks should be returning buffers for submission */
+    SYNC_TX_SUBMITTER_CALLBACK
+} sync_tx_submitter;
+
+#define BUFFER_MGMT_INVALID_INDEX (UINT_MAX)
+
+/* Shared producer/consumer state for the stream buffers, passed between
+ * the sync_rx/sync_tx callers and the worker's stream callbacks.
+ * All fields are guarded by `lock` unless noted otherwise. */
+struct buffer_mgmt {
+    sync_buffer_status *status;  /* Per-buffer state (num_buffers entries) */
+    size_t *actual_lengths;      /* Per-buffer submission length, in bytes
+                                  * (used by the PACKET_META format) */
+
+    void **buffers;              /* Sample buffers shared with the async layer */
+    unsigned int num_buffers;
+
+    unsigned int prod_i;         /**< Producer index - next buffer to fill */
+    unsigned int cons_i;         /**< Consumer index - next buffer to empty */
+    unsigned int partial_off;    /**< Current index into partial buffer */
+
+    /* In the event of a SW RX overrun, this count is used to determine
+     * how many more transfers should be considered invalid and require
+     * resubmission */
+    unsigned int resubmit_count;
+
+    /* Applicable to TX only. Denotes which context is responsible for
+     * submitting full buffers to the underlying async system */
+    sync_tx_submitter submitter;
+
+
+    MUTEX lock;
+    pthread_cond_t buf_ready;    /**< Buffer produced by RX callback, or
+                                  *   buffer emptied by TX callback */
+};
+
+/* State of API-side sync interface (the state machine driven by
+ * sync_rx()/sync_tx()) */
+typedef enum {
+    SYNC_STATE_CHECK_WORKER,      /* Query worker state; propagate errors */
+    SYNC_STATE_RESET_BUF_MGMT,    /* RX only: reset consumer index to 0 */
+    SYNC_STATE_START_WORKER,      /* Request the worker to start streaming */
+    SYNC_STATE_WAIT_FOR_BUFFER,   /* Block until a buffer is usable */
+    SYNC_STATE_BUFFER_READY,      /* Buffer available; dispatch on format */
+    SYNC_STATE_USING_BUFFER,      /* Copying samples (no metadata) */
+    SYNC_STATE_USING_PACKET_META, /* Copying packets (PACKET_META format) */
+    SYNC_STATE_USING_BUFFER_META  /* Copying samples + metadata headers */
+} sync_state;
+
+/* Bookkeeping for walking the per-message metadata layout of a stream
+ * buffer (the *_META formats) */
+struct sync_meta {
+    sync_meta_state state; /* State of metadata processing */
+
+    uint8_t *curr_msg;     /* Points to current message in the buffer */
+    size_t curr_msg_off;   /* Offset into current message (samples),
+                            * ignoring the 4-samples worth of metadata */
+    size_t msg_size;       /* Size of data message, in bytes (spacing between
+                            * consecutive message headers in the buffer) */
+    unsigned int msg_per_buf;     /* Number of data messages per buffer */
+    unsigned int msg_num;         /* Which message within the buffer are we in?
+                                   * Range is: 0 to msg_per_buf */
+    unsigned int samples_per_msg; /* Number of samples within a message */
+    unsigned int samples_per_ts;  /* Number of samples within a timestamp */
+
+    union {
+        /* Used only for RX */
+        struct {
+            uint64_t
+                msg_timestamp;  /* Timestamp contained in the current message */
+            uint32_t msg_flags; /* Flags for the current message */
+        };
+
+        /* Used only for TX */
+        struct {
+            bool in_burst;      /* A burst is currently open */
+            bool now;           /* Burst was started with TX_NOW */
+        };
+    };
+
+    uint64_t curr_timestamp; /* Timestamp at the sample we've
+                              * consumed up to */
+};
+
+/* Top-level handle for one direction's synchronous stream interface */
+struct bladerf_sync {
+    MUTEX lock;                  /* Serializes sync_rx()/sync_tx() calls */
+    struct bladerf *dev;         /* Associated device handle */
+    bool initialized;            /* Set by sync_init, cleared by sync_deinit */
+    sync_state state;            /* Current state-machine state */
+    struct buffer_mgmt buf_mgmt; /* Buffer producer/consumer bookkeeping */
+    struct stream_config stream_config; /* Immutable after sync_init */
+    struct sync_worker *worker;  /* Worker thread driving the async stream */
+    struct sync_meta meta;       /* Metadata-format bookkeeping */
+};
+
+/**
+ * Create and initialize a synchronous interface handle for the specified
+ * device and direction. If the synchronous handle is already initialized, this
+ * call will first deinitialize it.
+ *
+ * The associated stream will be started at the first RX or TX call
+ *
+ * @return 0 or BLADERF_ERR_* value on failure
+ */
+int sync_init(struct bladerf_sync *sync,
+              struct bladerf *dev,
+              bladerf_channel_layout layout,
+              bladerf_format format,
+              unsigned int num_buffers,
+              size_t buffer_size,
+              size_t msg_size,
+              unsigned int num_transfers,
+              unsigned int stream_timeout);
+
+/**
+ * Deinitialize the sync handle. This tears down and deallocates the underlying
+ * asynchronous stream.
+ *
+ * @param[inout] sync Handle to deinitialize.
+ */
+void sync_deinit(struct bladerf_sync *sync);
+
+/**
+ * Retrieve samples from the RX stream, blocking as needed.
+ * metadata is required for the *_META formats; timeout_ms == 0 waits forever.
+ *
+ * @return 0 or BLADERF_ERR_* value on failure
+ */
+int sync_rx(struct bladerf_sync *sync,
+            void *samples,
+            unsigned int num_samples,
+            struct bladerf_metadata *metadata,
+            unsigned int timeout_ms);
+
+/**
+ * Submit samples to the TX stream, blocking as needed.
+ * metadata carries burst flags for the *_META formats.
+ *
+ * @return 0 or BLADERF_ERR_* value on failure
+ */
+int sync_tx(struct bladerf_sync *sync,
+            void const *samples,
+            unsigned int num_samples,
+            struct bladerf_metadata *metadata,
+            unsigned int timeout_ms);
+
+/* Look up the index of the stream buffer with the provided address.
+ * The address must belong to one of the stream's buffers. */
+unsigned int sync_buf2idx(struct buffer_mgmt *b, void *addr);
+
+/* Return the address of the stream buffer at the provided index */
+void *sync_idx2buf(struct buffer_mgmt *b, unsigned int idx);
+
+#endif
diff --git a/Radio/HW/BladeRF/src/streaming/sync_worker.c b/Radio/HW/BladeRF/src/streaming/sync_worker.c
new file mode 100644
index 0000000..b2ec806
--- /dev/null
+++ b/Radio/HW/BladeRF/src/streaming/sync_worker.c
@@ -0,0 +1,532 @@
+/*
+ * Copyright (C) 2014-2015 Nuand LLC
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+
+/* Only switch on the verbose debug prints in this file when we *really* want
+ * them. Otherwise, compile them out to avoid excessive log level checks
+ * in our data path */
+#include "log.h"
+#ifndef ENABLE_LIBBLADERF_SYNC_LOG_VERBOSE
+#undef log_verbose
+#define log_verbose(...)
+#endif
+#include "rel_assert.h"
+#include "conversions.h"
+#include "minmax.h"
+
+#include "async.h"
+#include "sync.h"
+#include "sync_worker.h"
+
+#include "board/board.h"
+#include "backend/usb/usb.h"
+
+/* Map a sync handle's stream direction to a printable string ("RX"/"TX")
+ * for log messages. `s` must be a struct bladerf_sync pointer. */
+#define worker2str(s) (direction2str(s->stream_config.layout & BLADERF_DIRECTION_MASK))
+
+/* Worker thread entry point; implements the worker state machine below */
+void *sync_worker_task(void *arg);
+
+/* Stream callback for RX: invoked by the async backend each time a transfer
+ * finishes filling `samples`. Returns the next buffer to submit for
+ * reception, or NULL to end the stream. Runs on the async stream's thread,
+ * so it must not block for long. */
+static void *rx_callback(struct bladerf *dev,
+                         struct bladerf_stream *stream,
+                         struct bladerf_metadata *meta,
+                         void *samples,
+                         size_t num_samples,
+                         void *user_data)
+{
+    unsigned int requests; /* Pending requests */
+    unsigned int next_idx;
+    unsigned int samples_idx;
+    void *next_buf = NULL; /* Next buffer to submit for reception */
+
+    struct bladerf_sync *s = (struct bladerf_sync *)user_data;
+    struct sync_worker *w = s->worker;
+    struct buffer_mgmt *b = &s->buf_mgmt;
+
+    /* Check if the caller has requested us to shut down. We'll keep the
+     * SHUTDOWN bit set through our transition into the IDLE state so we
+     * can act on it there. */
+    MUTEX_LOCK(&w->request_lock);
+    requests = w->requests;
+    MUTEX_UNLOCK(&w->request_lock);
+
+    if (requests & SYNC_WORKER_STOP) {
+        log_verbose("%s worker: Got STOP request upon entering callback. "
+                    "Ending stream.\n", worker2str(s));
+        return NULL;
+    }
+
+    MUTEX_LOCK(&b->lock);
+
+    /* Get the index of the buffer that was just filled */
+    samples_idx = sync_buf2idx(b, samples);
+
+    if (b->resubmit_count == 0) {
+        if (b->status[b->prod_i] == SYNC_BUFFER_EMPTY) {
+
+            /* This buffer is now ready for the consumer */
+            b->status[samples_idx] = SYNC_BUFFER_FULL;
+            b->actual_lengths[samples_idx] = num_samples;
+            pthread_cond_signal(&b->buf_ready);
+
+            /* Update the state of the buffer being submitted next */
+            next_idx = b->prod_i;
+            b->status[next_idx] = SYNC_BUFFER_IN_FLIGHT;
+            next_buf = b->buffers[next_idx];
+
+            /* Advance to the next buffer for the next callback */
+            b->prod_i = (next_idx + 1) % b->num_buffers;
+
+            log_verbose("%s worker: buf[%u] = full, buf[%u] = in_flight\n",
+                        worker2str(s), samples_idx, next_idx);
+
+        } else {
+            /* The consumer hasn't freed the next buffer yet: keep
+             * resubmitting the current buffer (dropping its data) until
+             * the backlog clears. */
+            /* TODO propagate back the RX Overrun to the sync_rx() caller */
+            log_debug("RX overrun @ buffer %u\r\n", samples_idx);
+
+            next_buf = samples;
+            b->resubmit_count = s->stream_config.num_xfers - 1;
+        }
+    } else {
+        /* We're still recovering from an overrun at this point. Just
+         * turn around and resubmit this buffer */
+        next_buf = samples;
+        b->resubmit_count--;
+        log_verbose("Resubmitting buffer %u (%u resubmissions left)\r\n",
+                    samples_idx, b->resubmit_count);
+    }
+
+
+    MUTEX_UNLOCK(&b->lock);
+    return next_buf;
+}
+
+/* Stream callback for TX: invoked by the async backend when a transfer
+ * completes, and for the stream's initial buffer requests (with
+ * samples == NULL). Returns the next full buffer to transmit,
+ * BLADERF_STREAM_NO_DATA when nothing is pending, or NULL to end the
+ * stream. Runs on the async stream's thread. */
+static void *tx_callback(struct bladerf *dev,
+                         struct bladerf_stream *stream,
+                         struct bladerf_metadata *meta,
+                         void *samples,
+                         size_t num_samples,
+                         void *user_data)
+{
+    unsigned int requests;      /* Pending requests */
+    unsigned int completed_idx; /* Index of completed buffer */
+
+    struct bladerf_sync *s = (struct bladerf_sync *)user_data;
+    struct sync_worker *w = s->worker;
+    struct buffer_mgmt *b = &s->buf_mgmt;
+
+    void *ret = BLADERF_STREAM_NO_DATA;
+
+    /* Check if the caller has requested us to shut down. We'll keep the
+     * SHUTDOWN bit set through our transition into the IDLE state so we
+     * can act on it there. */
+    MUTEX_LOCK(&w->request_lock);
+    requests = w->requests;
+    MUTEX_UNLOCK(&w->request_lock);
+
+    if (requests & SYNC_WORKER_STOP) {
+        log_verbose("%s worker: Got STOP request upon entering callback. "
+                    "Ending stream.\r\n", worker2str(s));
+        return NULL;
+    }
+
+    /* The initial set of callbacks will not provide us with any
+     * completed sample buffers */
+    if (samples != NULL) {
+        MUTEX_LOCK(&b->lock);
+
+        /* Mark the completed buffer as being empty */
+        completed_idx = sync_buf2idx(b, samples);
+        assert(b->status[completed_idx] == SYNC_BUFFER_IN_FLIGHT);
+        b->status[completed_idx] = SYNC_BUFFER_EMPTY;
+        pthread_cond_signal(&b->buf_ready);
+
+        /* If the callback is assigned to be the submitter, there are
+         * buffers pending submission */
+        if (b->submitter == SYNC_TX_SUBMITTER_CALLBACK) {
+            assert(b->cons_i != BUFFER_MGMT_INVALID_INDEX);
+            if (b->status[b->cons_i] == SYNC_BUFFER_FULL) {
+                /* This buffer is ready to ship out ("consume") */
+                log_verbose("%s: Submitting deferred buf[%u]\n",
+                            __FUNCTION__, b->cons_i);
+
+                ret = b->buffers[b->cons_i];
+                /* This is actually # of 32bit DWORDs for PACKET_META */
+                meta->actual_count = b->actual_lengths[b->cons_i];
+                b->status[b->cons_i] = SYNC_BUFFER_IN_FLIGHT;
+                b->cons_i = (b->cons_i + 1) % b->num_buffers;
+            } else {
+                /* No full buffer is queued; hand submission duty back to
+                 * the API-side TX path */
+                log_verbose("%s: No deferred buffer available. "
+                            "Assigning submitter=FN\n", __FUNCTION__);
+
+                b->submitter = SYNC_TX_SUBMITTER_FN;
+                b->cons_i = BUFFER_MGMT_INVALID_INDEX;
+            }
+        }
+
+        MUTEX_UNLOCK(&b->lock);
+
+        log_verbose("%s worker: Buffer %u emptied.\r\n",
+                    worker2str(s), completed_idx);
+    }
+
+    return ret;
+}
+
+/* Allocate the worker for sync handle `s`, create its async stream,
+ * initialize its locks and condition variables, and launch its thread.
+ * Blocks up to 1 s waiting for the thread to reach the IDLE state.
+ *
+ * Returns 0 on success or a BLADERF_ERR_* value on failure; on failure
+ * the worker is freed and s->worker is reset to NULL. */
+int sync_worker_init(struct bladerf_sync *s)
+{
+    int status = 0;
+    s->worker = (struct sync_worker *)calloc(1, sizeof(*s->worker));
+
+    if (s->worker == NULL) {
+        status = BLADERF_ERR_MEM;
+        goto worker_init_out;
+    }
+
+    s->worker->state = SYNC_WORKER_STATE_STARTUP;
+    s->worker->err_code = 0;
+
+    /* Select the stream callback matching this handle's direction */
+    s->worker->cb =
+        (s->stream_config.layout & BLADERF_DIRECTION_MASK) == BLADERF_RX
+            ? rx_callback
+            : tx_callback;
+
+    status = async_init_stream(
+        &s->worker->stream, s->dev, s->worker->cb, &s->buf_mgmt.buffers,
+        s->buf_mgmt.num_buffers, s->stream_config.format,
+        s->stream_config.samples_per_buffer, s->stream_config.num_xfers, s);
+
+    if (status != 0) {
+        log_debug("%s worker: Failed to init stream: %s\n", worker2str(s),
+                  bladerf_strerror(status));
+        goto worker_init_out;
+    }
+
+    /* Never use a transfer timeout shorter than the bulk default */
+    status = async_set_transfer_timeout(
+        s->worker->stream,
+        uint_max(s->stream_config.timeout_ms, BULK_TIMEOUT_MS));
+    if (status != 0) {
+        log_debug("%s worker: Failed to set transfer timeout: %s\n",
+                  worker2str(s), bladerf_strerror(status));
+        goto worker_init_out;
+    }
+
+    MUTEX_INIT(&s->worker->state_lock);
+    MUTEX_INIT(&s->worker->request_lock);
+
+    status = pthread_cond_init(&s->worker->state_changed, NULL);
+    if (status != 0) {
+        log_debug("%s worker: pthread_cond_init(state_changed) failed: %d\n",
+                  worker2str(s), status);
+        status = BLADERF_ERR_UNEXPECTED;
+        goto worker_init_out;
+    }
+
+    status = pthread_cond_init(&s->worker->requests_pending, NULL);
+    if (status != 0) {
+        log_debug("%s worker: pthread_cond_init(requests_pending) failed: %d\n",
+                  worker2str(s), status);
+        status = BLADERF_ERR_UNEXPECTED;
+        goto worker_init_out;
+    }
+
+    status = pthread_create(&s->worker->thread, NULL, sync_worker_task, s);
+    if (status != 0) {
+        log_debug("%s worker: pthread_create failed: %d\n", worker2str(s),
+                  status);
+        status = BLADERF_ERR_UNEXPECTED;
+        goto worker_init_out;
+    }
+
+    /* Wait until the worker thread has initialized and is ready to go */
+    status =
+        sync_worker_wait_for_state(s->worker, SYNC_WORKER_STATE_IDLE, 1000);
+    if (status != 0) {
+        log_debug("%s worker: sync_worker_wait_for_state failed: %d\n",
+                  worker2str(s), status);
+        status = BLADERF_ERR_TIMEOUT;
+        goto worker_init_out;
+    }
+
+worker_init_out:
+    if (status != 0) {
+        /* NOTE(review): if we reach here after pthread_create() succeeded
+         * (i.e., the IDLE wait timed out), the thread may still be running
+         * when the worker is freed — consider stopping/joining it first.
+         * The stream and mutexes/condvars created above are also not torn
+         * down on this path — confirm whether that leak is acceptable. */
+        free(s->worker);
+        s->worker = NULL;
+    }
+
+    return status;
+}
+
+/* Stop worker `w` and release its resources: submit a STOP request,
+ * optionally signal `cond` (while holding `lock`) to wake a potentially
+ * blocked waiter, wait up to 3 s for the STOPPED state (canceling the
+ * thread on timeout), join the thread, tear down the async stream, and
+ * free `w`. */
+void sync_worker_deinit(struct sync_worker *w,
+                        pthread_mutex_t *lock, pthread_cond_t *cond)
+{
+    int status;
+
+    if (w == NULL) {
+        log_debug("%s called with NULL ptr\n", __FUNCTION__);
+        return;
+    }
+
+    log_verbose("%s: Requesting worker %p to stop...\n", __FUNCTION__, w);
+
+    sync_worker_submit_request(w, SYNC_WORKER_STOP);
+
+    if (lock != NULL && cond != NULL) {
+        MUTEX_LOCK(lock);
+        pthread_cond_signal(cond);
+        MUTEX_UNLOCK(lock);
+    }
+
+    status = sync_worker_wait_for_state(w, SYNC_WORKER_STATE_STOPPED, 3000);
+
+    if (status != 0) {
+        /* The worker never acknowledged the STOP; forcibly cancel it so
+         * the join below cannot hang */
+        log_warning("Timed out while stopping worker. Canceling thread.\n");
+        pthread_cancel(w->thread);
+    }
+
+    pthread_join(w->thread, NULL);
+    log_verbose("%s: Worker joined.\n", __FUNCTION__);
+
+    async_deinit_stream(w->stream);
+
+    free(w);
+}
+
+/* OR `request` into the worker's pending-request mask and wake the worker
+ * if it is blocked waiting for requests in the IDLE state */
+void sync_worker_submit_request(struct sync_worker *w, unsigned int request)
+{
+    MUTEX_LOCK(&w->request_lock);
+    w->requests |= request;
+    pthread_cond_signal(&w->requests_pending);
+    MUTEX_UNLOCK(&w->request_lock);
+}
+
+/* Block until the worker reaches `state`.
+ *
+ * timeout_ms == 0 means wait forever. The timed path converts the relative
+ * timeout into an absolute CLOCK_REALTIME deadline for
+ * pthread_cond_timedwait(); note this makes the timeout sensitive to
+ * wall-clock adjustments.
+ *
+ * @return 0 on success, BLADERF_ERR_TIMEOUT on timeout, or
+ *         BLADERF_ERR_UNEXPECTED on other errors
+ */
+int sync_worker_wait_for_state(struct sync_worker *w, sync_worker_state state,
+                               unsigned int timeout_ms)
+{
+    int status = 0;
+    struct timespec timeout_abs;
+    const int nsec_per_sec = 1000 * 1000 * 1000;
+
+    if (timeout_ms != 0) {
+        const unsigned int timeout_sec = timeout_ms / 1000;
+
+        status = clock_gettime(CLOCK_REALTIME, &timeout_abs);
+        if (status != 0) {
+            return BLADERF_ERR_UNEXPECTED;
+        }
+
+        /* Build the absolute deadline, normalizing tv_nsec into [0, 1s) */
+        timeout_abs.tv_sec += timeout_sec;
+        timeout_abs.tv_nsec += (timeout_ms % 1000) * 1000 * 1000;
+
+        if (timeout_abs.tv_nsec >= nsec_per_sec) {
+            timeout_abs.tv_sec += timeout_abs.tv_nsec / nsec_per_sec;
+            timeout_abs.tv_nsec %= nsec_per_sec;
+        }
+
+        MUTEX_LOCK(&w->state_lock);
+        status = 0;
+        while (w->state != state && status == 0) {
+            status = pthread_cond_timedwait(&w->state_changed,
+                                            &w->state_lock,
+                                            &timeout_abs);
+        }
+        MUTEX_UNLOCK(&w->state_lock);
+
+    } else {
+        MUTEX_LOCK(&w->state_lock);
+        /* Bug fix: bail out if pthread_cond_wait() reports an error instead
+         * of looping on it forever; this mirrors the error handling of the
+         * timed branch above. */
+        while (w->state != state && status == 0) {
+            log_verbose(": Waiting for state change, current = %d\n", w->state);
+            status = pthread_cond_wait(&w->state_changed,
+                                       &w->state_lock);
+        }
+        MUTEX_UNLOCK(&w->state_lock);
+    }
+
+    if (status != 0) {
+        log_debug("%s: Wait on state change failed: %s\n",
+                  __FUNCTION__, strerror(status));
+
+        if (status == ETIMEDOUT) {
+            status = BLADERF_ERR_TIMEOUT;
+        } else {
+            status = BLADERF_ERR_UNEXPECTED;
+        }
+    }
+
+    return status;
+}
+
+/* Return the worker's current state. If `err_code` is non-NULL, the
+ * worker's stored stream error code is written through it and then reset
+ * to 0 — i.e., reading the error code consumes it. */
+sync_worker_state sync_worker_get_state(struct sync_worker *w,
+                                        int *err_code)
+{
+    sync_worker_state ret;
+
+    MUTEX_LOCK(&w->state_lock);
+    ret = w->state;
+    if (err_code) {
+        *err_code = w->err_code;
+        w->err_code = 0;
+    }
+    MUTEX_UNLOCK(&w->state_lock);
+
+    return ret;
+}
+
+/* Update the worker's state and signal any thread blocked in
+ * sync_worker_wait_for_state() */
+static void set_state(struct sync_worker *w, sync_worker_state state)
+{
+    MUTEX_LOCK(&w->state_lock);
+    w->state = state;
+    pthread_cond_signal(&w->state_changed);
+    MUTEX_UNLOCK(&w->state_lock);
+}
+
+
+/* IDLE state handler: block until a request is submitted, consume the
+ * full request mask, and act on it.
+ *
+ * Returns the next worker state: SHUTTING_DOWN for STOP, RUNNING for
+ * START (after resetting per-direction buffer states), or IDLE when no
+ * recognized request bit was set. */
+static sync_worker_state exec_idle_state(struct bladerf_sync *s)
+{
+    sync_worker_state next_state = SYNC_WORKER_STATE_IDLE;
+    unsigned int requests;
+    unsigned int i;
+
+    MUTEX_LOCK(&s->worker->request_lock);
+
+    while (s->worker->requests == 0) {
+        log_verbose("%s worker: Waiting for pending requests\n", worker2str(s));
+
+        pthread_cond_wait(&s->worker->requests_pending,
+                          &s->worker->request_lock);
+    }
+
+    /* Consume all pending requests under the lock */
+    requests = s->worker->requests;
+    s->worker->requests = 0;
+    MUTEX_UNLOCK(&s->worker->request_lock);
+
+    if (requests & SYNC_WORKER_STOP) {
+        log_verbose("%s worker: Got request to stop\n", worker2str(s));
+
+        next_state = SYNC_WORKER_STATE_SHUTTING_DOWN;
+
+    } else if (requests & SYNC_WORKER_START) {
+        log_verbose("%s worker: Got request to start\n", worker2str(s));
+        MUTEX_LOCK(&s->buf_mgmt.lock);
+
+        if ((s->stream_config.layout & BLADERF_DIRECTION_MASK) == BLADERF_TX) {
+            /* If we've previously timed out on a stream, we'll likely have some
+             * stale buffers marked "in-flight" that have since been cancelled. */
+            for (i = 0; i < s->buf_mgmt.num_buffers; i++) {
+                if (s->buf_mgmt.status[i] == SYNC_BUFFER_IN_FLIGHT) {
+                    s->buf_mgmt.status[i] = SYNC_BUFFER_EMPTY;
+                }
+            }
+
+            pthread_cond_signal(&s->buf_mgmt.buf_ready);
+        } else {
+            /* RX: the first num_xfers buffers are submitted to the stream,
+             * so mark them in-flight; clear stale in-flight marks on the
+             * remainder */
+            s->buf_mgmt.prod_i = s->stream_config.num_xfers;
+
+            for (i = 0; i < s->buf_mgmt.num_buffers; i++) {
+                if (i < s->stream_config.num_xfers) {
+                    s->buf_mgmt.status[i] = SYNC_BUFFER_IN_FLIGHT;
+                } else if (s->buf_mgmt.status[i] == SYNC_BUFFER_IN_FLIGHT) {
+                    s->buf_mgmt.status[i] = SYNC_BUFFER_EMPTY;
+                }
+            }
+        }
+
+        MUTEX_UNLOCK(&s->buf_mgmt.lock);
+
+        next_state = SYNC_WORKER_STATE_RUNNING;
+    } else {
+        /* Bug fix: log the request mask we actually consumed. The previous
+         * code logged s->worker->requests, which was already cleared above
+         * (and was being read without holding request_lock), so it always
+         * printed 0. */
+        log_warning("Invalid request value encountered: 0x%08X\n", requests);
+    }
+
+    return next_state;
+}
+
+/* RUNNING state handler: run the async stream until it ends, then record
+ * the result so the API side can retrieve it via sync_worker_get_state() */
+static void exec_running_state(struct bladerf_sync *s)
+{
+    int status;
+
+    status = async_run_stream(s->worker->stream, s->stream_config.layout);
+
+    log_verbose("%s worker: stream ended with: %s\n",
+                worker2str(s), bladerf_strerror(status));
+
+    /* Save off the result of running the stream so we can report what
+     * happened to the API caller */
+    MUTEX_LOCK(&s->worker->state_lock);
+    s->worker->err_code = status;
+    MUTEX_UNLOCK(&s->worker->state_lock);
+
+    /* Wake the API-side if an error occurred, so that it can propagate
+     * the stream error code back to the API caller */
+    if (status != 0) {
+        MUTEX_LOCK(&s->buf_mgmt.lock);
+        pthread_cond_signal(&s->buf_mgmt.buf_ready);
+        MUTEX_UNLOCK(&s->buf_mgmt.lock);
+    }
+}
+
+/* Worker thread entry point: drives the IDLE -> RUNNING -> IDLE state
+ * machine until a STOP request transitions it through SHUTTING_DOWN to
+ * STOPPED. `arg` is the owning struct bladerf_sync. */
+void *sync_worker_task(void *arg)
+{
+    sync_worker_state state = SYNC_WORKER_STATE_IDLE;
+    struct bladerf_sync *s = (struct bladerf_sync *)arg;
+
+    log_verbose("%s worker: task started\n", worker2str(s));
+    set_state(s->worker, state);
+    log_verbose("%s worker: task state set\n", worker2str(s));
+
+    while (state != SYNC_WORKER_STATE_STOPPED) {
+
+        switch (state) {
+            /* STARTUP is only ever observed by the launching thread; the
+             * worker itself should never loop back into it */
+            case SYNC_WORKER_STATE_STARTUP:
+                assert(!"Worker in unexpected state, shutting down. (STARTUP)");
+                set_state(s->worker, SYNC_WORKER_STATE_SHUTTING_DOWN);
+                break;
+
+            case SYNC_WORKER_STATE_IDLE:
+                state = exec_idle_state(s);
+                set_state(s->worker, state);
+                break;
+
+            case SYNC_WORKER_STATE_RUNNING:
+                exec_running_state(s);
+                state = SYNC_WORKER_STATE_IDLE;
+                set_state(s->worker, state);
+                break;
+
+            case SYNC_WORKER_STATE_SHUTTING_DOWN:
+                log_verbose("%s worker: Shutting down...\n", worker2str(s));
+
+                state = SYNC_WORKER_STATE_STOPPED;
+                set_state(s->worker, state);
+                break;
+
+            case SYNC_WORKER_STATE_STOPPED:
+                assert(!"Worker in unexpected state: STOPPED");
+                break;
+
+            default:
+                assert(!"Worker in unexpected state, shutting down. (UNKNOWN)");
+                set_state(s->worker, SYNC_WORKER_STATE_SHUTTING_DOWN);
+                break;
+        }
+    }
+
+    return NULL;
+}
diff --git a/Radio/HW/BladeRF/src/streaming/sync_worker.h b/Radio/HW/BladeRF/src/streaming/sync_worker.h
new file mode 100644
index 0000000..35d5075
--- /dev/null
+++ b/Radio/HW/BladeRF/src/streaming/sync_worker.h
@@ -0,0 +1,130 @@
+/*
+ * Copyright (C) 2014 Nuand LLC
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef STREAMING_SYNC_WORKER_H_
+#define STREAMING_SYNC_WORKER_H_
+
+#include "host_config.h"
+#include "sync.h"
+#include <libbladeRF.h>
+#include <pthread.h>
+
+#if BLADERF_OS_WINDOWS || BLADERF_OS_OSX
+#include "clock_gettime.h"
+#else
+#include <time.h>
+#endif
+
+/* Worker lifetime:
+ *
+ * STARTUP --+--> IDLE --> RUNNING --+--> SHUTTING_DOWN --> STOPPED
+ * ^----------------------/
+ */
+
+/* Request flags, submitted to the worker via sync_worker_submit_request() */
+#define SYNC_WORKER_START (1 << 0) /* Begin running the async stream */
+#define SYNC_WORKER_STOP (1 << 1)  /* Stop streaming / shut the worker down */
+
+/* Worker thread lifecycle states; transitions follow the diagram above */
+typedef enum {
+    SYNC_WORKER_STATE_STARTUP,       /* Initial state, before the thread runs */
+    SYNC_WORKER_STATE_IDLE,          /* Waiting for a START or STOP request */
+    SYNC_WORKER_STATE_RUNNING,       /* Async stream is running */
+    SYNC_WORKER_STATE_SHUTTING_DOWN, /* STOP received; tearing down */
+    SYNC_WORKER_STATE_STOPPED        /* Worker thread has exited its loop */
+} sync_worker_state;
+
+/* Per-direction worker thread context */
+struct sync_worker {
+    pthread_t thread;              /* The worker thread itself */
+
+    struct bladerf_stream *stream; /* Async stream driven by this worker */
+    bladerf_stream_cb cb;          /* rx_callback or tx_callback, chosen at
+                                    * init time from the stream direction */
+
+    /* These items should be accessed while holding state_lock */
+    sync_worker_state state;
+    int err_code;                 /* Result of the last stream run; reset to 0
+                                   * when read via sync_worker_get_state() */
+    MUTEX state_lock;
+    pthread_cond_t state_changed; /* Worker thread uses this to inform a
+                                   * waiting main thread about a state
+                                   * change */
+
+    /* The requests lock should always be acquired AFTER
+     * the sync->buf_mgmt.lock
+     */
+    unsigned int requests;           /* Bitmask of SYNC_WORKER_* request flags */
+    pthread_cond_t requests_pending; /* Signaled when a request is submitted */
+    MUTEX request_lock;
+};
+
+/**
+ * Create and launch a worker thread. It will enter the IDLE state upon
+ * executing.
+ *
+ * @param   s   Sync handle containing worker to initialize
+ *
+ * @return 0 on success, BLADERF_ERR_* on failure
+ */
+int sync_worker_init(struct bladerf_sync *s);
+
+/**
+ * Shutdown and deinitialize
+ *
+ * @param       w     Worker to deinitialize
+ * @param[in]   lock  Acquired to signal `cond` if non-NULL
+ * @param[in]   cond  If non-NULL, this is signaled after requesting the
+ *                    worker to shut down, waking a potentially blocked
+ *                    worker.
+ */
+void sync_worker_deinit(struct sync_worker *w,
+                        pthread_mutex_t *lock,
+                        pthread_cond_t *cond);
+
+/**
+ * Wait for state change with optional timeout
+ *
+ * @param       w           Worker to wait for
+ * @param[in]   state       State to wait for
+ * @param[in]   timeout_ms  Timeout in ms. 0 implies "wait forever"
+ *
+ * @return 0 on success, BLADERF_ERR_TIMEOUT on timeout, or
+ *         BLADERF_ERR_UNEXPECTED on other errors
+ */
+int sync_worker_wait_for_state(struct sync_worker *w,
+                               sync_worker_state state,
+                               unsigned int timeout_ms);
+
+/**
+ * Get the worker's current state.
+ *
+ * @param        w         Worker to query
+ * @param[out]   err_code  Stream error code (libbladeRF error code value).
+ *                         Querying this value will reset the internal error
+ *                         code value.
+ *
+ * @return Worker's current state
+ */
+sync_worker_state sync_worker_get_state(struct sync_worker *w, int *err_code);
+
+/**
+ * Submit a request to the worker task
+ *
+ * @param       w        Worker to send request to
+ * @param[in]   request  Bitmask of SYNC_WORKER_* requests to submit
+ */
+void sync_worker_submit_request(struct sync_worker *w, unsigned int request);
+
+#endif