import json
import os
from collections import defaultdict
from typing import Dict, List, Type
from docarray import Document, DocumentArray
from docarray.dataclasses import is_multimodal
from now.common.detect_schema import (
get_first_file_in_folder_structure_s3,
get_s3_bucket_and_folder_prefix,
)
from now.constants import MAX_DOCS_FOR_TESTING, DatasetTypes
from now.data_loading.create_dataclass import create_dataclass
from now.data_loading.elasticsearch import ElasticsearchExtractor
from now.log import yaspin_extended
from now.now_dataclasses import UserInput
from now.utils.common.helpers import flatten_dict, sigmap
from now.utils.docarray.helpers import get_chunk_by_field_name
def load_data(user_input: UserInput, print_callback=print) -> DocumentArray:
"""Based on the user input, this function will pull the configured DocumentArray dataset ready for the preprocessing
executor.
:param user_input: The configured user object. Result from the Jina Now cli dialog.
:param print_callback: The callback function that should be used to print the status.
:return: The loaded DocumentArray.
"""
da = None
if user_input.dataset_type in [DatasetTypes.DEMO, DatasetTypes.DOCARRAY]:
user_input.field_names_to_dataclass_fields = {
field: field for field in user_input.index_fields
}
data_class = None
else:
data_class, user_input.field_names_to_dataclass_fields = create_dataclass(
user_input=user_input
)
if user_input.dataset_type in [DatasetTypes.DOCARRAY, DatasetTypes.DEMO]:
print_callback('⬇ Pull DocumentArray dataset')
da = _pull_docarray(user_input.dataset_name, user_input.admin_name)
da = _update_fields_and_metadata(da, user_input)
elif user_input.dataset_type == DatasetTypes.PATH:
print_callback('💿 Loading files from disk')
da = _load_from_disk(user_input=user_input, data_class=data_class)
elif user_input.dataset_type == DatasetTypes.S3_BUCKET:
print_callback('🗄 Loading files from S3')
da = _list_files_from_s3_bucket(user_input=user_input, data_class=data_class)
elif user_input.dataset_type == DatasetTypes.ELASTICSEARCH:
print_callback('🔍 Loading data from Elasticsearch')
da = _extract_es_data(user_input=user_input, data_class=data_class)
    if da is None:
        raise ValueError(
            f'Could not load DocumentArray dataset. Please check your configuration: {user_input}.'
        )
    da = set_modality_da(da)
    _add_metadata_to_chunks(da, user_input)
    if 'NOW_CI_RUN' in os.environ:
        da = da[:MAX_DOCS_FOR_TESTING]
return da
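# --- Illustrative usage sketch (not part of the original module) -------------
# A minimal sketch of how ``load_data`` might be driven for a local folder.
# It assumes that ``UserInput`` can be instantiated without arguments and
# configured via attribute assignment; the path and index fields below are
# hypothetical.
def _example_load_from_path() -> DocumentArray:
    user_input = UserInput()  # assumption: no-argument constructor
    user_input.dataset_type = DatasetTypes.PATH
    user_input.dataset_path = '~/my_dataset'  # hypothetical local folder
    user_input.index_fields = ['image.png', 'title']  # hypothetical index fields
    return load_data(user_input)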
def _add_metadata_to_chunks(da, user_input):
    """Write the original field name into the metadata of every chunk that backs a dataclass field."""
dataclass_fields_to_field_names = {
v: k for k, v in user_input.field_names_to_dataclass_fields.items()
}
for doc in da:
for dataclass_field, meta_dict in doc._metadata['multi_modal_schema'].items():
field_name = dataclass_fields_to_field_names.get(dataclass_field, None)
if 'position' in meta_dict:
get_chunk_by_field_name(doc, dataclass_field)._metadata[
'field_name'
] = field_name
def _update_fields_and_metadata(
da: DocumentArray, user_input: UserInput
) -> DocumentArray:
"""Add selected index fields to da, add the tags, remove non-index chunks,
and update multi modal schema."""
if not da:
return da
all_fields = da[0]._metadata['multi_modal_schema'].keys()
for doc in da:
filtered_chunks = []
filtered_chunk_names = []
for field in all_fields:
field_doc = get_chunk_by_field_name(doc, field)
            if field not in user_input.index_fields:
                # binary content (blob/tensor) cannot be stored in tags, so skip it
                if field_doc.blob or field_doc.tensor is not None:
                    continue
doc.tags.update(
{
field: field_doc.content
if isinstance(field_doc.content, str)
else field_doc.uri
}
)
else:
filtered_chunks.append(field_doc)
filtered_chunk_names.append(field)
doc.chunks = filtered_chunks
# keep only the index fields in metadata
doc._metadata['multi_modal_schema'] = {
field: doc._metadata['multi_modal_schema'][field]
for field in filtered_chunk_names
}
        # Update the positions so the remaining chunks can still be accessed by field name
for position, field in enumerate(filtered_chunk_names):
doc._metadata['multi_modal_schema'][field]['position'] = int(position)
return da
def _pull_docarray(dataset_name: str, admin_name: str) -> DocumentArray:
    """Pull a DocumentArray dataset from Jina AI Cloud and verify that it is multimodal."""
    dataset_name = (
        admin_name + '/' + dataset_name if '/' not in dataset_name else dataset_name
    )
    try:
        docs = DocumentArray.pull(name=dataset_name, show_progress=True)
    except Exception:
        raise ValueError(
            'DocumentArray does not exist or you do not have access to it. '
            'Make sure to add your user name as a prefix. Check the documentation here: '
            'https://docarray.jina.ai/fundamentals/cloud-support/data-management/'
        )
    if is_multimodal(docs[0]):
        return docs
    raise ValueError(
        f'The dataset {dataset_name} does not contain a multimodal DocumentArray. '
        f'Please check the documentation: https://docarray.jina.ai/fundamentals/dataclass/construct/'
    )
def _extract_es_data(user_input: UserInput, data_class: Type) -> DocumentArray:
    """Extract documents from the configured Elasticsearch index into a DocumentArray."""
query = {
'query': {'match_all': {}},
'_source': True,
}
es_extractor = ElasticsearchExtractor(
query=query,
index=user_input.es_index_name,
user_input=user_input,
data_class=data_class,
connection_str=user_input.es_host_name,
)
extracted_docs = es_extractor.extract()
return extracted_docs
def _load_from_disk(user_input: UserInput, data_class: Type) -> DocumentArray:
"""
Loads the data from disk into multimodal documents.
:param user_input: The user input object.
    :param data_class: The dataclass to use for the DocumentArray.
    :return: The loaded DocumentArray.
    """
dataset_path = user_input.dataset_path.strip()
dataset_path = os.path.expanduser(dataset_path)
    if os.path.isfile(dataset_path):
        try:
            da = DocumentArray.load_binary(dataset_path)
        except Exception:
            print(f'Failed to load the binary file provided under path {dataset_path}')
            exit(1)
        if is_multimodal(da[0]):
            return _update_fields_and_metadata(da, user_input)
        raise ValueError(
            f'The file {dataset_path} does not contain a multimodal DocumentArray. '
            f'Please check the documentation: https://docarray.jina.ai/fundamentals/dataclass/construct/'
        )
elif os.path.isdir(dataset_path):
with yaspin_extended(
sigmap=sigmap, text="Loading data from folder", color="green"
) as spinner:
            docs = from_files_local(
                dataset_path,
                user_input.index_fields,
                user_input.field_names_to_dataclass_fields,
                data_class,
            )
            spinner.ok('🏭')
            return docs
else:
raise ValueError(
f'The provided dataset path {dataset_path} does not'
f' appear to be a valid file or folder on your system.'
)
def from_files_local(
path: str,
fields: List[str],
field_names_to_dataclass_fields: Dict,
data_class: Type,
) -> DocumentArray:
"""Creates a Multi Modal documentarray over a list of file path or the content of the files.
:param path: The path to the directory
:param fields: The fields to search for in the directory
:param field_names_to_dataclass_fields: The mapping of the field names to the dataclass fields
:param data_class: The dataclass to use for the document
:return: A DocumentArray with the documents
"""
file_paths = []
for root, dirs, files in os.walk(path):
file_paths.extend(
[os.path.join(root, file) for file in files if not file.startswith('.')]
)
folder_generator = os.walk(path, topdown=True)
    current_level = next(folder_generator)
folder_structure = 'sub_folders' if len(current_level[1]) > 0 else 'single_folder'
if folder_structure == 'sub_folders':
docs = create_docs_from_subdirectories(
file_paths, fields, field_names_to_dataclass_fields, data_class
)
else:
docs = create_docs_from_files(
file_paths, fields, field_names_to_dataclass_fields, data_class
)
return DocumentArray(docs)
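# --- Illustrative sketch (not part of the original module) -------------------
# ``from_files_local`` distinguishes two hypothetical directory layouts:
#     dataset/000/image.png            dataset/0.png
#     dataset/000/data.json     vs.    dataset/1.png
#     dataset/001/...
# The left-hand layout ('sub_folders') yields one Document per subdirectory,
# the right-hand layout ('single_folder') yields one Document per file.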
def create_docs_from_subdirectories(
file_paths: List,
fields: List[str],
field_names_to_dataclass_fields: Dict,
data_class: Type,
path: str = None,
is_s3_dataset: bool = False,
) -> List[Document]:
"""
    Creates multimodal documents from files grouped by their parent subdirectory, one Document per subdirectory.
:param file_paths: The list of file paths
:param fields: The fields to search for in the directory
:param field_names_to_dataclass_fields: The mapping of the field names to the dataclass fields
:param data_class: The dataclass to use for the document
:param path: The path to the directory
:param is_s3_dataset: Whether the dataset is stored on s3
:return: The list of documents
"""
docs = []
folder_files = defaultdict(list)
for file in file_paths:
path_to_last_folder = (
'/'.join(file.split('/')[:-1])
if is_s3_dataset
else os.sep.join(file.split(os.sep)[:-1])
)
folder_files[path_to_last_folder].append(file)
for folder, files in folder_files.items():
kwargs = {}
tags_loaded_local = {}
_s3_uri_for_tags = ''
file_info = [
_extract_file_and_full_file_path(file, path, is_s3_dataset)
for file in files
]
# first store index fields given as files
for file, file_full_path in file_info:
if file in fields:
kwargs[field_names_to_dataclass_fields[file]] = file_full_path
        # next, check JSON files, which can also contain index fields and additional tag data
for file, file_full_path in file_info:
if file.endswith('.json'):
if is_s3_dataset:
_s3_uri_for_tags = file_full_path
for field in data_class.__annotations__.keys():
if field not in kwargs.keys():
kwargs[field] = file_full_path
else:
with open(file_full_path) as f:
json_data = flatten_dict(json.load(f))
for field, value in json_data.items():
if field in fields:
kwargs[field_names_to_dataclass_fields[field]] = value
else:
tags_loaded_local[field] = value
doc = Document(data_class(**kwargs))
if _s3_uri_for_tags:
doc._metadata['_s3_uri_for_tags'] = _s3_uri_for_tags
elif tags_loaded_local:
doc.tags.update(tags_loaded_local)
docs.append(doc)
return docs
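# --- Illustrative sketch (not part of the original module) -------------------
# For a hypothetical subdirectory containing ``image.png`` and ``data.json``
# (e.g. {"title": "red shoe", "price": 40}) with fields=['image.png', 'title'],
# the resulting Document gets ``image.png`` as a file-based field, ``title``
# from the JSON as an index field, and the remaining JSON keys (here ``price``)
# as entries in ``doc.tags``.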
def create_docs_from_files(
file_paths: List,
fields: List[str],
field_names_to_dataclass_fields: Dict,
data_class: Type,
path: str = None,
is_s3_dataset: bool = False,
) -> List[Document]:
"""
    Creates multimodal documents, one Document per file.
:param file_paths: List of file paths
:param fields: The fields to search for in the directory
    :param field_names_to_dataclass_fields: The mapping of the field names to the dataclass fields
:param data_class: The dataclass to use for the document
:param path: The path to the directory
:param is_s3_dataset: Whether the dataset is stored on s3
:return: A list of documents
"""
docs = []
for file in file_paths:
kwargs = {}
file, file_full_path = _extract_file_and_full_file_path(
file, path, is_s3_dataset
)
file_extension = file.split('.')[-1]
if (
file_extension == fields[0].split('.')[-1]
): # fields should have only one index field in case of files only
kwargs[field_names_to_dataclass_fields[fields[0]]] = file_full_path
docs.append(Document(data_class(**kwargs)))
return docs
def _list_s3_file_paths(bucket, folder_prefix):
    """
    Lists the s3 file paths under the given folder prefix.
    A concurrent, level-based listing using a thread pool is sketched in the commented-out
    code below, but it is currently disabled (see the TODO); a plain sequential listing is used instead.
    :param bucket: The s3 bucket used
    :param folder_prefix: The root folder prefix
    :return: A list of all s3 paths
    """
    # TODO bucket is not thread safe and outputs duplicate files for different prefixes
# first_file = get_first_file_in_folder_structure_s3(bucket, folder_prefix)
# structure_identifier = first_file[len(folder_prefix) :].split('/')
# folder_structure = (
# 'sub_folders' if len(structure_identifier) > 1 else 'single_folder'
# )
#
# def get_level_order_prefixes(folder_prefix, level=1):
# """
# Gets the list of prefixes in a specific level. Levels are defined by the folder structure as follows:
# level_1/level_2/.../level_n/file.ext
#
# :param folder_prefix: The current level prefix
# :param level: The desired level we want to get to
#
# :return: A list of prefixes
# """
# level_prefixes = [
# obj['Prefix']
# for obj in bucket.meta.client.list_objects(
# Bucket=bucket.name, Prefix=folder_prefix, Delimiter='/'
# )['CommonPrefixes']
# ]
# if level == 1:
# return level_prefixes
# else:
# prefix_list = []
# for prefix in level_prefixes:
# prefix_list += get_level_order_prefixes(prefix, level - 1)
# return prefix_list
#
# def get_prefixes(max_levels=len(structure_identifier) - 2):
# """
# Finds the best level for the prefixes
#
# :param max_levels: The maximum number of level we can get to, this defaults to len(structure_identifier) - 2
# because the latest level (len(structure_identifier) - 1) will only have files, so it won't have any common
# prefixes inside.
#
# :return: A list of prefixes
# """
# level = 1
# list_prefixes = get_level_order_prefixes(folder_prefix, level)
# prefixes_states = [list_prefixes]
# while level < max_levels and len(list_prefixes) < NUM_FOLDERS_THRESHOLD:
# level += 1
# list_prefixes = get_level_order_prefixes(folder_prefix, level)
# prefixes_states.append(list_prefixes)
# if len(list_prefixes) > NUM_FOLDERS_THRESHOLD and len(prefixes_states) > 1:
# return prefixes_states[-2]
# return list_prefixes
#
# objects = []
# if folder_structure == 'sub_folders':
# prefixes = get_prefixes()
# # TODO: change cpu count to a fixed number
# with ThreadPoolExecutor(max_workers=20) as executor:
# futures = []
# for prefix in prefixes:
# pref = ''.join(prefix)
# f = executor.submit(
# lambda: list(bucket.objects.filter(Prefix=f'{pref}'))
# )
# futures.append(f)
# for f in futures:
# objects += f.result()
# else:
# objects = list(bucket.objects.filter(Prefix=folder_prefix))
objects = list(bucket.objects.filter(Prefix=folder_prefix))
return [
obj.key
for obj in objects
if not obj.key.endswith('/') and not obj.key.split('/')[-1].startswith('.')
]
def _list_files_from_s3_bucket(
user_input: UserInput, data_class: Type
) -> DocumentArray:
"""
Loads the data from s3 into multimodal documents.
:param user_input: The user input object.
:param data_class: The dataclass to use for the DocumentArray.
:return: The DocumentArray with the documents.
"""
bucket, folder_prefix = get_s3_bucket_and_folder_prefix(user_input)
first_file = get_first_file_in_folder_structure_s3(bucket, folder_prefix)
structure_identifier = first_file[len(folder_prefix) :].split('/')
folder_structure = (
'sub_folders' if len(structure_identifier) > 1 else 'single_folder'
)
with yaspin_extended(
sigmap=sigmap, text="Listing files from S3 bucket ...", color="green"
) as spinner:
file_paths = _list_s3_file_paths(bucket, folder_prefix)
spinner.ok('🏭')
with yaspin_extended(
sigmap=sigmap, text="Creating docarray from S3 bucket files ...", color="green"
) as spinner:
if folder_structure == 'sub_folders':
docs = create_docs_from_subdirectories(
file_paths,
user_input.index_fields,
user_input.field_names_to_dataclass_fields,
data_class,
user_input.dataset_path,
is_s3_dataset=True,
)
else:
docs = create_docs_from_files(
file_paths,
user_input.index_fields,
user_input.field_names_to_dataclass_fields,
data_class,
user_input.dataset_path,
is_s3_dataset=True,
)
spinner.ok('👝')
return DocumentArray(docs)
def _extract_file_and_full_file_path(file_path, path=None, is_s3_dataset=False):
"""
    Extracts the file name and the full file path from a local or s3 file path.
:param file_path: The file path
:param path: The path to the directory
:param is_s3_dataset: Whether the dataset is stored on s3
:return: The file name and the full file path
"""
if is_s3_dataset:
file = file_path.split('/')[-1]
file_full_path = '/'.join(path.split('/')[:3]) + '/' + file_path
else:
file_full_path = file_path
file = file_path.split(os.sep)[-1]
return file, file_full_path
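# --- Illustrative sketch (not part of the original module) -------------------
# Hypothetical S3 example: with path='s3://my-bucket/my-folder/' and
# file_path='my-folder/000/image.png', the first three segments of ``path``
# ('s3://my-bucket') are re-joined with the object key, so the function
# returns ('image.png', 's3://my-bucket/my-folder/000/image.png').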
def _get_modality(document: Document):
"""
Detect document's modality based on its `modality` or `mime_type` attributes.
:param document: The document to detect the modality for.
"""
modalities = ['text', 'image', 'video']
if document.modality:
return document.modality
mime_type_class = document.mime_type.split('/')[0]
if document.mime_type == 'application/json':
return 'text'
if mime_type_class in modalities:
return mime_type_class
    document.summary()
    raise ValueError(f'Unknown modality for mime_type {document.mime_type}')
def set_modality_da(documents: DocumentArray):
"""
Set document's modality based on its `modality` or `mime_type` attributes.
:param documents: The DocumentArray to set the modality for.
:return: The DocumentArray with the modality set.
"""
for doc in documents:
for chunk in doc.chunks:
chunk.modality = chunk.modality or _get_modality(chunk)
return documents
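# --- Illustrative usage sketch (not part of the original module) -------------
# A minimal sketch of how ``set_modality_da`` fills in missing chunk
# modalities from the mime type; the documents below are hypothetical.
def _example_set_modality() -> DocumentArray:
    parent = Document(
        chunks=[
            Document(text='a red shoe', mime_type='text/plain'),
            Document(uri='shoe.png', mime_type='image/png'),
        ]
    )
    # after this call the chunks carry modality 'text' and 'image' respectively
    return set_modality_da(DocumentArray([parent]))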