Something went wrong on our end
-
Bruce Flynn authoredBruce Flynn authored
split.py 3.56 KiB
import os
import io
import array
from datetime import datetime
from edosl0util.timecode import unixtime
from edosl0util.stream import PacketStream
def split_stream(fobj, minutes):
    """Split a VIIRS L0 PDS data stream into data blobs based on their scan
    time mod the number of minutes provided.

    Packets are read from ``fobj`` through ``PacketStream`` and accumulated
    in a buffer. When a packet arrives whose time bucket is later than the
    current one, the accumulated bytes are yielded and a new buffer is
    started. Packets with no timestamp are appended to whichever bucket is
    current.

    One final tuple is always yielded after the stream is exhausted, even if
    no packets were read (in which case the bucket is 0 and the blob empty).

    :param fobj: A file-like object
    :param minutes: integer number of minutes
    :returns: generator of ``(bucket, offset, pkt_count, data)`` tuples.
        ``bucket`` is the packet time from ``unixtime`` rounded down to a
        multiple of ``minutes * 60``; ``offset`` is ``fobj.tell()`` relative
        to where ``fobj`` was positioned on entry, sampled at the moment the
        blob is yielded; ``pkt_count`` is the number of packets in the blob;
        ``data`` is the concatenated packet data as a byte string.
    """
    # A bytearray is used instead of array.array: the 'c' typecode and the
    # tostring()/fromstring() methods do not exist on current Python 3.
    buf = bytearray()  # buffer for a single data file until it is written
    cur_bucket = 0  # cur time bucket of size 'minutes'; 0 means "none yet"
    pkt_count = 0
    original_offset = fobj.tell()
    bucket_seconds = minutes * 60

    for pkt in PacketStream(fobj):
        # do the bucketing based on secondary header timestamps
        if pkt.stamp:
            hdrtime = unixtime(pkt.stamp)
            pkt_bucket = hdrtime - hdrtime % bucket_seconds

            if cur_bucket == 0:
                cur_bucket = pkt_bucket

            if pkt_bucket > cur_bucket:
                # NOTE(review): the packet that triggered the rollover has
                # already been read, so this offset presumably includes it;
                # confirm against how callers use the offset.
                offset = fobj.tell() - original_offset
                yield cur_bucket, offset, pkt_count, bytes(buf)
                pkt_count = 0
                buf = bytearray()
                # The bucket only ever moves forward; a packet stamped
                # earlier than the current bucket stays in the current blob.
                cur_bucket = pkt_bucket

        # this is an append operation
        buf.extend(pkt.data)
        pkt_count += 1

    offset = fobj.tell() - original_offset
    yield cur_bucket, offset, pkt_count, bytes(buf)
def _replace_pdsname_stamp(filename, stamp):
    """Return ``filename`` with its timestamp field replaced by ``stamp``.

    The first 22 and the last 6 characters of ``filename`` are kept. Between
    them goes ``stamp`` formatted as ``%y%j%H%M%S`` followed by a literal
    ``0``.

    :param filename: PDS file name, e.g.
        ``P1570769AAAAAAAAAAAAAS15208032721001.PDS``
    :param stamp: object providing ``strftime`` (e.g. a ``datetime``)
    :returns: the re-stamped file name
    """
    # P1570769AAAAAAAAAAAAAS15208032721001.PDS
    #
    # NOTE: It seems that EDOS uses the file_id column for fractional seconds.
    #       We just zero this out since the bucket should be on even seconds.
    #
    # The timestamp is formatted on its own and the name assembled afterwards
    # so that a '%' in the file name is never read as a strftime directive.
    return '{}{}0{}'.format(filename[:22],
                            stamp.strftime('%y%j%H%M%S'),
                            filename[-6:])
def _filename_for_splitfile(filename, stamp, minutes):
    """Build the output file name for one time bucket of a split PDS file.

    The result is made of the first 20 characters of ``filename``, then
    ``minutes``, then the character at index 21 of ``filename``, then
    ``stamp`` formatted as ``%y%j%H%M%S``, a literal ``0``, and the last 6
    characters of ``filename``. The character at index 20 of the input name
    is dropped in favour of ``minutes``.

    :param filename: PDS file name, e.g.
        ``P1570769AAAAAAAAAAAAAS15208032721001.PDS``
    :param stamp: object providing ``strftime`` (e.g. a ``datetime``)
    :param minutes: bucket size, inserted into the name as-is
    :returns: the file name for the bucket
    """
    # P1570769AAAAAAAAAAAAAS15208032721001.PDS
    #
    # NOTE: It seems that EDOS uses the file_id column for fractional seconds.
    #       We just zero this out since the bucket should be on even seconds.
    #
    # The timestamp is formatted on its own and the name assembled afterwards
    # so that a '%' in the file name is never read as a strftime directive.
    return '{}{}{}{}0{}'.format(
        filename[:20],
        minutes,
        filename[21],
        stamp.strftime('%y%j%H%M%S'),
        filename[-6:]
    )
def split_file(filepath, minutes, destdir):
    """
    Split a level0 PDS file into X minutes files by filename.

    This is a generator: each bucket file is written as the generator is
    advanced, and the input file is closed once the generator is exhausted
    or closed.

    :param filepath: Path to a Level0 PDS file. It is assumed the file has a
        standard level 0 PDS filename.
    :param minutes: Number of minutes per bucket. Buckets always start at the
        top of the hour. For example, a bucket size of 6 will create 10 6-min
        buckets starting at minutes 0, 6, 12, etc ...
    :param destdir: Where the output files are to be written; a false value
        means ``'.'``. The value is joined onto the directory of ``filepath``,
        so a relative ``destdir`` is relative to the input file's directory.
        NOTE: it is likely there will be filename collisions between
        time-based files and generated files, so make sure `destdir` does not
        contain a time-base input file.
    :returns: generator of ``(stamp, path)`` tuples, one per bucket file
        written, where ``stamp`` is the bucket start as a ``datetime`` and
        ``path`` is the path of the file that was written.
    :raises RuntimeError: If a file exists with the same name of a bucket file.
    """
    destdir = destdir or '.'
    # The input path does not change, so split it once, not once per bucket.
    dirname, filename = os.path.split(filepath)

    # The blobs are written out in binary mode below, so the input is read in
    # binary mode too; the context manager closes the input handle.
    with io.open(filepath, 'rb') as fobj:
        for timestamp, _offset, _pkts, blob in split_stream(fobj, minutes):
            stamp = datetime.utcfromtimestamp(timestamp)
            newname = _filename_for_splitfile(filename, stamp, minutes)
            dstpath = os.path.join(dirname, destdir, newname)
            if os.path.exists(dstpath):
                raise RuntimeError(
                    ('File already exists. '
                     'Bucket file possibly colliding with input file.'),
                    dstpath)
            with io.open(dstpath, 'wb') as fptr:
                fptr.write(blob)
            # Yield only after the output file has been closed.
            yield stamp, dstpath