Skip to content
Snippets Groups Projects
Commit 4928e96c authored by David Hoese's avatar David Hoese
Browse files

Merge branch 'optimize-tilegen' into 'master'

Optimize tile generation to perform better with a lot of files

Closes #41

See merge request cspp_geo/cspp-geo-web-viewer!37
parents 77d7a535 4326d079
No related branches found
No related tags found
No related merge requests found
......@@ -50,6 +50,9 @@ def link_or_copy(input_tifs, out_dir):
"""Hardlink input tifs to output directory."""
for prod_file in input_tifs:
out_file = os.path.join(out_dir, os.path.basename(prod_file))
if os.path.exists(out_file):
yield out_file
continue
try:
os.link(prod_file, out_file)
except OSError:
......@@ -77,12 +80,18 @@ def main():
"and added times (ex. '{\'times_removed\': [], "
"\'times_added\': []}'). Times are in the form: "
"YYYY-MM-DDTHH:MM:SS.")
parser.add_argument('--shared-bbox', action='store_true',
help="Assume that all input geotiffs have the same "
"bounding box so no need to compute it for every "
"input.")
parser.add_argument('out_dir',
help="Output path to save tile information to (ex. '/data/tiles/{product}')")
parser.add_argument('input_files', nargs="+",
help="Input geotiffs to generate tiles for (separate from product lists with '--')")
args = parser.parse_args()
# allow for glob patterns to reduce number of files passed through command line
args.input_files = [fn for glob_pat in args.input_files for fn in glob(glob_pat)]
groups = group_files(args.products, args.input_files)
all_added = set()
all_removed = set()
......@@ -105,13 +114,16 @@ def main():
# create shape file
LOG.info("Rebuilding shapefile index with:\n\t{}".format(", ".join(all_prod_files)))
removed_times, added_times = tile_index.index(all_prod_files, shp_pathname)
removed_times, added_times = tile_index.index(
all_prod_files, shp_pathname, shared_bbox=args.shared_bbox)
all_removed.update(removed_times)
all_added.update(added_times)
if args.json_times:
all_removed = [x.isoformat() for x in sorted(all_removed)]
all_added = [x.isoformat() for x in sorted(all_added)]
# all_removed = [x.isoformat() for x in sorted(all_removed)]
# all_added = [x.isoformat() for x in sorted(all_added)]
all_removed = sorted(all_removed)
all_added = sorted(all_added)
print(json.dumps({'removed_times': all_removed, 'added_times': all_added}))
......
......@@ -61,9 +61,15 @@ run_tile_gen() {
# add string formatting portion to separate add 'product' sub-directory
out_dir="${out_dir}/{product}"
fi
# Add optimization to only compute the bboxonce
if ([ "$data_type" == "radf" ] || [ "$data_type" == "radc" ]) && [ $TILE_ARGS =~ "shared" ]; then
TILE_ARGS="${TILE_ARGS} --shared-bbox"
fi
echo "Generating tiles in directory: ${out_dir}"
for product in ${G2G_PRODUCTS}; do
json_times_changed=$(python3 generate_tiles.py ${TILE_ARGS} --json-times -p ${product} -- ${out_dir} ${path})
set -x
json_times_changed=$(python3 generate_tiles.py ${TILE_ARGS} --json-times -p ${product} -- ${out_dir} "${path}")
set +x
# OUT/<product>/<product>.shp
glob_pattern="${out_dir/\{product\}/${product}}/*.shp"
# Remove the /data prefix
......@@ -81,5 +87,5 @@ run_tile_gen() {
export -f run_tile_gen
echo "Listening to AMQP messages with topic \"$AMQPFIND_TOPIC\""
python3 amqpfind/amqpfind.py ${AMQPFIND_ARGS} -C "${AMQPFIND_TOPIC}" -j "{satellite_family} {satellite_ID} {instrument} {data_type} \'{path}\'" | xargs -I{} -P3 -n1 bash -c "run_tile_gen {}"
python3 amqpfind/amqpfind.py ${AMQPFIND_ARGS} -C "${AMQPFIND_TOPIC}" -j "{satellite_family} {satellite_ID} {instrument} {data_type} \'{path}\'" | xargs -I{} -P4 -n1 bash -c "run_tile_gen {}"
......@@ -41,7 +41,7 @@ possible_time_regex = (
)
def get_current_times(shp_filename):
def previous_shapes_generator(shp_filename):
"""Get currently available times in the shapefile."""
if not os.path.isfile(shp_filename):
# no shapefile, no times
......@@ -49,8 +49,7 @@ def get_current_times(shp_filename):
with fiona.open(shp_filename, 'r') as shp_file:
for time_step in shp_file:
t = time_step['properties']['time']
yield datetime.datetime.strptime(t, '%Y-%m-%dT%H:%M:%S')
yield time_step['properties']['location'], time_step
def get_file_time(fn):
......@@ -62,7 +61,47 @@ def get_file_time(fn):
raise ValueError("Unknown filename scheme, can't determine file time.")
def index(input_files, output_shapefile):
def shapes_generator(input_files, prev_shapefile, shared_bbox=False):
prev_shapes = previous_shapes_generator(prev_shapefile)
default_shape = (None, None)
location, shape_info = next(prev_shapes, default_shape)
while location is not None:
if location != input_files[0]:
# the old shapefile includes a file that we no longer want
# keep going through the existing shapes until we find one we
# do want
location, shape_info = next(prev_shapes, default_shape)
yield 'removed', location, shape_info
continue
yield 'existing', location, shape_info
location, shape_info = next(prev_shapes, default_shape)
input_files = input_files[1:]
# from here on out these are all newly added files
bbox_cache = None
for f in input_files:
try:
dt = get_file_time(f)
except ValueError:
logger.error(f"Can't time for file {f}")
continue
logger.info(f"Indexing {f} {dt.isoformat()}")
if bbox_cache is None or not shared_bbox:
with rasterio.open(f) as src:
bbox_cache = mapping(box(*src.bounds))
shape_info = {
'geometry': bbox_cache,
'properties': {
'location': f,
'time': dt.isoformat(),
},
}
yield 'added', f, shape_info
def index(input_files, output_shapefile, shared_bbox=False):
"""Create shapefile for location and times of provided geotiffs or tileDB arrays.
Note: All layers to be included in the shapefile must be provided all at
......@@ -78,30 +117,17 @@ def index(input_files, output_shapefile):
out_dir, shp_fn = os.path.split(output_shapefile)
tmp_dir = tempfile.mkdtemp("_tile_index")
tmp_shapefile = os.path.join(tmp_dir, shp_fn)
current_times = set(get_current_times(output_shapefile))
new_times = set()
times_added = []
times_removed = []
with fiona.open(tmp_shapefile, 'w', driver='ESRI Shapefile',
schema=temporal_schema) as output:
for f in input_files:
try:
dt = get_file_time(f)
new_times.add(dt)
except ValueError:
logger.error(f"Can't time for file {f}")
for shape_state, location, shape_info in shapes_generator(input_files, output_shapefile, shared_bbox=shared_bbox):
if shape_state == 'removed':
times_removed.append(shape_info['properties']['time'])
continue
logger.info(f"Indexing {f} {dt.isoformat()}")
with rasterio.open(f) as src:
g = box(*src.bounds)
output.write(
{
'geometry': mapping(g),
'properties': {
'location': f,
'time': dt.isoformat()
}
})
if shape_state == 'added':
times_added.append(shape_info['properties']['time'])
output.write(shape_info)
# move the shapefile contents to the final destination
for fn in os.listdir(tmp_dir):
......@@ -109,39 +135,7 @@ def index(input_files, output_shapefile):
# we don't need the temporary directory anymore
shutil.rmtree(tmp_dir, ignore_errors=True)
times_removed = current_times - new_times
times_added = new_times - current_times
return sorted(times_removed), sorted(times_added)
# def index(src_dir, output):
# files = glob.glob(os.path.join(src_dir, '*.tif'))
# folders = [os.path.join(src_dir, o) for o in os.listdir(src_dir) if os.path.isdir(os.path.join(src_dir, o))]
#
# with fiona.open(output, 'w', driver='ESRI Shapefile',
# schema=temporal_schema) as output:
# # simple toggle between indexing tiff files or tiledb arrays
# if len(files) > 0:
# it = files
# else:
# it = folders
#
# for f in it:
# parts = f.split('_')
# tstamp = parts[5] + parts[6]
# dt = datetime.datetime.strptime(tstamp, '%Y%m%d%H%M%S')
# logger.info(f"Indexing {f} {dt.isoformat()}")
# with rasterio.open(f) as src:
# g = box(*src.bounds)
#
# output.write(
# {
# 'geometry': mapping(g),
# 'properties': {
# 'location': f,
# 'time': dt.isoformat()
# }
# })
return times_removed, times_added
def main():
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment