Add credential_name in Libjuju.add_model() function
[osm/N2VC.git] / n2vc / k8s_juju_conn.py
1 # Copyright 2019 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import concurrent
17 import os
18 import uuid
19 import yaml
20
21 from juju.controller import Controller
22 from juju.model import Model
23 from n2vc.exceptions import K8sException
24 from n2vc.k8s_conn import K8sConnector
25 from n2vc.kubectl import Kubectl
26 from .exceptions import MethodNotImplemented, N2VCNotFound
27
28
29 # from juju.bundle import BundleHandler
30 # import re
31 # import ssl
32 # from .vnf import N2VC
33 class K8sJujuConnector(K8sConnector):
34 def __init__(
35 self,
36 fs: object,
37 db: object,
38 kubectl_command: str = "/usr/bin/kubectl",
39 juju_command: str = "/usr/bin/juju",
40 log: object = None,
41 loop: object = None,
42 on_update_db=None,
43 vca_config: dict = None,
44 ):
45         """
46         :param fs: file system for kubernetes and helm configuration
47         :param db: database object to write current operation status
48         :param kubectl_command: path to kubectl executable
49         :param juju_command: path to juju executable
50         :param log: logger
51         """
52
53 # parent class
54 K8sConnector.__init__(
55 self, db, log=log, on_update_db=on_update_db,
56 )
57
58 self.fs = fs
59 self.log.debug("Initializing K8S Juju connector")
60
61 self.juju_command = juju_command
62 self.juju_public_key = None
63
64 self.log.debug("K8S Juju connector initialized")
65 # TODO: Remove these commented lines:
66 # self.authenticated = False
67 # self.models = {}
68 # self.juju_secret = ""
69
70 """Initialization"""
71
72 async def init_env(
73 self,
74 k8s_creds: str,
75 namespace: str = "kube-system",
76 reuse_cluster_uuid: str = None,
77 ) -> (str, bool):
78 """
79 It prepares a given K8s cluster environment to run Juju bundles.
80
81 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
82 '.kube/config'
83 :param namespace: optional namespace to be used for juju. By default,
84 'kube-system' will be used
85 :param reuse_cluster_uuid: existing cluster uuid for reuse
86 :return: uuid of the K8s cluster and True if connector has installed some
87 software in the cluster
88 (on error, an exception will be raised)
89 """
90
91 """Bootstrapping
92
93 Bootstrapping cannot be done, by design, through the API. We need to
94 use the CLI tools.
95 """
96
97 """
98 WIP: Workflow
99
100 1. Has the environment already been bootstrapped?
101 - Check the database to see if we have a record for this env
102
103 2. If this is a new env, create it
104 - Add the k8s cloud to Juju
105 - Bootstrap
106 - Record it in the database
107
108 3. Connect to the Juju controller for this cloud
109
110 """
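        # Rough CLI equivalent of what the helper methods below automate
        # (a sketch, not the exact invocation; <cluster_uuid> is computed below):
        #
        #   cat kubeconfig | juju add-k8s --local k8s-<cluster_uuid>
        #   juju bootstrap k8s-<cluster_uuid> <cluster_uuid> \
        #       --config controller-service-type=loadbalancer   # external clusters only
        #
        # The resulting endpoint and credentials are then read back from
        # ~/.local/share/juju/controllers.yaml and accounts.yaml and stored in
        # the OSM database via set_config().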
111 # cluster_uuid = reuse_cluster_uuid
112 # if not cluster_uuid:
113 # cluster_uuid = str(uuid4())
114
115 ##################################################
116 # TODO: Pull info from db based on the namespace #
117 ##################################################
118
119 ###################################################
120 # TODO: Make it idempotent, calling add-k8s and #
121 # bootstrap whenever reuse_cluster_uuid is passed #
122 # as parameter #
123 # `init_env` is called to initialize the K8s #
124 # cluster for juju. If this initialization fails, #
125 # it can be called again by LCM with the param #
126 # reuse_cluster_uuid, e.g. to try to fix it. #
127 ###################################################
128
129 # This is a new cluster, so bootstrap it
130
131 cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4())
132
133         # Is this a local k8s cluster?
134         localk8s = self.is_local_k8s(k8s_creds)
135 
136         # If the k8s cluster is external, the juju controller needs a loadbalancer
137         loadbalancer = not localk8s
138
139 # Name the new k8s cloud
140 k8s_cloud = "k8s-{}".format(cluster_uuid)
141
142 self.log.debug("Adding k8s cloud {}".format(k8s_cloud))
143 await self.add_k8s(k8s_cloud, k8s_creds)
144
145 # Bootstrap Juju controller
146 self.log.debug("Bootstrapping...")
147 await self.bootstrap(k8s_cloud, cluster_uuid, loadbalancer)
148 self.log.debug("Bootstrap done.")
149
150 # Get the controller information
151
152 # Parse ~/.local/share/juju/controllers.yaml
153 # controllers.testing.api-endpoints|ca-cert|uuid
154 self.log.debug("Getting controller endpoints")
155 with open(os.path.expanduser("~/.local/share/juju/controllers.yaml")) as f:
156 controllers = yaml.load(f, Loader=yaml.Loader)
157 controller = controllers["controllers"][cluster_uuid]
158 endpoints = controller["api-endpoints"]
159 juju_endpoint = endpoints[0]
160 juju_ca_cert = controller["ca-cert"]
161
162 # Parse ~/.local/share/juju/accounts
163 # controllers.testing.user|password
164 self.log.debug("Getting accounts")
165 with open(os.path.expanduser("~/.local/share/juju/accounts.yaml")) as f:
166 controllers = yaml.load(f, Loader=yaml.Loader)
167 controller = controllers["controllers"][cluster_uuid]
168
169 juju_user = controller["user"]
170 juju_secret = controller["password"]
171
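        # For reference, the two files parsed above look roughly like the sketch
        # below (only the keys used by this method are shown; values are
        # illustrative):
        #
        #   controllers.yaml:
        #     controllers:
        #       <cluster_uuid>:
        #         api-endpoints: ["10.0.0.1:17070"]
        #         ca-cert: |
        #           -----BEGIN CERTIFICATE-----
        #
        #   accounts.yaml:
        #     controllers:
        #       <cluster_uuid>:
        #         user: admin
        #         password: <secret>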
172 config = {
173 "endpoint": juju_endpoint,
174 "username": juju_user,
175 "secret": juju_secret,
176 "cacert": juju_ca_cert,
177 "loadbalancer": loadbalancer,
178 }
179
180 # Store the cluster configuration so it
181 # can be used for subsequent calls
182 self.log.debug("Setting config")
183 await self.set_config(cluster_uuid, config)
184
185 # Test connection
186 controller = await self.get_controller(cluster_uuid)
187 await controller.disconnect()
188
189 # TODO: Remove these commented lines
190 # raise Exception("EOL")
191 # self.juju_public_key = None
192 # Login to the k8s cluster
193 # if not self.authenticated:
194 # await self.login(cluster_uuid)
195
196 # We're creating a new cluster
197 # print("Getting model {}".format(self.get_namespace(cluster_uuid),
198 # cluster_uuid=cluster_uuid))
199 # model = await self.get_model(
200 # self.get_namespace(cluster_uuid),
201 # cluster_uuid=cluster_uuid
202 # )
203
204 # Disconnect from the model
205 # if model and model.is_connected():
206 # await model.disconnect()
207
208 return cluster_uuid, True
209
210 """Repo Management"""
211
212 async def repo_add(
213 self, name: str, url: str, _type: str = "charm",
214 ):
215 raise MethodNotImplemented()
216
217 async def repo_list(self):
218 raise MethodNotImplemented()
219
220 async def repo_remove(
221 self, name: str,
222 ):
223 raise MethodNotImplemented()
224
225 async def synchronize_repos(self, cluster_uuid: str, name: str):
226 """
227 Returns None as currently add_repo is not implemented
228 """
229 return None
230
231 """Reset"""
232
233 async def reset(
234 self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
235 ) -> bool:
236 """Reset a cluster
237
238         Resets the cluster by destroying its Juju controller and removing the cloud.
239
240 :param cluster_uuid str: The UUID of the cluster to reset
241 :return: Returns True if successful or raises an exception.
242 """
243
244 try:
245
246 # Remove k8scluster from database
247 self.log.debug("[reset] Removing k8scluster from juju database")
248 juju_db = self.db.get_one("admin", {"_id": "juju"})
249
250 for k in juju_db["k8sclusters"]:
251 if k["_id"] == cluster_uuid:
252 juju_db["k8sclusters"].remove(k)
253 self.db.set_one(
254 table="admin",
255 q_filter={"_id": "juju"},
256 update_dict={"k8sclusters": juju_db["k8sclusters"]},
257 )
258 break
259
260 # Destroy the controller (via CLI)
261 self.log.debug("[reset] Destroying controller")
262 await self.destroy_controller(cluster_uuid)
263
264 self.log.debug("[reset] Removing k8s cloud")
265 k8s_cloud = "k8s-{}".format(cluster_uuid)
266 await self.remove_cloud(k8s_cloud)
267
268 except Exception as ex:
269             self.log.warning("Caught exception during reset: {}".format(ex))
270 return True
271 # TODO: Remove these commented lines
272 # if not self.authenticated:
273 # await self.login(cluster_uuid)
274
275 # if self.controller.is_connected():
276 # # Destroy the model
277 # namespace = self.get_namespace(cluster_uuid)
278 # if await self.has_model(namespace):
279 # self.log.debug("[reset] Destroying model")
280 # await self.controller.destroy_model(namespace, destroy_storage=True)
281
282 # # Disconnect from the controller
283 # self.log.debug("[reset] Disconnecting controller")
284 # await self.logout()
285
286 """Deployment"""
287
288 async def install(
289 self,
290 cluster_uuid: str,
291 kdu_model: str,
292 atomic: bool = True,
293 timeout: float = 300,
294 params: dict = None,
295 db_dict: dict = None,
296 kdu_name: str = None,
297 namespace: str = None,
298 ) -> bool:
299 """Install a bundle
300
301 :param cluster_uuid str: The UUID of the cluster to install to
302 :param kdu_model str: The name or path of a bundle to install
303 :param atomic bool: If set, waits until the model is active and resets
304 the cluster on failure.
305 :param timeout int: The time, in seconds, to wait for the install
306 to finish
307 :param params dict: Key-value pairs of instantiation parameters
308 :param kdu_name: Name of the KDU instance to be installed
309 :param namespace: K8s namespace to use for the KDU instance
310
311         :return: If successful, returns the name of the created KDU instance
312 """
313
314 controller = await self.get_controller(cluster_uuid)
315
316 ##
317 # Get or create the model, based on the NS
318 # uuid.
319 if kdu_name:
320 kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
321 else:
322 kdu_instance = db_dict["filter"]["_id"]
323
324 self.log.debug("Checking for model named {}".format(kdu_instance))
325
326 # Create the new model
327 self.log.debug("Adding model: {}".format(kdu_instance))
328 model = await self.add_model(
329 kdu_instance, cluster_uuid=cluster_uuid, controller=controller
330 )
331
332 if model:
333 # TODO: Instantiation parameters
334
335 """
336 "Juju bundle that models the KDU, in any of the following ways:
337 - <juju-repo>/<juju-bundle>
338 - <juju-bundle folder under k8s_models folder in the package>
339 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
340 in the package>
341 - <URL_where_to_fetch_juju_bundle>
342 """
343 try:
344 previous_workdir = os.getcwd()
345 except FileNotFoundError:
346 previous_workdir = "/app/storage"
347
348 bundle = kdu_model
349 if kdu_model.startswith("cs:"):
350 bundle = kdu_model
351 elif kdu_model.startswith("http"):
352 # Download the file
353 pass
354 else:
355                 # Work from the bundle's parent directory so relative paths resolve
356                 new_workdir = os.path.dirname(kdu_model) or "."
357                 os.chdir(new_workdir)
358
359 bundle = "local:{}".format(kdu_model)
360
361 if not bundle:
362                 # The bundle could not be determined from kdu_model
363                 raise K8sException("Unable to locate bundle {}".format(kdu_model))
364
365 self.log.debug("[install] deploying {}".format(bundle))
366 await model.deploy(bundle)
367
368 # Get the application
369 if atomic:
370 # applications = model.applications
371 self.log.debug("[install] Applications: {}".format(model.applications))
372 for name in model.applications:
373 self.log.debug("[install] Waiting for {} to settle".format(name))
374 application = model.applications[name]
375 try:
376 # It's not enough to wait for all units to be active;
377 # the application status needs to be active as well.
378 self.log.debug("Waiting for all units to be active...")
379 await model.block_until(
380 lambda: all(
381 unit.agent_status == "idle"
382 and application.status in ["active", "unknown"]
383 and unit.workload_status in ["active", "unknown"]
384 for unit in application.units
385 ),
386 timeout=timeout,
387 )
388 self.log.debug("All units active.")
389
390 # TODO use asyncio.TimeoutError
391 except concurrent.futures._base.TimeoutError:
392 os.chdir(previous_workdir)
393 self.log.debug("[install] Timeout exceeded; resetting cluster")
394 await self.reset(cluster_uuid)
395 return False
396
397 # Wait for the application to be active
398 if model.is_connected():
399 self.log.debug("[install] Disconnecting model")
400 await model.disconnect()
401 await controller.disconnect()
402 os.chdir(previous_workdir)
403
404 return kdu_instance
405 raise Exception("Unable to install")
406
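    # Minimal usage sketch for install(); "connector", the bundle path and the
    # db_dict contents are placeholders, not values defined in this module:
    #
    #   kdu_instance = await connector.install(
    #       cluster_uuid=cluster_uuid,
    #       kdu_model="./my_bundle/bundle.yaml",
    #       kdu_name="my-kdu",
    #       db_dict={"filter": {"_id": ns_id}},
    #   )
    #   # => creates and deploys into the model "my-kdu-<ns_id>"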
407 async def instances_list(self, cluster_uuid: str) -> list:
408 """
409 returns a list of deployed releases in a cluster
410
411 :param cluster_uuid: the cluster
412 :return:
413 """
414 return []
415
416 async def upgrade(
417 self,
418 cluster_uuid: str,
419 kdu_instance: str,
420 kdu_model: str = None,
421 params: dict = None,
422 ) -> str:
423 """Upgrade a model
424
425 :param cluster_uuid str: The UUID of the cluster to upgrade
426 :param kdu_instance str: The unique name of the KDU instance
427 :param kdu_model str: The name or path of the bundle to upgrade to
428 :param params dict: Key-value pairs of instantiation parameters
429
430 :return: If successful, reference to the new revision number of the
431 KDU instance.
432 """
433
434 # TODO: Loop through the bundle and upgrade each charm individually
435
436 """
437 The API doesn't have a concept of bundle upgrades, because there are
438 many possible changes: charm revision, disk, number of units, etc.
439
440 As such, we are only supporting a limited subset of upgrades. We'll
441 upgrade the charm revision but leave storage and scale untouched.
442
443 Scale changes should happen through OSM constructs, and changes to
444 storage would require a redeployment of the service, at least in this
445 initial release.
446 """
447 raise MethodNotImplemented()
448 # TODO: Remove these commented lines
449
450 # model = await self.get_model(namespace, cluster_uuid=cluster_uuid)
451
452 # model = None
453 # namespace = self.get_namespace(cluster_uuid)
454 # controller = await self.get_controller(cluster_uuid)
455
456 # try:
457 # if namespace not in await controller.list_models():
458 # raise N2VCNotFound(message="Model {} does not exist".format(namespace))
459
460 # model = await controller.get_model(namespace)
461 # with open(kdu_model, "r") as f:
462 # bundle = yaml.safe_load(f)
463
464 # """
465 # {
466 # 'description': 'Test bundle',
467 # 'bundle': 'kubernetes',
468 # 'applications': {
469 # 'mariadb-k8s': {
470 # 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
471 # 'scale': 1,
472 # 'options': {
473 # 'password': 'manopw',
474 # 'root_password': 'osm4u',
475 # 'user': 'mano'
476 # },
477 # 'series': 'kubernetes'
478 # }
479 # }
480 # }
481 # """
482 # # TODO: This should be returned in an agreed-upon format
483 # for name in bundle["applications"]:
484 # self.log.debug(model.applications)
485 # application = model.applications[name]
486 # self.log.debug(application)
487
488 # path = bundle["applications"][name]["charm"]
489
490 # try:
491 # await application.upgrade_charm(switch=path)
492 # except juju.errors.JujuError as ex:
493 # if "already running charm" in str(ex):
494 # # We're already running this version
495 # pass
496 # finally:
497 # if model:
498 # await model.disconnect()
499 # await controller.disconnect()
500 # return True
501
502 """Rollback"""
503
504 async def rollback(
505 self, cluster_uuid: str, kdu_instance: str, revision: int = 0,
506 ) -> str:
507 """Rollback a model
508
509 :param cluster_uuid str: The UUID of the cluster to rollback
510 :param kdu_instance str: The unique name of the KDU instance
511 :param revision int: The revision to revert to. If omitted, rolls back
512 the previous upgrade.
513
514 :return: If successful, returns the revision of active KDU instance,
515 or raises an exception
516 """
517 raise MethodNotImplemented()
518
519 """Deletion"""
520
521 async def uninstall(self, cluster_uuid: str, kdu_instance: str) -> bool:
522 """Uninstall a KDU instance
523
524 :param cluster_uuid str: The UUID of the cluster
525 :param kdu_instance str: The unique name of the KDU instance
526
527 :return: Returns True if successful, or raises an exception
528 """
529
530 controller = await self.get_controller(cluster_uuid)
531
532 self.log.debug("[uninstall] Destroying model")
533
534 await controller.destroy_models(kdu_instance)
535
536 self.log.debug("[uninstall] Model destroyed and disconnecting")
537 await controller.disconnect()
538
539 return True
540 # TODO: Remove these commented lines
541 # if not self.authenticated:
542 # self.log.debug("[uninstall] Connecting to controller")
543 # await self.login(cluster_uuid)
544
545 async def exec_primitive(
546 self,
547 cluster_uuid: str = None,
548 kdu_instance: str = None,
549 primitive_name: str = None,
550 timeout: float = 300,
551 params: dict = None,
552 db_dict: dict = None,
553 ) -> str:
554 """Exec primitive (Juju action)
555
556 :param cluster_uuid str: The UUID of the cluster
557 :param kdu_instance str: The unique name of the KDU instance
558 :param primitive_name: Name of action that will be executed
559 :param timeout: Timeout for action execution
560 :param params: Dictionary of all the parameters needed for the action
561         :param db_dict: Dictionary for any additional data
562
563 :return: Returns the output of the action
564 """
565
566 controller = await self.get_controller(cluster_uuid)
567
568 if not params or "application-name" not in params:
569 raise K8sException(
570                 "Missing application-name argument, "
571                 "argument needed for K8s actions"
572 )
573 try:
574 self.log.debug(
575 "[exec_primitive] Getting model "
576 "kdu_instance: {}".format(kdu_instance)
577 )
578
579 model = await self.get_model(kdu_instance, controller=controller)
580
581 application_name = params["application-name"]
582 application = model.applications[application_name]
583
584 actions = await application.get_actions()
585 if primitive_name not in actions:
586 raise K8sException("Primitive {} not found".format(primitive_name))
587
588 unit = None
589 for u in application.units:
590 if await u.is_leader_from_status():
591 unit = u
592 break
593
594 if unit is None:
595 raise K8sException("No leader unit found to execute action")
596
597 self.log.debug("[exec_primitive] Running action: {}".format(primitive_name))
598 action = await unit.run_action(primitive_name, **params)
599
600 output = await model.get_action_output(action_uuid=action.entity_id)
601 status = await model.get_action_status(uuid_or_prefix=action.entity_id)
602
603 status = (
604 status[action.entity_id] if action.entity_id in status else "failed"
605 )
606
607 if status != "completed":
608 raise K8sException(
609 "status is not completed: {} output: {}".format(status, output)
610 )
611
612 return output
613
614 except Exception as e:
615 error_msg = "Error executing primitive {}: {}".format(primitive_name, e)
616 self.log.error(error_msg)
617 raise K8sException(message=error_msg)
618 finally:
619 await controller.disconnect()
620 # TODO: Remove these commented lines:
621 # if not self.authenticated:
622 # self.log.debug("[exec_primitive] Connecting to controller")
623 # await self.login(cluster_uuid)
624
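    # Example call for exec_primitive(); the action name and extra parameters are
    # hypothetical, but "application-name" is required, as checked above:
    #
    #   output = await connector.exec_primitive(
    #       cluster_uuid=cluster_uuid,
    #       kdu_instance=kdu_instance,
    #       primitive_name="backup",
    #       params={"application-name": "mariadb-k8s", "target": "/backup"},
    #   )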
625 """Introspection"""
626
627 async def inspect_kdu(self, kdu_model: str,) -> dict:
628 """Inspect a KDU
629
630 Inspects a bundle and returns a dictionary of config parameters and
631 their default values.
632
633 :param kdu_model str: The name or path of the bundle to inspect.
634
635 :return: If successful, returns a dictionary of available parameters
636 and their default values.
637 """
638
639 kdu = {}
640 with open(kdu_model, "r") as f:
641 bundle = yaml.safe_load(f)
642
643 """
644 {
645 'description': 'Test bundle',
646 'bundle': 'kubernetes',
647 'applications': {
648 'mariadb-k8s': {
649 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
650 'scale': 1,
651 'options': {
652 'password': 'manopw',
653 'root_password': 'osm4u',
654 'user': 'mano'
655 },
656 'series': 'kubernetes'
657 }
658 }
659 }
660 """
661 # TODO: This should be returned in an agreed-upon format
662 kdu = bundle["applications"]
663
664 return kdu
665
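    # For the example bundle shown above, inspect_kdu() returns its
    # "applications" section, i.e. roughly:
    #
    #   {
    #       "mariadb-k8s": {
    #           "charm": "cs:~charmed-osm/mariadb-k8s-20",
    #           "scale": 1,
    #           "options": {"password": "manopw", "root_password": "osm4u", "user": "mano"},
    #           "series": "kubernetes",
    #       }
    #   }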
666 async def help_kdu(self, kdu_model: str,) -> str:
667 """View the README
668
669 If available, returns the README of the bundle.
670
671 :param kdu_model str: The name or path of a bundle
672
673 :return: If found, returns the contents of the README.
674 """
675 readme = None
676
677 files = ["README", "README.txt", "README.md"]
678 path = os.path.dirname(kdu_model)
679 for file in os.listdir(path):
680 if file in files:
681                 with open(os.path.join(path, file), "r") as f:
682 readme = f.read()
683 break
684
685 return readme
686
687 async def status_kdu(self, cluster_uuid: str, kdu_instance: str,) -> dict:
688 """Get the status of the KDU
689
690 Get the current status of the KDU instance.
691
692 :param cluster_uuid str: The UUID of the cluster
693 :param kdu_instance str: The unique id of the KDU instance
694
695         :return: Returns a dictionary mapping each application name in the
696             model to its current status.
697 """
698 status = {}
699 controller = await self.get_controller(cluster_uuid)
700 model = await self.get_model(kdu_instance, controller=controller)
701
702 model_status = await model.get_status()
703         # Summarize the status of each application in the model
704 
705 for name in model_status.applications:
706 application = model_status.applications[name]
707 status[name] = {"status": application["status"]["status"]}
708
709 await model.disconnect()
710 await controller.disconnect()
711
712 return status
713
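    # status_kdu() returns a mapping of application name to its aggregated
    # status, e.g. (illustrative values):
    #
    #   {"mariadb-k8s": {"status": "active"}}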
714 async def get_services(
715 self, cluster_uuid: str, kdu_instance: str, namespace: str
716 ) -> list:
717 """Return a list of services of a kdu_instance"""
718
719 credentials = self.get_credentials(cluster_uuid=cluster_uuid)
720
721 config_path = "/tmp/{}".format(cluster_uuid)
722 config_file = "{}/config".format(config_path)
723
724 if not os.path.exists(config_path):
725 os.makedirs(config_path)
726 with open(config_file, "w") as f:
727 f.write(credentials)
728
729 kubectl = Kubectl(config_file=config_file)
730 return kubectl.get_services(
731 field_selector="metadata.namespace={}".format(kdu_instance)
732 )
733
734 async def get_service(
735 self, cluster_uuid: str, service_name: str, namespace: str
736 ) -> object:
737 """Return data for a specific service inside a namespace"""
738
739 credentials = self.get_credentials(cluster_uuid=cluster_uuid)
740
741 config_path = "/tmp/{}".format(cluster_uuid)
742 config_file = "{}/config".format(config_path)
743
744 if not os.path.exists(config_path):
745 os.makedirs(config_path)
746 with open(config_file, "w") as f:
747 f.write(credentials)
748
749 kubectl = Kubectl(config_file=config_file)
750
751 return kubectl.get_services(
752 field_selector="metadata.name={},metadata.namespace={}".format(
753 service_name, namespace
754 )
755 )[0]
756
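    # Both helpers above write the decrypted kubeconfig to /tmp/<cluster_uuid>/config
    # and query the cluster through Kubectl field selectors. Usage sketch
    # ("connector" and the service name are placeholders):
    #
    #   services = await connector.get_services(cluster_uuid, kdu_instance, namespace)
    #   service = await connector.get_service(cluster_uuid, "mariadb-k8s", namespace)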
757 # Private methods
758 async def add_k8s(self, cloud_name: str, credentials: str,) -> bool:
759 """Add a k8s cloud to Juju
760
761 Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
762 Juju Controller.
763
764 :param cloud_name str: The name of the cloud to add.
765         :param credentials str: The kubeconfig of the cluster, i.e. the output
766             of `kubectl config view --raw`.
767
768 :returns: True if successful, otherwise raises an exception.
769 """
770
771 cmd = [self.juju_command, "add-k8s", "--local", cloud_name]
772 self.log.debug(cmd)
773
774 process = await asyncio.create_subprocess_exec(
775 *cmd,
776 stdout=asyncio.subprocess.PIPE,
777 stderr=asyncio.subprocess.PIPE,
778 stdin=asyncio.subprocess.PIPE,
779 )
780
781 # Feed the process the credentials
782 process.stdin.write(credentials.encode("utf-8"))
783 await process.stdin.drain()
784 process.stdin.close()
785
786 _stdout, stderr = await process.communicate()
787
788 return_code = process.returncode
789
790 self.log.debug("add-k8s return code: {}".format(return_code))
791
792 if return_code > 0:
793 raise Exception(stderr)
794
795 return True
796
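    # add_k8s() is roughly equivalent to the following CLI pipeline, except that
    # the kubeconfig is fed through the subprocess' stdin rather than a file:
    #
    #   cat kubeconfig | juju add-k8s --local <cloud_name>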
797 async def add_model(
798 self, model_name: str, cluster_uuid: str, controller: Controller
799 ) -> Model:
800 """Adds a model to the controller
801
802 Adds a new model to the Juju controller
803
804 :param model_name str: The name of the model to add.
805 :param cluster_uuid str: ID of the cluster.
806 :param controller: Controller object in which the model will be added
807         :returns: The juju.model.Model object of the new model upon success,
808             or None if the model could not be added.
809 """
810
811 self.log.debug(
812 "Adding model '{}' to cluster_uuid '{}'".format(model_name, cluster_uuid)
813 )
814 model = None
815 try:
816 if self.juju_public_key is not None:
817 model = await controller.add_model(
818 model_name, config={"authorized-keys": self.juju_public_key}
819 )
820 else:
821 model = await controller.add_model(model_name)
822 except Exception as ex:
823             # Swallow the error; callers must handle a possible None model
824             self.log.debug("Caught exception adding model: {}".format(ex))
826
827 return model
828
829 async def bootstrap(
830 self, cloud_name: str, cluster_uuid: str, loadbalancer: bool
831 ) -> bool:
832 """Bootstrap a Kubernetes controller
833
834 Bootstrap a Juju controller inside the Kubernetes cluster
835
836 :param cloud_name str: The name of the cloud.
837 :param cluster_uuid str: The UUID of the cluster to bootstrap.
838 :param loadbalancer bool: If the controller should use loadbalancer or not.
839 :returns: True upon success or raises an exception.
840 """
841
842 if not loadbalancer:
843 cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid]
844 else:
845 """
846 For public clusters, specify that the controller service is using a
847 LoadBalancer.
848 """
849 cmd = [
850 self.juju_command,
851 "bootstrap",
852 cloud_name,
853 cluster_uuid,
854 "--config",
855 "controller-service-type=loadbalancer",
856 ]
857
858 self.log.debug(
859 "Bootstrapping controller {} in cloud {}".format(cluster_uuid, cloud_name)
860 )
861
862 process = await asyncio.create_subprocess_exec(
863 *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
864 )
865
866 _stdout, stderr = await process.communicate()
867
868 return_code = process.returncode
869
870 if return_code > 0:
871 #
872 if b"already exists" not in stderr:
873 raise Exception(stderr)
874
875 return True
876
877 async def destroy_controller(self, cluster_uuid: str) -> bool:
878 """Destroy a Kubernetes controller
879
880 Destroy an existing Kubernetes controller.
881
882 :param cluster_uuid str: The UUID of the cluster to bootstrap.
883 :returns: True upon success or raises an exception.
884 """
885 cmd = [
886 self.juju_command,
887 "destroy-controller",
888 "--destroy-all-models",
889 "--destroy-storage",
890 "-y",
891 cluster_uuid,
892 ]
893
894 process = await asyncio.create_subprocess_exec(
895 *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
896 )
897
898 _stdout, stderr = await process.communicate()
899
900 return_code = process.returncode
901
902 if return_code > 0:
903 #
904             if b"already exists" not in stderr:
905 raise Exception(stderr)
906
907 def get_credentials(self, cluster_uuid: str) -> str:
908 """
909 Get Cluster Kubeconfig
910 """
911 k8scluster = self.db.get_one(
912 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
913 )
914
915 self.db.encrypt_decrypt_fields(
916 k8scluster.get("credentials"),
917 "decrypt",
918 ["password", "secret"],
919 schema_version=k8scluster["schema_version"],
920 salt=k8scluster["_id"],
921 )
922
923 return yaml.safe_dump(k8scluster.get("credentials"))
924
925 def get_config(self, cluster_uuid: str,) -> dict:
926 """Get the cluster configuration
927
928 Gets the configuration of the cluster
929
930 :param cluster_uuid str: The UUID of the cluster.
931 :return: A dict upon success, or raises an exception.
932 """
933
934 juju_db = self.db.get_one("admin", {"_id": "juju"})
935 config = None
936 for k in juju_db["k8sclusters"]:
937 if k["_id"] == cluster_uuid:
938 config = k["config"]
939 self.db.encrypt_decrypt_fields(
940 config,
941 "decrypt",
942 ["secret", "cacert"],
943 schema_version="1.1",
944 salt=k["_id"],
945 )
946 break
947 if not config:
948 raise Exception(
949 "Unable to locate configuration for cluster {}".format(cluster_uuid)
950 )
951 return config
952
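    # The configuration read above is stored (by set_config() below) in the
    # "admin" collection, in a document shaped roughly like this sketch:
    #
    #   {
    #       "_id": "juju",
    #       "k8sclusters": [
    #           {
    #               "_id": "<cluster_uuid>",
    #               "config": {
    #                   "endpoint": "...", "username": "...",
    #                   "secret": "<encrypted>", "cacert": "<encrypted>",
    #                   "loadbalancer": True,
    #               },
    #           }
    #       ],
    #   }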
953 async def get_model(self, model_name: str, controller: Controller) -> Model:
954 """Get a model from the Juju Controller.
955
956         Note: the returned Model object must be disconnected (await
957         model.disconnect()) before it goes out of scope.
958
959 :param model_name str: The name of the model to get
960 :param controller Controller: Controller object
961         :return: The juju.model.Model object if found; raises N2VCNotFound otherwise.
962 """
963
964 models = await controller.list_models()
965 if model_name not in models:
966 raise N2VCNotFound("Model {} not found".format(model_name))
967 self.log.debug("Found model: {}".format(model_name))
968 return await controller.get_model(model_name)
969
970 def get_namespace(self, cluster_uuid: str,) -> str:
971         """Get the namespace
972         Gets the namespace configured for the cluster
973 
974         :param cluster_uuid str: The UUID of the cluster
975         :returns: The namespace, or raises an exception if not configured
976 """
977 config = self.get_config(cluster_uuid)
978
979 # Make sure the name is in the config
980 if "namespace" not in config:
981 raise Exception("Namespace not found.")
982
983 # TODO: We want to make sure this is unique to the cluster, in case
984 # the cluster is being reused.
985 # Consider pre/appending the cluster id to the namespace string
986 return config["namespace"]
987
988 # TODO: Remove these lines of code
989 # async def has_model(self, model_name: str) -> bool:
990 # """Check if a model exists in the controller
991
992 # Checks to see if a model exists in the connected Juju controller.
993
994 # :param model_name str: The name of the model
995 # :return: A boolean indicating if the model exists
996 # """
997 # models = await self.controller.list_models()
998
999 # if model_name in models:
1000 # return True
1001 # return False
1002
1003 def is_local_k8s(self, credentials: str,) -> bool:
1004 """Check if a cluster is local
1005
1006 Checks if a cluster is running in the local host
1007
1008         :param credentials str: The cluster kubeconfig as a YAML string
1009         :returns: True if the cluster is running on the local host, False otherwise
1010 """
1011
1012 creds = yaml.safe_load(credentials)
1013
1014 if creds and os.getenv("OSMLCM_VCA_APIPROXY"):
1015 for cluster in creds["clusters"]:
1016 if "server" in cluster["cluster"]:
1017 if os.getenv("OSMLCM_VCA_APIPROXY") in cluster["cluster"]["server"]:
1018 return True
1019
1020 return False
1021
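    # Sketch of the check above: with OSMLCM_VCA_APIPROXY=10.0.2.15, a kubeconfig
    # containing a cluster entry such as
    #
    #   clusters:
    #   - cluster:
    #       server: https://10.0.2.15:16443
    #
    # is treated as local (address and port are illustrative).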
1022 async def get_controller(self, cluster_uuid):
1023 """Login to the Juju controller."""
1024
1025 config = self.get_config(cluster_uuid)
1026
1027 juju_endpoint = config["endpoint"]
1028 juju_user = config["username"]
1029 juju_secret = config["secret"]
1030 juju_ca_cert = config["cacert"]
1031
1032 controller = Controller()
1033
1034 if juju_secret:
1035 self.log.debug(
1036 "Connecting to controller... ws://{} as {}".format(
1037 juju_endpoint, juju_user,
1038 )
1039 )
1040 try:
1041 await controller.connect(
1042 endpoint=juju_endpoint,
1043 username=juju_user,
1044 password=juju_secret,
1045 cacert=juju_ca_cert,
1046 )
1047 self.log.debug("JujuApi: Logged into controller")
1048 return controller
1049 except Exception as ex:
1050 self.log.debug(ex)
1051 self.log.debug("Caught exception: {}".format(ex))
1052 else:
1053 self.log.fatal("VCA credentials not configured.")
1054
1055 # TODO: Remove these commented lines
1056 # self.authenticated = False
1057 # if self.authenticated:
1058 # return
1059
1060 # self.connecting = True
1061 # juju_public_key = None
1062 # self.authenticated = True
1063 # Test: Make sure we have the credentials loaded
1064 # async def logout(self):
1065 # """Logout of the Juju controller."""
1066 # self.log.debug("[logout]")
1067 # if not self.authenticated:
1068 # return False
1069
1070 # for model in self.models:
1071 # self.log.debug("Logging out of model {}".format(model))
1072 # await self.models[model].disconnect()
1073
1074 # if self.controller:
1075 # self.log.debug("Disconnecting controller {}".format(self.controller))
1076 # await self.controller.disconnect()
1077 # self.controller = None
1078
1079 # self.authenticated = False
1080
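    # Callers of get_controller() are expected to disconnect when done; a sketch
    # of the connect/disconnect pattern used by install(), exec_primitive() and
    # status_kdu() above:
    #
    #   controller = await self.get_controller(cluster_uuid)
    #   try:
    #       model = await self.get_model(kdu_instance, controller=controller)
    #       ...
    #   finally:
    #       await controller.disconnect()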
1081 async def remove_cloud(self, cloud_name: str,) -> bool:
1082 """Remove a k8s cloud from Juju
1083
1084 Removes a Kubernetes cloud from Juju.
1085
1086 :param cloud_name str: The name of the cloud to add.
1087
1088 :returns: True if successful, otherwise raises an exception.
1089 """
1090
1091 # Remove the bootstrapped controller
1092 cmd = [self.juju_command, "remove-k8s", "--client", cloud_name]
1093 process = await asyncio.create_subprocess_exec(
1094 *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1095 )
1096
1097 _stdout, stderr = await process.communicate()
1098
1099 return_code = process.returncode
1100
1101 if return_code > 0:
1102 raise Exception(stderr)
1103
1104 # Remove the cloud from the local config
1105 cmd = [self.juju_command, "remove-cloud", "--client", cloud_name]
1106 process = await asyncio.create_subprocess_exec(
1107 *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1108 )
1109
1110 _stdout, stderr = await process.communicate()
1111
1112 return_code = process.returncode
1113
1114 if return_code > 0:
1115 raise Exception(stderr)
1116
1117 return True
1118
1119 async def set_config(self, cluster_uuid: str, config: dict,) -> bool:
1120 """Save the cluster configuration
1121
1122 Saves the cluster information to the Mongo database
1123
1124 :param cluster_uuid str: The UUID of the cluster
1125 :param config dict: A dictionary containing the cluster configuration
1126 """
1127
1128 juju_db = self.db.get_one("admin", {"_id": "juju"})
1129
1130 k8sclusters = juju_db["k8sclusters"] if "k8sclusters" in juju_db else []
1131 self.db.encrypt_decrypt_fields(
1132 config,
1133 "encrypt",
1134 ["secret", "cacert"],
1135 schema_version="1.1",
1136 salt=cluster_uuid,
1137 )
1138 k8sclusters.append({"_id": cluster_uuid, "config": config})
1139 self.db.set_one(
1140 table="admin",
1141 q_filter={"_id": "juju"},
1142 update_dict={"k8sclusters": k8sclusters},
1143 )