diff --git a/tile_gen/Dockerfile b/tile_gen/Dockerfile
index 125540c43684370b59e725755a438fbf2982bb40..e51a66e8f631d3acc843953c71520fdc614d53c3 100644
--- a/tile_gen/Dockerfile
+++ b/tile_gen/Dockerfile
@@ -2,7 +2,6 @@ FROM tiledb/tiledb-geospatial:latest
 
 WORKDIR /work
 
-# TODO may need the unzip command to be installed if not already
 RUN apt-get update && apt-get install -y unzip && \
     wget http://ssec.wisc.edu/~rayg/pub/amqpfind.zip && \
     unzip amqpfind.zip && \
diff --git a/tile_gen/generate_tiles.py b/tile_gen/generate_tiles.py
index 2312f6b1c800bf047d9eab36a0b3a8306e004793..0a5c7e8c5158f07c994cbacc6c636f6e3c486f17 100644
--- a/tile_gen/generate_tiles.py
+++ b/tile_gen/generate_tiles.py
@@ -6,6 +6,7 @@ import warnings
 import logging
 import subprocess
 import shutil
+import json
 from glob import glob
 
 import tile_index
@@ -71,6 +72,11 @@ def main():
                         help="Shapefile filename pattern to use and placed "
                              "in the output directory. "
                              "(default: '{product}.shp')")
+    parser.add_argument('--json-times', action='store_true',
+                        help="Print a JSON dict to stdout containing removed "
+                             "and added times (ex. '{\'removed_times\': [], "
+                             "\'added_times\': []}'). Times are in the form: "
+                             "YYYY-MM-DDTHH:MM:SS.")
     parser.add_argument('out_dir',
                         help="Output path to save tile information to (ex. '/data/tiles/{product}')")
     parser.add_argument('input_files', nargs="+",
@@ -78,6 +84,8 @@ def main():
     args = parser.parse_args()
 
     groups = group_files(args.products, args.input_files)
+    all_added = set()
+    all_removed = set()
     for prod, prod_files in groups.items():
         out_dir = args.out_dir.format(product=prod)
         os.makedirs(out_dir, exist_ok=True)
@@ -97,8 +105,14 @@ def main():
 
         # create shape file
         LOG.info("Rebuilding shapefile index with:\n\t{}".format(", ".join(all_prod_files)))
-        tile_index.index(all_prod_files, shp_pathname)
-
+        removed_times, added_times = tile_index.index(all_prod_files, shp_pathname)
+        all_removed.update(removed_times)
+        all_added.update(added_times)
+
+    if args.json_times:
+        all_removed = [x.isoformat() for x in sorted(all_removed)]
+        all_added = [x.isoformat() for x in sorted(all_added)]
+        print(json.dumps({'removed_times': all_removed, 'added_times': all_added}))
 
 
 if __name__ == "__main__":
diff --git a/tile_gen/run.sh b/tile_gen/run.sh
index 449169bbe594746a7cd09bca1f245ccb7579816b..99c773908772f1acf402eb49e288bd6b64bc1492 100755
--- a/tile_gen/run.sh
+++ b/tile_gen/run.sh
@@ -16,7 +16,7 @@ test -d "/data"
 
 
 export AMQPFIND_ARGS=${AMQPFIND_ARGS:-"-H cspp-geo-rabbit -X satellite -u guest -p guest"}
-export AMQPSEND_ARGS=${AMQPSEND_ARGS:-"-H cspp-geo-rabbit -X satellite -u guest -p guest"}
+export AMQPSEND_ARGS=${AMQPSEND_ARGS:-"-D 15 -H cspp-geo-rabbit -X satellite -u guest -p guest"}
 export AMQPFIND_TOPIC=${AMQPFIND_TOPIC:-'data.goes.*.abi.*.l1b.geotiff.complete'}
 export G2G_PRODUCTS=${G2G_PRODUCTS:-"C01 C02 C03 C04 C05 C06 C07 C08 C09 C10 C11 C12 C13 C14 C15 C16 true_color"}
 export TILE_ARGS=${TILE_ARGS:-""}
@@ -62,16 +62,19 @@ run_tile_gen() {
         out_dir="${out_dir}/{product}"
     fi
     echo "Generating tiles in directory: ${out_dir}"
-    python3 generate_tiles.py ${TILE_ARGS} -p ${G2G_PRODUCTS} -- ${out_dir} ${path}
-    # OUT/<product>/<product>.shp
-    glob_pattern="${out_dir}/*/*.shp"
-    # Remove the /data prefix
-    glob_pattern="${glob_pattern/${dst_dir}\//}"
+    for product in ${G2G_PRODUCTS}; do
+        json_times_changed=$(python3 generate_tiles.py ${TILE_ARGS} --json-times -p ${product} -- ${out_dir} ${path})
+        # OUT/<product>/<product>.shp
+        glob_pattern="${out_dir}/${product}/*.shp"
+        # Remove the /data prefix
+        glob_pattern="${glob_pattern/${dst_dir}\//}"
 
-    amqpsend_topic="data.${satellite_family}.${satellite_id}.${instrument}.${data_type}.l1b.tiledb.complete"
-#    json_info="{path: ${glob_pattern}, satellite_family: ${satellite_family}, satellite_ID: ${satellite_id}, instrument: ${instrument}, data_type: ${data_type}}"
-    json_info="{\"path\": \"${glob_pattern}\", \"satellite_family\": \"${satellite_family}\", \"satellite_ID\": \"${satellite_id}\", \"instrument\": \"${instrument}\", \"data_type\": \"${data_type}\"}"
-    echo -e "[[\"$amqpsend_topic\", $json_info]]" | python3 /work/amqpfind/amqpsend.py ${AMQPSEND_ARGS}
+        amqpsend_topic="data.${satellite_family}.${satellite_id}.${instrument}.${data_type}.l1b.tiles.${product}.complete"
+        json_info="{\"path\": \"${glob_pattern}\", \"satellite_family\": \"${satellite_family}\", \"satellite_ID\": \"${satellite_id}\", \"instrument\": \"${instrument}\", \"data_type\": \"${data_type}\", \"product\": \"${product}\"}"
+        # splice in the JSON printed by generate_tiles.py: drop our closing '}' and its opening '{'
+        json_info="${json_info:0:-1}, ${json_times_changed:1}"
+        echo -e "[[\"$amqpsend_topic\", $json_info]]" | python3 /work/amqpfind/amqpsend.py ${AMQPSEND_ARGS}
+    done
     echo "Done generating tiles for ${path}"
 }
 
diff --git a/tile_gen/tile_index.py b/tile_gen/tile_index.py
index 6ff1769ef093322586201601cdc4432279c55e6f..2514c4b29d1fe7b4d676fbb787aff190a4a28d38 100644
--- a/tile_gen/tile_index.py
+++ b/tile_gen/tile_index.py
@@ -41,6 +41,18 @@ possible_time_regex = (
 )
 
 
+def get_current_times(shp_filename):
+    """Yield the time of each record currently stored in the shapefile."""
+    if not os.path.isfile(shp_filename):
+        # no shapefile, no times
+        return
+
+    with fiona.open(shp_filename, 'r') as shp_file:
+        for time_step in shp_file:
+            t = time_step['properties']['time']
+            yield datetime.datetime.strptime(t, '%Y-%m-%dT%H:%M:%S')
+
+
 def get_file_time(fn):
     for regex, time_fmt in possible_time_regex:
         matches = regex.findall(fn)
@@ -57,16 +69,23 @@ def index(input_files, output_shapefile):
     once. Repeated calls to this function will overwrite existing
     shapefile information.
 
+    Returns:
+        (times_removed, times_added): Sorted lists of the times dropped from
+            and newly added to the shapefile by this rebuild.
+
     """
     import tempfile
     out_dir, shp_fn = os.path.split(output_shapefile)
     tmp_dir = tempfile.mkdtemp("_tile_index")
     tmp_shapefile = os.path.join(tmp_dir, shp_fn)
 
+    current_times = set(get_current_times(output_shapefile))
+    new_times = set()
     with fiona.open(tmp_shapefile, 'w', driver='ESRI Shapefile', schema=temporal_schema) as output:
         for f in input_files:
             try:
                 dt = get_file_time(f)
+                new_times.add(dt)
             except ValueError:
                 logger.error(f"Can't time for file {f}")
                 continue
@@ -90,6 +109,10 @@ def index(input_files, output_shapefile):
     # we don't need the temporary directory anymore
     shutil.rmtree(tmp_dir, ignore_errors=True)
 
+    times_removed = current_times - new_times
+    times_added = new_times - current_times
+    return sorted(times_removed), sorted(times_added)
+
 
 # def index(src_dir, output):
 #     files = glob.glob(os.path.join(src_dir, '*.tif'))