diff options
Diffstat (limited to 'test.py')
-rw-r--r-- | test.py | 150 |
1 files changed, 150 insertions, 0 deletions
@@ -0,0 +1,150 @@ +import os +import airspyhf +from ctypes import * +import time +import sys +import wave +import struct + +print("Check airspyHF version") + +p = airspyhf.airspyhf_lib_version_t() +print(airspyhf.libairspyhf.airspyhf_lib_version(byref(p))) +print(p.major_version) +print(p.minor_version) +print(p.revision) + +print("Get list of devices if there is any") +ndev = airspyhf.libairspyhf.airspyhf_list_devices(None,0) +print("Found %d devices"%(ndev)) + +for devi in range(0,ndev): + serial = c_uint64(0) + airspyhf.libairspyhf.airspyhf_list_devices(byref(serial),devi+1) + print("Device %d: Serial number %s"%(int(devi),hex(serial.value) )) + +print("try to open device") +#device = POINTER(c_void_p) +#device_p = device() +dev_p = airspyhf.airspyhf_device_t_p(None) +ret = airspyhf.libairspyhf.airspyhf_open_sn(dev_p,0x3b52ab5dada12535) +print("open_sn: Returned %d"%(ret)) +if (ret != 0): + print("airspyhf_open_sn returned != 0, error") + sys.exit() + +print("List sample rates") +nsrates = c_uint32(0) + +ret = airspyhf.libairspyhf.airspyhf_get_samplerates(dev_p,byref(nsrates),c_uint32(0)) +print("ret %d"%ret) +print("sample rates %d"% nsrates.value) + +supportet_samplerates = (c_uint32*4)(0) +ret = airspyhf.libairspyhf.airspyhf_get_samplerates(dev_p,supportet_samplerates,nsrates) +print("ret %d"%ret) +for i in range(0,nsrates.value): + print("Sample rates %d"% supportet_samplerates[i]) + +#try to get some samples +ret = airspyhf.libairspyhf.airspyhf_set_samplerate(dev_p, supportet_samplerates[3]) +print(f"airspyhf_set_samplerate ret={ret}") + +ret = airspyhf.libairspyhf.airspyhf_set_hf_agc(dev_p, 1) +print(f"airspyhf_set_hf_agc ret={ret}") + +ret = airspyhf.libairspyhf.airspyhf_set_hf_agc_threshold(dev_p, 0) +print(f"airspyhf_set_hf_agc_threshold ret={ret}") + + +class CBthread: + def __init__(self,dev_p): + self.dev_p = dev_p + self.read_samples_cb = None + + def read_samples(self, transfer): + print("we here") + return 0 + + def start(self): + self.read_samples_cb = airspyhf.airspyhf_sample_block_cb_fn(self.read_samples) + ret = airspyhf.libairspyhf.airspyhf_start(self.dev_p, self.read_samples_cb, None) + print(f"airspyhf_start ret={ret}") + + def wait(self): + print("wait called") + +sample_count = 0 +wave_file = wave.open("record.wav","w") +wave_file.setnchannels(2) +wave_file.setsampwidth(4) +wave_file.setframerate(supportet_samplerates[1]) +#@CFUNCTYPE(c_int, airspyhf.airspyhf_transfer_t_p) +#@PYFUNCTYPE(c_int, airspyhf.airspyhf_transfer_t_p) +def read_samples(transfer): + global sample_count + global wave_file + #print("Python call back") + t = transfer.contents + bytes_to_write = t.sample_count * 4 * 2 + rx_buffer = t.samples + #print(f"{bytes_to_write} bytes receieved") + sample_count += t.sample_count + for i in range(0,t.sample_count): + d_re = t.samples[i].re + d_im = t.samples[i].im + data = struct.pack("<f",d_re) # FIX ?! + wave_file.writeframesraw(data) + data = struct.pack("<f", d_im) # FIX ?! + wave_file.writeframesraw(data) + #print("End call back") + return 0 + + +#th = CBthread(dev_p) +#th.start() + +read_samples_cb = airspyhf.airspyhf_sample_block_cb_fn(read_samples) + +print(read_samples) +print(read_samples_cb) + + +ret = airspyhf.libairspyhf.airspyhf_start(dev_p, airspyhf.airspyhf_sample_block_cb_fn(read_samples), None) +print(f"airspyhf_start ret={ret}") + +#ret = airspyhf.libairspyhf.py_cb_wrapper(dev_p) +#print(f"airspyhf_start ret={ret}") + + +ret = airspyhf.libairspyhf.airspyhf_set_freq(dev_p, 3865000) +print(f"airspyhf_set_freq ret={ret}") + +count = 0 +try: + while (airspyhf.libairspyhf.airspyhf_is_streaming(dev_p)) and (count < 3): + print("Main loop") + time.sleep(1) + count += 1 +except: + print("Error in main loop") + +ret = airspyhf.libairspyhf.airspyhf_stop(dev_p) +print(f"airspyhf_stop ret={ret}") + +#Not close for now +ret = airspyhf.libairspyhf.close(dev_p) +print("closed: Returned %d"%(ret)) + +print(f"Total samples received {sample_count}") + +airspyhf.libairspyhf.py_test() + +airspyhf.libairspyhf.py_test_cb(read_samples_cb) + +wave_file.close() + +print("All is ok") + +#th.wait() + |