Commit a83afeda authored by Bruce Flynn

use (days, millis, micros) rather than stamp for packet ops

Using stamp could cause issues because UTC datetimes are not aware of
leap seconds.
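For context, a small illustration (not part of the commit; the timecode values are hypothetical) of why leap-second-blind UTC datetimes make poor packet keys, while the raw day-segmented fields compare faithfully:

    from datetime import datetime, timedelta

    # Hypothetical CDS timecodes (days since 1958-01-01, millis of day,
    # micros of milli) straddling the 2015-06-30 leap second.
    tc_before = (20999, 86399999, 999)  # 2015-06-30 23:59:59.999999 UTC
    tc_leap = (20999, 86400999, 0)      # 23:59:60.999, no datetime equivalent

    # A naive datetime conversion folds the leap second into the next day,
    # so distinct timecodes can collide or be misordered:
    epoch = datetime(1958, 1, 1)
    def naive_dt(days, millis, micros):
        return epoch + timedelta(days=days, milliseconds=millis,
                                 microseconds=micros)

    print(naive_dt(*tc_before))  # 2015-06-30 23:59:59.999999
    print(naive_dt(*tc_leap))    # 2015-07-01 00:00:00.999000 (leap lost)
    print(tc_before < tc_leap)   # True; tuple comparison stays faithful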
parent ba818d3c
@@ -28,7 +28,7 @@ def diff_crs(real_file, generated_file):
     diff = difflib.ndiff(pprint.pformat(real).splitlines(),
                          pprint.pformat(generated).splitlines())
     for line in diff:
-        print line
+        print(line)

 def make_comparable(real, generated):
     insert_fake_cr_info(real)
@@ -115,7 +115,7 @@ def pds_id_from_path(pds_file):
 def get_pds_creation_time(pds_file_or_id):
     """Parse 11-char creation time out of a PDS ID or file name; return a DaySegmentedTimecode"""
     pds_file_or_id = os.path.basename(pds_file_or_id)
-    return datetime_to_ccsds(datetime.strptime(pds_file_or_id[22:33], '%y%j%H%M%S'))
+    return create_timecode(datetime.strptime(pds_file_or_id[22:33], '%y%j%H%M%S'))

 def build_apid_info(scan_apid_info):
@@ -186,12 +186,12 @@ def scan_packets(pds_file, prev_pds_file=None):
             apid_map[pkt.apid] = init_entry(pkt, entry_from_prev_pds)
         else:
             update_entry(entry, pkt)
-        if pkt.stamp:
+        if pkt.cds_timecode:
             if not first_pkt_time:
-                first_pkt_time = pkt.stamp
-            last_pkt_time = pkt.stamp
-    return {'first_packet_time': datetime_to_ccsds(first_pkt_time),
-            'last_packet_time': datetime_to_ccsds(last_pkt_time),
+                first_pkt_time = pkt.cds_timecode
+            last_pkt_time = pkt.cds_timecode
+    return {'first_packet_time': create_timecode(first_pkt_time),
+            'last_packet_time': create_timecode(last_pkt_time),
             'apid_info': [apid_map[k] for k in sorted(apid_map)]}

 def build_prev_apid_map(prev_pds_file):
@@ -202,8 +202,8 @@ def scan_packets(pds_file, prev_pds_file=None):
 def init_entry(pkt, entry_from_prev_pds):
     rv = {'apid': pkt.apid,
-          'first_packet_time': datetime_to_ccsds(pkt.stamp), 'first_packet_offset': pkt.offset,
-          'last_packet_time': datetime_to_ccsds(pkt.stamp), 'last_packet_ssc': pkt.seqid,
+          'first_packet_time': create_timecode(pkt.cds_timecode), 'first_packet_offset': pkt.offset,
+          'last_packet_time': create_timecode(pkt.cds_timecode), 'last_packet_ssc': pkt.seqid,
           'total_packets': 1, 'total_bytes': pkt.size, 'gap_info': []}
     if entry_from_prev_pds:
         update_gap_info(rv['gap_info'], entry_from_prev_pds['last_packet_ssc'],
@@ -213,10 +213,10 @@ def scan_packets(pds_file, prev_pds_file=None):
 def update_entry(entry, new_pkt):
     prev_last_ssc = entry['last_packet_ssc']
     prev_last_time = entry['last_packet_time']
-    if new_pkt.stamp:
+    if new_pkt.cds_timecode:
         if entry['first_packet_time'] == DaySegmentedTimecode():
-            entry['first_packet_time'] = datetime_to_ccsds(new_pkt.stamp)
-        entry['last_packet_time'] = datetime_to_ccsds(new_pkt.stamp)
+            entry['first_packet_time'] = create_timecode(new_pkt.cds_timecode)
+        entry['last_packet_time'] = create_timecode(new_pkt.cds_timecode)
         entry['last_packet_ssc'] = new_pkt.seqid
         entry['total_packets'] += 1
         entry['total_bytes'] += new_pkt.size
@@ -229,24 +229,20 @@ def scan_packets(pds_file, prev_pds_file=None):
         gap_entry = {'first_missing_ssc': expected_new_ssc,
                      'missing_packet_count': (new_pkt.seqid - expected_new_ssc) % ssc_limit,
                      'pre_gap_packet_time': last_pkt_time,
-                     'post_gap_packet_time': datetime_to_ccsds(new_pkt.stamp),
+                     'post_gap_packet_time': create_timecode(new_pkt.cds_timecode),
                      'post_gap_packet_offset': new_pkt.offset}
         gap_info.append(gap_entry)

     return main()
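The modular arithmetic in missing_packet_count sizes gaps correctly even across the sequence-count wraparound. A worked example with assumed values:

    # CCSDS source sequence counts wrap at 2**14; a gap spanning the wrap
    # still sizes correctly under the modulo (values assumed):
    ssc_limit = 2 ** 14        # 16384 possible sequence counts
    expected_new_ssc = 16380   # one past the last SSC actually seen (16379)
    new_seqid = 5              # first SSC observed after the gap
    print((new_seqid - expected_new_ssc) % ssc_limit)  # 9 packets missing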
-def datetime_to_ccsds(dt):
-    """Convert a packet stamp to DaySegmentedTimecode
+def create_timecode(tc):
+    """Convert a packet cds_timecode to DaySegmentedTimecode

     Handles input of None by returning epoch value of 1958-01-01.
     """
-    if dt is not None:
-        days = (dt - idps_epoch).days
-        micros = int((dt - datetime(dt.year, dt.month, dt.day)).total_seconds() * 1e6)
-        return DaySegmentedTimecode(days, micros // 1000, micros % 1000)
-    else:
-        return DaySegmentedTimecode()
+    return DaySegmentedTimecode(tc.days, tc.millis, tc.micros) if tc else DaySegmentedTimecode()

 idps_epoch = datetime(1958, 1, 1)
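A usage sketch for the new helper; the namedtuple stand-in is hypothetical, and only the days/millis/micros attribute names are assumed from the code above:

    from collections import namedtuple

    CdsTimecode = namedtuple('CdsTimecode', 'days millis micros')

    print(create_timecode(CdsTimecode(20999, 86399999, 999)))
    # -> roughly 2015-06-30 23:59:59.999999 (repr goes through asdatetime)
    print(create_timecode(None))
    # -> 1958-01-01 00:00:00, the DaySegmentedTimecode() epoch value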
@@ -14,7 +14,7 @@ See:
 __copyright__ = "Copyright (C) 2015 University of Wisconsin SSEC. All rights reserved."

 import ctypes as c
-from datetime import datetime, timedelta
+from datetime import datetime

 from edosl0util.timecode import cds_to_dt
@@ -64,9 +64,15 @@ class Timecode(BaseStruct):
     def __repr__(self):
         return str(self.asdatetime())

-    def asdatetime(self):
+    def day_segmented_timecode(self):
         raise NotImplementedError()

+    def asdatetime(self):
+        """
+        CDS timecode as a UTC datetime object.
+        """
+        return cds_to_dt(*self.day_segmented_timecode())
+

 class AquaCucTimecode(Timecode):
     """
@@ -93,16 +99,12 @@ class AquaCucTimecode(Timecode):
     EPOCH_SECS = (EPOCH - datetime(1970, 1, 1)).total_seconds()
     SUB_SECOND_UNITS = 15.2

-    def asdatetime(self):
-        """
-        Return converted to UTC where leap seconds are as defined in `leap_seconds`.
-        """
+    def day_segmented_timecode(self):
         micros = self.SUB_SECOND_UNITS * self.sub_seconds
         seconds = self.seconds + self.leap_seconds
-        return cds_to_dt(
-            (seconds - self.leap_seconds) // 86400,
-            micros // 1e3,
-            micros % 1e3)
+        return ((seconds - self.leap_seconds) // 86400,
+                micros // 1e3,
+                micros % 1e3)

 class DaySegmentedTimecode(Timecode):
@@ -116,8 +118,8 @@ class DaySegmentedTimecode(Timecode):
         ('microseconds', c.c_uint16)
     ]

-    def asdatetime(self):
-        return cds_to_dt(self.days, self.milliseconds, self.microseconds)
+    def day_segmented_timecode(self):
+        return (self.days, self.milliseconds, self.microseconds)

 class AquaGirdSecondaryHeader(BaseStruct):
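The reshuffled Timecode base class is a small template method: subclasses now supply only the raw (days, millis, micros) triple, and the base class owns the single conversion to datetime. A condensed sketch of the shape; the class names here are illustrative, not from the repo:

    from edosl0util.timecode import cds_to_dt

    class TimecodeBase(object):
        def day_segmented_timecode(self):
            # subclasses return a (days, millis, micros) triple
            raise NotImplementedError()

        def asdatetime(self):
            # one shared conversion point for every subclass
            return cds_to_dt(*self.day_segmented_timecode())

    class FixedTimecode(TimecodeBase):  # hypothetical subclass
        def __init__(self, days, millis, micros):
            self._tc = (days, millis, micros)

        def day_segmented_timecode(self):
            return self._tc

    print(FixedTimecode(0, 0, 0).asdatetime())  # 1958-01-01 00:00:00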
@@ -18,11 +18,11 @@ VIIRS_APID_ORDER = (826, 821) + tuple(range(800,821)) + tuple(range(822,826))
 class _Ptr(object):
     """
-    Represents one or more packets that share the same time stamp and apid.
+    Represents one or more packets that share the same timecode and apid.
     """
-    def __init__(self, fobj, stamp, apid, offset, size):
+    def __init__(self, fobj, timecode, apid, offset, size):
         self.fobj = fobj
-        self.stamp = stamp
+        self.timecode = timecode
         self.apid = apid
         self.offset = offset
         self.size = size
@@ -38,13 +38,13 @@ class _Ptr(object):
     def __cmp__(self, that):
         return cmp(
-            (self.stamp, self.apid),
-            (that.stamp, that.apid)
+            (self.timecode, self.apid),
+            (that.timecode, that.apid)
         )

-    # instances with same stamp/apid/size will compare the same
+    # instances with same timecode/apid/size will compare the same
     def __hash__(self):
-        return hash((self.stamp, self.apid, self.size))
+        return hash((self.timecode, self.apid, self.size))

     def bytes(self):
         self.fobj.seek(self.offset, os.SEEK_SET)
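Note that __cmp__ and the builtin cmp exist only under Python 2; on Python 3 the same ordering needs explicit rich comparisons. A hedged sketch, not part of the commit:

    import functools

    @functools.total_ordering
    class PtrKey(object):  # hypothetical stand-in, for illustration only
        def __init__(self, timecode, apid):
            self.timecode, self.apid = timecode, apid

        def __eq__(self, that):
            return (self.timecode, self.apid) == (that.timecode, that.apid)

        def __lt__(self, that):
            return (self.timecode, self.apid) < (that.timecode, that.apid)

    print(PtrKey((20999, 0, 0), 800) < PtrKey((20999, 0, 0), 826))  # True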
@@ -57,29 +57,29 @@ def read_packet_index(stream):
     # drop any leading hanging packets
     count = 0
     packet = stream.next()
-    while not packet.stamp:
+    while not packet.cds_timecode:
         packet = stream.next()
         count += 1
     if count:
         LOG.info('dropped %d leading packets', count)

     while True:
-        if not packet.stamp:
+        if not packet.cds_timecode:
             # corrupt packet groups can cause apid mismatch
             # so skip until we get to the next group
             packet = stream.next()
             continue
         ptr = _Ptr(
             stream.file,
-            stamp=packet.stamp,
+            timecode=packet.cds_timecode,
             apid=packet.apid,
             offset=packet.offset,
             size=packet.size,
         )
         index.append(ptr)
-        # collect all packets for this stamp/group
+        # collect all packets for this timecode/group
         packet = stream.next()
-        while not packet.stamp:
+        while not packet.cds_timecode:
             # Bail if we're collecting group packets and apids don't match
             # This means group is corrupt
             if ptr.apid != packet.apid:
@@ -99,7 +99,7 @@ def _sort_by_time_apid(index, order=None):
         index = sorted(index, key=lambda p: order.index(p.apid) if p.apid in order else -1)
     else:
         index = sorted(index, key=lambda p: p.apid)
-    return sorted(index, key=lambda p: p.stamp)
+    return sorted(index, key=lambda p: p.timecode)

 def _filter_duplicates_by_size(index):
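_sort_by_time_apid leans on the stability of Python's sort: the first pass settles apid precedence, and the second pass, keyed on timecode alone, preserves that order among packets with equal timecodes. A condensed illustration with assumed (timecode, apid) pairs:

    ptrs = [(2, 821), (1, 826), (1, 800), (2, 800)]  # (timecode, apid)
    order = (826, 821, 800)                          # desired apid precedence

    by_apid = sorted(ptrs, key=lambda p: order.index(p[1]))
    by_time = sorted(by_apid, key=lambda p: p[0])    # stable: apid order kept
    print(by_time)  # [(1, 826), (1, 800), (2, 821), (2, 800)]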
@@ -108,7 +108,7 @@ def _filter_duplicates_by_size(index):
     """
     filtered = OrderedDict()
     for ptr in index:
-        key = (ptr.stamp, ptr.apid)
+        key = (ptr.timecode, ptr.apid)
         if key in filtered:
             if ptr.size > filtered[key].size:
                 filtered[key] = ptr
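The effect of keying on (timecode, apid) and keeping the larger pointer, shown with plain tuples standing in for _Ptr instances (values assumed):

    from collections import OrderedDict

    pointers = [  # (timecode, apid, size)
        ((20999, 1000, 0), 826, 1024),
        ((20999, 1000, 0), 826, 1024),  # exact duplicate from a second file
        ((20999, 1000, 0), 826, 2048),  # same key, larger: replaces the first
    ]
    filtered = OrderedDict()
    for tc, apid, size in pointers:
        key = (tc, apid)
        if key not in filtered or size > filtered[key][2]:
            filtered[key] = (tc, apid, size)
    print(list(filtered.values()))  # [((20999, 1000, 0), 826, 2048)]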
@@ -145,7 +145,7 @@ def merge(streams, output, trunc_to=None, apid_order=None):
     LOG.debug('writing index to %s', output)
     for ptr in index:
         if trunc_to:
-            if ptr.stamp >= trunc_to[0] and ptr.stamp < trunc_to[1]:
+            if ptr.timecode >= trunc_to[0] and ptr.timecode < trunc_to[1]:
                 output.write(ptr.bytes())
         else:
             output.write(ptr.bytes())
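trunc_to is treated as a half-open [start, end) window over timecodes, so adjacent windows never double-count a packet. An illustrative check with assumed values:

    # Keep one full day: start inclusive, end exclusive (values assumed).
    trunc_to = ((20999, 0, 0), (21000, 0, 0))
    tc = (20999, 86399999, 999)  # last packet of the day
    print(trunc_to[0] <= tc < trunc_to[1])  # True: packet is written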
@@ -120,18 +120,18 @@ class Packet(object):
     __repr__ = __str__

     @property
-    def stamp(self):
+    def cds_timecode(self):
         return (
             self.secondary_header and
             hasattr(self.secondary_header, 'timecode') and
-            self.secondary_header.timecode.asdatetime() or None)
+            self.secondary_header.timecode.day_segmented_timecode() or None)

     @property
-    def timestamp(self):
+    def stamp(self):
         return (
             self.secondary_header and
             hasattr(self.secondary_header, 'timecode') and
-            self.secondary_header.timecode.astimestamp() or None)
+            self.secondary_header.timecode.asdatetime() or None)

     def bytes(self):
         return bytearray(self.primary_header) + \
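Both properties keep the Python 2 and/or conditional idiom; they yield None when the secondary header is missing or lacks a timecode field. A self-contained sketch of that chain (the header classes here are hypothetical):

    class _Timecode(object):  # hypothetical, for illustration
        def day_segmented_timecode(self):
            return (20999, 1000, 0)

    class _WithTimecode(object):
        timecode = _Timecode()

    class _WithoutTimecode(object):
        pass  # no `timecode` attribute

    def cds_timecode_of(hdr):
        # same and/or chain as the property above
        return (hdr and hasattr(hdr, 'timecode') and
                hdr.timecode.day_segmented_timecode() or None)

    print(cds_timecode_of(_WithTimecode()))    # (20999, 1000, 0)
    print(cds_timecode_of(_WithoutTimecode())) # None
    print(cds_timecode_of(None))               # None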
@@ -264,14 +264,14 @@ class PacketStream(object):
     """
     # seek past partial groups
     packet = self.next()
-    while not packet.stamp:
+    while not packet.cds_timecode:
         packet = self.next()
     # seek to first packet => `stamp`
-    current_stamp = packet.stamp
-    while current_stamp < stamp:
+    current_tc = packet.cds_timecode
+    while current_tc < stamp:
         packet = self.next()
-        current_stamp = packet.stamp or current_stamp
+        current_tc = packet.cds_timecode or current_tc
     # put back so next packet is the first packet > `stamp`
     self.push_back(packet)