# sat_db_functions.py
#!/home/ygao/miniconda/envs/py39env/bin/python
import os
import json
import logging
import pandas as pd
from datetime import datetime, timezone,timedelta
from sat_latency.interface import satellite_data_from_filters
# Custom JSON encoder to handle datetime objects
class DateTimeEncoder(json.JSONEncoder):
    """JSON encoder that renders datetime/Timestamp values as ISO-8601 strings."""

    def default(self, obj):
        # Anything that is not a timestamp is delegated to the base encoder,
        # which raises TypeError for unsupported types as usual.
        if not isinstance(obj, (datetime, pd.Timestamp)):
            return super().default(obj)
        return obj.isoformat()
# Set up logging
# NOTE(review): this grabs the ROOT logger, so records propagate to whatever
# handlers the importing application configured — presumably intentional; a
# module logger (getLogger(__name__)) would be the usual choice. Confirm.
logger = logging.getLogger()
# Path to database
SATELLITE_DATA_DIR = "/data/sat_latency" # Path to your latency database
RELATIONSHIPS_FILE = "satellite_relationships.json" # Path to your prebuilt relationships file
# Hard-coded mapping of satellite ID variations to canonical IDs
# This makes it easy for future developers to add or modify mappings
SATELLITE_ID_MAPPINGS = {
    # Format: 'variant': 'canonical' — each canonical ID also maps to itself
    # so the reverse table below lists it among its own variants.
    'G16': 'G16', 'g16': 'G16',
    'G18': 'G18', 'g18': 'G18',
    'G19': 'G19', 'g19': 'G19',
    'DMSP-17': 'DMSP-17', 'dmsp17': 'DMSP-17',
    'DMSP-18': 'DMSP-18', 'dmsp18': 'DMSP-18',
    'DMSP-16': 'DMSP-16', 'dmsp16': 'DMSP-16',
    'NOAA-19': 'NOAA-19', 'n19': 'NOAA-19',
    'NOAA-20': 'NOAA-20', 'n20': 'NOAA-20',
    'NOAA-21': 'NOAA-21', 'n21': 'NOAA-21',
}
# Invert the table: canonical ID -> list of every accepted variant spelling.
CANONICAL_TO_VARIANTS = {}
for variant, canonical in SATELLITE_ID_MAPPINGS.items():
    CANONICAL_TO_VARIANTS.setdefault(canonical, []).append(variant)
def get_canonical_id(satellite_id):
    """Return the canonical form of a satellite ID variant.

    IDs absent from SATELLITE_ID_MAPPINGS are passed through unchanged.
    """
    try:
        return SATELLITE_ID_MAPPINGS[satellite_id]
    except KeyError:
        return satellite_id
def get_all_variants(canonical_id):
    """Return every known variant spelling for a canonical satellite ID.

    Unknown IDs yield a one-element list containing the ID itself.
    """
    variants = CANONICAL_TO_VARIANTS.get(canonical_id)
    return variants if variants is not None else [canonical_id]
def consolidate_satellite_data(original_data):
    """
    Consolidate satellite data using the hard-coded mapping.

    Merges the relationship records of every ID variant of the same satellite
    (e.g. 'g16' and 'G16') into one record keyed by the canonical ID.

    Args:
        original_data (dict): Raw relationships structure with keys
            'satellites', 'coverages', 'instruments', 'relationships'.

    Returns:
        dict | None: Normalized structure with sorted, de-duplicated
        per-satellite lists, plus a 'satellite_variants' map of canonical
        ID -> raw variant IDs. None when original_data is empty/None.
    """
    if not original_data:
        return None

    # Group raw satellite IDs under their canonical ID, preserving input order.
    satellite_groups = {}
    for sat_id in original_data.get("satellites", []):
        satellite_groups.setdefault(get_canonical_id(sat_id), []).append(sat_id)

    relationships = original_data.get("relationships", {})
    return {
        "satellites": sorted(satellite_groups),
        "coverages": original_data.get("coverages", []),
        "instruments": original_data.get("instruments", []),
        "relationships": {
            canonical_id: _merge_variant_relationships(relationships, variants)
            for canonical_id, variants in satellite_groups.items()
        },
        "satellite_variants": satellite_groups,
    }


def _merge_variant_relationships(relationships, variants):
    """Merge the relationship records of the given variant IDs into one record.

    Uses set unions instead of repeated list-membership tests (the previous
    approach was O(n^2) per list) and returns sorted lists for stable output.
    """
    coverages = set()
    instruments = set()
    coverage_instruments = {}  # coverage -> set of instruments; keys keep first-seen order
    for variant_id in variants:
        record = relationships.get(variant_id)
        if not record:
            continue
        coverages.update(record.get("coverages", []))
        instruments.update(record.get("instruments", []))
        for coverage, insts in record.get("coverage_instruments", {}).items():
            coverage_instruments.setdefault(coverage, set()).update(insts)
    return {
        "coverages": sorted(coverages),
        "instruments": sorted(instruments),
        "coverage_instruments": {c: sorted(i) for c, i in coverage_instruments.items()},
    }
def load_relationship_data():
    """
    Load prebuilt satellite relationship data from JSON file and consolidate duplicates.

    Returns:
        dict | None: Consolidated data on success, the raw parsed data when
        consolidation fails, or None when the file is missing or unreadable.
    """
    try:
        if not os.path.exists(RELATIONSHIPS_FILE):
            logger.warning(f"Relationships file not found: {RELATIONSHIPS_FILE}")
            return None
        with open(RELATIONSHIPS_FILE, 'r') as f:
            relationships = json.load(f)
        # Merge ID variants (e.g. 'g16'/'G16') into canonical satellite records.
        consolidated = consolidate_satellite_data(relationships)
        if not consolidated:
            logger.warning("Failed to consolidate relationships data")
            return relationships
        logger.info(f"Loaded and consolidated {len(consolidated['satellites'])} unique satellites from relationships")
        return consolidated
    except Exception as e:
        logger.error(f"Error loading relationship data: {str(e)}")
        return None
def _to_utc_datetime(value):
    """Parse an ISO-8601 string ('Z' suffix allowed) into a datetime; force UTC when naive."""
    if isinstance(value, str):
        value = datetime.fromisoformat(value.replace('Z', '+00:00'))
    if value.tzinfo is None:
        value = value.replace(tzinfo=timezone.utc)
    return value


def _expand_satellite_ids(satellite_id):
    """Expand a satellite-id filter (list, comma-separated string, or single string)
    into the full list of known ID variants; returns None for unsupported types."""
    if isinstance(satellite_id, list):
        requested = satellite_id
    elif isinstance(satellite_id, str) and ',' in satellite_id:
        requested = [s.strip() for s in satellite_id.split(',')]
    elif isinstance(satellite_id, str):
        # Single ID: variant order comes straight from the mapping (no dedup needed).
        return get_all_variants(get_canonical_id(satellite_id))
    else:
        return None
    expanded = []
    for sat_id in requested:
        expanded.extend(get_all_variants(get_canonical_id(sat_id)))
    # Remove duplicates (order is not significant for the query API)
    return list(set(expanded))


def _normalize_list_filter(value):
    """Normalize a filter value: split comma-separated strings, wrap a plain
    string in a list, and pass lists/None through unchanged."""
    if isinstance(value, str):
        if ',' in value:
            return [v.strip() for v in value.split(',')]
        return [value]
    return value


def _dataframe_to_records(data):
    """Convert a Polars DataFrame into JSON-serializable dicts.

    Datetime values become ISO strings. Falls back to a pandas round-trip when
    to_dicts() fails; returns [] when both conversions fail.
    """
    try:
        processed_records = [
            {
                key: value.isoformat() if isinstance(value, (datetime, pd.Timestamp)) else value
                for key, value in record.items()
            }
            for record in data.to_dicts()
        ]
        logger.info(f"Successfully converted data: {len(processed_records)} records found")
        return processed_records
    except Exception as e:
        logger.error(f"Error converting Polars DataFrame to dict: {str(e)}")
        # Fallback method if to_dicts() is not available
        try:
            pandas_df = data.to_pandas()
            # Convert datetime columns to strings
            for col in pandas_df.select_dtypes(include=['datetime64']).columns:
                pandas_df[col] = pandas_df[col].astype(str)
            records = pandas_df.to_dict(orient='records')
            logger.info(f"Successfully converted data via pandas: {len(records)} records found")
            return records
        except Exception as e2:
            logger.error(f"Error in pandas conversion fallback: {str(e2)}")
            return []


def run_sat_latency_query(start_time, end_time, filters=None):
    """
    Query the satellite latency database using sat_latency.interface package

    Args:
        start_time (str): Start time in ISO format (YYYY-MM-DDTHH:MM:SS)
        end_time (str): End time in ISO format (YYYY-MM-DDTHH:MM:SS)
        filters (dict): Optional filters for satellite_id, coverage, instrument, etc.

    Returns:
        list: List of latency records as dictionaries ([] on error or no data)
    """
    try:
        logger.info(f"Querying satellite latency data from {start_time} to {end_time}")
        start_datetime = _to_utc_datetime(start_time)
        end_datetime = _to_utc_datetime(end_time)
        logger.info(f"Converted timestamps: {start_datetime} to {end_datetime}")

        # Translate the incoming filter dict into the sat_latency API parameters.
        satellite_ids = None
        coverage = None
        instrument = None
        if filters:
            if "satellite-id" in filters:
                # Expand satellite IDs to include all variants
                satellite_ids = _expand_satellite_ids(filters["satellite-id"])
            if "coverage" in filters:
                coverage = _normalize_list_filter(filters["coverage"])
            if "instrument" in filters:
                instrument = _normalize_list_filter(filters["instrument"])

        logger.info(f"Query parameters: database={SATELLITE_DATA_DIR}, start_date={start_datetime}, end_date={end_datetime}")
        logger.info(f"Filters: satellite_ids={satellite_ids}, coverage={coverage}, instrument={instrument}")

        data = satellite_data_from_filters(
            SATELLITE_DATA_DIR,
            start_date=start_datetime,
            end_date=end_datetime,
            satellite_ids=satellite_ids,
            coverages=coverage,
            instruments=instrument
        )
        if data is None:
            logger.warning("Query returned None")
            return []
        return _dataframe_to_records(data)
    except Exception as e:
        logger.error(f"Error executing satellite latency query: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        return []