From a83afeda4d592e9c5a591bbd4e9172dac01c17bc Mon Sep 17 00:00:00 2001
From: Bruce Flynn <brucef@ssec.wisc.edu>
Date: Tue, 2 Jan 2018 12:53:18 -0600
Subject: [PATCH] use (days, millis, micros) rather than stamp for packet ops

Using stamp could cause issues because UTC datetimes are not aware of
leap seconds.
---
 edosl0util/crgen.py   | 38 +++++++++++++++++---------------------
 edosl0util/headers.py | 26 ++++++++++++++------------
 edosl0util/merge.py   | 30 +++++++++++++++---------------
 edosl0util/stream.py  | 16 ++++++++--------
 4 files changed, 54 insertions(+), 56 deletions(-)

diff --git a/edosl0util/crgen.py b/edosl0util/crgen.py
index d1ab600..c6ccb47 100644
--- a/edosl0util/crgen.py
+++ b/edosl0util/crgen.py
@@ -28,7 +28,7 @@ def diff_crs(real_file, generated_file):
     diff = difflib.ndiff(pprint.pformat(real).splitlines(),
                          pprint.pformat(generated).splitlines())
     for line in diff:
-        print line
+        print(line)
 
 def make_comparable(real, generated):
     insert_fake_cr_info(real)
@@ -115,7 +115,7 @@ def pds_id_from_path(pds_file):
 def get_pds_creation_time(pds_file_or_id):
     """Parse 11-char creation time out of a PDS ID or file name; return a DaySegmentedTimecode"""
     pds_file_or_id = os.path.basename(pds_file_or_id)
-    return datetime_to_ccsds(datetime.strptime(pds_file_or_id[22:33], '%y%j%H%M%S'))
+    return create_timecode(datetime.strptime(pds_file_or_id[22:33], '%y%j%H%M%S'))
 
 
 def build_apid_info(scan_apid_info):
@@ -186,12 +186,12 @@ def scan_packets(pds_file, prev_pds_file=None):
                 apid_map[pkt.apid] = init_entry(pkt, entry_from_prev_pds)
             else:
                 update_entry(entry, pkt)
-            if pkt.stamp:
+            if pkt.cds_timecode:
                 if not first_pkt_time:
-                    first_pkt_time = pkt.stamp
-                last_pkt_time = pkt.stamp
-        return {'first_packet_time': datetime_to_ccsds(first_pkt_time),
-                'last_packet_time': datetime_to_ccsds(last_pkt_time),
+                    first_pkt_time = pkt.cds_timecode
+                last_pkt_time = pkt.cds_timecode
+        return {'first_packet_time': create_timecode(first_pkt_time),
+                'last_packet_time': create_timecode(last_pkt_time),
                 'apid_info': [apid_map[k] for k in sorted(apid_map)]}
 
     def build_prev_apid_map(prev_pds_file):
@@ -202,8 +202,8 @@ def scan_packets(pds_file, prev_pds_file=None):
 
     def init_entry(pkt, entry_from_prev_pds):
         rv = {'apid': pkt.apid,
-              'first_packet_time': datetime_to_ccsds(pkt.stamp), 'first_packet_offset': pkt.offset,
-              'last_packet_time': datetime_to_ccsds(pkt.stamp), 'last_packet_ssc': pkt.seqid,
+              'first_packet_time': create_timecode(pkt.cds_timecode), 'first_packet_offset': pkt.offset,
+              'last_packet_time': create_timecode(pkt.cds_timecode), 'last_packet_ssc': pkt.seqid,
               'total_packets': 1, 'total_bytes': pkt.size, 'gap_info': []}
         if entry_from_prev_pds:
             update_gap_info(rv['gap_info'], entry_from_prev_pds['last_packet_ssc'],
@@ -213,10 +213,10 @@ def scan_packets(pds_file, prev_pds_file=None):
     def update_entry(entry, new_pkt):
         prev_last_ssc = entry['last_packet_ssc']
         prev_last_time = entry['last_packet_time']
-        if new_pkt.stamp:
+        if new_pkt.cds_timecode:
             if entry['first_packet_time'] == DaySegmentedTimecode():
-                entry['first_packet_time'] = datetime_to_ccsds(new_pkt.stamp)
-            entry['last_packet_time'] = datetime_to_ccsds(new_pkt.stamp)
+                entry['first_packet_time'] = create_timecode(new_pkt.cds_timecode)
+            entry['last_packet_time'] = create_timecode(new_pkt.cds_timecode)
         entry['last_packet_ssc'] = new_pkt.seqid
         entry['total_packets'] += 1
         entry['total_bytes'] += new_pkt.size
@@ -229,24 +229,20 @@ def scan_packets(pds_file, prev_pds_file=None):
             gap_entry = {'first_missing_ssc': expected_new_ssc,
                         'missing_packet_count': (new_pkt.seqid - expected_new_ssc) % ssc_limit,
                         'pre_gap_packet_time': last_pkt_time,
-                        'post_gap_packet_time': datetime_to_ccsds(new_pkt.stamp),
+                        'post_gap_packet_time': create_timecode(new_pkt.cds_timecode),
                         'post_gap_packet_offset': new_pkt.offset}
             gap_info.append(gap_entry)
 
     return main()
 
 
-def datetime_to_ccsds(dt):
-    """Convert a packet stamp to DaySegmentedTimecode
+def create_timecode(tc):
+    """Convert a packet cds_timecode to DaySegmentedTimecode
 
     Handles input of None by returning epoch value of 1958-01-01.
     """
-    if dt is not None:
-        days = (dt - idps_epoch).days
-        micros = int((dt - datetime(dt.year, dt.month, dt.day)).total_seconds() * 1e6)
-        return DaySegmentedTimecode(days, micros // 1000, micros % 1000)
-    else:
-        return DaySegmentedTimecode()
+    return DaySegmentedTimecode(tc.days, tc.millis, tc.micros) if tc else DaySegmentedTimecode()
+
 
 
 idps_epoch = datetime(1958, 1, 1)
diff --git a/edosl0util/headers.py b/edosl0util/headers.py
index 4a467b0..12b319e 100644
--- a/edosl0util/headers.py
+++ b/edosl0util/headers.py
@@ -14,7 +14,7 @@ See:
 __copyright__ = "Copyright (C) 2015 University of Wisconsin SSEC. All rights reserved."
 
 import ctypes as c
-from datetime import datetime, timedelta
+from datetime import datetime
 
 from edosl0util.timecode import cds_to_dt
 
@@ -64,9 +64,15 @@ class Timecode(BaseStruct):
     def __repr__(self):
         return str(self.asdatetime())
 
-    def asdatetime(self):
+    def day_segmented_timecode(self):
         raise NotImplementedError()
 
+    def asdatetime(self):
+        """
+        Return the CDS timecode as a UTC datetime object.
+        """
+        return cds_to_dt(*self.day_segmented_timecode())
+
 
 class AquaCucTimecode(Timecode):
     """
@@ -93,16 +99,12 @@ class AquaCucTimecode(Timecode):
     EPOCH_SECS = (EPOCH - datetime(1970, 1, 1)).total_seconds()
     SUB_SECOND_UNITS = 15.2
 
-    def asdatetime(self):
-        """
-        Return converted to UTC where leap seconds are as defined in `leap_seconds`.
-        """
+    def day_segmented_timecode(self):
         micros = self.SUB_SECOND_UNITS * self.sub_seconds
         seconds = self.seconds + self.leap_seconds
-        return cds_to_dt(
-            (seconds - self.leap_seconds) // 86400,
-            micros // 1e3,
-            micros % 1e3)
+        return ((seconds - self.leap_seconds) // 86400,
+                micros // 1e3,
+                micros % 1e3)
 
 
 class DaySegmentedTimecode(Timecode):
@@ -116,8 +118,8 @@ class DaySegmentedTimecode(Timecode):
         ('microseconds', c.c_uint16)
     ]
 
-    def asdatetime(self):
-        return cds_to_dt(self.days, self.milliseconds, self.microseconds)
+    def day_segmented_timecode(self):
+        return (self.days, self.milliseconds, self.microseconds)
 
 
 class AquaGirdSecondaryHeader(BaseStruct):
diff --git a/edosl0util/merge.py b/edosl0util/merge.py
index 46479f9..64c951d 100644
--- a/edosl0util/merge.py
+++ b/edosl0util/merge.py
@@ -18,11 +18,11 @@ VIIRS_APID_ORDER = (826, 821) + tuple(range(800,821)) + tuple(range(822,826))
 
 class _Ptr(object):
     """
-    Represents one or more packets that share the same time stamp and apid.
+    Represents one or more packets that share the same timecode and apid.
""" - def __init__(self, fobj, stamp, apid, offset, size): + def __init__(self, fobj, timecode, apid, offset, size): self.fobj = fobj - self.stamp = stamp + self.timecode = timecode self.apid = apid self.offset = offset self.size = size @@ -38,13 +38,13 @@ class _Ptr(object): def __cmp__(self, that): return cmp( - (self.stamp, self.apid), - (that.stamp, that.apid) + (self.timecode, self.apid), + (that.timecode, that.apid) ) - # instances with same stamp/apid/size will compare the same + # instances with same timecode/apid/size will compare the same def __hash__(self): - return hash((self.stamp, self.apid, self.size)) + return hash((self.timecode, self.apid, self.size)) def bytes(self): self.fobj.seek(self.offset, os.SEEK_SET) @@ -57,29 +57,29 @@ def read_packet_index(stream): # drop any leading hanging packets count = 0 packet = stream.next() - while not packet.stamp: + while not packet.cds_timecode: packet = stream.next() count += 1 if count: LOG.info('dropped %d leading packets', count) while True: - if not packet.stamp: + if not packet.cds_timecode: # corrupt packet groups can cause apid mismatch # so skip until we get to the next group packet = stream.next() continue ptr = _Ptr( stream.file, - stamp=packet.stamp, + timecode=packet.cds_timecode, apid=packet.apid, offset=packet.offset, size=packet.size, ) index.append(ptr) - # collect all packets for this stamp/group + # collect all packets for this timecode/group packet = stream.next() - while not packet.stamp: + while not packet.cds_timecode: # Bail if we're collecting group packets and apids don't match # This means group is corrupt if ptr.apid != packet.apid: @@ -99,7 +99,7 @@ def _sort_by_time_apid(index, order=None): index = sorted(index, key=lambda p: order.index(p.apid) if p.apid in order else -1) else: index = sorted(index, key=lambda p: p.apid) - return sorted(index, key=lambda p: p.stamp) + return sorted(index, key=lambda p: p.cds_timecode) def _filter_duplicates_by_size(index): @@ -108,7 +108,7 @@ def _filter_duplicates_by_size(index): """ filtered = OrderedDict() for ptr in index: - key = (ptr.stamp, ptr.apid) + key = (ptr.timecode, ptr.apid) if key in filtered: if ptr.size > filtered[key].size: filtered[key] = ptr @@ -145,7 +145,7 @@ def merge(streams, output, trunc_to=None, apid_order=None): LOG.debug('writing index to %s', output) for ptr in index: if trunc_to: - if ptr.stamp >= trunc_to[0] and ptr.stamp < trunc_to[1]: + if ptr.timecode >= trunc_to[0] and ptr.timecode < trunc_to[1]: output.write(ptr.bytes()) else: output.write(ptr.bytes()) diff --git a/edosl0util/stream.py b/edosl0util/stream.py index 60631fb..b6edc98 100644 --- a/edosl0util/stream.py +++ b/edosl0util/stream.py @@ -120,18 +120,18 @@ class Packet(object): __repr__ = __str__ @property - def stamp(self): + def cds_timecode(self): return ( self.secondary_header and hasattr(self.secondary_header, 'timecode') and - self.secondary_header.timecode.asdatetime() or None) + self.secondary_header.day_segmented_timecode() or None) @property - def timestamp(self): + def stamp(self): return ( self.secondary_header and hasattr(self.secondary_header, 'timecode') and - self.secondary_header.timecode.astimestamp() or None) + self.secondary_header.timecode.asdatetime() or None) def bytes(self): return bytearray(self.primary_header) + \ @@ -264,14 +264,14 @@ class PacketStream(object): """ # seek past partial groups packet = self.next() - while not packet.stamp: + while not packet.cds_timecode: packet = self.next() # seek to first packet => `stamp` - current_stamp = 
-        current_stamp = packet.stamp
-        while current_stamp < stamp:
+        current_tc = packet.cds_timecode
+        while current_tc < stamp:
             packet = self.next()
-            current_stamp = packet.stamp or current_stamp
+            current_tc = packet.cds_timecode or current_tc
 
         # put back so next packet is the first packet > `stamp`
         self.push_back(packet)
--
GitLab
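
Why comparing the raw (days, millis, micros) fields is safer than comparing
converted datetimes: the sketch below is a minimal illustration of the
rationale in the commit message, not part of the patch. It assumes a
hypothetical packet stamped during the 2016-12-31 leap second whose CDS field
carries milliseconds-of-day >= 86,400,000 (nothing in this patch specifies
that encoding), and naive_to_datetime is a stand-in for a leap-second-unaware
conversion, not the repo's cds_to_dt.

from collections import namedtuple
from datetime import datetime, timedelta

# Stand-in for the (days, millis, micros) fields a CDS timecode carries.
CdsTimecode = namedtuple('CdsTimecode', 'days millis micros')

def naive_to_datetime(tc, epoch=datetime(1958, 1, 1)):
    # Leap-second-unaware conversion: a 61st-second sample spills into the
    # next calendar day because timedelta knows nothing about 23:59:60.
    return epoch + timedelta(days=tc.days, milliseconds=tc.millis,
                             microseconds=tc.micros)

# 2016-12-31 ended with a leap second. One packet is stamped at 23:59:60.500
# (milliseconds-of-day 86,400,500); the next at 00:00:00.250 the following day.
during_leap = CdsTimecode(days=21549, millis=86400500, micros=0)
after_leap = CdsTimecode(days=21550, millis=250, micros=0)

# Tuple comparison on the raw fields keeps the true order...
assert during_leap < after_leap

# ...while the datetime round-trip places the leap-second packet *after* the
# packet that actually followed it.
assert naive_to_datetime(during_leap) > naive_to_datetime(after_leap)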
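
The headers.py change follows a template-method shape: each subclass now
exposes its raw fields through day_segmented_timecode() and the base Timecode
derives asdatetime() from them, so packet operations can consume the tuple
while display code still gets a datetime. Below is a simplified,
self-contained sketch of that shape; it uses plain Python objects rather than
the repo's ctypes structs, and toy_cds_to_dt is only a placeholder for
edosl0util.timecode.cds_to_dt (whose signature, not behaviour, is implied by
the patch).

from datetime import datetime, timedelta

def toy_cds_to_dt(days, millis, micros):
    # Placeholder for edosl0util.timecode.cds_to_dt; ignores leap seconds.
    return datetime(1958, 1, 1) + timedelta(days=days, milliseconds=millis,
                                            microseconds=micros)

class Timecode(object):
    def day_segmented_timecode(self):
        # Subclasses return (days, milliseconds, microseconds).
        raise NotImplementedError()

    def asdatetime(self):
        # UTC conversion written once, in terms of the raw fields.
        return toy_cds_to_dt(*self.day_segmented_timecode())

class DaySegmentedTimecode(Timecode):
    def __init__(self, days=0, milliseconds=0, microseconds=0):
        self.days, self.milliseconds, self.microseconds = (
            days, milliseconds, microseconds)

    def day_segmented_timecode(self):
        return (self.days, self.milliseconds, self.microseconds)

tc = DaySegmentedTimecode(21550, 250, 0)
print(tc.day_segmented_timecode())  # (21550, 250, 0) -- what packet ops compare
print(tc.asdatetime())              # 2017-01-01 00:00:00.250000 -- display only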