Source code for ocp_resources.cdi_config

# -*- coding: utf-8 -*-

from ocp_resources.constants import PROTOCOL_ERROR_EXCEPTION_DICT, TIMEOUT_4MINUTES
from ocp_resources.resource import Resource
from ocp_resources.utils import TimeoutSampler


[docs]class CDIConfig(Resource): """ CDIConfig object. """ api_group = Resource.ApiGroup.CDI_KUBEVIRT_IO @property def scratch_space_storage_class_from_spec(self): return self.instance.spec.scratchSpaceStorageClass @property def scratch_space_storage_class_from_status(self): return self.instance.status.scratchSpaceStorageClass @property def upload_proxy_url(self): return self.instance.status.uploadProxyURL
[docs] def wait_until_upload_url_changed(self, uploadproxy_url, timeout=TIMEOUT_4MINUTES): """ Wait until upload proxy url is changed Args: timeout (int): Time to wait for CDI Config. Returns: bool: True if url is equal to uploadProxyURL. """ self.logger.info( f"Wait for {self.kind} {self.name} to ensure current URL == uploadProxyURL" ) samples = TimeoutSampler( wait_timeout=timeout, sleep=1, exceptions_dict=PROTOCOL_ERROR_EXCEPTION_DICT, func=self.api.get, field_selector=f"metadata.name=={self.name}", ) for sample in samples: if sample.items: status = sample.items[0].status current_url = status.uploadProxyURL if current_url == uploadproxy_url: return