Commit 64f3172e authored by Yuan Gao

a runable python library version

parent 7e29200f
Tags: v1.0.1
-#!/home/ygao/latency/venv/bin/python
+#!/home/ygao/miniconda/envs/py39env/bin/python
import cgi
import cgitb
import json
@@ -7,13 +7,12 @@ import os
import logging
import pandas as pd
import traceback
-import subprocess

# Enable detailed CGI error reporting
cgitb.enable()

# Set up logging - use absolute path to ensure writability
-LOG_DIR = ""
+LOG_DIR = "./"  # Using /tmp which should be writable
os.makedirs(LOG_DIR, exist_ok=True)
LOG_FILE = os.path.join(LOG_DIR, "latency_viewer.log")
@@ -33,54 +32,61 @@ logger.info(f"Script path: {os.path.abspath(__file__)}")
logger.info(f"Python version: {sys.version}")
logger.info(f"User running script: {os.getenv('USER') or 'Unknown'}")

+# Import functions from our shared module
+# Define fallback functions in case import fails
+def fallback_get_canonical_id(satellite_id):
+    """Fallback function if import fails - returns the input as-is"""
+    logger.warning(f"Using fallback get_canonical_id for {satellite_id}")
+    return satellite_id
+
+def fallback_get_all_variants(canonical_id):
+    """Fallback function if import fails - returns the canonical ID in a list"""
+    logger.warning(f"Using fallback get_all_variants for {canonical_id}")
+    return [canonical_id]
+
+def fallback_run_sat_latency_query(start_time, end_time, filters=None):
+    """Fallback function if import fails - returns empty list"""
+    logger.error("Using fallback run_sat_latency_query - no data will be returned")
+    return []
+
+# Set default functions to fallbacks
+get_canonical_id = fallback_get_canonical_id
+get_all_variants = fallback_get_all_variants
+run_sat_latency_query = fallback_run_sat_latency_query
+
+# Try to import the real functions
try:
-    # Log system information
-    logger.info("System information:")
-    whoami_output = subprocess.check_output(["whoami"], stderr=subprocess.STDOUT).decode().strip()
-    logger.info(f"Current user from whoami: {whoami_output}")
-
-    # Check script permissions
-    logger.info(f"Script permissions: {oct(os.stat(__file__).st_mode)}")
-
-    # Check if we can write to tmp directory
-    tmp_test_path = "/tmp/data_py_test.txt"
-    try:
-        with open(tmp_test_path, 'w') as f:
-            f.write("Test")
-        os.remove(tmp_test_path)
-        logger.info("Successfully wrote to and removed test file in /tmp")
-    except Exception as e:
-        logger.error(f"Failed to write to /tmp: {str(e)}")
-
-    # Check if sudo is available
-    try:
-        sudo_test = subprocess.run(["sudo", "-l"], capture_output=True, text=True)
-        logger.info(f"Sudo permissions: {sudo_test.stdout}")
-        if sudo_test.returncode != 0:
-            logger.error(f"Sudo test failed: {sudo_test.stderr}")
-    except Exception as e:
-        logger.error(f"Error checking sudo permissions: {str(e)}")
-except Exception as e:
-    logger.error(f"Error during environment checks: {str(e)}")
-
-# Import functions from our shared module
-# Adjust the path as needed to find the module
-try:
-    sys.path.append(os.path.dirname(os.path.abspath(__file__)))
-    logger.info(f"Looking for sat_db_functions in: {os.path.dirname(os.path.abspath(__file__))}")
+    # Add possible module locations to the path
+    current_dir = os.path.dirname(os.path.abspath(__file__))
+    sys.path.append(current_dir)
+
+    # Also try parent directory
+    parent_dir = os.path.dirname(current_dir)
+    sys.path.append(parent_dir)
+
+    logger.info(f"Looking for sat_db_functions in: {current_dir} and {parent_dir}")

    # List directory contents to verify module existence
-    dir_contents = os.listdir(os.path.dirname(os.path.abspath(__file__)))
+    dir_contents = os.listdir(current_dir)
    logger.info(f"Directory contents: {dir_contents}")

-    from sat_db_functions import run_sat_latency_query, get_canonical_id, get_all_variants
+    # Try to import the module
+    import sat_db_functions
+
+    # If successful, override the fallback functions
+    get_canonical_id = sat_db_functions.get_canonical_id
+    get_all_variants = sat_db_functions.get_all_variants
+    run_sat_latency_query = sat_db_functions.run_sat_latency_query
    logger.info("Successfully imported from sat_db_functions")
except ImportError as e:
    logger.error(f"Import error: {str(e)}")
    logger.error(traceback.format_exc())
+    logger.error("Will use fallback functions that provide limited functionality")
except Exception as e:
    logger.error(f"Unexpected error during import: {str(e)}")
    logger.error(traceback.format_exc())
+    logger.error("Will use fallback functions that provide limited functionality")

def data_endpoint():
    """
...
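The heart of this change in the endpoint script is a "fallbacks first" import pattern: module-level names are bound to degraded stand-ins, then rebound to the real implementations only if the import succeeds, so the CGI endpoint keeps answering instead of failing with a 500. A minimal sketch of the same pattern, assuming only that a sat_db_functions module may or may not be importable (the satellite ID "G16" is an invented example):

import logging

logger = logging.getLogger(__name__)

def fallback_get_canonical_id(satellite_id):
    # Degraded behavior: hand the ID back unchanged
    logger.warning("fallback get_canonical_id used for %s", satellite_id)
    return satellite_id

# Bind the module-level name to the fallback first...
get_canonical_id = fallback_get_canonical_id

try:
    import sat_db_functions
    # ...then rebind it only if the real module loads
    get_canonical_id = sat_db_functions.get_canonical_id
except ImportError:
    logger.error("sat_db_functions unavailable; keeping fallback")

# Callers always go through the module-level name, so they never need
# to know which implementation they got.
canonical = get_canonical_id("G16")  # "G16" is an invented example ID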
(The diff for one more changed file is collapsed in the web view and not shown.)
-#!/home/ygao/latency/venv/bin/python
+#!/home/ygao/miniconda/envs/py39env/bin/python
import cgi
import json
import os
...
-#!/home/ygao/latency/venv/bin/python
+#!/home/ygao/miniconda/envs/py39env/bin/python
import os
import json
-import subprocess
import logging
import pandas as pd
+from datetime import datetime, timezone, timedelta
+from sat_latency.interface import satellite_data_from_filters
+
+# Custom JSON encoder to handle datetime objects
+class DateTimeEncoder(json.JSONEncoder):
+    def default(self, obj):
+        if isinstance(obj, (datetime, pd.Timestamp)):
+            return obj.isoformat()
+        return super().default(obj)

# Set up logging
logger = logging.getLogger()

-# Path to conda environment and database
-CONDA_ENV_PATH = "/home/oper/.mdrexler_conda"
+# Path to database
SATELLITE_DATA_DIR = "/data/sat_latency"  # Path to your latency database
RELATIONSHIPS_FILE = "satellite_relationships.json"  # Path to your prebuilt relationships file
@@ -154,7 +161,7 @@ def load_relationship_data():
def run_sat_latency_query(start_time, end_time, filters=None):
    """
-    Directly query the satellite latency database using sat_latency_interface
+    Query the satellite latency database using the sat_latency.interface package

    Args:
        start_time (str): Start time in ISO format (YYYY-MM-DDTHH:MM:SS)
@@ -164,99 +171,136 @@ def run_sat_latency_query(start_time, end_time, filters=None):
    Returns:
        list: List of latency records as dictionaries
    """
-    # Expand satellite IDs to include all variants
-    if filters and "satellite-id" in filters:
-        satellite_id = filters["satellite-id"]
-        # Handle comma-separated list of satellites
-        if isinstance(satellite_id, str) and ',' in satellite_id:
-            satellite_ids = [s.strip() for s in satellite_id.split(',')]
-            expanded_ids = []
-            for sat_id in satellite_ids:
-                # Get the canonical form
-                canonical_id = get_canonical_id(sat_id)
-                # Add all variants of this canonical ID
-                expanded_ids.extend(get_all_variants(canonical_id))
-            # Remove duplicates
-            expanded_ids = list(set(expanded_ids))
-            filters["satellite-id"] = expanded_ids
-        # Handle single satellite ID
-        elif isinstance(satellite_id, str):
-            canonical_id = get_canonical_id(satellite_id)
-            filters["satellite-id"] = get_all_variants(canonical_id)
-
-    # Build the command exactly as specified
-    base_cmd = "module load miniconda/3.6-base && source activate ~/.mdrexler_conda && sat_latency_interface"
-
-    # Add database path and time parameters
-    cmd = f"{base_cmd} -d {SATELLITE_DATA_DIR} --from '{start_time}' --until '{end_time}' --output-type json"
-
-    # Add filters if provided
-    if filters:
-        for key, values in filters.items():
-            if values and (isinstance(values, list) or isinstance(values, str)):
-                if isinstance(values, str):
-                    values = [values]
-                filter_values = " ".join(f'"{v}"' for v in values if v)
-                if filter_values:
-                    cmd += f" --{key} {filter_values}"
-
-    logger.info(f"Running command: {cmd}")
    try:
-        # Create a temporary shell script to run the command
-        script_path = "/tmp/run_sat_latency.sh"
-        with open(script_path, 'w') as f:
-            f.write("#!/bin/bash\n")
-            f.write(cmd + "\n")
-
-        # Make the script executable
-        os.chmod(script_path, 0o755)
-
-        # Run the script using sudo as the oper user
-        sudo_cmd = ["sudo", "-u", "oper", "-i", script_path]
-
-        logger.info(f"Executing: {' '.join(sudo_cmd)}")
-
-        # Use PIPE for stdout and stderr
-        process = subprocess.Popen(
-            sudo_cmd,
-            stdout=subprocess.PIPE,
-            stderr=subprocess.PIPE,
-            universal_newlines=True
-        )
-
-        # Get the output and error
-        stdout, stderr = process.communicate()
-
-        # Check if the command was successful
-        if process.returncode != 0:
-            logger.error(f"Command failed with exit code {process.returncode}: {stderr}")
-            return []
-
-        # Log the first part of the output
-        if stdout:
-            logger.info(f"Command output (first 200 chars): {stdout[:200]}...")
-        else:
-            logger.warning("Command returned empty output")
-
-        # Parse the JSON output
-        try:
-            data = json.loads(stdout)
-            logger.info(f"Successfully parsed JSON data: {len(data)} records found")
-            return data
-        except json.JSONDecodeError as e:
-            logger.error(f"Failed to parse JSON output: {e}")
-            logger.error(f"Raw output (first 500 chars): {stdout[:500]}...")
-            return []
+        logger.info(f"Querying satellite latency data from {start_time} to {end_time}")
+
+        # Convert string ISO timestamps to datetime objects if they are strings
+        if isinstance(start_time, str):
+            start_datetime = datetime.fromisoformat(start_time.replace('Z', '+00:00'))
+        else:
+            start_datetime = start_time
+        if isinstance(end_time, str):
+            end_datetime = datetime.fromisoformat(end_time.replace('Z', '+00:00'))
+        else:
+            end_datetime = end_time
+
+        # Ensure timezone is set
+        if start_datetime.tzinfo is None:
+            start_datetime = start_datetime.replace(tzinfo=timezone.utc)
+        if end_datetime.tzinfo is None:
+            end_datetime = end_datetime.replace(tzinfo=timezone.utc)
+        logger.info(f"Converted timestamps: {start_datetime} to {end_datetime}")
+
+        # Initialize filter parameters for the sat_latency API
+        satellite_ids = None
+        coverage = None
+        instrument = None
+
+        # Process filters
+        if filters:
+            # Expand satellite IDs to include all variants
+            if "satellite-id" in filters:
+                satellite_id = filters["satellite-id"]
+                # Handle list or comma-separated list of satellites
+                if isinstance(satellite_id, list):
+                    expanded_ids = []
+                    for sat_id in satellite_id:
+                        canonical_id = get_canonical_id(sat_id)
+                        expanded_ids.extend(get_all_variants(canonical_id))
+                    # Remove duplicates
+                    satellite_ids = list(set(expanded_ids))
+                elif isinstance(satellite_id, str) and ',' in satellite_id:
+                    satellite_ids_list = [s.strip() for s in satellite_id.split(',')]
+                    expanded_ids = []
+                    for sat_id in satellite_ids_list:
+                        canonical_id = get_canonical_id(sat_id)
+                        expanded_ids.extend(get_all_variants(canonical_id))
+                    satellite_ids = list(set(expanded_ids))
+                elif isinstance(satellite_id, str):
+                    canonical_id = get_canonical_id(satellite_id)
+                    satellite_ids = get_all_variants(canonical_id)
+
+            # Get coverage filter
+            if "coverage" in filters:
+                coverage_value = filters["coverage"]
+                # Convert coverage to a list if it's a string
+                if isinstance(coverage_value, str):
+                    if ',' in coverage_value:
+                        coverage = [c.strip() for c in coverage_value.split(',')]
+                    else:
+                        coverage = [coverage_value]
+                else:
+                    coverage = coverage_value  # Already a list or None
+
+            # Get instrument filter
+            if "instrument" in filters:
+                instrument_value = filters["instrument"]
+                # Convert instrument to a list if it's a string
+                if isinstance(instrument_value, str):
+                    if ',' in instrument_value:
+                        instrument = [i.strip() for i in instrument_value.split(',')]
+                    else:
+                        instrument = [instrument_value]
+                else:
+                    instrument = instrument_value  # Already a list or None
+
+        # Log the query parameters
+        logger.info(f"Query parameters: database={SATELLITE_DATA_DIR}, start_date={start_datetime}, end_date={end_datetime}")
+        logger.info(f"Filters: satellite_ids={satellite_ids}, coverage={coverage}, instrument={instrument}")
+
+        # Call the sat_latency.interface function
+        data = satellite_data_from_filters(
+            SATELLITE_DATA_DIR,
+            start_date=start_datetime,
+            end_date=end_datetime,
+            satellite_ids=satellite_ids,
+            coverages=coverage,
+            instruments=instrument
+        )
+
+        # Convert result to a list of dictionaries for JSON serialization
+        if data is not None:
+            try:
+                # Convert Polars DataFrame to list of dictionaries
+                records = data.to_dicts()
+
+                # Process datetime objects for JSON serialization
+                processed_records = []
+                for record in records:
+                    processed_record = {}
+                    for key, value in record.items():
+                        if isinstance(value, (datetime, pd.Timestamp)):
+                            processed_record[key] = value.isoformat()
+                        else:
+                            processed_record[key] = value
+                    processed_records.append(processed_record)
+
+                logger.info(f"Successfully converted data: {len(processed_records)} records found")
+                return processed_records
+            except Exception as e:
+                logger.error(f"Error converting Polars DataFrame to dict: {str(e)}")
+                # Fallback method if to_dicts() is not available
+                try:
+                    pandas_df = data.to_pandas()
+                    # Convert datetime columns to strings
+                    for col in pandas_df.select_dtypes(include=['datetime64']).columns:
+                        pandas_df[col] = pandas_df[col].astype(str)
+                    records = pandas_df.to_dict(orient='records')
+                    logger.info(f"Successfully converted data via pandas: {len(records)} records found")
+                    return records
+                except Exception as e2:
+                    logger.error(f"Error in pandas conversion fallback: {str(e2)}")
+                    return []
+        else:
+            logger.warning("Query returned None")
+            return []
    except Exception as e:
-        logger.error(f"Error executing command: {str(e)}")
-        return []
-    finally:
-        # Clean up temporary script
-        if os.path.exists(script_path):
-            os.remove(script_path)
+        logger.error(f"Error executing satellite latency query: {str(e)}")
+        import traceback
+        logger.error(traceback.format_exc())
+        return []
\ No newline at end of file
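For context, this is how the rewritten function would be called from the endpoint script, judging only from the signature and docstring shown in this diff; the timestamps and filter values below are invented examples:

# Invented usage example; assumes sat_db_functions (and the sat_latency
# package it wraps) is on the import path.
from sat_db_functions import run_sat_latency_query

records = run_sat_latency_query(
    "2024-01-01T00:00:00",   # start_time in ISO format, per the docstring
    "2024-01-02T00:00:00",   # end_time
    filters={
        "satellite-id": "g16,g18",  # comma-separated IDs get expanded to variants
        "coverage": "fd",           # invented coverage value
    },
)

# The function returns plain dicts with datetimes already stringified,
# so the result can go straight to json.dumps().
print(len(records), "records")

Note the design shift this commit makes: instead of writing a shell script to /tmp and running the sat_latency_interface CLI via sudo, the query now calls satellite_data_from_filters in-process, which removes the subprocess, sudo, and JSON-parsing failure modes.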
-#!/home/ygao/latency/venv/bin/python
+#!/home/ygao/miniconda/envs/py39env/bin/python
import cgi
import json
import sys
...
#!/home/ygao/miniconda/envs/py39env/bin/python
"""
CGI script to test satellite data access with proper JSON serialization
"""
import cgi
import cgitb
import os
import sys
import json
from datetime import datetime, timezone, timedelta
import traceback

# Enable CGI error reporting to browser
cgitb.enable()

# Set up basic logging to a specific file
import logging
LOG_FILE = "./test_log"
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filename=LOG_FILE,
    filemode='a'
)
logger = logging.getLogger()

# Log script start and environment
logger.info("=" * 80)
logger.info("Test CGI script starting")
logger.info(f"Current working directory: {os.getcwd()}")
logger.info(f"Script path: {os.path.abspath(__file__)}")
logger.info(f"Python executable: {sys.executable}")
logger.info(f"Python version: {sys.version}")
logger.info(f"User running script: {os.getenv('USER') or 'Unknown'}")
logger.info(f"Environment variables: {dict(os.environ)}")

def check_imports():
    """Test importing required modules and log results"""
    modules_to_check = [
        'pandas',
        'sat_latency.interface'
    ]
    results = {}
    for module_name in modules_to_check:
        try:
            logger.info(f"Attempting to import {module_name}...")
            if module_name == 'pandas':
                import pandas
                results[module_name] = f"Success (version: {pandas.__version__})"
            elif module_name == 'sat_latency.interface':
                from sat_latency.interface import satellite_data_from_filters
                results[module_name] = "Success"
        except Exception as e:
            logger.error(f"Failed to import {module_name}: {str(e)}")
            results[module_name] = f"Error: {str(e)}"
    return results

def check_directory_permissions():
    """Check if the current user has access to the satellite data directory"""
    data_dir = "/data/sat_latency"

    # Get current user info
    current_user = os.getenv('USER') or "Unknown"
    try:
        import pwd
        uid = os.getuid()
        current_user = pwd.getpwuid(uid).pw_name
    except:
        pass

    # Check if directory exists
    dir_exists = os.path.exists(data_dir)
    logger.info(f"Directory exists: {dir_exists}")

    # Check if it's readable
    dir_readable = os.access(data_dir, os.R_OK)
    logger.info(f"Directory readable: {dir_readable}")

    # Get directory stats if possible
    try:
        dir_stat = os.stat(data_dir)
        dir_mode = dir_stat.st_mode
        dir_perms = oct(dir_mode & 0o777)
        logger.info(f"Directory permissions: {dir_perms}")
        try:
            import pwd, grp
            dir_owner = pwd.getpwuid(dir_stat.st_uid).pw_name
            dir_group = grp.getgrgid(dir_stat.st_gid).gr_name
            logger.info(f"Directory owner: {dir_owner}")
            logger.info(f"Directory group: {dir_group}")
        except:
            pass
    except Exception as e:
        logger.error(f"Error getting directory stats: {str(e)}")

    # Try to list directory contents
    try:
        dir_contents = os.listdir(data_dir)
        logger.info(f"Successfully listed directory with {len(dir_contents)} items")
        logger.info(f"First few items: {dir_contents[:5] if len(dir_contents) > 0 else 'empty'}")
        return {
            "success": True,
            "user": current_user,
            "exists": dir_exists,
            "readable": dir_readable,
            "items_count": len(dir_contents)
        }
    except Exception as e:
        logger.error(f"Failed to list directory contents: {str(e)}")
        return {
            "success": False,
            "user": current_user,
            "exists": dir_exists,
            "readable": dir_readable,
            "error": str(e)
        }

def run_diagnostics():
    """Run all diagnostic tests and return results"""
    results = {
        "timestamp": datetime.now().isoformat(),
        "import_checks": None,
        "directory_permissions": None,
        "success": False,
        "error": None
    }
    try:
        # Check imports
        results["import_checks"] = check_imports()

        # Check directory permissions
        results["directory_permissions"] = check_directory_permissions()

        # Consider test successful if we got this far
        results["success"] = True
    except Exception as e:
        logger.error(f"Error in diagnostics: {str(e)}")
        logger.error(traceback.format_exc())
        results["error"] = str(e)
    return results

# Main CGI handler
if __name__ == "__main__":
    try:
        # Print HTTP headers
        print("Content-Type: application/json")
        print("Cache-Control: no-cache")
        print()  # Empty line after headers

        # Run diagnostics
        results = run_diagnostics()

        # Custom JSON encoder to handle datetime objects
        class DateTimeEncoder(json.JSONEncoder):
            def default(self, obj):
                if isinstance(obj, datetime):
                    return obj.isoformat()
                return super().default(obj)

        # Output JSON response using custom encoder
        print(json.dumps(results, cls=DateTimeEncoder, indent=2))
    except Exception as e:
        # Handle any final errors
        error_response = {
            "success": False,
            "message": f"Critical error: {str(e)}",
            "error_type": type(e).__name__,
            "traceback": traceback.format_exc()
        }

        # Log the error
        logger.error(f"Critical error in main block: {str(e)}")
        logger.error(traceback.format_exc())

        try:
            # Try to output JSON response
            print(json.dumps(error_response))
        except:
            # If that fails, output plain text
            print(f"Critical error: {str(e)}")
\ No newline at end of file
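Both this test script and the library module define the same DateTimeEncoder idea, which is what makes the diagnostics JSON-safe. A standalone sketch of why the custom encoder is needed (the payload is invented for illustration):

import json
from datetime import datetime, timezone

class DateTimeEncoder(json.JSONEncoder):
    def default(self, obj):
        # json.dumps calls default() only for types it cannot serialize itself
        if isinstance(obj, datetime):
            return obj.isoformat()
        return super().default(obj)

payload = {"observed_at": datetime.now(timezone.utc)}  # invented payload

# Without cls=DateTimeEncoder this raises:
# TypeError: Object of type datetime is not JSON serializable
print(json.dumps(payload, cls=DateTimeEncoder, indent=2))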
(The diff for one more changed file is collapsed in the web view and not shown.)