Skip to content
Snippets Groups Projects
Commit 89218034 authored by Greg Quinn's avatar Greg Quinn
Browse files

rdrgen: cap number of open FDs

parent 57a6005b
No related branches found
No related tags found
No related merge requests found
import ctypes import ctypes
import itertools import itertools
import os import os
import tempfile
from collections import OrderedDict from collections import OrderedDict
from datetime import datetime from datetime import datetime
from tempfile import TemporaryFile
import attr import attr
import h5py import h5py
...@@ -17,8 +17,9 @@ from edosl0util.stream import jpss_packet_stream ...@@ -17,8 +17,9 @@ from edosl0util.stream import jpss_packet_stream
def packets_to_rdrs(sat, pkt_files): def packets_to_rdrs(sat, pkt_files):
# FIXME: refactor!!!
rdr_pkt_files = {} # divy packets up into temp files organized by granule
rdr_pkt_files = BinnedTemporaryFileManager()
get_jpss_packet_time = GetJpssPacketTime() get_jpss_packet_time = GetJpssPacketTime()
for pkt_file in pkt_files: for pkt_file in pkt_files:
with open(pkt_file) as file_obj: with open(pkt_file) as file_obj:
...@@ -27,12 +28,11 @@ def packets_to_rdrs(sat, pkt_files): ...@@ -27,12 +28,11 @@ def packets_to_rdrs(sat, pkt_files):
rdr_type = get_rdr_type(pkt.apid) rdr_type = get_rdr_type(pkt.apid)
pkt_iet = get_jpss_packet_time(pkt) pkt_iet = get_jpss_packet_time(pkt)
gran = calc_rdr_granule(sat, rdr_type, pkt_iet) gran = calc_rdr_granule(sat, rdr_type, pkt_iet)
if (rdr_type, gran) not in rdr_pkt_files: rdr_pkt_files.add_data((rdr_type, gran), pkt.bytes())
rdr_pkt_files[rdr_type, gran] = TemporaryFile()
rdr_pkt_files[rdr_type, gran].write(pkt.bytes()) # make RDRs one granule at a time
get_jpss_packet_time = GetJpssPacketTime() get_jpss_packet_time = GetJpssPacketTime()
for rdr_pkt_file in rdr_pkt_files.values(): for rdr_pkt_file in rdr_pkt_files.process_files():
rdr_pkt_file.seek(0)
pkts = list(jpss_packet_stream(rdr_pkt_file)) pkts = list(jpss_packet_stream(rdr_pkt_file))
pkt_times = {p: get_jpss_packet_time(p) for p in pkts} pkt_times = {p: get_jpss_packet_time(p) for p in pkts}
pkts.sort(key=(lambda p: (pkt_times[p], p.apid))) pkts.sort(key=(lambda p: (pkt_times[p], p.apid)))
...@@ -40,6 +40,43 @@ def packets_to_rdrs(sat, pkt_files): ...@@ -40,6 +40,43 @@ def packets_to_rdrs(sat, pkt_files):
write_rdr(sat, blob) write_rdr(sat, blob)
class BinnedTemporaryFileManager(object):
    """Manage append-only temporary files, one per "bin" key, with an FD cap.

    Data can be written incrementally to any bin via add_data() while at
    most max_open_files descriptors are held open at once; least-recently
    used bins are transparently closed and reopened on demand. After all
    writes, process_files() replays each bin's file in sorted key order
    and cleans up the files and the private temp directory.

    All files are handled in binary mode since callers write raw bytes
    (e.g. CCSDS packet data).
    """

    def __init__(self, parent_dir='.', max_open_files=32):
        """
        parent_dir: directory in which to create the private temp dir
        max_open_files: cap on simultaneously open file descriptors
        """
        self.max_open_files = max_open_files
        self.dir = tempfile.mkdtemp(dir=parent_dir)
        self._file_paths = {}  # bin_key -> on-disk path (lives past eviction)
        # insertion-ordered map of currently-open files; doubles as an LRU
        self._file_objs = OrderedDict()
        self._process_files_called = False

    def add_data(self, bin_key, data):
        """Append bytes to the temp file for bin_key, creating it if needed.

        May not be called after process_files().
        """
        assert not self._process_files_called
        # pop + re-insert below moves this bin to the most-recently-used end
        file_obj = self._file_objs.pop(bin_key, None)
        if file_obj is None:
            file_path = self._file_paths.get(bin_key)
            if file_path:
                # bin was evicted earlier; reopen for appending, in binary
                # mode to match the bytes being written
                file_obj = open(file_path, 'a+b')
            else:
                # delete=False: the path must outlive this handle so the
                # bin can be closed (evicted) and reopened later
                file_obj = tempfile.NamedTemporaryFile(dir=self.dir, delete=False)
                file_path = file_obj.name
                self._file_paths[bin_key] = file_path
            if len(self._file_objs) == self.max_open_files:
                # evict the least-recently-used bin to respect the FD cap
                _, old_file_obj = self._file_objs.popitem(last=False)
                old_file_obj.close()
        self._file_objs[bin_key] = file_obj
        file_obj.write(data)

    def process_files(self):
        """Generate a binary read-mode file object for each bin, in key order.

        Each bin's file is closed and deleted once its file object has
        been consumed (even if the consumer raises), and the temp dir is
        removed after the last bin. May only be called once; add_data()
        is not allowed afterward.
        """
        assert not self._process_files_called
        self._process_files_called = True
        # flush and release every still-open writer before reading back
        for file_obj in self._file_objs.values():
            file_obj.close()
        for _, file_name in sorted(self._file_paths.items()):
            file_obj = open(file_name, 'rb')
            try:
                yield file_obj
            finally:
                # clean up this bin even if the consumer bails out early
                file_obj.close()
                os.remove(file_name)
        os.rmdir(self.dir)
def write_rdr(sat, blob, dir_path='.', def write_rdr(sat, blob, dir_path='.',
distributor=None, origin=None, domain=None, compressed=False, orbit_num=0, distributor=None, origin=None, domain=None, compressed=False, orbit_num=0,
creation_time=None, gran_creation_time=None, software_ver=None): creation_time=None, gran_creation_time=None, software_ver=None):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment