# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
import copy
import functools
from typing import Any, Dict, List, Optional, Set
from lisa import schema
from lisa.parameter_parser.runbook import RunbookBuilder
from lisa.util import InitializableMixin, LisaException, constants, subclasses
from lisa.util.logger import get_logger
from lisa.variable import VariableEntry, merge_variables
_get_init_logger = functools.partial(get_logger, "init", "transformer")
def _sort(transformers: List[schema.Transformer]) -> List[schema.Transformer]:
visited: Set[str] = set()
all: Dict[str, schema.Transformer] = {}
sorted_transformers: List[schema.Transformer] = []
# construct full set and check duplicates
for transformer in transformers:
if transformer.name in all:
raise LisaException(
f"found duplicate transformers: '{transformer.name}', "
f"use different names for them."
)
all[transformer.name] = transformer
# build new sorted results
for transformer in transformers:
if transformer.name not in visited:
_sort_dfs(all, transformer, visited, sorted_transformers)
# sort by phase: init, expanded.
init_transformers: List[schema.Transformer] = []
expanded_transformers: List[schema.Transformer] = []
cleanup_transformers: List[schema.Transformer] = []
for transformer in sorted_transformers:
if transformer.phase == constants.TRANSFORMER_PHASE_INIT:
init_transformers.append(transformer)
elif transformer.phase == constants.TRANSFORMER_PHASE_EXPANDED:
expanded_transformers.append(transformer)
elif transformer.phase == constants.TRANSFORMER_PHASE_CLEANUP:
cleanup_transformers.append(transformer)
else:
raise LisaException(f"unknown transformer phase: {transformer.phase}")
sorted_transformers = (
init_transformers + expanded_transformers + cleanup_transformers
)
# check cycle reference
referenced: Set[str] = set()
for transformer in sorted_transformers:
for item in transformer.depends_on:
if item not in referenced:
raise LisaException(
f"found cycle dependent transformers: "
f"'{transformer.name}' and '{item}'"
)
referenced.add(transformer.name)
return sorted_transformers
def _sort_dfs(
transformers: Dict[str, schema.Transformer],
transformer: schema.Transformer,
visited: Set[str],
sorted_transformers: List[schema.Transformer],
) -> None:
visited.add(transformer.name)
for item in transformer.depends_on:
if item not in visited:
dependent = transformers.get(item, None)
if not dependent:
raise LisaException(
f"transformer '{transformer.name}' "
f"depends on non-existing transformer "
f"'{item}'"
)
_sort_dfs(transformers, dependent, visited, sorted_transformers)
sorted_transformers.append(transformer)
def _load_transformers(
runbook_builder: RunbookBuilder,
variables: Optional[Dict[str, VariableEntry]] = None,
) -> Dict[str, schema.Transformer]:
transformers_data = runbook_builder.partial_resolve(
partial_name=constants.TRANSFORMER, variables=variables
)
transformers = schema.load_by_type_many(schema.Transformer, transformers_data)
return {x.name: x for x in transformers}
def _run_transformers(
runbook_builder: RunbookBuilder,
phase: str = constants.TRANSFORMER_PHASE_INIT,
) -> Dict[str, VariableEntry]:
# resolve variables
transformers_dict = _load_transformers(runbook_builder=runbook_builder)
transformers_runbook = [x for x in transformers_dict.values()]
# resort the runbooks, and it's used in real run
transformers_runbook = _sort(transformers_runbook)
copied_variables: Dict[str, VariableEntry] = dict()
for value in runbook_builder.variables.values():
copied_variables[value.name] = value.copy()
factory = subclasses.Factory[Transformer](Transformer)
for runbook in transformers_runbook:
# load the original runbook to solve variables again.
raw_transformers = _load_transformers(
runbook_builder=runbook_builder, variables=copied_variables
)
runbook = raw_transformers[runbook.name]
# if phase is empty, pick up all of them.
if not runbook.enabled or (phase and runbook.phase != phase):
continue
derived_builder = runbook_builder.derive(copied_variables)
transformer = factory.create_by_runbook(
runbook=runbook, runbook_builder=derived_builder
)
transformer.initialize()
values = transformer.run()
merge_variables(copied_variables, values)
return copied_variables
def run(
runbook_builder: RunbookBuilder, phase: str = constants.TRANSFORMER_PHASE_INIT
) -> None:
log = _get_init_logger()
root_runbook_data = runbook_builder.raw_data
if constants.TRANSFORMER not in root_runbook_data:
log.debug("no transformer found, skipped")
return
# real run
log.debug(f"detecting or running transformers of phase '{phase}'...")
output_variables = _run_transformers(runbook_builder, phase=phase)
merge_variables(runbook_builder.variables, output_variables)