Skip to content
Snippets Groups Projects
hcast2scmi2awips.sh 7.57 KiB
#!/bin/bash
SOURCE="${BASH_SOURCE[0]}"
while [ -h "$SOURCE" ] ; do SOURCE="$(readlink "$SOURCE")"; done
BASE="$( cd -P "$( dirname "$SOURCE" )/.." && pwd )"

# - acquire inotify+consolidate notifications of new complete scenes on mount point
# - convert L1b for a single scene type to SCMI files in local directory
# - copy SCMI files to a destination using SCP or LDMSEND, retrying up to N times before moving files to a failed_transfer/ directory
# - use available cores on machine via xargs -P
# - restart if timeouts occur in connection
# Copyright 2018, University of Wisconsin Regents
# License: GPLv3
# Author: R.K.Garcia <rkgarcia@wisc.edu>

# example invocation
## incoming data location and filtering
# export SCENE=FLDK  # used to filter filenames
# export HCAST_PRODUCTS=/path/to/product-landing-directory
# export HCAST_SECTION_TIMEOUT=300  # seconds
## outgoing SCMI markings
# export SCMI_REGION=HFD  
# export SCMI_SCENE="Full Disk"
# export SCMI_ENV=OT
## exported LDM/AWIPS server configured to ACCEPT from this host
# export LDM_SERVER_HOST=chai.ssec.wisc.edu
# export LDM_CHANNEL=SPARE
# ./hcast2scmi2awips.sh

# exit loudly on unhandled error
set -ex

# configurable values provided in environment
test -n "$SCENE"  # e.g. "RadC" "RadM1", "RadM2", "RadF"
test -n "$SCMI_REGION"  # TCONUS, TMESO1, TMESO2, TFD  for lack of a better guess
test -n "$SCMI_SCENE"
test -n "$SCMI_ENV" || export SCMI_ENV=OT # OR, DT, etc

# if HCAST_PRODUCTS is set to a directory, then use inotifywait to get L1b filenames as they arrive
test -z "$HCAST_PRODUCTS" || echo "Will use inotify to process files arriving in directory ${HCAST_PRODUCTS}"
test -d "$HCAST_PRODUCTS"
test -n "$HCAST_SECTION_TIMEOUT" && echo "Will use consolidation logic to wait for sectioned full disk up to $HCAST_SECTION_TIMEOUT seconds"

# if LDM_SERVER_HOST and LDM_CHANNEL are set non-empty, ldmsend (part of LDM but can be built standalone+static) is used to transfer tiles
test -n "$LDM_SERVER_HOST" || export LDM_SERVER_HOST=""
test -n "$LDM_CHANNEL" || export LDM_CHANNEL=SPARE

# if SSH_IDENTITY and SCP_DEST are set non-empty, SCP delivery is used to LDM server (not recommended for anything but testing)
test -n "$SSH_IDENTITY" || export SSH_IDENTITY=""  # $HOME/.ssh/myidentityissupersekrit
test -n "$SCP_DEST" || export SCP_DEST=user@host:/path/to/landing_point/


# local utilities and packages needed: goesr package, amqpfind package (flaregun), python3 with netCDF4
# amqpfind utility for listening to AMQP notifications see gitlab.ssec.wisc.edu

# GOESR package has goesr.cmi_changer module aka cmi_changer.sh, also gitlab.ssec.wisc.edu
# use cmi_changer.sh if using bundled python interpreter; else run "python3 setup.py install"
test -n "$AXI_TOOLS" || export AXI_TOOLS=$BASE
# test -n "$GOESR" || export GOESR=$HOME/Repos/git/goesr
# test -n "$HIMAWARI" || export HIMAWARI=$HOME/Repos/git/himawari
# export PYTHONPATH="$GOESR:$HIMAWARI"

# python3 environment with netCDF4 available; default to assuming this script is in $AXI_TOOLS/bin/
test -n "$PY3" || export PY3=$AXI_TOOLS/ShellB3/bin/python3

test -x "$PY3" || echo "Please set AXI_TOOLS variable to install location of axi-tools, or PY3 for developer python interpreter with goesr and himawari modules"
test -x "$PY3"

# creating sat environment (technically overkill for this purpose):
# of the packages listed, it's netcdf4, pyproj, and pika we need most
# module load anaconda27
# conda create -n sat python=3.6 anaconda
# source activate sat
# conda install netcdf4 h5py cffi numba matplotlib pyproj scipy basemap bokeh jupyter pandas flask bottleneck numexpr click click-plugins munch six cligj
# conda install hdf4
# pip install -U geojson shapely fiona mpld3 pika python-hdf4
# python -c 'import netCDF4' || echo "try again"
# python -c 'from pyhdf.SD import SD, SDC' || echo "try again"
# cd /path/to/goesr; python3 setup.py install

# sample stdout of cmi_changer; we grab the filenames from stdout and scp them
# > /data/users/rayg/Workspace/cmichanger/DT_TCONUS-005-B12-M3C02-T012_G16_s2017123000700_c2017138201118.nc
# > /data/users/rayg/Workspace/cmichanger/DT_TCONUS-005-B12-M3C02-T013_G16_s2017123000700_c2017138201118.nc
# > /data/users/rayg/Workspace/cmichanger/DT_TCONUS-005-B12-M3C02-T014_G16_s2017123000700_c2017138201118.nc

cmi_changer() {
  $PY3 -m goesr.cmi_changer -v -P0 -E $SCMI_ENV -R $SCMI_REGION -S "$SCMI_SCENE" "$@"
  # alternately:
  # /path/to/cmi_changer.sh -E $SCMI_ENV -R $SCMI_REGION "$@"
}
export -f cmi_changer  # needed when using with xargs and other forms of sub-shell like $(...)

# OR_ABI-L1b-RadM1-M3C
inotify_scene() {
  scene="$1"
  if [ -n "$HCAST_SECTION_TIMEOUT" ]; then 
    inotifywait -m -r -e close_write -e moved_to --format "${HCAST_PRODUCTS}/%f" "$HCAST_PRODUCTS" \
    |grep --line-buffered -E ".*?${scene}.*$" \
    |$PY3 -u -m goesr.consolidate --hcast-sections --timeout=$HCAST_SECTION_TIMEOUT \
    |sed -u 's/\*$//g'
  else
    inotifywait -m -r -e close_write -e moved_to --format "${HCAST_PRODUCTS}/%f" "$HCAST_PRODUCTS" \
    |grep --line-buffered -E ".*?${scene}.*$" 
  fi
}

# copy files to destination; if successful, remove local copies
scpurge() {
  if scp -i $SSH_IDENTITY "$@" $SCP_DEST; then
    rm "$@"
    return 0
  else
    return 1
  fi
}
export -f scpurge

# send a group of files to AWIPS server
scp_deliver() {
  if [ -n "$SCP_DEST2" ]; then
    for attempts in `seq 5`; do
      scp -i $SSH_IDENTITY "$@" $SCP_DEST2 && break
      sleep 12
    done
  fi
  for attempts in `seq 5`; do
    scpurge "$@" && return 0
    sleep 12
  done
  echo "ERROR: could not transfer: $*"
  test -d failed_transfer || mkdir failed_transfer
  mv -v "$@" failed_transfer/
}
export -f scp_deliver  # needed when using with xargs

justbases() {
  for path in "$@"; do 
    basename $path
  done
}
export -f justbases

dmpurge() {
  # strip full paths down to basenames, should all be in CWD in any case
  if ldmsend -h $LDM_SERVER_HOST -f $LDM_CHANNEL $(justbases "$@"); then
    rm "$@"
    return 0
  else
    return 1
  fi
}
export -f dmpurge

ldmsend_deliver() {
  for attempts in `seq 5`; do 
    dmpurge "$@" && return 0
    sleep 12
  done
  echo "ERROR: could not transfer: $*"
  test -d failed_transfer || mkdir failed_transfer
  mv -v "$@" failed_transfer/
}
export -f ldmsend_deliver

# given a scene path pattern, process to SCMI and send those files to AWIPS2
hcast2scmi2awips() {
  filenames=$(cmi_changer "$@" |perl -ne '/^\>\s*(.*?)\s*$/ && print $1 . "\n"')
  # deliver via ldmsend, if LDM_SERVER_HOST is set
  test -n "$LDM_SERVER_HOST" && ldmsend_deliver $filenames
  # deliver via scp, if SSH_IDENTITY is set
  test -n "$SSH_IDENTITY" && scp_deliver $filenames
}
export -f hcast2scmi2awips  # needed when using with xargs

inotify_main() {
  inotify_scene $SCENE \
  |xargs -P20 -L1 -I {} /bin/nice /bin/bash -c "hcast2scmi2awips {}"  
}

# prerequisite checks
# test that cmi_changer and amqpfind are available
cmi_changer -h &>/dev/null
test -x "$(which inotifywait)"
$PY3 -m goesr.consolidate -h  &>/dev/null
$PY3 -c 'import himawari.HimawariScene'

# main launching/testing point
if [ -z "$*" ]; then
  while true; do
    inotify_main &>${SCENE}-$(date '+%Y%m%dT%H%M%S').log || echo "ERROR: message loop exited"
    echo "ERROR: timeout at $(date); restarting in 30s"
    sleep 30
  done
else
  # call one of our internal commands for testing; examples:
  # ./hcast2scmi2awips.sh inotify_scene FLDK
  # ./hcast2scmi2awips.sh ldmsend_deliver file1 file2 file3
  # ./hcast2scmi2awips.sh cmi_changer filenames
  # SCENE=RadM1 SCMI_REGION=TMESO1 SCMI_ENV=OT ./hcast2scmi2awips.sh hcast2scmi2awips /satbuf1_data/goes/grb/goes16/2017/2017_05_19_139/abi/L1b/RadM1/OR_ABI-L1b-RadM1-M3C??_G16_s20171390223265*.nc
  "$@"
fi