import argparse
import os
import pickle
import shutil
import sys
import warnings
from typing import Dict, List
from polaris import Task, provenance
from polaris.build.mpas_ocean import build_mpas_ocean
from polaris.build.omega import build_omega
from polaris.config import PolarisConfigParser
from polaris.io import symlink
from polaris.job import write_job_script
from polaris.machines import discover_machine
from polaris.tasks import get_components
# [docs]  (Sphinx "view source" anchor left over from HTML extraction)
def setup_tasks(
    work_dir,
    task_list=None,
    numbers=None,
    config_file=None,
    machine=None,
    baseline_dir=None,
    component_path=None,
    suite_name='custom',
    cached=None,
    copy_executable=False,
    clean_tasks=False,
    model=None,
    build=None,
    branch=None,
    clean_build=None,
    quiet_build=None,
    cmake_flags=None,
    debug=None,
):
    """
    Set up one or more tasks

    Parameters
    ----------
    work_dir : str
        A directory that will serve as the base for creating task
        directories

    task_list : list of str, optional
        Relative paths for the tasks to set up

    numbers : list of str, optional
        Task numbers to setup, as listed from ``polaris list``, optionally
        with a suffix ``c`` to indicate that all steps in that task should be
        cached

    config_file : {str, None}, optional
        Configuration file with custom options for setting up and running
        tasks

    machine : str, optional
        The name of one of the machines with defined config options, which
        can be listed with ``polaris list --machines``

    baseline_dir : str, optional
        Location of baselines that can be compared to

    component_path : str, optional
        The relative or absolute path to the location where the model and
        default namelists have been (or will be) built

    suite_name : str, optional
        The name of the suite if tasks are being set up through a suite or
        ``'custom'`` if not

    cached : list of list of str, optional
        For each task in ``tasks``, which steps (if any) should be cached,
        or a list with "_all" as the first entry if all steps in the task
        should be cached

    copy_executable : bool, optional
        Whether to copy the model executable to the work directory

    clean_tasks : bool, optional
        Whether to delete the contents of the task work directories before
        setting up tasks

    model : str, optional
        The model to run

    build : bool, optional
        Whether to build the model

    branch : str, optional
        The relative or absolute path to the base of the component branch to
        build. If not provided, ``component_path`` must point to an existing
        build.

    clean_build : bool, optional
        Whether to clean the build directory before building the model.
        ``clean_build = True`` implies ``build = True``.

    quiet_build : bool, optional
        Whether to build the model without output. Implies ``build = True``.

    cmake_flags : str, optional
        Additional flags to pass to make or CMake when building the model

    debug : bool, optional
        Whether to build the model in debug mode

    Returns
    -------
    tasks : dict of polaris.Task
        A dictionary of tasks, with the relative path in the work
        directory as keys
    """
    # resolve the machine (env var or discovery) and validate the argument
    # combination before doing any real work
    machine = __get_machine_and_check_params(
        machine, config_file, task_list, numbers, cached
    )
    if work_dir is None:
        print(
            'Warning: no base work directory was provided so setting up in '
            'the current directory.'
        )
        work_dir = os.getcwd()
    work_dir = os.path.abspath(work_dir)

    # build a lookup of every known task across all components, keyed by
    # relative path
    all_tasks = dict()
    for component in get_components():
        for task in component.tasks.values():
            all_tasks[task.path] = task

    # the subset of tasks requested by number and/or by name, plus the steps
    # in each that should use cached outputs
    tasks: Dict[str, Task] = dict()
    cached_steps: Dict[str, List[str]] = dict()
    _add_tasks_by_number(numbers, all_tasks, tasks, cached_steps)
    _add_tasks_by_name(task_list, all_tasks, cached, tasks, cached_steps)

    # get the component of the first task. We'll ensure that all tasks are
    # for this component
    first_path = next(iter(tasks))
    component = tasks[first_path].component

    # a config parser with machine and component options but nothing
    # task-specific
    basic_config = _get_basic_config(
        config_file=config_file,
        machine=machine,
        component_path=component_path,
        component=component,
        model=model,
        build=build,
        branch=branch,
        cmake_flags=cmake_flags,
        debug=debug,
        clean_build=clean_build,
        quiet_build=quiet_build,
        work_dir=work_dir,
    )
    component.configure(basic_config, list(tasks.values()))

    # record how this work directory was produced, for reproducibility
    provenance.write(
        work_dir,
        tasks,
        config=basic_config,
        machine=machine,
        baseline_dir=baseline_dir,
    )

    # the [build] config section has the final say on whether to build, since
    # command-line flags were merged into it in _get_basic_config()
    section = basic_config['build']
    build = section.getboolean('build')
    if build:
        _build_model(
            basic_config=basic_config,
            component=component,
            machine=machine,
        )
    if clean_tasks:
        print('')
        print('Cleaning task and step work directories:')
        _clean_tasks_and_steps(tasks, work_dir)
        print('')

    # create, layer and write out config files for tasks and shared steps
    _setup_configs(
        basic_config=basic_config,
        component=component,
        tasks=tasks,
        work_dir=work_dir,
        copy_executable=copy_executable,
    )

    # do this after _setup_configs() in case tasks mark additional steps
    # as cached in their configure() methods
    _expand_and_mark_cached_steps(tasks, cached_steps)

    print('Setting up tasks:')
    for path, task in tasks.items():
        setup_task(
            path,
            task,
            machine,
            work_dir,
            baseline_dir,
            cached_steps=cached_steps[path],
        )
    _check_dependencies(tasks)

    suite = {'name': suite_name, 'tasks': tasks, 'work_dir': work_dir}

    # pickle the task or step dictionary for use at runtime
    pickle_file = os.path.join(suite['work_dir'], f'{suite_name}.pickle')
    with open(pickle_file, 'wb') as handle:
        pickle.dump(suite, handle, protocol=pickle.HIGHEST_PROTOCOL)

    _symlink_load_script(work_dir)

    # report the core counts a job script would need for the whole suite
    max_cores, max_of_min_cores = _get_required_cores(tasks)
    print(f'target cores: {max_cores}')
    print(f'minimum cores: {max_of_min_cores}')
    if machine is not None:
        write_job_script(
            config=basic_config,
            machine=machine,
            target_cores=max_cores,
            min_cores=max_of_min_cores,
            work_dir=work_dir,
            suite=suite_name,
        )
    return tasks
# [docs]  (Sphinx "view source" anchor left over from HTML extraction)
def setup_task(path, task, machine, work_dir, baseline_dir, cached_steps):
    """
    Set up a single task and all of its steps

    Parameters
    ----------
    path : str
        Relative path for the task to set up

    task : polaris.Task
        A task to set up

    machine : str
        The name of one of the machines with defined config options, which
        can be listed with ``polaris list --machines``

    work_dir : str
        A directory that will serve as the base for creating task directories

    baseline_dir : str
        Location of baselines that can be compared to

    cached_steps : list of str
        Which steps (if any) should be cached, identified by a list of
        subdirectories in the component
    """
    print(f' {path}')
    task_dir = os.path.join(work_dir, path)
    try:
        os.makedirs(task_dir)
    except FileExistsError:
        pass
    task.work_dir = task_dir
    task.base_work_dir = work_dir
    # add the baseline directory for this task
    if baseline_dir is not None:
        task.baseline_dir = os.path.join(baseline_dir, path)
    if len(cached_steps) > 0:
        print_steps = ' '.join(cached_steps)
        print(f' steps with cached outputs: {print_steps}')
    # iterate over steps
    for step in task.steps.values():
        _setup_step(task, step, work_dir, baseline_dir, task_dir)
    # wait until we've set up all the steps before pickling because steps may
    # need other steps to be set up
    for step in task.steps.values():
        if step.setup_complete:
            # this is a shared step that has already been set up
            continue
        # pickle the task and step for use at runtime
        pickle_filename = os.path.join(step.work_dir, 'step.pickle')
        with open(pickle_filename, 'wb') as handle:
            pickle.dump(step, handle, protocol=pickle.HIGHEST_PROTOCOL)
        _symlink_load_script(step.work_dir)
        if machine is not None:
            # a job script sized for just this step
            cores = step.cpus_per_task * step.ntasks
            min_cores = step.min_cpus_per_task * step.min_tasks
            write_job_script(
                config=step.config,
                machine=machine,
                target_cores=cores,
                min_cores=min_cores,
                work_dir=step.work_dir,
            )
        step.setup_complete = True
    # pickle the task and step for use at runtime
    pickle_filename = os.path.join(task.work_dir, 'task.pickle')
    with open(pickle_filename, 'wb') as handle:
        # a single-task "suite" so the task can be run on its own
        suite = {
            'name': 'task',
            'tasks': {task.path: task},
            'work_dir': task.work_dir,
        }
        pickle.dump(suite, handle, protocol=pickle.HIGHEST_PROTOCOL)
    _symlink_load_script(task_dir)
    if machine is not None:
        # a job script sized for the whole task
        max_cores, max_of_min_cores = _get_required_cores({path: task})
        write_job_script(
            config=task.config,
            machine=machine,
            target_cores=max_cores,
            min_cores=max_of_min_cores,
            work_dir=task_dir,
        )
def main():
    """
    Entry point for ``polaris setup``: parse the command-line arguments and
    set up the requested tasks.
    """
    parser = argparse.ArgumentParser(
        description='Set up one or more tasks', prog='polaris setup'
    )
    parser.add_argument(
        '-t',
        '--tasks',
        nargs='+',
        dest='tasks',
        help='Relative path for a task(s) to set up.',
        metavar='PATH',
    )
    parser.add_argument(
        '-n',
        '--task_number',
        nargs='+',
        dest='task_num',
        type=str,
        help='Task number(s) to setup, as listed from '
        "'polaris list'. Can be a space-separated "
        "list of task numbers. A suffix 'c' indicates "
        'that all steps in the task should use cached '
        'outputs.',
        metavar='NUM',
    )
    parser.add_argument(
        '-f',
        '--config_file',
        dest='config_file',
        help='Configuration file for task setup.',
        metavar='FILE',
    )
    parser.add_argument(
        '-m',
        '--machine',
        dest='machine',
        help='The name of the machine for loading machine-'
        'related config options.',
        metavar='MACH',
    )
    parser.add_argument(
        '-w',
        '--work_dir',
        dest='work_dir',
        required=True,
        help='A base directory for setting up tasks.',
        metavar='PATH',
    )
    parser.add_argument(
        '-b',
        '--baseline_dir',
        dest='baseline_dir',
        help='Location of baselines that can be compared to.',
        metavar='PATH',
    )
    parser.add_argument(
        '-p',
        '--component_path',
        dest='component_path',
        help='The path where the component executable and '
        'default namelists have been (or will be) built.',
        metavar='PATH',
    )
    parser.add_argument(
        '--suite_name',
        dest='suite_name',
        default='custom',
        help="The name to use for the 'custom' suite "
        'containing all setup tasks.',
        metavar='SUITE',
    )
    parser.add_argument(
        '--cached',
        dest='cached',
        nargs='+',
        help='A list of steps in a single task supplied with '
        '--tasks or --task_number that should use cached '
        "outputs, or '_all' if all steps should be "
        'cached.',
        metavar='STEP',
    )
    parser.add_argument(
        '--copy_executable',
        dest='copy_executable',
        action='store_true',
        help='If the model executable should be copied to the work directory.',
    )
    parser.add_argument(
        '--clean_tasks',
        dest='clean_tasks',
        action='store_true',
        help='If the task work directories should be deleted '
        'before setting up the tasks.',
    )
    parser.add_argument(
        '--model',
        dest='model',
        help="The model to run (one of 'mpas-ocean', 'omega', "
        "or 'mpas-seaice')",
    )
    parser.add_argument(
        '--build',
        dest='build',
        action='store_true',
        help='If the model should be built.',
    )
    parser.add_argument(
        '--branch',
        dest='branch',
        help='The branch of the model to build. The default is the submodule '
        'associated with the model',
    )
    parser.add_argument(
        '--clean_build',
        dest='clean_build',
        action='store_true',
        help='If the model should be cleaned before building. Implies '
        '--build.',
    )
    parser.add_argument(
        '--quiet_build',
        dest='quiet_build',
        action='store_true',
        help='If the model should be built without output. Implies --build.',
    )
    parser.add_argument(
        '--cmake_flags',
        dest='cmake_flags',
        help='Additional flags to pass to make or CMake when building the '
        'model.',
    )
    parser.add_argument(
        '--debug',
        dest='debug',
        action='store_true',
        help='If the model should be built in debug mode.',
    )
    # skip the first two entries: the polaris executable and the 'setup'
    # subcommand
    args = parser.parse_args(sys.argv[2:])
    cached = None
    if args.cached is not None:
        # caching steps is only supported for a single task at a time
        # (fixed typo: was "at at time")
        if args.tasks is not None and len(args.tasks) != 1:
            raise ValueError(
                'You can only cache steps for one task at a time.'
            )
        if args.task_num is not None and len(args.task_num) != 1:
            raise ValueError(
                'You can only cache steps for one task at a time.'
            )
        # cached is a list of lists
        cached = [args.cached]
    setup_tasks(
        task_list=args.tasks,
        numbers=args.task_num,
        config_file=args.config_file,
        machine=args.machine,
        work_dir=args.work_dir,
        baseline_dir=args.baseline_dir,
        component_path=args.component_path,
        suite_name=args.suite_name,
        cached=cached,
        copy_executable=args.copy_executable,
        clean_tasks=args.clean_tasks,
        model=args.model,
        build=args.build,
        branch=args.branch,
        clean_build=args.clean_build,
        quiet_build=args.quiet_build,
        cmake_flags=args.cmake_flags,
        debug=args.debug,
    )
def _expand_and_mark_cached_steps(tasks, cached_steps):
"""
Mark any steps that will be cached. If any task asked for a step to
be cached, it will be cached for all tasks that share the step.
"""
for path, task in tasks.items():
cached_names = cached_steps[path]
if len(cached_names) > 0 and cached_names[0] == '_all':
cached_steps[path] = list(task.steps.keys())
for step_name in cached_steps[path]:
task.steps[step_name].cached = True
for step_name, step in task.steps.items():
if step.cached and step_name not in cached_steps[path]:
cached_steps[path].append(step_name)
def _setup_configs(
    basic_config,
    component,
    tasks,
    work_dir,
    copy_executable,
):
    """Set up config parsers for this component"""
    common_config = basic_config.copy()
    if copy_executable:
        common_config.set('setup', 'copy_executable', 'True')
    # record the polaris branch (current directory if the env var is unset)
    polaris_branch = os.environ.get('POLARIS_BRANCH', os.getcwd())
    common_config.set('paths', 'polaris_branch', polaris_branch)

    initial_configs = _add_task_configs(component, tasks, common_config)
    # okay, we're finally ready to configure all the tasks and add configs
    # to the "owned" steps
    configs = _configure_tasks_and_add_step_configs(
        tasks, initial_configs, common_config
    )
    _write_configs(common_config, configs, component.name, work_dir)
    _symlink_configs(tasks, work_dir)
def _add_task_configs(component, tasks, common_config):
"""
Add config parsers for tasks and steps that don't already have shared ones
"""
# get a list of shared steps and add config files for tasks to the
# component
configs = dict()
for task in tasks.values():
if task.config.filepath is None:
task.config_filename = f'{task.name}.cfg'
task.config.filepath = os.path.join(
component.name, task.subdir, task.config_filename
)
component.add_config(task.config)
configs[task.config.filepath] = task.config
# now go through all the configs and prepend the common config options,
# then run the setup() method for each in case there is some customization
for config in configs.values():
config.prepend(common_config)
config.setup()
return configs
def _configure_tasks_and_add_step_configs(
    tasks, initial_configs, common_config
):
    """
    Call the configure() method for each task and add configs to "owned" steps

    Parameters
    ----------
    tasks : dict of polaris.Task
        The tasks being set up, keyed by relative path

    initial_configs : dict
        The config parsers already created for the tasks, keyed by filepath

    common_config : polaris.config.PolarisConfigParser
        Config options common to the whole component, prepended to any
        newly created shared-step configs

    Returns
    -------
    configs : dict
        All task and shared-step config parsers, keyed by filepath
    """
    for config in initial_configs.values():
        for task in config.tasks:
            task.configure()
            # record the steps to run so users can edit the list in the
            # config file before running the task
            config.set(
                section=f'{task.name}',
                option='steps_to_run',
                value=' '.join(task.steps_to_run),
                comment=f'A list of steps to include when running the '
                f'{task.name} task',
            )
    # add configs to steps after calling task.configure() on all tasks in case
    # new steps were added
    configs = dict()
    new_configs = dict()
    for task in tasks.values():
        configs[task.config.filepath] = task.config
        for step in task.steps.values():
            if step.has_shared_config:
                if step.config.filepath is None:
                    # this shared config hasn't been given a location yet
                    step.config_filename = f'{step.name}.cfg'
                    step.config.filepath = os.path.join(
                        step.component.name, step.subdir, step.config_filename
                    )
                configs[step.config.filepath] = step.config
                if step.config.filepath not in initial_configs:
                    # a brand-new shared config: register it and remember to
                    # give it the common options below
                    new_configs[step.config.filepath] = step.config
                    step.component.add_config(step.config)
            else:
                # the step uses the config of the task that owns it
                step._set_config(task.config, link=task.config_filename)
    # only the brand-new shared configs still need common options and setup;
    # configs in initial_configs got them in _add_task_configs()
    for config in new_configs.values():
        config.prepend(common_config)
        config.setup()
    return configs
def _write_configs(common_config, configs, component_name, work_dir):
"""Write out all the config files"""
# add the common config at the component level
common_config.filepath = f'{component_name}.cfg'
configs[common_config.filepath] = common_config
# finally, write out the config files
for config in configs.values():
config_filepath = os.path.join(work_dir, config.filepath)
config_dir = os.path.dirname(config_filepath)
try:
os.makedirs(config_dir)
except FileExistsError:
pass
with open(config_filepath, 'w') as f:
config.write(f)
def _symlink_configs(tasks, work_dir):
"""Symlink config files for requested tasks and steps"""
symlinks = dict()
for task in tasks.values():
config = task.config
config_filepath = os.path.join(work_dir, config.filepath)
link_path = os.path.join(
work_dir, task.component.name, task.subdir, task.config_filename
)
if not os.path.exists(link_path) and link_path not in symlinks:
symlinks[link_path] = config_filepath
for step in task.steps.values():
config = step.config
config_filepath = os.path.join(work_dir, config.filepath)
link_path = os.path.join(
work_dir,
step.component.name,
step.subdir,
step.config_filename,
)
if not os.path.exists(link_path) and link_path not in symlinks:
symlinks[link_path] = config_filepath
for link_path, config_filepath in symlinks.items():
link_dir = os.path.dirname(link_path)
try:
os.makedirs(link_dir)
except FileExistsError:
pass
symlink(config_filepath, link_path)
def _clean_tasks_and_steps(tasks, base_work_dir):
"""
Remove contents of task and step work directories to start fresh
"""
print(f'{base_work_dir}:')
for path, task in tasks.items():
task_work_dir = os.path.join(base_work_dir, path)
try:
shutil.rmtree(task_work_dir)
print(f' {path}')
except FileNotFoundError:
pass
for step in task.steps.values():
step_work_dir = os.path.join(base_work_dir, step.path)
try:
shutil.rmtree(step_work_dir)
print(f' {step.path}')
except FileNotFoundError:
pass
def _get_required_cores(tasks):
"""Get the maximum number of target cores and the max of min cores"""
max_cores = 0
max_of_min_cores = 0
for task in tasks.values():
for step_name in task.steps_to_run:
step = task.steps[step_name]
if step.cached:
continue
if step.ntasks is None:
raise ValueError(
f'The number of tasks (ntasks) was never set for '
f'{task.path} step {step_name}'
)
if step.cpus_per_task is None:
raise ValueError(
f'The number of CPUs per task (cpus_per_task) was never '
f'set for {task.path} step {step_name}'
)
cores = step.cpus_per_task * step.ntasks
min_cores = step.min_cpus_per_task * step.min_tasks
max_cores = max(max_cores, cores)
max_of_min_cores = max(max_of_min_cores, min_cores)
return max_cores, max_of_min_cores
def __get_machine_and_check_params(
machine, config_file, tasks, numbers, cached
):
if machine is None and 'POLARIS_MACHINE' in os.environ:
machine = os.environ['POLARIS_MACHINE']
if machine is None:
machine = discover_machine()
if config_file is None and machine is None:
raise ValueError('At least one of config_file and machine is needed.')
if config_file is not None and not os.path.exists(config_file):
raise FileNotFoundError(
f"The user config file wasn't found: {config_file}"
)
if tasks is None and numbers is None:
raise ValueError('At least one of tasks or numbers is needed.')
if cached is not None:
if tasks is None:
warnings.warn(
'Ignoring "cached" argument because "tasks" was not provided',
stacklevel=2,
)
elif len(cached) != len(tasks):
raise ValueError(
'A list of cached steps must be provided for each task in'
+ '"tasks"'
)
return machine
def _get_basic_config(
    config_file,
    machine,
    component_path,
    component,
    model,
    build,
    branch,
    cmake_flags,
    debug,
    clean_build,
    quiet_build,
    work_dir,
):
    """
    Get a base config parser for the machine and component but not a specific
    task

    Options are layered in order: the user config file, polaris defaults,
    mache machine options, polaris machine options, component options, then
    command-line overrides (applied as user options).
    """
    config = PolarisConfigParser()
    if config_file is not None:
        config.add_user_config(config_file)
    # set the model from the command line if provided
    if model is not None:
        config.set(component.name, 'model', model, user=True)
    # start with default polaris config options
    config.add_from_package('polaris', 'default.cfg')
    # add the E3SM config options from mache
    if machine is not None:
        config.add_from_package(
            'mache.machines', f'{machine}.cfg', exception=False
        )
    # add the polaris machine config file
    if machine is None:
        machine = 'default'
    config.add_from_package('polaris.machines', f'{machine}.cfg')
    # record the polaris branch (current directory if the env var is unset)
    if 'POLARIS_BRANCH' in os.environ:
        polaris_branch = os.environ['POLARIS_BRANCH']
        config.set('paths', 'polaris_branch', polaris_branch)
    else:
        config.set('paths', 'polaris_branch', os.getcwd())
    # add the config options for the component
    config.add_from_package(
        f'polaris.{component.name.replace("/", ".")}',
        f'{component.name}.cfg',
        exception=False,
    )
    # set the component_path path from the command line if provided
    if component_path is not None:
        component_path = os.path.abspath(component_path)
        config.set('paths', 'component_path', component_path, user=True)
    if build is not None:
        config.set('build', 'build', str(build), user=True)
    if branch is not None:
        config.set('build', 'branch', os.path.abspath(branch), user=True)
    # NOTE(review): these raise KeyError if POLARIS_COMPILER or POLARIS_MPI
    # are not set in the environment -- presumably guaranteed by the polaris
    # load script; confirm
    compiler = os.environ['POLARIS_COMPILER']
    mpi = os.environ['POLARIS_MPI']
    config.set('build', 'machine', machine, user=True)
    config.set('build', 'compiler', compiler, user=True)
    config.set('build', 'mpi', mpi, user=True)
    if cmake_flags is not None:
        config.set('build', 'cmake_flags', cmake_flags, user=True)
    if debug is not None:
        config.set('build', 'debug', str(debug), user=True)
    if clean_build is not None:
        config.set('build', 'clean', str(clean_build), user=True)
    if quiet_build is not None:
        config.set('build', 'quiet', str(quiet_build), user=True)
    # re-read the merged values: config options may request a build even when
    # no command-line flag was given
    build = config.getboolean('build', 'build')
    clean_build = config.getboolean('build', 'clean')
    if clean_build and not build:
        # the user presumably expects to clean the build, notice that it's
        # absent, and build again
        config.set('build', 'build', 'True', user=True)
    if quiet_build and not build:
        # the user presumably expects to build, since they want to build
        # quietly
        config.set('build', 'build', 'True', user=True)
    config.set('paths', 'base_work_dir', work_dir, user=True)
    return config
def _add_tasks_by_number(numbers, all_tasks, tasks, cached_steps):
if numbers is not None:
keys = list(all_tasks)
for number in numbers:
cache_all = False
if number.endswith('c'):
cache_all = True
number = int(number[:-1])
else:
number = int(number)
if number >= len(keys):
raise ValueError(
f'task number {number} is out of range. '
f'There are only {len(keys)} tasks.'
)
path = keys[number]
if cache_all:
cached_steps[path] = ['_all']
else:
cached_steps[path] = list()
tasks[path] = all_tasks[path]
def _add_tasks_by_name(task_list, all_tasks, cached, tasks, cached_steps):
if task_list is not None:
for index, path in enumerate(task_list):
if path not in all_tasks:
raise ValueError(f'Task with path {path} is not in tasks')
if cached is not None:
cached_steps[path] = cached[index]
else:
cached_steps[path] = list()
tasks[path] = all_tasks[path]
def _setup_step(task, step, work_dir, baseline_dir, task_dir):
"""Set up a step in a task"""
# make the step directory if it doesn't exist
step_dir = os.path.join(work_dir, step.path)
if step.name in task.step_symlinks:
symlink(
step_dir, os.path.join(task_dir, task.step_symlinks[step.name])
)
if step.setup_complete:
# this is a shared step that has already been set up
return
try:
os.makedirs(step_dir)
except FileExistsError:
pass
step.work_dir = step_dir
step.base_work_dir = work_dir
# set up the step
step.setup()
# add the baseline directory for this step
if baseline_dir is not None:
step.baseline_dir = os.path.join(baseline_dir, step.path)
# process input, output, namelist and streams files
step.process_inputs_and_outputs()
def _symlink_load_script(work_dir):
"""make a symlink to the script for loading the polaris conda env."""
if 'LOAD_POLARIS_ENV' in os.environ:
script_filename = os.environ['LOAD_POLARIS_ENV']
symlink(script_filename, os.path.join(work_dir, 'load_polaris_env.sh'))
def _check_dependencies(tasks):
for task in tasks.values():
for step in task.steps.values():
for name, dependency in step.dependencies.items():
if dependency.work_dir == '':
raise ValueError(
f'The dependency {name} of '
f'{task.path} step {step.name} was '
f'not set up.'
)
def _build_model(basic_config, component, machine):
    """
    Build the model executable according to the [build] config section,
    dispatching to the builder for the configured model.
    """
    model = basic_config.get(component.name, 'model')
    section = basic_config['build']
    branch = section.get('branch')
    clean_build = section.getboolean('clean')
    quiet_build = section.getboolean('quiet')
    debug = section.getboolean('debug')
    cmake_flags = section.get('cmake_flags')
    build_dir = basic_config.get('paths', 'component_path')
    # a job account may be needed on some machines; None if not configured
    if basic_config.has_option('parallel', 'account'):
        account = basic_config.get('parallel', 'account')
    else:
        account = None
    if model == 'omega':
        log_filename = os.path.join(build_dir, 'build_omega.log')
        build_omega(
            branch=branch,
            build_dir=build_dir,
            clean=clean_build,
            quiet=quiet_build,
            debug=debug,
            cmake_flags=cmake_flags,
            account=account,
            log_filename=log_filename,
        )
    elif model == 'mpas-ocean':
        section = basic_config['build']
        compiler = section.get('compiler')
        mpilib = section.get('mpi')
        # the make target for this compiler/MPI pair, e.g. gnu_openmpi_target
        key = f'{compiler}_{mpilib}_target'
        # NOTE(review): this assumes the section object provides a
        # has_option() method taking just the option name; a standard
        # configparser SectionProxy does not -- confirm PolarisConfigParser
        # sections support it
        if not section.has_option(key):
            raise ValueError(
                f'The build target {key} is not defined in the [build] '
                f'section of the config file for machine {machine}.'
            )
        make_target = section.get(key)
        log_filename = os.path.join(build_dir, 'build_mpas_ocean.log')
        build_mpas_ocean(
            branch=branch,
            build_dir=build_dir,
            clean=clean_build,
            quiet=quiet_build,
            debug=debug,
            make_flags=cmake_flags,
            make_target=make_target,
            log_filename=log_filename,
        )
    else:
        raise ValueError(
            f'Automated build is not implemented for model {model}'
        )