diff --git a/edosl0util/stream.py b/edosl0util/stream.py
index 67f59259c5df12796f87c6028ae22b0a2b22ad08..bb5bda63c91f2a1100edfaec564777496a26e9e0 100644
--- a/edosl0util/stream.py
+++ b/edosl0util/stream.py
@@ -19,57 +19,98 @@ LOG = logging.getLogger(__name__)
 MAX_SEQ_ID = 2**14 - 1
 
 
-def simple_stream(fobj):
+class Error(Exception):
+    """
+    Base stream error.
+    """
+
+
+class PacketTooShort(Error):
+    """
+    The number of bytes read for a packet does not match the expected length.
+    After this error occurs the stream is no longer usable because data offsets
+    are no longer reliable.
+    """
+    def __init__(self, header=None, offset=None):
+        self.offset = offset
+        self.primary_header = header
+        self.args = (self.offset, self.primary_header)
+
+
+class NonConsecutiveSeqId(Error):
+    """
+    Non-consecutive sequence numbers encountered for apid.
+    """
+
+
+class SimpleStream(object):
     """
     Generator that yields PrimaryHeaders and data. Files are read using mmap.
     """
-    psize = c.sizeof(PrimaryHeader)
-    while True:
-        buf = fobj.read(psize)
+    def __init__(self, fobj):
+        self.fobj = fobj
+        self.offset = fobj.tell()
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        return self.next()
+
+    def next(self):
+        psize = c.sizeof(PrimaryHeader)
+        buf = self.fobj.read(psize)
+        if not buf:
+            raise StopIteration()
         if len(buf) < psize:
-            return
+            raise PacketTooShort(header=None, offset=self.offset)
         h1 = PrimaryHeader.from_buffer_copy(buf)
         # read user data
         size = h1.data_length_minus1 + 1
-        buf = fobj.read(size)
+        buf = self.fobj.read(size)
         if len(buf) < size:
-            return
-        yield h1, buf
+            raise PacketTooShort(header=h1, offset=self.offset)
+        self.offset += (psize + size)
+        return h1, buf
 
 
-def _full_stream(header_lookup, fobj):
+class FullStream(SimpleStream):
     """
-    Generator that yields primary and secondary header and any left over bytes
+    Iterable returning primary and secondary header and any left over bytes
    of "user" data.
-
-    :param header_lookup: function that takes a primary header and returns
-        a secondary header struct to use, or None.
     """
-    stream = simple_stream(fobj)
-    for h1, buf in stream:
+    def __init__(self, header_lookup, fobj):
+        """
+        :param header_lookup: function that takes a primary header and returns
+            a secondary header struct to use, or None.
+        """
+        self.header_lookup = header_lookup
+        super(FullStream, self).__init__(fobj)
+
+    def next(self):
+        h1, buf = super(FullStream, self).next()
         h2 = ''
         if h1.secondary_header_flag:
-            H2Impl = header_lookup(h1)
+            H2Impl = self.header_lookup(h1)
             if H2Impl:
                 hsize = c.sizeof(H2Impl)
                 h2 = H2Impl.from_buffer_copy(buf)
-                yield h1, h2, buf[hsize:]
-                continue
-        yield h1, h2, buf
+                return h1, h2, buf[hsize:]
+        return h1, h2, buf
 
 
 def jpss_full_stream(fobj):
     """
-    `_full_stream` impl for JPSS secondary headers.
+    `FullStream` impl for JPSS secondary headers.
     """
-    return _full_stream(headers.jpss_header_lookup, fobj)
+    return FullStream(headers.jpss_header_lookup, fobj)
 
 
 def aqua_full_stream(fobj):
     """
-    `_full_stream` impl for Aqua secondary headers.
+    `FullStream` impl for Aqua secondary headers.
     """
-    return _full_stream(headers.aqua_header_lookup, fobj)
+    return FullStream(headers.aqua_header_lookup, fobj)
 
 
 class Packet(object):
@@ -123,7 +164,7 @@ class PacketStream(object):
     """
     SEQID_NOTSET = -1
 
-    def __init__(self, fobj):
+    def __init__(self, fobj, fail_on_missing=False):
         """
         :param fobj: File-like object to read stream from
         """
@@ -131,6 +172,7 @@ class PacketStream(object):
         self._seek_cache = deque()
         self._apid_info = defaultdict(
             lambda: {'count': 0, 'last_seqid': self.SEQID_NOTSET, 'num_missing': 0})
+        self._fail_on_missing = fail_on_missing
 
     def __iter__(self):
         return self
@@ -143,21 +185,15 @@ class PacketStream(object):
         self._seek_cache.append(packet)
 
     def next(self):
-        """
-        Return the next packet in the stream.
-
-        :keyword fail_on_missing: If True MissingPackets will be raised when
-            sequence ids are non-sequential.
-
-        :raises: StopIteration when no more data can be serialzed into Packets.
-        :raises: MissingPackets see `fail_on_missing`
-        """
         # return any items on the seek cache before reading more
         if len(self._seek_cache):
             return self._seek_cache.popleft()
 
         packet = Packet(*self._stream.next())
-        self._update_info(packet)
+        have_missing = self._update_info(packet)
+        if have_missing and self._fail_on_missing:
+            self.push_back(packet)
+            raise NonConsecutiveSeqId()
         return packet
 
     def _update_info(self, packet):
@@ -177,11 +213,10 @@ class PacketStream(object):
     def info(self):
         return self._apid_info
 
-    def seek(self, stamp, apid=None):
+    def seek_to(self, stamp, apid=None):
         """
         Seek forward such that the next packet provided will be the first packet
-        having a timestamp available in the secondary header >= stamp, and
-        optionally with apid as well.
+        having a timestamp available in the secondary header >= stamp and, if given, apid.
 
         :raises: StopIteration if the end of the stream is reached before the
             criteria can be met.
@@ -191,9 +226,9 @@ class PacketStream(object):
         while not packet.stamp:
             packet = self.next()
 
-        # seek to first packet > `stamp`
+        # seek to first packet >= `stamp`
         current_stamp = packet.stamp
-        while current_stamp <= stamp:
+        while current_stamp < stamp:
             packet = self.next()
             current_stamp = packet.stamp or current_stamp
@@ -206,6 +241,18 @@ class PacketStream(object):
         packet = self.next()
         self.push_back(packet)
+    def seek_to_next_stamp(self):
+        """
+        Seek to the next packet with a timestamp available in the secondary
+        header. For standalone packets this is essentially the same as `next`.
+        For packet groups it is essentially a "next group" function.
+        """
+        packet = self.next()  # get past current packet
+        packet = self.next()
+        while not packet.stamp:
+            packet = self.next()
+        self.push_back(packet)
+
 
 
 def _cmp_by_first_pkt(filea, fileb):
     """
@@ -224,6 +271,6 @@
     return cmp(stampa, stampb)
 
 
-def make_streams(filepaths):
+def make_streams(filepaths, **kwargs):
     filepaths.sort(_cmp_by_first_pkt)
-    return [PacketStream(io.open(f, 'rb')) for f in filepaths]
+    return filepaths, [PacketStream(io.open(f, 'rb'), **kwargs) for f in filepaths]