import kubernetes
from ocp_resources.constants import PROTOCOL_ERROR_EXCEPTION_DICT, TIMEOUT_4MINUTES
from ocp_resources.resource import NamespacedResource
from ocp_resources.utils import TimeoutSampler
[docs]class DaemonSet(NamespacedResource):
"""
DaemonSet object.
"""
api_group = NamespacedResource.ApiGroup.APPS
[docs] def wait_until_deployed(self, timeout=TIMEOUT_4MINUTES):
"""
Wait until all Pods are deployed and ready.
Args:
timeout (int): Time to wait for the Daemonset.
Raises:
TimeoutExpiredError: If not all the pods are deployed.
"""
self.logger.info(f"Wait for {self.kind} {self.name} to deploy all desired pods")
samples = TimeoutSampler(
wait_timeout=timeout,
sleep=1,
exceptions_dict=PROTOCOL_ERROR_EXCEPTION_DICT,
func=self.api.get,
field_selector=f"metadata.name=={self.name}",
namespace=self.namespace,
)
for sample in samples:
if sample.items:
status = sample.items[0].status
desired_number_scheduled = status.desiredNumberScheduled
number_ready = status.numberReady
if (
desired_number_scheduled > 0
and desired_number_scheduled == number_ready
):
return
[docs] def delete(self, wait=False, timeout=TIMEOUT_4MINUTES, body=None):
"""
Delete Daemonset
Args:
wait (bool): True to wait for Daemonset to be deleted.
timeout (int): Time to wait for resource deletion
body (dict): Content to send for delete()
Returns:
bool: True if delete succeeded, False otherwise.
"""
return super().delete(
wait=wait,
timeout=timeout,
body=kubernetes.client.V1DeleteOptions(propagation_policy="Foreground"),
)