# Source code for now.data_loading.elasticsearch.data_extractor
import logging
from typing import Dict, Optional, Type, Union
from docarray import Document, DocumentArray
from now.data_loading.elasticsearch.connector import ElasticsearchConnector
from now.now_dataclasses import UserInput
# Raise the threshold of the "PIL.Image" logger above CRITICAL so that none of
# its standard-level messages (DEBUG through CRITICAL) are emitted.
logging.getLogger("PIL.Image").setLevel(logging.CRITICAL + 1)
# String constants; they are not referenced in the visible part of this module.
# NOTE(review): presumably tag keys used by other code -- verify against callers.
ID_TAG = 'id'
FIELD_TAG = 'field_name'
EXTRACTION_TYPE_TAG = 'extraction_type'
class ElasticsearchExtractor:
    """
    Extracts documents from an Elasticsearch index into a
    `docarray.DocumentArray`.

    The query results are consumed page by page through an
    `ElasticsearchConnector`; every Elasticsearch document is converted into a
    `docarray.Document` built from the configured data class.
    """

    def __init__(
        self,
        query: Dict,
        index: str,
        user_input: UserInput,
        connection_str: str,
        data_class: Optional[Type] = None,
        connection_args: Optional[Dict] = None,
    ):
        """
        For extracting documents from Elasticsearch into a `docarray.DocumentArray`
        dataset, this class implements an iterator which yields `docarray.Document`
        objects. To specify the data for extraction, one needs to provide an
        es query together with the index name and parameters to connect to
        the Elasticsearch instance.

        :param query: Elasticsearch query, given as a dictionary
        :param index: Name of the ES index containing the documents to be extracted
        :param user_input: User input object; its `index_fields` and
            `field_names_to_dataclass_fields` attributes decide which ES fields
            are passed to the data class and under which keyword names
        :param connection_str: A connection string for the ES instance. Usually, it
            includes url, port, username, password, etc. Typically, it has the form:
            'https://{user_name}:{password}@{host}:{port}'
        :param data_class: Callable (dataclass type) that is invoked with the
            index fields as keyword arguments to build each document. It must be
            set before documents are constructed.
        :param connection_args: Dictionary with additional connection arguments,
            e.g., information about certificates
        """
        self._es_connector = ElasticsearchConnector(
            connection_str=connection_str,
            connection_args=connection_args or {},
        )
        self._query = query
        self._index = index
        self._user_input = user_input
        self._data_class = data_class
        # Documents of the current page that have not been handed out yet.
        self._document_cache = []
        # Iterator over result pages; each page is a list of ES documents.
        self._query_result = self._es_connector.get_documents_by_query(
            self._query, self._index
        )

    def extract(self) -> DocumentArray:
        """
        Runs the extraction and collects all documents.

        The Elasticsearch connector is closed once the extraction has finished.

        :return: `docarray.DocumentArray` with all extracted documents
        """
        return DocumentArray(list(self._extract_documents()))

    def _extract_documents(self):
        """
        Generator yielding the extracted documents one by one.

        The connector is closed in a `finally` clause, so the connection is
        released no matter how the generator ends: results exhausted, an empty
        page, an error, or the consumer abandoning the generator early.
        """
        try:
            next_doc = self._get_next_document()
            # Compare against the `None` sentinel explicitly instead of relying
            # on the truthiness of the document object.
            while next_doc is not None:
                yield next_doc
                next_doc = self._get_next_document()
        finally:
            self._es_connector.close()

    def _get_next_document(self) -> Union[Document, None]:
        """
        Returns the next document from the Elasticsearch database.

        Elasticsearch documents are retrieved in pages of multiple documents.
        After retrieving a page, its contained documents are stored in a
        document cache. If documents are left in the cache this function
        returns one of those documents (taken from the end of the cached page).
        Otherwise, the next page is queried. If there is no page left, or the
        next page is empty, None will be returned.

        :return: extracted document or None if no document is left
        """
        if not self._document_cache:
            # A default avoids leaking `StopIteration` to the caller when the
            # page iterator is exhausted.
            page = next(self._query_result, None)
            if not page:
                return None
            self._document_cache = page
        return self._construct_document(self._document_cache.pop())

    def _construct_document(self, es_document: Dict) -> Document:
        """
        Constructs a `docarray.Document` object from an Elasticsearch document.

        Creates a document using the data class passed to the constructor.
        Fields listed in the user input's `index_fields` are passed to the data
        class (renamed via `field_names_to_dataclass_fields`); all remaining
        fields are stored as tags of the document.

        :param es_document: Elasticsearch document
        :return: `docarray.Document` object
        :raises TypeError: if no data class has been provided
        """
        if self._data_class is None:
            raise TypeError(
                'A data class is required to construct documents, '
                'but `data_class` is None.'
            )
        index_fields = self._user_input.index_fields
        dataclass_fields = self._user_input.field_names_to_dataclass_fields
        kwargs, tags = {}, {}
        for field_name, field_value in es_document.items():
            if field_name in index_fields:
                kwargs[dataclass_fields[field_name]] = field_value
            else:
                tags[field_name] = field_value
        doc = Document(self._data_class(**kwargs))
        doc.tags = tags
        return doc