Source code for ocp_resources.pod

import json

import kubernetes

from ocp_resources.constants import TIMEOUT_4MINUTES
from ocp_resources.node import Node
from ocp_resources.resource import NamespacedResource
from ocp_resources.utils import TimeoutWatch


[docs]class ExecOnPodError(Exception): def __init__(self, command, rc, out, err): self.cmd = command self.rc = rc self.out = out self.err = err def __str__(self): return ( "Command execution failure: " f"{self.cmd}, " f"RC: {self.rc}, " f"OUT: {self.out}, " f"ERR: {self.err}" )
[docs]class Pod(NamespacedResource): """ Pod object, inherited from Resource. """ api_version = NamespacedResource.ApiVersion.V1
[docs] class Status(NamespacedResource.Status): CRASH_LOOPBACK_OFF = "CrashLoopBackOff" IMAGE_PULL_BACK_OFF = "ImagePullBackOff" ERR_IMAGE_PULL = "ErrImagePull"
def __init__( self, name=None, namespace=None, client=None, teardown=True, privileged_client=None, yaml_file=None, delete_timeout=TIMEOUT_4MINUTES, **kwargs, ): super().__init__( name=name, namespace=namespace, client=client, teardown=teardown, privileged_client=privileged_client, yaml_file=yaml_file, delete_timeout=delete_timeout, **kwargs, ) @property def containers(self): """ Get Pod containers Returns: list: List of Pod containers """ return self.instance.spec.containers
[docs] def execute(self, command, timeout=60, container=None, ignore_rc=False): """ Run command on Pod Args: command (list): Command to run. timeout (int): Time to wait for the command. container (str): Container name where to exec the command. ignore_rc (bool): If True ignore error rc from the shell and return out. Returns: str: Command output. Raises: ExecOnPodError: If the command failed. """ error_channel = {} stream_closed_error = "stream resp is closed" self.logger.info(f"Execute {command} on {self.name} ({self.node.name})") resp = kubernetes.stream.stream( api_method=self._kube_v1_api.connect_get_namespaced_pod_exec, name=self.name, namespace=self.namespace, command=command, container=container or self.containers[0].name, stderr=True, stdin=False, stdout=True, tty=False, _preload_content=False, ) timeout_watch = TimeoutWatch(timeout=timeout) while resp.is_open(): resp.run_forever(timeout=2) try: error_channel = json.loads( resp.read_channel(kubernetes.stream.ws_client.ERROR_CHANNEL) ) break except json.decoder.JSONDecodeError: # Check remaining time, in order to throw exception # if remaining time reached zero if timeout_watch.remaining_time() <= 0: raise ExecOnPodError( command=command, rc=-1, out="", err=stream_closed_error ) rcstring = error_channel.get("status") if rcstring is None: raise ExecOnPodError( command=command, rc=-1, out="", err=stream_closed_error ) stdout = resp.read_stdout(timeout=5) stderr = resp.read_stderr(timeout=5) if rcstring == "Success" or ignore_rc: return stdout if rcstring == "Failure": raise ExecOnPodError(command=command, rc=-1, out="", err=error_channel) returncode = [ int(cause["message"]) for cause in error_channel["details"]["causes"] if cause["reason"] == "ExitCode" ][0] raise ExecOnPodError(command=command, rc=returncode, out=stdout, err=stderr)
[docs] def log(self, **kwargs): """ Get Pod logs Returns: str: Pod logs. """ return self._kube_v1_api.read_namespaced_pod_log( name=self.name, namespace=self.namespace, **kwargs )
@property def node(self): """ Get the node name where the Pod is running Returns: Node: Node """ node_name = self.instance.spec.nodeName assert node_name, f"Node not found for pod {self.name}" return Node( client=self.privileged_client or self.client, name=node_name, ) @property def ip(self): return self.instance.status.podIP @property def _kube_v1_api(self): return kubernetes.client.CoreV1Api(api_client=self.client.client)