Skip to content
Snippets Groups Projects
Commit aeec6172 authored by Bruce Flynn's avatar Bruce Flynn
Browse files

Add split_stream, split_file

parent 4db07748
No related branches found
No related tags found
No related merge requests found
import io
import sys
import logging
from datetime import datetime
......@@ -88,3 +87,8 @@ def merge(streams, output=sys.stdout):
streams.remove(stream)
LOG.debug("end-of-stream %s", stream)
continue
def merge_files(filepaths, destpath):
streams = (PacketStream(open(f)) for f in filepaths)
merge(streams, output=open(destpath, 'wb'))
import os
import array
from datetime import datetime
from edosl0util.timecode import unixtime
from edosl0util.stream import PacketStream
def split(fobj, minutes):
"""Split a VIIRS L0 PDS file into files based on their scan time mod the
number of minutes provided.
def split_stream(fobj, minutes):
"""Split a VIIRS L0 PDS data stream into data blobs based on their scan
time mod the number of minutes provided.
:param fobj: A file-like object
:param minutes: integer number of minutes
"""
buf = array.array('B') # buffer for a single data file until it is written
cur_bucket = 0 # cur time bucket of size 'minutes'
......@@ -34,4 +39,44 @@ def split(fobj, minutes):
pkt_count += 1
offset = fobj.tell() - original_offset
yield cur_bucket, offset, pkt_count, buf.tostring()
\ No newline at end of file
yield cur_bucket, offset, pkt_count, buf.tostring()
def _replace_pdsname_stamp(filename, stamp):
# P1570769AAAAAAAAAAAAAS15208032721000.PDS
pat = '{}{}{}'.format(filename[:22],
'%y%j%H%M%S',
filename[-7:])
return stamp.strftime(pat)
def split_file(filepath, minutes, destdir):
"""
Split a level0 PDS file into X minutes files by filename.
:param filepath: Path to a Level0 PDS file. It is assumed the file as a
standard level 0 PDS filename.
:param minutes: Number of minutes per bucket. Buckets always start at the
top of the hour. For example, a bucket size of 6 will create 10 6-min
buckets starting at minutes 0, 6, 12, etc ...
:param destdir: Where the output files are to be written. NOTE: it is likely
there will be filename collisions between time-based files and generated
files, so make sure `destdir` does not contain a time-base input file.
:raises RuntimeError: If a file exists with the same name of a bucket file.
"""
destdir = destdir or '.'
stream = split_stream(open(filepath), minutes)
for timestamp, offset, pkts, blob in stream:
stamp = datetime.utcfromtimestamp(timestamp)
dirname, filename = os.path.split(filepath)
newname = _replace_pdsname_stamp(filename, stamp)
dstpath = os.path.join(dirname, destdir, newname)
if os.path.exists(dstpath):
raise RuntimeError(
('File already exists. '
'Bucket file possibly colliding with input file.'),
dstpath)
with open(dstpath, 'wb') as fptr:
fptr.write(blob)
yield stamp, fptr.name
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment