"""The authentication for the specified user failed."""
-class InvalidCACertificate(Exception):
- """The CA Certificate is not valid."""
-
-
class NotImplemented(Exception):
"""The method is not implemented."""
def __str__(self):
return '<{}> Invalid certificate: {}'.format(type(self), super().__str__())
-
self.actions = dict()
def register_machine(self, machine: Machine, db_dict: dict):
- entity_id = machine.entity_id
+ try:
+ entity_id = machine.entity_id
+ except:
+        # no entity_id attribute, try machine attribute
+ entity_id = machine.machine
+ self.n2vc.debug(msg='Registering machine for changes notifications: {}'.format(entity_id))
entity = _Entity(entity_id=entity_id, entity_type='machine', obj=machine, db_dict=db_dict)
self.machines[entity_id] = entity
def register_application(self, application: Application, db_dict: dict):
entity_id = application.entity_id
+ self.n2vc.debug(msg='Registering application for changes notifications: {}'.format(entity_id))
entity = _Entity(entity_id=entity_id, entity_type='application', obj=application, db_dict=db_dict)
self.applications[entity_id] = entity
def register_action(self, action: Action, db_dict: dict):
entity_id = action.entity_id
+ self.n2vc.debug(msg='Registering action for changes notifications: {}'.format(entity_id))
entity = _Entity(entity_id=entity_id, entity_type='action', obj=action, db_dict=db_dict)
self.actions[entity_id] = entity
self,
k8s_creds: str,
namespace: str = 'kube-system',
- reuse_cluster_uuid = None
+ reuse_cluster_uuid=None
) -> (str, bool):
"""
It prepares a given K8s cluster environment to run Charts or juju Bundles on both sides:
@abc.abstractmethod
async def inspect_kdu(
self,
- kdu_model: str
+ kdu_model: str,
+ repo_url: str = None
) -> str:
"""
- These calls will retrieve from the Charm/Bundle:
+ These calls will retrieve from the Chart/Bundle:
- The list of configurable values and their defaults (e.g. in Charts, it would retrieve
the contents of `values.yaml`).
- If available, any embedded help file (e.g. `readme.md`) embedded in the Chart/Bundle.
- :param cluster_uuid: the cluster to get the information
:param kdu_model: chart/bundle reference
- :return: If successful, it will return a dictionary containing the list of available parameters
- and their default values
+        :param repo_url: optional, repository URL (None if tar.gz, URL in other cases, even stable URL)
+ :return:
+
+ If successful, it will return the available parameters and their default values as provided by the backend.
"""
@abc.abstractmethod
async def help_kdu(
self,
- kdu_model: str
+ kdu_model: str,
+ repo_url: str = None
) -> str:
"""
- :param cluster_uuid: the cluster to get the information
:param kdu_model: chart/bundle reference
+        :param repo_url: optional, repository URL (None if tar.gz, URL in other cases, even stable URL)
:return: If successful, it will return the contents of the 'readme.md'
"""
self._helm_command = helm_command
self._check_file_exists(filename=helm_command, exception_if_not_exists=True)
+ # initialize helm client-only
+ self.debug('Initializing helm client-only...')
+ command = '{} init --client-only'.format(self._helm_command)
+ try:
+ asyncio.ensure_future(self._local_async_exec(command=command, raise_exception_on_error=False))
+ # loop = asyncio.get_event_loop()
+ # loop.run_until_complete(self._local_async_exec(command=command, raise_exception_on_error=False))
+ except Exception as e:
+ self.warning(msg='helm init failed (it was already initialized): {}'.format(e))
+
self.info('K8S Helm connector initialized')
async def init_env(
kube_dir, helm_dir, config_filename, cluster_dir = \
self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
- command = '{} --kubeconfig={} --home={} repo list --output yaml'.format(self._helm_command, config_filename, helm_dir)
+ command = '{} --kubeconfig={} --home={} repo list --output yaml'\
+ .format(self._helm_command, config_filename, helm_dir)
output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
if output and len(output) > 0:
self.debug('installing {} in cluster {}'.format(kdu_model, cluster_uuid))
- start = time.time()
- end = start + timeout
-
# config filename
kube_dir, helm_dir, config_filename, cluster_dir = \
self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
if result is not None:
# instance already exists: generate a new one
kdu_instance = None
- except:
+ except Exception as e:
kdu_instance = None
# helm repo install
self.debug('upgrading {} in cluster {}'.format(kdu_model, cluster_uuid))
- start = time.time()
- end = start + timeout
-
# config filename
kube_dir, helm_dir, config_filename, cluster_dir = \
self._get_paths(cluster_name=cluster_uuid, create_if_not_exist=True)
)
# wait for execution task
- await asyncio.wait([ exec_task ])
+ await asyncio.wait([exec_task])
# cancel status task
status_task.cancel()
async def inspect_kdu(
self,
- kdu_model: str
+ kdu_model: str,
+ repo_url: str = None
) -> str:
- self.debug('inspect kdu_model {}'.format(kdu_model))
+ self.debug('inspect kdu_model {} from (optional) repo: {}'.format(kdu_model, repo_url))
+
+ return await self._exec_inspect_comand(inspect_command='', kdu_model=kdu_model, repo_url=repo_url)
- command = '{} inspect values {}'\
- .format(self._helm_command, kdu_model)
+ async def values_kdu(
+ self,
+ kdu_model: str,
+ repo_url: str = None
+ ) -> str:
- output, rc = await self._local_async_exec(command=command)
+ self.debug('inspect kdu_model values {} from (optional) repo: {}'.format(kdu_model, repo_url))
- return output
+ return await self._exec_inspect_comand(inspect_command='values', kdu_model=kdu_model, repo_url=repo_url)
async def help_kdu(
self,
- kdu_model: str
- ):
+ kdu_model: str,
+ repo_url: str = None
+ ) -> str:
- self.debug('help kdu_model {}'.format(kdu_model))
+ self.debug('inspect kdu_model {} readme.md from repo: {}'.format(kdu_model, repo_url))
- command = '{} inspect readme {}'\
- .format(self._helm_command, kdu_model)
-
- output, rc = await self._local_async_exec(command=command, raise_exception_on_error=True)
-
- return output
+ return await self._exec_inspect_comand(inspect_command='readme', kdu_model=kdu_model, repo_url=repo_url)
async def status_kdu(
self,
cluster_uuid: str,
kdu_instance: str
- ):
-
- return await self._status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, show_error_log=True)
+ ) -> str:
+ # call internal function
+ return await self._status_kdu(
+ cluster_uuid=cluster_uuid,
+ kdu_instance=kdu_instance,
+ show_error_log=True,
+ return_text=True
+ )
"""
##################################################################################################
##################################################################################################
"""
+ async def _exec_inspect_comand(
+ self,
+ inspect_command: str,
+ kdu_model: str,
+ repo_url: str = None
+ ):
+
+ repo_str = ''
+ if repo_url:
+ repo_str = ' --repo {}'.format(repo_url)
+ idx = kdu_model.find('/')
+ if idx >= 0:
+ idx += 1
+ kdu_model = kdu_model[idx:]
+
+ inspect_command = '{} inspect {} {}{}'.format(self._helm_command, inspect_command, kdu_model, repo_str)
+ output, rc = await self._local_async_exec(command=inspect_command, encode_utf8=True)
+
+ return output
+
async def _status_kdu(
self,
cluster_uuid: str,
kdu_instance: str,
- show_error_log: bool = False
+ show_error_log: bool = False,
+ return_text: bool = False
):
self.debug('status of kdu_instance {}'.format(kdu_instance))
show_error_log=show_error_log
)
+ if return_text:
+ return str(output)
+
if rc != 0:
return None
kdu_instance: str
) -> bool:
- status = await self.status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance)
+ status = await self._status_kdu(cluster_uuid=cluster_uuid, kdu_instance=kdu_instance, return_text=False)
# extract info.status.resources-> str
# format:
# params for use in -f file
# returns values file option and filename (in order to delete it at the end)
def _params_to_file_option(self, cluster_uuid: str, params: dict) -> (str, str):
- params_str = ''
if params and len(params) > 0:
kube_dir, helm_dir, config_filename, cluster_dir = \
for key in params:
value = params.get(key)
if '!!yaml' in str(value):
- value = yaml.safe_load(value[7:])
+ value = yaml.load(value[7:])
params2[key] = value
values_file = get_random_number() + '.yaml'
self,
command: str,
raise_exception_on_error: bool = False,
- show_error_log: bool = True
+ show_error_log: bool = True,
+ encode_utf8: bool = False
) -> (str, int):
command = K8sHelmConnector._remove_multiple_spaces(command)
output = ''
if stdout:
output = stdout.decode('utf-8').strip()
+ # output = stdout.decode()
if stderr:
output = stderr.decode('utf-8').strip()
+ # output = stderr.decode()
if return_code != 0 and show_error_log:
self.debug('Return code (FAIL): {}\nOutput:\n{}'.format(return_code, output))
if raise_exception_on_error and return_code != 0:
raise Exception(output)
+ if encode_utf8:
+ output = output.encode('utf-8').strip()
+ output = str(output).replace('\\n', '\n')
+
return output, return_code
except Exception as e:
if exception_if_not_exists:
self.error(msg)
raise Exception(msg)
-
class K8sJujuConnector(K8sConnector):
+
def __init__(
self,
- fs,
- kubectl_command='/usr/bin/kubectl',
- log=None
+ fs: object,
+ db: object,
+ kubectl_command: str = '/usr/bin/kubectl',
+ juju_command: str = '/usr/bin/juju',
+ log=None,
+ on_update_db=None,
):
"""
# parent class
K8sConnector.__init__(
self,
- kubectl_command=kubectl_command,
- fs=fs,
+ db,
log=log,
+ on_update_db=on_update_db,
)
+ self.fs = fs
self.info('Initializing K8S Juju connector')
self.authenticated = False
self.models = {}
self.log = logging.getLogger(__name__)
+
+ self.juju_command = juju_command
+ self.juju_secret = ""
+
self.info('K8S Juju connector initialized')
"""Initialization"""
async def init_env(
self,
- k8s_creds: dict,
+ k8s_creds: str,
namespace: str = 'kube-system',
reuse_cluster_uuid: str = None,
- ) -> str:
+ ) -> (str, bool):
"""Initialize a Kubernetes environment
:param k8s_creds dict: A dictionary containing the Kubernetes cluster
Bootstrapping cannot be done, by design, through the API. We need to
use the CLI tools.
"""
- # TODO: The path may change
- jujudir = "/snap/bin"
-
- self.k8scli = "{}/juju".format(jujudir)
"""
WIP: Workflow
# Does the kubeconfig contain microk8s?
microk8s = self.is_microk8s_by_credentials(k8s_creds)
- if not microk8s:
- # Name the new k8s cloud
- k8s_cloud = "{}-k8s".format(namespace)
+ # Name the new k8s cloud
+ k8s_cloud = "{}-k8s".format(namespace)
- await self.add_k8s(k8s_cloud, k8s_creds)
+ print("Adding k8s cloud {}".format(k8s_cloud))
+ await self.add_k8s(k8s_cloud, k8s_creds)
- # Bootstrap Juju controller
- self.bootstrap(k8s_cloud, cluster_uuid)
- else:
- # k8s_cloud = 'microk8s-test'
- k8s_cloud = "{}-k8s".format(namespace)
-
- await self.add_k8s(k8s_cloud, k8s_creds)
-
- await self.bootstrap(k8s_cloud, cluster_uuid)
+ # Bootstrap Juju controller
+ print("Bootstrapping...")
+ await self.bootstrap(k8s_cloud, cluster_uuid, microk8s)
+ print("Bootstrap done.")
# Get the controller information
# Parse ~/.local/share/juju/controllers.yaml
# controllers.testing.api-endpoints|ca-cert|uuid
+ print("Getting controller endpoints")
with open(os.path.expanduser(
"~/.local/share/juju/controllers.yaml"
)) as f:
# Parse ~/.local/share/juju/accounts
# controllers.testing.user|password
+ print("Getting accounts")
with open(os.path.expanduser(
"~/.local/share/juju/accounts.yaml"
)) as f:
# Store the cluster configuration so it
# can be used for subsequent calls
+ print("Setting config")
await self.set_config(cluster_uuid, config)
else:
# Login to the k8s cluster
if not self.authenticated:
- await self.login()
+ await self.login(cluster_uuid)
# We're creating a new cluster
- print("Getting model {}".format(self.get_namespace(cluster_uuid)))
- model = await self.get_model(self.get_namespace(cluster_uuid))
+ print("Getting model {}".format(self.get_namespace(cluster_uuid), cluster_uuid=cluster_uuid))
+ model = await self.get_model(
+ self.get_namespace(cluster_uuid),
+ cluster_uuid=cluster_uuid
+ )
# Disconnect from the model
if model and model.is_connected():
await model.disconnect()
- return cluster_uuid
+ return cluster_uuid, True
"""Repo Management"""
async def repo_add(
"""Reset"""
async def reset(
- self,
- cluster_uuid: str,
+ self,
+ cluster_uuid: str,
+ force: bool = False,
+ uninstall_sw: bool = False
) -> bool:
"""Reset a cluster
try:
if not self.authenticated:
- await self.login()
+ await self.login(cluster_uuid)
if self.controller.is_connected():
# Destroy the model
print("Caught exception during reset: {}".format(ex))
"""Deployment"""
+
async def install(
self,
cluster_uuid: str,
kdu_model: str,
atomic: bool = True,
- timeout: int = None,
+ timeout: float = 300,
params: dict = None,
- ) -> str:
+ db_dict: dict = None
+ ) -> bool:
"""Install a bundle
:param cluster_uuid str: The UUID of the cluster to install to
if not self.authenticated:
print("[install] Logging in to the controller")
- await self.login()
+ await self.login(cluster_uuid)
##
# Get or create the model, based on the namespace the cluster was
# instantiated with.
namespace = self.get_namespace(cluster_uuid)
- model = await self.get_model(namespace)
+
+ self.log.debug("Checking for model named {}".format(namespace))
+ model = await self.get_model(namespace, cluster_uuid=cluster_uuid)
if not model:
# Create the new model
- model = await self.add_model(namespace)
+ self.log.debug("Adding model: {}".format(namespace))
+ model = await self.add_model(namespace, cluster_uuid=cluster_uuid)
if model:
# TODO: Instantiation parameters
- print("[install] deploying {}".format(kdu_model))
- await model.deploy(kdu_model)
+ """
+ "Juju bundle that models the KDU, in any of the following ways:
+ - <juju-repo>/<juju-bundle>
+ - <juju-bundle folder under k8s_models folder in the package>
+ - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder in the package>
+ - <URL_where_to_fetch_juju_bundle>
+ """
+
+ bundle = kdu_model
+ if kdu_model.startswith("cs:"):
+ bundle = kdu_model
+ elif kdu_model.startswith("http"):
+ # Download the file
+ pass
+ else:
+ # Local file
+
+ # if kdu_model.endswith(".tar.gz") or kdu_model.endswith(".tgz")
+ # Uncompress temporarily
+ # bundle = <uncompressed file>
+ pass
+
+ if not bundle:
+ # Raise named exception that the bundle could not be found
+ raise Exception()
+
+ print("[install] deploying {}".format(bundle))
+ await model.deploy(bundle)
# Get the application
if atomic:
initial release.
"""
namespace = self.get_namespace(cluster_uuid)
- model = await self.get_model(namespace)
+ model = await self.get_model(namespace, cluster_uuid=cluster_uuid)
with open(kdu_model, 'r') as f:
- bundle = yaml.load(f, Loader=yaml.FullLoader)
+ bundle = yaml.safe_load(f)
"""
{
removed = False
# Remove an application from the model
- model = await self.get_model(self.get_namespace(cluster_uuid))
+ model = await self.get_model(self.get_namespace(cluster_uuid), cluster_uuid=cluster_uuid)
if model:
# Get the application
kdu = {}
with open(kdu_model, 'r') as f:
- bundle = yaml.load(f, Loader=yaml.FullLoader)
+ bundle = yaml.safe_load(f)
"""
{
"""
status = {}
- model = await self.get_model(self.get_namespace(cluster_uuid))
+ model = await self.get_model(self.get_namespace(cluster_uuid), cluster_uuid=cluster_uuid)
# model = await self.get_model_by_uuid(cluster_uuid)
if model:
async def add_k8s(
self,
cloud_name: str,
- credentials: dict,
+ credentials: str,
) -> bool:
"""Add a k8s cloud to Juju
:returns: True if successful, otherwise raises an exception.
"""
- cmd = [self.k8scli, "add-k8s", "--local", cloud_name]
+ cmd = [self.juju_command, "add-k8s", "--local", cloud_name]
+ print(cmd)
p = subprocess.run(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
- input=yaml.dump(credentials, Dumper=yaml.Dumper),
- encoding='ascii'
+ # input=yaml.dump(credentials, Dumper=yaml.Dumper).encode("utf-8"),
+ input=credentials.encode("utf-8"),
+ # encoding='ascii'
)
retcode = p.returncode
+ print("add-k8s return code: {}".format(retcode))
if retcode > 0:
raise Exception(p.stderr)
async def add_model(
self,
- model_name: str
+ model_name: str,
+ cluster_uuid: str,
) -> juju.model.Model:
"""Adds a model to the controller
raises an exception.
"""
if not self.authenticated:
- await self.login()
+ await self.login(cluster_uuid)
+ self.log.debug("Adding model '{}' to cluster_uuid '{}'".format(model_name, cluster_uuid))
model = await self.controller.add_model(
model_name,
config={'authorized-keys': self.juju_public_key}
async def bootstrap(
self,
cloud_name: str,
- cluster_uuid: str
+ cluster_uuid: str,
+ microk8s: bool
) -> bool:
"""Bootstrap a Kubernetes controller
:param cloud_name str: The name of the cloud.
:param cluster_uuid str: The UUID of the cluster to bootstrap.
+ :param microk8s bool: If this is a microk8s cluster.
:returns: True upon success or raises an exception.
"""
- cmd = [self.k8scli, "bootstrap", cloud_name, cluster_uuid]
+
+ if microk8s:
+ cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid]
+ else:
+ """
+ For non-microk8s clusters, specify that the controller service is using a LoadBalancer.
+ """
+ cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid, "--config", "controller-service-type=loadbalancer"]
+
print("Bootstrapping controller {} in cloud {}".format(
cluster_uuid, cloud_name
))
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
- encoding='ascii'
+ # encoding='ascii'
)
retcode = p.returncode
if retcode > 0:
#
- if 'already exists' not in p.stderr:
+ if b'already exists' not in p.stderr:
raise Exception(p.stderr)
return True
:returns: True upon success or raises an exception.
"""
cmd = [
- self.k8scli,
+ self.juju_command,
"destroy-controller",
"--destroy-all-models",
"--destroy-storage",
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
- encoding='ascii'
+ # encoding='ascii'
)
retcode = p.returncode
cluster_config = "{}/{}.yaml".format(self.fs.path, cluster_uuid)
if os.path.exists(cluster_config):
with open(cluster_config, 'r') as f:
- config = yaml.load(f.read(), Loader=yaml.FullLoader)
+ config = yaml.safe_load(f.read())
return config
else:
raise Exception(
async def get_model(
self,
model_name: str,
+ cluster_uuid: str,
) -> juju.model.Model:
"""Get a model from the Juju Controller.
:return The juju.model.Model object if found, or None.
"""
if not self.authenticated:
- await self.login()
+ await self.login(cluster_uuid)
model = None
models = await self.controller.list_models()
-
+ self.log.debug(models)
if model_name in models:
+ self.log.debug("Found model: {}".format(model_name))
model = await self.controller.get_model(
model_name
)
def is_microk8s_by_credentials(
self,
- credentials: dict,
+ credentials: str,
) -> bool:
"""Check if a cluster is micro8s
:param credentials dict: A dictionary containing the k8s credentials
:returns: A boolean if the cluster is running microk8s
"""
- for context in credentials['contexts']:
- if 'microk8s' in context['name']:
- return True
+ creds = yaml.safe_load(credentials)
+ if creds:
+ for context in creds['contexts']:
+ if 'microk8s' in context['name']:
+ return True
return False
- async def login(self):
+ async def login(self, cluster_uuid):
"""Login to the Juju controller."""
if self.authenticated:
self.connecting = True
+ # Test: Make sure we have the credentials loaded
+ config = self.get_config(cluster_uuid)
+
+ self.juju_endpoint = config['endpoint']
+ self.juju_user = config['username']
+ self.juju_secret = config['secret']
+ self.juju_ca_cert = config['cacert']
+ self.juju_public_key = None
+
self.controller = Controller()
if self.juju_secret:
"""
# Remove the bootstrapped controller
- cmd = [self.k8scli, "remove-k8s", "--client", cloud_name]
+ cmd = [self.juju_command, "remove-k8s", "--client", cloud_name]
p = subprocess.run(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
- encoding='ascii'
+ # encoding='ascii'
)
retcode = p.returncode
raise Exception(p.stderr)
# Remove the cloud from the local config
- cmd = [self.k8scli, "remove-cloud", "--client", cloud_name]
+ cmd = [self.juju_command, "remove-cloud", "--client", cloud_name]
p = subprocess.run(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
- encoding='ascii'
+ # encoding='ascii'
)
retcode = p.returncode
# datetime
dt = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
dt = dt + time_str
- dt = time_str # logger already shows datetime
+ # dt = time_str # logger already shows datetime
# current thread
if include_thread:
self.on_update_db = on_update_db
# generate private/public key-pair
+ self.private_key_path = None
+ self.public_key_path = None
self.get_public_key()
@abc.abstractmethod
"""
# TODO: review which public key
- async def get_public_key(self) -> str:
+ def get_public_key(self) -> str:
"""Get the VCA ssh-public-key
Returns the SSH public key from local mahine, to be injected into virtual machines to
from juju.application import Application
from juju.action import Action
from juju.machine import Machine
+from juju.client import client
+
+from n2vc.provisioner import SSHProvisioner
class N2VCJujuConnector(N2VCConnector):
url: str = '127.0.0.1:17070',
username: str = 'admin',
vca_config: dict = None,
- on_update_db=None,
- api_proxy=None
+ on_update_db=None
):
"""Initialize juju N2VC connector
"""
if self.ca_cert:
self.ca_cert = base64_to_cacert(vca_config['ca_cert'])
- if api_proxy:
- self.api_proxy = api_proxy
+ if 'api_proxy' in vca_config:
+ self.api_proxy = vca_config['api_proxy']
+ self.debug('api_proxy for native charms configured: {}'.format(self.api_proxy))
else:
self.warning('api_proxy is not configured. Support for native charms is disabled')
# register machine on juju
try:
- machine = await self._juju_provision_machine(
+ machine_id = await self._juju_provision_machine(
model_name=model_name,
hostname=hostname,
username=username,
except Exception as e:
self.error('Error registering machine: {}'.format(e))
raise N2VCException(message='Error registering machine on juju: {}'.format(e))
- self.info('Machine registered')
+
+ self.info('Machine registered: {}'.format(machine_id))
# id for the execution environment
ee_id = N2VCJujuConnector._build_ee_id(
model_name=model_name,
application_name=application_name,
- machine_id=str(machine.entity_id)
+ machine_id=str(machine_id)
)
self.info('Execution environment registered. ee_id: {}'.format(ee_id))
db_dict: dict = None,
progress_timeout: float = None,
total_timeout: float = None
- ) -> Machine:
+ ) -> str:
+
+ if not self.api_proxy:
+ msg = 'Cannot provision machine: api_proxy is not defined'
+ self.error(msg=msg)
+ raise N2VCException(message=msg)
- self.debug('provisioning machine. model: {}, hostname: {}'.format(model_name, hostname))
+ self.debug('provisioning machine. model: {}, hostname: {}, username: {}'.format(model_name, hostname, username))
if not self._authenticated:
await self._juju_login()
model = await self._juju_get_model(model_name=model_name)
observer = self.juju_observers[model_name]
- spec = 'ssh:{}@{}:{}'.format(username, hostname, private_key_path)
- self.debug('provisioning machine {}'.format(spec))
+ # TODO check if machine is already provisioned
+ machine_list = await model.get_machines()
+
+ provisioner = SSHProvisioner(
+ host=hostname,
+ user=username,
+ private_key_path=private_key_path,
+ log=self.log
+ )
+
+ params = None
try:
- machine = await model.add_machine(spec=spec)
- except Exception as e:
- import sys
- import traceback
- traceback.print_exc(file=sys.stdout)
- print('-' * 60)
- raise e
+ params = provisioner.provision_machine()
+ except Exception as ex:
+ msg = "Exception provisioning machine: {}".format(ex)
+ self.log.error(msg)
+ raise N2VCException(message=msg)
+
+ params.jobs = ['JobHostUnits']
+
+ connection = model.connection()
+
+ # Submit the request.
+ self.debug("Adding machine to model")
+ client_facade = client.ClientFacade.from_connection(connection)
+ results = await client_facade.AddMachines(params=[params])
+ error = results.machines[0].error
+ if error:
+ msg = "Error adding machine: {}}".format(error.message)
+ self.error(msg=msg)
+ raise ValueError(msg)
+
+ machine_id = results.machines[0].machine
+
+ # Need to run this after AddMachines has been called,
+ # as we need the machine_id
+ self.debug("Installing Juju agent into machine {}".format(machine_id))
+ asyncio.ensure_future(provisioner.install_agent(
+ connection=connection,
+ nonce=params.nonce,
+ machine_id=machine_id,
+ api=self.api_proxy,
+ ))
+
+ # wait for machine in model (now, machine is not yet in model, so we must wait for it)
+ machine = None
+ for i in range(10):
+ machine_list = await model.get_machines()
+ if machine_id in machine_list:
+ self.debug('Machine {} found in model!'.format(machine_id))
+ machine = model.machines.get(machine_id)
+ break
+ await asyncio.sleep(2)
+
+ if machine is None:
+ msg = 'Machine {} not found in model'.format(machine_id)
+ self.error(msg=msg)
+ raise Exception(msg)
# register machine with observer
observer.register_machine(machine=machine, db_dict=db_dict)
# wait for machine creation
- self.debug('waiting for provision completed... {}'.format(machine.entity_id))
+ self.debug('waiting for provision finishes... {}'.format(machine_id))
await observer.wait_for_machine(
- machine=machine,
+ machine_id=machine_id,
progress_timeout=progress_timeout,
total_timeout=total_timeout
)
- self.debug("Machine provisioned {}".format(machine.entity_id))
- return machine
+ self.debug("Machine provisioned {}".format(machine_id))
+
+ return machine_id
async def _juju_deploy_charm(
self,
self.debug('deploying application {} to machine {}, model {}'
.format(application_name, machine_id, model_name))
self.debug('charm: {}'.format(charm_path))
+ series = 'xenial'
+ # series = None
application = await model.deploy(
entity_url=charm_path,
application_name=application_name,
channel='stable',
num_units=1,
- series='xenial',
+ series=series,
to=machine_id
)
fs.fs_connect(storage)
client = n2vc.k8s_juju_conn.K8sJujuConnector(
- kubectl_command = '/bin/true',
- fs = fs,
+ kubectl_command='/snap/bin/kubectl',
+ juju_command='/snap/bin/juju',
+ fs=fs,
+ db=None,
)
# kubectl config view --raw
# microk8s.config
# if microk8s then
- kubecfg = subprocess.getoutput('microk8s.config')
+ # kubecfg = subprocess.getoutput('microk8s.config')
# else
- # kubecfg.subprocess.getoutput('kubectl config view --raw')
-
- k8screds = yaml.load(kubecfg, Loader=yaml.FullLoader)
+ kubecfg = subprocess.getoutput('kubectl config view --raw')
+ # print(kubecfg)
+
+ # k8screds = yaml.load(kubecfg, Loader=yaml.FullLoader)
namespace = 'testing'
kdu_model = "./tests/bundles/k8s-zookeeper.yaml"
"""init_env"""
- cluster_uuid = await client.init_env(k8screds, namespace, reuse_cluster_uuid=reuse_cluster_uuid)
+ cluster_uuid, _ = await client.init_env(kubecfg, namespace, reuse_cluster_uuid=reuse_cluster_uuid)
print(cluster_uuid)
if not reuse_cluster_uuid: