import ctypes
import logging
import os
import shutil
import tempfile
from collections import OrderedDict
from contextlib import contextmanager
from datetime import datetime

import attr
import h5py
import numpy as np

import edosl0util
from edosl0util import compat
from edosl0util.jpssrdr import Apid as ApidListItem
from edosl0util.jpssrdr import PacketTracker, StaticHeader, decode_rdr_blob
from edosl0util.stream import jpss_packet_stream, collect_groups
from edosl0util.timecode import cds_to_iet, iet_to_dt
LOG = logging.getLogger(__name__)
class Error(Exception):
"""RDR generation base error.
"""
class InvalidStream(Error):
    """Packet stream is not valid for RDR."""


class InvalidPacket(Error):
    """Invalid packet for the current RDR."""
def __init__(self, msg, packet):
self.msg = msg
self.packet = packet
def iter_pkts(l0_files):
for l0_file in l0_files:
with open(l0_file, "rb") as l0_file_obj:
for pkt in jpss_packet_stream(l0_file_obj):
yield pkt
def filter_group_orphans(s):
"""
    Filter out any packets in a group that do not fall within the expected
    sequence id series and expected packets per group. This only applies to
    VIIRS data since it is the only sensor that utilizes groups. It is useful
    for input RDR packet streams received from direct broadcast that may have
    sub-ideal packet sequences that RDR generation does not handle well.
"""
    viirs_apids = set(ViirsScienceApidInfo.apids)
    viirs_tracker = ViirsGroupedPacketTimeTracker()
    for p in s:
        # If it's a VIIRS grouped packet, check for orphans
        if (
            p.apid in viirs_apids
            and ViirsScienceApidInfo.get_packets_per_scan(p.apid) > 1
        ):
            try:
                viirs_tracker.get_iet(p)
            except OrphanedViirsPacket:
                LOG.debug("dropping orphan %s", p)
                continue
        yield p
def filter_before(s, before):
"""
    Filter out all packets before the provided time. This handles grouped
    packets by dropping the entire group if its first packet has a timestamp
    before the provided time.
    """
for grp in collect_groups(s):
if grp[0].stamp < before:
LOG.debug("dropping before[%s] %s", before, grp[0])
continue
yield from grp
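# Hypothetical usage sketch (the file name and timestamp are made up for
# illustration): the two filters above are plain generators, so they can be
# chained ahead of RDR generation for direct-broadcast input, e.g.:
#
#     pkts = iter_pkts(["input.pds"])
#     pkts = filter_group_orphans(pkts)
#     pkts = filter_before(pkts, datetime(2023, 1, 1, 12, 0, 0))
#     build_rdr("snpp", pkts)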
def build_rdr(
sat,
pkt_iter,
output_dir=".",
aggr_type="idps",
aggr_level=None,
diary_cushion=10000000,
    attr_overrides={},
    strict=True,
):
    """Construct RDR file(s) from L0 packets
    The default aggregation behavior uses file boundaries computed in the same
    way as IDPS does, assuming the default aggregation level for the instrument
    (e.g. 85 sec for VIIRS, 8 min for CrIS). A different aggregation level can
    be given as an integer number of granules (e.g. 4 to get roughly 5.5 min files
for VIIRS). Finally, aggr_type can be set to 'full' to request that a single
file gets produced (thus aggregating all granules containing the given
packets).
"""
    # divvy packets up into temp files organized by granule
file_mgr = BinnedTemporaryFileManager()
try:
get_jpss_packet_time = GetJpssPacketTime()
gran_infos = set() # (rdr_type, gran_iet) pairs
for pkt in pkt_iter:
rdr_type = get_rdr_type(pkt.apid)
pkt_iet = get_jpss_packet_time(pkt)
gran_iet = get_granule_start(sat, rdr_type.gran_len, pkt_iet)
gran_info = (rdr_type, gran_iet)
gran_infos.add(gran_info)
file_mgr.add_data(gran_info, pkt.bytes())
# determine what RDR files we'll be producing based on the packets we've seen
rdr_types = set(rdr_type for rdr_type, gran_iet in gran_infos)
primary_type, packaged_type = process_rdr_types(
rdr_types, force_packaging=False
)
rdr_types = sorted(rdr_types, key=(lambda t: 1 if t is primary_type else 2))
if aggr_type == "idps":
aggr_level = aggr_level or primary_type.default_aggregation
primary_aggr_iets = sorted(
set(
get_aggregate_start(
sat, primary_type.gran_len, aggr_level, gran_iet
)
for (rdr_type, gran_iet) in gran_infos
if rdr_type is primary_type
)
)
elif aggr_type == "full":
# produce a single output file, ignoring IDPS-style aggregation boundaries
assert aggr_level is None
first_gran_iet = min(
gran_iet
for (rdr_type, gran_iet) in gran_infos
if rdr_type is primary_type
)
last_gran_iet = max(
gran_iet
for (rdr_type, gran_iet) in gran_infos
            if rdr_type is primary_type
        )
        aggr_level = (last_gran_iet - first_gran_iet) // primary_type.gran_len + 1
primary_aggr_iets = [first_gran_iet]
else:
        raise ValueError("aggr_type must be 'idps' or 'full'")
# now generate the RDRs
rdr_files = []
for aggr_iet in primary_aggr_iets:
rdr_writer = RdrWriter(
                sat, rdr_types, aggr_iet, aggr_level, output_dir, **attr_overrides
            )
            rdr_writer.write_aggregate(primary_type, aggr_iet, aggr_level)
gran_iets = [
aggr_iet + i * primary_type.gran_len for i in range(aggr_level)
]
for gran_iet in gran_iets:
with file_mgr.process_file((primary_type, gran_iet)) as pkt_file:
pkts = list(jpss_packet_stream(pkt_file))
blob = build_rdr_blob(sat, pkts, primary_type, gran_iet, strict=strict)
rdr_writer.write_granule(primary_type, gran_iet, blob)
if packaged_type:
packaged_gran_iets = get_overlapping_granules(
sat,
packaged_type.gran_len,
aggr_iet - diary_cushion,
aggr_iet + aggr_level * primary_type.gran_len + diary_cushion + 1,
)
rdr_writer.write_aggregate(
packaged_type, packaged_gran_iets[0], len(packaged_gran_iets)
)
for gran_iet in packaged_gran_iets:
with file_mgr.process_file((packaged_type, gran_iet)) as pkt_file:
pkts = list(jpss_packet_stream(pkt_file))
blob = build_rdr_blob(
sat, pkts, packaged_type, gran_iet, strict=strict
)
rdr_writer.write_granule(packaged_type, gran_iet, blob)
rdr_writer.close()
rdr_files.append(rdr_writer.file_name)
finally:
        file_mgr.clean_up()
    return rdr_files
def process_rdr_types(given_rdr_types, force_packaging):
"""Determine the RDR type we'll be making based on the packets we've been given
Return both a primary_type and packaged_type. The primary type could indicate
either a science or spacecraft RDR. The packaged type is used when spacecraft
packets are to be packaged along with a science RDR; it will be None if that's
    not the case. force_packaging is a boolean that if set will force spacecraft
RDR structures to be written out even if no spacecraft packets are present.
"""
rdr_types = set(given_rdr_types)
if not rdr_types:
raise RdrgenError("No RDR types to generate!")
sci_type = {t for t in rdr_types if t.type_id == "SCIENCE"}
rdr_types -= sci_type
if len(sci_type) > 1:
raise RdrgenError(
"Cannot process more than 1 science RDR type at once "
+ "(have {})".format(", ".join(t.short_name for t in sci_type))
)
diary_type = {t for t in rdr_types if t is SpacecraftDiaryRdrType}
rdr_types -= diary_type
if rdr_types:
raise RdrgenError(
"Unsupported RDR type(s): " + ", ".join(t.short_name for t in rdr_types)
)
if sci_type:
primary_type, = sci_type
packaged_type = (
SpacecraftDiaryRdrType if (diary_type or force_packaging) else None
)
else:
primary_type = SpacecraftDiaryRdrType
packaged_type = None
return primary_type, packaged_type
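# Illustrative behavior, assuming the RDR type classes defined later in this
# module:
#
#     process_rdr_types({ViirsScienceRdrType, SpacecraftDiaryRdrType}, False)
#         -> (ViirsScienceRdrType, SpacecraftDiaryRdrType)
#     process_rdr_types({ViirsScienceRdrType}, force_packaging=True)
#         -> (ViirsScienceRdrType, SpacecraftDiaryRdrType)
#     process_rdr_types({SpacecraftDiaryRdrType}, False)
#         -> (SpacecraftDiaryRdrType, None)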
"""Manage a set of append-mode temporary files each labeled with a 'bin key'
Limits the number of file handles kept open at a time.
"""
self.max_open_files = max_open_files
self.dir = tempfile.mkdtemp(dir=parent_dir)
self._file_paths = {}
self._file_objs = OrderedDict()
def add_data(self, bin_key, data):
"""Append some data to the temp file associated with bin_key
Creates the file if needed.
"""
file_obj = self._file_objs.pop(bin_key, None)
        if not file_obj:
            file_path = self._file_paths.get(bin_key)
            if file_path:
                # reopen an existing bin file in append mode
                file_obj = open(file_path, "ab")
            else:
                file_obj = tempfile.NamedTemporaryFile(
                    dir=self.dir, delete=False, mode="wb"
                )
                file_path = file_obj.name
                self._file_paths[bin_key] = file_path
if len(self._file_objs) == self.max_open_files:
_, old_file_obj = self._file_objs.popitem(last=False)
old_file_obj.close()
self._file_objs[bin_key] = file_obj
file_obj.write(data)
@contextmanager
def process_file(self, bin_key):
"""'Check out' a file for processing
Returns a context manager that provides a read-mode file handle and
removes the file after processing. Can be called with a bin_key that
has no data yet which just results in an empty file.
"""
        file_obj = self._file_objs.pop(bin_key, None)
        if file_obj:
            file_obj.close()
        file_path = self._file_paths.pop(bin_key, "/dev/null")
        file_obj = open(file_path, "rb")
        try:
            yield file_obj
        finally:
            file_obj.close()
            if file_path != "/dev/null":
                os.remove(file_path)
def clean_up(self):
"""Call after all files are processed to avoid leaving a dir sitting around"""

class RdrWriter(object):
    missions = {
"npp": "S-NPP/JPSS",
"snpp": "S-NPP/JPSS",
"j01": "NOAA-20/JPSS",
"jpss1": "NOAA-20/JPSS",
"noaa20": "NOAA-20/JPSS",
}
def __init__(
self,
sat,
rdr_types,
aggr_iet,
aggr_level,
dir_path=".",
distributor=None,
origin=None,
domain=None,
compressed=False,
orbit_num=0,
creation_time=None,
software_ver=None,
):
self._sat = sat
origin = origin or self.default_origin
distributor = distributor or origin
self._domain = domain or self.default_domain
self._orbit_num = orbit_num
self._creation_time = creation_time or datetime.now()
self._software_ver = (
software_ver or edosl0util.__name__ + "-" + edosl0util.__version__
)
aggr_end_iet = aggr_iet + aggr_level * rdr_types[0].gran_len
self.file_name = make_rdr_filename(
rdr_types,
self._sat,
aggr_iet,
aggr_end_iet,
self._orbit_num,
self._creation_time,
origin,
self._domain,
compressed,
)
self._h5_file = h5py.File(os.path.join(dir_path, self.file_name), "w")
self._write_skeleton(rdr_types, distributor, origin)
self._aggr_starts = {}
def _write_skeleton(self, rdr_types, distributor, origin):
self._set_h5_attrs(
self._h5_file,
{
"Distributor": distributor,
"N_Dataset_Source": origin,
"N_HDF_Creation_Date": self._format_date_attr(self._creation_time),
"N_HDF_Creation_Time": self._format_time_attr(self._creation_time),
"Platform_Short_Name": platform_short_names[self._sat],
},
)
        all_grp = self._h5_file.create_group("All_Data")
        prod_grp = self._h5_file.create_group("Data_Products")
        for rdr_type in rdr_types:
            all_grp.create_group(rdr_type.short_name + "_All")
            gran_grp = prod_grp.create_group(rdr_type.short_name)
            self._set_h5_attrs(
                gran_grp,
                {
                    "Instrument_Short_Name": instrument_short_names[rdr_type.sensor],
                    "N_Collection_Short_Name": rdr_type.short_name,
                    "N_Dataset_Type_Tag": "RDR",
                    "N_Processing_Domain": self._domain,
                },
            )
def write_aggregate(self, rdr_type, aggr_iet, num_grans):
self._aggr_starts[rdr_type] = aggr_iet
grp = self._h5_file["Data_Products"][rdr_type.short_name]
rdr_type.short_name + "_Aggr", [1], self._h5_ref_dtype, maxshape=[None]
)
ds[0] = self._h5_file["All_Data/{}_All".format(rdr_type.short_name)].ref
aggr_end_iet = aggr_iet + num_grans * rdr_type.gran_len
last_gran_iet = aggr_end_iet - rdr_type.gran_len
self._set_h5_attrs(
ds,
{
"AggregateBeginningDate": self._format_date_attr(aggr_iet),
"AggregateBeginningGranuleID": make_granule_id(self._sat, aggr_iet),
"AggregateBeginningOrbitNumber": np.uint64(self._orbit_num),
"AggregateBeginningTime": self._format_time_attr(aggr_iet),
"AggregateEndingDate": self._format_date_attr(aggr_end_iet),
"AggregateEndingGranuleID": make_granule_id(self._sat, last_gran_iet),
"AggregateEndingOrbitNumber": np.uint64(self._orbit_num),
"AggregateEndingTime": self._format_time_attr(aggr_end_iet),
"AggregateNumberGranules": np.uint64(num_grans),
},
)
def write_granule(self, rdr_type, gran_iet, blob, creation_time=None):
raw_grp = self._h5_file["All_Data/{}_All".format(rdr_type.short_name)]
        gran_idx = (gran_iet - self._aggr_starts[rdr_type]) // rdr_type.gran_len
        raw_ds = raw_grp.create_dataset(
            "RawApplicationPackets_{}".format(gran_idx), data=blob, maxshape=[None]
        )
        gran_grp = self._h5_file["Data_Products"][rdr_type.short_name]
        gran_ds = gran_grp.create_dataset(
            rdr_type.short_name + "_Gran_{}".format(gran_idx),
            [1],
            self._h5_regionref_dtype,
            maxshape=[None],
        )
        gran_ds[0] = raw_ds.regionref[:]
gran_end_iet = gran_iet + rdr_type.gran_len
creation_time = creation_time or self._creation_time or datetime.now()
gran_id = make_granule_id(self._sat, gran_iet)
        gran_ver = "A1"  # granule version used in N_Granule_Version / N_Reference_ID
        blob_info = decode_rdr_blob(blob)
self._set_h5_attrs(
gran_ds,
{
"Beginning_Date": self._format_date_attr(gran_iet),
"Beginning_Time": self._format_time_attr(gran_iet),
"Ending_Date": self._format_date_attr(gran_end_iet),
"Ending_Time": self._format_time_attr(gran_end_iet),
"N_Beginning_Orbit_Number": np.uint64(self._orbit_num),
"N_Beginning_Time_IET": np.uint64(gran_iet),
"N_Creation_Date": self._format_date_attr(creation_time),
"N_Creation_Time": self._format_time_attr(creation_time),
"N_Ending_Time_IET": np.uint64(gran_end_iet),
"N_Granule_ID": gran_id,
"N_Granule_Status": "N/A",
"N_Granule_Version": gran_ver,
"N_IDPS_Mode": self._domain,
"N_JPSS_Document_Ref": rdr_type.document,
"N_LEOA_Flag": "Off",
"N_Packet_Type": [a.name for a in blob_info.apids],
"N_Packet_Type_Count": [
np.uint64(a.pkts_received) for a in blob_info.apids
],
"N_Percent_Missing_Data": np.float32(
self._calc_percent_missing(blob_info)
),
"N_Primary_Label": "Primary", # TODO: find out what this is
"N_Reference_ID": ":".join([rdr_type.short_name, gran_id, gran_ver]),
"N_Software_Version": self._software_ver,
},
)
def close(self):
self._h5_file.close()
@staticmethod
def _set_h5_attrs(h5_obj, attrs):
attrs = _encodeall(attrs)
def tpcnv(v):
if isinstance(v, list):
return [tpcnv(x) for x in v]
elif isinstance(v, str):
return np.array(v.encode('ascii'), dtype="S")
elif isinstance(v, bytes):
return np.array(v, dtype="S")
return v
# for some reason all the H5 attributes are in 2-D arrays in IDPS files
for name, value in attrs.items():
if isinstance(value, list):
value = [[x] for x in value]
else:
                value = [[value]]
            h5_obj.attrs[name] = value
@staticmethod
    def _format_date_attr(t):
        return iet_to_datetime(t).strftime("%Y%m%d")
@staticmethod
    def _format_time_attr(t):
        return iet_to_datetime(t).strftime("%H%M%S.%fZ")
@staticmethod
def _calc_percent_missing(common_rdr):
total_received = sum(a.pkts_received for a in common_rdr.apids)
total_reserved = sum(a.pkts_reserved for a in common_rdr.apids)
return 100.0 * (total_reserved - total_received) / total_reserved
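    # Worked example: if the APIDs reserve 864 packet slots in total and 860
    # packets were received, the reported value is 100 * (864 - 860) / 864,
    # i.e. about 0.46 percent missing.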
_h5_ref_dtype = h5py.special_dtype(ref=h5py.Reference)
_h5_regionref_dtype = h5py.special_dtype(ref=h5py.RegionReference)
def _encodeall(v):
"""
    Recursively encode stringy values found in v. dict keys are not encoded.
"""
if isinstance(v, dict):
dtmp = {}
for k, v in v.items():
dtmp[k] = _encodeall(v)
return dtmp
if isinstance(v, list):
return [_encodeall(x) for x in v]
if isinstance(v, compat.str_types):
return v.encode()
return v
def build_rdr_blob(sat, pkt_stream, rdr_type, granule_iet, strict=True):
""" Build an RDR blob of data for a single RDR native resolution granule.
:param str sat: snpp or noaa20
:param .stream.PacketStream pkt_stream:
Stream of packets for a single RDR native resolution granule.
:param RdrType rdr_type:
One of the *RDRType classes that indicates the type of packets contained
in the stream.
:param int granule_iet: Granule start time in IET.
:param bool strict:
When True any packet stream issues, duplicates, unexpected APIDs, etc...,
will result in an error subclass of Error.
:raise InvalidStream: If strict and the stream looks hokey
:raise InvalidPacket: If strict and got a bunk packet
"""
get_jpss_packet_time = GetJpssPacketTime()
total_pkt_size = 0
apid_info = OrderedDict()
total_trackers = 0
all_pkts = []
for apid in rdr_type.apids:
apid_info[apid.num] = {
"name": apid.name.encode(),
"pkts_reserved": apid.max_expected,
"pkts_received": 0,
"first_tracker_index": total_trackers,
"pkt_info": [{} for _ in range(apid.max_expected)],
        }
        total_trackers += apid.max_expected
    granule_iet_end = granule_iet + rdr_type.gran_len
    for pkt in pkt_stream:
        if pkt.apid not in apid_info:
            if strict:
raise InvalidPacket(
"APID {} not expected for {}".format(pkt.apid, rdr_type.short_name),
pkt,
)
else:
LOG.warning("dropping unexpected APID: %s", pkt)
                continue
        pkt_iet = get_jpss_packet_time(pkt)
        if not granule_iet <= pkt_iet < granule_iet_end:
if strict:
raise InvalidStream("packet stream crosses granule boundary")
else:
LOG.warning("dropping packet that crosses granule boundry: %s", pkt)
                continue
        info = apid_info[pkt.apid]
        if info["pkts_received"] > len(info["pkt_info"]) - 1:
if strict:
raise InvalidPacket("possible duplicate packet", pkt)
else:
LOG.warning("dropping possible duplicate packet: %s", pkt)
continue
pkt_info = info["pkt_info"][info["pkts_received"]]
pkt_info["obs_time"] = pkt_iet
pkt_info["seq_num"] = pkt.seqid
pkt_info["size"] = pkt.size
pkt_info["offset"] = total_pkt_size
info["pkts_received"] += 1
total_pkt_size += pkt.size
all_pkts.append(pkt)
apid_list_offset = ctypes.sizeof(StaticHeader)
pkt_tracker_offset = apid_list_offset + len(apid_info) * ctypes.sizeof(ApidListItem)
ap_storage_offset = pkt_tracker_offset + total_trackers * ctypes.sizeof(
PacketTracker
)
buf_size = ap_storage_offset + total_pkt_size
buf = np.zeros([buf_size], np.uint8) # zeros needed to null-pad strings
header = StaticHeader.from_buffer(buf)
header.satellite = platform_short_names[sat].encode()
header.sensor = instrument_short_names[rdr_type.sensor].encode()
header.type_id = rdr_type.type_id.encode()
header.num_apids = len(apid_info)
header.apid_list_offset = apid_list_offset
header.pkt_tracker_offset = pkt_tracker_offset
header.ap_storage_offset = ap_storage_offset
header.next_pkt_pos = total_pkt_size
header.start_boundary = granule_iet
header.end_boundary = granule_iet_end
for i, (apid, info) in enumerate(apid_info.items()):
offset = header.apid_list_offset + i * ctypes.sizeof(ApidListItem)
item = ApidListItem.from_buffer(buf, offset)
        item.name = info["name"]
        item.value = apid
        item.pkt_tracker_start_idx = info["first_tracker_index"]
item.pkts_reserved = info["pkts_reserved"]
item.pkts_received = info["pkts_received"]
for j, pkt_info in enumerate(info["pkt_info"]):
offset = header.pkt_tracker_offset + (
info["first_tracker_index"] + j
) * ctypes.sizeof(PacketTracker)
tracker = PacketTracker.from_buffer(buf, offset)
if pkt_info:
tracker.obs_time = pkt_info["obs_time"]
tracker.sequence_number = pkt_info["seq_num"]
tracker.size = pkt_info["size"]
tracker.offset = pkt_info["offset"]
tracker.fill_percent = 0
else:
tracker.offset = -1
buf[ap_storage_offset:] = bytearray().join(pkt.bytes() for pkt in all_pkts)
return buf
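# Hedged round-trip sketch (granule time made up): a blob produced above can be
# read back with decode_rdr_blob from edosl0util.jpssrdr, which is also how
# RdrWriter.write_granule derives its per-granule metadata attributes.
#
#     blob = build_rdr_blob("snpp", pkts, ViirsScienceRdrType, gran_iet)
#     info = decode_rdr_blob(blob)
#     received = {a.name: a.pkts_received for a in info.apids}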

class ViirsScienceApidInfo(object):
    apids = list(x for x in range(800, 829) if x != 824)
    names = [
"M04",
"M05",
"M03",
"M02",
"M01",
"M06",
"M07",
"M09",
"M10",
"M08",
"M11",
"M13",
"M12",
"I04",
"M16",
"M15",
"M14",
"I05",
"I01",
"I02",
"I03",
"DNB",
"DNB_MGS",
"DNB_LGS",
"CAL",
"ENG",
"DNB_HGA",
"DNB_HGB",
]

    @classmethod
    def get_specs(cls):
        return [
ApidSpec(apid, cls.get_name(apid), cls.get_max_expected(apid))
for apid in cls.apids
]
@classmethod
def get_name(cls, apid):
return cls.names[cls.apids.index(apid)]
@classmethod
def get_max_expected(cls, apid):
max_scans = 48
return max_scans * cls.get_packets_per_scan(apid)
@classmethod
def get_packets_per_scan(cls, apid):
name = cls.get_name(apid)
        if name == "ENG":
            return 1
        elif name == "CAL":
            return 24
        elif name.startswith("M") or name.startswith("DNB"):
            return 17
        else:
            return 33
class CrisScienceApidInfo(object):
apids = [1289, 1290] + list(range(1315, 1396))
@classmethod
def get_specs(cls):
return [
ApidSpec(apid, cls.get_name(apid), cls.get_max_expected(apid))
for apid in cls.apids
]
@classmethod
def get_name(cls, apid):
        if apid == 1289:
            return "EIGHT_S_SCI"
        elif apid == 1290:
            return "ENG"
        else:
            offset = apid - 1315
            view_types = ["N", "S", "C"]
            bands = ["LW", "MW", "SW"]
            num_fovs = 9
            view_type = view_types[offset // (num_fovs * len(bands))]
            band = bands[offset // num_fovs % len(bands)]
            fov = str(offset % num_fovs + 1)
            return view_type + band + fov
@classmethod
def get_max_expected(cls, apid):
        name = cls.get_name(apid)
        if name == "EIGHT_S_SCI":
            return 5
        elif name == "ENG":
            return 1
        else:
            view_type = name[0]
            if view_type == "N":
                return 121
            else:
                return 9
@attr.s
class ApidSpec(object):
num = attr.ib()
name = attr.ib()
max_expected = attr.ib()
class RdrTypeManager(object):
def __init__(self):
self._types_by_apid = {}
def register_type(self, cls):
for apid_spec in cls.apids:
if apid_spec.num in self._types_by_apid:
raise ValueError("each APID can only be handled by one RDR type")
self._types_by_apid[apid_spec.num] = cls
return cls
def get_type_for_apid(self, apid):
try:
return self._types_by_apid[apid]
        except KeyError:
            raise RdrgenError("Unsupported APID: {}".format(apid))


rdr_type_mgr = RdrTypeManager()
rdr_type_spec = rdr_type_mgr.register_type
get_rdr_type = rdr_type_mgr.get_type_for_apid
@rdr_type_spec
class ViirsScienceRdrType(object):
product_id = "RVIRS"
short_name = "VIIRS-SCIENCE-RDR"
sensor = "viirs"
type_id = "SCIENCE"
document = "474-00448-02-06_JPSS-DD-Vol-II-Part-6_0200G.pdf"
product_id = "RCRIS"
short_name = "CRIS-SCIENCE-RDR"
sensor = "cris"
type_id = "SCIENCE"
document = "474-00448-02-03_JPSS-DD-Vol-II-Part-3_0200B.pdf"
    apids = CrisScienceApidInfo.get_specs()
    default_aggregation = 15
    gran_len = 31997000  # ~32 s CrIS granule, in IET microseconds
@rdr_type_spec
class AtmsScienceRdrType(object):
product_id = "RATMS"
short_name = "ATMS-SCIENCE-RDR"
sensor = "atms"
type_id = "SCIENCE"
document = "474-00448-02-02_JPSS-DD-Vol-II-Part-2_0200B.pdf"
apids = [
ApidSpec(515, "CAL", max_expected=5),
ApidSpec(528, "SCI", max_expected=1249),
ApidSpec(530, "ENG_TEMP", max_expected=13),
ApidSpec(531, "ENG_HS", max_expected=5),
    ]
    default_aggregation = 15
    gran_len = 31997000  # ~32 s ATMS granule, in IET microseconds


@rdr_type_spec
class SpacecraftDiaryRdrType(object):
    product_id = "RNSCA"
short_name = "SPACECRAFT-DIARY-RDR"
type_id = "DIARY"
document = "474-00448-02-08_JPSS-DD-Vol-II-Part-8_0200F.pdf"
apids = [
ApidSpec(0, "CRITICAL", max_expected=21),
ApidSpec(8, "ADCS_HKH", max_expected=21),
ApidSpec(11, "DIARY", max_expected=21),
]
default_aggregation = 303
class GetJpssPacketTime(object):
def __init__(self):
self._viirs_tracker = ViirsGroupedPacketTimeTracker()
def __call__(self, pkt):
if self._viirs_tracker.tracks_apid(pkt.apid):
return self._viirs_tracker.get_iet(pkt)
else:
return get_packet_iet(pkt)
class ViirsGroupedPacketTimeTracker(object):
grouped_apids = list(range(800, 824)) + [825, 827, 828]
@classmethod
def tracks_apid(cls, apid):
return apid in cls.grouped_apids
def __init__(self):
self._db = {}
def get_iet(self, pkt):
if not self.tracks_apid(pkt.apid):
raise ValueError("APID {} not a VIIRS grouped packet type".format(pkt.apid))
if pkt.is_first():
obs_iet = get_packet_iet(pkt)
self._db[pkt.apid] = (obs_iet, pkt.seqid)
return obs_iet
else:
if pkt.apid not in self._db:
# have not seen a first packet yet
raise OrphanedViirsPacket(pkt)
last_obs_iet, last_seq = self._db[pkt.apid]
group_size = ViirsScienceApidInfo.get_packets_per_scan(pkt.apid)
if not self.check_sequence_number(pkt.seqid, last_seq, group_size):
raise OrphanedViirsPacket(pkt)
if not self.check_packet_iet(self.get_viirs_iet(pkt), last_obs_iet):
raise OrphanedViirsPacket(pkt)
return last_obs_iet
@staticmethod
def get_viirs_iet(pkt):
if pkt.is_standalone():
idx = 18
elif pkt.is_first():
idx = 20
else:
idx = 10
arr = np.frombuffer(pkt.bytes()[idx : idx + 8], "B") # noqa
days = arr[0:2].view(">u2")[0]
ms = arr[2:6].view(">u4")[0]
        us = arr[6:8].view(">u2")[0]
        return cds_to_iet(days, ms, us)
@staticmethod
def check_sequence_number(nonfirst_seq_num, first_seq_num, group_size):
        seq_limit = 2 ** 14  # CCSDS packet sequence counters are 14 bits
        group_end = first_seq_num + group_size
# the 2nd check below is needed to handle wrap-around
return (
first_seq_num < nonfirst_seq_num < group_end
or first_seq_num < nonfirst_seq_num + seq_limit < group_end
)
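    # Worked example of the wrap-around branch: with first_seq_num=16380 and
    # group_size=17, group_end is 16397; a wrapped nonfirst_seq_num of 5 fails
    # the first comparison but satisfies 16380 < 5 + 2**14 = 16389 < 16397, so
    # the packet is still accepted as part of the group.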
@staticmethod
def check_packet_iet(pkt_iet, obs_iet):
# this can be a pretty loose check since it is only needed to prevent
# the very rare case where the sequence number check yields a false
# positive
permitted_lag = 5 # seconds
lag = (pkt_iet - obs_iet) * 1e-6
return 0 <= lag <= permitted_lag
class RdrgenError(Exception):
pass
class OrphanedViirsPacket(RdrgenError):
def __init__(self, pkt):
self.packet = pkt
def __str__(self):
return repr(self.packet)
def make_rdr_filename(
rdr_types,
sat,
aggr_begin,
aggr_end,
orbit_num,
creation_time,
origin,
domain,
compressed,
):
aggr_begin = iet_to_datetime(aggr_begin)
aggr_end = iet_to_datetime(aggr_end)
prod_ids = "-".join(sorted(t.product_id for t in rdr_types))
sat = {"snpp": "npp", "j01": "j01"}[sat]
if origin.endswith("-"):
origin = origin[:-1] + ("c" if compressed else "u")

    def format_time(t):
        return t.strftime("%H%M%S") + str(t.microsecond // 100000)

return "{p}_{s}_d{d:%Y%m%d}_t{b}_e{e}_b{n:05d}_c{c:%Y%m%d%H%M%S%f}_{o}_{m}.h5".format( # noqa
p=prod_ids,
s=sat,
d=aggr_begin,
b=format_time(aggr_begin),
e=format_time(aggr_end),
n=orbit_num,
c=creation_time,
o=origin,
m=domain,
)
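# Illustrative output (times, orbit number, origin, and domain are made up):
# packaging RNSCA with RVIRS for S-NPP yields a name of the form
#
#     RNSCA-RVIRS_npp_d20230101_t0000000_e0001253_b00001_c20230101010203123456_ssec_dev.h5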
def make_granule_id(sat, gran_iet):
tenths = (gran_iet - satellite_base_times[sat]) // 100000
return "{}{:012d}".format(platform_short_names[sat], tenths)
def get_packet_iet(pkt):
tc = pkt.secondary_header.timecode
return cds_to_iet(tc.days, tc.milliseconds, tc.microseconds)
def iet_to_datetime(iet):
    if isinstance(iet, datetime):
        return iet
    return iet_to_dt(iet)
def get_granule_start(sat, gran_len, iet):
base_time = satellite_base_times[sat]
return (iet - base_time) // gran_len * gran_len + base_time
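# Worked example: with base = satellite_base_times["snpp"] and a 20000000 us
# (20 s) granule length, an IET 45 s past the base falls in the granule that
# starts 40 s past the base:
#
#     get_granule_start("snpp", 20000000, base + 45000000) == base + 40000000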
def get_aggregate_start(sat, gran_len, grans_per_aggr, iet):
# see "DDS Aggregation Methodology" in CDFCB vol I
Y = gran_len
N = grans_per_aggr
    G_s = get_granule_start(sat, Y, iet)
    A_n = G_s // (N * Y)
    O = G_s % Y
    A_s = A_n * (N * Y) + O
    return A_s
def get_aggregate_granule_times(sat, gran_len, aggr_level, aggr_iet):
aggr_start = get_aggregate_start(sat, gran_len, aggr_level, aggr_iet)
aggr_end = aggr_start + aggr_level * gran_len
return get_overlapping_granules(sat, gran_len, aggr_start, aggr_end)
def get_overlapping_granules(sat, gran_len, start_iet, stop_iet):
rv = []
gran_iet = get_granule_start(sat, gran_len, start_iet)
while gran_iet < stop_iet:
rv.append(gran_iet)
gran_iet += gran_len
return rv
satellite_base_times = {"snpp": 1698019234000000, "j01": 1698019234000000}
platform_short_names = {"snpp": "NPP", "j01": "J01"}
instrument_short_names = {
"viirs": "VIIRS",
"cris": "CrIS",
"atms": "ATMS",
None: "SPACECRAFT",