Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
E
EdosL0Util
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Deploy
Releases
Model registry
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
SIPS
EdosL0Util
Commits
d7a11622
Commit
d7a11622
authored
9 years ago
by
Greg Quinn
Browse files
Options
Downloads
Patches
Plain Diff
Add crio and crgen for construction record biznis
parent
2bb68486
No related branches found
No related tags found
No related merge requests found
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
edosl0util/crgen.py
+200
-0
200 additions, 0 deletions
edosl0util/crgen.py
edosl0util/crio.py
+186
-0
186 additions, 0 deletions
edosl0util/crio.py
edosl0util/headers.py
+3
-0
3 additions, 0 deletions
edosl0util/headers.py
with
389 additions
and
0 deletions
edosl0util/crgen.py
0 → 100644
+
200
−
0
View file @
d7a11622
"""
EDOS PDS construction record generation for SUOMI NPP
"""
from
datetime
import
datetime
import
itertools
import
os
from
edosl0util.headers
import
DaySegmentedTimecode
from
edosl0util.stream
import
jpss_packet_stream
def test_build_apid_info():
    """Check build_cr output against the real CR shipped with the same PDS.

    Builds a construction record by scanning a PDS data file, reads the
    matching *00.PDS CR file, fakes the un-scannable fields on the ingested
    side, and asserts the two records agree.
    """
    # FIXME: build CR comparison into the CLI
    # BUG FIX: the original called read_pds_cr(), a name that exists nowhere in
    # this module; crio.read() is the CR parser introduced alongside this file.
    from edosl0util import crio
    calculated = build_cr('P1571289CRISSCIENCEAAT15320210920101.PDS')
    ingested = crio.read('P1571289CRISSCIENCEAAT15320210920100.PDS')
    insert_fake_cr_info(ingested)
    del calculated['completion_time']  # it seems CR completion time does not match PDS
    del ingested['completion_time']  # creation time from the file name
    assert calculated == ingested
def build_cr(pds_file, prev_pds_file=None):
    """Best-effort CR generation by scanning a PDS data file

    Previous PDS data file may also be given to make gap detection more complete.

    Returns a dict shaped like the output of crio.read(), with fields that
    cannot be derived from a packet scan filled with placeholder values.
    """
    def file_info(pds_id, apid_info):
        # The CR's file list: first the CR file itself (listed with no APIDs),
        # then the data file with its per-APID first/last packet times.
        cr_entry = {'file_name': pds_id + '.PDS', 'apid_count': 0, 'apid_info': []}
        per_apid = []
        for entry in apid_info:
            per_apid.append({'scid': npp_scid,
                             'apid': entry['apid'],
                             'first_packet_time': entry['first_packet_time'],
                             'last_packet_time': entry['last_packet_time']})
        data_entry = {'file_name': pds_file,
                      'apid_count': len(apid_info),
                      'apid_info': per_apid}
        return [cr_entry, data_entry]

    def aggregated_values(apid_info):
        # Whole-PDS totals are just the per-APID values summed.
        totals = {}
        for key in ('total_packets', 'total_bytes', 'gap_count', 'fill_bytes',
                    'mismatched_length_packets', 'rs_corrected_packets'):
            totals[key] = sum(entry[key] for entry in apid_info)
        return totals

    scan = scan_packets(pds_file, prev_pds_file)
    rv = {'pds_id': pds_id_from_path(pds_file),
          'completion_time': get_pds_creation_time(pds_file),
          'first_packet_time': scan['first_packet_time'],
          'last_packet_time': scan['last_packet_time'],
          'apid_info': build_apid_info(scan['apid_info'])}
    rv['apid_count'] = len(rv['apid_info'])
    rv['file_info'] = file_info(rv['pds_id'], rv['apid_info'])
    rv['file_count'] = len(rv['file_info'])
    rv.update(aggregated_values(rv['apid_info']))
    insert_fake_cr_info(rv)
    return rv
def insert_fake_cr_info(cr):
    """Populate a CR with phony values for fields that can't be discovered via a packet scan

    Mutates ``cr`` in place, then does the same for each of its APID entries.
    """
    cr['edos_sw_ver_major'] = 0
    cr['edos_sw_ver_minor'] = 0
    cr['cr_type'] = 1
    cr['test_flag'] = 0
    # A single all-zero spacecraft-contact-session record.
    cr['scs_count'] = 1
    cr['scs_info'] = {'start': missing_time_value, 'stop': missing_time_value}
    cr['first_packet_esh_time'] = missing_time_value
    cr['last_packet_esh_time'] = missing_time_value
    cr['fill_bytes'] = 0
    cr['mismatched_length_packets'] = 0
    cr['rs_corrected_packets'] = 0
    insert_fake_apid_info(cr['apid_info'])
def pds_id_from_path(pds_file):
    """Pull 36-char PDS ID from a file name; that's the CR file name minus the .PDS

    Raises ValueError if the basename is not exactly 40 characters long.
    The last two digits before the extension (the file-number field) are
    replaced with '00', the number of the construction record file.
    """
    expected_length = 40
    file_name = os.path.basename(pds_file)
    if len(file_name) != expected_length:
        message = 'PDS file name {} not of expected length {}'.format(
            file_name, expected_length)
        raise ValueError(message)
    return '{}00'.format(file_name[:34])
def get_pds_creation_time(pds_file_or_id):
    """Parse 11-char creation time out of a PDS ID or file name; return a DaySegmentedTimecode"""
    # Characters 22-32 hold the creation stamp as YYJJJHHMMSS (2-digit year,
    # day of year, time of day).
    stamp_text = pds_file_or_id[22:33]
    stamp = datetime.strptime(stamp_text, '%y%j%H%M%S')
    return datetime_to_ccsds(stamp)
def build_apid_info(scan_apid_info):
    """Build up apid_info resulting from scan_packets into a full apid_info for a CR

    Mutates and returns ``scan_apid_info``: adds spacecraft/VCID fields and the
    gap count, removes scan-only bookkeeping, and fills in phony values for
    fields a packet scan cannot provide.
    """
    for entry in scan_apid_info:
        entry['scid'] = npp_scid
        # Each NPP APID lives on exactly one virtual channel.
        entry['vcid_count'] = 1
        entry['vcid_info'] = [{'scid': npp_scid,
                               'vcid': npp_apid_to_vcid_map[entry['apid']]}]
        entry['gap_count'] = len(entry['gap_info'])
        del entry['last_packet_ssc']  # field was needed for bookkeeping but isn't in the CR
    # BUG FIX: original referenced an undefined name 'apid_info' in the two
    # lines below (NameError on every call); the parameter is scan_apid_info.
    insert_fake_apid_info(scan_apid_info)
    return scan_apid_info
def insert_fake_apid_info(apid_info):
    """Fill CR apid_info with phony values for fields that can't be found via a packet scan

    Mutates each entry (and each entry's gap_info records) in place.
    """
    for entry in apid_info:
        # Fresh list objects per entry so entries never share mutable state.
        entry['fill_packets'] = 0
        entry['fill_packet_info'] = []
        entry['fill_bytes'] = 0
        entry['mismatched_length_packets'] = 0
        entry['mismatched_length_packet_ssc_list'] = []
        entry['first_packet_esh_time'] = missing_time_value
        entry['last_packet_esh_time'] = missing_time_value
        entry['rs_corrected_packets'] = 0
        for gap in entry['gap_info']:
            gap['pre_gap_packet_esh_time'] = missing_time_value
            gap['post_gap_packet_esh_time'] = missing_time_value
# Timecode used wherever a real value cannot be derived from a packet scan
# (all-zero DaySegmentedTimecode, i.e. the 1958-01-01 epoch).
missing_time_value = DaySegmentedTimecode(0, 0, 0)

# Spacecraft ID for Suomi NPP as written into CR scid fields.
npp_scid = 157  # FIXME: i guess this should come from the file name

# Map of virtual channel ID -> list of APIDs carried on that channel.
# ripped from MDFCB, 2014-06-05 revision
# modified to place CrIS FOV 6 in VCID 7 after testing against some real data
npp_vcid_to_apids_map = {
    0: list(range(0, 15)) + list(range(16, 50)) +
       [65, 70, 100, 146, 155, 512, 513, 518, 543, 544, 545, 550,
        768, 769, 773] + list(range(1280, 1289)),
    1: [50, 101, 515, 528, 530, 531],
    2: [102],
    3: [103, 514, 536],
    4: [104],
    5: [105],
    # NOTE(review): list(set(...)) ordering is implementation-dependent; only
    # membership matters for the inverse lookup built below.
    6: [106, 1289, 1290] + list(set(range(1315, 1396)) -
                                set(range(1318, 1391, 9)) -
                                set(range(1320, 1393, 9))),
    7: [107] + list(range(1318, 1391, 9)) + list(range(1320, 1393, 9)),
    8: [108, 1294, 1295, 1296, 1398],
    9: [109],
    10: [110],
    11: [111, 560, 561, 564, 565, 576],
    12: [112, 562, 563, 566],
    13: [113, 546, 577, 578, 579, 580, 581, 582],
    14: [114],
    15: [115],
    16: [116] + list(range(800, 806)) + list(range(807, 824)) + [825, 826],
    17: [117, 806],
    18: [118, 770] + list(range(830, 854)) + [855, 856],
    19: [119],
    20: [120],
    21: [121, 517, 524, 549, 556, 780, 1291, 1292, 1293, 1397],
    22: [122],
    24: [147, 148, 149, 150]}

# Inverse lookup: APID -> the single VCID that carries it.
npp_apid_to_vcid_map = {apid: vcid
                        for vcid, apids in npp_vcid_to_apids_map.items()
                        for apid in apids}
def scan_packets(pds_file, prev_pds_file=None):
    """Scan a PDS data file for information needed to produce a construction record

    Returns a dict with 'first_packet_time', 'last_packet_time', and
    'apid_info' (one bookkeeping entry per APID, sorted by APID). If
    prev_pds_file is given, it is scanned first so that a gap spanning the
    file boundary can be detected for each APID.
    """
    def main():
        prev_apid_map = build_prev_apid_map(prev_pds_file)
        apid_map = {}  # APID -> in-progress entry dict
        stream = jpss_packet_stream(open(pds_file, 'rb'))
        # Pull the first packet eagerly so its stamp is available for the
        # summary, then re-chain it so the loop still sees every packet.
        # NOTE(review): stream.next() is Python-2-style iteration -- confirm
        # the stream type provides it.
        first_pkt = stream.next()
        for pkt in itertools.chain([first_pkt], stream):
            entry = apid_map.get(pkt.apid)
            if not entry:
                # First packet seen for this APID; seed gap detection from the
                # previous PDS file's entry when one exists.
                entry_from_prev_pds = prev_apid_map.get(pkt.apid)
                apid_map[pkt.apid] = init_entry(pkt, entry_from_prev_pds)
            else:
                update_entry(entry, pkt)
            last_pkt = pkt
        return {'first_packet_time': datetime_to_ccsds(first_pkt.stamp),
                'last_packet_time': datetime_to_ccsds(last_pkt.stamp),
                'apid_info': [apid_map[k] for k in sorted(apid_map)]}

    def build_prev_apid_map(prev_pds_file):
        # Recursively scan the previous file (without its own predecessor) and
        # index its apid_info by APID; empty map when no previous file given.
        if prev_pds_file:
            return {entry['apid']: entry
                    for entry in scan_packets(prev_pds_file)['apid_info']}
        else:
            return {}

    def init_entry(pkt, entry_from_prev_pds):
        # New per-APID entry seeded from its first packet. last_packet_ssc is
        # scan-only bookkeeping, removed later by build_apid_info.
        rv = {'apid': pkt.apid,
              'first_packet_time': datetime_to_ccsds(pkt.stamp),
              'first_packet_offset': pkt.offset,
              'last_packet_time': datetime_to_ccsds(pkt.stamp),
              'last_packet_ssc': pkt.seqid,
              'total_packets': 1,
              'total_bytes': pkt.size,
              'gap_info': []}
        if entry_from_prev_pds:
            # Detect a gap spanning the boundary from the previous PDS file.
            update_gap_info(rv['gap_info'],
                            entry_from_prev_pds['last_packet_ssc'],
                            entry_from_prev_pds['last_packet_time'],
                            pkt)
        return rv

    def update_entry(entry, new_pkt):
        # Capture the previous SSC/time BEFORE overwriting them; they are the
        # reference point for gap detection below.
        prev_last_ssc = entry['last_packet_ssc']
        prev_last_time = entry['last_packet_time']
        entry['last_packet_time'] = datetime_to_ccsds(new_pkt.stamp)
        entry['last_packet_ssc'] = new_pkt.seqid
        entry['total_packets'] += 1
        entry['total_bytes'] += new_pkt.size
        update_gap_info(entry['gap_info'], prev_last_ssc, prev_last_time, new_pkt)

    def update_gap_info(gap_info, last_ssc, last_pkt_time, new_pkt):
        # Append a gap record if the new packet's sequence count is not the
        # successor of the last one. Modular arithmetic handles the 14-bit
        # CCSDS sequence counter wrapping at 16383 -> 0.
        ssc_limit = 16384  # one more than highest SSC
        expected_new_ssc = (last_ssc + 1) % ssc_limit
        if new_pkt.seqid != expected_new_ssc:
            gap_entry = {'first_missing_ssc': expected_new_ssc,
                         'missing_packet_count':
                             (new_pkt.seqid - expected_new_ssc) % ssc_limit,
                         'pre_gap_packet_time': last_pkt_time,
                         'post_gap_packet_time': datetime_to_ccsds(new_pkt.stamp),
                         'post_gap_packet_offset': new_pkt.offset}
            gap_info.append(gap_entry)

    return main()
def datetime_to_ccsds(dt):
    """Convert a packet stamp to DaySegmentedTimecode

    Handles input of None by returning epoch value of 1958-01-01.
    """
    if dt is None:
        return DaySegmentedTimecode()
    # Days since the CCSDS day-segmented epoch.
    days = (dt - datetime(1958, 1, 1)).days
    # Microseconds elapsed since midnight of dt's own day, split into the
    # timecode's millisecond and residual-microsecond fields.
    midnight = datetime(dt.year, dt.month, dt.day)
    micros = int((dt - midnight).total_seconds() * 1e6)
    millis, leftover_micros = divmod(micros, 1000)
    return DaySegmentedTimecode(days, millis, leftover_micros)
This diff is collapsed.
Click to expand it.
edosl0util/crio.py
0 → 100644
+
186
−
0
View file @
d7a11622
"""
PDS construction record input and output
"""
import
ctypes
as
c
import
warnings
from
edosl0util.headers
import
BaseStruct
,
DaySegmentedTimecode
def read(cr_file):
    """Parse a PDS construction record from a file (*00.PDS)

    Returns a nested dict mirroring the CR layout: top-level fields plus
    'scs_info', 'apid_info' (each with 'vcid_info', 'gap_info',
    'fill_packet_info', 'mismatched_length_packet_ssc_list'), and 'file_info'.
    Spare fields are dropped. Warns if trailing bytes remain unparsed.
    """
    def main():
        rv = {}
        with open(cr_file, 'rb') as f:
            # Fixed header, then the variable-length SCS list whose size is
            # announced by the preceding struct.
            read_into_dict(f, Main1Struct, rv)
            rv['scs_info'] = [read_struct(f, ScsStruct)
                              for i in range(rv['scs_count'])]
            read_into_dict(f, Main2Struct, rv)
            rv['apid_info'] = []
            for i in range(rv['apid_count']):
                # Each APID section alternates fixed structs with counted
                # sub-lists; every count comes from the struct just read.
                d = {}
                read_into_dict(f, Apid1Struct, d)
                d['vcid_info'] = [read_struct(f, ApidVcidStruct)
                                  for j in range(d['vcid_count'])]
                read_into_dict(f, Apid2Struct, d)
                d['gap_info'] = [read_struct(f, ApidGapStruct)
                                 for j in range(d['gap_count'])]
                read_into_dict(f, Apid3Struct, d)
                d['fill_packet_info'] = [read_struct(f, ApidFillStruct)
                                         for j in range(d['fill_packets'])]
                read_into_dict(f, Apid4Struct, d)
                d['mismatched_length_packet_ssc_list'] = [
                    read_struct(f, ApidMismatchedLengthStruct)['packet_ssc']
                    for i in range(d['mismatched_length_packets'])]
                read_into_dict(f, Apid5Struct, d)
                rv['apid_info'].append(d)
            read_into_dict(f, Main3Struct, rv)
            rv['file_info'] = []
            for i in range(rv['file_count']):
                d = {}
                read_into_dict(f, FileStruct, d)
                d['apid_info'] = [read_struct(f, FileApidStruct)
                                  for j in range(d['apid_count'])]
                if d['apid_count'] == 0:
                    # bogus all-zero apid struct is present for the CR file
                    read_struct(f, FileApidStruct)
                rv['file_info'].append(d)
            # BUG FIX: original did `if f.read(): warn(... len(extra))` where
            # 'extra' was never assigned, raising NameError whenever trailing
            # bytes actually existed; capture the remainder first.
            extra = f.read()
            if extra:
                warnings.warn('{} bytes remain after reading CR'.format(len(extra)))
        return rv

    def read_into_dict(f, struct, data):
        # Read one struct from f and merge its fields into data.
        data.update(read_struct(f, struct))

    def read_struct(f, struct):
        # Read sizeof(struct) bytes and convert to a dict.
        rv = struct_to_dict(struct.from_buffer_copy(f.read(c.sizeof(struct))))
        rv = {k: v for k, v in rv.items()
              if not k.startswith('spare_')}  # no spare fields
        # NOTE(review): `long` is Python-2-only (this module predates Py3);
        # normalizes ctypes longs so values compare equal to plain ints.
        return {k: int(v) if isinstance(v, long) else v
                for k, v in rv.items()}  # no longs

    return main()
def write(cr, out_file):
    """Write out a PDS construction record file

    Serializes a CR dict of the shape produced by read() / crgen.build_cr,
    emitting structs and counted sub-lists in the same order read() parses
    them. Keys not present in a struct's fields are ignored; missing fields
    default to zero.
    """
    def main():
        with open(out_file, 'wb') as f:
            write_struct(cr, Main1Struct, f)
            for d in cr['scs_info']:
                write_struct(d, ScsStruct, f)
            write_struct(cr, Main2Struct, f)
            for d in cr['apid_info']:
                write_struct(d, Apid1Struct, f)
                for dd in d['vcid_info']:
                    write_struct(dd, ApidVcidStruct, f)
                write_struct(d, Apid2Struct, f)
                for dd in d['gap_info']:
                    write_struct(dd, ApidGapStruct, f)
                write_struct(d, Apid3Struct, f)
                for dd in d['fill_packet_info']:
                    write_struct(dd, ApidFillStruct, f)
                write_struct(d, Apid4Struct, f)
                for ssc in d['mismatched_length_packet_ssc_list']:
                    # BUG FIX: original passed {'ssc': ssc}, but the struct's
                    # field is named 'packet_ssc'; write_struct drops unknown
                    # keys, so zeros were silently written instead of the SSCs.
                    write_struct({'packet_ssc': ssc}, ApidMismatchedLengthStruct, f)
                write_struct(d, Apid5Struct, f)
            write_struct(cr, Main3Struct, f)
            for d in cr['file_info']:
                write_struct(d, FileStruct, f)
                for dd in d['apid_info']:
                    write_struct(dd, FileApidStruct, f)
                if d['apid_count'] == 0:
                    write_struct({}, FileApidStruct, f)  # one all-zero apid struct if no others

    def write_struct(data, struct, out):
        # Keep only the keys that are actual struct fields; ctypes zero-fills
        # anything not supplied (including the spare_* fields).
        fields = [f[0] for f in struct._fields_]
        struct_data = {k: v for k, v in data.items() if k in fields}
        out.write(memoryview(struct(**struct_data)))

    main()
def struct_to_dict(s):
    """Convert a ctypes struct instance into a {field_name: value} dict."""
    result = {}
    for field in s._fields_:
        name = field[0]
        result[name] = getattr(s, name)
    return result
class Main1Struct(BaseStruct):
    # First fixed section of the CR, up through the count of SCS records
    # (read() parses scs_count ScsStructs immediately after this struct).
    _fields_ = [('edos_sw_ver_major', c.c_uint8),
                ('edos_sw_ver_minor', c.c_uint8),
                ('cr_type', c.c_uint8),
                ('spare_1', c.c_uint8 * 1),
                ('pds_id', c.c_char * 36),  # 36-char PDS ID (CR file name minus .PDS)
                ('test_flag', c.c_uint8),
                ('spare_2', c.c_uint8 * 9),
                ('scs_count', c.c_uint16)]  # number of ScsStruct records that follow
class ScsStruct(BaseStruct):
    # One spacecraft contact session: start/stop timecodes.
    _fields_ = [('start', DaySegmentedTimecode),
                ('stop', DaySegmentedTimecode)]
class Main2Struct(BaseStruct):
    # Whole-PDS totals and times, ending with the count of APID sections
    # (read() parses apid_count APID sections after this struct).
    _fields_ = [('fill_bytes', c.c_uint64),
                ('mismatched_length_packets', c.c_uint32),
                ('first_packet_time', DaySegmentedTimecode),
                ('last_packet_time', DaySegmentedTimecode),
                ('first_packet_esh_time', DaySegmentedTimecode),
                ('last_packet_esh_time', DaySegmentedTimecode),
                ('rs_corrected_packets', c.c_uint32),
                ('total_packets', c.c_uint32),
                ('total_bytes', c.c_uint64),
                ('gap_count', c.c_uint32),
                ('completion_time', DaySegmentedTimecode),
                ('spare_3', c.c_uint8 * 7),
                ('apid_count', c.c_uint8)]  # number of APID sections that follow
class Apid1Struct(BaseStruct):
    # Start of one per-APID section; vcid_count sizes the ApidVcidStruct list
    # that follows.
    _fields_ = [('spare_1', c.c_uint8 * 1),
                ('scid', c.c_uint8),
                ('apid', c.c_uint16),
                ('first_packet_offset', c.c_uint64),  # byte offset in the data file
                ('spare_2', c.c_uint8 * 3),
                ('vcid_count', c.c_uint8)]
class ApidVcidStruct(BaseStruct):
    # One virtual channel carrying this APID; scid/vcid are bit-fields packed
    # into a 16-bit word after a 2-bit spare.
    _fields_ = [('spare_1', c.c_uint16),
                ('spare_2', c.c_uint16, 2),
                ('scid', c.c_uint16, 8),
                ('vcid', c.c_uint16, 6)]
class Apid2Struct(BaseStruct):
    # Count of ApidGapStruct records that follow for this APID.
    _fields_ = [('gap_count', c.c_uint32)]
class ApidGapStruct(BaseStruct):
    # One sequence-count gap within an APID's packet stream.
    _fields_ = [('first_missing_ssc', c.c_uint32),
                ('post_gap_packet_offset', c.c_uint64),
                ('missing_packet_count', c.c_uint32),
                ('pre_gap_packet_time', DaySegmentedTimecode),
                ('post_gap_packet_time', DaySegmentedTimecode),
                ('pre_gap_packet_esh_time', DaySegmentedTimecode),
                ('post_gap_packet_esh_time', DaySegmentedTimecode)]
class Apid3Struct(BaseStruct):
    # Count of ApidFillStruct records that follow for this APID.
    _fields_ = [('fill_packets', c.c_uint32)]
class ApidFillStruct(BaseStruct):
    # One fill-data occurrence within an APID's packet stream.
    _fields_ = [('packet_ssc', c.c_uint32),
                ('packet_offset', c.c_uint64),
                ('first_fill_byte', c.c_uint32)]
class Apid4Struct(BaseStruct):
    # Per-APID fill totals; mismatched_length_packets sizes the
    # ApidMismatchedLengthStruct list that follows.
    _fields_ = [('fill_bytes', c.c_uint64),
                ('mismatched_length_packets', c.c_uint32)]
class ApidMismatchedLengthStruct(BaseStruct):
    # SSC of one packet whose length field disagreed with its actual length.
    _fields_ = [('packet_ssc', c.c_uint32)]
class Apid5Struct(BaseStruct):
    # Trailing per-APID summary: first/last times and totals.
    _fields_ = [('first_packet_time', DaySegmentedTimecode),
                ('last_packet_time', DaySegmentedTimecode),
                ('first_packet_esh_time', DaySegmentedTimecode),
                ('last_packet_esh_time', DaySegmentedTimecode),
                ('rs_corrected_packets', c.c_uint32),
                ('total_packets', c.c_uint32),
                ('total_bytes', c.c_uint64),
                ('spare_3', c.c_uint64)]
class Main3Struct(BaseStruct):
    # Count of FileStruct sections that follow the APID sections.
    _fields_ = [('spare_4', c.c_uint8 * 3),
                ('file_count', c.c_uint8)]
class FileStruct(BaseStruct):
    # One file in the PDS file list; apid_count sizes the FileApidStruct list
    # that follows (a single all-zero FileApidStruct is present when zero).
    _fields_ = [('file_name', c.c_char * 40),
                ('spare_1', c.c_uint8 * 3),
                ('apid_count', c.c_uint8)]
class FileApidStruct(BaseStruct):
    # Per-file, per-APID first/last packet times.
    _fields_ = [('spare_1', c.c_uint8 * 1),
                ('scid', c.c_uint8),
                ('apid', c.c_uint16),
                ('first_packet_time', DaySegmentedTimecode),
                ('last_packet_time', DaySegmentedTimecode),
                ('spare_2', c.c_uint8 * 4)]
This diff is collapsed.
Click to expand it.
edosl0util/headers.py
+
3
−
0
View file @
d7a11622
...
...
@@ -37,6 +37,9 @@ class BaseStruct(c.BigEndianStructure):
fields
=
'
,
'
.
join
(
'
%s=%s
'
%
(
f
[
0
],
repr
(
getattr
(
self
,
f
[
0
])))
for
f
in
self
.
_fields_
)
return
'
<%s (%s)>
'
%
(
self
.
__class__
.
__name__
,
fields
)
    def __eq__(self, other):
        # Structs are equal when every declared field compares equal.
        # NOTE(review): assumes `other` exposes the same field names; comparing
        # against an unrelated object raises AttributeError rather than
        # returning NotImplemented -- confirm that is intended.
        return all(getattr(self, f[0]) == getattr(other, f[0]) for f in self._fields_)
class
PrimaryHeader
(
BaseStruct
):
"""
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment