Skip to content
Snippets Groups Projects
Commit b18d65bb authored by Bruce Flynn's avatar Bruce Flynn
Browse files

merge: support aqua by using seqid in pointer

parent dc41d8d7
No related branches found
No related tags found
No related merge requests found
import io
import logging
from datetime import datetime
from edosl0util.cli import util
from edosl0util import merge, stream
LOG = logging.getLogger(__name__)
def main():
parser = util.default_parser()
......@@ -22,6 +25,7 @@ def main():
"end time is exclusive."
),
)
parser.add_argument("--aqua", action="store_true", help="Source is Aqua data")
parser.add_argument(
"-a",
"--apid-order",
......@@ -39,8 +43,13 @@ def main():
util.configure_logging(args)
stream_impl = stream.jpss_packet_stream
if args.aqua:
LOG.info("handling aqua streams")
stream_impl = stream.aqua_packet_stream if args.aqua else stream.jpss_packet_stream
apid_order = {"viirs": merge.VIIRS_APID_ORDER, "numerical": None}[args.apid_order]
streams = [stream.jpss_packet_stream(io.open(f, "rb")) for f in args.pds]
streams = [stream_impl(io.open(f, "rb")) for f in args.pds]
merge.merge(
streams,
output=io.open(args.output, "wb"),
......
......@@ -24,12 +24,13 @@ class _Ptr(object):
Represents one or more packets that share the same time timecode and apid.
"""
def __init__(self, fobj, stamp, apid, offset, size):
def __init__(self, fobj, stamp, apid, offset, size, seqid):
self.fobj = fobj
self.stamp = stamp
self.apid = apid
self.offset = offset
self.size = size
self.seqid = seqid
self.count = 1
def __repr__(self):
......@@ -42,7 +43,7 @@ class _Ptr(object):
return "<{:s} {:s}>".format(self.__class__.__name__, attrs)
def __eq__(self, that):
return (self.stamp, self.apid) == (that.stamp, that.apid)
return (self.stamp, self.apid, self.seqid) == (that.stamp, that.apid, that.seqid)
def __ne__(self, that):
return not self == that
......@@ -83,6 +84,7 @@ def read_packet_index(stream):
apid=packet.apid,
offset=packet.offset,
size=packet.size,
seqid=packet.primary_header.source_sequence_count
)
index.append(ptr)
# collect all packets for this timecode/group
......@@ -112,7 +114,8 @@ def _sort_by_time_apid(index, order=None):
)
else:
index = sorted(index, key=lambda p: p.apid)
return sorted(index, key=lambda p: p.stamp)
# have to include sequence id for cases were the stamp is not unique, e.g., Aqua
return sorted(index, key=lambda p: (p.stamp, p.seqid))
def _filter_duplicates_by_size(index):
......@@ -121,7 +124,8 @@ def _filter_duplicates_by_size(index):
"""
filtered = OrderedDict()
for ptr in index:
key = (ptr.stamp, ptr.apid)
# have to include sequence id for cases were the stamp is not unique, e.g., Aqua
key = (ptr.stamp, ptr.apid, ptr.seqid)
if key in filtered:
if ptr.size > filtered[key].size:
filtered[key] = ptr
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment