import argparse
import importlib.resources as imp_res
from collections import OrderedDict
from typing import Dict
from jinja2 import Template
from lxml import etree
from ruamel.yaml import YAML
[docs]
class PolarisYaml:
"""
A class for reading writing and combining config files in yaml format
(e.g. as used in Omega).
Attributes
----------
configs : dict
Nested dictionaries containing config options
streams : dict
Nested dictionaries containing data about streams
streams_section : str
The name of the streams section
model : str
The name of the E3SM component
"""
[docs]
def __init__(self):
"""
Create a yaml config object
"""
self.configs = dict()
self.streams_section = 'streams'
self.streams = dict()
self.model = None
[docs]
@classmethod
def read(
cls,
filename,
package=None,
replacements=None,
model=None,
streams_section='streams',
):
"""
Add config options from a yaml file
Parameters
----------
filename : str
A template yaml config file
package : str, optional
The name of a package the filename is found in
replacements : dict, optional
A dictionary of replacements, which, if provided, is used to
replace Jinja variables and the yaml file is assumed do be a Jinja
template
model : str, optional
The name of the model to parse if the yaml file might have multiple
models
streams_section : str, optional
The name of the streams section
Returns
-------
yaml : polaris.yaml.PolarisYaml
A yaml object read in from the given file (and optionally package)
"""
# read the text from a file (possibly in a package)
if package is not None:
text = imp_res.files(package).joinpath(filename).read_text()
else:
with open(filename, 'r') as infile:
text = infile.read()
# if this is a jinja template, render the template with the
# replacements
if replacements is not None:
template = Template(text)
text = template.render(**replacements)
yaml = cls()
yaml.streams_section = streams_section
yaml_data = YAML(typ='rt')
configs = yaml_data.load(text)
if model is None:
keys = list(configs)
if len(keys) > 1:
raise ValueError(
f'Config yaml file contains unexpected sections: \n '
f'{keys[1:]}'
)
model = keys[0]
yaml.model = model
yaml.streams = {}
if model in configs:
configs = configs[model]
if streams_section in configs:
yaml.streams = configs[streams_section]
configs = dict(configs)
configs.pop(streams_section)
else:
configs = {}
yaml.configs = configs
return yaml
[docs]
def update(self, configs=None, options=None, quiet=True):
"""
Add config options from a dictionary
Parameters
----------
configs : dict, optional
A nested dictionary of config sections, options and values
options : dict, optional
A flat dictionary of options and values
quiet : bool, optional
Whether or not to print the updated config options as they are
replaced
"""
if configs is not None:
if self.model in configs:
# we want one layer deeper
configs = configs[self.model]
_update_section(configs, self.configs, quiet)
if options is not None:
_update_options(options, self.configs, quiet)
[docs]
def write(self, filename):
"""
Write config options to a yaml file
Parameters
----------
filename : str
A yaml config file
"""
yaml = YAML(typ='rt')
configs = dict(self.configs)
if self.streams:
configs[self.streams_section] = self.streams
model_configs = dict()
model_configs[self.model] = configs
with open(filename, 'w') as outfile:
yaml.dump(model_configs, outfile)
[docs]
def mpas_namelist_and_streams_to_yaml(
model, namelist_template=None, namelist=None, streams=None
):
"""
Add config options from a yaml file
Parameters
----------
model : str
The name of the model
namelist_template : str
An MPAS namelist template file
namelist : str, optional
An MPAS namelist file
streams : str, optional
An MPAS streams file
Returns
-------
yaml : polaris.yaml.PolarisYaml
A yaml object with the namelists and streams
"""
yaml = PolarisYaml()
yaml.model = model
if namelist is not None:
yaml.configs = _read_namelist(namelist_template, namelist)
if streams is not None:
yaml.streams = _streams_xml_to_dict(streams)
return yaml
[docs]
def main_mpas_to_yaml():
parser = argparse.ArgumentParser(
description='Convert a namelist and/or streams file to yaml'
)
parser.add_argument(
'-n',
'--namelist',
dest='namelist',
required=False,
help='MPAS namelist file',
)
parser.add_argument(
'-s',
'--streams',
dest='streams',
required=False,
help='MPAS streams file',
)
parser.add_argument(
'-t',
'--namelist_template',
dest='namelist_template',
required=False,
help='MPAS namelist template file (with all namelist '
'options). For MPAS-Ocean, this will typically be'
' ${PATH_TO_MPASO}/default_inputs/'
'namelist.ocean.forward',
)
parser.add_argument(
'-y', '--yaml', dest='yaml', required=True, help='Output yaml file'
)
parser.add_argument(
'-m',
'--model',
dest='model',
default='mpas-ocean',
help='Model name for the yaml',
)
args = parser.parse_args()
yaml = mpas_namelist_and_streams_to_yaml(
model=args.model,
namelist_template=args.namelist_template,
namelist=args.namelist,
streams=args.streams,
)
yaml.write(args.yaml)
[docs]
def yaml_to_mpas_streams(processed_registry_filename, yaml):
"""
Add config options from a yaml file
Parameters
----------
processed_registry_filename : str
The processed registry file, used to determine the types of variables
each steam (since the yaml format doesn't supply that information).
yaml : polaris.yaml.PolarisYaml
A yaml object with the namelists and streams
Returns
-------
tree : lxml.etree
A tree of XML data describing MPAS i/o streams with the content from
the streams in the yaml file
"""
with open(processed_registry_filename, 'r') as reg_file:
registry_string = reg_file.read()
registry_string = registry_string.lstrip('\n')
registry = etree.fromstring(registry_string)
root = etree.Element('streams')
for stream in yaml.streams:
# find out if stream or immutable_stream
tag = _get_stream_tag(registry, stream)
attrs = dict(yaml.streams[stream])
contents = None
if 'contents' in attrs:
contents = attrs.pop('contents')
attrs['name'] = stream
child = etree.SubElement(root, tag, attrib=attrs)
if contents is not None:
for var in contents:
# find out what type it has
tag = _get_var_tag(registry, var)
etree.SubElement(child, tag, attrib=dict(name=var))
tree = etree.ElementTree(element=root)
return tree
def _update_section(src, dst, quiet, print_section=None):
"""
Recursively update config options in a section of a config from a source
section to the associate destination dictionary
"""
for name in src:
if isinstance(src[name], (dict, OrderedDict)):
if print_section is not None:
print_subsection = f'{print_section}: {name}'
else:
print_subsection = name
if name not in dst:
raise ValueError(
f'Attempting to modify config options to a '
f'nonexistent config\n'
f'(sub)section: {print_subsection}'
)
# this is a subsection
src_sub = src[name]
dst_sub = dst[name]
_update_section(src_sub, dst_sub, quiet, print_subsection)
else:
if name not in dst:
raise ValueError(
f'Attempting to modify a nonexistent config '
f'options: {print_section}: {name}'
)
if not quiet:
print(f' {print_section}: {name} = {src[name]}')
dst[name] = src[name]
def _update_options(src, dst, quiet):
"""
Update config options by searching in the destination nested dictionary
"""
for name in src:
success = _update_option(name, src[name], dst, quiet)
if not success:
raise ValueError(
f'Attempting to modify a nonexistent config options: {name}'
)
def _update_option(option, value, dst, quiet, print_section=None):
"""
Recursively attempt to find and replace the value of the
given option
"""
for name in dst:
if isinstance(dst[name], (dict, OrderedDict)):
if print_section is not None:
print_subsection = f'{print_section}: {name}'
else:
print_subsection = name
success = _update_option(
option, value, dst[name], quiet, print_subsection
)
if success:
return True
elif name == option:
dst[name] = value
if not quiet:
print(f' {print_section}: {name} = {value}')
return True
return False
def _read_namelist(namelist_template, namelist_filename):
"""Read the defaults file"""
record_map = _read_namelist_template(namelist_template)
with open(namelist_filename, 'r') as f:
lines = f.readlines()
namelist: Dict[str, Dict[str, int | float | bool | str]] = dict()
for line in lines:
_, opt, value = _read_namelist_line(line)
if opt is not None and value is not None:
record = record_map[opt]
if record not in namelist:
namelist[record] = dict()
namelist[record][opt] = value
return namelist
def _read_namelist_template(namelist_template):
"""Read the defaults file"""
with open(namelist_template, 'r') as f:
lines = f.readlines()
record_map: Dict[str, str] = dict()
record = None
for line in lines:
new_record, opt, _ = _read_namelist_line(line)
if new_record is not None:
record = new_record
elif opt is not None and record is not None:
record_map[opt] = record
return record_map
def _read_namelist_line(line):
record = None
opt = None
value: int | float | bool | str | None = None
if '&' in line:
record = line.strip('&').strip('\n').strip()
elif '=' in line:
opt, val = line.strip('\n').split('=')
opt = opt.strip()
str_value = val.strip().strip('"').strip("'").strip()
try:
value = int(str_value)
except ValueError:
try:
value = float(str_value)
except ValueError:
if str_value in ['true', '.true.']:
value = True
elif str_value in ['false', '.false.']:
value = False
if value is None:
value = str_value
return record, opt, value
def _streams_xml_to_dict(streams_filename):
"""Convert a streams XML file to nested dictionaries"""
streams: Dict[str, Dict[str, str | list]] = dict()
tree = etree.parse(streams_filename)
xml_streams = next(tree.iter('streams'))
for child in xml_streams:
if child.tag not in ['stream', 'immutable_stream']:
raise ValueError(
f'Unexpected tag {child.tag} instead of stream or'
f'immutable stream'
)
stream_name = child.attrib['name']
streams[stream_name] = dict()
for attr, value in child.attrib.items():
if attr != 'name':
streams[stream_name][attr] = value
contents = list()
for grandchild in child:
if grandchild.tag == 'var':
contents.append(grandchild.attrib['name'])
elif grandchild.tag == 'var_struct':
contents.append(grandchild.attrib['name'])
elif grandchild.tag == 'var_array':
contents.append(grandchild.attrib['name'])
elif grandchild.tag == 'stream':
contents.append(grandchild.attrib['name'])
else:
raise ValueError(f'Unexpected tag {grandchild.tag}')
if len(contents) > 0:
streams[stream_name]['contents'] = contents
return streams
def _get_stream_tag(registry, stream):
"""Get the xml tag, 'stream' or 'immutable_stream' for a given stream"""
streams = next(next(registry.iter('registry')).iter('streams'))
# if we don't find the stream, it can't be an immutable stream
tag = 'stream'
for child in streams:
if child.tag == 'stream' and child.attrib['name'] == stream:
if (
'immutable' in child.attrib
and child.attrib['immutable'] == 'true'
):
tag = 'immutable_stream'
break
return tag
def _get_var_tag(registry, variable):
"""
Get the xml tag -- 'stream', 'var_struct', 'var_array' or 'var' -- for a
variable
"""
tag = None
streams = next(next(registry.iter('registry')).iter('streams'))
for child in streams:
if child.tag == 'stream' and child.attrib['name'] == variable:
return 'stream'
tree = next(registry.iter('registry'))
for child in tree:
if child.tag == 'var_struct':
if child.attrib['name'] == variable:
return 'var_struct'
for grandchild in child:
if (
grandchild.tag in ['var_struct', 'var_array', 'var']
and grandchild.attrib['name'] == variable
):
return grandchild.tag
if grandchild.tag in ['var_struct', 'var_array']:
for greatgrand in grandchild:
if (
greatgrand.tag in ['var_array', 'var']
and greatgrand.attrib['name'] == variable
):
return greatgrand.tag
if tag is None:
raise ValueError(f'Could not find {variable} in preprocessed registry')
return tag