summaryrefslogtreecommitdiffstats
path: root/roles/openshift_health_checker/openshift_checks/logging/elasticsearch.py
blob: 986a01f386e497ec0365d5b8bfd5bc91fa8dcbb9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
"""Check for an aggregated logging Elasticsearch deployment"""

import json
import re

from openshift_checks import OpenShiftCheckException, OpenShiftCheckExceptionList
from openshift_checks.logging.logging import LoggingCheck


class Elasticsearch(LoggingCheck):
    """Check for an aggregated logging Elasticsearch deployment"""

    name = "elasticsearch"
    tags = ["health", "logging"]

    def run(self):
        """Check various things and gather errors. Returns: result as hash"""

        es_pods = self.get_pods_for_component("es")
        self.check_elasticsearch(es_pods)
        # TODO(lmeyer): run it all again for the ops cluster

        return {}

    def check_elasticsearch(self, es_pods):
        """Perform checks for Elasticsearch. Raises OpenShiftCheckExceptionList on any errors."""
        running_pods, errors = self.running_elasticsearch_pods(es_pods)
        pods_by_name = {
            pod['metadata']['name']: pod for pod in running_pods
            # Filter out pods that are not members of a DC
            if pod['metadata'].get('labels', {}).get('deploymentconfig')
        }
        if not pods_by_name:
            # nothing running, cannot run the rest of the check
            errors.append(OpenShiftCheckException(
                'NoRunningPods',
                'No logging Elasticsearch pods were found running, so no logs are being aggregated.'
            ))
            raise OpenShiftCheckExceptionList(errors)

        errors += self.check_elasticsearch_masters(pods_by_name)
        errors += self.check_elasticsearch_node_list(pods_by_name)
        errors += self.check_es_cluster_health(pods_by_name)
        errors += self.check_elasticsearch_diskspace(pods_by_name)
        if errors:
            raise OpenShiftCheckExceptionList(errors)

    def running_elasticsearch_pods(self, es_pods):
        """Returns: list of running pods, list of errors about non-running pods"""
        not_running = self.not_running_pods(es_pods)
        running_pods = [pod for pod in es_pods if pod not in not_running]
        if not_running:
            return running_pods, [OpenShiftCheckException(
                'PodNotRunning',
                'The following Elasticsearch pods are defined but not running:\n'
                '{pods}'.format(pods=''.join(
                    "  {} ({})\n".format(pod['metadata']['name'], pod['spec'].get('host', 'None'))
                    for pod in not_running
                ))
            )]
        return running_pods, []

    @staticmethod
    def _build_es_curl_cmd(pod_name, url):
        base = "exec {name} -- curl -s --cert {base}cert --key {base}key --cacert {base}ca -XGET '{url}'"
        return base.format(base="/etc/elasticsearch/secret/admin-", name=pod_name, url=url)

    def check_elasticsearch_masters(self, pods_by_name):
        """Check that Elasticsearch masters are sane. Returns: list of errors"""
        es_master_names = set()
        errors = []
        for pod_name in pods_by_name.keys():
            # Compare what each ES node reports as master and compare for split brain
            get_master_cmd = self._build_es_curl_cmd(pod_name, "https://localhost:9200/_cat/master")
            master_name_str = self.exec_oc(get_master_cmd, [], save_as_name="get_master_names.json")
            master_names = (master_name_str or '').split(' ')
            if len(master_names) > 1:
                es_master_names.add(master_names[1])
            else:
                errors.append(OpenShiftCheckException(
                    'NoMasterName',
                    'Elasticsearch {pod} gave unexpected response when asked master name:\n'
                    '  {response}'.format(pod=pod_name, response=master_name_str)
                ))

        if not es_master_names:
            errors.append(OpenShiftCheckException(
                'NoMasterFound',
                'No logging Elasticsearch masters were found.'
            ))
            return errors

        if len(es_master_names) > 1:
            errors.append(OpenShiftCheckException(
                'SplitBrainMasters',
                'Found multiple Elasticsearch masters according to the pods:\n'
                '{master_list}\n'
                'This implies that the masters have "split brain" and are not correctly\n'
                'replicating data for the logging cluster. Log loss is likely to occur.'
                .format(master_list='\n'.join('  ' + master for master in es_master_names))
            ))

        return errors

    def check_elasticsearch_node_list(self, pods_by_name):
        """Check that reported ES masters are accounted for by pods. Returns: list of errors"""

        if not pods_by_name:
            return [OpenShiftCheckException(
                'MissingComponentPods',
                'No logging Elasticsearch pods were found.'
            )]

        # get ES cluster nodes
        node_cmd = self._build_es_curl_cmd(list(pods_by_name.keys())[0], 'https://localhost:9200/_nodes')
        cluster_node_data = self.exec_oc(node_cmd, [], save_as_name="get_es_nodes.json")
        try:
            cluster_nodes = json.loads(cluster_node_data)['nodes']
        except (ValueError, KeyError):
            return [OpenShiftCheckException(
                'MissingNodeList',
                'Failed to query Elasticsearch for the list of ES nodes. The output was:\n' +
                cluster_node_data
            )]

        # Try to match all ES-reported node hosts to known pods.
        errors = []
        for node in cluster_nodes.values():
            # Note that with 1.4/3.4 the pod IP may be used as the master name
            if not any(node['host'] in (pod_name, pod['status'].get('podIP'))
                       for pod_name, pod in pods_by_name.items()):
                errors.append(OpenShiftCheckException(
                    'EsPodNodeMismatch',
                    'The Elasticsearch cluster reports a member node "{node}"\n'
                    'that does not correspond to any known ES pod.'.format(node=node['host'])
                ))

        return errors

    def check_es_cluster_health(self, pods_by_name):
        """Exec into the elasticsearch pods and check the cluster health. Returns: list of errors"""
        errors = []
        for pod_name in pods_by_name.keys():
            cluster_health_cmd = self._build_es_curl_cmd(pod_name, 'https://localhost:9200/_cluster/health?pretty=true')
            cluster_health_data = self.exec_oc(cluster_health_cmd, [], save_as_name='get_es_health.json')
            try:
                health_res = json.loads(cluster_health_data)
                if not health_res or not health_res.get('status'):
                    raise ValueError()
            except ValueError:
                errors.append(OpenShiftCheckException(
                    'BadEsResponse',
                    'Could not retrieve cluster health status from logging ES pod "{pod}".\n'
                    'Response was:\n{output}'.format(pod=pod_name, output=cluster_health_data)
                ))
                continue

            if health_res['status'] not in ['green', 'yellow']:
                errors.append(OpenShiftCheckException(
                    'EsClusterHealthRed',
                    'Elasticsearch cluster health status is RED according to pod "{}"'.format(pod_name)
                ))

        return errors

    def check_elasticsearch_diskspace(self, pods_by_name):
        """
        Exec into an ES pod and query the diskspace on the persistent volume.
        Returns: list of errors
        """
        errors = []
        for pod_name in pods_by_name.keys():
            df_cmd = 'exec {} -- df --output=ipcent,pcent /elasticsearch/persistent'.format(pod_name)
            disk_output = self.exec_oc(df_cmd, [], save_as_name='get_pv_diskspace.json')
            lines = disk_output.splitlines()
            # expecting one header looking like 'IUse% Use%' and one body line
            body_re = r'\s*(\d+)%?\s+(\d+)%?\s*$'
            if len(lines) != 2 or len(lines[0].split()) != 2 or not re.match(body_re, lines[1]):
                errors.append(OpenShiftCheckException(
                    'BadDfResponse',
                    'Could not retrieve storage usage from logging ES pod "{pod}".\n'
                    'Response to `df` command was:\n{output}'.format(pod=pod_name, output=disk_output)
                ))
                continue
            inode_pct, disk_pct = re.match(body_re, lines[1]).groups()

            inode_pct_thresh = self.get_var('openshift_check_efk_es_inode_pct', default='90')
            if int(inode_pct) >= int(inode_pct_thresh):
                errors.append(OpenShiftCheckException(
                    'InodeUsageTooHigh',
                    'Inode percent usage on the storage volume for logging ES pod "{pod}"\n'
                    '  is {pct}, greater than threshold {limit}.\n'
                    '  Note: threshold can be specified in inventory with {param}'.format(
                        pod=pod_name,
                        pct=str(inode_pct),
                        limit=str(inode_pct_thresh),
                        param='openshift_check_efk_es_inode_pct',
                    )))
            disk_pct_thresh = self.get_var('openshift_check_efk_es_storage_pct', default='80')
            if int(disk_pct) >= int(disk_pct_thresh):
                errors.append(OpenShiftCheckException(
                    'DiskUsageTooHigh',
                    'Disk percent usage on the storage volume for logging ES pod "{pod}"\n'
                    '  is {pct}, greater than threshold {limit}.\n'
                    '  Note: threshold can be specified in inventory with {param}'.format(
                        pod=pod_name,
                        pct=str(disk_pct),
                        limit=str(disk_pct_thresh),
                        param='openshift_check_efk_es_storage_pct',
                    )))

        return errors