import os
import numpy as np
import xarray as xr
[docs]
def compare_variables(variables, filename1, filename2, logger, l1_norm=0.0,
l2_norm=0.0, linf_norm=0.0, quiet=True):
"""
compare variables in the two files
Parameters
----------
variables : list
A list of variable names to compare
filename1 : str
The relative path to a file within the ``work_dir``. If ``filename2``
is also given, comparison will be performed with ``variables`` in that
file. If a baseline directory was provided when setting up the
test case, the ``variables`` will be compared between this test case
and the same relative filename in the baseline version of the test
case.
filename2 : str
The relative path to another file within the ``work_dir`` if comparing
between files within the current test case. If a baseline directory
was provided, the ``variables`` from this file will also be compared
with those in the corresponding baseline file.
logger: logging.Logger
The logger to log validation output to
l1_norm : float, optional
The maximum allowed L1 norm difference between the variables in
``filename1`` and ``filename2``. To skip L1 norm check, pass None.
l2_norm : float, optional
The maximum allowed L2 norm difference between the variables in
``filename1`` and ``filename2``. To skip L2 norm check, pass None.
linf_norm : float, optional
The maximum allowed L-Infinity norm difference between the variables in
``filename1`` and ``filename2``. To skip Linf norm check, pass None.
quiet : bool, optional
Whether to print detailed information. If quiet is False, the norm
tolerance values being compared against will be printed when the
comparison is made. This is generally desirable when using nonzero
norm tolerance values.
Returns
-------
all_pass : bool
Whether all variables passed the validation checks
"""
for filename in [filename1, filename2]:
if not os.path.exists(filename):
logger.error(f'File {filename} does not exist.')
return False
ds1 = xr.open_dataset(filename1)
ds2 = xr.open_dataset(filename2)
all_pass = True
for variable in variables:
all_found = True
for ds, filename in [(ds1, filename1), (ds2, filename2)]:
if variable not in ds:
logger.error(f'Variable {variable} not in {filename}.')
all_found = False
if not all_found:
all_pass = False
continue
da1 = ds1[variable]
da2 = ds2[variable]
if not np.all(da1.dims == da2.dims):
logger.error(f"Dimensions for variable {variable} don't match "
f"between files {filename1} and {filename2}.")
all_pass = False
continue
all_match = True
for dim in da1.sizes:
if da1.sizes[dim] != da2.sizes[dim]:
logger.error(f"Field sizes for variable {variable} don't "
f"match files {filename1} and {filename2}.")
all_match = False
if not all_match:
all_pass = False
continue
if not quiet:
print(" Pass thresholds are:")
if l1_norm is not None:
print(f" L1: {l1_norm:16.14e}")
if l2_norm is not None:
print(f" L2: {l2_norm:16.14e}")
if linf_norm is not None:
print(f" L_Infinity: {linf_norm:16.14e}")
variable_pass = True
if 'Time' in da1.dims:
time_range = range(0, da1.sizes['Time'])
time_str = ', '.join([f'{j}' for j in time_range])
print(f'{variable.ljust(20)} Time index: {time_str}')
for time_index in time_range:
slice1 = da1.isel(Time=time_index)
slice2 = da2.isel(Time=time_index)
result = _compute_norms(slice1, slice2, quiet, l1_norm,
l2_norm, linf_norm,
time_index=time_index)
variable_pass = variable_pass and result
else:
print(f'{variable}')
result = _compute_norms(da1, da2, quiet, l1_norm, l2_norm,
linf_norm)
variable_pass = variable_pass and result
# ANSI fail text: https://stackoverflow.com/a/287944/7728169
start_fail = '\033[91m'
start_pass = '\033[92m'
end = '\033[0m'
pass_str = f'{start_pass}PASS{end}'
fail_str = f'{start_fail}FAIL{end}'
if variable_pass:
print(f' {pass_str} {filename1}\n')
else:
print(f' {fail_str} {filename1}\n')
print(f' {filename2}\n')
all_pass = all_pass and variable_pass
return all_pass
def _compute_norms(da1, da2, quiet, max_l1_norm, max_l2_norm, max_linf_norm,
time_index=None):
""" Compute norms between variables in two DataArrays """
da1 = _rename_duplicate_dims(da1)
da2 = _rename_duplicate_dims(da2)
result = True
diff = np.abs(da1 - da2).values.ravel()
# skip entries where one field or both are a fill value
diff = diff[np.isfinite(diff)]
l1_norm = np.linalg.norm(diff, ord=1)
l2_norm = np.linalg.norm(diff, ord=2)
linf_norm = np.linalg.norm(diff, ord=np.inf)
if time_index is None:
diff_str = ''
else:
diff_str = f'{time_index:d}: '
if max_l1_norm is not None:
if max_l1_norm < l1_norm:
result = False
diff_str = f'{diff_str} l1: {l1_norm:16.14e} '
if max_l2_norm is not None:
if max_l2_norm < l2_norm:
result = False
diff_str = f'{diff_str} l2: {l2_norm:16.14e} '
if max_linf_norm is not None:
if max_linf_norm < linf_norm:
result = False
diff_str = f'{diff_str} linf: {linf_norm:16.14e} '
if not quiet or not result:
print(diff_str)
return result
def _rename_duplicate_dims(da):
dims = list(da.dims)
new_dims = list(dims)
duplicates = False
for index, dim in enumerate(dims):
if dim in dims[index + 1:]:
duplicates = True
suffix = 2
for other_index, other in enumerate(dims[index + 1:]):
if other == dim:
new_dims[other_index + index + 1] = f'{dim}_{suffix}'
suffix += 1
if not duplicates:
return da
da = xr.DataArray(data=da.values, dims=new_dims)
return da