Diffstat (limited to 'roles/openshift_certificate_expiry/library/openshift_cert_expiry.py'):
 roles/openshift_certificate_expiry/library/openshift_cert_expiry.py | 116
 1 file changed, 42 insertions(+), 74 deletions(-)
diff --git a/roles/openshift_certificate_expiry/library/openshift_cert_expiry.py b/roles/openshift_certificate_expiry/library/openshift_cert_expiry.py
index b093d84fe..33a4faf3e 100644
--- a/roles/openshift_certificate_expiry/library/openshift_cert_expiry.py
+++ b/roles/openshift_certificate_expiry/library/openshift_cert_expiry.py
@@ -8,15 +8,12 @@ import datetime
import io
import os
import subprocess
-import sys
-import tempfile
+import yaml
-# File pointers from io.open require unicode inputs when using their
-# `write` method
-import six
from six.moves import configparser
-import yaml
+from ansible.module_utils.basic import AnsibleModule
+
try:
# You can comment this import out and include a 'pass' in this
# block if you're manually testing this module on a NON-ATOMIC
@@ -24,13 +21,14 @@ try:
# available). That will force the `load_and_handle_cert` function
# to use the Fake OpenSSL classes.
import OpenSSL.crypto
+ HAS_OPENSSL = True
except ImportError:
# Some platforms (such as RHEL Atomic) may not have the Python
# OpenSSL library installed. In this case we will use a manual
# work-around to parse each certificate.
#
# Check for 'OpenSSL.crypto' in `sys.modules` later.
- pass
+ HAS_OPENSSL = False
DOCUMENTATION = '''
---
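The hunk above swaps the old `sys.modules` check for an explicit HAS_OPENSSL flag set at import time. A minimal sketch of that optional-import pattern (the load_cert helper is illustrative, not part of the module):

    try:
        import OpenSSL.crypto
        HAS_OPENSSL = True
    except ImportError:
        # Some hosts (e.g. RHEL Atomic) ship without pyOpenSSL
        HAS_OPENSSL = False

    def load_cert(pem_text):
        """Use pyOpenSSL when available; callers fall back to the CLI parser otherwise."""
        if HAS_OPENSSL:
            return OpenSSL.crypto.load_certificate(
                OpenSSL.crypto.FILETYPE_PEM, pem_text)
        raise NotImplementedError("use the FakeOpenSSLCertificate fallback instead")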
@@ -158,6 +156,10 @@ might return: [('O=system', 'nodes'), ('CN=system', 'node:m01.example.com')]
'subjectAltName'"""
return self.extensions[i]
+ def get_extension_count(self):
+ """ get_extension_count """
+ return len(self.extensions)
+
def get_notAfter(self):
"""Returns a date stamp as a string in the form
'20180922170439Z'. strptime the result with format param:
@@ -268,30 +270,23 @@ A tuple of the form:
# around a missing library on the target host.
#
# pylint: disable=redefined-variable-type
- if 'OpenSSL.crypto' in sys.modules:
+ if HAS_OPENSSL:
# No work-around required
cert_loaded = OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_PEM, _cert_string)
else:
- # Missing library, work-around required. We need to write the
- # cert out to disk temporarily so we can run the 'openssl'
+ # Missing library, work-around required. Run the 'openssl'
# command on it to decode it
- _, path = tempfile.mkstemp()
- with io.open(path, 'w') as fp:
- fp.write(six.u(_cert_string))
- fp.flush()
-
- cmd = 'openssl x509 -in {} -text'.format(path)
+ cmd = 'openssl x509 -text'
try:
- openssl_decoded = subprocess.Popen(cmd.split(),
- stdout=subprocess.PIPE)
+ openssl_proc = subprocess.Popen(cmd.split(),
+ stdout=subprocess.PIPE,
+ stdin=subprocess.PIPE)
except OSError:
ans_module.fail_json(msg="Error: The 'OpenSSL' python library and CLI command were not found on the target host. Unable to parse any certificates. This host will not be included in generated reports.")
else:
- openssl_decoded = openssl_decoded.communicate()[0]
+ openssl_decoded = openssl_proc.communicate(_cert_string.encode('utf-8'))[0].decode('utf-8')
cert_loaded = FakeOpenSSLCertificate(openssl_decoded)
- finally:
- os.remove(path)
######################################################################
# Read all possible names from the cert
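The change above drops the tempfile round-trip: instead of writing the PEM out to disk, the certificate text is piped to `openssl x509 -text` over stdin. As a standalone sketch (pem is assumed to be a str holding the certificate):

    import subprocess

    def decode_cert_with_cli(pem):
        """Decode a PEM certificate via the openssl CLI without a temp file."""
        proc = subprocess.Popen(['openssl', 'x509', '-text'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE)
        out, _ = proc.communicate(pem.encode('utf-8'))
        return out.decode('utf-8')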
@@ -301,34 +296,12 @@ A tuple of the form:
# To read SANs from a cert we must read the subjectAltName
# extension from the X509 Object. What makes this more difficult
- # is that pyOpenSSL does not give extensions as a list, nor does
- # it provide a count of all loaded extensions.
- #
- # Rather, extensions are REQUESTED by index. We must iterate over
- # all extensions until we find the one called 'subjectAltName'. If
- # we don't find that extension we'll eventually request an
- # extension at an index where no extension exists (IndexError is
- # raised). When that happens we know that the cert has no SANs so
- # we break out of the loop.
- i = 0
- checked_all_extensions = False
- while not checked_all_extensions:
- try:
- # Read the extension at index 'i'
- ext = cert_loaded.get_extension(i)
- except IndexError:
- # We tried to read an extension but it isn't there, that
- # means we ran out of extensions to check. Abort
- san = None
- checked_all_extensions = True
- else:
- # We were able to load the extension at index 'i'
- if ext.get_short_name() == 'subjectAltName':
- san = ext
- checked_all_extensions = True
- else:
- # Try reading the next extension
- i += 1
+ # is that pyOpenSSL does not give extensions as an iterable
+ san = None
+ for i in range(cert_loaded.get_extension_count()):
+ ext = cert_loaded.get_extension(i)
+ if ext.get_short_name() == 'subjectAltName':
+ san = ext
if san is not None:
# The X509Extension object for subjectAltName prints as a
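Because FakeOpenSSLCertificate now exposes get_extension_count() just like pyOpenSSL's X509 class, the SAN lookup becomes a bounded loop rather than probing indexes until IndexError. Roughly (the helper name is illustrative; pyOpenSSL can return the short name as bytes on Python 3):

    def find_san(cert):
        """Return the subjectAltName extension, or None if the cert has no SANs."""
        for i in range(cert.get_extension_count()):
            ext = cert.get_extension(i)
            name = ext.get_short_name()
            if isinstance(name, bytes):
                name = name.decode('utf-8')
            if name == 'subjectAltName':
                return ext
        return None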
@@ -341,9 +314,13 @@ A tuple of the form:
######################################################################
# Grab the expiration date
+ not_after = cert_loaded.get_notAfter()
+ # example get_notAfter() => 20180922170439Z
+ if isinstance(not_after, bytes):
+ not_after = not_after.decode('utf-8')
+
cert_expiry_date = datetime.datetime.strptime(
- cert_loaded.get_notAfter(),
- # example get_notAfter() => 20180922170439Z
+ not_after,
'%Y%m%d%H%M%SZ')
time_remaining = cert_expiry_date - now
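get_notAfter() yields a timestamp like '20180922170439Z' (bytes under pyOpenSSL on Python 3), so computing the remaining lifetime reduces to a strptime call; a sketch:

    import datetime

    def time_until_expiry(cert, now=None):
        """Parse the cert's notAfter field and return the remaining lifetime as a timedelta."""
        now = now or datetime.datetime.utcnow()
        not_after = cert.get_notAfter()        # e.g. b'20180922170439Z'
        if isinstance(not_after, bytes):
            not_after = not_after.decode('utf-8')
        expiry = datetime.datetime.strptime(not_after, '%Y%m%d%H%M%SZ')
        return expiry - now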
@@ -455,13 +432,11 @@ an OpenShift Container Platform cluster
)
# Basic scaffolding for OpenShift specific certs
- openshift_base_config_path = module.params['config_base']
- openshift_master_config_path = os.path.normpath(
- os.path.join(openshift_base_config_path, "master/master-config.yaml")
- )
- openshift_node_config_path = os.path.normpath(
- os.path.join(openshift_base_config_path, "node/node-config.yaml")
- )
+ openshift_base_config_path = os.path.realpath(module.params['config_base'])
+ openshift_master_config_path = os.path.join(openshift_base_config_path,
+ "master", "master-config.yaml")
+ openshift_node_config_path = os.path.join(openshift_base_config_path,
+ "node", "node-config.yaml")
openshift_cert_check_paths = [
openshift_master_config_path,
openshift_node_config_path,
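Joining the path components separately and resolving the base once with realpath keeps the construction portable and follows any symlinked config directory up front; for example (the base path here is only an example value):

    import os

    base = os.path.realpath('/etc/origin')   # resolves symlinks in the configured base, if any
    master_cfg = os.path.join(base, 'master', 'master-config.yaml')
    node_cfg = os.path.join(base, 'node', 'node-config.yaml')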
@@ -476,9 +451,7 @@ an OpenShift Container Platform cluster
kubeconfig_paths = []
for m_kube_config in master_kube_configs:
kubeconfig_paths.append(
- os.path.normpath(
- os.path.join(openshift_base_config_path, "master/%s.kubeconfig" % m_kube_config)
- )
+ os.path.join(openshift_base_config_path, "master", m_kube_config + ".kubeconfig")
)
# Validate some paths we have the ability to do ahead of time
@@ -527,7 +500,7 @@ an OpenShift Container Platform cluster
######################################################################
for os_cert in filter_paths(openshift_cert_check_paths):
# Open up that config file and locate the cert and CA
- with open(os_cert, 'r') as fp:
+ with io.open(os_cert, 'r', encoding='utf-8') as fp:
cert_meta = {}
cfg = yaml.load(fp)
# cert files are specified in parsed `fp` as relative to the path
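The remaining hunks all apply the same change: plain open() becomes io.open() with an explicit encoding, which returns unicode text on both Python 2 and 3 before handing the stream to PyYAML. The pattern in isolation:

    import io
    import yaml

    def load_yaml_config(path):
        """Read a config file as UTF-8 text and parse it with PyYAML."""
        with io.open(path, 'r', encoding='utf-8') as fp:
            # the module calls yaml.load; yaml.safe_load is the stricter choice for untrusted input
            return yaml.load(fp)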
@@ -542,7 +515,7 @@ an OpenShift Container Platform cluster
# Load the certificate and the CA, parse their expiration dates into
# datetime objects so we can manipulate them later
for _, v in cert_meta.items():
- with open(v, 'r') as fp:
+ with io.open(v, 'r', encoding='utf-8') as fp:
cert = fp.read()
(cert_subject,
cert_expiry_date,
@@ -575,7 +548,7 @@ an OpenShift Container Platform cluster
try:
# Try to read the standard 'node-config.yaml' file to check if
# this host is a node.
- with open(openshift_node_config_path, 'r') as fp:
+ with io.open(openshift_node_config_path, 'r', encoding='utf-8') as fp:
cfg = yaml.load(fp)
# OK, the config file exists, therefore this is a
@@ -588,7 +561,7 @@ an OpenShift Container Platform cluster
cfg_path = os.path.dirname(fp.name)
node_kubeconfig = os.path.join(cfg_path, node_masterKubeConfig)
- with open(node_kubeconfig, 'r') as fp:
+ with io.open(node_kubeconfig, 'r', encoding='utf8') as fp:
# Read in the nodes kubeconfig file and grab the good stuff
cfg = yaml.load(fp)
@@ -613,7 +586,7 @@ an OpenShift Container Platform cluster
pass
for kube in filter_paths(kubeconfig_paths):
- with open(kube, 'r') as fp:
+ with io.open(kube, 'r', encoding='utf-8') as fp:
# TODO: Maybe consider catching exceptions here?
cfg = yaml.load(fp)
@@ -656,7 +629,7 @@ an OpenShift Container Platform cluster
etcd_certs = []
etcd_cert_params.append('dne')
try:
- with open('/etc/etcd/etcd.conf', 'r') as fp:
+ with io.open('/etc/etcd/etcd.conf', 'r', encoding='utf-8') as fp:
etcd_config = configparser.ConfigParser()
# Reason: This check is disabled because the issue was introduced
# during a period where the pylint checks weren't enabled for this file
@@ -675,7 +648,7 @@ an OpenShift Container Platform cluster
pass
for etcd_cert in filter_paths(etcd_certs_to_check):
- with open(etcd_cert, 'r') as fp:
+ with io.open(etcd_cert, 'r', encoding='utf-8') as fp:
c = fp.read()
(cert_subject,
cert_expiry_date,
@@ -697,7 +670,7 @@ an OpenShift Container Platform cluster
# Now the embedded etcd
######################################################################
try:
- with open('/etc/origin/master/master-config.yaml', 'r') as fp:
+ with io.open('/etc/origin/master/master-config.yaml', 'r', encoding='utf-8') as fp:
cfg = yaml.load(fp)
except IOError:
# Not present
@@ -864,10 +837,5 @@ an OpenShift Container Platform cluster
)
-######################################################################
-# It's just the way we do things in Ansible. So disable this warning
-#
-# pylint: disable=wrong-import-position,import-error
-from ansible.module_utils.basic import AnsibleModule # noqa: E402
if __name__ == '__main__':
main()
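With AnsibleModule imported at the top of the file, the module ends with the conventional entry point instead of a bottom-of-file import guarded by a pylint disable. The overall skeleton (argument names and defaults here are illustrative, not the module's actual spec):

    from ansible.module_utils.basic import AnsibleModule

    def main():
        module = AnsibleModule(
            argument_spec=dict(
                config_base=dict(required=False, type='str', default='/etc/origin'),
            ),
            supports_check_mode=False,
        )
        # ... gather certificate data here ...
        module.exit_json(changed=False)

    if __name__ == '__main__':
        main()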