From e74679711bb9051080e7f11e16fe474fb6b56e60 Mon Sep 17 00:00:00 2001 From: Arturs Artamonovs Date: Wed, 22 Mar 2023 22:55:37 +0000 Subject: airspyhf_waterfall shows first signs of working --- airspyhf_waterfall.py | 226 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 226 insertions(+) create mode 100644 airspyhf_waterfall.py (limited to 'airspyhf_waterfall.py') diff --git a/airspyhf_waterfall.py b/airspyhf_waterfall.py new file mode 100644 index 0000000..fc1de56 --- /dev/null +++ b/airspyhf_waterfall.py @@ -0,0 +1,226 @@ +import os +import sys +import math + +import airspyhf + +import matplotlib +import numpy +import pylab +import time +import threading + +import pygame +from pygame import gfxdraw + +CENTER_FREQ = 101000000 +SAMPLE_RATE = 196e3 +SAMPLE_NUM = 2048 + +SCREEN_X = 1025 +SCREEN_Y = 320 + +MOVE_STEP = int(SAMPLE_RATE/2) + +sample_buf_lock = threading.Lock() + +# init AIRSPY and if no then go out +airspy = airspyhf.AirSpyHF() +if airspy.open(device_index=0) == -1: + print("Cant open airspyhf device") + sys.exit(1) + +# config rtlsdr device +airspy.set_samplerate(SAMPLE_RATE) +airspy.set_frequency(CENTER_FREQ) +airspy.set_hf_agc(1) +airspy.set_hf_agc_threshold(0) +airspy.set_hf_lna(1) + + +def iq_abs(c): + return (math.sqrt((c.real ** 2 + c.imag ** 2))) + + +# point should be normalised to 0.0 ... 1.0 +def color_normalise(point): + ret = (255, 0, 0) + # blue + if (point < 0.3): + ret = (0, 0, int(point * 255 * 3.3)) + # yello + elif (point < 0.7): + ret = (0, int((point - 0.3) * 255 * 2.5), 0) + # red + elif (point <= 1.0): + ret = (int((point - 0.7) * 255 * 3.3), 0, 0) + else: + # print "Color Error ", point + pass + return ret + + +def color_mapping(x): + "assumes -50 to 0 range, returns color" + r = int((x + 70) * 255 // 70) + r = max(0, r) + r = min(255, r) + return (r, r, 100) + + +# def draw_Hz( surface, x, y, hz ): + + +arr = [[0 for i in range(0, SCREEN_X)] for j in range(0, SCREEN_Y)] + +# init all pygame modules audio,video and more +pygame.init() + +# [NEW] creates screen surface using constants +screen = pygame.display.set_mode((SCREEN_X, SCREEN_Y)) + +#samples = rtl.read_samples(SAMPLE_NUM) +sample_buffer = [] +def read_samples(transfer): + global sample_buffer + #print("callback") + #if sample_buf_lock.locked(): + # print("Buffer locked") + # return 0 + #sample_buf_lock.acquire() + #print("Python call back") + t = transfer.contents + bytes_to_write = t.sample_count * 4 * 2 + #print("Received %d samples"%(t.sample_count)) + rx_buffer = t.samples + #print(f"{bytes_to_write} bytes receieved") + #sample_buffer.append(rx_buffer) + for i in range(0,t.sample_count): + d_re = t.samples[i].re + d_im = t.samples[i].im + sample_buffer.append(math.sqrt(d_re*d_re+d_im*d_im)) + #data = struct.pack(" num: + # arr = [] + # times = len(samples)/num + # for i in range(0, num): + # avg = numpy.average(samples[times*num:times*num+times-1]) + # arr.append(avg) + # return arr + #else: + # print("Error in get_samples") + # return [] + return [] + +read_samples_cb = airspyhf.airspyhf_sample_block_cb_fn(read_samples) +airspy.start(read_samples_cb) + +run = True +line = 0 +while run and airspy.is_streaming(): + + print("Loop tick") + + # check for all events that where ocure + for event in pygame.event.get(): + # if some one clicked on close button + if event.type == pygame.QUIT: + # terminate programm + run = False + # don't waste your time by waiting while event loop will end + break + elif event.type == pygame.KEYDOWN: + if event.key == pygame.K_LEFT: + print("Left") + #rtl.center_freq -= MOVE_STEP + airspy.set_frequency(airspy.cur_freq - MOVE_STEP) + #print("Center freq: ", rtl.center_freq, " Hz") + elif event.key == pygame.K_RIGHT: + print("Right") + #rtl.center_freq += MOVE_STEP + #print("Center freq: ", rtl.center_freq, " Hz") + airspy.set_frequency(airspy.cur_freq + MOVE_STEP) + + width = SCREEN_X + height = SCREEN_Y + + samples = get_samples(SAMPLE_NUM) + if samples == []: + print("sample buffer is empty") + time.sleep(1) + continue + #print(samples) + spect = numpy.fft.fft(samples,n=SAMPLE_NUM) + + spect = spect[0:int((len(spect) / 2))] + + # (1/(Fs*N)) * abs(xdft).^2; + spect_n = [(1.0 / (SAMPLE_NUM * len(spect))) * iq_abs(x) ** 2 for x in spect] + spect_n = (10 * numpy.log10(spect_n)).tolist() + + #print(spect_n) + + # total data size + spect_len = len(spect_n) + #print(spect_len) + # calculate amount spect points per pixel without rounding + pixel_width = int(spect_len / SCREEN_X + 1) + #print(pixel_width) + pixel_steps = int(spect_len / pixel_width) + #print(pixel_steps) + #print(spect_n) + for step in range(0, pixel_steps): + avg = 0.0 + for i in range(0, pixel_width): + avg += spect_n[step * pixel_width + i] + avg /= pixel_width + if math.isinf(avg): + avg = -1000 + + # print avg + # gfxdraw.pixel( screen, step, line, color_normalise((100-abs(avg))/10)) + gfxdraw.pixel(screen, step, line, color_mapping(int(avg))) + + # draw central freq + #font = pygame.font.Font(None, 20) + #text = font.render(str(rtl.center_freq / 1e6), 1, (200, 30, 30), (0, 0, 0)) + #screen.blit(text, (SCREEN_X / 2, SCREEN_Y - 20)) + #text = font.render(str((rtl.center_freq + SAMPLE_RATE / 2) / 1e6), 1, (200, 30, 30), (0, 0, 0)) + #screen.blit(text, (SCREEN_X - 40, SCREEN_Y - 20)) + #text = font.render(str((rtl.center_freq - SAMPLE_RATE / 2) / 1e6), 1, (200, 30, 30), (0, 0, 0)) + #screen.blit(text, (20, SCREEN_Y - 20)) + + pygame.display.flip() + line += 1 + if (line > SCREEN_Y): + line = 0 + time.sleep(0.1) + +pygame.quit() + +airspy.stop() +airspy.close() + -- cgit v1.2.3