diff --git a/edosl0util/cli/merge.py b/edosl0util/cli/merge.py index c351ead430aefbebe80c24211b40f67d1a081c9c..4f88970588980968d498ac9ec6e5683516845ecd 100644 --- a/edosl0util/cli/merge.py +++ b/edosl0util/cli/merge.py @@ -1,8 +1,11 @@ import io +import logging from datetime import datetime from edosl0util.cli import util from edosl0util import merge, stream +LOG = logging.getLogger(__name__) + def main(): parser = util.default_parser() @@ -22,6 +25,7 @@ def main(): "end time is exclusive." ), ) + parser.add_argument("--aqua", action="store_true", help="Source is Aqua data") parser.add_argument( "-a", "--apid-order", @@ -39,8 +43,13 @@ def main(): util.configure_logging(args) + stream_impl = stream.jpss_packet_stream + if args.aqua: + LOG.info("handling aqua streams") + stream_impl = stream.aqua_packet_stream if args.aqua else stream.jpss_packet_stream + apid_order = {"viirs": merge.VIIRS_APID_ORDER, "numerical": None}[args.apid_order] - streams = [stream.jpss_packet_stream(io.open(f, "rb")) for f in args.pds] + streams = [stream_impl(io.open(f, "rb")) for f in args.pds] merge.merge( streams, output=io.open(args.output, "wb"), diff --git a/edosl0util/merge.py b/edosl0util/merge.py index ad1d304b89aa478336a416a41f383da84bb421fe..dff3a6afd147822aa730e3b2ed60e58ac763f15c 100644 --- a/edosl0util/merge.py +++ b/edosl0util/merge.py @@ -24,12 +24,13 @@ class _Ptr(object): Represents one or more packets that share the same time timecode and apid. """ - def __init__(self, fobj, stamp, apid, offset, size): + def __init__(self, fobj, stamp, apid, offset, size, seqid): self.fobj = fobj self.stamp = stamp self.apid = apid self.offset = offset self.size = size + self.seqid = seqid self.count = 1 def __repr__(self): @@ -42,7 +43,7 @@ class _Ptr(object): return "<{:s} {:s}>".format(self.__class__.__name__, attrs) def __eq__(self, that): - return (self.stamp, self.apid) == (that.stamp, that.apid) + return (self.stamp, self.apid, self.seqid) == (that.stamp, that.apid, that.seqid) def __ne__(self, that): return not self == that @@ -83,6 +84,7 @@ def read_packet_index(stream): apid=packet.apid, offset=packet.offset, size=packet.size, + seqid=packet.primary_header.source_sequence_count ) index.append(ptr) # collect all packets for this timecode/group @@ -112,7 +114,8 @@ def _sort_by_time_apid(index, order=None): ) else: index = sorted(index, key=lambda p: p.apid) - return sorted(index, key=lambda p: p.stamp) + # have to include sequence id for cases were the stamp is not unique, e.g., Aqua + return sorted(index, key=lambda p: (p.stamp, p.seqid)) def _filter_duplicates_by_size(index): @@ -121,7 +124,8 @@ def _filter_duplicates_by_size(index): """ filtered = OrderedDict() for ptr in index: - key = (ptr.stamp, ptr.apid) + # have to include sequence id for cases were the stamp is not unique, e.g., Aqua + key = (ptr.stamp, ptr.apid, ptr.seqid) if key in filtered: if ptr.size > filtered[key].size: filtered[key] = ptr