import logging
from typing import Dict, Generator, List, Optional
from elasticsearch import Elasticsearch
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("elastic_transport").setLevel(logging.WARNING)
[docs]class ElasticsearchConnector:
def __init__(
self,
connection_str: str = 'http://localhost:9200',
connection_args: Optional[Dict] = None,
):
"""
Provides an interface to an Elasticsearch database.
:param connection_str: A connection string for the ES instance. Usually, it
includes url, port, username, password, etc. Typically, it has the form:
'https://{user_name}:{password}@{host}:{port}'
:param connection_args: Dictionary with additional connection arguments,
e.g., information about certificates
"""
self._connection_str = connection_str
self._connection_args = (
connection_args if connection_args else {'verify_certs': False}
)
self.es = Elasticsearch(self._connection_str, **self._connection_args)
def __enter__(self) -> 'ElasticsearchConnector':
return self
def __exit__(self, type, value, traceback) -> None:
self.close()
[docs] def get_documents_by_query(
self, query: Dict, index_name: str, page_size: Optional[int] = 10
) -> Generator[List[Dict], None, None]:
"""
Executes an Elasticsearch query on a given index and returns a generator which
yields pages of documents from the query results.
:param query: Elasticsearch query
:param index_name: Name of an Elasticsearch index
:param page_size: Number of documents per page
:return: Generator which yields one page of documents on each call.
"""
resp = self.es.search(
**query,
index=index_name,
scroll='2m',
size=page_size,
source=False,
)
documents = [
{**doc['_source'], **{'id': doc['_id']}} for doc in resp['hits']['hits']
]
scroll_id = resp['_scroll_id']
scroll_size = len(documents)
while scroll_size > 0:
yield documents
resp = self.es.scroll(scroll_id=scroll_id, scroll='2m')
scroll_id = resp['_scroll_id']
documents = [
{**doc['_source'], **{'id': doc['_id']}} for doc in resp['hits']['hits']
]
scroll_size = len(documents)
[docs] def close(self) -> None:
"""
Closes Elasticsearch connection.
"""
self.es.close()