Newer
Older
1. Cat PDS files together
2. Index packets
- drop any leading hanging packets
3. Sort index
4. Write
"""
import logging
import os
from collections import deque, OrderedDict
# Preferred output ordering of VIIRS apids: 826 and 821 first, then the
# contiguous science apids 800-820 and 822-825, then 827-828.
# NOTE(review): presumably this ordering is required by downstream ingest
# tooling — confirm before changing.
VIIRS_APID_ORDER = (
    (826, 821) + tuple(range(800, 821)) + tuple(range(822, 826)) + (827, 828)
)
Represents one or more packets that share the same time timecode and apid.
def __init__(self, fobj, stamp, apid, offset, size, seqid, sort_key=None):
self.apid = apid
self.offset = offset
self.size = size
def __repr__(self):
return "<{:s} {:s}>".format(self.__class__.__name__, attrs)
return (self.stamp, self.apid, self.seqid) == (
that.stamp,
that.apid,
that.seqid,
)
# hash by stamp, apid, size so we can dedup in index using set
def __hash__(self):
return hash((self.stamp, self.apid, self.seqid, self.size))
def bytes(self):
self.fobj.seek(self.offset, os.SEEK_SET)
return self.fobj.read(self.size)
def _sort_key(p, order=None, eos=False):
if eos and p.apid == 64:
return _eos._modis_sort_key(p)
return (p.stamp, order.index(p.apid) if order else p.apid)
def read_packet_index(stream, order=None, eos=False):
    """Index the packet groups in *stream* as a deque of ``_Ptr``.

    Leading "hanging" packets (continuation packets seen before the first
    first/standalone packet) are dropped. Each first/standalone packet
    starts a new ``_Ptr``; following continuation packets with the same
    apid are folded into it (size and count accumulate). A mid-group apid
    mismatch means the group is corrupt, so collection bails and skips
    ahead to the next group start.

    :param stream: packet stream with a ``next()`` method that raises
        StopIteration when exhausted (e.g. from `jpss_packet_stream`).
    :keyword order: optional apid ordering, passed through to ``_sort_key``.
    :keyword eos: True when indexing an EOS (MODIS) stream.
    :return: deque of ``_Ptr`` entries in stream order.
    """
    index = deque()
    try:
        # drop any leading hanging packets
        count = 0
        packet = stream.next()
        while not (packet.is_first() or packet.is_standalone()):
            packet = stream.next()
            count += 1
        if count:
            logging.getLogger(__name__).info("dropped %d leading packets", count)
        while True:
            if not (packet.is_first() or packet.is_standalone()):
                # corrupt packet groups can cause apid mismatch
                # so skip until we get to the next group
                packet = stream.next()
                continue
            ptr = _Ptr(
                # NOTE(review): assumes the stream exposes its backing
                # file object as .file — confirm against PacketStream
                stream.file,
                stamp=packet.stamp,
                apid=packet.apid,
                offset=packet.offset,
                size=packet.size,
                seqid=packet.primary_header.source_sequence_count,
                sort_key=_sort_key(packet, order, eos),
            )
            index.append(ptr)
            # collect all packets for this timecode/group
            packet = stream.next()
            while not (packet.is_first() or packet.is_standalone()):
                # Bail if we're collecting group packets and apids don't match
                # This means group is corrupt
                if ptr.apid != packet.apid:
                    break
                ptr.size += packet.size
                ptr.count += 1
                packet = stream.next()
    except StopIteration:
        # stream exhausted: index holds every complete group seen so far
        pass
    return index
def merge(streams, output, trunc_to=None, apid_order=None, eos=False):
    """
    Merge packets from multiple streams to an output file. Duplicate packets
    will be filtered and packets will be sorted by time and the apid order
    provided.

    :param streams: List of `PacketStream`s such as returned by
        `jpss_packet_stream`.
    :param output: File-like object to write output packets to.
    :keyword trunc_to: (start, end) datetime to truncate packets to. Start is
        inclusive and end is exclusive.
    :keyword apid_order: List of apids in the order in which they should appear
        in the output. If provided ALL apids must be present, otherwise a
        ValueError will occur (``list.index`` raises ValueError for a
        missing apid).
    :keyword eos: True when merging EOS (MODIS) streams.
    """
    index = set()  # will remove duplicates (_Ptr hashes by stamp/apid/seqid)
    for stream in streams:
        index.update(read_packet_index(stream, order=apid_order, eos=eos))
    # _Ptr class implements custom sort key impl for sorting
    for ptr in sorted(index):
        if trunc_to is not None:
            # keep only packets stamped within [start, end)
            if trunc_to[0] <= ptr.stamp < trunc_to[1]:
                output.write(ptr.bytes())
        else:
            output.write(ptr.bytes())