Source code for pleiades.sammy.io.lpt_manager
import re
from collections import defaultdict
from pleiades.nuclear.isotopes.models import IsotopeInfo, IsotopeMassData
from pleiades.nuclear.models import IsotopeParameters, RadiusParameters
from pleiades.sammy.results.models import FitResults, RunResults
from pleiades.utils.helper import VaryFlag
from pleiades.utils.logger import loguru_logger
logger = loguru_logger.bind(name=__name__)
[docs]
def parse_value_and_varied(s: str) -> tuple[float, bool]:
"""
Parse a value that may have a parenthesis indicating it was varied.
Returns (float_value, varied_flag)
"""
match = re.match(r"([-\d.Ee+]+)(\s*\([^)]+\))?", s)
if match:
value = float(match.group(1))
varied = match.group(2) is not None
return value, varied
raise ValueError(f"Could not parse value: {s}")
[docs]
def split_lpt_values(line: str) -> list[str]:
"""
Splits a line into values, where each value may be followed by a parenthesis group.
Example: '2.9660E+02( 4) 1.1592E-01( 5)' -> ['2.9660E+02( 4)', '1.1592E-01( 5)']
"""
return re.findall(r"[-+]?\d*\.\d+E[+-]?\d+(?:\s*\([^)]+\))?", line)
[docs]
class LptManager:
"""
LptManager is a class designed to manage and extract results from SAMMY LPT files.
These files typically contain information about nuclear resonance parameters,
broadening parameters, normalization parameters, and chi-squared results for
various fit iterations. The class provides methods to parse and process these
files, extract relevant data, and organize it into structured objects for further analysis.
Attributes:
lpt_delimiters (list): A list of delimiters used to split the LPT file content
into blocks for processing. These delimiters correspond to specific sections
in the LPT file, such as initial values, intermediate values, and new values
for resonance parameters.
Methods:
__init__(file_path: str = None, run_results: RunResults = None):
Initializes the LptManager object. If a file path is provided, the LPT file
is processed immediately. If a RunResults object is provided, it is used to
store the extracted results.
extract_isotope_info(lines, nuclear_data):
Extracts isotope information, including isotopic abundance, mass, and spin
groups, from the LPT file lines. Updates the nuclear_data.isotopes attribute
with the extracted data.
extract_radius_info(lines, nuclear_data):
Extracts effective and true radii along with spin group numbers from the LPT
file lines. Groups the extracted data by isotopes and stores it in the
radius_parameters attribute of each isotope in nuclear_data.isotopes.
extract_broadening_info(lines, physics_data):
Extracts broadening parameters, such as temperature, thickness, and radius,
from the LPT file lines. Updates the physics_data.broadening_parameters
attribute with the extracted data.
extract_normalization_info(lines, physics_data):
Extracts normalization parameters, including background coefficients, from
the LPT file lines. Updates the physics_data.normalization_parameters
attribute with the extracted data.
extract_chi_squared_info(lines, chi_squared_results):
Extracts chi-squared, reduced chi-squared, and degrees of freedom (dof)
from the LPT file lines. Updates the chi_squared_results object with the
extracted data.
extract_results_from_string(lpt_block_string: str) -> FitResults:
Processes a block of LPT file content as a string and extracts various
results, including isotope, radius, broadening, normalization, and
chi-squared information. Returns a FitResults object containing the
extracted data.
split_lpt_blocks(lpt_content: str):
Splits the LPT file content into blocks based on predefined delimiters.
Returns a list of tuples, where each tuple contains the block type and
the corresponding block text.
process_lpt_file(file_path: str, run_results: RunResults = None) -> bool:
Processes a SAMMY LPT file by reading its content, splitting it into blocks,
extracting results from each block, and storing the results in a RunResults
object. Returns True if the file was processed successfully, otherwise False.
Usage:
The LptManager class is typically used to parse SAMMY LPT files and extract
structured data for further analysis. It can be initialized with a file path
to process the file immediately or used with its methods to extract specific
types of data from LPT file content.
"""
# List of delimiters to split the LPT file content
lpt_delimiters = [
"***** INITIAL VALUES FOR PARAMETERS",
"***** INTERMEDIATE VALUES FOR RESONANCE PARAMETERS",
"***** NEW VALUES FOR RESONANCE PARAMETERS",
]
# If initialize with a filepath then start processing the file
def __init__(self, file_path: str = None, run_results: RunResults = None):
if run_results is not None:
self.run_results = run_results
else:
self.run_results = RunResults()
if file_path:
self.process_lpt_file(file_path, self.run_results)
[docs]
def extract_isotope_info(self, lines, nuclear_data):
"""Extract isotope info and update nuclear_data.isotopes."""
logger.debug("Extracting isotope information...")
for idx, line in enumerate(lines):
if line.strip().startswith("Isotopic abundance and mass for each nuclide"):
i = idx + 2 # skip header
while i < len(lines):
line_content = lines[i].strip()
if not line_content or not re.match(r"^\d+", line_content):
break # End of isotope block
# Match with optional parentheses for varied abundance
match = re.match(r"^\s*(\d+)\s+([-\d.Ee]+)(\s*\([^)]+\))?\s+([-\d.Ee]+)\s+(.+)$", line_content)
if match:
abundance_str = match.group(2)
abundance = float(abundance_str)
abundance_paren = match.group(3)
mass = float(match.group(4))
spin_group_numbers = [int(s) for s in match.group(5).split()]
# Set vary_abundance flag
vary_abundance = VaryFlag.YES if abundance_paren else VaryFlag.NO
# Create SpinGroups objects from integers
from pleiades.nuclear.models import SpinGroups
spin_groups = [
SpinGroups(spin_group_number=sg_num, excluded=False) for sg_num in spin_group_numbers
]
# Minial IsotopeMassData
mass_data_info = IsotopeMassData(atomic_mass=mass)
# Minimal IsotopeInfo
isotope_info = IsotopeInfo(atomic_number=int(round(mass)), mass_data=mass_data_info)
isotope = IsotopeParameters(
isotope_infomation=isotope_info,
abundance=abundance,
spin_groups=spin_groups,
vary_abundance=vary_abundance,
)
nuclear_data.isotopes.append(isotope)
i += 1
break
# if isotope info was found then return true
return bool(nuclear_data.isotopes)
[docs]
def extract_radius_info(self, lines, nuclear_data):
"""
Extracts effective and true radii along with spin group numbers from the LPT file lines.
The extracted data is grouped by isotopes and stored in the radius_parameters attribute
of each isotope in nuclear_data.isotopes.
The function processes lines containing radius information, identifies spin groups
associated with specific radii, and organizes them into RadiusParameters objects.
Args:
lines (list): List of strings representing the lines of the LPT file.
nuclear_data (nuclearParameters): Object containing nuclear data, including isotopes.
Updates:
nuclear_data.isotopes: Each isotope's radius_parameters attribute is populated with
a list of RadiusParameters objects, each containing effective_radius, true_radius,
and associated spin groups. The spin groups for each radius must be in the isotope
spin group.
Returns:
bool: True if radius info is found in string block
False if not
"""
logger.debug("Extracting radius information...")
radii = []
last_radii = (None, None)
for idx, line in enumerate(lines):
if "EFFECTIVE RADIUS" in line and "TRUE" in line and "SPIN GROUP" in line:
in_block = True
i = idx + 2 # skip header and blank/label line
while i < len(lines):
line_content = lines[i].strip()
if not line_content or "#" not in line_content:
break
# Try to match full line with radii and spin group
match = re.match(r"^([-\d.Ee]+)\s+([-\d.Ee]+)\s+(\d+)\s+#\s+(.+)$", line_content)
if match:
eff_radius = float(match.group(1))
true_radius = float(match.group(2))
spin_group = int(match.group(3))
channels = [int(x) for x in match.group(4).split()]
last_radii = (eff_radius, true_radius)
else:
# Try to match continuation line (just spin group and channels)
match2 = re.match(r"^(\d+)\s+#\s+(.+)$", line_content)
if match2 and all(last_radii):
spin_group = int(match2.group(1))
channels = [int(x) for x in match2.group(2).split()]
eff_radius, true_radius = last_radii
else:
break # End of block
radii.append(
{
"effective_radius": eff_radius,
"true_radius": true_radius,
"spin_group": spin_group,
"channels": channels,
}
)
i += 1
break
# For each isotope, group spin groups by (effective_radius, true_radius)
for isotope in nuclear_data.isotopes:
# Extract spin group numbers from SpinGroups objects
spin_groups_set = set(sg.spin_group_number for sg in isotope.spin_groups)
# Group spin groups by radius pair
grouped = defaultdict(list)
for entry in radii:
if entry["spin_group"] in spin_groups_set:
key = (entry["effective_radius"], entry["true_radius"])
grouped[key].append(entry["spin_group"])
# Create RadiusParameters objects for each group
isotope.radius_parameters = []
for (eff_radius, true_radius), spin_groups in grouped.items():
# Convert integer spin groups to SpinGroupChannels objects
from pleiades.nuclear.models import SpinGroupChannels
spin_group_channels = [SpinGroupChannels(group_number=sg_num, channels=[]) for sg_num in spin_groups]
temp_radius_parameters = RadiusParameters(
effective_radius=eff_radius, true_radius=true_radius, spin_groups=spin_group_channels
)
isotope.radius_parameters.append(temp_radius_parameters)
# if radius info was found then return true
return bool(radii)
[docs]
def extract_broadening_info(self, lines, physics_data):
"""
Extracts the broadening parameters from an LPT file and stores them in
physics_data.broadening_parameters. Also sets .*_varied attributes if present.
Handles both cases: with or without RADIUS field.
"""
logger.debug("Extracting broadening information...")
paramters_found = False
for idx, line in enumerate(lines):
# Look for header line
if ("TEMPERATURE" in line and "THICKNESS" in line) or (
"RADIUS" in line and "TEMPERATURE" in line and "THICKNESS" in line
):
header = line.strip().split()
next_line = lines[idx + 1].strip()
parts = split_lpt_values(next_line)
# Case with RADIUS
if "RADIUS" in header and len(parts) >= 3:
paramters_found = True
radius, radius_varied = parse_value_and_varied(parts[0])
temp, temp_varied = parse_value_and_varied(parts[1])
thick, thick_varied = parse_value_and_varied(parts[2])
physics_data.broadening_parameters.crfn = radius
physics_data.broadening_parameters.temp = temp
physics_data.broadening_parameters.thick = thick
if hasattr(physics_data.broadening_parameters, "radius_varied"):
physics_data.broadening_parameters.flag_crfn = radius_varied
if hasattr(physics_data.broadening_parameters, "temp_varied"):
physics_data.broadening_parameters.flag_temp = temp_varied
if hasattr(physics_data.broadening_parameters, "thick_varied"):
physics_data.broadening_parameters.flag_thick = thick_varied
# Case without RADIUS
elif "TEMPERATURE" in header and "THICKNESS" in header and len(parts) >= 2:
paramters_found = True
temp, temp_varied = parse_value_and_varied(parts[0])
thick, thick_varied = parse_value_and_varied(parts[1])
physics_data.broadening_parameters.temp = temp
physics_data.broadening_parameters.thick = thick
if hasattr(physics_data.broadening_parameters, "temp_varied"):
physics_data.broadening_parameters.temp_varied = temp_varied
if hasattr(physics_data.broadening_parameters, "thick_varied"):
physics_data.broadening_parameters.thick_varied = thick_varied
else:
continue
# Find DELTA-L line
for j in range(idx + 2, min(idx + 6, len(lines))):
if "DELTA-L" in lines[j]:
delta_line = lines[j + 1].strip()
delta_parts = split_lpt_values(delta_line)
if len(delta_parts) >= 3:
deltal, deltal_varied = parse_value_and_varied(delta_parts[0])
deltag, deltag_varied = parse_value_and_varied(delta_parts[1])
deltae, deltae_varied = parse_value_and_varied(delta_parts[2])
physics_data.broadening_parameters.deltal = deltal
physics_data.broadening_parameters.deltag = deltag
physics_data.broadening_parameters.deltae = deltae
if hasattr(physics_data.broadening_parameters, "deltal_varied"):
physics_data.broadening_parameters.deltal_varied = deltal_varied
if hasattr(physics_data.broadening_parameters, "deltag_varied"):
physics_data.broadening_parameters.deltag_varied = deltag_varied
if hasattr(physics_data.broadening_parameters, "deltae_varied"):
physics_data.broadening_parameters.deltae_varied = deltae_varied
break
break # Only read the first block
return bool(paramters_found)
[docs]
def extract_normalization_info(self, lines, physics_data):
"""
Extracts normalization parameters from an LPT file and stores them in
physics_data.normalization_parameters (NormalizationParameters).
"""
logger.debug("Extracting normalization information...")
parameters_found = False
for idx, line in enumerate(lines):
# Look for the normalization header
if "NORMALIZATION" in line and "BCKG" in line:
next_line = lines[idx + 1].strip()
parts = split_lpt_values(next_line)
# There should be 4 values on this line
if len(parts) >= 4:
parameters_found = True
anorm, flag_anorm = parse_value_and_varied(parts[0])
backa, flag_backa = parse_value_and_varied(parts[1])
backb, flag_backb = parse_value_and_varied(parts[2])
backc, flag_backc = parse_value_and_varied(parts[3])
# Assign to the model
norm_params = physics_data.normalization_parameters
norm_params.anorm = anorm
norm_params.flag_anorm = VaryFlag.YES if flag_anorm else VaryFlag.NO
norm_params.backa = backa
norm_params.flag_backa = VaryFlag.YES if flag_backa else VaryFlag.NO
norm_params.backb = backb
norm_params.flag_backb = VaryFlag.YES if flag_backb else VaryFlag.NO
norm_params.backc = backc
norm_params.flag_backc = VaryFlag.YES if flag_backc else VaryFlag.NO
# Look for the next background line (for backd, backf)
for j in range(idx + 2, min(idx + 6, len(lines))):
if "BCKG*EXP" in lines[j]:
bkg_line = lines[j + 1].strip()
bkg_parts = split_lpt_values(bkg_line)
if len(bkg_parts) >= 2:
backd, flag_backd = parse_value_and_varied(bkg_parts[0])
backf, flag_backf = parse_value_and_varied(bkg_parts[1])
norm_params.backd = backd
norm_params.flag_backd = VaryFlag.YES if flag_backd else VaryFlag.NO
norm_params.backf = backf
norm_params.flag_backf = VaryFlag.YES if flag_backf else VaryFlag.NO
break
break # Only read the first normalization block
return parameters_found
[docs]
def extract_chi_squared_info(self, lines, chi_squared_results):
"""
Extracts chi-squared, reduced chi-squared, and dof from LPT file lines
and fills the ChiSquaredResults object.
"""
logger.debug("Extracting chi-squared information...")
chi2_found = False
chi2 = None
reduced_chi2 = None
dof = None
for idx, line in enumerate(lines):
# Chi-squared value
match_chi2 = re.search(r"CUSTOMARY CHI SQUARED\s*=\s*([-\d.Ee+]+)", line)
if match_chi2:
chi2 = float(match_chi2.group(1))
# Reduced chi-squared value
match_red = re.search(r"CUSTOMARY CHI SQUARED DIVIDED BY NDAT\s*=\s*([-\d.Ee+]+)", line)
if match_red:
reduced_chi2 = float(match_red.group(1))
# Number of data points (dof)
match_dof = re.search(r"Number of experimental data points\s*=\s*(\d+)", line)
if match_dof:
dof = int(match_dof.group(1))
# If all values were found, set boolean flag to true
if chi2 is not None and reduced_chi2 is not None and dof is not None:
chi_squared_results.chi_squared = chi2
chi_squared_results.reduced_chi_squared = reduced_chi2
chi_squared_results.dof = dof
chi2_found = True
return chi2_found
[docs]
def extract_results_from_string(self, lpt_block_string: str) -> FitResults:
fit_results = FitResults()
lines = lpt_block_string.splitlines()
# Call each extraction function in the order you want
isotpe_results_found = self.extract_isotope_info(lines, fit_results.nuclear_data)
if not isotpe_results_found:
logger.info("Isotope results not found.")
radius_results_found = self.extract_radius_info(lines, fit_results.nuclear_data)
if not radius_results_found:
logger.info("Radius results not found.")
broadening_results_found = self.extract_broadening_info(lines, fit_results.physics_data)
if not broadening_results_found:
logger.info("Broadening results not found.")
normalization_results_found = self.extract_normalization_info(lines, fit_results.physics_data)
if not normalization_results_found:
logger.info("Normalization results not found.")
chi_squared_results_found = self.extract_chi_squared_info(lines, fit_results.chi_squared_results)
if not chi_squared_results_found:
logger.info("Chi-squared results not found.")
return fit_results
[docs]
def split_lpt_blocks(self, lpt_content: str):
"""
Splits the LPT file content into blocks for each fit iteration.
Returns a list of (block_type, block_text) tuples.
"""
# Define the delimiters
initial = "***** INITIAL VALUES FOR PARAMETERS"
intermediate = "***** INTERMEDIATE VALUES FOR RESONANCE PARAMETERS"
new = "***** NEW VALUES FOR RESONANCE PARAMETERS"
pattern = f"({re.escape(initial)}|{re.escape(intermediate)}|{re.escape(new)})"
matches = list(re.finditer(pattern, lpt_content))
blocks = []
for i, match in enumerate(matches):
start = match.start()
block_type = match.group(0)
# Determine end of block
if i + 1 < len(matches):
end = matches[i + 1].start()
else:
end = len(lpt_content)
block_text = lpt_content[start:end]
blocks.append((block_type, block_text))
return blocks
[docs]
def process_lpt_file(self, file_path: str, run_results: RunResults = None) -> bool:
"""
Process a SAMMY LPT file into blocks of iteration results and store them in a
RunResults object. This function reads the .LPT file, extracts the results of each
fit iteration, stores them in a tempurary FitResults objects, and then appends them to a
RunResults object which is returned.
The RunResults object can be used to access the results of all iterations.
Args:
file_path (str): Path to the .LPT file.
"""
if run_results is None:
raise ValueError("A RunResults object must be provided to process_lpt_file.")
try:
with open(file_path, "r") as file:
lpt_content = file.read()
# Log that the file was read successfully
logger.info(f"Successfully read the file: {file_path}")
except FileNotFoundError:
logger.error(f"File not found: {file_path}")
return False
except Exception as e:
logger.error(f"An error occurred: {e}")
return False
# Split the content into blocks based on the delimiter
blocks = self.split_lpt_blocks(lpt_content)
logger.debug(f"Split LPT content into {len(blocks)} blocks.")
for block_type, block_text in blocks:
# Extract results from the block
fit_results = self.extract_results_from_string(block_text)
# Append the fit results to the RunResults object
run_results.add_fit_result(fit_results)