From ede03746b10a5e775c2389cc97c4b5e49c75da07 Mon Sep 17 00:00:00 2001 From: juanvallejo Date: Tue, 20 Jun 2017 19:07:56 -0400 Subject: verify sane log times in logging stack This patch verifies that logs sent from logging pods can be queried on the Elasticsearch pod within a reasonable amount of time. --- .../openshift_checks/logging/logging.py | 11 +- .../openshift_checks/logging/logging_index_time.py | 132 +++++++++++++++ .../test/logging_check_test.py | 2 +- .../test/logging_index_time_test.py | 182 +++++++++++++++++++++ 4 files changed, 321 insertions(+), 6 deletions(-) create mode 100644 roles/openshift_health_checker/openshift_checks/logging/logging_index_time.py create mode 100644 roles/openshift_health_checker/test/logging_index_time_test.py (limited to 'roles/openshift_health_checker') diff --git a/roles/openshift_health_checker/openshift_checks/logging/logging.py b/roles/openshift_health_checker/openshift_checks/logging/logging.py index 05b4d300c..1fddcd6f6 100644 --- a/roles/openshift_health_checker/openshift_checks/logging/logging.py +++ b/roles/openshift_health_checker/openshift_checks/logging/logging.py @@ -12,20 +12,21 @@ class LoggingCheck(OpenShiftCheck): """Base class for logging component checks""" name = "logging" + logging_namespace = "logging" @classmethod def is_active(cls, task_vars): - return super(LoggingCheck, cls).is_active(task_vars) and cls.is_first_master(task_vars) + logging_deployed = get_var(task_vars, "openshift_hosted_logging_deploy", default=False) + return super(LoggingCheck, cls).is_active(task_vars) and cls.is_first_master(task_vars) and logging_deployed @staticmethod def is_first_master(task_vars): - """Run only on first master and only when logging is configured. Returns: bool""" - logging_deployed = get_var(task_vars, "openshift_hosted_logging_deploy", default=True) + """Run only on first master. Returns: bool""" # Note: It would be nice to use membership in oo_first_master group, however for now it # seems best to avoid requiring that setup and just check this is the first master. hostname = get_var(task_vars, "ansible_ssh_host") or [None] masters = get_var(task_vars, "groups", "masters", default=None) or [None] - return logging_deployed and masters[0] == hostname + return masters and masters[0] == hostname def run(self, tmp, task_vars): pass @@ -45,7 +46,7 @@ class LoggingCheck(OpenShiftCheck): raise ValueError() except ValueError: # successful run but non-parsing data generally means there were no pods in the namespace - return None, 'There are no pods in the {} namespace. Is logging deployed?'.format(namespace) + return None, 'No pods were found for the "{}" logging component.'.format(logging_component) return pods['items'], None diff --git a/roles/openshift_health_checker/openshift_checks/logging/logging_index_time.py b/roles/openshift_health_checker/openshift_checks/logging/logging_index_time.py new file mode 100644 index 000000000..2ddd7549d --- /dev/null +++ b/roles/openshift_health_checker/openshift_checks/logging/logging_index_time.py @@ -0,0 +1,132 @@ +""" +Check for ensuring logs from pods can be queried in a reasonable amount of time. +""" + +import json +import time + +from uuid import uuid4 + +from openshift_checks import get_var, OpenShiftCheckException +from openshift_checks.logging.logging import LoggingCheck + + +ES_CMD_TIMEOUT_SECONDS = 30 + + +class LoggingIndexTime(LoggingCheck): + """Check that pod logs are aggregated and indexed in ElasticSearch within a reasonable amount of time.""" + name = "logging_index_time" + tags = ["health", "logging"] + + logging_namespace = "logging" + + def run(self, tmp, task_vars): + """Add log entry by making unique request to Kibana. Check for unique entry in the ElasticSearch pod logs.""" + try: + log_index_timeout = int( + get_var(task_vars, "openshift_check_logging_index_timeout_seconds", default=ES_CMD_TIMEOUT_SECONDS) + ) + except ValueError: + return { + "failed": True, + "msg": ('Invalid value provided for "openshift_check_logging_index_timeout_seconds". ' + 'Value must be an integer representing an amount in seconds.'), + } + + running_component_pods = dict() + + # get all component pods + self.logging_namespace = get_var(task_vars, "openshift_logging_namespace", default=self.logging_namespace) + for component, name in (['kibana', 'Kibana'], ['es', 'Elasticsearch']): + pods, error = self.get_pods_for_component( + self.execute_module, self.logging_namespace, component, task_vars, + ) + + if error: + msg = 'Unable to retrieve pods for the {} logging component: {}' + return {"failed": True, "changed": False, "msg": msg.format(name, error)} + + running_pods = self.running_pods(pods) + + if not running_pods: + msg = ('No {} pods in the "Running" state were found.' + 'At least one pod is required in order to perform this check.') + return {"failed": True, "changed": False, "msg": msg.format(name)} + + running_component_pods[component] = running_pods + + uuid = self.curl_kibana_with_uuid(running_component_pods["kibana"][0], task_vars) + self.wait_until_cmd_or_err(running_component_pods["es"][0], uuid, log_index_timeout, task_vars) + return {} + + def wait_until_cmd_or_err(self, es_pod, uuid, timeout_secs, task_vars): + """Retry an Elasticsearch query every second until query success, or a defined + length of time has passed.""" + deadline = time.time() + timeout_secs + interval = 1 + while not self.query_es_from_es(es_pod, uuid, task_vars): + if time.time() + interval > deadline: + msg = "expecting match in Elasticsearch for message with uuid {}, but no matches were found after {}s." + raise OpenShiftCheckException(msg.format(uuid, timeout_secs)) + time.sleep(interval) + + def curl_kibana_with_uuid(self, kibana_pod, task_vars): + """curl Kibana with a unique uuid.""" + uuid = self.generate_uuid() + pod_name = kibana_pod["metadata"]["name"] + exec_cmd = "exec {pod_name} -c kibana -- curl --max-time 30 -s http://localhost:5601/{uuid}" + exec_cmd = exec_cmd.format(pod_name=pod_name, uuid=uuid) + + error_str = self.exec_oc(self.execute_module, self.logging_namespace, exec_cmd, [], task_vars) + + try: + error_code = json.loads(error_str)["statusCode"] + except KeyError: + msg = ('invalid response returned from Kibana request (Missing "statusCode" key):\n' + 'Command: {}\nResponse: {}').format(exec_cmd, error_str) + raise OpenShiftCheckException(msg) + except ValueError: + msg = ('invalid response returned from Kibana request (Non-JSON output):\n' + 'Command: {}\nResponse: {}').format(exec_cmd, error_str) + raise OpenShiftCheckException(msg) + + if error_code != 404: + msg = 'invalid error code returned from Kibana request. Expecting error code "404", but got "{}" instead.' + raise OpenShiftCheckException(msg.format(error_code)) + + return uuid + + def query_es_from_es(self, es_pod, uuid, task_vars): + """curl the Elasticsearch pod and look for a unique uuid in its logs.""" + pod_name = es_pod["metadata"]["name"] + exec_cmd = ( + "exec {pod_name} -- curl --max-time 30 -s -f " + "--cacert /etc/elasticsearch/secret/admin-ca " + "--cert /etc/elasticsearch/secret/admin-cert " + "--key /etc/elasticsearch/secret/admin-key " + "https://logging-es:9200/project.{namespace}*/_count?q=message:{uuid}" + ) + exec_cmd = exec_cmd.format(pod_name=pod_name, namespace=self.logging_namespace, uuid=uuid) + result = self.exec_oc(self.execute_module, self.logging_namespace, exec_cmd, [], task_vars) + + try: + count = json.loads(result)["count"] + except KeyError: + msg = 'invalid response from Elasticsearch query:\n"{}"\nMissing "count" key:\n{}' + raise OpenShiftCheckException(msg.format(exec_cmd, result)) + except ValueError: + msg = 'invalid response from Elasticsearch query:\n"{}"\nNon-JSON output:\n{}' + raise OpenShiftCheckException(msg.format(exec_cmd, result)) + + return count + + @staticmethod + def running_pods(pods): + """Filter pods that are running.""" + return [pod for pod in pods if pod['status']['phase'] == 'Running'] + + @staticmethod + def generate_uuid(): + """Wrap uuid generator. Allows for testing with expected values.""" + return str(uuid4()) diff --git a/roles/openshift_health_checker/test/logging_check_test.py b/roles/openshift_health_checker/test/logging_check_test.py index b6db34fe3..a19881e5b 100644 --- a/roles/openshift_health_checker/test/logging_check_test.py +++ b/roles/openshift_health_checker/test/logging_check_test.py @@ -118,7 +118,7 @@ def test_is_active(groups, logging_deployed, is_active): ( 'No resources found.', None, - 'There are no pods in the logging namespace', + 'No pods were found for the "es"', ), ( json.dumps({'items': [plain_kibana_pod, plain_es_pod, plain_curator_pod, fluentd_pod_node1]}), diff --git a/roles/openshift_health_checker/test/logging_index_time_test.py b/roles/openshift_health_checker/test/logging_index_time_test.py new file mode 100644 index 000000000..79e7c7d4c --- /dev/null +++ b/roles/openshift_health_checker/test/logging_index_time_test.py @@ -0,0 +1,182 @@ +import json + +import pytest + +from openshift_checks.logging.logging_index_time import LoggingIndexTime, OpenShiftCheckException + + +SAMPLE_UUID = "unique-test-uuid" + + +def canned_loggingindextime(exec_oc=None): + """Create a check object with a canned exec_oc method""" + check = LoggingIndexTime("dummy") # fails if a module is actually invoked + if exec_oc: + check.exec_oc = exec_oc + return check + + +plain_running_elasticsearch_pod = { + "metadata": { + "labels": {"component": "es", "deploymentconfig": "logging-es-data-master"}, + "name": "logging-es-data-master-1", + }, + "status": { + "containerStatuses": [{"ready": True}, {"ready": True}], + "phase": "Running", + } +} +plain_running_kibana_pod = { + "metadata": { + "labels": {"component": "kibana", "deploymentconfig": "logging-kibana"}, + "name": "logging-kibana-1", + }, + "status": { + "containerStatuses": [{"ready": True}, {"ready": True}], + "phase": "Running", + } +} +not_running_kibana_pod = { + "metadata": { + "labels": {"component": "kibana", "deploymentconfig": "logging-kibana"}, + "name": "logging-kibana-2", + }, + "status": { + "containerStatuses": [{"ready": True}, {"ready": False}], + "conditions": [{"status": "True", "type": "Ready"}], + "phase": "pending", + } +} + + +@pytest.mark.parametrize('pods, expect_pods', [ + ( + [not_running_kibana_pod], + [], + ), + ( + [plain_running_kibana_pod], + [plain_running_kibana_pod], + ), + ( + [], + [], + ) +]) +def test_check_running_pods(pods, expect_pods): + check = canned_loggingindextime(None) + pods = check.running_pods(pods) + assert pods == expect_pods + + +@pytest.mark.parametrize('name, json_response, uuid, timeout, extra_words', [ + ( + 'valid count in response', + { + "count": 1, + }, + SAMPLE_UUID, + 0.001, + [], + ), +], ids=lambda argval: argval[0]) +def test_wait_until_cmd_or_err_succeeds(name, json_response, uuid, timeout, extra_words): + def exec_oc(execute_module, ns, exec_cmd, args, task_vars): + return json.dumps(json_response) + + check = canned_loggingindextime(exec_oc) + check.wait_until_cmd_or_err(plain_running_elasticsearch_pod, uuid, timeout, None) + + +@pytest.mark.parametrize('name, json_response, uuid, timeout, extra_words', [ + ( + 'invalid json response', + { + "invalid_field": 1, + }, + SAMPLE_UUID, + 0.001, + ["invalid response", "Elasticsearch"], + ), + ( + 'empty response', + {}, + SAMPLE_UUID, + 0.001, + ["invalid response", "Elasticsearch"], + ), + ( + 'valid response but invalid match count', + { + "count": 0, + }, + SAMPLE_UUID, + 0.005, + ["expecting match", SAMPLE_UUID, "0.005s"], + ) +], ids=lambda argval: argval[0]) +def test_wait_until_cmd_or_err(name, json_response, uuid, timeout, extra_words): + def exec_oc(execute_module, ns, exec_cmd, args, task_vars): + return json.dumps(json_response) + + check = canned_loggingindextime(exec_oc) + with pytest.raises(OpenShiftCheckException) as error: + check.wait_until_cmd_or_err(plain_running_elasticsearch_pod, uuid, timeout, None) + + for word in extra_words: + assert word in str(error) + + +@pytest.mark.parametrize('name, json_response, uuid, extra_words', [ + ( + 'correct response code, found unique id is returned', + { + "statusCode": 404, + }, + "sample unique id", + ["sample unique id"], + ), +], ids=lambda argval: argval[0]) +def test_curl_kibana_with_uuid(name, json_response, uuid, extra_words): + def exec_oc(execute_module, ns, exec_cmd, args, task_vars): + return json.dumps(json_response) + + check = canned_loggingindextime(exec_oc) + check.generate_uuid = lambda: uuid + + result = check.curl_kibana_with_uuid(plain_running_kibana_pod, None) + + for word in extra_words: + assert word in result + + +@pytest.mark.parametrize('name, json_response, uuid, extra_words', [ + ( + 'invalid json response', + { + "invalid_field": "invalid", + }, + SAMPLE_UUID, + ["invalid response returned", 'Missing "statusCode" key'], + ), + ( + 'wrong error code in response', + { + "statusCode": 500, + }, + SAMPLE_UUID, + ["Expecting error code", "500"], + ), +], ids=lambda argval: argval[0]) +def test_failed_curl_kibana_with_uuid(name, json_response, uuid, extra_words): + def exec_oc(execute_module, ns, exec_cmd, args, task_vars): + return json.dumps(json_response) + + check = canned_loggingindextime(exec_oc) + check.generate_uuid = lambda: uuid + + with pytest.raises(OpenShiftCheckException) as error: + check.curl_kibana_with_uuid(plain_running_kibana_pod, None) + + for word in extra_words: + assert word in str(error) -- cgit v1.2.1