Change add_relation function in libjuju.py to accept saas
[osm/N2VC.git] / n2vc / k8s_juju_conn.py
1 # Copyright 2019 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import concurrent
17 import os
18 import uuid
19 import yaml
20
21 from juju.controller import Controller
22 from juju.model import Model
23 from n2vc.exceptions import K8sException
24 from n2vc.k8s_conn import K8sConnector
25 from n2vc.kubectl import Kubectl
26 from .exceptions import MethodNotImplemented, N2VCNotFound
27
28
29 # from juju.bundle import BundleHandler
30 # import re
31 # import ssl
32 # from .vnf import N2VC
33 class K8sJujuConnector(K8sConnector):
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = "/usr/bin/kubectl",
    juju_command: str = "/usr/bin/juju",
    log: object = None,
    on_update_db=None,
):
    """Create a Juju-backed K8s connector.

    :param fs: file system object, kept on the instance as ``self.fs``
    :param db: database object, handed to the parent ``K8sConnector``
    :param kubectl_command: path to the kubectl executable (accepted for
        interface compatibility; not used by this constructor)
    :param juju_command: path to the juju executable used for CLI calls
    :param log: logger, handed to the parent ``K8sConnector``
    :param on_update_db: callback handed to the parent ``K8sConnector``
    """

    # The parent initialiser runs first; self.log is used right after it,
    # so it is expected to be set up there.
    K8sConnector.__init__(self, db, log=log, on_update_db=on_update_db)

    self.fs = fs
    self.log.debug("Initializing K8S Juju connector")

    # CLI binary used for the operations the Juju API does not offer.
    self.juju_command = juju_command
    # When set, it is passed as "authorized-keys" to newly added models.
    self.juju_public_key = None

    self.log.debug("K8S Juju connector initialized")
67
68 """Initialization"""
69
async def init_env(
    self,
    k8s_creds: str,
    namespace: str = "kube-system",
    reuse_cluster_uuid: str = None,
) -> (str, bool):
    """Prepare a K8s cluster environment to run Juju bundles.

    Bootstrapping cannot be done, by design, through the API, so the juju
    CLI is used for it. The steps performed are:

    1. Add the k8s cloud to Juju (``add_k8s``).
    2. Bootstrap a controller named after the cluster uuid (``bootstrap``).
    3. Read the controller endpoint, CA certificate and account from the
       juju client files under ``~/.local/share/juju``.
    4. Store that configuration (``set_config``) and test the connection.

    :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
        '.kube/config'
    :param namespace: optional namespace to be used for juju (currently not
        used by this implementation)
    :param reuse_cluster_uuid: existing cluster uuid for reuse; a new uuid is
        generated when omitted
    :return: uuid of the K8s cluster and True
        (on error, an exception will be raised)
    """

    # TODO: Pull info from db based on the namespace
    # TODO: Make it idempotent, calling add-k8s and bootstrap whenever
    # reuse_cluster_uuid is passed as parameter. `init_env` is called to
    # initialize the K8s cluster for juju. If this initialization fails, it
    # can be called again by LCM with the param reuse_cluster_uuid, e.g. to
    # try to fix it.

    cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4())

    # If the k8s is external, the juju controller needs a loadbalancer
    loadbalancer = not self.is_local_k8s(k8s_creds)

    # Name the new k8s cloud
    k8s_cloud = "k8s-{}".format(cluster_uuid)

    self.log.debug("Adding k8s cloud {}".format(k8s_cloud))
    await self.add_k8s(k8s_cloud, k8s_creds)

    # Bootstrap Juju controller
    self.log.debug("Bootstrapping...")
    await self.bootstrap(k8s_cloud, cluster_uuid, loadbalancer)
    self.log.debug("Bootstrap done.")

    def _controller_entry(filename):
        """Return this controller's entry from a juju client YAML file."""
        path = os.path.expanduser("~/.local/share/juju/{}".format(filename))
        with open(path) as f:
            # safe_load: these files only need plain YAML types, so there
            # is no reason to allow arbitrary object construction.
            return yaml.safe_load(f)["controllers"][cluster_uuid]

    # controllers.yaml: controllers.<name>.api-endpoints|ca-cert|uuid
    self.log.debug("Getting controller endpoints")
    controller_info = _controller_entry("controllers.yaml")
    juju_endpoint = controller_info["api-endpoints"][0]
    juju_ca_cert = controller_info["ca-cert"]

    # accounts.yaml: controllers.<name>.user|password
    self.log.debug("Getting accounts")
    account_info = _controller_entry("accounts.yaml")
    juju_user = account_info["user"]
    juju_secret = account_info["password"]

    config = {
        "endpoint": juju_endpoint,
        "username": juju_user,
        "secret": juju_secret,
        "cacert": juju_ca_cert,
        "loadbalancer": loadbalancer,
    }

    # Store the cluster configuration so it
    # can be used for subsequent calls
    self.log.debug("Setting config")
    await self.set_config(cluster_uuid, config)

    # Test connection
    controller = await self.get_controller(cluster_uuid)
    await controller.disconnect()

    return cluster_uuid, True
207
208 """Repo Management"""
209
async def repo_add(
    self, name: str, url: str, _type: str = "charm",
):
    """Add a repository (not supported by this connector).

    :param name: name of the repository
    :param url: location of the repository
    :param _type: kind of repository, "charm" by default
    :raises MethodNotImplemented: always
    """
    raise MethodNotImplemented()
214
async def repo_list(self):
    """List repositories (not supported by this connector).

    :raises MethodNotImplemented: always
    """
    raise MethodNotImplemented()
217
async def repo_remove(
    self, name: str,
):
    """Remove a repository (not supported by this connector).

    :param name: name of the repository
    :raises MethodNotImplemented: always
    """
    raise MethodNotImplemented()
222
async def synchronize_repos(self, cluster_uuid: str, name: str):
    """Synchronize repositories; a no-op for this connector.

    Repositories cannot be added through this connector, so there is
    nothing to synchronize.

    :param cluster_uuid: the cluster (unused)
    :param name: the repository name (unused)
    :return: always None
    """
    return None
228
229 """Reset"""
230
async def reset(
    self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
) -> bool:
    """Reset a cluster

    Removes the cluster entry from the juju record in the database, destroys
    the Juju controller of the cluster and removes its k8s cloud.

    This is a best-effort operation: any error raised by one of the steps is
    logged and swallowed, and the remaining steps are skipped.

    :param cluster_uuid str: The UUID of the cluster to reset
    :param force bool: Unused by this implementation
    :param uninstall_sw bool: Unused by this implementation
    :return: Always True.
    """

    try:
        # Remove k8scluster from database
        self.log.debug("[reset] Removing k8scluster from juju database")
        juju_db = self.db.get_one("admin", {"_id": "juju"})

        # A record without any registered cluster must not abort the reset:
        # the controller and the cloud still have to be torn down.
        k8sclusters = (juju_db or {}).get("k8sclusters", [])

        for k in k8sclusters:
            if k["_id"] == cluster_uuid:
                k8sclusters.remove(k)
                self.db.set_one(
                    table="admin",
                    q_filter={"_id": "juju"},
                    update_dict={"k8sclusters": k8sclusters},
                )
                # Stop right after mutating the list being iterated.
                break

        # Destroy the controller (via CLI)
        self.log.debug("[reset] Destroying controller")
        await self.destroy_controller(cluster_uuid)

        self.log.debug("[reset] Removing k8s cloud")
        k8s_cloud = "k8s-{}".format(cluster_uuid)
        await self.remove_cloud(k8s_cloud)

    except Exception as ex:
        self.log.debug("Caught exception during reset: {}".format(ex))
    return True
283
284 """Deployment"""
285
async def install(
    self,
    cluster_uuid: str,
    kdu_model: str,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    kdu_name: str = None,
    namespace: str = None,
) -> bool:
    """Install a bundle

    A new Juju model is added for the KDU instance and the bundle is
    deployed into it. The bundle can be given in any of the following ways:

    - <juju-repo>/<juju-bundle> (a "cs:" reference)
    - a path to a juju bundle; the process changes into its directory for
      the duration of the deployment
    - a URL starting with "http" (passed to deploy as is; nothing is
      downloaded here)

    :param cluster_uuid str: The UUID of the cluster to install to
    :param kdu_model str: The name or path of a bundle to install
    :param atomic bool: If set, waits until the applications are settled and
        resets the cluster when the timeout expires.
    :param timeout int: The time, in seconds, to wait for each application
        to settle
    :param params dict: Key-value pairs of instantiation parameters
        (currently unused)
    :param db_dict dict: Must contain ["filter"]["_id"], which is used to
        build the model name
    :param kdu_name: Name of the KDU instance to be installed
    :param namespace: K8s namespace to use for the KDU instance (unused)

    :return: The KDU instance (model) name on success, or False when the
        wait timed out and the cluster was reset.
    :raises Exception: "Unable to install" when the model cannot be added
    """
    # Guarantees the submodule is loaded for the TimeoutError lookup below.
    import concurrent.futures

    controller = await self.get_controller(cluster_uuid)
    model = None
    previous_workdir = None
    timed_out = False

    try:
        # Get or create the model, based on the NS uuid.
        if kdu_name:
            kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
        else:
            kdu_instance = db_dict["filter"]["_id"]

        self.log.debug("Checking for model named {}".format(kdu_instance))

        # Create the new model
        self.log.debug("Adding model: {}".format(kdu_instance))
        model = await self.add_model(
            kdu_instance, cluster_uuid=cluster_uuid, controller=controller
        )
        if not model:
            raise Exception("Unable to install")

        # TODO: Instantiation parameters

        try:
            previous_workdir = os.getcwd()
        except FileNotFoundError:
            # The current directory no longer exists; fall back to storage.
            previous_workdir = "/app/storage"

        if kdu_model.startswith(("cs:", "http")):
            # TODO: "http" bundles are not downloaded yet
            bundle = kdu_model
        else:
            # Local bundle: deploy from the directory that contains it.
            new_workdir = os.path.dirname(kdu_model)
            if new_workdir:
                os.chdir(new_workdir)
            bundle = "local:{}".format(kdu_model)

        self.log.debug("[install] deploying {}".format(bundle))
        await model.deploy(bundle)

        if atomic:
            self.log.debug("[install] Applications: {}".format(model.applications))
            for name in model.applications:
                self.log.debug("[install] Waiting for {} to settle".format(name))
                application = model.applications[name]
                try:
                    # It's not enough to wait for all units to be active;
                    # the application status needs to be active as well.
                    self.log.debug("Waiting for all units to be active...")
                    await model.block_until(
                        lambda application=application: all(
                            unit.agent_status == "idle"
                            and application.status in ["active", "unknown"]
                            and unit.workload_status in ["active", "unknown"]
                            for unit in application.units
                        ),
                        timeout=timeout,
                    )
                    self.log.debug("All units active.")
                except (asyncio.TimeoutError, concurrent.futures.TimeoutError):
                    # The two classes only became aliases in Python 3.11,
                    # so both are caught.
                    self.log.debug("[install] Timeout exceeded; resetting cluster")
                    timed_out = True
                    break
    finally:
        # Runs on success, timeout and failure alike, so neither the
        # working directory change nor the connections leak.
        if previous_workdir is not None:
            os.chdir(previous_workdir)
        if model and model.is_connected():
            self.log.debug("[install] Disconnecting model")
            await model.disconnect()
        if controller is not None:
            await controller.disconnect()

    if timed_out:
        await self.reset(cluster_uuid)
        return False

    return kdu_instance
404
async def instances_list(self, cluster_uuid: str) -> list:
    """Return the deployed releases of a cluster.

    Listing is not implemented for this connector, so the result is always
    an empty list.

    :param cluster_uuid: the cluster (unused)
    :return: an empty list
    """
    releases = []
    return releases
413
async def upgrade(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    kdu_model: str = None,
    params: dict = None,
) -> str:
    """Upgrade a model (not implemented)

    The API doesn't have a concept of bundle upgrades, because there are
    many possible changes: charm revision, disk, number of units, etc.

    As such, only a limited subset of upgrades is planned: the charm
    revision would be upgraded, leaving storage and scale untouched.
    Scale changes should happen through OSM constructs, and changes to
    storage would require a redeployment of the service, at least in this
    initial release.

    :param cluster_uuid str: The UUID of the cluster to upgrade
    :param kdu_instance str: The unique name of the KDU instance
    :param kdu_model str: The name or path of the bundle to upgrade to
    :param params dict: Key-value pairs of instantiation parameters

    :raises MethodNotImplemented: always
    """

    # TODO: Loop through the bundle and upgrade each charm individually
    raise MethodNotImplemented()
499
500 """Rollback"""
501
async def rollback(
    self, cluster_uuid: str, kdu_instance: str, revision: int = 0,
) -> str:
    """Rollback a model (not implemented)

    :param cluster_uuid str: The UUID of the cluster to rollback
    :param kdu_instance str: The unique name of the KDU instance
    :param revision int: The revision to revert to. If omitted, the
        previous upgrade would be rolled back.

    :raises MethodNotImplemented: always
    """
    raise MethodNotImplemented()
516
517 """Deletion"""
518
async def uninstall(self, cluster_uuid: str, kdu_instance: str) -> bool:
    """Uninstall a KDU instance

    Destroys the Juju model named after the KDU instance.

    :param cluster_uuid str: The UUID of the cluster
    :param kdu_instance str: The unique name of the KDU instance

    :return: Returns True if successful, or raises an exception
    """

    controller = await self.get_controller(cluster_uuid)

    try:
        self.log.debug("[uninstall] Destroying model")
        await controller.destroy_models(kdu_instance)
        self.log.debug("[uninstall] Model destroyed and disconnecting")
    finally:
        # Release the controller connection even when the destroy fails.
        await controller.disconnect()

    return True
542
async def exec_primitive(
    self,
    cluster_uuid: str = None,
    kdu_instance: str = None,
    primitive_name: str = None,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
) -> str:
    """Exec primitive (Juju action)

    Runs the action on the leader unit of the application named by
    params["application-name"] in the model of the KDU instance.

    :param cluster_uuid str: The UUID of the cluster
    :param kdu_instance str: The unique name of the KDU instance
    :param primitive_name: Name of action that will be executed
    :param timeout: Timeout for action execution (currently unused)
    :param params: Dictionary of all the parameters needed for the action;
        must contain "application-name". The whole dictionary is passed to
        the action as keyword arguments.
    :param db_dict: Dictionary for any additional data (unused)

    :return: Returns the output of the action
    :raises K8sException: when "application-name" is missing, or wrapping
        any error raised while running the action
    """

    # Validate before connecting, so a bad call opens no connection at all.
    if not params or "application-name" not in params:
        raise K8sException(
            "Missing application-name argument, "
            "argument needed for K8s actions"
        )

    controller = await self.get_controller(cluster_uuid)
    model = None

    try:
        self.log.debug(
            "[exec_primitive] Getting model "
            "kdu_instance: {}".format(kdu_instance)
        )

        model = await self.get_model(kdu_instance, controller=controller)

        application_name = params["application-name"]
        application = model.applications[application_name]

        actions = await application.get_actions()
        if primitive_name not in actions:
            raise K8sException("Primitive {} not found".format(primitive_name))

        # Actions are run on the leader unit only.
        unit = None
        for u in application.units:
            if await u.is_leader_from_status():
                unit = u
                break

        if unit is None:
            raise K8sException("No leader unit found to execute action")

        self.log.debug("[exec_primitive] Running action: {}".format(primitive_name))
        action = await unit.run_action(primitive_name, **params)

        output = await model.get_action_output(action_uuid=action.entity_id)
        status = await model.get_action_status(uuid_or_prefix=action.entity_id)

        # An action missing from the status report is treated as failed.
        status = (
            status[action.entity_id] if action.entity_id in status else "failed"
        )

        if status != "completed":
            raise K8sException(
                "status is not completed: {} output: {}".format(status, output)
            )

        return output

    except Exception as e:
        error_msg = "Error executing primitive {}: {}".format(primitive_name, e)
        self.log.error(error_msg)
        raise K8sException(message=error_msg) from e
    finally:
        # Both connections are released on success and on failure.
        if model is not None:
            await model.disconnect()
        await controller.disconnect()
622
623 """Introspection"""
624
async def inspect_kdu(self, kdu_model: str,) -> dict:
    """Inspect a KDU

    Loads the bundle file and returns its "applications" section. A bundle
    looks like this::

        description: Test bundle
        bundle: kubernetes
        applications:
          mariadb-k8s:
            charm: cs:~charmed-osm/mariadb-k8s-20
            scale: 1
            options:
              password: manopw
              root_password: osm4u
              user: mano
            series: kubernetes

    :param kdu_model str: The path of the bundle file to inspect.

    :return: The "applications" mapping of the bundle.
    """

    with open(kdu_model, "r") as bundle_file:
        bundle = yaml.safe_load(bundle_file)

    # TODO: This should be returned in an agreed-upon format
    return bundle["applications"]
663
async def help_kdu(self, kdu_model: str,) -> str:
    """View the README

    Looks for a README next to the bundle, i.e. in the directory that
    contains ``kdu_model``. The names README, README.txt and README.md are
    tried in that order.

    :param kdu_model str: The path of a bundle

    :return: The contents of the first README found, or None.
    """
    # A bare file name has an empty dirname, which means the current dir.
    path = os.path.dirname(kdu_model) or "."

    for name in ("README", "README.txt", "README.md"):
        # Join with the bundle directory: a bare name would be resolved
        # against the process working directory instead.
        candidate = os.path.join(path, name)
        if os.path.isfile(candidate):
            with open(candidate, "r") as f:
                return f.read()

    return None
684
async def status_kdu(self, cluster_uuid: str, kdu_instance: str,) -> dict:
    """Get the status of the KDU

    Get the current status of the applications of the KDU instance.

    :param cluster_uuid str: The UUID of the cluster
    :param kdu_instance str: The unique id of the KDU instance

    :return: Returns a dictionary that maps each application name to
        {"status": <application status>}.
    """
    controller = await self.get_controller(cluster_uuid)
    model = None
    try:
        model = await self.get_model(kdu_instance, controller=controller)
        model_status = await model.get_status()

        # Build a fresh dict rather than rewriting the status object that
        # is being iterated.
        return {
            name: {"status": application["status"]["status"]}
            for name, application in model_status.applications.items()
        }
    finally:
        # Release the connections whether or not the status was obtained.
        if model is not None:
            await model.disconnect()
        await controller.disconnect()
711
def _kubeconfig_file(self, cluster_uuid: str) -> str:
    """Return the path of a kubeconfig file with the cluster credentials.

    The file is /tmp/<cluster_uuid>/config. It is written when it does not
    exist yet, readable by the owner only since it holds decrypted
    credentials.
    """
    credentials = self.get_credentials(cluster_uuid=cluster_uuid)

    config_path = "/tmp/{}".format(cluster_uuid)
    config_file = "{}/config".format(config_path)

    # Check the file rather than the directory, so a directory left without
    # its config file is repaired.
    if not os.path.exists(config_file):
        os.makedirs(config_path, mode=0o700, exist_ok=True)
        fd = os.open(config_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w") as f:
            f.write(credentials)

    return config_file


async def get_services(
    self, cluster_uuid: str, kdu_instance: str, namespace: str
) -> list:
    """Return a list of services of a kdu_instance

    The services are looked up in the K8s namespace named after the KDU
    instance; the ``namespace`` argument is not used.

    :param cluster_uuid: The UUID of the cluster
    :param kdu_instance: The unique name of the KDU instance
    :param namespace: unused
    :return: whatever ``Kubectl.get_services`` returns for that namespace
    """

    kubectl = Kubectl(config_file=self._kubeconfig_file(cluster_uuid))
    return kubectl.get_services(
        field_selector="metadata.namespace={}".format(kdu_instance)
    )


async def get_service(
    self, cluster_uuid: str, service_name: str, namespace: str
) -> object:
    """Return data for a specific service inside a namespace

    :param cluster_uuid: The UUID of the cluster
    :param service_name: The name of the service
    :param namespace: The K8s namespace of the service
    :return: the first entry returned by ``Kubectl.get_services`` for that
        name and namespace
    :raises IndexError: presumably when no service matches — assumes
        ``Kubectl.get_services`` returns a list; verify
    """

    kubectl = Kubectl(config_file=self._kubeconfig_file(cluster_uuid))

    return kubectl.get_services(
        field_selector="metadata.name={},metadata.namespace={}".format(
            service_name, namespace
        )
    )[0]
754
755 # Private methods
async def add_k8s(self, cloud_name: str, credentials: str,) -> bool:
    """Add a k8s cloud to Juju

    Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
    Juju Controller. Runs ``juju add-k8s --local <cloud_name>`` and feeds
    the credentials to it through stdin.

    :param cloud_name str: The name of the cloud to add.
    :param credentials str: The kubeconfig contents, i.e. the output of
        `kubectl config view --raw`.

    :returns: True if successful, otherwise raises an exception.
    :raises Exception: carrying the stderr of the command when it exits
        with a positive return code
    """

    cmd = [self.juju_command, "add-k8s", "--local", cloud_name]
    self.log.debug(cmd)

    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        stdin=asyncio.subprocess.PIPE,
    )

    # communicate() writes the credentials, closes stdin and copes with the
    # process exiting before it has read all of its input.
    _stdout, stderr = await process.communicate(input=credentials.encode("utf-8"))

    return_code = process.returncode

    self.log.debug("add-k8s return code: {}".format(return_code))

    if return_code > 0:
        raise Exception(stderr)

    return True
794
async def add_model(
    self, model_name: str, cluster_uuid: str, controller: Controller
) -> Model:
    """Adds a model to the controller

    When a public key is set on the connector, it is passed to the new
    model as its "authorized-keys" configuration.

    :param model_name str: The name of the model to add.
    :param cluster_uuid str: ID of the cluster (only used for logging).
    :param controller: Controller object in which the model will be added
    :returns: The juju.model.Model object of the new model upon success, or
        None when adding it failed (the error is logged, not raised).
    """

    self.log.debug(
        "Adding model '{}' to cluster_uuid '{}'".format(model_name, cluster_uuid)
    )

    # Only send a config when there is a key to put into it.
    extra_args = {}
    if self.juju_public_key is not None:
        extra_args["config"] = {"authorized-keys": self.juju_public_key}

    try:
        return await controller.add_model(model_name, **extra_args)
    except Exception as ex:
        self.log.debug(ex)
        self.log.debug("Caught exception: {}".format(ex))
        return None
826
async def bootstrap(
    self, cloud_name: str, cluster_uuid: str, loadbalancer: bool
) -> bool:
    """Bootstrap a Kubernetes controller

    Runs ``juju bootstrap`` to create a Juju controller, named after the
    cluster UUID, inside the Kubernetes cluster.

    :param cloud_name str: The name of the cloud.
    :param cluster_uuid str: The UUID of the cluster to bootstrap.
    :param loadbalancer bool: If the controller should use loadbalancer or not.
    :returns: True upon success or raises an exception.
    :raises Exception: carrying the stderr of the command when it fails for
        a reason other than "already exists"
    """

    cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid]
    if loadbalancer:
        # For public clusters, specify that the controller service is using
        # a LoadBalancer.
        cmd += ["--config", "controller-service-type=loadbalancer"]

    self.log.debug(
        "Bootstrapping controller {} in cloud {}".format(cluster_uuid, cloud_name)
    )

    process = await asyncio.create_subprocess_exec(
        *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
    )
    _stdout, stderr = await process.communicate()

    # An "already exists" failure is not treated as an error.
    failed = process.returncode > 0
    if failed and b"already exists" not in stderr:
        raise Exception(stderr)

    return True
874
async def destroy_controller(self, cluster_uuid: str) -> bool:
    """Destroy a Kubernetes controller

    Runs ``juju destroy-controller`` for the controller named after the
    cluster UUID, destroying all of its models and storage.

    :param cluster_uuid str: The UUID of the cluster whose controller is
        destroyed.
    :returns: True upon success or raises an exception.
    :raises Exception: carrying the stderr of the command when it fails
    """
    cmd = [
        self.juju_command,
        "destroy-controller",
        "--destroy-all-models",
        "--destroy-storage",
        "-y",
        cluster_uuid,
    ]

    process = await asyncio.create_subprocess_exec(
        *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
    )

    _stdout, stderr = await process.communicate()

    return_code = process.returncode

    if return_code > 0:
        # stderr is bytes, so the marker must be bytes as well.
        # NOTE(review): the "already exists" exemption mirrors bootstrap();
        # confirm it is the message that should be tolerated here.
        if b"already exists" not in stderr:
            raise Exception(stderr)

    return True
903 raise Exception(stderr)
904
def get_credentials(self, cluster_uuid: str) -> str:
    """Get Cluster Kubeconfig

    Reads the cluster record from the "k8sclusters" table, decrypts the
    "password" and "secret" fields of its credentials and dumps the
    credentials as YAML.

    :param cluster_uuid str: The UUID of the cluster.
    :return: The credentials of the cluster as a YAML document.
    """
    cluster_record = self.db.get_one(
        "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
    )

    credentials = cluster_record.get("credentials")
    self.db.encrypt_decrypt_fields(
        credentials,
        "decrypt",
        ["password", "secret"],
        schema_version=cluster_record["schema_version"],
        salt=cluster_record["_id"],
    )

    # Read the record again after decrypting, exactly as the result is
    # produced from what the record holds at that point.
    return yaml.safe_dump(cluster_record.get("credentials"))
922
def get_config(self, cluster_uuid: str,) -> dict:
    """Get the cluster configuration

    Looks the cluster up in the "k8sclusters" list of the juju record of
    the "admin" table and decrypts the "secret" and "cacert" fields of its
    configuration.

    :param cluster_uuid str: The UUID of the cluster.
    :return: A dict upon success, or raises an exception.
    :raises Exception: when there is no (or an empty) configuration for the
        cluster
    """

    juju_db = self.db.get_one("admin", {"_id": "juju"})

    # First registered cluster with a matching id, if any.
    entry = next(
        (k for k in juju_db["k8sclusters"] if k["_id"] == cluster_uuid), None
    )

    config = None
    if entry is not None:
        config = entry["config"]
        self.db.encrypt_decrypt_fields(
            config,
            "decrypt",
            ["secret", "cacert"],
            schema_version="1.1",
            salt=entry["_id"],
        )

    if not config:
        raise Exception(
            "Unable to locate configuration for cluster {}".format(cluster_uuid)
        )
    return config
950
async def get_model(self, model_name: str, controller: Controller) -> Model:
    """Get a model from the Juju Controller.

    Note: the caller must disconnect the returned model before it goes
    out of scope.

    :param model_name str: The name of the model to get
    :param controller Controller: Controller object
    :return: The juju.model.Model object.
    :raises N2VCNotFound: when the controller has no model with that name
    """

    known_models = await controller.list_models()
    if model_name in known_models:
        self.log.debug("Found model: {}".format(model_name))
        return await controller.get_model(model_name)

    raise N2VCNotFound("Model {} not found".format(model_name))
967
def get_namespace(self, cluster_uuid: str,) -> str:
    """Get the namespace UUID

    Reads the "namespace" entry of the stored cluster configuration.

    :param cluster_uuid str: The UUID of the cluster
    :returns: The namespace UUID, or raises an exception
    :raises Exception: when the configuration has no "namespace" entry
    """
    cluster_config = self.get_config(cluster_uuid)

    # TODO: We want to make sure this is unique to the cluster, in case
    # the cluster is being reused.
    # Consider pre/appending the cluster id to the namespace string
    if "namespace" in cluster_config:
        return cluster_config["namespace"]

    raise Exception("Namespace not found.")
1000
def is_local_k8s(self, credentials: str,) -> bool:
    """Check if a cluster is local

    A cluster counts as local when the value of the OSMLCM_VCA_APIPROXY
    environment variable appears in the "server" address of one of the
    clusters listed in the credentials.

    :param credentials str: The k8s credentials (kubeconfig) as YAML text
    :returns: A boolean if the cluster is running locally
    """

    creds = yaml.safe_load(credentials)
    api_proxy = os.getenv("OSMLCM_VCA_APIPROXY")

    # Without credentials or without the variable nothing can match.
    if not (creds and api_proxy):
        return False

    return any(
        "server" in entry["cluster"] and api_proxy in entry["cluster"]["server"]
        for entry in creds["clusters"]
    )
1019
async def get_controller(self, cluster_uuid):
    """Login to the Juju controller.

    Connects with the endpoint, user, secret and CA certificate stored in
    the configuration of the cluster.

    :param cluster_uuid: The UUID of the cluster
    :return: The connected controller. None is returned when no secret is
        configured or when connecting fails; both cases are only logged.
    """

    config = self.get_config(cluster_uuid)

    endpoint = config["endpoint"]
    username = config["username"]
    secret = config["secret"]
    ca_cert = config["cacert"]

    controller = Controller()

    if not secret:
        self.log.fatal("VCA credentials not configured.")
        return None

    self.log.debug(
        "Connecting to controller... ws://{} as {}".format(endpoint, username)
    )
    try:
        await controller.connect(
            endpoint=endpoint,
            username=username,
            password=secret,
            cacert=ca_cert,
        )
    except Exception as ex:
        self.log.debug(ex)
        self.log.debug("Caught exception: {}".format(ex))
        return None

    self.log.debug("JujuApi: Logged into controller")
    return controller
1078
async def remove_cloud(self, cloud_name: str,) -> bool:
    """Remove a k8s cloud from Juju

    Runs ``juju remove-k8s --client`` and then ``juju remove-cloud
    --client`` for the cloud. The second command is not run when the first
    one fails.

    :param cloud_name str: The name of the cloud to remove.

    :returns: True if successful, otherwise raises an exception.
    :raises Exception: carrying the stderr of the command that failed
    """

    # First the k8s cloud definition, then the cloud in the local config.
    for subcommand in ("remove-k8s", "remove-cloud"):
        process = await asyncio.create_subprocess_exec(
            self.juju_command,
            subcommand,
            "--client",
            cloud_name,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        _stdout, stderr = await process.communicate()

        if process.returncode > 0:
            raise Exception(stderr)

    return True
1116
async def set_config(self, cluster_uuid: str, config: dict,) -> bool:
    """Save the cluster configuration

    Saves the cluster information in the "k8sclusters" list of the juju
    record of the "admin" table. The "secret" and "cacert" fields are
    encrypted in place, so the ``config`` passed in is modified.

    An entry already stored for the same cluster is replaced, so that the
    list never holds two configurations for one cluster.

    :param cluster_uuid str: The UUID of the cluster
    :param config dict: A dictionary containing the cluster configuration
    :return: True once the configuration has been stored
    """

    juju_db = self.db.get_one("admin", {"_id": "juju"})

    k8sclusters = juju_db["k8sclusters"] if "k8sclusters" in juju_db else []
    self.db.encrypt_decrypt_fields(
        config,
        "encrypt",
        ["secret", "cacert"],
        schema_version="1.1",
        salt=cluster_uuid,
    )

    # Lookups return the first match, so a stale entry for this cluster
    # would hide the new one if it were kept.
    k8sclusters = [k for k in k8sclusters if k["_id"] != cluster_uuid]
    k8sclusters.append({"_id": cluster_uuid, "config": config})

    self.db.set_one(
        table="admin",
        q_filter={"_id": "juju"},
        update_dict={"k8sclusters": k8sclusters},
    )
    return True