import sys import os from ctypes import * from ctypes.util import find_library import enum def load_libairspy(): if sys.platform == "linux" and 'LD_LIBRARY_PATH' in os.environ.keys(): ld_library_paths = [local_path for local_path in os.environ['LD_LIBRARY_PATH'].split(':') if local_path.strip()] if "AIRSPY_TEST_PATH" in os.environ: ld_library_paths = [os.environ["AIRSPY_TEST_PATH"]] driver_files = [local_path + '/libairspy.so' for local_path in ld_library_paths] else: driver_files = [] driver_files += ['libairspy.so'] driver_files += ['airspy.dll', 'libairspy.so', 'libairspy.dylib','/usr/local/lib/libairspy.so'] driver_files += ['..//airspy.dll', '..//libairspy.so'] driver_files += [lambda : find_library('airspy'), lambda : find_library('libairspy')] dll = None for driver in driver_files: if callable(driver): driver = driver() if driver is None: continue #print("Search for driver named %s"%(driver)) try: dll = CDLL(driver) break except: pass else: raise ImportError('Error loading libairspy. Make sure libairspy '\ '(and all of its dependencies) are in your path') return dll libairspy = load_libairspy() #enum airspy_sample_type #{ # AIRSPY_SAMPLE_FLOAT32_IQ = 0, /* 2 * 32bit float per sample */ # AIRSPY_SAMPLE_FLOAT32_REAL = 1, /* 1 * 32bit float per sample */ # AIRSPY_SAMPLE_INT16_IQ = 2, /* 2 * 16bit int per sample */ # AIRSPY_SAMPLE_INT16_REAL = 3, /* 1 * 16bit int per sample */ # AIRSPY_SAMPLE_UINT16_REAL = 4, /* 1 * 16bit unsigned int per sample */ # AIRSPY_SAMPLE_RAW = 5, /* Raw packed samples from the device */ # AIRSPY_SAMPLE_END = 6 /* Number of supported sample types */ #}; # Define the types we need. class CtypesEnum(enum.IntEnum): """A ctypes-compatible IntEnum superclass.""" @classmethod def from_param(cls, obj): return int(obj) class airspy_sample_type(enum.IntEnum): AIRSPY_SAMPLE_FLOAT32_IQ = 0 #/* 2 * 32bit float per sample */ AIRSPY_SAMPLE_FLOAT32_REAL = 1 #/* 1 * 32bit float per sample */ AIRSPY_SAMPLE_INT16_IQ = 2 #/* 2 * 16bit int per sample */ AIRSPY_SAMPLE_INT16_REAL = 3 #/* 1 * 16bit int per sample */ AIRSPY_SAMPLE_UINT16_REAL = 4 #/* 1 * 16bit unsigned int per sample */ AIRSPY_SAMPLE_RAW = 5 #/* Raw packed samples from the device */ AIRSPY_SAMPLE_END = 6 #/* Number of supported sample types */ class airspy_error(enum.IntEnum): AIRSPY_SUCCESS = 0 AIRSPY_TRUE = 1 AIRSPY_ERROR_INVALID_PARAM = -2 AIRSPY_ERROR_NOT_FOUND = -5 AIRSPY_ERROR_BUSY = -6 AIRSPY_ERROR_NO_MEM = -11 AIRSPY_ERROR_UNSUPPORTED = -12 AIRSPY_ERROR_LIBUSB = -1000 AIRSPY_ERROR_THREAD = -1001 AIRSPY_ERROR_STREAMING_THREAD_ERR = -1002 AIRSPY_ERROR_STREAMING_STOPPED = -1003 AIRSPY_ERROR_OTHER = -9999 class airspy_board_id(enum.IntEnum): AIRSPY_BOARD_ID_PROTO_AIRSPY = 0 AIRSPY_BOARD_ID_INVALID = 0xFF airspy_device_t_p = c_void_p #typedef struct { # struct airspy_device* device; # void* ctx; # void* samples; # int sample_count; # uint64_t dropped_samples; # enum airspy_sample_type sample_type; #} airspy_transfer_t, airspy_transfer; class StructureWithEnums(Structure): """Add missing enum feature to ctypes Structures. """ _map = {} def __getattribute__(self, name): _map = ctypes.Structure.__getattribute__(self, '_map') value = ctypes.Structure.__getattribute__(self, name) if name in _map: EnumClass = _map[name] if isinstance(value, ctypes.Array): return [EnumClass(x) for x in value] else: return EnumClass(value) else: return value def __str__(self): result = [] result.append("struct {0} {{".format(self.__class__.__name__)) for field in self._fields_: attr, attrType = field if attr in self._map: attrType = self._map[attr] value = getattr(self, attr) result.append(" {0} [{1}] = {2!r};".format(attr, attrType.__name__, value)) result.append("};") return '\n'.join(result) __repr__ = __str__ #structures with enum #https://gist.github.com/christoph2/9c390e5c094796903097 class airspy_transfer_t(Structure): _fields_ = [("device",airspy_device_t_p), ("ctx",c_void_p), ("samples",c_void_p), ("sample_count",c_int), ("dropped_samples",c_uint64), ("sample_type",c_int)] #] _map_ = { "samples":airspy_sample_type } airspy_transfer_t_p = POINTER(airspy_transfer_t) #typedef struct { # uint32_t part_id[2]; # uint32_t serial_no[4]; #} airspy_read_partid_serialno_t; class airspy_read_partid_serialno_t(Structure): _fields_ = [("part_id", c_uint32*2), ("serial_no", c_uint32*4)] #typedef struct { # uint32_t major_version; # uint32_t minor_version; # uint32_t revision; #} airspy_lib_version_t; class airspy_lib_version_t(Structure): _fields_ = [("major_version", c_uint32), ("minor_version", c_uint32), ("revision", c_uint32)] #typedef int (*airspy_sample_block_cb_fn)(airspy_transfer* transfer); airspy_sample_block_cb_fn = PYFUNCTYPE(c_int, POINTER(airspy_transfer_t)) #extern ADDAPI void ADDCALL airspy_lib_version(airspy_lib_version_t* lib_version); f = libairspy.airspy_lib_version f.restype, f.argtypes = None, [POINTER(airspy_lib_version_t)] #try to load the lib version if its wrong print out warning version = airspy_lib_version_t() libairspy.airspy_lib_version(byref(version)) if False == ((int(version.major_version) == 1) and (int(version.minor_version) == 0) and (int(version.revision) == 11)): print("Unsuported version of libairspy.") print("Only supporting 1.0.11") raise ImportError #/* airspy_init() deprecated */ #extern ADDAPI int ADDCALL airspy_init(void); #/* airspy_exit() deprecated */ #extern ADDAPI int ADDCALL airspy_exit(void); #extern ADDAPI int ADDCALL airspy_list_devices(uint64_t *serials, int count); f = libairspy.airspy_list_devices f.restype, f.argtypes = c_int, [POINTER(c_uint64), c_int] #extern ADDAPI int ADDCALL airspy_open_sn(struct airspy_device** device, uint64_t serial_number); f = libairspy.airspy_open_sn f.restype, f.argtypes = c_int, [POINTER(airspy_device_t_p), c_uint64] #extern ADDAPI int ADDCALL airspy_open_fd(struct airspy_device** device, int fd); f = libairspy.airspy_open_fd f.restype, f.argtypes = c_int, [POINTER(airspy_device_t_p), c_int] #extern ADDAPI int ADDCALL airspy_open(struct airspy_device** device); f = libairspy.airspy_open f.restype, f.argtypes = c_int, [POINTER(airspy_device_t_p)] #extern ADDAPI int ADDCALL airspy_close(struct airspy_device* device); f = libairspy.airspy_close f.restype, f.argtypes = c_int, [airspy_device_t_p] #/* Use airspy_get_samplerates(device, buffer, 0) to get the number of available sample rates. It will be returned in the first element of buffer */ #extern ADDAPI int ADDCALL airspy_get_samplerates(struct airspy_device* device, uint32_t* buffer, const uint32_t len); f = libairspy.airspy_get_samplerates f.restype, f.argtypes = c_int, [airspy_device_t_p, POINTER(c_uint32), c_uint32] #/* Parameter samplerate can be either the index of a samplerate or directly its value in Hz within the list returned by airspy_get_samplerates() */ #extern ADDAPI int ADDCALL airspy_set_samplerate(struct airspy_device* device, uint32_t samplerate); f = libairspy.airspy_set_samplerate f.restype, f.argtypes = c_int, [airspy_device_t_p, c_uint32] #extern ADDAPI int ADDCALL airspy_set_conversion_filter_float32(struct airspy_device* device, const float *kernel, const uint32_t len); f = libairspy.airspy_set_conversion_filter_float32 f.restype, f.argtypes = c_int, [airspy_device_t_p, POINTER(c_float), c_uint32] #extern ADDAPI int ADDCALL airspy_set_conversion_filter_int16(struct airspy_device* device, const int16_t *kernel, const uint32_t len); f = libairspy.airspy_set_conversion_filter_int16 f.restype, f.argtypes = c_int, [airspy_device_t_p, POINTER(c_int16), c_uint32] #extern ADDAPI int ADDCALL airspy_start_rx(struct airspy_device* device, airspy_sample_block_cb_fn callback, void* rx_ctx); f = libairspy.airspy_start_rx f.restype, f.argtypes = c_int, [airspy_device_t_p, airspy_sample_block_cb_fn, py_object] #extern ADDAPI int ADDCALL airspy_stop_rx(struct airspy_device* device); f = libairspy.airspy_stop_rx f.restype, f.argtypes = c_int, [airspy_device_t_p] #/* return AIRSPY_TRUE if success */ #extern ADDAPI int ADDCALL airspy_is_streaming(struct airspy_device* device); f = libairspy.airspy_is_streaming f.restype, f.argtypes = c_int, [airspy_device_t_p] #extern ADDAPI int ADDCALL airspy_si5351c_write(struct airspy_device* device, uint8_t register_number, uint8_t value); f = libairspy.airspy_set_conversion_filter_float32 f.restype, f.argtypes = c_int, [airspy_device_t_p, POINTER(c_float), c_uint32] #extern ADDAPI int ADDCALL airspy_si5351c_read(struct airspy_device* device, uint8_t register_number, uint8_t* value); f = libairspy.airspy_si5351c_read f.restype, f.argtypes = c_int, [airspy_device_t_p, c_uint8, POINTER(c_uint8)] #extern ADDAPI int ADDCALL airspy_config_write(struct airspy_device* device, const uint8_t page_index, const uint16_t length, unsigned char *data); #f = libairspy.airspy_config_write #f.restype, f.argtypes = c_int, [airspy_device_t_p, c_uint8, c_uint16, c_char_p] #extern ADDAPI int ADDCALL airspy_config_read(struct airspy_device* device, const uint8_t page_index, const uint16_t length, unsigned char *data); #f = libairspy.airspy_config_read #f.restype, f.argtypes = c_int, [airspy_device_t_p, c_uint8, c_uint16, c_ubyte_p] #extern ADDAPI int ADDCALL airspy_r820t_write(struct airspy_device* device, uint8_t register_number, uint8_t value); #f = libairspy.airspy_r820t_write #f.restype, f.argtypes = c_int, [airspy_device_t_p. c_uint8, c_uint8] #extern ADDAPI int ADDCALL airspy_r820t_read(struct airspy_device* device, uint8_t register_number, uint8_t* value); f = libairspy.airspy_r820t_read f.restype, f.argtypes = c_int, [airspy_device_t_p, c_uint8, POINTER(c_uint8)] #/* Parameter value shall be 0=clear GPIO or 1=set GPIO */ #extern ADDAPI int ADDCALL airspy_gpio_write(struct airspy_device* device, airspy_gpio_port_t port, airspy_gpio_pin_t pin, uint8_t value); #f = libairspy.airspy_gpio_write #f.restype, f.argtypes = c_int, [airspy_device_t_p] #/* Parameter value corresponds to GPIO state 0 or 1 */ #extern ADDAPI int ADDCALL airspy_gpio_read(struct airspy_device* device, airspy_gpio_port_t port, airspy_gpio_pin_t pin, uint8_t* value); #f = libairspy.airspy_gpio_read #f.restype, f.argtypes = c_int, [airspy_device_t_p] #/* Parameter value shall be 0=GPIO Input direction or 1=GPIO Output direction */ #extern ADDAPI int ADDCALL airspy_gpiodir_write(struct airspy_device* device, airspy_gpio_port_t port, airspy_gpio_pin_t pin, uint8_t value); #f = libairspy.airspy_gpiodir_write #f.restype, f.argtypes = c_int, [airspy_device_t_p] #extern ADDAPI int ADDCALL airspy_gpiodir_read(struct airspy_device* device, airspy_gpio_port_t port, airspy_gpio_pin_t pin, uint8_t* value); #f = libairspy.airspy_gpiodir_read #f.restype, f.argtypes = c_int, [airspy_device_t_p] #extern ADDAPI int ADDCALL airspy_spiflash_erase(struct airspy_device* device); f = libairspy.airspy_spiflash_erase f.restype, f.argtypes = c_int, [airspy_device_t_p] #extern ADDAPI int ADDCALL airspy_spiflash_write(struct airspy_device* device, const uint32_t address, const uint16_t length, unsigned char* const data); f = libairspy.airspy_spiflash_write f.restype, f.argtypes = c_int, [airspy_device_t_p, c_uint32, c_uint16, POINTER(c_ubyte)] #extern ADDAPI int ADDCALL airspy_spiflash_read(struct airspy_device* device, const uint32_t address, const uint16_t length, unsigned char* data); f = libairspy.airspy_spiflash_read f.restype, f.argtypes = c_int, [airspy_device_t_p, c_uint32, c_uint16, POINTER(c_ubyte)] #extern ADDAPI int ADDCALL airspy_board_id_read(struct airspy_device* device, uint8_t* value); f = libairspy.airspy_board_id_read f.restype, f.argtypes = c_int, [airspy_device_t_p, POINTER(c_uint8)] #/* Parameter length shall be at least 128bytes to avoid possible string clipping */ #extern ADDAPI int ADDCALL airspy_version_string_read(struct airspy_device* device, char* version, uint8_t length); f = libairspy.airspy_version_string_read f.restype, f.argtypes = c_int, [airspy_device_t_p, c_char_p, c_uint8] #extern ADDAPI int ADDCALL airspy_board_partid_serialno_read(struct airspy_device* device, airspy_read_partid_serialno_t* read_partid_serialno); f = libairspy.airspy_board_partid_serialno_read f.restype, f.argtypes = c_int, [airspy_device_t_p, POINTER(airspy_read_partid_serialno_t)] #extern ADDAPI int ADDCALL airspy_set_sample_type(struct airspy_device* device, enum airspy_sample_type sample_type); f = libairspy.airspy_set_sample_type f.restype, f.argtypes = c_int, [airspy_device_t_p, c_int] #/* Parameter freq_hz shall be between 24000000(24MHz) and 1750000000(1.75GHz) */ #extern ADDAPI int ADDCALL airspy_set_freq(struct airspy_device* device, const uint32_t freq_hz); f = libairspy.airspy_set_freq f.restype, f.argtypes = c_int, [airspy_device_t_p, c_uint32] #/* Parameter value shall be between 0 and 15 */ #extern ADDAPI int ADDCALL airspy_set_lna_gain(struct airspy_device* device, uint8_t value); f = libairspy.airspy_set_lna_gain f.restype, f.argtypes = c_int, [airspy_device_t_p, c_uint8] #/* Parameter value shall be between 0 and 15 */ #extern ADDAPI int ADDCALL airspy_set_mixer_gain(struct airspy_device* device, uint8_t value); f = libairspy.airspy_set_mixer_gain f.restype, f.argtypes = c_int, [airspy_device_t_p, c_uint8] #/* Parameter value shall be between 0 and 15 */ #extern ADDAPI int ADDCALL airspy_set_vga_gain(struct airspy_device* device, uint8_t value); f = libairspy.airspy_set_vga_gain f.restype, f.argtypes = c_int, [airspy_device_t_p, c_uint8] #/* Parameter value: # 0=Disable LNA Automatic Gain Control # 1=Enable LNA Automatic Gain Control #*/ #extern ADDAPI int ADDCALL airspy_set_lna_agc(struct airspy_device* device, uint8_t value); f = libairspy.airspy_set_lna_agc f.restype, f.argtypes = c_int, [airspy_device_t_p, c_uint8] #/* Parameter value: # 0=Disable MIXER Automatic Gain Control # 1=Enable MIXER Automatic Gain Control #*/ #extern ADDAPI int ADDCALL airspy_set_mixer_agc(struct airspy_device* device, uint8_t value); f = libairspy.airspy_set_mixer_agc f.restype, f.argtypes = c_int, [airspy_device_t_p, c_uint8] #/* Parameter value: 0..21 */ #extern ADDAPI int ADDCALL airspy_set_linearity_gain(struct airspy_device* device, uint8_t value); f = libairspy.airspy_set_linearity_gain f.restype, f.argtypes = c_int, [airspy_device_t_p, c_uint8] #/* Parameter value: 0..21 */ #extern ADDAPI int ADDCALL airspy_set_sensitivity_gain(struct airspy_device* device, uint8_t value); f = libairspy.airspy_set_sensitivity_gain f.restype, f.argtypes = c_int, [airspy_device_t_p, c_uint8] #/* Parameter value shall be 0=Disable BiasT or 1=Enable BiasT */ #extern ADDAPI int ADDCALL airspy_set_rf_bias(struct airspy_device* dev, uint8_t value); f = libairspy.airspy_set_rf_bias f.restype, f.argtypes = c_int, [airspy_device_t_p, c_uint8] #/* Parameter value shall be 0=Disable Packing or 1=Enable Packing */ #extern ADDAPI int ADDCALL airspy_set_packing(struct airspy_device* device, uint8_t value); f = libairspy.airspy_set_packing f.restype, f.argtypes = c_int, [airspy_device_t_p, c_uint8] #extern ADDAPI const char* ADDCALL airspy_error_name(enum airspy_error errcode); #f = libairspy.airspy_error_name #f.restype, f.argtypes = c_char_p, [] #extern ADDAPI const char* ADDCALL airspy_board_id_name(enum airspy_board_id board_id); #f = libairspy.airspy_board_id_name #f.restype, f.argtypes = c_char_p, [airspy_device_t_p] #/* Parameter sector_num shall be between 2 & 13 (sector 0 & 1 are reserved) */ #extern ADDAPI int ADDCALL airspy_spiflash_erase_sector(struct airspy_device* device, const uint16_t sector_num); f = libairspy.airspy_spiflash_erase_sector f.restype, f.argtypes = c_int, [airspy_device_t_p, c_uint16]