Source code for apama.docker.kubernetes

#!/usr/bin/env python
# Copyright (c) 2015-2021, 2023 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors. 
# Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG 

"""
Support for using Kubernetes and Rancher from PySys tests. 

.. autosummary::
	Pod
	Job
	Service
	NameSpace
	Secret
	PersistentVolume
	PersistentVolumeClaim
	Deployment
	StatefulSet
	Resource
	Project
	Cluster
"""


import tempfile, os, binascii, time, shutil
import threading, getpass
import io
from enum import Enum

from pysys.utils import filecopy
from pysys.utils import filereplace
from pysys.constants import *
from apama.docker.framework import DockerImage
from apama.docker.framework import DockerHelper

DEFAULT_TIMEOUT=240

#requirement is for the exe_command value to be on the path
class ClusterExecutable:
	"""Runs the cluster command line tool (``kubectl``, or ``rancher kubectl``) on behalf of a resource.

	The executable named by the project properties must be available on the PATH
	of the environment passed to :meth:`start`.
	"""

	@classmethod
	def start(cls , instance , displayName , arguments , environs, rancherDebug=False, prefixDefaultArg=True,stdout=None, stderr=None, **kwargs):
		"""
		Start the cluster executable through the parent testcase's ``startProcess``.

		:param instance: Object whose ``parent`` attribute is the PySys testcase (normally a Resource).
		:param displayName: Display name passed through to ``startProcess``.
		:param arguments: List of command line arguments; this list is not modified.
		:param environs: Environment dictionary for the process.
		:param rancherDebug: If True, ``--debug`` is inserted before all other arguments.
		:param prefixDefaultArg: If True (the default) the tool-specific prefix is added:
			``--kubeconfig <KUBE_CONFIG>`` for kubectl, or ``kubectl`` for rancher.
		:param stdout: Name of the file to send stdout to, or None.
		:param stderr: Name of the file to send stderr to, or None.
		:param kwargs: Any other keyword arguments are passed to ``startProcess``.
		:return: Whatever the parent's ``startProcess`` returns.
		"""
		project = instance.parent.project

		# default to kubectl, however if rancher is installed we can
		# use this by setting CONTAINER_CMD to 'rancher'
		cmd = project.KUBECTL_EXE
		extra_arg = ["--kubeconfig", project.KUBE_CONFIG]
		if getattr(project, 'CONTAINER_CMD', None) == 'rancher':
			extra_arg = ['kubectl']
			cmd = project.CONTAINER_EXE

		localargs = arguments
		if prefixDefaultArg: #only applicable in rancher (True always for kubectl)
			localargs = extra_arg + arguments
		if rancherDebug:
			localargs = ['--debug'] + localargs

		# Only install the stderr-grepping error handler when there is a stderr file
		# to grep (many callers leave stderr unset), and never clobber a handler the
		# caller supplied explicitly via kwargs.
		if stderr and 'onError' not in kwargs:
			kwargs['onError'] = lambda process: instance.parent.grepOrNone(stderr, '.+')

		return instance.parent.startProcess(
			environs=environs,
			command=cmd,
			arguments=localargs,
			stdout=stdout,
			stderr=stderr,
			displayName=displayName,
			**kwargs)

	

class Resource(DockerHelper):
	"""Helper class for working with Kubernetes objects.

	Represents any resource within Kubernetes - pods, services etc.

	Resources are created from a Kubernetes resource definition, either using createResource,
	or from a file using fromFile. fromFile will create multiple resources from a single file.
	"""

	@classmethod
	def fromFile(cls, parent, fileName, imageSubst=None, namespace=None):
		"""
		Create (multiple) resources from a single file, possibly substituting image names.

		The file is split into documents on lines consisting solely of ``---``.

		:param parent: Reference to the parent PySys testcase
		:param fileName: Path of the yaml file to read the resource definitions from. Must be in UTF-8 encoding.
		:param imageSubst: a map of search:replacement for strings to replace in image: stanzas
		:param namespace: The Namespace to define these objects in
		:return: a dictionary mapping each resource name to its Resource object
		"""
		with io.open(fileName, encoding='utf-8') as f:
			data = f.readlines() # returns unicode character strings

		documents = []
		current = []
		for line in data:
			if line.strip() == '---':
				documents.append(current)
				current = []
			else:
				current.append(line)
		documents.append(current)

		rv = {}
		for doc in documents:
			res = cls.createResource(parent, doc, imageSubst, namespace=namespace)
			if res: rv[res.getName()] = res
		return rv

	@staticmethod
	def _findKind(data):
		"""Return the value of the document's ``kind:`` key, or None if there is none.

		An unindented ``kind:`` line is preferred so that a nested ``kind:`` (for
		example inside a reference to another object) cannot override the kind of
		the document itself. If no unindented key exists, the last line containing
		``kind:`` is used.
		"""
		fallback = None
		for l in data:
			if l.startswith('kind:'):
				return l.split(":")[1].strip()
			if 'kind:' in l:
				fallback = l.split(":")[1].strip()
		return fallback

	@staticmethod
	def _findName(data):
		"""Return the resource name from the metadata of a definition, or None.

		Only ``name:`` keys seen while inside a ``metadata:`` section (ended by a
		``spec:`` line) are considered. The least indented one wins, and the first
		one on a tie, so that names in labels or in nested metadata sections (pod
		templates, volume claim templates) do not replace the resource's own name.
		"""
		inMetadata = False
		best = None # (indent, name)
		for l in data:
			if 'metadata:' in l: inMetadata = True
			if 'spec:' in l: inMetadata = False
			if inMetadata and l.strip().startswith('name:'):
				indent = len(l) - len(l.lstrip())
				if best is None or indent < best[0]:
					best = (indent, l.split(":")[1].strip())
		return best[1] if best else None

	@classmethod
	def createResource(cls, parent, data, imageSubst=None, namespace=None):
		"""
		Create a resource from a YAML resource definition, possibly substituting image names.

		:param parent: Reference to the parent PySys testcase
		:param data: The YAML content for a single resource, as a list of character strings representing each line.
			The "kind" dictionary value will be used to determine what class is instantiated.
		:param imageSubst: a map of search:replacement for strings to replace in image: stanzas;
			each replacement must be a DockerImage
		:param namespace: The Namespace to define this object in
		:return: a Resource object, or None if the definition has no kind
		:raises Exception: if the kind is not one of the supported kinds
		"""
		assert not isinstance(data, str), 'data must be a list of strings not a string'
		kind = cls._findKind(data)
		if not kind: return None

		if imageSubst:
			newdata = []
			for l in data:
				if 'image:' in l:
					for k in imageSubst:
						assert imageSubst[k].__class__.__name__=='DockerImage', type(imageSubst[k])
						l = l.replace(k, imageSubst[k].getName())
				newdata.append(l)
			data = newdata

		# A Namespace is the only kind that is not itself created inside a namespace
		if kind == "Namespace":
			return NameSpace(parent, data)

		factories = {
			"Pod": Pod,
			"Job": Job,
			"Service": Service,
			"Deployment": Deployment,
			"StatefulSet": StatefulSet,
			"PersistentVolumeClaim": PersistentVolumeClaim,
			"PersistentVolume": PersistentVolume,
			"Secret": Secret,
		}
		if kind not in factories:
			raise Exception("Unknown kind: %s" % kind)
		return factories[kind](parent, data, namespace=namespace)

	@classmethod
	def createAll(cls, resources, waitForRunning=True, timeout=DEFAULT_TIMEOUT):
		"""
		Create all the specified resources, then wait for all of them to be running.

		:param resources: A dictionary string:Resource
		:param waitForRunning: Set to false to not wait for all the resources
		:param timeout: Maximum number of seconds to wait for each resource
		"""
		for res in resources.values():
			res.create(waitForRunning=False)
		if waitForRunning:
			for res in resources.values():
				res.waitForRunning(timeout=timeout)

	def __init__(self, parent, data, namespace=None):
		"""
		Create a resource object from a Parent and YAML

		:param parent: Reference to the parent PySys testcase
		:param data: The YAML definition as a list of lines
		:param namespace: The Namespace object this resource belongs to, if any
		:raises Exception: if no name can be found in the metadata of the definition
		"""
		self.data = data
		self.parent = parent
		self.name = None
		self.namespace = namespace
		self.config = self.parent.project.KUBE_CONFIG
		self.extra_path=[parent.project.CONTAINER_PATH]
		self.extra_env={"HOME":parent.project.HOME}
		self.localenv = parent.createEnvirons(addToExePath=self.extra_path,overrides=self.extra_env)

		self.name = self._findName(data)
		if not self.name: raise Exception("Couldn't find name in %s" % data)

		self.started = False
		self.parent.addResource(self)
		DockerHelper.__init__(self, parent=parent, displayName=self.name)

	def getName(self):
		""" Return the name of this resource """
		return self.name

	def create(self, waitForRunning=True, timeout=DEFAULT_TIMEOUT):
		"""
		Create this resource within a Kubernetes node.

		The definition is written to a yaml file in the output directory and passed
		to "kubectl create -f <filename>".

		:param waitForRunning: If True waits for the resource to be ready before returning.
		:param timeout: Maximum number of seconds to wait for the resource to be ready.
		:return: this object
		"""
		assert(not self.started)
		instance = self.parent.getInstanceCount("kube-create")
		filename=self.parent.output+"/kube-create-%s-%s-data.yml" % (self.name, instance)
		with io.open(filename, "w", encoding='utf-8') as f: # yaml is always unicode so ok to assume UTF-8
			data = self.data
			if data and isinstance(data[0], bytes):
				data = [x.decode('utf-8') for x in data] # python2 may have passed byte strings in, assume utf8
			f.writelines(data)

		args=[ "create", "-f", filename]
		if self.namespace: args.append("--namespace="+self.namespace.getName())

		try:
			ClusterExecutable.start(
				instance=self,
				displayName='kubectl create %s'%self.name,
				arguments=args,
				environs=self.localenv,
				stdout="kube-create-%s-%s.out"%(instance, self.name),
				stderr="kube-create-%i-%s.err"%(instance, self.name),
				ignoreExitStatus=False,
				abortOnError=True)
		except Exception:
			# surface what kubectl complained about before propagating the failure
			self.parent.logFileContents("kube-create-%i-%s.err"%(instance, self.name))
			raise
		self.started = True
		if waitForRunning: self.waitForRunning(timeout)
		return self

	def getDescribe(self, type):
		"""
		Run "kubectl get" and then "kubectl describe" for this resource, capturing the output of each to files.

		:param type: The kubectl resource type, e.g. 'pod' or 'job'
		:return: Path of the file holding the stdout of the "get" command
		"""
		instance = self.parent.getInstanceCount(type+"-get")

		args = [ "get", type, self.name]
		if self.namespace: args.append("--namespace="+self.namespace.getName())
		stdout="kubectl_%s_get_%s_%i.out"%(type, self.name, instance)
		stderr="kubectl_%s_get_%s_%i.err"%(type, self.name, instance)
		ClusterExecutable.start(
			instance=self,
			environs=self.localenv,
			arguments=args,
			stdout=stdout,
			stderr=stderr,
			displayName=type+"-get")
		output = self.parent.output+"/"+stdout

		# the describe output is only captured to file for diagnostics; it is not returned
		stdout="kubectl_%s_describe_%s_%i.out"%(type, self.name, instance)
		stderr="kubectl_%s_describe_%s_%i.err"%(type, self.name, instance)
		args = [ "describe", type +"/"+ self.name]
		if self.namespace: args.append("--namespace="+self.namespace.getName())
		ClusterExecutable.start(
			instance=self,
			environs=self.localenv,
			arguments=args,
			stdout=stdout,
			stderr=stderr,
			displayName=type+"-describe")
		return output

	def waitForRunning(self, timeout=DEFAULT_TIMEOUT):
		"""
		Overridden by subclasses. Waits until a resource is in the running state or timeout (seconds) occurs

		:param timeout: Maximum number of seconds to wait for the resource to be available
		"""
		pass
class Job(Resource):
	"""
	Represents a Kubernetes job. Provides controls to create and destroy the job, get the logs etc

	Create using Resource.fromFile
	"""

	def __init__(self, parent, data, own = True, namespace = None):
		"""
		Create a job from a Parent and YAML

		:param parent: Reference to the parent PySys testcase
		:param data: The YAML definition as a list of lines
		:param own: If True this object deletes the job when it is garbage collected
		:param namespace: The Namespace object this job belongs to, if any
		"""
		Resource.__init__(self, parent, data, namespace=namespace)
		self.exposed = False
		self.owning = own
		if not own: self.running = True

	def waitForRunning(self, timeout=DEFAULT_TIMEOUT):
		"""
		Wait until the two halves of the job's status column (e.g. "1/1") are equal.

		:param timeout: Maximum number of polls, one second apart, before the test is aborted with TIMEDOUT
		"""
		running = False
		count = 0
		while not running:
			if count > timeout:
				self.parent.abort(TIMEDOUT, "Timeout waiting for job %s to start after %d secs" % (self.name, timeout))
			count = count + 1
			outfile = self.getDescribe('job')
			with self._safelyOpenProcessOutputFile(outfile) as f:
				for l in f:
					if self.name in l:
						# second column of "kubectl get job" is of the form <n>/<m>
						columns = re.split(r'\s+', l)
						jobstatus = re.split('/', columns[1])
						if jobstatus[0] == jobstatus[1]: running = True
			if not running: self.parent.wait(1)

	def log(self, logfile):
		"""
		Stream the Kubernetes logs (i.e. the stdout and stderr) for a resource that has been run.
		Sends it live to two files, one for stdout, another for stderr.

		:param logfile: A named file prefix in the output directory. Suffixed with '.out' and '.err'
		:return: this object
		"""
		args = [ "logs", "-f", self.name]
		stdout = os.path.join(self.parent.output, logfile+".out")
		stderr = os.path.join(self.parent.output, logfile+".err")
		if self.namespace: args.append("--namespace="+self.namespace.getName())
		ClusterExecutable.start(
			instance=self,
			environs=self.localenv,
			arguments=args,
			displayName="kubectl logs",
			state=BACKGROUND,
			stdout=stdout,
			stderr=stderr)
		return self

	def cp(self, containerPath):
		"""
		Copy a file out of the job (using "kubectl cp") into your output directory

		:param containerPath: Absolute path of the file in the container. The file in your output directory will share its basename.
		:return: this object
		"""
		instance = self.parent.getInstanceCount("job-cp")
		args = [ "cp", self.name+':'+containerPath, self.parent.output+'/'+os.path.basename(containerPath)]
		if self.namespace: args.append("--namespace="+self.namespace.getName())
		ClusterExecutable.start(
			instance=self,
			environs=self.localenv,
			arguments=args,
			stdout="kubectl_job_cp_%i.out"%instance,
			stderr="kubectl_job_cp_%i.err"%instance,
			displayName="job-cp")
		return self

	def __del__(self):
		""" Removes the job and waits for it to be removed """
		# getattr: the attribute is missing if the constructor raised part way through
		if getattr(self, 'owning', False): self.delete()

	def delete(self, timeout=600, graceperiod=0, force=True):
		"""
		Removes the job and waits for it to be removed. Does nothing if the job was not started.

		:param timeout: Maximum number of polls, one second apart, to wait for the job to be removed
		:param graceperiod: Value passed to kubectl as --grace-period
		:param force: Value passed to kubectl as --force
		"""
		if not self.started: return

		args=[ "delete", "--force=%s"%force, "--grace-period", str(graceperiod), "job", self.name]
		if self.namespace: args.append("--namespace="+self.namespace.getName())
		ClusterExecutable.start(
			instance=self,
			environs=self.localenv,
			arguments=args,
			displayName='kubectl delete job')

		# poll until "kubectl get" no longer succeeds, i.e. the job has gone
		i = 0
		args = [ "get", "job", self.name]
		if self.namespace: args.append("--namespace="+self.namespace.getName())
		while i < timeout and 0 == ClusterExecutable.start(instance=self,environs=self.localenv, arguments=args, abortOnError=False, ignoreExitStatus=True, displayName='kubectl get job').exitStatus:
			self.parent.wait(1)
			i = i + 1
		self.started=False
class Pod(Resource):
	"""
	Represents a Kubernetes pod. Provides controls to create and destroy the pod, get the logs etc

	Create using Resource.fromFile
	"""

	def __init__(self, parent, data, own = True, namespace = None):
		"""
		Create a pod from a Parent and YAML

		:param parent: Reference to the parent PySys testcase
		:param data: The YAML definition as a list of lines
		:param own: If True this object deletes the pod when it is garbage collected.
			If False the pod is treated as already started and running.
		:param namespace: The Namespace object this pod belongs to, if any
		"""
		Resource.__init__(self, parent, data, namespace=namespace)
		self.exposed = False
		self.owning = own
		if not own:
			self.running = True
			self.started = True

	def waitForRunning(self, timeout=DEFAULT_TIMEOUT):
		"""
		Wait until the pod is in the running state

		Aborts the test with BLOCKED if the pod is neither running nor in one of the
		recognised "still being created" states.

		:param timeout: Maximum number of polls, one second apart, to wait for the pod to be available
		"""
		running = False
		count = 0
		while not running:
			if count > timeout:
				self.parent.addOutcome(TIMEDOUT, 'waiting for pod %s to start timed out after %d seconds'%(self.name, timeout), printReason=True, abortOnError=True)
			count = count + 1
			outfile = self.getDescribe('pod')
			creating = False
			with self._safelyOpenProcessOutputFile(outfile) as f:
				for l in f:
					if 'Running' in l: running = True
					elif 'ContainerCreating' in l: creating = True
					elif 'Pending' in l: creating = True
					elif 'PodInitializing' in l: creating = True
					elif 'Init:' in l: creating = True
			if not running and not creating:
				# capture whatever the pod logged before giving up on it
				self.log(self.name+'-start-error', once=True)
				self.parent.abort(BLOCKED, "Error creating pod "+self.name)
			if not running: self.parent.wait(1)

	def log(self, logfile, once=False):
		"""
		Stream the Kubernetes logs (i.e. the stdout and stderr) for a resource that has been run.
		Sends it live to two files, one for stdout, another for stderr.

		:param logfile: A named file prefix in the output directory. Suffixed with '.out' and '.err'
		:param once: If True the logs are not followed (no "-f" is passed to kubectl)
		:return: this object
		"""
		args = [ "logs" ]
		if not once: args.append("-f")
		args.append(self.name)
		stdout = os.path.join(self.parent.output, logfile+".out")
		stderr = os.path.join(self.parent.output, logfile+".err")
		if self.namespace: args.append("--namespace="+self.namespace.getName())
		ClusterExecutable.start(
			instance=self,
			environs=self.localenv,
			arguments=args,
			displayName="kubectl logs",
			state=BACKGROUND,
			stdout=stdout,
			stderr=stderr)
		return self

	def cp(self, containerPath):
		"""
		Copy a file out of the pod (using "kubectl cp") into your output directory

		:param containerPath: Absolute path of the file in the container. The file in your output directory will share its basename.
		:return: this object
		"""
		instance = self.parent.getInstanceCount("pod-cp")
		args = [ "cp", self.name+':'+containerPath, self.parent.output+'/'+os.path.basename(containerPath)]
		if self.namespace: args.append("--namespace="+self.namespace.getName())
		ClusterExecutable.start(
			instance=self,
			environs=self.localenv,
			arguments=args,
			stdout="kubectl_pod_cp_%i.out"%instance,
			stderr="kubectl_pod_cp_%i.err"%instance,
			displayName="pod-cp")
		return self

	def expose(self):
		"""
		Expose the port(s) of the pod on the host, using a NodePort service named after the pod.

		:return: this object, or None if the pod was already exposed
		"""
		if self.exposed: return
		instance = self.parent.getInstanceCount("expose-pod")
		# NOTE(review): this Service is constructed without a namespace and is never
		# marked as started, so its delete() is a no-op - confirm whether that is intended.
		Service(self.parent, ['metadata:',' name: %s' % self.name])
		args = [ "expose", "pod", self.name, "--type=NodePort"]
		if self.namespace: args.append("--namespace="+self.namespace.getName())
		ClusterExecutable.start(
			instance=self,
			environs=self.localenv,
			arguments=args,
			stdout="kubectl_pod_expose_%i.out"%instance,
			stderr="kubectl_pod_expose_%i.err"%instance,
			displayName="expose-pod")
		self.exposed = True
		return self

	def getExternalPort(self, port):
		"""
		Return the port on the Kube node which corresponds to the given internal port.
		Exposes the ports if not already exposed.

		Aborts the test with BLOCKED if no mapping for the port is found.

		:param port: The internal port number, as an integer
		:return: The corresponding external port number, as an integer
		"""
		self.expose()
		instance = self.parent.getInstanceCount("get-service")
		args = [ "get", "service", self.name]
		if self.namespace: args.append("--namespace="+self.namespace.getName())
		ClusterExecutable.start(
			instance=self,
			environs=self.localenv,
			arguments=args,
			stdout="kubectl_get_service_%i.out"%instance,
			stderr="kubectl_get_service_%i.err"%instance,
			displayName="get-service")
		with self._safelyOpenProcessOutputFile(os.path.join(self.parent.output, "kubectl_get_service_%i.out"%instance)) as f:
			for l in f:
				if l.startswith('NAME'): continue
				data = re.split(r'\s+', l.strip())
				if len(data) < 5: continue # blank or truncated line, no ports column
				# fifth column is a comma separated list of <internal>:<external>/<protocol>
				for d in data[4].split(','):
					ports = re.split('[^0-9]', d)
					if int(ports[0]) == port: return int(ports[1])
		self.parent.abort(BLOCKED, 'Could not find a port for service %s port %s' % (self.name, port))

	def __del__(self):
		""" Removes the pod and waits for it to be removed """
		# getattr: the attribute is missing if the constructor raised part way through
		if getattr(self, 'owning', False): self.delete()

	def delete(self, timeout=600, graceperiod=0, force=True):
		"""
		Removes the pod and waits for it to be removed. Does nothing if the pod was not started.

		:param timeout: Maximum number of polls, one second apart, to wait for the pod to be removed
		:param graceperiod: Value passed to kubectl as --grace-period
		:param force: Value passed to kubectl as --force
		"""
		if not self.started: return

		args=[ "delete", "--force=%s"%force, "--grace-period", str(graceperiod), "pod", self.name]
		if self.namespace: args.append("--namespace="+self.namespace.getName())
		ClusterExecutable.start(
			instance=self,
			environs=self.localenv,
			arguments=args,
			displayName='kubectl delete pod')

		# poll until "kubectl get" no longer succeeds, i.e. the pod has gone
		i = 0
		args = [ "get", "pod", self.name]
		if self.namespace: args.append("--namespace="+self.namespace.getName())
		while i < timeout and 0 == ClusterExecutable.start(instance=self,environs=self.localenv,arguments=args, abortOnError=False, ignoreExitStatus=True, displayName='kubectl get pod').exitStatus:
			self.parent.wait(1)
			i = i + 1
		self.started=False
class Service(Resource):
	"""
	Represents a Kubernetes service. Provides controls to destroy the service and capture its logs.

	Create using Resource.fromFile
	"""

	def __init__(self, parent, data, namespace=None):
		""" Create a service from a Parent and YAML """
		Resource.__init__(self, parent, data, namespace=namespace)

	def __del__(self):
		""" Removes the service (if it was started) and waits for it to be removed """
		if self.started:
			self.delete()

	def delete(self, timeout=600, graceperiod=0, force=True):
		"""
		Removes the service and waits for it to be removed. Does nothing if the service was not started.

		:param timeout: Maximum number of polls, one second apart, to wait for the service to be removed
		:param graceperiod: Value passed to kubectl as --grace-period
		:param force: Value passed to kubectl as --force
		"""
		if not self.started:
			return

		deleteArgs = [ "delete", "--force=%s"%force, "--grace-period", str(graceperiod), "service", self.name]
		if self.namespace:
			deleteArgs.append("--namespace="+self.namespace.getName())
		ClusterExecutable.start(
			instance=self,
			environs=self.localenv,
			arguments=deleteArgs,
			displayName='kubectl delete service')

		getArgs = [ "get", "service", self.name]
		if self.namespace:
			getArgs.append("--namespace="+self.namespace.getName())

		# keep polling while "kubectl get" still finds the service
		waited = 0
		while waited < timeout:
			probe = ClusterExecutable.start(instance=self,environs=self.localenv,arguments=getArgs, abortOnError=False, ignoreExitStatus=True, displayName='kubectl get service')
			if probe.exitStatus != 0:
				break
			self.parent.wait(1)
			waited = waited + 1
		self.started=False

	def log(self, logfile):
		"""
		Stream the Kubernetes logs (i.e. the stdout and stderr) for a resource that has been run.
		Sends it live to two files, one for stdout, another for stderr.

		:param logfile: A named file prefix in the output directory. Suffixed with '.out' and '.err'
		:return: this object
		"""
		outputDir = self.parent.output
		logArgs = [ "logs", "-f", self.name]
		if self.namespace:
			logArgs.append("--namespace="+self.namespace.getName())
		ClusterExecutable.start(
			instance=self,
			environs=self.localenv,
			arguments=logArgs,
			displayName="kubectl logs",
			state=BACKGROUND,
			stdout=os.path.join(outputDir, logfile+".out"),
			stderr=os.path.join(outputDir, logfile+".err"))
		return self
class NameSpace(Resource):
	"""
	Represents a Kubernetes NameSpace.

	Create using Resource.fromFile or NameSpace.fromName or NameSpace.fromUnique

	NOTE(review): a second class named NameSpace is defined later in this module,
	which rebinds the module-level name - confirm whether this earlier definition
	is still needed.
	"""

	@classmethod
	def fromName(cls, parent, name):
		""" Create a named namespace """
		definition = [
			"kind: Namespace\n",
			"apiVersion: v1\n",
			"metadata:\n",
			" name: %s\n"%name,
		]
		return NameSpace(parent, definition)

	@classmethod
	def fromUnique(cls, parent):
		""" Create a namespace with a unique name """
		uniqueName = DockerImage.generateUniqueName()
		return cls.fromName(parent, uniqueName)

	def __init__(self, parent, data, namespace=None):
		""" Create a namespace from a Parent and YAML """
		Resource.__init__(self, parent, data, namespace=namespace)

	def __del__(self):
		""" Removes the namespace (if it was started) and waits for it to be removed """
		if self.started:
			self.delete()

	def delete(self, timeout=600, graceperiod=0, force=True):
		"""
		Removes the namespace and waits for it to be removed.

		:param timeout: Maximum number of polls, one second apart, to wait for the namespace to be removed
		:param graceperiod: Value passed to kubectl as --grace-period
		:param force: Value passed to kubectl as --force
		"""
		deleteArgs = [ "delete", "--force=%s"%force, "--grace-period", str(graceperiod), "namespace", self.name]
		if self.namespace:
			deleteArgs.append("--namespace="+self.namespace.getName())
		ClusterExecutable.start(
			instance=self,
			environs=self.localenv,
			arguments=deleteArgs,
			displayName='kubectl delete namespace')

		getArgs = [ "get", "namespace", self.name]
		if self.namespace:
			getArgs.append("--namespace="+self.namespace.getName())

		# keep polling while "kubectl get" still finds the namespace
		waited = 0
		while waited < timeout:
			probe = ClusterExecutable.start(instance=self, environs=self.localenv, arguments=getArgs, abortOnError=False, ignoreExitStatus=True, displayName='kubectl get namespace')
			if probe.exitStatus != 0:
				break
			self.parent.wait(1)
			waited = waited + 1
class Secret(Resource):
	"""
	Represents a Kubernetes Secret.

	Create using Resource.fromFile
	"""

	def __init__(self, parent, data, namespace=None):
		""" Create a secret from a Parent and YAML """
		Resource.__init__(self, parent, data, namespace=namespace)

	def __del__(self):
		""" Removes the secret (if it was started) and waits for it to be removed """
		if self.started: self.delete()

	def delete(self, timeout=600, graceperiod=0, force=True):
		"""
		Removes the secret and waits for it to be removed.

		:param timeout: Maximum number of polls, one second apart, to wait for the secret to be removed
		:param graceperiod: Value passed to kubectl as --grace-period
		:param force: Value passed to kubectl as --force
		"""
		args = [ "delete", "--force=%s"%force, "--grace-period", str(graceperiod), "secret", self.name]
		if self.namespace: args.append("--namespace="+self.namespace.getName())
		ClusterExecutable.start(
			instance=self,
			environs=self.localenv,
			arguments=args,
			displayName='kubectl delete secret',
			stdout=f"kubectl-delete-{self.name}.out",
			stderr=f"kubectl-delete-{self.name}.err")

		# poll until "kubectl get" no longer succeeds, i.e. the secret has gone
		i = 0
		args = [ "get", "secret", self.name]
		if self.namespace: args.append("--namespace="+self.namespace.getName())
		while i < timeout and 0 == ClusterExecutable.start(instance=self, environs=self.localenv, arguments=args, abortOnError=False, ignoreExitStatus=True, displayName='kubectl get secret').exitStatus:
			self.parent.wait(1)
			i = i + 1

		# record the removal so that __del__ does not try to delete the secret a second time
		self.started = False
class PersistentVolume(Resource):
	"""
	Represents a Kubernetes PersistentVolume.

	Create using Resource.fromFile
	"""

	def __init__(self, parent, data, namespace=None):
		""" Create a volume from a Parent and YAML """
		Resource.__init__(self, parent, data, namespace=namespace)

	def __del__(self):
		""" Removes the volume (if it was started) and waits for it to be removed """
		if self.started: self.delete()

	def delete(self, timeout=600, graceperiod=0, force=True):
		"""
		Removes the volume and waits for it to be removed.

		:param timeout: Maximum number of polls, one second apart, to wait for the volume to be removed
		:param graceperiod: Value passed to kubectl as --grace-period
		:param force: Value passed to kubectl as --force
		"""
		args = [ "delete", "--force=%s"%force, "--grace-period", str(graceperiod), "pv", self.name]
		if self.namespace: args.append("--namespace="+self.namespace.getName())
		ClusterExecutable.start(
			instance=self,
			environs=self.localenv,
			arguments=args,
			displayName='kubectl delete pv')

		# poll until "kubectl get" no longer succeeds, i.e. the volume has gone
		i = 0
		args = [ "get", "pv", self.name]
		if self.namespace: args.append("--namespace="+self.namespace.getName())
		while i < timeout and 0 == ClusterExecutable.start(instance=self,environs=self.localenv,arguments=args, abortOnError=False, ignoreExitStatus=True, displayName='kubectl get pv').exitStatus:
			self.parent.wait(1)
			i = i + 1

		# record the removal so that __del__ does not try to delete the volume a second time
		self.started = False
class PersistentVolumeClaim(Resource):
	"""
	Represents a Kubernetes PersistentVolumeClaim.

	Create using Resource.fromFile
	"""

	def __init__(self, parent, data, namespace=None):
		""" Create a claim from a Parent and YAML """
		Resource.__init__(self, parent, data, namespace=namespace)

	def __del__(self):
		""" Removes the claim (if it was started) and waits for it to be removed """
		if self.started: self.delete()

	def delete(self, timeout=600, graceperiod=0, force=True):
		"""
		Removes the claim and waits for it to be removed.

		:param timeout: Maximum number of polls, one second apart, to wait for the claim to be removed
		:param graceperiod: Value passed to kubectl as --grace-period
		:param force: Value passed to kubectl as --force
		"""
		args = [ "delete", "--force=%s"%force, "--grace-period", str(graceperiod), "pvc", self.name]
		if self.namespace: args.append("--namespace="+self.namespace.getName())
		ClusterExecutable.start(
			instance=self,
			environs=self.localenv,
			arguments=args,
			displayName='kubectl delete pvc')

		# poll until "kubectl get" no longer succeeds, i.e. the claim has gone
		i = 0
		args = [ "get", "pvc", self.name]
		if self.namespace: args.append("--namespace="+self.namespace.getName())
		while i < timeout and 0 == ClusterExecutable.start(instance=self,environs=self.localenv,arguments=args, abortOnError=False, ignoreExitStatus=True, displayName='kubectl get pvc').exitStatus:
			self.parent.wait(1)
			i = i + 1

		# record the removal so that __del__ does not try to delete the claim a second time
		self.started = False
class Deployment(Resource):
	"""
	Represents a Kubernetes Deployment.

	Create using Resource.fromFile
	"""

	def __init__(self, parent, data, namespace=None):
		""" Create a deployment from a Parent and YAML """
		Resource.__init__(self, parent, data, namespace=namespace)

	def waitForRunning(self, timeout=DEFAULT_TIMEOUT):
		"""
		Wait until the deployment and all its pods are in the running state

		:param timeout: Maximum number of polls, one second apart, to wait for the deployment;
			the same value is then used for each of its pods.
		"""
		running = False
		count = 0
		while not running:
			if count > timeout:
				self.parent.abort(TIMEDOUT, "Timeout waiting for deployment %s to start" % self.name)
			count = count + 1
			outfile = self.getDescribe('deployment')
			with self._safelyOpenProcessOutputFile(outfile) as f:
				for l in f:
					if self.name in l:
						# second column of "kubectl get deployment" is of the form <n>/<m>
						columns = re.split(r'\s+', l)
						jobstatus = re.split('/', columns[1])
						self.parent.log.info("waitForRunning 1==%s 2==%s", jobstatus[0],jobstatus[1])
						if jobstatus[0] == jobstatus[1]: running = True
			if not running: self.parent.wait(1)
		for pod in self.getPods():
			pod.waitForRunning(timeout)

	def getPods(self):
		"""
		Get all the pods associated with this deployment

		Pods are recognised by name: ``<deployment name>-<hex digits>-<alphanumerics>``.

		:return: a list of Pod objects which are not owned (so are not deleted with these objects)
		"""
		instance = self.parent.getInstanceCount("pods-get")
		args = [ "get", "pods"]
		if self.namespace: args.append("--namespace="+self.namespace.getName())
		ClusterExecutable.start(
			instance=self,
			environs=self.localenv,
			arguments=args,
			stdout="kubectl_pods_get_%i.out"%(instance),
			stderr="kubectl_pods_get_%i.err"%(instance),
			displayName="pods-get")

		# escape the name so that characters such as '.' are matched literally
		podRegex = re.compile(r'^(%s-[0-9a-f]+-[a-zA-Z0-9]*)\s+.*' % re.escape(self.name))
		pods = []
		with self._safelyOpenProcessOutputFile(self.parent.output+"/kubectl_pods_get_%i.out"%(instance)) as f:
			for l in f:
				m = podRegex.search(l)
				if m: pods.append(Pod(self.parent, ['metadata:',' name: %s' % m.group(1)], False, namespace=self.namespace))
		return pods

	def __del__(self):
		""" Removes the deployment (if it was started) and waits for it to be removed """
		if self.started: self.delete()

	def delete(self, timeout=600, graceperiod=0, force=True):
		"""
		Removes the deployment and waits for it to be removed. Does nothing if the deployment was not started.

		:param timeout: Maximum number of polls, one second apart, to wait for the deployment to be removed
		:param graceperiod: Value passed to kubectl as --grace-period
		:param force: Value passed to kubectl as --force
		"""
		if not self.started: return

		args=[ "delete", "--force=%s"%force, "--grace-period", str(graceperiod), "deployment", self.name]
		if self.namespace: args.append("--namespace="+self.namespace.getName())
		ClusterExecutable.start(
			instance=self,
			environs=self.localenv,
			arguments=args,
			displayName='kubectl delete deployment')

		# poll until "kubectl get" no longer succeeds, i.e. the deployment has gone
		i = 0
		args = [ "get", "deployment", self.name]
		if self.namespace: args.append("--namespace="+self.namespace.getName())
		while i < timeout and 0 == ClusterExecutable.start(instance=self, environs=self.localenv, arguments=args, abortOnError=False, ignoreExitStatus=True, displayName='kubectl get deployment').exitStatus:
			self.parent.wait(1)
			i = i + 1
		self.started=False
class StatefulSet(Resource):
	"""
	Represents a Kubernetes StatefulSet.

	Create using Resource.fromFile
	"""

	def __init__(self, parent, data, namespace=None):
		""" Create a statefulset from a Parent and YAML """
		Resource.__init__(self, parent, data, namespace=namespace)

	def waitForRunning(self, timeout=DEFAULT_TIMEOUT):
		"""
		Wait until the statefulset and all its pods are in the running state

		:param timeout: Maximum number of polls, one second apart, to wait for the statefulset;
			the same value is then used for each of its pods.
		"""
		running = False
		count = 0
		while not running:
			if count > timeout:
				self.parent.abort(TIMEDOUT, "Timeout waiting for statefulset %s to start" % self.name)
			count = count + 1
			outfile = self.getDescribe('statefulset')
			with self._safelyOpenProcessOutputFile(outfile) as f:
				for l in f:
					if self.name in l:
						# second column of "kubectl get statefulset" is of the form <n>/<m>
						columns = re.split(r'\s+', l)
						jobstatus = re.split('/', columns[1])
						self.parent.log.info("waitForRunning: %s / %s", jobstatus[0],jobstatus[1])
						if jobstatus[0] == jobstatus[1]: running = True
			if not running: self.parent.wait(1)
		for pod in self.getPods():
			pod.waitForRunning(timeout)

	def getPods(self):
		"""
		Get all the pods associated with this statefulset

		Pods are recognised by name: ``<statefulset name>-<digits>``.

		:return: a list of Pod objects which are not owned (so are not deleted with these objects)
		"""
		instance = self.parent.getInstanceCount("pods-get")
		args = [ "get", "pods"]
		if self.namespace: args.append("--namespace="+self.namespace.getName())
		ClusterExecutable.start(
			instance=self,
			environs=self.localenv,
			arguments=args,
			stdout="kubectl_pods_get_%i.out"%(instance),
			stderr="kubectl_pods_get_%i.err"%(instance),
			displayName="pods-get")

		# escape the name so that characters such as '.' are matched literally
		podRegex = re.compile(r'^(%s-[0-9]+)\s+.*' % re.escape(self.name))
		pods = []
		with self._safelyOpenProcessOutputFile(self.parent.output+"/kubectl_pods_get_%i.out"%(instance)) as f:
			for l in f:
				m = podRegex.search(l)
				if m: pods.append(Pod(self.parent, ['metadata:',' name: %s' % m.group(1)], False, namespace=self.namespace))
		return pods

	def __del__(self):
		""" Removes the statefulset (if it was started) and waits for it to be removed """
		if self.started: self.delete()

	def delete(self, timeout=600, graceperiod=0, force=True):
		"""
		Removes the statefulset and waits for it to be removed. Does nothing if the statefulset was not started.

		:param timeout: Maximum number of polls, one second apart, to wait for the statefulset to be removed
		:param graceperiod: Value passed to kubectl as --grace-period
		:param force: Value passed to kubectl as --force
		"""
		if not self.started: return

		args=[ "delete", "--force=%s"%force, "--grace-period", str(graceperiod), "statefulset", self.name]
		if self.namespace: args.append("--namespace="+self.namespace.getName())
		ClusterExecutable.start(
			instance=self,
			environs=self.localenv,
			arguments=args,
			displayName='kubectl delete statefulset')

		# poll until "kubectl get" no longer succeeds, i.e. the statefulset has gone
		i = 0
		args = [ "get", "statefulset", self.name]
		if self.namespace: args.append("--namespace="+self.namespace.getName())
		while i < timeout and 0 == ClusterExecutable.start(instance=self, environs=self.localenv,arguments=args, abortOnError=False, ignoreExitStatus=True, displayName='kubectl get statefulset').exitStatus:
			self.parent.wait(1)
			i = i + 1
		self.started=False
class Cluster(Resource):
	"""
	Represents a Rancher Cluster - top level organisation for objects.
	Also provides a hook for determining cluster health and availability.

	Create using Cluster.fromName
	"""

	@classmethod
	def fromName(cls, parent , name):
		"""
		Build a Cluster object for the named cluster.

		A minimal resource definition is synthesised so that the cluster fits the
		Resource class hierarchy.
		"""
		definition = [
			"kind: Cluster\n",
			"apiVersion: v1\n",
			"metadata:\n",
			" name: %s\n"%name,
		]
		return Cluster(parent, definition, name)

	def __init__(self,parent, data,name):
		""" Record the cluster name and register using the existing Resource mechanism """
		self.clusterName = name
		Resource.__init__(self, parent, data, None)

	def __del__(self):
		""" Cluster is Read-Only, so nothing is removed """

	def delete(self, timeout=600, graceperiod=0, force=True):
		""" Cluster is Read-Only, so this does nothing; the parameters are ignored """
class Project(Resource):
	"""
	Represents a Rancher Project - A grouping mechanism in Rancher.

	Create using Project.fromContext
	"""

	@classmethod
	def fromName(cls, parent, cluster, name):
		"""
		Build a Project object with the given name.

		A minimal resource definition is synthesised so that the project fits the
		Resource class hierarchy.

		:param parent: Reference to the parent PySys testcase
		:param cluster: The Cluster object this project belongs to
		:param name: The project name
		"""
		data = [
			"kind: Project\n",
			"apiVersion: v1\n",
			"metadata:\n",
			" name: %s\n"%name
		]
		return Project(parent, cluster, data)

	@classmethod
	def fromUnique(cls, parent, cluster, rootname):
		""" Create a project named ``<rootname>_<unique name>`` """
		return cls.fromName(parent, cluster, "{}_{}".format(rootname,DockerImage.generateUniqueName()))

	@classmethod
	def fromContext(cls, parent, cluster):
		""" Create a project named ``<current user>_<cluster name>_tests`` """
		return cls.fromName(parent, cluster, "{}_{}_tests".format(getpass.getuser(),cluster.clusterName))

	def __init__(self, parent,cluster, data):
		""" Record the owning cluster and register using the existing Resource mechanism """
		self.cluster = cluster
		Resource.__init__(self, parent, data)

	def __del__(self):
		""" Calls delete if the project was started """
		if self.started: self.delete()

	def switch(self):
		"""
		Create the project if "rancher project ls" does not list it, then switch the rancher context to it.

		Does nothing unless the project property CONTAINER_CMD is 'rancher'.
		"""
		if not (hasattr(self.parent.project, 'CONTAINER_CMD') and self.parent.project.CONTAINER_CMD == 'rancher'):
			return

		args=["project", "ls"]
		ClusterExecutable.start(
			instance=self,
			rancherDebug=False,
			prefixDefaultArg=False,
			environs=self.localenv,
			arguments=args,
			stdout="rancher_projects_get.out",
			stderr="rancher_projects_get.err",
			displayName='rancher project ls')

		self.defaultProject = ''
		# the project exists if its name appears anywhere in the listing
		with self._safelyOpenProcessOutputFile(self.parent.output+"/rancher_projects_get.out") as f:
			found = any(self.name in l for l in f)

		if not found:
			args=["project", "create", self.name, "--cluster",self.cluster.clusterName]
			ClusterExecutable.start(
				instance=self,
				prefixDefaultArg=False,
				environs=self.localenv,
				arguments=args,
				stdout="rancher_project_create.out",
				stderr="rancher_project_create.err",
				displayName='rancher project create')
			self.parent.log.info("Created project"+self.name)

		args=["context", "switch", self.name]
		ClusterExecutable.start(
			instance=self,
			prefixDefaultArg=False,
			environs=self.localenv,
			arguments=args,
			stdout="rancher_context_switch.out",
			stderr="rancher_context_switch.err",
			displayName='rancher context switch')

	def delete(self, timeout=600, graceperiod=0, force=True):
		""" Does nothing; the parameters are ignored """
		#we don't delete the project now, it is the base level for all namespaces
		#we will have switched into this name space however
		pass
class NameSpace(Resource):
	"""
	Represents a Kubernetes NameSpace.

	Create using Resource.fromFile or NameSpace.fromName or NameSpace.fromUnique
	"""

	@classmethod
	def fromName(cls, parent, name):
		""" Create a named namespace """
		definition = [
			"kind: Namespace\n",
			"apiVersion: v1\n",
			"metadata:\n",
			" name: %s\n"%name,
		]
		return NameSpace(parent, definition)

	@classmethod
	def fromUnique(cls, parent):
		"""
		Create a namespace with a unique name, formed from the lower-cased test id
		(with underscores removed) followed by a generated unique name.
		"""
		testId = parent.descriptor.id.replace('_','').lower()
		uniqueName = '{}-{}'.format(testId, DockerImage.generateUniqueName())
		return cls.fromName(parent, uniqueName)

	def __init__(self, parent, data, namespace=None):
		""" Create a namespace from a Parent and YAML """
		self.project = None
		Resource.__init__(self, parent, data, namespace=namespace)

	def createInProject(self,project):
		"""
		Create the namespace under the supplied project.

		With rancher the namespace is created using "rancher namespace create";
		otherwise it is created in the same way as any other resource.

		:param project: The Project to create this namespace in
		"""
		self.project = project
		containerProject = self.parent.project
		usingRancher = hasattr(containerProject, 'CONTAINER_CMD') and containerProject.CONTAINER_CMD == 'rancher'
		if not usingRancher:
			self.create()
			return
		ClusterExecutable.start(
			instance=self,
			arguments=["namespace", "create",self.name],
			prefixDefaultArg=False,
			stdout="rancher_namespace_create.out",
			stderr="rancher_namespace_create.err",
			environs=self.localenv,
			displayName='rancher namespace create')

	def __del__(self):
		""" Removes the namespace and waits for it to be removed """
		self.delete()

	def delete(self, timeout=600, graceperiod=0, force=True):
		"""
		Removes the namespace and waits for it to be removed.

		:param timeout: Maximum number of polls, one second apart, to wait for the namespace to be removed
		:param graceperiod: Value passed to kubectl as --grace-period
		:param force: Value passed to kubectl as --force
		"""
		deleteArgs = [ "delete", "--force=%s"%force, "--grace-period", str(graceperiod), "namespace", self.name]
		if self.namespace:
			deleteArgs.append("--namespace="+self.namespace.getName())
		ClusterExecutable.start(
			instance=self,
			environs=self.localenv,
			arguments=deleteArgs,
			displayName='kubectl delete namespace')

		getArgs = [ "get", "namespace", self.name]
		if self.namespace:
			getArgs.append("--namespace="+self.namespace.getName())

		# keep polling while "kubectl get" still finds the namespace
		waited = 0
		while waited < timeout:
			probe = ClusterExecutable.start(instance=self,environs=self.localenv, arguments=getArgs, abortOnError=False, ignoreExitStatus=True, displayName='kubectl get namespace')
			if probe.exitStatus != 0:
				break
			self.parent.wait(1)
			waited = waited + 1