diff --git a/edosl0util/rdrgen.py b/edosl0util/rdrgen.py index 8492ff1513b79fd988fdbbed693242f603385f63..7b1e516b7b2b848f9d860639b60946c4d151f11a 100644 --- a/edosl0util/rdrgen.py +++ b/edosl0util/rdrgen.py @@ -2,6 +2,7 @@ import ctypes import os import tempfile from collections import OrderedDict +from contextlib import contextmanager from datetime import datetime import attr @@ -15,40 +16,107 @@ from edosl0util.jpssrdr import ( from edosl0util.stream import jpss_packet_stream -def packets_to_rdrs(sat, pkt_files): +def packets_to_rdrs(sat, pkt_files, aggr_level=None, attr_overrides={}): # divy packets up into temp files organized by granule - rdr_pkt_files = BinnedTemporaryFileManager() + file_mgr = BinnedTemporaryFileManager() get_jpss_packet_time = GetJpssPacketTime() + gran_infos = set() # (rdr_type, gran_iet) pairs for pkt_file in pkt_files: with open(pkt_file) as file_obj: stream = jpss_packet_stream(file_obj) for pkt in stream: rdr_type = get_rdr_type(pkt.apid) pkt_iet = get_jpss_packet_time(pkt) - gran = calc_rdr_granule(sat, rdr_type, pkt_iet) - rdr_pkt_files.add_data((rdr_type, gran), pkt.bytes()) - - # make RDRs one granule at a time - get_jpss_packet_time = GetJpssPacketTime() - for (rdr_type, gran), rdr_pkt_file in rdr_pkt_files.process_files(): - pkts = list(jpss_packet_stream(rdr_pkt_file)) - pkt_times = {p: get_jpss_packet_time(p) for p in pkts} - pkts.sort(key=(lambda p: (pkt_times[p], p.apid))) - blob = build_rdr_blob(sat, pkts, rdr_type, gran) - write_rdr(sat, blob) + gran_iet = get_granule_start(sat, rdr_type.gran_len, pkt_iet) + gran_info = (rdr_type, gran_iet) + gran_infos.add(gran_info) + file_mgr.add_data(gran_info, pkt.bytes()) + + # determine what RDR files we'll be producing based on the packets we've seen + rdr_types = set(rdr_type for rdr_type, gran_iet in gran_infos) + primary_type, packaged_type = process_rdr_types(rdr_types, force_packaging=False) + rdr_types = sorted(rdr_types, key=(lambda t: 1 if t is primary_type else 2)) + aggr_level = aggr_level or primary_type.default_aggregation + primary_aggr_iets = sorted(set( + get_aggregate_start(sat, primary_type.gran_len, aggr_level, gran_iet) + for (rdr_type, gran_iet) in gran_infos if rdr_type is primary_type)) + + # now generate the RDRs + for aggr_iet in primary_aggr_iets: + rdr_writer = RdrWriter(sat, rdr_types, aggr_iet, aggr_level, **attr_overrides) + rdr_writer.write_aggregate(primary_type, aggr_iet, aggr_level) + gran_iets = get_aggregate_granule_times( + sat, primary_type.gran_len, aggr_level, aggr_iet) + for gran_iet in gran_iets: + with file_mgr.process_file((primary_type, gran_iet)) as pkt_file: + pkts = list(jpss_packet_stream(pkt_file)) + blob = build_rdr_blob(sat, pkts, primary_type, gran_iet) + rdr_writer.write_granule(primary_type, gran_iet, blob) + if packaged_type: + cushion = 1000000 # 1 second; is this consistent with IDPS? + packaged_gran_iets = get_overlapping_granules( + sat, packaged_type.gran_len, aggr_iet - cushion, + aggr_iet + aggr_level * primary_type.gran_len + cushion + 1) + rdr_writer.write_aggregate( + packaged_type, packaged_gran_iets[0], len(packaged_gran_iets)) + for gran_iet in packaged_gran_iets: + with file_mgr.process_file((packaged_type, gran_iet)) as pkt_file: + pkts = list(jpss_packet_stream(pkt_file)) + blob = build_rdr_blob(sat, pkts, packaged_type, gran_iet) + rdr_writer.write_granule(packaged_type, gran_iet, blob) + rdr_writer.close() + + +def process_rdr_types(given_rdr_types, force_packaging): + """Determine the RDR type we'll be making based on the packets we've been given + + Return both a primary_type and packaged_type. The primary type could indicate + either a science or spacecraft RDR. The packaged type is used when spacecraft + packets are to be packaged along with a science RDR; it will be None if that's + not the case. forceA_packaging is a boolean that if set will force spacecraft + RDR structures to be written out even if no spacecraft packets are present. + """ + rdr_types = set(given_rdr_types) + if not rdr_types: + raise RdrgenError('No RDR types to generate!') + sci_type = {t for t in rdr_types if t.type_id == 'SCIENCE'} + rdr_types -= sci_type + if len(sci_type) > 1: + raise RdrgenError( + 'Cannot process more than 1 science RDR type at once ' + + '(have {})'.format(', '.join(t.short_name for t in sci_type))) + diary_type = {t for t in rdr_types if t is SpacecraftDiaryRdrType} + rdr_types -= diary_type + if rdr_types: + raise RdrgenError( + 'Unsupported RDR type(s): ' + ', '.join(t.short_name for t in rdr_types)) + if sci_type: + primary_type, = sci_type + packaged_type = ( + SpacecraftDiaryRdrType if (diary_type or force_packaging) else None) + else: + primary_type = SpacecraftDiaryRdrType + packaged_type = None + return primary_type, packaged_type class BinnedTemporaryFileManager(object): + """Manage a set of append-mode temporary files each labeled with a 'bin key' + + Limits the number of file handles kept open at a time. + """ def __init__(self, parent_dir='.', max_open_files=32): self.max_open_files = max_open_files self.dir = tempfile.mkdtemp(dir=parent_dir) self._file_paths = {} self._file_objs = OrderedDict() - self._process_files_called = False def add_data(self, bin_key, data): - assert not self._process_files_called + """Append some data to the temp file associated with bin_key + + Creates the file if needed. + """ file_obj = self._file_objs.pop(bin_key, None) if not file_obj: file_path = self._file_paths.get(bin_key) @@ -64,110 +132,159 @@ class BinnedTemporaryFileManager(object): self._file_objs[bin_key] = file_obj file_obj.write(data) - def process_files(self): - assert not self._process_files_called - self._process_files_called = True - for file_obj in self._file_objs.values(): + @contextmanager + def process_file(self, bin_key): + """'Check out' a file for processing + + Returns a context manager that provides a read-mode file handle and + removes the file after processing. Can be called with a bin_key that + has no data yet which just results in an empty file. + """ + file_obj = self._file_objs.pop(bin_key, None) + if file_obj: file_obj.close() - for bin_key, file_name in sorted(self._file_paths.items()): - file_obj = open(file_name) - yield bin_key, file_obj + file_path = self._file_paths.pop(bin_key, '/dev/null') + file_obj = open(file_path) + try: + yield file_obj + finally: file_obj.close() - os.remove(file_name) + if file_path != '/dev/null': + os.remove(file_path) + + def clean_up(self): + """Call after all files are processed to avoid leaving a dir sitting around""" os.rmdir(self.dir) -def write_rdr(sat, blob, dir_path='.', - distributor=None, origin=None, domain=None, compressed=False, orbit_num=0, - creation_time=None, gran_creation_time=None, software_ver=None): - # TODO: write out the user block too?? - distributor = distributor or origin or default_origin - origin = origin or default_origin - domain = domain or default_domain - creation_time = creation_time or datetime.now() - gran_creation_time = gran_creation_time or creation_time - software_ver = (software_ver - or edosl0util.__name__ + '-' + edosl0util.__version__) - blob_info = decode_rdr_blob(blob) - rdr_type = get_rdr_type(blob_info.apids[0].value) - gran_iet = blob_info.header.start_boundary - gran_end_iet = blob_info.header.end_boundary - gran_time = iet_to_datetime(gran_iet) - gran_end_time = iet_to_datetime(gran_end_iet) - gran_id = make_granule_id(sat, gran_iet) - gran_ver = 'A1' - format_date = lambda t: t.strftime('%Y%m%d') - format_time = lambda t: t.strftime('%H%M%S.%fZ') - file_name = make_rdr_filename(rdr_type.product_id, sat, gran_time, gran_end_time, - orbit_num, creation_time, origin, domain, compressed) - with h5py.File(os.path.join(dir_path, file_name), 'w') as h5_file: - set_h5_attrs(h5_file, { + +class RdrWriter(object): + def __init__(self, sat, rdr_types, aggr_iet, aggr_level, dir_path='.', + distributor=None, origin=None, domain=None, compressed=False, + orbit_num=0, creation_time=None, software_ver=None): + self._sat = sat + origin = origin or self.default_origin + distributor = distributor or origin + self._domain = domain or self.default_domain + self._orbit_num = orbit_num + self._creation_time = creation_time or datetime.now() + self._software_ver = ( + software_ver or edosl0util.__name__ + '-' + edosl0util.__version__) + + aggr_end_iet = aggr_iet + aggr_level * rdr_types[0].gran_len + self.file_name = make_rdr_filename( + rdr_types, self._sat, aggr_iet, aggr_end_iet, self._orbit_num, + self._creation_time, origin, self._domain, compressed) + self._h5_file = h5py.File(os.path.join(dir_path, self.file_name), 'w') + self._write_skeleton(rdr_types, distributor, origin) + self._aggr_starts = {} + + def _write_skeleton(self, rdr_types, distributor, origin): + self._set_h5_attrs(self._h5_file, { 'Distributor': distributor, - 'Mission_Name': 'S-NPP/JPSS', + 'Mission_Name': 'S-NPP/JPSS', # FIXME: what will this be for J1? 'N_Dataset_Source': origin, - 'N_HDF_Creation_Date': format_date(creation_time), - 'N_HDF_Creation_Time': format_time(creation_time), - 'Platform_Short_Name': platform_short_names[sat]}) - all_grp = h5_file.create_group('All_Data') - raw_grp = all_grp.create_group(rdr_type.short_name + '_All') + 'N_HDF_Creation_Date': self._format_date_attr(self._creation_time), + 'N_HDF_Creation_Time': self._format_time_attr(self._creation_time), + 'Platform_Short_Name': platform_short_names[self._sat]}) + all_grp = self._h5_file.create_group('All_Data') + prod_grp = self._h5_file.create_group('Data_Products') + for rdr_type in rdr_types: + all_grp.create_group(rdr_type.short_name + '_All') + gran_grp = prod_grp.create_group(rdr_type.short_name) + self._set_h5_attrs(gran_grp, { + 'Instrument_Short_Name': instrument_short_names[rdr_type.sensor], + 'N_Collection_Short_Name': rdr_type.short_name, + 'N_Dataset_Type_Tag': 'RDR', + 'N_Processing_Domain': self._domain}) + + def write_aggregate(self, rdr_type, aggr_iet, num_grans): + self._aggr_starts[rdr_type] = aggr_iet + grp = self._h5_file['Data_Products'][rdr_type.short_name] + ds = grp.create_dataset( + rdr_type.short_name + '_Aggr', [1], self._h5_ref_dtype, maxshape=[None]) + ds[0] = self._h5_file['All_Data/{}_All'.format(rdr_type.short_name)].ref + aggr_end_iet = aggr_iet + num_grans * rdr_type.gran_len + last_gran_iet = aggr_end_iet - rdr_type.gran_len + self._set_h5_attrs(ds, { + 'AggregateBeginningDate': self._format_date_attr(aggr_iet), + 'AggregateBeginningGranuleID': make_granule_id(self._sat, aggr_iet), + 'AggregateBeginningOrbitNumber': np.uint64(self._orbit_num), + 'AggregateBeginningTime': self._format_time_attr(aggr_iet), + 'AggregateEndingDate': self._format_date_attr(aggr_end_iet), + 'AggregateEndingGranuleID': make_granule_id(self._sat, last_gran_iet), + 'AggregateEndingOrbitNumber': np.uint64(self._orbit_num), + 'AggregateEndingTime': self._format_time_attr(aggr_end_iet), + 'AggregateNumberGranules': np.uint64(1)}) + + def write_granule(self, rdr_type, gran_iet, blob, creation_time=None): + raw_grp = self._h5_file['All_Data/{}_All'.format(rdr_type.short_name)] + gran_idx = int((gran_iet - self._aggr_starts[rdr_type]) // rdr_type.gran_len) raw_ds = raw_grp.create_dataset( - 'RawApplicationPackets_0', data=blob, maxshape=[None]) - prod_grp = h5_file.create_group('Data_Products') - gran_grp = prod_grp.create_group(rdr_type.short_name) - set_h5_attrs(gran_grp, { - 'Instrument_Short_Name': instrument_short_names[rdr_type.sensor], - 'N_Collection_Short_Name': rdr_type.short_name, - 'N_Dataset_Type_Tag': 'RDR', - 'N_Processing_Domain': domain}) + 'RawApplicationPackets_{}'.format(gran_idx), data=blob, maxshape=[None]) + gran_grp = self._h5_file['Data_Products'][rdr_type.short_name] gran_ds = gran_grp.create_dataset( - rdr_type.short_name + '_Gran_0', [1], h5_regionref_dtype, maxshape=[None]) + rdr_type.short_name + '_Gran_{}'.format(gran_idx), + [1], self._h5_regionref_dtype, maxshape=[None]) gran_ds[0] = raw_ds.regionref[:] - set_h5_attrs(gran_ds, { - 'Beginning_Date': format_date(gran_time), - 'Beginning_Time': format_time(gran_time), - 'Ending_Date': format_date(gran_end_time), - 'Ending_Time': format_time(gran_end_time), - 'N_Beginning_Orbit_Number': np.uint64(orbit_num), # FIXME: detect? + gran_end_iet = gran_iet + rdr_type.gran_len + creation_time = creation_time or self._creation_time or datetime.now() + gran_id = make_granule_id(self._sat, gran_iet) + gran_ver = 'A1' + blob_info = decode_rdr_blob(blob) + self._set_h5_attrs(gran_ds, { + 'Beginning_Date': self._format_date_attr(gran_iet), + 'Beginning_Time': self._format_time_attr(gran_iet), + 'Ending_Date': self._format_date_attr(gran_end_iet), + 'Ending_Time': self._format_time_attr(gran_end_iet), + 'N_Beginning_Orbit_Number': np.uint64(self._orbit_num), 'N_Beginning_Time_IET': np.uint64(gran_iet), - 'N_Creation_Date': format_date(gran_creation_time), - 'N_Creation_Time': format_time(gran_creation_time), + 'N_Creation_Date': self._format_date_attr(creation_time), + 'N_Creation_Time': self._format_time_attr(creation_time), 'N_Ending_Time_IET': np.uint64(gran_end_iet), 'N_Granule_ID': gran_id, 'N_Granule_Status': 'N/A', 'N_Granule_Version': gran_ver, - 'N_IDPS_Mode': domain, + 'N_IDPS_Mode': self._domain, 'N_JPSS_Document_Ref': rdr_type.document, 'N_LEOA_Flag': 'Off', 'N_Packet_Type': [a.name for a in blob_info.apids], 'N_Packet_Type_Count': [np.uint64(a.pkts_received) for a in blob_info.apids], - 'N_Percent_Missing_Data': np.float32(calc_percent_missing(blob_info)), + 'N_Percent_Missing_Data': np.float32(self._calc_percent_missing(blob_info)), 'N_Primary_Label': 'Primary', # TODO: find out what this is 'N_Reference_ID': ':'.join([rdr_type.short_name, gran_id, gran_ver]), - 'N_Software_Version': software_ver}) - aggr_ds = gran_grp.create_dataset( - rdr_type.short_name + '_Aggr', [1], h5_ref_dtype, maxshape=[None]) - aggr_ds[0] = raw_grp.ref - set_h5_attrs(aggr_ds, { - 'AggregateBeginningDate': format_date(gran_time), - 'AggregateBeginningGranuleID': gran_id, - 'AggregateBeginningOrbitNumber': np.uint64(orbit_num), - 'AggregateBeginningTime': format_time(gran_time), - 'AggregateEndingDate': format_date(gran_end_time), - 'AggregateEndingGranuleID': gran_id, - 'AggregateEndingOrbitNumber': np.uint64(orbit_num), - 'AggregateEndingTime': format_time(gran_end_time), - 'AggregateNumberGranules': np.uint64(1)}) - return file_name + 'N_Software_Version': self._software_ver}) + def close(self): + self._h5_file.close() -def set_h5_attrs(h5_obj, attrs): - # for some reason all the H5 attributes are in 2-D arrays in IDPS files - for name, value in attrs.items(): - if isinstance(value, list): - value = [[x] for x in value] - else: - value = [[value]] - h5_obj.attrs[name] = value + @staticmethod + def _set_h5_attrs(h5_obj, attrs): + # for some reason all the H5 attributes are in 2-D arrays in IDPS files + for name, value in attrs.items(): + if isinstance(value, list): + value = [[x] for x in value] + else: + value = [[value]] + h5_obj.attrs[name] = value + @staticmethod + def _format_date_attr(t): + return iet_to_datetime(t).strftime('%Y%m%d') + + @staticmethod + def _format_time_attr(t): + return iet_to_datetime(t).strftime('%H%M%S.%fZ') + + @staticmethod + def _calc_percent_missing(common_rdr): + total_received = sum(a.pkts_received for a in common_rdr.apids) + total_reserved = sum(a.pkts_reserved for a in common_rdr.apids) + return 100. * (total_reserved - total_received) / total_reserved + + default_origin = 'ssec' + default_domain = 'dev' + _h5_ref_dtype = h5py.special_dtype(ref=h5py.Reference) + _h5_regionref_dtype = h5py.special_dtype(ref=h5py.RegionReference) def build_rdr_blob(sat, pkt_stream, rdr_type, granule_iet): get_jpss_packet_time = GetJpssPacketTime() @@ -247,17 +364,6 @@ def build_rdr_blob(sat, pkt_stream, rdr_type, granule_iet): return buf -class GetJpssPacketTime(object): - def __init__(self): - self._viirs_tracker = ViirsGroupedPacketTimeTracker() - - def __call__(self, pkt): - if self._viirs_tracker.tracks_apid(pkt.apid): - return self._viirs_tracker.get_iet(pkt) - else: - return get_packet_iet(pkt) - - class ViirsScienceApidInfo(object): apids = list(x for x in range(800, 827) if x != 824) names = ['M04', 'M05', 'M03', 'M02', 'M01', 'M06', 'M07', 'M09', 'M10', @@ -350,7 +456,10 @@ class RdrTypeManager(object): return cls def get_type_for_apid(self, apid): - return self._types_by_apid[apid] + try: + return self._types_by_apid[apid] + except KeyError: + raise RdrgenError('Unsupported APID: {}'.format(apid)) rdr_type_mgr = RdrTypeManager() @@ -367,6 +476,7 @@ class ViirsScienceRdrType(object): type_id = 'SCIENCE' document = '474-00448-02-08_JPSS-DD-Vol-II-Part-8_0200B.pdf' apids = ViirsScienceApidInfo.get_specs() + default_aggregation = 1 @rdr_type_spec @@ -378,6 +488,7 @@ class CrisScienceRdrType(object): type_id = 'SCIENCE' document = '474-00448-02-03_JPSS-DD-Vol-II-Part-3_0200B.pdf' apids = CrisScienceApidInfo.get_specs() + default_aggregation = 15 @rdr_type_spec @@ -391,6 +502,18 @@ class SpacecraftDiaryRdrType(object): apids = [ApidSpec(0, 'CRITICAL', max_expected=21), ApidSpec(8, 'ADCS_HKH', max_expected=21), ApidSpec(11, 'DIARY', max_expected=21)] + default_aggregation = 303 + + +class GetJpssPacketTime(object): + def __init__(self): + self._viirs_tracker = ViirsGroupedPacketTimeTracker() + + def __call__(self, pkt): + if self._viirs_tracker.tracks_apid(pkt.apid): + return self._viirs_tracker.get_iet(pkt) + else: + return get_packet_iet(pkt) class ViirsGroupedPacketTimeTracker(object): @@ -451,7 +574,11 @@ class ViirsGroupedPacketTimeTracker(object): return 0 <= lag <= permitted_lag -class OrphanedViirsPacket(Exception): +class RdrgenError(Exception): + pass + + +class OrphanedViirsPacket(RdrgenError): def __init__(self, pkt): self.packet = pkt @@ -459,20 +586,24 @@ class OrphanedViirsPacket(Exception): return repr(self.packet) -def make_rdr_filename(prod_id, sat, gran_time, gran_end_time, orbit_num, creation_time, +def make_rdr_filename(rdr_types, sat, aggr_begin, aggr_end, orbit_num, creation_time, origin, domain, compressed): - sat_strs = {'snpp': 'npp'} - def format_gran_time(t): + aggr_begin = iet_to_datetime(aggr_begin) + aggr_end = iet_to_datetime(aggr_end) + prod_ids = '-'.join(sorted(t.product_id for t in rdr_types)) + sat = {'snpp': 'npp'}[sat] + if origin.endswith('-'): + origin = origin[:-1] + ('c' if compressed else 'u') + def format_time(t): return t.strftime('%H%M%S') + str(t.microsecond / 100000) - def format_origin(o): - if o.endswith('-'): - return o[:-1] + ('c' if compressed else 'u') - else: - return o return '{p}_{s}_d{d:%Y%m%d}_t{b}_e{e}_b{n:05d}_c{c:%Y%m%d%H%M%S%f}_{o}_{m}.h5'.format( - p=prod_id, s=sat_strs[sat], d=gran_time, b=format_gran_time(gran_time), - e=format_gran_time(gran_end_time), n=orbit_num, c=creation_time, - o=format_origin(origin), m=domain) + p=prod_ids, s=sat, d=aggr_begin, b=format_time(aggr_begin), + e=format_time(aggr_end), n=orbit_num, c=creation_time, o=origin, m=domain) + + +def make_granule_id(sat, gran_iet): + tenths = (gran_iet - satellite_base_times[sat]) // 100000 + return '{}{:012d}'.format(platform_short_names[sat], tenths) def get_packet_iet(pkt): @@ -488,45 +619,43 @@ def timecode_parts_to_iet(days, ms, us): def iet_to_datetime(iet): + if isinstance(iet, datetime): + return iet return (iet_epoch + TimeDelta(iet * 1e-6, format='sec')).utc.datetime -def calc_rdr_granule(sat, rdr_type, pkt_iet): - return calc_iet_granule(satellite_base_times[sat], rdr_type.gran_len, pkt_iet) - - -def calc_iet_granule(base_time, gran_len, iet): +def get_granule_start(sat, gran_len, iet): + base_time = satellite_base_times[sat] return (iet - base_time) // gran_len * gran_len + base_time -def calc_iet_aggregate(base_time, gran_len, grans_per_aggr, iet): +def get_aggregate_start(sat, gran_len, grans_per_aggr, iet): # see "DDS Aggregation Methodology" in CDFCB vol I Y = gran_len N = grans_per_aggr - G_s = calc_iet_granule(base_time, Y, iet) + G_s = get_granule_start(sat, Y, iet) A_n = G_s // (N * Y) O = G_s % Y A_s = A_n * (Y * N) + O return A_s -def make_granule_id(sat, gran_iet): - tenths = (gran_iet - satellite_base_times[sat]) // 100000 - return '{}{:012d}'.format(platform_short_names[sat], tenths) +def get_aggregate_granule_times(sat, gran_len, aggr_level, aggr_iet): + aggr_start = get_aggregate_start(sat, gran_len, aggr_level, aggr_iet) + aggr_end = aggr_start + aggr_level * gran_len + return get_overlapping_granules(sat, gran_len, aggr_start, aggr_end) -def calc_percent_missing(common_rdr): - total_received = sum(a.pkts_received for a in common_rdr.apids) - total_reserved = sum(a.pkts_reserved for a in common_rdr.apids) - return 100. * (total_reserved - total_received) / total_reserved +def get_overlapping_granules(sat, gran_len, start_iet, stop_iet): + rv = [] + gran_iet = get_granule_start(sat, gran_len, start_iet) + while gran_iet < stop_iet: + rv.append(gran_iet) + gran_iet += gran_len + return rv iet_epoch = Time('1958-01-01', scale='tai') satellite_base_times = {'snpp': 1698019234000000} platform_short_names = {'snpp': 'NPP'} instrument_short_names = {'viirs': 'VIIRS', 'cris': 'CrIS', None: 'SPACECRAFT'} -default_origin = 'ssec' -default_domain = 'dev' - -h5_ref_dtype = h5py.special_dtype(ref=h5py.Reference) -h5_regionref_dtype = h5py.special_dtype(ref=h5py.RegionReference) diff --git a/tests/test_rdrgen.py b/tests/test_rdrgen.py index 2b9023c406d3d0ff216b0f8f9205a5385cd632b9..dd0b359ab506c04534c690599e54bc1c1a3a2d63 100644 --- a/tests/test_rdrgen.py +++ b/tests/test_rdrgen.py @@ -10,48 +10,6 @@ from edosl0util.jpssrdr import decode_rdr_blob from edosl0util.stream import jpss_packet_stream -class TestViirsGroupedPacketTimeTracker(object): - def test_check_sequence_number(self): - group_size = 24 - def run(nonfirst_seq_num, first_seq_num): - return m.ViirsGroupedPacketTimeTracker.check_sequence_number( - nonfirst_seq_num, first_seq_num, group_size) - - first_seq = 4096 - assert not run(4095, first_seq) - assert run(4097, first_seq) - assert run(4119, first_seq) - assert not run(4120, first_seq) - - max_seq = 2**14 - first_seq = max_seq - 16 - assert not run(max_seq - 17, first_seq) - assert run(max_seq - 15, first_seq) - assert run(max_seq - 1, first_seq) - assert run(0, first_seq) - assert run(7, first_seq) - assert not run(8, first_seq) - - def test_get_viirs_iet(self): - def run(pkt): - return m.iet_to_datetime(m.ViirsGroupedPacketTimeTracker.get_viirs_iet(pkt)) - - with open(self.l0_path) as l0_file: - stream = jpss_packet_stream(l0_file) - standalone_pkt = next(p for p in stream if p.is_standalone()) - first_pkt = next(p for p in stream if p.is_first()) - nonfirst_pkt = next(p for p in stream if p.is_continuing()) - - # expected values haven't been independently verified, just looked at - # to see that they at least make sense - assert run(standalone_pkt) == datetime(2017, 9, 27, 13, 54, 1, 727765) - assert run(first_pkt) == datetime(2017, 9, 27, 13, 54, 1, 746898) - assert run(nonfirst_pkt) == datetime(2017, 9, 27, 13, 54, 1, 748328) - - l0_file = 'P1570826VIIRSSCIENCE6T17270135400001.PDS' - l0_path = os.path.join(os.path.dirname(__file__), l0_file) - - def test_can_reproduce_rdr_from_class(): class_rdr_file = 'RNSCA_npp_d20170912_t0001170_e0001370_b30441_c20170913220340173580_nobu_ops.h5' class_rdr_path = os.path.join(os.path.dirname(__file__), class_rdr_file) @@ -66,50 +24,102 @@ def test_can_reproduce_rdr_from_class(): pkt_buf = class_blob[ini:fin] # generate new RDR from packets, injecting matching metadata from CLASS file - blob = m.build_rdr_blob('snpp', jpss_packet_stream(StringIO(pkt_buf.tobytes()))) + rdr_type = m.SpacecraftDiaryRdrType + gran_iet = 1883865714000000 + aggr_level = 1 + pkt_stream = jpss_packet_stream(StringIO(pkt_buf.tobytes())) + blob = m.build_rdr_blob('snpp', pkt_stream, rdr_type, gran_iet) tmp_dir = '/tmp' - file_name = m.write_rdr( - 'snpp', blob, tmp_dir, + writer = m.RdrWriter( + 'snpp', [rdr_type], gran_iet, aggr_level, tmp_dir, distributor='arch', origin='nob-', domain='ops', creation_time=datetime(2017, 9, 13, 22, 3, 40, 173580), - gran_creation_time=datetime(2017, 9, 12, 1, 37, 43, 474383), orbit_num=30441, software_ver='I2.0.03.00') + writer.write_granule(rdr_type, gran_iet, blob, + creation_time=datetime(2017, 9, 12, 1, 37, 43, 474383)) + writer.write_aggregate(rdr_type, gran_iet, aggr_level) + writer.close() + # file names should be identical - assert file_name == class_rdr_file + assert writer.file_name == class_rdr_file # use h5diff to verify files match. -p option is needed to allow some slop # in comparing N_Percent_Missing_Data p = subprocess.Popen( - ['h5diff', '-c', '-p', '1e-6', class_rdr_path, os.path.join(tmp_dir, file_name)], + ['h5diff', '-c', '-p', '1e-6', + class_rdr_path, os.path.join(tmp_dir, writer.file_name)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) assert p.communicate()[0] == '' assert p.returncode == 0 class TestGranulation(object): - def test_calc_iet_granule(self): - def run(t): - return m.calc_iet_granule(self.snpp_base_time, self.cris_gran_len, t) - gran = 1880240293174000 + """Test granule time computations using IETs from IDPS CrIS granules""" + + def test_get_granule_start(self): + gran = 1880240293174000 # some actual CrIS granule times prev_gran = 1880240261177000 next_gran = 1880240325171000 + def run(t): + return m.get_granule_start('snpp', self.cris_gran_len, t) assert run(gran) == gran assert run(gran + 1) == gran assert run(gran - 1) == prev_gran assert run(gran + self.cris_gran_len) == next_gran - def test_calc_iet_aggregate(self): - grans_per_aggr = 15 - def run(t): - return m.calc_iet_aggregate( - self.snpp_base_time, self.cris_gran_len, grans_per_aggr, t) + def test_get_aggregate_start(self): aggr = 1880240037198000 - aggr_len = grans_per_aggr * self.cris_gran_len + aggr_level = 15 + def run(t): + return m.get_aggregate_start('snpp', self.cris_gran_len, aggr_level, t) + aggr_len = aggr_level * self.cris_gran_len assert run(aggr - 1) == aggr - aggr_len assert run(aggr) == aggr assert run(aggr + aggr_len - 1) == aggr assert run(aggr + aggr_len) == aggr + aggr_len - snpp_base_time = 1698019234000000 # CDFCB vol I "Time of First Ascending Node" table cris_gran_len = 31997000 + + +class TestViirsGroupedPacketTimeTracker(object): + + def test_check_sequence_number(self): + group_size = 24 + def run(nonfirst_seq_num, first_seq_num): + return m.ViirsGroupedPacketTimeTracker.check_sequence_number( + nonfirst_seq_num, first_seq_num, group_size) + + first_seq = 4096 + assert not run(4095, first_seq) + assert run(4097, first_seq) + assert run(4119, first_seq) + assert not run(4120, first_seq) + + max_seq = 2**14 + first_seq = max_seq - 16 + assert not run(max_seq - 17, first_seq) + assert run(max_seq - 15, first_seq) + assert run(max_seq - 1, first_seq) + assert run(0, first_seq) + assert run(7, first_seq) + assert not run(8, first_seq) + + def test_get_viirs_iet(self): + def run(pkt): + return m.iet_to_datetime(m.ViirsGroupedPacketTimeTracker.get_viirs_iet(pkt)) + + with open(self.l0_path) as l0_file: + stream = jpss_packet_stream(l0_file) + standalone_pkt = next(p for p in stream if p.is_standalone()) + first_pkt = next(p for p in stream if p.is_first()) + nonfirst_pkt = next(p for p in stream if p.is_continuing()) + + # expected values haven't been independently verified, just looked at + # to see that they at least make sense + assert run(standalone_pkt) == datetime(2017, 9, 27, 13, 54, 1, 727765) + assert run(first_pkt) == datetime(2017, 9, 27, 13, 54, 1, 746898) + assert run(nonfirst_pkt) == datetime(2017, 9, 27, 13, 54, 1, 748328) + + l0_file = 'P1570826VIIRSSCIENCE6T17270135400001.PDS' + l0_path = os.path.join(os.path.dirname(__file__), l0_file)