diff --git a/bomem_file.py b/bomem_file.py
new file mode 100644
index 0000000000000000000000000000000000000000..f3c2e500de1b99672f86b177e6d4ddfa9a573d3c
--- /dev/null
+++ b/bomem_file.py
@@ -0,0 +1,140 @@
+import struct
+import numpy as np
+from collections import namedtuple, OrderedDict
+from datetime import datetime
+
+FileHeader = namedtuple('FileHeader', ['machineCode',
+                                       'skip0',
+                                       'creationFlags',
+                                       'numberOfSubfiles',
+                                       'fileCreationDate',
+                                       'headerDirectoryOffset',
+                                       'subfileDirectoryOffset',
+                                       'headerDataOffset',
+                                       'firstSubfileDataBlockOffset',
+                                       'indexTableOffset',
+                                       'headerDirectorySize',
+                                       'subfileDirectorySize',
+                                       'headerDataSize',
+                                       'indexTableSize',
+                                       'headerCRC32'])
+
+
+def decode_flags(flags):
+    isHeaderDirCompressed = (flags & 128) != 0
+    isTTablePresent = (flags & 1) != 0
+    isOffsetTablePresent = (flags & 2) != 0
+    isSubfileVariableSize = (flags & 64) != 0
+    isSubfileMagicNbrPresent = (flags & 8) != 0
+    isSubfileSizePresent = (flags & 16) != 0
+    isSubfileTValuePresent = (flags & 32) != 0
+    isSubfileCRCPresent = (flags & 4) != 0
+    return locals()
+
+
+def data_start(decoded):
+    data_start = 0
+    if (decoded['isSubfileCRCPresent']):
+        data_start += 8
+    if (decoded['isSubfileMagicNbrPresent']):
+        data_start += 4
+    if (decoded['isSubfileTValuePresent']):
+        data_start += 8
+    if (decoded['isSubfileSizePresent']):
+        data_start += 4
+    return data_start
+
+
+def readString(inS):
+    length = struct.unpack('<i', inS.read(4))[0]
+    assert length < 1000
+    return inS.read(length*2).decode('utf-16')
+
+
+def read_data_directory(inS):
+    magic = struct.unpack_from('<i', inS.read(4))[0]
+    assert magic == 0x30726940
+    directory = OrderedDict({})
+    for i in range(struct.unpack_from('<i', inS.read(4))[0]):
+        if i > 10000:
+            break
+        name = readString(inS)
+        ndims, compression = struct.unpack_from('<hh', inS.read(4))
+        axes = OrderedDict({})
+        for dim in range(ndims):
+            axisName = readString(inS)
+            axisUnit = readString(inS)
+            axisType, axisNpts, axisMinValue, axisMaxValue = struct.unpack('<hidd', inS.read(2+4+8+8))
+            axes[axisName] = (axisUnit, axisType, axisNpts, axisMinValue, axisMaxValue)
+        directory[name] = ndims, compression, axes
+    return directory
+
+
+def calc_data_size(subfileDirectory):
+    total = 0
+    for entry_name, (ndims, compression, axes) in subfileDirectory.items():
+        assert compression == 0
+        shape = tuple(axis[2] for axis in axes.values())
+        type_number = list(axes.values())[0][1]
+        types_bytes = {1: 1,
+                       2: 1,
+                       3: 1,
+                       4: 2,
+                       5: 4,
+                       6: 4,
+                       7: 4,
+                       8: 8,
+                       9: 8,
+                       10: 16,
+                       50: 0}
+        total += types_bytes[type_number]*np.prod(shape)
+    return total
+
+
+def readSubfile(index, fileheader, subfileDirectory, inS):
+    dataStart = data_start(decode_flags(fileheader.creationFlags))
+    subfileDataSize = calc_data_size(subfileDirectory)
+    offset = 504 + index * (dataStart + subfileDataSize)
+    inS.seek(offset + dataStart)
+    data = {}
+    for entry_name, (ndims, compression, axes) in subfileDirectory.items():
+        assert compression == 0
+        shape = tuple(axis[2] for axis in axes.values())
+        type_number = list(axes.values())[0][1]
+        types = {1: np.ubyte,
+                 2: np.bool8,
+                 3: np.char,
+                 4: np.short,
+                 5: np.int32,
+                 6: np.long,
+                 7: np.float32,
+                 8: np.double,
+                 9: np.complex64,
+                 10: np.complex128,
+                 50: str}
+        dtype = types[type_number]
+        entry_data = np.fromstring(inS.read(int(np.prod(shape)*dtype().nbytes)), dtype=dtype).reshape(shape)
+        if shape == (1,):
+            entry_data = entry_data.item(0)
+        data[entry_name] = entry_data
+    return data
+
+
+def read_stream(inS):
+    inS.seek(0)
+    fmt = '32s2s64s2s64s2s254s2s'
+    [e.decode('utf-16') for e in struct.unpack_from(fmt, inS.read(struct.calcsize(fmt)))]
+    fmt = '<bbiilllllliiiil'
+    fh = FileHeader(*struct.unpack_from(fmt, inS.read(struct.calcsize(fmt))))
+    inS.seek(fh.subfileDirectoryOffset)
+    headerDirectory = read_data_directory(inS)
+    subfileDirectory = read_data_directory(inS)
+    all_data = OrderedDict([])
+    for index in range(fh.numberOfSubfiles):
+        yield readSubfile(index, fh, subfileDirectory, inS)
+
+
+def read_file(path):
+    with open(path, 'rb') as inS:
+        yield from read_stream(inS)
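
For orientation, a minimal usage sketch of the reader added above. The path and the 'UTCTime'/'DataA' entry names are assumptions taken from how main.py consumes the subfiles further down; the entries actually present in a file are whatever its subfile directory declares.

# Sketch only: iterate the subfiles of a single Bomem .Igm file.
# 'example.Igm' is a placeholder path; 'UTCTime' and 'DataA' are assumed
# entry names and may not exist in every file.
from bomem_file import read_file

for subfile in read_file('example.Igm'):
    # each subfile is a dict mapping entry name -> numpy array
    # (entries with shape (1,) are unwrapped to plain scalars)
    print(sorted(subfile))
    if 'UTCTime' in subfile and 'DataA' in subfile:
        print(subfile['UTCTime'], subfile['DataA'].shape)
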
diff --git a/igm_checks.py b/igm_checks.py
new file mode 100644
index 0000000000000000000000000000000000000000..acb6b8a1db37c86b5bb5cfdc109e81b793ebb31f
--- /dev/null
+++ b/igm_checks.py
@@ -0,0 +1,25 @@
+import numpy as np
+import pandas as pd
+
+def spike_check(igms, parameters):
+
+    if igms.empty:
+        return pd.DataFrame({'spike_check': [], 'sceneMirrorPosition': [], 'datetime': []})
+
+    data_a_mean = igms.DataA.mean(axis=0)
+    data_b_mean = igms.DataB.mean(axis=0)
+
+    data_a_std = np.vstack(igms.DataA.values).std(axis=0)
+    data_b_std = np.vstack(igms.DataB.values).std(axis=0)
+
+    any_spikes_in_data_a = igms.DataA.apply(lambda data_a: (abs((data_a - data_a_mean)/data_a_std) > 10).any())
+    any_spikes_in_data_b = igms.DataB.apply(lambda data_b: (abs((data_b - data_b_mean)/data_b_std) > 10).any())
+
+    igms = igms.drop(['DataA', 'DataB'], axis=1)
+    igms['spike_check'] = any_spikes_in_data_a | any_spikes_in_data_b
+    datetime_grouped = igms.groupby('datetime')
+
+    return pd.concat([
+        datetime_grouped[['spike_check']].any(),
+        datetime_grouped[['sceneMirrorPosition']].first()
+    ], axis=1).reset_index()
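
A rough sketch of calling spike_check on synthetic interferograms, to show the shape of the input it expects (one row per subfile, object columns holding 1-D arrays, named as read_igms in main.py yields them) and of the output (one row per datetime). The data, sizes and timestamps are made up, and it assumes a pandas version where .mean() on an object column of arrays reduces element-wise, as the function above relies on.

# Synthetic example input for spike_check (made-up data).
import numpy as np
import pandas as pd
from igm_checks import spike_check

rng = np.random.RandomState(0)
n = 200
data_a = [rng.normal(size=64) for _ in range(n)]
data_b = [rng.normal(size=64) for _ in range(n)]
data_a[17][32] += 1000.0  # one obvious spike in record 17

igms = pd.DataFrame({
    'datetime': pd.to_datetime(['2017-01-01 00:00'] * 100 + ['2017-01-01 00:08'] * 100),
    'sceneMirrorPosition': [ord('H')] * 100 + [ord('A')] * 100,
    'subfile': list(range(100)) * 2,
    'DataA': data_a,
    'DataB': data_b,
})

result = spike_check(igms, parameters={})
print(result)
# expected: two rows (one per datetime); spike_check True for the first
# group (which contains record 17) and False for the second
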
diff --git a/main.py b/main.py
index 2e79fe980d0bd054255dcca4de54001f66744d0e..b26ff432bced00902043ce9f2f36bac205c2c32a 100644
--- a/main.py
+++ b/main.py
@@ -5,6 +5,8 @@
 import netCDF4
 from aeri_tools.io.dmv.housekeeping import get_all_housekeeping
 import pandas as pd
 import numpy as np
+from zipfile import ZipFile
+from io import BytesIO
 
 from electronic_checks import electronic_checks
@@ -13,6 +15,9 @@ from radiometric_checks import radiometric_checks
 from scene_checks import scene_checks
 from state_checks import state_checks
 from thermal_checks import thermal_checks
+from bomem_file import read_stream
+from igm_checks import spike_check
+from datetime import datetime
 
 levels = [
     global_checks,
@@ -54,6 +59,26 @@ def read_frame(cxs_file, sum_file):
     hk.index.name = 'datetime'
     return hk.reset_index()
 
+def read_igms(spc_zip_path):
+    if spc_zip_path is not None:
+        # Open zip file
+        with ZipFile(spc_zip_path) as spc_zip:
+            # Find all members with .Igm suffix
+            for name in spc_zip.namelist():
+                if name.endswith('.Igm'):
+                    # Read the data
+                    with spc_zip.open(name) as igm:
+                        inS = BytesIO(igm.read())
+                        for index, subfile in enumerate(read_stream(inS)):
+                            # yield one record per subfile
+                            yield {
+                                'datetime': datetime.utcfromtimestamp(subfile['UTCTime']),
+                                'DataA': subfile['DataA'].squeeze(),
+                                'DataB': subfile['DataB'].squeeze(),
+                                'sceneMirrorPosition': ord(name[0]),
+                                'subfile': index
+                            }
+
 def check_frame(frame, parameters):
     frame['qc_percent'] = 0
     frame['qc_notes'] = None
@@ -62,20 +87,33 @@
         frame = level.compute(frame)
     return frame
 
-def update_all(ftp_dir, parameters=None):
+def update_all(ftp_dir, sci_dir, parameters=None):
+    # check for newer sum or cxs file
     cxs_files = glob(os.path.join(os.path.abspath(ftp_dir), 'AE*', '*B1.CXS'))
     for qc_file, cxs_file, sum_file in files_to_update(cxs_files):
         print('Performing quality control for {}'.format(cxs_file))
+        # Find the Igms for these times
+        YYMMDD = re.search('([0-9]{6})', os.path.basename(cxs_file)).group(1)
+        zip_file = os.path.join(sci_dir, 'AE'+YYMMDD, 'SPC_AERI*.zip')
+        if not os.path.isfile(zip_file):
+            zip_file = None
         frame = read_frame(cxs_file, sum_file)
+        igms = pd.DataFrame.from_records(read_igms(zip_file))
         if parameters is None:
            parameters = {}
-        frame = check_frame(frame, parameters)
-        save_quality(frame, qc_file)
+        frame_with_spikes = frame.merge(spike_check(igms, parameters), on='datetime', how='outer', sort=True)
+        # Only propagate presence of spikes, not absence
+        frame_with_spikes.ix[frame_with_spikes.spike_check == False] = pd.np.nan
+        frame_with_spikes['spike_check'] = frame_with_spikes.spike_check.ffill(limit=1).bfill(limit=1)
+        frame_with_spikes = frame_with_spikes.ix[frame.index]
+        frame_with_spikes = check_frame(frame_with_spikes, parameters)
+        save_quality(frame_with_spikes, qc_file)
 
 def files_to_update(cxs_files, update_only=True):
     for cxs_file in cxs_files:
         possible_sum = os.path.join(os.path.dirname(cxs_file), cxs_file.replace('B1.CXS','.SUM'))
         possible_qc = os.path.join(os.path.dirname(cxs_file), cxs_file.replace('B1.CXS','QC.nc'))
+        cxs_file
         if os.path.isfile(possible_sum):
             sum_file = possible_sum
@@ -91,9 +129,15 @@
 if __name__ == '__main__':
     import argparse
+    import os
     parser = argparse.ArgumentParser()
     parser.add_argument('ftp')
+    if 'nt' in os.name:
+        default_sci = 'C:\\'
+    else:
+        default_sci = '/cygdrive/c/'
+    parser.add_argument('sci', default=default_sci)
     args = parser.parse_args()
-    update_all(args.ftp)
+    update_all(args.ftp, args.sci)
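
One note on the merge logic in update_all: after the outer merge on 'datetime', rows where spike_check is False are blanked to NaN so that only positive detections survive, and the limited ffill/bfill then lets a detected spike mark at most one neighbouring record on each side. A toy illustration of that fill behaviour (synthetic values, plain pandas, nothing instrument-specific):

import numpy as np
import pandas as pd

# spike_check results aligned to housekeeping times after the outer merge:
# NaN = no information (or a False that was blanked), True = spike detected
spike = pd.Series([np.nan, np.nan, True, np.nan, np.nan, np.nan])
print(spike.ffill(limit=1).bfill(limit=1).tolist())
# -> [nan, True, True, True, nan, nan]
# the detection spreads to the records immediately before and after it,
# but no further; distant records stay undecided (NaN)
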