# -*- coding: utf-8 -*-
from ocp_resources.constants import PROTOCOL_ERROR_EXCEPTION_DICT, TIMEOUT_4MINUTES
from ocp_resources.resource import NamespacedResource
from ocp_resources.utils import TimeoutExpiredError, TimeoutSampler
from ocp_resources.virtual_machine import VirtualMachine
def _map_mappings(mappings):
mappings_list = []
for mapping in mappings:
mapping_dict = {"target": {"name": mapping.target_name}}
if mapping.target_namespace:
mapping_dict["target"]["namespace"] = mapping.target_namespace
if mapping.target_type:
mapping_dict["type"] = mapping.target_type
if mapping.source_id:
mapping_dict.setdefault("source", {})["id"] = mapping.source_id
if mapping.source_name:
mapping_dict.setdefault("source", {})["name"] = mapping.source_name
if mapping.target_access_modes:
mapping_dict["accessMode"] = mapping.target_access_modes
if mapping.target_volume_mode:
mapping_dict["volumeMode"] = mapping.target_volume_mode
mappings_list.append(mapping_dict)
return mappings_list
[docs]class VirtualMachineImport(NamespacedResource):
"""
Virtual Machine Import object, inherited from NamespacedResource.
"""
api_group = NamespacedResource.ApiGroup.V2V_KUBEVIRT_IO
[docs] class Condition(NamespacedResource.Condition):
SUCCEEDED = "Succeeded"
VALID = "Valid"
MAPPING_RULES_VERIFIED = "MappingRulesVerified"
PROCESSING = "Processing"
[docs] class ValidConditionReason:
"""
Valid condition reason object
"""
VALIDATION_COMPLETED = "ValidationCompleted"
SECRET_NOT_FOUND = "SecretNotFound" # pragma: allowlist secret
RESOURCE_MAPPING_NOT_FOUND = "ResourceMappingNotFound"
UNINITIALIZED_PROVIDER = "UninitializedProvider"
SOURCE_VM_NOT_FOUND = "SourceVMNotFound"
INCOMPLETE_MAPPING_RULES = "IncompleteMappingRules"
[docs] class MappingRulesConditionReason:
"""
Mapping rules verified condition reason object
"""
MAPPING_COMPLETED = "MappingRulesVerificationCompleted"
MAPPING_FAILED = "MappingRulesVerificationFailed"
MAPPING_REPORTED_WARNINGS = "MappingRulesVerificationReportedWarnings"
[docs] class ProcessingConditionReason:
"""
Processing condition reason object
"""
CREATING_TARGET_VM = "CreatingTargetVM"
COPYING_DISKS = "CopyingDisks"
COMPLETED = "ProcessingCompleted"
FAILED = "ProcessingFailed"
[docs] class SucceededConditionReason:
"""
Succeeced cond reason object
"""
VALIDATION_FAILED = "ValidationFailed"
VM_CREATION_FAILED = "VMCreationFailed"
DATAVOLUME_CREATION_FAILED = "DataVolumeCreationFailed"
VIRTUAL_MACHINE_READY = "VirtualMachineReady"
VIRTUAL_MACHINE_RUNNING = "VirtualMachineRunning"
VMTEMPLATE_MATCHING_FAILED = "VMTemplateMatchingFailed"
def __init__(
self,
name=None,
namespace=None,
provider_credentials_secret_name=None,
provider_type=None,
provider_credentials_secret_namespace=None,
client=None,
teardown=True,
vm_id=None,
vm_name=None,
cluster_id=None,
cluster_name=None,
target_vm_name=None,
start_vm=False,
provider_mappings=None,
resource_mapping_name=None,
resource_mapping_namespace=None,
warm=False,
finalize_date=None,
privileged_client=None,
yaml_file=None,
delete_timeout=TIMEOUT_4MINUTES,
**kwargs,
):
super().__init__(
name=name,
namespace=namespace,
client=client,
teardown=teardown,
privileged_client=privileged_client,
yaml_file=yaml_file,
delete_timeout=delete_timeout,
**kwargs,
)
self.vm_id = vm_id
self.vm_name = vm_name
self.cluster_id = cluster_id
self.cluster_name = cluster_name
self.target_vm_name = target_vm_name
self.start_vm = start_vm
self.provider_credentials_secret_name = provider_credentials_secret_name
self.provider_credentials_secret_namespace = (
provider_credentials_secret_namespace
)
self.provider_mappings = provider_mappings
self.resource_mapping_name = resource_mapping_name
self.resource_mapping_namespace = resource_mapping_namespace
self.provider_type = provider_type
self.warm = warm
self.finalize_date = finalize_date
@property
def vm(self):
return VirtualMachine(
name=self.target_vm_name,
namespace=self.namespace,
client=self.client,
privileged_client=self.privileged_client or self.client,
)
[docs] def to_dict(self):
super().to_dict()
if not self.yaml_file:
spec = self.res.setdefault("spec", {})
secret = spec.setdefault("providerCredentialsSecret", {})
secret["name"] = self.provider_credentials_secret_name
if self.provider_credentials_secret_namespace:
secret["namespace"] = self.provider_credentials_secret_namespace
if self.resource_mapping_name:
spec.setdefault("resourceMapping", {})[
"name"
] = self.resource_mapping_name
if self.resource_mapping_namespace:
spec.setdefault("resourceMapping", {})[
"namespace"
] = self.resource_mapping_namespace
if self.target_vm_name:
spec["targetVmName"] = self.target_vm_name
if self.start_vm is not None:
spec["startVm"] = self.start_vm
if self.warm:
spec["warm"] = self.warm
if self.finalize_date:
spec["finalizeDate"] = self.finalize_date.strftime(
format="%Y-%m-%dT%H:%M:%SZ"
)
provider_source = spec.setdefault("source", {}).setdefault(
self.provider_type, {}
)
vm = provider_source.setdefault("vm", {})
if self.vm_id:
vm["id"] = self.vm_id
if self.vm_name:
vm["name"] = self.vm_name
if self.cluster_id:
vm.setdefault("cluster", {})["id"] = self.cluster_id
if self.cluster_name:
vm.setdefault("cluster", {})["name"] = self.cluster_name
if self.provider_mappings:
if self.provider_mappings.disk_mappings:
mappings = _map_mappings(
mappings=self.provider_mappings.disk_mappings
)
provider_source.setdefault("mappings", {}).setdefault(
"diskMappings", mappings
)
if self.provider_mappings.network_mappings:
mappings = _map_mappings(
mappings=self.provider_mappings.network_mappings
)
provider_source.setdefault("mappings", {}).setdefault(
"networkMappings", mappings
)
if self.provider_mappings.storage_mappings:
mappings = _map_mappings(
mappings=self.provider_mappings.storage_mappings
)
provider_source.setdefault("mappings", {}).setdefault(
"storageMappings", mappings
)
[docs] def wait(
self,
timeout=600,
cond_reason=SucceededConditionReason.VIRTUAL_MACHINE_READY,
cond_status=Condition.Status.TRUE,
cond_type=Condition.SUCCEEDED,
):
self.logger.info(
f"Wait for {self.kind} {self.name} {cond_reason} condition to be"
f" {cond_status}"
)
samples = TimeoutSampler(
wait_timeout=timeout,
sleep=1,
exceptions_dict=PROTOCOL_ERROR_EXCEPTION_DICT,
func=self.api.get,
field_selector=f"metadata.name=={self.name}",
namespace=self.namespace,
)
last_condition = None
try:
for sample in samples:
if sample.items:
sample_status = sample.items[0].status
if sample_status:
current_conditions = sample_status.conditions
for cond in current_conditions:
last_condition = cond
if (
cond.type == cond_type
and cond.status == cond_status
and cond.reason == cond_reason
):
msg = (
f"Status of {self.kind} {self.name} {cond.type} is "
f"{cond.status} ({cond.reason}: {cond.message})"
)
self.logger.info(msg)
return
except TimeoutExpiredError:
raise TimeoutExpiredError(
f"Last condition of {self.kind} {self.name} {last_condition.type} was"
f" {last_condition.status} ({last_condition.reason}:"
f" {last_condition.message})"
)
[docs]class ResourceMapping(NamespacedResource):
"""
ResourceMapping object.
"""
api_group = NamespacedResource.ApiGroup.V2V_KUBEVIRT_IO
def __init__(
self,
name=None,
namespace=None,
mapping=None,
client=None,
teardown=True,
yaml_file=None,
**kwargs,
):
super().__init__(
name=name,
namespace=namespace,
client=client,
teardown=teardown,
yaml_file=yaml_file,
**kwargs,
)
self.mapping = mapping
[docs] def to_dict(self):
super().to_dict()
if not self.yaml_file:
for provider, mapping in self.mapping.items():
res_provider_section = self.res.setdefault("spec", {}).setdefault(
provider, {}
)
if mapping.network_mappings is not None:
res_provider_section.setdefault(
"networkMappings",
_map_mappings(mappings=mapping.network_mappings),
)
if mapping.storage_mappings is not None:
res_provider_section.setdefault(
"storageMappings",
_map_mappings(mappings=mapping.storage_mappings),
)