# Source code for polaris.run.serial

import argparse
import glob
import os
import pickle
import sys
import time
from datetime import timedelta
from typing import Dict, List, Optional

import mpas_tools.io
from mpas_tools.logging import LoggingContext, check_call

from polaris import Task
from polaris.logging import log_function_call, log_method_call
from polaris.parallel import (
    get_available_parallel_resources,
    run_command,
    set_cores_per_node,
)
from polaris.run import (
    complete_step_run,
    load_dependencies,
    setup_config,
    unpickle_suite,
)

# ANSI fail text: https://stackoverflow.com/a/287944/7728169
start_fail = '\033[91m'
start_pass = '\033[92m'
start_time_color = '\033[94m'
end_color = '\033[0m'
pass_str = f'{start_pass}PASS{end_color}'
success_str = f'{start_pass}SUCCESS{end_color}'
fail_str = f'{start_fail}FAIL{end_color}'
error_str = f'{start_fail}ERROR{end_color}'


[docs] def run_tasks( suite_name, quiet=False, is_task=False, steps_to_run=None, steps_to_skip=None, ): """ Run the given suite or task Parameters ---------- suite_name : str The name of the suite quiet : bool, optional Whether step names are not included in the output as the suite progresses is_task : bool Whether this is a task instead of a full suite steps_to_run : list of str, optional A list of the steps to run if this is a task, not a full suite. The default behavior is to run the default steps unless they are in ``steps_to_skip`` steps_to_skip : list of str, optional A list of steps not to run if this is a task, not a full suite. Typically, these are steps to remove from the defaults """ suite = unpickle_suite(suite_name) # get the config file for the first task in the suite task = next(iter(suite['tasks'].values())) component = task.component common_config = setup_config(task.base_work_dir, f'{component.name}.cfg') available_resources = get_available_parallel_resources(common_config) # start logging to stdout/stderr with LoggingContext(suite_name) as stdout_logger: os.environ['PYTHONUNBUFFERED'] = '1' if not is_task: try: os.makedirs('case_outputs') except OSError: pass failures = 0 cwd = os.getcwd() suite_start = time.time() task_times = dict() result_strs = dict() total_tasks = len(suite['tasks']) exec_fail_tasks: List[str] = [] diff_fail_tasks: List[str] = [] for task_name in suite['tasks']: stdout_logger.info(f'{task_name}') task = suite['tasks'][task_name] if is_task: log_filename = None task_logger = stdout_logger else: task_prefix = task.path.replace('/', '_') log_filename = f'{cwd}/case_outputs/{task_prefix}.log' task_logger = None ( result_str, success, task_time, exec_failed, diff_failed, ) = _log_and_run_task( task, stdout_logger, task_logger, quiet, log_filename, is_task, steps_to_run, steps_to_skip, available_resources, ) result_strs[task_name] = result_str if not success: failures += 1 if exec_failed: exec_fail_tasks.append(task_name) if 
diff_failed: diff_fail_tasks.append(task_name) task_times[task_name] = task_time suite_time = time.time() - suite_start os.chdir(cwd) # Write a concise, copy/paste-friendly summary for Omega PRs _write_output_for_pull_request( suite_name, suite, results={ 'total': total_tasks, 'failures': exec_fail_tasks, 'diffs': diff_fail_tasks, }, ) _log_task_runtimes( stdout_logger, task_times, result_strs, suite_time, failures )
[docs] def run_single_step(step_is_subprocess=False, quiet=False): """ Used by the framework to run a step when ``polaris serial`` gets called in the step's work directory Parameters ---------- step_is_subprocess : bool, optional Whether the step is being run as a subprocess of a task or suite quiet : bool, optional Whether step names are not included in the output as the suite progresses """ with open('step.pickle', 'rb') as handle: step = pickle.load(handle) task = Task(component=step.component, name='dummy_task') task.add_step(step) task.new_step_log_file = False # This prevents infinite loop of subprocesses if step_is_subprocess: step.run_as_subprocess = False config = setup_config(step.base_work_dir, step.config.filepath) task.config = config available_resources = get_available_parallel_resources(config) set_cores_per_node(task.config, available_resources['cores_per_node']) mpas_tools.io.default_format = config.get('io', 'format') mpas_tools.io.default_engine = config.get('io', 'engine') # start logging to stdout/stderr logger_name = step.path.replace('/', '_') with LoggingContext(name=logger_name) as stdout_logger: task.logger = stdout_logger if quiet: task.stdout_logger = None else: task.stdout_logger = stdout_logger log_function_call(function=_run_task, logger=stdout_logger) stdout_logger.info('') stdout_logger.info(f'Running step: {step.name}') _run_task(task, available_resources)
def main(): parser = argparse.ArgumentParser( description='Run a suite, task or step', prog='polaris serial' ) parser.add_argument( 'suite', nargs='?', help='The name of a suite to run. Can exclude ' 'or include the .pickle filename suffix.', ) parser.add_argument( '--steps', dest='steps', nargs='+', help='The steps of a task to run.' ) parser.add_argument( '--skip_steps', dest='skip_steps', nargs='+', help='The steps of a task not to run, see ' 'steps_to_run in the config file for defaults.', ) parser.add_argument( '-q', '--quiet', dest='quiet', action='store_true', help='If set, step names are not included in the ' 'output as the suite progresses. Has no ' 'effect when running tasks or steps on ' 'their own.', ) parser.add_argument( '--step_is_subprocess', dest='step_is_subprocess', action='store_true', help='Used internally by polaris to indicate that ' 'a step is being run as a subprocess.', ) args = parser.parse_args(sys.argv[2:]) if args.suite is not None: # Running a specified suite from the base work directory run_tasks(args.suite, quiet=args.quiet) elif os.path.exists('task.pickle'): # Running a task inside of its work directory run_tasks( suite_name='task', quiet=args.quiet, is_task=True, steps_to_run=args.steps, steps_to_skip=args.skip_steps, ) elif os.path.exists('step.pickle'): # Running a step inside of its work directory run_single_step( step_is_subprocess=args.step_is_subprocess, quiet=args.quiet ) else: pickles = glob.glob('*.pickle') if len(pickles) == 1: # Running an unspecified suite from the base work directory suite = os.path.splitext(os.path.basename(pickles[0]))[0] run_tasks(suite, quiet=args.quiet) elif len(pickles) == 0: raise OSError( 'No pickle files were found. Are you sure this is ' 'a polaris suite, task or step work directory?' ) else: raise ValueError( 'More than one suite was found. 
Please specify ' 'which to run: polaris serial <suite>' ) def _update_steps_to_run( task_name, steps_to_run, steps_to_skip, config, steps ): """ Update the steps to run """ if steps_to_run is None: step_str = config.get(task_name, 'steps_to_run').replace(',', ' ') steps_to_run = step_str.split() for step in steps_to_run: if step not in steps: raise ValueError( f'A step "{step}" was requested but is not one of the steps ' f'in this task:' f'\n{list(steps)}' ) if steps_to_skip is not None: for step in steps_to_skip: if step not in steps: raise ValueError( f'A step "{step}" was flagged not to run but is not one ' f'of the steps in this task:' f'\n{list(steps)}' ) steps_to_run = [ step for step in steps_to_run if step not in steps_to_skip ] return steps_to_run def _log_task_runtimes( stdout_logger, task_times, result_strs, suite_time, failures ): """ Log the runtimes for the task(s) """ stdout_logger.info('Task Runtimes:') for task_name, task_time in task_times.items(): task_time_str = str(timedelta(seconds=round(task_time))) stdout_logger.info( f'{task_time_str} {result_strs[task_name]} {task_name}' ) suite_time_str = str(timedelta(seconds=round(suite_time))) stdout_logger.info(f'Total runtime: {suite_time_str}') if failures == 0: stdout_logger.info('PASS: All passed successfully!') else: if failures == 1: message = '1 task' else: message = f'{failures} tasks' stdout_logger.error(f'FAIL: {message} failed, see above.') sys.exit(1) def _print_to_stdout(task, message): """ Write out a message to stdout if we're not running a single step """ if task.stdout_logger is not None: task.stdout_logger.info(message) if task.logger != task.stdout_logger: # also write it to the log file task.logger.info(message) def _log_and_run_task( task, stdout_logger, task_logger, quiet, log_filename, is_task, steps_to_run, steps_to_skip, available_resources, ): task_name = task.path.replace('/', '_') with LoggingContext( task_name, logger=task_logger, log_filename=log_filename ) as 
task_logger: if quiet: # just log the step names and any failure messages to the # log file task.stdout_logger = task_logger else: # log steps to stdout task.stdout_logger = stdout_logger task.logger = task_logger task.log_filename = log_filename # If we are running a task on its own, we want a log file per step # If we are running within a suite, we want a log file per task, with # output from each of its steps task.new_step_log_file = is_task os.chdir(task.work_dir) config = setup_config(task.base_work_dir, task.config.filepath) task.config = config set_cores_per_node(task.config, available_resources['cores_per_node']) mpas_tools.io.default_format = config.get('io', 'format') mpas_tools.io.default_engine = config.get('io', 'engine') task.steps_to_run = _update_steps_to_run( task.name, steps_to_run, steps_to_skip, config, task.steps ) task_start = time.time() log_function_call(function=_run_task, logger=task_logger) task_logger.info('') task_list = ', '.join(task.steps_to_run) task_logger.info(f'Running steps: {task_list}') # Default in case execution fails before setting this baselines_passed = None try: baselines_passed = _run_task(task, available_resources) run_status = success_str task_pass = True except Exception: run_status = error_str task_pass = False task_logger.exception( 'Exception raised while running the steps of the task' ) status = f' task execution: {run_status}' task_logger.info(f'POLARIS TASK: {"PASS" if task_pass else "FAIL"}') if task_pass: stdout_logger.info(status) if baselines_passed is None: result_str = pass_str success = True else: if baselines_passed: baseline_str = pass_str result_str = pass_str success = True else: baseline_str = fail_str result_str = fail_str success = False status = f' baseline comp.: {baseline_str}' stdout_logger.info(status) task_logger.info( f'POLARIS BASELINE: ' f'{"PASS" if baselines_passed else "FAIL"}' ) else: stdout_logger.error(status) if not is_task: stdout_logger.error(f' see: 
case_outputs/{task_name}.log') result_str = fail_str success = False task_time = time.time() - task_start task_time_str = str(timedelta(seconds=round(task_time))) stdout_logger.info( f' task runtime: {start_time_color}{task_time_str}{end_color}' ) exec_failed = not task_pass diff_failed = baselines_passed is False return result_str, success, task_time, exec_failed, diff_failed def _read_baseline_status_from_logs(step_work_dir: str) -> Optional[bool]: """Get baseline comparison status from existing log markers. Returns ------- Optional[bool] True if ``baseline_passed.log`` exists, False if ``baseline_failed.log`` exists, otherwise None. """ baseline_pass_filename = os.path.join(step_work_dir, 'baseline_passed.log') baseline_fail_filename = os.path.join(step_work_dir, 'baseline_failed.log') if os.path.exists(baseline_pass_filename): return True if os.path.exists(baseline_fail_filename): return False return None def _read_property_status_from_logs(step_work_dir: str) -> Optional[bool]: """Get property check status from existing log markers. Returns ------- Optional[bool] True if ``property_check_passed.log`` exists, False if ``property_check_failed.log`` exists, otherwise None. """ property_check_pass_filename = os.path.join( step_work_dir, 'property_check_passed.log' ) property_check_fail_filename = os.path.join( step_work_dir, 'property_check_failed.log' ) if os.path.exists(property_check_pass_filename): return True if os.path.exists(property_check_fail_filename): return False return None def _accumulate_baselines( baselines_passed: Optional[bool], status: bool ) -> Optional[bool]: """Aggregate baseline results across steps. None means no baseline comparisons were performed. If any comparison fails, the aggregate becomes False. 
""" if baselines_passed is None: return status return baselines_passed and status def _run_task(task, available_resources): """ Run each step of the task """ logger = task.logger cwd = os.getcwd() baselines_passed = None property_passed = None for step_name in task.steps_to_run: step = task.steps[step_name] complete_filename = os.path.join( step.work_dir, 'polaris_step_complete.log' ) _print_to_stdout(task, f' * step: {step_name}') if os.path.exists(complete_filename): _print_to_stdout(task, ' already completed') # print results of baseline comparison if it was done baseline_status = _read_baseline_status_from_logs(step.work_dir) if baseline_status is not None: baseline_str = pass_str if baseline_status else fail_str _print_to_stdout( task, f' baseline comp.: {baseline_str}' ) baselines_passed = _accumulate_baselines( baselines_passed, baseline_status ) property_status = None property_status = _read_property_status_from_logs(step.work_dir) if property_status is not None: property_str = pass_str if property_status else fail_str _print_to_stdout( task, f' property comp.: {property_str}' ) property_passed = _accumulate_baselines( property_passed, property_status ) continue if step.cached: _print_to_stdout(task, ' cached') # cached steps never perform baseline comparisons; leave # baselines_passed unchanged continue step_start = time.time() step.config = setup_config(step.base_work_dir, step.config.filepath) if task.log_filename is not None: step_log_filename = task.log_filename else: step_log_filename = None try: if step.run_as_subprocess: _run_step_as_subprocess(logger, step, task.new_step_log_file) else: _run_step( task, step, task.new_step_log_file, available_resources, step_log_filename, ) except Exception: _print_to_stdout(task, f' execution: {error_str}') raise finally: # Always restore the working directory, even if a step fails. 
os.chdir(cwd) _print_to_stdout(task, f' execution: {success_str}') step_time = time.time() - step_start step_time_str = str(timedelta(seconds=round(step_time))) compared, status = step.check_properties() if compared: if status: property_str = pass_str else: property_str = fail_str _print_to_stdout( task, f' property checks: {property_str}' ) property_passed = _accumulate_baselines(property_passed, status) compared, status = step.validate_baselines() if compared: if status: baseline_str = pass_str else: baseline_str = fail_str _print_to_stdout( task, f' baseline comp.: {baseline_str}' ) baselines_passed = _accumulate_baselines(baselines_passed, status) _print_to_stdout( task, f' runtime: ' f'{start_time_color}{step_time_str}{end_color}', ) return baselines_passed def _run_step( task, step, new_log_file, available_resources, step_log_filename ): """ Run the requested step """ logger = task.logger cwd = os.getcwd() missing_files = list() for input_file in step.inputs: if not os.path.exists(input_file): missing_files.append(input_file) if len(missing_files) > 0: raise OSError( f'input file(s) missing in step {step.name} in ' f'{step.component.name}/{step.subdir}: {missing_files}' ) load_dependencies(step) # each logger needs a unique name logger_name = step.path.replace('/', '_') if new_log_file: # we want to create new log file and point the step to that name new_log_filename = f'{cwd}/{step.name}.log' step_log_filename = new_log_filename step_logger = None else: # either we don't want a log file at all or there is an existing one # to use. Either way, we don't want a new log filename and we want # to use the existing logger. 
The step log filename will be whatever # is passed as a parameter step_logger = logger new_log_filename = None step.log_filename = step_log_filename with LoggingContext( name=logger_name, logger=step_logger, log_filename=new_log_filename ) as step_logger: step.logger = step_logger os.chdir(step.work_dir) step_logger.info('') log_method_call(method=step.constrain_resources, logger=step_logger) step_logger.info('') step.constrain_resources(available_resources) # runtime_setup() will perform small tasks that require knowing the # resources of the task before the step runs (such as creating # graph partitions) step_logger.info('') log_method_call(method=step.runtime_setup, logger=step_logger) step_logger.info('') step.runtime_setup() if step.args is not None: step_logger.info( "\nBypassing step's run() method and running " 'with command line args\n' ) for args in step.args: log_function_call(function=run_command, logger=step_logger) step_logger.info('') run_command( args, step.cpus_per_task, step.ntasks, step.openmp_threads, step.config, step.logger, ) else: step_logger.info('') log_method_call(method=step.run, logger=step_logger) step_logger.info('') step.run() complete_step_run(step) missing_files = list() for output_file in step.outputs: if not os.path.exists(output_file): missing_files.append(output_file) if len(missing_files) > 0: # We want to indicate that the step failed by removing the pickle try: os.remove('step_after_run.pickle') except FileNotFoundError: pass raise OSError( f'output file(s) missing in step {step.name} in ' f'{step.component.name}/{step.subdir}: {missing_files}' ) def _run_step_as_subprocess(logger, step, new_log_file): """ Run the requested step as a subprocess """ cwd = os.getcwd() logger_name = step.path.replace('/', '_') if new_log_file: log_filename = f'{cwd}/{step.name}.log' step_logger = None else: step_logger = logger log_filename = None step.log_filename = log_filename with LoggingContext( name=logger_name, logger=step_logger, 
log_filename=log_filename ) as step_logger: os.chdir(step.work_dir) step_args = ['polaris', 'serial', '--step_is_subprocess'] check_call(step_args, step_logger) def _write_output_for_pull_request( suite_name, suite, results: Optional[dict] = None ): """ Parse metadata from the provenance file and write a concise summary that can be copy/pasted into an Omega pull request. Parameters ---------- suite_name : str The name of the suite (or 'task') being run. suite : dict The unpickled suite dictionary containing tasks and base work dir. """ work_dir = suite.get('work_dir', os.getcwd()) provenance_path = os.path.join(work_dir, 'provenance') if not os.path.exists(provenance_path): return # keys in provenance are written exactly like these labels labels = { 'baseline work directory': 'baseline', 'build directory': 'build', 'work directory': 'work', 'machine': 'machine', 'partition': 'partition', 'compiler': 'compiler', } values: Dict[str, Optional[str]] = {v: None for v in labels.values()} _parse_provenance_into(provenance_path, labels, values) # If a baseline workdir exists, parse its provenance to get baseline build baseline_build: Optional[str] = None baseline_build = _parse_baseline_build(values.get('baseline')) # Build the output content. Only include optional lines if present. 
lines = [f'### Polaris `{suite_name}` suite'] if values['baseline']: lines.append(f'- Baseline workdir: `{values["baseline"]}`') if baseline_build: lines.append(f'- Baseline build: `{baseline_build}`') if values['build']: lines.append(f'- PR build: `{values["build"]}`') if values['work']: lines.append(f'- PR workdir: `{values["work"]}`') if values['machine']: lines.append(f'- Machine: `{values["machine"]}`') if values['partition']: lines.append(f'- Partition: `{values["partition"]}`') if values['compiler']: lines.append(f'- Compiler: `{values["compiler"]}`') # Placeholder for developer to fill in lines.append('- Build type: <Debug|Release>') # Try to include job scheduler log path for Slurm job_log = _derive_job_log_path(suite_name, suite) if job_log and os.path.exists(job_log): lines.append(f'- Log: `{job_log}`') else: lines.append('- Log: not found') # If we have results, summarize them if results is not None and isinstance(results, dict): total = int(results.get('total', 0) or 0) failures: List[str] = list(results.get('failures', []) or []) diffs: List[str] = list(results.get('diffs', []) or []) if total > 0 and not failures and not diffs: lines.append('- Result: All tests passed') else: lines.append('- Result:') if failures: lines.append(f' - Failures ({len(failures)} of {total}):') for name in failures: lines.append(f' - {name}') if diffs: lines.append(f' - Diffs ({len(diffs)} of {total}):') for name in diffs: lines.append(f' - `{name}`') out_path = os.path.join(work_dir, f'{suite_name}_output_for_pr.md') print(f'Writing output useful for copy/paste into PRs to:\n {out_path}') with open(out_path, 'w') as out: out.write('\n'.join(lines) + '\n') print('Done.') def _parse_provenance_into(path, labels, target_values): if not os.path.exists(path): return try: with open(path, 'r') as f: for line in f: if ':' not in line: continue parts = line.strip().split(':', 1) if len(parts) != 2: continue key = parts[0].strip().lower() val = parts[1].strip() if key in labels: 
target_values[labels[key]] = val except (OSError, UnicodeDecodeError): # silent failure; helper is best-effort pass def _parse_baseline_build(baseline_workdir: Optional[str]) -> Optional[str]: if not baseline_workdir: return None path = os.path.join(baseline_workdir, 'provenance') if not os.path.exists(path): return None try: with open(path, 'r') as f: for line in f: if ':' not in line: continue parts = line.strip().split(':', 1) if len(parts) != 2: continue key = parts[0].strip().lower() val = parts[1].strip() if key == 'build directory': return val except (OSError, UnicodeDecodeError): return None return None def _derive_job_log_path(suite_name: str, suite: dict) -> Optional[str]: """Best-effort reconstruction of the Slurm job log path.""" job_id = os.environ.get('SLURM_JOB_ID') if not job_id: return None # Reconstruct the job_name the same way the job script did # using the common component config try: task = next(iter(suite['tasks'].values())) component = task.component common_config = setup_config( task.base_work_dir, f'{component.name}.cfg' ) job_name = common_config.get('job', 'job_name') if job_name == '<<<default>>>': suite_suffix = f'_{suite_name}' if suite_name else '' job_name = f'polaris{suite_suffix}' work_dir = suite.get('work_dir', os.getcwd()) return os.path.join(work_dir, f'{job_name}.o{job_id}') except (StopIteration, KeyError, OSError, AttributeError): return None