import requests
from docarray import dataclass
from docarray.typing import Text

from import SearchRequestModel
from import helper

class Client:
    """
    Jina NOW client used to run requests against apps deployed using jcloud.

    :param jcloud_id: id of the jcloud deployment; interpolated into the
        ``grpcs://nowapi-...`` host string that is sent with every request.
    :param app: the app this client targets; stored on the instance.
    :param api_key: API key forwarded with every request.
    """

    def __init__(self, jcloud_id, app, api_key):
        self.jcloud_id = jcloud_id
        # Store the app on the instance; the previous chained assignment
        # only rebound the local `jcloud_id` and never set `self.app`.
        self.app = app
        self.api_key = api_key

    def send_request_bff(self, endpoint: str, **kwargs):
        """
        Send a JSON POST request and return the raw HTTP response.

        :param endpoint: target of the request, interpolated into the URL.
        :param kwargs: extra fields merged into the JSON request body; they
            come last in the body and therefore override ``host`` and
            ``api_key`` when they use the same keys.
        :return: the response object returned by ``requests.post``.
        """
        request_body = {
            "host": f'grpcs://nowapi-{self.jcloud_id}',
            "api_key": self.api_key,
            **kwargs,
        }
        # NOTE(review): the URL below is only the endpoint itself. A base URL
        # (and possibly `self.app`) looks like it was lost from this f-string;
        # confirm against the upstream source before relying on it.
        response = requests.post(
            f'{endpoint}',
            json=request_body,
        )
        return response

    async def send_request(self, endpoint: str, **kwargs):
        """
        Run a request against a deployed flow.

        :param endpoint: name of the endpoint; only ``'search'`` is supported.
        :param kwargs: must contain ``text`` (the query). The remaining
            entries are passed on to ``SearchRequestModel``.
        :return: whatever ``helper.jina_client_post`` returns.
        :raises NotImplementedError: if ``endpoint`` is not ``'search'``.
        :raises ValueError: if no ``text`` query is given.
        """
        if endpoint != 'search':
            raise NotImplementedError('Only search endpoint is supported for now')

        # Reject an empty query before doing any other work.
        if 'text' not in kwargs:
            raise ValueError('query doc is empty')

        @dataclass
        class DataClass:
            text_0: Text

        # `text` is popped so that it is not forwarded to SearchRequestModel.
        query_doc = helper.field_dict_to_mm_doc(
            {'text': kwargs.pop('text')},
            data_class=DataClass,
            modalities_dict={'text': Text},
            field_names_to_dataclass_fields={'text': 'text_0'},
        )

        app_request = SearchRequestModel(
            host=f'grpcs://nowapi-{self.jcloud_id}',
            api_key=self.api_key,
            **kwargs,
        )
        response = await helper.jina_client_post(
            app_request,
            endpoint,
            docs=query_doc,
            parameters={
                'limit': app_request.limit,
                'filter': app_request.filters,
                'score_calculation': app_request.score_calculation,
            },
        )
        return response