# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

import json
import logging
from typing import (
    Any, Dict, List,
)

from amundsen_common.models.search import (
    Filter, HighlightOptions, SearchResponse,
)
from elasticsearch import Elasticsearch
from elasticsearch_dsl import (
    MultiSearch, Q, Search,
)
from elasticsearch_dsl.query import Match, RankFeature
from elasticsearch_dsl.response import Response
from flask import current_app

from search_service import config
from search_service.proxy.es_proxy_utils import Resource, create_search_response
from search_service.proxy.es_proxy_v2 import BOOL_QUERY, ElasticsearchProxyV2

LOGGER = logging.getLogger(__name__)

# ES query constants
DEFAULT_FUZZINESS = "AUTO"

# using fvh as the default highlighter because it supports very large documents
DEFAULT_HIGHLIGHTER = 'fvh'
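# ElasticsearchProxyV2_1 is the search proxy for the version-2 ("v2.1") Elasticsearch
# mappings; when no index advertises those mappings, __new__ below falls back to the
# older ElasticsearchProxyV2 so existing deployments keep working.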
class ElasticsearchProxyV2_1(ElasticsearchProxyV2):
    # map the field name in FE to the field used to filter in ES
    # note: ES needs keyword field types to filter
    GENERAL_MAPPING = {
        'key': 'key',
        'description': 'description',
        'resource_type': 'resource_type',
    }

    TABLE_MAPPING = {
        **GENERAL_MAPPING,
        'badges': 'badges.keyword',
        'tag': 'tags.keyword',
        'schema': 'schema.keyword',
        'table': 'name.keyword',
        'column': 'columns.keyword',
        'database': 'database.keyword',
        'cluster': 'cluster.keyword',
    }

    DASHBOARD_MAPPING = {
        **GENERAL_MAPPING,
        'url': 'url',
        'uri': 'uri',
        'last_successful_run_timestamp': 'last_successful_run_timestamp',
        'group_name': 'group_name.keyword',
        'chart_names': 'chart_names.keyword',
        'query_names': 'query_names.keyword',
        'name': 'name.keyword',
        'tag': 'tags.keyword',
    }

    FEATURE_MAPPING = {
        **GENERAL_MAPPING,
        'version': 'version',
        'availability': 'availability',
        'feature_group': 'feature_group.keyword',
        'feature_name': 'name.keyword',
        'entity': 'entity.keyword',
        'status': 'status.keyword',
        'tags': 'tags.keyword',
        'badges': 'badges.keyword',
    }

    USER_MAPPING = {
        'full_name': 'name.keyword',
        'email': 'key',
        'first_name': 'first_name',
        'last_name': 'last_name',
        'resource_type': 'resource_type',
    }

    RESOURCE_TO_MAPPING = {
        Resource.TABLE: TABLE_MAPPING,
        Resource.DASHBOARD: DASHBOARD_MAPPING,
        Resource.FEATURE: FEATURE_MAPPING,
        Resource.USER: USER_MAPPING,
    }
    # Overriding __new__ here is a temporary solution to provide backwards compatibility
    # until most of the community has moved to the new Elasticsearch mappings; it will
    # be removed once ElasticsearchProxyV2 is deprecated.
    def __new__(cls: Any,
                host: str,
                user: str,
                password: str,
                client: Elasticsearch,
                page_size: int, *args: str, **kwargs: int) -> Any:
        elasticsearch_client = None
        if client:
            elasticsearch_client = client
        else:
            http_auth = (user, password) if user else None
            elasticsearch_client = Elasticsearch(host, http_auth=http_auth)

        # check if any index uses the most up-to-date mappings (version == 2)
        indices = elasticsearch_client.indices.get_alias(index='*')
        mappings_up_to_date = False
        for index in indices:
            index_mapping = elasticsearch_client.indices.get_mapping(index=index).get(index)
            mapping_meta_field = index_mapping.get('mappings').get('_meta')
            if mapping_meta_field is not None and mapping_meta_field.get('version') == 2:
                mappings_up_to_date = True
                break

        if mappings_up_to_date:
            # use ElasticsearchProxyV2_1 if the indices are up to date with the new mappings
            obj = super(ElasticsearchProxyV2_1, cls).__new__(cls)
            return obj

        # if the old mappings are used, the proxy client should be ElasticsearchProxyV2
        obj = super(ElasticsearchProxyV2_1, cls).__new__(ElasticsearchProxyV2)
        obj.__init__(host=host,
                     user=user,
                     password=password,
                     client=elasticsearch_client,
                     page_size=page_size)
        return obj
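    # Resolve the index alias to search for a resource type: use the configured
    # ES_INDEX_ALIAS_TEMPLATE when present, otherwise fall back to the default
    # '<resource>_search_index_v2_1' alias.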
    def get_index_alias_for_resource(self, resource_type: Resource) -> str:
        resource_str = resource_type.name.lower()
        alias_config = current_app.config.get(
            config.ES_INDEX_ALIAS_TEMPLATE
        )
        if alias_config is None:
            return f'{resource_str}_search_index_v2_1'

        alias = str(alias_config).format(resource=resource_str)
        return alias
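    # Each searchable field gets a fuzzy match clause whose boost reflects its relative
    # importance, and the clauses are wrapped in a single bool "should" so a hit on any
    # field contributes to the score.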
    def _build_must_query(self, resource: Resource, query_term: str) -> List[Q]:
        """
        Builds the query object for the given search term
        """
        if not query_term:
            # We don't want to create a match query for ""
            # because it will result in no matches even with filters
            return []

        # query for fields general to all resources
        should_clauses: List[Q] = [
            Match(name={
                "query": query_term,
                "fuzziness": DEFAULT_FUZZINESS,
                "boost": 5
            }),
            Match(description={
                "query": query_term,
                "fuzziness": DEFAULT_FUZZINESS,
                "boost": 1.5
            }),
            Match(badges={
                "query": query_term,
                "fuzziness": DEFAULT_FUZZINESS,
            }),
            Match(tags={
                "query": query_term,
                "fuzziness": DEFAULT_FUZZINESS,
            }),
        ]

        if resource == Resource.TABLE:
            columns_subfield = 'columns.general'
            should_clauses.extend([
                Match(schema={
                    "query": query_term,
                    "fuzziness": DEFAULT_FUZZINESS,
                    "boost": 3
                }),
                Match(**{columns_subfield: {
                    "query": query_term,
                    "fuzziness": DEFAULT_FUZZINESS,
                    "boost": 2
                }}),
                Match(column_descriptions={
                    "query": query_term,
                    "fuzziness": DEFAULT_FUZZINESS
                }),
            ])
        elif resource == Resource.DASHBOARD:
            should_clauses.extend([
                Match(group_name={
                    "query": query_term,
                    "fuzziness": DEFAULT_FUZZINESS,
                    "boost": 3
                }),
                Match(query_names={
                    "query": query_term,
                    "fuzziness": DEFAULT_FUZZINESS,
                    "boost": 2
                }),
                Match(chart_names={
                    "query": query_term,
                    "fuzziness": DEFAULT_FUZZINESS,
                    "boost": 2
                }),
                Match(uri={
                    "query": query_term,
                    "fuzziness": DEFAULT_FUZZINESS,
                    "boost": 4
                }),
            ])
        elif resource == Resource.FEATURE:
            should_clauses.extend([
                Match(feature_group={
                    "query": query_term,
                    "fuzziness": DEFAULT_FUZZINESS,
                    "boost": 3
                }),
                Match(version={
                    "query": query_term
                }),
                Match(entity={
                    "query": query_term,
                    "fuzziness": DEFAULT_FUZZINESS,
                    "boost": 2
                }),
                Match(status={
                    "query": query_term
                }),
            ])
        elif resource == Resource.USER:
            # users replace the general clauses rather than extending them
            should_clauses = [
                Match(name={
                    "query": query_term,
                    "fuzziness": DEFAULT_FUZZINESS,
                    "boost": 5
                }),
                Match(first_name={
                    "query": query_term,
                    "fuzziness": DEFAULT_FUZZINESS,
                    "boost": 3
                }),
                Match(last_name={
                    "query": query_term,
                    "fuzziness": DEFAULT_FUZZINESS,
                    "boost": 3
                }),
                Match(team_name={
                    "query": query_term,
                    "fuzziness": DEFAULT_FUZZINESS
                }),
                Match(key={
                    "query": query_term,
                    "fuzziness": DEFAULT_FUZZINESS,
                    "boost": 4
                }),
            ]

        must_clauses: List[Q] = [Q(BOOL_QUERY, should=should_clauses)]

        return must_clauses
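    # Build rank_feature queries over usage metrics so heavily used (or, for users, heavily
    # read/owned/followed) results rank higher without changing which documents match.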
    def _build_should_query(self, resource: Resource, query_term: str) -> List[Q]:
        # no scoring happens if there is no search term
        if query_term == '':
            return []

        # general usage metric for searchable resources
        usage_metric_field_boosts = {
            'total_usage': 10.0,
        }

        if resource == Resource.TABLE:
            usage_metric_field_boosts = {
                **usage_metric_field_boosts,
                'unique_usage': 10.0,
            }
        if resource == Resource.USER:
            usage_metric_field_boosts = {
                'total_read': 10.0,
                'total_own': 10.0,
                'total_follow': 10.0,
            }

        rank_feature_queries = []

        for metric in usage_metric_field_boosts.keys():
            field_name = f'usage.{metric}'
            boost = usage_metric_field_boosts[metric]
            rank_feature_query = RankFeature(field=field_name,
                                             boost=boost)
            rank_feature_queries.append(rank_feature_query)

        return rank_feature_queries
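    # Attach highlight settings to the search when highlighting is enabled for the resource;
    # name and description are always highlighted, with extra fields for tables and dashboards.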
    def _search_highlight(self,
                          resource: Resource,
                          search: Search,
                          highlight_options: Dict[Resource, HighlightOptions]) -> Search:
        # get highlighting options for resource
        resource_options = highlight_options.get(resource)
        highlighting_enabled = resource_options.enable_highlight if resource_options else None
        if highlighting_enabled:
            # default highlighted fields
            search = search.highlight('name',
                                      type=DEFAULT_HIGHLIGHTER,
                                      number_of_fragments=0)
            search = search.highlight('description',
                                      type=DEFAULT_HIGHLIGHTER,
                                      number_of_fragments=0)
            if resource == Resource.TABLE:
                search = search.highlight('columns.general',
                                          type=DEFAULT_HIGHLIGHTER,
                                          number_of_fragments=10,
                                          order='score')
                search = search.highlight('column_descriptions',
                                          type=DEFAULT_HIGHLIGHTER,
                                          number_of_fragments=5,
                                          order='score')
            if resource == Resource.DASHBOARD:
                search = search.highlight('chart_names',
                                          type=DEFAULT_HIGHLIGHTER,
                                          number_of_fragments=10,
                                          order='score')
                search = search.highlight('query_names',
                                          type=DEFAULT_HIGHLIGHTER,
                                          number_of_fragments=10,
                                          order='score')
        return search
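    # Execute all per-resource searches in a single multi-search round trip; failures are
    # logged and an empty result list is returned instead of raising.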
    def execute_multisearch_query(self, multisearch: MultiSearch) -> List[Response]:
        try:
            response = multisearch.execute()
            return response
        except Exception as e:
            LOGGER.error(f'Failed to execute ES search queries. {e}')
            return []
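    # Entry point used by the search API: build one query per requested resource type,
    # run them together as a multi-search, and format the combined responses.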
    def search(self, *,
               query_term: str,
               page_index: int,
               results_per_page: int,
               resource_types: List[Resource],
               filters: List[Filter],
               highlight_options: Dict[Resource, HighlightOptions]) -> SearchResponse:
        if not resource_types:
            # if resource types are not defined then search all resources
            resource_types = self.PRIMARY_ENTITIES

        multisearch = MultiSearch(using=self.elasticsearch)

        for resource in resource_types:
            # guard clause to prevent searching missing indices or aliases
            aliases_in_es = {alias['alias'] for alias in self.elasticsearch.cat.aliases(format="json")}
            resource_alias = self.get_index_alias_for_resource(resource_type=resource)
            if resource_alias not in aliases_in_es:
                LOGGER.info(f"There are no indices in elasticsearch against resource_type: {resource}")
                continue

            # build a query for each resource to search
            query_for_resource = self._build_elasticsearch_query(resource=resource,
                                                                 query_term=query_term,
                                                                 filters=filters)

            # wrap the query in a search object
            search = Search(index=resource_alias).query(query_for_resource)

            # highlighting
            if highlight_options:
                search = self._search_highlight(resource=resource,
                                                search=search,
                                                highlight_options=highlight_options)

            # pagination
            start_from = page_index * results_per_page
            end = results_per_page * (page_index + 1)
            search = search[start_from:end]

            # add search object to multisearch
            LOGGER.info(json.dumps(search.to_dict()))
            multisearch = multisearch.add(search)

        responses = self.execute_multisearch_query(multisearch=multisearch)

        formatted_response = create_search_response(page_index=page_index,
                                                    results_per_page=results_per_page,
                                                    responses=responses,
                                                    resource_types=resource_types,
                                                    resource_to_field_mapping=self.RESOURCE_TO_MAPPING)

        return formatted_response
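# A minimal usage sketch (illustrative only; the hosting Flask search service normally
# constructs the proxy from its configuration, and a Flask application context is required
# for get_index_alias_for_resource):
#
#     proxy = ElasticsearchProxyV2_1(host='http://localhost:9200', user='', password='',
#                                    client=None, page_size=10)
#     response = proxy.search(query_term='orders', page_index=0, results_per_page=10,
#                             resource_types=[Resource.TABLE], filters=[],
#                             highlight_options={})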