Commit 287f4a7b authored by Bruce Flynn

remove grain instead of astropy

parent 87dde0e3
# encoding: utf-8
__copyright__ = "Copyright (C) 2015 University of Wisconsin SSEC. All rights reserved."
import io
import sys
import logging
from collections import deque
from edosl0util.stream import (
jpss_packet_stream,
NonConsecutiveSeqId,
PacketTooShort
)
LOG = logging.getLogger(__name__)
class InvalidPacketGroup(Exception):
"""
Raised if there exist packets in a group with different APIDs, or if the
group does not end with a last packet.
"""
def _group_packets(stream):
"""
Returns a generator that yields packets until one with a timestamp is
reached, i.e., the remaining packets of a group.
"""
packet = stream.next()
while not packet.stamp:
yield packet
packet = stream.next()
# push the stamped packet back so it is the next one read from the stream
stream.push_back(packet)
class StreamManager(object):
def __init__(self, streams):
self.streams = list(streams)
self.current_idx = 0
self.valid = [True]*len(self.streams)
self.tried = [False]*len(self.streams)
self.handling_error = False
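# Bookkeeping per input stream:
#   valid[i] -- stream i has not ended and has not been dropped as corrupt
#   tried[i] -- stream i has already been attempted while recovering from
#               a missing sequence id or an invalid packet group
# handling_error is True while next() is cycling through untried streams.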
def invalid(self, stream):
idx = self.streams.index(stream)
self.valid[idx] = False
self.current_idx = 0
end_of_stream = invalid
def have_valid(self):
return any(self.valid)
def set_error(self, stream):
idx = self.streams.index(stream)
self.tried[idx] = True
self.handling_error = True
missing_seqid = set_error
invalid_group = set_error
def next(self):
if self.handling_error:
if all(self.tried):
# tried every stream, can't recover, move on
self.tried = [False]*len(self.streams)
self.handling_error = False
else:
# there is a stream we haven't tried, use it
self.current_idx = self.tried.index(False)
self.tried[self.current_idx] = True
return self.streams[self.current_idx]
self.current_idx = self.valid.index(True, self.current_idx)
return self.streams[self.current_idx]
def next_stamp_and_apid(stream, stamp, apid):
LOG.debug("seeking to %s apid %s", stamp, apid)
stream.seek_to(stamp, apid)
stream.seek_to_next_stamp()
def next_stamp(stream, stamp):
LOG.debug("seeking to %s", stamp)
stream.seek_to(stamp)
def merge(streams, output=sys.stdout):
last_packet = None
streams = StreamManager(streams)
stream = streams.next()
while streams.have_valid():
LOG.debug("stream %s", stream)
LOG.debug("streams valid: %s", streams.valid)
LOG.debug("streams handling_error:%s tried:%s", streams.handling_error, streams.tried)
LOG.debug("streams current_idx: %s", streams.current_idx)
try:
# Do until `next` causes StopIteration
while True:
packets_to_write = deque()
packet = stream.next()
if packet.is_standalone():
packets_to_write.append(packet)
elif packet.is_first(): # packet group
group = deque([packet])
group.extend(_group_packets(stream))
if not (group[0].is_first()
and group[-1].is_last()
and group[0].apid == group[-1].apid):
raise InvalidPacketGroup()
packets_to_write.extend(group)
else:
LOG.debug("skipping hanging packet: %s", packet)
# First packet always has a stamp because it's either
# standalone or part of a valid group
last_packet = packets_to_write[0]
while packets_to_write:
pkt = packets_to_write.popleft()
output.write(pkt.bytes())
except NonConsecutiveSeqId:
LOG.debug('missing sequence id, next stream')
streams.missing_seqid(stream)
LOG.debug("seeking to %s apid %s", last_packet.stamp, last_packet.apid)
stream = streams.next()
except InvalidPacketGroup:
LOG.debug("invalid group, next stream")
streams.invalid_group(stream)
stream = streams.next()
next_stamp_and_apid(stream, last_packet.stamp, last_packet.apid)
except PacketTooShort as err:
LOG.error("corrupt stream, removing: %s", err)
streams.invalid(stream)
stream = streams.next()
next_stamp_and_apid(stream, last_packet.stamp, last_packet.apid)
except StopIteration:
LOG.debug("end-of-stream %s", stream)
streams.end_of_stream(stream)
if streams.have_valid():
stream = streams.next()
next_stamp(stream, last_packet.stamp)
def merge_files(filepaths, destpath):
filepaths, streams = make_streams(filepaths, fail_on_missing=True)
for i in range(0, len(filepaths)):
LOG.debug("stream %s filepath %s", streams[i], filepaths[i])
merge(streams, output=io.open(destpath, 'wb'))
def _cmp_by_first_pkt(filea, fileb):
"""
Compare 2 L0 files by their first available timestamp. Used to sort L0 files
by time.
"""
stampa = stampb = None
for pkt in jpss_packet_stream(io.open(filea, 'rb')):
if pkt.stamp:
stampa = pkt.stamp
break
for pkt in jpss_packet_stream(io.open(fileb, 'rb')):
if pkt.stamp:
stampb = pkt.stamp
break
return cmp(stampa, stampb)
def make_streams(*filepaths):
return [
jpss_packet_stream(io.open(f, 'rb'))
for f in sorted(filepaths, cmp=_cmp_by_first_pkt)
]
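For context, a minimal sketch of how the merge API above might be driven. The module path follows the package imports, but the input file names and output path are hypothetical and no error handling is shown:

import logging
from edosl0util.merge import merge_files  # module path assumed from the package layout

logging.basicConfig(level=logging.DEBUG)

# Hypothetical PDS/L0 part files covering the same time range; merge_files
# sorts them by first packet time and writes one merged stream to destpath.
merge_files(['VIIRS-SCIENCE-part1.pds', 'VIIRS-SCIENCE-part2.pds'], 'merged.pds')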
@@ -8,12 +8,12 @@ from datetime import datetime
import attr
import h5py
import numpy as np
from astropy.time import Time, TimeDelta
import edosl0util
from edosl0util.jpssrdr import (
StaticHeader, Apid as ApidListItem, PacketTracker, decode_rdr_blob)
from edosl0util.stream import jpss_packet_stream
from edosl0util.timecode import cds_to_iet, iet_to_dt
def packets_to_rdrs(sat, l0_files, **kwargs):
@@ -22,11 +22,12 @@ def packets_to_rdrs(sat, l0_files, **kwargs):
with open(l0_file) as l0_file_obj:
for pkt in jpss_packet_stream(l0_file_obj):
yield pkt
build_rdr(sat, iter_pkts(l0_files), **kwargs)
build_rdr(sat, iter_pkts(l0_files), **kwargs)
def build_rdr(sat, pkt_iter, output_dir='.', aggr_level=None, diary_cushion=10000000, attr_overrides={}):
def build_rdr(sat, pkt_iter, output_dir='.', aggr_level=None, diary_cushion=10000000,
attr_overrides={}):
# divy packets up into temp files organized by granule
file_mgr = BinnedTemporaryFileManager()
get_jpss_packet_time = GetJpssPacketTime()
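For orientation, a hedged sketch of how the RDR-building entry point above might be invoked. The module path and keyword arguments are assumed from this diff; the satellite id and file name are hypothetical:

from edosl0util.rdrgen import packets_to_rdrs  # module path assumed

# Hypothetical SNPP L0 input; granule files are written under output_dir and
# extra keyword arguments are forwarded to build_rdr.
packets_to_rdrs('snpp', ['VIIRS-SCIENCE.pds'], output_dir='out', aggr_level=1)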
@@ -64,7 +65,7 @@ def build_rdr(sat, pkt_iter, output_dir='.', aggr_level=None, diary_cushion=1000
if packaged_type:
packaged_gran_iets = get_overlapping_granules(
sat, packaged_type.gran_len, aggr_iet - diary_cushion,
aggr_iet + aggr_level * primary_type.gran_len + diary_cushion + 1)
aggr_iet + aggr_level * primary_type.gran_len + diary_cushion + 1)
rdr_writer.write_aggregate(
packaged_type, packaged_gran_iets[0], len(packaged_gran_iets))
for gran_iet in packaged_gran_iets:
@@ -117,6 +118,7 @@ class BinnedTemporaryFileManager(object):
Limits the number of file handles kept open at a time.
"""
def __init__(self, parent_dir='.', max_open_files=32):
self.max_open_files = max_open_files
self.dir = tempfile.mkdtemp(dir=parent_dir)
@@ -179,7 +181,7 @@ class RdrWriter(object):
self._orbit_num = orbit_num
self._creation_time = creation_time or datetime.now()
self._software_ver = (
software_ver or edosl0util.__name__ + '-' + edosl0util.__version__)
software_ver or edosl0util.__name__ + '-' + edosl0util.__version__)
aggr_end_iet = aggr_iet + aggr_level * rdr_types[0].gran_len
self.file_name = make_rdr_filename(
@@ -297,6 +299,7 @@ class RdrWriter(object):
_h5_ref_dtype = h5py.special_dtype(ref=h5py.Reference)
_h5_regionref_dtype = h5py.special_dtype(ref=h5py.RegionReference)
def build_rdr_blob(sat, pkt_stream, rdr_type, granule_iet):
get_jpss_packet_time = GetJpssPacketTime()
granule_iet_end = granule_iet + rdr_type.gran_len
@@ -576,15 +579,15 @@ class ViirsGroupedPacketTimeTracker(object):
idx = 20
else:
idx = 10
arr = np.frombuffer(pkt.bytes()[idx:idx+8], 'B')
arr = np.frombuffer(pkt.bytes()[idx:idx + 8], 'B')
days = arr[0:2].view('>u2')[0]
ms = arr[2:6].view('>u4')[0]
us = arr[6:8].view('>u2')[0]
return timecode_parts_to_iet(days, ms, us)
return cds_to_iet(days, ms, us)
@staticmethod
def check_sequence_number(nonfirst_seq_num, first_seq_num, group_size):
seq_limit = 2**14
seq_limit = 2 ** 14
group_end = first_seq_num + group_size
# the 2nd check below is needed to handle wrap-around
return (first_seq_num < nonfirst_seq_num < group_end
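The comment above refers to the 14-bit packet sequence counter wrapping back to zero inside a group. A standalone sketch of one way to express that check (an illustration, not necessarily the exact expression elided from this diff):

SEQ_LIMIT = 2 ** 14  # CCSDS sequence counters are 14 bits wide

def seq_in_group(nonfirst, first, group_size):
    group_end = first + group_size
    # either the counter lies between the first packet and the group end, or
    # it has wrapped past SEQ_LIMIT and lands in range after shifting it up
    return (first < nonfirst < group_end
            or first < nonfirst + SEQ_LIMIT < group_end)

# a group of 4 starting at 16382 spans counters 16382, 16383, 0, 1
assert seq_in_group(16383, 16382, 4)
assert seq_in_group(0, 16382, 4)
assert not seq_in_group(2, 16382, 4)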
@@ -620,8 +623,10 @@ def make_rdr_filename(rdr_types, sat, aggr_begin, aggr_end, orbit_num, creation_
sat = {'snpp': 'npp'}[sat]
if origin.endswith('-'):
origin = origin[:-1] + ('c' if compressed else 'u')
def format_time(t):
return t.strftime('%H%M%S') + str(t.microsecond / 100000)
return '{p}_{s}_d{d:%Y%m%d}_t{b}_e{e}_b{n:05d}_c{c:%Y%m%d%H%M%S%f}_{o}_{m}.h5'.format(
p=prod_ids, s=sat, d=aggr_begin, b=format_time(aggr_begin),
e=format_time(aggr_end), n=orbit_num, c=creation_time, o=origin, m=domain)
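To illustrate the time fields embedded in the file name, a tiny hypothetical example of what format_time produces (floor division is written out here; the plain `/` above behaves the same way under Python 2):

from datetime import datetime

t = datetime(2017, 3, 21, 12, 34, 56, 700000)  # hypothetical aggregate-begin time
t.strftime('%H%M%S') + str(t.microsecond // 100000)  # -> '1234567'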
@@ -634,20 +639,13 @@ def make_granule_id(sat, gran_iet):
def get_packet_iet(pkt):
tc = pkt.secondary_header.timecode
return timecode_parts_to_iet(tc.days, tc.milliseconds, tc.microseconds)
def timecode_parts_to_iet(days, ms, us):
# FIXME: move to timecode.py
ccsds_epoch = Time('1958-01-01', scale='tai')
day = Time(ccsds_epoch.jd + days, scale='utc', format='jd') - iet_epoch
return int(day.sec * 1e6 + ms * 1e3 + us)
return cds_to_iet(tc.days, tc.milliseconds, tc.microseconds)
def iet_to_datetime(iet):
if isinstance(iet, datetime):
return iet
return (iet_epoch + TimeDelta(iet * 1e-6, format='sec')).utc.datetime
return iet_to_dt(iet)
def get_granule_start(sat, gran_len, iet):
@@ -681,7 +679,6 @@ def get_overlapping_granules(sat, gran_len, start_iet, stop_iet):
return rv
iet_epoch = Time('1958-01-01', scale='tai')
satellite_base_times = {'snpp': 1698019234000000}
platform_short_names = {'snpp': 'NPP'}
instrument_short_names = {'viirs': 'VIIRS', 'cris': 'CrIS', 'atms': 'ATMS',
# encoding: utf-8
from datetime import timedelta, datetime
from grain import Grain
__copyright__ = "Copyright (C) 2015 University of Wisconsin SSEC. All rights reserved."
from astropy.time import Time, TimeDelta
UNIX_EPOCH = datetime(1970, 1, 1)
CDS_EPOCH = datetime(1958, 1, 1)
_grain = Grain()
def unixtime(dt):
@@ -13,18 +18,34 @@ def unixtime(dt):
return (dt - datetime(1970, 1, 1)).total_seconds()
def cds_to_dt(days, millis, microseconds):
def timecode_parts_to_dt(days, ms, us, epoch):
"""
CCSDS Day Segmented timecode to UTC datetime.
Convert components to a UTC datetime based on arbitrary epoch.
"""
return epoch + timedelta(days=days, microseconds=1e3 * ms + us)
def timecode_parts_to_iet(days, ms, us, epoch):
"""
Convert components to a IET based on arbitrary epoch.
"""
return datetime(1970, 1, 1) + timedelta(days=days, microseconds=1e6 * millis + microseconds)
return _grain.utc2tai(
epoch + timedelta(days=days, milliseconds=ms, microseconds=us),
epoch)
def cds_to_iet(days, ms, us):
"""
CCSDS Day Segmented timecode parts to IET (microseconds)
CCSDS Day Segmented timecode (UTC) parts to IET (microseconds)
"""
iet_epoch = Time('1958-01-01', scale='tai')
ccsds_epoch = Time('1958-01-01', scale='tai')
day = Time(ccsds_epoch.jd + days, scale='utc', format='jd') - iet_epoch
return int(day.sec * 1e6 + ms * 1e3 + us)
return timecode_parts_to_iet(days, ms, us, CDS_EPOCH)
def cds_to_dt(days, ms, us):
"""
CCSDS Day Segmented timecode to UTC datetime.
"""
return timecode_parts_to_dt(days, ms, us, CDS_EPOCH)
iet_to_dt = _grain.iet2utc
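A short usage sketch of the helpers above; the timecode values are illustrative, and the exact IET figure depends on the leap-second table grain applies:

from edosl0util.timecode import cds_to_dt, cds_to_iet, iet_to_dt

# illustrative CDS timecode: 20000 days, 1000 ms, 500 us past 1958-01-01
days, ms, us = 20000, 1000, 500

utc = cds_to_dt(days, ms, us)   # naive datetime arithmetic from CDS_EPOCH
iet = cds_to_iet(days, ms, us)  # IET microseconds, leap seconds included via grain
back = iet_to_dt(iet)           # IET back to a UTC datetime

# iet differs from the naive microsecond count by the accumulated leap
# seconds, while the round trip through IET should reproduce utc.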
@@ -13,9 +13,9 @@ setup(
'setuptools_scm'
],
install_requires=[
'astropy',
'attrs',
'h5py',
'grain >= 2.0',
],
extras_require={
'testing': [