# -*- coding: utf-8 -*-
from ocp_resources.constants import PROTOCOL_ERROR_EXCEPTION_DICT, TIMEOUT_4MINUTES
from ocp_resources.resource import NamespacedResource
from ocp_resources.utils import TimeoutSampler, TimeoutWatch
[docs]class Deployment(NamespacedResource):
"""
OpenShift Deployment object.
"""
api_group = NamespacedResource.ApiGroup.APPS
[docs] def scale_replicas(self, replica_count=int):
"""
Update replicas in deployment.
Args:
replica_count (int): Number of replicas.
Returns:
Deployment is updated successfully
"""
super().to_dict()
self.res.update({"spec": {"replicas": replica_count}})
self.logger.info(f"Set deployment replicas: {replica_count}")
return self.update(resource_dict=self.res)
[docs] def wait_for_replicas(self, deployed=True, timeout=TIMEOUT_4MINUTES):
"""
Wait until all replicas are updated.
Args:
deployed (bool): True for replicas deployed, False for no replicas.
timeout (int): Time to wait for the deployment.
Raises:
TimeoutExpiredError: If not availableReplicas is equal to replicas.
"""
self.logger.info(f"Wait for {self.kind} {self.name} to be deployed: {deployed}")
timeout_watcher = TimeoutWatch(timeout=timeout)
for sample in TimeoutSampler(
wait_timeout=timeout,
sleep=1,
func=lambda: self.exists,
):
if sample:
break
samples = TimeoutSampler(
wait_timeout=timeout_watcher.remaining_time(),
sleep=1,
exceptions_dict=PROTOCOL_ERROR_EXCEPTION_DICT,
func=lambda: self.instance,
)
for sample in samples:
if sample:
status = sample.status
spec_replicas = sample.spec.replicas
total_replicas = status.replicas or 0
updated_replicas = status.updatedReplicas or 0
available_replicas = status.availableReplicas or 0
ready_replicas = status.readyReplicas or 0
if (
(deployed and spec_replicas)
and spec_replicas
== updated_replicas
== available_replicas
== ready_replicas
) or not (deployed or spec_replicas or total_replicas):
return