sxceiver: add script for DC calibration
Some checks failed
/ build-hamnet70 (push) Failing after 17s
/ build-doc (push) Successful in 25s
/ deploy-doc (push) Has been skipped

This commit is contained in:
Thomas Kolb 2025-10-25 22:07:57 +02:00
commit a086a06e96

352
impl/utils/sxc_cal.py Executable file
View file

@ -0,0 +1,352 @@
#!/usr/bin/env python3
# SPDX-License-Identifier: MIT
"""Simple transmit test program.
Parameters can be changed by importing the module in Python REPL
and calling main() with different parameters.
For example:
$ python3
>>> import tx_test
>>> tx_test.main(tx_freq = 435e6)
"""
import logging
import os
import time
import struct
import SoapySDR
import numpy as np
def dump_signal(filename, sig, samp_rate):
with open(filename, "wb") as sigfile:
sigfile.write(b"CPX_")
sigfile.write(struct.pack("f", 1/samp_rate))
sigfile.write(sig.tobytes())
def tx_iqbal_testsignal(dev, tx, samp_rate, t_tx, blocksize, qgain, base_phase, phase_inc):
phase = base_phase + phase_inc * np.array(range(blocksize), dtype=np.float32)
base_phase = np.fmod(base_phase + blocksize * phase_inc, 2*np.pi)
tx_buf = 0.5 * (np.cos(phase, dtype=np.complex64) + 1j * qgain * np.sin(phase, dtype=np.complex64))
ret = dev.writeStream(tx, [tx_buf], len(tx_buf), SoapySDR.SOAPY_SDR_HAS_TIME, timeNs=t_tx)
if ret.ret != len(tx_buf):
logging.warning('TX: %s' % ret)
dump_signal(f"/tmp/iq_imbal_tx_raw_qg={qgain:.4f}.cpx", tx_buf, samp_rate)
return ret, base_phase
def rx_iqbal_testsignal(dev, rx, samp_rate, blocksize, qgain, fftsize, fftbin):
rx_buf = np.zeros(blocksize, dtype=np.complex64)
ret = dev.readStream(rx, [rx_buf], len(rx_buf))
if ret.ret != len(rx_buf):
logging.warning('RX: %s' % ret)
dump_signal(f"/tmp/iq_imbal_rx_raw_qg={qgain:.4f}.cpx", rx_buf, samp_rate)
# calculate the FFT and extract the two test peaks
offset = 1024
signal = rx_buf[offset:offset+fftsize]
dump_signal(f"/tmp/iq_imbal_rx_extract_qg={qgain:.4f}.cpx", signal, samp_rate)
fft = np.fft.fft(signal)
wanted_bin = np.sum(np.absolute(fft[fftbin-1:fftbin+2]))
mirror_bin = np.sum(np.absolute(fft[fftsize-fftbin-1:fftsize-fftbin+2]))
delta_dB = 20*np.log10(np.absolute(wanted_bin) / np.absolute(mirror_bin))
return delta_dB
def synced_transfer(dev, rx, tx, tx_buf):
block_size = 4096
rx_bufs = []
tx_block = tx_buf[:4*block_size]
tx_start = 4*block_size
ret = dev.writeStream(tx, [tx_block], len(tx_block))
if ret.ret != len(tx_block):
logging.warning('TX: %s' % ret)
while tx_start < len(tx_buf) + 4*block_size:
rx_block = np.zeros(block_size, dtype=np.complex64)
ret = dev.readStream(rx, [rx_block], block_size)
if ret.ret != block_size:
logging.warning('RX: %s' % ret)
rx_bufs.append(rx_block)
if tx_start < len(tx_buf):
tx_block = tx_buf[tx_start:tx_start+block_size]
else:
tx_block = np.zeros(block_size, dtype=np.complex64)
tx_start += block_size
ret = dev.writeStream(tx, [tx_block], len(tx_block))
if ret.ret != len(tx_block):
logging.warning('TX: %s' % ret)
return np.concatenate(rx_bufs)[:len(tx_buf)]
def main(
center_freq = 434.1e6,
tx_gain = 20.0,
rx_gain = 70.0,
blocksize = 16384,
samp_rate = 300e3,
avg_seconds = 1,
loglevel = logging.WARNING,
soapyloglevel = SoapySDR.SOAPY_SDR_INFO
):
avg_samples = int(samp_rate * avg_seconds)
logging.basicConfig(format='%(asctime)s %(levelname)-8s %(message)s', level=loglevel)
SoapySDR.setLogLevel(soapyloglevel)
try:
os.sched_setscheduler(0, os.SCHED_RR, os.sched_param(10))
except PermissionError:
logging.warning('Could not use real-time scheduling')
rx_buf = np.zeros(avg_samples, dtype=np.complex64)
dev = SoapySDR.Device({
'driver': 'sx',
})
dev.setSampleRate(SoapySDR.SOAPY_SDR_TX, 0, samp_rate)
dev.setFrequency(SoapySDR.SOAPY_SDR_TX, 0, center_freq)
dev.setGain(SoapySDR.SOAPY_SDR_TX, 0, tx_gain)
dev.setAntenna(SoapySDR.SOAPY_SDR_TX, 0, "TX")
dev.setSampleRate(SoapySDR.SOAPY_SDR_RX, 0, samp_rate)
dev.setFrequency(SoapySDR.SOAPY_SDR_RX, 0, center_freq)
dev.setGain(SoapySDR.SOAPY_SDR_RX, 0, rx_gain)
dev.setAntenna(SoapySDR.SOAPY_SDR_RX, 0, "LB")
rx = dev.setupStream(SoapySDR.SOAPY_SDR_RX, SoapySDR.SOAPY_SDR_CF32, [0], {'threshold':'0'})
### RX offset measurement
dev.activateStream(rx)
print(f"Sampling for RX DC offset calculation...")
rx_len = len(rx_buf)
ret = dev.readStream(rx, [rx_buf], rx_len)
if ret.ret != rx_len:
logging.warning('RX: %s' % ret)
dev.deactivateStream(rx)
rx_dc_offset = np.sum(rx_buf[:ret.ret]) / ret.ret
print(f"RX DC offset: {1000*rx_dc_offset:.6f} ⋅ 10⁻³ ({20*np.log10(np.abs(rx_dc_offset)):.2f} dB)")
dump_signal("/tmp/rx_cal_raw.cpx", rx_buf, samp_rate)
dump_signal("/tmp/rx_cal_dc_corrected.cpx", rx_buf - rx_dc_offset, samp_rate)
dev.closeStream(rx)
### TX offset measurement
# set up linked streams to ensure they start at the same time
rx = dev.setupStream(SoapySDR.SOAPY_SDR_RX, SoapySDR.SOAPY_SDR_CF32, [0], {'threshold':'0', 'link':'1'})
tx = dev.setupStream(SoapySDR.SOAPY_SDR_TX, SoapySDR.SOAPY_SDR_CF32, [0], {'threshold':'0', 'link':'1'})
# TX offset calculation uses a signal composed of three parts with 1000
# samples pause in between:
#
# 1. Transmit constant 0.1 + 0.0j
# 2. Transmit constant 0.2 + 0.0j
# 3. Transmit constant 0.0 + 0.0j
#
# From the first two signals, the transfer function (phase shift + scaling)
# of the loopback path can be calculated. Then, this can be applied to the
# TX DC offset observed in the third signal to determine the TX DC offset.
buf_len = 3*avg_samples + 2*1000
tx_buf = np.zeros(buf_len, dtype=np.complex64)
rx_buf = np.zeros(buf_len, dtype=np.complex64)
tx_buf[:avg_samples] = (0.1 + 0.0j) * np.ones(avg_samples, dtype=np.complex64)
tx_buf[avg_samples+1000:2*avg_samples+1000] = (0.2 + 0.0j) * np.ones(avg_samples, dtype=np.complex64)
#for i in range(10):
# tx_buf[100000+100*i:100000+100*i+50] += i / 20
#samp = np.arange(0, buf_len, dtype=np.complex64)
#tx_buf += 0.1 * np.real(np.exp(1j * 2 * np.pi * samp / samp_rate * 10e3)) # 10 kHz oszillation
#tx_buf[:buf_len//2] += -0.9 + 1.8 * np.linspace(0, 1, buf_len//2)
#tx_buf[buf_len//2:] += -0.9 + 1.8 * np.linspace(1, 0, buf_len//2)
print(f"Sampling for TX DC offset calculation...")
dev.activateStream(rx)
dev.activateStream(tx)
rx_buf = synced_transfer(dev, rx, tx, tx_buf)
dev.deactivateStream(rx)
dev.deactivateStream(tx)
rx_buf_rx_dc_corrected = rx_buf - rx_dc_offset
dump_signal("/tmp/tx_cal_offset_tx_raw.cpx", tx_buf, samp_rate)
dump_signal("/tmp/tx_cal_offset_rx_raw.cpx", rx_buf, samp_rate)
dump_signal("/tmp/tx_cal_offset_rx_dc_corrected.cpx", rx_buf_rx_dc_corrected, samp_rate)
avg1 = np.mean(rx_buf_rx_dc_corrected[int(0.1*avg_samples):int(0.9*avg_samples)]) # constant 0.1
avg2 = np.mean(rx_buf_rx_dc_corrected[int(1.1*avg_samples):int(1.9*avg_samples)]) # constant 0.2
avg3 = np.mean(rx_buf_rx_dc_corrected[int(2.1*avg_samples):int(2.9*avg_samples)]) # silence
h = (avg2 - avg1) / 0.1
# avg3 contains the _received_ version of the TX offset. We therefore
# divide by h to get the value that must be subtracted from the transmit
# signal.
tx_offset = avg3 / h
print(f"AVG1: {avg1:.6f}")
print(f"AVG2: {avg2:.6f}")
print(f"AVG3: {avg3:.6f}")
print(f"h: {h:.6f} = {np.absolute(h):.6f} @ {np.angle(h)*180/np.pi:.6f}°")
print(f"TX offset: {tx_offset:.6f} = {np.absolute(tx_offset):.6f} @ {np.angle(tx_offset)*180/np.pi:.6f}°")
print(f"TX offset test...")
tx_buf = -tx_offset * np.ones(avg_samples, dtype=np.complex64)
dev.activateStream(rx)
dev.activateStream(tx)
rx_buf = synced_transfer(dev, rx, tx, tx_buf)
dev.deactivateStream(rx)
dev.deactivateStream(tx)
rx_buf_rx_dc_corrected = rx_buf - rx_dc_offset
dump_signal("/tmp/tx_offset_test_tx_raw.cpx", tx_buf[:-1000], samp_rate)
dump_signal("/tmp/tx_offset_test_rx_raw.cpx", rx_buf[:-1000], samp_rate)
dump_signal("/tmp/tx_offset_test_rx_dc_corrected.cpx", rx_buf_rx_dc_corrected[:-1000], samp_rate)
return
### I/Q imbalance measurements for TX
t_tx = int(10e6) # ns
t_tx_inc = int(1e9 * blocksize / samp_rate)
fftbin = 120
fftsize = 512
phase_inc = fftbin / fftsize * 2*np.pi
base_phase = 0
dev.activateStream(rx)
dev.activateStream(tx)
test_qgains = np.arange(0.9500, 1.0501, 0.001)
# have one signal in the TX buffer at the beginning
ret, base_phase = tx_iqbal_testsignal(dev, tx, samp_rate, t_tx, blocksize, test_qgains[0], base_phase, phase_inc)
# "seek" RX forward to the start of transmission
dummy = np.zeros(int(t_tx * samp_rate / 1e9), dtype=np.complex64)
ret = dev.readStream(rx, [dummy], len(dummy))
if ret.ret != len(dummy):
logging.warning('RX: %s' % ret)
t_tx += t_tx_inc
best_tx_qgain_db = 0
best_tx_qgain = 1.0
for i in range(len(test_qgains)-1):
txd_qgain = test_qgains[i+1]
rxd_qgain = test_qgains[i]
ret, base_phase = tx_iqbal_testsignal(dev, tx, samp_rate, t_tx, blocksize, txd_qgain, base_phase, phase_inc)
t_tx += t_tx_inc
delta_db = rx_iqbal_testsignal(dev, rx, samp_rate, blocksize, rxd_qgain, fftsize, fftbin)
print(f"{rxd_qgain:.4f} => {delta_db:.2f} dB")
if delta_db > best_tx_qgain_db:
best_tx_qgain_db = delta_db
best_tx_qgain = rxd_qgain
delta_db = rx_iqbal_testsignal(dev, rx, samp_rate, blocksize, test_qgains[-1], fftsize, fftbin)
print(f"{test_qgains[-1]:.4f} => {delta_db:.2f} dB")
if delta_db > best_tx_qgain_db:
best_tx_qgain_db = delta_db
best_tx_qgain = test_qgains[-1]
print(f"Best TX I/Q balance [{best_tx_qgain_db:.2f} dB to mirror] if Q = {best_tx_qgain:.4f} * I.")
dev.deactivateStream(rx)
dev.deactivateStream(tx)
### I/Q imbalance measurements for RX
t_tx = int(10e6) # ns
t_tx_inc = int(1e9 * blocksize / samp_rate)
fftbin = 120
fftsize = 512
phase_inc = fftbin / fftsize * 2*np.pi
base_phase = 0
dev.activateStream(rx)
dev.activateStream(tx)
test_qgains = np.arange(0.9500, 1.0501, 0.001)
# transmit the test signal, this time with the optimum TX Q gain
ret, base_phase = tx_iqbal_testsignal(dev, tx, samp_rate, t_tx, blocksize, best_tx_qgain, base_phase, phase_inc)
# "seek" RX forward to the start of transmission
dummy = np.zeros(int(t_tx * samp_rate / 1e9), dtype=np.complex64)
ret = dev.readStream(rx, [dummy], len(dummy))
if ret.ret != len(dummy):
logging.warning('RX: %s' % ret)
t_tx += t_tx_inc
# receive the test signal (only once required)
rx_buf = np.zeros(blocksize, dtype=np.complex64)
ret = dev.readStream(rx, [rx_buf], len(rx_buf))
if ret.ret != len(rx_buf):
logging.warning('RX: %s' % ret)
dev.deactivateStream(rx)
dev.deactivateStream(tx)
# remove the average value from the received signal
rx_buf_no_dc = rx_buf - np.mean(rx_buf)
# calculate the RMS of I and Q
i_rms = np.sqrt(np.mean(np.real(rx_buf_no_dc)**2))
q_rms = np.sqrt(np.mean(np.imag(rx_buf_no_dc)**2))
print(f"rms(I) = {i_rms:.6f}")
print(f"rms(Q) = {q_rms:.6f}")
best_rx_qgain = i_rms / q_rms
print(f"Best RX I/Q balance if Q = {best_rx_qgain:.4f} * I.")
# apply it to the original received signal for testing
rx_sig_iq_fixed = np.real(rx_buf) + 1j * best_rx_qgain * np.imag(rx_buf)
dump_signal("/tmp/iq_imbal_rx_raw.cpx", rx_buf, samp_rate)
dump_signal("/tmp/iq_imbal_rx_fixed.cpx", rx_sig_iq_fixed, samp_rate)
if __name__ == '__main__':
main()