Source code for polaris.ocean.model.ocean_model_step

import importlib.resources as imp_res
import os
from types import ModuleType
from typing import Any, Dict, List, Optional, Tuple, Union

from ruamel.yaml import YAML

from polaris.model_step import ModelStep
from polaris.ocean.conservation import (
    compute_total_energy,
    compute_total_mass,
    # compute_total_mass_nonboussinesq, # Add when Omega EOS is used
    compute_total_salt,
    compute_total_tracer,
)
from polaris.tasks.ocean import Ocean

# A single model config value as it appears in a yaml or namelist file
OptionValue = Union[str, int, float, bool]
# A section key in the MPAS-Ocean -> Omega map: either one section name or a
# list of names (the code below treats a list as a path of nested sections)
MapSectionKey = Union[str, List[str]]
# A nested dictionary of sections (possibly several levels deep) whose leaves
# are option/value pairs.
# in principle, any number of levels but 4 seems sufficient for now
ConfigsType = Dict[
    str,
    Union[
        Dict[str, OptionValue],
        Dict[str, Dict[str, OptionValue]],
        Dict[str, Dict[str, Dict[str, OptionValue]]],
        Dict[str, Dict[str, Dict[str, Dict[str, OptionValue]]]],
    ],
]


class OceanModelStep(ModelStep):
    """
    An Omega or MPAS-Ocean step

    Attributes
    ----------
    dynamic_ntasks : bool
        Whether the target and minimum number of MPI tasks (``ntasks`` and
        ``min_tasks``) are computed dynamically from the number of cells
        in the mesh

    config_map : list of dict or None
        A list of nested dictionaries that map from MPAS-Ocean to Omega
        model config sections and options.  ``None`` until the map has
        been read (which happens in ``setup()`` when the model is Omega)

    graph_target : str
        The name of the graph partition file to link to (relative to the
        base working directory)

    update_eos : bool
        Whether to modify the model config options so the equation of state
        is consistent with config options
    """

    # make sure component is of type Ocean
    component: Ocean

    def __init__(
        self,
        component: Ocean,
        name: str,
        subdir: Optional[str] = None,
        indir: Optional[str] = None,
        ntasks: Optional[int] = None,
        min_tasks: Optional[int] = None,
        openmp_threads: Optional[int] = None,
        max_memory: Optional[int] = None,
        cached: bool = False,
        yaml: Optional[str] = None,
        update_pio: bool = True,
        update_eos: bool = False,
        make_graph: bool = False,
        mesh_filename: Optional[str] = None,
        partition_graph: bool = True,
        graph_target: Optional[str] = None,
    ) -> None:
        """
        Make a step for running the model

        Parameters
        ----------
        component : polaris.Component
            The component the step belongs to

        name : str
            The name of the step

        subdir : str, optional
            the subdirectory for the step.  If neither this nor ``indir``
            are provided, the directory is the ``name``

        indir : str, optional
            the directory the step is in, to which ``name`` will be appended

        ntasks : int, optional
            the target number of tasks the step would ideally use.  If too
            few cores are available on the system to accommodate the number
            of tasks and the number of cores per task, the step will run on
            fewer tasks as long as this is not below ``min_tasks``

        min_tasks : int, optional
            the number of tasks the step requires.  If the system has too
            few cores to accommodate the number of tasks and cores per task,
            the step will fail

        openmp_threads : int, optional
            the number of OpenMP threads to use

        max_memory : int, optional
            the amount of memory that the step is allowed to use in MB.
            This is currently just a placeholder for later use with task
            parallelism

        cached : bool, optional
            Whether to get all of the outputs for the step from the database
            of cached outputs for this component

        yaml : str, optional
            A yaml file name, passed through unchanged to the parent
            ``ModelStep`` constructor

        update_pio : bool, optional
            Whether to modify the namelist so the number of PIO tasks and the
            stride between them is consistent with the number of nodes and
            cores (one PIO task per node).

        update_eos : bool, optional
            Whether to modify the namelist so the equation of state is
            consistent with config options.

        make_graph : bool, optional
            Whether to make a graph file from the given MPAS mesh file.  If
            ``True``, ``mesh_filename`` must be given.

        mesh_filename : str, optional
            The name of an MPAS mesh file to use to make the graph file

        partition_graph : bool, optional
            Whether to partition the domain for the requested number of
            cores.  If so, the partitioning executable is taken from the
            ``partition`` option of the ``[executables]`` config section.

        graph_target : str, optional
            The graph file name (relative to the base work directory).
            If none, it will be created.
        """
        if graph_target is None:
            # NOTE(review): this attribute is set *before* the parent
            # constructor runs, and the parent receives the unmodified
            # ``make_graph`` argument, so this value is presumably
            # overwritten there -- confirm against ModelStep.__init__
            self.make_graph = True

        super().__init__(
            component=component,
            name=name,
            subdir=subdir,
            indir=indir,
            ntasks=ntasks,
            min_tasks=min_tasks,
            openmp_threads=openmp_threads,
            max_memory=max_memory,
            cached=cached,
            yaml=yaml,
            update_pio=update_pio,
            make_graph=make_graph,
            mesh_filename=mesh_filename,
            partition_graph=partition_graph,
            graph_filename='graph.info',
        )

        # task counts are computed from the mesh size only if the caller
        # specified neither of them
        self.dynamic_ntasks = ntasks is None and min_tasks is None

        self.config_map: Union[
            None, List[Dict[str, Dict[MapSectionKey, str]]]
        ] = None

        self.graph_target = graph_target

        self.update_eos = update_eos

    def setup(self) -> None:
        """
        Determine if we will make yaml files or namelists and streams files,
        then, determine the number of MPI tasks to use based on the estimated
        mesh size

        Raises
        ------
        ValueError
            If the ``model`` option in the ``[ocean]`` config section is
            neither ``omega`` nor ``mpas-ocean``
        """
        config = self.config
        model = config.get('ocean', 'model')
        if model == 'omega':
            self.make_yaml = True
            self.config_models = ['ocean', 'Omega']
            self.yaml = 'omega.yml'
            self.streams_section = 'IOStreams'
            self._read_config_map()
            self.partition_graph = False
        elif model == 'mpas-ocean':
            self.config_models = ['ocean', 'mpas-ocean']
            self.make_yaml = False
            self.add_input_file(
                filename='graph.info', work_dir_target=self.graph_target
            )
            self.streams_section = 'streams'
        else:
            raise ValueError(f'Unexpected ocean model: {model}')

        # re-evaluate here because ntasks/min_tasks are read from the step's
        # current attributes rather than the constructor arguments
        self.dynamic_ntasks = self.ntasks is None and self.min_tasks is None

        if self.dynamic_ntasks:
            self._update_ntasks()

        super().setup()

    def dynamic_model_config(self, at_setup: bool) -> None:
        """
        Add model config options, namelist, streams and yaml files using config
        options or template replacements that need to be set both during step
        setup and at runtime

        Parameters
        ----------
        at_setup : bool
            Whether this method is being run during setup of the step, as
            opposed to at runtime
        """
        super().dynamic_model_config(at_setup)

        if self.update_eos:
            self.update_namelist_eos()

    def constrain_resources(self, available_cores: Dict[str, Any]) -> None:
        """
        Update the number of MPI tasks to use based on the estimated mesh size

        Parameters
        ----------
        available_cores : dict
            Passed through unchanged to the parent class's
            ``constrain_resources()``
        """
        if self.dynamic_ntasks:
            self._update_ntasks()
        super().constrain_resources(available_cores)

    def compute_cell_count(self) -> Optional[int]:
        """
        Compute the approximate number of cells in the mesh, used to constrain
        resources

        This base implementation always returns ``None``; ``_update_ntasks()``
        raises an error in that case, so steps that rely on dynamic task
        counts need to override this method.

        Returns
        -------
        cell_count : int or None
            The approximate number of cells in the mesh
        """
        return None

    def map_yaml_options(
        self,
        options: Dict[str, OptionValue],
        config_model: Optional[str],
    ) -> Dict[str, OptionValue]:
        """
        A mapping between model config options from MPAS-Ocean to Omega

        Parameters
        ----------
        options : dict
            A dictionary of yaml options and value to use as replacements for
            existing values

        config_model : str or None
            If config options are available for multiple models, the model that
            the config options are from

        Returns
        -------
        options : dict
            A revised dictionary of yaml options and value to use as
            replacements for existing values
        """
        config = self.config
        model = config.get('ocean', 'model')
        # only options given under the generic 'ocean' model need translating
        if model == 'omega' and config_model == 'ocean':
            options = self._map_mpaso_to_omega_options(options)
        return options

    def map_yaml_configs(
        self,
        configs: ConfigsType,
        config_model: Optional[str],
    ) -> ConfigsType:
        """
        A mapping between model sections and config options from MPAS-Ocean to
        Omega

        Parameters
        ----------
        configs : dict
            A nested dictionary of yaml sections, options and value to use as
            replacements for existing values

        config_model : str or None
            If config options are available for multiple models, the model that
            the config options are from

        Returns
        -------
        configs : dict
            A revised nested dictionary of yaml sections, options and value to
            use as replacements for existing values
        """
        config = self.config
        model = config.get('ocean', 'model')
        # only configs given under the generic 'ocean' model need translating
        if model == 'omega' and config_model == 'ocean':
            configs = self._map_mpaso_to_omega_configs(configs)
        return configs

    def add_namelist_file(
        self,
        package: Union[str, ModuleType],
        namelist: str,
    ) -> None:
        """
        Add a file with updates to namelist options to the step to be parsed
        when generating a complete namelist file if and when the step gets set
        up.

        Not supported in this class: calling it always raises an error.

        Parameters
        ----------
        package : Package
            The package name or module object that contains ``namelist``

        namelist : str
            The name of the namelist replacements file to read from

        Raises
        ------
        ValueError
            Always
        """
        raise ValueError(
            'Input namelist files are not supported in OceanModelStep'
        )

    def add_streams_file(
        self,
        package: Union[str, ModuleType],
        streams: str,
        template_replacements: Optional[Dict[str, Any]] = None,
    ) -> None:
        """
        Add a streams file to the step to be parsed when generating a complete
        streams file if and when the step gets set up.

        Not supported in this class: calling it always raises an error.

        Parameters
        ----------
        package : Package
            The package name or module object that contains the streams file

        streams : str
            The name of the streams file to read from

        template_replacements : dict, optional
            A dictionary of replacements, in which case ``streams`` must be a
            Jinja2 template to be rendered with these replacements

        Raises
        ------
        ValueError
            Always
        """
        raise ValueError(
            'Input streams files are not supported in OceanModelStep'
        )

    def update_namelist_eos(self) -> None:
        """
        Modify the namelist to make it consistent with eos config options

        The equation-of-state options are read from the ``[ocean]`` config
        section and added as model config options for the ``ocean`` config
        model.
        """
        config = self.config

        section = config['ocean']
        eos_type = section.get('eos_type')
        eos_linear_alpha = section.getfloat('eos_linear_alpha')
        eos_linear_beta = section.getfloat('eos_linear_beta')
        eos_linear_rhoref = section.getfloat('eos_linear_rhoref')
        eos_linear_Tref = section.getfloat('eos_linear_Tref')
        eos_linear_Sref = section.getfloat('eos_linear_Sref')

        replacements = {
            'config_eos_type': eos_type,
            'config_eos_linear_alpha': eos_linear_alpha,
            'config_eos_linear_beta': eos_linear_beta,
            'config_eos_linear_densityref': eos_linear_rhoref,
            'config_eos_linear_Tref': eos_linear_Tref,
            'config_eos_linear_Sref': eos_linear_Sref,
        }

        self.add_model_config_options(
            options=replacements, config_model='ocean'
        )

    def check_properties(self):
        """
        Check the conservation properties requested in
        ``self.properties_to_check`` for each output file and write a log file
        in the work directory summarizing the result

        For each property, a relative error between the first and last time
        index of the output file is compared with a tolerance read from the
        ``[ocean]`` config section.

        Returns
        -------
        checked : bool
            Whether at least one property was checked

        success : bool
            Whether every checked property was below its tolerance

        Raises
        ------
        ValueError
            If the work directory has not been set or a requested property
            is not recognized
        """
        checked = False
        success = True
        if self.work_dir is None:
            raise ValueError(
                'The work directory must be set before the step '
                'output properties can be checked.'
            )

        passed_properties = []
        failed_properties = []
        for filename, properties in self.properties_to_check.items():
            filename = str(filename)
            mesh_filename = os.path.join(self.work_dir, 'mesh.nc')
            this_filename = os.path.join(self.work_dir, filename)
            ds_mesh = self.component.open_model_dataset(mesh_filename)
            ds = self.component.open_model_dataset(this_filename)
            if 'tracer conservation' in properties:
                # All tracers in mpaso_to_omega.yaml
                tracers_to_check = [
                    'temperature',
                    'salinity',
                    'tracer1',
                    'tracer2',
                    'tracer3',
                ]
                # Expand 'tracer conservation' into list of tracers to check.
                # A new list is built so the step's own list is not modified.
                properties = [
                    item
                    for item in properties
                    if item != 'tracer conservation'
                ]
                for tracer in tracers_to_check:
                    if tracer in ds.keys():
                        properties.append(f'tracer conservation-{tracer}')

            for output_property in properties:
                if output_property == 'mass conservation':
                    tol = self.config.getfloat(
                        'ocean', 'mass_conservation_tolerance'
                    )
                    # TODO: use compute_total_mass_nonboussinesq for Omega
                    # once the Omega EOS is used
                    relative_error = self._compute_rel_err(
                        compute_total_mass, ds_mesh=ds_mesh, ds=ds
                    )
                elif output_property == 'salt conservation':
                    tol = self.config.getfloat(
                        'ocean', 'salt_conservation_tolerance'
                    )
                    relative_error = self._compute_rel_err(
                        compute_total_salt, ds_mesh, ds
                    )
                elif output_property.split('-')[0] == 'tracer conservation':
                    tol = self.config.getfloat(
                        'ocean', 'tracer_conservation_tolerance'
                    )
                    # the tracer name follows the dash in the expanded name
                    tracer = output_property.split('-')[1]
                    relative_error = self._compute_rel_err(
                        compute_total_tracer,
                        ds_mesh,
                        ds,
                        tracer_name=tracer,
                    )
                elif output_property == 'energy conservation':
                    tol = self.config.getfloat(
                        'ocean', 'energy_conservation_tolerance'
                    )
                    relative_error = self._compute_rel_err(
                        compute_total_energy, ds_mesh, ds
                    )
                else:
                    raise ValueError(
                        'Could not find method to execute property check '
                        f'{output_property}'
                    )

                result = relative_error < tol
                success = success and result
                checked = True
                if not result:
                    failed_properties.append(
                        f'{output_property} relative error '
                        f'{relative_error:.3e} exceeds {tol}'
                    )
                else:
                    passed_properties.append(
                        f'{output_property} relative error '
                        f'{relative_error:.3e}'
                    )

        # ``filename`` below is the last file from the loop; it is only used
        # when ``checked`` is True, i.e. the loop ran at least once
        if checked and success:
            log_filename = os.path.join(
                self.work_dir, 'property_check_passed.log'
            )
            passed_properties_str = '\n '.join(passed_properties)
            with open(log_filename, 'w') as result_log_file:
                result_log_file.write(
                    f'Output file {filename} passed property checks.\n'
                    f'{passed_properties_str}\n'
                )
        elif checked and not success:
            log_filename = os.path.join(
                self.work_dir, 'property_check_failed.log'
            )
            failed_properties_str = '\n '.join(failed_properties)
            with open(log_filename, 'w') as result_log_file:
                result_log_file.write(
                    f'Property checks on {filename} failed for:\n '
                    f'{failed_properties_str}\n'
                )

        return checked, success

    def _update_ntasks(self) -> None:
        """
        Update ``ntasks`` and ``min_tasks`` for the step based on the estimated
        mesh size

        Raises
        ------
        ValueError
            If ``compute_cell_count()`` returns ``None``
        """
        config = self.config
        cell_count = self.compute_cell_count()
        if cell_count is None:
            raise ValueError(
                'ntasks and min_tasks were not set explicitly '
                'but they also cannot be computed because '
                'compute_cell_count() does not appear to have '
                'been overridden.'
            )

        goal_cells_per_core = config.getfloat('ocean', 'goal_cells_per_core')
        max_cells_per_core = config.getfloat('ocean', 'max_cells_per_core')

        # machines (e.g. Perlmutter) seem to be happier with ntasks that
        # are multiples of 4
        # ideally, about 200 cells per core
        self.ntasks = max(1, 4 * round(cell_count / (4 * goal_cells_per_core)))
        # In a pinch, about 2000 cells per core
        self.min_tasks = max(
            1, 4 * round(cell_count / (4 * max_cells_per_core))
        )

    def _read_config_map(self) -> None:
        """
        Read the map from MPAS-Ocean to Omega config options

        The map is the ``config`` entry of ``mpaso_to_omega.yaml`` in the
        ``polaris.ocean.model`` package and is stored in ``self.config_map``.
        """
        package = 'polaris.ocean.model'
        filename = 'mpaso_to_omega.yaml'
        text = imp_res.files(package).joinpath(filename).read_text()

        yaml_data = YAML(typ='rt')
        nested_dict = yaml_data.load(text)
        self.config_map = nested_dict['config']

    def _map_mpaso_to_omega_options(
        self,
        options: Dict[str, OptionValue],
    ) -> Dict[str, OptionValue]:
        """
        Map MPAS-Ocean namelist options to Omega config options

        Options without a mapping are dropped from the result and reported
        with a printed warning.
        """
        out_options: Dict[str, OptionValue] = {}
        not_found = []
        for mpaso_option, mpaso_value in options.items():
            try:
                omega_option, omega_value = self._map_mpaso_to_omega_option(
                    option=mpaso_option, value=mpaso_value
                )
            except ValueError:
                not_found.append(mpaso_option)
            else:
                out_options[omega_option] = omega_value

        self._warn_not_found(not_found)

        return out_options

    def _map_mpaso_to_omega_option(
        self,
        option: str,
        value: OptionValue,
    ) -> Tuple[str, OptionValue]:
        """
        Map MPAS-Ocean namelist option to Omega equivalent

        Raises
        ------
        ValueError
            If no entry of the config map contains ``option``
        """
        assert self.config_map is not None

        # traverse the map; the first entry containing the option wins
        for entry in self.config_map:
            options_dict = entry['options']
            if option in options_dict:
                out_option = options_dict[option]
                break
        else:
            raise ValueError(f'No mapping found for {option}')

        out_option, out_value = self._map_handle_not(out_option, value)

        return out_option, out_value

    def _map_mpaso_to_omega_configs(
        self,
        configs: ConfigsType,
    ) -> ConfigsType:
        """
        Map MPAS-Ocean namelist options to Omega config options

        Options that cannot be mapped are dropped from the result and
        reported with a printed warning.

        Raises
        ------
        ValueError
            If an MPAS-Ocean option's value is itself a dictionary
        """
        out_configs: ConfigsType = {}
        not_found = []
        for section, options in configs.items():
            for option, mpaso_value in options.items():
                if isinstance(mpaso_value, dict):
                    raise ValueError(
                        f'Nested sections are not supported in '
                        f'MPAS-Ocean configs: {section}/{option}'
                    )
                # Label used in the warning.  It must be defined before the
                # ``try`` because the mapping call itself may raise; it falls
                # back to the MPAS-Ocean section until the Omega section path
                # is known.
                sec_str = section
                try:
                    omega_sections, omega_option, omega_value = (
                        self._map_mpaso_to_omega_section_option(
                            section=section, option=option, value=mpaso_value
                        )
                    )
                    local_config: Dict[str, Any] = out_configs
                    sec_str = '/'.join(omega_sections)
                    # descend through (creating as needed) the nested
                    # dictionaries for the Omega section path
                    for omega_section in omega_sections:
                        if omega_section not in local_config:
                            local_config[omega_section] = {}
                        if not isinstance(local_config[omega_section], dict):
                            raise ValueError(
                                f'{sec_str} appears to point to a config '
                                f'option, not a section'
                            )
                        local_config = local_config[omega_section]
                    local_config[omega_option] = omega_value
                except ValueError:
                    # covers both a missing mapping and a section path that
                    # collides with an existing option
                    not_found.append(f'{sec_str}/{option}')

        self._warn_not_found(not_found)

        return out_configs

    def _map_mpaso_to_omega_section_option(
        self,
        section: str,
        option: str,
        value: OptionValue,
    ) -> Tuple[List[str], str, OptionValue]:
        """
        Map MPAS-Ocean namelist section and option to Omega equivalent

        Returns
        -------
        out_sections : list of str
            The Omega section path (one or more nested section names)

        out_option : str
            The Omega option name

        out_value : str, int, float or bool
            The value, negated if the mapped option starts with ``not ``

        Raises
        ------
        ValueError
            If no entry of the config map contains both ``section`` and
            ``option``
        """
        out_sections: List[str] = [section]
        out_option = option

        assert self.config_map is not None

        option_found = False
        # traverse the map
        for entry in self.config_map:
            section_dict = entry['section']
            try:
                omega_section: MapSectionKey = section_dict[section]
            except KeyError:
                continue

            options_dict = entry['options']
            try:
                omega_option = options_dict[option]
            except KeyError:
                # the section matches but the option is not in this entry;
                # another entry may still map the same section
                continue

            option_found = True
            # make sure out_sections is a list
            out_sections = (
                omega_section
                if isinstance(omega_section, list)
                else [omega_section]
            )
            out_option = omega_option
            break

        if not option_found:
            sec_str = (
                '/'.join(section) if isinstance(section, list) else section
            )
            raise ValueError(f'No mapping found for {sec_str}/{option}')

        out_option, out_value = self._map_handle_not(out_option, value)

        return out_sections, out_option, out_value

    def _compute_rel_err(
        self,
        func,
        ds_mesh,
        ds,
        time_index_start=0,
        time_index_end=-1,
        **kwargs,
    ):
        """
        Compute the relative change of a quantity between two time indices

        Parameters
        ----------
        func : callable
            Called as ``func(ds_mesh, ds_at_time, **kwargs)`` to compute the
            quantity at a single time index

        ds_mesh : object
            Passed unchanged as the first argument of ``func``

        ds : object
            A dataset supporting ``ds.isel(Time=index)``

        time_index_start, time_index_end : int, optional
            The ``Time`` indices of the initial and final states

        kwargs : dict
            Additional keyword arguments passed to ``func``

        Returns
        -------
        relative_error
            ``abs(final - init) / (abs(final) + 1.0)``
        """
        init_val = func(ds_mesh, ds.isel(Time=time_index_start), **kwargs)
        final_val = func(ds_mesh, ds.isel(Time=time_index_end), **kwargs)
        val_change = final_val - init_val
        # The 1.0 keeps the denominator away from zero; taking abs() of the
        # final value keeps it positive so that a negative total cannot
        # produce a negative (trivially passing) or infinite relative error.
        return abs(val_change) / (abs(final_val) + 1.0)

    @staticmethod
    def _warn_not_found(not_found: List[str]) -> None:
        """Warn about options that were not found in the map"""
        if not not_found:
            return

        print('WARNING: No Omega mapping found for these MPASO options:')
        for string in not_found:
            print(f' {string}')
        print()

    @staticmethod
    def _map_handle_not(
        option: str,
        value: OptionValue,
    ) -> Tuple[str, OptionValue]:
        """
        Handle negation of boolean value if the option starts with "not"
        """
        if option.startswith('not '):
            # a special case where we want the opposite of a boolean value
            option = option[4:]
            value = not value

        return option, value