#!/usr/bin/env python
"""
NOVA web service backend implementation for SAMMY execution.
.. warning::
**NOVA backend support is currently paused.** The ORNL NOVA service API
is under active development, and PLEIADES integration is on hold until
the API stabilizes. Do not rely on this backend for production use.
Status: PAUSED (as of January 2025)
Tracking: https://github.com/lanl/PLEIADES/issues/98
"""
import os
import zipfile
from datetime import datetime
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Optional
from uuid import uuid4
from nova.galaxy import (
Connection,
Dataset,
Parameters,
Tool,
)
from pleiades.sammy.config import NovaSammyConfig
from pleiades.sammy.interface import (
EnvironmentPreparationError,
SammyExecutionError,
SammyExecutionResult,
SammyFiles,
SammyRunner,
)
from pleiades.utils.logger import loguru_logger
logger = loguru_logger.bind(name=__name__)
[docs]
class NovaConnectionError(Exception):
"""Raised when NOVA connection fails."""
pass
[docs]
class NovaSammyRunner(SammyRunner):
"""Implementation of SAMMY runner for NOVA web service."""
def __init__(self, config: NovaSammyConfig):
super().__init__(config)
self.config: NovaSammyConfig = config
self._nova: Optional[Connection] = None
self._connection: Optional[Connection] = None
self._datastore_name: Optional[str] = None
self._temp_dir: Optional[TemporaryDirectory] = None
[docs]
def prepare_environment(self, files: SammyFiles) -> None:
"""
Prepare NOVA environment and validate connection.
Args:
files: Container with paths to required input files
Raises:
EnvironmentPreparationError: If preparation fails
"""
try:
# Validate input files
files.validate()
self._connection = Connection(self.config.url, api_key=self.config.api_key)
# Create temporary directory for downloads
self._temp_dir = TemporaryDirectory()
except Exception as e:
raise EnvironmentPreparationError(f"NOVA environment preparation failed: {str(e)}")
[docs]
def execute_sammy(self, files: SammyFiles) -> SammyExecutionResult:
"""Execute SAMMY using NOVA web service."""
execution_id = str(uuid4())
start_time = datetime.now()
logger.info(f"Starting SAMMY execution {execution_id} via NOVA")
try:
# Create unique datastore
self._datastore_name = f"sammy_{execution_id}"
datastore = self._connection.datastore(self._datastore_name)
# Prepare tool and datasets
tool = Tool(self._connection, tool_id=self.config.tool_id)
# Add input files as datasets
params = Parameters()
params.add_input("inp", Dataset(str(files.input_file)))
params.add_input("par", Dataset(str(files.parameter_file)))
params.add_input("data", Dataset(str(files.data_file)))
# Run SAMMY
results = tool.run(datastore=self._datastore_name, parameters=params)
# Get console output
if self._temp_dir is None:
raise SammyExecutionError("Temporary directory is not initialized.")
output_zip = Path(self._temp_dir.name) / "sammy_outputs.zip"
# Download and extract output files
console_output = results.outputs["sammy_console_output"].get_content()
results.outputs["sammy_output_files"].download(str(output_zip))
# Extract files to output directory, handling nested structure
with zipfile.ZipFile(output_zip) as zf:
for zip_info in zf.filelist:
# Get just the filename, ignoring directory structure in ZIP
filename = Path(zip_info.filename).name
# Extract if it's a file (not directory) and starts with SAM
if not zip_info.is_dir() and (filename.startswith("SAM") or filename == "SAMMY.LPT"):
# Read the file from zip
with zf.open(zip_info) as source:
# Write to output directory
output_file = self.config.output_dir / filename
output_file.write_bytes(source.read())
logger.debug(f"Extracted {filename} to output directory")
end_time = datetime.now()
success = " Normal finish to SAMMY" in console_output
error_message = None if success else "SAMMY execution failed"
if not success:
logger.error(f"SAMMY execution failed for {execution_id}")
else:
logger.info(f"SAMMY execution completed successfully for {execution_id}")
return SammyExecutionResult(
success=success,
execution_id=execution_id,
start_time=start_time,
end_time=end_time,
console_output=console_output,
error_message=error_message,
)
except Exception as e:
logger.exception(f"NOVA execution failed for {execution_id}")
raise SammyExecutionError(f"NOVA execution failed: {str(e)}")
[docs]
def cleanup(self, files: Optional[SammyFiles] = None) -> None:
"""Clean up NOVA resources."""
logger.debug("Performing NOVA cleanup")
try:
# Remove temporary directory if it exists
if self._temp_dir is not None:
self._temp_dir.cleanup()
self._temp_dir = None
if self._connection is not None:
self._connection.close()
self._connection = None
except Exception as e:
logger.error(f"Error during cleanup: {str(e)}")
[docs]
def validate_config(self):
"""Validate the configuration."""
return self.config.validate()
if __name__ == "__main__":
# Setup paths
test_data_dir = Path(__file__).parents[4] / "tests/data/ex012"
working_dir = Path.home() / "tmp" / "sammy_nova_run"
output_dir = working_dir / "output"
try:
# Get NOVA credentials from environment
nova_url = os.environ.get("NOVA_URL")
nova_api_key = os.environ.get("NOVA_API_KEY")
if not nova_url or not nova_api_key:
raise ValueError("NOVA_URL and NOVA_API_KEY environment variables must be set")
# Create and validate config
config = NovaSammyConfig(url=nova_url, api_key=nova_api_key, working_dir=working_dir, output_dir=output_dir)
config.validate()
# Create files container
files = SammyFiles(
input_file=test_data_dir / "ex012a.inp",
parameter_file=test_data_dir / "ex012a.par",
data_file=test_data_dir / "ex012a.dat",
)
# Create and use runner
runner = NovaSammyRunner(config)
# Execute pipeline
runner.prepare_environment(files)
result = runner.execute_sammy(files)
# Process results
if result.success:
print(f"SAMMY execution successful (runtime: {result.runtime_seconds:.2f}s)")
# NOTE: DO NOT collect here as nova runner packages all output files on the server
# and pleiades download the zip archive, extract and move to output directory.
print(f"Output files available in: {output_dir}")
else:
print("SAMMY execution failed:")
print(result.error_message)
print("\nConsole output:")
print(result.console_output)
except Exception as e:
print(f"Error running SAMMY: {str(e)}")
finally:
if "runner" in locals():
runner.cleanup()