-
Notifications
You must be signed in to change notification settings - Fork 0
/
get_data.py
150 lines (130 loc) · 6.68 KB
/
get_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search
import argparse
from elastic_agg_to_df import build_generic_aggregations_dataframe
import pandas as pd
import json
# Disable pandas truncation so the preview prints in download_json show
# every column and row of the aggregated DataFrame.
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)
def get_hit_dict(hit):
    """Flatten a search hit into one dict of document fields plus hit metadata.

    :param hit: elasticsearch_dsl hit exposing to_dict() and meta.to_dict().
    :return: dict with the hit's source fields merged with its meta fields
        (meta keys overwrite source keys on collision).
    """
    merged = hit.to_dict()
    merged.update(hit.meta.to_dict())
    return merged
def get_display_name(value, options):
    """Translate a raw field name into its configured display name.

    :param value: Raw field name (e.g. an elasticsearch field).
    :param options: Options dict containing options['buckets']['names'],
        a mapping of raw names to display names.
    :return: The mapped display name when one is configured, otherwise
        the raw value itself.
    """
    # dict.get with a default does one lookup instead of `in` + subscript.
    return options['buckets']['names'].get(value, value)
def build_buckets(aggs, options):
    """Chain one 'terms' sub-aggregation per configured bucket field.

    :param aggs: Aggregation node to attach buckets under.
    :param options: Options dict with options['buckets']['order'] (field
        order) and options['maxRowsPerAggregation'] (terms size).
    :return: The innermost aggregation node after all buckets are chained.
    """
    max_rows = options['maxRowsPerAggregation']
    for field_name in options['buckets']['order']:
        aggs = aggs.bucket(get_display_name(field_name, options), 'terms',
                           field=field_name, size=max_rows)
    return aggs
def build_metrics(aggs, options):
    """Attach the metric sub-aggregations described by options['metrics'].

    Each metric field maps to a list of requested stats: 'iqr', the
    extended stats ('std', 'count', 'sum', 'sum_of_squares', 'variance'),
    'doc_count', 'average', 'median', 'percentile_<N>', or any other name
    passed straight through as an elasticsearch metric type.

    :param aggs: Aggregation node to attach metrics to (mutated in place).
    :param options: Options dict with the metric configuration.
    :return: Tuple (count_fields, extended_fields, iqr_fields) consumed by
        the DataFrame-building step:
        - count_fields: display names of doc_count columns
        - extended_fields: {raw metric field: [requested extended stats]}
        - iqr_fields: display names of columns needing 75th-25th post-processing
    """
    count_fields = []
    extended_fields = {}
    iqr_fields = []
    for metric in options['metrics']:
        percentiles = []
        for stat in options['metrics'][metric]:
            if stat == 'iqr':
                # IQR is derived client-side later from the 25th/75th percentiles.
                iqr_field = f"{get_display_name(metric, options)}_iqr"
                iqr_fields.append(iqr_field)
                aggs.metric(iqr_field, 'percentiles',
                            field=metric, percents=[25, 75], keyed=False)
            elif stat in ["std", "count", "sum", "sum_of_squares", "variance"]:
                # One extended_stats aggregation serves every stat in this
                # branch, so register it only once per metric (the original
                # re-registered the same-named aggregation for each stat).
                if metric not in extended_fields:
                    extended_fields[metric] = []
                    aggs.metric(get_display_name(metric, options), 'extended_stats', field=metric)
                extended_fields[metric].append(stat)
            elif stat == 'doc_count':
                count_fields.append(f"{get_display_name(metric, options)}_count")
            elif stat == 'average':
                aggs.metric(f"{get_display_name(metric, options)}_{stat}", 'avg', field=metric)
            elif stat == 'median':
                aggs.metric(f"{get_display_name(metric, options)}_{stat}", 'percentiles',
                            field=metric, percents=50, keyed=False)
            elif stat.startswith('percentile_'):
                # Collect requested percentiles so they land in one aggregation.
                percentiles.append(int(stat.split('_')[-1]))
            else:
                # Pass-through: stat name is assumed to be a valid metric type.
                aggs.metric(f"{get_display_name(metric, options)}_{stat}", stat, field=metric)
        # A single percentile keeps its value in the aggregation name; several
        # share one generically-named aggregation.
        if len(percentiles) == 1:
            aggs.metric(f"{get_display_name(metric, options)}_percentile_{percentiles[0]}", 'percentiles',
                        field=metric, percents=percentiles, keyed=False)
        elif len(percentiles) > 1:
            aggs.metric(f"{get_display_name(metric, options)}_percentile", 'percentiles',
                        field=metric, percents=percentiles, keyed=False)
    return count_fields, extended_fields, iqr_fields
def build_query(search, index, from_time, to_time, options):
    """Build the full query: time-range filter plus nested aggregations.

    :param search: elasticsearch_dsl Search object to configure.
    :param index: Index pattern to query.
    :param from_time: Lower time bound (inclusive), e.g. "now-5m".
    :param to_time: Upper time bound (exclusive), e.g. "now".
    :param options: Options dict (time field/interval, buckets, metrics).
    :return: Tuple (search, count_fields, extended_fields, iqr_fields),
        where the last three come from build_metrics.
    """
    time_field = options['time']['field']
    interval = options['time']['interval']
    range_filter = {
        time_field: {
            "format": "strict_date_optional_time",
            "gte": from_time,
            "lt": to_time,
        }
    }
    search = search.index(index).query('range', **range_filter)
    # Outermost aggregation: one bucket per time interval, then the
    # configured term buckets, then the metrics at the innermost level.
    histogram = search.aggs.bucket(get_display_name(time_field, options), 'date_histogram',
                                   field=time_field, interval=interval)
    innermost = build_buckets(histogram, options)
    count_fields, extended_fields, iqr_fields = build_metrics(innermost, options)
    return search, count_fields, extended_fields, iqr_fields
def calculate_iqr_fields(df, iqr_fields):
    """Compute each IQR column as (75th percentile - 25th percentile).

    :param df: DataFrame containing "<name>_25" and "<name>_75" columns for
        every name in iqr_fields.
    :param iqr_fields: Base column names to compute IQRs for.
    :return: DataFrame with the IQR columns added and the intermediate
        percentile columns dropped.
    """
    remove_columns = []
    for field in iqr_fields:
        # Vectorized column subtraction; the original per-row df.apply(lambda)
        # did the same arithmetic at Python speed for every row.
        df[field] = df[field + "_75"] - df[field + "_25"]
        remove_columns.extend((field + "_25", field + "_75"))
    return df.drop(remove_columns, axis=1)
def get_data(index='metricbeat-*', from_time='now-5m', to_time='now', host='localhost',
             port='9200', user='elastic', password='changeme', options='./options.json'):
    """
    Get aggregated data from elasticsearch
    :param index: Index to read from
    :param from_time: Query from time, e.g. "now-5m", "2019-11-17T10:00:00.000Z", etc.
    :param to_time: Query to time, e.g. "now", "2019-11-17T11:00:00.000Z", etc.
    :param host: ElasticSearch host name
    :param port: ElasticSearch port number
    :param user: ElasticSearch user
    :param password: ElasticSearch password
    :param options: Options object, contains the aggregations and metrics (see options.json)
    :return: DataFrame with aggregated data, or None when the query matched no buckets
    :raises Exception: when the query fails or returns unusable results
    """
    try:
        # `with` closes the options file promptly; the original
        # json.load(open(options)) leaked the file handle.
        with open(options) as options_file:
            options = json.load(options_file)
        hosts = [{"host": host, "port": port}]
        http_auth = (user, password)
        elastic_client = Elasticsearch(hosts=hosts, http_auth=http_auth)
        search = Search(using=elastic_client)
        search, count_fields, extended_fields, iqr_fields = build_query(search, index, from_time, to_time, options)
        response = search.execute()
        # No time buckets means the window matched nothing; signal with None.
        if len(response.aggregations.timestamp.buckets) == 0:
            return None
        df = build_generic_aggregations_dataframe(response, count_fields, extended_fields)
        if iqr_fields:
            df = calculate_iqr_fields(df, iqr_fields)
        return df
    except Exception as ex:
        # Chain the original exception so the root cause is preserved in the
        # traceback instead of being swallowed.
        raise Exception('Invalid results from elastic, check query') from ex
def download_json(path, index, last_minutes, host, port, user, password, options):
    """Fetch the last *last_minutes* minutes of aggregated data and save as JSON.

    Prints a 10-row preview and the DataFrame's shape before writing to *path*.
    """
    df = get_data(index, f"now-{last_minutes}m", "now",
                  host, port, user, password, options)
    print(df.head(10))
    print(df.shape)
    df.to_json(path)
if __name__ == '__main__':
    # CLI entry point: every flag is optional and falls back to a default.
    parser = argparse.ArgumentParser(description='Download elastic search last X minutes on an index')
    parser.add_argument('--path', '-o', help='JSON output path',
                        type=str, default="data.json", required=False)
    parser.add_argument('--index', '-i', help="ElasticSearch index",
                        type=str, default='metricbeat-*', required=False)
    parser.add_argument('--lastMinutes', '-m', help="Last minutes to get data for",
                        dest='last_minutes', type=int, default=120, required=False)
    parser.add_argument('--host', '-a', help="ElasticSearch Host's ip/address",
                        type=str, default='elastic.monitor.net', required=False)
    parser.add_argument('--port', '-p', help="ElasticSearch Host's port",
                        type=int, default=9200, required=False)
    parser.add_argument('--user', '-u', help="ElasticSearch username",
                        type=str, default='elastic', required=False)
    parser.add_argument('--password', '-pw', help="ElasticSearch username's password",
                        type=str, default='changeme', required=False)
    parser.add_argument('--options', '-opt', help="Aggregations options",
                        type=str, default="./options.json", required=False)
    cli_args = parser.parse_args()
    print(cli_args)
    download_json(**vars(cli_args))