Skip to content
Snippets Groups Projects
Commit a41b2299 authored by Ray Garcia's avatar Ray Garcia :scream_cat:
Browse files

Documentation improvements

Minor renaming and move-around refactor to prepare for HCAST/HSD
parent 4291c18d
Branches
Tags
No related merge requests found
......@@ -2,12 +2,26 @@
# -*- coding: utf-8 -*-
"""
PURPOSE
Statefully consolidate PUG scene files by name and start time
Statefully consolidate AXI scene groups into globs using metadata extracted from filenames and basic channel/section scene rulesets
USAGE
inotifywait -m -r -e close_write -e moved_to --format "$PWD/%f" $PWD |python -m goesr.consolidate --thru
inotifywait -m -r -e close_write -e moved_to --format "$PWD/%f" $PWD |python -m goesr.consolidate --passthru
find /path/to/RadF -name '*.nc' -type f |python -m goesr.consolidate
ls -1 /Volumes/earth/users/rayg/Data/ABI/RadC/OR_ABI-L1b-RadC-M*C0*_G16_s20171230027204_e*_c*.nc |python -m goesr.consolidate -vv -c '1,2,3,4,5'
ls -1 $HOME/Data/ABI/RadC/OR_ABI-L1b-RadC-M*C0*_G16_s20171230027204_e*_c*.nc |python -m goesr.consolidate -vv -c '1,2,3,4,5'
See goesr/tests/test-consolidate-bash.sh for an advanced example that uses --all output to generate completion messages
via a bash subshell launched by xargs.
By default it runs in "simple" mode, where glob patterns for completed scenes
are flushed to stdout as their requirements are satisfied.
When --all is used, each output line has the form: key=value key=value ... message. Two message types are provided:
complete (scene had all requirements satisfied)
expired (pending scene was not satisfied within timeout or queue size limit)
When --all --passthru is given, additional message types are generated:
begin (new, relevant file)
ignored (not relevant file due to channel constraint or otherwise)
R.K.Garcia <rkgarcia@wisc.edu>
Copyright 2018 by University of Wisconsin Regents, see AUTHORS for more details
......@@ -24,15 +38,6 @@ from functools import partial
LOG = logging.getLogger(__name__)
RE_PUG = re.compile(r'(?P<env>[A-Za-z0-9]+)_(?P<inst>[A-Za-z0-9]+)-(?P<lvl>[A-Za-z0-9]+)-(?P<prod>[A-Za-z0-9]+)-M(?P<mode>\d+)C(?P<chan>\d+)_(?P<platform>[A-Za-z0-9]+)_s(?P<start>\d+)_e(?P<end>\d+)_c(?P<created>\d+)\.(?P<suffix>\w+)')
EX_PUG = r'\g<env>_\g<inst>-\g<lvl>-\g<prod>-M\g<mode>C*_\g<platform>_s\g<start>_e*_c*.\g<suffix>'
# OR_ABI-L1b-RadC-M3C*_G16_s20171230052204_e*_c*.nc
FMT_PUG_GLOB= '{dirname}/{env}_{inst}-{lvl}-{prod}-M{mode}C*_{platform}_s{start}_e*_c*.{suffix}'
RE_PUG_FILENAME_TIME = re.compile(r'(\d{4})(\d\d\d)(\d\d)(\d\d)(\d\d)(\d+)')
class ExpiringDefaultDict(defaultdict):
......@@ -45,7 +50,29 @@ class ExpiringDefaultDict(defaultdict):
return (self._incept + td) <= now
def filename_timecode_to_datetime(iso):
def convert(groupdict: dict, conversions: dict):
    """Return a copy of ``groupdict`` with per-key conversions applied.

    :param groupdict: mapping of field name to value, e.g. ``Match.groupdict()``
    :param conversions: mapping of field name to a callable taking the value;
        keys that are absent, or whose entry is not callable, pass the value
        through unchanged
    :return: new dict with the same keys as ``groupdict``
    """
    def _apply(key, value):
        # A missing entry yields None from .get(), which is not callable.
        fn = conversions.get(key)
        return fn(value) if callable(fn) else value

    return {key: _apply(key, value) for key, value in groupdict.items()}
#
# GOES-R PUG conversions and ruleset
#
RE_PUG = re.compile(r'(?P<env>[A-Za-z0-9]+)_(?P<inst>[A-Za-z0-9]+)-(?P<lvl>[A-Za-z0-9]+)-(?P<prod>[A-Za-z0-9]+)-M(?P<mode>\d+)C(?P<chan>\d+)_(?P<platform>[A-Za-z0-9]+)_s(?P<start>\d+)_e(?P<end>\d+)_c(?P<created>\d+)\.(?P<suffix>\w+)')
EX_PUG = r'\g<env>_\g<inst>-\g<lvl>-\g<prod>-M\g<mode>C*_\g<platform>_s\g<start>_e*_c*.\g<suffix>'
# OR_ABI-L1b-RadC-M3C*_G16_s20171230052204_e*_c*.nc
FMT_PUG_GLOB= '{dirname}/{env}_{inst}-{lvl}-{prod}-M{mode}C*_{platform}_s{start}_e*_c*.{suffix}'
RE_PUG_FILENAME_TIME = re.compile(r'(\d{4})(\d\d\d)(\d\d)(\d\d)(\d\d)(\d+)')
def pug_filename_timecode_to_datetime(iso):
m = RE_PUG_FILENAME_TIME.match(iso.strip())
nums = list(m.groups())
nums[5] += '0' * max(0, 6-len(nums[5])) # .9 -> 900000µs
......@@ -53,52 +80,44 @@ def filename_timecode_to_datetime(iso):
return datetime(yyyy, 1, 1, h, m, s, t) + timedelta(days=jjj-1)
CONVERSIONS = {
PUG_KEY_CONVERSIONS = {
'mode': lambda s: int(s, 10),
'chan': lambda s: int(s, 10),
'start': filename_timecode_to_datetime,
'end': filename_timecode_to_datetime,
'created': filename_timecode_to_datetime
'start': pug_filename_timecode_to_datetime,
'end': pug_filename_timecode_to_datetime,
'created': pug_filename_timecode_to_datetime
}
def _tc_fmt(dt: datetime):
def _pug_tc_fmt(dt: datetime):
# return dt.strftime('%Y-%m-%dT%H:%M:%S.%fZ')
return dt.strftime('%Y%j%H%M%S%f')[:(4+3+2+2+2+1)]
FORMATTERS = {
'start': _tc_fmt,
'end': _tc_fmt,
PUG_KEY_FORMATTERS = {
'start': _pug_tc_fmt,
'end': _pug_tc_fmt,
'chan': lambda x: ','.join(str(q) for q in sorted(list(x))) if isinstance(x, set) else str(x)
}
DEFAULT_KV = {
PUG_DEFAULT_KV = {
'chan': set(range(1, 17)), # want all 16 channels
'created': None, # don't care
'end': None
}
def convert(groupdict: dict, conversions: dict=CONVERSIONS):
zult = {}
for k,v in groupdict.items():
c = conversions.get(k)
zult[k] = c(v) if callable(c) else v
return zult
def convert_filename(pug_filename: str):
def pug_convert_path(pug_filename: str):
dirname, basename = os.path.split(pug_filename)
m = RE_PUG.match(basename)
if not m:
return {}
d = convert(m.groupdict())
d = convert(m.groupdict(), PUG_KEY_CONVERSIONS)
d['dirname'] = dirname
return d
def format_glob_from_metadata(dkey, entry, variant=None):
def pug_format_glob_from_metadata(dkey, entry, variant=None):
if not isinstance(dkey, dict):
dkey = dict(dkey)
q = {}
......@@ -108,6 +127,21 @@ def format_glob_from_metadata(dkey, entry, variant=None):
return FMT_PUG_GLOB.format(**q)
def pug_format_emit_simple(path, tag, dkey, entry, variant=None):
    """Return the scene glob pattern for ``path`` (simple output mode).

    The basename is matched against ``RE_PUG`` and expanded through ``EX_PUG``,
    which replaces the channel, end-time and creation-time fields with ``*``.
    The directory part of ``path`` is kept as-is.

    :param path: path of the file that triggered the message
    :param tag: message name; unused here
    :param dkey: scene key; unused here
    :param entry: scene entry; unused here
    :param variant: scene variant; unused here
    :return: glob pattern string, or ``path`` unchanged when its basename does
        not match ``RE_PUG``
    """
    dirname, basename = os.path.split(path)
    m = RE_PUG.match(basename)
    if m is None:
        # main() also installs this formatter for 'ignored' paths under
        # --passthru; pass a non-matching name through rather than raising
        # AttributeError on None.
        return path
    return os.path.join(dirname, m.expand(EX_PUG))
def _pug_kv_words(mapping):
    """Render a mapping (or iterable of pairs) as ``key=repr(value)`` words, sorted and space-separated."""
    if not mapping:
        return ''
    if not isinstance(mapping, dict):
        mapping = dict(mapping)
    return ' '.join('{}={}'.format(k, repr(v)) for (k, v) in sorted(mapping.items()))


def pug_format_emit_complete(path, tag, key, entry, variant=None, use_original_path=False):
    """Format one ``--all`` output line: ``key=value ... path="..." tag``.

    :param path: path of the file that triggered the message
    :param tag: message name placed last on the line
    :param key: scene key, as a dict or an iterable of (name, value) pairs
    :param entry: scene entry mapping, or None/empty to omit
    :param variant: scene variant mapping, or None/empty to omit
    :param use_original_path: when True, report ``path`` itself; otherwise
        report the glob from ``pug_format_glob_from_metadata(key, entry, variant)``
    :return: the words joined by single spaces; an omitted group still leaves
        its separator behind, so consecutive spaces can appear
    """
    if use_original_path:
        shown = path
    else:
        shown = pug_format_glob_from_metadata(key, entry, variant)
    words = [
        _pug_kv_words(key),
        _pug_kv_words(entry),
        _pug_kv_words(variant),
        'path="{}"'.format(shown),
        tag,
    ]
    return ' '.join(words)
class SceneRule(object):
BEGIN = 'begin'
PROGRESS = 'progress'
......@@ -266,21 +300,6 @@ def threaded_line_reader(fob, delay_seconds=5.0):
yield None
def format_emit_simple(path, tag, dkey, entry, variant=None):
dirname, basename = os.path.split(path)
m = RE_PUG.match(basename)
return os.path.join(dirname, m.expand(EX_PUG))
def format_emit_complete(path, tag, key, entry, variant=None, use_original_path=False):
skey = ' '.join('{}={}'.format(k, repr(v)) for (k, v) in sorted(key.items()))
evar = ' '.join('{}={}'.format(k, repr(v)) for (k, v) in sorted(entry.items())) if entry else ''
svar = ' '.join('{}={}'.format(k, repr(v)) for (k, v) in sorted(variant.items())) if variant else ''
# path = 'path="{}"'.format(format_emit(path, key, entry, None))
path = 'path="{}"'.format(format_glob_from_metadata(key, entry, variant) if not use_original_path else path)
return ' '.join([skey, evar, svar, path, tag])
def consolidate_paths(scene_rule, file_objs, complete, begin=None, progress=None, expired=None, ignored=None):
for fin in file_objs:
for line in threaded_line_reader(fin): # fin.readlines():
......@@ -350,7 +369,7 @@ def main():
# for pn in args.inputs:
# pass
kv = dict(DEFAULT_KV)
kv = dict(PUG_DEFAULT_KV)
if args.channels:
RE_CHANNEL = re.compile(r'\d+')
......@@ -359,27 +378,27 @@ def main():
kv['chan'] = channels
if args.everything:
format_emit_passthru = partial(format_emit_complete, use_original_path=True)
format_emit_passthru = partial(pug_format_emit_complete, use_original_path=True)
kw = dict(
complete=format_emit_complete,
complete=pug_format_emit_complete,
begin=format_emit_passthru if args.passthru else None,
expired=format_emit_complete,
expired=pug_format_emit_complete,
progress=format_emit_passthru if args.passthru else None,
ignored = format_emit_passthru if args.passthru else None
)
else:
kw = dict(
complete=format_emit_simple,
begin=format_emit_simple if args.passthru else None,
complete=pug_format_emit_simple,
begin=pug_format_emit_simple if args.passthru else None,
expired=None,
progress=format_emit_simple if args.passthru else None,
ignored=format_emit_simple if args.passthru else None,
progress=pug_format_emit_simple if args.passthru else None,
ignored=pug_format_emit_simple if args.passthru else None,
)
r = SceneRule(convert_filename,
r = SceneRule(pug_convert_path,
kv,
queue_max_duration=None if not args.timeout else timedelta(seconds=args.timeout),
formatter=lambda x: convert(x, FORMATTERS))
formatter=lambda x: convert(x, PUG_KEY_FORMATTERS))
if args.inputs:
inputs = [(open(fn, 'rt') if fn != '-' else sys.stdin) for fn in args.inputs]
......
#!/bin/bash -ex
# Advanced example: forward the --all "key=value ... message" output of
# goesr.consolidate to a bash function.
#
# Usage: $0 [DATA_DIR]
#   DATA_DIR  directory searched for '*L1b*.nc' files (default: $HOME/Data/ABI)
#   PYTHON    environment variable naming the interpreter
#             (default: python3 resolved through PATH)

# Each output line is executed by "bash -c", so its key=value words become
# temporary environment variables and its trailing message word is the
# command name; a "complete" message therefore runs this function.
complete() {
    echo "$platform $prod $path";
}
# Export so the bash children spawned by xargs can see the function.
export -f complete

DATA="$1"
test -d "$DATA" || DATA=$HOME/Data/ABI
test -d "$DATA"

# "test -x python3" would only look for ./python3, so resolve the default
# interpreter through PATH before checking it.
test -x "$PYTHON" || PYTHON=$(command -v python3)
test -x "$PYTHON"

# -I already runs one command per input line; POSIX makes -I and -L mutually
# exclusive, so -L1 is not passed.
find "$DATA" -name '*L1b*.nc' \
| "$PYTHON" -m goesr.consolidate --all -c 1,3,5,14 \
| xargs -I{} /bin/bash -c "{}"
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment