Source code for now.common.options

"""
This module implements a command-line dialog with the user.
Its goal is to configure a UserInput object according to the user's specifications.
Optionally, values can be passed from the command-line when jina-now is launched. In that case,
the dialog won't ask for the value.
"""
from __future__ import annotations, print_function, unicode_literals

import importlib
import os
import uuid

from hubble import AuthenticationRequiredError

from now.common.detect_schema import (
    set_field_names_elasticsearch,
    set_field_names_from_docarray,
    set_field_names_from_local_folder,
    set_field_names_from_s3_bucket,
)
from now.constants import MODALITY_TO_MODELS, DatasetTypes
from now.deployment.deployment import cmd
from now.log import yaspin_extended
from now.now_dataclasses import DialogOptions, UserInput
from now.utils.authentication.helpers import (
    get_aws_profile,
    get_info_hubble,
    jina_auth_login,
)
from now.utils.common.helpers import hide_string_chars, sigmap, to_camel_case
from now.utils.errors.helpers import DemoAvailableException, RetryException

# Message fragment; presumably appended to the label of options that are not supported yet
# (it is not referenced in this part of the file -- confirm before removing).
AVAILABLE_SOON = 'will be available in upcoming versions'
# Resolved once at import time; its access key, secret key and region are used as the
# default answers of the S3 related dialog options defined further down.
aws_profile = get_aws_profile()

# Make sure you add these dialog options to your app in order of dependency, i.e., if a dialog option depends on
# another one, then the parent must be called first, before the dependent one can be called.


def _check_index_field(user_input: UserInput, **kwargs):
    """
    Validate the selected index fields and expand the ``'__all__'`` shortcut.

    :param user_input: object providing ``index_fields`` (the selection) and
        ``index_field_candidates_to_modalities`` (mapping whose keys are the valid field names).
        ``index_fields`` is replaced by all candidate names when it contains ``'__all__'``.
    :param kwargs: accepted so the function can be used as a ``post_func``; unused.
    :raises RetryException: if no index field was selected.
    :raises ValueError: if a selected field is not one of the candidate fields.
    """
    if not user_input.index_fields:
        raise RetryException('Please select at least one index field')

    candidates = list(user_input.index_field_candidates_to_modalities.keys())
    if '__all__' in user_input.index_fields:
        user_input.index_fields = candidates
        return

    # Test membership itself rather than the truthiness of the offending name:
    # an empty field name (e.g. produced by a trailing comma) must be rejected too.
    if any(field not in candidates for field in user_input.index_fields):
        raise ValueError(
            f'Index field specified {user_input.index_fields} is not among the index candidate fields. Please '
            f'choose one of the following: {candidates}'
        )


def _fill_filter_field_if_selected_all(user_input: UserInput, **kwargs):
    """
    Expand the ``'__all__'`` shortcut of the filter field selection.

    When ``'__all__'`` is among ``user_input.filter_fields``, the selection is replaced by
    every key of ``user_input.filter_field_candidates_to_modalities``; otherwise nothing changes.

    :param user_input: object carrying the filter selection and the filter candidates.
    :param kwargs: accepted so the function can be used as a ``post_func``; unused.
    """
    if '__all__' not in user_input.filter_fields:
        return
    user_input.filter_fields = [*user_input.filter_field_candidates_to_modalities.keys()]


def _append_all_option_to_filter(user_input: UserInput):
    """
    Build the choices offered by the filter field prompt.

    Every filter candidate that is not already selected as an index field becomes a choice.
    If more than one choice remains, an extra 'All of the above' entry with the value
    ``'__all__'`` is appended.

    :param user_input: object providing ``filter_field_candidates_to_modalities`` and ``index_fields``.
    :return: list of ``{'name': ..., 'value': ...}`` dictionaries.
    """
    already_indexed = user_input.index_fields
    options = []
    for candidate in user_input.filter_field_candidates_to_modalities:
        if candidate in already_indexed:
            continue
        options.append({'name': candidate, 'value': candidate})

    if len(options) >= 2:
        options.append({'name': 'All of the above', 'value': '__all__'})
    return options


# Free-text dialog option registered under the name `flow_name`. Its `post_func` hook
# forwards only the user input to `clean_flow_name` (which strips invalid characters and
# lower-cases the name); any extra keyword arguments passed to the hook are dropped.
APP_NAME = DialogOptions(
    name='flow_name',
    prompt_message='Choose a name for your application:',
    prompt_type='input',
    is_terminal_command=True,
    post_func=lambda user_input, **kwargs: clean_flow_name(user_input),
)


def clean_flow_name(user_input: UserInput):
    """
    Clean the flow name to make it valid, removing special characters and spaces.

    Only alphanumeric characters and ``-`` are kept and the result is lower-cased.
    A missing (``None``) or empty name results in an empty string.

    :param user_input: object whose ``flow_name`` attribute is sanitised in place.
    """
    raw_name = user_input.flow_name or ''
    user_input.flow_name = ''.join(
        char for char in raw_name if char.isalnum() or char == '-'
    ).lower()
# Dialog option registered under the name `dataset_type`: a single-choice list of the
# supported data sources. Its `post_func` hook calls `check_login_dataset`, which
# triggers a login when a DocumentArray is chosen and no JWT is available yet.
DATASET_TYPE = DialogOptions(
    name='dataset_type',
    prompt_message='How do you want to provide input? (format: https://docarray.jina.ai/)',
    choices=[
        {'name': 'Demo dataset', 'value': DatasetTypes.DEMO},
        {
            'name': 'DocumentArray name (recommended)',
            'value': DatasetTypes.DOCARRAY,
        },
        {
            'name': 'Local folder',
            'value': DatasetTypes.PATH,
        },
        {
            'name': 'S3 bucket',
            'value': DatasetTypes.S3_BUCKET,
        },
        {
            'name': 'Elasticsearch',
            'value': DatasetTypes.ELASTICSEARCH,
        },
    ],
    prompt_type='list',
    is_terminal_command=True,
    post_func=lambda user_input, **kwargs: check_login_dataset(user_input),
)
def check_login_dataset(user_input: UserInput):
    """
    Log the user in when the selected dataset type requires it.

    A login is triggered only if the dataset type is ``DatasetTypes.DOCARRAY`` and
    ``user_input.jwt`` is still ``None``.

    :param user_input: object providing ``dataset_type`` and ``jwt``.
    """
    if user_input.dataset_type == DatasetTypes.DOCARRAY and user_input.jwt is None:
        _jina_auth_login(user_input)
def _get_demo_data_choices(user_input: UserInput, **kwargs):
    """
    Build the choices of the demo dataset prompt.

    :param user_input: object whose ``app_instance.demo_datasets`` maps to lists of demo datasets.
    :param kwargs: accepted for call-convention compatibility; unused.
    :return: one ``{'name': display_name, 'value': name}`` dictionary per demo dataset,
        in the iteration order of ``demo_datasets``.
    """
    all_demo_datasets = []
    for demo_datasets in user_input.app_instance.demo_datasets.values():
        all_demo_datasets.extend(demo_datasets)
    return [
        {'name': demo_data.display_name, 'value': demo_data.name}
        for demo_data in all_demo_datasets
    ]


def _check_if_demo_available(user_input: UserInput, **kwargs):
    """
    Stop the dialog early if a hosted demo exists, otherwise prepare the demo dataset.

    :param user_input: object providing ``app_instance`` and ``jwt``.
    :param kwargs: accepted for call-convention compatibility; unused.
    :raises DemoAvailableException: if ``app_instance.is_demo_available`` reports a demo.
    """
    # If the demo is available then do not continue with the dialog and break
    if user_input.app_instance.is_demo_available(user_input):
        raise DemoAvailableException(
            'Demo is available. Suspending the remaining dialog'
        )
    # We need login only if the deployment is needed else
    # for hosted example user should have access without login
    if user_input.jwt is None:
        _jina_auth_login(user_input)
    # NOTE(review): statement nesting was reconstructed from a whitespace-collapsed source;
    # confirm that the field names are meant to be set regardless of the login branch.
    set_field_names_from_docarray(user_input)


# Demo dataset selection; only considered when the dataset type is DEMO.
DEMO_DATA = DialogOptions(
    name='dataset_name',
    prompt_message='What demo dataset do you want to use?',
    choices=lambda user_input, **kwargs: _get_demo_data_choices(user_input),
    prompt_type='list',
    depends_on=DATASET_TYPE,
    is_terminal_command=True,
    description='Select one of the available demo datasets',
    conditional_check=lambda user_input, **kwargs: user_input.dataset_type
    == DatasetTypes.DEMO,
    post_func=lambda user_input, **kwargs: _check_if_demo_available(user_input),
)

# DocumentArray name; only considered when the dataset type is DOCARRAY.
DOCARRAY_NAME = DialogOptions(
    name='dataset_name',
    prompt_message='Please enter your DocArray name:',
    prompt_type='input',
    depends_on=DATASET_TYPE,
    conditional_check=lambda user_input: user_input.dataset_type
    == DatasetTypes.DOCARRAY,
    post_func=lambda user_input, **kwargs: set_field_names_from_docarray(user_input),
)

# Local folder path; only considered when the dataset type is PATH.
DATASET_PATH = DialogOptions(
    name='dataset_path',
    prompt_message='Please enter the path to the local folder:',
    prompt_type='input',
    depends_on=DATASET_TYPE,
    is_terminal_command=True,
    conditional_check=lambda user_input: user_input.dataset_type == DatasetTypes.PATH,
    post_func=lambda user_input, **kwargs: set_field_names_from_local_folder(
        user_input
    ),
)


def _fix_s3_uri(user_input: UserInput):
    """
    Make sure the S3 URI stored in ``user_input.dataset_path`` ends with ``/``.

    :param user_input: object whose ``dataset_path`` is updated in place.
    """
    if not user_input.dataset_path.endswith('/'):
        user_input.dataset_path += '/'


# S3 folder URI; only considered when the dataset type is S3_BUCKET.
DATASET_PATH_S3 = DialogOptions(
    name='dataset_path',
    prompt_message='Please enter the S3 URI to the folder:',
    prompt_type='input',
    depends_on=DATASET_TYPE,
    conditional_check=lambda user_input: user_input.dataset_type
    == DatasetTypes.S3_BUCKET,
    post_func=lambda user_input, **kwargs: _fix_s3_uri(user_input),
)

# The AWS options below use the values of `aws_profile` as defaults; the prompts
# show the credentials passed through `hide_string_chars`.
AWS_ACCESS_KEY_ID = DialogOptions(
    name='aws_access_key_id',
    prompt_message=f'Please enter the AWS access key ID: [{hide_string_chars(aws_profile.aws_access_key_id)}]',
    default=f'{aws_profile.aws_access_key_id}',
    prompt_type='input',
    depends_on=DATASET_TYPE,
    conditional_check=lambda user_input: user_input.dataset_type
    == DatasetTypes.S3_BUCKET,
    is_terminal_command=True,
)

AWS_SECRET_ACCESS_KEY = DialogOptions(
    name='aws_secret_access_key',
    prompt_message=f'Please enter the AWS secret access key: [{hide_string_chars(aws_profile.aws_secret_access_key)}]',
    default=f'{aws_profile.aws_secret_access_key}',
    prompt_type='input',
    depends_on=DATASET_TYPE,
    conditional_check=lambda user_input: user_input.dataset_type
    == DatasetTypes.S3_BUCKET,
    is_terminal_command=True,
)

# Last S3 option: its post_func derives the field names from the bucket.
AWS_REGION_NAME = DialogOptions(
    name='aws_region_name',
    prompt_message=f'Please enter the AWS region: [{aws_profile.region}]',
    default=f'{aws_profile.region}',
    prompt_type='input',
    depends_on=DATASET_TYPE,
    conditional_check=lambda user_input: user_input.dataset_type
    == DatasetTypes.S3_BUCKET,
    post_func=lambda user_input, **kwargs: set_field_names_from_s3_bucket(user_input),
    is_terminal_command=True,
)

# --------------------------------------------- #

# Index field selection: one choice per index candidate plus an 'All of the above'
# entry when there is more than one candidate. `_check_index_field` validates the
# answer. On the command line the value is a comma-separated string; an empty value
# falls back to the `index_fields` of a fresh `UserInput()`.
INDEX_FIELDS = DialogOptions(
    name='index_fields',
    choices=lambda user_input, **kwargs: [
        {'name': field, 'value': field}
        for field in user_input.index_field_candidates_to_modalities.keys()
    ]
    + (
        [{'name': 'All of the above', 'value': '__all__'}]
        if len(user_input.index_field_candidates_to_modalities) > 1
        else []
    ),
    prompt_message='Please select the index fields:',
    prompt_type='checkbox',
    is_terminal_command=True,
    post_func=_check_index_field,
    argparse_kwargs={
        'type': lambda fields: fields.split(',')
        if fields
        else UserInput().index_fields
    },
)

# Filter field selection: only considered when filter candidates exist and at least
# one of them is not already used as an index field.
FILTER_FIELDS = DialogOptions(
    name='filter_fields',
    choices=lambda user_input, **kwargs: _append_all_option_to_filter(user_input),
    prompt_message='Please select the filter fields',
    prompt_type='checkbox',
    depends_on=DATASET_TYPE,
    conditional_check=lambda user_input: (
        user_input.filter_field_candidates_to_modalities is not None
        and len(
            set(user_input.filter_field_candidates_to_modalities.keys())
            - set(user_input.index_fields)
        )
        > 0
    ),
    post_func=_fill_filter_field_if_selected_all,
)
def update_model_choice(user_input: UserInput, option_name, **kwargs):
    """
    Store the models selected for one dynamically created model option.

    :param user_input: object whose ``model_choices`` mapping is updated in place.
    :param option_name: name of the dialog option; also the key looked up in ``kwargs``
        and the key written to ``model_choices``.
    :param kwargs: dialog answers; the entry named ``option_name`` holds the selection.
    :raises RetryException: if the selection is missing or empty.
    """
    selected_models = kwargs.get(option_name)
    if not selected_models:
        raise RetryException('Please select at least one model')
    user_input.model_choices[option_name] = selected_models
def get_models_dialog(user_input: UserInput):
    """
    Create one model selection dialog option per selected index field.

    Each option is named ``'<index_field>_model'`` and offers the models listed in
    ``MODALITY_TO_MODELS`` for the modality of that field.

    :param user_input: object providing ``index_fields`` and
        ``index_field_candidates_to_modalities``.
    :return: list of ``DialogOptions``, one per index field (empty if there are none).
    """
    models_dialog = []
    for index_field in user_input.index_fields:
        option_name = f'{index_field}_model'
        modality = user_input.index_field_candidates_to_modalities[index_field]
        models_dialog.append(
            DialogOptions(
                name=option_name,
                prompt_message=f'Please select the model for {index_field}:',
                prompt_type='checkbox',
                choices=MODALITY_TO_MODELS[modality],
                # Bind the current option name as a default argument; a plain closure
                # would see the name of the last loop iteration in every callback.
                post_func=lambda user_inp, opt_name=option_name, **kwargs: update_model_choice(
                    user_inp, opt_name, **kwargs
                ),
            )
        )
    return models_dialog
# Placeholder option: `dynamic_func` (`get_models_dialog`) produces the actual
# per-field model options; only considered when index fields have been selected.
MODEL_SELECTION = DialogOptions(
    name='model_selection',
    prompt_message='This is a dynamic dialog which will spawn multiple dialog options',
    prompt_type='list',
    choices=[],
    depends_on=INDEX_FIELDS,
    conditional_check=lambda user_input: user_input.index_fields,
    is_terminal_command=True,
    dynamic_func=get_models_dialog,
)

# The Elasticsearch options below are only considered when the dataset type is
# ELASTICSEARCH.
ES_INDEX_NAME = DialogOptions(
    name='es_index_name',
    prompt_message='Please enter the name of your Elasticsearch index:',
    prompt_type='input',
    depends_on=DATASET_TYPE,
    conditional_check=lambda user_input: user_input.dataset_type
    == DatasetTypes.ELASTICSEARCH,
)

ES_HOST_NAME = DialogOptions(
    name='es_host_name',
    prompt_message='Please enter the address of your Elasticsearch node:',
    prompt_type='input',
    depends_on=DATASET_TYPE,
    conditional_check=lambda user_input: user_input.dataset_type
    == DatasetTypes.ELASTICSEARCH,
)

# Its post_func derives the field names from Elasticsearch.
ES_ADDITIONAL_ARGS = DialogOptions(
    name='es_additional_args',
    prompt_message='Please enter additional arguments for your Elasticsearch node if there are any:',
    prompt_type='input',
    depends_on=DATASET_TYPE,
    conditional_check=lambda user_input: user_input.dataset_type
    == DatasetTypes.ELASTICSEARCH,
    post_func=lambda user_input, **kwargs: set_field_names_elasticsearch(user_input),
)

# --------------------------------------------- #

# Yes/no option for securing the Flow; its post_func calls `check_login_deployment`.
SECURED = DialogOptions(
    name='secured',
    prompt_message='Do you want to secure the Flow?',
    prompt_type='list',
    choices=[
        {'name': '⛔ no', 'value': False},
        {'name': '✅ yes', 'value': True},
    ],
    is_terminal_command=True,
    post_func=lambda user_input, **kwargs: check_login_deployment(user_input),
)
def check_login_deployment(user_input: UserInput):
    """
    Log the user in if no JWT is available yet.

    :param user_input: object providing ``jwt``; nothing happens when it is not ``None``.
    """
    if user_input.jwt is None:
        _jina_auth_login(user_input)
# Only considered when the Flow is secured. The 'yes' value is a random hex key created
# once, when this module is imported; a falsy answer is turned into None by the post_func.
API_KEY = DialogOptions(
    name='api_key',
    prompt_message='Do you want to generate an API key to access this deployment?',
    prompt_type='list',
    choices=[
        {'name': '✅ yes', 'value': uuid.uuid4().hex},
        {'name': '⛔ no', 'value': False},
    ],
    depends_on=SECURED,
    is_terminal_command=True,
    description='Pass an API key to access the Flow once the deployment is complete. ',
    conditional_check=lambda user_inp: str(user_inp.secured).lower() == 'true',
    post_func=lambda user_input, **kwargs: _set_value_to_none(user_input),
)

# Only considered when the Flow is secured.
ADDITIONAL_USERS = DialogOptions(
    name='additional_user',
    prompt_message='Do you want to provide additional users access to this flow?',
    prompt_type='list',
    choices=[
        {'name': '✅ yes', 'value': True},
        {'name': '⛔ no', 'value': False},
    ],
    depends_on=SECURED,
    conditional_check=lambda user_inp: str(user_inp.secured).lower() == 'true',
)

# Only considered when additional users were requested.
USER_EMAILS = DialogOptions(
    name='user_emails',
    prompt_message='Please enter email addresses (separated by commas) '
    'to grant access to this Flow.\nAdditionally, you can specify comma-separated domain names'
    ' such that all users from that domain can access this Flow, e.g. `jina.ai`\n',
    prompt_type='input',
    depends_on=ADDITIONAL_USERS,
    conditional_check=lambda user_inp: user_inp.additional_user,
    post_func=lambda user_input, **kwargs: _add_additional_users(user_input, **kwargs),
)


def _set_value_to_none(user_input: UserInput):
    """
    Normalise a falsy API key (e.g. ``False`` from the 'no' choice) to ``None``.

    :param user_input: object whose ``api_key`` is updated in place.
    """
    if not user_input.api_key:
        user_input.api_key = None


def _add_additional_users(user_input: UserInput, **kwargs):
    """
    Parse the comma-separated ``user_emails`` answer into a list.

    Entries are stripped of surrounding whitespace and blank entries (caused by stray
    or trailing commas) are dropped.

    :param user_input: object whose ``user_emails`` is set to the parsed list.
    :param kwargs: dialog answers; ``user_emails`` holds the raw comma-separated string.
    :raises RetryException: if the answer is missing or contains no non-blank entry.
    """
    raw_emails = kwargs.get('user_emails', None)
    stripped = (email.strip() for email in raw_emails.split(',')) if raw_emails else ()
    emails = [email for email in stripped if email]
    if not emails:
        raise RetryException('Please provide at least one email address')
    user_input.user_emails = emails
def construct_app(app_name: str):
    """
    Instantiate the app class that belongs to ``app_name``.

    The module ``now.app.<app_name>.app`` is imported and the attribute whose name is
    ``to_camel_case(app_name)`` is called without arguments.

    :param app_name: name of the app package below ``now.app``.
    :return: the object returned by calling that attribute.
    """
    app_module = importlib.import_module(f'now.app.{app_name}.app')
    app_class = getattr(app_module, f'{to_camel_case(app_name)}')
    return app_class()
def _jina_auth_login(user_input: UserInput, **kwargs):
    """
    Make sure the user is logged in and load the account information.

    ``jina_auth_login`` is tried first; if it raises ``AuthenticationRequiredError`` the
    ``jina auth login`` command is run while a spinner is displayed. Afterwards
    ``get_info_hubble`` is called with ``user_input`` and the environment variable
    ``JCLOUD_NO_SURVEY`` is set to ``'1'``.

    :param user_input: object handed to ``get_info_hubble``.
    :param kwargs: accepted for call-convention compatibility; unused.
    """
    # NOTE(review): statement nesting was reconstructed from a whitespace-collapsed
    # source; confirm the placement of `spinner.ok` and of the environment update.
    try:
        jina_auth_login()
    except AuthenticationRequiredError:
        with yaspin_extended(
            sigmap=sigmap, text='Log in to Jina AI Cloud', color='green'
        ) as spinner:
            cmd('jina auth login')
            spinner.ok('🛠️')

    get_info_hubble(user_input)
    os.environ['JCLOUD_NO_SURVEY'] = '1'


# Option groups, concatenated below into the overall dialog order. Parents come before
# the options that declare them in `depends_on`.
app_config = [APP_NAME]
data_type = [DATASET_TYPE]
data_fields = [INDEX_FIELDS, FILTER_FIELDS]
get_models = [MODEL_SELECTION]
data_demo = [DEMO_DATA]
data_da = [DOCARRAY_NAME, DATASET_PATH]
data_s3 = [DATASET_PATH_S3, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION_NAME]
data_es = [
    ES_HOST_NAME,
    ES_INDEX_NAME,
    ES_ADDITIONAL_ARGS,
]
remote_cluster = [SECURED, API_KEY, ADDITIONAL_USERS, USER_EMAILS]

base_options = (
    data_type
    + data_demo
    + data_da
    + data_s3
    + data_es
    + data_fields
    + get_models
    + app_config
    + remote_cluster
)