From 293f18593307f5a90890848d77c439b3d3f1dd0d Mon Sep 17 00:00:00 2001 From: Jason DeTiberus Date: Tue, 21 Feb 2017 20:23:34 -0500 Subject: test fixes for openshift_certificates_expiry - create pytest fixtures for building certs at runtime - update tests to use the fixtures - add tests for load_and_handle_cert - fix py2/py3 encode/decode issues raised by tests - add get_extension_count method to fakeOpenSSLCertificate - avoid using a temp file for passing ssl certificate to openssl subprocess - other test tweaks: - exclude conftest.py and tests from coverage report - reduce the fail_under to 26%, since the tests being included were inflating our coverage --- .../test/test_fakeopensslclasses.py | 121 +++++++++++---------- 1 file changed, 64 insertions(+), 57 deletions(-) (limited to 'roles/openshift_certificate_expiry/test/test_fakeopensslclasses.py') diff --git a/roles/openshift_certificate_expiry/test/test_fakeopensslclasses.py b/roles/openshift_certificate_expiry/test/test_fakeopensslclasses.py index 2e245191f..ccdd48fa8 100644 --- a/roles/openshift_certificate_expiry/test/test_fakeopensslclasses.py +++ b/roles/openshift_certificate_expiry/test/test_fakeopensslclasses.py @@ -1,82 +1,89 @@ -#!/usr/bin/env python ''' Unit tests for the FakeOpenSSL classes ''' - import os +import subprocess import sys -import unittest + import pytest -# Disable import-error b/c our libraries aren't loaded in jenkins -# pylint: disable=import-error,wrong-import-position -# place class in our python path -module_path = os.path.join('/'.join(os.path.realpath(__file__).split(os.path.sep)[:-1]), 'library') -sys.path.insert(0, module_path) -openshift_cert_expiry = pytest.importorskip("openshift_cert_expiry") +MODULE_PATH = os.path.realpath(os.path.join(__file__, os.pardir, os.pardir, 'library')) +sys.path.insert(1, MODULE_PATH) + +# pylint: disable=import-error,wrong-import-position,missing-docstring +# pylint: disable=invalid-name,redefined-outer-name +from openshift_cert_expiry import FakeOpenSSLCertificate # noqa: E402 + + +@pytest.fixture(scope='module') +def fake_valid_cert(valid_cert): + cmd = ['openssl', 'x509', '-in', str(valid_cert['cert_file']), '-text'] + cert = subprocess.check_output(cmd) + return FakeOpenSSLCertificate(cert.decode('utf8')) + +def test_not_after(valid_cert, fake_valid_cert): + ''' Validate value returned back from get_notAfter() ''' + real_cert = valid_cert['cert'] -@pytest.mark.skip('Skipping all tests because of unresolved import errors') -class TestFakeOpenSSLClasses(unittest.TestCase): - ''' - Test class for FakeOpenSSL classes - ''' + # Internal representation of pyOpenSSL is bytes, while FakeOpenSSLCertificate + # is text, so decode the result from pyOpenSSL prior to comparing + assert real_cert.get_notAfter().decode('utf8') == fake_valid_cert.get_notAfter() - def setUp(self): - ''' setup method for other tests ''' - with open('test/system-node-m01.example.com.crt.txt', 'r') as fp: - self.cert_string = fp.read() - self.fake_cert = openshift_cert_expiry.FakeOpenSSLCertificate(self.cert_string) +def test_serial(valid_cert, fake_valid_cert): + ''' Validate value returned back form get_serialnumber() ''' + real_cert = valid_cert['cert'] + assert real_cert.get_serial_number() == fake_valid_cert.get_serial_number() - with open('test/master.server.crt.txt', 'r') as fp: - self.cert_san_string = fp.read() - self.fake_san_cert = openshift_cert_expiry.FakeOpenSSLCertificate(self.cert_san_string) +def test_get_subject(valid_cert, fake_valid_cert): + ''' Validate the certificate subject ''' - def test_FakeOpenSSLCertificate_get_serial_number(self): - """We can read the serial number from the cert""" - self.assertEqual(11, self.fake_cert.get_serial_number()) + # Gather the subject components and create a list of colon separated strings. + # Since the internal representation of pyOpenSSL uses bytes, we need to decode + # the results before comparing. + c_subjects = valid_cert['cert'].get_subject().get_components() + c_subj = ', '.join(['{}:{}'.format(x.decode('utf8'), y.decode('utf8')) for x, y in c_subjects]) + f_subjects = fake_valid_cert.get_subject().get_components() + f_subj = ', '.join(['{}:{}'.format(x, y) for x, y in f_subjects]) + assert c_subj == f_subj - def test_FakeOpenSSLCertificate_get_notAfter(self): - """We can read the cert expiry date""" - expiry = self.fake_cert.get_notAfter() - self.assertEqual('20190207181935Z', expiry) - def test_FakeOpenSSLCertificate_get_sans(self): - """We can read Subject Alt Names from a cert""" - ext = self.fake_san_cert.get_extension(0) +def get_san_extension(cert): + # Internal representation of pyOpenSSL is bytes, while FakeOpenSSLCertificate + # is text, so we need to set the value to search for accordingly. + if isinstance(cert, FakeOpenSSLCertificate): + san_short_name = 'subjectAltName' + else: + san_short_name = b'subjectAltName' - if ext.get_short_name() == 'subjectAltName': - sans = str(ext) + for i in range(cert.get_extension_count()): + ext = cert.get_extension(i) + if ext.get_short_name() == san_short_name: + # return the string representation to compare the actual SAN + # values instead of the data types + return str(ext) - self.assertEqual('DNS:kubernetes, DNS:kubernetes.default, DNS:kubernetes.default.svc, DNS:kubernetes.default.svc.cluster.local, DNS:m01.example.com, DNS:openshift, DNS:openshift.default, DNS:openshift.default.svc, DNS:openshift.default.svc.cluster.local, DNS:172.30.0.1, DNS:192.168.122.241, IP Address:172.30.0.1, IP Address:192.168.122.241', sans) + return None - def test_FakeOpenSSLCertificate_get_sans_no_sans(self): - """We can tell when there are no Subject Alt Names in a cert""" - with self.assertRaises(IndexError): - self.fake_cert.get_extension(0) - def test_FakeOpenSSLCertificate_get_subject(self): - """We can read the Subject from a cert""" - # Subject: O=system:nodes, CN=system:node:m01.example.com - subject = self.fake_cert.get_subject() - subjects = [] - for name, value in subject.get_components(): - subjects.append('{}={}'.format(name, value)) +def test_subject_alt_names(valid_cert, fake_valid_cert): + real_cert = valid_cert['cert'] - self.assertEqual('O=system:nodes, CN=system:node:m01.example.com', ', '.join(subjects)) + san = get_san_extension(real_cert) + f_san = get_san_extension(fake_valid_cert) - def test_FakeOpenSSLCertificate_get_subject_san_cert(self): - """We can read the Subject from a cert with sans""" - # Subject: O=system:nodes, CN=system:node:m01.example.com - subject = self.fake_san_cert.get_subject() - subjects = [] - for name, value in subject.get_components(): - subjects.append('{}={}'.format(name, value)) + assert san == f_san - self.assertEqual('CN=172.30.0.1', ', '.join(subjects)) + # If there are either dns or ip sans defined, verify common_name present + if valid_cert['ip'] or valid_cert['dns']: + assert 'DNS:' + valid_cert['common_name'] in f_san + # Verify all ip sans are present + for ip in valid_cert['ip']: + assert 'IP Address:' + ip in f_san -if __name__ == "__main__": - unittest.main() + # Verify all dns sans are present + for name in valid_cert['dns']: + assert 'DNS:' + name in f_san -- cgit v1.2.1