from typing import Dict, List, Union

from docarray import Document, DocumentArray
from docarray.score import NamedScore
from numpy import asarray
from numpy import dot
from numpy.linalg import norm

from now.utils.docarray.helpers import get_chunk_by_field_name
def convert_es_to_da(
    result: Union[Dict, List[Dict]], get_score_breakdown: bool
) -> DocumentArray:
    """
    Transform Elasticsearch hits into a DocumentArray.

    Every hit must carry the original document, base64-serialized, under
    ``_source['serialized_doc']``; it is restored with ``Document.from_base64``.
    When ``get_score_breakdown`` is set, every ``_source`` field whose name starts
    or ends with ``'embedding'`` is copied into ``doc.tags['embeddings']``, keyed
    by the ES field name, so the score breakdown can be computed later.

    :param result: a single Elasticsearch hit, or a list of hits.
    :param get_score_breakdown: whether to attach the stored embeddings to the
        tags of each document.
    :return: a DocumentArray with one Document per hit, in input order.
    """
    if isinstance(result, dict):
        result = [result]
    da = DocumentArray()
    for es_doc in result:
        source = es_doc['_source']
        doc = Document.from_base64(source['serialized_doc'])
        # Only scan the source fields when the embeddings are actually requested.
        if get_score_breakdown:
            for field_name, value in source.items():
                if field_name.startswith('embedding') or field_name.endswith(
                    'embedding'
                ):
                    if 'embeddings' not in doc.tags:
                        doc.tags['embeddings'] = {}
                    doc.tags['embeddings'][field_name] = value
        da.append(doc)
    return da
def convert_doc_map_to_es(
    docs_map: Dict[str, DocumentArray],
    index_name: str,
    encoder_to_fields: dict,
) -> List[Dict]:
    """
    Merge per-encoder DocumentArrays into a list of Elasticsearch documents.

    Documents are matched across encoders by ``doc.id``, so all encoders
    contribute to the same Elasticsearch document. Expected shape::

        {
            'encoder1': DocumentArray([...]),
            'encoder2': DocumentArray([...]),  # same documents as encoder1
        }

    The first time an id is seen, a base document is built with
    ``get_base_es_doc``. A copy of the Document with every embedding cleared is
    stored base64-serialized under ``'serialized_doc'``.

    For each field in ``encoder_to_fields[encoder]``, the matching chunk
    contributes the following keys:

    - its embedding, under ``'<field>-<encoder>.embedding'``;
    - its text (when non-empty), under ``'<field>'``;
    - its uri (when non-empty), under ``'uri'``.

    :param docs_map: dictionary mapping encoder name to DocumentArray.
    :param index_name: name of the Elasticsearch index the documents target.
    :param encoder_to_fields: dictionary mapping encoder name to the fields it encoded.
    :return: a list of Elasticsearch documents as dictionaries ready to be indexed.
    """
    es_docs_by_id: Dict[str, Dict] = {}
    for encoder_name, encoded_docs in docs_map.items():
        for doc in encoded_docs:
            es_doc = es_docs_by_id.get(doc.id)
            if es_doc is None:
                es_doc = get_base_es_doc(doc, index_name)
                es_docs_by_id[doc.id] = es_doc
                # Serialize a copy without embeddings: vectors live in their own ES fields.
                stripped = DocumentArray(Document(doc, copy=True))
                stripped[..., 'embedding'] = None
                es_doc['serialized_doc'] = stripped[0].to_base64()
            for field_name in encoder_to_fields[encoder_name]:
                chunk = get_chunk_by_field_name(doc, field_name)
                es_doc[f'{field_name}-{encoder_name}.embedding'] = chunk.embedding
                if getattr(chunk, 'text', None):
                    es_doc[f'{field_name}'] = chunk.text
                if getattr(chunk, 'uri', None):
                    es_doc['uri'] = chunk.uri
    return list(es_docs_by_id.values())
def get_base_es_doc(doc: Document, index_name: str) -> Dict:
    """
    Build the base Elasticsearch document for ``doc``.

    Construction steps:

    1. Start from ``doc.to_dict()``, keeping only truthy values.
    2. Drop the ``chunks`` and ``_metadata`` entries.
    3. Add the action keys ``_op_type='index'``, ``_index`` and ``_id``. These
       are presumably consumed by an Elasticsearch bulk helper; confirm against
       the caller.

    Side effect: ``doc.tags['embeddings']`` is reset to an empty dict.

    :param doc: the Document to convert.
    :param index_name: name of the target Elasticsearch index.
    :return: the base Elasticsearch document as a dictionary.
    """
    es_doc = dict(
        (key, value) for key, value in doc.to_dict().items() if value
    )
    for dropped_key in ('chunks', '_metadata'):
        es_doc.pop(dropped_key, None)
    es_doc.update(_op_type='index', _index=index_name, _id=doc.id)
    # TODO remove side effect - should not be part of this function
    doc.tags['embeddings'] = {}
    return es_doc
def convert_es_results_to_matches(
    query_doc: Document,
    es_results: List[Dict],
    get_score_breakdown: bool,
    metric: str,
    score_calculation,
) -> DocumentArray:
    """
    Turn a list of Elasticsearch hits into the matches of ``query_doc``.

    Each hit is deserialized with ``convert_es_to_da``, and its ``_score`` is
    stored under ``scores[metric]``. When ``get_score_breakdown`` is set,
    ``calculate_score_breakdown`` then splits that score into per-field parts.
    The embedding of every match is cleared before it is returned.

    :param query_doc: the query document; its tags hold the query embeddings
        used for the score breakdown.
    :param es_results: list of hits returned by an Elasticsearch query.
    :param get_score_breakdown: whether to compute the per-field score breakdown.
    :param metric: name under which the Elasticsearch score is stored.
    :param score_calculation: score calculation entries forwarded to
        ``calculate_score_breakdown``.
    :return: a ``DocumentArray`` with one match per hit, in the order of ``es_results``.
    """

    def _to_match(hit: Dict) -> Document:
        # Deserialize a single hit and attach its Elasticsearch score.
        match = convert_es_to_da(hit, get_score_breakdown)[0]
        match.scores[metric] = NamedScore(value=hit['_score'])
        if get_score_breakdown:
            match = calculate_score_breakdown(
                query_doc, match, score_calculation, metric
            )
        match.embedding = None
        return match

    matches = DocumentArray()
    for hit in es_results:
        matches.append(_to_match(hit))
    return matches
def calculate_score_breakdown(
    query_doc: Document, retrieved_doc: Document, score_calculation, metric
) -> Document:
    """
    Break the final Elasticsearch score of a retrieved document down into its parts.

    The score stored under ``metric`` is moved to ``scores['total']``.

    Each vector entry ``(query_field, document_field, encoder, linear_weight)``
    in ``score_calculation`` is scored as follows:

    - The query embedding is read from
      ``query_doc.tags['embeddings']['<query_field>-<encoder>']``.
    - The document embedding is read from
      ``retrieved_doc.tags['embeddings']['<document_field>-<encoder>.embedding']``.
    - The two are compared with ``metric``, multiplied by ``linear_weight``,
      and rounded to 6 decimals.
    - The result is stored in ``retrieved_doc.scores`` under
      ``'<query_field>-<document_field>-<encoder>-<linear_weight>'``.

    Entries whose encoder is ``'bm25'`` are not scored individually. If any
    exist, the BM25 part is derived from the total as
    ``bm25_normalized = total - sum(vector scores) - 1`` and
    ``bm25_raw = bm25_normalized * 10``.

    Finally, the embeddings are removed from ``retrieved_doc.tags``.

    :param query_doc: the query document; holds the query embeddings in its tags.
    :param retrieved_doc: the retrieved document; holds the stored document
        embeddings in its tags and the Elasticsearch score under ``metric``.
    :param score_calculation: iterable of
        ``(query_field, document_field, encoder, linear_weight)`` tuples.
    :param metric: ``'cosine'`` or ``'l2_norm'``.
    :raises ValueError: if a vector entry must be scored with any other metric.
    :return: ``retrieved_doc``, modified in place, with the breakdown in its ``scores``.
    """
    # The script score returned by Elasticsearch becomes the 'total'.
    retrieved_doc.scores['total'] = retrieved_doc.scores.pop(metric)
    add_bm25 = False
    for query_field, document_field, encoder, linear_weight in score_calculation:
        if encoder == 'bm25':
            # BM25 is not recomputed here; it is derived from the total below.
            add_bm25 = True
            continue
        q_emb = query_doc.tags['embeddings'][f'{query_field}-{encoder}']
        d_emb = retrieved_doc.tags['embeddings'][
            f'{document_field}-{encoder}.embedding'
        ]
        if metric == 'cosine':
            score = calculate_cosine(d_emb, q_emb) * linear_weight
        elif metric == 'l2_norm':
            score = calculate_l2_norm(d_emb, q_emb) * linear_weight
        else:
            raise ValueError(f'Invalid metric {metric}')
        score_name = '-'.join(
            [query_field, document_field, encoder, str(linear_weight)]
        )
        retrieved_doc.scores[score_name] = NamedScore(value=round(score, 6))
    if add_bm25:
        vector_total = sum(
            named.value
            for name, named in retrieved_doc.scores.items()
            if name != 'total'
        )
        # NOTE(review): the "- 1" offset and the "* 10" scale presumably mirror
        # the constants used in the Elasticsearch scoring script. Confirm them
        # against the query builder before changing either side.
        bm25_normalized = retrieved_doc.scores['total'].value - vector_total - 1
        bm25_raw = bm25_normalized * 10
        retrieved_doc.scores['bm25_normalized'] = NamedScore(
            value=round(bm25_normalized, 6)
        )
        retrieved_doc.scores['bm25_raw'] = NamedScore(value=round(bm25_raw, 6))
    # The embeddings were only needed for the breakdown; drop them from the match.
    retrieved_doc.tags.pop('embeddings', None)
    return retrieved_doc
def calculate_l2_norm(d_emb, q_emb):
    """
    Return the Euclidean (L2) distance between two embeddings.

    Both arguments are converted with ``asarray`` first. Embeddings read back
    from the Elasticsearch ``_source`` are plain Python lists, for which ``-``
    is not defined.

    :param d_emb: document embedding (array-like).
    :param q_emb: query embedding (array-like, same length as ``d_emb``).
    :return: ``norm(q_emb - d_emb)``.
    """
    return norm(asarray(q_emb) - asarray(d_emb))
def calculate_cosine(d_emb, q_emb):
    """
    Return the cosine similarity between two embeddings.

    If either embedding has zero magnitude, the similarity is undefined. The
    function then returns ``0.0`` instead of propagating ``nan`` (0/0) into the
    rounded scores and the BM25 total.

    :param d_emb: document embedding (array-like).
    :param q_emb: query embedding (array-like, same length as ``d_emb``).
    :return: ``dot(q_emb, d_emb) / (norm(q_emb) * norm(d_emb))``, or ``0.0``
        for a zero-magnitude input.
    """
    denominator = norm(q_emb) * norm(d_emb)
    if denominator == 0:
        return 0.0
    return dot(q_emb, d_emb) / denominator