diff --git a/edosl0util/cli.py b/edosl0util/cli.py
index 270851685bd5927a6141149304ef37099c72bb2d..b676a663e5eda7596bcaa93e1b69d751c86d99c1 100644
--- a/edosl0util/cli.py
+++ b/edosl0util/cli.py
@@ -3,7 +3,7 @@
 Console script entry points for CLI tools.
 """
 from datetime import datetime
-from edosl0util import split, dump
+from edosl0util import split
 
 
 def cmd_split():
@@ -14,33 +14,8 @@ def cmd_split():
     parser.add_argument('filepath')
     args = parser.parse_args()
 
-    for time, blob in split.split(args.filepath, args.minutes):
+    for time, offset, count, blob in split.split(open(args.filepath, 'rb'), args.minutes):
         stamp = datetime.utcfromtimestamp(time)
         filepath = stamp.strftime(args.output_format)
-        print("writing %s" % filepath)
+        print("writing %d packets from offset %d to %s" % (count, offset, filepath))
         open(filepath, 'wb', buffering=0).write(blob)
-
-
-def cmd_dump():
-    from argparse import ArgumentParser
-    parser = ArgumentParser(description=dump.__doc__)
-    parser.add_argument('-v', '--verbose', action='store_true')
-    parser.add_argument('pds')
-    args = parser.parse_args()
-
-    def handler(pkt):
-        print(pkt.primary_header)
-        if pkt.secondary_header:
-            print("\t%s : %s" % (pkt.stamp, pkt.secondary_header))
-
-    stats = dump.dump(args.pds, handler=(handler if args.verbose else None))
-    print("First Packet: {}".format(stats['first_pkt']))
-    print("Last Packet: {}".format(stats['last_pkt']))
-    print("Packet Count: {:d}".format(stats['packet_count']))
-    print("Group Count: {:d}".format(stats['group_count']))
-    print("Missing Count: {:d}".format(stats['missing_count']))
-    apids = sorted(stats['apids'])
-    for apid in apids:
-        count = stats['apids'][apid]['count']
-        missing = stats['apids'][apid]['missing']
-        print("APID {:d} Count:{:d} Missing:{:d}".format(apid, count, missing))
diff --git a/edosl0util/dump.py b/edosl0util/dump.py
index 53cb61184860233e9800d54db1b7a5b19b4c61ca..0fecf934183e298aedb44ba2bbc93a7681c032d0 100644
--- a/edosl0util/dump.py
+++ b/edosl0util/dump.py
@@ -18,7 +18,6 @@ def dump(filepath, handler=None):
     last_pkt = datetime(1900, 1, 1)
     packets = 0
     groups = 0
-    missing = 0
 
     # sequence nums are per APID
     apids = defaultdict(lambda: {'count': 0, 'prev_seq': 0, 'missing': 0})
     for pkt in PacketStream(open(filepath)):
@@ -29,10 +28,6 @@ def dump(filepath, handler=None):
             apids[h1.apid]['count'] += 1
 
             seq = h1.source_sequence_count
-            expected_seq = (apids[h1.apid]['prev_seq'] + 1) % MAX_SEQ_ID
-            if seq != expected_seq:
-                missing += 1
-                apids[h1.apid]['missing'] += 1
             apids[h1.apid]['prev_seq'] = seq
 
         if pkt.stamp:
@@ -49,6 +44,5 @@ def dump(filepath, handler=None):
         last_pkt=last_pkt,
         packet_count=packets,
         group_count=groups,
-        missing_count=missing,
         apids=apids
     )
\ No newline at end of file
diff --git a/edosl0util/headers.py b/edosl0util/headers.py
new file mode 100644
index 0000000000000000000000000000000000000000..8837e12bd0fa991815cd8e6de27dc42e7c665ceb
--- /dev/null
+++ b/edosl0util/headers.py
@@ -0,0 +1,224 @@
+# -*- coding: utf-8 -*-
+"""
+CCSDS BaseStruct implementations for AQUA.
+
+See:
+
+    ICD_Space_Ground_Aqua.pdf
+    GSFC 422-11-19-03
+
+Last known location:
+    http://directreadout.sci.gsfc.nasa.gov/links/rsd_eosdb/PDF/ICD_Space_Ground_Aqua.pdf
+"""
+
+import ctypes as c
+from datetime import datetime, timedelta
+
+from edos.ccsds import (
+    GROUP_FIRST,
+    GROUP_LAST,
+    GROUP_CONTINUING,
+    GROUP_STANDALONE
+)
+
+from edosl0util.timecode import cds_stamp
+
+
+class BaseStruct(c.BigEndianStructure):
+    _pack_ = 1
+
+    def __str__(self):
+        return "<" + ' '.join('%s=%s' % (f[0], getattr(self, f[0])) for f in self._fields_) + " >"
+
+    def __repr__(self):
+        fields = ', '.join('%s=%s' % (f[0], repr(getattr(self, f[0]))) for f in self._fields_)
+        return '<%s (%s)>' % (self.__class__.__name__, fields)
+
+
+class AquaCucTimecode(BaseStruct):
+    """
+    EOS AQUA implementation of a CCSDS Unsegmented Timecode (CUC). A CUC is
+    an agency-defined timecode which is in this case (I think) specific to
+    Aqua.
+    """
+    _fields_ = [
+        ('extension_flag', c.c_uint8, 1),        # 1
+        ('timecode_epoch', c.c_uint8, 3),        # 010 == Jan 1, 1958
+        ('coarse_time_count', c.c_uint8, 2),     # 11 == 4 octets
+        ('fine_time_count', c.c_uint8, 2),       # 10 == 2 octets
+        ('no_continuation_flag', c.c_uint8, 1),  # 0
+        ('leap_seconds', c.c_uint8, 7),
+        ('seconds', c.c_uint32),
+        ('sub_seconds', c.c_uint16)
+    ]
+
+    EPOCH = datetime(1958, 1, 1)
+    SUB_SECOND_UNITS = 15.2  # assumed usec per fine-time count (1e6 / 2**16 ~= 15.26)
+
+    def asdatetime(self):
+        """
+        Return converted to UTC where leap seconds are as defined in `leap_seconds`.
+
+        FIXME: Verify this conversion is correct, specifically the use of
+               SUB_SECOND_UNITS.
+        """
+        seconds = self.seconds + self.leap_seconds
+        micros = self.SUB_SECOND_UNITS * self.sub_seconds
+        return self.EPOCH + timedelta(seconds=seconds, microseconds=micros)
+
+
+class DaySegmentedTimecode(BaseStruct):
+    _pack_ = 1
+    _fields_ = [
+        ('days', c.c_uint16),
+        ('milliseconds', c.c_uint32),
+        ('microseconds', c.c_uint16)
+    ]
+
+    EPOCH = datetime(1958, 1, 1)
+
+    def asdatetime(self):
+        return cds_stamp(self.days, self.milliseconds, self.microseconds)
+
+
+class GirdSecondaryHeader(BaseStruct):
+    _pack_ = 1
+    _fields_ = [
+        ('flags', c.c_uint8),
+        ('timecode', AquaCucTimecode),
+    ]
+
+
+class GiisSecondaryHeader(BaseStruct):
+    _pack_ = 1
+    _fields_ = [
+        ('timecode', DaySegmentedTimecode),
+        ('quicklook_flag', c.c_uint8, 1),
+        ('user_flags', c.c_uint8, 7)
+    ]
+
+
+class SpacecraftBusSecondaryHeader(BaseStruct):
+    _fields_ = [
+        ('timecode', AquaCucTimecode)
+    ]
+
+
+def amsu_headers():
+    return {
+        # AMSU-A1
+        (257, GROUP_CONTINUING): GirdSecondaryHeader,
+        (259, GROUP_CONTINUING): GirdSecondaryHeader,
+        (260, GROUP_CONTINUING): GirdSecondaryHeader,
+        (261, GROUP_CONTINUING): GirdSecondaryHeader,
+        (262, GROUP_CONTINUING): GirdSecondaryHeader,
+        # AMSU-A2
+        (288, GROUP_CONTINUING): GirdSecondaryHeader,
+        (289, GROUP_CONTINUING): GirdSecondaryHeader,
+        (290, GROUP_STANDALONE): GirdSecondaryHeader,
+    }
+
+
+def airs_headers():
+    return {
+        # AIRS
+        (404, GROUP_STANDALONE): GirdSecondaryHeader,
+        (405, GROUP_STANDALONE): GirdSecondaryHeader,
+        (406, GROUP_STANDALONE): GirdSecondaryHeader,
+        (407, GROUP_STANDALONE): GirdSecondaryHeader,
+        (414, GROUP_STANDALONE): GirdSecondaryHeader,
+        (415, GROUP_STANDALONE): GirdSecondaryHeader,
+        (416, GROUP_STANDALONE): GirdSecondaryHeader,
+        (417, GROUP_STANDALONE): GirdSecondaryHeader,
+    }
+
+
+def hsb_headers():
+    return {
+        # HSB
+        (342, GROUP_STANDALONE): GirdSecondaryHeader,
+    }
+
+
+def modis_headers():
+    return {
+        # MODIS
+        (64, GROUP_STANDALONE): GiisSecondaryHeader,
+        (64, GROUP_FIRST): GiisSecondaryHeader,
+        (64, GROUP_LAST): GiisSecondaryHeader,
+        (64, GROUP_CONTINUING): GiisSecondaryHeader,
+        (127, GROUP_STANDALONE): GiisSecondaryHeader,
+    }
+
+
+def ceres_headers():
+    apids = (
+        # CERES+Y
+        141, 142, 143, 144,
+        # CERES-Y
+        157, 158, 159, 160
+    )
+    groupings = (GROUP_FIRST, GROUP_CONTINUING, GROUP_LAST, GROUP_STANDALONE)
+    return {(a, g): GiisSecondaryHeader for a in apids for g in groupings}
+
+
+def gbad_headers():
+    return {
+        # GBAD
+        (957, GROUP_STANDALONE): SpacecraftBusSecondaryHeader
+    }
+
+
+def aqua_headers():
+    """
+    Aqua headers are looked up via their APID and grouping.
+    """
+    headers = {}
+    headers.update(amsu_headers())
+    headers.update(airs_headers())
+    headers.update(hsb_headers())
+    headers.update(modis_headers())
+    headers.update(ceres_headers())
+    headers.update(gbad_headers())
+    return headers
+
+
+class JpssSecondaryHeader(BaseStruct):
+    """Secondary header for a JPSS CCSDS packet that is not part of a packet
+    sequence.
+    """
+    _pack_ = 1
+    _fields_ = [
+        ('timecode', DaySegmentedTimecode)
+    ]
+
+
+class JpssFirstSecondaryHeader(BaseStruct):
+    """Secondary header for a JPSS CCSDS packet that is the first packet in a
+    packet sequence. Following packets that are part of this sequence will not
+    have a secondary header.
+    """
+    _pack_ = 1
+    _fields_ = [
+        ('timecode', DaySegmentedTimecode),
+        ('packet_count', c.c_uint8),
+        ('_spare', c.c_uint8)
+    ]
+
+
+def jpss_header_lookup(primary_header):
+    grouping = primary_header.sequence_grouping
+    if grouping == GROUP_FIRST:
+        return JpssFirstSecondaryHeader
+    elif grouping == GROUP_CONTINUING:
+        return JpssSecondaryHeader
+    elif grouping == GROUP_LAST:
+        return JpssSecondaryHeader
+    elif grouping == GROUP_STANDALONE:
+        return JpssSecondaryHeader
+
+
+_aqua_headers = aqua_headers()
+def aqua_header_lookup(primary_header):
+    apid = primary_header.apid
+    grouping = primary_header.sequence_grouping
+    return _aqua_headers.get((apid, grouping))
diff --git a/edosl0util/merge.py b/edosl0util/merge.py
index 05ac42da2fe041b2aaff9ea972bb921e3685bfc0..bd3b469bcf83046cb12a57e4d223096c44d0f357 100644
--- a/edosl0util/merge.py
+++ b/edosl0util/merge.py
@@ -31,28 +31,6 @@ def _is_valid_group(packets):
         and packets[0].apid == packets[-1].apid
 
 
-def _cmp_by_first_pkt(filea, fileb):
-    """
-    Compare 2 L0 files by their first available timestamp. Used to sort L0 files
-    by time.
-    """
-    stampa = stampb = None
-    for pkt in PacketStream(open(filea)):
-        if pkt.stamp:
-            stampa = pkt.stamp
-            break
-    for pkt in PacketStream(open(fileb)):
-        if pkt.stamp:
-            stampb = pkt.stamp
-            break
-    return cmp(stampa, stampb)
-
-
-def make_streams(filepaths):
-    filepaths.sort(_cmp_by_first_pkt)
-    return [PacketStream(io.open(f, 'rb')) for f in filepaths]
-
-
 def merge(streams, output=sys.stdout):
     last_apid = None
     last_stamp = datetime(1900, 1, 1)
diff --git a/edosl0util/split.py b/edosl0util/split.py
index 4d44e52d386adbbe30a88270aaeca75eda826e2d..714afe3d9eac3b0ebed7985c179da443c84e18ad 100644
--- a/edosl0util/split.py
+++ b/edosl0util/split.py
@@ -4,16 +4,16 @@ from edosl0util.timecode import unixtime
 from edosl0util.stream import PacketStream
 
 
-def split(filepath, minutes):
+def split(fobj, minutes):
     """Split a VIIRS L0 PDS file into files based on their scan time mod
     the number of minutes provided.
-
-    A single output file is buffered in memory until it is written.
""" - buf = array.array('c') # buffer for a single data file until it is written + buf = array.array('B') # buffer for a single data file until it is written cur_bucket = 0 # cur time bucket of size 'minutes' - for pkt in PacketStream(open(filepath)): + pkt_count = 0 + original_offset = fobj.tell() + for pkt in PacketStream(fobj): # do the bucketing based on secondary header timestamps if pkt.stamp: hdrtime = unixtime(pkt.stamp) @@ -23,10 +23,15 @@ def split(filepath, minutes): cur_bucket = pkt_bucket if pkt_bucket > cur_bucket: - yield cur_bucket, buf.tostring() + offset = fobj.tell() - original_offset + yield cur_bucket, offset, pkt_count, buf.tostring() + pkt_count = 0 buf = array.array('c') cur_bucket = pkt_bucket + # this is an append operation buf.fromstring(pkt.blob) + pkt_count += 1 - yield cur_bucket, buf.tostring() \ No newline at end of file + offset = fobj.tell() - original_offset + yield cur_bucket, offset, pkt_count, buf.tostring() \ No newline at end of file diff --git a/edosl0util/stream.py b/edosl0util/stream.py index f7fe3c079d6d19cf7ae406bc00e3d72e9bb4d375..977b9443bab2691eccb79d052b26b5916cb188cb 100644 --- a/edosl0util/stream.py +++ b/edosl0util/stream.py @@ -12,15 +12,10 @@ from edos.ccsds import ( GROUP_STANDALONE ) -from edosl0util.timecode import cds_stamp +from edosl0util import headers LOG = logging.getLogger(__name__) -secondary_header_impls = { - GROUP_FIRST: JpssFirstSecondaryHeader, - GROUP_STANDALONE: JpssSecondaryHeader -} - class MissingPackets(Exception): """ @@ -77,16 +72,31 @@ class SequenceId(object): return self.val +def default_header_lookup(primary_header): + return ( + headers.aqua_header_lookup(primary_header) or + headers.jpss_header_lookup(primary_header) + ) + + class PacketStream(object): """ Iterates over all CCSDS data producing `Packet` tuples. Only as much data is necessary to generate a single packet is read into memory at a time. """ - def __init__(self, fobj, fail_on_missing=False): + def __init__(self, fobj, fail_on_missing=False, header_lookup=default_header_lookup): + """ + :param fobj: File-like object to read stream from + :keyword fail_on_missing: If True raise a MissingPackets error when packets + are missing. + :keyword header_lookup: A function that takes a PrimaryHeader and returns + a secondary header implementation that will be used. + """ self._input = fobj self._seek_cache = deque() self._seqids = {} self._fail_on_missing = fail_on_missing + self._header_lookup = header_lookup def __str__(self): return '<PacketStream pos=%d %s>' % (self.tell(), self._input) @@ -103,9 +113,6 @@ class PacketStream(object): raise StopIteration return data - def tell(self): - return self._input.tell() - def next(self): """ Return the next packet in the stream. @@ -130,10 +137,11 @@ class PacketStream(object): stamp = None h2 = None # only first/standalone have h2 impls - H2Impl = secondary_header_impls.get(h1.sequence_grouping) + H2Impl = self._header_lookup(h1) if H2Impl: h2 = H2Impl.from_buffer_copy(buf[:c.sizeof(H2Impl)]) - stamp = cds_stamp(h2.day, h2.milliseconds, h2.microseconds) + if hasattr(h2, 'timecode') and hasattr(h2.timecode, 'asdatetime'): + stamp = h2.timecode.asdatetime() packet = Packet(h1.apid, h1.source_sequence_count, stamp, data, h1, h2) @@ -169,3 +177,25 @@ class PacketStream(object): while packet.apid != apid: packet = self.next() self.push_back(packet) + + +def _cmp_by_first_pkt(filea, fileb): + """ + Compare 2 L0 files by their first available timestamp. Used to sort L0 files + by time. 
+    """
+    stampa = stampb = None
+    for pkt in PacketStream(open(filea, 'rb')):
+        if pkt.stamp:
+            stampa = pkt.stamp
+            break
+    for pkt in PacketStream(open(fileb, 'rb')):
+        if pkt.stamp:
+            stampb = pkt.stamp
+            break
+    return cmp(stampa, stampb)
+
+
+def make_streams(filepaths):
+    filepaths.sort(_cmp_by_first_pkt)
+    return [PacketStream(open(f, 'rb')) for f in filepaths]
diff --git a/setup.py b/setup.py
index 8ecd8f464403736eea1af309304ed7be71766046..b4f431ad02b8ddbe7eef278cff1153fef2e9f2ba 100644
--- a/setup.py
+++ b/setup.py
@@ -15,6 +15,5 @@ setup(
     entry_points="""
     [console_scripts]
     edosl0split = edosl0util.cli:cmd_split
-    edosl0dump = edosl0util.cli:cmd_dump
     """
 )
\ No newline at end of file
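
Notes (not part of the patch):

With this change split.split() takes an open file object instead of a path and
yields (bucket_time, offset, packet_count, blob) 4-tuples rather than pairs. A
minimal sketch of driving it directly, assuming a hypothetical input file named
'input.pds' and 6-minute buckets:

    from datetime import datetime

    from edosl0util import split

    # 'input.pds' is a placeholder path, not a real sample file
    with open('input.pds', 'rb') as fobj:
        for time, offset, count, blob in split.split(fobj, 6):
            stamp = datetime.utcfromtimestamp(time)
            # offset is measured from where the stream started in fobj
            print("bucket %s: %d packets, %d bytes, input offset %d"
                  % (stamp.isoformat(), count, len(blob), offset))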
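PacketStream likewise grows a header_lookup hook; the default tries the Aqua
table first and falls back to the JPSS headers. Any callable that maps a
primary header to a BaseStruct subclass (or None for no secondary header)
will do. A sketch of restricting decoding to the JPSS secondary headers,
again with a placeholder input path:

    from edosl0util import headers
    from edosl0util.stream import PacketStream

    # jpss_header_lookup picks a secondary header struct based solely on the
    # packet's sequence grouping (first/continuing/last/standalone)
    stream = PacketStream(open('input.pds', 'rb'),
                          header_lookup=headers.jpss_header_lookup)
    for pkt in stream:
        if pkt.stamp:
            print("%d %s" % (pkt.apid, pkt.stamp))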