Source code for ocp_resources.node_network_state

import time

from kubernetes.dynamic.exceptions import ConflictError

from ocp_resources.constants import TIMEOUT_4MINUTES
from ocp_resources.resource import Resource
from ocp_resources.utils import TimeoutSampler

SLEEP = 1


[docs]class NodeNetworkState(Resource): api_group = Resource.ApiGroup.NMSTATE_IO def __init__( self, name=None, client=None, teardown=True, yaml_file=None, delete_timeout=TIMEOUT_4MINUTES, **kwargs, ): super().__init__( name=name, client=client, teardown=teardown, yaml_file=yaml_file, delete_timeout=delete_timeout, **kwargs, ) status = self.instance.to_dict()["status"] self.desired_state = status.get("desiredState", {"interfaces": []})
[docs] def set_interface(self, interface): # First drop the interface is's already in the list interfaces = [ iface for iface in self.desired_state["interfaces"] if iface["name"] != interface["name"] ] # Add the interface interfaces.append(interface) self.desired_state["interfaces"] = interfaces
[docs] def to_dict(self): super().to_dict() if not self.yaml_file: self.res.update( { "spec": { "nodeName": self.name, "managed": True, "desiredState": self.desired_state, } } )
[docs] def apply(self): retries_on_conflict = 3 while True: try: self.res["metadata"] = self.instance.to_dict()["metadata"] self.update(self.res) break except ConflictError as e: retries_on_conflict -= 1 if retries_on_conflict == 0: raise e time.sleep(1)
[docs] def wait_until_up(self, name): def _find_up_interface(): iface = self.get_interface(name=name) if iface.get("state") == self.Interface.State.UP: return iface return None self.logger.info(f"Checking if interface {name} is up -- {self.name}") samples = TimeoutSampler( wait_timeout=TIMEOUT_4MINUTES, sleep=SLEEP, func=_find_up_interface ) for sample in samples: if sample: return
[docs] def wait_until_deleted(self, name): self.logger.info(f"Checking if interface {name} is deleted -- {self.name}") samples = TimeoutSampler( wait_timeout=self.delete_timeout, sleep=SLEEP, func=self.get_interface, name=name, ) for sample in samples: if not sample: return
@property def interfaces(self): return self.instance.to_dict()["status"]["currentState"]["interfaces"] @property def routes(self): return self.instance.status.currentState.routes
[docs] def ipv4(self, iface): for interface in self.interfaces: if interface["name"] == iface: addresses = interface["ipv4"]["address"] if addresses: return interface["ipv4"]["address"][0]["ip"]
[docs] def get_interface(self, name): for interface in self.interfaces: if interface["name"] == name: return interface return {}