From aeec6172eb954d46c44a6a34e01831a9ca0a425d Mon Sep 17 00:00:00 2001 From: Bruce Flynn <brucef@ssec.wisc.edu> Date: Thu, 13 Aug 2015 20:01:51 +0000 Subject: [PATCH] Add split_stream, split_file --- edosl0util/merge.py | 6 ++++- edosl0util/split.py | 53 +++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 54 insertions(+), 5 deletions(-) diff --git a/edosl0util/merge.py b/edosl0util/merge.py index bd3b469..16ac3b3 100644 --- a/edosl0util/merge.py +++ b/edosl0util/merge.py @@ -1,4 +1,3 @@ -import io import sys import logging from datetime import datetime @@ -88,3 +87,8 @@ def merge(streams, output=sys.stdout): streams.remove(stream) LOG.debug("end-of-stream %s", stream) continue + + +def merge_files(filepaths, destpath): + streams = (PacketStream(open(f)) for f in filepaths) + merge(streams, output=open(destpath, 'wb')) diff --git a/edosl0util/split.py b/edosl0util/split.py index 714afe3..6fb0cc6 100644 --- a/edosl0util/split.py +++ b/edosl0util/split.py @@ -1,12 +1,17 @@ +import os import array +from datetime import datetime from edosl0util.timecode import unixtime from edosl0util.stream import PacketStream -def split(fobj, minutes): - """Split a VIIRS L0 PDS file into files based on their scan time mod the - number of minutes provided. +def split_stream(fobj, minutes): + """Split a VIIRS L0 PDS data stream into data blobs based on their scan + time mod the number of minutes provided. 
+ :param fobj: A file-like object + :param minutes: integer number of minutes """ buf = array.array('B') # buffer for a single data file until it is written cur_bucket = 0 # cur time bucket of size 'minutes' @@ -34,4 +39,44 @@ def split(fobj, minutes): pkt_count += 1 offset = fobj.tell() - original_offset - yield cur_bucket, offset, pkt_count, buf.tostring() \ No newline at end of file + yield cur_bucket, offset, pkt_count, buf.tostring() + + +def _replace_pdsname_stamp(filename, stamp): + # P1570769AAAAAAAAAAAAAS15208032721000.PDS + pat = '{}{}{}'.format(filename[:22], + '%y%j%H%M%S', + filename[-7:]) + return stamp.strftime(pat) + + +def split_file(filepath, minutes, destdir): + """ + Split a level0 PDS file into X-minute files by filename. + + :param filepath: Path to a Level0 PDS file. It is assumed the file has a + standard level 0 PDS filename. + :param minutes: Number of minutes per bucket. Buckets always start at the + top of the hour. For example, a bucket size of 6 will create 10 6-min + buckets starting at minutes 0, 6, 12, etc ... + :param destdir: Where the output files are to be written. NOTE: it is likely + there will be filename collisions between time-based files and generated + files, so make sure `destdir` does not contain a time-based input file. + + :raises RuntimeError: If a file exists with the same name as a bucket file. + """ + destdir = destdir or '.' + stream = split_stream(open(filepath), minutes) + for timestamp, offset, pkts, blob in stream: + stamp = datetime.utcfromtimestamp(timestamp) + dirname, filename = os.path.split(filepath) + newname = _replace_pdsname_stamp(filename, stamp) + dstpath = os.path.join(dirname, destdir, newname) + if os.path.exists(dstpath): + raise RuntimeError( + ('File already exists. ' + 'Bucket file possibly colliding with input file.'), + dstpath) + with open(dstpath, 'wb') as fptr: + fptr.write(blob) + yield stamp, fptr.name -- GitLab