from ocp_resources.constants import (
TIMEOUT_1MINUTE,
TIMEOUT_2MINUTES,
TIMEOUT_4MINUTES,
TIMEOUT_10MINUTES,
TIMEOUT_10SEC,
)
from ocp_resources.persistent_volume_claim import PersistentVolumeClaim
from ocp_resources.resource import NamespacedResource, Resource
from ocp_resources.utils import TimeoutExpiredError, TimeoutSampler
[docs]class DataVolume(NamespacedResource):
"""
DataVolume object.
"""
api_group = NamespacedResource.ApiGroup.CDI_KUBEVIRT_IO
[docs] class Status(NamespacedResource.Status):
BLANK = "Blank"
PVC_BOUND = "PVCBound"
IMPORT_SCHEDULED = "ImportScheduled"
ClONE_SCHEDULED = "CloneScheduled"
UPLOAD_SCHEDULED = "UploadScheduled"
IMPORT_IN_PROGRESS = "ImportInProgress"
CLONE_IN_PROGRESS = "CloneInProgress"
UPLOAD_IN_PROGRESS = "UploadInProgress"
SNAPSHOT_FOR_SMART_CLONE_IN_PROGRESS = "SnapshotForSmartCloneInProgress"
SMART_CLONE_PVC_IN_PROGRESS = "SmartClonePVCInProgress"
UPLOAD_READY = "UploadReady"
UNKNOWN = "Unknown"
WAIT_FOR_FIRST_CONSUMER = "WaitForFirstConsumer"
PENDING_POPULATION = "PendingPopulation"
[docs] class AccessMode:
"""
AccessMode object.
"""
RWO = "ReadWriteOnce"
ROX = "ReadOnlyMany"
RWX = "ReadWriteMany"
[docs] class ContentType:
"""
ContentType object
"""
KUBEVIRT = "kubevirt"
ARCHIVE = "archive"
[docs] class VolumeMode:
"""
VolumeMode object
"""
BLOCK = "Block"
FILE = "Filesystem"
[docs] class Condition:
[docs] class Type:
READY = "Ready"
BOUND = "Bound"
RUNNING = "Running"
[docs] class Status(Resource.Condition.Status):
UNKNOWN = "Unknown"
def __init__(
self,
name=None,
namespace=None,
source=None,
size=None,
storage_class=None,
url=None,
content_type=ContentType.KUBEVIRT,
access_modes=None,
cert_configmap=None,
secret=None,
client=None,
volume_mode=None,
hostpath_node=None,
source_pvc=None,
source_namespace=None,
multus_annotation=None,
bind_immediate_annotation=None,
preallocation=None,
teardown=True,
privileged_client=None,
yaml_file=None,
delete_timeout=TIMEOUT_4MINUTES,
api_name="pvc",
delete_after_completion=None,
**kwargs,
):
"""
DataVolume object
Args:
name (str): DataVolume name.
namespace (str): DataVolume namespace.
source (str): source of DV - upload/http/pvc/registry.
size (str): DataVolume size - format size+size unit, for example: "5Gi".
storage_class (str, default: None): storage class name for DataVolume.
url (str, default: None): url for importing DV, when source is http/registry.
content_type (str, default: "kubevirt"): DataVolume content type.
access_modes (str, default: None): DataVolume access mode.
cert_configmap (str, default: None): name of config map for TLS certificates.
secret (Secret, default: None): to be set as secretRef.
client (DynamicClient): DynamicClient to use.
volume_mode (str, default: None): DataVolume volume mode.
hostpath_node (str, default: None): Node name to provision the DV on.
source_pvc (str, default: None): PVC name for when cloning the DV.
source_namespace (str, default: None): PVC namespace for when cloning the DV.
multus_annotation (str, default: None): network nad name.
bind_immediate_annotation (bool, default: None): when WaitForFirstConsumer is set in StorageClass and DV
should be bound immediately.
preallocation (bool, default: None): preallocate disk space.
teardown (bool, default: True): Indicates if this resource would need to be deleted.
privileged_client (DynamicClient, default: None): Instance of Dynamic client
yaml_file (yaml, default: None): yaml file for the resource.
delete_timeout (int, default: 4 minutes): timeout associated with delete action.
api_name (str, default: "pvc"): api used for DV, pvc/storage
delete_after_completion (str, default: None): annotation for garbage collector - "true"/"false"
"""
super().__init__(
name=name,
namespace=namespace,
client=client,
teardown=teardown,
privileged_client=privileged_client,
yaml_file=yaml_file,
delete_timeout=delete_timeout,
**kwargs,
)
self.source = source
self.url = url
self.cert_configmap = cert_configmap
self.secret = secret
self.content_type = content_type
self.size = size
self.access_modes = access_modes
self.storage_class = storage_class
self.volume_mode = volume_mode
self.hostpath_node = hostpath_node
self.source_pvc = source_pvc
self.source_namespace = source_namespace
self.multus_annotation = multus_annotation
self.bind_immediate_annotation = bind_immediate_annotation
self.preallocation = preallocation
self.api_name = api_name
self.delete_after_completion = delete_after_completion
[docs] def to_dict(self):
super().to_dict()
if not self.yaml_file:
self.res.update(
{
"spec": {
"source": {self.source: {"url": self.url}},
self.api_name: {
"resources": {"requests": {"storage": self.size}},
},
}
}
)
if self.access_modes:
self.res["spec"][self.api_name]["accessModes"] = [self.access_modes]
if self.content_type:
self.res["spec"]["contentType"] = self.content_type
if self.storage_class:
self.res["spec"][self.api_name]["storageClassName"] = self.storage_class
if self.secret:
self.res["spec"]["source"][self.source]["secretRef"] = self.secret.name
if self.volume_mode:
self.res["spec"][self.api_name]["volumeMode"] = self.volume_mode
if self.source == "http" or "registry":
self.res["spec"]["source"][self.source]["url"] = self.url
if self.cert_configmap:
self.res["spec"]["source"][self.source][
"certConfigMap"
] = self.cert_configmap
if self.source == "upload" or self.source == "blank":
self.res["spec"]["source"][self.source] = {}
if self.hostpath_node:
self.res["metadata"].setdefault("annotations", {}).update(
{
f"{NamespacedResource.ApiGroup.KUBEVIRT_IO}/provisionOnNode": (
self.hostpath_node
)
}
)
if self.multus_annotation:
self.res["metadata"].setdefault("annotations", {}).update(
{
f"{NamespacedResource.ApiGroup.K8S_V1_CNI_CNCF_IO}/networks": (
self.multus_annotation
)
}
)
if self.bind_immediate_annotation:
self.res["metadata"].setdefault("annotations", {}).update(
{f"{self.api_group}/storage.bind.immediate.requested": "true"}
)
if self.source == "pvc":
self.res["spec"]["source"]["pvc"] = {
"name": self.source_pvc or "dv-source",
"namespace": self.source_namespace or self.namespace,
}
if self.preallocation is not None:
self.res["spec"]["preallocation"] = self.preallocation
if self.delete_after_completion:
self.res["metadata"].setdefault("annotations", {}).update(
{
f"{self.api_group}/storage.deleteAfterCompletion": (
self.delete_after_completion
)
}
)
[docs] def wait_deleted(self, timeout=TIMEOUT_4MINUTES):
"""
Wait until DataVolume and the PVC created by it are deleted
Args:
timeout (int): Time to wait for the DataVolume and PVC to be deleted.
Returns:
bool: True if DataVolume and its PVC are gone, False if timeout reached.
"""
super().wait_deleted(timeout=timeout)
return self.pvc.wait_deleted(timeout=timeout)
[docs] def wait(self, timeout=TIMEOUT_10MINUTES, failure_timeout=TIMEOUT_2MINUTES):
self._check_none_pending_status(failure_timeout=failure_timeout)
# If DV's status is not Pending, continue with the flow
self.wait_for_status(status=self.Status.SUCCEEDED, timeout=timeout)
self.pvc.wait_for_status(
status=PersistentVolumeClaim.Status.BOUND, timeout=timeout
)
@property
def pvc(self):
return PersistentVolumeClaim(
client=self.privileged_client or self.client,
name=self.name,
namespace=self.namespace,
)
@property
def scratch_pvc(self):
scratch_pvc_prefix = (
self.pvc.prime_pvc.name if self.pvc.use_populator else self.name
)
return PersistentVolumeClaim(
name=f"{scratch_pvc_prefix}-scratch",
namespace=self.namespace,
client=self.client,
)
def _check_none_pending_status(self, failure_timeout=TIMEOUT_2MINUTES):
# Avoid waiting for "Succeeded" status if DV's in Pending/None status
sample = None
# if garbage collector is enabled, DV will be deleted after success
try:
for sample in TimeoutSampler(
wait_timeout=failure_timeout,
sleep=TIMEOUT_10SEC,
func=lambda: self.exists,
):
# If DV status is Pending (or Status is not yet updated) continue to wait, else exit the wait loop
if sample and (
not sample.status
or sample.status.phase
in [
self.Status.PENDING,
None,
]
):
continue
break
except TimeoutExpiredError:
self.logger.error(f"{self.name} status is {sample}")
raise
[docs] def wait_for_dv_success(
self,
timeout=TIMEOUT_10MINUTES,
failure_timeout=TIMEOUT_2MINUTES,
stop_status_func=None,
*stop_status_func_args,
**stop_status_func_kwargs,
):
"""
Wait until DataVolume succeeded with or without DV Garbage Collection enabled
Args:
timeout (int): Time to wait for the DataVolume to succeed.
failure_timeout (int): Time to wait for the DataVolume to have not Pending/None status
stop_status_func (function): function that is called inside the TimeoutSampler
if it returns True - stop the Sampler and raise TimeoutExpiredError
Example:
def dv_is_not_progressing(dv):
return True if dv.instance.status.conditions.restartCount > 3 else False
def test_dv():
...
stop_status_func_kwargs = {"dv": dv}
dv.wait_for_dv_success(stop_status_func=dv_is_not_progressing, **stop_status_func_kwargs)
Returns:
bool: True if DataVolume succeeded.
"""
self.logger.info(f"Wait DV success for {timeout} seconds")
self._check_none_pending_status(failure_timeout=failure_timeout)
sample = None
status_of_dv_str = (
f"Status of {self.kind} '{self.name}' in namespace '{self.namespace}':\n"
)
try:
for sample in TimeoutSampler(
sleep=1,
wait_timeout=timeout,
func=lambda: self.exists,
):
# DV reach to success if the status is succeeded or if the DV does not exist
if sample is None or sample.status.phase == self.Status.SUCCEEDED:
break
elif stop_status_func and stop_status_func(
*stop_status_func_args, **stop_status_func_kwargs
):
raise TimeoutExpiredError(
value=(
"Exited on the stop_status_func"
f" {stop_status_func.__name__}."
f" {status_of_dv_str} {sample.status}"
)
)
except TimeoutExpiredError:
self.logger.error(f"{status_of_dv_str} {sample.status}")
raise
# For CSI storage, PVC gets Bound after DV succeeded
return self.pvc.wait_for_status(
status=PersistentVolumeClaim.Status.BOUND, timeout=TIMEOUT_1MINUTE
)
[docs] def delete(self, wait=False, timeout=TIMEOUT_4MINUTES, body=None):
"""
Delete DataVolume
Args:
wait (bool): True to wait for DataVolume and PVC to be deleted.
timeout (int): Time to wait for resources deletion
body (dict): Content to send for delete()
Returns:
bool: True if delete succeeded, False otherwise.
"""
# if garbage collector is enabled, DV will be deleted after success
if self.exists:
return super().delete(wait=wait, timeout=timeout, body=body)
else:
return self.pvc.delete(wait=wait, timeout=timeout, body=body)