API

ansible_compat.config

Store configuration options as a singleton.

AnsibleConfig

Bases: UserDict[str, object]

Interface to query Ansible configuration.

This should allow users to access everything provided by `ansible-config dump` without having to parse the data themselves.

Source code in ansible_compat/config.py
class AnsibleConfig(UserDict[str, object]):  # pylint: disable=too-many-ancestors
    """Interface to query Ansible configuration.

    This should allow user to access everything provided by `ansible-config dump` without having to parse the data himself.
    """

    _aliases = {
        "COLLECTIONS_PATH": "COLLECTIONS_PATHS",  # 2.9 -> 2.10
    }
    # Expose some attributes to enable auto-complete in editors, based on
    # https://docs.ansible.org.cn/ansible/latest/reference_appendices/config.html
    action_warnings: bool = True
    agnostic_become_prompt: bool = True
    allow_world_readable_tmpfiles: bool = False
    ansible_connection_path: str | None = None
    ansible_cow_acceptlist: list[str]
    ansible_cow_path: str | None = None
    ansible_cow_selection: str = "default"
    ansible_force_color: bool = False
    ansible_nocolor: bool = False
    ansible_nocows: bool = False
    ansible_pipelining: bool = False
    any_errors_fatal: bool = False
    become_allow_same_user: bool = False
    become_plugin_path: list[str] = [
        "~/.ansible/plugins/become",
        "/usr/share/ansible/plugins/become",
    ]
    cache_plugin: str = "memory"
    cache_plugin_connection: str | None = None
    cache_plugin_prefix: str = "ansible_facts"
    cache_plugin_timeout: int = 86400
    callable_accept_list: list[str] = []
    callbacks_enabled: list[str] = []
    collections_on_ansible_version_mismatch: Literal["warning", "ignore"] = "warning"
    collections_paths: list[str] = [
        "~/.ansible/collections",
        "/usr/share/ansible/collections",
    ]
    collections_scan_sys_path: bool = True
    color_changed: str = "yellow"
    color_console_prompt: str = "white"
    color_debug: str = "dark gray"
    color_deprecate: str = "purple"
    color_diff_add: str = "green"
    color_diff_lines: str = "cyan"
    color_diff_remove: str = "red"
    color_error: str = "red"
    color_highlight: str = "white"
    color_ok: str = "green"
    color_skip: str = "cyan"
    color_unreachable: str = "bright red"
    color_verbose: str = "blue"
    color_warn: str = "bright purple"
    command_warnings: bool = False
    conditional_bare_vars: bool = False
    connection_facts_modules: dict[str, str]
    controller_python_warning: bool = True
    coverage_remote_output: str | None
    coverage_remote_paths: list[str]
    default_action_plugin_path: list[str] = [
        "~/.ansible/plugins/action",
        "/usr/share/ansible/plugins/action",
    ]
    default_allow_unsafe_lookups: bool = False
    default_ask_pass: bool = False
    default_ask_vault_pass: bool = False
    default_become: bool = False
    default_become_ask_pass: bool = False
    default_become_exe: str | None = None
    default_become_flags: str
    default_become_method: str = "sudo"
    default_become_user: str = "root"
    default_cache_plugin_path: list[str] = [
        "~/.ansible/plugins/cache",
        "/usr/share/ansible/plugins/cache",
    ]
    default_callback_plugin_path: list[str] = [
        "~/.ansible/plugins/callback",
        "/usr/share/ansible/plugins/callback",
    ]
    default_cliconf_plugin_path: list[str] = [
        "~/.ansible/plugins/cliconf",
        "/usr/share/ansible/plugins/cliconf",
    ]
    default_connection_plugin_path: list[str] = [
        "~/.ansible/plugins/connection",
        "/usr/share/ansible/plugins/connection",
    ]
    default_debug: bool = False
    default_executable: str = "/bin/sh"
    default_fact_path: str | None = None
    default_filter_plugin_path: list[str] = [
        "~/.ansible/plugins/filter",
        "/usr/share/ansible/plugins/filter",
    ]
    default_force_handlers: bool = False
    default_forks: int = 5
    default_gathering: Literal["smart", "explicit", "implicit"] = "smart"
    default_gather_subset: list[str] = ["all"]
    default_gather_timeout: int = 10
    default_handler_includes_static: bool = False
    default_hash_behaviour: str = "replace"
    default_host_list: list[str] = ["/etc/ansible/hosts"]
    default_httpapi_plugin_path: list[str] = [
        "~/.ansible/plugins/httpapi",
        "/usr/share/ansible/plugins/httpapi",
    ]
    default_internal_poll_interval: float = 0.001
    default_inventory_plugin_path: list[str] = [
        "~/.ansible/plugins/inventory",
        "/usr/share/ansible/plugins/inventory",
    ]
    default_jinja2_extensions: list[str] = []
    default_jinja2_native: bool = False
    default_keep_remote_files: bool = False
    default_libvirt_lxc_noseclabel: bool = False
    default_load_callback_plugins: bool = False
    default_local_tmp: str = "~/.ansible/tmp"
    default_log_filter: list[str] = []
    default_log_path: str | None = None
    default_lookup_lugin_path: list[str] = [
        "~/.ansible/plugins/lookup",
        "/usr/share/ansible/plugins/lookup",
    ]
    default_managed_str: str = "Ansible managed"
    default_module_args: str
    default_module_compression: str = "ZIP_DEFLATED"
    default_module_name: str = "command"
    default_module_path: list[str] = [
        "~/.ansible/plugins/modules",
        "/usr/share/ansible/plugins/modules",
    ]
    default_module_utils_path: list[str] = [
        "~/.ansible/plugins/module_utils",
        "/usr/share/ansible/plugins/module_utils",
    ]
    default_netconf_plugin_path: list[str] = [
        "~/.ansible/plugins/netconf",
        "/usr/share/ansible/plugins/netconf",
    ]
    default_no_log: bool = False
    default_no_target_syslog: bool = False
    default_null_representation: str | None = None
    default_poll_interval: int = 15
    default_private_key_file: str | None = None
    default_private_role_vars: bool = False
    default_remote_port: str | None = None
    default_remote_user: str | None = None
    # https://docs.ansible.org.cn/ansible/latest/reference_appendices/config.html#collections-paths
    default_collections_path: list[str] = [
        "~/.ansible/collections",
        "/usr/share/ansible/collections",
    ]
    default_roles_path: list[str] = [
        "~/.ansible/roles",
        "/usr/share/ansible/roles",
        "/etc/ansible/roles",
    ]
    default_selinux_special_fs: list[str] = [
        "fuse",
        "nfs",
        "vboxsf",
        "ramfs",
        "9p",
        "vfat",
    ]
    default_stdout_callback: str = "default"
    default_strategy: str = "linear"
    default_strategy_plugin_path: list[str] = [
        "~/.ansible/plugins/strategy",
        "/usr/share/ansible/plugins/strategy",
    ]
    default_su: bool = False
    default_syslog_facility: str = "LOG_USER"
    default_task_includes_static: bool = False
    default_terminal_plugin_path: list[str] = [
        "~/.ansible/plugins/terminal",
        "/usr/share/ansible/plugins/terminal",
    ]
    default_test_plugin_path: list[str] = [
        "~/.ansible/plugins/test",
        "/usr/share/ansible/plugins/test",
    ]
    default_timeout: int = 10
    default_transport: str = "smart"
    default_undefined_var_behavior: bool = True
    default_vars_plugin_path: list[str] = [
        "~/.ansible/plugins/vars",
        "/usr/share/ansible/plugins/vars",
    ]
    default_vault_encrypt_identity: str | None = None
    default_vault_identity: str = "default"
    default_vault_identity_list: list[str] = []
    default_vault_id_match: bool = False
    default_vault_password_file: str | None = None
    default_verbosity: int = 0
    deprecation_warnings: bool = False
    devel_warning: bool = True
    diff_always: bool = False
    diff_context: int = 3
    display_args_to_stdout: bool = False
    display_skipped_hosts: bool = True
    docsite_root_url: str = "https://docs.ansible.org.cn/ansible/"
    doc_fragment_plugin_path: list[str] = [
        "~/.ansible/plugins/doc_fragments",
        "/usr/share/ansible/plugins/doc_fragments",
    ]
    duplicate_yaml_dict_key: Literal["warn", "error", "ignore"] = "warn"
    enable_task_debugger: bool = False
    error_on_missing_handler: bool = True
    facts_modules: list[str] = ["smart"]
    galaxy_cache_dir: str = "~/.ansible/galaxy_cache"
    galaxy_display_progress: str | None = None
    galaxy_ignore_certs: bool = False
    galaxy_role_skeleton: str | None = None
    galaxy_role_skeleton_ignore: list[str] = ["^.git$", "^.*/.git_keep$"]
    galaxy_server: str = "https://galaxy.ansible.com"
    galaxy_server_list: str | None = None
    galaxy_token_path: str = "~/.ansible/galaxy_token"
    host_key_checking: bool = True
    host_pattern_mismatch: Literal["warning", "error", "ignore"] = "warning"
    inject_facts_as_vars: bool = True
    interpreter_python: str = "auto_legacy"
    interpreter_python_distro_map: dict[str, str]
    interpreter_python_fallback: list[str]
    invalid_task_attribute_failed: bool = True
    inventory_any_unparsed_is_failed: bool = False
    inventory_cache_enabled: bool = False
    inventory_cache_plugin: str | None = None
    inventory_cache_plugin_connection: str | None = None
    inventory_cache_plugin_prefix: str = "ansible_facts"
    inventory_cache_timeout: int = 3600
    inventory_enabled: list[str] = [
        "host_list",
        "script",
        "auto",
        "yaml",
        "ini",
        "toml",
    ]
    inventory_export: bool = False
    inventory_ignore_exts: str
    inventory_ignore_patterns: list[str] = []
    inventory_unparsed_is_failed: bool = False
    localhost_warning: bool = True
    max_file_size_for_diff: int = 104448
    module_ignore_exts: str
    netconf_ssh_config: str | None = None
    network_group_modules: list[str] = [
        "eos",
        "nxos",
        "ios",
        "iosxr",
        "junos",
        "enos",
        "ce",
        "vyos",
        "sros",
        "dellos9",
        "dellos10",
        "dellos6",
        "asa",
        "aruba",
        "aireos",
        "bigip",
        "ironware",
        "onyx",
        "netconf",
        "exos",
        "voss",
        "slxos",
    ]
    old_plugin_cache_clearing: bool = False
    paramiko_host_key_auto_add: bool = False
    paramiko_look_for_keys: bool = True
    persistent_command_timeout: int = 30
    persistent_connect_retry_timeout: int = 15
    persistent_connect_timeout: int = 30
    persistent_control_path_dir: str = "~/.ansible/pc"
    playbook_dir: str | None
    playbook_vars_root: Literal["top", "bottom", "all"] = "top"
    plugin_filters_cfg: str | None = None
    python_module_rlimit_nofile: int = 0
    retry_files_enabled: bool = False
    retry_files_save_path: str | None = None
    run_vars_plugins: str = "demand"
    show_custom_stats: bool = False
    string_conversion_action: Literal["warn", "error", "ignore"] = "warn"
    string_type_filters: list[str] = [
        "string",
        "to_json",
        "to_nice_json",
        "to_yaml",
        "to_nice_yaml",
        "ppretty",
        "json",
    ]
    system_warnings: bool = True
    tags_run: list[str] = []
    tags_skip: list[str] = []
    task_debugger_ignore_errors: bool = True
    task_timeout: int = 0
    transform_invalid_group_chars: Literal[
        "always",
        "never",
        "ignore",
        "silently",
    ] = "never"
    use_persistent_connections: bool = False
    variable_plugins_enabled: list[str] = ["host_group_vars"]
    variable_precedence: list[str] = [
        "all_inventory",
        "groups_inventory",
        "all_plugins_inventory",
        "all_plugins_play",
        "groups_plugins_inventory",
        "groups_plugins_play",
    ]
    verbose_to_stderr: bool = False
    win_async_startup_timeout: int = 5
    worker_shutdown_poll_count: int = 0
    worker_shutdown_poll_delay: float = 0.1
    yaml_filename_extensions: list[str] = [".yml", ".yaml", ".json"]

    def __init__(
        self,
        config_dump: str | None = None,
        data: dict[str, object] | None = None,
        cache_dir: Path | None = None,
    ) -> None:
        """Load config dictionary."""
        super().__init__()

        self.cache_dir = cache_dir
        if data:
            self.data = copy.deepcopy(data)
        else:
            if not config_dump:
                env = os.environ.copy()
                # Avoid possible ANSI garbage
                env["ANSIBLE_FORCE_COLOR"] = "0"
                config_dump = subprocess.check_output(  # noqa: S603
                    ["ansible-config", "dump"],
                    universal_newlines=True,
                    env=env,
                )

            for match in re.finditer(
                r"^(?P<key>[A-Za-z0-9_]+).* = (?P<value>.*)$",
                config_dump,
                re.MULTILINE,
            ):
                key = match.groupdict()["key"]
                value = match.groupdict()["value"]
                try:
                    self[key] = ast.literal_eval(value)
                except (NameError, SyntaxError, ValueError):
                    self[key] = value
        # inject isolation collections paths into the config
        if self.cache_dir:
            cpaths = self.data["COLLECTIONS_PATHS"]
            if cpaths and isinstance(cpaths, list):
                cpaths.insert(
                    0,
                    f"{self.cache_dir}/collections",
                )
            else:  # pragma: no cover
                msg = f"Unexpected data type for COLLECTIONS_PATHS: {cpaths}"
                raise RuntimeError(msg)
        if data:
            return

    def __getattribute__(self, attr_name: str) -> object:
        """Allow access of config options as attributes."""
        _dict = super().__dict__  # pylint: disable=no-member
        if attr_name in _dict:
            return _dict[attr_name]

        data = super().__getattribute__("data")
        if attr_name == "data":  # pragma: no cover
            return data

        name = attr_name.upper()
        if name in data:
            return data[name]
        if name in AnsibleConfig._aliases:
            return data[AnsibleConfig._aliases[name]]

        return super().__getattribute__(attr_name)

    def __getitem__(self, name: str) -> object:
        """Allow access to config options using indexing."""
        return super().__getitem__(name.upper())

    def __copy__(self) -> AnsibleConfig:
        """Allow users to run copy on Config."""
        return AnsibleConfig(data=self.data)

    def __deepcopy__(self, memo: object) -> AnsibleConfig:
        """Allow users to run deeepcopy on Config."""
        return AnsibleConfig(data=self.data)
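
A short usage sketch: instantiating the class runs `ansible-config dump` (so a working ansible installation is assumed) and the resulting options can be read either as lower-case attributes or by key, case-insensitively.

from ansible_compat.config import AnsibleConfig

config = AnsibleConfig()            # parses the output of `ansible-config dump`
print(config.default_forks)         # attribute access uses lower-case names
print(config["default_forks"])      # index access is case-insensitive
print(config.collections_path)      # singular form resolves through the 2.9 -> 2.10 alias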

__copy__

__copy__() -> AnsibleConfig

Allow users to run copy on Config.

Source code in ansible_compat/config.py
def __copy__(self) -> AnsibleConfig:
    """Allow users to run copy on Config."""
    return AnsibleConfig(data=self.data)

__deepcopy__

__deepcopy__(memo: object) -> AnsibleConfig

Allow users to run deepcopy on Config.

Source code in ansible_compat/config.py
def __deepcopy__(self, memo: object) -> AnsibleConfig:
    """Allow users to run deeepcopy on Config."""
    return AnsibleConfig(data=self.data)

__getattribute__

__getattribute__(attr_name: str) -> object

Allow access of config options as attributes.

Source code in ansible_compat/config.py
def __getattribute__(self, attr_name: str) -> object:
    """Allow access of config options as attributes."""
    _dict = super().__dict__  # pylint: disable=no-member
    if attr_name in _dict:
        return _dict[attr_name]

    data = super().__getattribute__("data")
    if attr_name == "data":  # pragma: no cover
        return data

    name = attr_name.upper()
    if name in data:
        return data[name]
    if name in AnsibleConfig._aliases:
        return data[AnsibleConfig._aliases[name]]

    return super().__getattribute__(attr_name)

__getitem__

__getitem__(name: str) -> object

Allow access to config options using indexing.

Source code in ansible_compat/config.py
def __getitem__(self, name: str) -> object:
    """Allow access to config options using indexing."""
    return super().__getitem__(name.upper())

__init__

__init__(
    config_dump: str | None = None,
    data: dict[str, object] | None = None,
    cache_dir: Path | None = None,
) -> None

Load the config dictionary.

Source code in ansible_compat/config.py
def __init__(
    self,
    config_dump: str | None = None,
    data: dict[str, object] | None = None,
    cache_dir: Path | None = None,
) -> None:
    """Load config dictionary."""
    super().__init__()

    self.cache_dir = cache_dir
    if data:
        self.data = copy.deepcopy(data)
    else:
        if not config_dump:
            env = os.environ.copy()
            # Avoid possible ANSI garbage
            env["ANSIBLE_FORCE_COLOR"] = "0"
            config_dump = subprocess.check_output(  # noqa: S603
                ["ansible-config", "dump"],
                universal_newlines=True,
                env=env,
            )

        for match in re.finditer(
            r"^(?P<key>[A-Za-z0-9_]+).* = (?P<value>.*)$",
            config_dump,
            re.MULTILINE,
        ):
            key = match.groupdict()["key"]
            value = match.groupdict()["value"]
            try:
                self[key] = ast.literal_eval(value)
            except (NameError, SyntaxError, ValueError):
                self[key] = value
    # inject isolation collections paths into the config
    if self.cache_dir:
        cpaths = self.data["COLLECTIONS_PATHS"]
        if cpaths and isinstance(cpaths, list):
            cpaths.insert(
                0,
                f"{self.cache_dir}/collections",
            )
        else:  # pragma: no cover
            msg = f"Unexpected data type for COLLECTIONS_PATHS: {cpaths}"
            raise RuntimeError(msg)
    if data:
        return
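
If a dump has already been captured, it can be handed in directly and no subprocess is spawned; values that parse as Python literals are converted, everything else stays a string. A small sketch:

from ansible_compat.config import AnsibleConfig

dump = "DEFAULT_FORKS(default) = 5\nDEFAULT_GATHERING(default) = smart\n"
config = AnsibleConfig(config_dump=dump)
assert config["default_forks"] == 5             # literal_eval produced an int
assert config.default_gathering == "smart"      # non-literal values remain strings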

ansible_collections_path

ansible_collections_path() -> str

Return the collection path variable for the current version of Ansible.

Source code in ansible_compat/config.py
def ansible_collections_path() -> str:
    """Return collection path variable for current version of Ansible."""
    for env_var in [
        "ANSIBLE_COLLECTIONS_PATH",
        "ANSIBLE_COLLECTIONS_PATHS",
    ]:
        if env_var in os.environ:
            return env_var
    return "ANSIBLE_COLLECTIONS_PATH"

ansible_version

ansible_version(version: str = '') -> Version

Return the current Version object for Ansible.

If no version is given, the currently detected version is returned. When the version argument is provided, the version string is converted to a Version object so that it can be used in comparisons.

Source code in ansible_compat/config.py
@cache
def ansible_version(version: str = "") -> Version:
    """Return current Version object for Ansible.

    If version is not mentioned, it returns the current version as detected.
    When the version argument is mentioned, it converts the version string
    to a Version object in order to make it usable in comparisons.
    """
    if version:
        return Version(version)

    proc = subprocess.run(  # noqa: S603
        ["ansible", "--version"],
        text=True,
        check=False,
        capture_output=True,
    )
    if proc.returncode != 0:
        raise MissingAnsibleError(proc=proc)

    return parse_ansible_version(proc.stdout)
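
Results are cached, and both call styles return comparable Version objects. A brief sketch, assuming the ansible CLI is installed for the no-argument form:

from ansible_compat.config import ansible_version

detected = ansible_version()          # runs `ansible --version` once; the result is cached
pinned = ansible_version("2.16.3")    # no subprocess, just wraps the string
if detected >= pinned:
    print(f"ansible {detected} is recent enough")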

parse_ansible_version

parse_ansible_version(stdout: str) -> Version

Parse the output of 'ansible --version'.

Source code in ansible_compat/config.py
def parse_ansible_version(stdout: str) -> Version:
    """Parse output of 'ansible --version'."""
    # Ansible can produce extra output before displaying version in debug mode.

    # ansible-core 2.11+: 'ansible [core 2.11.3]'
    match = re.search(
        r"^ansible \[(?:core|base) (?P<version>[^\]]+)\]",
        stdout,
        re.MULTILINE,
    )
    if match:
        return Version(match.group("version"))
    msg = f"Unable to parse ansible cli version: {stdout}\nKeep in mind that only {ANSIBLE_MIN_VERSION } or newer are supported."
    raise InvalidPrerequisiteError(msg)
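
Only the bracketed core version on the first matching line is used, so surrounding output is ignored; for example:

from ansible_compat.config import parse_ansible_version

output = "ansible [core 2.16.3]\n  config file = None\n  python version = 3.12.1\n"
assert str(parse_ansible_version(output)) == "2.16.3"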

ansible_compat.errors

Module to deal with errors.

AnsibleCommandError

Bases: RuntimeError

Exception running an Ansible command.

Source code in ansible_compat/errors.py
class AnsibleCommandError(RuntimeError):
    """Exception running an Ansible command."""

    def __init__(self, proc: CompletedProcess[Any]) -> None:
        """Construct an exception given a completed process."""
        message = (
            f"Got {proc.returncode} exit code while running: {' '.join(proc.args)}"
        )
        super().__init__(message)
        self.proc = proc

__init__

__init__(proc: CompletedProcess[Any]) -> None

Construct an exception given a completed process.

Source code in ansible_compat/errors.py
def __init__(self, proc: CompletedProcess[Any]) -> None:
    """Construct an exception given a completed process."""
    message = (
        f"Got {proc.returncode} exit code while running: {' '.join(proc.args)}"
    )
    super().__init__(message)
    self.proc = proc

AnsibleCompatError

Bases: RuntimeError

Generic error originating from the ansible_compat library.

Source code in ansible_compat/errors.py
class AnsibleCompatError(RuntimeError):
    """Generic error originating from ansible_compat library."""

    code = 1  # generic error

    def __init__(
        self,
        message: str | None = None,
        proc: CompletedProcess[Any] | None = None,
    ) -> None:
        """Construct generic library exception."""
        super().__init__(message)
        self.proc = proc
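
Because the more specific exceptions derive from AnsibleCompatError, callers can catch the base class and reuse its code attribute as an exit status. A sketch, assuming an ansible installation is available:

import sys

from ansible_compat.errors import AnsibleCompatError
from ansible_compat.runtime import Runtime

try:
    Runtime().require_collection("community.general", "5.0.0", install=False)
except AnsibleCompatError as exc:   # covers InvalidPrerequisiteError, MissingAnsibleError, ...
    print(exc, file=sys.stderr)
    sys.exit(exc.code)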

__init__

__init__(
    message: str | None = None,
    proc: CompletedProcess[Any] | None = None,
) -> None

Construct a generic library exception.

Source code in ansible_compat/errors.py
def __init__(
    self,
    message: str | None = None,
    proc: CompletedProcess[Any] | None = None,
) -> None:
    """Construct generic library exception."""
    super().__init__(message)
    self.proc = proc

InvalidPrerequisiteError

Bases: AnsibleCompatError

Reports a missing requirement.

Source code in ansible_compat/errors.py
class InvalidPrerequisiteError(AnsibleCompatError):
    """Reports a missing requirement."""

    code = INVALID_PREREQUISITES_RC

MissingAnsibleError

Bases: AnsibleCompatError

Reports a missing or broken Ansible installation.

Source code in ansible_compat/errors.py
class MissingAnsibleError(AnsibleCompatError):
    """Reports a missing or broken Ansible installation."""

    code = ANSIBLE_MISSING_RC

    def __init__(
        self,
        message: str | None = "Unable to find a working copy of ansible executable.",
        proc: CompletedProcess[Any] | None = None,
    ) -> None:
        """."""
        super().__init__(message)
        self.proc = proc

__init__

__init__(
    message: (
        str | None
    ) = "Unable to find a working copy of ansible executable.",
    proc: CompletedProcess[Any] | None = None,
) -> None

.

Source code in ansible_compat/errors.py
def __init__(
    self,
    message: str | None = "Unable to find a working copy of ansible executable.",
    proc: CompletedProcess[Any] | None = None,
) -> None:
    """."""
    super().__init__(message)
    self.proc = proc

ansible_compat.loaders

Utilities for loading various files.

colpath_from_path

colpath_from_path(path: Path) -> str | None

Return an FQCN from a path.

Source code in ansible_compat/loaders.py
def colpath_from_path(path: Path) -> str | None:
    """Return a FQCN from a path."""
    galaxy_file = path / "galaxy.yml"
    if galaxy_file.exists():
        galaxy = yaml_from_file(galaxy_file)
        for k in ("namespace", "name"):
            if k not in galaxy:
                msg = f"{galaxy_file} is missing the following mandatory field {k}"
                raise InvalidPrerequisiteError(msg)
        return f"{galaxy['namespace']}/{galaxy['name']}"
    return None
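
For example, pointing it at a hypothetical collection checkout whose galaxy.yml declares namespace community and name general:

from pathlib import Path

from ansible_compat.loaders import colpath_from_path

fqcn = colpath_from_path(Path("~/src/community.general").expanduser())  # hypothetical path
# "community/general" when a galaxy.yml is present, otherwise None
print(fqcn)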

yaml_from_file

yaml_from_file(path: Path) -> Any

Return a loaded YAML file.

Source code in ansible_compat/loaders.py
def yaml_from_file(path: Path) -> Any:  # noqa: ANN401
    """Return a loaded YAML file."""
    with path.open(encoding="utf-8") as content:
        return yaml.load(content, Loader=yaml.SafeLoader)

ansible_compat.prerun

Utilities for configuring the ansible runtime environment.

get_cache_dir

get_cache_dir(project_dir: Path) -> Path

Compute the cache directory to be used based on the project path.

Source code in ansible_compat/prerun.py
def get_cache_dir(project_dir: Path) -> Path:
    """Compute cache directory to be used based on project path."""
    # we only use the basename instead of the full path in order to ensure that
    # we would use the same key regardless of the location of the user home
    # directory or where the project is cloned (as long as the project folder
    # uses the same name).
    basename = project_dir.resolve().name.encode(encoding="utf-8")
    # 6 chars of entropy should be enough
    cache_key = hashlib.sha256(basename).hexdigest()[:6]
    cache_dir = (
        Path(os.getenv("XDG_CACHE_HOME", "~/.cache")).expanduser()
        / "ansible-compat"
        / cache_key
    )
    return cache_dir
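
Only the project basename is hashed, so the same project name yields the same cache key regardless of where it is checked out. A quick sketch with a hypothetical project path:

from pathlib import Path

from ansible_compat.prerun import get_cache_dir

cache = get_cache_dir(Path("~/src/my-project").expanduser())   # hypothetical project
print(cache)   # e.g. ~/.cache/ansible-compat/<6 hex chars>, honoring XDG_CACHE_HOME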

ansible_compat.runtime

Ansible runtime environment manager.

AnsibleWarning

Bases: Warning

Warnings related to the Ansible runtime.

Source code in ansible_compat/runtime.py
class AnsibleWarning(Warning):
    """Warnings related to Ansible runtime."""

Collection dataclass

Container for Ansible collection information.

Source code in ansible_compat/runtime.py
@dataclass
class Collection:
    """Container for Ansible collection information."""

    name: str
    version: str
    path: Path

CollectionVersion

Bases: Version

Collection version.

Source code in ansible_compat/runtime.py
class CollectionVersion(Version):
    """Collection version."""

    def __init__(self, version: str) -> None:
        """Initialize collection version."""
        # As the packaging Version class does not support wildcards, we convert it
        # to "0", as this is the smallest version possible.
        if version == "*":
            version = "0"
        super().__init__(version)
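
The wildcard handling means "*" compares as the lowest possible version, so any concrete release satisfies it:

from ansible_compat.runtime import CollectionVersion

assert CollectionVersion("1.2.3") >= CollectionVersion("*")   # "*" is treated as "0"
assert CollectionVersion("2.0.0-rc1").is_prerelease           # packaging semantics still apply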

__init__

__init__(version: str) -> None

Initialize the collection version.

Source code in ansible_compat/runtime.py
def __init__(self, version: str) -> None:
    """Initialize collection version."""
    # As the packaging Version class does not support wildcards, we convert it
    # to "0", as this is the smallest version possible.
    if version == "*":
        version = "0"
    super().__init__(version)

Plugins dataclass

Dataclass to access installed Ansible plugins, using ansible-doc to retrieve them.

Source code in ansible_compat/runtime.py
@dataclass
class Plugins:  # pylint: disable=too-many-instance-attributes
    """Dataclass to access installed Ansible plugins, uses ansible-doc to retrieve them."""

    runtime: Runtime
    become: dict[str, str] = field(init=False)
    cache: dict[str, str] = field(init=False)
    callback: dict[str, str] = field(init=False)
    cliconf: dict[str, str] = field(init=False)
    connection: dict[str, str] = field(init=False)
    httpapi: dict[str, str] = field(init=False)
    inventory: dict[str, str] = field(init=False)
    lookup: dict[str, str] = field(init=False)
    netconf: dict[str, str] = field(init=False)
    shell: dict[str, str] = field(init=False)
    vars: dict[str, str] = field(init=False)
    module: dict[str, str] = field(init=False)
    strategy: dict[str, str] = field(init=False)
    test: dict[str, str] = field(init=False)
    filter: dict[str, str] = field(init=False)
    role: dict[str, str] = field(init=False)
    keyword: dict[str, str] = field(init=False)

    @no_type_check
    def __getattribute__(self, attr: str):  # noqa: ANN204
        """Get attribute."""
        if attr in {
            "become",
            "cache",
            "callback",
            "cliconf",
            "connection",
            "httpapi",
            "inventory",
            "lookup",
            "netconf",
            "shell",
            "vars",
            "module",
            "strategy",
            "test",
            "filter",
            "role",
            "keyword",
        }:
            try:
                result = super().__getattribute__(attr)
            except AttributeError as exc:
                proc = self.runtime.run(
                    ["ansible-doc", "--json", "-l", "-t", attr],
                )
                data = json.loads(proc.stdout)
                if not isinstance(data, dict):  # pragma: no cover
                    msg = "Unexpected output from ansible-doc"
                    raise AnsibleCompatError(msg) from exc
                result = data
        else:
            result = super().__getattribute__(attr)

        return result
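
Attribute access is resolved lazily by shelling out to ansible-doc and returns a mapping of plugin names to their short descriptions. A brief sketch, assuming a working ansible installation:

from ansible_compat.runtime import Runtime

runtime = Runtime()
filters = runtime.plugins.filter    # runs `ansible-doc --json -l -t filter`
print(f"{len(filters)} filter plugins available")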

__getattribute__

__getattribute__(attr: str)

Get attribute.

Source code in ansible_compat/runtime.py
@no_type_check
def __getattribute__(self, attr: str):  # noqa: ANN204
    """Get attribute."""
    if attr in {
        "become",
        "cache",
        "callback",
        "cliconf",
        "connection",
        "httpapi",
        "inventory",
        "lookup",
        "netconf",
        "shell",
        "vars",
        "module",
        "strategy",
        "test",
        "filter",
        "role",
        "keyword",
    }:
        try:
            result = super().__getattribute__(attr)
        except AttributeError as exc:
            proc = self.runtime.run(
                ["ansible-doc", "--json", "-l", "-t", attr],
            )
            data = json.loads(proc.stdout)
            if not isinstance(data, dict):  # pragma: no cover
                msg = "Unexpected output from ansible-doc"
                raise AnsibleCompatError(msg) from exc
            result = data
    else:
        result = super().__getattribute__(attr)

    return result

Runtime

Ansible Runtime manager.
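
A typical consumer builds a Runtime, optionally isolated so that collection installs land in a per-project cache directory, prepares the environment, and then shells out through run(). A minimal sketch, assuming ansible-core is installed; the collection requirement shown is hypothetical:

from pathlib import Path

from ansible_compat.runtime import Runtime

runtime = Runtime(project_dir=Path.cwd(), isolated=True, require_module=True)
runtime.prepare_environment()                              # installs any requirements.yml dependencies
runtime.install_collection("community.general:>=7.0.0")    # hypothetical requirement
proc = runtime.run(["ansible", "--version"])
print(proc.stdout)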

Source code in ansible_compat/runtime.py
class Runtime:
    """Ansible Runtime manager."""

    _version: Version | None = None
    collections: OrderedDict[str, Collection] = OrderedDict()
    cache_dir: Path | None = None
    # Used to track if we have already initialized the Ansible runtime as attempts
    # to do it multiple times will cause runtime warnings from within ansible-core
    initialized: bool = False
    plugins: Plugins
    _has_playbook_cache: dict[tuple[str, Path | None], bool] = {}
    require_module: bool = False

    def __init__(
        self,
        project_dir: Path | None = None,
        *,
        isolated: bool = False,
        min_required_version: str | None = None,
        require_module: bool = False,
        max_retries: int = 0,
        environ: dict[str, str] | None = None,
        verbosity: int = 0,
    ) -> None:
        """Initialize Ansible runtime environment.

        :param project_dir: The directory containing the Ansible project. If
                            not mentioned it will be guessed from the current
                            working directory.
        :param isolated: Assure that installation of collections or roles
                         does not affect Ansible installation, an unique cache
                         directory being used instead.
        :param min_required_version: Minimal version of Ansible required. If
                                     not found, a :class:`RuntimeError`
                                     exception is raised.
        :param require_module: If set, instantiation will fail if Ansible
                               Python module is missing or is not matching
                               the same version as the Ansible command line.
                               That is useful for consumers that expect to
                               also perform Python imports from Ansible.
        :param max_retries: Number of times it should retry network operations.
                            Default is 0, no retries.
        :param environ: Environment dictionary to use, if undefined
                        ``os.environ`` will be copied and used.
        :param verbosity: Verbosity level to use.
        """
        self.project_dir = project_dir or Path.cwd()
        self.isolated = isolated
        self.max_retries = max_retries
        self.environ = environ or os.environ.copy()
        self.plugins = Plugins(runtime=self)
        self.verbosity = verbosity

        self.initialize_logger(level=self.verbosity)

        # Reduce noise from paramiko, unless user already defined PYTHONWARNINGS
        # paramiko/transport.py:236: CryptographyDeprecationWarning: Blowfish has been deprecated
        # https://github.com/paramiko/paramiko/issues/2038
        # As CryptographyDeprecationWarning is not a builtin, we cannot use
        # PYTHONWARNINGS to ignore it using category but we can use message.
        # https://stackoverflow.com/q/68251969/99834
        if "PYTHONWARNINGS" not in self.environ:  # pragma: no cover
            self.environ["PYTHONWARNINGS"] = "ignore:Blowfish has been deprecated"

        if isolated:
            self.cache_dir = get_cache_dir(self.project_dir)
        self.config = AnsibleConfig(cache_dir=self.cache_dir)

        # Add the sys.path to the collection paths if not isolated
        self._add_sys_path_to_collection_paths()

        if not self.version_in_range(lower=min_required_version):
            msg = f"Found incompatible version of ansible runtime {self.version}, instead of {min_required_version} or newer."
            raise RuntimeError(msg)
        if require_module:
            self.require_module = True
            self._ensure_module_available()

        # pylint: disable=import-outside-toplevel
        from ansible.utils.display import Display

        # pylint: disable=unused-argument
        def warning(
            self: Display,  # noqa: ARG001
            msg: str,
            *,
            formatted: bool = False,  # noqa: ARG001
        ) -> None:  # pragma: no cover
            """Override ansible.utils.display.Display.warning to avoid printing warnings."""
            warnings.warn(
                message=msg,
                category=AnsibleWarning,
                stacklevel=2,
                source={"msg": msg},
            )

        # Monkey patch ansible warning in order to use warnings module.
        Display.warning = warning

    def initialize_logger(self, level: int = 0) -> None:
        """Set up the global logging level based on the verbosity number."""
        verbosity_map = {
            -2: logging.CRITICAL,
            -1: logging.ERROR,
            0: logging.WARNING,
            1: logging.INFO,
            2: logging.DEBUG,
        }
        # Unknown logging level is treated as DEBUG
        logging_level = verbosity_map.get(level, logging.DEBUG)
        _logger.setLevel(logging_level)
        # Use module-level _logger instance to validate it
        _logger.debug("Logging initialized to level %s", logging_level)

    def _add_sys_path_to_collection_paths(self) -> None:
        """Add the sys.path to the collection paths."""
        if self.config.collections_scan_sys_path:
            for path in sys.path:
                if (
                    path not in self.config.collections_paths
                    and (Path(path) / "ansible_collections").is_dir()
                ):
                    self.config.collections_paths.append(  # pylint: disable=E1101
                        path,
                    )

    def load_collections(self) -> None:
        """Load collection data."""
        self.collections = OrderedDict()
        no_collections_msg = "None of the provided paths were usable"

        # do not use --path because it does not allow multiple values
        proc = self.run(
            [
                "ansible-galaxy",
                "collection",
                "list",
                "--format=json",
            ],
        )
        if proc.returncode == RC_ANSIBLE_OPTIONS_ERROR and (
            no_collections_msg in proc.stdout or no_collections_msg in proc.stderr
        ):  # pragma: no cover
            _logger.debug("Ansible reported no installed collections at all.")
            return
        if proc.returncode != 0:
            _logger.error(proc)
            msg = f"Unable to list collections: {proc}"
            raise RuntimeError(msg)
        try:
            data = json.loads(proc.stdout)
        except json.decoder.JSONDecodeError as exc:
            msg = f"Unable to parse galaxy output as JSON: {proc.stdout}"
            raise RuntimeError(msg) from exc
        if not isinstance(data, dict):
            msg = f"Unexpected collection data, {data}"
            raise TypeError(msg)
        for path in data:
            if not isinstance(data[path], dict):
                msg = f"Unexpected collection data, {data[path]}"
                raise TypeError(msg)
            for collection, collection_info in data[path].items():
                if not isinstance(collection_info, dict):
                    msg = f"Unexpected collection data, {collection_info}"
                    raise TypeError(msg)

                if collection in self.collections:
                    msg = f"Another version of '{collection}' {collection_info['version']} was found installed in {path}, only the first one will be used, {self.collections[collection].version} ({self.collections[collection].path})."
                    logging.warning(msg)
                else:
                    self.collections[collection] = Collection(
                        name=collection,
                        version=collection_info["version"],
                        path=path,
                    )

    def _ensure_module_available(self) -> None:
        """Assure that Ansible Python module is installed and matching CLI version."""
        ansible_release_module = None
        with contextlib.suppress(ModuleNotFoundError, ImportError):
            ansible_release_module = importlib.import_module("ansible.release")

        if ansible_release_module is None:
            msg = "Unable to find Ansible python module."
            raise RuntimeError(msg)

        ansible_module_version = Version(
            ansible_release_module.__version__,
        )
        if ansible_module_version != self.version:
            msg = f"Ansible CLI ({self.version}) and python module ({ansible_module_version}) versions do not match. This indicates a broken execution environment."
            raise RuntimeError(msg)

        # For ansible 2.15+ we need to initialize the plugin loader
        # https://github.com/ansible/ansible-lint/issues/2945
        if not Runtime.initialized:
            col_path = [f"{self.cache_dir}/collections"]
            # noinspection PyProtectedMember
            from ansible.utils.collection_loader._collection_finder import (  # pylint: disable=import-outside-toplevel
                _AnsibleCollectionFinder,
            )

            if self.version >= Version("2.15.0.dev0"):
                # pylint: disable=import-outside-toplevel,no-name-in-module
                from ansible.plugins.loader import init_plugin_loader

                _AnsibleCollectionFinder(  # noqa: SLF001
                    paths=col_path,
                )._remove()  # pylint: disable=protected-access
                init_plugin_loader(col_path)
            else:
                # noinspection PyProtectedMember
                # pylint: disable=protected-access
                col_path += self.config.collections_paths
                col_path += os.path.dirname(  # noqa: PTH120
                    os.environ.get(ansible_collections_path(), "."),
                ).split(":")
                _AnsibleCollectionFinder(  # noqa: SLF001
                    paths=col_path,
                )._install()  # pylint: disable=protected-access
            Runtime.initialized = True

    def clean(self) -> None:
        """Remove content of cache_dir."""
        if self.cache_dir:
            shutil.rmtree(self.cache_dir, ignore_errors=True)

    def run(  # ruff: disable=PLR0913
        self,
        args: str | list[str],
        *,
        retry: bool = False,
        tee: bool = False,
        env: dict[str, str] | None = None,
        cwd: Path | None = None,
        set_acp: bool = True,
    ) -> CompletedProcess:
        """Execute a command inside an Ansible environment.

        :param retry: Retry network operations on failures.
        :param tee: Also pass captured stdout/stderr to system while running.
        :param set_acp: Set the ANSIBLE_COLLECTIONS_PATH
        """
        if tee:
            run_func: Callable[..., CompletedProcess] = subprocess_tee.run
        else:
            run_func = subprocess.run
        env = self.environ if env is None else env.copy()
        # Presence of ansible debug variable or config option will prevent us
        # from parsing its JSON output due to extra debug messages on stdout.
        env["ANSIBLE_DEBUG"] = "0"

        # https://github.com/ansible/ansible-lint/issues/3522
        env["ANSIBLE_VERBOSE_TO_STDERR"] = "True"

        if set_acp:
            env["ANSIBLE_COLLECTIONS_PATH"] = ":".join(
                list(dict.fromkeys(self.config.collections_paths)),
            )

        for _ in range(self.max_retries + 1 if retry else 1):
            result = run_func(
                args,
                universal_newlines=True,
                check=False,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                env=env,
                cwd=str(cwd) if cwd else None,
            )
            if result.returncode == 0:
                break
            _logger.debug("Environment: %s", env)
            if retry:
                _logger.warning(
                    "Retrying execution failure %s of: %s",
                    result.returncode,
                    " ".join(args),
                )
        return result

    @property
    def version(self) -> Version:
        """Return current Version object for Ansible.

        If version is not mentioned, it returns the current version as detected.
        When the version argument is mentioned, it converts the version string
        to a Version object in order to make it usable in comparisons.
        """
        if self._version:
            return self._version

        proc = self.run(["ansible", "--version"])
        if proc.returncode == 0:
            self._version = parse_ansible_version(proc.stdout)
            return self._version

        msg = "Unable to find a working copy of ansible executable."
        raise MissingAnsibleError(msg, proc=proc)

    def version_in_range(
        self,
        lower: str | None = None,
        upper: str | None = None,
    ) -> bool:
        """Check if Ansible version is inside a required range.

        The lower limit is inclusive and the upper one exclusive.
        """
        if lower and self.version < Version(lower):
            return False
        return not (upper and self.version >= Version(upper))

    def has_playbook(self, playbook: str, *, basedir: Path | None = None) -> bool:
        """Return true if ansible can load a given playbook.

        This is also used for checking if playbooks from within collections
        are present and if they pass syntax check.
        """
        if (playbook, basedir) in self._has_playbook_cache:
            return self._has_playbook_cache[playbook, basedir]

        proc = self.run(["ansible-playbook", "--syntax-check", playbook], cwd=basedir)
        result = proc.returncode == 0
        if not result:
            if not basedir:
                basedir = Path()
            msg = f"has_playbook returned false for '{basedir / playbook}' due to syntax check returning {proc.returncode}"
            logging.debug(msg)

        # cache the result
        self._has_playbook_cache[playbook, basedir] = result

        return result

    def install_collection(
        self,
        collection: str | Path,
        *,
        destination: Path | None = None,
        force: bool = False,
    ) -> None:
        """Install an Ansible collection.

        Can accept arguments like:
            'foo.bar:>=1.2.3'
            'git+https://github.com/ansible-collections/ansible.posix.git,main'
        """
        cmd = [
            "ansible-galaxy",
            "collection",
            "install",
            "-vvv",  # this is needed to make ansible display important info in case of failures
        ]
        if force:
            cmd.append("--force")

        if isinstance(collection, Path):
            collection = str(collection)
        # As ansible-galaxy install is not able to automatically determine
        # if the range requires a pre-release, we need to manually add the --pre
        # flag when needed.
        matches = version_re.search(collection)

        if (
            not is_url(collection)
            and matches
            and CollectionVersion(matches[1]).is_prerelease
        ):
            cmd.append("--pre")

        cpaths: list[str] = self.config.collections_paths
        if destination and str(destination) not in cpaths:
            # we cannot use '-p' because it breaks galaxy ability to ignore already installed collections, so
            # we hack ansible_collections_path instead and inject our own path there.
            # pylint: disable=no-member
            cpaths.insert(0, str(destination))
        cmd.append(f"{collection}")

        _logger.info("Running from %s : %s", Path.cwd(), " ".join(cmd))
        process = self.run(
            cmd,
            retry=True,
            env={**self.environ, ansible_collections_path(): ":".join(cpaths)},
        )
        if process.returncode != 0:
            msg = f"Command {' '.join(cmd)}, returned {process.returncode} code:\n{process.stdout}\n{process.stderr}"
            _logger.error(msg)
            raise InvalidPrerequisiteError(msg)

    def install_collection_from_disk(
        self,
        path: Path,
        destination: Path | None = None,
    ) -> None:
        """Build and install collection from a given disk path."""
        self.install_collection(path, destination=destination, force=True)

    # pylint: disable=too-many-branches
    def install_requirements(  # noqa: C901
        self,
        requirement: Path,
        *,
        retry: bool = False,
        offline: bool = False,
    ) -> None:
        """Install dependencies from a requirements.yml.

        :param requirement: path to requirements.yml file
        :param retry: retry network operations on failures
        :param offline: bypass installation, may fail if requirements are not met.
        """
        if not Path(requirement).exists():
            return
        reqs_yaml = yaml_from_file(Path(requirement))
        if not isinstance(reqs_yaml, (dict, list)):
            msg = f"{requirement} file is not a valid Ansible requirements file."
            raise InvalidPrerequisiteError(msg)

        if isinstance(reqs_yaml, dict):
            for key in reqs_yaml:
                if key not in ("roles", "collections"):
                    msg = f"{requirement} file is not a valid Ansible requirements file. Only 'roles' and 'collections' keys are allowed at root level. Recognized valid locations are: {', '.join(REQUIREMENT_LOCATIONS)}"
                    raise InvalidPrerequisiteError(msg)

        if isinstance(reqs_yaml, list) or "roles" in reqs_yaml:
            cmd = [
                "ansible-galaxy",
                "role",
                "install",
                "-r",
                f"{requirement}",
            ]
            if self.verbosity > 0:
                cmd.extend(["-" + ("v" * self.verbosity)])
            if self.cache_dir:
                cmd.extend(["--roles-path", f"{self.cache_dir}/roles"])

            if offline:
                _logger.warning(
                    "Skipped installing old role dependencies due to running in offline mode.",
                )
            else:
                _logger.info("Running %s", " ".join(cmd))

                result = self.run(cmd, retry=retry)
                _logger.debug(result.stdout)
                if result.returncode != 0:
                    _logger.error(result.stderr)
                    raise AnsibleCommandError(result)

        # Run galaxy collection install works on v2 requirements.yml
        if "collections" in reqs_yaml and reqs_yaml["collections"] is not None:
            cmd = [
                "ansible-galaxy",
                "collection",
                "install",
            ]
            if self.verbosity > 0:
                cmd.extend(["-" + ("v" * self.verbosity)])

            for collection in reqs_yaml["collections"]:
                if isinstance(collection, dict) and collection.get("type", "") == "git":
                    _logger.info(
                        "Adding '--pre' to ansible-galaxy collection install because we detected one collection being sourced from git.",
                    )
                    cmd.append("--pre")
                    break
            if offline:
                _logger.warning(
                    "Skipped installing collection dependencies due to running in offline mode.",
                )
            else:
                cmd.extend(["-r", str(requirement)])
                _logger.info("Running %s", " ".join(cmd))
                result = self.run(
                    cmd,
                    retry=retry,
                )
                _logger.debug(result.stdout)
                if result.returncode != 0:
                    _logger.error(result.stderr)
                    raise AnsibleCommandError(result)
        if self.require_module:
            Runtime.initialized = False
            self._ensure_module_available()

    # pylint: disable=too-many-locals
    def prepare_environment(  # noqa: C901
        self,
        required_collections: dict[str, str] | None = None,
        *,
        retry: bool = False,
        install_local: bool = False,
        offline: bool = False,
        role_name_check: int = 0,
    ) -> None:
        """Make dependencies available if needed."""
        destination: Path | None = None
        if required_collections is None:
            required_collections = {}

        self._prepare_ansible_paths()
        # first one is standard for collection layout repos and the last two
        # are part of Tower specification
        # https://docs.ansible.org.cn/ansible-tower/latest/html/userguide/projects.html#ansible-galaxy-support
        # https://docs.ansible.org.cn/ansible-tower/latest/html/userguide/projects.html#collections-support
        for req_file in REQUIREMENT_LOCATIONS:
            file_path = Path(req_file)
            if self.project_dir:
                file_path = self.project_dir / req_file
            self.install_requirements(file_path, retry=retry, offline=offline)

        if not install_local:
            return

        for gpath in search_galaxy_paths(self.project_dir):
            # processing all found galaxy.yml files
            galaxy_path = Path(gpath)
            if galaxy_path.exists():
                data = yaml_from_file(galaxy_path)
                if isinstance(data, dict) and "dependencies" in data:
                    for name, required_version in data["dependencies"].items():
                        _logger.info(
                            "Provisioning collection %s:%s from galaxy.yml",
                            name,
                            required_version,
                        )
                        self.install_collection(
                            f"{name}{',' if is_url(name) else ':'}{required_version}",
                            destination=destination,
                        )

        if self.cache_dir:
            destination = self.cache_dir / "collections"
        for name, min_version in required_collections.items():
            self.install_collection(
                f"{name}:>={min_version}",
                destination=destination,
            )

        if (self.project_dir / "galaxy.yml").exists():
            if destination:
                # while function can return None, that would not break the logic
                colpath = Path(
                    f"{destination}/ansible_collections/{colpath_from_path(self.project_dir)}",
                )
                if colpath.is_symlink():
                    if os.path.realpath(colpath) == str(Path.cwd()):
                        _logger.warning(
                            "Found symlinked collection, skipping its installation.",
                        )
                        return
                    _logger.warning(
                        "Collection is symlinked, but not pointing to %s directory, so we will remove it.",
                        Path.cwd(),
                    )
                    colpath.unlink()

            # molecule scenario within a collection
            self.install_collection_from_disk(
                galaxy_path.parent,
                destination=destination,
            )
        elif Path.cwd().parent.name == "roles" and Path("../../galaxy.yml").exists():
            # molecule scenario located within roles/<role-name>/molecule inside
            # a collection
            self.install_collection_from_disk(
                Path("../.."),
                destination=destination,
            )
        else:
            # no collection, try to recognize and install a standalone role
            self._install_galaxy_role(
                self.project_dir,
                role_name_check=role_name_check,
                ignore_errors=True,
            )
        # reload collections
        self.load_collections()

    def require_collection(
        self,
        name: str,
        version: str | None = None,
        *,
        install: bool = True,
    ) -> tuple[CollectionVersion, Path]:
        """Check if a minimal collection version is present or exits.

        In the future this method may attempt to install a missing or outdated
        collection before failing.

        :param name: collection name
        :param version: minimal version required
        :param install: if True, attempt to install a missing collection
        :returns: tuple of (found_version, collection_path)
        """
        try:
            ns, coll = name.split(".", 1)
        except ValueError as exc:
            msg = f"Invalid collection name supplied: {name}%s"
            raise InvalidPrerequisiteError(
                msg,
            ) from exc

        paths: list[str] = self.config.collections_paths
        if not paths or not isinstance(paths, list):
            msg = f"Unable to determine ansible collection paths. ({paths})"
            raise InvalidPrerequisiteError(
                msg,
            )

        for path in paths:
            collpath = Path(path) / "ansible_collections" / ns / coll
            if collpath.exists():
                mpath = collpath / "MANIFEST.json"
                if not mpath.exists():
                    msg = f"Found collection at '{collpath}' but missing MANIFEST.json, cannot get info."
                    _logger.fatal(msg)
                    raise InvalidPrerequisiteError(msg)

                with mpath.open(encoding="utf-8") as f:
                    manifest = json.loads(f.read())
                    found_version = CollectionVersion(
                        manifest["collection_info"]["version"],
                    )
                    if version and found_version < CollectionVersion(version):
                        if install:
                            self.install_collection(f"{name}:>={version}")
                            self.require_collection(name, version, install=False)
                        else:
                            msg = f"Found {name} collection {found_version} but {version} or newer is required."
                            _logger.fatal(msg)
                            raise InvalidPrerequisiteError(msg)
                    return found_version, collpath.resolve()
        if install:
            self.install_collection(f"{name}:>={version}" if version else name)
            return self.require_collection(
                name=name,
                version=version,
                install=False,
            )
        msg = f"Collection '{name}' not found in '{paths}'"
        _logger.fatal(msg)
        raise InvalidPrerequisiteError(msg)

    def _prepare_ansible_paths(self) -> None:
        """Configure Ansible environment variables."""
        try:
            library_paths: list[str] = self.config.default_module_path.copy()
            roles_path: list[str] = self.config.default_roles_path.copy()
            collections_path: list[str] = self.config.collections_paths.copy()
        except AttributeError as exc:
            msg = "Unexpected ansible configuration"
            raise RuntimeError(msg) from exc

        alterations_list: list[tuple[list[str], str, bool]] = [
            (library_paths, "plugins/modules", True),
            (roles_path, "roles", True),
        ]

        alterations_list.extend(
            (
                [
                    (roles_path, f"{self.cache_dir}/roles", False),
                    (library_paths, f"{self.cache_dir}/modules", False),
                    (collections_path, f"{self.cache_dir}/collections", False),
                ]
                if self.isolated
                else []
            ),
        )

        for path_list, path_, must_be_present in alterations_list:
            path = Path(path_)
            if not path.exists():
                if must_be_present:
                    continue
                path.mkdir(parents=True, exist_ok=True)
            if str(path) not in path_list:
                path_list.insert(0, str(path))

        if library_paths != self.config.DEFAULT_MODULE_PATH:
            self._update_env("ANSIBLE_LIBRARY", library_paths)
        if collections_path != self.config.default_collections_path:
            self._update_env(ansible_collections_path(), collections_path)
        if roles_path != self.config.default_roles_path:
            self._update_env("ANSIBLE_ROLES_PATH", roles_path)

    def _get_roles_path(self) -> Path:
        """Return roles installation path.

        If `self.isolated` is set to `True`, `self.cache_dir` would be
        created, then it returns the `self.cache_dir/roles`. When `self.isolated` is
        not mentioned or set to `False`, it returns the first path in
        `default_roles_path`.
        """
        if self.cache_dir:
            path = Path(f"{self.cache_dir}/roles")
        else:
            path = Path(self.config.default_roles_path[0]).expanduser()
        return path

    def _install_galaxy_role(
        self,
        project_dir: Path,
        role_name_check: int = 0,
        *,
        ignore_errors: bool = False,
    ) -> None:
        """Detect standalone galaxy role and installs it.

        :param role_name_check: logic used to check role name
            0: exit with error if name is not compliant (default)
            1: warn if name is not compliant
            2: bypass any name checking

        :param ignore_errors: if True, bypass installing invalid roles.

        Our implementation aims to match ansible-galaxy's behaviour for installing
        roles from a tarball or scm. For example ansible-galaxy will install a role
        that has both galaxy.yml and meta/main.yml present but empty. Also missing
        galaxy.yml is accepted but missing meta/main.yml is not.
        """
        yaml = None
        galaxy_info = {}

        for meta_main in META_MAIN:
            meta_filename = Path(project_dir) / meta_main

            if meta_filename.exists():
                break
        else:
            if ignore_errors:
                return

        yaml = yaml_from_file(meta_filename)

        if yaml and "galaxy_info" in yaml:
            galaxy_info = yaml["galaxy_info"]

        fqrn = _get_role_fqrn(galaxy_info, project_dir)

        if role_name_check in [0, 1]:
            if not re.match(r"[a-z0-9][a-z0-9_-]+\.[a-z][a-z0-9_]+$", fqrn):
                msg = MSG_INVALID_FQRL.format(fqrn)
                if role_name_check == 1:
                    _logger.warning(msg)
                else:
                    _logger.error(msg)
                    raise InvalidPrerequisiteError(msg)
        elif "role_name" in galaxy_info:
            # when 'role-name' is in skip_list, we stick to plain role names
            role_namespace = _get_galaxy_role_ns(galaxy_info)
            role_name = _get_galaxy_role_name(galaxy_info)
            fqrn = f"{role_namespace}{role_name}"
        else:
            fqrn = Path(project_dir).absolute().name
        path = self._get_roles_path()
        path.mkdir(parents=True, exist_ok=True)
        link_path = path / fqrn
        # despite documentation stating that is_file() reports true for symlinks,
        # it appears that is_dir() reports true instead, so we rely on exists().
        target = Path(project_dir).absolute()
        if not link_path.exists() or (
            link_path.is_symlink() and link_path.readlink() != target
        ):
            # must call unlink before checking exists because a broken
            # link reports as not existing and we want to repair it
            link_path.unlink(missing_ok=True)
            # https://github.com/python/cpython/issues/73843
            link_path.symlink_to(str(target), target_is_directory=True)
        _logger.info(
            "Using %s symlink to current repository in order to enable Ansible to find the role using its expected full name.",
            link_path,
        )

    def _update_env(self, varname: str, value: list[str], default: str = "") -> None:
        """Update colon based environment variable if needed.

        New values are prepended to make sure they take precedence.
        """
        if not value:
            return
        orig_value = self.environ.get(varname, default)
        if orig_value:
            # we just want to avoid repeating the same entries, but order is important
            value = list(dict.fromkeys([*value, *orig_value.split(":")]))
        value_str = ":".join(value)
        if value_str != self.environ.get(varname, ""):
            self.environ[varname] = value_str
            _logger.info("Set %s=%s", varname, value_str)

version property

version: Version

Return the current Version object for Ansible.

If no version was provided, the detected current version is returned. When a version argument is given, the version string is converted into a Version object so it can be used in comparisons.

__init__

__init__(
    project_dir: Path | None = None,
    *,
    isolated: bool = False,
    min_required_version: str | None = None,
    require_module: bool = False,
    max_retries: int = 0,
    environ: dict[str, str] | None = None,
    verbosity: int = 0
) -> None

Initialize the Ansible runtime environment.

Parameters

  • project_dir (Path | None, default: None) –

    The directory containing the Ansible project. If not given, it is guessed from the current working directory.

  • isolated (bool, default: False) –

    Ensure that installing collections or roles does not affect the Ansible installation, by using a unique cache directory instead.

  • min_required_version (str | None, default: None) –

    Minimal version of Ansible required. If it is not found, a RuntimeError exception is raised.

  • require_module (bool, default: False) –

    If set, instantiation fails when the Ansible Python module is missing or does not match the version of the Ansible command line. This is useful for consumers that also expect to perform Python imports from Ansible.

  • max_retries (int, default: 0) –

    Number of times network operations should be retried. Defaults to 0, meaning no retries.

  • environ (dict[str, str] | None, default: None) –

    Environment dictionary to use; if undefined, os.environ is copied and used.

  • verbosity (int, default: 0) –

    Verbosity level to use.

Source code in ansible_compat/runtime.py
def __init__(
    self,
    project_dir: Path | None = None,
    *,
    isolated: bool = False,
    min_required_version: str | None = None,
    require_module: bool = False,
    max_retries: int = 0,
    environ: dict[str, str] | None = None,
    verbosity: int = 0,
) -> None:
    """Initialize Ansible runtime environment.

    :param project_dir: The directory containing the Ansible project. If
                        not mentioned it will be guessed from the current
                        working directory.
    :param isolated: Assure that installation of collections or roles
                     does not affect Ansible installation, an unique cache
                     directory being used instead.
    :param min_required_version: Minimal version of Ansible required. If
                                 not found, a :class:`RuntimeError`
                                 exception is raised.
    :param require_module: If set, instantiation will fail if Ansible
                           Python module is missing or is not matching
                           the same version as the Ansible command line.
                           That is useful for consumers that expect to
                           also perform Python imports from Ansible.
    :param max_retries: Number of times it should retry network operations.
                        Default is 0, no retries.
    :param environ: Environment dictionary to use, if undefined
                    ``os.environ`` will be copied and used.
    :param verbosity: Verbosity level to use.
    """
    self.project_dir = project_dir or Path.cwd()
    self.isolated = isolated
    self.max_retries = max_retries
    self.environ = environ or os.environ.copy()
    self.plugins = Plugins(runtime=self)
    self.verbosity = verbosity

    self.initialize_logger(level=self.verbosity)

    # Reduce noise from paramiko, unless user already defined PYTHONWARNINGS
    # paramiko/transport.py:236: CryptographyDeprecationWarning: Blowfish has been deprecated
    # https://github.com/paramiko/paramiko/issues/2038
    # As CryptographyDeprecationWarning is not a builtin, we cannot use
    # PYTHONWARNINGS to ignore it using category but we can use message.
    # https://stackoverflow.com/q/68251969/99834
    if "PYTHONWARNINGS" not in self.environ:  # pragma: no cover
        self.environ["PYTHONWARNINGS"] = "ignore:Blowfish has been deprecated"

    if isolated:
        self.cache_dir = get_cache_dir(self.project_dir)
    self.config = AnsibleConfig(cache_dir=self.cache_dir)

    # Add the sys.path to the collection paths if not isolated
    self._add_sys_path_to_collection_paths()

    if not self.version_in_range(lower=min_required_version):
        msg = f"Found incompatible version of ansible runtime {self.version}, instead of {min_required_version} or newer."
        raise RuntimeError(msg)
    if require_module:
        self.require_module = True
        self._ensure_module_available()

    # pylint: disable=import-outside-toplevel
    from ansible.utils.display import Display

    # pylint: disable=unused-argument
    def warning(
        self: Display,  # noqa: ARG001
        msg: str,
        *,
        formatted: bool = False,  # noqa: ARG001
    ) -> None:  # pragma: no cover
        """Override ansible.utils.display.Display.warning to avoid printing warnings."""
        warnings.warn(
            message=msg,
            category=AnsibleWarning,
            stacklevel=2,
            source={"msg": msg},
        )

    # Monkey patch ansible warning in order to use warnings module.
    Display.warning = warning
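
Usage example (a minimal sketch; the version floor, retry count and verbosity shown are illustrative assumptions, not values required by the API):

from pathlib import Path

from ansible_compat.runtime import Runtime

# Build an isolated runtime for the current project; collections and roles
# installed through it go into a dedicated cache directory instead of the
# user-level Ansible locations.
runtime = Runtime(
    project_dir=Path.cwd(),
    isolated=True,
    min_required_version="2.14",  # example floor, adjust to your needs
    max_retries=3,
    verbosity=1,
)
print(runtime.version)  # detected ansible-core version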

clean

clean() -> None

Remove the contents of cache_dir.

Source code in ansible_compat/runtime.py
def clean(self) -> None:
    """Remove content of cache_dir."""
    if self.cache_dir:
        shutil.rmtree(self.cache_dir, ignore_errors=True)

has_playbook

has_playbook(
    playbook: str, *, basedir: Path | None = None
) -> bool

Return true if ansible can load the given playbook.

This is also used to check whether playbooks from within collections are present and pass syntax check.

Source code in ansible_compat/runtime.py
def has_playbook(self, playbook: str, *, basedir: Path | None = None) -> bool:
    """Return true if ansible can load a given playbook.

    This is also used for checking if playbooks from within collections
    are present and if they pass syntax check.
    """
    if (playbook, basedir) in self._has_playbook_cache:
        return self._has_playbook_cache[playbook, basedir]

    proc = self.run(["ansible-playbook", "--syntax-check", playbook], cwd=basedir)
    result = proc.returncode == 0
    if not result:
        if not basedir:
            basedir = Path()
        msg = f"has_playbook returned false for '{basedir / playbook}' due to syntax check returning {proc.returncode}"
        logging.debug(msg)

    # cache the result
    self._has_playbook_cache[playbook, basedir] = result

    return result
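
For example (a sketch assuming a site.yml playbook exists under the project's playbooks/ directory):

from pathlib import Path

from ansible_compat.runtime import Runtime

runtime = Runtime()
# True only when `ansible-playbook --syntax-check` succeeds; the result is
# cached per (playbook, basedir) pair.
if runtime.has_playbook("site.yml", basedir=Path("playbooks")):
    print("playbook loads and passes syntax check")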

initialize_logger

initialize_logger(level: int = 0) -> None

Set the global logging level based on the verbosity number.

Source code in ansible_compat/runtime.py
def initialize_logger(self, level: int = 0) -> None:
    """Set up the global logging level based on the verbosity number."""
    verbosity_map = {
        -2: logging.CRITICAL,
        -1: logging.ERROR,
        0: logging.WARNING,
        1: logging.INFO,
        2: logging.DEBUG,
    }
    # Unknown logging level is treated as DEBUG
    logging_level = verbosity_map.get(level, logging.DEBUG)
    _logger.setLevel(logging_level)
    # Use module-level _logger instance to validate it
    _logger.debug("Logging initialized to level %s", logging_level)

install_collection

install_collection(
    collection: str | Path,
    *,
    destination: Path | None = None,
    force: bool = False
) -> None

Install an Ansible collection.

Accepts arguments such as: 'foo.bar:>=1.2.3' or 'git+https://github.com/ansible-collections/ansible.posix.git,main'

Source code in ansible_compat/runtime.py
def install_collection(
    self,
    collection: str | Path,
    *,
    destination: Path | None = None,
    force: bool = False,
) -> None:
    """Install an Ansible collection.

    Can accept arguments like:
        'foo.bar:>=1.2.3'
        'git+https://github.com/ansible-collections/ansible.posix.git,main'
    """
    cmd = [
        "ansible-galaxy",
        "collection",
        "install",
        "-vvv",  # this is needed to make ansible display important info in case of failures
    ]
    if force:
        cmd.append("--force")

    if isinstance(collection, Path):
        collection = str(collection)
    # As ansible-galaxy install is not able to automatically determine
    # if the range requires a pre-release, we need to manually add the --pre
    # flag when needed.
    matches = version_re.search(collection)

    if (
        not is_url(collection)
        and matches
        and CollectionVersion(matches[1]).is_prerelease
    ):
        cmd.append("--pre")

    cpaths: list[str] = self.config.collections_paths
    if destination and str(destination) not in cpaths:
        # we cannot use '-p' because it breaks galaxy ability to ignore already installed collections, so
        # we hack ansible_collections_path instead and inject our own path there.
        # pylint: disable=no-member
        cpaths.insert(0, str(destination))
    cmd.append(f"{collection}")

    _logger.info("Running from %s : %s", Path.cwd(), " ".join(cmd))
    process = self.run(
        cmd,
        retry=True,
        env={**self.environ, ansible_collections_path(): ":".join(cpaths)},
    )
    if process.returncode != 0:
        msg = f"Command {' '.join(cmd)}, returned {process.returncode} code:\n{process.stdout}\n{process.stderr}"
        _logger.error(msg)
        raise InvalidPrerequisiteError(msg)
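
Example calls (a sketch; the collection name, version range and git reference are placeholders):

from ansible_compat.runtime import Runtime

runtime = Runtime()
# Version-constrained install resolved from Galaxy.
runtime.install_collection("community.general:>=7.0.0")
# Install directly from a git repository; a trailing ',<ref>' selects the branch or tag.
runtime.install_collection(
    "git+https://github.com/ansible-collections/ansible.posix.git,main",
)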

install_collection_from_disk

install_collection_from_disk(
    path: Path, destination: Path | None = None
) -> None

Build and install a collection from a given disk path.

Source code in ansible_compat/runtime.py
def install_collection_from_disk(
    self,
    path: Path,
    destination: Path | None = None,
) -> None:
    """Build and install collection from a given disk path."""
    self.install_collection(path, destination=destination, force=True)
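
For example, building and installing the collection rooted at the current directory into the runtime cache (a sketch; it assumes the directory contains a galaxy.yml):

from pathlib import Path

from ansible_compat.runtime import Runtime

runtime = Runtime(isolated=True)
# Delegates to install_collection(path, destination=..., force=True).
runtime.install_collection_from_disk(
    Path("."),
    destination=runtime.cache_dir / "collections" if runtime.cache_dir else None,
)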

install_requirements

install_requirements(
    requirement: Path,
    *,
    retry: bool = False,
    offline: bool = False
) -> None

Install dependencies from a requirements.yml file.

Parameters

  • requirement (Path) –

    Path to the requirements.yml file.

  • retry (bool, default: False) –

    Retry network operations on failure.

  • offline (bool, default: False) –

    Bypass installation; this may fail if requirements are not met.

Source code in ansible_compat/runtime.py
def install_requirements(  # noqa: C901
    self,
    requirement: Path,
    *,
    retry: bool = False,
    offline: bool = False,
) -> None:
    """Install dependencies from a requirements.yml.

    :param requirement: path to requirements.yml file
    :param retry: retry network operations on failures
    :param offline: bypass installation, may fail if requirements are not met.
    """
    if not Path(requirement).exists():
        return
    reqs_yaml = yaml_from_file(Path(requirement))
    if not isinstance(reqs_yaml, (dict, list)):
        msg = f"{requirement} file is not a valid Ansible requirements file."
        raise InvalidPrerequisiteError(msg)

    if isinstance(reqs_yaml, dict):
        for key in reqs_yaml:
            if key not in ("roles", "collections"):
                msg = f"{requirement} file is not a valid Ansible requirements file. Only 'roles' and 'collections' keys are allowed at root level. Recognized valid locations are: {', '.join(REQUIREMENT_LOCATIONS)}"
                raise InvalidPrerequisiteError(msg)

    if isinstance(reqs_yaml, list) or "roles" in reqs_yaml:
        cmd = [
            "ansible-galaxy",
            "role",
            "install",
            "-r",
            f"{requirement}",
        ]
        if self.verbosity > 0:
            cmd.extend(["-" + ("v" * self.verbosity)])
        if self.cache_dir:
            cmd.extend(["--roles-path", f"{self.cache_dir}/roles"])

        if offline:
            _logger.warning(
                "Skipped installing old role dependencies due to running in offline mode.",
            )
        else:
            _logger.info("Running %s", " ".join(cmd))

            result = self.run(cmd, retry=retry)
            _logger.debug(result.stdout)
            if result.returncode != 0:
                _logger.error(result.stderr)
                raise AnsibleCommandError(result)

    # Run galaxy collection install works on v2 requirements.yml
    if "collections" in reqs_yaml and reqs_yaml["collections"] is not None:
        cmd = [
            "ansible-galaxy",
            "collection",
            "install",
        ]
        if self.verbosity > 0:
            cmd.extend(["-" + ("v" * self.verbosity)])

        for collection in reqs_yaml["collections"]:
            if isinstance(collection, dict) and collection.get("type", "") == "git":
                _logger.info(
                    "Adding '--pre' to ansible-galaxy collection install because we detected one collection being sourced from git.",
                )
                cmd.append("--pre")
                break
        if offline:
            _logger.warning(
                "Skipped installing collection dependencies due to running in offline mode.",
            )
        else:
            cmd.extend(["-r", str(requirement)])
            _logger.info("Running %s", " ".join(cmd))
            result = self.run(
                cmd,
                retry=retry,
            )
            _logger.debug(result.stdout)
            if result.returncode != 0:
                _logger.error(result.stderr)
                raise AnsibleCommandError(result)
    if self.require_module:
        Runtime.initialized = False
        self._ensure_module_available()
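
For instance, installing a project's role and collection dependencies with network retries enabled (a sketch; the requirements path is an assumption):

from pathlib import Path

from ansible_compat.runtime import Runtime

runtime = Runtime(max_retries=3)
# Missing files are silently ignored, so this is safe to call unconditionally.
runtime.install_requirements(Path("requirements.yml"), retry=True)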

load_collections

load_collections() -> None

Load collection data.

Source code in ansible_compat/runtime.py
def load_collections(self) -> None:
    """Load collection data."""
    self.collections = OrderedDict()
    no_collections_msg = "None of the provided paths were usable"

    # do not use --path because it does not allow multiple values
    proc = self.run(
        [
            "ansible-galaxy",
            "collection",
            "list",
            "--format=json",
        ],
    )
    if proc.returncode == RC_ANSIBLE_OPTIONS_ERROR and (
        no_collections_msg in proc.stdout or no_collections_msg in proc.stderr
    ):  # pragma: no cover
        _logger.debug("Ansible reported no installed collections at all.")
        return
    if proc.returncode != 0:
        _logger.error(proc)
        msg = f"Unable to list collections: {proc}"
        raise RuntimeError(msg)
    try:
        data = json.loads(proc.stdout)
    except json.decoder.JSONDecodeError as exc:
        msg = f"Unable to parse galaxy output as JSON: {proc.stdout}"
        raise RuntimeError(msg) from exc
    if not isinstance(data, dict):
        msg = f"Unexpected collection data, {data}"
        raise TypeError(msg)
    for path in data:
        if not isinstance(data[path], dict):
            msg = f"Unexpected collection data, {data[path]}"
            raise TypeError(msg)
        for collection, collection_info in data[path].items():
            if not isinstance(collection_info, dict):
                msg = f"Unexpected collection data, {collection_info}"
                raise TypeError(msg)

            if collection in self.collections:
                msg = f"Another version of '{collection}' {collection_info['version']} was found installed in {path}, only the first one will be used, {self.collections[collection].version} ({self.collections[collection].path})."
                logging.warning(msg)
            else:
                self.collections[collection] = Collection(
                    name=collection,
                    version=collection_info["version"],
                    path=path,
                )
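
A sketch of inspecting the loaded collection inventory afterwards (attribute names follow the Collection entries created above):

from ansible_compat.runtime import Runtime

runtime = Runtime()
runtime.load_collections()
for name, collection in runtime.collections.items():
    print(f"{name} {collection.version} ({collection.path})")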

prepare_environment

prepare_environment(
    required_collections: dict[str, str] | None = None,
    *,
    retry: bool = False,
    install_local: bool = False,
    offline: bool = False,
    role_name_check: int = 0
) -> None

Make dependencies available if needed.

Source code in ansible_compat/runtime.py
def prepare_environment(  # noqa: C901
    self,
    required_collections: dict[str, str] | None = None,
    *,
    retry: bool = False,
    install_local: bool = False,
    offline: bool = False,
    role_name_check: int = 0,
) -> None:
    """Make dependencies available if needed."""
    destination: Path | None = None
    if required_collections is None:
        required_collections = {}

    self._prepare_ansible_paths()
    # first one is standard for collection layout repos and the last two
    # are part of Tower specification
    # https://docs.ansible.org.cn/ansible-tower/latest/html/userguide/projects.html#ansible-galaxy-support
    # https://docs.ansible.org.cn/ansible-tower/latest/html/userguide/projects.html#collections-support
    for req_file in REQUIREMENT_LOCATIONS:
        file_path = Path(req_file)
        if self.project_dir:
            file_path = self.project_dir / req_file
        self.install_requirements(file_path, retry=retry, offline=offline)

    if not install_local:
        return

    for gpath in search_galaxy_paths(self.project_dir):
        # processing all found galaxy.yml files
        galaxy_path = Path(gpath)
        if galaxy_path.exists():
            data = yaml_from_file(galaxy_path)
            if isinstance(data, dict) and "dependencies" in data:
                for name, required_version in data["dependencies"].items():
                    _logger.info(
                        "Provisioning collection %s:%s from galaxy.yml",
                        name,
                        required_version,
                    )
                    self.install_collection(
                        f"{name}{',' if is_url(name) else ':'}{required_version}",
                        destination=destination,
                    )

    if self.cache_dir:
        destination = self.cache_dir / "collections"
    for name, min_version in required_collections.items():
        self.install_collection(
            f"{name}:>={min_version}",
            destination=destination,
        )

    if (self.project_dir / "galaxy.yml").exists():
        if destination:
            # while function can return None, that would not break the logic
            colpath = Path(
                f"{destination}/ansible_collections/{colpath_from_path(self.project_dir)}",
            )
            if colpath.is_symlink():
                if os.path.realpath(colpath) == str(Path.cwd()):
                    _logger.warning(
                        "Found symlinked collection, skipping its installation.",
                    )
                    return
                _logger.warning(
                    "Collection is symlinked, but not pointing to %s directory, so we will remove it.",
                    Path.cwd(),
                )
                colpath.unlink()

        # molecule scenario within a collection
        self.install_collection_from_disk(
            galaxy_path.parent,
            destination=destination,
        )
    elif Path.cwd().parent.name == "roles" and Path("../../galaxy.yml").exists():
        # molecule scenario located within roles/<role-name>/molecule inside
        # a collection
        self.install_collection_from_disk(
            Path("../.."),
            destination=destination,
        )
    else:
        # no collection, try to recognize and install a standalone role
        self._install_galaxy_role(
            self.project_dir,
            role_name_check=role_name_check,
            ignore_errors=True,
        )
    # reload collections
    self.load_collections()
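
Example (a sketch; the required collection and its minimum version are placeholders for whatever your project actually needs):

from pathlib import Path

from ansible_compat.runtime import Runtime

runtime = Runtime(project_dir=Path.cwd(), isolated=True)
# Installs any requirements files found in REQUIREMENT_LOCATIONS, the
# collections listed below, and the local collection or role when
# install_local=True.
runtime.prepare_environment(
    required_collections={"ansible.posix": "1.5.0"},  # example pin
    install_local=True,
    retry=True,
)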

require_collection

require_collection(
    name: str,
    version: str | None = None,
    *,
    install: bool = True
) -> tuple[CollectionVersion, Path]

Check that a minimal collection version is present, failing otherwise.

In the future this method may attempt to install a missing or outdated collection before failing.

Parameters

  • name (str) –

    Collection name.

  • version (str | None, default: None) –

    Minimal version required.

  • install (bool, default: True) –

    If True, attempt to install a missing collection.

Returns

  • tuple[CollectionVersion, Path]

    Tuple of (found_version, collection_path).

Source code in ansible_compat/runtime.py
def require_collection(
    self,
    name: str,
    version: str | None = None,
    *,
    install: bool = True,
) -> tuple[CollectionVersion, Path]:
    """Check if a minimal collection version is present or exits.

    In the future this method may attempt to install a missing or outdated
    collection before failing.

    :param name: collection name
    :param version: minimal version required
    :param install: if True, attempt to install a missing collection
    :returns: tuple of (found_version, collection_path)
    """
    try:
        ns, coll = name.split(".", 1)
    except ValueError as exc:
        msg = f"Invalid collection name supplied: {name}%s"
        raise InvalidPrerequisiteError(
            msg,
        ) from exc

    paths: list[str] = self.config.collections_paths
    if not paths or not isinstance(paths, list):
        msg = f"Unable to determine ansible collection paths. ({paths})"
        raise InvalidPrerequisiteError(
            msg,
        )

    for path in paths:
        collpath = Path(path) / "ansible_collections" / ns / coll
        if collpath.exists():
            mpath = collpath / "MANIFEST.json"
            if not mpath.exists():
                msg = f"Found collection at '{collpath}' but missing MANIFEST.json, cannot get info."
                _logger.fatal(msg)
                raise InvalidPrerequisiteError(msg)

            with mpath.open(encoding="utf-8") as f:
                manifest = json.loads(f.read())
                found_version = CollectionVersion(
                    manifest["collection_info"]["version"],
                )
                if version and found_version < CollectionVersion(version):
                    if install:
                        self.install_collection(f"{name}:>={version}")
                        self.require_collection(name, version, install=False)
                    else:
                        msg = f"Found {name} collection {found_version} but {version} or newer is required."
                        _logger.fatal(msg)
                        raise InvalidPrerequisiteError(msg)
                return found_version, collpath.resolve()
    if install:
        self.install_collection(f"{name}:>={version}" if version else name)
        return self.require_collection(
            name=name,
            version=version,
            install=False,
        )
    msg = f"Collection '{name}' not found in '{paths}'"
    _logger.fatal(msg)
    raise InvalidPrerequisiteError(msg)
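
Usage example (a sketch; it assumes InvalidPrerequisiteError is importable from ansible_compat.errors, and the version shown is just an example):

from ansible_compat.errors import InvalidPrerequisiteError
from ansible_compat.runtime import Runtime

runtime = Runtime()
try:
    found_version, collection_path = runtime.require_collection(
        "ansible.posix",
        version="1.5.0",
        install=False,  # only check, never install
    )
    print(f"ansible.posix {found_version} at {collection_path}")
except InvalidPrerequisiteError as exc:
    print(f"prerequisite not met: {exc}")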

run

run(
    args: str | list[str],
    *,
    retry: bool = False,
    tee: bool = False,
    env: dict[str, str] | None = None,
    cwd: Path | None = None,
    set_acp: bool = True
) -> CompletedProcess

Execute a command inside an Ansible environment.

Parameters

  • retry (bool, default: False) –

    Retry network operations on failures.

  • tee (bool, default: False) –

    Also pass captured stdout/stderr to the system while running.

  • set_acp (bool, default: True) –

    Set the ANSIBLE_COLLECTIONS_PATH.

Source code in ansible_compat/runtime.py
def run(  # ruff: disable=PLR0913
    self,
    args: str | list[str],
    *,
    retry: bool = False,
    tee: bool = False,
    env: dict[str, str] | None = None,
    cwd: Path | None = None,
    set_acp: bool = True,
) -> CompletedProcess:
    """Execute a command inside an Ansible environment.

    :param retry: Retry network operations on failures.
    :param tee: Also pass captured stdout/stderr to system while running.
    :param set_acp: Set the ANSIBLE_COLLECTIONS_PATH
    """
    if tee:
        run_func: Callable[..., CompletedProcess] = subprocess_tee.run
    else:
        run_func = subprocess.run
    env = self.environ if env is None else env.copy()
    # Presence of ansible debug variable or config option will prevent us
    # from parsing its JSON output due to extra debug messages on stdout.
    env["ANSIBLE_DEBUG"] = "0"

    # https://github.com/ansible/ansible-lint/issues/3522
    env["ANSIBLE_VERBOSE_TO_STDERR"] = "True"

    if set_acp:
        env["ANSIBLE_COLLECTIONS_PATH"] = ":".join(
            list(dict.fromkeys(self.config.collections_paths)),
        )

    for _ in range(self.max_retries + 1 if retry else 1):
        result = run_func(
            args,
            universal_newlines=True,
            check=False,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            env=env,
            cwd=str(cwd) if cwd else None,
        )
        if result.returncode == 0:
            break
        _logger.debug("Environment: %s", env)
        if retry:
            _logger.warning(
                "Retrying execution failure %s of: %s",
                result.returncode,
                " ".join(args),
            )
    return result
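
For example (a minimal sketch):

from ansible_compat.runtime import Runtime

runtime = Runtime()
# stdout/stderr are always captured; tee=True would additionally stream them.
result = runtime.run(["ansible", "--version"], retry=False)
if result.returncode == 0:
    print(result.stdout)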

version_in_range

version_in_range(
    lower: str | None = None, upper: str | None = None
) -> bool

Check if the Ansible version is inside a required range.

The lower limit is inclusive and the upper one exclusive.

Source code in ansible_compat/runtime.py
def version_in_range(
    self,
    lower: str | None = None,
    upper: str | None = None,
) -> bool:
    """Check if Ansible version is inside a required range.

    The lower limit is inclusive and the upper one exclusive.
    """
    if lower and self.version < Version(lower):
        return False
    return not (upper and self.version >= Version(upper))
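
For example (the bounds below are illustrative):

from ansible_compat.runtime import Runtime

runtime = Runtime()
# Lower bound inclusive, upper bound exclusive: 2.14 <= version < 2.17
if runtime.version_in_range(lower="2.14", upper="2.17"):
    print("running a supported ansible-core release")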

is_url

is_url(name: str) -> bool

Return True if a dependency name looks like a URL.

Source code in ansible_compat/runtime.py
def is_url(name: str) -> bool:
    """Return True if a dependency name looks like an URL."""
    return bool(re.match("^git[+@]", name))
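
For example, given the regular expression above:

from ansible_compat.runtime import is_url

print(is_url("git+https://github.com/ansible-collections/ansible.posix.git"))  # True
print(is_url("git@github.com:ansible-collections/ansible.posix.git"))  # True
print(is_url("community.general"))  # False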

search_galaxy_paths

search_galaxy_paths(search_dir: Path) -> list[str]

Search for galaxy paths (only one level deep).

Source code in ansible_compat/runtime.py
def search_galaxy_paths(search_dir: Path) -> list[str]:
    """Search for galaxy paths (only one level deep)."""
    galaxy_paths: list[str] = []
    for file in [".", *os.listdir(search_dir)]:
        # We ignore any folders that are not valid namespaces, just like
        # ansible galaxy does at this moment.
        if file != "." and not namespace_re.match(file):
            continue
        file_path = search_dir / file / "galaxy.yml"
        if file_path.is_file():
            galaxy_paths.append(str(file_path))
    return galaxy_paths
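
For example (a sketch; it scans the given directory itself plus one level of namespace-like subdirectories):

from pathlib import Path

from ansible_compat.runtime import search_galaxy_paths

for galaxy_file in search_galaxy_paths(Path.cwd()):
    print(galaxy_file)  # e.g. ./galaxy.yml or <namespace>/galaxy.yml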

ansible_compat.schema

Utilities for JSON Schema validation.

JsonSchemaError dataclass

Data structure to hold a JSON schema validation error.

Source code in ansible_compat/schema.py
@dataclass(order=True)
class JsonSchemaError:
    # pylint: disable=too-many-instance-attributes
    """Data structure to hold a json schema validation error."""

    # order of attributes below is important for sorting
    schema_path: str
    data_path: str
    json_path: str
    message: str
    expected: bool | int | str
    relative_schema: str
    validator: str
    found: str

    def to_friendly(self) -> str:
        """Provide a friendly explanation of the error.

        :returns: The error message
        """
        return f"In '{self.data_path}': {self.message}."

to_friendly

to_friendly() -> str

Provide a friendly explanation of the error.

Returns

  • str

    The error message.

Source code in ansible_compat/schema.py
def to_friendly(self) -> str:
    """Provide a friendly explanation of the error.

    :returns: The error message
    """
    return f"In '{self.data_path}': {self.message}."

json_path

json_path(absolute_path: Sequence[str | int]) -> str

Flatten a data path to a dot-delimited string.

Parameters

  • absolute_path (Sequence[str | int]) –

    The path.

Returns

  • str

    The dot-delimited string.

Source code in ansible_compat/schema.py
def json_path(absolute_path: Sequence[str | int]) -> str:
    """Flatten a data path to a dot delimited string.

    :param absolute_path: The path
    :returns: The dot delimited string
    """
    path = "$"
    for elem in absolute_path:
        if isinstance(elem, int):
            path += "[" + str(elem) + "]"
        else:
            path += "." + elem
    return path
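
For example:

from ansible_compat.schema import json_path

print(json_path(["tasks", 0, "name"]))  # -> $.tasks[0].name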

to_path

to_path(schema_path: Sequence[str | int]) -> str

Flatten a path to a dot-delimited string.

Parameters

  • schema_path (Sequence[str | int]) –

    The schema path.

Returns

  • str

    The dot-delimited path.

Source code in ansible_compat/schema.py
def to_path(schema_path: Sequence[str | int]) -> str:
    """Flatten a path to a dot delimited string.

    :param schema_path: The schema path
    :returns: The dot delimited path
    """
    return ".".join(str(index) for index in schema_path)

validate

validate(schema: JSON, data: JSON) -> list[JsonSchemaError]

Validate data against a JSON schema.

Parameters

  • schema (JSON) –

    The JSON schema to use for validation.

  • data (JSON) –

    The data to validate.

Returns

  • list[JsonSchemaError]

    Any errors encountered.

Source code in ansible_compat/schema.py
def validate(
    schema: JSON,
    data: JSON,
) -> list[JsonSchemaError]:
    """Validate some data against a JSON schema.

    :param schema: the JSON schema to use for validation
    :param data: The data to validate
    :returns: Any errors encountered
    """
    errors: list[JsonSchemaError] = []

    if isinstance(schema, str):
        schema = json.loads(schema)
    try:
        if not isinstance(schema, Mapping):
            msg = "Invalid schema, must be a mapping"
            raise jsonschema.SchemaError(msg)  # noqa: TRY301
        validator = validator_for(schema)
        validator.check_schema(schema)
    except jsonschema.SchemaError as exc:
        error = JsonSchemaError(
            message=str(exc),
            data_path="schema sanity check",
            json_path="",
            schema_path="",
            relative_schema="",
            expected="",
            validator="",
            found="",
        )
        errors.append(error)
        return errors

    for validation_error in validator(schema).iter_errors(data):
        if isinstance(validation_error, jsonschema.ValidationError):
            error = JsonSchemaError(
                message=validation_error.message,
                data_path=to_path(validation_error.absolute_path),
                json_path=json_path(validation_error.absolute_path),
                schema_path=to_path(validation_error.schema_path),
                relative_schema=str(validation_error.schema),
                expected=str(validation_error.validator_value),
                validator=str(validation_error.validator),
                found=str(validation_error.instance),
            )
            errors.append(error)
    return sorted(errors)
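
Usage example (a minimal sketch; the schema and data are made up for illustration):

import json

from ansible_compat.schema import validate

schema = json.dumps(
    {
        "type": "object",
        "properties": {"name": {"type": "string"}},
        "required": ["name"],
    },
)
data = {"name": 42}  # wrong type on purpose

for error in validate(schema, data):
    # e.g. "In 'name': 42 is not of type 'string'."
    print(error.to_friendly())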