# Source code for lisa.platform_ (extracted from generated documentation)

# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

from __future__ import annotations

from dataclasses import dataclass
from enum import Enum
from functools import partial
from typing import Any, Dict, List, Type, cast

from lisa import schema
from lisa.environment import Environment, EnvironmentStatus
from lisa.feature import Feature, Features
from lisa.messages import MessageBase
from lisa.node import Node, RemoteNode
from lisa.parameter_parser.runbook import RunbookBuilder
from lisa.util import (
    InitializableMixin,
    LisaException,
    NotMeetRequirementException,
    SkippedException,
    constants,
    hookimpl,
    plugin_manager,
    subclasses,
)
from lisa.util.logger import Logger, get_logger
from lisa.util.perf_timer import create_timer

# Factory for the module-level "init"/"platform" logger used by the
# load_platform* helpers below.
_get_init_logger = partial(get_logger, "init", "platform")

# Status values reported in PlatformMessage notifications.
# NOTE(review): the member list was truncated in the extracted source; only
# INITIALIZED is demonstrably used (by PlatformMessage below) — confirm
# against the upstream file before adding/removing members.
PlatformStatus = Enum(
    "TestRunStatus",
    [
        "INITIALIZED",
    ],
)

# NOTE(review): the @dataclass decorator was likely stripped by the doc
# extraction — `dataclass` is imported at the top of this file and is
# otherwise unused; confirm against the upstream file.
@dataclass
class PlatformMessage(MessageBase):
    """Message published when a platform changes status."""

    # message type discriminator used by message subscribers
    type: str = "Platform"
    # platform type name, e.g. the value of Platform.type_name()
    name: str = ""
    # current lifecycle status of the platform
    status: PlatformStatus = PlatformStatus.INITIALIZED

class Platform(subclasses.BaseClassWithRunbookMixin, InitializableMixin):
    """
    Base class of all platform implementations.

    A platform prepares, deploys, and deletes test environments, and declares
    which Feature types it supports. Concrete platforms override the
    ``_prepare_environment`` / ``_deploy_environment`` / ``_delete_environment``
    hooks below.
    """

    def __init__(self, runbook: schema.Platform) -> None:
        super().__init__(runbook)
        self._log = get_logger("", self.type_name())

        # register this instance so its @hookimpl methods (information
        # collection hooks below) are invoked by the plugin manager.
        plugin_manager.register(self)
[docs] @classmethod def type_schema(cls) -> Type[schema.TypedSchema]: return schema.Platform
[docs] @classmethod def supported_features(cls) -> List[Type[Feature]]: """ Indicates which feature classes should be used to instance a feature. For example, StartStop needs platform implementation, and LISA doesn't know which type uses to start/stop for Azure. So Azure platform needs to return a type like azure.StartStop. The azure.StartStop use same feature string as lisa.features.StartStop. When test cases reference a feature by string, it can be instanced to azure.StartStop. """ raise NotImplementedError()
[docs] def _initialize(self, *args: Any, **kwargs: Any) -> None: """ platform specified initialization """ pass
[docs] def _prepare_environment(self, environment: Environment, log: Logger) -> bool: """ Steps to prepare an environment. 1. check if platform can meet requirement of this environment. 2. if #1 is yes, specified platform context, so that the environment can be created in deploy phase with same spec as prepared. 3. set cost for environment priority. return True, if environment can be deployed. False, if cannot. """ raise NotImplementedError()
[docs] def _deploy_environment(self, environment: Environment, log: Logger) -> None: raise NotImplementedError()
[docs] def _delete_environment(self, environment: Environment, log: Logger) -> None: raise NotImplementedError()
[docs] def _get_environment_information(self, environment: Environment) -> Dict[str, str]: return {}
[docs] def _get_node_information(self, node: Node) -> Dict[str, str]: return {}
[docs] def _cleanup(self) -> None: """ Called when the platform is being discarded. Perform any platform level cleanup work here. """ pass
[docs] @hookimpl def get_environment_information(self, environment: Environment) -> Dict[str, str]: assert environment.platform if environment.platform != self: # when multiple platforms are created by multiple runners, it should # call for right platform only. return {} information: Dict[str, str] = {} information["platform"] = environment.platform.type_name() try: information.update( self._get_environment_information(environment=environment) ) except Exception as identifier: self._log.exception( "failed to get environment information on platform", exc_info=identifier ) return information
[docs] @hookimpl def get_node_information(self, node: Node) -> Dict[str, str]: information: Dict[str, str] = {} try: information.update(self._get_node_information(node=node)) except Exception as identifier: self._log.exception( "failed to get node information on platform", exc_info=identifier ) return information
[docs] def prepare_environment(self, environment: Environment) -> Environment: """ return prioritized environments. user defined environment is higher priority than test cases, and then lower cost is prior to higher. """ log = get_logger(f"prepare[{}]", parent=self._log) # check and fill connection information for RemoteNode. So that the # RemoteNodes can share the same connection information with created # nodes. platform_runbook = cast(schema.Platform, self.runbook) for node in environment.nodes.list(): if isinstance(node, RemoteNode): node.set_connection_info_by_runbook( default_username=platform_runbook.admin_username, default_password=platform_runbook.admin_password, default_private_key_file=platform_runbook.admin_private_key_file, ) try: is_success = self._prepare_environment(environment, log) except NotMeetRequirementException as identifier: raise SkippedException(identifier) if is_success: environment.status = EnvironmentStatus.Prepared else: raise LisaException( f"no capability found for environment: {environment.runbook}" ) return environment
[docs] def deploy_environment(self, environment: Environment) -> None: platform_runbook = cast(schema.Platform, self.runbook) log = get_logger(f"deploy[{}]", parent=self._log)"deploying environment: {}") timer = create_timer() environment.platform = self self._deploy_environment(environment, log) environment.status = EnvironmentStatus.Deployed # initialize features # features may need platform, so create it in platform for node in environment.nodes.list(): node.features = Features(node, self) node.capture_azure_information = platform_runbook.capture_azure_information node.capture_boot_time = platform_runbook.capture_boot_time node.capture_kernel_config = ( platform_runbook.capture_kernel_config_information ) if platform_runbook.guest_enabled: self._initialize_guest_nodes(node)"deployed in {timer}")
[docs] def delete_environment(self, environment: Environment) -> None: log = get_logger(f"del[{}]", parent=self._log) # mark environment is deleted firstly, if there is any error on # deleting, it should be ignored. environment.status = EnvironmentStatus.Deleted environment.cleanup() if self.runbook.keep_environment == constants.ENVIRONMENT_KEEP_ALWAYS: f"skipped to delete environment {}, " f"as runbook set to keep environment." ) # output addresses for troubleshooting easier. remote_addresses = [ x.connection_info[constants.ENVIRONMENTS_NODES_REMOTE_ADDRESS] for x in environment.nodes.list() if isinstance(x, RemoteNode) and hasattr(x, "_connection_info") ] # if the connection info is not found, there is no ip address to # output. if remote_addresses:"node ip addresses: {remote_addresses}") else: log.debug("deleting") self._delete_environment(environment, log)
[docs] def cleanup(self) -> None: self._cleanup()
[docs] def _initialize_guest_nodes(self, node: Node) -> None: platform_runbook = cast(schema.Platform, self.runbook) if not platform_runbook.guests: raise LisaException( "guests must be defined in the platform runbook, " "when guest_enabled is True." ) for guest_runbook in platform_runbook.guests: guest_runbook = cast(schema.GuestNode, guest_runbook) # follow parent capability, so it can pass requirements validations. guest_runbook.capability = node.capability guest_node = node.create( index=len(node.guests), runbook=guest_runbook, logger_name="g", parent=node, ) node.guests.append(guest_node)
def load_platform(platforms_runbook: List[schema.Platform]) -> Platform:
    """
    Instantiate the configured platform from its runbook entries.

    Raises:
        LisaException: when the runbook does not define exactly one platform.
    """
    log = _get_init_logger()
    # we may extend it later to support multiple platforms
    platform_count = len(platforms_runbook)
    if platform_count != 1:
        raise LisaException("There must be 1 and only 1 platform")

    factory = subclasses.Factory[Platform](Platform)
    default_platform: Platform = factory.create_by_runbook(
        runbook=platforms_runbook[0]
    )
    log.debug(f"activated platform '{default_platform.type_name()}'")

    return default_platform


def load_platform_from_builder(runbook_builder: RunbookBuilder) -> Platform:
    """Resolve the platform section of a runbook and load the platform."""
    platform_runbook_data = runbook_builder.partial_resolve(constants.PLATFORM)
    platform_runbook = schema.load_by_type_many(
        schema.Platform, platform_runbook_data
    )
    platform = load_platform(platform_runbook)

    return platform