Files
castelion_radar_alinx_kintex/python/radar_manager.py

440 lines
14 KiB
Python
Executable File

import ctypes
import datetime
import ipaddress
import socket
import struct
import time
import numpy as np
from matplotlib import pyplot as plt
import data_structures as msg_types
from data_structures import CpiHeader
# Base addresses of the register blocks that RadarManager reads/writes through
# its axi_read_register / axi_write_register helpers.
UTIL_REG_ADDR = 0x40050000
TIMING_ENGINE_ADDR = 0x40051000
DIG_RX_ADDR = 0x20000000
# Address offset between the register blocks of consecutive digital receivers
# (receiver i lives at DIG_RX_ADDR + i * DIG_RX_STRIDE).
DIG_RX_STRIDE = 0x10000
WAVEFORM_GEN_ADDR = 0x40053000
# Number of digital receivers iterated over by setup_rx / start_running /
# stop_running.
NUM_RX = 2
# Selects how sample counts are scaled before being written to the RX/TX
# registers: shifted right by 1 when True, by 2 when False.
JESD204B = False
# NOTE(review): presumably in Hz -- confirm. Not referenced elsewhere in the
# visible code except to derive TIMING_ENGINE_FREQ.
BASEBAND_SAMPLE_RATE = 750e6
TIMING_ENGINE_FREQ = BASEBAND_SAMPLE_RATE / 4
# ADC_SAMPLE_RATE = 187.5e6
# DAC_SAMPLE_RATE = 187.5e6
# f_dac / f_adc are the moduli applied to the NCO frequencies in
# RadarManager.configure_cpi. NOTE(review): presumably the converter sample
# rates in Hz -- confirm.
f_dac = 9e9
f_adc = 3e9
# Alternative configurations kept for reference (disabled):
# ADC_SAMPLE_RATE = 225e6
# DAC_SAMPLE_RATE = 225e6
# f_dac = 10.8e9
# f_adc = 3.6e9
# JESD204B = True
# # ADC_SAMPLE_RATE = 250e6
# # DAC_SAMPLE_RATE = 250e6
# # f_dac = 12e9
# # f_adc = 4e9
# ADC_SAMPLE_RATE = 187.5e6
# DAC_SAMPLE_RATE = 187.5e6
# f_dac = 9e9
# f_adc = 3e9
def form_chirp(pulsewidth, bw, sample_rate, win=None, ):
    """Generate a complex linear-FM chirp.

    The instantaneous frequency is swept linearly from ``-bw/2`` to ``+bw/2``
    over ``round(pulsewidth * sample_rate)`` samples; the phase is the running
    sum of ``2*pi*f/sample_rate`` and the output is ``exp(-1j * phase)``.

    Args:
        pulsewidth: Pulse duration, in the time unit matching ``sample_rate``.
        bw: Swept bandwidth, in the frequency unit of ``sample_rate``.
        sample_rate: Samples per unit time.
        win: Optional amplitude taper. Either an array-like that broadcasts
            against the chirp (normally of length ``n``) or a callable that
            takes the sample count and returns such an array (for example
            ``np.hanning``). ``None`` (default) leaves the chirp untapered.

    Returns:
        1-D ``np.complex64`` array holding the (optionally tapered) chirp.

    Raises:
        ValueError: If ``win`` cannot be broadcast against the chirp.
    """
    n = int(np.round(pulsewidth * sample_rate))
    d_t = 1 / sample_rate
    freqs = np.linspace(-bw / 2, bw / 2, n)
    phase = np.cumsum(2 * np.pi * freqs * d_t)
    chirp = np.exp(-1j * phase)
    # Previously `win` was accepted but silently ignored.
    if win is not None:
        taper = win(n) if callable(win) else np.asarray(win)
        chirp = chirp * taper
    return chirp.astype(np.complex64)
class RadarManager:
    """TCP client that configures and runs the radar design.

    Every operation is a ctypes message (declared in ``data_structures``)
    sent over one TCP connection. Write-style messages request an ACK and
    block until it arrives; read-style messages block for a response.

    Attributes:
        host, port: Address of the remote control server.
        s: The connected ``socket.socket``, or ``None`` when disconnected.
        CONNECTED: True while ``s`` is believed to be connected.
        packet_size: Byte quantum used by ``configure_cpi`` to round the
            receive sample count (4 bytes per sample).
    """

    def __init__(self,
                 host="192.168.1.200",
                 port=5001,
                 packet_size=16):
        """Connect, print the FPGA datecode, reset the 10G UDP core and stop.

        Args:
            host: IP address / hostname of the control server.
            port: TCP port of the control server.
            packet_size: Byte quantum for ``configure_cpi``.
                NOTE(review): the original assignment was commented out, so
                ``configure_cpi`` raised AttributeError. The default of 16 is
                the last commented-out value -- confirm against the firmware.

        Raises:
            ConnectionError: If the TCP connection cannot be established.
        """
        self.host = host
        self.port = port
        self.packet_size = packet_size
        self.s = None
        self.CONNECTED = False
        if not self.connect():
            # Fail here instead of on the first register access.
            raise ConnectionError("Unable to connect to %s:%s" % (host, port))
        self.get_fpga_datecode()
        self.reset_10g_udp()
        self.stop_running()

    # ------------------------------------------------------------------
    # Connection handling
    # ------------------------------------------------------------------
    def connect(self):
        """Open the TCP connection (single attempt, 5 s timeout).

        Any previously held socket is closed first. On failure the error is
        printed, the half-open socket is closed and ``s`` is left as ``None``.

        Returns:
            True if the connection was established, otherwise False.
        """
        self.disconnect()
        sock = None
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(5)
            sock.connect((self.host, self.port))
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        except OSError as e:
            print("Connection Error %s" % e)
            if sock is not None:
                sock.close()
            return self.CONNECTED
        self.s = sock
        self.CONNECTED = True
        return self.CONNECTED

    def disconnect(self):
        """Close the socket if one is open; safe to call repeatedly."""
        if self.s is not None:
            self.s.close()
            self.s = None
        self.CONNECTED = False

    def is_connected(self):
        """Return the current value of the ``CONNECTED`` flag."""
        return self.CONNECTED

    def _socket(self):
        """Return the live socket or raise ConnectionError if there is none."""
        if self.s is None:
            raise ConnectionError("Socket is not connected")
        return self.s

    # ------------------------------------------------------------------
    # Message transport
    # ------------------------------------------------------------------
    def send_bytes(self, msg_bytes):
        """Send ``msg_bytes`` in full over the socket."""
        self._socket().sendall(msg_bytes)

    def send_message(self, msg, update_hdr=True, enable_ack=True, wait_for_ack=True, timeout=10):
        """Serialize and send ``msg``, optionally waiting for its ACK.

        Args:
            msg: ctypes message structure exposing a ``header.flags`` field.
            update_hdr: Unused; kept for interface compatibility.
            enable_ack: Set the request-ACK flag in the header.
            wait_for_ack: Block until an ``AckType`` has been received.
            timeout: Socket timeout in seconds applied to this exchange.

        Returns:
            True once the message was sent (and ACKed, if requested).

        Raises:
            ConnectionError: If no complete ACK arrives, or not connected.
        """
        self._socket().settimeout(timeout)
        msg.header.flags = msg_types.HDR_FLAG_REQ_ACK if enable_ack else 0
        self.send_bytes(bytes(msg))
        if wait_for_ack:
            # The ACK payload itself is not inspected, only its arrival.
            self._recv_struct(msg_types.AckType)
        return True

    def get_response(self, size):
        """Read exactly ``size`` bytes from the socket.

        Returns:
            ``(data, 0)`` with ``data`` a bytearray of length ``size`` on
            success; ``(0, -1)`` on socket timeout; ``(0, -2)`` if the peer
            closed the connection before ``size`` bytes arrived.
        """
        recv_data = bytearray()
        while len(recv_data) < size:
            try:
                # Ask only for what is still missing so a partial read can
                # never consume bytes belonging to the next message.
                chunk = self._socket().recv(size - len(recv_data))
            except socket.timeout:
                return 0, -1
            if not chunk:
                # recv() returning b'' means the peer closed the connection;
                # without this check the loop would spin forever.
                return 0, -2
            recv_data += chunk
        return recv_data, 0

    def _recv_struct(self, struct_type):
        """Receive one ``struct_type`` instance or raise ConnectionError."""
        recv_bytes, err = self.get_response(ctypes.sizeof(struct_type))
        if err != 0:
            raise ConnectionError(
                "No complete %s received (error %d)" % (struct_type.__name__, err))
        return struct_type.from_buffer_copy(recv_bytes)

    # ------------------------------------------------------------------
    # Register / device access
    # ------------------------------------------------------------------
    def axi_write_register(self, address, data):
        """Write ``data`` to the AXI register at ``address`` (word-aligned down)."""
        msg = msg_types.WriteRegType()
        msg.address = address - (address % 4)
        msg.data = data
        self.send_message(msg)

    def axi_write_register_burst(self, address, data):
        """Write the sequence ``data`` starting at ``address`` (word-aligned down).

        ``data`` must fit the message's ``data`` array; see
        ``msg_types.MAX_BURST_LENGTH`` as used by ``load_waveform``.
        """
        msg = msg_types.WriteRegBurstType()
        msg.address = address - (address % 4)
        msg.length = len(data)
        for i, word in enumerate(data):
            msg.data[i] = word
        self.send_message(msg)

    def axi_read_register(self, address):
        """Read and return the AXI register at ``address`` (word-aligned down).

        Raises:
            ConnectionError: If no complete response is received.
        """
        msg = msg_types.ReadRequestType()
        msg.address = address - (address % 4)
        self.send_message(msg, enable_ack=False, wait_for_ack=False)
        return self._recv_struct(msg_types.ReadResponseType).data

    def ad9081_write_reg(self, address, data):
        """Write ``data`` to AD9081 register ``address``."""
        msg = msg_types.WriteRegType(msg_id=msg_types.AD9081_REG_WRITE)
        msg.address = address
        msg.data = data
        self.send_message(msg)

    def ad9081_read_reg(self, address):
        """Read and return AD9081 register ``address``.

        Raises:
            ConnectionError: If no complete response is received.
        """
        msg = msg_types.ReadRequestType(msg_id=msg_types.AD9081_REG_READ)
        msg.address = address
        self.send_message(msg, enable_ack=False, wait_for_ack=False)
        return self._recv_struct(msg_types.ReadResponseType).data

    def set_dac_nco(self, channel, frequency):
        """Send a DAC NCO configuration; ``frequency`` is truncated to int."""
        msg = msg_types.DacNcoConfigType()
        msg.channel = channel
        msg.frequency = int(frequency)
        self.send_message(msg)

    def set_adc_nco(self, channel, frequency):
        """Send an ADC NCO configuration; ``frequency`` is truncated to int."""
        msg = msg_types.AdcNcoConfigType()
        msg.channel = channel
        msg.frequency = int(frequency)
        self.send_message(msg)

    def set_lane_mapping(self, lane_map):
        """Send the first eight entries of ``lane_map`` as the lane mapping."""
        msg = msg_types.LaneMapType()
        for i in range(8):
            msg.lane_map[i] = lane_map[i]
        self.send_message(msg)

    def rf_spi_write(self, dev_sel, num_bits, data):
        """Send an RF SPI write of ``num_bits`` bits of ``data`` to ``dev_sel``."""
        msg = msg_types.RfSpiWriteType()
        msg.dev_sel = dev_sel
        msg.num_bits = num_bits
        msg.data = data
        self.send_message(msg)

    def config_flash_write(self, address, data):
        """Write the sequence ``data`` to config flash at ``address`` (word-aligned down)."""
        msg = msg_types.ConfigFlashWriteType(msg_id=msg_types.CONFIG_FLASH_WRITE)
        msg.address = address - (address % 4)
        msg.length = len(data)
        for i, value in enumerate(data):
            msg.data[i] = value
        self.send_message(msg)

    def update_ip_address(self, ip, mask, gw, port):
        """Store a new network configuration at flash address 0xf00000.

        ``ip``, ``mask`` and ``gw`` accept anything ``ipaddress.IPv4Address``
        accepts. The four values are written as consecutive native-endian
        32-bit words.
        """
        words = [int(ipaddress.IPv4Address(field)) for field in (ip, mask, gw)]
        words.append(int(port))
        data = bytes(np.array(words, dtype=np.uint32))
        self.config_flash_write(0xf00000, data)

    def get_fpga_datecode(self):
        """Read the two build-stamp registers and print them in hex."""
        datecode = self.axi_read_register(UTIL_REG_ADDR + 0x114)
        timecode = self.axi_read_register(UTIL_REG_ADDR + 0x118)
        print('FPGA Datestamp %x_%x' % (datecode, timecode))

    # ------------------------------------------------------------------
    # Waveform loading
    # ------------------------------------------------------------------
    @staticmethod
    def _pack_iq(samples):
        """Pack complex samples into uint32 words (I in bits 0-15, Q in 16-31).

        Samples are scaled by 0x7FFF, clipped to the int16 range, truncated
        toward zero and stored as 16-bit two's complement.
        """
        scaled = np.asarray(samples) * 0x7FFF

        def to_u16_word(component):
            # Go through int16 first: casting a negative float directly to an
            # unsigned type is undefined and platform dependent.
            clipped = np.clip(component, -0x8000, 0x7FFF)
            return clipped.astype(np.int16).astype(np.uint16).astype(np.uint32)

        return to_u16_word(scaled.real) | (to_u16_word(scaled.imag) << 16)

    def load_waveform(self, ch, amp, bw, pw, num_wf=1):
        """Generate ``num_wf`` chirps and burst-write them to channel ``ch``.

        Waveform ``k`` (0-based) is the same chirp scaled by ``amp / (k + 1)``.
        The packed samples are written starting at ``0x20000 + 0x20000 * ch``
        in bursts of at most ``msg_types.MAX_BURST_LENGTH`` words.

        Args:
            ch: Waveform channel index.
            amp: Amplitude of the first waveform (1 maps to 0x7FFF).
            bw: Chirp bandwidth as a fraction of the sample rate.
            pw: Chirp length in samples (also the per-waveform stride).
            num_wf: Number of waveforms to store back to back.
        """
        addr = 0x0020000 + 0x0020000 * ch
        print('Load', hex(addr))
        num_samples = pw
        # The chirp is identical for every waveform, so generate it once.
        base_chirp = form_chirp(pw, bw, 1)
        waveforms = np.empty(num_wf * num_samples, dtype=np.complex64)
        for wf_ind in range(num_wf):
            start = wf_ind * num_samples
            waveforms[start:start + num_samples] = base_chirp * amp / (wf_ind + 1)
        data = self._pack_iq(waveforms)
        burst_len = msg_types.MAX_BURST_LENGTH
        for offset in range(0, len(data), burst_len):
            # 4 bytes per packed word.
            self.axi_write_register_burst(addr + 4 * offset,
                                          data[offset:offset + burst_len])

    # ------------------------------------------------------------------
    # Subsystem setup
    # ------------------------------------------------------------------
    def reset_10g_udp(self):
        """Pulse bit 15 of register 0x40050008, then wait two seconds."""
        val = self.axi_read_register(0x40050008)
        self.axi_write_register(0x40050008, val | (1 << 15))
        self.axi_write_register(0x40050008, val)
        time.sleep(2)

    def setup_cpi_header(self, pri, inter_cpi, num_pulses, num_samples, start_sample,
                         tx_num_samples, tx_start_sample, rx_lo_offset, tx_lo_offset):
        """Fill a ``CpiHeader`` and burst-write it to TIMING_ENGINE_ADDR + 0xC00."""
        header = CpiHeader()
        header.sync = 0xAABBCCDD
        header.num_pulses = num_pulses
        header.num_samples = num_samples
        header.start_sample = start_sample
        header.tx_num_samples = tx_num_samples
        header.tx_start_sample = tx_start_sample
        header.pri = pri
        header.inter_cpi = inter_cpi
        header.tx_lo_offset = tx_lo_offset
        header.rx_lo_offset = rx_lo_offset
        data = np.frombuffer(bytes(header), dtype=np.uint32)
        self.axi_write_register_burst(TIMING_ENGINE_ADDR + 0xC00, data)

    def setup_timing_engine(self, pri, num_pulses, inter_cpi):
        """Program the timing engine with ``pri - 1``, ``num_pulses`` and ``inter_cpi - 1``."""
        self.axi_write_register(TIMING_ENGINE_ADDR + 0x4, pri - 1)
        self.axi_write_register(TIMING_ENGINE_ADDR + 0x8, num_pulses)
        self.axi_write_register(TIMING_ENGINE_ADDR + 0x10, inter_cpi - 1)

    def setup_rx(self, num_samples, start_sample, dec_rate):
        """Program every digital receiver and force its enable strobe high.

        Sample counts are written shifted right by 1 (JESD204B) or 2
        (otherwise); the decimation select is ``int(log2(dec_rate))``.
        """
        shift = 1 if JESD204B else 2
        dec_sel = int(np.log2(dec_rate))
        for i in range(NUM_RX):
            base = DIG_RX_ADDR + i * DIG_RX_STRIDE
            self.axi_write_register(base + 0x4, num_samples >> shift)
            self.axi_write_register(base + 0x8, start_sample >> shift)
            self.axi_write_register(base + 0xC, dec_sel)
            # Force the enable high all the time before we start running.
            self.axi_write_register(TIMING_ENGINE_ADDR + 0x88 + i * 8, 0x1FFFFFFF)

    def setup_tx(self, num_samples, start_sample, num_wf):
        """Program the waveform generator and force its enable strobe high."""
        shift = 1 if JESD204B else 2
        self.axi_write_register(WAVEFORM_GEN_ADDR + 0x4, num_samples >> shift)
        self.axi_write_register(WAVEFORM_GEN_ADDR + 0x8, start_sample >> shift)
        self.axi_write_register(WAVEFORM_GEN_ADDR + 0xC, num_wf)
        # Force the enable high all the time before we start running.
        self.axi_write_register(TIMING_ENGINE_ADDR + 0x80, 0x1FFFFFFF)

    def start_running(self):
        """Release the resets: receivers first, then TX, timing engine last."""
        for i in range(NUM_RX):
            self.axi_write_register(DIG_RX_ADDR + i * DIG_RX_STRIDE + 0x0, 0)  # RX Reset
        self.axi_write_register(WAVEFORM_GEN_ADDR + 0x0, 0)  # TX Reset
        self.axi_write_register(TIMING_ENGINE_ADDR + 0x0, 0)  # Timing Reset (do this last)

    def stop_running(self):
        """Assert the resets (timing engine first) and clear the RX/TX enables."""
        self.axi_write_register(TIMING_ENGINE_ADDR + 0x0, 1)  # Timing Engine Reset
        for i in range(NUM_RX):
            self.axi_write_register(DIG_RX_ADDR + i * DIG_RX_STRIDE + 0x0, 1)  # RX Reset
            self.axi_write_register(TIMING_ENGINE_ADDR + 0x88 + i * 8, 0x0FFFFFF)  # Clear RX Enable
        self.axi_write_register(WAVEFORM_GEN_ADDR + 0x0, 1)  # TX Reset
        self.axi_write_register(TIMING_ENGINE_ADDR + 0x80, 0x0FFFFFF)  # Clear TX Enable

    def setup_rf_attenuators(self, rf_atten):
        """Write six 6-bit attenuator settings, one per device-select bit.

        ``rf_atten[k]`` goes to the device selected by bit ``k``:
        0: TX0 RF (ADRF5730), 1: TX1 RF (ADRF5730), 2: RX0 RF (ADRF5721),
        3: RX0 IF (HMC624), 4: RX1 RF (ADRF5721), 5: RX1 IF (HMC624).
        """
        for bit in range(6):
            self.rf_spi_write(1 << bit, 6, rf_atten[bit])

    def _quantize_num_samples(self, num_samples):
        """Round ``num_samples`` down to a multiple of ``packet_size / 4``.

        A result of zero is bumped up to one full quantum. Prints a notice
        whenever the value had to be changed.
        """
        quantum = int(self.packet_size / 4)
        remainder = num_samples % quantum
        if remainder > 0:
            print('Packet Size Invalid')
            num_samples -= remainder
            if num_samples == 0:
                num_samples = quantum
            print('Updated num samples to', num_samples)
        return num_samples

    def configure_cpi(self, pri, inter_cpi, num_pulses, num_samples, start_sample,
                      tx_num_samples, tx_start_sample, rx_lo_offset, tx_lo_offset, dec_rate):
        """Load waveforms and program every subsystem for one CPI definition.

        Steps: load four waveforms on channels 0 and 1, quantise
        ``num_samples``, write the CPI byte count, set the RF attenuators and
        NCOs, then program the timing engine, receivers, transmitter and CPI
        header. Does not start the system; call ``start_running`` afterwards.
        """
        num_wf = 4
        self.load_waveform(0, 1, 0.05, tx_num_samples, num_wf)
        self.load_waveform(1, 1, 0.05, tx_num_samples, num_wf)

        # NOTE(review): the byte count is now derived from the quantised
        # sample count so it matches what is programmed into the receivers and
        # the CPI header; previously it used the caller's raw value. Confirm
        # this is what register 0x4005001C expects.
        num_samples = self._quantize_num_samples(num_samples)
        total_bytes_cpi = num_pulses * num_samples * 4
        self.axi_write_register(0x4005001C, total_bytes_cpi)

        rf_atten = [1, 2, 3, 4, 5, 6]
        self.setup_rf_attenuators(rf_atten)

        adc_nco = 1e9 % f_adc
        dac_nco = 1e9 % f_dac
        print(adc_nco)
        for i in range(4):
            self.set_adc_nco(i, adc_nco)
            self.set_dac_nco(i, dac_nco)

        self.setup_timing_engine(pri, num_pulses, inter_cpi)
        self.setup_rx(num_samples, start_sample, dec_rate)
        self.setup_tx(tx_num_samples, tx_start_sample, num_wf)
        self.setup_cpi_header(pri, inter_cpi, num_pulses, num_samples, start_sample,
                              tx_num_samples, tx_start_sample, rx_lo_offset, tx_lo_offset)