diff --git a/.gitattributes b/.gitattributes
index 4f159de4803e90f124427b1ce04a59696b0372ce..4c1f0c4299b859f09e5084325108859a5dc2e89c 100644
--- a/.gitattributes
+++ b/.gitattributes
@@ -1 +1,5 @@
 tests/fixtures/merge/RNSCA-RVIRS_j01_d20191212_t0745585_e0800120_b00001_c20191212080005825000_all-_dev.h5 filter=lfs diff=lfs merge=lfs -text
+tests/fixtures/merge/scan.modis.154.1619815441000.dat filter=lfs diff=lfs merge=lfs -text
+tests/fixtures/merge/scan.modis.154.1619815442000.dat filter=lfs diff=lfs merge=lfs -text
+tests/fixtures/merge/scan.viirs.159.1620324171404.dat filter=lfs diff=lfs merge=lfs -text
+tests/fixtures/merge/scan.viirs.159.1620324173190.dat filter=lfs diff=lfs merge=lfs -text
diff --git a/edosl0util/cli/merge.py b/edosl0util/cli/merge.py
index c351ead430aefbebe80c24211b40f67d1a081c9c..a98a4373907d53033fbf6a37c4e111e6ee6ffb16 100644
--- a/edosl0util/cli/merge.py
+++ b/edosl0util/cli/merge.py
@@ -1,8 +1,11 @@
 import io
+import logging
 from datetime import datetime
 
 from edosl0util.cli import util
 from edosl0util import merge, stream
 
+LOG = logging.getLogger(__name__)
+
 def main():
     parser = util.default_parser()
@@ -22,6 +25,7 @@
             "end time is exclusive."
         ),
     )
+    parser.add_argument("--aqua", action="store_true", help="Source is NASA EOS (Aqua/Terra) data")
     parser.add_argument(
         "-a",
         "--apid-order",
@@ -39,13 +43,19 @@
     util.configure_logging(args)
 
+    stream_impl = stream.jpss_packet_stream
+    if args.aqua:
+        LOG.info("handling aqua streams")
+        stream_impl = stream.aqua_packet_stream
+
     apid_order = {"viirs": merge.VIIRS_APID_ORDER, "numerical": None}[args.apid_order]
 
-    streams = [stream.jpss_packet_stream(io.open(f, "rb")) for f in args.pds]
+    streams = [stream_impl(io.open(f, "rb")) for f in args.pds]
 
     merge.merge(
         streams,
         output=io.open(args.output, "wb"),
         trunc_to=args.trunc_to,
         apid_order=apid_order,
+        eos=args.aqua,
     )
 
diff --git a/edosl0util/eos.py b/edosl0util/eos.py
new file mode 100644
index 0000000000000000000000000000000000000000..7611aa759ec717d796eb7d9973bd299508ba93da
--- /dev/null
+++ b/edosl0util/eos.py
@@ -0,0 +1,76 @@
+""" NASA EOS mission specific data
+
+References:
+    1. MODIS Command, Telemetry, Science and Engineering Description,
+       Document Number 151840, May 1997, Appendix C
+       https://directreadout.sci.gsfc.nasa.gov/documents/satellite_gen/MODIS_UG.pdf
+"""
+import struct
+
+packet_types = {
+    0: "day",
+    1: "night",
+    2: "eng1",
+    4: "eng2",
+}
+
+packet_sources = {
+    0: "solar-diff",
+    1: "srca-cal",
+    2: "bb-cal",
+    3: "space-cal",
+}
+
+packets_per_scan = {
+    "day": 3032,
+    "night": 326,
+}
+
+
+def sourceid(p):
+    if p.data[0] >> 7 == 0:
+        return "earthdata"
+    return packet_sources[p.data[0] >> 5 & 0x3]
+
+
+def mirrorside(p):
+    return ["side1", "side2"][bytes(p.secondary_header)[8] & 0x1]
+
+
+def scancount(p):
+    return bytes(p.secondary_header)[8] >> 1 & 0x7
+
+
+def pkttype(p):
+    return packet_types[bytes(p.secondary_header)[8] >> 4 & 0x7]
+
+
+def frame_count(p):
+    x = int.from_bytes(b'\x00\x00' + p.data[:2], 'big')
+    if sourceid(p) == "earthdata":
+        return x >> 4 & 0x7ff
+    return x >> 4 & 0x3f
+
+# Combination of source ids and packet types to their position within a scan.
+_sort_indexes = {
+    "solar-diff": 1,
+    "srca-cal": 2,
+    "bb-cal": 3,
+    "space-cal": 4,
+    "earthdata": 5,
+    "eng1": 6,
+    "eng2": 7,
+}
+
+def _modis_sort_key(p):
+    """ Return a tuple that will maintain order for this packet in a stream of
+    MODIS data. Intended to be used as a key func for sorting.
+
+    Packets are sorted in the order defined in _sort_indexes.
+    """
+    type_idx = _sort_indexes.get(pkttype(p), _sort_indexes[sourceid(p)])
+    return (p.stamp, 64, type_idx, frame_count(p), int(p.is_last()))
+
+
+def sort_modis(packets):
+    return sorted(packets, key=_modis_sort_key)
\ No newline at end of file
diff --git a/edosl0util/jpssrdr.py b/edosl0util/jpssrdr.py
index 0519f360b03a2797a7ff01d784177151f8616a35..2f0845a568352e2ab95618be57f0a04a0ad81f2d 100644
--- a/edosl0util/jpssrdr.py
+++ b/edosl0util/jpssrdr.py
@@ -271,7 +271,6 @@ def sort_packets_by_apid(packets, order=None):
     else:
         return sorted(packets, key=lambda p: p.packet.apid)
 
-
 def _write_packets(pkts, dest, skipfill):
     for pkt in pkts:
         if pkt.tracker.fill_percent != 0 and skipfill:
diff --git a/edosl0util/merge.py b/edosl0util/merge.py
index ad1d304b89aa478336a416a41f383da84bb421fe..27afc535dad004fcf204e2bde32f6dbb50a603b1 100644
--- a/edosl0util/merge.py
+++ b/edosl0util/merge.py
@@ -13,24 +13,29 @@
 import os
 
 from collections import deque, OrderedDict
 from functools import total_ordering
 
+from . import eos as _eos
+
 LOG = logging.getLogger(__name__)
 
-VIIRS_APID_ORDER = (826, 821) + tuple(range(800, 821)) + tuple(range(822, 826))
+VIIRS_APID_ORDER = (
+    (826, 821) + tuple(range(800, 821)) + tuple(range(822, 826)) + (827, 828)
+)
 
 
-@total_ordering
 class _Ptr(object):
     """
     Represents one or more packets that share the same time timecode and apid.
     """
 
-    def __init__(self, fobj, stamp, apid, offset, size):
+    def __init__(self, fobj, stamp, apid, offset, size, seqid, sort_key=None):
        self.fobj = fobj
        self.stamp = stamp
        self.apid = apid
        self.offset = offset
        self.size = size
+        self.seqid = seqid
        self.count = 1
+        self._sort_key = sort_key or (self.stamp, self.apid)
 
     def __repr__(self):
         attrs = " ".join(
@@ -42,30 +47,41 @@
         return "<{:s} {:s}>".format(self.__class__.__name__, attrs)
 
     def __eq__(self, that):
-        return (self.stamp, self.apid) == (that.stamp, that.apid)
+        return (self.stamp, self.apid, self.seqid) == (
+            that.stamp,
+            that.apid,
+            that.seqid,
+        )
 
     def __ne__(self, that):
-        return not self == that
+        return not (self == that)
 
     def __lt__(self, that):
-        return (self.stamp, self.apid) < (that.stamp, that.apid)
+        return self._sort_key < that._sort_key
 
     # hash by stamp, apid, size so we can dedup in index using set
     def __hash__(self):
-        return hash((self.stamp, self.apid, self.size))
+        return hash((self.stamp, self.apid, self.seqid, self.size))
 
     def bytes(self):
         self.fobj.seek(self.offset, os.SEEK_SET)
         return self.fobj.read(self.size)
 
 
-def read_packet_index(stream):
+def _sort_key(p, order=None, eos=False):
+    if eos and p.apid == 64:
+        return _eos._modis_sort_key(p)
+    return (p.stamp, order.index(p.apid) if order else p.apid)
+
+
+def read_packet_index(stream, order=None, eos=False):
     index = deque()
     try:
         # drop any leading hanging packets
         count = 0
         packet = stream.next()
         while not packet.stamp:
+            #while not (packet.is_first() or packet.is_standalone()):
             packet = stream.next()
             count += 1
         if count:
@@ -73,6 +89,7 @@
 
         while True:
             if not packet.stamp:
+                #if not (packet.is_first() or packet.is_standalone()):
                 # corrupt packet groups can cause apid mismatch
                 # so skip until we get to the next group
                 packet = stream.next()
@@ -83,11 +100,14 @@
                 apid=packet.apid,
                 offset=packet.offset,
                 size=packet.size,
+                seqid=packet.primary_header.source_sequence_count,
+                sort_key=_sort_key(packet, order, eos),
             )
             index.append(ptr)
             # collect all packets for this timecode/group
             packet = stream.next()
             while not packet.stamp:
+                #while not (packet.is_first() or packet.is_standalone()):
                 # Bail if we're collecting group packets and apids don't match
                 # This means group is corrupt
                 if ptr.apid != packet.apid:
@@ -102,35 +122,7 @@
     return index
 
 
-def _sort_by_time_apid(index, order=None):
-    """
-    Sort pointers by time and apid in-place.
-    """
-    if order:
-        index = sorted(
-            index, key=lambda p: order.index(p.apid) if p.apid in order else -1
-        )
-    else:
-        index = sorted(index, key=lambda p: p.apid)
-    return sorted(index, key=lambda p: p.stamp)
-
-
-def _filter_duplicates_by_size(index):
-    """
-    Take the packet with the largest size.
-    """
-    filtered = OrderedDict()
-    for ptr in index:
-        key = (ptr.stamp, ptr.apid)
-        if key in filtered:
-            if ptr.size > filtered[key].size:
-                filtered[key] = ptr
-        else:
-            filtered[key] = ptr
-    return filtered.values()
-
-
-def merge(streams, output, trunc_to=None, apid_order=None):
+def merge(streams, output, trunc_to=None, apid_order=None, eos=False):
     """
     Merge packets from multiple streams to an output file. Duplicate packets
     will be filtered and packets will be sorted by time and the apid order
@@ -148,12 +140,11 @@
     index = set()  # will remove duplicates
     for stream in streams:
         LOG.debug("indexing %s", stream)
-        index.update(read_packet_index(stream))
+        index.update(read_packet_index(stream, order=apid_order, eos=eos))
 
     LOG.debug("sorting index with %d pointers", len(index))
-    index = _sort_by_time_apid(index, order=apid_order)
-
-    index = _filter_duplicates_by_size(index)
+    # _Ptr class implements custom sort key impl for sorting
+    index = sorted(index)
 
     LOG.debug("writing index to %s", output)
     for ptr in index:
diff --git a/edosl0util/rdrgen.py b/edosl0util/rdrgen.py
index 4f281fd3f247bdeb121621f532e8ce8121ce689a..64596a0cc7ae54ca9f7a5d1bfbd4a3b9cd97890a 100644
--- a/edosl0util/rdrgen.py
+++ b/edosl0util/rdrgen.py
@@ -294,6 +294,14 @@
         shutil.rmtree(self.dir)
 
 
+missions = {
+    "npp": "S-NPP/JPSS",
+    "snpp": "S-NPP/JPSS",
+    "j01": "NOAA-20/JPSS",
+    "jpss1": "NOAA-20/JPSS",
+    "noaa20": "NOAA-20/JPSS",
+}
+
 class RdrWriter(object):
     def __init__(
         self,
@@ -341,7 +349,7 @@
             self._h5_file,
             {
                 "Distributor": distributor,
-                "Mission_Name": "S-NPP/JPSS",  # FIXME: what will this be for J1?
+                "Mission_Name": missions[self._sat],
                 "N_Dataset_Source": origin,
                 "N_HDF_Creation_Date": self._format_date_attr(self._creation_time),
                 "N_HDF_Creation_Time": self._format_time_attr(self._creation_time),
@@ -443,13 +451,21 @@
     @staticmethod
     def _set_h5_attrs(h5_obj, attrs):
         attrs = _encodeall(attrs)
+        def tpcnv(v):
+            if isinstance(v, list):
+                return [tpcnv(x) for x in v]
+            elif isinstance(v, str):
+                return np.array(v.encode('ascii'), dtype="S")
+            elif isinstance(v, bytes):
+                return np.array(v, dtype="S")
+            return v
         # for some reason all the H5 attributes are in 2-D arrays in IDPS files
         for name, value in attrs.items():
             if isinstance(value, list):
                 value = [[x] for x in value]
             else:
                 value = [[value]]
-            h5_obj.attrs[name] = value
+            h5_obj.attrs[name] = tpcnv(value)
 
     @staticmethod
     def _format_date_attr(t):
diff --git a/edosl0util/stream.py b/edosl0util/stream.py
index 02bc3d7c216180aeecc54f384b6f739b2fd7641c..9f854a36f5a602d5dcc76e3ab92c9117c568bd4f 100644
--- a/edosl0util/stream.py
+++ b/edosl0util/stream.py
@@ -258,6 +258,7 @@
             num_missing = packet.seqid - apid["last_seqid"] - 1
         else:
             num_missing = packet.seqid - apid["last_seqid"] + MAX_SEQ_ID
+        # FIXME: shouldn't this just be num_missing != 0?
         have_missing = num_missing != apid["num_missing"]
         apid["num_missing"] += num_missing
         apid["last_seqid"] = packet.seqid
diff --git a/tests/fixtures/merge/scan.modis.154.1619815441000.dat b/tests/fixtures/merge/scan.modis.154.1619815441000.dat
new file mode 100644
index 0000000000000000000000000000000000000000..5f9e15ce31812661fced3520e3789364c73495e1
--- /dev/null
+++ b/tests/fixtures/merge/scan.modis.154.1619815441000.dat
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:8b88000ccb7fc8a874369cdaec98a78f2f0454e09b573327681c08266d11dc64
+size 1946922
diff --git a/tests/fixtures/merge/scan.modis.154.1619815442000.dat b/tests/fixtures/merge/scan.modis.154.1619815442000.dat
new file mode 100644
index 0000000000000000000000000000000000000000..db95a1e9f445ea0eabdcc6b0d5dcbbaf81fd93d4
--- /dev/null
+++ b/tests/fixtures/merge/scan.modis.154.1619815442000.dat
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:565a0396bcb91e20af4b9793c803e2239f5f426524079776fe547d75b54cc7ea
+size 1946922
diff --git a/tests/fixtures/merge/scan.viirs.159.1620324171404.dat b/tests/fixtures/merge/scan.viirs.159.1620324171404.dat
new file mode 100644
index 0000000000000000000000000000000000000000..e6f10263c787114ab4d70d02ea1436d169a122e2
--- /dev/null
+++ b/tests/fixtures/merge/scan.viirs.159.1620324171404.dat
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:00257febe1b0ab5b725bf72b95cdbc71e136385088fa54d4945b34d996451a81
+size 1784357
diff --git a/tests/fixtures/merge/scan.viirs.159.1620324173190.dat b/tests/fixtures/merge/scan.viirs.159.1620324173190.dat
new file mode 100644
index 0000000000000000000000000000000000000000..9e70ee51fd0526c02b3c1f7fe860094e22df6c3f
--- /dev/null
+++ b/tests/fixtures/merge/scan.viirs.159.1620324173190.dat
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:94120dd50d8dead4d7eebebb97a88632648589e5a94702a7eb39c2113ac12a2a
+size 1767519
diff --git a/tests/test_merge.py b/tests/test_merge.py
index d4e54c1220be06293b3ea0c165dd0eac8dde17ac..5d50fdd10a472aee5ca48c4a977aed59e3a19bc3 100644
--- a/tests/test_merge.py
+++ b/tests/test_merge.py
@@ -1,31 +1,48 @@
-from datetime import timedelta, datetime
-from operator import eq, lt, gt
+import hashlib
+import io
+from datetime import datetime, timedelta
+from operator import eq, gt, lt
 
 import pytest
-
-from edosl0util import merge
-
-
-class Test_Ptr:
-
-    @pytest.mark.parametrize(
-        'd1,d2,op',
-        [
-            [(0, 0), (0, 0), eq],
-            [(1, 0), (0, 0), gt],
-            [(0, 1), (0, 0), gt],
-            [(1, 1), (0, 0), gt],
-            [(0, 0), (1, 0), lt],
-            [(0, 0), (0, 1), lt],
-            [(0, 0), (1, 1), lt],
-        ])
-    def test_ordering(self, d1, d2, op):
-        base = datetime.utcnow()
-
-        days1, apid1 = d1
-        ptr1 = merge._Ptr(open('/dev/null', 'r'), base + timedelta(days1), apid1, 0, 0)
-
-        days2, apid2 = d2
-        ptr2 = merge._Ptr(open('/dev/null', 'r'), base + timedelta(days2), apid2, 0, 0)
-
-        assert op(ptr1, ptr2)
+from edosl0util import merge, stream
+
+
+@pytest.mark.parametrize(
+    "stream_impl, eos, files",
+    [
+        pytest.param(
+            stream.aqua_packet_stream,
+            True,
+            [
+                "tests/fixtures/merge/scan.modis.154.1619815441000.dat",
+                "tests/fixtures/merge/scan.modis.154.1619815442000.dat",
+            ],
+            id="aqua-modis",
+        ),
+        pytest.param(
+            stream.jpss_packet_stream,
+            False,
+            [
+                "tests/fixtures/merge/scan.viirs.159.1620324171404.dat",
+                "tests/fixtures/merge/scan.viirs.159.1620324173190.dat",
+            ],
+            id="noaa20-viirs",
+        ),
+    ],
+)
+def test_merge(tmp_path, stream_impl, eos, files):
+    # forward direction
+    streams = [stream_impl(open(f, "rb")) for f in files]
+    with open(tmp_path.joinpath("forward.dat"), "wb") as forward:
+        merge.merge(streams, forward, eos=eos)
+    forward_csum = hashlib.md5(open(forward.name, "rb").read()).hexdigest()
+
+    # reverse direction
+    streams = [stream_impl(open(f, "rb")) for f in reversed(files)]
+    with open(tmp_path.joinpath("reverse.dat"), "wb") as reverse:
+        merge.merge(streams, reverse, eos=eos)
+    reverse_csum = hashlib.md5(open(reverse.name, "rb").read()).hexdigest()
+
+    assert forward_csum != hashlib.md5(b"").hexdigest(), "forward csum is not valid"
+    assert reverse_csum != hashlib.md5(b"").hexdigest(), "reverse csum is not valid"
+    assert forward_csum == reverse_csum
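
Not part of the patch: a minimal sketch of how the new EOS merge path might be driven directly from Python, mirroring what the updated edosl0util/cli/merge.py does when --aqua is given. The input file names are hypothetical placeholders; merge.merge(..., eos=True) and stream.aqua_packet_stream() are the functions touched by this diff.

    import io

    from edosl0util import merge, stream

    # Hypothetical input files containing Aqua MODIS (APID 64) packets.
    files = ["modis_part1.pds", "modis_part2.pds"]

    # One Aqua packet stream per input, as the updated CLI builds them.
    streams = [stream.aqua_packet_stream(io.open(f, "rb")) for f in files]

    # eos=True routes APID 64 packets through the MODIS-aware sort key in
    # edosl0util/eos.py so packets keep their source/type order within a scan.
    merge.merge(streams, output=io.open("merged.pds", "wb"), eos=True)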