From cf6b7bdf83496c647430098eee8c08027be649ef Mon Sep 17 00:00:00 2001 From: David Hoese <david.hoese@ssec.wisc.edu> Date: Tue, 19 May 2020 10:49:16 -0500 Subject: [PATCH] Optimize tile generation to perform better with a lot of files --- tile_gen/generate_tiles.py | 16 +++++- tile_gen/run.sh | 8 ++- tile_gen/tile_index.py | 110 ++++++++++++++++++------------------- 3 files changed, 71 insertions(+), 63 deletions(-) diff --git a/tile_gen/generate_tiles.py b/tile_gen/generate_tiles.py index 0a5c7e8..d57be4f 100644 --- a/tile_gen/generate_tiles.py +++ b/tile_gen/generate_tiles.py @@ -50,6 +50,9 @@ def link_or_copy(input_tifs, out_dir): """Hardlink input tifs to output directory.""" for prod_file in input_tifs: out_file = os.path.join(out_dir, os.path.basename(prod_file)) + if os.path.exists(out_file): + yield out_file + continue try: os.link(prod_file, out_file) except OSError: @@ -77,6 +80,10 @@ def main(): "and added times (ex. '{\'times_removed\': [], " "\'times_added\': []}'). Times are in the form: " "YYYY-MM-DDTHH:MM:SS.") + parser.add_argument('--shared-bbox', action='store_true', + help="Assume that all input geotiffs have the same " + "bounding box so no need to compute it for every " + "input.") parser.add_argument('out_dir', help="Output path to save tile information to (ex. '/data/tiles/{product}')") parser.add_argument('input_files', nargs="+", @@ -105,13 +112,16 @@ def main(): # create shape file LOG.info("Rebuilding shapefile index with:\n\t{}".format(", ".join(all_prod_files))) - removed_times, added_times = tile_index.index(all_prod_files, shp_pathname) + removed_times, added_times = tile_index.index( + all_prod_files, shp_pathname, shared_bbox=args.shared_bbox) all_removed.update(removed_times) all_added.update(added_times) if args.json_times: - all_removed = [x.isoformat() for x in sorted(all_removed)] - all_added = [x.isoformat() for x in sorted(all_added)] + # all_removed = [x.isoformat() for x in sorted(all_removed)] + # all_added = [x.isoformat() for x in sorted(all_added)] + all_removed = sorted(all_removed) + all_added = sorted(all_added) print(json.dumps({'removed_times': all_removed, 'added_times': all_added})) diff --git a/tile_gen/run.sh b/tile_gen/run.sh index 0870e54..a7f328e 100755 --- a/tile_gen/run.sh +++ b/tile_gen/run.sh @@ -61,9 +61,13 @@ run_tile_gen() { # add string formatting portion to separate add 'product' sub-directory out_dir="${out_dir}/{product}" fi + # Add optimization to only compute the bboxonce + if ([ "$data_type" == "radf" ] || [ "$data_type" == "radc" ]) && [ $TILE_ARGS =~ "shared" ]; then + TILE_ARGS="${TILE_ARGS} --shared-bbox" + fi echo "Generating tiles in directory: ${out_dir}" for product in ${G2G_PRODUCTS}; do - json_times_changed=$(python3 generate_tiles.py ${TILE_ARGS} --json-times -p ${product} -- ${out_dir} ${path}) + json_times_changed=$(python3 generate_tiles.py ${TILE_ARGS} --json-times -p ${product} -- ${out_dir} "${path}") # OUT/<product>/<product>.shp glob_pattern="${out_dir/\{product\}/${product}}/*.shp" # Remove the /data prefix @@ -81,5 +85,5 @@ run_tile_gen() { export -f run_tile_gen echo "Listening to AMQP messages with topic \"$AMQPFIND_TOPIC\"" -python3 amqpfind/amqpfind.py ${AMQPFIND_ARGS} -C "${AMQPFIND_TOPIC}" -j "{satellite_family} {satellite_ID} {instrument} {data_type} \'{path}\'" | xargs -I{} -P3 -n1 bash -c "run_tile_gen {}" +python3 amqpfind/amqpfind.py ${AMQPFIND_ARGS} -C "${AMQPFIND_TOPIC}" -j "{satellite_family} {satellite_ID} {instrument} {data_type} \'{path}\'" | xargs -I{} -P4 -n1 bash -c "run_tile_gen {}" diff --git a/tile_gen/tile_index.py b/tile_gen/tile_index.py index dfdfcc8..bba12ad 100644 --- a/tile_gen/tile_index.py +++ b/tile_gen/tile_index.py @@ -41,7 +41,7 @@ possible_time_regex = ( ) -def get_current_times(shp_filename): +def previous_shapes_generator(shp_filename): """Get currently available times in the shapefile.""" if not os.path.isfile(shp_filename): # no shapefile, no times @@ -49,8 +49,7 @@ def get_current_times(shp_filename): with fiona.open(shp_filename, 'r') as shp_file: for time_step in shp_file: - t = time_step['properties']['time'] - yield datetime.datetime.strptime(t, '%Y-%m-%dT%H:%M:%S') + yield time_step['properties']['location'], time_step def get_file_time(fn): @@ -62,7 +61,47 @@ def get_file_time(fn): raise ValueError("Unknown filename scheme, can't determine file time.") -def index(input_files, output_shapefile): +def shapes_generator(input_files, prev_shapefile, shared_bbox=False): + prev_shapes = previous_shapes_generator(prev_shapefile) + default_shape = (None, None) + location, shape_info = next(prev_shapes, default_shape) + while location is not None: + if location != input_files[0]: + # the old shapefile includes a file that we no longer want + # keep going through the existing shapes until we find one we + # do want + location, shape_info = next(prev_shapes, default_shape) + yield 'removed', location, shape_info + continue + yield 'existing', location, shape_info + location, shape_info = next(prev_shapes, default_shape) + input_files = input_files[1:] + + # from here on out these are all newly added files + bbox_cache = None + for f in input_files: + try: + dt = get_file_time(f) + except ValueError: + logger.error(f"Can't time for file {f}") + continue + + logger.info(f"Indexing {f} {dt.isoformat()}") + if bbox_cache is None or not shared_bbox: + with rasterio.open(f) as src: + bbox_cache = mapping(box(*src.bounds)) + + shape_info = { + 'geometry': bbox_cache, + 'properties': { + 'location': f, + 'time': dt.isoformat(), + }, + } + yield 'added', f, shape_info + + +def index(input_files, output_shapefile, shared_bbox=False): """Create shapefile for location and times of provided geotiffs or tileDB arrays. Note: All layers to be included in the shapefile must be provided all at @@ -78,30 +117,17 @@ def index(input_files, output_shapefile): out_dir, shp_fn = os.path.split(output_shapefile) tmp_dir = tempfile.mkdtemp("_tile_index") tmp_shapefile = os.path.join(tmp_dir, shp_fn) - current_times = set(get_current_times(output_shapefile)) - new_times = set() + times_added = [] + times_removed = [] with fiona.open(tmp_shapefile, 'w', driver='ESRI Shapefile', schema=temporal_schema) as output: - for f in input_files: - try: - dt = get_file_time(f) - new_times.add(dt) - except ValueError: - logger.error(f"Can't time for file {f}") + for shape_state, location, shape_info in shapes_generator(input_files, output_shapefile, shared_bbox=shared_bbox): + if shape_state == 'removed': + times_removed.append(shape_info['properties']['time']) continue - - logger.info(f"Indexing {f} {dt.isoformat()}") - with rasterio.open(f) as src: - g = box(*src.bounds) - - output.write( - { - 'geometry': mapping(g), - 'properties': { - 'location': f, - 'time': dt.isoformat() - } - }) + if shape_state == 'added': + times_added.append(shape_info['properties']['time']) + output.write(shape_info) # move the shapefile contents to the final destination for fn in os.listdir(tmp_dir): @@ -109,39 +135,7 @@ def index(input_files, output_shapefile): # we don't need the temporary directory anymore shutil.rmtree(tmp_dir, ignore_errors=True) - times_removed = current_times - new_times - times_added = new_times - current_times - return sorted(times_removed), sorted(times_added) - - -# def index(src_dir, output): -# files = glob.glob(os.path.join(src_dir, '*.tif')) -# folders = [os.path.join(src_dir, o) for o in os.listdir(src_dir) if os.path.isdir(os.path.join(src_dir, o))] -# -# with fiona.open(output, 'w', driver='ESRI Shapefile', -# schema=temporal_schema) as output: -# # simple toggle between indexing tiff files or tiledb arrays -# if len(files) > 0: -# it = files -# else: -# it = folders -# -# for f in it: -# parts = f.split('_') -# tstamp = parts[5] + parts[6] -# dt = datetime.datetime.strptime(tstamp, '%Y%m%d%H%M%S') -# logger.info(f"Indexing {f} {dt.isoformat()}") -# with rasterio.open(f) as src: -# g = box(*src.bounds) -# -# output.write( -# { -# 'geometry': mapping(g), -# 'properties': { -# 'location': f, -# 'time': dt.isoformat() -# } -# }) + return times_removed, times_added def main(): -- GitLab