Commit d0fbfee2 authored by Coda Phillips's avatar Coda Phillips

Check for spikes in the Igm

parent f1592c3f
bomem_file.py:

import struct
import numpy as np
from collections import namedtuple, OrderedDict
from datetime import datetime
FileHeader = namedtuple('FileHeader', ['machineCode',
                                       'skip0',
                                       'creationFlags',
                                       'numberOfSubfiles',
                                       'fileCreationDate',
                                       'headerDirectoryOffset',
                                       'subfileDirectoryOffset',
                                       'headerDataOffset',
                                       'firstSubfileDataBlockOffset',
                                       'indexTableOffset',
                                       'headerDirectorySize',
                                       'subfileDirectorySize',
                                       'headerDataSize',
                                       'indexTableSize',
                                       'headerCRC32'])
def decode_flags(flags):
    # Decode the creationFlags bit field into a dict of boolean flags (returned via locals())
    isHeaderDirCompressed = (flags & 128) != 0
    isTTablePresent = (flags & 1) != 0
    isOffsetTablePresent = (flags & 2) != 0
    isSubfileVariableSize = (flags & 64) != 0
    isSubfileMagicNbrPresent = (flags & 8) != 0
    isSubfileSizePresent = (flags & 16) != 0
    isSubfileTValuePresent = (flags & 32) != 0
    isSubfileCRCPresent = (flags & 4) != 0
    return locals()
def data_start(decoded):
    # Size in bytes of the per-subfile preamble implied by the decoded flags
    data_start = 0
    if decoded['isSubfileCRCPresent']:
        data_start += 8
    if decoded['isSubfileMagicNbrPresent']:
        data_start += 4
    if decoded['isSubfileTValuePresent']:
        data_start += 8
    if decoded['isSubfileSizePresent']:
        data_start += 4
    return data_start
def readString(inS):
    # Length-prefixed UTF-16 string
    length = struct.unpack('<i', inS.read(4))[0]
    assert length < 1000
    return inS.read(length*2).decode('utf-16')
def read_data_directory(inS):
    magic = struct.unpack_from('<i', inS.read(4))[0]
    assert magic == 0x30726940  # b'@ir0' in little-endian ASCII
    directory = OrderedDict({})
    for i in range(struct.unpack_from('<i', inS.read(4))[0]):
        if i > 10000:
            break
        name = readString(inS)
        ndims, compression = struct.unpack_from('<hh', inS.read(4))
        axes = OrderedDict({})
        for dim in range(ndims):
            axisName = readString(inS)
            axisUnit = readString(inS)
            axisType, axisNpts, axisMinValue, axisMaxValue = struct.unpack('<hidd', inS.read(2+4+8+8))
            axes[axisName] = (axisUnit, axisType, axisNpts, axisMinValue, axisMaxValue)
        directory[name] = ndims, compression, axes
    return directory
def calc_data_size(subfileDirectory):
    # Total size in bytes of one subfile's data block (uncompressed entries only)
    total = 0
    for entry_name, (ndims, compression, axes) in subfileDirectory.items():
        assert compression == 0
        shape = tuple(axis[2] for axis in axes.values())
        type_number = list(axes.values())[0][1]
        # Bytes per element, keyed by the axis type code
        types_bytes = {1: 1,
                       2: 1,
                       3: 1,
                       4: 2,
                       5: 4,
                       6: 4,
                       7: 4,
                       8: 8,
                       9: 8,
                       10: 16,
                       50: 0}
        total += types_bytes[type_number] * np.prod(shape)
    return total
def readSubfile(index, fileheader, subfileDirectory, inS):
    dataStart = data_start(decode_flags(fileheader.creationFlags))
    subfileDataSize = calc_data_size(subfileDirectory)
    # Subfiles are laid out back to back starting at byte offset 504
    offset = 504 + index * (dataStart + subfileDataSize)
    inS.seek(offset + dataStart)
    data = {}
    for entry_name, (ndims, compression, axes) in subfileDirectory.items():
        assert compression == 0
        shape = tuple(axis[2] for axis in axes.values())
        type_number = list(axes.values())[0][1]
        # numpy dtype for each axis type code
        types = {1: np.ubyte,
                 2: np.bool8,
                 3: np.char,
                 4: np.short,
                 5: np.int32,
                 6: np.long,
                 7: np.float32,
                 8: np.double,
                 9: np.complex64,
                 10: np.complex128,
                 50: str}
        dtype = types[type_number]
        entry_data = np.fromstring(inS.read(int(np.prod(shape)*dtype().nbytes)), dtype=dtype).reshape(shape)
        if shape == (1,):
            entry_data = entry_data.item(0)
        data[entry_name] = entry_data
    return data
def read_stream(inS):
    inS.seek(0)
    # Skip the UTF-16 text fields at the top of the file header
    fmt = '32s2s64s2s64s2s254s2s'
    [e.decode('utf-16') for e in struct.unpack_from(fmt, inS.read(struct.calcsize(fmt)))]
    fmt = '<bbiilllllliiiil'
    fh = FileHeader(*struct.unpack_from(fmt, inS.read(struct.calcsize(fmt))))
    inS.seek(fh.subfileDirectoryOffset)
    headerDirectory = read_data_directory(inS)
    subfileDirectory = read_data_directory(inS)
    all_data = OrderedDict([])
    for index in range(fh.numberOfSubfiles):
        yield readSubfile(index, fh, subfileDirectory, inS)

def read_file(path):
    with open(path, 'rb') as inS:
        yield from read_stream(inS)
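
For orientation, a minimal usage sketch of the reader above: the .Igm path is made up, and the entry names 'UTCTime' and 'DataA' follow the usage in read_igms in the diff further down.

# Sketch only: iterate the subfiles of one interferogram file (the path is illustrative).
for index, subfile in enumerate(read_file('H1701011200.Igm')):
    print(index, sorted(subfile.keys()))               # entry names come from the subfile directory
    print(subfile['UTCTime'], subfile['DataA'].shape)
    break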
igm_checks.py:

import numpy as np
import pandas as pd

def spike_check(igms, parameters):
    if igms.empty:
        return pd.DataFrame({'spike_check': [], 'sceneMirrorPosition': [], 'datetime': []})
    # Per-sample mean and standard deviation across all interferogram records
    data_a_mean = igms.DataA.mean(axis=0)
    data_b_mean = igms.DataB.mean(axis=0)
    data_a_std = np.vstack(igms.DataA.values).std(axis=0)
    data_b_std = np.vstack(igms.DataB.values).std(axis=0)
    # Flag a record if any sample lies more than 10 standard deviations from the mean
    any_spikes_in_data_a = igms.DataA.apply(lambda data_a: (abs((data_a - data_a_mean)/data_a_std) > 10).any())
    any_spikes_in_data_b = igms.DataB.apply(lambda data_b: (abs((data_b - data_b_mean)/data_b_std) > 10).any())
    igms = igms.drop(['DataA', 'DataB'], axis=1)
    igms['spike_check'] = any_spikes_in_data_a | any_spikes_in_data_b
    # Collapse to one row per datetime
    datetime_grouped = igms.groupby('datetime')
    return pd.concat([
        datetime_grouped[['spike_check']].any(),
        datetime_grouped[['sceneMirrorPosition']].first()
    ], axis=1).reset_index()
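
A hedged illustration of how spike_check behaves, using a synthetic frame shaped like the records read_igms yields (the sizes, values, and 200-record count are made up; a lone spike needs enough records before it exceeds the 10-sigma threshold, since the spike itself inflates the per-sample standard deviation). It assumes the pandas/numpy versions this commit targets.

# Sketch only: synthetic interferograms with one obviously spiked record.
import numpy as np
import pandas as pd

n = 200
data_a = [np.random.normal(size=512) for _ in range(n)]
data_a[0][256] += 1000.0                          # inject a spike into a single record
igms = pd.DataFrame({
    'datetime': pd.date_range('2017-01-01', periods=n, freq='S'),
    'DataA': data_a,
    'DataB': [np.random.normal(size=512) for _ in range(n)],
    'sceneMirrorPosition': [ord('H')] * n,
    'subfile': range(n),
})
flags = spike_check(igms, {})
# flags has one row per datetime; spike_check is True only for the spiked record's time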
(diff against the existing quality-control driver script)

@@ -5,6 +5,8 @@ import netCDF4
from aeri_tools.io.dmv.housekeeping import get_all_housekeeping
import pandas as pd
import numpy as np
from zipfile import ZipFile
from io import BytesIO
from electronic_checks import electronic_checks
@@ -13,6 +15,9 @@ from radiometric_checks import radiometric_checks
from scene_checks import scene_checks
from state_checks import state_checks
from thermal_checks import thermal_checks
from bomem_file import read_stream
from igm_checks import spike_check
from datetime import datetime
levels = [
    global_checks,
@@ -54,6 +59,26 @@ def read_frame(cxs_file, sum_file):
    hk.index.name = 'datetime'
    return hk.reset_index()
def read_igms(spc_zip_path):
    if spc_zip_path is not None:
        # Open zip file
        with ZipFile(spc_zip_path) as spc_zip:
            # Find all members with .Igm suffix
            for name in spc_zip.namelist():
                if name.endswith('.Igm'):
                    # Read the data
                    with spc_zip.open(name) as igm:
                        inS = BytesIO(igm.read())
                        for index, subfile in enumerate(read_stream(inS)):
                            # yield one record per subfile
                            yield {
                                'datetime': datetime.utcfromtimestamp(subfile['UTCTime']),
                                'DataA': subfile['DataA'].squeeze(),
                                'DataB': subfile['DataB'].squeeze(),
                                'sceneMirrorPosition': ord(name[0]),
                                'subfile': index
                            }
def check_frame(frame, parameters):
    frame['qc_percent'] = 0
    frame['qc_notes'] = None
@@ -62,20 +87,33 @@ def check_frame(frame, parameters):
        frame = level.compute(frame)
    return frame
def update_all(ftp_dir, parameters=None):
def update_all(ftp_dir, sci_dir, parameters=None):
    # check for newer sum or cxs file
    cxs_files = glob(os.path.join(os.path.abspath(ftp_dir), 'AE*', '*B1.CXS'))
    for qc_file, cxs_file, sum_file in files_to_update(cxs_files):
        print('Performing quality control for {}'.format(cxs_file))
        # Find the Igms for these times
        YYMMDD = re.search('([0-9]{6})', os.path.basename(cxs_file)).group(1)
        zip_file = os.path.join(sci_dir, 'AE'+YYMMDD, 'SPC_AERI*.zip')
        if not os.path.isfile(zip_file):
            zip_file = None
        frame = read_frame(cxs_file, sum_file)
        igms = pd.DataFrame.from_records(read_igms(zip_file))
        if parameters is None:
            parameters = {}
        frame = check_frame(frame, parameters)
        save_quality(frame, qc_file)
        frame_with_spikes = frame.merge(spike_check(igms, parameters), on='datetime', how='outer', sort=True)
        # Only propagate presence of spikes, not absence
        frame_with_spikes.ix[frame_with_spikes.spike_check == False] = pd.np.nan
        frame_with_spikes['spike_check'] = frame_with_spikes.spike_check.ffill(limit=1).bfill(limit=1)
        frame_with_spikes = frame_with_spikes.ix[frame.index]
        frame_with_spikes = check_frame(frame_with_spikes, parameters)
        save_quality(frame_with_spikes, qc_file)
def files_to_update(cxs_files, update_only=True):
    for cxs_file in cxs_files:
        possible_sum = os.path.join(os.path.dirname(cxs_file), cxs_file.replace('B1.CXS', '.SUM'))
        possible_qc = os.path.join(os.path.dirname(cxs_file), cxs_file.replace('B1.CXS', 'QC.nc'))
        cxs_file
        if os.path.isfile(possible_sum):
            sum_file = possible_sum
@@ -91,9 +129,15 @@ def files_to_update(cxs_files, update_only=True):
if __name__ == '__main__':
    import argparse
    import os
    parser = argparse.ArgumentParser()
    parser.add_argument('ftp')
    if 'nt' in os.name:
        default_sci = 'C:\\'
    else:
        default_sci = '/cygdrive/c/'
    # nargs='?' so the platform-specific default actually applies when sci is omitted
    parser.add_argument('sci', nargs='?', default=default_sci)
    args = parser.parse_args()
    update_all(args.ftp)
    update_all(args.ftp, args.sci)
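
Assuming the driver above is saved as run_qc.py (the filename is not given in the commit, and the paths are illustrative), the entry point would be invoked as:

# Hypothetical invocations of the command-line entry point
python run_qc.py /path/to/ftp                 # sci falls back to C:\ or /cygdrive/c/
python run_qc.py /path/to/ftp /data/science   # or pass the science-data directory explicitly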