Skip to content

Runner

runner

ANTA runner function.

adjust_rlimit_nofile

adjust_rlimit_nofile() -> tuple[int, int]

Adjust the maximum number of open file descriptors for the ANTA process.

The limit is set to the lower of the current hard limit and the value of the ANTA_NOFILE environment variable.

If the ANTA_NOFILE environment variable is not set or is invalid, DEFAULT_NOFILE is used.

Returns:

Type Description
tuple[int, int]: The new soft and hard limits for open file descriptors.
Source code in anta/runner.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
def adjust_rlimit_nofile() -> tuple[int, int]:
    """Adjust the maximum number of open file descriptors for the ANTA process.

    The limit is set to the lower of the current hard limit and the value of the ANTA_NOFILE environment variable.

    If the `ANTA_NOFILE` environment variable is not set or is invalid, `DEFAULT_NOFILE` is used.

    Returns
    -------
        tuple[int, int]: The new soft and hard limits for open file descriptors.
    """
    try:
        nofile = int(os.environ.get("ANTA_NOFILE", DEFAULT_NOFILE))
    except ValueError as exception:
        logger.warning("The ANTA_NOFILE environment variable value is invalid: %s\nDefault to %s.", exc_to_str(exception), DEFAULT_NOFILE)
        nofile = DEFAULT_NOFILE

    limits = resource.getrlimit(resource.RLIMIT_NOFILE)
    logger.debug("Initial limit numbers for open file descriptors for the current ANTA process: Soft Limit: %s | Hard Limit: %s", limits[0], limits[1])
    nofile = nofile if limits[1] > nofile else limits[1]
    logger.debug("Setting soft limit for open file descriptors for the current ANTA process to %s", nofile)
    resource.setrlimit(resource.RLIMIT_NOFILE, (nofile, limits[1]))
    return resource.getrlimit(resource.RLIMIT_NOFILE)

get_coroutines

get_coroutines(selected_tests: defaultdict[AntaDevice, set[AntaTestDefinition]]) -> list[Coroutine[Any, Any, TestResult]]

Get the coroutines for the ANTA run.

Args:
selected_tests: A mapping of devices to the tests to run. The selected tests are generated by the `prepare_tests` function.

Returns:

Type Description
The list of coroutines to run.
Source code in anta/runner.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
def get_coroutines(selected_tests: defaultdict[AntaDevice, set[AntaTestDefinition]]) -> list[Coroutine[Any, Any, TestResult]]:
    """Get the coroutines for the ANTA run.

    Args:
    ----
        selected_tests: A mapping of devices to the tests to run. The selected tests are generated by the `prepare_tests` function.

    Returns
    -------
        The list of coroutines to run.
    """
    coros = []
    for device, test_definitions in selected_tests.items():
        for test in test_definitions:
            try:
                test_instance = test.test(device=device, inputs=test.inputs)
                coros.append(test_instance.test())
            except Exception as e:  # noqa: PERF203, pylint: disable=broad-exception-caught
                # An AntaTest instance is potentially user-defined code.
                # We need to catch everything and exit gracefully with an error message.
                message = "\n".join(
                    [
                        f"There is an error when creating test {test.test.module}.{test.test.__name__}.",
                        f"If this is not a custom test implementation: {GITHUB_SUGGESTION}",
                    ],
                )
                anta_log_exception(e, message, logger)
    return coros

log_cache_statistics

log_cache_statistics(devices: list[AntaDevice]) -> None

Log cache statistics for each device in the inventory.

Args:
devices: List of devices in the inventory.
Source code in anta/runner.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def log_cache_statistics(devices: list[AntaDevice]) -> None:
    """Log cache statistics for each device in the inventory.

    Args:
    ----
        devices: List of devices in the inventory.
    """
    for device in devices:
        if device.cache_statistics is not None:
            msg = (
                f"Cache statistics for '{device.name}': "
                f"{device.cache_statistics['cache_hits']} hits / {device.cache_statistics['total_commands_sent']} "
                f"command(s) ({device.cache_statistics['cache_hit_ratio']})"
            )
            logger.info(msg)
        else:
            logger.info("Caching is not enabled on %s", device.name)

main async

main(manager: ResultManager, inventory: AntaInventory, catalog: AntaCatalog, devices: set[str] | None = None, tests: set[str] | None = None, tags: set[str] | None = None, *, established_only: bool = True, dry_run: bool = False) -> None

Run ANTA.

Use this as an entrypoint to the test framework in your script. ResultManager object gets updated with the test results.

Args:
manager: ResultManager object to populate with the test results.
inventory: AntaInventory object that includes the device(s).
catalog: AntaCatalog object that includes the list of tests.
devices: Devices on which to run tests. None means all devices. These may come from the `--device / -d` CLI option in NRFU.
tests: Tests to run against devices. None means all tests. These may come from the `--test / -t` CLI option in NRFU.
tags: Tags to filter devices from the inventory. These may come from the `--tags` CLI option in NRFU.
established_only: Include only established device(s).
dry_run: Build the list of coroutine to run and stop before test execution.
Source code in anta/runner.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
@cprofile()
async def main(  # noqa: PLR0913
    manager: ResultManager,
    inventory: AntaInventory,
    catalog: AntaCatalog,
    devices: set[str] | None = None,
    tests: set[str] | None = None,
    tags: set[str] | None = None,
    *,
    established_only: bool = True,
    dry_run: bool = False,
) -> None:
    # pylint: disable=too-many-arguments
    """Run ANTA.

    Use this as an entrypoint to the test framework in your script.
    ResultManager object gets updated with the test results.

    Args:
    ----
        manager: ResultManager object to populate with the test results.
        inventory: AntaInventory object that includes the device(s).
        catalog: AntaCatalog object that includes the list of tests.
        devices: Devices on which to run tests. None means all devices. These may come from the `--device / -d` CLI option in NRFU.
        tests: Tests to run against devices. None means all tests. These may come from the `--test / -t` CLI option in NRFU.
        tags: Tags to filter devices from the inventory. These may come from the `--tags` CLI option in NRFU.
        established_only: Include only established device(s).
        dry_run: Build the list of coroutine to run and stop before test execution.
    """
    # Adjust the maximum number of open file descriptors for the ANTA process
    limits = adjust_rlimit_nofile()

    if not catalog.tests:
        logger.info("The list of tests is empty, exiting")
        return

    with Catchtime(logger=logger, message="Preparing ANTA NRFU Run"):
        # Setup the inventory
        selected_inventory = inventory if dry_run else await setup_inventory(inventory, tags, devices, established_only=established_only)
        if selected_inventory is None:
            return

        with Catchtime(logger=logger, message="Preparing the tests"):
            selected_tests = prepare_tests(selected_inventory, catalog, tests, tags)
            if selected_tests is None:
                return

        run_info = (
            "--- ANTA NRFU Run Information ---\n"
            f"Number of devices: {len(inventory)} ({len(selected_inventory)} established)\n"
            f"Total number of selected tests: {catalog.final_tests_count}\n"
            f"Maximum number of open file descriptors for the current ANTA process: {limits[0]}\n"
            "---------------------------------"
        )

        logger.info(run_info)

        if catalog.final_tests_count > limits[0]:
            logger.warning(
                "The number of concurrent tests is higher than the open file descriptors limit for this ANTA process.\n"
                "Errors may occur while running the tests.\n"
                "Please consult the ANTA FAQ."
            )

        coroutines = get_coroutines(selected_tests)

    if dry_run:
        logger.info("Dry-run mode, exiting before running the tests.")
        for coro in coroutines:
            coro.close()
        return

    if AntaTest.progress is not None:
        AntaTest.nrfu_task = AntaTest.progress.add_task("Running NRFU Tests...", total=len(coroutines))

    with Catchtime(logger=logger, message="Running ANTA tests"):
        test_results = await asyncio.gather(*coroutines)
        for r in test_results:
            manager.add(r)

    log_cache_statistics(selected_inventory.devices)

prepare_tests

prepare_tests(inventory: AntaInventory, catalog: AntaCatalog, tests: set[str] | None, tags: set[str] | None) -> defaultdict[AntaDevice, set[AntaTestDefinition]] | None

Prepare the tests to run.

Args:
inventory: AntaInventory object that includes the device(s).
catalog: AntaCatalog object that includes the list of tests.
tests: Tests to run against devices. None means all tests.
tags: Tags to filter devices from the inventory.

Returns:

Type Description
A mapping of devices to the tests to run or None if there are no tests to run.
Source code in anta/runner.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
def prepare_tests(
    inventory: AntaInventory, catalog: AntaCatalog, tests: set[str] | None, tags: set[str] | None
) -> defaultdict[AntaDevice, set[AntaTestDefinition]] | None:
    """Prepare the tests to run.

    Args:
    ----
        inventory: AntaInventory object that includes the device(s).
        catalog: AntaCatalog object that includes the list of tests.
        tests: Tests to run against devices. None means all tests.
        tags: Tags to filter devices from the inventory.

    Returns
    -------
        A mapping of devices to the tests to run or None if there are no tests to run.
    """
    # Build indexes for the catalog. If `tests` is set, filter the indexes based on these tests
    catalog.build_indexes(filtered_tests=tests)

    # Using a set to avoid inserting duplicate tests
    device_to_tests: defaultdict[AntaDevice, set[AntaTestDefinition]] = defaultdict(set)

    # Create AntaTestRunner tuples from the tags
    for device in inventory.devices:
        if tags:
            # If there are CLI tags, only execute tests with matching tags
            device_to_tests[device].update(catalog.get_tests_by_tags(tags))
        else:
            # If there is no CLI tags, execute all tests that do not have any tags
            device_to_tests[device].update(catalog.tag_to_tests[None])

            # Then add the tests with matching tags from device tags
            device_to_tests[device].update(catalog.get_tests_by_tags(device.tags))

        catalog.final_tests_count += len(device_to_tests[device])

    if catalog.final_tests_count == 0:
        msg = (
            f"There are no tests{f' matching the tags {tags} ' if tags else ' '}to run in the current test catalog and device inventory, please verify your inputs."
        )
        logger.warning(msg)
        return None

    return device_to_tests

setup_inventory async

setup_inventory(inventory: AntaInventory, tags: set[str] | None, devices: set[str] | None, *, established_only: bool) -> AntaInventory | None

Set up the inventory for the ANTA run.

Args:
inventory: AntaInventory object that includes the device(s).
tags: Tags to filter devices from the inventory.
devices: Devices on which to run tests. None means all devices.

Returns:

Type Description
AntaInventory | None: The filtered inventory or None if there are no devices to run tests on.
Source code in anta/runner.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
async def setup_inventory(inventory: AntaInventory, tags: set[str] | None, devices: set[str] | None, *, established_only: bool) -> AntaInventory | None:
    """Set up the inventory for the ANTA run.

    Args:
    ----
        inventory: AntaInventory object that includes the device(s).
        tags: Tags to filter devices from the inventory.
        devices: Devices on which to run tests. None means all devices.

    Returns
    -------
        AntaInventory | None: The filtered inventory or None if there are no devices to run tests on.
    """
    if len(inventory) == 0:
        logger.info("The inventory is empty, exiting")
        return None

    # Filter the inventory based on the CLI provided tags and devices if any
    selected_inventory = inventory.get_inventory(tags=tags, devices=devices) if tags or devices else inventory

    with Catchtime(logger=logger, message="Connecting to devices"):
        # Connect to the devices
        await selected_inventory.connect_inventory()

    # Remove devices that are unreachable
    selected_inventory = selected_inventory.get_inventory(established_only=established_only)

    # If there are no devices in the inventory after filtering, exit
    if not selected_inventory.devices:
        msg = f'No reachable device {f"matching the tags {tags} " if tags else ""}was found.{f" Selected devices: {devices} " if devices is not None else ""}'
        logger.warning(msg)
        return None

    return selected_inventory