summaryrefslogtreecommitdiffstats
path: root/roles/lib_utils/test
diff options
context:
space:
mode:
authorMichael Gugino <mgugino@redhat.com>2018-01-04 23:55:34 -0500
committerMichael Gugino <mgugino@redhat.com>2018-01-10 11:34:36 -0500
commitd3fefc32a727fe3c13159c4e9fe4399f35b487a8 (patch)
tree3211ffc7fa4c8df9ff93928e705ef5314d339f3c /roles/lib_utils/test
parentee2d4b8e66a344e8f6ca12cbc9362a80a07555d0 (diff)
downloadopenshift-d3fefc32a727fe3c13159c4e9fe4399f35b487a8.tar.gz
openshift-d3fefc32a727fe3c13159c4e9fe4399f35b487a8.tar.bz2
openshift-d3fefc32a727fe3c13159c4e9fe4399f35b487a8.tar.xz
openshift-d3fefc32a727fe3c13159c4e9fe4399f35b487a8.zip
Move more plugins to lib_utils
This commit continues moving plugins into lib_utils. This commit does not move any plugins for add-on roles such as logging and metrics.
Diffstat (limited to 'roles/lib_utils/test')
-rw-r--r--roles/lib_utils/test/conftest.py172
-rw-r--r--roles/lib_utils/test/openshift_master_facts_bad_input_tests.py57
-rw-r--r--roles/lib_utils/test/openshift_master_facts_conftest.py54
-rw-r--r--roles/lib_utils/test/openshift_master_facts_default_predicates_tests.py193
-rw-r--r--roles/lib_utils/test/openshift_master_facts_default_priorities_tests.py167
-rw-r--r--roles/lib_utils/test/test_fakeopensslclasses.py90
-rw-r--r--roles/lib_utils/test/test_load_and_handle_cert.py67
7 files changed, 800 insertions, 0 deletions
diff --git a/roles/lib_utils/test/conftest.py b/roles/lib_utils/test/conftest.py
new file mode 100644
index 000000000..aabdd4fa1
--- /dev/null
+++ b/roles/lib_utils/test/conftest.py
@@ -0,0 +1,172 @@
+# pylint: disable=missing-docstring,invalid-name,redefined-outer-name
+import os
+import pytest
+import sys
+
+from OpenSSL import crypto
+
+sys.path.insert(1, os.path.join(os.path.dirname(__file__), os.pardir, "lookup_plugins"))
+
+from openshift_master_facts_default_predicates import LookupModule as PredicatesLookupModule # noqa: E402
+from openshift_master_facts_default_priorities import LookupModule as PrioritiesLookupModule # noqa: E402
+
+# Parameter list for valid_cert fixture
+VALID_CERTIFICATE_PARAMS = [
+ {
+ 'short_name': 'client',
+ 'cn': 'client.example.com',
+ 'serial': 4,
+ 'uses': b'clientAuth',
+ 'dns': [],
+ 'ip': [],
+ },
+ {
+ 'short_name': 'server',
+ 'cn': 'server.example.com',
+ 'serial': 5,
+ 'uses': b'serverAuth',
+ 'dns': ['kubernetes', 'openshift'],
+ 'ip': ['10.0.0.1', '192.168.0.1']
+ },
+ {
+ 'short_name': 'combined',
+ 'cn': 'combined.example.com',
+ # Verify that HUGE serials parse correctly.
+ # Frobs PARSING_HEX_SERIAL in _parse_cert
+ # See https://bugzilla.redhat.com/show_bug.cgi?id=1464240
+ 'serial': 14449739080294792594019643629255165375,
+ 'uses': b'clientAuth, serverAuth',
+ 'dns': ['etcd'],
+ 'ip': ['10.0.0.2', '192.168.0.2']
+ }
+]
+
+# Extract the short_name from VALID_CERTIFICATE_PARAMS to provide
+# friendly naming for the valid_cert fixture
+VALID_CERTIFICATE_IDS = [param['short_name'] for param in VALID_CERTIFICATE_PARAMS]
+
+
+@pytest.fixture(scope='session')
+def ca(tmpdir_factory):
+ ca_dir = tmpdir_factory.mktemp('ca')
+
+ key = crypto.PKey()
+ key.generate_key(crypto.TYPE_RSA, 2048)
+
+ cert = crypto.X509()
+ cert.set_version(3)
+ cert.set_serial_number(1)
+ cert.get_subject().commonName = 'test-signer'
+ cert.gmtime_adj_notBefore(0)
+ cert.gmtime_adj_notAfter(24 * 60 * 60)
+ cert.set_issuer(cert.get_subject())
+ cert.set_pubkey(key)
+ cert.add_extensions([
+ crypto.X509Extension(b'basicConstraints', True, b'CA:TRUE, pathlen:0'),
+ crypto.X509Extension(b'keyUsage', True,
+ b'digitalSignature, keyEncipherment, keyCertSign, cRLSign'),
+ crypto.X509Extension(b'subjectKeyIdentifier', False, b'hash', subject=cert)
+ ])
+ cert.add_extensions([
+ crypto.X509Extension(b'authorityKeyIdentifier', False, b'keyid:always', issuer=cert)
+ ])
+ cert.sign(key, 'sha256')
+
+ return {
+ 'dir': ca_dir,
+ 'key': key,
+ 'cert': cert,
+ }
+
+
+@pytest.fixture(scope='session',
+ ids=VALID_CERTIFICATE_IDS,
+ params=VALID_CERTIFICATE_PARAMS)
+def valid_cert(request, ca):
+ common_name = request.param['cn']
+
+ key = crypto.PKey()
+ key.generate_key(crypto.TYPE_RSA, 2048)
+
+ cert = crypto.X509()
+ cert.set_serial_number(request.param['serial'])
+ cert.gmtime_adj_notBefore(0)
+ cert.gmtime_adj_notAfter(24 * 60 * 60)
+ cert.set_issuer(ca['cert'].get_subject())
+ cert.set_pubkey(key)
+ cert.set_version(3)
+ cert.get_subject().commonName = common_name
+ cert.add_extensions([
+ crypto.X509Extension(b'basicConstraints', True, b'CA:FALSE'),
+ crypto.X509Extension(b'keyUsage', True, b'digitalSignature, keyEncipherment'),
+ crypto.X509Extension(b'extendedKeyUsage', False, request.param['uses']),
+ ])
+
+ if request.param['dns'] or request.param['ip']:
+ san_list = ['DNS:{}'.format(common_name)]
+ san_list.extend(['DNS:{}'.format(x) for x in request.param['dns']])
+ san_list.extend(['IP:{}'.format(x) for x in request.param['ip']])
+
+ cert.add_extensions([
+ crypto.X509Extension(b'subjectAltName', False, ', '.join(san_list).encode('utf8'))
+ ])
+ cert.sign(ca['key'], 'sha256')
+
+ cert_contents = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
+ cert_file = ca['dir'].join('{}.crt'.format(common_name))
+ cert_file.write_binary(cert_contents)
+
+ return {
+ 'common_name': common_name,
+ 'serial': request.param['serial'],
+ 'dns': request.param['dns'],
+ 'ip': request.param['ip'],
+ 'uses': request.param['uses'],
+ 'cert_file': cert_file,
+ 'cert': cert
+ }
+
+
+@pytest.fixture()
+def predicates_lookup():
+ return PredicatesLookupModule()
+
+
+@pytest.fixture()
+def priorities_lookup():
+ return PrioritiesLookupModule()
+
+
+@pytest.fixture()
+def facts():
+ return {
+ 'openshift': {
+ 'common': {}
+ }
+ }
+
+
+@pytest.fixture(params=[True, False])
+def regions_enabled(request):
+ return request.param
+
+
+@pytest.fixture(params=[True, False])
+def zones_enabled(request):
+ return request.param
+
+
+def v_prefix(release):
+ """Prefix a release number with 'v'."""
+ return "v" + release
+
+
+def minor(release):
+ """Add a suffix to release, making 'X.Y' become 'X.Y.Z'."""
+ return release + ".1"
+
+
+@pytest.fixture(params=[str, v_prefix, minor])
+def release_mod(request):
+ """Modifies a release string to alternative valid values."""
+ return request.param
diff --git a/roles/lib_utils/test/openshift_master_facts_bad_input_tests.py b/roles/lib_utils/test/openshift_master_facts_bad_input_tests.py
new file mode 100644
index 000000000..e8da1e04a
--- /dev/null
+++ b/roles/lib_utils/test/openshift_master_facts_bad_input_tests.py
@@ -0,0 +1,57 @@
+import copy
+import os
+import sys
+
+from ansible.errors import AnsibleError
+import pytest
+
+sys.path.insert(1, os.path.join(os.path.dirname(__file__), os.pardir, "lookup_plugins"))
+
+from openshift_master_facts_default_predicates import LookupModule # noqa: E402
+
+
+class TestOpenShiftMasterFactsBadInput(object):
+ lookup = LookupModule()
+ default_facts = {
+ 'openshift': {
+ 'common': {}
+ }
+ }
+
+ def test_missing_openshift_facts(self):
+ with pytest.raises(AnsibleError):
+ facts = {}
+ self.lookup.run(None, variables=facts)
+
+ def test_missing_deployment_type(self):
+ with pytest.raises(AnsibleError):
+ facts = copy.deepcopy(self.default_facts)
+ facts['openshift']['common']['short_version'] = '10.10'
+ self.lookup.run(None, variables=facts)
+
+ def test_missing_short_version_and_missing_openshift_release(self):
+ with pytest.raises(AnsibleError):
+ facts = copy.deepcopy(self.default_facts)
+ facts['openshift']['common']['deployment_type'] = 'origin'
+ self.lookup.run(None, variables=facts)
+
+ def test_unknown_deployment_types(self):
+ with pytest.raises(AnsibleError):
+ facts = copy.deepcopy(self.default_facts)
+ facts['openshift']['common']['short_version'] = '1.1'
+ facts['openshift']['common']['deployment_type'] = 'bogus'
+ self.lookup.run(None, variables=facts)
+
+ def test_unknown_origin_version(self):
+ with pytest.raises(AnsibleError):
+ facts = copy.deepcopy(self.default_facts)
+ facts['openshift']['common']['short_version'] = '0.1'
+ facts['openshift']['common']['deployment_type'] = 'origin'
+ self.lookup.run(None, variables=facts)
+
+ def test_unknown_ocp_version(self):
+ with pytest.raises(AnsibleError):
+ facts = copy.deepcopy(self.default_facts)
+ facts['openshift']['common']['short_version'] = '0.1'
+ facts['openshift']['common']['deployment_type'] = 'openshift-enterprise'
+ self.lookup.run(None, variables=facts)
diff --git a/roles/lib_utils/test/openshift_master_facts_conftest.py b/roles/lib_utils/test/openshift_master_facts_conftest.py
new file mode 100644
index 000000000..140cced73
--- /dev/null
+++ b/roles/lib_utils/test/openshift_master_facts_conftest.py
@@ -0,0 +1,54 @@
+import os
+import sys
+
+import pytest
+
+sys.path.insert(1, os.path.join(os.path.dirname(__file__), os.pardir, "lookup_plugins"))
+
+from openshift_master_facts_default_predicates import LookupModule as PredicatesLookupModule # noqa: E402
+from openshift_master_facts_default_priorities import LookupModule as PrioritiesLookupModule # noqa: E402
+
+
+@pytest.fixture()
+def predicates_lookup():
+ return PredicatesLookupModule()
+
+
+@pytest.fixture()
+def priorities_lookup():
+ return PrioritiesLookupModule()
+
+
+@pytest.fixture()
+def facts():
+ return {
+ 'openshift': {
+ 'common': {}
+ }
+ }
+
+
+@pytest.fixture(params=[True, False])
+def regions_enabled(request):
+ return request.param
+
+
+@pytest.fixture(params=[True, False])
+def zones_enabled(request):
+ return request.param
+
+
+def v_prefix(release):
+ """Prefix a release number with 'v'."""
+ return "v" + release
+
+
+def minor(release):
+ """Add a suffix to release, making 'X.Y' become 'X.Y.Z'."""
+ return release + ".1"
+
+
+@pytest.fixture(params=[str, v_prefix, minor])
+def release_mod(request):
+ """Modifies a release string to alternative valid values."""
+ return request.param
diff --git a/roles/lib_utils/test/openshift_master_facts_default_predicates_tests.py b/roles/lib_utils/test/openshift_master_facts_default_predicates_tests.py
new file mode 100644
index 000000000..11aad9f03
--- /dev/null
+++ b/roles/lib_utils/test/openshift_master_facts_default_predicates_tests.py
@@ -0,0 +1,193 @@
+import pytest
+
+
+# Predicates ordered according to OpenShift Origin source:
+# origin/vendor/k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go
+
+DEFAULT_PREDICATES_1_1 = [
+ {'name': 'PodFitsHostPorts'},
+ {'name': 'PodFitsResources'},
+ {'name': 'NoDiskConflict'},
+ {'name': 'MatchNodeSelector'},
+]
+
+DEFAULT_PREDICATES_1_2 = [
+ {'name': 'PodFitsHostPorts'},
+ {'name': 'PodFitsResources'},
+ {'name': 'NoDiskConflict'},
+ {'name': 'NoVolumeZoneConflict'},
+ {'name': 'MatchNodeSelector'},
+ {'name': 'MaxEBSVolumeCount'},
+ {'name': 'MaxGCEPDVolumeCount'}
+]
+
+DEFAULT_PREDICATES_1_3 = [
+ {'name': 'NoDiskConflict'},
+ {'name': 'NoVolumeZoneConflict'},
+ {'name': 'MaxEBSVolumeCount'},
+ {'name': 'MaxGCEPDVolumeCount'},
+ {'name': 'GeneralPredicates'},
+ {'name': 'PodToleratesNodeTaints'},
+ {'name': 'CheckNodeMemoryPressure'}
+]
+
+DEFAULT_PREDICATES_1_4 = [
+ {'name': 'NoDiskConflict'},
+ {'name': 'NoVolumeZoneConflict'},
+ {'name': 'MaxEBSVolumeCount'},
+ {'name': 'MaxGCEPDVolumeCount'},
+ {'name': 'GeneralPredicates'},
+ {'name': 'PodToleratesNodeTaints'},
+ {'name': 'CheckNodeMemoryPressure'},
+ {'name': 'CheckNodeDiskPressure'},
+ {'name': 'MatchInterPodAffinity'}
+]
+
+DEFAULT_PREDICATES_1_5 = [
+ {'name': 'NoVolumeZoneConflict'},
+ {'name': 'MaxEBSVolumeCount'},
+ {'name': 'MaxGCEPDVolumeCount'},
+ {'name': 'MatchInterPodAffinity'},
+ {'name': 'NoDiskConflict'},
+ {'name': 'GeneralPredicates'},
+ {'name': 'PodToleratesNodeTaints'},
+ {'name': 'CheckNodeMemoryPressure'},
+ {'name': 'CheckNodeDiskPressure'},
+]
+
+DEFAULT_PREDICATES_3_6 = DEFAULT_PREDICATES_1_5
+
+DEFAULT_PREDICATES_3_7 = [
+ {'name': 'NoVolumeZoneConflict'},
+ {'name': 'MaxEBSVolumeCount'},
+ {'name': 'MaxGCEPDVolumeCount'},
+ {'name': 'MaxAzureDiskVolumeCount'},
+ {'name': 'MatchInterPodAffinity'},
+ {'name': 'NoDiskConflict'},
+ {'name': 'GeneralPredicates'},
+ {'name': 'PodToleratesNodeTaints'},
+ {'name': 'CheckNodeMemoryPressure'},
+ {'name': 'CheckNodeDiskPressure'},
+ {'name': 'NoVolumeNodeConflict'},
+]
+
+DEFAULT_PREDICATES_3_9 = DEFAULT_PREDICATES_3_8 = DEFAULT_PREDICATES_3_7
+
+REGION_PREDICATE = {
+ 'name': 'Region',
+ 'argument': {
+ 'serviceAffinity': {
+ 'labels': ['region']
+ }
+ }
+}
+
+TEST_VARS = [
+ ('1.1', 'origin', DEFAULT_PREDICATES_1_1),
+ ('3.1', 'openshift-enterprise', DEFAULT_PREDICATES_1_1),
+ ('1.2', 'origin', DEFAULT_PREDICATES_1_2),
+ ('3.2', 'openshift-enterprise', DEFAULT_PREDICATES_1_2),
+ ('1.3', 'origin', DEFAULT_PREDICATES_1_3),
+ ('3.3', 'openshift-enterprise', DEFAULT_PREDICATES_1_3),
+ ('1.4', 'origin', DEFAULT_PREDICATES_1_4),
+ ('3.4', 'openshift-enterprise', DEFAULT_PREDICATES_1_4),
+ ('1.5', 'origin', DEFAULT_PREDICATES_1_5),
+ ('3.5', 'openshift-enterprise', DEFAULT_PREDICATES_1_5),
+ ('3.6', 'origin', DEFAULT_PREDICATES_3_6),
+ ('3.6', 'openshift-enterprise', DEFAULT_PREDICATES_3_6),
+ ('3.7', 'origin', DEFAULT_PREDICATES_3_7),
+ ('3.7', 'openshift-enterprise', DEFAULT_PREDICATES_3_7),
+ ('3.8', 'origin', DEFAULT_PREDICATES_3_8),
+ ('3.8', 'openshift-enterprise', DEFAULT_PREDICATES_3_8),
+ ('3.9', 'origin', DEFAULT_PREDICATES_3_9),
+ ('3.9', 'openshift-enterprise', DEFAULT_PREDICATES_3_9),
+]
+
+
+def assert_ok(predicates_lookup, default_predicates, regions_enabled, **kwargs):
+ results = predicates_lookup.run(None, regions_enabled=regions_enabled, **kwargs)
+ if regions_enabled:
+ assert results == default_predicates + [REGION_PREDICATE]
+ else:
+ assert results == default_predicates
+
+
+def test_openshift_version(predicates_lookup, openshift_version_fixture, regions_enabled):
+ facts, default_predicates = openshift_version_fixture
+ assert_ok(predicates_lookup, default_predicates, variables=facts, regions_enabled=regions_enabled)
+
+
+@pytest.fixture(params=TEST_VARS)
+def openshift_version_fixture(request, facts):
+ version, deployment_type, default_predicates = request.param
+ version += '.1'
+ facts['openshift_version'] = version
+ facts['openshift']['common']['deployment_type'] = deployment_type
+ return facts, default_predicates
+
+
+def test_openshift_release(predicates_lookup, openshift_release_fixture, regions_enabled):
+ facts, default_predicates = openshift_release_fixture
+ assert_ok(predicates_lookup, default_predicates, variables=facts, regions_enabled=regions_enabled)
+
+
+@pytest.fixture(params=TEST_VARS)
+def openshift_release_fixture(request, facts, release_mod):
+ release, deployment_type, default_predicates = request.param
+ facts['openshift_release'] = release_mod(release)
+ facts['openshift']['common']['deployment_type'] = deployment_type
+ return facts, default_predicates
+
+
+def test_short_version(predicates_lookup, short_version_fixture, regions_enabled):
+ facts, default_predicates = short_version_fixture
+ assert_ok(predicates_lookup, default_predicates, variables=facts, regions_enabled=regions_enabled)
+
+
+@pytest.fixture(params=TEST_VARS)
+def short_version_fixture(request, facts):
+ short_version, deployment_type, default_predicates = request.param
+ facts['openshift']['common']['short_version'] = short_version
+ facts['openshift']['common']['deployment_type'] = deployment_type
+ return facts, default_predicates
+
+
+def test_short_version_kwarg(predicates_lookup, short_version_kwarg_fixture, regions_enabled):
+ facts, short_version, default_predicates = short_version_kwarg_fixture
+ assert_ok(
+ predicates_lookup, default_predicates, variables=facts,
+ regions_enabled=regions_enabled, short_version=short_version)
+
+
+@pytest.fixture(params=TEST_VARS)
+def short_version_kwarg_fixture(request, facts):
+ short_version, deployment_type, default_predicates = request.param
+ facts['openshift']['common']['deployment_type'] = deployment_type
+ return facts, short_version, default_predicates
+
+
+def test_deployment_type_kwarg(predicates_lookup, deployment_type_kwarg_fixture, regions_enabled):
+ facts, deployment_type, default_predicates = deployment_type_kwarg_fixture
+ assert_ok(
+ predicates_lookup, default_predicates, variables=facts,
+ regions_enabled=regions_enabled, deployment_type=deployment_type)
+
+
+@pytest.fixture(params=TEST_VARS)
+def deployment_type_kwarg_fixture(request, facts):
+ short_version, deployment_type, default_predicates = request.param
+ facts['openshift']['common']['short_version'] = short_version
+ return facts, deployment_type, default_predicates
+
+
+def test_short_version_deployment_type_kwargs(
+ predicates_lookup, short_version_deployment_type_kwargs_fixture, regions_enabled):
+ short_version, deployment_type, default_predicates = short_version_deployment_type_kwargs_fixture
+ assert_ok(
+ predicates_lookup, default_predicates, regions_enabled=regions_enabled,
+ short_version=short_version, deployment_type=deployment_type)
+
+
+@pytest.fixture(params=TEST_VARS)
+def short_version_deployment_type_kwargs_fixture(request):
+ return request.param
diff --git a/roles/lib_utils/test/openshift_master_facts_default_priorities_tests.py b/roles/lib_utils/test/openshift_master_facts_default_priorities_tests.py
new file mode 100644
index 000000000..527fc9ff4
--- /dev/null
+++ b/roles/lib_utils/test/openshift_master_facts_default_priorities_tests.py
@@ -0,0 +1,167 @@
+import pytest
+
+
+DEFAULT_PRIORITIES_1_1 = [
+ {'name': 'LeastRequestedPriority', 'weight': 1},
+ {'name': 'BalancedResourceAllocation', 'weight': 1},
+ {'name': 'SelectorSpreadPriority', 'weight': 1}
+]
+
+DEFAULT_PRIORITIES_1_2 = [
+ {'name': 'LeastRequestedPriority', 'weight': 1},
+ {'name': 'BalancedResourceAllocation', 'weight': 1},
+ {'name': 'SelectorSpreadPriority', 'weight': 1},
+ {'name': 'NodeAffinityPriority', 'weight': 1}
+]
+
+DEFAULT_PRIORITIES_1_3 = [
+ {'name': 'LeastRequestedPriority', 'weight': 1},
+ {'name': 'BalancedResourceAllocation', 'weight': 1},
+ {'name': 'SelectorSpreadPriority', 'weight': 1},
+ {'name': 'NodeAffinityPriority', 'weight': 1},
+ {'name': 'TaintTolerationPriority', 'weight': 1}
+]
+
+DEFAULT_PRIORITIES_1_4 = [
+ {'name': 'LeastRequestedPriority', 'weight': 1},
+ {'name': 'BalancedResourceAllocation', 'weight': 1},
+ {'name': 'SelectorSpreadPriority', 'weight': 1},
+ {'name': 'NodePreferAvoidPodsPriority', 'weight': 10000},
+ {'name': 'NodeAffinityPriority', 'weight': 1},
+ {'name': 'TaintTolerationPriority', 'weight': 1},
+ {'name': 'InterPodAffinityPriority', 'weight': 1}
+]
+
+DEFAULT_PRIORITIES_1_5 = [
+ {'name': 'SelectorSpreadPriority', 'weight': 1},
+ {'name': 'InterPodAffinityPriority', 'weight': 1},
+ {'name': 'LeastRequestedPriority', 'weight': 1},
+ {'name': 'BalancedResourceAllocation', 'weight': 1},
+ {'name': 'NodePreferAvoidPodsPriority', 'weight': 10000},
+ {'name': 'NodeAffinityPriority', 'weight': 1},
+ {'name': 'TaintTolerationPriority', 'weight': 1}
+]
+
+DEFAULT_PRIORITIES_3_6 = DEFAULT_PRIORITIES_1_5
+
+DEFAULT_PRIORITIES_3_9 = DEFAULT_PRIORITIES_3_8 = DEFAULT_PRIORITIES_3_7 = DEFAULT_PRIORITIES_3_6
+
+ZONE_PRIORITY = {
+ 'name': 'Zone',
+ 'argument': {
+ 'serviceAntiAffinity': {
+ 'label': 'zone'
+ }
+ },
+ 'weight': 2
+}
+
+TEST_VARS = [
+ ('1.1', 'origin', DEFAULT_PRIORITIES_1_1),
+ ('3.1', 'openshift-enterprise', DEFAULT_PRIORITIES_1_1),
+ ('1.2', 'origin', DEFAULT_PRIORITIES_1_2),
+ ('3.2', 'openshift-enterprise', DEFAULT_PRIORITIES_1_2),
+ ('1.3', 'origin', DEFAULT_PRIORITIES_1_3),
+ ('3.3', 'openshift-enterprise', DEFAULT_PRIORITIES_1_3),
+ ('1.4', 'origin', DEFAULT_PRIORITIES_1_4),
+ ('3.4', 'openshift-enterprise', DEFAULT_PRIORITIES_1_4),
+ ('1.5', 'origin', DEFAULT_PRIORITIES_1_5),
+ ('3.5', 'openshift-enterprise', DEFAULT_PRIORITIES_1_5),
+ ('3.6', 'origin', DEFAULT_PRIORITIES_3_6),
+ ('3.6', 'openshift-enterprise', DEFAULT_PRIORITIES_3_6),
+ ('3.7', 'origin', DEFAULT_PRIORITIES_3_7),
+ ('3.7', 'openshift-enterprise', DEFAULT_PRIORITIES_3_7),
+ ('3.8', 'origin', DEFAULT_PRIORITIES_3_8),
+ ('3.8', 'openshift-enterprise', DEFAULT_PRIORITIES_3_8),
+ ('3.9', 'origin', DEFAULT_PRIORITIES_3_9),
+ ('3.9', 'openshift-enterprise', DEFAULT_PRIORITIES_3_9),
+]
+
+
+def assert_ok(priorities_lookup, default_priorities, zones_enabled, **kwargs):
+ results = priorities_lookup.run(None, zones_enabled=zones_enabled, **kwargs)
+ if zones_enabled:
+ assert results == default_priorities + [ZONE_PRIORITY]
+ else:
+ assert results == default_priorities
+
+
+def test_openshift_version(priorities_lookup, openshift_version_fixture, zones_enabled):
+ facts, default_priorities = openshift_version_fixture
+ assert_ok(priorities_lookup, default_priorities, variables=facts, zones_enabled=zones_enabled)
+
+
+@pytest.fixture(params=TEST_VARS)
+def openshift_version_fixture(request, facts):
+ version, deployment_type, default_priorities = request.param
+ version += '.1'
+ facts['openshift_version'] = version
+ facts['openshift']['common']['deployment_type'] = deployment_type
+ return facts, default_priorities
+
+
+def test_openshift_release(priorities_lookup, openshift_release_fixture, zones_enabled):
+ facts, default_priorities = openshift_release_fixture
+ assert_ok(priorities_lookup, default_priorities, variables=facts, zones_enabled=zones_enabled)
+
+
+@pytest.fixture(params=TEST_VARS)
+def openshift_release_fixture(request, facts, release_mod):
+ release, deployment_type, default_priorities = request.param
+ facts['openshift_release'] = release_mod(release)
+ facts['openshift']['common']['deployment_type'] = deployment_type
+ return facts, default_priorities
+
+
+def test_short_version(priorities_lookup, short_version_fixture, zones_enabled):
+ facts, default_priorities = short_version_fixture
+ assert_ok(priorities_lookup, default_priorities, variables=facts, zones_enabled=zones_enabled)
+
+
+@pytest.fixture(params=TEST_VARS)
+def short_version_fixture(request, facts):
+ short_version, deployment_type, default_priorities = request.param
+ facts['openshift']['common']['short_version'] = short_version
+ facts['openshift']['common']['deployment_type'] = deployment_type
+ return facts, default_priorities
+
+
+def test_short_version_kwarg(priorities_lookup, short_version_kwarg_fixture, zones_enabled):
+ facts, short_version, default_priorities = short_version_kwarg_fixture
+ assert_ok(
+ priorities_lookup, default_priorities, variables=facts,
+ zones_enabled=zones_enabled, short_version=short_version)
+
+
+@pytest.fixture(params=TEST_VARS)
+def short_version_kwarg_fixture(request, facts):
+ short_version, deployment_type, default_priorities = request.param
+ facts['openshift']['common']['deployment_type'] = deployment_type
+ return facts, short_version, default_priorities
+
+
+def test_deployment_type_kwarg(priorities_lookup, deployment_type_kwarg_fixture, zones_enabled):
+ facts, deployment_type, default_priorities = deployment_type_kwarg_fixture
+ assert_ok(
+ priorities_lookup, default_priorities, variables=facts,
+ zones_enabled=zones_enabled, deployment_type=deployment_type)
+
+
+@pytest.fixture(params=TEST_VARS)
+def deployment_type_kwarg_fixture(request, facts):
+ short_version, deployment_type, default_priorities = request.param
+ facts['openshift']['common']['short_version'] = short_version
+ return facts, deployment_type, default_priorities
+
+
+def test_short_version_deployment_type_kwargs(
+ priorities_lookup, short_version_deployment_type_kwargs_fixture, zones_enabled):
+ short_version, deployment_type, default_priorities = short_version_deployment_type_kwargs_fixture
+ assert_ok(
+ priorities_lookup, default_priorities, zones_enabled=zones_enabled,
+ short_version=short_version, deployment_type=deployment_type)
+
+
+@pytest.fixture(params=TEST_VARS)
+def short_version_deployment_type_kwargs_fixture(request):
+ return request.param
diff --git a/roles/lib_utils/test/test_fakeopensslclasses.py b/roles/lib_utils/test/test_fakeopensslclasses.py
new file mode 100644
index 000000000..8a521a765
--- /dev/null
+++ b/roles/lib_utils/test/test_fakeopensslclasses.py
@@ -0,0 +1,90 @@
+'''
+ Unit tests for the FakeOpenSSL classes
+'''
+import os
+import subprocess
+import sys
+
+import pytest
+
+MODULE_PATH = os.path.realpath(os.path.join(__file__, os.pardir, os.pardir, 'library'))
+sys.path.insert(1, MODULE_PATH)
+
+# pylint: disable=import-error,wrong-import-position,missing-docstring
+# pylint: disable=invalid-name,redefined-outer-name
+from openshift_cert_expiry import FakeOpenSSLCertificate # noqa: E402
+
+
+@pytest.fixture(scope='module')
+def fake_valid_cert(valid_cert):
+ cmd = ['openssl', 'x509', '-in', str(valid_cert['cert_file']), '-text',
+ '-nameopt', 'oneline']
+ cert = subprocess.check_output(cmd)
+ return FakeOpenSSLCertificate(cert.decode('utf8'))
+
+
+def test_not_after(valid_cert, fake_valid_cert):
+ ''' Validate value returned back from get_notAfter() '''
+ real_cert = valid_cert['cert']
+
+ # Internal representation of pyOpenSSL is bytes, while FakeOpenSSLCertificate
+ # is text, so decode the result from pyOpenSSL prior to comparing
+ assert real_cert.get_notAfter().decode('utf8') == fake_valid_cert.get_notAfter()
+
+
+def test_serial(valid_cert, fake_valid_cert):
+ ''' Validate value returned back form get_serialnumber() '''
+ real_cert = valid_cert['cert']
+ assert real_cert.get_serial_number() == fake_valid_cert.get_serial_number()
+
+
+def test_get_subject(valid_cert, fake_valid_cert):
+ ''' Validate the certificate subject '''
+
+ # Gather the subject components and create a list of colon separated strings.
+ # Since the internal representation of pyOpenSSL uses bytes, we need to decode
+ # the results before comparing.
+ c_subjects = valid_cert['cert'].get_subject().get_components()
+ c_subj = ', '.join(['{}:{}'.format(x.decode('utf8'), y.decode('utf8')) for x, y in c_subjects])
+ f_subjects = fake_valid_cert.get_subject().get_components()
+ f_subj = ', '.join(['{}:{}'.format(x, y) for x, y in f_subjects])
+ assert c_subj == f_subj
+
+
+def get_san_extension(cert):
+ # Internal representation of pyOpenSSL is bytes, while FakeOpenSSLCertificate
+ # is text, so we need to set the value to search for accordingly.
+ if isinstance(cert, FakeOpenSSLCertificate):
+ san_short_name = 'subjectAltName'
+ else:
+ san_short_name = b'subjectAltName'
+
+ for i in range(cert.get_extension_count()):
+ ext = cert.get_extension(i)
+ if ext.get_short_name() == san_short_name:
+ # return the string representation to compare the actual SAN
+ # values instead of the data types
+ return str(ext)
+
+ return None
+
+
+def test_subject_alt_names(valid_cert, fake_valid_cert):
+ real_cert = valid_cert['cert']
+
+ san = get_san_extension(real_cert)
+ f_san = get_san_extension(fake_valid_cert)
+
+ assert san == f_san
+
+ # If there are either dns or ip sans defined, verify common_name present
+ if valid_cert['ip'] or valid_cert['dns']:
+ assert 'DNS:' + valid_cert['common_name'] in f_san
+
+ # Verify all ip sans are present
+ for ip in valid_cert['ip']:
+ assert 'IP Address:' + ip in f_san
+
+ # Verify all dns sans are present
+ for name in valid_cert['dns']:
+ assert 'DNS:' + name in f_san
diff --git a/roles/lib_utils/test/test_load_and_handle_cert.py b/roles/lib_utils/test/test_load_and_handle_cert.py
new file mode 100644
index 000000000..98792e2ee
--- /dev/null
+++ b/roles/lib_utils/test/test_load_and_handle_cert.py
@@ -0,0 +1,67 @@
+'''
+ Unit tests for the load_and_handle_cert method
+'''
+import datetime
+import os
+import sys
+
+import pytest
+
+MODULE_PATH = os.path.realpath(os.path.join(__file__, os.pardir, os.pardir, 'library'))
+sys.path.insert(1, MODULE_PATH)
+
+# pylint: disable=import-error,wrong-import-position,missing-docstring
+# pylint: disable=invalid-name,redefined-outer-name
+import openshift_cert_expiry # noqa: E402
+
+# TODO: More testing on the results of the load_and_handle_cert function
+# could be implemented here as well, such as verifying subjects
+# match up.
+
+
+@pytest.fixture(params=['OpenSSLCertificate', 'FakeOpenSSLCertificate'])
+def loaded_cert(request, valid_cert):
+ """ parameterized fixture to provide load_and_handle_cert results
+ for both OpenSSL and FakeOpenSSL parsed certificates
+ """
+ now = datetime.datetime.now()
+
+ openshift_cert_expiry.HAS_OPENSSL = request.param == 'OpenSSLCertificate'
+
+ # valid_cert['cert_file'] is a `py.path.LocalPath` object and
+ # provides a read_text() method for reading the file contents.
+ cert_string = valid_cert['cert_file'].read_text('utf8')
+
+ (subject,
+ expiry_date,
+ time_remaining,
+ serial) = openshift_cert_expiry.load_and_handle_cert(cert_string, now)
+
+ return {
+ 'now': now,
+ 'subject': subject,
+ 'expiry_date': expiry_date,
+ 'time_remaining': time_remaining,
+ 'serial': serial,
+ }
+
+
+def test_serial(loaded_cert, valid_cert):
+ """Params:
+
+ * `loaded_cert` comes from the `loaded_cert` fixture in this file
+ * `valid_cert` comes from the 'valid_cert' fixture in conftest.py
+ """
+ valid_cert_serial = valid_cert['cert'].get_serial_number()
+ assert loaded_cert['serial'] == valid_cert_serial
+
+
+def test_expiry(loaded_cert):
+ """Params:
+
+ * `loaded_cert` comes from the `loaded_cert` fixture in this file
+ """
+ expiry_date = loaded_cert['expiry_date']
+ time_remaining = loaded_cert['time_remaining']
+ now = loaded_cert['now']
+ assert expiry_date == now + time_remaining