starting to look at waveform generator, just in python right now
This commit is contained in:
234
python/waveform_generator/wf_ofdm.py
Normal file
234
python/waveform_generator/wf_ofdm.py
Normal file
@@ -0,0 +1,234 @@
|
||||
import numpy as np
|
||||
|
||||
|
||||
# get one chip of a multi-chip OFDM waveform
|
||||
def get_chip(
|
||||
baseband,
|
||||
start_phase,
|
||||
f_samp,
|
||||
N_samp_chip,
|
||||
N_pulse,
|
||||
n_tx,
|
||||
n_pulse,
|
||||
n_sub_cpi,
|
||||
):
|
||||
# pylint: disable-msg=too-many-arguments
|
||||
# TODO: @Charlie, remove disable and fix
|
||||
# unused args
|
||||
|
||||
s_p = np.sum(
|
||||
[
|
||||
start_phase[n_tx, n_sub_cpi, nn % N_pulse]
|
||||
for nn in range(N_pulse * n_sub_cpi + n_pulse)
|
||||
]
|
||||
)
|
||||
|
||||
phase = (
|
||||
s_p + baseband[n_tx, n_sub_cpi, n_pulse] * np.arange(N_samp_chip) / f_samp
|
||||
)
|
||||
x = np.exp(2 * np.pi * 1j * phase)
|
||||
return x, phase
|
||||
|
||||
# synthesize the transmit waveform in intermediate frequency sample space
|
||||
def get_waveform(baseband, start_phase, N_tx, N_sub_cpi, N_f_pulse, N_samp_chip, N_samp, f_samp):
|
||||
pulse = np.zeros((N_tx, N_sub_cpi, N_samp), dtype=complex)
|
||||
phase = np.zeros((N_tx, N_sub_cpi, N_samp))
|
||||
|
||||
for n_tx in range(N_tx):
|
||||
for n_sub_cpi in range(N_sub_cpi):
|
||||
for n_f_pulse in range(N_f_pulse):
|
||||
(
|
||||
pulse[
|
||||
n_tx,
|
||||
n_sub_cpi,
|
||||
n_f_pulse * N_samp_chip: (
|
||||
N_samp_chip + n_f_pulse * N_samp_chip
|
||||
),
|
||||
],
|
||||
phase[
|
||||
n_tx,
|
||||
n_sub_cpi,
|
||||
n_f_pulse * N_samp_chip: (
|
||||
N_samp_chip + n_f_pulse * N_samp_chip
|
||||
),
|
||||
],
|
||||
) = get_chip(baseband, start_phase, f_samp, N_samp_chip, N_f_pulse, n_tx, n_f_pulse, n_sub_cpi)
|
||||
|
||||
return pulse, phase
|
||||
|
||||
|
||||
def get_pulse_params(n_tx, n_rx, f_samp, n_samp_chip, n_f, n_f_pulse, n_sub_cpi, n_pol, perm):
|
||||
f_chip = 5859375.0
|
||||
f_c = 16300000255.999992
|
||||
k_rgft = 256
|
||||
b = 23437500.0
|
||||
|
||||
|
||||
t_chip = 1 / f_chip
|
||||
n_tx_n_pol = n_tx * n_pol
|
||||
|
||||
# frequencies with respect to baseband
|
||||
baseband = f_chip * (perm - (n_f - 1) / 2)
|
||||
start_phase = baseband * t_chip
|
||||
|
||||
f_sa = f_c + baseband
|
||||
chip_samples = int(np.ceil(k_rgft * f_chip / f_samp))
|
||||
|
||||
chip_center_dmux = np.zeros((n_sub_cpi, n_rx, n_tx_n_pol, n_f_pulse), dtype=int)
|
||||
chip_lower_dmux = np.zeros((n_sub_cpi, n_rx, n_tx_n_pol, n_f_pulse), dtype=int)
|
||||
chip_upper_dmux = np.zeros((n_sub_cpi, n_rx, n_tx_n_pol, n_f_pulse), dtype=int)
|
||||
time_shift_dmux = np.zeros((n_sub_cpi, n_rx, n_tx_n_pol, n_f_pulse))
|
||||
f_sa_dmux = np.zeros((n_sub_cpi, n_rx, n_tx_n_pol, n_f_pulse))
|
||||
for n_sub_cpi_ind in range(n_sub_cpi):
|
||||
for n_rx_ind in range(n_rx):
|
||||
for n_tx_ind in range(n_tx_n_pol):
|
||||
for n_f_pulse_ind in range(n_f_pulse):
|
||||
time_shift_dmux[n_sub_cpi_ind, n_rx_ind, n_tx_ind, n_f_pulse_ind] = (
|
||||
n_f_pulse_ind * n_samp_chip + n_samp_chip / 2
|
||||
)
|
||||
chip_center_dmux[n_sub_cpi_ind, n_rx_ind, n_tx_ind, n_f_pulse_ind] = int(
|
||||
np.round(
|
||||
k_rgft
|
||||
* (baseband[n_tx_ind, n_sub_cpi_ind, n_f_pulse_ind] + b / 2)
|
||||
/ f_samp
|
||||
)
|
||||
)
|
||||
chip_lower_dmux[n_sub_cpi_ind, n_rx_ind, n_tx_ind, n_f_pulse_ind] = int(
|
||||
np.round(
|
||||
chip_center_dmux[n_sub_cpi_ind, n_rx_ind, n_tx_ind, n_f_pulse_ind]
|
||||
- chip_samples / 2
|
||||
)
|
||||
)
|
||||
chip_upper_dmux[n_sub_cpi_ind, n_rx_ind, n_tx_ind, n_f_pulse_ind] = (
|
||||
chip_lower_dmux[n_sub_cpi_ind, n_rx_ind, n_tx_ind, n_f_pulse_ind] + chip_samples
|
||||
)
|
||||
f_sa_dmux[n_sub_cpi_ind, n_rx_ind, n_tx_ind, n_f_pulse_ind] = f_sa[
|
||||
n_tx_ind, n_sub_cpi_ind, n_f_pulse_ind
|
||||
]
|
||||
return
|
||||
|
||||
def wg_ofdm():
|
||||
# wp = inputs.wp.proto # just to make code easier to read
|
||||
n_f = 4
|
||||
n_f_pulse = 4
|
||||
do_random = False
|
||||
do_agile = False
|
||||
n_sub_cpi = 1
|
||||
n_tx = 1
|
||||
n_rx = 2
|
||||
n_pol = 2
|
||||
n_samp = 32
|
||||
n_samp_chip = 8
|
||||
f_samp = 46875000.0
|
||||
tx_sample_rate_multiplier = 16
|
||||
n_gen_pulses = 1
|
||||
rng = np.random.default_rng(seed=42)
|
||||
perm_list = []
|
||||
sequence = np.arange(n_f)
|
||||
unique_sub_cpi = n_gen_pulses // n_sub_cpi
|
||||
for n_p in range(unique_sub_cpi):
|
||||
if do_random and (do_agile or n_p == 0):
|
||||
# if we are here, we are doing random pulses and
|
||||
# 1. it is pulse zero so we set the permutation for all of
|
||||
# the pulses
|
||||
# or
|
||||
# 2. we are agile and we set a new permutation for every pulse
|
||||
sequence = rng.permutation(n_f)
|
||||
elif n_p == 0:
|
||||
# if we are here, we are not random, so set the sequence equal
|
||||
# to a range, which will give an LFM waveform
|
||||
sequence = np.arange(n_f)
|
||||
if n_pol == 1:
|
||||
perm = np.reshape(
|
||||
sequence,
|
||||
(n_tx, n_sub_cpi, n_f_pulse),
|
||||
)
|
||||
elif n_pol == 2:
|
||||
perm = np.zeros(
|
||||
(
|
||||
n_pol * n_tx,
|
||||
n_sub_cpi,
|
||||
n_f_pulse,
|
||||
),
|
||||
dtype=int,
|
||||
)
|
||||
for n_tx_ind in range(n_tx):
|
||||
perm[n_tx_ind, :, :] = np.reshape(
|
||||
sequence[n_f * n_tx_ind: n_f * (n_tx_ind + 1)],
|
||||
(n_sub_cpi, n_f_pulse),
|
||||
)
|
||||
if n_tx_ind == 0:
|
||||
perm[n_tx + n_tx_ind, :, :] = np.reshape(
|
||||
sequence[n_f * (n_tx_ind + 1) - 1:: -1],
|
||||
(n_sub_cpi, n_f_pulse),
|
||||
)
|
||||
else:
|
||||
perm[n_tx + n_tx_ind, :, :] = np.reshape(
|
||||
sequence[n_f * (n_tx_ind + 1) - 1: n_f * n_tx_ind - 1: -1],
|
||||
(n_sub_cpi, n_f_pulse),
|
||||
)
|
||||
# fill the permutation we just generated in to a list
|
||||
perm_list.append(perm)
|
||||
|
||||
# generate a set of pulse parameters for every element of our list
|
||||
perm_list_full = []
|
||||
# at this point, we need to create pulse parameters for the full number of
|
||||
# pulses in the CPI, even though the waveform generator will only generate
|
||||
# a subset of those.
|
||||
# so N_pulses defined above is the number of pulses to generate
|
||||
# wp.N_pulses is the total number of pulses in a CPI.
|
||||
# note that here, N_pulses = wp.N_gen_pulses is the number of pulses
|
||||
# we use to size the output buffer.
|
||||
# Additional note, in the tactical system, wp.N_gen_pulses must always
|
||||
# be equal to wp.N_pulses, these two only differ in testing.
|
||||
for n_p in range(unique_sub_cpi):
|
||||
perm_list_full.append(perm_list[n_p % unique_sub_cpi])
|
||||
|
||||
# pp_list = PulseParamsList(
|
||||
# pulse_params=[get_pulse_params(wp=wp, perm=p) for p in perm_list_full]
|
||||
# )
|
||||
|
||||
[get_pulse_params(n_tx, n_rx, f_samp, n_samp_chip, n_f, n_f_pulse, n_sub_cpi, n_pol, p) for p in perm_list_full]
|
||||
|
||||
subpulses = np.ndarray(
|
||||
shape=(
|
||||
unique_sub_cpi,
|
||||
n_tx * n_pol,
|
||||
n_sub_cpi,
|
||||
n_samp * tx_sample_rate_multiplier,
|
||||
2, # I/Q
|
||||
),
|
||||
dtype=np.uint16,
|
||||
)
|
||||
|
||||
amplitude_quant = 2 ** (16 - 1) - 1
|
||||
zero_quant = 0
|
||||
|
||||
# This generates transmit waveforms for the whole CPI
|
||||
for n_cpi in range(unique_sub_cpi):
|
||||
pulse, _phase = get_waveform(
|
||||
np.array(
|
||||
pp_list.pulse_params[n_cpi].baseband.elements, dtype=np.double
|
||||
).reshape(tuple(pp_list.pulse_params[n_cpi].baseband.dimensions)),
|
||||
np.array(
|
||||
pp_list.pulse_params[n_cpi].start_phase.elements, dtype=np.double
|
||||
).reshape(tuple(pp_list.pulse_params[n_cpi].start_phase.dimensions)),
|
||||
n_tx * n_pol,
|
||||
n_sub_cpi,
|
||||
n_f_pulse,
|
||||
n_samp_chip * tx_sample_rate_multiplier,
|
||||
n_samp * tx_sample_rate_multiplier,
|
||||
f_samp * tx_sample_rate_multiplier,
|
||||
)
|
||||
|
||||
subpulses[n_cpi, ..., 0] = np.round(
|
||||
(amplitude_quant * np.real(pulse) + zero_quant).astype(np.uint16)
|
||||
)
|
||||
subpulses[n_cpi, ..., 1] = np.round(
|
||||
(amplitude_quant * np.imag(pulse) + zero_quant).astype(np.uint16)
|
||||
)
|
||||
|
||||
return
|
||||
|
||||
if __name__ == '__main__':
|
||||
wg_ofdm()
|
||||
Reference in New Issue
Block a user