#!/usr/bin/env python
# Copyright (c) 2015-2021, 2023 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors.
# Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG
"""
Support for using Kubernetes and Rancher from PySys tests.
.. autosummary::
Pod
Job
Service
NameSpace
Secret
PersistentVolume
PersistentVolumeClaim
Deployment
StatefulSet
Resource
Project
Cluster
"""
import tempfile, os, binascii, time, shutil
import threading, getpass
import io
from enum import Enum
from pysys.utils import filecopy
from pysys.utils import filereplace
from pysys.constants import *
from apama.docker.framework import DockerImage
from apama.docker.framework import DockerHelper
DEFAULT_TIMEOUT=240
#requirement is for the exe_command value to be on the path
class ClusterExecutable:
#the instance is the test class
@classmethod
def start(cls , instance , displayName , arguments , environs, rancherDebug=False, prefixDefaultArg=True,stdout=None, stderr=None, **kwargs):
# default to kubectl, however if rancher is installed we can
# use this by setting CONTAINER_CMD to 'rancher'
cmd = instance.parent.project.KUBECTL_EXE
extra_arg = ["--kubeconfig", instance.parent.project.KUBE_CONFIG]
if hasattr(instance.parent.project, 'CONTAINER_CMD') and instance.parent.project.CONTAINER_CMD == 'rancher':
extra_arg = ['kubectl']
cmd = instance.parent.project.CONTAINER_EXE
localargs = arguments
if prefixDefaultArg: #only applicable in rancher (True always for kubectl)
localargs = extra_arg + arguments
if rancherDebug:
localargs = ['--debug'] + localargs
return instance.parent.startProcess(
environs=environs,
command=cmd,
arguments=localargs,
stdout=stdout,
stderr=stderr,
displayName=displayName,
onError=lambda process: instance.parent.grepOrNone(stderr, '.+'),
**kwargs)
[docs]class Resource(DockerHelper):
"""Helper class for working with Kubernetes objects. Represents any resource
within Kubernetes - pods, services etc.
Resources are created from a Kubernetes resource definition, either
using createResource, or from a file using fromFile. fromFile will create
multiple resources from a single file.
"""
[docs] @classmethod
def fromFile(cls, parent, fileName, imageSubst=None, namespace=None):
""" Create (multiple) resources from a single file,
possibly substituting image names. Returns a list of
Resource objects.
:param parent: Reference to the parent PySys testcase
:param fileName: Path of the yaml file to read the resource definitions from,
which will be passed to "kubectrl create -f <filename>".
Must be in UTF-8 encoding.
:param imageSubst: a map of search:replacement for strings to replace in image: stanzas
:param namespace: The Namespace to define these objects in
:return: a list of Resource objects
"""
with io.open(fileName, encoding='utf-8') as f:
data = f.readlines() # returns unicode character strings
i = 0
start = 0
size = len(data)
resources = []
while i < size:
if '---' == data[i].strip():
resources.append(data[start:i])
start = i+1
i = i + 1
resources.append(data[start:i])
rv = {}
for x in resources:
res = cls.createResource(parent, x, imageSubst, namespace=namespace)
if res:
rv[res.getName()] = res
return rv
[docs] @classmethod
def createResource(cls, parent, data, imageSubst=None, namespace=None):
""" Create a resource from a YAML resource definition,
possibly substituting image names.
:param parent: Reference to the parent PySys testcase
:param data: The YAML content for a single resource, as a list of character
strings representing each line. The "kind" dictionary value will be used
to determine what class is instantiated.
:param imageSubst: a map of search:replacement for strings to replace in image: stanzas
:param namespace: The Namespace to define this objects in
:return a Resource object
"""
assert not isinstance(data, str), 'data must be a list of strings not a string'
kind = None
for l in data:
if 'kind:' in l:
kind = l.split(":")[1].strip()
if not kind: return None
if imageSubst:
newdata = []
for l in data:
if 'image:' in l:
for k in imageSubst:
assert imageSubst[k].__class__.__name__=='DockerImage', type(imageSubst[k])
l = l.replace(k, imageSubst[k].getName())
newdata.append(l)
data = newdata
if kind == "Pod":
return Pod(parent, data, namespace=namespace)
elif kind == "Job":
return Job(parent, data, namespace=namespace)
elif kind == "Service":
return Service(parent, data, namespace=namespace)
elif kind == "Deployment":
return Deployment(parent, data, namespace=namespace)
elif kind == "StatefulSet":
return StatefulSet(parent, data, namespace=namespace)
elif kind == "PersistentVolumeClaim":
return PersistentVolumeClaim(parent, data, namespace=namespace)
elif kind == "PersistentVolume":
return PersistentVolume(parent, data, namespace=namespace)
elif kind == "Namespace":
return NameSpace(parent, data)
elif kind == "Secret":
return Secret(parent, data, namespace=namespace)
else:
raise Exception("Unknown kind: %s" % kind)
[docs] @classmethod
def createAll(cls, resources, waitForRunning=True, timeout=DEFAULT_TIMEOUT):
""" Create all the specified resources, then wait for all of them to be running.
:param resources: A dictionary string:Resource
:param waitForRunning: Set to false to not wait for all the resources
:param timeout: Maximum number of seconds to wait for each resource
"""
for res in resources:
resources[res].create(waitForRunning=False)
if waitForRunning:
for res in resources:
resources[res].waitForRunning(timeout=timeout)
def __init__(self, parent, data, namespace=None):
""" Create a resource object from a Parent and YAML """
self.data = data
self.parent = parent
self.name = None
self.namespace = namespace
self.config = self.parent.project.KUBE_CONFIG
self.extra_path=[parent.project.CONTAINER_PATH]
self.extra_env={"HOME":parent.project.HOME}
self.localenv = parent.createEnvirons(addToExePath=self.extra_path,overrides=self.extra_env)
metadata = False
for l in data:
if 'metadata:' in l:
metadata = True
if 'spec:' in l:
metadata = False
if metadata and 'name:' in l:
self.name = l.split(":")[1].strip()
if not self.name:
raise Exception("Couldn't find name in %s" % data)
self.started = False
self.parent.addResource(self)
DockerHelper.__init__(self, parent=parent, displayName=self.name)
[docs] def getName(self):
""" Return the name of this resource """
return self.name
[docs] def create(self, waitForRunning=True, timeout=DEFAULT_TIMEOUT):
""" Create this resource within a Kubernetes node.
:param waitForRunning: If True waits for the resource to be ready before returning.
:param timeout: Maximum number of seconds to wait for the resource to be ready.
"""
assert(not self.started)
instance = self.parent.getInstanceCount("kube-create")
filename=self.parent.output+"/kube-create-%s-%s-data.yml" % (self.name, instance)
with io.open(filename, "w", encoding='utf-8') as f: # yaml is always unicode so ok to assume UTF-8
data = self.data
if data and isinstance(data[0], bytes): data = [x.decode('utf-8') for x in data] # python2 may have passed byte strings in, assume utf8
f.writelines(data)
args=[ "create", "-f", filename]
if self.namespace:
args.append("--namespace="+self.namespace.getName())
try:
ClusterExecutable.start(
instance=self,
displayName='kubectl create %s'%self.name,
arguments=args,
environs=self.localenv,
stdout="kube-create-%s-%s.out"%(instance, self.name),
stderr="kube-create-%i-%s.err"%(instance, self.name),
ignoreExitStatus=False,
abortOnError=True)
except Exception:
self.parent.logFileContents("kube-create-%i-%s.err"%(instance, self.name))
raise
self.started = True
if waitForRunning:
self.waitForRunning(timeout)
return self
def getDescribe(self, type):
output = None
instance = self.parent.getInstanceCount(type+"-get")
args = [ "get", type, self.name]
if self.namespace:
args.append("--namespace="+self.namespace.getName())
stdout="kubectl_%s_get_%s_%i.out"%(type, self.name, instance)
stderr="kubectl_%s_get_%s_%i.err"%(type, self.name, instance)
ClusterExecutable.start(
instance=self,
environs=self.localenv,
arguments=args,
stdout=stdout,
stderr=stderr,
displayName=type+"-get")
output = self.parent.output+"/"+stdout
stdout="kubectl_%s_describe_%s_%i.out"%(type, self.name, instance)
stderr="kubectl_%s_describe_%s_%i.err"%(type, self.name, instance)
args = [ "describe", type +"/"+ self.name]
if self.namespace:
args.append("--namespace="+self.namespace.getName())
ClusterExecutable.start(
instance=self,
environs=self.localenv,
arguments=args,
stdout=stdout,
stderr=stderr,
displayName=type+"-describe")
return output
[docs] def waitForRunning(self, timeout=DEFAULT_TIMEOUT):
""" Overridden by subclasses. Waits until a resource is in the running state or timeout (seconds) occurs
:param timeout: Maximum number of seconds to wait for the resource to be available
"""
pass
[docs]class Job(Resource):
""" Represents a Kubernetes job. Provides controls to create and destroy the job, get the logs etc
Create using Resource.fromFile """
def __init__(self, parent, data, own = True, namespace = None):
""" Create a job from a Parent and YAML """
Resource.__init__(self, parent, data, namespace=namespace)
self.exposed = False
self.owning = own
if not own:
self.running = True
[docs] def waitForRunning(self, timeout=DEFAULT_TIMEOUT):
""" Wait until the pod is in the running state
:param timeout: Maximum number of seconds to wait for the resource to be available
"""
running = False
count = 0
while not running:
if count > timeout:
self.parent.abort(TIMEDOUT, "Timeout waiting for job %s to start after %d secs" % (self.name, timeout))
count = count + 1
outfile = self.getDescribe('job')
creating = False
with self._safelyOpenProcessOutputFile(outfile) as f:
for l in f:
if self.name in l:
columns = re.split('\s+', l)
jobstatus = re.split('/', columns[1])
if jobstatus[0] == jobstatus[1]:
running = True
if not running:
self.parent.wait(1)
[docs] def log(self, logfile):
""" Stream the Kubernetes logs (i.e. the stdout and stderr) for a resource that has been run.
Sends it live to two files, one for stdout, another for stderr.
:param logfile: A named file prefix in the output directory. Suffixed with '.out' and '.err'
"""
args = [ "logs", "-f", self.name]
stdout = os.path.join(self.parent.output, logfile+".out")
stderr = os.path.join(self.parent.output, logfile+".err")
if self.namespace:
args.append("--namespace="+self.namespace.getName())
tail = ClusterExecutable.start(
instance=self,
environs=self.localenv,
arguments=args,
displayName="kubectl logs",
state=BACKGROUND,
stdout=stdout,
stderr=stderr)
return self
[docs] def cp(self, containerPath):
""" 'docker cp' a file out of the pod into your output directory
:param containerPath: Absolute path of the file in the container. The file in your output directory will share its basename.
"""
instance = self.parent.getInstanceCount("job-cp")
args = [ "cp", self.name+':'+containerPath, self.parent.output+'/'+os.path.basename(containerPath)]
if self.namespace:
args.append("--namespace="+self.namespace.getName())
ClusterExecutable.start(
instance=self,
environs=self.localenv,
arguments=args,
stdout="kubectl_job_cp_%i.out"%instance,
stderr="kubectl_job_cp_%i.err"%instance,
displayName="job-cp")
return self
def __del__(self):
""" Removes the job and waits for it to be removed """
if self.owning:
self.delete()
[docs] def delete(self, timeout=600, graceperiod=0, force=True):
""" Removes the job and waits for it to be removed.
:param timeout: Maximum number of seconds to wait for the pod to be removed
"""
if self.started:
args=[ "delete", "--force=%s"%force, "--grace-period", str(graceperiod), "job", self.name]
if self.namespace:
args.append("--namespace="+self.namespace.getName())
ClusterExecutable.start(
instance=self,
environs=self.localenv,
arguments=args,
displayName='kubectl delete job')
i = 0
args = [ "get", "job", self.name]
if self.namespace:
args.append("--namespace="+self.namespace.getName())
while i < timeout and 0 == ClusterExecutable.start(instance=self,environs=self.localenv, arguments=args, abortOnError=False, ignoreExitStatus=True, displayName='kubectl get job').exitStatus:
self.parent.wait(1)
i = i + 1
self.started=False
[docs]class Pod(Resource):
""" Represents a Kubernetes pod. Provides controls to create and destroy the pod, get the logs etc
Create using Resource.fromFile """
def __init__(self, parent, data, own = True, namespace = None):
""" Create a pod from a Parent and YAML """
Resource.__init__(self, parent, data, namespace=namespace)
self.exposed = False
self.owning = own
if not own:
self.running = True
self.started = True
[docs] def waitForRunning(self, timeout=DEFAULT_TIMEOUT):
""" Wait until the pod is in the running state
:param timeout: Maximum number of seconds to wait for the resource to be available
"""
running = False
count = 0
while not running:
if count > timeout:
self.parent.addOutcome(TIMEDOUT, 'waiting for pod %s to start timed out after %d seconds'%(self.name, timeout), printReason=True, abortOnError=True)
count = count + 1
outfile = self.getDescribe('pod')
creating = False
with self._safelyOpenProcessOutputFile(outfile) as f:
for l in f:
if 'Running' in l:
running = True
elif 'ContainerCreating' in l:
creating = True
elif 'Pending' in l:
creating = True
elif 'PodInitializing' in l:
creating = True
elif 'Init:' in l:
creating = True
if not running and not creating:
self.log(self.name+'-start-error', once=True)
self.parent.abort(BLOCKED, "Error creating pod "+self.name)
if not running:
self.parent.wait(1)
[docs] def log(self, logfile, once=False):
""" Stream the Kubernetes logs (i.e. the stdout and stderr) for a resource that has been run.
Sends it live to two files, one for stdout, another for stderr.
:param logfile: A named file prefix in the output directory. Suffixed with '.out' and '.err'
"""
args = [ "logs" ]
if not once: args.append("-f")
args.append(self.name)
stdout = os.path.join(self.parent.output, logfile+".out")
stderr = os.path.join(self.parent.output, logfile+".err")
if self.namespace:
args.append("--namespace="+self.namespace.getName())
tail = ClusterExecutable.start(
instance=self,
environs=self.localenv,
arguments=args,
displayName="kubectl logs",
state=BACKGROUND,
stdout=stdout,
stderr=stderr)
return self
[docs] def cp(self, containerPath):
""" 'docker cp' a file out of the pod into your output directory
:param containerPath: Absolute path of the file in the container. The file in your output directory will share its basename.
"""
instance = self.parent.getInstanceCount("pod-cp")
args = [ "cp", self.name+':'+containerPath, self.parent.output+'/'+os.path.basename(containerPath)]
if self.namespace:
args.append("--namespace="+self.namespace.getName())
ClusterExecutable.start(
instance=self,
environs=self.localenv,
arguments=args,
stdout="kubectl_pod_cp_%i.out"%instance,
stderr="kubectl_pod_cp_%i.err"%instance,
displayName="pod-cp")
return self
[docs] def expose(self):
""" Expose the port(s) of the pod on the host """
if self.exposed: return
instance = self.parent.getInstanceCount("expose-pod")
Service(self.parent, ['metadata:',' name: %s' % self.name])
args = [ "expose", "pod", self.name, "--type=NodePort"]
if self.namespace:
args.append("--namespace="+self.namespace.getName())
ClusterExecutable.start(
instance=self,
environs=self.localenv,
arguments=args,
stdout="kubectl_pod_expose_%i.out"%instance,
stderr="kubectl_pod_expose_%i.err"%instance,
displayName="expose-pod")
self.exposed = True
return self
[docs] def getExternalPort(self, port):
""" Return the port on the Kube node which corresponds to the given internal port.
Exposes the ports if not already exposed. """
self.expose()
instance = self.parent.getInstanceCount("get-service")
args = [ "get", "service", self.name]
if self.namespace:
args.append("--namespace="+self.namespace.getName())
ClusterExecutable.start(
instance=self,
environs=self.localenv,
arguments=args,
stdout="kubectl_get_service_%i.out"%instance,
stderr="kubectl_get_service_%i.err"%instance,
displayName="get-service")
with self._safelyOpenProcessOutputFile(os.path.join(self.parent.output, "kubectl_get_service_%i.out"%instance)) as f:
for l in f:
if l.startswith('NAME'): continue
data = re.split('\s+', l.strip())
for d in data[4].split(','):
ports = re.split('[^0-9]', d)
if int(ports[0]) == port: return int(ports[1])
self.parent.abort(BLOCKED, 'Could not find a port for service %s port %s' % (self.name, port))
def __del__(self):
""" Removes the pod and waits for it to be removed """
if self.owning:
self.delete()
[docs] def delete(self, timeout=600, graceperiod=0, force=True):
""" Removes the pod and waits for it to be removed.
:param timeout: Maximum number of seconds to wait for the pod to be removed
"""
if self.started:
args=[ "delete", "--force=%s"%force, "--grace-period", str(graceperiod), "pod", self.name]
if self.namespace:
args.append("--namespace="+self.namespace.getName())
ClusterExecutable.start(
instance=self,
environs=self.localenv,
arguments=args,
displayName='kubectl delete pod')
i = 0
args = [ "get", "pod", self.name]
if self.namespace:
args.append("--namespace="+self.namespace.getName())
while i < timeout and 0 == ClusterExecutable.start(instance=self,environs=self.localenv,arguments=args, abortOnError=False, ignoreExitStatus=True, displayName='kubectl get pod').exitStatus:
self.parent.wait(1)
i = i + 1
self.started=False
[docs]class Service(Resource):
""" Represents a Kubernetes service. Provides controls to create and destroy the pod, get the logs etc.
Create using Resource.fromFile """
def __init__(self, parent, data, namespace=None):
""" Create a service from a Parent and YAML """
Resource.__init__(self, parent, data, namespace=namespace)
def __del__(self):
""" Removes the service and waits for it to be removed """
if self.started:
self.delete()
[docs] def delete(self, timeout=600, graceperiod=0, force=True):
""" Removes the service and waits for it to be removed.
:param timeout: Maximum number of seconds to wait for the service to be removed
"""
if self.started:
args = [ "delete", "--force=%s"%force, "--grace-period", str(graceperiod), "service", self.name]
if self.namespace:
args.append("--namespace="+self.namespace.getName())
ClusterExecutable.start(
instance=self,
environs=self.localenv,
arguments=args,
displayName='kubectl delete service')
i = 0
args = [ "get", "service", self.name]
if self.namespace:
args.append("--namespace="+self.namespace.getName())
while i < timeout and 0 == ClusterExecutable.start(instance=self,environs=self.localenv,arguments=args, abortOnError=False, ignoreExitStatus=True, displayName='kubectl get service').exitStatus:
self.parent.wait(1)
i = i + 1
self.started=False
[docs] def log(self, logfile):
""" Stream the Kubernetes logs (i.e. the stdout and stderr) for a resource that has been run.
Sends it live to two files, one for stdout, another for stderr.
:param logfile: A named file prefix in the output directory. Suffixed with '.out' and '.err'
"""
args = [ "logs", "-f", self.name]
stdout = os.path.join(self.parent.output, logfile+".out")
stderr = os.path.join(self.parent.output, logfile+".err")
if self.namespace:
args.append("--namespace="+self.namespace.getName())
tail = ClusterExecutable.start(
instance=self,
environs=self.localenv,
arguments=args,
displayName="kubectl logs",
state=BACKGROUND,
stdout=stdout,
stderr=stderr)
return self
class NameSpace(Resource):
""" Represents a Kubernetes NameSpace.
Create using Resource.fromFile or NameSpace.fromName or NameSpace.fromRandom """
@classmethod
def fromName(cls, parent, name):
""" Create a named namespace """
data = [
"kind: Namespace\n",
"apiVersion: v1\n",
"metadata:\n",
" name: %s\n"%name
]
return NameSpace(parent, data)
@classmethod
def fromUnique(cls, parent):
""" Create a namespace with a unique name """
return cls.fromName(parent, DockerImage.generateUniqueName())
def __init__(self, parent, data, namespace=None):
""" Create a namespace from a Parent and YAML """
Resource.__init__(self, parent, data, namespace=namespace)
def __del__(self):
""" Removes the namespace and waits for it to be removed """
if self.started:
self.delete()
def delete(self, timeout=600, graceperiod=0, force=True):
""" Removes the namespace and waits for it to be removed.
:param timeout: Maximum number of seconds to wait for the volume to be removed
"""
args = [ "delete", "--force=%s"%force, "--grace-period", str(graceperiod), "namespace", self.name]
if self.namespace:
args.append("--namespace="+self.namespace.getName())
ClusterExecutable.start(
instance=self,
environs=self.localenv,
arguments=args,
displayName='kubectl delete namespace')
i = 0
args = [ "get", "namespace", self.name]
if self.namespace:
args.append("--namespace="+self.namespace.getName())
while i < timeout and 0 == ClusterExecutable.start(instance=self, environs=self.localenv, arguments=args, abortOnError=False, ignoreExitStatus=True, displayName='kubectl get namespace').exitStatus:
self.parent.wait(1)
i = i + 1
[docs]class Secret(Resource):
""" Represents a Kubernetes Secret.
Create using Resource.fromFils"""
def __init__(self, parent, data, namespace=None):
""" Create a secret from a Parent and YAML """
Resource.__init__(self, parent, data, namespace=namespace)
def __del__(self):
""" Removes the secret and waits for it to be removed """
if self.started:
self.delete()
[docs] def delete(self, timeout=600, graceperiod=0, force=True):
""" Removes the secret and waits for it to be removed.
:param timeout: Maximum number of seconds to wait for the secret to be removed
"""
args = [ "delete", "--force=%s"%force, "--grace-period", str(graceperiod), "secret", self.name]
if self.namespace:
args.append("--namespace="+self.namespace.getName())
ClusterExecutable.start(
instance=self,
environs=self.localenv,
arguments=args,
displayName='kubectl delete secret',
stdout=f"kubectl-delete-{self.name}.out",
stderr=f"kubectl-delete-{self.name}.err")
i = 0
args = [ "get", "secret", self.name]
if self.namespace:
args.append("--namespace="+self.namespace.getName())
while i < timeout and 0 == ClusterExecutable.start(instance=self, environs=self.localenv, arguments=args, abortOnError=False, ignoreExitStatus=True, displayName='kubectl get secret').exitStatus:
self.parent.wait(1)
i = i + 1
[docs]class PersistentVolume(Resource):
""" Represents a Kubernetes PersistentVolume.
Create using Resource.fromFile """
def __init__(self, parent, data, namespace=None):
""" Create a volume from a Parent and YAML """
Resource.__init__(self, parent, data, namespace=namespace)
def __del__(self):
""" Removes the volume and waits for it to be removed """
if self.started:
self.delete()
[docs] def delete(self, timeout=600, graceperiod=0, force=True):
""" Removes the volume and waits for it to be removed.
:param timeout: Maximum number of seconds to wait for the volume to be removed
"""
args = [ "delete", "--force=%s"%force, "--grace-period", str(graceperiod), "pv", self.name]
if self.namespace:
args.append("--namespace="+self.namespace.getName())
ClusterExecutable.start(
instance=self,
environs=self.localenv,
arguments=args,
displayName='kubectl delete pv')
i = 0
args = [ "get", "pv", self.name]
if self.namespace:
args.append("--namespace="+self.namespace.getName())
while i < timeout and 0 == ClusterExecutable.start(instance=self,environs=self.localenv,arguments=args, abortOnError=False, ignoreExitStatus=True, displayName='kubectl get pv').exitStatus:
self.parent.wait(1)
i = i + 1
[docs]class PersistentVolumeClaim(Resource):
""" Represents a Kubernetes PersistentVolumeClaim.
Create using Resource.fromFile """
def __init__(self, parent, data, namespace=None):
""" Create a claim from a Parent and YAML """
Resource.__init__(self, parent, data, namespace=namespace)
def __del__(self):
""" Removes the claim and waits for it to be removed """
if self.started:
self.delete()
[docs] def delete(self, timeout=600, graceperiod=0, force=True):
""" Removes the claim and waits for it to be removed.
:param timeout: Maximum number of seconds to wait for the claim to be removed
"""
args = [ "delete", "--force=%s"%force, "--grace-period", str(graceperiod), "pvc", self.name]
if self.namespace:
args.append("--namespace="+self.namespace.getName())
ClusterExecutable.start(
instance=self,
environs=self.localenv,
arguments=args,
displayName='kubectl delete pvc')
i = 0
args = [ "get", "pvc", self.name]
if self.namespace:
args.append("--namespace="+self.namespace.getName())
while i < timeout and 0 == ClusterExecutable.start(instance=self,environs=self.localenv,arguments=args, abortOnError=False, ignoreExitStatus=True, displayName='kubectl get pvc').exitStatus:
self.parent.wait(1)
i = i + 1
[docs]class Deployment(Resource):
""" Represents a Kubernetes Deployment.
Create using Resource.fromFile """
def __init__(self, parent, data, namespace=None):
""" Create a deployment from a Parent and YAML """
Resource.__init__(self, parent, data, namespace=namespace)
[docs] def waitForRunning(self, timeout=DEFAULT_TIMEOUT):
""" Wait until the deployment and all its pods are in the running state
:param timeout: Maximum number of seconds to wait for each pod to be available.
"""
running = False
count = 0
while not running:
if count > timeout:
self.parent.abort(TIMEDOUT, "Timeout waiting for deployment %s to start" % self.name)
count = count + 1
outfile = self.getDescribe('deployment')
with self._safelyOpenProcessOutputFile(outfile) as f:
for l in f:
if self.name in l:
columns = re.split('\s+', l)
jobstatus = re.split('/', columns[1])
self.parent.log.info("waitForRunning 1==%s 2==%s", jobstatus[0],jobstatus[1])
if jobstatus[0] == jobstatus[1]:
running = True
if not running:
self.parent.wait(1)
for pod in self.getPods():
pod.waitForRunning(timeout)
[docs] def getPods(self):
""" Get all the pods associated with this deployment """
instance = self.parent.getInstanceCount("pods-get")
args = [ "get", "pods"]
if self.namespace:
args.append("--namespace="+self.namespace.getName())
ClusterExecutable.start(
instance=self,
environs=self.localenv,
arguments=args,
stdout="kubectl_pods_get_%i.out"%(instance),
stderr="kubectl_pods_get_%i.err"%(instance),
displayName="pods-get")
pods = []
with self._safelyOpenProcessOutputFile(self.parent.output+"/kubectl_pods_get_%i.out"%(instance)) as f:
for l in f:
m = re.search(r'^(%s-[0-9a-f]+-[a-zA-Z0-9]*)\s+.*' % self.name, l)
if m:
pods.append(Pod(self.parent, ['metadata:',' name: %s' % m.group(1)], False, namespace=self.namespace))
return pods
def __del__(self):
""" Removes the deployment and waits for it to be removed """
if self.started:
self.delete()
[docs] def delete(self, timeout=600, graceperiod=0, force=True):
""" Removes the deployment and waits for it to be removed.
:param timeout: Maximum number of seconds to wait for the deployment to be removed
"""
if self.started:
args=[ "delete", "--force=%s"%force, "--grace-period", str(graceperiod), "deployment", self.name]
if self.namespace:
args.append("--namespace="+self.namespace.getName())
ClusterExecutable.start(
instance=self,
environs=self.localenv,
arguments=args,
displayName='kubectl delete deployment')
i = 0
args = [ "get", "deployment", self.name]
if self.namespace:
args.append("--namespace="+self.namespace.getName())
while i < timeout and 0 == ClusterExecutable.start(instance=self, environs=self.localenv, arguments=args, abortOnError=False, ignoreExitStatus=True, displayName='kubectl get deployment').exitStatus:
self.parent.wait(1)
i = i + 1
self.started=False
[docs]class StatefulSet(Resource):
""" Represents a Kubernetes StatefulSet.
Create using Resource.fromFile """
def __init__(self, parent, data, namespace=None):
""" Create a statefulset from a Parent and YAML """
Resource.__init__(self, parent, data, namespace=namespace)
[docs] def waitForRunning(self, timeout=DEFAULT_TIMEOUT):
""" Wait until the statefulset and all its pods are in the running state
:param timeout: Maximum number of seconds to wait for each pod to be available.
"""
running = False
count = 0
while not running:
if count > timeout:
self.parent.abort(TIMEDOUT, "Timeout waiting for statefulset %s to start" % self.name)
count = count + 1
outfile = self.getDescribe('statefulset')
with self._safelyOpenProcessOutputFile(outfile) as f:
for l in f:
if self.name in l:
columns = re.split('\s+', l)
jobstatus = re.split('/', columns[1])
self.parent.log.info("waitForRunning: %s / %s", jobstatus[0],jobstatus[1])
if jobstatus[0] == jobstatus[1]:
running = True
if not running:
self.parent.wait(1)
for pod in self.getPods():
pod.waitForRunning(timeout)
[docs] def getPods(self):
""" Get all the pods associated with this statefulset """
instance = self.parent.getInstanceCount("pods-get")
args = [ "get", "pods"]
if self.namespace:
args.append("--namespace="+self.namespace.getName())
ClusterExecutable.start(
instance=self,
environs=self.localenv,
arguments=args,
stdout="kubectl_pods_get_%i.out"%(instance),
stderr="kubectl_pods_get_%i.err"%(instance),
displayName="pods-get")
pods = []
with self._safelyOpenProcessOutputFile(self.parent.output+"/kubectl_pods_get_%i.out"%(instance)) as f:
for l in f:
m = re.search(r'^(%s-[0-9]+)\s+.*' % self.name, l)
if m:
pods.append(Pod(self.parent, ['metadata:',' name: %s' % m.group(1)], False, namespace=self.namespace))
return pods
def __del__(self):
""" Removes the statefulset and waits for it to be removed """
if self.started:
self.delete()
[docs] def delete(self, timeout=600, graceperiod=0, force=True):
""" Removes the statefulset and waits for it to be removed.
:param timeout: Maximum number of seconds to wait for the statefulset to be removed
"""
if self.started:
args=[ "delete", "--force=%s"%force, "--grace-period", str(graceperiod), "statefulset", self.name]
if self.namespace:
args.append("--namespace="+self.namespace.getName())
ClusterExecutable.start(
instance=self,
environs=self.localenv,
arguments=args,
displayName='kubectl delete statefulset')
i = 0
args = [ "get", "statefulset", self.name]
if self.namespace:
args.append("--namespace="+self.namespace.getName())
while i < timeout and 0 == ClusterExecutable.start(instance=self, environs=self.localenv,arguments=args, abortOnError=False, ignoreExitStatus=True, displayName='kubectl get statefulset').exitStatus:
self.parent.wait(1)
i = i + 1
self.started=False
[docs]class Cluster(Resource):
""" Represents a Rancher Cluster - top level organisation for objects.
Also provides a hook for determining cluster health and availability.
Create using Cluster.fromName """
[docs] @classmethod
def fromName(cls, parent , name):
""" Bending this to fit class hierarchy """
data = [
"kind: Cluster\n",
"apiVersion: v1\n",
"metadata:\n",
" name: %s\n"%name
]
return Cluster(parent,data,name)
def __init__(self,parent, data,name):
""" Record Cluster using existing mechanism """
self.clusterName = name
Resource.__init__(self, parent, data, None)
def __del__(self):
""" Cluster is Read-Only """
pass
[docs] def delete(self, timeout=600, graceperiod=0, force=True):
""" Cluster is Read-Only """
pass
[docs]class Project(Resource):
""" Represents a Rancher Project - A grouping mechanism in Rancher.
Create using Project.fromContext """
[docs] @classmethod
def fromName(cls, parent, cluster, name):
""" Bending this to fit class hierarchy """
data = [
"kind: Project\n",
"apiVersion: v1\n",
"metadata:\n",
" name: %s\n"%name
]
return Project(parent, cluster, data)
[docs] @classmethod
def fromUnique(cls, parent, cluster, rootname):
""" Create a namespace with a unique name """
return cls.fromName(parent, cluster, "{}_{}".format(rootname,DockerImage.generateUniqueName()))
[docs] @classmethod
def fromContext(cls, parent, cluster):
""" Create a project with a unique name """
return cls.fromName(parent, cluster, "{}_{}_tests".format(getpass.getuser(),cluster.clusterName))
def __init__(self, parent,cluster, data):
""" Record Cluster using existing mechanism """
self.cluster = cluster
Resource.__init__(self, parent, data)
def __del__(self):
""" delete Project """
if self.started:
self.delete()
[docs] def switch(self):
""" Check Cluster and create project if needed - if not rancher then noop """
if hasattr(self.parent.project, 'CONTAINER_CMD') and self.parent.project.CONTAINER_CMD == 'rancher':
args=["project", "ls"]
tail = ClusterExecutable.start(
instance=self,
rancherDebug=False,
prefixDefaultArg=False,
environs=self.localenv,
arguments=args,
stdout="rancher_projects_get.out",
stderr="rancher_projects_get.err",
displayName='rancher project ls')
self.defaultProject = ''
found = False
with self._safelyOpenProcessOutputFile(self.parent.output+"/rancher_projects_get.out") as f:
for l in f:
columns = re.split('\s+', l)
if l.find(self.name) > -1:
found = True
break
if not found:
args=["project", "create", self.name, "--cluster",self.cluster.clusterName]
ClusterExecutable.start(
instance=self,
prefixDefaultArg=False,
environs=self.localenv,
arguments=args,
stdout="rancher_project_create.out",
stderr="rancher_project_create.err",
displayName='rancher project create')
self.parent.log.info("Created project"+self.name)
args=["context", "switch", self.name]
ClusterExecutable.start(
instance=self,
prefixDefaultArg=False,
environs=self.localenv,
arguments=args,
stdout="rancher_context_switch.out",
stderr="rancher_context_switch.err",
displayName='rancher context switch')
def delete(self, timeout=600, graceperiod=0, force=True):
#we don't delete the project now, it is the base level for all namespaces
#we will have switched into this name space however
pass
[docs]class NameSpace(Resource):
""" Represents a Kubernetes NameSpace.
Create using Resource.fromFile or NameSpace.fromName or NameSpace.fromRandom """
[docs] @classmethod
def fromName(cls, parent, name):
""" Create a named namespace """
data = [
"kind: Namespace\n",
"apiVersion: v1\n",
"metadata:\n",
" name: %s\n"%name
]
return NameSpace(parent, data)
[docs] @classmethod
def fromUnique(cls, parent):
""" Create a namespace with a unique name """
return cls.fromName(parent, '{}-{}'.format(parent.descriptor.id.replace('_','').lower(), DockerImage.generateUniqueName() ))
def __init__(self, parent, data, namespace=None):
""" Create a namespace from a Parent and YAML """
self.project = None
Resource.__init__(self, parent, data, namespace=namespace)
[docs] def createInProject(self,project):
""" Create the namespace under the suppied project - noop if not rancher """
self.project = project
args=["namespace", "create",self.name]
if hasattr(self.parent.project, 'CONTAINER_CMD') and self.parent.project.CONTAINER_CMD == 'rancher':
ClusterExecutable.start(
instance=self,
arguments=args,
prefixDefaultArg=False,
stdout="rancher_namespace_create.out",
stderr="rancher_namespace_create.err",
environs=self.localenv,
displayName='rancher namespace create')
else:
self.create()
def __del__(self):
""" Removes the namespace and waits for it to be removed """
self.delete()
[docs] def delete(self, timeout=600, graceperiod=0, force=True):
""" Removes the namespace and waits for it to be removed.
:param timeout: Maximum number of seconds to wait for the volume to be removed
"""
args = [ "delete", "--force=%s"%force, "--grace-period", str(graceperiod), "namespace", self.name]
if self.namespace:
args.append("--namespace="+self.namespace.getName())
ClusterExecutable.start(
instance=self,
environs=self.localenv,
arguments=args,
displayName='kubectl delete namespace')
i = 0
args = [ "get", "namespace", self.name]
if self.namespace:
args.append("--namespace="+self.namespace.getName())
while i < timeout and 0 == ClusterExecutable.start(instance=self,environs=self.localenv, arguments=args, abortOnError=False, ignoreExitStatus=True, displayName='kubectl get namespace').exitStatus:
self.parent.wait(1)
i = i + 1