Source code for ocp_resources.virtual_machine

# -*- coding: utf-8 -*-


from ocp_resources.constants import (
    DEFAULT_CLUSTER_RETRY_EXCEPTIONS,
    PROTOCOL_ERROR_EXCEPTION_DICT,
    TIMEOUT_4MINUTES,
)
from ocp_resources.resource import NamespacedResource
from ocp_resources.utils import TimeoutSampler
from ocp_resources.virtual_machine_instance import VirtualMachineInstance


[docs]class VirtualMachine(NamespacedResource): """ Virtual Machine object, inherited from Resource. Implements actions start / stop / status / wait for VM status / is running """ api_group = NamespacedResource.ApiGroup.KUBEVIRT_IO
[docs] class RunStrategy: MANUAL = "Manual" HALTED = "Halted" ALWAYS = "Always" RERUNONFAILURE = "RerunOnFailure"
[docs] class Status(NamespacedResource.Status): MIGRATING = "Migrating" PAUSED = "Paused" PROVISIONING = "Provisioning" STARTING = "Starting" STOPPED = "Stopped" STOPPING = "Stopping" WAITING_FOR_VOLUME_BINDING = "WaitingForVolumeBinding" ERROR_UNSCHEDULABLE = "ErrorUnschedulable" DATAVOLUME_ERROR = "DataVolumeError" ERROR_PVC_NOT_FOUND = "ErrorPvcNotFound" IMAGE_PULL_BACK_OFF = "ImagePullBackOff" ERR_IMAGE_PULL = "ErrImagePull" CRASH_LOOPBACK_OFF = "CrashLoopBackOff"
def __init__( self, name=None, namespace=None, client=None, body=None, teardown=True, privileged_client=None, yaml_file=None, delete_timeout=TIMEOUT_4MINUTES, **kwargs, ): super().__init__( name=name, namespace=namespace, client=client, teardown=teardown, privileged_client=privileged_client, yaml_file=yaml_file, delete_timeout=delete_timeout, **kwargs, ) self.body = body @property def _subresource_api_url(self): return ( f"{self.client.configuration.host}/" f"apis/subresources.kubevirt.io/{self.api.api_version}/" f"namespaces/{self.namespace}/virtualmachines/{self.name}" )
[docs] def api_request(self, method, action, **params): return super().api_request( method=method, action=action, url=self._subresource_api_url, **params )
[docs] def to_dict(self): super().to_dict() if not self.yaml_file: body_spec = self.body.get("spec") if self.body else None self.res["spec"] = body_spec or {"template": {"spec": {}}}
[docs] def start(self, timeout=TIMEOUT_4MINUTES, wait=False): self.api_request(method="PUT", action="start") if wait: return self.wait_for_ready_status(timeout=timeout, status=True)
[docs] def restart(self, timeout=TIMEOUT_4MINUTES, wait=False): self.api_request(method="PUT", action="restart") if wait: self.vmi.virt_launcher_pod.wait_deleted() return self.vmi.wait_until_running(timeout=timeout, stop_status="dummy")
[docs] def stop( self, timeout=TIMEOUT_4MINUTES, vmi_delete_timeout=TIMEOUT_4MINUTES, wait=False ): self.api_request(method="PUT", action="stop") if wait: self.wait_for_ready_status(timeout=timeout, status=None) return self.vmi.wait_deleted(timeout=vmi_delete_timeout)
[docs] def wait_for_ready_status(self, status, timeout=TIMEOUT_4MINUTES, sleep=1): """ Wait for VM resource ready status to be at desire status Args: status (any): True for a running VM, None for a stopped VM. timeout (int): Time to wait for the resource. Raises: TimeoutExpiredError: If timeout reached. """ self.logger.info( f"Wait for {self.kind} {self.name} status to be" f" {'ready' if status == True else status}" ) samples = TimeoutSampler( wait_timeout=timeout, sleep=sleep, exceptions_dict={ **PROTOCOL_ERROR_EXCEPTION_DICT, **DEFAULT_CLUSTER_RETRY_EXCEPTIONS, }, func=self.api.get, field_selector=f"metadata.name=={self.name}", namespace=self.namespace, ) for sample in samples: if sample.items and self.ready == status: # VM with runStrategy does not have spec.running attribute # VM status should be taken from spec.status.ready return
[docs] def get_interfaces(self): return self.instance.spec.template.spec.domain.devices.interfaces
@property def vmi(self): """ Get VMI Returns: VirtualMachineInstance: VMI """ return VirtualMachineInstance( client=self.client, name=self.name, namespace=self.namespace, privileged_client=self.privileged_client or self.client, ) @property def ready(self): """ Get VM status Returns: True if Running else None """ return self.instance.get("status", {}).get("ready") @property def printable_status(self): """ Get VM printableStatus Returns: VM printableStatus if VM.status.printableStatus else None """ return self.instance.get("status", {}).get("printableStatus")
[docs] def wait_for_status_none(self, status, timeout=TIMEOUT_4MINUTES): self.logger.info(f"Wait for {self.kind} {self.name} status {status} to be None") for sample in TimeoutSampler( wait_timeout=timeout, sleep=1, exceptions_dict=PROTOCOL_ERROR_EXCEPTION_DICT, func=lambda: self.instance.get("status", {}).get(status), ): if sample is None: return