import argparse
import glob
import os
import pickle
import sys
import time
from datetime import timedelta
from typing import Dict, List, Optional
import mpas_tools.io
from mpas_tools.logging import LoggingContext, check_call
from polaris import Task
from polaris.logging import log_function_call, log_method_call
from polaris.parallel import (
get_available_parallel_resources,
run_command,
set_cores_per_node,
)
from polaris.run import (
complete_step_run,
load_dependencies,
setup_config,
unpickle_suite,
)
# ANSI fail text: https://stackoverflow.com/a/287944/7728169
# Terminal escape sequences used to colorize pass/fail/runtime status lines;
# ``end_color`` resets the terminal back to its default color.
start_fail = '\033[91m'  # red
start_pass = '\033[92m'  # green
start_time_color = '\033[94m'  # blue
end_color = '\033[0m'
# pre-built colored status strings used throughout this module
pass_str = f'{start_pass}PASS{end_color}'
success_str = f'{start_pass}SUCCESS{end_color}'
fail_str = f'{start_fail}FAIL{end_color}'
error_str = f'{start_fail}ERROR{end_color}'
# [docs] -- stray Sphinx cross-reference artifact from the rendered docs; not code
def run_tasks(
    suite_name,
    quiet=False,
    is_task=False,
    steps_to_run=None,
    steps_to_skip=None,
):
    """
    Run the given suite or task

    Parameters
    ----------
    suite_name : str
        The name of the suite

    quiet : bool, optional
        Whether step names are not included in the output as the suite
        progresses

    is_task : bool, optional
        Whether this is a task instead of a full suite

    steps_to_run : list of str, optional
        A list of the steps to run if this is a task, not a full suite.
        The default behavior is to run the default steps unless they are in
        ``steps_to_skip``

    steps_to_skip : list of str, optional
        A list of steps not to run if this is a task, not a full suite.
        Typically, these are steps to remove from the defaults
    """
    suite = unpickle_suite(suite_name)

    # get the config file for the first task in the suite
    task = next(iter(suite['tasks'].values()))
    component = task.component
    common_config = setup_config(task.base_work_dir, f'{component.name}.cfg')
    available_resources = get_available_parallel_resources(common_config)

    # start logging to stdout/stderr
    with LoggingContext(suite_name) as stdout_logger:
        os.environ['PYTHONUNBUFFERED'] = '1'

        if not is_task:
            # suites write one log file per task into case_outputs; the
            # directory may already exist from a previous run
            try:
                os.makedirs('case_outputs')
            except OSError:
                pass

        failures = 0
        cwd = os.getcwd()
        suite_start = time.time()
        task_times = dict()
        result_strs = dict()
        total_tasks = len(suite['tasks'])
        # tasks that raised an exception vs. tasks whose baseline
        # comparison failed (tracked separately for the PR summary)
        exec_fail_tasks: List[str] = []
        diff_fail_tasks: List[str] = []
        for task_name in suite['tasks']:
            stdout_logger.info(f'{task_name}')

            task = suite['tasks'][task_name]

            if is_task:
                # a single task logs directly to stdout
                log_filename = None
                task_logger = stdout_logger
            else:
                # within a suite, each task gets its own log file
                task_prefix = task.path.replace('/', '_')
                log_filename = f'{cwd}/case_outputs/{task_prefix}.log'
                task_logger = None

            (
                result_str,
                success,
                task_time,
                exec_failed,
                diff_failed,
            ) = _log_and_run_task(
                task,
                stdout_logger,
                task_logger,
                quiet,
                log_filename,
                is_task,
                steps_to_run,
                steps_to_skip,
                available_resources,
            )
            result_strs[task_name] = result_str
            if not success:
                failures += 1
            if exec_failed:
                exec_fail_tasks.append(task_name)
            if diff_failed:
                diff_fail_tasks.append(task_name)
            task_times[task_name] = task_time

        suite_time = time.time() - suite_start

        os.chdir(cwd)

        # Write a concise, copy/paste-friendly summary for Omega PRs
        _write_output_for_pull_request(
            suite_name,
            suite,
            results={
                'total': total_tasks,
                'failures': exec_fail_tasks,
                'diffs': diff_fail_tasks,
            },
        )

        # note: this exits with status 1 if any task failed
        _log_task_runtimes(
            stdout_logger, task_times, result_strs, suite_time, failures
        )
# [docs] -- stray Sphinx cross-reference artifact from the rendered docs; not code
def run_single_step(step_is_subprocess=False, quiet=False):
    """
    Used by the framework to run a step when ``polaris serial`` gets called in
    the step's work directory

    Parameters
    ----------
    step_is_subprocess : bool, optional
        Whether the step is being run as a subprocess of a task or suite

    quiet : bool, optional
        Whether step names are not included in the output as the suite
        progresses
    """
    with open('step.pickle', 'rb') as handle:
        step = pickle.load(handle)

    # wrap the step in a throwaway task so the shared _run_task() machinery
    # can be reused for a single step
    task = Task(component=step.component, name='dummy_task')
    task.add_step(step)
    task.new_step_log_file = False

    # This prevents infinite loop of subprocesses
    if step_is_subprocess:
        step.run_as_subprocess = False

    config = setup_config(step.base_work_dir, step.config.filepath)
    task.config = config
    available_resources = get_available_parallel_resources(config)
    set_cores_per_node(task.config, available_resources['cores_per_node'])

    # propagate the configured io format/engine to the mpas_tools defaults
    mpas_tools.io.default_format = config.get('io', 'format')
    mpas_tools.io.default_engine = config.get('io', 'engine')

    # start logging to stdout/stderr
    logger_name = step.path.replace('/', '_')
    with LoggingContext(name=logger_name) as stdout_logger:
        task.logger = stdout_logger
        if quiet:
            # suppress the per-step progress lines on stdout
            task.stdout_logger = None
        else:
            task.stdout_logger = stdout_logger
        log_function_call(function=_run_task, logger=stdout_logger)
        stdout_logger.info('')
        stdout_logger.info(f'Running step: {step.name}')
        _run_task(task, available_resources)
def main():
    """
    Entry point for ``polaris serial``: parse command-line arguments and
    dispatch to running a suite, a task or a single step depending on the
    arguments and on which pickle files are present in the current
    directory.
    """
    parser = argparse.ArgumentParser(
        description='Run a suite, task or step', prog='polaris serial'
    )
    parser.add_argument(
        'suite',
        nargs='?',
        help='The name of a suite to run. Can exclude '
        'or include the .pickle filename suffix.',
    )
    parser.add_argument(
        '--steps', dest='steps', nargs='+', help='The steps of a task to run.'
    )
    parser.add_argument(
        '--skip_steps',
        dest='skip_steps',
        nargs='+',
        help='The steps of a task not to run, see '
        'steps_to_run in the config file for defaults.',
    )
    parser.add_argument(
        '-q',
        '--quiet',
        dest='quiet',
        action='store_true',
        help='If set, step names are not included in the '
        'output as the suite progresses. Has no '
        'effect when running tasks or steps on '
        'their own.',
    )
    parser.add_argument(
        '--step_is_subprocess',
        dest='step_is_subprocess',
        action='store_true',
        help='Used internally by polaris to indicate that '
        'a step is being run as a subprocess.',
    )
    # skip the first two entries of sys.argv ("polaris serial")
    args = parser.parse_args(sys.argv[2:])
    if args.suite is not None:
        # Running a specified suite from the base work directory
        run_tasks(args.suite, quiet=args.quiet)
    elif os.path.exists('task.pickle'):
        # Running a task inside of its work directory
        run_tasks(
            suite_name='task',
            quiet=args.quiet,
            is_task=True,
            steps_to_run=args.steps,
            steps_to_skip=args.skip_steps,
        )
    elif os.path.exists('step.pickle'):
        # Running a step inside of its work directory
        run_single_step(
            step_is_subprocess=args.step_is_subprocess, quiet=args.quiet
        )
    else:
        pickles = glob.glob('*.pickle')
        if len(pickles) == 1:
            # Running an unspecified suite from the base work directory
            suite = os.path.splitext(os.path.basename(pickles[0]))[0]
            run_tasks(suite, quiet=args.quiet)
        elif len(pickles) == 0:
            raise OSError(
                'No pickle files were found. Are you sure this is '
                'a polaris suite, task or step work directory?'
            )
        else:
            raise ValueError(
                'More than one suite was found. Please specify '
                'which to run: polaris serial <suite>'
            )
def _update_steps_to_run(
task_name, steps_to_run, steps_to_skip, config, steps
):
"""
Update the steps to run
"""
if steps_to_run is None:
step_str = config.get(task_name, 'steps_to_run').replace(',', ' ')
steps_to_run = step_str.split()
for step in steps_to_run:
if step not in steps:
raise ValueError(
f'A step "{step}" was requested but is not one of the steps '
f'in this task:'
f'\n{list(steps)}'
)
if steps_to_skip is not None:
for step in steps_to_skip:
if step not in steps:
raise ValueError(
f'A step "{step}" was flagged not to run but is not one '
f'of the steps in this task:'
f'\n{list(steps)}'
)
steps_to_run = [
step for step in steps_to_run if step not in steps_to_skip
]
return steps_to_run
def _log_task_runtimes(
stdout_logger, task_times, result_strs, suite_time, failures
):
"""
Log the runtimes for the task(s)
"""
stdout_logger.info('Task Runtimes:')
for task_name, task_time in task_times.items():
task_time_str = str(timedelta(seconds=round(task_time)))
stdout_logger.info(
f'{task_time_str} {result_strs[task_name]} {task_name}'
)
suite_time_str = str(timedelta(seconds=round(suite_time)))
stdout_logger.info(f'Total runtime: {suite_time_str}')
if failures == 0:
stdout_logger.info('PASS: All passed successfully!')
else:
if failures == 1:
message = '1 task'
else:
message = f'{failures} tasks'
stdout_logger.error(f'FAIL: {message} failed, see above.')
sys.exit(1)
def _print_to_stdout(task, message):
"""
Write out a message to stdout if we're not running a single step
"""
if task.stdout_logger is not None:
task.stdout_logger.info(message)
if task.logger != task.stdout_logger:
# also write it to the log file
task.logger.info(message)
def _log_and_run_task(
    task,
    stdout_logger,
    task_logger,
    quiet,
    log_filename,
    is_task,
    steps_to_run,
    steps_to_skip,
    available_resources,
):
    """
    Set up logging and configuration for one task, run its steps, and report
    the outcome.

    Parameters
    ----------
    task : polaris.Task
        The task to run
    stdout_logger : logging.Logger
        The logger attached to stdout/stderr
    task_logger : logging.Logger or None
        An existing logger for the task, or None to create one writing to
        ``log_filename``
    quiet : bool
        Whether to suppress per-step progress output on stdout
    log_filename : str or None
        The task's log file (None when running a task on its own)
    is_task : bool
        Whether this is a single task rather than part of a suite
    steps_to_run : list of str or None
        Step names to run (None means use the config defaults)
    steps_to_skip : list of str or None
        Step names to skip
    available_resources : dict
        The parallel resources available for running the task

    Returns
    -------
    result_str : str
        The colored PASS/FAIL string for the runtime summary
    success : bool
        Whether the task ran without error and baseline comparisons (if
        any) passed
    task_time : float
        The task runtime in seconds
    exec_failed : bool
        Whether the task raised an exception while running
    diff_failed : bool
        Whether a baseline comparison was performed and failed
    """
    task_name = task.path.replace('/', '_')
    with LoggingContext(
        task_name, logger=task_logger, log_filename=log_filename
    ) as task_logger:
        if quiet:
            # just log the step names and any failure messages to the
            # log file
            task.stdout_logger = task_logger
        else:
            # log steps to stdout
            task.stdout_logger = stdout_logger
        task.logger = task_logger
        task.log_filename = log_filename

        # If we are running a task on its own, we want a log file per step
        # If we are running within a suite, we want a log file per task, with
        # output from each of its steps
        task.new_step_log_file = is_task

        os.chdir(task.work_dir)

        config = setup_config(task.base_work_dir, task.config.filepath)
        task.config = config
        set_cores_per_node(task.config, available_resources['cores_per_node'])

        # propagate the configured io format/engine to mpas_tools defaults
        mpas_tools.io.default_format = config.get('io', 'format')
        mpas_tools.io.default_engine = config.get('io', 'engine')

        task.steps_to_run = _update_steps_to_run(
            task.name, steps_to_run, steps_to_skip, config, task.steps
        )

        task_start = time.time()

        log_function_call(function=_run_task, logger=task_logger)
        task_logger.info('')
        task_list = ', '.join(task.steps_to_run)
        task_logger.info(f'Running steps: {task_list}')

        # Default in case execution fails before setting this
        baselines_passed = None
        try:
            baselines_passed = _run_task(task, available_resources)
            run_status = success_str
            task_pass = True
        except Exception:
            run_status = error_str
            task_pass = False
            task_logger.exception(
                'Exception raised while running the steps of the task'
            )

        status = f' task execution: {run_status}'
        task_logger.info(f'POLARIS TASK: {"PASS" if task_pass else "FAIL"}')
        if task_pass:
            stdout_logger.info(status)
            if baselines_passed is None:
                # no baseline comparisons were performed
                result_str = pass_str
                success = True
            else:
                if baselines_passed:
                    baseline_str = pass_str
                    result_str = pass_str
                    success = True
                else:
                    baseline_str = fail_str
                    result_str = fail_str
                    success = False
                status = f' baseline comp.: {baseline_str}'
                stdout_logger.info(status)
                task_logger.info(
                    f'POLARIS BASELINE: '
                    f'{"PASS" if baselines_passed else "FAIL"}'
                )
        else:
            stdout_logger.error(status)
            if not is_task:
                # point the user at the task's log file within the suite
                stdout_logger.error(f' see: case_outputs/{task_name}.log')
            result_str = fail_str
            success = False

        task_time = time.time() - task_start
        task_time_str = str(timedelta(seconds=round(task_time)))
        stdout_logger.info(
            f' task runtime: {start_time_color}{task_time_str}{end_color}'
        )
        exec_failed = not task_pass
        # only True when a comparison actually ran and failed
        diff_failed = baselines_passed is False
    return result_str, success, task_time, exec_failed, diff_failed
def _read_baseline_status_from_logs(step_work_dir: str) -> Optional[bool]:
"""Get baseline comparison status from existing log markers.
Returns
-------
Optional[bool]
True if ``baseline_passed.log`` exists, False if
``baseline_failed.log`` exists, otherwise None.
"""
baseline_pass_filename = os.path.join(step_work_dir, 'baseline_passed.log')
baseline_fail_filename = os.path.join(step_work_dir, 'baseline_failed.log')
if os.path.exists(baseline_pass_filename):
return True
if os.path.exists(baseline_fail_filename):
return False
return None
def _read_property_status_from_logs(step_work_dir: str) -> Optional[bool]:
"""Get property check status from existing log markers.
Returns
-------
Optional[bool]
True if ``property_check_passed.log`` exists, False if
``property_check_failed.log`` exists, otherwise None.
"""
property_check_pass_filename = os.path.join(
step_work_dir, 'property_check_passed.log'
)
property_check_fail_filename = os.path.join(
step_work_dir, 'property_check_failed.log'
)
if os.path.exists(property_check_pass_filename):
return True
if os.path.exists(property_check_fail_filename):
return False
return None
def _accumulate_baselines(
baselines_passed: Optional[bool], status: bool
) -> Optional[bool]:
"""Aggregate baseline results across steps.
None means no baseline comparisons were performed. If any comparison fails,
the aggregate becomes False.
"""
if baselines_passed is None:
return status
return baselines_passed and status
def _run_task(task, available_resources):
    """
    Run each step of the task

    Parameters
    ----------
    task : polaris.Task
        The task whose ``steps_to_run`` will be executed from within each
        step's work directory
    available_resources : dict
        The parallel resources available for running steps

    Returns
    -------
    baselines_passed : bool or None
        Whether all baseline comparisons passed; ``None`` if no comparisons
        were performed
    """
    logger = task.logger
    cwd = os.getcwd()
    baselines_passed = None
    # aggregate of per-step property checks; reported per step but not
    # (currently) returned to the caller
    property_passed = None
    for step_name in task.steps_to_run:
        step = task.steps[step_name]
        complete_filename = os.path.join(
            step.work_dir, 'polaris_step_complete.log'
        )
        _print_to_stdout(task, f' * step: {step_name}')

        if os.path.exists(complete_filename):
            _print_to_stdout(task, ' already completed')
            # print results of baseline comparison if it was done
            baseline_status = _read_baseline_status_from_logs(step.work_dir)
            if baseline_status is not None:
                baseline_str = pass_str if baseline_status else fail_str
                _print_to_stdout(
                    task, f' baseline comp.: {baseline_str}'
                )
                baselines_passed = _accumulate_baselines(
                    baselines_passed, baseline_status
                )
            # likewise, report property checks recorded by a previous run
            # (fixed: removed a dead ``property_status = None`` assignment
            # that was immediately overwritten)
            property_status = _read_property_status_from_logs(step.work_dir)
            if property_status is not None:
                property_str = pass_str if property_status else fail_str
                _print_to_stdout(
                    task, f' property comp.: {property_str}'
                )
                property_passed = _accumulate_baselines(
                    property_passed, property_status
                )
            continue

        if step.cached:
            _print_to_stdout(task, ' cached')
            # cached steps never perform baseline comparisons; leave
            # baselines_passed unchanged
            continue

        step_start = time.time()

        step.config = setup_config(step.base_work_dir, step.config.filepath)

        # the task's log filename (if any) is also the step's by default;
        # this was previously a redundant if/else with the same effect
        step_log_filename = task.log_filename

        try:
            if step.run_as_subprocess:
                _run_step_as_subprocess(logger, step, task.new_step_log_file)
            else:
                _run_step(
                    task,
                    step,
                    task.new_step_log_file,
                    available_resources,
                    step_log_filename,
                )
        except Exception:
            _print_to_stdout(task, f' execution: {error_str}')
            raise
        finally:
            # Always restore the working directory, even if a step fails.
            os.chdir(cwd)
        _print_to_stdout(task, f' execution: {success_str}')

        step_time = time.time() - step_start
        step_time_str = str(timedelta(seconds=round(step_time)))

        compared, status = step.check_properties()
        if compared:
            if status:
                property_str = pass_str
            else:
                property_str = fail_str
            _print_to_stdout(
                task, f' property checks: {property_str}'
            )
            property_passed = _accumulate_baselines(property_passed, status)

        compared, status = step.validate_baselines()
        if compared:
            if status:
                baseline_str = pass_str
            else:
                baseline_str = fail_str
            _print_to_stdout(
                task, f' baseline comp.: {baseline_str}'
            )
            baselines_passed = _accumulate_baselines(baselines_passed, status)

        _print_to_stdout(
            task,
            f' runtime: '
            f'{start_time_color}{step_time_str}{end_color}',
        )
    return baselines_passed
def _run_step(
    task, step, new_log_file, available_resources, step_log_filename
):
    """
    Run the requested step

    Parameters
    ----------
    task : polaris.Task
        The task the step belongs to
    step : polaris.Step
        The step to run from within its work directory
    new_log_file : bool
        Whether to create a new log file named after the step (True) or
        reuse the task's existing logger (False)
    available_resources : dict
        The parallel resources available for running the step
    step_log_filename : str or None
        The log filename to record on the step when no new log file is
        created

    Raises
    ------
    OSError
        If any of the step's declared input files are missing before the
        run or any declared output files are missing after it
    """
    logger = task.logger
    cwd = os.getcwd()

    # verify all declared inputs exist before doing any work
    missing_files = list()
    for input_file in step.inputs:
        if not os.path.exists(input_file):
            missing_files.append(input_file)
    if len(missing_files) > 0:
        raise OSError(
            f'input file(s) missing in step {step.name} in '
            f'{step.component.name}/{step.subdir}: {missing_files}'
        )

    load_dependencies(step)

    # each logger needs a unique name
    logger_name = step.path.replace('/', '_')
    if new_log_file:
        # we want to create new log file and point the step to that name
        new_log_filename = f'{cwd}/{step.name}.log'
        step_log_filename = new_log_filename
        step_logger = None
    else:
        # either we don't want a log file at all or there is an existing one
        # to use. Either way, we don't want a new log filename and we want
        # to use the existing logger. The step log filename will be whatever
        # is passed as a parameter
        step_logger = logger
        new_log_filename = None
    step.log_filename = step_log_filename

    with LoggingContext(
        name=logger_name, logger=step_logger, log_filename=new_log_filename
    ) as step_logger:
        step.logger = step_logger
        os.chdir(step.work_dir)

        step_logger.info('')
        log_method_call(method=step.constrain_resources, logger=step_logger)
        step_logger.info('')
        step.constrain_resources(available_resources)

        # runtime_setup() will perform small tasks that require knowing the
        # resources of the task before the step runs (such as creating
        # graph partitions)
        step_logger.info('')
        log_method_call(method=step.runtime_setup, logger=step_logger)
        step_logger.info('')
        step.runtime_setup()

        if step.args is not None:
            # the step provides explicit command-line invocations instead of
            # a run() method
            step_logger.info(
                "\nBypassing step's run() method and running "
                'with command line args\n'
            )
            for args in step.args:
                log_function_call(function=run_command, logger=step_logger)
                step_logger.info('')
                run_command(
                    args,
                    step.cpus_per_task,
                    step.ntasks,
                    step.openmp_threads,
                    step.config,
                    step.logger,
                )
        else:
            step_logger.info('')
            log_method_call(method=step.run, logger=step_logger)
            step_logger.info('')
            step.run()

    # record that the step finished (writes the completion marker)
    complete_step_run(step)

    # verify all declared outputs were produced
    missing_files = list()
    for output_file in step.outputs:
        if not os.path.exists(output_file):
            missing_files.append(output_file)

    if len(missing_files) > 0:
        # We want to indicate that the step failed by removing the pickle
        try:
            os.remove('step_after_run.pickle')
        except FileNotFoundError:
            pass
        raise OSError(
            f'output file(s) missing in step {step.name} in '
            f'{step.component.name}/{step.subdir}: {missing_files}'
        )
def _run_step_as_subprocess(logger, step, new_log_file):
    """
    Run the requested step by launching ``polaris serial`` as a subprocess
    from within the step's work directory.
    """
    cwd = os.getcwd()
    # each logger needs a unique name
    logger_name = step.path.replace('/', '_')
    log_filename = f'{cwd}/{step.name}.log' if new_log_file else None
    step_logger = None if new_log_file else logger
    step.log_filename = log_filename

    with LoggingContext(
        name=logger_name, logger=step_logger, log_filename=log_filename
    ) as step_logger:
        os.chdir(step.work_dir)
        # the flag prevents the subprocess from spawning further subprocesses
        step_args = ['polaris', 'serial', '--step_is_subprocess']
        check_call(step_args, step_logger)
def _write_output_for_pull_request(
suite_name, suite, results: Optional[dict] = None
):
"""
Parse metadata from the provenance file and write a concise summary that
can be copy/pasted into an Omega pull request.
Parameters
----------
suite_name : str
The name of the suite (or 'task') being run.
suite : dict
The unpickled suite dictionary containing tasks and base work dir.
"""
work_dir = suite.get('work_dir', os.getcwd())
provenance_path = os.path.join(work_dir, 'provenance')
if not os.path.exists(provenance_path):
return
# keys in provenance are written exactly like these labels
labels = {
'baseline work directory': 'baseline',
'build directory': 'build',
'work directory': 'work',
'machine': 'machine',
'partition': 'partition',
'compiler': 'compiler',
}
values: Dict[str, Optional[str]] = {v: None for v in labels.values()}
_parse_provenance_into(provenance_path, labels, values)
# If a baseline workdir exists, parse its provenance to get baseline build
baseline_build: Optional[str] = None
baseline_build = _parse_baseline_build(values.get('baseline'))
# Build the output content. Only include optional lines if present.
lines = [f'### Polaris `{suite_name}` suite']
if values['baseline']:
lines.append(f'- Baseline workdir: `{values["baseline"]}`')
if baseline_build:
lines.append(f'- Baseline build: `{baseline_build}`')
if values['build']:
lines.append(f'- PR build: `{values["build"]}`')
if values['work']:
lines.append(f'- PR workdir: `{values["work"]}`')
if values['machine']:
lines.append(f'- Machine: `{values["machine"]}`')
if values['partition']:
lines.append(f'- Partition: `{values["partition"]}`')
if values['compiler']:
lines.append(f'- Compiler: `{values["compiler"]}`')
# Placeholder for developer to fill in
lines.append('- Build type: <Debug|Release>')
# Try to include job scheduler log path for Slurm
job_log = _derive_job_log_path(suite_name, suite)
if job_log and os.path.exists(job_log):
lines.append(f'- Log: `{job_log}`')
else:
lines.append('- Log: not found')
# If we have results, summarize them
if results is not None and isinstance(results, dict):
total = int(results.get('total', 0) or 0)
failures: List[str] = list(results.get('failures', []) or [])
diffs: List[str] = list(results.get('diffs', []) or [])
if total > 0 and not failures and not diffs:
lines.append('- Result: All tests passed')
else:
lines.append('- Result:')
if failures:
lines.append(f' - Failures ({len(failures)} of {total}):')
for name in failures:
lines.append(f' - {name}')
if diffs:
lines.append(f' - Diffs ({len(diffs)} of {total}):')
for name in diffs:
lines.append(f' - `{name}`')
out_path = os.path.join(work_dir, f'{suite_name}_output_for_pr.md')
print(f'Writing output useful for copy/paste into PRs to:\n {out_path}')
with open(out_path, 'w') as out:
out.write('\n'.join(lines) + '\n')
print('Done.')
def _parse_provenance_into(path, labels, target_values):
if not os.path.exists(path):
return
try:
with open(path, 'r') as f:
for line in f:
if ':' not in line:
continue
parts = line.strip().split(':', 1)
if len(parts) != 2:
continue
key = parts[0].strip().lower()
val = parts[1].strip()
if key in labels:
target_values[labels[key]] = val
except (OSError, UnicodeDecodeError):
# silent failure; helper is best-effort
pass
def _parse_baseline_build(baseline_workdir: Optional[str]) -> Optional[str]:
if not baseline_workdir:
return None
path = os.path.join(baseline_workdir, 'provenance')
if not os.path.exists(path):
return None
try:
with open(path, 'r') as f:
for line in f:
if ':' not in line:
continue
parts = line.strip().split(':', 1)
if len(parts) != 2:
continue
key = parts[0].strip().lower()
val = parts[1].strip()
if key == 'build directory':
return val
except (OSError, UnicodeDecodeError):
return None
return None
def _derive_job_log_path(suite_name: str, suite: dict) -> Optional[str]:
"""Best-effort reconstruction of the Slurm job log path."""
job_id = os.environ.get('SLURM_JOB_ID')
if not job_id:
return None
# Reconstruct the job_name the same way the job script did
# using the common component config
try:
task = next(iter(suite['tasks'].values()))
component = task.component
common_config = setup_config(
task.base_work_dir, f'{component.name}.cfg'
)
job_name = common_config.get('job', 'job_name')
if job_name == '<<<default>>>':
suite_suffix = f'_{suite_name}' if suite_name else ''
job_name = f'polaris{suite_suffix}'
work_dir = suite.get('work_dir', os.getcwd())
return os.path.join(work_dir, f'{job_name}.o{job_id}')
except (StopIteration, KeyError, OSError, AttributeError):
return None