# pylint: skip-file # flake8: noqa # pylint: disable=too-many-lines # noqa: E301,E302,E303,T001 class OpenShiftCLIError(Exception): '''Exception class for openshiftcli''' pass ADDITIONAL_PATH_LOOKUPS = ['/usr/local/bin', os.path.expanduser('~/bin')] def locate_oc_binary(): ''' Find and return oc binary file ''' # https://github.com/openshift/openshift-ansible/issues/3410 # oc can be in /usr/local/bin in some cases, but that may not # be in $PATH due to ansible/sudo paths = os.environ.get("PATH", os.defpath).split(os.pathsep) + ADDITIONAL_PATH_LOOKUPS oc_binary = 'oc' # Use shutil.which if it is available, otherwise fallback to a naive path search try: which_result = shutil.which(oc_binary, path=os.pathsep.join(paths)) if which_result is not None: oc_binary = which_result except AttributeError: for path in paths: if os.path.exists(os.path.join(path, oc_binary)): oc_binary = os.path.join(path, oc_binary) break return oc_binary # pylint: disable=too-few-public-methods class OpenShiftCLI(object): ''' Class to wrap the command line tools ''' def __init__(self, namespace, kubeconfig='/etc/origin/master/admin.kubeconfig', verbose=False, all_namespaces=False): ''' Constructor for OpenshiftCLI ''' self.namespace = namespace self.verbose = verbose self.kubeconfig = Utils.create_tmpfile_copy(kubeconfig) self.all_namespaces = all_namespaces self.oc_binary = locate_oc_binary() # Pylint allows only 5 arguments to be passed. # pylint: disable=too-many-arguments def _replace_content(self, resource, rname, content, force=False, sep='.'): ''' replace the current object with the content ''' res = self._get(resource, rname) if not res['results']: return res fname = Utils.create_tmpfile(rname + '-') yed = Yedit(fname, res['results'][0], separator=sep) changes = [] for key, value in content.items(): changes.append(yed.put(key, value)) if any([change[0] for change in changes]): yed.write() atexit.register(Utils.cleanup, [fname]) return self._replace(fname, force) return {'returncode': 0, 'updated': False} def _replace(self, fname, force=False): '''replace the current object with oc replace''' cmd = ['replace', '-f', fname] if force: cmd.append('--force') return self.openshift_cmd(cmd) def _create_from_content(self, rname, content): '''create a temporary file and then call oc create on it''' fname = Utils.create_tmpfile(rname + '-') yed = Yedit(fname, content=content) yed.write() atexit.register(Utils.cleanup, [fname]) return self._create(fname) def _create(self, fname): '''call oc create on a filename''' return self.openshift_cmd(['create', '-f', fname]) def _delete(self, resource, rname, selector=None): '''call oc delete on a resource''' cmd = ['delete', resource, rname] if selector: cmd.append('--selector=%s' % selector) return self.openshift_cmd(cmd) def _process(self, template_name, create=False, params=None, template_data=None): # noqa: E501 '''process a template template_name: the name of the template to process create: whether to send to oc create after processing params: the parameters for the template template_data: the incoming template's data; instead of a file ''' cmd = ['process'] if template_data: cmd.extend(['-f', '-']) else: cmd.append(template_name) if params: param_str = ["%s=%s" % (key, value) for key, value in params.items()] cmd.append('-v') cmd.extend(param_str) results = self.openshift_cmd(cmd, output=True, input_data=template_data) if results['returncode'] != 0 or not create: return results fname = Utils.create_tmpfile(template_name + '-') yed = Yedit(fname, results['results']) yed.write() atexit.register(Utils.cleanup, [fname]) return self.openshift_cmd(['create', '-f', fname]) def _get(self, resource, rname=None, selector=None): '''return a resource by name ''' cmd = ['get', resource] if selector: cmd.append('--selector=%s' % selector) elif rname: cmd.append(rname) cmd.extend(['-o', 'json']) rval = self.openshift_cmd(cmd, output=True) # Ensure results are retuned in an array if 'items' in rval: rval['results'] = rval['items'] elif not isinstance(rval['results'], list): rval['results'] = [rval['results']] return rval def _schedulable(self, node=None, selector=None, schedulable=True): ''' perform oadm manage-node scheduable ''' cmd = ['manage-node'] if node: cmd.extend(node) else: cmd.append('--selector=%s' % selector) cmd.append('--schedulable=%s' % schedulable) return self.openshift_cmd(cmd, oadm=True, output=True, output_type='raw') # noqa: E501 def _list_pods(self, node=None, selector=None, pod_selector=None): ''' perform oadm list pods node: the node in which to list pods selector: the label selector filter if provided pod_selector: the pod selector filter if provided ''' cmd = ['manage-node'] if node: cmd.extend(node) else: cmd.append('--selector=%s' % selector) if pod_selector: cmd.append('--pod-selector=%s' % pod_selector) cmd.extend(['--list-pods', '-o', 'json']) return self.openshift_cmd(cmd, oadm=True, output=True, output_type='raw') # pylint: disable=too-many-arguments def _evacuate(self, node=None, selector=None, pod_selector=None, dry_run=False, grace_period=None, force=False): ''' perform oadm manage-node evacuate ''' cmd = ['manage-node'] if node: cmd.extend(node) else: cmd.append('--selector=%s' % selector) if dry_run: cmd.append('--dry-run') if pod_selector: cmd.append('--pod-selector=%s' % pod_selector) if grace_period: cmd.append('--grace-period=%s' % int(grace_period)) if force: cmd.append('--force') cmd.append('--evacuate') return self.openshift_cmd(cmd, oadm=True, output=True, output_type='raw') def _version(self): ''' return the openshift version''' return self.openshift_cmd(['version'], output=True, output_type='raw') def _import_image(self, url=None, name=None, tag=None): ''' perform image import ''' cmd = ['import-image'] image = '{0}'.format(name) if tag: image += ':{0}'.format(tag) cmd.append(image) if url: cmd.append('--from={0}/{1}'.format(url, image)) cmd.append('-n{0}'.format(self.namespace)) cmd.append('--confirm') return self.openshift_cmd(cmd) def _run(self, cmds, input_data): ''' Actually executes the command. This makes mocking easier. ''' curr_env = os.environ.copy() curr_env.update({'KUBECONFIG': self.kubeconfig}) proc = subprocess.Popen(cmds, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=curr_env) stdout, stderr = proc.communicate(input_data) return proc.returncode, stdout.decode(), stderr.decode() # pylint: disable=too-many-arguments,too-many-branches def openshift_cmd(self, cmd, oadm=False, output=False, output_type='json', input_data=None): '''Base command for oc ''' cmds = [self.oc_binary] if oadm: cmds.append('adm') cmds.extend(cmd) if self.all_namespaces: cmds.extend(['--all-namespaces']) elif self.namespace is not None and self.namespace.lower() not in ['none', 'emtpy']: # E501 cmds.extend(['-n', self.namespace]) rval = {} results = '' err = None if self.verbose: print(' '.join(cmds)) try: returncode, stdout, stderr = self._run(cmds, input_data) except OSError as ex: returncode, stdout, stderr = 1, '', 'Failed to execute {}: {}'.format(subprocess.list2cmdline(cmds), ex) rval = {"returncode": returncode, "results": results, "cmd": ' '.join(cmds)} if returncode == 0: if output: if output_type == 'json': try: rval['results'] = json.loads(stdout) except ValueError as err: if "No JSON object could be decoded" in err.args: err = err.args elif output_type == 'raw': rval['results'] = stdout if self.verbose: print("STDOUT: {0}".format(stdout)) print("STDERR: {0}".format(stderr)) if err: rval.update({"err": err, "stderr": stderr, "stdout": stdout, "cmd": cmds}) else: rval.update({"stderr": stderr, "stdout": stdout, "results": {}}) return rval class Utils(object): ''' utilities for openshiftcli modules ''' @staticmethod def _write(filename, contents): ''' Actually write the file contents to disk. This helps with mocking. ''' with open(filename, 'w') as sfd: sfd.write(contents) @staticmethod def create_tmp_file_from_contents(rname, data, ftype='yaml'): ''' create a file in tmp with name and contents''' tmp = Utils.create_tmpfile(prefix=rname) if ftype == 'yaml': # AUDIT:no-member makes sense here due to ruamel.YAML/PyYAML usage # pylint: disable=no-member if hasattr(yaml, 'RoundTripDumper'): Utils._write(tmp, yaml.dump(data, Dumper=yaml.RoundTripDumper)) else: Utils._write(tmp, yaml.safe_dump(data, default_flow_style=False)) elif ftype == 'json': Utils._write(tmp, json.dumps(data)) else: Utils._write(tmp, data) # Register cleanup when module is done atexit.register(Utils.cleanup, [tmp]) return tmp @staticmethod def create_tmpfile_copy(inc_file): '''create a temporary copy of a file''' tmpfile = Utils.create_tmpfile('lib_openshift-') Utils._write(tmpfile, open(inc_file).read()) # Cleanup the tmpfile atexit.register(Utils.cleanup, [tmpfile]) return tmpfile @staticmethod def create_tmpfile(prefix='tmp'): ''' Generates and returns a temporary file name ''' with tempfile.NamedTemporaryFile(prefix=prefix, delete=False) as tmp: return tmp.name @staticmethod def create_tmp_files_from_contents(content, content_type=None): '''Turn an array of dict: filename, content into a files array''' if not isinstance(content, list): content = [content] files = [] for item in content: path = Utils.create_tmp_file_from_contents(item['path'] + '-', item['data'], ftype=content_type) files.append({'name': os.path.basename(item['path']), 'path': path}) return files @staticmethod def cleanup(files): '''Clean up on exit ''' for sfile in files: if os.path.exists(sfile): if os.path.isdir(sfile): shutil.rmtree(sfile) elif os.path.isfile(sfile): os.remove(sfile) @staticmethod def exists(results, _name): ''' Check to see if the results include the name ''' if not results: return False if Utils.find_result(results, _name): return True return False @staticmethod def find_result(results, _name): ''' Find the specified result by name''' rval = None for result in results: if 'metadata' in result and result['metadata']['name'] == _name: rval = result break return rval @staticmethod def get_resource_file(sfile, sfile_type='yaml'): ''' return the service file ''' contents = None with open(sfile) as sfd: contents = sfd.read() if sfile_type == 'yaml': # AUDIT:no-member makes sense here due to ruamel.YAML/PyYAML usage # pylint: disable=no-member if hasattr(yaml, 'RoundTripLoader'): contents = yaml.load(contents, yaml.RoundTripLoader) else: contents = yaml.safe_load(contents) elif sfile_type == 'json': contents = json.loads(contents) return contents @staticmethod def filter_versions(stdout): ''' filter the oc version output ''' version_dict = {} version_search = ['oc', 'openshift', 'kubernetes'] for line in stdout.strip().split('\n'): for term in version_search: if not line: continue if line.startswith(term): version_dict[term] = line.split()[-1] # horrible hack to get openshift version in Openshift 3.2 # By default "oc version in 3.2 does not return an "openshift" version if "openshift" not in version_dict: version_dict["openshift"] = version_dict["oc"] return version_dict @staticmethod def add_custom_versions(versions): ''' create custom versions strings ''' versions_dict = {} for tech, version in versions.items(): # clean up "-" from version if "-" in version: version = version.split("-")[0] if version.startswith('v'): versions_dict[tech + '_numeric'] = version[1:].split('+')[0] # "v3.3.0.33" is what we have, we want "3.3" versions_dict[tech + '_short'] = version[1:4] return versions_dict @staticmethod def openshift_installed(): ''' check if openshift is installed ''' import yum yum_base = yum.YumBase() if yum_base.rpmdb.searchNevra(name='atomic-openshift'): return True return False # Disabling too-many-branches. This is a yaml dictionary comparison function # pylint: disable=too-many-branches,too-many-return-statements,too-many-statements @staticmethod def check_def_equal(user_def, result_def, skip_keys=None, debug=False): ''' Given a user defined definition, compare it with the results given back by our query. ''' # Currently these values are autogenerated and we do not need to check them skip = ['metadata', 'status'] if skip_keys: skip.extend(skip_keys) for key, value in result_def.items(): if key in skip: continue # Both are lists if isinstance(value, list): if key not in user_def: if debug: print('User data does not have key [%s]' % key) print('User data: %s' % user_def) return False if not isinstance(user_def[key], list): if debug: print('user_def[key] is not a list key=[%s] user_def[key]=%s' % (key, user_def[key])) return False if len(user_def[key]) != len(value): if debug: print("List lengths are not equal.") print("key=[%s]: user_def[%s] != value[%s]" % (key, len(user_def[key]), len(value))) print("user_def: %s" % user_def[key]) print("value: %s" % value) return False for values in zip(user_def[key], value): if isinstance(values[0], dict) and isinstance(values[1], dict): if debug: print('sending list - list') print(type(values[0])) print(type(values[1])) result = Utils.check_def_equal(values[0], values[1], skip_keys=skip_keys, debug=debug) if not result: print('list compare returned false') return False elif value != user_def[key]: if debug: print('value should be identical') print(user_def[key]) print(value) return False # recurse on a dictionary elif isinstance(value, dict): if key not in user_def: if debug: print("user_def does not have key [%s]" % key) return False if not isinstance(user_def[key], dict): if debug: print("dict returned false: not instance of dict") return False # before passing ensure keys match api_values = set(value.keys()) - set(skip) user_values = set(user_def[key].keys()) - set(skip) if api_values != user_values: if debug: print("keys are not equal in dict") print(user_values) print(api_values) return False result = Utils.check_def_equal(user_def[key], value, skip_keys=skip_keys, debug=debug) if not result: if debug: print("dict returned false") print(result) return False # Verify each key, value pair is the same else: if key not in user_def or value != user_def[key]: if debug: print("value not equal; user_def does not have key") print(key) print(value) if key in user_def: print(user_def[key]) return False if debug: print('returning true') return True class OpenShiftCLIConfig(object): '''Generic Config''' def __init__(self, rname, namespace, kubeconfig, options): self.kubeconfig = kubeconfig self.name = rname self.namespace = namespace self._options = options @property def config_options(self): ''' return config options ''' return self._options def to_option_list(self): '''return all options as a string''' return self.stringify() def stringify(self): ''' return the options hash as cli params in a string ''' rval = [] for key in sorted(self.config_options.keys()): data = self.config_options[key] if data['include'] \ and (data['value'] or isinstance(data['value'], int)): rval.append('--{}={}'.format(key.replace('_', '-'), data['value'])) return rval