From 287f4a7bd0357cafd1d708223466a61216cf127e Mon Sep 17 00:00:00 2001
From: Bruce Flynn <brucef@ssec.wisc.edu>
Date: Tue, 2 Jan 2018 12:51:07 -0600
Subject: [PATCH] use grain instead of astropy

---
 edosl0util/_old_merge.py | 183 ---------------------------------------
 edosl0util/rdrgen.py     |  33 ++++---
 edosl0util/timecode.py   |  39 +++++++--
 setup.py                 |   2 +-
 4 files changed, 46 insertions(+), 211 deletions(-)
 delete mode 100644 edosl0util/_old_merge.py

diff --git a/edosl0util/_old_merge.py b/edosl0util/_old_merge.py
deleted file mode 100644
index 87b01eb..0000000
--- a/edosl0util/_old_merge.py
+++ /dev/null
@@ -1,183 +0,0 @@
-# encoding: utf-8
-__copyright__ = "Copyright (C) 2015 University of Wisconsin SSEC. All rights reserved."
-
-import io
-import sys
-import logging
-from collections import deque
-
-from edosl0util.stream import (
-    jpss_packet_stream,
-    NonConsecutiveSeqId,
-    PacketTooShort
-)
-
-LOG = logging.getLogger(__name__)
-
-
-class InvalidPacketGroup(Exception):
-    """
-    If a there exists packets in a group with different apids or if the group
-    does not end in a last packet.
-    """
-
-
-def _group_packets(stream):
-    """
-    Returns a generator that yields all packets until a timestamps, i.e., a
-    packet group.
-    """
-    packet = stream.next()
-    while not packet.stamp:
-        yield packet
-        packet = stream.next()
-    # put the next packet with a stamp so it will be next
-    stream.push_back(packet)
-
-
-class StreamManager(object):
-
-    def __init__(self, streams):
-        self.streams = list(streams)
-        self.current_idx = 0
-        self.valid = [True]*len(self.streams)
-        self.tried = [False]*len(self.streams)
-        self.handling_error = False
-
-    def invalid(self, stream):
-        idx = self.streams.index(stream)
-        self.valid[idx] = False
-        self.current_idx = 0
-
-    end_of_stream = invalid
-
-    def have_valid(self):
-        return any(self.valid)
-
-    def set_error(self, stream):
-        idx = self.streams.index(stream)
-        self.tried[idx] = True
-
-    missing_seqid = set_error
-    invalid_group = set_error
-
-    def next(self):
-        if self.handling_error:
-            if all(self.tried):
-                # tried every stream, can't recover, move on
-                self.tried = [False]*len(self.stream)
-                self.handling_error = False
-            else:
-                # there is a stream we haven't tried, use it
-                self.current_idx = self.tried.index(False)
-                self.tried[self.current_idx] = True
-                return self.streams[self.current_idx]
-
-        self.current_idx = self.valid.index(True, self.current_idx)
-        return self.streams[self.current_idx]
-
-
-def next_stamp_and_apid(stream, stamp, apid):
-    LOG.debug("seeking to %s apid %s", stamp, apid)
-    stream.seek_to(stamp, apid)
-    stream.seek_to_next_stamp()
-
-
-def next_stamp(stream, stamp):
-    LOG.debug("seeking to %s", stamp)
-    stream.seek_to(stamp)
-
-
-def merge(streams, output=sys.stdout):
-    last_packet = None
-
-    streams = StreamManager(streams)
-    stream = streams.next()
-    while streams.have_valid():
-        LOG.debug("stream %s", stream)
-        LOG.debug("streams valid: %s", streams.valid)
-        LOG.debug("streams handling_error:%s tried:%s", streams.handling_error, streams.tried)
-        LOG.debug("streams current_idx: %s", streams.current_idx)
-        try:
-            # Do until `next` causes StopIteration
-            while True:
-                packets_to_write = deque()
-                packet = stream.next()
-                if packet.is_standalone():
-                    packets_to_write.append(packet)
-
-                elif packet.is_first():  # packet group
-                    group = deque([packet])
-                    group.extend(_group_packets(stream))
-                    if not group[0].is_first() \
-                            and group[-1].is_last() \
-                            and group[0].apid == group[-1].apid:
-                        raise InvalidPacketGroup()
-                    packets_to_write.extend(group)
-
-                else:
-                    LOG.debug("skipping hanging packet: %s", packet)
-
-                # First packet always has a stamp because it's either
-                # standalone or part of a valid group
-                last_packet = packets_to_write[0]
-                while packets_to_write:
-                    pkt = packets_to_write.popleft()
-                    output.write(pkt.bytes())
-
-        except NonConsecutiveSeqId:
-            LOG.debug('missing sequence id, next stream')
-            streams.missing_seqid(stream)
-
-            LOG.debug("seeking to %s apid %s", last_packet.stamp, last_packet.apid)
-            stream = streams.next()
-
-        except InvalidPacketGroup:
-            LOG.debug("invalid group, next stream")
-            streams.invalid_group(stream)
-            stream = streams.next()
-            next_stamp_and_apid(stream, last_packet.stamp, last_packet.apid)
-
-        except PacketTooShort as err:
-            LOG.error("corrupt stream, removing: %s", err)
-            streams.invalid(stream)
-            stream = streams.next()
-            next_stamp_and_apid(stream, last_packet.stamp, last_packet.apid)
-
-        except StopIteration:
-            LOG.debug("end-of-stream %s", stream)
-            streams.end_of_stream(stream)
-            if streams.have_valid():
-                stream = streams.next()
-                next_stamp(stream, last_packet.stamp)
-
-
-def merge_files(filepaths, destpath):
-    filepaths, streams = make_streams(filepaths, fail_on_missing=True)
-    for i in range(0, len(filepaths)):
-        LOG.debug("stream %s filepath %s", streams[i], filepaths[i])
-    merge(streams, output=io.open(destpath, 'wb'))
-
-
-def _cmp_by_first_pkt(filea, fileb):
-    """
-    Compare 2 L0 files by their first available timestamp. Used to sort L0 files
-    by time.
-    """
-    stampa = stampb = None
-    for pkt in jpss_packet_stream(io.open(filea, 'rb')):
-        if pkt.stamp:
-            stampa = pkt.stamp
-            break
-    for pkt in jpss_packet_stream(io.open(fileb, 'rb')):
-        if pkt.stamp:
-            stampb = pkt.stamp
-            break
-    return cmp(stampa, stampb)
-
-
-def make_streams(*filepaths):
-    return [
-        jpss_packet_stream(io.open(f, 'rb'))
-        for f in sorted(filepaths, cmp=_cmp_by_first_pkt)
-    ]
diff --git a/edosl0util/rdrgen.py b/edosl0util/rdrgen.py
index 0ae6215..58de617 100644
--- a/edosl0util/rdrgen.py
+++ b/edosl0util/rdrgen.py
@@ -8,12 +8,12 @@ from datetime import datetime
 import attr
 import h5py
 import numpy as np
-from astropy.time import Time, TimeDelta
 
 import edosl0util
 from edosl0util.jpssrdr import (
     StaticHeader, Apid as ApidListItem, PacketTracker, decode_rdr_blob)
 from edosl0util.stream import jpss_packet_stream
+from edosl0util.timecode import cds_to_iet, iet_to_dt
 
 
 def packets_to_rdrs(sat, l0_files, **kwargs):
@@ -22,11 +22,12 @@ def packets_to_rdrs(sat, l0_files, **kwargs):
         with open(l0_file) as l0_file_obj:
             for pkt in jpss_packet_stream(l0_file_obj):
                 yield pkt
-    build_rdr(sat, iter_pkts(l0_files), **kwargs)
+
+    build_rdr(sat, iter_pkts(l0_files), **kwargs)
 
 
-def build_rdr(sat, pkt_iter, output_dir='.', aggr_level=None, diary_cushion=10000000, attr_overrides={}):
+def build_rdr(sat, pkt_iter, output_dir='.', aggr_level=None, diary_cushion=10000000,
+              attr_overrides={}):
     # divy packets up into temp files organized by granule
     file_mgr = BinnedTemporaryFileManager()
     get_jpss_packet_time = GetJpssPacketTime()
@@ -64,7 +65,7 @@ def build_rdr(sat, pkt_iter, output_dir='.', aggr_level=None, diary_cushion=1000
         if packaged_type:
             packaged_gran_iets = get_overlapping_granules(
                 sat, packaged_type.gran_len, aggr_iet - diary_cushion,
-                    aggr_iet + aggr_level * primary_type.gran_len + diary_cushion + 1)
+                aggr_iet + aggr_level * primary_type.gran_len + diary_cushion + 1)
             rdr_writer.write_aggregate(
                 packaged_type, packaged_gran_iets[0], len(packaged_gran_iets))
             for gran_iet in packaged_gran_iets:
@@ -117,6 +118,7 @@ class BinnedTemporaryFileManager(object):
 
     Limits the number of file handles kept open at a time.
     """
+
     def __init__(self, parent_dir='.', max_open_files=32):
         self.max_open_files = max_open_files
         self.dir = tempfile.mkdtemp(dir=parent_dir)
@@ -179,7 +181,7 @@ class RdrWriter(object):
         self._orbit_num = orbit_num
         self._creation_time = creation_time or datetime.now()
         self._software_ver = (
-            software_ver or edosl0util.__name__ + '-' + edosl0util.__version__)
+                software_ver or edosl0util.__name__ + '-' + edosl0util.__version__)
 
         aggr_end_iet = aggr_iet + aggr_level * rdr_types[0].gran_len
         self.file_name = make_rdr_filename(
@@ -297,6 +299,7 @@ class RdrWriter(object):
 _h5_ref_dtype = h5py.special_dtype(ref=h5py.Reference)
 _h5_regionref_dtype = h5py.special_dtype(ref=h5py.RegionReference)
 
+
 def build_rdr_blob(sat, pkt_stream, rdr_type, granule_iet):
     get_jpss_packet_time = GetJpssPacketTime()
     granule_iet_end = granule_iet + rdr_type.gran_len
@@ -576,15 +579,15 @@ class ViirsGroupedPacketTimeTracker(object):
             idx = 20
         else:
             idx = 10
-        arr = np.frombuffer(pkt.bytes()[idx:idx+8], 'B')
+        arr = np.frombuffer(pkt.bytes()[idx:idx + 8], 'B')
         days = arr[0:2].view('>u2')[0]
         ms = arr[2:6].view('>u4')[0]
        us = arr[6:8].view('>u2')[0]
-        return timecode_parts_to_iet(days, ms, us)
+        return cds_to_iet(days, ms, us)
 
     @staticmethod
     def check_sequence_number(nonfirst_seq_num, first_seq_num, group_size):
-        seq_limit = 2**14
+        seq_limit = 2 ** 14
         group_end = first_seq_num + group_size
         # the 2nd check below is needed to handle wrap-around
         return (first_seq_num < nonfirst_seq_num < group_end
@@ -620,8 +623,10 @@ def make_rdr_filename(rdr_types, sat, aggr_begin, aggr_end, orbit_num, creation_
     sat = {'snpp': 'npp'}[sat]
     if origin.endswith('-'):
         origin = origin[:-1] + ('c' if compressed else 'u')
+
     def format_time(t):
         return t.strftime('%H%M%S') + str(t.microsecond / 100000)
+
     return '{p}_{s}_d{d:%Y%m%d}_t{b}_e{e}_b{n:05d}_c{c:%Y%m%d%H%M%S%f}_{o}_{m}.h5'.format(
         p=prod_ids, s=sat, d=aggr_begin, b=format_time(aggr_begin), e=format_time(aggr_end),
         n=orbit_num, c=creation_time, o=origin, m=domain)
@@ -634,20 +639,13 @@ def make_granule_id(sat, gran_iet):
 
 def get_packet_iet(pkt):
     tc = pkt.secondary_header.timecode
-    return timecode_parts_to_iet(tc.days, tc.milliseconds, tc.microseconds)
-
-
-def timecode_parts_to_iet(days, ms, us):
-    # FIXME: move to timecode.py
-    ccsds_epoch = Time('1958-01-01', scale='tai')
-    day = Time(ccsds_epoch.jd + days, scale='utc', format='jd') - iet_epoch
-    return int(day.sec * 1e6 + ms * 1e3 + us)
+    return cds_to_iet(tc.days, tc.milliseconds, tc.microseconds)
 
 
 def iet_to_datetime(iet):
     if isinstance(iet, datetime):
         return iet
-    return (iet_epoch + TimeDelta(iet * 1e-6, format='sec')).utc.datetime
+    return iet_to_dt(iet)
 
 
 def get_granule_start(sat, gran_len, iet):
@@ -681,7 +679,6 @@ def get_overlapping_granules(sat, gran_len, start_iet, stop_iet):
     return rv
 
 
-iet_epoch = Time('1958-01-01', scale='tai')
 satellite_base_times = {'snpp': 1698019234000000}
 platform_short_names = {'snpp': 'NPP'}
 instrument_short_names = {'viirs': 'VIIRS', 'cris': 'CrIS', 'atms': 'ATMS',
diff --git a/edosl0util/timecode.py b/edosl0util/timecode.py
index 1e32b90..cf1c357 100644
--- a/edosl0util/timecode.py
+++ b/edosl0util/timecode.py
@@ -1,9 +1,14 @@
 # encoding: utf-8
 from datetime import timedelta, datetime
 
+from grain import Grain
+
 __copyright__ = "Copyright (C) 2015 University of Wisconsin SSEC. All rights reserved."
 
-from astropy.time import Time, TimeDelta
+UNIX_EPOCH = datetime(1970, 1, 1)
+CDS_EPOCH = datetime(1958, 1, 1)
+
+_grain = Grain()
 
 
 def unixtime(dt):
@@ -13,18 +18,34 @@ def unixtime(dt):
     return (dt - datetime(1970, 1, 1)).total_seconds()
 
 
-def cds_to_dt(days, millis, microseconds):
+def timecode_parts_to_dt(days, ms, us, epoch):
     """
-    CCSDS Day Segmented timecode to UTC datetime.
+    Convert components to a UTC datetime based on an arbitrary epoch.
+    """
+    return epoch + timedelta(days=days, microseconds=1e3 * ms + us)
+
+
+def timecode_parts_to_iet(days, ms, us, epoch):
+    """
+    Convert components to an IET based on an arbitrary epoch.
     """
-    return datetime(1970, 1, 1) + timedelta(days=days, microseconds=1e6 * millis + microseconds)
+    return _grain.utc2tai(
+        epoch + timedelta(days=days, milliseconds=ms, microseconds=us),
+        epoch)
 
 
 def cds_to_iet(days, ms, us):
     """
-    CCSDS Day Segmented timecode parts to IET (microseconds)
+    CCSDS Day Segmented timecode (UTC) parts to IET (microseconds)
     """
-    iet_epoch = Time('1958-01-01', scale='tai')
-    ccsds_epoch = Time('1958-01-01', scale='tai')
-    day = Time(ccsds_epoch.jd + days, scale='utc', format='jd') - iet_epoch
-    return int(day.sec * 1e6 + ms * 1e3 + us)
+    return timecode_parts_to_iet(days, ms, us, CDS_EPOCH)
+
+
+def cds_to_dt(days, ms, us):
+    """
+    CCSDS Day Segmented timecode to UTC datetime.
+    """
+    return timecode_parts_to_dt(days, ms, us, CDS_EPOCH)
+
+
+iet_to_dt = _grain.iet2utc
diff --git a/setup.py b/setup.py
index 89b3c37..9c4d4d0 100644
--- a/setup.py
+++ b/setup.py
@@ -13,9 +13,9 @@ setup(
         'setuptools_scm'
    ],
     install_requires=[
-        'astropy',
         'attrs',
         'h5py',
+        'grain >= 2.0',
     ],
     extras_require={
         'testing': [
--
GitLab
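
A quick usage sketch of the grain-backed helpers introduced in edosl0util/timecode.py. This is only a sketch: it assumes the patch is applied and grain >= 2.0 is importable, and the timecode parts below are made-up example values rather than real packet data.

    # Convert example CCSDS Day Segmented timecode parts with the new helpers.
    from edosl0util.timecode import cds_to_dt, cds_to_iet, iet_to_dt

    # Days since the 1958-01-01 CDS epoch, milliseconds of day, and
    # microseconds of millisecond; 21915 days lands on 2018-01-01.
    days, ms, us = 21915, 43200000, 500

    print(cds_to_dt(days, ms, us))   # 2018-01-01 12:00:00.000500 (naive UTC datetime)
    iet = cds_to_iet(days, ms, us)   # IET microseconds, computed through grain's utc2tai
    print(iet_to_dt(iet))            # round-trips back to a UTC datetime via grain's iet2utc

Keeping the epoch arithmetic in plain timedelta math and calling into grain only for the UTC/TAI conversion is what lets astropy drop out of install_requires.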