# "Newer" / "Older" — stray repository-browser navigation text, not source
# code; kept as a comment so the module parses.
# encoding: utf-8
__copyright__ = "Copyright (C) 2015 University of Wisconsin SSEC. All rights reserved."
import io
import os
from datetime import datetime

from edosl0util.stream import jpss_packet_stream
def split_stream(stream, minutes):
    """Split a VIIRS L0 PDS data stream into data blobs based on their scan
    time mod the number of minutes provided.

    :param stream: Iterable of packets (e.g. a ``jpss_packet_stream``), each
        having a ``stamp`` attribute (datetime or None) and a ``bytes()``
        method returning its raw bytes.
    :param minutes: integer number of minutes per bucket; buckets are aligned
        to multiples of ``minutes * 60`` seconds since the epoch.
    :return: Generator of ``(bucket_timestamp, packet_count, data)`` tuples,
        where ``bucket_timestamp`` is the unix time of the bucket start and
        ``data`` is a bytearray of the packets in that bucket.
    """
    buf = bytearray()  # buffer for a single data file until it is written
    cur_bucket = 0  # unix timestamp of the start of the current bucket
    pkt_count = 0  # packets accumulated in the current bucket
    for pkt in stream:
        # do the bucketing based on secondary header timestamps; packets
        # without a secondary header (stamp is None) ride along in the
        # current bucket — NOTE(review): reconstructed behavior, confirm
        # against upstream edosl0util
        if pkt.stamp:
            hdrtime = (pkt.stamp - datetime(1970, 1, 1)).total_seconds()
            pkt_bucket = hdrtime - hdrtime % (minutes * 60)
            if cur_bucket == 0:
                # first timestamped packet establishes the initial bucket
                cur_bucket = pkt_bucket
            if pkt_bucket > cur_bucket:
                # crossed into a new bucket: flush the finished one
                yield cur_bucket, pkt_count, buf
                pkt_count = 0
                buf = bytearray()
            cur_bucket = pkt_bucket
        pkt_count += 1
        buf += pkt.bytes()
    # flush whatever remains (final partial bucket)
    yield cur_bucket, pkt_count, buf
def _replace_pdsname_stamp(filename, stamp):
# P1570769AAAAAAAAAAAAAS15208032721001.PDS
#
# NOTE: It seems that EDOS uses the file_id column for fractional seconds.
# We just zero this out since the bucket should be on even seconds.
pat = "{}{}0{}".format(filename[:22], "%y%j%H%M%S", filename[-6:])
def _filename_for_splitfile(filename, stamp, minutes):
# P1570769AAAAAAAAAAAAAS15208032721001.PDS
#
# NOTE: It seems that EDOS uses the file_id column for fractional seconds.
# We just zero this out since the bucket should be on even seconds.
pat = "{}{}{}{}0{}".format(
filename[:20], minutes, filename[21], "%y%j%H%M%S", filename[-6:]
def split_file(filepath, minutes, destdir):
    """
    Split a level0 PDS file into X minutes files by filename.

    :param filepath: Path to a Level0 PDS file, with a standard L0 PDS filename.
    :param minutes: Number of minutes per bucket. Buckets always start at the
        top of the hour. For example, a bucket size of 6 will create 10 6-min
        buckets starting at minutes 0, 6, 12, etc ...
    :param destdir: Where the output files are to be written. NOTE: it is likely
        there will be filename collisions between time-based files and generated
        files, so make sure `destdir` does not contain a time-base input file.
    :raises RuntimeError: If a file exists with the same name of a bucket file.
    """
    destdir = destdir or "."
    # Bucket the packet stream by scan time (see split_stream); each item is
    # (bucket unix timestamp, packet count, raw packet bytes)
    stream = split_stream(jpss_packet_stream(io.open(filepath, "rb")), minutes)
    for timestamp, pkts, blob in stream:
        # bucket timestamps are even seconds, so no fractional part is lost
        stamp = datetime.utcfromtimestamp(timestamp)
        dirname, filename = os.path.split(filepath)
        newname = _filename_for_splitfile(filename, stamp, minutes)
        # NOTE(review): if destdir is an absolute path, os.path.join discards
        # dirname — presumably intended; confirm with callers
        dstpath = os.path.join(dirname, destdir, newname)
        # refuse to clobber an existing file (see docstring NOTE on collisions)
        if os.path.exists(dstpath):
            raise RuntimeError(
                (
                    "File already exists. "
                    "Bucket file possibly colliding with input file."
                ),
                dstpath,
            )
        with io.open(dstpath, "wb") as fptr: