#!/usr/bin/env python3
import argparse
import json
import os
import platform
import re
import shlex
import shutil
import subprocess
import sys
import time
from pathlib import Path
from typing import Dict, List # noqa: F401
# Mapping from (lowercased system, machine) pairs to conda/pixi platform
# strings, used to pick the single platform written into the bootstrap
# pixi manifests (no Windows entries; see _get_pixi_platform).
CONDA_PLATFORM_MAP = {
    ('linux', 'x86_64'): 'linux-64',
    ('linux', 'aarch64'): 'linux-aarch64',
    ('linux', 'ppc64le'): 'linux-ppc64le',
    ('osx', 'x86_64'): 'osx-64',
    ('osx', 'arm64'): 'osx-arm64',
}
# Environment variable that, when set, points at a local mache worktree to
# snapshot instead of cloning the fork/branch from GitHub.
LOCAL_MACHE_SOURCE_ENV = 'MACHE_LOCAL_SOURCE_PATH'
# Pixi state variables stripped from the environment so nested pixi
# invocations do not inherit the caller's active pixi project/shell.
PIXI_ENV_VARS_TO_UNSET = (
    'PIXI_PROJECT_MANIFEST',
    'PIXI_PROJECT_ROOT',
    'PIXI_ENVIRONMENT_NAME',
    'PIXI_IN_SHELL',
)
# Build-tool specs added to the bootstrap env for pip-installing a local
# mache checkout with --no-build-isolation.
BOOTSTRAP_SETUPTOOLS_SPEC = '>=60'
BOOTSTRAP_WHEEL_SPEC = '*'
# Anaconda.org root for conda-forge label channels, plus the mache_dev
# label channel used for release-candidate versions.
CONDA_FORGE_LABEL_ROOT = 'https://conda.anaconda.org/conda-forge/label'
MACHE_DEV_LABEL = f'{CONDA_FORGE_LABEL_ROOT}/mache_dev'
def check_call(
    commands,
    log_filename,
    quiet,
    *,
    capture_output=False,
    check=True,
    **popen_kwargs,
):
    """
    Wrapper for making a shell call with logging and error management.

    This function is intentionally similar to :py:func:`subprocess.run`, while
    still providing the project-specific logging and "tee" behavior.

    Parameters
    ----------
    commands : str or list[str]
        Either a single shell command string (possibly chaining commands with
        "&&" or ";") or an argv-style command list for direct execution
        without a shell.
    log_filename : str
        The path to the log file to append to
    quiet : bool
        If True, only log to the log file, not to the terminal
    capture_output : bool, optional
        If True, capture stdout (and merged stderr) and return it in the
        returned :class:`subprocess.CompletedProcess`.
    check : bool, optional
        If True (default), raise :class:`subprocess.CalledProcessError` when
        the command returns a nonzero status.
    **popen_kwargs
        Additional keyword arguments passed through to
        :class:`subprocess.Popen`.

    Returns
    -------
    result : subprocess.CompletedProcess
        The result of the command. When ``capture_output`` is True, the
        combined output is available as ``result.stdout``.

    Raises
    ------
    ValueError
        If ``capture_output``/``quiet`` conflict with explicit stdout/stderr
        entries in ``popen_kwargs``.
    subprocess.CalledProcessError
        If ``check`` is True and the command exits with nonzero status.
    """
    # Reject kwarg combinations that would fight with the tee/logging setup.
    _validate_check_call_kwargs(
        capture_output=capture_output,
        quiet=quiet,
        popen_kwargs=popen_kwargs,
    )
    # Determine whether stdout is text or bytes (match subprocess defaults,
    # but keep this wrapper text-friendly by default).
    text, popen_kwargs = _normalize_popen_text_kwargs(popen_kwargs)
    # Line-buffered in text mode, unbuffered in binary mode, unless the
    # caller supplied an explicit bufsize.
    bufsize = popen_kwargs.get('bufsize', 1 if text else 0)
    print_command = _format_check_call_message(
        commands=commands, popen_kwargs=popen_kwargs
    )
    # Make sure the log directory exists before opening the log file.
    os.makedirs(os.path.dirname(os.path.abspath(log_filename)), exist_ok=True)
    # The log is opened in append mode, matching the text/bytes choice above.
    log_mode = 'a' if text else 'ab'
    log_encoding = 'utf-8' if text else None
    _write_check_call_message(
        log_filename=log_filename,
        text=text,
        log_mode=log_mode,
        log_encoding=log_encoding,
        print_command=print_command,
    )
    if not quiet:
        print(print_command)
    base_popen_kwargs = _build_check_call_popen_kwargs(
        commands=commands,
        text=text,
        bufsize=bufsize,
        popen_kwargs=popen_kwargs,
    )
    if capture_output or not quiet:
        # Stream output through this process so it can be teed to the
        # terminal and/or captured for the CompletedProcess.
        process, stdout_data = _run_check_call_with_streaming(
            commands=commands,
            log_filename=log_filename,
            log_mode=log_mode,
            log_encoding=log_encoding,
            quiet=quiet,
            capture_output=capture_output,
            text=text,
            base_popen_kwargs=base_popen_kwargs,
        )
    else:
        # Quiet and not capturing: let the subprocess write straight to the
        # log file without passing through this process at all.
        process = _run_check_call_with_log_only(
            commands=commands,
            log_filename=log_filename,
            log_mode=log_mode,
            log_encoding=log_encoding,
            base_popen_kwargs=base_popen_kwargs,
        )
        stdout_data = None
    result = subprocess.CompletedProcess(
        args=commands,
        returncode=process.returncode,
        stdout=stdout_data,
        stderr=None,
    )
    if check and process.returncode != 0:
        raise subprocess.CalledProcessError(
            process.returncode, commands, output=stdout_data
        )
    return result
def _validate_check_call_kwargs(*, capture_output, quiet, popen_kwargs):
"""Validate combinations of wrapper and subprocess kwargs."""
if capture_output and (
'stdout' in popen_kwargs
or 'stderr' in popen_kwargs
or 'capture_output' in popen_kwargs
):
raise ValueError(
'capture_output=True cannot be used with stdout/stderr/'
'capture_output in popen_kwargs.'
)
if (capture_output or not quiet) and (
'stdout' in popen_kwargs or 'stderr' in popen_kwargs
):
raise ValueError(
'stdout/stderr cannot be set when capture_output=True or '
'quiet=False because this wrapper needs to stream output for '
'logging/tee behavior.'
)
def _format_check_call_message(*, commands, popen_kwargs):
    """Build the banner text logged before a subprocess runs."""
    cwd = popen_kwargs.get('cwd')
    # Report the directory the child will actually run in.
    working_dir = os.getcwd() if cwd is None else os.fspath(cwd)
    rendered = _format_check_call_command(commands)
    return (
        f'\n Running from:\n {working_dir}\n Running:\n {rendered}\n'
    )
def _format_check_call_command(commands):
    """Render a command for readable logging; execution is unaffected."""
    if not isinstance(commands, str):
        # argv list: quote each element so the log line is copy-pasteable
        return ' '.join(shlex.quote(str(part)) for part in commands)
    pieces = _split_shell_on_andand(commands)
    return '\n '.join(pieces) if pieces else commands
def _write_check_call_message(
*, log_filename, text, log_mode, log_encoding, print_command
):
"""Write the command banner to the log file."""
with open(log_filename, log_mode, encoding=log_encoding) as log_file:
if text:
log_file.write(print_command + '\n')
else:
log_file.write((print_command + '\n').encode('utf-8'))
def _build_check_call_popen_kwargs(*, commands, text, bufsize, popen_kwargs):
"""Build the Popen kwargs used by check_call()."""
base_popen_kwargs = {
'universal_newlines': text,
'bufsize': bufsize,
}
if isinstance(commands, str):
base_popen_kwargs.update(
{
'executable': '/bin/bash',
'shell': True,
}
)
else:
base_popen_kwargs.setdefault('shell', False)
base_popen_kwargs.update(popen_kwargs)
return base_popen_kwargs
def _run_check_call_with_streaming(
    *,
    commands,
    log_filename,
    log_mode,
    log_encoding,
    quiet,
    capture_output,
    text,
    base_popen_kwargs,
):
    """Run a subprocess, teeing its output to the log (and terminal)."""
    run_kwargs = dict(base_popen_kwargs)
    # Pull the child's stdout through a pipe, with stderr merged in.
    run_kwargs.setdefault('stdout', subprocess.PIPE)
    run_kwargs.setdefault('stderr', subprocess.STDOUT)
    with open(log_filename, log_mode, encoding=log_encoding) as log_file:
        process = subprocess.Popen(commands, **run_kwargs)
        stdout_data = _stream_check_call_output(
            process=process,
            log_file=log_file,
            quiet=quiet,
            capture_output=capture_output,
            text=text,
        )
        process.wait()
    return process, stdout_data
def _run_check_call_with_log_only(
*, commands, log_filename, log_mode, log_encoding, base_popen_kwargs
):
"""Run a subprocess with stdout directed only to the log file."""
popen_kwargs = base_popen_kwargs.copy()
popen_kwargs.setdefault('stdout', None)
popen_kwargs.setdefault('stderr', subprocess.STDOUT)
with open(log_filename, log_mode, encoding=log_encoding) as log_file:
popen_kwargs['stdout'] = log_file
process = subprocess.Popen(commands, **popen_kwargs)
process.wait()
return process
def _stream_check_call_output(
    *, process, log_file, quiet, capture_output, text
):
    """Forward subprocess output to the log, optionally collecting it."""
    assert process.stdout is not None
    collected = [] if capture_output else None
    for piece in process.stdout:
        if collected is not None:
            collected.append(piece)
        _write_streamed_output(
            chunk=piece,
            log_file=log_file,
            quiet=quiet,
            text=text,
        )
    if collected is None:
        return None
    return _decode_captured_chunks(collected, text=text)
def _write_streamed_output(*, chunk, log_file, quiet, text):
"""Write one streamed subprocess chunk to the log and terminal."""
if text:
log_file.write(chunk)
log_file.flush()
if not quiet:
sys.stdout.write(chunk)
sys.stdout.flush()
return
chunk_bytes = _to_bytes(chunk)
log_file.write(chunk_bytes)
log_file.flush()
if not quiet:
sys.stdout.buffer.write(chunk_bytes)
sys.stdout.buffer.flush()
def _decode_captured_chunks(captured_chunks, *, text):
"""Normalize captured subprocess output to str."""
if text:
return ''.join(
chunk if isinstance(chunk, str) else chunk.decode('utf-8')
for chunk in captured_chunks
)
stdout_bytes = b''.join(_to_bytes(chunk) for chunk in captured_chunks)
return stdout_bytes.decode('utf-8', errors='replace')
def _to_bytes(chunk):
"""Normalize a subprocess output chunk to bytes."""
if isinstance(chunk, str):
return chunk.encode('utf-8')
return chunk
def check_call_with_retries(
    commands,
    log_filename,
    quiet,
    *,
    retries=3,
    retry_delay=2.0,
    **popen_kwargs,
):
    """Run a command with a few retries for transient pixi/network failures.

    Parameters
    ----------
    commands : str or list[str]
        The command to run (see :py:func:`check_call`).
    log_filename : str
        The path to the log file to append to.
    quiet : bool
        If True, only log to the log file, not to the terminal.
    retries : int, optional
        Total number of attempts; must be at least 1.
    retry_delay : float, optional
        Seconds to sleep between attempts.
    **popen_kwargs
        Additional keyword arguments passed through to check_call().

    Returns
    -------
    subprocess.CompletedProcess
        The result of the first successful attempt.

    Raises
    ------
    ValueError
        If ``retries`` is less than 1.  (Previously retries<1 silently ran
        nothing and returned None.)
    subprocess.CalledProcessError
        If every attempt fails; the final attempt's error is re-raised.
    """
    if retries < 1:
        raise ValueError(f'retries must be at least 1, got {retries}')
    for attempt in range(1, retries + 1):
        try:
            return check_call(commands, log_filename, quiet, **popen_kwargs)
        except subprocess.CalledProcessError:
            # Out of attempts: propagate the final failure.
            if attempt >= retries:
                raise
            message = (
                f'Command failed on attempt {attempt}/{retries}; '
                f'retrying in {retry_delay:.0f}s...\n'
            )
            with open(log_filename, 'a', encoding='utf-8') as log_file:
                log_file.write(message)
            if not quiet:
                print(message)
            time.sleep(retry_delay)
    # Unreachable: the loop either returns or re-raises on the last attempt.
    # (The original ended with a dead `raise last_error` for this case.)
def build_pixi_shell_hook_prefix(*, pixi_exe: str, pixi_toml: str) -> str:
    """Build a shell prefix to activate a pixi env in the current shell.

    Uses `pixi shell-hook` so activation applies to this shell rather than
    running a nested shell process.
    """
    unset_prefix = build_pixi_env_unset_prefix()
    quoted_exe = shlex.quote(pixi_exe)
    quoted_toml = shlex.quote(pixi_toml)
    hook_cmd = (
        f'{unset_prefix} {quoted_exe} shell-hook -s bash -m {quoted_toml}'
    )
    return f'eval "$({hook_cmd})" &&'
def build_pixi_env(base_env=None):
    """Return a copy of the environment without pixi nesting variables."""
    source = os.environ if base_env is None else base_env
    return {
        key: value
        for key, value in dict(source).items()
        if key not in PIXI_ENV_VARS_TO_UNSET
    }
def check_location(software=None):
    """
    Ensure that the script is being run from the root of the target software
    repository.

    Parameters
    ----------
    software : str, optional
        The target software name, used for a more specific error message.
        If not provided, a generic message is used.
    """
    expected_files = [
        'deploy.py',
        'deploy/cli_spec.json',
        'deploy/config.yaml.j2',
        'deploy/pins.cfg',
    ]
    missing_files = [
        filename
        for filename in expected_files
        if not os.path.exists(filename)
    ]
    if not missing_files:
        return
    missing_str = '\n - '.join(missing_files)
    if software:
        location_desc = f'the root of the local {software} branch'
    else:
        location_desc = 'the root of the target software repository'
    raise RuntimeError(
        f'The deploy script must be run from {location_desc}. '
        f'Expected files that were not found:\n - {missing_str}'
    )
def install_dev_mache(
    pixi_shell_hook_prefix,
    log_filename,
    quiet,
    *,
    repo_root=None,
    src_dir=None,
):
    """
    Install mache from a fork and branch for development and testing
    """
    print('Clone and install local mache\n')
    # NOTE: We use `pixi shell-hook` to activate the environment in this
    # shell, then run pip install within that environment.
    # Also, the caller may have `cd`'d into the bootstrap pixi project, so we
    # explicitly `cd` back to the repo root first.
    if repo_root is None:
        root = os.path.abspath(os.getcwd())
    else:
        root = os.path.abspath(os.path.expanduser(str(repo_root)))
    if src_dir is None:
        source_dir = os.path.join(root, 'deploy_tmp', 'build_mache', 'mache')
    else:
        source_dir = os.path.abspath(os.path.expanduser(str(src_dir)))
    if not os.path.isdir(source_dir):
        raise FileNotFoundError(
            'Expected mache source clone not found at '
            f'{source_dir}. This should have been created during bootstrap '
            'when '
            'using --mache-fork/--mache-branch.'
        )
    bash_cmd = (
        f'cd {shlex.quote(source_dir)} && '
        'python -m pip install --no-deps --no-build-isolation .'
    )
    commands = f'{pixi_shell_hook_prefix} {bash_cmd}'
    try:
        check_call(commands, log_filename, quiet)
    except subprocess.CalledProcessError:
        if quiet:
            print(
                f'Failed to clone and install local mache. See '
                f'{log_filename} for details.\n'
            )
        raise
def main():
    """
    Entry point for the configure script
    """
    os.makedirs(name='deploy_tmp/logs', exist_ok=True)
    log_filename = 'deploy_tmp/logs/bootstrap.log'
    # Start each run with a fresh log file.
    if os.path.exists(log_filename):
        os.remove(log_filename)
    try:
        _run(log_filename)
    except Exception as error:
        # A single handler covers both command failures
        # (CalledProcessError) and unexpected python-level failures
        # (missing perms, bad paths, etc.); the original had two identical
        # handlers for these cases.
        _print_failure_summary(error, log_filename)
        raise
def _run(log_filename):
    """
    Run the bootstrap process with the given log file.

    Writes a pixi manifest for the bootstrap environment (either derived
    from a cloned mache source tree, or pinning a released mache from
    conda-forge), runs ``pixi install``, and, for source installs,
    pip-installs the local mache checkout into the new environment.
    """
    args = _parse_args()
    software = args.software
    quiet = args.quiet
    check_location(software)
    os.makedirs('deploy_tmp', exist_ok=True)
    pixi_exe = _get_pixi_executable(
        args.pixi, log_filename=log_filename, quiet=quiet
    )
    bootstrap_dir = Path('deploy_tmp/bootstrap_pixi').resolve()
    bootstrap_dir.mkdir(parents=True, exist_ok=True)
    if args.recreate and (bootstrap_dir / '.pixi').exists():
        shutil.rmtree(bootstrap_dir / '.pixi')
    _write_bootstrap_pixi_config(bootstrap_dir=bootstrap_dir)
    pixi_toml_path = bootstrap_dir / 'pixi.toml'
    # Developer-style install path (--mache-fork/--mache-branch): use
    # mache's own pixi.toml to create the environment, then install mache
    # from source without PyPI deps.  Otherwise (release/tag path), install
    # mache from conda-forge directly.
    from_source = (
        args.mache_fork is not None and args.mache_branch is not None
    )
    if from_source:
        _clone_mache_repo(
            mache_fork=args.mache_fork,
            mache_branch=args.mache_branch,
            log_filename=log_filename,
            quiet=quiet,
            recreate=args.recreate,
        )
        _copy_mache_pixi_toml(
            dest_pixi_toml=pixi_toml_path,
            source_repo_dir=Path('deploy_tmp/build_mache/mache'),
            python_version=args.python,
        )
    else:
        _write_bootstrap_pixi_toml_with_mache(
            pixi_toml_path=pixi_toml_path,
            software=software,
            mache_version=args.mache_version,
            python_version=args.python,
        )
    # Create/update the bootstrap env.  This call is shared by both install
    # paths (the original duplicated it in each branch).
    check_call_with_retries(
        [pixi_exe, 'install'],
        log_filename,
        quiet,
        cwd=str(bootstrap_dir),
        env=build_pixi_env(),
    )
    if from_source:
        pixi_shell_hook_prefix = build_pixi_shell_hook_prefix(
            pixi_exe=pixi_exe,
            pixi_toml=str(pixi_toml_path.resolve()),
        )
        install_dev_mache(
            pixi_shell_hook_prefix=pixi_shell_hook_prefix,
            log_filename=log_filename,
            quiet=quiet,
        )
def _parse_args():
"""
Parse arguments from the configure conda environment script call
"""
parser = argparse.ArgumentParser(
description='Bootstrap a pixi environment for running mache deploy'
)
parser.add_argument(
'--software',
dest='software',
required=True,
help='The name of the target software.',
)
parser.add_argument(
'--pixi',
dest='pixi',
help='Path to the pixi executable. If not provided, pixi is found '
'on PATH.',
)
parser.add_argument(
'--pixi-path',
'--prefix',
dest='pixi_path',
help='Install the pixi environment at this path (directory). '
'This is a deploy-time option; bootstrap accepts it for CLI '
'contract compatibility but does not use it. `--prefix` is '
'deprecated.',
)
parser.add_argument(
'--recreate',
dest='recreate',
action='store_true',
help='Recreate the environment if it exists.',
)
parser.add_argument(
'--mache-fork',
dest='mache_fork',
help='Point to a mache org/fork (and branch) for testing. '
'Example: E3SM-Project/mache',
)
parser.add_argument(
'--mache-branch',
dest='mache_branch',
help='Point to a mache branch (and fork) for testing.',
)
parser.add_argument(
'--mache-version',
dest='mache_version',
help='The version of mache to install if not from a branch.',
)
parser.add_argument(
'--python',
dest='python',
required=True,
help='The python major and minor version to use.',
)
parser.add_argument(
'--quiet',
dest='quiet',
action='store_true',
help='Only print output to log files, not to the terminal.',
)
args = parser.parse_args(sys.argv[1:])
if (args.mache_fork is None) != (args.mache_branch is None):
raise ValueError(
'You must supply both or neither of '
'--mache-fork and --mache-branch'
)
if (
args.mache_version is None
and args.mache_fork is None
and args.mache_branch is None
):
raise ValueError(
'You must supply --mache-version and/or both --mache-fork '
'and --mache-branch.'
)
return args
def _normalize_popen_text_kwargs(popen_kwargs):
"""Translate subprocess text-mode kwargs for Python 3.6 compatibility."""
normalized = dict(popen_kwargs)
text = normalized.pop('text', None)
universal_newlines = normalized.get('universal_newlines')
if text is not None and universal_newlines is not None:
if bool(text) != bool(universal_newlines):
raise ValueError(
'text and universal_newlines must match when both are set.'
)
if text is None:
text = universal_newlines
if text is None:
text = True
normalized['universal_newlines'] = text
return text, normalized
def _split_shell_on_andand(commands):
"""Split a shell command string on top-level '&&' (quote-aware).
This is used only for logging/pretty-printing in check_call(). It is not a
full shell parser; it is a best-effort splitter that avoids breaking
quoted sub-commands (e.g. bash -c '... && ...').
"""
parts = []
buf = []
quote = None
i = 0
n = len(commands)
while i < n:
ch = commands[i]
if quote is not None:
# In double quotes, backslash can escape characters.
if quote == '"' and ch == '\\' and i + 1 < n:
buf.append(ch)
buf.append(commands[i + 1])
i += 2
continue
if ch == quote:
quote = None
buf.append(ch)
i += 1
continue
# Not currently in a quote
if ch == "'" or ch == '"':
quote = ch
buf.append(ch)
i += 1
continue
if ch == '&' and i + 1 < n and commands[i + 1] == '&':
part = ''.join(buf).strip()
if part:
parts.append(part)
buf = []
i += 2
continue
buf.append(ch)
i += 1
last = ''.join(buf).strip()
if last:
parts.append(last)
return parts
def _print_failure_summary(err, log_filename, tail_lines=60):
    """Print a short failure report plus the tail of the log file."""
    print('\nERROR: bootstrap failed.')
    print(f'See log: {os.path.abspath(log_filename)}')
    if isinstance(err, subprocess.CalledProcessError):
        print('\nFailing command:')
        print(err.cmd)
    else:
        print('\nDetails:')
        print(repr(err))
    try:
        with open(log_filename, 'r', encoding='utf-8') as log_file:
            lines = log_file.readlines()
    except OSError:
        # Best effort: the log may not exist if failure came very early.
        return
    tail = ''.join(lines[-tail_lines:])
    print(f'\nLast {min(tail_lines, len(lines))} log lines:\n{tail}')
def _default_pixi_path():
home = os.path.expanduser('~')
return os.path.join(home, '.pixi', 'bin', 'pixi')
def build_pixi_env_unset_prefix():
    """Return an `env -u ...` prefix that strips pixi nesting variables."""
    flags = [f'-u {name}' for name in PIXI_ENV_VARS_TO_UNSET]
    return 'env ' + ' '.join(flags)
def _get_pixi_platform():
    """Map the current (system, machine) to a conda/pixi platform string."""
    system = platform.system().lower()
    machine = platform.machine().lower()
    if system == 'darwin':
        # conda/pixi platform strings call macOS 'osx'
        system = 'osx'
    try:
        return CONDA_PLATFORM_MAP[(system, machine)]
    except KeyError as exc:
        raise ValueError(
            f'Unsupported platform for pixi bootstrap: {system} {machine}'
        ) from exc
def _install_pixi(*, log_filename, quiet, pixi_bin_dir=None):
    """Install pixi via the upstream install script (curl, then wget)."""
    # Avoid modifying shell rc files during bootstrap.
    env_parts = ['PIXI_NO_PATH_UPDATE=1']
    if pixi_bin_dir is not None:
        env_parts.append(f'PIXI_BIN_DIR={shlex.quote(pixi_bin_dir)}')
    env_prefix = ' '.join(env_parts)
    try:
        check_call(
            f'{env_prefix} curl -fsSL https://pixi.sh/install.sh | sh',
            log_filename,
            quiet,
        )
    except subprocess.CalledProcessError:
        # Fallback path, matching pixi installation instructions.
        check_call(
            f'{env_prefix} wget -qO- https://pixi.sh/install.sh | sh',
            log_filename,
            quiet,
        )
def _get_pixi_executable(pixi, *, log_filename, quiet):
"""Find pixi, installing it if needed.
Behavior
--------
- If ``--pixi`` points to an existing executable, use it.
- If ``--pixi`` is supplied but does not exist, treat it as the desired
install location and install pixi there.
- If ``--pixi`` is not supplied and pixi is not on PATH, install pixi
in the default location (typically ``~/.pixi/bin``).
The upstream installer supports choosing a destination via environment
variables (not flags): ``PIXI_BIN_DIR`` and ``PIXI_HOME``.
"""
# 1) Explicit path
if pixi:
pixi_path = os.path.abspath(os.path.expanduser(str(pixi)))
if os.path.isfile(pixi_path) and os.access(pixi_path, os.X_OK):
return pixi_path
# Not an existing executable: treat as install target.
if pixi_path.endswith(os.sep) or os.path.isdir(pixi_path):
pixi_bin_dir = os.path.abspath(pixi_path)
expected_exec = os.path.join(pixi_bin_dir, 'pixi')
else:
pixi_bin_dir = os.path.dirname(pixi_path)
expected_exec = pixi_path
_install_pixi(
log_filename=log_filename, quiet=quiet, pixi_bin_dir=pixi_bin_dir
)
if os.path.isfile(expected_exec) and os.access(expected_exec, os.X_OK):
return expected_exec
raise RuntimeError(
'pixi was installed but the executable was not found where '
f'expected. Looked for: {expected_exec}'
)
# 2) PATH
which = shutil.which('pixi')
if which is not None:
return which
# 3) Default path
if os.path.isfile(_default_pixi_path()) and os.access(
_default_pixi_path(), os.X_OK
):
return _default_pixi_path()
# 4) Auto-install default
_install_pixi(log_filename=log_filename, quiet=quiet, pixi_bin_dir=None)
if os.path.isfile(_default_pixi_path()) and os.access(
_default_pixi_path(), os.X_OK
):
return _default_pixi_path()
which = shutil.which('pixi')
if which is not None:
return which
raise RuntimeError(
'pixi was installed but could not be found. Expected it in the '
"default location (e.g. '~/.pixi/bin') or on PATH."
)
def _write_bootstrap_pixi_toml_with_mache(
    *,
    pixi_toml_path,
    software,
    mache_version,
    python_version,
):
    """Write a bootstrap pixi.toml that installs mache from conda-forge."""
    channels = _get_bootstrap_channels_for_mache_version(mache_version)
    channel_list = ', '.join(json.dumps(channel) for channel in channels)
    mache_spec = _format_pixi_version_specifier(mache_version)
    content = '\n'.join(
        [
            '[workspace]',
            f'name = "{software}-mache-bootstrap"',
            f'channels = [{channel_list}]',
            f'platforms = ["{_get_pixi_platform()}"]',
            'channel-priority = "strict"',
            '',
            '[dependencies]',
            f'python = "{python_version}.*"',
            'pip = "*"',
            'rattler-build = "*"',
            f'mache = "{mache_spec}"',
        ]
    )
    pixi_toml_path.write_text(content + '\n', encoding='utf-8')
def _write_bootstrap_pixi_config(*, bootstrap_dir: Path) -> None:
    """Write a pixi config pinning label channels to anaconda.org."""
    config_dir = bootstrap_dir / '.pixi'
    config_dir.mkdir(parents=True, exist_ok=True)
    content = '\n'.join(
        [
            '# Keep conda-forge label channels on Anaconda because prefix.dev',
            '# does not currently mirror public conda-forge labels such as ',
            '# mache_dev.',
            '[mirrors]',
            f'"{CONDA_FORGE_LABEL_ROOT}" = [',
            f'  "{CONDA_FORGE_LABEL_ROOT}",',
            ']',
        ]
    )
    (config_dir / 'config.toml').write_text(content + '\n', encoding='utf-8')
def _get_bootstrap_channels_for_mache_version(mache_version: str) -> List[str]:
normalized = str(mache_version).strip().lower()
if 'rc' in normalized:
return [MACHE_DEV_LABEL, 'conda-forge']
return ['conda-forge']
def _format_pixi_version_specifier(version: str) -> str:
"""Return a pixi version specifier from a mache version pin.
Pixi dependency values expect a version specifier, not a full conda
matchspec. Exact pins use ``==`` while wildcard pins already use the
correct ``3.0.2.*``-style form.
"""
normalized = str(version).strip()
if normalized.endswith('.*'):
return normalized
return f'=={normalized}'
def _copy_mache_pixi_toml(*, dest_pixi_toml, source_repo_dir, python_version):
src = Path(source_repo_dir) / 'pixi.toml'
if not src.is_file():
raise RuntimeError(
f'Expected mache pixi.toml not found in cloned repo: {src}'
)
source_text = src.read_text(encoding='utf-8')
channels = _get_pixi_channels_from_text(source_text)
dependencies = _get_pixi_dependencies_from_text(source_text)
_write_bootstrap_pixi_toml_with_local_source(
pixi_toml_path=Path(dest_pixi_toml),
channels=channels,
dependencies=dependencies,
python_version=python_version,
)
def _write_bootstrap_pixi_toml_with_local_source(
    *,
    pixi_toml_path: Path,
    channels: List[str],
    dependencies: Dict[str, str],
    python_version: str,
) -> None:
    """Write a slim bootstrap pixi manifest for a local mache source tree.

    The full repo ``pixi.toml`` may contain CI-only features, named
    environments, and multiple platforms. The bootstrap environment only
    needs the current platform, the requested Python, and the runtime/build
    dependencies required to install the local mache checkout.
    """
    final_channels = list(channels) if channels else ['conda-forge']
    final_deps = dict(dependencies)
    # Build tooling needed to pip-install the checkout without isolation.
    for extra_name, extra_spec in (
        ('pip', '*'),
        ('rattler-build', '*'),
        ('setuptools', BOOTSTRAP_SETUPTOOLS_SPEC),
        ('wheel', BOOTSTRAP_WHEEL_SPEC),
    ):
        final_deps.setdefault(extra_name, extra_spec)
    channel_list = ', '.join(json.dumps(c) for c in final_channels)
    lines = [
        '[workspace]',
        'name = "mache-bootstrap-local"',
        f'channels = [{channel_list}]',
        f'platforms = ["{_get_pixi_platform()}"]',
        'channel-priority = "strict"',
        '',
        '[dependencies]',
        f'python = "{python_version}.*"',
    ]
    lines.extend(
        f'{name} = {json.dumps(spec)}'
        for name, spec in final_deps.items()
        if name != 'python'
    )
    pixi_toml_path.write_text('\n'.join(lines) + '\n', encoding='utf-8')
def merge_pixi_toml_dependencies(
    *,
    target_pixi_toml: str,
    source_repo_dir: str,
    python_version: str,
) -> None:
    """Merge pixi channels and dependencies from a source repo into a target.

    This is used when deploying a local/forked ``mache`` branch into a
    downstream pixi environment. The source repo's ``pixi.toml`` is treated
    as the authoritative dependency set, including non-PyPI tools such as
    ``rsync``.
    """
    source_pixi_toml = Path(source_repo_dir) / 'pixi.toml'
    if not source_pixi_toml.is_file():
        raise RuntimeError(
            f'Expected mache pixi.toml not found in cloned repo: '
            f'{source_pixi_toml}'
        )
    source_text = source_pixi_toml.read_text(encoding='utf-8')
    target_path = Path(target_pixi_toml)
    merged = target_path.read_text(encoding='utf-8')
    merged = _merge_workspace_channels(
        text=merged,
        channels=_get_pixi_channels_from_text(source_text),
    )
    merged = _merge_dependencies_table(
        text=merged,
        dependencies=_get_pixi_dependencies_from_text(source_text),
    )
    target_path.write_text(merged, encoding='utf-8')
def _get_pixi_channels_from_text(source_text):
    """Extract the [workspace] channels list from pixi.toml text."""
    span = _find_toml_table_block(text=source_text, table='workspace')
    if span is None:
        return []
    table_text = source_text[span[0]:span[1]]
    channels_match = re.search(
        r'(?ms)^channels\s*=\s*\[(.*?)\]', table_text
    )
    if channels_match is None:
        return []
    return re.findall(r'"([^"]+)"', channels_match.group(1))
def _get_pixi_dependencies_from_text(source_text):
    """Extract simple `name = "spec"` pairs from the [dependencies] table."""
    dependencies: Dict[str, str] = {}
    span = _find_toml_table_block(text=source_text, table='dependencies')
    if span is None:
        return dependencies
    table_text = source_text[span[0]:span[1]]
    entry_pattern = re.compile(
        r'(?m)^([A-Za-z0-9_.-]+)\s*=\s*"([^"\n]+)"\s*$'
    )
    for entry in entry_pattern.finditer(table_text):
        name, spec = entry.groups()
        # python is pinned separately by the bootstrap manifest writers
        if name != 'python':
            dependencies.setdefault(name, spec)
    return dependencies
def _merge_workspace_channels(*, text, channels):
    """Append missing channels to the target's [workspace] channels list."""
    if not channels:
        return text
    span = _find_toml_table_block(text=text, table='workspace')
    if span is None:
        return text
    start, end = span
    workspace_text = text[start:end]
    channels_match = re.search(
        r'(?ms)^channels\s*=\s*\[(.*?)\]', workspace_text
    )
    if channels_match is None:
        return text
    merged = re.findall(r'"([^"]+)"', channels_match.group(1))
    for channel in channels:
        if channel not in merged:
            merged.append(channel)
    rendered = ', '.join(json.dumps(channel) for channel in merged)
    updated_workspace = (
        workspace_text[: channels_match.start()]
        + f'channels = [{rendered}]'
        + workspace_text[channels_match.end():]
    )
    return text[:start] + updated_workspace + text[end:]
def _merge_dependencies_table(*, text, dependencies):
    """Append dependencies missing from the target's [dependencies] table."""
    if not dependencies:
        return text
    span = _find_toml_table_block(text=text, table='dependencies')
    if span is None:
        return text
    start, end = span
    table_text = text[start:end]
    present = set(re.findall(r'(?m)^([A-Za-z0-9_.-]+)\s*=', table_text))
    new_lines = [
        f'{name} = {json.dumps(spec)}'
        for name, spec in dependencies.items()
        if name not in present
    ]
    if not new_lines:
        return text
    # Keep the table well-formed before appending the new entries.
    if table_text and not table_text.endswith('\n'):
        table_text += '\n'
    table_text += '\n'.join(new_lines) + '\n'
    return text[:start] + table_text + text[end:]
def _find_toml_table_block(*, text, table):
header = re.compile(rf'(?m)^\[{re.escape(table)}\]\s*$')
match = header.search(text)
if match is None:
return None
start = match.end()
next_header = re.compile(r'(?m)^\[[^\]]+\]\s*$')
next_match = next_header.search(text, start)
end = next_match.start() if next_match else len(text)
return start, end
def _clone_mache_repo(
    *,
    mache_fork,
    mache_branch,
    log_filename,
    quiet,
    recreate,
):
    """Clone (or snapshot) the requested mache fork/branch for bootstrap."""
    build_root = Path('deploy_tmp/build_mache').resolve()
    repo_dir = build_root / 'mache'
    if recreate and build_root.exists():
        shutil.rmtree(str(build_root))
    if repo_dir.exists():
        # Avoid clobbering developer edits in an existing clone.
        return
    build_root.mkdir(parents=True, exist_ok=True)
    local_source = os.environ.get(LOCAL_MACHE_SOURCE_ENV)
    if local_source:
        source_repo = Path(
            os.path.abspath(os.path.expanduser(local_source))
        ).resolve()
        if not source_repo.is_dir():
            raise RuntimeError(
                f'Local mache source override does not exist: {source_repo}'
            )
        _copy_local_mache_source_snapshot(
            source_repo=source_repo,
            repo_dir=repo_dir,
        )
        return
    clone_env = build_pixi_env()
    # Fail fast rather than prompting for SSH credentials.
    clone_env['GIT_SSH_COMMAND'] = 'ssh -oBatchMode=yes'
    clone_cmd = [
        'git',
        'clone',
        '--depth',
        '1',
        '--single-branch',
        '-b',
        mache_branch,
        f'git@github.com:{mache_fork}.git',
        'mache',
    ]
    check_call(
        clone_cmd,
        log_filename,
        quiet,
        cwd=str(build_root),
        env=clone_env,
    )
def _copy_local_mache_source_snapshot(
*, source_repo: Path, repo_dir: Path
) -> None:
"""Copy a clean local source snapshot for bootstrap installs.
Using the live developer worktree directly can pull in untracked files or
local symlinks that break setuptools packaging. Prefer a tracked-file
snapshot when the source is a git checkout, and fall back to a filtered
copytree otherwise.
"""
try:
tracked = subprocess.check_output(
['git', '-C', str(source_repo), 'ls-files', '-z'],
text=False,
)
except (OSError, subprocess.CalledProcessError):
shutil.copytree(
source_repo,
repo_dir,
ignore=shutil.ignore_patterns(
'.git',
'.pixi',
'deploy_tmp',
'__pycache__',
'*.pyc',
),
)
return
repo_dir.mkdir(parents=True, exist_ok=True)
for raw_path in tracked.split(b'\x00'):
if not raw_path:
continue
rel_path = Path(raw_path.decode('utf-8'))
src_path = source_repo / rel_path
dest_path = repo_dir / rel_path
dest_path.parent.mkdir(parents=True, exist_ok=True)
if src_path.is_symlink():
os.symlink(os.readlink(src_path), dest_path)
else:
shutil.copy2(src_path, dest_path)
if __name__ == '__main__':
    # Script entry point: run the bootstrap when invoked directly.
    main()