# pylint: skip-file
'''
   OpenShiftCLI class that wraps the oc commands in a subprocess
'''

import atexit
import json
import os
import shutil
import subprocess
import re

import yaml


# This is here because of a bug that causes yaml
# to incorrectly handle timezone info on timestamps
def timestamp_constructor(_, node):
    '''Return a YAML timestamp node as a plain string.

    Registered below as the constructor for the YAML timestamp tag so the
    raw text of the node is kept instead of being converted by the loader.
    '''
    return str(node.value)


yaml.add_constructor(u'tag:yaml.org,2002:timestamp', timestamp_constructor)


# pylint: disable=too-few-public-methods
class OpenShiftCLI(object):
    ''' Class to wrap the oc command line tools '''

    def __init__(self,
                 namespace,
                 kubeconfig='/etc/origin/master/admin.kubeconfig',
                 verbose=False):
        '''Store the settings used by every oc invocation.

        namespace:  value passed to oc with -n
        kubeconfig: path exported to oc as the KUBECONFIG environment variable
        verbose:    when true, echo each command and its output
        '''
        self.namespace = namespace
        self.verbose = verbose
        self.kubeconfig = kubeconfig

    # Pylint allows only 5 arguments to be passed.
    # pylint: disable=too-many-arguments
    def _replace_content(self, resource, rname, content, force=False):
        '''Apply the key/value pairs in content to an existing object.

        The current object is fetched with _get, edited through Yedit and,
        if anything changed, written to /tmp/<rname> and pushed back with
        _replace.

        Returns the _get result unchanged when it has no results,
        {'returncode': 0, 'updated': False} when no key was changed, and
        the _replace result otherwise.
        '''
        res = self._get(resource, rname)
        if not res['results']:
            return res

        fname = '/tmp/%s' % rname
        yed = Yedit(fname, res['results'][0])
        changes = [yed.put(key, value) for key, value in content.items()]

        # NOTE(review): assumes Yedit.put returns a tuple whose first element
        # is truthy when the value was changed -- confirm against Yedit.
        # Only skip the replace when *nothing* changed; previously a single
        # unchanged key caused every other change to be discarded.
        if not any(change[0] for change in changes):
            return {'returncode': 0, 'updated': False}

        yed.write()

        # Remove the temporary file when the interpreter exits.
        atexit.register(Utils.cleanup, [fname])

        return self._replace(fname, force)

    def _replace(self, fname, force=False):
        '''Run "oc replace -f fname" in this namespace.

        Appends --force to the command when force is true.
        Returns the oc_cmd result dict.
        '''
        cmd = ['-n', self.namespace, 'replace', '-f', fname]
        if force:
            cmd.append('--force')
        return self.oc_cmd(cmd)

    def _create(self, fname):
        '''Run "oc create -f fname" in this namespace; returns the oc_cmd result dict.'''
        return self.oc_cmd(['create', '-f', fname, '-n', self.namespace])

    def _delete(self, resource, rname):
        '''Run "oc delete resource rname" in this namespace; returns the oc_cmd result dict.'''
        return self.oc_cmd(['delete', resource, rname, '-n', self.namespace])

    def _get(self, resource, rname=None):
        '''Run "oc get resource [rname] -o json" in this namespace.

        Returns the oc_cmd result dict with 'results' normalised to a list.
        '''
        cmd = ['get', resource, '-o', 'json', '-n', self.namespace]
        if rname:
            cmd.append(rname)

        rval = self.oc_cmd(cmd, output=True)

        # Ensure results are returned in an array
        # NOTE(review): oc_cmd never sets an 'items' key on the dict it
        # returns (the parsed JSON lives under 'results'), so this first
        # branch looks unreachable. Left as-is because callers may rely on
        # the current shape of 'results' -- confirm before changing.
        if 'items' in rval:
            rval['results'] = rval['items']
        elif not isinstance(rval['results'], list):
            rval['results'] = [rval['results']]

        return rval

    def oc_cmd(self, cmd, output=False):
        '''Run /usr/bin/oc with the given arguments.

        cmd:    list of arguments appended after /usr/bin/oc
        output: when true and oc exits 0, parse stdout as JSON into 'results'

        Returns a dict that always has 'returncode' and 'results'.
        On a non-zero exit it also has 'stderr' and 'stdout', and 'results'
        is {}. On a zero exit where JSON parsing failed it also has 'err'
        (the parser message), 'stderr', 'stdout' and 'cmd'.
        '''
        cmds = ['/usr/bin/oc']
        cmds.extend(cmd)

        err = None

        if self.verbose:
            print(' '.join(cmds))

        # The kubeconfig is handed to oc through its environment; note that
        # KUBECONFIG is the only variable the child process receives.
        proc = subprocess.Popen(cmds,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                env={'KUBECONFIG': self.kubeconfig})

        # communicate() drains both pipes while waiting for the child.
        # Calling wait() first and reading afterwards can deadlock once the
        # child has written enough output to fill a pipe buffer.
        stdout, stderr = proc.communicate()

        rval = {"returncode": proc.returncode,
                "results": '',
               }

        if proc.returncode == 0:
            if output:
                try:
                    rval['results'] = json.loads(stdout)
                except ValueError as exc:
                    # Keep the parser message as text. A separate name is
                    # used for the exception so 'err' stays bound after the
                    # handler on every Python version.
                    err = str(exc)

            if self.verbose:
                print(stdout)
                print(stderr)
                print('')

            if err:
                rval.update({"err": err,
                             "stderr": stderr,
                             "stdout": stdout,
                             "cmd": cmds
                            })

        else:
            rval.update({"stderr": stderr,
                         "stdout": stdout,
                         "results": {},
                        })

        return rval


class Utils(object):
    ''' utilities for openshiftcli modules '''

    @staticmethod
    def create_file(rname, data, ftype=None):
        '''Write data to /tmp/<rname> and return the path.

        ftype 'yaml' dumps data with yaml.safe_dump, 'json' with json.dumps;
        any other value writes data as-is. The file is registered for
        removal at interpreter exit.
        '''
        path = os.path.join('/tmp', rname)
        with open(path, 'w') as fds:
            if ftype == 'yaml':
                fds.write(yaml.safe_dump(data, default_flow_style=False))
            elif ftype == 'json':
                fds.write(json.dumps(data))
            else:
                fds.write(data)

        # Register cleanup when module is done
        atexit.register(Utils.cleanup, [path])
        return path

    @staticmethod
    def create_files_from_contents(data):
        '''Create one file per {'path': ..., 'content': ...} dict in data.

        Returns the list of created file paths, in input order.
        '''
        return [Utils.create_file(sfile['path'], sfile['content'])
                for sfile in data]

    @staticmethod
    def cleanup(files):
        '''Remove every existing file or directory tree listed in files.'''
        for sfile in files:
            if os.path.exists(sfile):
                if os.path.isdir(sfile):
                    shutil.rmtree(sfile)
                elif os.path.isfile(sfile):
                    os.remove(sfile)

    @staticmethod
    def exists(results, _name):
        '''Return True when results contains an entry named _name.'''
        if not results:
            return False

        return bool(Utils.find_result(results, _name))

    @staticmethod
    def find_result(results, _name):
        '''Return the first result whose metadata name equals _name, else None.

        Results without a 'metadata' key are ignored.
        '''
        for result in results:
            if 'metadata' in result and result['metadata']['name'] == _name:
                return result

        return None

    @staticmethod
    def get_resource_file(sfile, sfile_type='yaml'):
        '''Read sfile and return its parsed contents.

        sfile_type 'yaml' parses with yaml.safe_load, 'json' with json.loads;
        any other value returns the raw text.
        '''
        with open(sfile) as sfd:
            contents = sfd.read()

        if sfile_type == 'yaml':
            contents = yaml.safe_load(contents)
        elif sfile_type == 'json':
            contents = json.loads(contents)

        return contents

    # Disabling too-many-branches.  This is a yaml dictionary comparison function
    # pylint: disable=too-many-branches,too-many-return-statements
    @staticmethod
    def check_def_equal(user_def, result_def, debug=False):
        '''Compare a user supplied definition with the one returned by a query.

        Every key of result_def except 'metadata' and 'status' must be present
        in user_def with an equal value. Lists must be identical; dicts must
        have the same keys (again ignoring 'metadata' and 'status') and are
        compared recursively.

        Returns True when the definitions match, False otherwise. When debug
        is true the reason for a mismatch is printed.
        '''

        # Currently these values are autogenerated and we do not need to check them
        skip = ['metadata', 'status']

        for key, value in result_def.items():
            if key in skip:
                continue

            # A key the user did not supply is a mismatch, not an error;
            # indexing user_def[key] below would otherwise raise KeyError.
            if key not in user_def:
                if debug:
                    print("value not equal; user_def does not have key")
                    print(value)
                    print(user_def.get(key))
                return False

            # Both are lists
            if isinstance(value, list):
                if not isinstance(user_def[key], list):
                    return False

                # lists should be identical
                if value != user_def[key]:
                    return False

            # recurse on a dictionary
            elif isinstance(value, dict):
                if not isinstance(user_def[key], dict):
                    if debug:
                        print("dict returned false not instance of dict")
                    return False

                # before passing ensure keys match
                api_values = set(value.keys()) - set(skip)
                user_values = set(user_def[key].keys()) - set(skip)
                if api_values != user_values:
                    if debug:
                        print(api_values)
                        print(user_values)
                        print("keys are not equal in dict")
                    return False

                result = Utils.check_def_equal(user_def[key], value, debug=debug)
                if not result:
                    if debug:
                        print("dict returned false")
                    return False

            # Verify each key, value pair is the same
            else:
                if value != user_def[key]:
                    if debug:
                        print("value not equal; user_def does not have key")
                        print(value)
                        print(user_def[key])
                    return False

        return True