Source code for now.app.search_app.app

import os
from typing import Dict, List, Tuple, TypeVar

from jina import Client

from now.app.base.app import JinaNOWApp
from now.constants import (
    ACCESS_PATHS,
    DEMO_NS,
    EXTERNAL_CLIP_HOST,
    EXTERNAL_SBERT_HOST,
    NOW_AUTOCOMPLETE_VERSION,
    NOW_ELASTIC_INDEXER_VERSION,
    NOW_PREPROCESSOR_VERSION,
    Apps,
    DatasetTypes,
    Models,
)
from now.demo_data import AVAILABLE_DATASETS, DemoDataset, DemoDatasetNames
from now.executor.name_to_id_map import name_to_id_map
from now.now_dataclasses import UserInput

JINA_LOG_LEVEL = os.environ.get("JINA_LOG_LEVEL", "DEBUG")
AUTOCOMPLETE_LOG_LEVEL = os.environ.get("AUTOCOMPLETE_LOG_LEVEL", JINA_LOG_LEVEL)
PREPROCESSOR_LOG_LEVEL = os.environ.get("PREPROCESSOR_LOG_LEVEL", JINA_LOG_LEVEL)
INDEXER_LOG_LEVEL = os.environ.get("INDEXER_LOG_LEVEL", JINA_LOG_LEVEL)


class SearchApp(JinaNOWApp):
    def __init__(self):
        super().__init__()

    @property
    def app_name(self) -> str:
        return Apps.SEARCH_APP

    @property
    def is_enabled(self) -> bool:
        return True

    @property
    def description(self) -> str:
        return 'Search app'

    @property
    def required_docker_memory_in_gb(self) -> int:
        return 8

    @property
    def demo_datasets(self) -> Dict[TypeVar, List[DemoDataset]]:
        return AVAILABLE_DATASETS

    @property
    def finetune_datasets(self) -> Tuple:
        return DemoDatasetNames.DEEP_FASHION, DemoDatasetNames.BIRD_SPECIES
    def is_demo_available(self, user_input) -> bool:
        if (
            user_input.dataset_type == DatasetTypes.DEMO
            and 'NOW_EXAMPLES' not in os.environ
            and 'NOW_CI_RUN' not in os.environ
        ):
            client = Client(
                host=f'https://{DEMO_NS.format(user_input.dataset_name.split("/")[-1])}.dev.jina.ai'
            )
            try:
                client.post('/dry_run')
            except Exception:  # noqa: E722
                pass
            return True
        return False
    @staticmethod
    def autocomplete_stub(testing=False) -> Dict:
        """Stub for the autocomplete executor; uses the local class when ``testing`` is True."""
        return {
            'name': 'autocomplete_executor',
            'uses': f'jinahub+docker://{name_to_id_map.get("NOWAutoCompleteExecutor2")}/{NOW_AUTOCOMPLETE_VERSION}'
            if not testing
            else 'NOWAutoCompleteExecutor2',
            'needs': 'gateway',
            'jcloud': {
                'autoscale': {
                    'min': 0,
                    'max': 1,
                    'metric': 'concurrency',
                    'target': 1,
                },
                'resources': {
                    'instance': 'C1',
                    'capacity': 'spot',
                    'storage': {'kind': 'efs', 'size': '1M'},
                },
            },
            'env': {'JINA_LOG_LEVEL': AUTOCOMPLETE_LOG_LEVEL},
        }
    @staticmethod
    def preprocessor_stub(testing=False) -> Dict:
        """Stub for the preprocessor executor; uses the local class when ``testing`` is True."""
        return {
            'name': 'preprocessor',
            'needs': 'autocomplete_executor',
            'uses': f'jinahub+docker://{name_to_id_map.get("NOWPreprocessor")}/{NOW_PREPROCESSOR_VERSION}'
            if not testing
            else 'NOWPreprocessor',
            'jcloud': {
                'autoscale': {
                    'min': 0,
                    'max': 100,
                    'metric': 'concurrency',
                    'target': 1,
                },
                'resources': {'instance': 'C4', 'capacity': 'spot'},
            },
            'env': {'JINA_LOG_LEVEL': PREPROCESSOR_LOG_LEVEL},
        }
    @staticmethod
    def clip_encoder_stub() -> Tuple[Dict, int]:
        """Stub for the external CLIP encoder, together with its embedding dimension (512)."""
        return {
            'name': Models.CLIP_MODEL,
            'uses': 'jinahub+docker://CLIPOnnxEncoder/0.8.1-gpu',
            'host': EXTERNAL_CLIP_HOST,
            'port': 443,
            'tls': True,
            'external': True,
            'uses_with': {'access_paths': ACCESS_PATHS, 'name': 'ViT-B-32::openai'},
            'env': {'JINA_LOG_LEVEL': 'DEBUG'},
            'needs': 'preprocessor',
        }, 512
    @staticmethod
    def sbert_encoder_stub() -> Tuple[Dict, int]:
        """Stub for the external SBERT encoder, together with its embedding dimension (768)."""
        return {
            'name': Models.SBERT_MODEL,
            'uses': 'jinahub+docker://TransformerSentenceEncoder',
            'uses_with': {
                'access_paths': ACCESS_PATHS,
                'model_name': 'msmarco-distilbert-base-v3',
            },
            'host': EXTERNAL_SBERT_HOST,
            'port': 443,
            'tls': True,
            'external': True,
            'env': {'JINA_LOG_LEVEL': 'DEBUG'},
            'needs': 'preprocessor',
        }, 768
    @staticmethod
    def indexer_stub(
        user_input: UserInput,
        encoder2dim: Dict[str, int],
        testing=False,
    ) -> Dict:
        """Creates indexer stub.

        :param user_input: user input
        :param encoder2dim: maps encoder name to its output dimension
        :param testing: use local executors if True
        """
        document_mappings_list = []
        for encoder, dim in encoder2dim.items():
            document_mappings_list.append(
                [
                    encoder,
                    dim,
                    [
                        user_input.field_names_to_dataclass_fields[
                            index_field.replace('_model', '')
                        ]
                        for index_field, encoders in user_input.model_choices.items()
                        if encoder in encoders
                    ],
                ]
            )
        provision_index = 'yes' if not testing else 'no'
        provision_shards = os.getenv('PROVISION_SHARDS', '1')
        provision_replicas = os.getenv('PROVISION_REPLICAS', '0')
        return {
            'name': 'indexer',
            'needs': list(encoder2dim.keys()),
            'uses': f'jinahub+docker://{name_to_id_map.get("NOWElasticIndexer")}/{NOW_ELASTIC_INDEXER_VERSION}'
            if not testing
            else 'NOWElasticIndexer',
            'uses_with': {
                'document_mappings': document_mappings_list,
            },
            'no_reduce': True,
            'jcloud': {
                'labels': {
                    'app': 'indexer',
                    'provision-index': provision_index,
                    'provision-shards': provision_shards,
                    'provision-replicas': provision_replicas,
                },
                'resources': {'instance': 'C2', 'capacity': 'spot'},
            },
            'env': {'JINA_LOG_LEVEL': INDEXER_LOG_LEVEL},
        }
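    # Illustration (hypothetical values): with encoder2dim == {'clip': 512}, a
    # single index field 'title' whose model choice includes 'clip', and
    # field_names_to_dataclass_fields == {'title': 'title'}, the indexer stub
    # above receives document_mappings == [['clip', 512, ['title']]], i.e. one
    # [encoder name, embedding dimension, list of dataclass fields] triple per
    # encoder.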
    def get_executor_stubs(
        self, user_input: UserInput, testing=False, **kwargs
    ) -> List[Dict]:
        """Returns a list of executor stubs to be added to the flow.

        :param user_input: user input
        :param testing: use local executors if True
        :return: executor stubs with filled-in env vars
        """
        flow_yaml_executors = [
            self.autocomplete_stub(testing),
            self.preprocessor_stub(testing),
        ]
        encoder2dim = {}

        def add_encoders_to_flow(models):
            for model, encoder_stub in models:
                if any(
                    model in user_input.model_choices[f"{field}_model"]
                    for field in user_input.index_fields
                ):
                    encoder, dim = encoder_stub()
                    encoder2dim[encoder['name']] = dim
                    flow_yaml_executors.append(encoder)

        add_encoders_to_flow(
            [
                (Models.SBERT_MODEL, self.sbert_encoder_stub),
                (Models.CLIP_MODEL, self.clip_encoder_stub),
            ]
        )
        flow_yaml_executors.append(
            self.indexer_stub(
                user_input,
                encoder2dim=encoder2dim,
                testing=testing,
            )
        )
        return flow_yaml_executors
    @property
    def max_request_size(self) -> int:
        """Max number of documents in one request"""
        return 10
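
# Usage sketch (illustrative only, not part of the original module): the static
# stub builders above can be inspected without a populated UserInput. Building
# the complete flow via ``get_executor_stubs`` additionally needs a UserInput
# whose index_fields, model_choices and field_names_to_dataclass_fields are
# filled in, which is omitted here.
if __name__ == '__main__':
    app = SearchApp()
    print(app.app_name, app.max_request_size)

    # With testing=True the stubs reference local executor classes instead of
    # jinahub+docker images.
    print(SearchApp.autocomplete_stub(testing=True)['uses'])  # NOWAutoCompleteExecutor2
    print(SearchApp.preprocessor_stub(testing=True)['uses'])  # NOWPreprocessor

    # Encoder stubs are returned together with their embedding dimension.
    clip_stub, clip_dim = SearchApp.clip_encoder_stub()
    sbert_stub, sbert_dim = SearchApp.sbert_encoder_stub()
    print(clip_stub['name'], clip_dim)  # -> Models.CLIP_MODEL, 512
    print(sbert_stub['name'], sbert_dim)  # -> Models.SBERT_MODEL, 768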