Skip to content
Snippets Groups Projects
Commit 93b7650e authored by Bruce Flynn's avatar Bruce Flynn
Browse files

Rework eos

parent ecbf80e9
No related branches found
No related tags found
No related merge requests found
...@@ -5,20 +5,43 @@ References: ...@@ -5,20 +5,43 @@ References:
Document Number 151840, May 1997, Appendix C Document Number 151840, May 1997, Appendix C
https://directreadout.sci.gsfc.nasa.gov/documents/satellite_gen/MODIS_UG.pdf https://directreadout.sci.gsfc.nasa.gov/documents/satellite_gen/MODIS_UG.pdf
""" """
from typing import Union
from enum import Enum
from dataclasses import dataclass
packet_types = {
0: "day",
1: "night",
2: "eng1",
4: "eng2",
}
packet_sources = { class PktType(Enum):
0: "solar-diff", Day = 0
1: "srca-cal", Night = 1
2: "bb-cal", Eng1 = 2
3: "space-cal", Eng2 = 3
}
class CalType(Enum):
SD = 0
SRCA = 1
BB = 2
Space = 3
class CalMode(Enum):
Radiometric = 0
Spatial = 1
Spectral = 2
NonCal = 3
@dataclass
class CalSource:
type: CalType
mode: CalMode
frame_count: int
@dataclass
class EarthSource:
frame_count: int
packets_per_scan = { packets_per_scan = {
"day": 3032, "day": 3032,
...@@ -26,41 +49,70 @@ packets_per_scan = { ...@@ -26,41 +49,70 @@ packets_per_scan = {
} }
def sourceid(p): def _caltype(p):
if p.data[0] >> 7 == 0: v = p.data[0] >> 5 & 0x3
return "earthdata" if v > 3:
return packet_sources[p.data[0] >> 5 & 0x3] return None
return CalType(v)
def _calmode(p):
v = p.data[0] >> 3 & 0x3
if v > 3:
return None
return CalMode(v)
def mirrorside(p): def _mirrorside(p):
return ["side1", "side2"][bytes(p.secondary_header)[8] & 0x1] return bytes(p.secondary_header)[8] & 0x1 + 1
def scancount(p): def _scancount(p):
return bytes(p.secondary_header)[8] >> 1 & 0x7 return bytes(p.secondary_header)[8] >> 1 & 0x7
def pkttype(p): def _pkttype(p):
return packet_types[bytes(p.secondary_header)[8] >> 4 & 0x7] v = bytes(p.secondary_header)[8] >> 4 & 0x7
if v > 3:
return None
return PktType(v)
def frame_count(p): def _cal_frame_count(p):
x = int.from_bytes(b"\x00\x00" + p.data[:2], "big") x = int.from_bytes(b"\x00\x00" + p.data[:2], "big")
if sourceid(p) == "earthdata":
return x >> 4 & 0x7FF
return x >> 4 & 0x3F return x >> 4 & 0x3F
# Combination of source ids and packet types to their position within a scan. def _earth_frame_count(p):
_sort_indexes = { x = int.from_bytes(b"\x00\x00" + p.data[:2], "big")
"solar-diff": 1, return x >> 4 & 0x7FF
"srca-cal": 2,
"bb-cal": 3,
"space-cal": 4, @dataclass
"earthdata": 5, class Info:
"eng1": 6, type: str
"eng2": 7, packet_type: PktType
} mirror_side: int
scancount: int
source: Union[EarthSource, CalSource]
def info(p):
tp = _pkttype(p)
source = None
type = "engr"
if tp in (PktType.Day, PktType.Night):
if p.data[0] & 0b10000000 == 0:
type = "earthdata"
source = EarthSource(frame_count=_earth_frame_count(p))
else:
type = "cal"
source = CalSource(
type=_caltype(p),
mode=_calmode(p),
frame_count=_earth_frame_count(p),
)
return Info(type, tp, _mirrorside(p), _scancount(p), source)
def _modis_sort_key(p): def _modis_sort_key(p):
...@@ -69,26 +121,18 @@ def _modis_sort_key(p): ...@@ -69,26 +121,18 @@ def _modis_sort_key(p):
Packets are sorted in the order defined in _sort_indexes. Packets are sorted in the order defined in _sort_indexes.
""" """
type_idx = _sort_indexes.get(pkttype(p), _sort_indexes[sourceid(p)]) frame_sort = 0
return (p.stamp, 64, type_idx, frame_count(p), int(p.is_last())) i = info(p)
if i.type == "earthdata":
type_idx = 5
frame_sort = i.source.frame_count
elif i.type == "cal":
type_idx = i.source.type.value
frame_sort = i.source.frame_count
else:
type_idx = 6 if i.type == PktType.Eng1 else 7
return (p.stamp, 64, type_idx, frame_sort, int(p.is_last()))
def sort_modis(packets): def sort_modis(packets):
return sorted(packets, key=_modis_sort_key) return sorted(packets, key=_modis_sort_key)
def collect_modis_scans(packets):
scan = {"scantime": None, "packets": {}}
scantimes = set()
for p in packets:
if p.apid not in (64, 957):
continue
tp = sourceid(p) if p.apid == 64 else "gbad"
if tp == "solar-diff" and p.stamp not in scantimes:
scantimes.add(p.stamp)
if scan is not None:
yield scan
scan = {"scantime": p.stamp, "packets": {}}
if tp not in scan["packets"]:
scan["packets"][tp] = []
scan["packets"][tp].append(p)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment