import importlib.resources as imp_res
import logging
import os
import shutil
from mache import MachineInfo
from polaris.config import PolarisConfigParser
from polaris.io import download, symlink, update_permissions
from polaris.validate import compare_variables
class Step:
"""
The base class for a step of a task, such as setting up a mesh,
creating an initial condition, or running the component forward in time.
The step is the smallest unit of work in polaris that can be run on its
own by a user, though users will typically run full tasks or suites.
Below, the terms "input" and "output" refer to inputs and outputs to the
step itself, not necessarily the MPAS model. In fact, the MPAS model
itself is often an input to the step.
Attributes
----------
name : str
the name of the step
component : polaris.Component
The component the step belongs to
subdir : str
the subdirectory for the step
path : str
the path within the base work directory of the step, made up of
``component``, the task's ``subdir`` and the step's ``subdir``
cpus_per_task : int, optional
the number of cores per task the step would ideally use. If
fewer cores per node are available on the system, the step will
run on all available cores as long as this is not below
``min_cpus_per_task``
min_cpus_per_task : int, optional
the number of cores per task the step requires. If the system
has fewer than this number of cores per node, the step will fail
ntasks : int, optional
the number of tasks the step would ideally use. If too few
cores are available on the system to accommodate the number of
tasks and the number of cores per task, the step will run on
fewer tasks as long as this is not below ``min_tasks``
min_tasks : int, optional
the number of tasks the step requires. If the system has too
few cores to accommodate the number of tasks and cores per task,
the step will fail
openmp_threads : int
the number of OpenMP threads to use
max_memory : int
the amount of memory that the step is allowed to use in MB.
This is currently just a placeholder for later use with task
parallelism
input_data : list of dict
a list of dicts used to define input files, typically to be
downloaded to a database and/or symlinked in the work directory
inputs : list of str
a list of absolute paths of input files produced from ``input_data`` as
part of setting up the step. These input files must all exist at run
time or the step will raise an exception
outputs : list of str
a list of absolute paths of output files produced by this step (or
cached) and available as inputs to other tasks and steps. These
files must exist after the task has run or an exception will be raised
dependencies : dict of polaris.Step
A dictionary of steps that this step depends on (i.e. it can't run
until they have finished). Dependencies are used when the names of
the files produced by the dependency aren't known at setup (e.g.
because they depend on config options or data read in from files).
Under other circumstances, it is sufficient to indicate that an output
file from another step is an input of this step to establish a
dependency.
has_shared_config : bool
Whether this step uses a shared config file.
is_dependency : bool
Whether this step is the dependency of one or more other steps.
tasks : dict
The tasks this step is used in
config : polaris.config.PolarisConfigParser
Configuration options for this step, possibly shared with other tasks
and steps
machine_info : mache.MachineInfo
Information about E3SM supported machines
config_filename : str
The filename or symlink within the step where ``config`` is written to
during setup and read from during run
work_dir : str
The step's work directory, defined during setup as the combination
of ``base_work_dir`` and ``path``
base_work_dir : str
The base work directory
baseline_dir : str
Location of the same task within the baseline work directory,
for use in comparing variables and timers
validate_vars : dict of list
A list of variables for each output file for which a baseline
comparison should be performed if a baseline run has been provided. The
baseline validation is performed after the step has run.
logger : logging.Logger
A logger for output from the step
log_filename : str
At run time, the name of a log file where output/errors from the step
are being logged, or ``None`` if output is to stdout/stderr
cached : bool
Whether to get all of the outputs for the step from the database of
cached outputs for this component
run_as_subprocess : bool
Whether to run this step as a subprocess, rather than just running
it directly from the task. It is useful to run a step as a
subprocess if there is not a good way to redirect output to a log
file (e.g. if the step calls external code that, in turn, calls
additional subprocesses).
args : {list of list of str, None}
A list of lists of command-line arguments to call in parallel. Each
inner list represents a single command. All commands must use the
same parallel resources.
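
Examples
--------
A minimal sketch of a concrete step (the class and file names here are
hypothetical, not part of polaris):

>>> class InitStep(Step):
...     def __init__(self, component):
...         super().__init__(component, name='init', ntasks=4,
...                          min_tasks=1, openmp_threads=1)
...         self.add_output_file(filename='initial_state.nc')
...
...     def run(self):
...         pass  # create initial_state.nc here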
"""
def __init__(self, component, name, subdir=None, indir=None,
cpus_per_task=1, min_cpus_per_task=1, ntasks=1, min_tasks=1,
openmp_threads=1, max_memory=None, cached=False,
run_as_subprocess=False):
"""
Create a new step
Parameters
----------
component : polaris.Component
The component the step belongs to
name : str
the name of the step
subdir : str, optional
the subdirectory for the step. If neither this nor ``indir``
are provided, the directory is the ``name``
indir : str, optional
the directory the step is in, to which ``name`` will be appended
cpus_per_task : int, optional
the number of cores per task the step would ideally use. If
fewer cores per node are available on the system, the step will
run on all available cores as long as this is not below
``min_cpus_per_task``
min_cpus_per_task : int, optional
the number of cores per task the step requires. If the system
has fewer than this number of cores per node, the step will fail
ntasks : int, optional
the number of tasks the step would ideally use. If too few
cores are available on the system to accommodate the number of
tasks and the number of cores per task, the step will run on
fewer tasks as long as this is not below ``min_tasks``
min_tasks : int, optional
the number of tasks the step requires. If the system has too
few cores to accommodate the number of tasks and cores per task,
the step will fail
openmp_threads : int
the number of OpenMP threads to use
max_memory : int, optional
the amount of memory that the step is allowed to use in MB.
This is currently just a placeholder for later use with task
parallelism
cached : bool, optional
Whether to get all of the outputs for the step from the database of
cached outputs for this component
run_as_subprocess : bool
Whether to run this step as a subprocess, rather than just running
it directly from the task. It is useful to run a step as a
subprocess if there is not a good way to redirect output to a log
file (e.g. if the step calls external code that, in turn, calls
additional subprocesses).
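
Examples
--------
A sketch of how ``subdir`` is resolved when only ``indir`` is given
(the names are hypothetical and ``component`` is assumed to be a
``polaris.Component``):

>>> step = Step(component, name='forward', indir='ocean/my_task')
>>> step.subdir
'ocean/my_task/forward'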
"""
self.name = name
self.component = component
if subdir is not None:
self.subdir = subdir
elif indir is not None:
self.subdir = os.path.join(indir, name)
else:
self.subdir = name
self.cpus_per_task = cpus_per_task
self.min_cpus_per_task = min_cpus_per_task
self.ntasks = ntasks
self.min_tasks = min_tasks
self.openmp_threads = openmp_threads
self.max_memory = max_memory
self.path = os.path.join(self.component.name, self.subdir)
self.run_as_subprocess = run_as_subprocess
self.has_shared_config = False
self.config = PolarisConfigParser()
self.config_filename = ""
# child steps (or tasks) will add to these
self.input_data = list()
self.inputs = list()
self.outputs = list()
self.args = None
self.dependencies = dict()
self.is_dependency = False
self.tasks = dict()
# these will be set later during setup, dummy placeholders for now
self.machine_info = MachineInfo(machine='default')
self.work_dir = ""
self.base_work_dir = ""
# may be set during setup if there is a baseline for comparison
self.baseline_dir = None
self.validate_vars = dict()
self.setup_complete = False
# these will be set before running the step, dummy placeholders for now
self.logger = logging.getLogger('dummy')
self.log_filename = None
# output caching
self.cached = cached
def set_resources(self, cpus_per_task=None, min_cpus_per_task=None,
ntasks=None, min_tasks=None, openmp_threads=None,
max_memory=None):
"""
Update the resources for the step. This can be done within init,
``setup()`` or ``runtime_setup()`` of this step, or init,
``configure()`` or ``run()`` for the task that this step belongs
to.
Parameters
----------
cpus_per_task : int, optional
the number of cores per task the step would ideally use. If
fewer cores per node are available on the system, the step will
run on all available cores as long as this is not below
``min_cpus_per_task``
min_cpus_per_task : int, optional
the number of cores per task the step requires. If the system
has fewer than this number of cores per node, the step will fail
ntasks : int, optional
the number of tasks the step would ideally use. If too few
cores are available on the system to accommodate the number of
tasks and the number of cores per task, the step will run on
fewer tasks as long as this is not below ``min_tasks``
min_tasks : int, optional
the number of tasks the step requires. If the system has too
few cores to accommodate the number of tasks and cores per task,
the step will fail
openmp_threads : int, optional
the number of OpenMP threads to use
max_memory : int, optional
the amount of memory that the step is allowed to use in MB.
This is currently just a placeholder for later use with task
parallelism
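
Examples
--------
A sketch of requesting more MPI tasks at runtime (the values are
purely illustrative):

>>> step.set_resources(ntasks=128, min_tasks=16, openmp_threads=1)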
"""
if cpus_per_task is not None:
self.cpus_per_task = cpus_per_task
if min_cpus_per_task is not None:
self.min_cpus_per_task = min_cpus_per_task
if ntasks is not None:
self.ntasks = ntasks
if min_tasks is not None:
self.min_tasks = min_tasks
if openmp_threads is not None:
self.openmp_threads = openmp_threads
if max_memory is not None:
self.max_memory = max_memory
def constrain_resources(self, available_resources):
"""
Constrain ``cpus_per_task`` and ``ntasks`` based on the number of
cores available to this step
Parameters
----------
available_resources : dict
A dictionary containing the available resources (``cores``,
``cores_per_node``, ``tasks``, ``nodes`` and ``mpi_allowed``)
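
Examples
--------
A sketch of the expected dictionary (the values are illustrative, e.g.
for a single 128-core compute node):

>>> available_resources = {'cores': 128, 'tasks': 128, 'nodes': 1,
...                        'cores_per_node': 128, 'mpi_allowed': True}
>>> step.constrain_resources(available_resources)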
"""
mpi_allowed = available_resources['mpi_allowed']
if not mpi_allowed and self.ntasks > 1:
raise ValueError(
'You are trying to run an MPI job on a login node.\n'
'Please switch to a compute node.')
available_cores = available_resources['cores']
cores_per_node = available_resources['cores_per_node']
self.cpus_per_task = min(self.cpus_per_task,
min(available_cores, cores_per_node))
if self.cpus_per_task < self.min_cpus_per_task:
raise ValueError(
f'Available cpus_per_task ({self.cpus_per_task}) is below the '
f'minimum of {self.min_cpus_per_task} for step {self.name}')
available_tasks = available_cores // self.cpus_per_task
self.ntasks = min(self.ntasks, available_tasks)
if self.ntasks < self.min_tasks:
raise ValueError(
f'Available number of MPI tasks ({self.ntasks}) is below the '
f'minimum of {self.min_tasks} for step {self.name}')
def setup(self):
"""
Set up the step in the work directory, including downloading any
dependencies. Child classes should override this method to perform
setup operations such as generating namelist and streams files and
adding inputs and outputs.
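
Examples
--------
A sketch of a typical override in a subclass (the file names and work
directory target are hypothetical):

>>> class MyStep(Step):
...     def setup(self):
...         super().setup()
...         self.add_input_file(
...             filename='mesh.nc',
...             work_dir_target='ocean/my_task/mesh/mesh.nc')
...         self.add_output_file(filename='initial_state.nc')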
"""
pass
def runtime_setup(self):
"""
Update attributes of the step at runtime before calling the ``run()``
method. The most common reason to override this method is to
determine the number of cores and threads to run with. It may also
be useful for performing small (Python) tasks such as creating
graph and partition files before running a parallel command. When
running with parallel tasks in the future, this method will be called
for each step in serial before steps are run in task parallel.
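
Examples
--------
A sketch of choosing resources at runtime from a config option (the
section and option names are hypothetical):

>>> class MyStep(Step):
...     def runtime_setup(self):
...         ntasks = self.config.getint('my_task', 'ntasks')
...         self.set_resources(ntasks=ntasks, min_tasks=1)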
"""
pass
def run(self):
"""
Run the step. Every child class must override this method to perform
the main work.
"""
pass
def add_output_file(self, filename, validate_vars=None):
"""
Add an output file that must be produced by this step and may be made
available as an input to other steps, perhaps in other tasks. This file
must exist after the task has run or an exception will be raised.
Optionally, a list of variables can be provided for validation against
a baseline (if one is provided), once the step has been run.
Parameters
----------
filename : str
The relative path of the output file within the step's work
directory
validate_vars : list, optional
A list of variable names to compare with a baseline (if one is
provided)
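
Examples
--------
A sketch of registering an output with baseline validation (the file
and variable names are illustrative):

>>> self.add_output_file(filename='output.nc',
...                      validate_vars=['temperature', 'salinity'])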
"""
self.outputs.append(filename)
if validate_vars is not None:
self.validate_vars[filename] = validate_vars
def add_dependency(self, step, name=None):
"""
Add ``step`` as a dependency of this step (i.e. this step can't run
until the dependency has finished). A dependency should be used when
the names of the files produced by the dependency aren't known at setup
(e.g. because they depend on config options or data read in from
files). Under other circumstances, a dependency can be established by
indicating that an output (added with the ``add_output_file()`` method)
from another step is an input (added with ``add_input_file()``) of this
step.
Parameters
----------
step : polaris.Step
The step that is a dependency
name : str, optional
The name of the step used to access it in the ``dependencies``
dictionary. By default, it is ``step.name`` but another name may
be required if two dependencies have the same ``step.name``.
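
Examples
--------
A sketch of adding two dependencies whose steps share a ``step.name``
(the step objects and names are hypothetical):

>>> self.add_dependency(init_low_res, name='init_low')
>>> self.add_dependency(init_high_res, name='init_high')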
"""
if name is None:
name = step.name
if name in self.dependencies:
raise ValueError('Adding a dependency that is already in '
'dependencies.')
self.dependencies[name] = step
step.is_dependency = True
step.add_output_file('step_after_run.pickle')
filename = f'dependencies/{name}_after_run.pickle'
target = f'{step.path}/step_after_run.pickle'
self.add_input_file(filename=filename, work_dir_target=target)
def validate_baselines(self):
"""
Compare variables between output files in this step and in the same
step from a baseline run if one was provided.
Returns
-------
compared : bool
Whether a baseline comparison was performed
success : bool
Whether the outputs were successfully validated against a baseline
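
Examples
--------
A sketch of acting on the result after the step has run:

>>> compared, success = step.validate_baselines()
>>> if compared and not success:
...     raise ValueError(f'Baseline validation failed in {step.name}')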
"""
# work_dir is initialized to an empty string, so check truthiness
if not self.work_dir:
raise ValueError('The work directory must be set before the step '
'outputs can be validated against baselines.')
compared = False
success = True
if self.baseline_dir is not None:
for filename, variables in self.validate_vars.items():
logger = self.logger
filename = str(filename)
this_filename = os.path.join(self.work_dir, filename)
baseline_filename = os.path.join(self.baseline_dir, filename)
result = compare_variables(
variables, this_filename, baseline_filename, logger=logger)
success = success and result
compared = True
return compared, success
def set_shared_config(self, config, link=None):
"""
Replace the step's config parser with the shared config parser
Parameters
----------
config : polaris.config.PolarisConfigParser
A shared config parser whose ``filepath`` attribute must have been
set
link : str, optional
A link to the shared config file to go in the step's work
directory. If not provided, the config file itself must be in
the step's work directory
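
Examples
--------
A sketch of sharing one config parser across steps (the paths are
hypothetical):

>>> config = PolarisConfigParser()
>>> config.filepath = 'ocean/my_task/my_task.cfg'
>>> step.set_shared_config(config, link='my_task.cfg')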
"""
self.has_shared_config = True
self._set_config(config=config, link=link)
def process_inputs_and_outputs(self):
"""
Process the inputs to and outputs from a step added with
:py:meth:`polaris.Step.add_input_file` and
:py:meth:`polaris.Step.add_output_file`. This includes downloading
files, making symlinks, and converting relative paths to absolute
paths.
"""
component = self.component.name
step_dir = self.work_dir
config = self.config
base_work_dir = self.base_work_dir
# process the outputs first because cached outputs will add more inputs
if self.cached:
# forget about the inputs -- we won't use them, but we will add
# the cached outputs as inputs
self.input_data = list()
for output in self.outputs:
filename = os.path.join(self.path, output)
if filename not in self.component.cached_files:
raise ValueError(f'The file {filename} has not been added '
f'to the cache database')
target = self.component.cached_files[filename]
self.add_input_file(
filename=output,
target=target,
database='polaris_cache')
inputs = []
databases_with_downloads = set()
for entry in self.input_data:
input_file, database_subdirs = Step._process_input(
entry, config, base_work_dir, component, step_dir)
if database_subdirs is not None:
databases_with_downloads.update(database_subdirs)
inputs.append(input_file)
self.inputs = inputs
if len(databases_with_downloads) > 0 and \
config.has_option('e3sm_unified', 'group'):
group = config.get('e3sm_unified', 'group')
update_permissions(databases_with_downloads, group)
# inputs are already absolute paths, convert outputs to absolute paths
self.outputs = [os.path.abspath(os.path.join(step_dir, filename)) for
filename in self.outputs]
def _set_config(self, config, link=None):
"""
Replace the step's config parser with the shared config parser
Parameters
----------
config : polaris.config.PolarisConfigParser
A shared config parser whose ``filepath`` attribute must have been
set
link : str, optional
A link to the shared config file to go in the step's work
directory. If not provided, the config file itself must be in
the step's work directory
"""
self.component.add_config(config)
self.config = config
if link is None:
directory, basename = os.path.split(config.filepath)
if directory != self.subdir:
raise ValueError('No link parameter was provided but the '
'config file is not in this step\'s work '
'directory.')
self.config_filename = basename
else:
self.config_filename = link
@staticmethod
def _process_input(entry, config, base_work_dir, component, step_dir):
database_subdirs = None
filename = entry['filename']
target = entry['target']
database = entry['database']
database_component = entry['database_component']
url = entry['url']
work_dir_target = entry['work_dir_target']
package = entry['package']
copy = entry['copy']
if package is not None:
if target is None:
target = filename
# resolve the package resource to a file-system path;
# as_file() returns a context manager, not a path, so enter it
package_path = imp_res.files(package).joinpath(target)
with imp_res.as_file(package_path) as path:
    target = str(path)
if work_dir_target is not None:
target = os.path.join(base_work_dir, work_dir_target)
if target is not None:
download_target = target
else:
download_target = filename
download_path = None
if database is not None:
# we're downloading a file to a cache of a database (if it's
# not already there).
if database_component is None:
database_component = component
if url is None:
base_url = config.get('download', 'server_base_url')
url = f'{base_url}/{database_component}/{database}/{target}'
database_root = config.get('paths', 'database_root')
download_path = os.path.join(database_root, database_component,
database, download_target)
if not os.path.exists(download_path):
database_subdirs = {
database_root,
os.path.join(database_root, database_component),
os.path.join(database_root, database_component, database)
}
elif url is not None:
download_path = download_target
if url is not None:
download_target = download(url, download_path, config)
if target is not None:
# this is the absolute path that we presumably want
target = download_target
if target is not None:
filepath = os.path.join(step_dir, filename)
dirname = os.path.dirname(filepath)
# make sure the directory exists before copying or symlinking
os.makedirs(dirname, exist_ok=True)
if copy:
    shutil.copy(target, filepath)
else:
    symlink(target, filepath)
input_file = os.path.join(dirname, target)
else:
input_file = os.path.join(step_dir, filename)
input_file = os.path.abspath(input_file)
return input_file, database_subdirs