Skip to content
Snippets Groups Projects
Commit 1a4b0232 authored by Bruce Flynn's avatar Bruce Flynn
Browse files

merge: use datetime rather than timecode

parent 7212be6f
No related branches found
No related tags found
No related merge requests found
......@@ -24,9 +24,9 @@ class _Ptr(object):
Represents one or more packets that share the same time timecode and apid.
"""
def __init__(self, fobj, timecode, apid, offset, size):
def __init__(self, fobj, stamp, apid, offset, size):
self.fobj = fobj
self.timecode = timecode
self.stamp = stamp
self.apid = apid
self.offset = offset
self.size = size
......@@ -41,19 +41,18 @@ class _Ptr(object):
return '<{:s} {:s}>'.format(self.__class__.__name__, attrs)
def __eq__(self, that):
return (self.timecode.day_segmented_timecode(), self.apid) \
== (that.timecode.day_segmented_timecode(), that.apid)
return (self.stamp, self.apid) == (that.stamp, that.apid)
def __ne__(self, that):
return not self == that
def __lt__(self, that):
return (self.timecode.day_segmented_timecode(), self.apid) \
< (that.timecode.day_segmented_timecode(), that.apid)
return (self.stamp, self.apid) < (that.stamp, that.apid)
# WTF?
# instances with same timecode/apid/size will compare the same
def __hash__(self):
return hash((self.timecode, self.apid, self.size))
# def __hash__(self):
# return hash((self.stamp, self.apid, self.size))
def bytes(self):
self.fobj.seek(self.offset, os.SEEK_SET)
......@@ -66,21 +65,21 @@ def read_packet_index(stream):
# drop any leading hanging packets
count = 0
packet = stream.next()
while not packet.cds_timecode:
while not packet.stamp:
packet = stream.next()
count += 1
if count:
LOG.info('dropped %d leading packets', count)
while True:
if not packet.cds_timecode:
if not packet.stamp:
# corrupt packet groups can cause apid mismatch
# so skip until we get to the next group
packet = stream.next()
continue
ptr = _Ptr(
stream.file,
timecode=packet.cds_timecode,
stamp=packet.stamp,
apid=packet.apid,
offset=packet.offset,
size=packet.size,
......@@ -88,7 +87,7 @@ def read_packet_index(stream):
index.append(ptr)
# collect all packets for this timecode/group
packet = stream.next()
while not packet.cds_timecode:
while not packet.stamp:
# Bail if we're collecting group packets and apids don't match
# This means group is corrupt
if ptr.apid != packet.apid:
......@@ -111,7 +110,7 @@ def _sort_by_time_apid(index, order=None):
index = sorted(index, key=lambda p: order.index(p.apid) if p.apid in order else -1)
else:
index = sorted(index, key=lambda p: p.apid)
return sorted(index, key=lambda p: p.timecode)
return sorted(index, key=lambda p: p.stamp)
def _filter_duplicates_by_size(index):
......@@ -120,7 +119,7 @@ def _filter_duplicates_by_size(index):
"""
filtered = OrderedDict()
for ptr in index:
key = (ptr.timecode, ptr.apid)
key = (ptr.stamp, ptr.apid)
if key in filtered:
if ptr.size > filtered[key].size:
filtered[key] = ptr
......@@ -157,7 +156,7 @@ def merge(streams, output, trunc_to=None, apid_order=None):
LOG.debug('writing index to %s', output)
for ptr in index:
if trunc_to:
if ptr.timecode >= trunc_to[0] and ptr.timecode < trunc_to[1]:
if trunc_to[0] <= ptr.stamp < trunc_to[1]:
output.write(ptr.bytes())
else:
output.write(ptr.bytes())
from datetime import timedelta, datetime
from operator import eq, lt, gt
import pytest
......@@ -19,14 +20,12 @@ class Test_Ptr:
[(0, 0), (1, 1), lt],
])
def test_ordering(self, d1, d2, op):
time1, apid1 = d1
ptr1 = merge._Ptr(open('/dev/null', 'r'),
headers.DaySegmentedTimecode(time1, 0, 0),
apid1, 0, 0)
time2, apid2 = d2
ptr2 = merge._Ptr(open('/dev/null', 'r'),
headers.DaySegmentedTimecode(time2, 0, 0),
apid2, 0, 0)
base = datetime.utcnow()
days1, apid1 = d1
ptr1 = merge._Ptr(open('/dev/null', 'r'), base + timedelta(days1), apid1, 0, 0)
days2, apid2 = d2
ptr2 = merge._Ptr(open('/dev/null', 'r'), base + timedelta(days2), apid2, 0, 0)
assert op(ptr1, ptr2)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment