Source code for now.run_backend

import random
import sys
import uuid
from copy import deepcopy
from typing import Dict, Optional

import requests
from docarray import DocumentArray
from jina.clients import Client

from now.admin.update_api_keys import update_api_keys
from now.app.base.app import JinaNOWApp
from now.constants import ACCESS_PATHS
from now.data_loading.data_loading import load_data
from now.deployment.flow import deploy_flow
from now.log import time_profiler
from now.now_dataclasses import UserInput
from now.utils.jcloud.helpers import get_flow_id


[docs]@time_profiler def run( app_instance: JinaNOWApp, user_input: UserInput, **kwargs, ): """ This function will run the backend of the app. Specifically, it will: - Load the data - Set up the flow dynamically and get the environment variables - Deploy the flow - Index the data :param app_instance: The app instance :param user_input: The user input :param kwargs: Additional arguments :return: """ print_callback = kwargs.get('print_callback', print) dataset = load_data(user_input, print_callback) print_callback('Data loaded. Deploying the flow...') # Set up the app specific flow app_instance.setup(user_input=user_input) client, gateway_host_http = deploy_flow(flow_yaml=app_instance.flow_yaml) # TODO at the moment the scheduler is not working. So we index the data right away # if ( # user_input.deployment_type == 'remote' # and user_input.dataset_type == DatasetTypes.S3_BUCKET # and 'NOW_CI_RUN' not in os.environ # ): # # schedule the trigger which will sync the bucket with the indexer once a day # trigger_scheduler(user_input, gateway_host_internal) # else: # index the data right away print_callback('Flow deployed. Indexing the data...') index_docs(user_input, dataset, client, print_callback, **kwargs) return gateway_host_http
[docs]def trigger_scheduler(user_input, host): """ This function will trigger the scheduler which will sync the bucket with the indexer once a day """ print('Triggering scheduler to index data from S3 bucket') # check if the api_key exists. If not then create a new one if user_input.secured and not user_input.api_key: user_input.api_key = uuid.uuid4().hex # Also call the bff to update the api key for i in range( 100 ): # increase the probability that all replicas get the new key update_api_keys(user_input.api_key, host) scheduler_params = { 'flow_id': get_flow_id(host), 'api_key': user_input.api_key, } cookies = {'st': user_input.jwt['token']} try: response = requests.post( 'https://storefrontapi.nowrun.jina.ai/api/v1/schedule_sync', json=scheduler_params, cookies=cookies, ) response.raise_for_status() print( 'Scheduler triggered successfully. Scheduler will sync data from S3 bucket once a day.' ) except Exception as e: print(f'Error while scheduling indexing: {e}') print(f'Indexing will not be scheduled. Please contact Jina AI support.')
[docs]def index_docs(user_input, dataset, client, print_callback, **kwargs): """ Index the data right away """ print_callback(f"▶ indexing {len(dataset)} documents") params = {'access_paths': ACCESS_PATHS} if user_input.secured: params['jwt'] = user_input.jwt call_flow( client=client, dataset=dataset, max_request_size=user_input.app_instance.max_request_size, parameters=deepcopy(params), return_results=False, **kwargs, ) print_callback('⭐ Success - your data is indexed')
[docs]@time_profiler def call_flow( client: Client, dataset: DocumentArray, max_request_size: int, endpoint: str = '/index', parameters: Optional[Dict] = None, return_results: Optional[bool] = False, **kwargs, ): request_size = estimate_request_size(dataset, max_request_size) response = client.post( on=endpoint, request_size=request_size, inputs=dataset, show_progress=True, parameters=parameters, continue_on_error=True, prefetch=100, on_done=kwargs.get('on_done', None), on_error=kwargs.get('on_error', None), on_always=kwargs.get('on_always', None), ) if return_results: return response
[docs]def estimate_request_size(index, max_request_size): if len(index) > 30: sample = random.sample(index, 30) else: sample = index size = sum([sys.getsizeof(x.content) for x in sample]) / 30 max_size = 50_000 request_size = max(min(max_request_size, int(max_size / size)), 1) return request_size