Source code for now.deployment.deployment
import asyncio
import subprocess
from jcloud.flow import CloudFlow
from now.log.log import logger
[docs]def deploy_wolf(path: str):
# print file content from the path
with open(path) as f:
logger.debug(f'deploy yaml on wolf:\n{f.read()}')
return CloudFlow(path=path).__enter__()
[docs]def status_wolf(flow_id):
loop = get_or_create_eventloop()
return loop.run_until_complete(CloudFlow(flow_id=flow_id).status)
[docs]def get_or_create_eventloop():
try:
return asyncio.get_event_loop()
except RuntimeError as ex:
if "There is no current event loop in thread" in str(ex):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return asyncio.get_event_loop()
[docs]def list_all_wolf(status='Serving', namespace='nowapi'):
flows = []
loop = get_or_create_eventloop()
jflows = loop.run_until_complete(CloudFlow().list_all(phase=status))['flows']
# Transform the JCloud flow response to a much simpler list of dicts
for flow in jflows:
executor_name = list(flow['status']['endpoints'].keys())[0]
flows.append(
{'id': flow['id'], 'name': flow['status']['endpoints'][executor_name]}
)
# filter by namespace - if the namespace is contained in the flow name
if namespace:
return [f for f in flows if namespace in f['id']]
return flows
[docs]def cmd(command, std_output=False, wait=True):
if isinstance(command, str):
command = command.split()
if not std_output:
process = subprocess.Popen(
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
else:
process = subprocess.Popen(command)
if wait:
output, error = process.communicate()
return output, error