Skip to content
Snippets Groups Projects
Commit a9bfafc8 authored by David Hoese's avatar David Hoese
Browse files

Merge branch 'test-grb-sender' into 'master'

Add test container for sending GRB data

See merge request cspp_geo/cspp-geo-web-viewer!19
parents 77989f25 06d8fd6c
Branches
No related tags found
No related merge requests found
FROM python:3-alpine
# Gitlab repository isn't public so unauthenticated download isn't possible
#RUN apk add curl && \
# curl -LO https://gitlab.ssec.wisc.edu/davidh/cspp-geo-grb-test-data/raw/feature-tcp-sender/cadu_sender.py && \
# rm -rf /var/cache/apk/*
COPY cadu_sender.py .
CMD ["python", "cadu_sender.py", "--tcp", "-d", "/data"]
# CSPP Geo GRB Sender
This container can mimic a GRB connection (UDP or TCP) that can be used
by the cspp_geo_grb container or any other CSPP Geo GRB installation.
## Build
```bash
docker build -t gitlab.ssec.wisc.edu:5555/cspp_geo/cspp-geo-web-viewer/cspp_geo_grb_sender:latest tests/cspp_geo_grb_sender/
```
## Usage
To run this container along with the GRB processing container, they must be
on the same network. If one does not already exist then it should be created:
```bash
docker network create grb-network
```
Then the sender container can be started:
```bash
docker run -d --rm --network grb-network --name cspp-geo-grb-sender -v /path/to/cadus:/data gitlab.ssec.wisc.edu:5555/cspp_geo/cspp-geo-web-viewer/cspp_geo_grb_sender:latest
```
And the GRB container can then be run:
```bash
docker run -d --rm --network grb-network --name cspp-geo-grb -v /path/to/work:/data gitlab.ssec.wisc.edu:5555/cspp_geo/cspp-geo-web-viewer/cspp_geo_grb:latest ./grb-wrapper.sh cspp-geo-grb-sender
```
Output netCDF files should then appear in `/path/to/work/product`.
## Cleanup
The sender container will exit when it runs out of data. Use `docker ps` to
see this. The GRB container will stick around forever so you'll need to kill
it manually:
```bash
docker kill cspp-geo-grb
```
Note that if the GRB container doesn't consume all the data from the
sender, the sender will also never terminate and should be killed manually
as well.
Lastly, when done testing you can remove the network you created earlier:
```bash
docker network rm grb-network
```
## Customize Sender
This "cadu_sender.py" script is a copy of the one available from the
[cspp-geo-grb-test-data]() repository.
By default this container will run with all the defaults of that script except
TCP is enabled (`--tcp`) and the directory looked at for CADU files is the
entire mounted `/data` directory. To run the script with custom command line
flags run:
```bash
docker run -d --rm --network grb-network --name cspp-geo-grb-sender -v /path/to/cadus:/data gitlab.ssec.wisc.edu:5555/cspp_geo/cspp-geo-web-viewer/cspp_geo_grb_sender:latest python cadu_sender.py --tcp --rate 375000 -d /data
```
#!/usr/bin/env python
"""
This script feeds raw GRB CADUs to a receiving ingestor.
The CADUs are assumed to be grouped in files with naming
convention CADU_5_010XXXX
The _5_ refers to Virtual Channel ID 5. (will be 5 or 6)
The _NNNXXXX refers to the Virtual Channel Frame Count.
A given file contains frames 0xNNN0000 to 0xNNNffff.
"""
from __future__ import division
import sys
import multiprocessing
from datetime import datetime, timedelta
from time import sleep
import fnmatch
import socket
import os
DEFAULT_UDP_IP = '127.0.0.1'
DEFAULT_UDP_LEFT_PORT = 5530 # port for left-hand polarization data
DEFAULT_UDP_RIGHT_PORT = 5531 # port for right-hand polarization data
DEFAULT_TCP_IP = ''
DEFAULT_TCP_LEFT_PORT = 50010
DEFAULT_TCP_RIGHT_PORT = 50020
# Change this if you want to test different data rates
# Three common scenarios are given here, approx full rate, half rate, and quarter rate
DEFAULT_DATA_RATE = 1875000 # ~15 Mb in bytes
HALF_DATA_RATE = 875000 # ~7.5 Mb in bytes
QUARTER_DATA_RATE = 375000 # ~3.75 Mb in bytes
# Machine's socket buffer sizes. Set to the suggested default.
# FIXME: improvement would be to pull them via https://pypi.python.org/pypi/sysctl/0.1b2
RMEM_MAX = 16777216 # bytes
# If you don't know what these refer to, please review the relevant
# sections in the PUG, at http://www.goes-r.gov/users/docs/PUG-GRB-vol4-verC.pdf
CADU_SIZE = 2048
SYNC_SIZE = 4
TFPH_SIZE = 6
FECF_SIZE = 2
MPDU_HDR_SIZE = 2
# sync marker begins every CADU
SYNC = "\x1A\xCF\xFC\x1D"
# frame error control field, 2 bytes of junk (not used) at end of each CADU
FECF = "\x00\x00"
# seems to be the fill val, based on a hex dump only
FILL = "\x0F"
C_BLUE = '\033[94m'
C_END = '\033[0m'
def _update_progress_bar(part, whole, length=30, last=-1):
percent_sent = part / whole
progress_len = int(percent_sent * length)
if progress_len == last:
return progress_len
pstr = "[" + C_BLUE + ("=" * progress_len) + (" " * (length - progress_len)) + C_END + "]"
sys.stdout.write(pstr)
sys.stdout.flush()
sys.stdout.write('\r')
return progress_len
def _cadupop(cadustr):
i = 0
while i < len(cadustr):
yield cadustr[i:i+CADU_SIZE]
i += CADU_SIZE
def cadu_sender(vcid, cadu_files, ip_addr, connect_port, data_rate, tcp=False):
if tcp:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # TCP
sock.bind((ip_addr, connect_port))
sock.listen(1)
send_socket, address = sock.accept()
def send_data(data, send_socket=send_socket, address=address):
send_socket.send(data)
else:
send_sock = socket.socket(socket.AF_INET, # Internet
socket.SOCK_DGRAM) # UDP
def send_data(data, send_sock=send_sock):
send_sock.sendto(data, (ip_addr, connect_port))
sent = 0
cadus = 0
totalbytes = 0
sent_since_sleep = 0
max_ss_sleep = 0
overflow_count = 0
tstart = datetime.now()
print("VCID %s start at: %s" % (vcid, tstart))
# loop over all files in directory
for nfile, cadu_filename in enumerate(sorted(cadu_files)):
sent_this_file = 0
print("Sending to %s:%d (%d of %d): %s" % (ip_addr, connect_port, nfile + 1, len(cadu_files), cadu_filename))
cadu_file_size = os.path.getsize(cadu_filename)
with open(cadu_filename, 'rb') as cadu_file:
all_cadus = cadu_file.read()
progress = 0
for cadu in _cadupop(all_cadus):
send_data(cadu)
sent = CADU_SIZE
sent_this_file += sent
totalbytes += CADU_SIZE
cadus += 1
# keep track of how many bytes we've sent since the last time we slept
# this is a hint that we might be overflowing the machine buffer after falling behind
sent_since_sleep += sent
max_ss_sleep = max(sent_since_sleep, max_ss_sleep)
if sent_since_sleep > RMEM_MAX:
overflow_count +=1
# adjust data transfer rate
tnow = datetime.now()
texpect = tstart + timedelta(seconds = (totalbytes / data_rate))
tdiff = (texpect - tnow).total_seconds()
if tdiff > 0:
sleep(tdiff)
sent_since_sleep = 0
# progress = _update_progress_bar(sent_this_file, cadu_file_size, length=30, last=progress)
print("")
tend = datetime.now()
tdiff = tend - tstart
print("VCID %s done at: %s" % (vcid, tend))
print("Effective bytes per second:", totalbytes / tdiff.total_seconds())
def _setup_argparse():
import argparse
parser = argparse.ArgumentParser(description="This script sends raw GRB CADUs to a receiving ingestor.")
# Required arguments:
parser.add_argument('cadu_files', nargs='+', help='any number of GRB CADU files')
# Optional arguments:
parser.add_argument('-d', '--directory', action="store_true", default=False,
help="treat all inputs as directories containing CADU files")
parser.add_argument('-c', '--channels', action="store", default='b', choices=['l','r','b'],
help='which channels to send - (l)eft, (r)ight, or (b)oth')
parser.add_argument('-r', '--rate', action="store", type=int, default=DEFAULT_DATA_RATE,
help="the data rate per channel in bytes per second")
parser.add_argument('--tcp', action="store_true",
help="Generate a TCP stream instead of UDP.")
parser.add_argument('--ip', action="store",
help='which IP to send to')
parser.add_argument('--left-port', action="store", type=int,
help='which port to send the left channel to')
parser.add_argument('--right-port', action="store", type=int,
help='which port to send the right channel to')
return parser
if __name__ == '__main__':
# Get input args
parser = _setup_argparse()
args = parser.parse_args()
if args.directory:
files = []
for d in args.cadu_files:
dirlist = os.listdir(d)
files = files + [os.path.join(d, f) for f in dirlist]
else:
files = args.cadu_files
if args.left_port is None:
args.left_port = DEFAULT_TCP_LEFT_PORT if args.tcp else DEFAULT_UDP_LEFT_PORT
if args.right_port is None:
args.right_port = DEFAULT_TCP_RIGHT_PORT if args.tcp else DEFAULT_UDP_RIGHT_PORT
if args.ip is None:
args.ip = DEFAULT_TCP_IP if args.tcp else DEFAULT_UDP_IP
kwargs = {}
kwargs['tcp'] = args.tcp
jobs = []
if args.channels in ['b', 'r']: # VCID 5 == right
vcid_5_files = fnmatch.filter(files, '*CADU_5*')
jobs.append(multiprocessing.Process(name="VCID 5 Sender", target=cadu_sender, args=(5, vcid_5_files, args.ip, args.right_port, args.rate), kwargs=kwargs))
if args.channels in ['b', 'l']:
vcid_6_files = fnmatch.filter(files, '*CADU_6*')
jobs.append(multiprocessing.Process(name="VCID 6 Sender", target=cadu_sender, args=(6, vcid_6_files, args.ip, args.left_port, args.rate), kwargs=kwargs))
for j in jobs:
j.start()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment