summaryrefslogtreecommitdiff
path: root/airspy/libairspy.py
blob: a642b982e5c29507965671f7ab05f24132a879a7 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
import sys
import os
from ctypes import *
from ctypes.util import find_library
import enum

def load_libairspy():
    if sys.platform == "linux" and 'LD_LIBRARY_PATH' in os.environ.keys():
        ld_library_paths = [local_path for local_path in os.environ['LD_LIBRARY_PATH'].split(':') if local_path.strip()]
        if "AIRSPY_TEST_PATH" in os.environ:
            ld_library_paths = [os.environ["AIRSPY_TEST_PATH"]]
        driver_files = [local_path + '/libairspy.so' for local_path in ld_library_paths]
    else:
        driver_files = []
    driver_files += ['libairspy.so']
    driver_files += ['airspy.dll', 'libairspy.so', 'libairspy.dylib','/usr/local/lib/libairspy.so']
    driver_files += ['..//airspy.dll', '..//libairspy.so']
    driver_files += [lambda : find_library('airspy'), lambda : find_library('libairspy')]
    dll = None

    for driver in driver_files:
        if callable(driver):
            driver = driver()
        if driver is None:
            continue
        #print("Search for driver named %s"%(driver))
        try:
            dll = CDLL(driver)
            break
        except:
            pass
    else:
        raise ImportError('Error loading libairspy. Make sure libairspy '\
                          '(and all of its dependencies) are in your path')

    return dll

libairspy = load_libairspy()


#enum airspy_sample_type
#{
#	AIRSPY_SAMPLE_FLOAT32_IQ = 0,   /* 2 * 32bit float per sample */
#	AIRSPY_SAMPLE_FLOAT32_REAL = 1, /* 1 * 32bit float per sample */
#	AIRSPY_SAMPLE_INT16_IQ = 2,     /* 2 * 16bit int per sample */
#	AIRSPY_SAMPLE_INT16_REAL = 3,   /* 1 * 16bit int per sample */
#	AIRSPY_SAMPLE_UINT16_REAL = 4,  /* 1 * 16bit unsigned int per sample */
#	AIRSPY_SAMPLE_RAW = 5,          /* Raw packed samples from the device */
#	AIRSPY_SAMPLE_END = 6           /* Number of supported sample types */
#};
# Define the types we need.
class CtypesEnum(enum.IntEnum):
    """A ctypes-compatible IntEnum superclass."""
    @classmethod
    def from_param(cls, obj):
        return int(obj)

class airspy_sample_type(enum.IntEnum):
	AIRSPY_SAMPLE_FLOAT32_IQ = 0   #/* 2 * 32bit float per sample */
	AIRSPY_SAMPLE_FLOAT32_REAL = 1 #/* 1 * 32bit float per sample */
	AIRSPY_SAMPLE_INT16_IQ = 2     #/* 2 * 16bit int per sample */
	AIRSPY_SAMPLE_INT16_REAL = 3   #/* 1 * 16bit int per sample */
	AIRSPY_SAMPLE_UINT16_REAL = 4  #/* 1 * 16bit unsigned int per sample */
	AIRSPY_SAMPLE_RAW = 5          #/* Raw packed samples from the device */
	AIRSPY_SAMPLE_END = 6          #/* Number of supported sample types */


class airspy_error(enum.IntEnum):
	AIRSPY_SUCCESS = 0
	AIRSPY_TRUE = 1
	AIRSPY_ERROR_INVALID_PARAM = -2
	AIRSPY_ERROR_NOT_FOUND = -5
	AIRSPY_ERROR_BUSY = -6
	AIRSPY_ERROR_NO_MEM = -11
	AIRSPY_ERROR_UNSUPPORTED = -12
	AIRSPY_ERROR_LIBUSB = -1000
	AIRSPY_ERROR_THREAD = -1001
	AIRSPY_ERROR_STREAMING_THREAD_ERR = -1002
	AIRSPY_ERROR_STREAMING_STOPPED = -1003
	AIRSPY_ERROR_OTHER = -9999

class airspy_board_id(enum.IntEnum):
	AIRSPY_BOARD_ID_PROTO_AIRSPY  = 0
	AIRSPY_BOARD_ID_INVALID = 0xFF

airspy_device_t_p = c_void_p

#typedef struct {
#	struct airspy_device* device;
#	void* ctx;
#	void* samples;
#	int sample_count;
#	uint64_t dropped_samples;
#	enum airspy_sample_type sample_type;
#} airspy_transfer_t, airspy_transfer;
class StructureWithEnums(Structure):
    """Add missing enum feature to ctypes Structures.
    """
    _map = {}

    def __getattribute__(self, name):
        _map = ctypes.Structure.__getattribute__(self, '_map')
        value = ctypes.Structure.__getattribute__(self, name)
        if name in _map:
            EnumClass = _map[name]
            if isinstance(value, ctypes.Array):
                return [EnumClass(x) for x in value]
            else:
                return EnumClass(value)
        else:
            return value

    def __str__(self):
        result = []
        result.append("struct {0} {{".format(self.__class__.__name__))
        for field in self._fields_:
            attr, attrType = field
            if attr in self._map:
                attrType = self._map[attr]
            value = getattr(self, attr)
            result.append("    {0} [{1}] = {2!r};".format(attr, attrType.__name__, value))
        result.append("};")
        return '\n'.join(result)

    __repr__ = __str__

#structures with enum
#https://gist.github.com/christoph2/9c390e5c094796903097
class airspy_transfer_t(Structure):
    _fields_ = [("device",airspy_device_t_p),
                ("ctx",c_void_p),
                ("samples",c_void_p),
                ("sample_count",c_int),
                ("dropped_samples",c_uint64),
                ("sample_type",c_int)]
                #]
    _map_ = {
    	"samples":airspy_sample_type
    }
                
airspy_transfer_t_p = POINTER(airspy_transfer_t)

#typedef struct {
#	uint32_t part_id[2];
#	uint32_t serial_no[4];
#} airspy_read_partid_serialno_t;
class airspy_read_partid_serialno_t(Structure):
    _fields_ = [("part_id", c_uint32*2),
                ("serial_no", c_uint32*4)]


#typedef struct {
#	uint32_t major_version;
#	uint32_t minor_version;
#	uint32_t revision;
#} airspy_lib_version_t;
class airspy_lib_version_t(Structure):
    _fields_ = [("major_version", c_uint32),
                ("minor_version", c_uint32),
                ("revision",      c_uint32)]



#typedef int (*airspy_sample_block_cb_fn)(airspy_transfer* transfer);
airspy_sample_block_cb_fn = PYFUNCTYPE(c_int, POINTER(airspy_transfer_t))


#extern ADDAPI void ADDCALL airspy_lib_version(airspy_lib_version_t* lib_version);
f = libairspy.airspy_lib_version
f.restype, f.argtypes = None, [POINTER(airspy_lib_version_t)]

#try to load the lib version if its wrong print out warning
version = airspy_lib_version_t()
libairspy.airspy_lib_version(byref(version))
if False == ((int(version.major_version) == 1) and (int(version.minor_version) == 0) and (int(version.revision) == 11)):
    print("Unsuported version of libairspy.")
    print("Only supporting 1.0.11")
    raise ImportError

#/* airspy_init() deprecated */
#extern ADDAPI int ADDCALL airspy_init(void);
#/* airspy_exit() deprecated */
#extern ADDAPI int ADDCALL airspy_exit(void);

#extern ADDAPI int ADDCALL airspy_list_devices(uint64_t *serials, int count);
f = libairspy.airspy_list_devices
f.restype, f.argtypes = c_int, [POINTER(c_uint64), c_int]

#extern ADDAPI int ADDCALL airspy_open_sn(struct airspy_device** device, uint64_t serial_number);
f = libairspy.airspy_open_sn
f.restype, f.argtypes = c_int, [POINTER(airspy_device_t_p), c_uint64]

#extern ADDAPI int ADDCALL airspy_open_fd(struct airspy_device** device, int fd);
f = libairspy.airspy_open_fd
f.restype, f.argtypes = c_int, [POINTER(airspy_device_t_p), c_int]

#extern ADDAPI int ADDCALL airspy_open(struct airspy_device** device);
f = libairspy.airspy_open
f.restype, f.argtypes = c_int, [POINTER(airspy_device_t_p)]

#extern ADDAPI int ADDCALL airspy_close(struct airspy_device* device);
f = libairspy.airspy_close
f.restype, f.argtypes = c_int, [airspy_device_t_p]

#/* Use airspy_get_samplerates(device, buffer, 0) to get the number of available sample rates. It will be returned in the first element of buffer */
#extern ADDAPI int ADDCALL airspy_get_samplerates(struct airspy_device* device, uint32_t* buffer, const uint32_t len);
f = libairspy.airspy_get_samplerates
f.restype, f.argtypes = c_int, [airspy_device_t_p, POINTER(c_uint32), c_uint32]


#/* Parameter samplerate can be either the index of a samplerate or directly its value in Hz within the list returned by airspy_get_samplerates() */
#extern ADDAPI int ADDCALL airspy_set_samplerate(struct airspy_device* device, uint32_t samplerate);
f = libairspy.airspy_set_samplerate
f.restype, f.argtypes = c_int, [airspy_device_t_p, c_uint32]


#extern ADDAPI int ADDCALL airspy_set_conversion_filter_float32(struct airspy_device* device, const float *kernel, const uint32_t len);
f = libairspy.airspy_set_conversion_filter_float32
f.restype, f.argtypes = c_int, [airspy_device_t_p, POINTER(c_float), c_uint32]

#extern ADDAPI int ADDCALL airspy_set_conversion_filter_int16(struct airspy_device* device, const int16_t *kernel, const uint32_t len);
f = libairspy.airspy_set_conversion_filter_int16
f.restype, f.argtypes = c_int, [airspy_device_t_p, POINTER(c_int16), c_uint32]

#extern ADDAPI int ADDCALL airspy_start_rx(struct airspy_device* device, airspy_sample_block_cb_fn callback, void* rx_ctx);
f = libairspy.airspy_start_rx
f.restype, f.argtypes = c_int, [airspy_device_t_p, airspy_sample_block_cb_fn, py_object]

#extern ADDAPI int ADDCALL airspy_stop_rx(struct airspy_device* device);
f = libairspy.airspy_stop_rx
f.restype, f.argtypes = c_int, [airspy_device_t_p]

#/* return AIRSPY_TRUE if success */
#extern ADDAPI int ADDCALL airspy_is_streaming(struct airspy_device* device);
f = libairspy.airspy_is_streaming
f.restype, f.argtypes = c_int, [airspy_device_t_p]

#extern ADDAPI int ADDCALL airspy_si5351c_write(struct airspy_device* device, uint8_t register_number, uint8_t value);
f = libairspy.airspy_set_conversion_filter_float32
f.restype, f.argtypes = c_int, [airspy_device_t_p, POINTER(c_float), c_uint32]

#extern ADDAPI int ADDCALL airspy_si5351c_read(struct airspy_device* device, uint8_t register_number, uint8_t* value);
f = libairspy.airspy_si5351c_read
f.restype, f.argtypes = c_int, [airspy_device_t_p, c_uint8, POINTER(c_uint8)]

#extern ADDAPI int ADDCALL airspy_config_write(struct airspy_device* device, const uint8_t page_index, const uint16_t length, unsigned char *data);
#f = libairspy.airspy_config_write
#f.restype, f.argtypes = c_int, [airspy_device_t_p, c_uint8, c_uint16, c_char_p]

#extern ADDAPI int ADDCALL airspy_config_read(struct airspy_device* device, const uint8_t page_index, const uint16_t length, unsigned char *data);
#f = libairspy.airspy_config_read
#f.restype, f.argtypes = c_int, [airspy_device_t_p, c_uint8, c_uint16, c_ubyte_p]

#extern ADDAPI int ADDCALL airspy_r820t_write(struct airspy_device* device, uint8_t register_number, uint8_t value);
#f = libairspy.airspy_r820t_write
#f.restype, f.argtypes = c_int, [airspy_device_t_p. c_uint8, c_uint8]

#extern ADDAPI int ADDCALL airspy_r820t_read(struct airspy_device* device, uint8_t register_number, uint8_t* value);
f = libairspy.airspy_r820t_read
f.restype, f.argtypes = c_int, [airspy_device_t_p, c_uint8, POINTER(c_uint8)]

#/* Parameter value shall be 0=clear GPIO or 1=set GPIO */
#extern ADDAPI int ADDCALL airspy_gpio_write(struct airspy_device* device, airspy_gpio_port_t port, airspy_gpio_pin_t pin, uint8_t value);
#f = libairspy.airspy_gpio_write
#f.restype, f.argtypes = c_int, [airspy_device_t_p]

#/* Parameter value corresponds to GPIO state 0 or 1 */
#extern ADDAPI int ADDCALL airspy_gpio_read(struct airspy_device* device, airspy_gpio_port_t port, airspy_gpio_pin_t pin, uint8_t* value);
#f = libairspy.airspy_gpio_read
#f.restype, f.argtypes = c_int, [airspy_device_t_p]

#/* Parameter value shall be 0=GPIO Input direction or 1=GPIO Output direction */
#extern ADDAPI int ADDCALL airspy_gpiodir_write(struct airspy_device* device, airspy_gpio_port_t port, airspy_gpio_pin_t pin, uint8_t value);
#f = libairspy.airspy_gpiodir_write
#f.restype, f.argtypes = c_int, [airspy_device_t_p]
#extern ADDAPI int ADDCALL airspy_gpiodir_read(struct airspy_device* device, airspy_gpio_port_t port, airspy_gpio_pin_t pin, uint8_t* value);
#f = libairspy.airspy_gpiodir_read
#f.restype, f.argtypes = c_int, [airspy_device_t_p]

#extern ADDAPI int ADDCALL airspy_spiflash_erase(struct airspy_device* device);
f = libairspy.airspy_spiflash_erase
f.restype, f.argtypes = c_int, [airspy_device_t_p]

#extern ADDAPI int ADDCALL airspy_spiflash_write(struct airspy_device* device, const uint32_t address, const uint16_t length, unsigned char* const data);
f = libairspy.airspy_spiflash_write
f.restype, f.argtypes = c_int, [airspy_device_t_p, c_uint32, c_uint16, POINTER(c_ubyte)]

#extern ADDAPI int ADDCALL airspy_spiflash_read(struct airspy_device* device, const uint32_t address, const uint16_t length, unsigned char* data);
f = libairspy.airspy_spiflash_read
f.restype, f.argtypes = c_int, [airspy_device_t_p, c_uint32, c_uint16, POINTER(c_ubyte)]

#extern ADDAPI int ADDCALL airspy_board_id_read(struct airspy_device* device, uint8_t* value);
f = libairspy.airspy_board_id_read
f.restype, f.argtypes = c_int, [airspy_device_t_p, POINTER(c_uint8)]

#/* Parameter length shall be at least 128bytes to avoid possible string clipping */
#extern ADDAPI int ADDCALL airspy_version_string_read(struct airspy_device* device, char* version, uint8_t length);
f = libairspy.airspy_version_string_read
f.restype, f.argtypes = c_int, [airspy_device_t_p, c_char_p, c_uint8]

#extern ADDAPI int ADDCALL airspy_board_partid_serialno_read(struct airspy_device* device, airspy_read_partid_serialno_t* read_partid_serialno);
f = libairspy.airspy_board_partid_serialno_read
f.restype, f.argtypes = c_int, [airspy_device_t_p, POINTER(airspy_read_partid_serialno_t)]

#extern ADDAPI int ADDCALL airspy_set_sample_type(struct airspy_device* device, enum airspy_sample_type sample_type);
f = libairspy.airspy_set_sample_type
f.restype, f.argtypes = c_int, [airspy_device_t_p, c_int]

#/* Parameter freq_hz shall be between 24000000(24MHz) and 1750000000(1.75GHz) */
#extern ADDAPI int ADDCALL airspy_set_freq(struct airspy_device* device, const uint32_t freq_hz);
f = libairspy.airspy_set_freq
f.restype, f.argtypes = c_int, [airspy_device_t_p, c_uint32]

#/* Parameter value shall be between 0 and 15 */
#extern ADDAPI int ADDCALL airspy_set_lna_gain(struct airspy_device* device, uint8_t value);
f = libairspy.airspy_set_lna_gain
f.restype, f.argtypes = c_int, [airspy_device_t_p, c_uint8]

#/* Parameter value shall be between 0 and 15 */
#extern ADDAPI int ADDCALL airspy_set_mixer_gain(struct airspy_device* device, uint8_t value);
f = libairspy.airspy_set_mixer_gain
f.restype, f.argtypes = c_int, [airspy_device_t_p, c_uint8]


#/* Parameter value shall be between 0 and 15 */
#extern ADDAPI int ADDCALL airspy_set_vga_gain(struct airspy_device* device, uint8_t value);
f = libairspy.airspy_set_vga_gain
f.restype, f.argtypes = c_int, [airspy_device_t_p, c_uint8]

#/* Parameter value:
#	0=Disable LNA Automatic Gain Control
#	1=Enable LNA Automatic Gain Control
#*/
#extern ADDAPI int ADDCALL airspy_set_lna_agc(struct airspy_device* device, uint8_t value);
f = libairspy.airspy_set_lna_agc
f.restype, f.argtypes = c_int, [airspy_device_t_p, c_uint8]

#/* Parameter value:
#	0=Disable MIXER Automatic Gain Control
#	1=Enable MIXER Automatic Gain Control
#*/
#extern ADDAPI int ADDCALL airspy_set_mixer_agc(struct airspy_device* device, uint8_t value);
f = libairspy.airspy_set_mixer_agc
f.restype, f.argtypes = c_int, [airspy_device_t_p, c_uint8]

#/* Parameter value: 0..21 */
#extern ADDAPI int ADDCALL airspy_set_linearity_gain(struct airspy_device* device, uint8_t value);
f = libairspy.airspy_set_linearity_gain
f.restype, f.argtypes = c_int, [airspy_device_t_p, c_uint8]

#/* Parameter value: 0..21 */
#extern ADDAPI int ADDCALL airspy_set_sensitivity_gain(struct airspy_device* device, uint8_t value);
f = libairspy.airspy_set_sensitivity_gain
f.restype, f.argtypes = c_int, [airspy_device_t_p, c_uint8]

#/* Parameter value shall be 0=Disable BiasT or 1=Enable BiasT */
#extern ADDAPI int ADDCALL airspy_set_rf_bias(struct airspy_device* dev, uint8_t value);
f = libairspy.airspy_set_rf_bias
f.restype, f.argtypes = c_int, [airspy_device_t_p, c_uint8]

#/* Parameter value shall be 0=Disable Packing or 1=Enable Packing */
#extern ADDAPI int ADDCALL airspy_set_packing(struct airspy_device* device, uint8_t value);
f = libairspy.airspy_set_packing
f.restype, f.argtypes = c_int, [airspy_device_t_p, c_uint8]

#extern ADDAPI const char* ADDCALL airspy_error_name(enum airspy_error errcode);
#f = libairspy.airspy_error_name
#f.restype, f.argtypes = c_char_p, []

#extern ADDAPI const char* ADDCALL airspy_board_id_name(enum airspy_board_id board_id);
#f = libairspy.airspy_board_id_name
#f.restype, f.argtypes = c_char_p, [airspy_device_t_p]

#/* Parameter sector_num shall be between 2 & 13 (sector 0 & 1 are reserved) */
#extern ADDAPI int ADDCALL airspy_spiflash_erase_sector(struct airspy_device* device, const uint16_t sector_num);
f = libairspy.airspy_spiflash_erase_sector
f.restype, f.argtypes = c_int, [airspy_device_t_p, c_uint16]