import ctypes import logging import os import shutil import tempfile from collections import OrderedDict from contextlib import contextmanager from datetime import datetime import attr import h5py import numpy as np import edosl0util from edosl0util import compat from edosl0util.jpssrdr import Apid as ApidListItem from edosl0util.jpssrdr import PacketTracker, StaticHeader, decode_rdr_blob from edosl0util.stream import jpss_packet_stream, collect_groups from edosl0util.timecode import cds_to_iet, iet_to_dt LOG = logging.getLogger(__name__) class Error(Exception): """RDR generation base error. """ class InvalidStream(Error): """ Packet stream is not valid for RDR """ class InvalidPacket(Error): """ Invalid packet for the current RDR. """ def __init__(self, msg, packet): self.msg = msg self.packet = packet def iter_pkts(l0_files): for l0_file in l0_files: with open(l0_file, "rb") as l0_file_obj: for pkt in jpss_packet_stream(l0_file_obj): yield pkt def filter_group_orphans(s): """ Filter out any packets in a group that do not fall withing the expected sequence id series and expected packets per group. This really only applies to VIIRS data since it's the only sensor that utilizes groups. This is useful for input RDR packet streams received from direct braodcast that may have sub-ideal packet sequences that RDR generation does not play well with. """ viirs_apids = set(ViirsScienceApidInfo.apids) viirs_tracker = ViirsGroupedPacketTimeTracker() for p in iter(s): # If it's a VIIRS grouped packet, check for orphans if ( p.apid in viirs_apids and ViirsScienceApidInfo.get_packets_per_scan(p.apid) > 1 ): try: viirs_tracker.get_iet(p) except OrphanedViirsPacket: LOG.debug("dropping orphan %s", p) continue yield p def filter_before(s, before): """ Filter all packets before the provided time. This handles groupped packets by dropping the entire group if its first packet has a timestamp before the provided time. """ for grp in collect_groups(s): if grp[0].stamp < before: LOG.debug("dropping before[%s] %s", before, grp[0]) continue yield from grp def packets_to_rdrs(sat, l0_files, **kwargs): return build_rdr(sat, iter_pkts(l0_files), **kwargs) def build_rdr( sat, pkt_iter, output_dir=".", aggr_type="idps", aggr_level=None, diary_cushion=10000000, attr_overrides={}, strict=True, ): """Construct RDR file(s) from L0 packets Default aggregation behavior uses file boundaries computed in the same way as for IDPS assuming the default aggregation level depending on the instrument (e.g. 85 sec for VIIRS, 8 min for CrIS). A different aggregation level can be given as an integer number of granule (e.g. 4 to get roughly 5.5 min files for VIIRS). Finally, aggr_type can be set to 'full' to request that a single file gets produced (thus aggregating all granules containing the given packets). """ # divy packets up into temp files organized by granule file_mgr = BinnedTemporaryFileManager() try: get_jpss_packet_time = GetJpssPacketTime() gran_infos = set() # (rdr_type, gran_iet) pairs for pkt in pkt_iter: rdr_type = get_rdr_type(pkt.apid) pkt_iet = get_jpss_packet_time(pkt) gran_iet = get_granule_start(sat, rdr_type.gran_len, pkt_iet) gran_info = (rdr_type, gran_iet) gran_infos.add(gran_info) file_mgr.add_data(gran_info, pkt.bytes()) # determine what RDR files we'll be producing based on the packets we've seen rdr_types = set(rdr_type for rdr_type, gran_iet in gran_infos) primary_type, packaged_type = process_rdr_types( rdr_types, force_packaging=False ) rdr_types = sorted(rdr_types, key=(lambda t: 1 if t is primary_type else 2)) if aggr_type == "idps": aggr_level = aggr_level or primary_type.default_aggregation primary_aggr_iets = sorted( set( get_aggregate_start( sat, primary_type.gran_len, aggr_level, gran_iet ) for (rdr_type, gran_iet) in gran_infos if rdr_type is primary_type ) ) elif aggr_type == "full": # produce a single output file, ignoring IDPS-style aggregation boundaries assert aggr_level is None first_gran_iet = min( gran_iet for (rdr_type, gran_iet) in gran_infos if rdr_type is primary_type ) last_gran_iet = max( gran_iet for (rdr_type, gran_iet) in gran_infos if rdr_type is primary_type ) aggr_level = (last_gran_iet - first_gran_iet) // primary_type.gran_len + 1 primary_aggr_iets = [first_gran_iet] else: raise ValueError("aggr_type must be idps or input") # now generate the RDRs rdr_files = [] for aggr_iet in primary_aggr_iets: rdr_writer = RdrWriter( sat, rdr_types, aggr_iet, aggr_level, output_dir, **attr_overrides ) rdr_writer.write_aggregate(primary_type, aggr_iet, aggr_level) gran_iets = [ aggr_iet + i * primary_type.gran_len for i in range(aggr_level) ] for gran_iet in gran_iets: with file_mgr.process_file((primary_type, gran_iet)) as pkt_file: pkts = list(jpss_packet_stream(pkt_file)) blob = build_rdr_blob(sat, pkts, primary_type, gran_iet, strict=strict) rdr_writer.write_granule(primary_type, gran_iet, blob) if packaged_type: packaged_gran_iets = get_overlapping_granules( sat, packaged_type.gran_len, aggr_iet - diary_cushion, aggr_iet + aggr_level * primary_type.gran_len + diary_cushion + 1, ) rdr_writer.write_aggregate( packaged_type, packaged_gran_iets[0], len(packaged_gran_iets) ) for gran_iet in packaged_gran_iets: with file_mgr.process_file((packaged_type, gran_iet)) as pkt_file: pkts = list(jpss_packet_stream(pkt_file)) blob = build_rdr_blob( sat, pkts, packaged_type, gran_iet, strict=strict ) rdr_writer.write_granule(packaged_type, gran_iet, blob) rdr_writer.close() rdr_files.append(rdr_writer.file_name) finally: file_mgr.clean_up() return rdr_files def process_rdr_types(given_rdr_types, force_packaging): """Determine the RDR type we'll be making based on the packets we've been given Return both a primary_type and packaged_type. The primary type could indicate either a science or spacecraft RDR. The packaged type is used when spacecraft packets are to be packaged along with a science RDR; it will be None if that's not the case. forceA_packaging is a boolean that if set will force spacecraft RDR structures to be written out even if no spacecraft packets are present. """ rdr_types = set(given_rdr_types) if not rdr_types: raise RdrgenError("No RDR types to generate!") sci_type = {t for t in rdr_types if t.type_id == "SCIENCE"} rdr_types -= sci_type if len(sci_type) > 1: raise RdrgenError( "Cannot process more than 1 science RDR type at once " + "(have {})".format(", ".join(t.short_name for t in sci_type)) ) diary_type = {t for t in rdr_types if t is SpacecraftDiaryRdrType} rdr_types -= diary_type if rdr_types: raise RdrgenError( "Unsupported RDR type(s): " + ", ".join(t.short_name for t in rdr_types) ) if sci_type: primary_type, = sci_type packaged_type = ( SpacecraftDiaryRdrType if (diary_type or force_packaging) else None ) else: primary_type = SpacecraftDiaryRdrType packaged_type = None return primary_type, packaged_type class BinnedTemporaryFileManager(object): """Manage a set of append-mode temporary files each labeled with a 'bin key' Limits the number of file handles kept open at a time. """ def __init__(self, parent_dir=".", max_open_files=32): self.max_open_files = max_open_files self.dir = tempfile.mkdtemp(dir=parent_dir) self._file_paths = {} self._file_objs = OrderedDict() def add_data(self, bin_key, data): """Append some data to the temp file associated with bin_key Creates the file if needed. """ file_obj = self._file_objs.pop(bin_key, None) if not file_obj: file_path = self._file_paths.get(bin_key) if file_path: file_obj = open(file_path, "a+b") else: file_obj = tempfile.NamedTemporaryFile( dir=self.dir, delete=False, mode="wb" ) file_path = file_obj.name self._file_paths[bin_key] = file_path if len(self._file_objs) == self.max_open_files: _, old_file_obj = self._file_objs.popitem(last=False) old_file_obj.close() self._file_objs[bin_key] = file_obj file_obj.write(data) @contextmanager def process_file(self, bin_key): """'Check out' a file for processing Returns a context manager that provides a read-mode file handle and removes the file after processing. Can be called with a bin_key that has no data yet which just results in an empty file. """ file_obj = self._file_objs.pop(bin_key, None) if file_obj: file_obj.close() file_path = self._file_paths.pop(bin_key, "/dev/null") file_obj = open(file_path, "rb") try: yield file_obj finally: file_obj.close() if file_path != "/dev/null": os.remove(file_path) def clean_up(self): """Call after all files are processed to avoid leaving a dir sitting around""" shutil.rmtree(self.dir) missions = { "npp": "S-NPP/JPSS", "snpp": "S-NPP/JPSS", "j01": "NOAA-20/JPSS", "jpss1": "NOAA-20/JPSS", "noaa20": "NOAA-20/JPSS", } class RdrWriter(object): def __init__( self, sat, rdr_types, aggr_iet, aggr_level, dir_path=".", distributor=None, origin=None, domain=None, compressed=False, orbit_num=0, creation_time=None, software_ver=None, ): self._sat = sat origin = origin or self.default_origin distributor = distributor or origin self._domain = domain or self.default_domain self._orbit_num = orbit_num self._creation_time = creation_time or datetime.now() self._software_ver = ( software_ver or edosl0util.__name__ + "-" + edosl0util.__version__ ) aggr_end_iet = aggr_iet + aggr_level * rdr_types[0].gran_len self.file_name = make_rdr_filename( rdr_types, self._sat, aggr_iet, aggr_end_iet, self._orbit_num, self._creation_time, origin, self._domain, compressed, ) self._h5_file = h5py.File(os.path.join(dir_path, self.file_name), "w") self._write_skeleton(rdr_types, distributor, origin) self._aggr_starts = {} def _write_skeleton(self, rdr_types, distributor, origin): self._set_h5_attrs( self._h5_file, { "Distributor": distributor, "Mission_Name": missions[self._sat], "N_Dataset_Source": origin, "N_HDF_Creation_Date": self._format_date_attr(self._creation_time), "N_HDF_Creation_Time": self._format_time_attr(self._creation_time), "Platform_Short_Name": platform_short_names[self._sat], }, ) all_grp = self._h5_file.create_group("All_Data") prod_grp = self._h5_file.create_group("Data_Products") for rdr_type in rdr_types: all_grp.create_group(rdr_type.short_name + "_All") gran_grp = prod_grp.create_group(rdr_type.short_name) self._set_h5_attrs( gran_grp, { "Instrument_Short_Name": instrument_short_names[rdr_type.sensor], "N_Collection_Short_Name": rdr_type.short_name, "N_Dataset_Type_Tag": "RDR", "N_Processing_Domain": self._domain, }, ) def write_aggregate(self, rdr_type, aggr_iet, num_grans): self._aggr_starts[rdr_type] = aggr_iet grp = self._h5_file["Data_Products"][rdr_type.short_name] ds = grp.create_dataset( rdr_type.short_name + "_Aggr", [1], self._h5_ref_dtype, maxshape=[None] ) ds[0] = self._h5_file["All_Data/{}_All".format(rdr_type.short_name)].ref aggr_end_iet = aggr_iet + num_grans * rdr_type.gran_len last_gran_iet = aggr_end_iet - rdr_type.gran_len self._set_h5_attrs( ds, { "AggregateBeginningDate": self._format_date_attr(aggr_iet), "AggregateBeginningGranuleID": make_granule_id(self._sat, aggr_iet), "AggregateBeginningOrbitNumber": np.uint64(self._orbit_num), "AggregateBeginningTime": self._format_time_attr(aggr_iet), "AggregateEndingDate": self._format_date_attr(aggr_end_iet), "AggregateEndingGranuleID": make_granule_id(self._sat, last_gran_iet), "AggregateEndingOrbitNumber": np.uint64(self._orbit_num), "AggregateEndingTime": self._format_time_attr(aggr_end_iet), "AggregateNumberGranules": np.uint64(num_grans), }, ) def write_granule(self, rdr_type, gran_iet, blob, creation_time=None): raw_grp = self._h5_file["All_Data/{}_All".format(rdr_type.short_name)] gran_idx = (gran_iet - self._aggr_starts[rdr_type]) // rdr_type.gran_len raw_ds = raw_grp.create_dataset( "RawApplicationPackets_{}".format(gran_idx), data=blob, maxshape=[None] ) gran_grp = self._h5_file["Data_Products"][rdr_type.short_name] gran_ds = gran_grp.create_dataset( rdr_type.short_name + "_Gran_{}".format(gran_idx), [1], self._h5_regionref_dtype, maxshape=[None], ) gran_ds[0] = raw_ds.regionref[:] gran_end_iet = gran_iet + rdr_type.gran_len creation_time = creation_time or self._creation_time or datetime.now() gran_id = make_granule_id(self._sat, gran_iet) gran_ver = "A1" blob_info = decode_rdr_blob(blob) self._set_h5_attrs( gran_ds, { "Beginning_Date": self._format_date_attr(gran_iet), "Beginning_Time": self._format_time_attr(gran_iet), "Ending_Date": self._format_date_attr(gran_end_iet), "Ending_Time": self._format_time_attr(gran_end_iet), "N_Beginning_Orbit_Number": np.uint64(self._orbit_num), "N_Beginning_Time_IET": np.uint64(gran_iet), "N_Creation_Date": self._format_date_attr(creation_time), "N_Creation_Time": self._format_time_attr(creation_time), "N_Ending_Time_IET": np.uint64(gran_end_iet), "N_Granule_ID": gran_id, "N_Granule_Status": "N/A", "N_Granule_Version": gran_ver, "N_IDPS_Mode": self._domain, "N_JPSS_Document_Ref": rdr_type.document, "N_LEOA_Flag": "Off", "N_Packet_Type": [a.name for a in blob_info.apids], "N_Packet_Type_Count": [ np.uint64(a.pkts_received) for a in blob_info.apids ], "N_Percent_Missing_Data": np.float32( self._calc_percent_missing(blob_info) ), "N_Primary_Label": "Primary", # TODO: find out what this is "N_Reference_ID": ":".join([rdr_type.short_name, gran_id, gran_ver]), "N_Software_Version": self._software_ver, }, ) def close(self): self._h5_file.close() @staticmethod def _set_h5_attrs(h5_obj, attrs): attrs = _encodeall(attrs) def tpcnv(v): if isinstance(v, list): return [tpcnv(x) for x in v] elif isinstance(v, str): return np.array(v.encode('ascii'), dtype="S") elif isinstance(v, bytes): return np.array(v, dtype="S") return v # for some reason all the H5 attributes are in 2-D arrays in IDPS files for name, value in attrs.items(): if isinstance(value, list): value = [[x] for x in value] else: value = [[value]] h5_obj.attrs[name] = tpcnv(value) @staticmethod def _format_date_attr(t): return iet_to_datetime(t).strftime("%Y%m%d") @staticmethod def _format_time_attr(t): return iet_to_datetime(t).strftime("%H%M%S.%fZ") @staticmethod def _calc_percent_missing(common_rdr): total_received = sum(a.pkts_received for a in common_rdr.apids) total_reserved = sum(a.pkts_reserved for a in common_rdr.apids) return 100.0 * (total_reserved - total_received) / total_reserved default_origin = "ssec" default_domain = "dev" _h5_ref_dtype = h5py.special_dtype(ref=h5py.Reference) _h5_regionref_dtype = h5py.special_dtype(ref=h5py.RegionReference) def _encodeall(v): """ Recurse v encoding stringy values. dict keys are not encoded. """ if isinstance(v, dict): dtmp = {} for k, v in v.items(): dtmp[k] = _encodeall(v) return dtmp if isinstance(v, list): return [_encodeall(x) for x in v] if isinstance(v, compat.str_types): return v.encode() return v def build_rdr_blob(sat, pkt_stream, rdr_type, granule_iet, strict=True): """ Build an RDR blob of data for a single RDR native resolution granule. :param str sat: snpp or noaa20 :param .stream.PacketStream pkt_stream: Stream of packets for a single RDR native resolution granule. :param RdrType rdr_type: One of the *RDRType classes that indicates the type of packets contained in the stream. :param int granule_iet: Granule start time in IET. :param bool strict: When True any packet stream issues, duplicates, unexpected APIDs, etc..., will result in an error subclass of Error. :raise InvalidStream: If strict and the stream looks hokey :raise InvalidPacket: If strict and got a bunk packet """ get_jpss_packet_time = GetJpssPacketTime() granule_iet_end = granule_iet + rdr_type.gran_len total_pkt_size = 0 apid_info = OrderedDict() total_trackers = 0 all_pkts = [] for apid in rdr_type.apids: apid_info[apid.num] = { "name": apid.name.encode(), "pkts_reserved": apid.max_expected, "pkts_received": 0, "first_tracker_index": total_trackers, "pkt_info": [{} for _ in range(apid.max_expected)], } total_trackers += apid.max_expected for pkt in pkt_stream: if pkt.apid not in apid_info: if strict: raise InvalidPacket( "APID {} not expected for {}".format(pkt.apid, rdr_type.short_name), pkt, ) else: LOG.warning("dropping unexpected APID: %s", pkt) continue pkt_iet = get_jpss_packet_time(pkt) if not granule_iet <= pkt_iet < granule_iet_end: if strict: raise InvalidStream("packet stream crosses granule boundary") else: LOG.warning("dropping packet that crosses granule boundry: %s", pkt) continue info = apid_info[pkt.apid] if info["pkts_received"] > len(info["pkt_info"]) - 1: if strict: raise InvalidPacket("possible duplicate packet", pkt) else: LOG.warning("dropping possible duplicate packet: %s", pkt) continue pkt_info = info["pkt_info"][info["pkts_received"]] pkt_info["obs_time"] = pkt_iet pkt_info["seq_num"] = pkt.seqid pkt_info["size"] = pkt.size pkt_info["offset"] = total_pkt_size info["pkts_received"] += 1 total_pkt_size += pkt.size all_pkts.append(pkt) apid_list_offset = ctypes.sizeof(StaticHeader) pkt_tracker_offset = apid_list_offset + len(apid_info) * ctypes.sizeof(ApidListItem) ap_storage_offset = pkt_tracker_offset + total_trackers * ctypes.sizeof( PacketTracker ) buf_size = ap_storage_offset + total_pkt_size buf = np.zeros([buf_size], np.uint8) # zeros needed to null-pad strings header = StaticHeader.from_buffer(buf) header.satellite = platform_short_names[sat].encode() header.sensor = instrument_short_names[rdr_type.sensor].encode() header.type_id = rdr_type.type_id.encode() header.num_apids = len(apid_info) header.apid_list_offset = apid_list_offset header.pkt_tracker_offset = pkt_tracker_offset header.ap_storage_offset = ap_storage_offset header.next_pkt_pos = total_pkt_size header.start_boundary = granule_iet header.end_boundary = granule_iet_end for i, (apid, info) in enumerate(apid_info.items()): offset = header.apid_list_offset + i * ctypes.sizeof(ApidListItem) item = ApidListItem.from_buffer(buf, offset) item.name = info["name"] item.value = apid item.pkt_tracker_start_idx = info["first_tracker_index"] item.pkts_reserved = info["pkts_reserved"] item.pkts_received = info["pkts_received"] for j, pkt_info in enumerate(info["pkt_info"]): offset = header.pkt_tracker_offset + ( info["first_tracker_index"] + j ) * ctypes.sizeof(PacketTracker) tracker = PacketTracker.from_buffer(buf, offset) if pkt_info: tracker.obs_time = pkt_info["obs_time"] tracker.sequence_number = pkt_info["seq_num"] tracker.size = pkt_info["size"] tracker.offset = pkt_info["offset"] tracker.fill_percent = 0 else: tracker.offset = -1 buf[ap_storage_offset:] = bytearray().join(pkt.bytes() for pkt in all_pkts) return buf class ViirsScienceApidInfo(object): apids = list(x for x in range(800, 829) if x != 824) names = [ "M04", "M05", "M03", "M02", "M01", "M06", "M07", "M09", "M10", "M08", "M11", "M13", "M12", "I04", "M16", "M15", "M14", "I05", "I01", "I02", "I03", "DNB", "DNB_MGS", "DNB_LGS", "CAL", "ENG", "DNB_HGA", "DNB_HGB", ] @classmethod def get_specs(cls): return [ ApidSpec(apid, cls.get_name(apid), cls.get_max_expected(apid)) for apid in cls.apids ] @classmethod def get_name(cls, apid): return cls.names[cls.apids.index(apid)] @classmethod def get_max_expected(cls, apid): max_scans = 48 return max_scans * cls.get_packets_per_scan(apid) @classmethod def get_packets_per_scan(cls, apid): name = cls.get_name(apid) if name == "ENG": return 1 elif name == "CAL": return 24 elif name.startswith("M") or name.startswith("DNB"): return 17 else: return 33 class CrisScienceApidInfo(object): apids = [1289, 1290] + list(range(1315, 1396)) @classmethod def get_specs(cls): return [ ApidSpec(apid, cls.get_name(apid), cls.get_max_expected(apid)) for apid in cls.apids ] @classmethod def get_name(cls, apid): if apid == 1289: return "EIGHT_S_SCI" elif apid == 1290: return "ENG" else: offset = apid - 1315 view_types = ["N", "S", "C"] bands = ["LW", "MW", "SW"] num_fovs = 9 view_type = view_types[offset // (num_fovs * len(bands))] band = bands[offset // num_fovs % len(bands)] fov = str(offset % num_fovs + 1) return view_type + band + fov @classmethod def get_max_expected(cls, apid): name = cls.get_name(apid) if name == "EIGHT_S_SCI": return 5 elif name == "ENG": return 1 else: view_type = name[0] if view_type == "N": return 121 else: return 9 @attr.s class ApidSpec(object): num = attr.ib() name = attr.ib() max_expected = attr.ib() class RdrTypeManager(object): def __init__(self): self._types_by_apid = {} def register_type(self, cls): for apid_spec in cls.apids: if apid_spec.num in self._types_by_apid: raise ValueError("each APID can only be handled by one RDR type") self._types_by_apid[apid_spec.num] = cls return cls def get_type_for_apid(self, apid): try: return self._types_by_apid[apid] except KeyError: raise RdrgenError("Unsupported APID: {}".format(apid)) rdr_type_mgr = RdrTypeManager() rdr_type_spec = rdr_type_mgr.register_type get_rdr_type = rdr_type_mgr.get_type_for_apid @rdr_type_spec class ViirsScienceRdrType(object): product_id = "RVIRS" short_name = "VIIRS-SCIENCE-RDR" gran_len = 85350000 sensor = "viirs" type_id = "SCIENCE" document = "474-00448-02-06_JPSS-DD-Vol-II-Part-6_0200G.pdf" apids = ViirsScienceApidInfo.get_specs() default_aggregation = 1 @rdr_type_spec class CrisScienceRdrType(object): product_id = "RCRIS" short_name = "CRIS-SCIENCE-RDR" gran_len = 31997000 sensor = "cris" type_id = "SCIENCE" document = "474-00448-02-03_JPSS-DD-Vol-II-Part-3_0200B.pdf" apids = CrisScienceApidInfo.get_specs() default_aggregation = 15 @rdr_type_spec class AtmsScienceRdrType(object): product_id = "RATMS" short_name = "ATMS-SCIENCE-RDR" gran_len = 31997000 sensor = "atms" type_id = "SCIENCE" document = "474-00448-02-02_JPSS-DD-Vol-II-Part-2_0200B.pdf" apids = [ ApidSpec(515, "CAL", max_expected=5), ApidSpec(528, "SCI", max_expected=1249), ApidSpec(530, "ENG_TEMP", max_expected=13), ApidSpec(531, "ENG_HS", max_expected=5), ] default_aggregation = 15 @rdr_type_spec class SpacecraftDiaryRdrType(object): product_id = "RNSCA" short_name = "SPACECRAFT-DIARY-RDR" gran_len = 20000000 sensor = None type_id = "DIARY" document = "474-00448-02-08_JPSS-DD-Vol-II-Part-8_0200F.pdf" apids = [ ApidSpec(0, "CRITICAL", max_expected=21), ApidSpec(8, "ADCS_HKH", max_expected=21), ApidSpec(11, "DIARY", max_expected=21), ] default_aggregation = 303 class GetJpssPacketTime(object): def __init__(self): self._viirs_tracker = ViirsGroupedPacketTimeTracker() def __call__(self, pkt): if self._viirs_tracker.tracks_apid(pkt.apid): return self._viirs_tracker.get_iet(pkt) else: return get_packet_iet(pkt) class ViirsGroupedPacketTimeTracker(object): grouped_apids = list(range(800, 824)) + [825, 827, 828] @classmethod def tracks_apid(cls, apid): return apid in cls.grouped_apids def __init__(self): self._db = {} def get_iet(self, pkt): if not self.tracks_apid(pkt.apid): raise ValueError("APID {} not a VIIRS grouped packet type".format(pkt.apid)) if pkt.is_first(): obs_iet = get_packet_iet(pkt) self._db[pkt.apid] = (obs_iet, pkt.seqid) return obs_iet else: if pkt.apid not in self._db: # have not seen a first packet yet raise OrphanedViirsPacket(pkt) last_obs_iet, last_seq = self._db[pkt.apid] group_size = ViirsScienceApidInfo.get_packets_per_scan(pkt.apid) if not self.check_sequence_number(pkt.seqid, last_seq, group_size): raise OrphanedViirsPacket(pkt) if not self.check_packet_iet(self.get_viirs_iet(pkt), last_obs_iet): raise OrphanedViirsPacket(pkt) return last_obs_iet @staticmethod def get_viirs_iet(pkt): if pkt.is_standalone(): idx = 18 elif pkt.is_first(): idx = 20 else: idx = 10 arr = np.frombuffer(pkt.bytes()[idx : idx + 8], "B") # noqa days = arr[0:2].view(">u2")[0] ms = arr[2:6].view(">u4")[0] us = arr[6:8].view(">u2")[0] return cds_to_iet(days, ms, us) @staticmethod def check_sequence_number(nonfirst_seq_num, first_seq_num, group_size): seq_limit = 2 ** 14 group_end = first_seq_num + group_size # the 2nd check below is needed to handle wrap-around return ( first_seq_num < nonfirst_seq_num < group_end or first_seq_num < nonfirst_seq_num + seq_limit < group_end ) @staticmethod def check_packet_iet(pkt_iet, obs_iet): # this can be a pretty loose check since it is only needed to prevent # the very rare case where the sequence number check yields a false # positive permitted_lag = 5 # seconds lag = (pkt_iet - obs_iet) * 1e-6 return 0 <= lag <= permitted_lag class RdrgenError(Exception): pass class OrphanedViirsPacket(RdrgenError): def __init__(self, pkt): self.packet = pkt def __str__(self): return repr(self.packet) def make_rdr_filename( rdr_types, sat, aggr_begin, aggr_end, orbit_num, creation_time, origin, domain, compressed, ): aggr_begin = iet_to_datetime(aggr_begin) aggr_end = iet_to_datetime(aggr_end) prod_ids = "-".join(sorted(t.product_id for t in rdr_types)) sat = {"snpp": "npp", "j01": "j01"}[sat] if origin.endswith("-"): origin = origin[:-1] + ("c" if compressed else "u") def format_time(t): return t.strftime("%H%M%S") + str(t.microsecond // 100000) return "{p}_{s}_d{d:%Y%m%d}_t{b}_e{e}_b{n:05d}_c{c:%Y%m%d%H%M%S%f}_{o}_{m}.h5".format( # noqa p=prod_ids, s=sat, d=aggr_begin, b=format_time(aggr_begin), e=format_time(aggr_end), n=orbit_num, c=creation_time, o=origin, m=domain, ) def make_granule_id(sat, gran_iet): tenths = (gran_iet - satellite_base_times[sat]) // 100000 return "{}{:012d}".format(platform_short_names[sat], tenths) def get_packet_iet(pkt): tc = pkt.secondary_header.timecode return cds_to_iet(tc.days, tc.milliseconds, tc.microseconds) def iet_to_datetime(iet): if isinstance(iet, datetime): return iet return iet_to_dt(iet) def get_granule_start(sat, gran_len, iet): base_time = satellite_base_times[sat] return (iet - base_time) // gran_len * gran_len + base_time def get_aggregate_start(sat, gran_len, grans_per_aggr, iet): # see "DDS Aggregation Methodology" in CDFCB vol I Y = gran_len N = grans_per_aggr G_s = get_granule_start(sat, Y, iet) A_n = G_s // (N * Y) O = G_s % Y # noqa A_s = A_n * (Y * N) + O return A_s def get_aggregate_granule_times(sat, gran_len, aggr_level, aggr_iet): aggr_start = get_aggregate_start(sat, gran_len, aggr_level, aggr_iet) aggr_end = aggr_start + aggr_level * gran_len return get_overlapping_granules(sat, gran_len, aggr_start, aggr_end) def get_overlapping_granules(sat, gran_len, start_iet, stop_iet): rv = [] gran_iet = get_granule_start(sat, gran_len, start_iet) while gran_iet < stop_iet: rv.append(gran_iet) gran_iet += gran_len return rv satellite_base_times = {"snpp": 1698019234000000, "j01": 1698019234000000} platform_short_names = {"snpp": "NPP", "j01": "J01"} instrument_short_names = { "viirs": "VIIRS", "cris": "CrIS", "atms": "ATMS", None: "SPACECRAFT", }