Diffstat (limited to 'roles/openshift_health_checker/test/docker_storage_test.py')
-rw-r--r--  roles/openshift_health_checker/test/docker_storage_test.py | 125
1 file changed, 103 insertions(+), 22 deletions(-)
diff --git a/roles/openshift_health_checker/test/docker_storage_test.py b/roles/openshift_health_checker/test/docker_storage_test.py
index bb25e3f66..e0dccc062 100644
--- a/roles/openshift_health_checker/test/docker_storage_test.py
+++ b/roles/openshift_health_checker/test/docker_storage_test.py
@@ -4,12 +4,6 @@ from openshift_checks import OpenShiftCheckException
from openshift_checks.docker_storage import DockerStorage
-def dummy_check(execute_module=None):
- def dummy_exec(self, status, task_vars):
- raise Exception("dummy executor called")
- return DockerStorage(execute_module=execute_module or dummy_exec)
-
-
@pytest.mark.parametrize('is_containerized, group_names, is_active', [
(False, ["masters", "etcd"], False),
(False, ["masters", "nodes"], True),
@@ -20,10 +14,11 @@ def test_is_active(is_containerized, group_names, is_active):
openshift=dict(common=dict(is_containerized=is_containerized)),
group_names=group_names,
)
- assert DockerStorage.is_active(task_vars=task_vars) == is_active
+ assert DockerStorage(None, task_vars).is_active() == is_active
-non_atomic_task_vars = {"openshift": {"common": {"is_atomic": False}}}
+def non_atomic_task_vars():
+ return {"openshift": {"common": {"is_atomic": False}}}
@pytest.mark.parametrize('docker_info, failed, expect_msg', [
@@ -56,7 +51,7 @@ non_atomic_task_vars = {"openshift": {"common": {"is_atomic": False}}}
(
dict(info={
"Driver": "overlay2",
- "DriverStatus": []
+ "DriverStatus": [("Backing Filesystem", "xfs")],
}),
False,
[],
@@ -64,6 +59,27 @@ non_atomic_task_vars = {"openshift": {"common": {"is_atomic": False}}}
(
dict(info={
"Driver": "overlay",
+ "DriverStatus": [("Backing Filesystem", "btrfs")],
+ }),
+ True,
+ ["storage is type 'btrfs'", "only supported with\n'xfs'"],
+ ),
+ (
+ dict(info={
+ "Driver": "overlay2",
+ "DriverStatus": [("Backing Filesystem", "xfs")],
+ "OperatingSystem": "Red Hat Enterprise Linux Server release 7.2 (Maipo)",
+ "KernelVersion": "3.10.0-327.22.2.el7.x86_64",
+ }),
+ True,
+ ["Docker reports kernel version 3.10.0-327"],
+ ),
+ (
+ dict(info={
+ "Driver": "overlay",
+ "DriverStatus": [("Backing Filesystem", "xfs")],
+ "OperatingSystem": "CentOS",
+ "KernelVersion": "3.10.0-514",
}),
False,
[],
@@ -77,16 +93,17 @@ non_atomic_task_vars = {"openshift": {"common": {"is_atomic": False}}}
),
])
def test_check_storage_driver(docker_info, failed, expect_msg):
- def execute_module(module_name, module_args, tmp=None, task_vars=None):
+ def execute_module(module_name, *_):
if module_name == "yum":
return {}
if module_name != "docker_info":
raise ValueError("not expecting module " + module_name)
return docker_info
- check = dummy_check(execute_module=execute_module)
- check._check_dm_usage = lambda status, task_vars: dict() # stub out for this test
- result = check.run(tmp=None, task_vars=non_atomic_task_vars)
+ check = DockerStorage(execute_module, non_atomic_task_vars())
+ check.check_dm_usage = lambda status: dict() # stub out for this test
+ check.check_overlay_usage = lambda info: dict() # stub out for this test
+ result = check.run()
if failed:
assert result["failed"]
@@ -145,9 +162,9 @@ not_enough_space = {
),
])
def test_dm_usage(task_vars, driver_status, vg_free, success, expect_msg):
- check = dummy_check()
- check._get_vg_free = lambda pool, task_vars: vg_free
- result = check._check_dm_usage(driver_status, task_vars)
+ check = DockerStorage(None, task_vars)
+ check.get_vg_free = lambda pool: vg_free
+ result = check.check_dm_usage(driver_status)
result_success = not result.get("failed")
assert result_success is success
@@ -187,18 +204,18 @@ def test_dm_usage(task_vars, driver_status, vg_free, success, expect_msg):
)
])
def test_vg_free(pool, command_returns, raises, returns):
- def execute_module(module_name, module_args, tmp=None, task_vars=None):
+ def execute_module(module_name, *_):
if module_name != "command":
raise ValueError("not expecting module " + module_name)
return command_returns
- check = dummy_check(execute_module=execute_module)
+ check = DockerStorage(execute_module)
if raises:
with pytest.raises(OpenShiftCheckException) as err:
- check._get_vg_free(pool, {})
+ check.get_vg_free(pool)
assert raises in str(err.value)
else:
- ret = check._get_vg_free(pool, {})
+ ret = check.get_vg_free(pool)
assert ret == returns
@@ -209,7 +226,7 @@ def test_vg_free(pool, command_returns, raises, returns):
("12g", 12.0 * 1024**3),
])
def test_convert_to_bytes(string, expect_bytes):
- got = DockerStorage._convert_to_bytes(string)
+ got = DockerStorage.convert_to_bytes(string)
assert got == expect_bytes
@@ -219,6 +236,70 @@ def test_convert_to_bytes(string, expect_bytes):
])
def test_convert_to_bytes_error(string):
with pytest.raises(ValueError) as err:
- DockerStorage._convert_to_bytes(string)
+ DockerStorage.convert_to_bytes(string)
assert "Cannot convert" in str(err.value)
assert string in str(err.value)
+
+
+ansible_mounts_enough = [{
+ 'mount': '/var/lib/docker',
+ 'size_available': 50 * 10**9,
+ 'size_total': 50 * 10**9,
+}]
+ansible_mounts_not_enough = [{
+ 'mount': '/var/lib/docker',
+ 'size_available': 0,
+ 'size_total': 50 * 10**9,
+}]
+ansible_mounts_missing_fields = [dict(mount='/var/lib/docker')]
+ansible_mounts_zero_size = [{
+ 'mount': '/var/lib/docker',
+ 'size_available': 0,
+ 'size_total': 0,
+}]
+
+
+@pytest.mark.parametrize('ansible_mounts, threshold, expect_fail, expect_msg', [
+ (
+ ansible_mounts_enough,
+ None,
+ False,
+ [],
+ ),
+ (
+ ansible_mounts_not_enough,
+ None,
+ True,
+ ["usage percentage", "higher than threshold"],
+ ),
+ (
+ ansible_mounts_not_enough,
+ "bogus percent",
+ True,
+ ["is not a percentage"],
+ ),
+ (
+ ansible_mounts_missing_fields,
+ None,
+ True,
+ ["Ansible bug"],
+ ),
+ (
+ ansible_mounts_zero_size,
+ None,
+ True,
+ ["Ansible bug"],
+ ),
+])
+def test_overlay_usage(ansible_mounts, threshold, expect_fail, expect_msg):
+ task_vars = non_atomic_task_vars()
+ task_vars["ansible_mounts"] = ansible_mounts
+ if threshold is not None:
+ task_vars["max_overlay_usage_percent"] = threshold
+ check = DockerStorage(None, task_vars)
+ docker_info = dict(DockerRootDir="/var/lib/docker", Driver="overlay")
+ result = check.check_overlay_usage(docker_info)
+
+ assert expect_fail == bool(result.get("failed"))
+ for msg in expect_msg:
+ assert msg in result["msg"]
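

For reference, the constructor/call pattern these refactored tests exercise: DockerStorage is now built directly with an execute_module callable and task_vars, the removed dummy_check() helper is gone, and run() and the per-check helpers no longer take tmp/task_vars arguments. A minimal sketch of that pattern, mirroring test_check_storage_driver above (the canned docker_info value is illustrative, not taken from the diff):

from openshift_checks.docker_storage import DockerStorage

def execute_module(module_name, *_):
    # The tests stub module execution: ignore yum, return canned facts for docker_info.
    if module_name == "yum":
        return {}
    if module_name != "docker_info":
        raise ValueError("not expecting module " + module_name)
    return dict(info={
        "Driver": "overlay2",
        "DriverStatus": [("Backing Filesystem", "xfs")],
    })

task_vars = {"openshift": {"common": {"is_atomic": False}}}
check = DockerStorage(execute_module, task_vars)  # formerly dummy_check(execute_module=...)
check.check_dm_usage = lambda status: dict()      # stubs no longer take task_vars
check.check_overlay_usage = lambda info: dict()
result = check.run()                              # formerly check.run(tmp=None, task_vars=task_vars)
assert not result.get("failed")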