summaryrefslogtreecommitdiffstats
path: root/roles/openshift_health_checker/test/docker_storage_test.py
diff options
context:
space:
mode:
Diffstat (limited to 'roles/openshift_health_checker/test/docker_storage_test.py')
-rw-r--r--roles/openshift_health_checker/test/docker_storage_test.py36
1 files changed, 15 insertions, 21 deletions
diff --git a/roles/openshift_health_checker/test/docker_storage_test.py b/roles/openshift_health_checker/test/docker_storage_test.py
index 99c529054..e0dccc062 100644
--- a/roles/openshift_health_checker/test/docker_storage_test.py
+++ b/roles/openshift_health_checker/test/docker_storage_test.py
@@ -4,12 +4,6 @@ from openshift_checks import OpenShiftCheckException
from openshift_checks.docker_storage import DockerStorage
-def dummy_check(execute_module=None):
- def dummy_exec(self, status, task_vars):
- raise Exception("dummy executor called")
- return DockerStorage(execute_module=execute_module or dummy_exec)
-
-
@pytest.mark.parametrize('is_containerized, group_names, is_active', [
(False, ["masters", "etcd"], False),
(False, ["masters", "nodes"], True),
@@ -20,7 +14,7 @@ def test_is_active(is_containerized, group_names, is_active):
openshift=dict(common=dict(is_containerized=is_containerized)),
group_names=group_names,
)
- assert DockerStorage.is_active(task_vars=task_vars) == is_active
+ assert DockerStorage(None, task_vars).is_active() == is_active
def non_atomic_task_vars():
@@ -99,17 +93,17 @@ def non_atomic_task_vars():
),
])
def test_check_storage_driver(docker_info, failed, expect_msg):
- def execute_module(module_name, module_args, tmp=None, task_vars=None):
+ def execute_module(module_name, *_):
if module_name == "yum":
return {}
if module_name != "docker_info":
raise ValueError("not expecting module " + module_name)
return docker_info
- check = dummy_check(execute_module=execute_module)
- check.check_dm_usage = lambda status, task_vars: dict() # stub out for this test
- check.check_overlay_usage = lambda info, task_vars: dict() # stub out for this test
- result = check.run(tmp=None, task_vars=non_atomic_task_vars())
+ check = DockerStorage(execute_module, non_atomic_task_vars())
+ check.check_dm_usage = lambda status: dict() # stub out for this test
+ check.check_overlay_usage = lambda info: dict() # stub out for this test
+ result = check.run()
if failed:
assert result["failed"]
@@ -168,9 +162,9 @@ not_enough_space = {
),
])
def test_dm_usage(task_vars, driver_status, vg_free, success, expect_msg):
- check = dummy_check()
- check.get_vg_free = lambda pool, task_vars: vg_free
- result = check.check_dm_usage(driver_status, task_vars)
+ check = DockerStorage(None, task_vars)
+ check.get_vg_free = lambda pool: vg_free
+ result = check.check_dm_usage(driver_status)
result_success = not result.get("failed")
assert result_success is success
@@ -210,18 +204,18 @@ def test_dm_usage(task_vars, driver_status, vg_free, success, expect_msg):
)
])
def test_vg_free(pool, command_returns, raises, returns):
- def execute_module(module_name, module_args, tmp=None, task_vars=None):
+ def execute_module(module_name, *_):
if module_name != "command":
raise ValueError("not expecting module " + module_name)
return command_returns
- check = dummy_check(execute_module=execute_module)
+ check = DockerStorage(execute_module)
if raises:
with pytest.raises(OpenShiftCheckException) as err:
- check.get_vg_free(pool, {})
+ check.get_vg_free(pool)
assert raises in str(err.value)
else:
- ret = check.get_vg_free(pool, {})
+ ret = check.get_vg_free(pool)
assert ret == returns
@@ -298,13 +292,13 @@ ansible_mounts_zero_size = [{
),
])
def test_overlay_usage(ansible_mounts, threshold, expect_fail, expect_msg):
- check = dummy_check()
task_vars = non_atomic_task_vars()
task_vars["ansible_mounts"] = ansible_mounts
if threshold is not None:
task_vars["max_overlay_usage_percent"] = threshold
+ check = DockerStorage(None, task_vars)
docker_info = dict(DockerRootDir="/var/lib/docker", Driver="overlay")
- result = check.check_overlay_usage(docker_info, task_vars)
+ result = check.check_overlay_usage(docker_info)
assert expect_fail == bool(result.get("failed"))
for msg in expect_msg: