From 89218034cfc015fb174a9288018b672e64d3c702 Mon Sep 17 00:00:00 2001 From: Greg Quinn <greg.quinn@ssec.wisc.edu> Date: Thu, 5 Oct 2017 08:03:25 -0500 Subject: [PATCH] rdrgen: cap number of open FDs --- edosl0util/rdrgen.py | 53 +++++++++++++++++++++++++++++++++++++------- 1 file changed, 45 insertions(+), 8 deletions(-) diff --git a/edosl0util/rdrgen.py b/edosl0util/rdrgen.py index 6749769..ee2e601 100644 --- a/edosl0util/rdrgen.py +++ b/edosl0util/rdrgen.py @@ -1,9 +1,9 @@ import ctypes import itertools import os +import tempfile from collections import OrderedDict from datetime import datetime -from tempfile import TemporaryFile import attr import h5py @@ -17,8 +17,9 @@ from edosl0util.stream import jpss_packet_stream def packets_to_rdrs(sat, pkt_files): - # FIXME: refactor!!! - rdr_pkt_files = {} + + # divy packets up into temp files organized by granule + rdr_pkt_files = BinnedTemporaryFileManager() get_jpss_packet_time = GetJpssPacketTime() for pkt_file in pkt_files: with open(pkt_file) as file_obj: @@ -27,12 +28,11 @@ def packets_to_rdrs(sat, pkt_files): rdr_type = get_rdr_type(pkt.apid) pkt_iet = get_jpss_packet_time(pkt) gran = calc_rdr_granule(sat, rdr_type, pkt_iet) - if (rdr_type, gran) not in rdr_pkt_files: - rdr_pkt_files[rdr_type, gran] = TemporaryFile() - rdr_pkt_files[rdr_type, gran].write(pkt.bytes()) + rdr_pkt_files.add_data((rdr_type, gran), pkt.bytes()) + + # make RDRs one granule at a time get_jpss_packet_time = GetJpssPacketTime() - for rdr_pkt_file in rdr_pkt_files.values(): - rdr_pkt_file.seek(0) + for rdr_pkt_file in rdr_pkt_files.process_files(): pkts = list(jpss_packet_stream(rdr_pkt_file)) pkt_times = {p: get_jpss_packet_time(p) for p in pkts} pkts.sort(key=(lambda p: (pkt_times[p], p.apid))) @@ -40,6 +40,43 @@ def packets_to_rdrs(sat, pkt_files): write_rdr(sat, blob) +class BinnedTemporaryFileManager(object): + def __init__(self, parent_dir='.', max_open_files=32): + self.max_open_files = max_open_files + self.dir = tempfile.mkdtemp(dir=parent_dir) + self._file_paths = {} + self._file_objs = OrderedDict() + self._process_files_called = False + + def add_data(self, bin_key, data): + assert not self._process_files_called + file_obj = self._file_objs.pop(bin_key, None) + if not file_obj: + file_path = self._file_paths.get(bin_key) + if file_path: + file_obj = open(file_path, 'a+') + else: + file_obj = tempfile.NamedTemporaryFile(dir=self.dir, delete=False) + file_path = file_obj.name + self._file_paths[bin_key] = file_path + if len(self._file_objs) == self.max_open_files: + _, old_file_obj = self._file_objs.popitem(last=False) + old_file_obj.close() + self._file_objs[bin_key] = file_obj + file_obj.write(data) + + def process_files(self): + assert not self._process_files_called + self._process_files_called = True + for file_obj in self._file_objs.values(): + file_obj.close() + for _, file_name in sorted(self._file_paths.items()): + file_obj = open(file_name) + yield file_obj + file_obj.close() + os.remove(file_name) + os.rmdir(self.dir) + def write_rdr(sat, blob, dir_path='.', distributor=None, origin=None, domain=None, compressed=False, orbit_num=0, creation_time=None, gran_creation_time=None, software_ver=None): -- GitLab