Source code for now.executor.preprocessor.executor

import tempfile

from jina import Document, DocumentArray

from now.app.base.app import JinaNOWApp
from now.constants import DatasetTypes
from now.executor.abstract.auth import (
    SecurityLevel,
    get_auth_executor_class,
    secure_request,
)
from now.executor.preprocessor.s3_download import maybe_download_from_s3

# Base class for the executor defined below; the concrete class is chosen by
# the auth module's factory rather than being hard-coded here.
Executor = get_auth_executor_class()


def move_uri(d: Document) -> Document:
    """Copy an S3 URI kept in ``d.tags['uri']`` back onto the document.

    When ``d.tags['uri']`` is a string starting with ``s3://`` it is assigned
    to ``d.uri`` and to the ``uri`` attribute of ``d.chunks`` through the
    ``[:, 'uri']`` selector. In every other case the document is left as is.

    :param d: document to update in place
    :return: the same document object that was passed in
    """
    tagged_uri = d.tags.get('uri')

    # Nothing to restore unless the tag holds a string ...
    if not isinstance(tagged_uri, str):
        return d
    # ... and that string points at an S3 location.
    if not tagged_uri.startswith('s3://'):
        return d

    d.uri = tagged_uri
    d.chunks[:, 'uri'] = tagged_uri
    return d
class NOWPreprocessor(Executor):
    """Applies preprocessing to documents for encoding, indexing and searching as defined by app.
    If necessary, downloads files for that from cloud bucket.
    """

    def __init__(self, max_workers: int = 15, *args, **kwargs):
        """Create the app used for preprocessing and store the download parallelism.

        :param max_workers: forwarded as ``max_workers`` to ``maybe_download_from_s3``
        :param args: positional arguments passed on to the base executor
        :param kwargs: keyword arguments passed on to the base executor
        """
        super().__init__(*args, **kwargs)
        self.app: JinaNOWApp = JinaNOWApp()
        self.max_workers = max_workers

    @secure_request(on=None, level=SecurityLevel.USER)
    def preprocess(self, docs: DocumentArray, *args, **kwargs) -> DocumentArray:
        """If necessary downloads data from cloud bucket. Applies preprocessing to document as defined by apps.

        :param docs: loaded data but not preprocessed
        :return: preprocessed documents which are ready to be encoded and indexed
        :raises ValueError: if ``docs`` is non-empty and its first document has no chunks
        """
        # Only the first document is inspected as a cheap format check.
        if len(docs) > 0 and not docs[0].chunks:
            raise ValueError(
                'Documents are not in multi modal format. Please check documentation: '
                'https://docarray.jina.ai/datatypes/multimodal/'
            )

        # NOTE(review): ``self.user_input`` is not assigned in this class;
        # presumably it is provided by the base executor or by
        # ``secure_request`` - confirm.
        from_s3_bucket = bool(self.user_input) and (
            self.user_input.dataset_type == DatasetTypes.S3_BUCKET
        )

        # The temporary directory is handed to the download helper and is
        # removed again as soon as this block is left.
        with tempfile.TemporaryDirectory() as tmpdir:
            if from_s3_bucket:
                maybe_download_from_s3(
                    docs=docs,
                    tmpdir=tmpdir,
                    user_input=self.user_input,
                    max_workers=self.max_workers,
                )

            docs = self.app.preprocess(docs)

            # As maybe_download_from_s3 moves S3 URI to tags['uri'], need to
            # move it back for post-processor & accurate results.
            if from_s3_bucket:
                for d in docs:
                    for c in d.chunks:
                        # TODO please fix this hack - uri should not be in tags
                        move_uri(c)

        return docs