diff --git a/peregrine/analysis/tracking_loop.py b/peregrine/analysis/tracking_loop.py index 4183cd1..c6c36d3 100755 --- a/peregrine/analysis/tracking_loop.py +++ b/peregrine/analysis/tracking_loop.py @@ -11,7 +11,7 @@ # WARRANTIES OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR PURPOSE. import argparse -import sys + from peregrine.samples import load_samples from peregrine.acquisition import AcquisitionResult from peregrine import defaults @@ -75,8 +75,6 @@ def main(): freq_profile = defaults.freq_profile_normal_rate elif args.profile == 'high_rate': freq_profile = defaults.freq_profile_high_rate - else: - raise NotImplementedError() isL1CA = (args.signal == L1CA) isL2C = (args.signal == L2C) diff --git a/peregrine/run.py b/peregrine/run.py index ff7be2e..e3da633 100755 --- a/peregrine/run.py +++ b/peregrine/run.py @@ -54,6 +54,7 @@ def __call__(self, parser, namespace, file_hnd, option_string=None): file_hnd.close() + def populate_peregrine_cmd_line_arguments(parser): if sys.stdout.isatty(): progress_bar_default = 'stdout' diff --git a/peregrine/tracking.py b/peregrine/tracking.py index 55c33df..01e365f 100644 --- a/peregrine/tracking.py +++ b/peregrine/tracking.py @@ -250,13 +250,12 @@ def start(self): logger.info("[PRN: %d (%s)] Tracking is started. " "IF: %.1f, Doppler: %.1f, code phase: %.1f, " - "sample channel: %d sample index: %d" % + "sample index: %d" % (self.prn + 1, self.signal, self.IF, self.acq.doppler, self.acq.code_phase, - self.acq.sample_channel, self.acq.sample_index)) def get_index(self): @@ -358,6 +357,7 @@ def run(self, samples): """ + self.start() self.samples = samples if self.sample_index < samples['sample_index']: @@ -518,14 +518,13 @@ def run(self, samples): self.track_result.ms_tracked[self.i] = self.samples_tracked * 1e3 / \ self.sampling_freq + self.track_result.status = 'T' + self.i += 1 if self.i >= self.results_num: self.dump() self.sample_index += samples_processed - self.track_result.status = 'T' - if self.i > 0: - self.dump() return self._get_result() @@ -1084,10 +1083,6 @@ def __init__(self, n_points, prn, signal): self.signal = signal self.ms_tracked = np.zeros(n_points) - def __str__(self): - return "PRN %2d (%s) %s" % \ - (self.prn + 1, self.signal, self.status) - def dump(self, output_file, size): """ Store tracking result to file system. diff --git a/tests/test_common.py b/tests/test_common.py index 6826fca..c96e3c0 100644 --- a/tests/test_common.py +++ b/tests/test_common.py @@ -17,6 +17,34 @@ from mock import patch +def fileformat_to_bands(file_format): + if file_format == '1bit': + bands = ['l1ca'] + elif file_format == '1bit_x2': + bands = ['l1ca', 'l2c'] + elif file_format == '2bits': + bands = ['l1ca'] + elif file_format == '2bits_x2': + bands = ['l1ca', 'l2c'] + elif file_format == '2bits_x4': + bands = ['l1ca', 'l2c'] + return bands + + +def get_skip_params(skip_samples, skip_ms): + if skip_samples is not None: + skip_param = '--skip-samples' + skip_val = skip_samples + elif skip_ms is not None: + skip_param = '--skip-ms' + skip_val = skip_ms + else: + skip_param = '--skip-ms' + skip_val = 0 + return (skip_param, skip_val) + + + def generate_2bits_x4_sample_file(filename): sample_block_size = 4 # [bits] s_file = np.memmap(filename, offset=0, dtype=np.uint8, mode='r') @@ -120,14 +148,17 @@ def generate_sample_file(gps_sv_prn, init_doppler, def run_peregrine(file_name, file_format, freq_profile, skip_param, skip_val, skip_tracking=True, - skip_navigation=True): + skip_navigation=True, + pipelining=None, + short_long_cycles=None): parameters = [ 'peregrine', '--file', file_name, '--file-format', file_format, '--profile', freq_profile, - skip_param, str(skip_val) + skip_param, str(skip_val), + '--progress-bar', 'stdout' ] if skip_tracking: parameters += ['-t'] @@ -135,6 +166,12 @@ def run_peregrine(file_name, file_format, freq_profile, if skip_navigation: parameters += ['-n'] + if pipelining: + parameters += ['--pipelining', str(pipelining)] + + if short_long_cycles: + parameters += ['--short-long-cycles', str(short_long_cycles)] + # Replace argv with args to skip tracking and navigation. with patch.object(sys, 'argv', parameters): print "sys.argv = ", sys.argv diff --git a/tests/test_tracking.py b/tests/test_tracking.py index b41ec7c..41574c0 100644 --- a/tests/test_tracking.py +++ b/tests/test_tracking.py @@ -9,36 +9,81 @@ # EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE IMPLIED # WARRANTIES OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR PURPOSE. -from test_common import generate_sample_file,\ - run_peregrine,\ - propagate_code_phase,\ - get_sampling_freq +from peregrine.gps_constants import l1, l2, L1CA, L2C +from test_common import generate_sample_file, fileformat_to_bands,\ + get_skip_params, run_peregrine from test_acquisition import get_acq_result_file_name +from peregrine.analysis import tracking_loop +from peregrine.tracking import TrackingLoop, NavBitSync, NavBitSyncSBAS,\ + NBSLibSwiftNav, NBSSBAS, NBSMatchBit,\ + NBSHistogram, NBSMatchEdge import cPickle +import csv import os +import sys +from mock import patch -def get_track_result_file_name(sample_file, prn): + +def run_tracking_loop(prn, signal, dopp, phase, file_name, file_format, + freq_profile, skip_val, norun=False, l2chandover=False, + pipelining=None, short_long_cycles=None): + parameters = [ + 'tracking_loop', + '-P', str(prn), + '-p', str(phase), + '-d', str(dopp), + '-S', signal, + '--file', file_name, + '--file-format', file_format, + '--profile', freq_profile, + '--skip-samples', str(skip_val) + ] + + if pipelining: + parameters += ['--pipelining', str(pipelining)] + elif short_long_cycles: + parameters += ['--short-long-cycles', str(short_long_cycles)] + + if norun: + parameters.append('--no-run') + + if l2chandover: + parameters.append('--l2c-handover') + + # Replace argv with args to skip tracking and navigation. + with patch.object(sys, 'argv', parameters): + print "sys.argv = ", sys.argv + tracking_loop.main() + + +def get_track_result_file_name(sample_file, prn, band): sample_file, sample_file_extension = os.path.splitext(sample_file) - return sample_file + (".PRN-%d.%s" % (prn, 'l1ca')) +\ - sample_file_extension + '.track_results' + return (sample_file + (".PRN-%d.%s" % (prn, band)) + + sample_file_extension + '.track_results', + "track.PRN-%d.%s.csv" % (prn, band)) + + +def get_peregrine_tr_res_file_name(sample_file, prn, band): + per_fn, tr_loop_fn = get_track_result_file_name(sample_file, prn, band) + return per_fn + +def get_tr_loop_res_file_name(sample_file, prn, band): + per_fn, tr_loop_fn = get_track_result_file_name(sample_file, prn, band) + return tr_loop_fn -def run_track_test(init_doppler, init_code_phase, + +def run_track_test(expected_lock_ratio, init_doppler, init_code_phase, prns, file_format, freq_profile='low_rate', - skip_samples=None, skip_ms=None): - - if skip_samples is not None: - skip_param = '--skip-samples' - skip_val = skip_samples - elif skip_ms is not None: - skip_param = '--skip-ms' - skip_val = skip_ms - else: - skip_param = '--skip-ms' - skip_val = 0 + skip_samples=None, skip_ms=None, + pipelining=None, short_long_cycles=None): + + bands = fileformat_to_bands(file_format) + + skip_param, skip_val = get_skip_params(skip_samples, skip_ms) for prn in prns: samples_filename = generate_sample_file(prn, init_doppler, @@ -46,43 +91,139 @@ def run_track_test(init_doppler, init_code_phase, file_format, freq_profile, generate=10) run_peregrine(samples_filename, file_format, freq_profile, - skip_param, skip_val, skip_tracking=False) + skip_param, skip_val, skip_tracking=False, + pipelining=pipelining, short_long_cycles=short_long_cycles) + + for band in bands: + dopp_ratio = 1 + if band == "l2c": + dopp_ratio = l2 / l1 + run_tracking_loop(prn, band, init_doppler * dopp_ratio, init_code_phase, + samples_filename, file_format, freq_profile, 0, + pipelining=pipelining, + short_long_cycles=short_long_cycles) + + #code_phase = propagate_code_phase(init_code_phase, + #get_sampling_freq(freq_profile), + #skip_param, skip_val) - code_phase = propagate_code_phase(init_code_phase, - get_sampling_freq(freq_profile), - skip_param, skip_val) + check_per_track_results(expected_lock_ratio, samples_filename, prn, bands, + pipelining, short_long_cycles) - check_track_results(samples_filename, prn, init_doppler, code_phase) + check_tr_loop_track(expected_lock_ratio, samples_filename, prn, bands, + pipelining, short_long_cycles) # Clean-up. os.remove(get_acq_result_file_name(samples_filename)) - os.remove(get_track_result_file_name(samples_filename, prn)) + for band in bands: + os.remove(get_peregrine_tr_res_file_name(samples_filename, prn, band)) + os.remove(get_tr_loop_res_file_name(samples_filename, prn, band)) os.remove(samples_filename) -def check_track_results(filename, prn, doppler, code_phase): - with open(get_track_result_file_name(filename, prn), 'rb') as f: - track_results = cPickle.load(f) - - print "result = ", track_results - lock_ratio = float((track_results.lock_detect_outp == 1).sum()) /\ - len(track_results.lock_detect_outp) - print "lock_ratio = ", lock_ratio - assert lock_ratio > 0.1 - assert (track_results.prn + 1) == prn - #assert track_results.status == 'T' +def check_per_track_results(expected_lock_ratio, filename, prn, bands, + pipelining, short_long_cycles): + ret = {} + print "Peregrine tracking:" + for band in bands: + ret[band] = {} + with open(get_peregrine_tr_res_file_name(filename, prn, band), 'rb') as f: + lock_detect_outp_sum = 0 + lock_detect_outp_len = 0 + while True: + try: + track_results = cPickle.load(f) + assert (track_results.prn + 1) == prn + assert track_results.status == 'T' + assert track_results == track_results + lock_detect_outp_sum += (track_results.lock_detect_outp == 1).sum() + lock_detect_outp_len += len(track_results.lock_detect_outp) + assert track_results.resize(1) == track_results.resize(1) + except EOFError: + break + print "band =", band + lock_ratio = float(lock_detect_outp_sum) / lock_detect_outp_len + print "lock_ratio =", lock_ratio + if (not short_long_cycles and not pipelining) or band != L2C: + assert lock_ratio >= expected_lock_ratio + ret[band]['lock_ratio'] = lock_ratio + return ret + + +def check_tr_loop_track(expected_lock_ratio, filename, prn, bands, + pipelining, short_long_cycles): + ret = {} + print "Tracking loop:" + for band in bands: + ret[band] = {} + with open(get_tr_loop_res_file_name(filename, prn, band), 'rb') as f: + reader = csv.reader(f) + hdr = reader.next() + ldo_idx = hdr.index("lock_detect_outp") + lock_detect_outp_sum = 0 + lock_detect_outp_len = 0 + for row in reader: + lock_detect_outp_len += 1 + lock_detect_outp_sum += int(row[ldo_idx]) + print "band =", band + lock_ratio = float(lock_detect_outp_sum) / lock_detect_outp_len + print "lock_ratio =", lock_ratio + if (not short_long_cycles and not pipelining) or band != L2C: + assert lock_ratio >= expected_lock_ratio + ret[band]['lock_ratio'] = lock_ratio + return ret def test_tracking(): """ - Test GPS L1C/A tracking + Test GPS L1C/A and L2C tracking """ - run_track_test(1000, 0, [1], '2bits_x2') + run_track_test(0.7, 555, 0, [1], '2bits_x2') + run_track_test(0.3, 333, 0, [1], '2bits_x2', pipelining=0.5) + run_track_test(0.3, -100, 0, [1], '2bits_x2', short_long_cycles=0.5) + + # test --no-run + run_tracking_loop(1, L1CA, 0, 0, 'dummy', '2bits_x2', 'low_rate', 0, + norun=True) # Test with different initial code phases # for code_phase in [0, 310, 620, 967]: # run_acq_test(-2345, code_phase, prns, '2bits') + try: + TrackingLoop().start(None, None,) + assert False + except NotImplementedError: + pass + + try: + TrackingLoop().update(None, None, None) + assert False + except NotImplementedError: + pass + + try: + NavBitSync().update_bit_sync(0, 0) + assert False + except NotImplementedError: + pass + + nvb = NavBitSync() + # Test ._equal + nvb.synced = True + assert NavBitSync() != nvb + nvb.dummy = 'dummy' + assert NavBitSync() != nvb + + # Test constructors + assert NBSMatchBit() + assert NavBitSyncSBAS() + assert NBSSBAS() + assert NBSLibSwiftNav() + assert NBSHistogram() + assert NBSMatchEdge() + + if __name__ == '__main__': test_tracking()