Source code for ocp_resources.resource

import contextlib
import copy
import json
import os
import re
import sys
from io import StringIO
from signal import SIGINT, signal

import kubernetes
import yaml
from benedict import benedict
from kubernetes.dynamic import DynamicClient
from kubernetes.dynamic.exceptions import (
    ConflictError,
    MethodNotAllowedError,
    NotFoundError,
)
from kubernetes.dynamic.resource import ResourceField
from packaging.version import Version
from simple_logger.logger import get_logger

from ocp_resources.constants import (
    DEFAULT_CLUSTER_RETRY_EXCEPTIONS,
    NOT_FOUND_ERROR_EXCEPTION_DICT,
    PROTOCOL_ERROR_EXCEPTION_DICT,
    TIMEOUT_1MINUTE,
    TIMEOUT_4MINUTES,
)
from ocp_resources.event import Event
from ocp_resources.utils import (
    TimeoutExpiredError,
    TimeoutSampler,
    TimeoutWatch,
    skip_existing_resource_creation_teardown,
)

LOGGER = get_logger(__name__)
MAX_SUPPORTED_API_VERSION = "v2"


def _find_supported_resource(dyn_client, api_group, kind):
    results = dyn_client.resources.search(group=api_group, kind=kind)
    sorted_results = sorted(
        results, key=lambda result: KubeAPIVersion(result.api_version), reverse=True
    )
    for result in sorted_results:
        if KubeAPIVersion(result.api_version) <= KubeAPIVersion(
            MAX_SUPPORTED_API_VERSION
        ):
            return result


def _get_api_version(dyn_client, api_group, kind):
    # Returns api_group/api_version
    res = _find_supported_resource(
        dyn_client=dyn_client, api_group=api_group, kind=kind
    )
    if not res:
        log = f"Couldn't find {kind} in {api_group} api group"
        LOGGER.warning(log)
        raise NotImplementedError(log)

    LOGGER.info(f"kind: {kind} api version: {res.group_version}")
    return res.group_version


def get_client(config_file=None, config_dict=None, context=None):
    """
    Get a kubernetes client.

    Pass either config_file or config_dict.
    If none of them are passed, the client will be created from the default OS
    kubeconfig (environment variable or .kube folder).

    Args:
        config_file (str): path to a kubeconfig file.
        config_dict (dict): dict with kubeconfig configuration.
        context (str): name of the context to use.

    Returns:
        DynamicClient: a kubernetes client.
    """
    if config_dict:
        return DynamicClient(
            client=kubernetes.config.new_client_from_config_dict(
                config_dict=config_dict,
                context=context,
            )
        )
    return DynamicClient(
        client=kubernetes.config.new_client_from_config(
            config_file=config_file,
            context=context,
        )
    )

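# A minimal usage sketch (comment only, not executed on import): creating a client
# from the default kubeconfig, from an explicit file, or from an in-memory dict.
# The file path and context name below are illustrative placeholders.
#
#     client = get_client()
#     client = get_client(config_file="~/.kube/config", context="my-context")
#     client = get_client(config_dict=yaml.safe_load(open("kubeconfig.yaml")))
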
def sub_resource_level(current_class, owner_class, parent_class):
    # return the name of the last class in MRO list that is not one of base
    # classes; otherwise return None
    for class_iterator in reversed(
        [
            class_iterator
            for class_iterator in current_class.mro()
            if class_iterator not in owner_class.mro()
            and issubclass(class_iterator, parent_class)
        ]
    ):
        return class_iterator.__name__


class KubeAPIVersion(Version):
    """
    Implement the Kubernetes API versioning scheme from
    https://kubernetes.io/docs/concepts/overview/kubernetes-api/#api-versioning
    """

    component_re = re.compile(r"(\d+ | [a-z]+)", re.VERBOSE)

    def __init__(self, vstring=None):
        self.vstring = vstring
        self.version = None
        super().__init__(version=vstring)

    def parse(self, vstring):
        components = [comp for comp in self.component_re.split(vstring) if comp]
        for idx, obj in enumerate(components):
            with contextlib.suppress(ValueError):
                components[idx] = int(obj)

        errmsg = (
            f"version '{vstring}' does not conform to kubernetes api versioning"
            " guidelines"
        )

        if (
            len(components) not in (2, 4)
            or components[0] != "v"
            or not isinstance(components[1], int)
        ):
            raise ValueError(errmsg)
        if len(components) == 4 and (
            components[2] not in ("alpha", "beta") or not isinstance(components[3], int)
        ):
            raise ValueError(errmsg)

        self.version = components

    def __str__(self):
        return self.vstring

    def __repr__(self):
        return "KubeAPIVersion ('{0}')".format(str(self))

    def _cmp(self, other):
        if isinstance(other, str):
            other = KubeAPIVersion(vstring=other)

        myver = self.version
        otherver = other.version

        for ver in myver, otherver:
            if len(ver) == 2:
                ver.extend(["zeta", 9999])

        if myver == otherver:
            return 0
        if myver < otherver:
            return -1
        if myver > otherver:
            return 1

class ClassProperty:
    def __init__(self, func):
        self.func = func

    def __get__(self, obj, owner):
        return self.func(owner)


class ValueMismatch(Exception):
    """
    Raised when a value doesn't match the class value
    """


class Resource:
    """
    Base class for API resources
    """

    api_group = None
    api_version = None
    singular_name = None
    timeout_seconds = TIMEOUT_1MINUTE

    class Status:
        SUCCEEDED = "Succeeded"
        FAILED = "Failed"
        DELETING = "Deleting"
        DEPLOYED = "Deployed"
        PENDING = "Pending"
        COMPLETED = "Completed"
        RUNNING = "Running"
        READY = "Ready"
        TERMINATING = "Terminating"
        ERROR = "Error"

    class Condition:
        UPGRADEABLE = "Upgradeable"
        AVAILABLE = "Available"
        DEGRADED = "Degraded"
        PROGRESSING = "Progressing"
        CREATED = "Created"
        RECONCILE_COMPLETE = "ReconcileComplete"
        READY = "Ready"
        FAILING = "Failing"

        class Status:
            TRUE = "True"
            FALSE = "False"
            UNKNOWN = "Unknown"

        class Phase:
            INSTALL_READY = "InstallReady"
            SUCCEEDED = "Succeeded"

        class Reason:
            ALL_REQUIREMENTS_MET = "AllRequirementsMet"
            INSTALL_SUCCEEDED = "InstallSucceeded"

    class Interface:
        class State:
            UP = "up"
            DOWN = "down"
            ABSENT = "absent"

    class ApiGroup:
        ADMISSIONREGISTRATION_K8S_IO = "admissionregistration.k8s.io"
        APIEXTENSIONS_K8S_IO = "apiextensions.k8s.io"
        APIREGISTRATION_K8S_IO = "apiregistration.k8s.io"
        APP_KUBERNETES_IO = "app.kubernetes.io"
        APPS = "apps"
        BATCH = "batch"
        CDI_KUBEVIRT_IO = "cdi.kubevirt.io"
        CLONE_KUBEVIRT_IO = "clone.kubevirt.io"
        CLUSTER_OPEN_CLUSTER_MANAGEMENT_IO = "cluster.open-cluster-management.io"
        CONFIG_OPENSHIFT_IO = "config.openshift.io"
        CONSOLE_OPENSHIFT_IO = "console.openshift.io"
        COORDINATION_K8S_IO = "coordination.k8s.io"
        DATA_IMPORT_CRON_TEMPLATE_KUBEVIRT_IO = "dataimportcrontemplate.kubevirt.io"
        DISCOVERY_K8S_IO = "discovery.k8s.io"
        EVENTS_K8S_IO = "events.k8s.io"
        EXPORT_KUBEVIRT_IO = "export.kubevirt.io"
        FORKLIFT_KONVEYOR_IO = "forklift.konveyor.io"
        INSTANCETYPE_KUBEVIRT_IO = "instancetype.kubevirt.io"
        HCO_KUBEVIRT_IO = "hco.kubevirt.io"
        HOSTPATHPROVISIONER_KUBEVIRT_IO = "hostpathprovisioner.kubevirt.io"
        IMAGE_OPENSHIFT_IO = "image.openshift.io"
        IMAGE_REGISTRY = "registry.redhat.io"
        INTEGREATLY_ORG = "integreatly.org"
        K8S_CNI_CNCF_IO = "k8s.cni.cncf.io"
        K8S_V1_CNI_CNCF_IO = "k8s.v1.cni.cncf.io"
        KUBERNETES_IO = "kubernetes.io"
        KUBEVIRT_IO = "kubevirt.io"
        KUBEVIRT_KUBEVIRT_IO = "kubevirt.kubevirt.io"
        LITMUS_IO = "litmuschaos.io"
        MACHINE_OPENSHIFT_IO = "machine.openshift.io"
        MACHINECONFIGURATION_OPENSHIFT_IO = "machineconfiguration.openshift.io"
        MAISTRA_IO = "maistra.io"
        METALLB_IO = "metallb.io"
        METRICS_K8S_IO = "metrics.k8s.io"
        MIGRATIONS_KUBEVIRT_IO = "migrations.kubevirt.io"
        MONITORING_COREOS_COM = "monitoring.coreos.com"
        NETWORKADDONSOPERATOR_NETWORK_KUBEVIRT_IO = (
            "networkaddonsoperator.network.kubevirt.io"
        )
        NETWORKING_ISTIO_IO = "networking.istio.io"
        NETWORKING_K8S_IO = "networking.k8s.io"
        NODE_LABELLER_KUBEVIRT_IO = "node-labeller.kubevirt.io"
        NMSTATE_IO = "nmstate.io"
        NODEMAINTENANCE_KUBEVIRT_IO = "nodemaintenance.kubevirt.io"
        OBSERVABILITY_OPEN_CLUSTER_MANAGEMENT_IO = (
            "observability.open-cluster-management.io"
        )
        OCS_OPENSHIFT_IO = "ocs.openshift.io"
        OPERATOR_OPEN_CLUSTER_MANAGEMENT_IO = "operator.open-cluster-management.io"
        OPERATOR_OPENSHIFT_IO = "operator.openshift.io"
        OPERATORS_COREOS_COM = "operators.coreos.com"
        OPERATORS_OPENSHIFT_IO = "operators.openshift.io"
        OS_TEMPLATE_KUBEVIRT_IO = "os.template.kubevirt.io"
        PACKAGES_OPERATORS_COREOS_COM = "packages.operators.coreos.com"
        POLICY = "policy"
        POOL_KUBEVIRT_IO = "pool.kubevirt.io"
        PROJECT_OPENSHIFT_IO = "project.openshift.io"
        RBAC_AUTHORIZATION_K8S_IO = "rbac.authorization.k8s.io"
        REMEDIATION_MEDIK8S_IO = "remediation.medik8s.io"
        RIPSAW_CLOUDBULLDOZER_IO = "ripsaw.cloudbulldozer.io"
        ROUTE_OPENSHIFT_IO = "route.openshift.io"
        SCHEDULING_K8S_IO = "scheduling.k8s.io"
        SECURITY_ISTIO_IO = "security.istio.io"
        SECURITY_OPENSHIFT_IO = "security.openshift.io"
        SNAPSHOT_STORAGE_K8S_IO = "snapshot.storage.k8s.io"
        SNAPSHOT_KUBEVIRT_IO = "snapshot.kubevirt.io"
        SRIOVNETWORK_OPENSHIFT_IO = "sriovnetwork.openshift.io"
        SSP_KUBEVIRT_IO = "ssp.kubevirt.io"
        STORAGE_K8S_IO = "storage.k8s.io"
        STORAGECLASS_KUBERNETES_IO = "storageclass.kubernetes.io"
        SUBRESOURCES_KUBEVIRT_IO = "subresources.kubevirt.io"
        TEKTONTASKS_KUBEVIRT_IO = "tektontasks.kubevirt.io"
        TEKTON_DEV = "tekton.dev"
        TEMPLATE_KUBEVIRT_IO = "template.kubevirt.io"
        TEMPLATE_OPENSHIFT_IO = "template.openshift.io"
        UPLOAD_CDI_KUBEVIRT_IO = "upload.cdi.kubevirt.io"
        V2V_KUBEVIRT_IO = "v2v.kubevirt.io"
        VELERO_IO = "velero.io"
        VM_KUBEVIRT_IO = "vm.kubevirt.io"

    class ApiVersion:
        V1 = "v1"
        V1BETA1 = "v1beta1"
        V1ALPHA1 = "v1alpha1"
        V1ALPHA3 = "v1alpha3"

    def __init__(
        self,
        name=None,
        client=None,
        teardown=True,
        timeout=TIMEOUT_4MINUTES,
        privileged_client=None,
        yaml_file=None,
        delete_timeout=TIMEOUT_4MINUTES,
        dry_run=None,
        node_selector=None,
        node_selector_labels=None,
        config_file=None,
        context=None,
        label=None,
        timeout_seconds=TIMEOUT_1MINUTE,
        api_group=None,
        hash_log_data=True,
    ):
        """
        Create an API resource

        Args:
            name (str): Resource name
            client (DynamicClient): Dynamic client for connecting to a remote cluster
            teardown (bool): Indicates if this resource would need to be deleted
            privileged_client (DynamicClient): Instance of Dynamic client
            yaml_file (str): yaml file for the resource
            delete_timeout (int): timeout associated with delete action
            dry_run (bool): dry run
            node_selector (str): node selector
            node_selector_labels (str): node selector labels
            config_file (str): Path to config file for connecting to remote cluster.
            context (str): Context name for connecting to remote cluster.
            timeout_seconds (int): timeout for a get API call; the call will be terminated after this many seconds
            label (dict): Resource labels
            api_group (str): Resource API group; will overwrite API group definition in resource class
            hash_log_data (bool): Hash resource content based on the resource's keys_to_hash property
                (example: Secret resource)
        """
        self.api_group = api_group or self.api_group
        if not self.api_group and not self.api_version:
            raise NotImplementedError(
                "Subclasses of Resource require self.api_group or self.api_version to"
                " be defined"
            )
        self.namespace = None
        self.name = name
        self.client = client
        self.privileged_client = privileged_client
        self.yaml_file = yaml_file
        self.resource_dict = None  # Filled in case yaml_file is not None
        self.config_file = config_file
        self.context = context
        self.label = label

        if not (self.name or self.yaml_file):
            raise ValueError("name or yaml file is required")

        self.teardown = teardown
        self.timeout = timeout
        self.delete_timeout = delete_timeout
        self.dry_run = dry_run
        self.node_selector = node_selector
        self.node_selector_labels = node_selector_labels
        self.node_selector_spec = self._prepare_node_selector_spec()
        self.res = None
        self.yaml_file_contents = None
        self.initial_resource_version = None
        self.logger = self._set_logger()
        self.timeout_seconds = timeout_seconds
        self.hash_log_data = hash_log_data
        # self._set_client_and_api_version() must be last init line
        self._set_client_and_api_version()

    def _set_logger(self):
        log_level = os.environ.get("OPENSHIFT_PYTHON_WRAPPER_LOG_LEVEL", "INFO")
        log_file = os.environ.get("OPENSHIFT_PYTHON_WRAPPER_LOG_FILE", "")
        return get_logger(
            name=f"{__name__.rsplit('.')[0]} {self.kind}",
            level=log_level,
            filename=log_file,
        )

    def _prepare_node_selector_spec(self):
        if self.node_selector:
            return {f"{self.ApiGroup.KUBERNETES_IO}/hostname": self.node_selector}

        if self.node_selector_labels:
            return self.node_selector_labels

    @ClassProperty
    def kind(cls):  # noqa: N805
        return sub_resource_level(cls, NamespacedResource, Resource)

    def _base_body(self):
        """
        Generate resource dict from yaml if self.yaml_file else return base resource dict.

        Returns:
            dict: Resource dict.
        """
        if self.yaml_file:
            if not self.yaml_file_contents:
                if isinstance(self.yaml_file, StringIO):
                    self.yaml_file_contents = self.yaml_file.read()
                else:
                    with open(self.yaml_file, "r") as stream:
                        self.yaml_file_contents = stream.read()

            self.res = yaml.safe_load(stream=self.yaml_file_contents)
            self.res.get("metadata", {}).pop("resourceVersion", None)
            self.name = self.res["metadata"]["name"]
        else:
            self.res = {
                "apiVersion": self.api_version,
                "kind": self.kind,
                "metadata": {"name": self.name},
            }

        if self.label:
            self.res.setdefault("metadata", {}).setdefault("labels", {}).update(
                self.label
            )

    def to_dict(self):
        """
        Generate intended dict representation of the resource.
        """
        self._base_body()

    def __enter__(self):
        signal(SIGINT, self._sigint_handler)
        return self.deploy()

    def __exit__(self, exception_type, exception_value, traceback):
        if self.teardown:
            self.clean_up()

    def _sigint_handler(self, signal_received, frame):
        self.__exit__(exception_type=None, exception_value=None, traceback=None)
        sys.exit(signal_received)

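    # A minimal usage sketch (comment only, not executed): Resource subclasses are
    # typically used as context managers, so deploy() runs on __enter__ and
    # clean_up() runs on __exit__ when teardown=True. Namespace is one of the
    # ocp_resources wrappers; the name and status string below are illustrative.
    #
    #     from ocp_resources.namespace import Namespace
    #
    #     with Namespace(name="example-ns") as ns:
    #         ns.wait_for_status(status="Active", timeout=120)
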
    def deploy(self, wait=False):
        """
        For debug, export REUSE_IF_RESOURCE_EXISTS to skip resource create.
        Spaces are important in the export dict.

        Examples:
            To skip creation of all resources by kind:
                export REUSE_IF_RESOURCE_EXISTS="{Pod: {}}"

            To skip creation of a resource by name (in all namespaces, or for non-namespaced resources):
                export REUSE_IF_RESOURCE_EXISTS="{Pod: {<pod-name>:}}"

            To skip creation of a resource by name and namespace:
                export REUSE_IF_RESOURCE_EXISTS="{Pod: {<pod-name>: <pod-namespace>}}"

            To skip creation of multiple resources:
                export REUSE_IF_RESOURCE_EXISTS="{Namespace: {<namespace-name>:}, Pod: {<pod-name>: <pod-namespace>}}"
        """
        _resource = None
        _export_str = "REUSE_IF_RESOURCE_EXISTS"
        skip_resource_kind_create_if_exists = os.environ.get(_export_str)
        if skip_resource_kind_create_if_exists:
            _resource = skip_existing_resource_creation_teardown(
                resource=self,
                export_str=_export_str,
                user_exported_args=skip_resource_kind_create_if_exists,
            )

            if _resource:
                return _resource

        self.create(wait=wait)
        return self

    def clean_up(self):
        """
        For debug, export SKIP_RESOURCE_TEARDOWN to skip resource teardown.
        Spaces are important in the export dict.

        Examples:
            To skip teardown of all resources by kind:
                export SKIP_RESOURCE_TEARDOWN="{Pod: {}}"

            To skip teardown of a resource by name (in all namespaces):
                export SKIP_RESOURCE_TEARDOWN="{Pod: {<pod-name>:}}"

            To skip teardown of a resource by name and namespace:
                export SKIP_RESOURCE_TEARDOWN="{Pod: {<pod-name>: <pod-namespace>}}"

            To skip teardown of multiple resources:
                export SKIP_RESOURCE_TEARDOWN="{Namespace: {<namespace-name>:}, Pod: {<pod-name>: <pod-namespace>}}"
        """
        _export_str = "SKIP_RESOURCE_TEARDOWN"
        skip_resource_teardown = os.environ.get(_export_str)
        if skip_resource_teardown and skip_existing_resource_creation_teardown(
            resource=self,
            export_str=_export_str,
            user_exported_args=skip_resource_teardown,
            check_exists=False,
        ):
            self.logger.warning(
                f"Skip resource {self.kind} {self.name} teardown. Got"
                f" {_export_str}={skip_resource_teardown}"
            )
            return

        self.delete(wait=True, timeout=self.delete_timeout)

    @classmethod
    def _prepare_resources(cls, dyn_client, singular_name, *args, **kwargs):
        if not cls.api_version:
            cls.api_version = _get_api_version(
                dyn_client=dyn_client, api_group=cls.api_group, kind=cls.kind
            )

        get_kwargs = {"singular_name": singular_name} if singular_name else {}
        return dyn_client.resources.get(
            kind=cls.kind,
            api_version=cls.api_version,
            **get_kwargs,
        ).get(*args, **kwargs, timeout_seconds=cls.timeout_seconds)

    def _prepare_singular_name_kwargs(self, **kwargs):
        kwargs = kwargs if kwargs else {}
        if self.singular_name:
            kwargs["singular_name"] = self.singular_name

        return kwargs

    def _set_client_and_api_version(self):
        if not self.client:
            self.client = get_client(config_file=self.config_file, context=self.context)

        if not self.api_version:
            self.api_version = _get_api_version(
                dyn_client=self.client, api_group=self.api_group, kind=self.kind
            )

    def full_api(self, **kwargs):
        """
        Get resource API

        Keyword Args:
            pretty
            _continue
            include_uninitialized
            field_selector
            label_selector
            limit
            resource_version
            timeout_seconds
            watch
            async_req

        Returns:
            Resource: Resource object.
        """
        self._set_client_and_api_version()

        kwargs = self._prepare_singular_name_kwargs(**kwargs)
        return self.client.resources.get(
            api_version=self.api_version, kind=self.kind, **kwargs
        )

    @property
    def api(self):
        return self.full_api()

    def wait(self, timeout=TIMEOUT_4MINUTES, sleep=1):
        """
        Wait for resource

        Args:
            timeout (int): Time to wait for the resource.
            sleep (int): Time to wait between retries

        Raises:
            TimeoutExpiredError: If resource does not exist.
        """
        self.logger.info(f"Wait until {self.kind} {self.name} is created")
        samples = TimeoutSampler(
            wait_timeout=timeout,
            sleep=sleep,
            exceptions_dict={
                **PROTOCOL_ERROR_EXCEPTION_DICT,
                **NOT_FOUND_ERROR_EXCEPTION_DICT,
                **DEFAULT_CLUSTER_RETRY_EXCEPTIONS,
            },
            func=lambda: self.exists,
        )
        for sample in samples:
            if sample:
                return

    def wait_deleted(self, timeout=TIMEOUT_4MINUTES):
        """
        Wait until resource is deleted

        Args:
            timeout (int): Time to wait for the resource.

        Raises:
            TimeoutExpiredError: If resource still exists.
        """
        self.logger.info(f"Wait until {self.kind} {self.name} is deleted")
        return self.client_wait_deleted(timeout=timeout)

    @property
    def exists(self):
        """
        Whether self exists on the server
        """
        try:
            return self.instance
        except TimeoutExpiredError:
            return None

    def client_wait_deleted(self, timeout):
        """
        Client-side wait until resource is deleted

        Args:
            timeout (int): Time to wait for the resource.

        Raises:
            TimeoutExpiredError: If resource still exists.
        """
        samples = TimeoutSampler(
            wait_timeout=timeout, sleep=1, func=lambda: self.exists
        )
        for sample in samples:
            if not sample:
                return

    def wait_for_status(
        self, status, timeout=TIMEOUT_4MINUTES, stop_status=None, sleep=1
    ):
        """
        Wait for resource to be in status

        Args:
            status (str): Expected status.
            timeout (int): Time to wait for the resource.
            stop_status (str): Status that should stop the wait and fail it.

        Raises:
            TimeoutExpiredError: If resource is not in the desired status.
        """
        stop_status = stop_status if stop_status else self.Status.FAILED
        self.logger.info(f"Wait for {self.kind} {self.name} status to be {status}")
        samples = TimeoutSampler(
            wait_timeout=timeout,
            sleep=sleep,
            exceptions_dict={
                **PROTOCOL_ERROR_EXCEPTION_DICT,
                **DEFAULT_CLUSTER_RETRY_EXCEPTIONS,
            },
            func=self.api.get,
            field_selector=f"metadata.name=={self.name}",
            namespace=self.namespace,
        )
        current_status = None
        last_logged_status = None
        try:
            for sample in samples:
                if sample.items:
                    sample_status = sample.items[0].status
                    if sample_status:
                        current_status = sample_status.phase
                        if current_status != last_logged_status:
                            last_logged_status = current_status
                            self.logger.info(
                                f"Status of {self.kind} {self.name} is {current_status}"
                            )

                        if current_status == status:
                            return

                        if current_status == stop_status:
                            raise TimeoutExpiredError(
                                f"Status of {self.kind} {self.name} is {current_status}"
                            )

        except TimeoutExpiredError:
            if current_status:
                self.logger.error(
                    f"Status of {self.kind} {self.name} is {current_status}"
                )
            raise

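    # A minimal usage sketch (comment only): waiting for a Pod wrapper to report the
    # "Running" phase. Pod is one of the ocp_resources wrappers; the pod name and
    # namespace below are illustrative placeholders.
    #
    #     from ocp_resources.pod import Pod
    #
    #     pod = Pod(name="example-pod", namespace="example-ns")
    #     pod.wait_for_status(status=Pod.Status.RUNNING, timeout=180)
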
    def create(self, wait=False):
        """
        Create resource.

        Args:
            wait (bool): True to wait for resource status.

        Returns:
            bool: True if create succeeded, False otherwise.

        Raises:
            ValueMismatch: When body value doesn't match class value
        """
        if not self.res:
            self.to_dict()

        hashed_res = self.hash_resource_dict(resource_dict=self.res)
        self.logger.info(f"Create {self.kind} {self.name}")
        self.logger.info(f"Posting {hashed_res}")
        self.logger.debug(f"\n{yaml.dump(hashed_res)}")
        resource_ = self.api.create(
            body=self.res, namespace=self.namespace, dry_run=self.dry_run
        )
        with contextlib.suppress(TimeoutExpiredError):
            # some resources do not support get() (no instance) or the client does not have permissions
            self.initial_resource_version = self.instance.metadata.resourceVersion

        if wait and resource_:
            return self.wait()

        return resource_

    def delete(self, wait=False, timeout=TIMEOUT_4MINUTES, body=None):
        self.logger.info(f"Delete {self.kind} {self.name}")

        if self.exists:
            hashed_data = self.hash_resource_dict(resource_dict=self.instance.to_dict())
            self.logger.info(f"Deleting {hashed_data}")
            self.logger.debug(f"\n{yaml.dump(hashed_data)}")

        try:
            res = self.api.delete(name=self.name, namespace=self.namespace, body=body)
        except NotFoundError:
            return False

        if wait and res:
            return self.wait_deleted(timeout=timeout)

        return res

    @property
    def status(self):
        """
        Get resource status

        Status: Running, Scheduling, Pending, Unknown, CrashLoopBackOff

        Returns:
            str: Status
        """
        self.logger.info(f"Get {self.kind} {self.name} status")
        return self.instance.status.phase

    def update(self, resource_dict):
        """
        Update resource with resource dict

        Args:
            resource_dict: Resource dictionary
        """
        hashed_resource_dict = self.hash_resource_dict(resource_dict=resource_dict)
        self.logger.info(f"Update {self.kind} {self.name}:\n{hashed_resource_dict}")
        self.logger.debug(f"\n{yaml.dump(hashed_resource_dict)}")
        self.api.patch(
            body=resource_dict,
            namespace=self.namespace,
            content_type="application/merge-patch+json",
        )

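    # A minimal usage sketch (comment only): update() sends a JSON merge patch, so
    # only the keys present in the dict are changed; metadata.name is included so the
    # dynamic client can resolve the target. The configmap variable and label
    # key/value below are illustrative placeholders.
    #
    #     configmap.update(
    #         resource_dict={
    #             "metadata": {"name": configmap.name, "labels": {"env": "dev"}},
    #         }
    #     )
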
    def update_replace(self, resource_dict):
        """
        Replace resource metadata.
        Use this to remove an existing field. (update() will only update existing fields)
        """
        hashed_resource_dict = self.hash_resource_dict(resource_dict=resource_dict)
        self.logger.info(f"Replace {self.kind} {self.name}: \n{hashed_resource_dict}")
        self.logger.debug(f"\n{yaml.dump(hashed_resource_dict)}")
        self.api.replace(body=resource_dict, name=self.name, namespace=self.namespace)

    @staticmethod
    def retry_cluster_exceptions(
        func, exceptions_dict=DEFAULT_CLUSTER_RETRY_EXCEPTIONS, **kwargs
    ):
        sampler = TimeoutSampler(
            wait_timeout=10,
            sleep=1,
            func=func,
            print_log=False,
            exceptions_dict=exceptions_dict,
            **kwargs,
        )
        for sample in sampler:
            return sample

    @classmethod
    def get(
        cls,
        dyn_client=None,
        config_file=None,
        context=None,
        singular_name=None,
        exceptions_dict=DEFAULT_CLUSTER_RETRY_EXCEPTIONS,
        *args,
        **kwargs,
    ):
        """
        Get resources

        Args:
            dyn_client (DynamicClient): Open connection to remote cluster.
            config_file (str): Path to config file for connecting to remote cluster.
            context (str): Context name for connecting to remote cluster.
            singular_name (str): Resource kind (in lowercase), in use where we have multiple matches for resource.
            exceptions_dict (dict): Exceptions dict for TimeoutSampler

        Returns:
            generator: Generator of Resources of cls.kind.
        """
        if not dyn_client:
            dyn_client = get_client(config_file=config_file, context=context)

        def _get():
            _resources = cls._prepare_resources(
                dyn_client=dyn_client, singular_name=singular_name, *args, **kwargs
            )
            try:
                for resource_field in _resources.items:
                    yield cls(client=dyn_client, name=resource_field.metadata.name)
            except TypeError:
                yield cls(client=dyn_client, name=_resources.metadata.name)

        return Resource.retry_cluster_exceptions(
            func=_get, exceptions_dict=exceptions_dict
        )

    @property
    def instance(self):
        """
        Get resource instance

        Returns:
            openshift.dynamic.client.ResourceInstance
        """

        def _instance():
            return self.api.get(name=self.name)

        return self.retry_cluster_exceptions(func=_instance)

    @property
    def labels(self):
        """
        Method to get labels for this resource

        Returns:
            openshift.dynamic.resource.ResourceField: Representation of labels
        """
        return self.instance["metadata"]["labels"]

    def watcher(self, timeout, resource_version=None):
        """
        Watch the resource for a given timeout.

        Args:
            timeout (int): Time to get conditions.
            resource_version (str): The version with which to filter results. Only events with
                a resource_version greater than this value will be returned.

        Yield:
            Event object with these keys:
                'type': The type of event such as "ADDED", "DELETED", etc.
                'raw_object': a dict representing the watched object.
                'object': A ResourceInstance wrapping raw_object.
        """
        yield from self.api.watch(
            timeout=timeout,
            namespace=self.namespace,
            field_selector=f"metadata.name=={self.name}",
            resource_version=resource_version or self.initial_resource_version,
        )

    def wait_for_condition(self, condition, status, timeout=300):
        """
        Wait for Resource condition to be in the desired status.

        Args:
            condition (str): Condition to query.
            status (str): Expected condition status.
            timeout (int): Time to wait for the resource.

        Raises:
            TimeoutExpiredError: If the Resource condition is not in the desired status.
        """
        self.logger.info(
            f"Wait for {self.kind}/{self.name}'s '{condition}' condition to be"
            f" '{status}'"
        )
        timeout_watcher = TimeoutWatch(timeout=timeout)
        for sample in TimeoutSampler(
            wait_timeout=timeout,
            sleep=1,
            func=lambda: self.exists,
        ):
            if sample:
                break

        for sample in TimeoutSampler(
            wait_timeout=timeout_watcher.remaining_time(),
            sleep=1,
            func=lambda: self.instance,
        ):
            if sample:
                for cond in sample.get("status", {}).get("conditions", []):
                    if cond["type"] == condition and cond["status"] == status:
                        return

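    # A minimal usage sketch (comment only): waiting for a resource's "Available"
    # condition to become "True", using the constants defined on this class; the
    # resource variable is an illustrative placeholder.
    #
    #     resource.wait_for_condition(
    #         condition=Resource.Condition.AVAILABLE,
    #         status=Resource.Condition.Status.TRUE,
    #         timeout=120,
    #     )
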
    def api_request(self, method, action, url, **params):
        """
        Handle API requests to resource.

        Args:
            method (str): Request method (GET/PUT etc.).
            action (str): Action to perform (stop/start/guestosinfo etc.).
            url (str): URL of resource.

        Returns:
            data (dict): response data
        """
        client = self.privileged_client or self.client
        response = client.client.request(
            method=method,
            url=f"{url}/{action}",
            headers=self.client.configuration.api_key,
            **params,
        )

        try:
            return json.loads(response.data)
        except json.decoder.JSONDecodeError:
            return response.data

    def wait_for_conditions(self):
        timeout_watcher = TimeoutWatch(timeout=30)
        for sample in TimeoutSampler(
            wait_timeout=30,
            sleep=1,
            func=lambda: self.exists,
        ):
            if sample:
                break

        samples = TimeoutSampler(
            wait_timeout=timeout_watcher.remaining_time(),
            sleep=1,
            func=lambda: self.instance.status.conditions,
        )
        for sample in samples:
            if sample:
                return

    def events(
        self,
        name=None,
        label_selector=None,
        field_selector=None,
        resource_version=None,
        timeout=None,
    ):
        """
        Retrieve K8s events for this resource.

        Args:
            name (str): event name
            label_selector (str): filter events by labels; comma-separated string of key=value
            field_selector (str): filter events by event fields; comma-separated string of key=value
            resource_version (str): filter events by their resource's version
            timeout (int): timeout in seconds

        Returns:
            list: event objects

        Example:
            Reading all CSV Warning events related to a pod in namespace "my-namespace",
            with reason of "AnEventReason":

            pod = Pod(client=client, name="pod", namespace="my-namespace")
            for event in pod.events(
                field_selector="involvedObject.kind==ClusterServiceVersion,type==Warning,reason=AnEventReason",
                timeout=10,
            ):
                print(event.object)
        """
        _field_selector = f"involvedObject.name=={self.name}"
        if field_selector:
            field_selector = f"{_field_selector},{field_selector}"
        yield from Event.get(
            dyn_client=self.client,
            namespace=self.namespace,
            name=name,
            label_selector=label_selector,
            field_selector=field_selector or _field_selector,
            resource_version=resource_version,
            timeout=timeout,
        )

    @staticmethod
    def get_all_cluster_resources(
        config_file=None, config_dict=None, context=None, *args, **kwargs
    ):
        """
        Get all cluster resources

        Args:
            config_file (str): path to a kubeconfig file.
            config_dict (dict): dict with kubeconfig configuration.
            context (str): name of the context to use.
            *args (tuple): args to pass to client.get()
            **kwargs (dict): kwargs to pass to client.get()

        Yields:
            kubernetes.dynamic.resource.ResourceField: Cluster resource.

        Example:
            for resource in get_all_cluster_resources(label_selector="my-label=value"):
                print(f"Resource: {resource}")
        """
        client = get_client(
            config_file=config_file, config_dict=config_dict, context=context
        )
        for _resource in client.resources.search():
            try:
                _resources = client.get(_resource, *args, **kwargs)
                yield from _resources.items
            except (NotFoundError, TypeError, MethodNotAllowedError):
                continue

    def to_yaml(self):
        """
        Get resource as YAML representation.

        Returns:
            str: Resource YAML representation.
        """
        if not self.res:
            self.to_dict()

        resource_yaml = yaml.dump(self.res)
        self.logger.info(f"\n{resource_yaml}")
        return resource_yaml

    @property
    def keys_to_hash(self):
        """
        Resource attributes list to hash in the logs.

        The list should hold absolute key paths in the resource dict.

        Example:
            Given the dict {"spec": {"data": <value_to_hash>}},
            to hash spec['data'] pass: ["spec..data"]
        """
        return []

    def hash_resource_dict(self, resource_dict):
        if self.keys_to_hash and self.hash_log_data:
            resource_dict = copy.deepcopy(resource_dict)
            resource_dict = benedict(resource_dict, keypath_separator="..")
            for key in self.keys_to_hash:
                if key in resource_dict:
                    resource_dict[key] = "***"
            return resource_dict

        return resource_dict

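# A minimal sketch (comment only) of how a subclass can keep sensitive fields out of
# the logs: keys_to_hash returns benedict-style key paths (".." separated), and
# hash_resource_dict() masks those paths with "***" before the dict is logged.
# MySecretLikeResource is a hypothetical subclass used only for illustration.
#
#     class MySecretLikeResource(Resource):
#         api_version = Resource.ApiVersion.V1
#
#         @property
#         def keys_to_hash(self):
#             return ["data..token"]  # masks res["data"]["token"] in log output
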
class NamespacedResource(Resource):
    """
    Namespaced object, inherited from Resource.
    """

    def __init__(
        self,
        name=None,
        namespace=None,
        client=None,
        teardown=True,
        timeout=TIMEOUT_4MINUTES,
        privileged_client=None,
        yaml_file=None,
        delete_timeout=TIMEOUT_4MINUTES,
        **kwargs,
    ):
        super().__init__(
            name=name,
            client=client,
            teardown=teardown,
            timeout=timeout,
            privileged_client=privileged_client,
            yaml_file=yaml_file,
            delete_timeout=delete_timeout,
            **kwargs,
        )
        self.namespace = namespace
        if not (self.name and self.namespace) and not self.yaml_file:
            raise ValueError("name and namespace or yaml file is required")

    @classmethod
    def get(
        cls,
        dyn_client=None,
        config_file=None,
        context=None,
        singular_name=None,
        raw=False,
        *args,
        **kwargs,
    ):
        """
        Get resources

        Args:
            dyn_client (DynamicClient): Open connection to remote cluster
            config_file (str): Path to config file for connecting to remote cluster.
            context (str): Context name for connecting to remote cluster.
            singular_name (str): Resource kind (in lowercase), in use where we have multiple matches for resource.
            raw (bool): If True return raw object.

        Returns:
            generator: Generator of Resources of cls.kind
        """
        if not dyn_client:
            dyn_client = get_client(config_file=config_file, context=context)

        _resources = cls._prepare_resources(
            dyn_client=dyn_client, singular_name=singular_name, *args, **kwargs
        )
        try:
            for resource_field in _resources.items:
                if raw:
                    yield resource_field
                else:
                    yield cls(
                        client=dyn_client,
                        name=resource_field.metadata.name,
                        namespace=resource_field.metadata.namespace,
                    )
        except TypeError:
            if raw:
                yield _resources
            else:
                yield cls(
                    client=dyn_client,
                    name=_resources.metadata.name,
                    namespace=_resources.metadata.namespace,
                )

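    # A minimal usage sketch (comment only): listing namespaced resources with get().
    # Pod is one of the ocp_resources wrappers; the namespace below is an
    # illustrative placeholder.
    #
    #     from ocp_resources.pod import Pod
    #
    #     for pod in Pod.get(dyn_client=client, namespace="example-ns"):
    #         print(pod.name, pod.namespace)
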
    @property
    def instance(self):
        """
        Get resource instance

        Returns:
            openshift.dynamic.client.ResourceInstance
        """

        def _instance():
            return self.api.get(name=self.name, namespace=self.namespace)

        return self.retry_cluster_exceptions(func=_instance)

    def _base_body(self):
        if not self.res:
            super(NamespacedResource, self)._base_body()

        if self.yaml_file:
            self.namespace = self.res["metadata"].get("namespace", self.namespace)

        if not self.namespace:
            raise ValueError("Namespace must be passed or specified in the YAML file.")

        if not self.yaml_file:
            self.res["metadata"]["namespace"] = self.namespace

    def to_dict(self):
        self._base_body()


class ResourceEditor:
    def __init__(self, patches, action="update", user_backups=None):
        """
        Args:
            patches (dict): {<Resource object>: <yaml patch as dict>}
                e.g. {<Resource object>: {'metadata': {'labels': {'label1': 'true'}}}}

        Allows for temporary edits to cluster resources for tests. During
        __enter__ user-specified patches (see args) are applied and old values
        are backed up, and during __exit__ these backups are used to reverse
        all changes made.

        Flow:
            1) apply patches
            2) automation runs
            3) edits made to resources are reversed

        May also be used without being treated as a context manager by
        calling the methods update() and restore() after instantiation.

        *** the DynamicClient object used to get the resources must not be
        using an unprivileged_user; use default_client or similar instead. ***
        """
        self._patches = self._dictify_resourcefield(res=patches)
        self.action = action
        self.user_backups = user_backups
        self._backups = {}

    @property
    def backups(self):
        """Returns a dict {<Resource object>: <backup_as_dict>}
        The backup dict kept for each resource edited"""
        return self._backups

    @property
    def patches(self):
        """Returns the patches dict provided in the constructor"""
        return self._patches

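    # A minimal usage sketch (comment only): temporarily labeling a resource for the
    # duration of a test; the patch is applied on __enter__ and reverted on __exit__.
    # The my_resource variable, label key/value, and run_checks() call are
    # illustrative placeholders.
    #
    #     with ResourceEditor(
    #         patches={my_resource: {"metadata": {"labels": {"test-label": "true"}}}}
    #     ):
    #         run_checks()
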
    def update(self, backup_resources=False):
        """Prepares backup dicts (where necessary) and applies patches"""
        # prepare update dicts and backups
        resource_to_patch = []
        if backup_resources:
            LOGGER.info("ResourceEdit: Backing up old data")
            if self.user_backups:
                resource_to_patch = self._patches
                self._backups = self.user_backups

            else:
                for resource, update in self._patches.items():
                    namespace = None
                    # prepare backup
                    try:
                        original_resource_dict = resource.instance.to_dict()
                    except NotFoundError:
                        # Some resources cannot be found by name.
                        # Happens in the 'ServiceMonitor' resource.
                        original_resource_dict = list(
                            resource.get(
                                dyn_client=resource.client,
                                field_selector=f"metadata.name={resource.name}",
                            )
                        )[0].to_dict()
                        namespace = update.get("metadata", {}).get("namespace")

                    backup = self._create_backup(
                        original=original_resource_dict, patch=update
                    )
                    if namespace:
                        # Add namespace to metadata for restore.
                        backup["metadata"]["namespace"] = namespace

                    # no need to back up if no changes have been made
                    # if action is 'replace' we need to update even if no backup (replace update can be empty)
                    if backup or self.action == "replace":
                        resource_to_patch.append(resource)
                        self._backups[resource] = backup
                    else:
                        LOGGER.warning(
                            "ResourceEdit: no diff found in patch for "
                            f"{resource.name} -- skipping"
                        )

            if not resource_to_patch:
                return
        else:
            resource_to_patch = self._patches

        patches_to_apply = {
            resource: self._patches[resource] for resource in resource_to_patch
        }

        # apply changes
        self._apply_patches_sampler(
            patches=patches_to_apply, action_text="Updating", action=self.action
        )

    def restore(self):
        self._apply_patches_sampler(
            patches=self._backups, action_text="Restoring", action=self.action
        )

    def __enter__(self):
        self.update(backup_resources=True)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # restore backups
        self.restore()

    @staticmethod
    def _dictify_resourcefield(res):
        """Recursively turns any ResourceField objects into dicts to avoid
        issues caused by appending lists, etc."""
        if isinstance(res, ResourceField):
            return ResourceEditor._dictify_resourcefield(res=dict(res.items()))
        elif isinstance(res, dict):
            return {
                ResourceEditor._dictify_resourcefield(
                    res=key
                ): ResourceEditor._dictify_resourcefield(res=value)
                for key, value in res.items()
            }
        elif isinstance(res, list):
            return [ResourceEditor._dictify_resourcefield(res=x) for x in res]

        return res

    @staticmethod
    def _create_backup(original, patch):
        """
        Args:
            original (dict*): source of values to back up if necessary
            patch (dict*): 'new' values; keys needn't necessarily all be
                contained in original

        Returns a dict containing the fields in original that are different
        from the patch. Places None for fields in the patch that don't appear
        in original (because that's how the API knows to remove those fields
        from the yaml).

        * the first call will be with both of these arguments as dicts but
        this will not necessarily be the case during recursion
        """
        # when both are dicts, get the diff (recursively if need be)
        if isinstance(original, dict) and isinstance(patch, dict):
            diff_dict = {}
            for key, value in patch.items():
                if key not in original:
                    diff_dict[key] = None
                    continue

                # recursive call
                key_diff = ResourceEditor._create_backup(
                    original=original[key], patch=value
                )

                if key_diff is not None:
                    diff_dict[key] = key_diff

            return diff_dict

        # for one or more non-dict values, just compare them
        if patch != original:
            return original
        else:
            # this return value will be received by key_diff above
            return None

    @staticmethod
    def _apply_patches(patches, action_text, action):
        """
        Updates provided Resource objects with provided yaml patches

        Args:
            patches (dict): {<Resource object>: <yaml patch as dict>}
            action_text (str):
                "ResourceEdit <action_text> for resource <resource name>"
                will be printed for each resource; see below
        """
        for resource, patch in patches.items():
            LOGGER.info(
                f"ResourceEdits: {action_text} data for "
                f"resource {resource.kind} {resource.name}"
            )

            # add name to patch
            if "metadata" not in patch:
                patch["metadata"] = {}

            # the api requires this field to be present in a yaml patch for
            # some resource kinds even if it is not changed
            if "name" not in patch["metadata"]:
                patch["metadata"]["name"] = resource.name

            if action == "update":
                resource.update(resource_dict=patch)  # update the resource

            if action == "replace":
                if "metadata" not in patch:
                    patch["metadata"] = {}

                patch["metadata"]["name"] = resource.name
                patch["metadata"]["namespace"] = resource.namespace
                patch["metadata"][
                    "resourceVersion"
                ] = resource.instance.metadata.resourceVersion
                patch["kind"] = resource.kind
                patch["apiVersion"] = resource.api_version

                resource.update_replace(
                    resource_dict=patch
                )  # replace the resource metadata

    def _apply_patches_sampler(self, patches, action_text, action):
        exceptions_dict = {ConflictError: []}
        exceptions_dict.update(DEFAULT_CLUSTER_RETRY_EXCEPTIONS)
        return Resource.retry_cluster_exceptions(
            func=self._apply_patches,
            exceptions_dict=exceptions_dict,
            patches=patches,
            action_text=action_text,
            action=action,
        )