Source code for now.client

import requests
from docarray import dataclass
from docarray.typing import Text

from now.executor.gateway.bff.app.v1.models.search import SearchRequestModel
from now.executor.gateway.bff.app.v1.routers import helper


[docs]class Client: """ This is the jina NOW client which can be used to run requests against apps which are deployed using jcloud. """ def __init__(self, jcloud_id, app, api_key): self.jcloud_id = jcloud_id self.app = app self.api_key = api_key
[docs] def send_request_bff(self, endpoint: str, **kwargs): request_body = { "host": f'grpcs://nowapi-{self.jcloud_id}.wolf.jina.ai', "api_key": self.api_key, **kwargs, } response = requests.post( f'https://nowrun.jina.ai/api/v1/{endpoint}', json=request_body, ) return response
[docs] async def send_request(self, endpoint: str, **kwargs): """ Client to run requests against a deployed flow """ if endpoint != 'search': raise NotImplementedError('Only search endpoint is supported for now') @dataclass class DataClass: text_0: Text if 'text' in kwargs: query_doc = helper.field_dict_to_mm_doc( {'text': kwargs.pop('text')}, data_class=DataClass, modalities_dict={'text': Text}, field_names_to_dataclass_fields={'text': 'text_0'}, ) else: raise Exception('query doc is empty') app_request = SearchRequestModel( host=f'grpcs://nowapi-{self.jcloud_id}.wolf.jina.ai', api_key=self.api_key, **kwargs, ) response = await helper.jina_client_post( app_request, endpoint, docs=query_doc, parameters={ 'limit': app_request.limit, 'filter': app_request.filters, 'score_calculation': app_request.score_calculation, }, ) return response