Feature-9904: Enhancing NG-UI to enable Juju operational view dashboard
[osm/N2VC.git] / n2vc / k8s_juju_conn.py
1 # Copyright 2019 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import concurrent
17 import os
18 import uuid
19 import yaml
20
21 from juju.controller import Controller
22 from juju.model import Model
23 from n2vc.exceptions import K8sException, JujuError
24 from n2vc.k8s_conn import K8sConnector
25 from n2vc.kubectl import Kubectl
26 from .exceptions import MethodNotImplemented, N2VCNotFound
27 from n2vc.utils import obj_to_dict, obj_to_yaml
28
29
30 # from juju.bundle import BundleHandler
31 # import re
32 # import ssl
33 # from .vnf import N2VC
34 class K8sJujuConnector(K8sConnector):
35 def __init__(
36 self,
37 fs: object,
38 db: object,
39 kubectl_command: str = "/usr/bin/kubectl",
40 juju_command: str = "/usr/bin/juju",
41 log: object = None,
42 on_update_db=None,
43 ):
44 """
45
46         :param fs: file system for kubernetes and helm configuration
47         :param db: database object to store operational data
48         :param kubectl_command: path to kubectl executable
49         :param juju_command: path to juju executable
        :param log: logger
        :param on_update_db: optional callback invoked when the database is updated
50 """
51
52 # parent class
53 K8sConnector.__init__(
54 self, db, log=log, on_update_db=on_update_db,
55 )
56
57 self.fs = fs
58 self.log.debug("Initializing K8S Juju connector")
59
60 self.juju_command = juju_command
61 self.juju_public_key = None
62
63 self.log.debug("K8S Juju connector initialized")
64 # TODO: Remove these commented lines:
65 # self.authenticated = False
66 # self.models = {}
67 # self.juju_secret = ""
68
69 """Initialization"""
70
71 async def init_env(
72 self,
73 k8s_creds: str,
74 namespace: str = "kube-system",
75 reuse_cluster_uuid: str = None,
76 ) -> (str, bool):
77 """
78 It prepares a given K8s cluster environment to run Juju bundles.
79
80 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
81 '.kube/config'
82 :param namespace: optional namespace to be used for juju. By default,
83 'kube-system' will be used
84 :param reuse_cluster_uuid: existing cluster uuid for reuse
85 :return: uuid of the K8s cluster and True if connector has installed some
86 software in the cluster
87 (on error, an exception will be raised)
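
        Illustrative usage (a sketch; `kubeconfig_str` is an assumed variable
        holding the contents of a valid '.kube/config'):

            cluster_uuid, installed = await k8s_juju.init_env(kubeconfig_str)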
88 """
89
90 """Bootstrapping
91
92 Bootstrapping cannot be done, by design, through the API. We need to
93 use the CLI tools.
94 """
95
96 """
97 WIP: Workflow
98
99 1. Has the environment already been bootstrapped?
100 - Check the database to see if we have a record for this env
101
102 2. If this is a new env, create it
103 - Add the k8s cloud to Juju
104 - Bootstrap
105 - Record it in the database
106
107 3. Connect to the Juju controller for this cloud
108
109 """
110 # cluster_uuid = reuse_cluster_uuid
111 # if not cluster_uuid:
112 # cluster_uuid = str(uuid4())
113
114 ##################################################
115 # TODO: Pull info from db based on the namespace #
116 ##################################################
117
118 ###################################################
119 # TODO: Make it idempotent, calling add-k8s and #
120 # bootstrap whenever reuse_cluster_uuid is passed #
121 # as parameter #
122 # `init_env` is called to initialize the K8s #
123 # cluster for juju. If this initialization fails, #
124 # it can be called again by LCM with the param #
125 # reuse_cluster_uuid, e.g. to try to fix it. #
126 ###################################################
127
128 # This is a new cluster, so bootstrap it
129
130 cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4())
131
132         # Is this a local k8s cluster?
133         localk8s = self.is_local_k8s(k8s_creds)
134 
135         # If the k8s cluster is external, the juju controller needs a LoadBalancer
136         loadbalancer = not localk8s
137
138 # Name the new k8s cloud
139 k8s_cloud = "k8s-{}".format(cluster_uuid)
140
141 self.log.debug("Adding k8s cloud {}".format(k8s_cloud))
142 await self.add_k8s(k8s_cloud, k8s_creds)
143
144 # Bootstrap Juju controller
145 self.log.debug("Bootstrapping...")
146 await self.bootstrap(k8s_cloud, cluster_uuid, loadbalancer)
147 self.log.debug("Bootstrap done.")
148
149 # Get the controller information
150
151 # Parse ~/.local/share/juju/controllers.yaml
152 # controllers.testing.api-endpoints|ca-cert|uuid
153 self.log.debug("Getting controller endpoints")
154 with open(os.path.expanduser("~/.local/share/juju/controllers.yaml")) as f:
155 controllers = yaml.load(f, Loader=yaml.Loader)
156 controller = controllers["controllers"][cluster_uuid]
157 endpoints = controller["api-endpoints"]
158 juju_endpoint = endpoints[0]
159 juju_ca_cert = controller["ca-cert"]
160
161         # Parse ~/.local/share/juju/accounts.yaml
162 # controllers.testing.user|password
163 self.log.debug("Getting accounts")
164 with open(os.path.expanduser("~/.local/share/juju/accounts.yaml")) as f:
165 controllers = yaml.load(f, Loader=yaml.Loader)
166 controller = controllers["controllers"][cluster_uuid]
167
168 juju_user = controller["user"]
169 juju_secret = controller["password"]
170
171 config = {
172 "endpoint": juju_endpoint,
173 "username": juju_user,
174 "secret": juju_secret,
175 "cacert": juju_ca_cert,
176 "loadbalancer": loadbalancer,
177 }
178
179 # Store the cluster configuration so it
180 # can be used for subsequent calls
181 self.log.debug("Setting config")
182 await self.set_config(cluster_uuid, config)
183
184 # Test connection
185 controller = await self.get_controller(cluster_uuid)
186 await controller.disconnect()
187
188 # TODO: Remove these commented lines
189 # raise Exception("EOL")
190 # self.juju_public_key = None
191 # Login to the k8s cluster
192 # if not self.authenticated:
193 # await self.login(cluster_uuid)
194
195 # We're creating a new cluster
196 # print("Getting model {}".format(self.get_namespace(cluster_uuid),
197 # cluster_uuid=cluster_uuid))
198 # model = await self.get_model(
199 # self.get_namespace(cluster_uuid),
200 # cluster_uuid=cluster_uuid
201 # )
202
203 # Disconnect from the model
204 # if model and model.is_connected():
205 # await model.disconnect()
206
207 return cluster_uuid, True
208
209 """Repo Management"""
210
211 async def repo_add(
212 self, name: str, url: str, _type: str = "charm",
213 ):
214 raise MethodNotImplemented()
215
216 async def repo_list(self):
217 raise MethodNotImplemented()
218
219 async def repo_remove(
220 self, name: str,
221 ):
222 raise MethodNotImplemented()
223
224 async def synchronize_repos(self, cluster_uuid: str, name: str):
225 """
226 Returns None as currently add_repo is not implemented
227 """
228 return None
229
230 """Reset"""
231
232 async def reset(
233 self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
234 ) -> bool:
235 """Reset a cluster
236
237 Resets the Kubernetes cluster by removing the model that represents it.
238
239 :param cluster_uuid str: The UUID of the cluster to reset
240 :return: Returns True if successful or raises an exception.
241 """
242
243 try:
244
245 # Remove k8scluster from database
246 self.log.debug("[reset] Removing k8scluster from juju database")
247 juju_db = self.db.get_one("admin", {"_id": "juju"})
248
249 for k in juju_db["k8sclusters"]:
250 if k["_id"] == cluster_uuid:
251 juju_db["k8sclusters"].remove(k)
252 self.db.set_one(
253 table="admin",
254 q_filter={"_id": "juju"},
255 update_dict={"k8sclusters": juju_db["k8sclusters"]},
256 )
257 break
258
259 # Destroy the controller (via CLI)
260 self.log.debug("[reset] Destroying controller")
261 await self.destroy_controller(cluster_uuid)
262
263 self.log.debug("[reset] Removing k8s cloud")
264 k8s_cloud = "k8s-{}".format(cluster_uuid)
265 await self.remove_cloud(k8s_cloud)
266
267 except Exception as ex:
268 self.log.debug("Caught exception during reset: {}".format(ex))
269 return True
270 # TODO: Remove these commented lines
271 # if not self.authenticated:
272 # await self.login(cluster_uuid)
273
274 # if self.controller.is_connected():
275 # # Destroy the model
276 # namespace = self.get_namespace(cluster_uuid)
277 # if await self.has_model(namespace):
278 # self.log.debug("[reset] Destroying model")
279 # await self.controller.destroy_model(namespace, destroy_storage=True)
280
281 # # Disconnect from the controller
282 # self.log.debug("[reset] Disconnecting controller")
283 # await self.logout()
284
285 """Deployment"""
286
287 async def install(
288 self,
289 cluster_uuid: str,
290 kdu_model: str,
291 atomic: bool = True,
292 timeout: float = 300,
293 params: dict = None,
294 db_dict: dict = None,
295 kdu_name: str = None,
296 namespace: str = None,
297 ) -> bool:
298 """Install a bundle
299
300 :param cluster_uuid str: The UUID of the cluster to install to
301 :param kdu_model str: The name or path of a bundle to install
302 :param atomic bool: If set, waits until the model is active and resets
303 the cluster on failure.
304         :param timeout float: The time, in seconds, to wait for the install
305 to finish
306 :param params dict: Key-value pairs of instantiation parameters
307 :param kdu_name: Name of the KDU instance to be installed
308 :param namespace: K8s namespace to use for the KDU instance
309
310         :return: If successful, returns the name of the KDU instance (the Juju
            model name); returns False if an atomic install times out.
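
        Illustrative call (a sketch; the bundle path, `ns_id` and `kdu_name`
        values are assumed examples, not values defined by this module):

            kdu_instance = await k8s_juju.install(
                cluster_uuid=cluster_uuid,
                kdu_model="./k8s_models/my-bundle.yaml",
                db_dict={"filter": {"_id": ns_id}},
                kdu_name="my-kdu",
            )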
311 """
312
313 controller = await self.get_controller(cluster_uuid)
314
315 ##
316 # Get or create the model, based on the NS
317 # uuid.
318 if kdu_name:
319 kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
320 else:
321 kdu_instance = db_dict["filter"]["_id"]
322
323 self.log.debug("Checking for model named {}".format(kdu_instance))
324 try:
325 if self.on_update_db:
326 await self.on_update_db(cluster_uuid, kdu_instance, filter=db_dict["filter"])
327 except Exception as e:
328 self.log.debug("Error in updating vca status {}".format(str(e)))
329
330 # Create the new model
331 self.log.debug("Adding model: {}".format(kdu_instance))
332 model = await self.add_model(
333 kdu_instance, cluster_uuid=cluster_uuid, controller=controller
334 )
335
336 if model:
337 # TODO: Instantiation parameters
338
339 """
340             Juju bundle that models the KDU, in any of the following ways:
341 - <juju-repo>/<juju-bundle>
342 - <juju-bundle folder under k8s_models folder in the package>
343 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
344 in the package>
345 - <URL_where_to_fetch_juju_bundle>
346 """
347 try:
348 previous_workdir = os.getcwd()
349 except FileNotFoundError:
350 previous_workdir = "/app/storage"
351
352 bundle = kdu_model
353 if kdu_model.startswith("cs:"):
354 bundle = kdu_model
355 elif kdu_model.startswith("http"):
356 # Download the file
357 pass
358 else:
359                 new_workdir = os.path.dirname(kdu_model)
360
361 os.chdir(new_workdir)
362
363 bundle = "local:{}".format(kdu_model)
364
365 if not bundle:
366                 # The bundle could not be resolved from the given model
367                 raise K8sException("Could not resolve bundle {}".format(kdu_model))
368
369 self.log.debug("[install] deploying {}".format(bundle))
370 await model.deploy(bundle)
371
372 # Get the application
373 if atomic:
374 # applications = model.applications
375 self.log.debug("[install] Applications: {}".format(model.applications))
376 for name in model.applications:
377 self.log.debug("[install] Waiting for {} to settle".format(name))
378 application = model.applications[name]
379 try:
380 # It's not enough to wait for all units to be active;
381 # the application status needs to be active as well.
382 self.log.debug("Waiting for all units to be active...")
383 await model.block_until(
384 lambda: all(
385 unit.agent_status == "idle"
386 and application.status in ["active", "unknown"]
387 and unit.workload_status in ["active", "unknown"]
388 for unit in application.units
389 ),
390 timeout=timeout,
391 )
392 self.log.debug("All units active.")
393
394 # TODO use asyncio.TimeoutError
395 except concurrent.futures._base.TimeoutError:
396 os.chdir(previous_workdir)
397 self.log.debug("[install] Timeout exceeded; resetting cluster")
398 await self.reset(cluster_uuid)
399 return False
400
401             # Deployment finished; disconnect from the model and the controller
402 if model.is_connected():
403 self.log.debug("[install] Disconnecting model")
404 await model.disconnect()
405 await controller.disconnect()
406 os.chdir(previous_workdir)
407 if self.on_update_db:
408 await self.on_update_db(cluster_uuid, kdu_instance, filter=db_dict["filter"])
409 return kdu_instance
410 raise Exception("Unable to install")
411
412 async def instances_list(self, cluster_uuid: str) -> list:
413 """
414 returns a list of deployed releases in a cluster
415
416 :param cluster_uuid: the cluster
417 :return:
418 """
419 return []
420
421 async def upgrade(
422 self,
423 cluster_uuid: str,
424 kdu_instance: str,
425 kdu_model: str = None,
426 params: dict = None,
427 ) -> str:
428 """Upgrade a model
429
430 :param cluster_uuid str: The UUID of the cluster to upgrade
431 :param kdu_instance str: The unique name of the KDU instance
432 :param kdu_model str: The name or path of the bundle to upgrade to
433 :param params dict: Key-value pairs of instantiation parameters
434
435 :return: If successful, reference to the new revision number of the
436 KDU instance.
437 """
438
439 # TODO: Loop through the bundle and upgrade each charm individually
440
441 """
442 The API doesn't have a concept of bundle upgrades, because there are
443 many possible changes: charm revision, disk, number of units, etc.
444
445 As such, we are only supporting a limited subset of upgrades. We'll
446 upgrade the charm revision but leave storage and scale untouched.
447
448 Scale changes should happen through OSM constructs, and changes to
449 storage would require a redeployment of the service, at least in this
450 initial release.
451 """
452 raise MethodNotImplemented()
453 # TODO: Remove these commented lines
454
455 # model = await self.get_model(namespace, cluster_uuid=cluster_uuid)
456
457 # model = None
458 # namespace = self.get_namespace(cluster_uuid)
459 # controller = await self.get_controller(cluster_uuid)
460
461 # try:
462 # if namespace not in await controller.list_models():
463 # raise N2VCNotFound(message="Model {} does not exist".format(namespace))
464
465 # model = await controller.get_model(namespace)
466 # with open(kdu_model, "r") as f:
467 # bundle = yaml.safe_load(f)
468
469 # """
470 # {
471 # 'description': 'Test bundle',
472 # 'bundle': 'kubernetes',
473 # 'applications': {
474 # 'mariadb-k8s': {
475 # 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
476 # 'scale': 1,
477 # 'options': {
478 # 'password': 'manopw',
479 # 'root_password': 'osm4u',
480 # 'user': 'mano'
481 # },
482 # 'series': 'kubernetes'
483 # }
484 # }
485 # }
486 # """
487 # # TODO: This should be returned in an agreed-upon format
488 # for name in bundle["applications"]:
489 # self.log.debug(model.applications)
490 # application = model.applications[name]
491 # self.log.debug(application)
492
493 # path = bundle["applications"][name]["charm"]
494
495 # try:
496 # await application.upgrade_charm(switch=path)
497 # except juju.errors.JujuError as ex:
498 # if "already running charm" in str(ex):
499 # # We're already running this version
500 # pass
501 # finally:
502 # if model:
503 # await model.disconnect()
504 # await controller.disconnect()
505 # return True
506
507 """Rollback"""
508
509 async def rollback(
510 self, cluster_uuid: str, kdu_instance: str, revision: int = 0,
511 ) -> str:
512 """Rollback a model
513
514 :param cluster_uuid str: The UUID of the cluster to rollback
515 :param kdu_instance str: The unique name of the KDU instance
516 :param revision int: The revision to revert to. If omitted, rolls back
517 the previous upgrade.
518
519 :return: If successful, returns the revision of active KDU instance,
520 or raises an exception
521 """
522 raise MethodNotImplemented()
523
524 """Deletion"""
525
526 async def uninstall(self, cluster_uuid: str, kdu_instance: str) -> bool:
527 """Uninstall a KDU instance
528
529 :param cluster_uuid str: The UUID of the cluster
530 :param kdu_instance str: The unique name of the KDU instance
531
532 :return: Returns True if successful, or raises an exception
533 """
534
535 controller = await self.get_controller(cluster_uuid)
536
537 self.log.debug("[uninstall] Destroying model")
538
539 await controller.destroy_models(kdu_instance)
540
541 self.log.debug("[uninstall] Model destroyed and disconnecting")
542 await controller.disconnect()
543
544 return True
545 # TODO: Remove these commented lines
546 # if not self.authenticated:
547 # self.log.debug("[uninstall] Connecting to controller")
548 # await self.login(cluster_uuid)
549
550 async def exec_primitive(
551 self,
552 cluster_uuid: str = None,
553 kdu_instance: str = None,
554 primitive_name: str = None,
555 timeout: float = 300,
556 params: dict = None,
557 db_dict: dict = None,
558 ) -> str:
559 """Exec primitive (Juju action)
560
561 :param cluster_uuid str: The UUID of the cluster
562 :param kdu_instance str: The unique name of the KDU instance
563 :param primitive_name: Name of action that will be executed
564 :param timeout: Timeout for action execution
565 :param params: Dictionary of all the parameters needed for the action
566         :param db_dict: Dictionary for any additional data
567
568 :return: Returns the output of the action
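
        Illustrative call (a sketch; the action name and parameters are assumed
        examples and depend on the deployed charm):

            output = await k8s_juju.exec_primitive(
                cluster_uuid=cluster_uuid,
                kdu_instance=kdu_instance,
                primitive_name="backup",
                params={"application-name": "mariadb-k8s"},
                db_dict={"filter": {"_id": ns_id}},
            )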
569 """
570
571 controller = await self.get_controller(cluster_uuid)
572
573 if not params or "application-name" not in params:
574 raise K8sException(
575                 "Missing application-name argument, "
576                 "argument needed for K8s actions"
577 )
578 try:
579 self.log.debug(
580 "[exec_primitive] Getting model "
581 "kdu_instance: {}".format(kdu_instance)
582 )
583
584 model = await self.get_model(kdu_instance, controller=controller)
585
586 application_name = params["application-name"]
587 application = model.applications[application_name]
588
589 actions = await application.get_actions()
590 if primitive_name not in actions:
591 raise K8sException("Primitive {} not found".format(primitive_name))
592
593 unit = None
594 for u in application.units:
595 if await u.is_leader_from_status():
596 unit = u
597 break
598
599 if unit is None:
600 raise K8sException("No leader unit found to execute action")
601
602 self.log.debug("[exec_primitive] Running action: {}".format(primitive_name))
603 action = await unit.run_action(primitive_name, **params)
604
605 output = await model.get_action_output(action_uuid=action.entity_id)
606 status = await model.get_action_status(uuid_or_prefix=action.entity_id)
607
608             status = status.get(action.entity_id, "failed")
611
612 if status != "completed":
613 raise K8sException(
614 "status: {}, output: {}".format(status, output)
615 )
616 if self.on_update_db:
617 await self.on_update_db(cluster_uuid, kdu_instance, filter=db_dict["filter"])
618
619 return output
620
621 except Exception as e:
622 error_msg = "Error executing primitive {}: {}".format(primitive_name, e)
623 self.log.error(error_msg)
624 raise K8sException(message=error_msg)
625 finally:
626 await controller.disconnect()
627 # TODO: Remove these commented lines:
628 # if not self.authenticated:
629 # self.log.debug("[exec_primitive] Connecting to controller")
630 # await self.login(cluster_uuid)
631
632 """Introspection"""
633
634 async def inspect_kdu(self, kdu_model: str,) -> dict:
635 """Inspect a KDU
636
637 Inspects a bundle and returns a dictionary of config parameters and
638 their default values.
639
640 :param kdu_model str: The name or path of the bundle to inspect.
641
642 :return: If successful, returns a dictionary of available parameters
643 and their default values.
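
        Illustrative call (a sketch; the bundle path is an assumed example):

            applications = await k8s_juju.inspect_kdu("./k8s_models/my-bundle.yaml")
            # e.g. {"mariadb-k8s": {"charm": "cs:~charmed-osm/mariadb-k8s-20", ...}}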
644 """
645
646 kdu = {}
647 with open(kdu_model, "r") as f:
648 bundle = yaml.safe_load(f)
649
650 """
651 {
652 'description': 'Test bundle',
653 'bundle': 'kubernetes',
654 'applications': {
655 'mariadb-k8s': {
656 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
657 'scale': 1,
658 'options': {
659 'password': 'manopw',
660 'root_password': 'osm4u',
661 'user': 'mano'
662 },
663 'series': 'kubernetes'
664 }
665 }
666 }
667 """
668 # TODO: This should be returned in an agreed-upon format
669 kdu = bundle["applications"]
670
671 return kdu
672
673 async def help_kdu(self, kdu_model: str,) -> str:
674 """View the README
675
676 If available, returns the README of the bundle.
677
678 :param kdu_model str: The name or path of a bundle
679
680 :return: If found, returns the contents of the README.
681 """
682 readme = None
683
684 files = ["README", "README.txt", "README.md"]
685 path = os.path.dirname(kdu_model)
686 for file in os.listdir(path):
687 if file in files:
688 with open(file, "r") as f:
689 readme = f.read()
690 break
691
692 return readme
693
694 async def status_kdu(
695 self,
696 cluster_uuid: str,
697 kdu_instance: str,
698 complete_status: bool = False,
699 yaml_format: bool = False
700 ) -> dict:
701 """Get the status of the KDU
702
703 Get the current status of the KDU instance.
704
705 :param cluster_uuid str: The UUID of the cluster
706 :param kdu_instance str: The unique id of the KDU instance
707 :param complete_status: To get the complete_status of the KDU
708 :param yaml_format: To get the status in proper format for NSR record
709
710         :return: Returns a dict mapping each application name to its status; if
711         complete_status is True, the full model status is returned (as YAML
            when yaml_format is True)
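
        Illustrative return value for the default (non-complete) form, assuming a
        single application named "mariadb-k8s" in the model:

            {"mariadb-k8s": {"status": "active"}}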
712 """
713         status = {}
            model = None
714 controller = await self.get_controller(cluster_uuid)
715 try:
716 model = await self.get_model(kdu_instance, controller=controller)
717 model_status = await model.get_status()
718 status = model_status.applications
719
720 if not complete_status:
721 for name in model_status.applications:
722 application = model_status.applications[name]
723 status[name] = {"status": application["status"]["status"]}
724 else:
725 if yaml_format:
726 return obj_to_yaml(model_status)
727 else:
728 return obj_to_dict(model_status)
729 except Exception as e:
730 self.log.debug("Error in getting model_status for kdu_instance: {}. Error: {}"
731 .format(kdu_instance, str(e)))
732 finally:
733 if model:
734 await model.disconnect()
735 await controller.disconnect()
736 return status
737
738 async def get_application_actions(
739 self,
740 application_name: str,
741 model_name: str,
742 cluster_uuid: str,
743 kdu_instance: str
744 ) -> dict:
745 """
746 Get available actions for an application
747
748 :param application_name str: Application name
749         :param model_name str: Model name
750 :param cluster_uuid str: The UUID of the cluster
751 :param kdu_instance str: The unique id of the KDU instance
752
753         :return: Returns a dictionary with the available actions of the application
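
        Illustrative result (a sketch; action names depend on the charm):

            {"backup": "Take a backup of the database", "restore": "Restore a backup"}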
754 """
755 model = None
756 application_actions = {}
757 controller = await self.get_controller(cluster_uuid)
758 try:
759 model = await self.get_model(kdu_instance, controller=controller)
760 application = model.applications[application_name]
761 application_actions = await application.get_actions()
762 except Exception as e:
763 raise JujuError("Error in getting actions for application: {} in model: {}. Error: {}"
764 .format(application_name, model_name, str(e)))
765 finally:
766 if model:
767 await model.disconnect()
768 await controller.disconnect()
769
770 return application_actions
771
772 async def get_application_configs(
773 self,
774 application_name: str,
775 model_name: str,
776 cluster_uuid: str,
777 kdu_instance: str
778 ) -> dict:
779 """
780 Get available configs for an application.
781
782 :param application_name str: Application name
783         :param model_name str: Model name
784 :param cluster_uuid str: The UUID of the cluster
785 :param kdu_instance str: The unique id of the KDU instance
786
787         :return: Returns a dictionary with the current configuration of the application
788 """
789 model = None
790 application_configs = {}
791 controller = await self.get_controller(cluster_uuid)
792 try:
793 model = await self.get_model(kdu_instance, controller=controller)
794 application = model.applications[application_name]
795 application_configs = await application.get_config()
796 except Exception as e:
797 raise JujuError("Error in getting configs for application: {} in model: {}. Error: {}"
798 .format(application_name, model_name, str(e)))
799 finally:
800 if model:
801 await model.disconnect()
802 await controller.disconnect()
803 return application_configs
804
805 async def get_executed_actions(
806 self,
807 model_name: str,
808 cluster_uuid: str,
809 kdu_instance: str
810 ) -> list:
811 """
812 Get executed/history of actions for a model.
813         :param model_name str: Model name
814 :param cluster_uuid str: The UUID of the cluster
815 :param kdu_instance str: The unique id of the KDU instance
816
817 :return: List of executed actions for a model.
818 """
819 model = None
820 executed_actions = []
821 controller = await self.get_controller(cluster_uuid)
822 try:
823 model = await self.get_model(kdu_instance, controller=controller)
824 # Get all unique action names
825 actions = {}
826 for application in model.applications:
827                 application_actions = await self.get_application_actions(application, model_name,
828 cluster_uuid, kdu_instance)
829 actions.update(application_actions)
830 # Get status of all actions
831 for application_action in actions:
832 application_action_status_list = \
833 await model.get_action_status(name=application_action)
834 for action_id, action_status in application_action_status_list.items():
835 executed_action = {"id": action_id,
836 "action": application_action,
837 "status": action_status}
838 # Get action output by id
839                     action_output = await model.get_action_output(executed_action["id"])
840                     for k, v in action_output.items():
841 executed_action[k] = v
842 executed_actions.append(executed_action)
843 except Exception as e:
844 raise JujuError("Error in getting executed actions for model: {}. Error: {}"
845 .format(model_name, str(e)))
846 finally:
847 if model:
848 await model.disconnect()
849 await controller.disconnect()
850 return executed_actions
851
852 async def update_vca_status(self, vcastatus: dict, cluster_uuid: str, kdu_instance: str):
853 """
854         Add the configs, actions, and executed actions of all applications in a model to the vcastatus dict
855
856 :param vcastatus dict: dict containing vcastatus
857 :param cluster_uuid str: The UUID of the cluster
858 :param kdu_instance str: The unique id of the KDU instance
859 :return: None
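
        Illustrative shape of the enriched entry for one model (keys shown are
        those added below; model and application names are assumed examples):

            vcastatus["model-1"]["executedActions"]                   # list
            vcastatus["model-1"]["applications"]["app-1"]["actions"]  # dict
            vcastatus["model-1"]["applications"]["app-1"]["configs"]  # dict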
860 """
861 try:
862 for model_name in vcastatus:
863 # Adding executed actions
864 vcastatus[model_name]["executedActions"] = \
865 await self.get_executed_actions(model_name, cluster_uuid, kdu_instance)
866
867 for application in vcastatus[model_name]["applications"]:
868 # Adding application actions
869 vcastatus[model_name]["applications"][application]["actions"] = \
870 await self.get_application_actions(application, model_name,
871 cluster_uuid, kdu_instance)
872 # Adding application configs
873 vcastatus[model_name]["applications"][application]["configs"] = \
874 await self.get_application_configs(application, model_name,
875 cluster_uuid, kdu_instance)
876 except Exception as e:
877 self.log.debug("Error in updating vca status: {}".format(str(e)))
878
879 async def get_services(
880 self, cluster_uuid: str, kdu_instance: str, namespace: str
881 ) -> list:
882 """Return a list of services of a kdu_instance"""
883
884 credentials = self.get_credentials(cluster_uuid=cluster_uuid)
885
886 config_path = "/tmp/{}".format(cluster_uuid)
887 config_file = "{}/config".format(config_path)
888
889 if not os.path.exists(config_path):
890 os.makedirs(config_path)
891 with open(config_file, "w") as f:
892 f.write(credentials)
893
894 kubectl = Kubectl(config_file=config_file)
895 return kubectl.get_services(
896 field_selector="metadata.namespace={}".format(kdu_instance)
897 )
898
899 async def get_service(
900 self, cluster_uuid: str, service_name: str, namespace: str
901 ) -> object:
902 """Return data for a specific service inside a namespace"""
903
904 credentials = self.get_credentials(cluster_uuid=cluster_uuid)
905
906 config_path = "/tmp/{}".format(cluster_uuid)
907 config_file = "{}/config".format(config_path)
908
909 if not os.path.exists(config_path):
910 os.makedirs(config_path)
911 with open(config_file, "w") as f:
912 f.write(credentials)
913
914 kubectl = Kubectl(config_file=config_file)
915
916 return kubectl.get_services(
917 field_selector="metadata.name={},metadata.namespace={}".format(
918 service_name, namespace
919 )
920 )[0]
921
922 # Private methods
923 async def add_k8s(self, cloud_name: str, credentials: str,) -> bool:
924 """Add a k8s cloud to Juju
925
926 Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
927 Juju Controller.
928
929 :param cloud_name str: The name of the cloud to add.
930         :param credentials str: The kubeconfig contents (i.e. the output of
931             `kubectl config view --raw`) as a YAML string.
932
933 :returns: True if successful, otherwise raises an exception.
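
        Roughly equivalent CLI invocation (a sketch of what this method drives
        via a subprocess, with the kubeconfig fed on stdin):

            cat ~/.kube/config | juju add-k8s --local k8s-<cluster_uuid>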
934 """
935
936 cmd = [self.juju_command, "add-k8s", "--local", cloud_name]
937 self.log.debug(cmd)
938
939 process = await asyncio.create_subprocess_exec(
940 *cmd,
941 stdout=asyncio.subprocess.PIPE,
942 stderr=asyncio.subprocess.PIPE,
943 stdin=asyncio.subprocess.PIPE,
944 )
945
946 # Feed the process the credentials
947 process.stdin.write(credentials.encode("utf-8"))
948 await process.stdin.drain()
949 process.stdin.close()
950
951 _stdout, stderr = await process.communicate()
952
953 return_code = process.returncode
954
955 self.log.debug("add-k8s return code: {}".format(return_code))
956
957 if return_code > 0:
958 raise Exception(stderr)
959
960 return True
961
962 async def add_model(
963 self, model_name: str, cluster_uuid: str, controller: Controller
964 ) -> Model:
965 """Adds a model to the controller
966
967 Adds a new model to the Juju controller
968
969 :param model_name str: The name of the model to add.
970 :param cluster_uuid str: ID of the cluster.
971 :param controller: Controller object in which the model will be added
972 :returns: The juju.model.Model object of the new model upon success or
973 raises an exception.
974 """
975
976 self.log.debug(
977 "Adding model '{}' to cluster_uuid '{}'".format(model_name, cluster_uuid)
978 )
979 model = None
980 try:
981 if self.juju_public_key is not None:
982 model = await controller.add_model(
983 model_name, config={"authorized-keys": self.juju_public_key}
984 )
985 else:
986 model = await controller.add_model(model_name)
987 except Exception as ex:
988 self.log.debug(ex)
989 self.log.debug("Caught exception: {}".format(ex))
990 pass
991
992 return model
993
994 async def bootstrap(
995 self, cloud_name: str, cluster_uuid: str, loadbalancer: bool
996 ) -> bool:
997 """Bootstrap a Kubernetes controller
998
999 Bootstrap a Juju controller inside the Kubernetes cluster
1000
1001 :param cloud_name str: The name of the cloud.
1002 :param cluster_uuid str: The UUID of the cluster to bootstrap.
1003 :param loadbalancer bool: If the controller should use loadbalancer or not.
1004 :returns: True upon success or raises an exception.
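
        Roughly equivalent CLI invocations (a sketch of what this method runs):

            juju bootstrap <cloud_name> <cluster_uuid>
            # or, for external clusters:
            juju bootstrap <cloud_name> <cluster_uuid> \
                --config controller-service-type=loadbalancer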
1005 """
1006
1007 if not loadbalancer:
1008 cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid]
1009 else:
1010 """
1011 For public clusters, specify that the controller service is using a
1012 LoadBalancer.
1013 """
1014 cmd = [
1015 self.juju_command,
1016 "bootstrap",
1017 cloud_name,
1018 cluster_uuid,
1019 "--config",
1020 "controller-service-type=loadbalancer",
1021 ]
1022
1023 self.log.debug(
1024 "Bootstrapping controller {} in cloud {}".format(cluster_uuid, cloud_name)
1025 )
1026
1027 process = await asyncio.create_subprocess_exec(
1028 *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1029 )
1030
1031 _stdout, stderr = await process.communicate()
1032
1033 return_code = process.returncode
1034
1035 if return_code > 0:
1036 #
1037 if b"already exists" not in stderr:
1038 raise Exception(stderr)
1039
1040 return True
1041
1042 async def destroy_controller(self, cluster_uuid: str) -> bool:
1043 """Destroy a Kubernetes controller
1044
1045 Destroy an existing Kubernetes controller.
1046
1047         :param cluster_uuid str: The UUID of the cluster whose controller will be destroyed.
1048 :returns: True upon success or raises an exception.
1049 """
1050 cmd = [
1051 self.juju_command,
1052 "destroy-controller",
1053 "--destroy-all-models",
1054 "--destroy-storage",
1055 "-y",
1056 cluster_uuid,
1057 ]
1058
1059 process = await asyncio.create_subprocess_exec(
1060 *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1061 )
1062
1063 _stdout, stderr = await process.communicate()
1064
1065 return_code = process.returncode
1066
1067 if return_code > 0:
1068 #
1069             if b"already exists" not in stderr:
1070 raise Exception(stderr)
1071
1072 def get_credentials(self, cluster_uuid: str) -> str:
1073 """
1074 Get Cluster Kubeconfig
1075 """
1076 k8scluster = self.db.get_one(
1077 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
1078 )
1079
1080 self.db.encrypt_decrypt_fields(
1081 k8scluster.get("credentials"),
1082 "decrypt",
1083 ["password", "secret"],
1084 schema_version=k8scluster["schema_version"],
1085 salt=k8scluster["_id"],
1086 )
1087
1088 return yaml.safe_dump(k8scluster.get("credentials"))
1089
1090 def get_config(self, cluster_uuid: str,) -> dict:
1091 """Get the cluster configuration
1092
1093 Gets the configuration of the cluster
1094
1095 :param cluster_uuid str: The UUID of the cluster.
1096 :return: A dict upon success, or raises an exception.
1097 """
1098
1099 juju_db = self.db.get_one("admin", {"_id": "juju"})
1100 config = None
1101 for k in juju_db["k8sclusters"]:
1102 if k["_id"] == cluster_uuid:
1103 config = k["config"]
1104 self.db.encrypt_decrypt_fields(
1105 config,
1106 "decrypt",
1107 ["secret", "cacert"],
1108 schema_version="1.1",
1109 salt=k["_id"],
1110 )
1111 break
1112 if not config:
1113 raise Exception(
1114 "Unable to locate configuration for cluster {}".format(cluster_uuid)
1115 )
1116 return config
1117
1118 async def get_model(self, model_name: str, controller: Controller) -> Model:
1119 """Get a model from the Juju Controller.
1120
1121         Note: Model objects returned must call disconnect() before they go
1122         out of scope.
1123
1124 :param model_name str: The name of the model to get
1125 :param controller Controller: Controller object
1126         :return: The juju.model.Model object if found; raises N2VCNotFound otherwise.
1127 """
1128
1129 models = await controller.list_models()
1130 if model_name not in models:
1131 raise N2VCNotFound("Model {} not found".format(model_name))
1132 self.log.debug("Found model: {}".format(model_name))
1133 return await controller.get_model(model_name)
1134
1135 def get_namespace(self, cluster_uuid: str,) -> str:
1136 """Get the namespace UUID
1137 Gets the namespace's unique name
1138
1139 :param cluster_uuid str: The UUID of the cluster
1140 :returns: The namespace UUID, or raises an exception
1141 """
1142 config = self.get_config(cluster_uuid)
1143
1144 # Make sure the name is in the config
1145 if "namespace" not in config:
1146 raise Exception("Namespace not found.")
1147
1148 # TODO: We want to make sure this is unique to the cluster, in case
1149 # the cluster is being reused.
1150 # Consider pre/appending the cluster id to the namespace string
1151 return config["namespace"]
1152
1153 # TODO: Remove these lines of code
1154 # async def has_model(self, model_name: str) -> bool:
1155 # """Check if a model exists in the controller
1156
1157 # Checks to see if a model exists in the connected Juju controller.
1158
1159 # :param model_name str: The name of the model
1160 # :return: A boolean indicating if the model exists
1161 # """
1162 # models = await self.controller.list_models()
1163
1164 # if model_name in models:
1165 # return True
1166 # return False
1167
1168 def is_local_k8s(self, credentials: str,) -> bool:
1169 """Check if a cluster is local
1170
1171 Checks if a cluster is running in the local host
1172
1173         :param credentials str: The kubeconfig contents as a YAML string
1174         :returns: A boolean indicating whether the cluster is running locally
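
        Illustrative kubeconfig fragment that would be detected as local when
        OSMLCM_VCA_APIPROXY is set to 10.0.0.10 (all values are assumed examples):

            clusters:
            - cluster:
                server: https://10.0.0.10:16443
              name: microk8s-cluster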
1175 """
1176
1177 creds = yaml.safe_load(credentials)
1178
1179 if creds and os.getenv("OSMLCM_VCA_APIPROXY"):
1180 for cluster in creds["clusters"]:
1181 if "server" in cluster["cluster"]:
1182 if os.getenv("OSMLCM_VCA_APIPROXY") in cluster["cluster"]["server"]:
1183 return True
1184
1185 return False
1186
1187 async def get_controller(self, cluster_uuid):
1188         """Connect to the Juju controller for the given cluster and return it."""
1189
1190 config = self.get_config(cluster_uuid)
1191
1192 juju_endpoint = config["endpoint"]
1193 juju_user = config["username"]
1194 juju_secret = config["secret"]
1195 juju_ca_cert = config["cacert"]
1196
1197 controller = Controller()
1198
1199 if juju_secret:
1200 self.log.debug(
1201 "Connecting to controller... ws://{} as {}".format(
1202 juju_endpoint, juju_user,
1203 )
1204 )
1205 try:
1206 await controller.connect(
1207 endpoint=juju_endpoint,
1208 username=juju_user,
1209 password=juju_secret,
1210 cacert=juju_ca_cert,
1211 )
1212 self.log.debug("JujuApi: Logged into controller")
1213 return controller
1214 except Exception as ex:
1215 self.log.debug(ex)
1216 self.log.debug("Caught exception: {}".format(ex))
1217 else:
1218 self.log.fatal("VCA credentials not configured.")
1219
1220 # TODO: Remove these commented lines
1221 # self.authenticated = False
1222 # if self.authenticated:
1223 # return
1224
1225 # self.connecting = True
1226 # juju_public_key = None
1227 # self.authenticated = True
1228 # Test: Make sure we have the credentials loaded
1229 # async def logout(self):
1230 # """Logout of the Juju controller."""
1231 # self.log.debug("[logout]")
1232 # if not self.authenticated:
1233 # return False
1234
1235 # for model in self.models:
1236 # self.log.debug("Logging out of model {}".format(model))
1237 # await self.models[model].disconnect()
1238
1239 # if self.controller:
1240 # self.log.debug("Disconnecting controller {}".format(self.controller))
1241 # await self.controller.disconnect()
1242 # self.controller = None
1243
1244 # self.authenticated = False
1245
1246 async def remove_cloud(self, cloud_name: str,) -> bool:
1247 """Remove a k8s cloud from Juju
1248
1249 Removes a Kubernetes cloud from Juju.
1250
1251         :param cloud_name str: The name of the cloud to remove.
1252
1253 :returns: True if successful, otherwise raises an exception.
1254 """
1255
1256 # Remove the bootstrapped controller
1257 cmd = [self.juju_command, "remove-k8s", "--client", cloud_name]
1258 process = await asyncio.create_subprocess_exec(
1259 *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1260 )
1261
1262 _stdout, stderr = await process.communicate()
1263
1264 return_code = process.returncode
1265
1266 if return_code > 0:
1267 raise Exception(stderr)
1268
1269 # Remove the cloud from the local config
1270 cmd = [self.juju_command, "remove-cloud", "--client", cloud_name]
1271 process = await asyncio.create_subprocess_exec(
1272 *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1273 )
1274
1275 _stdout, stderr = await process.communicate()
1276
1277 return_code = process.returncode
1278
1279 if return_code > 0:
1280 raise Exception(stderr)
1281
1282 return True
1283
1284 async def set_config(self, cluster_uuid: str, config: dict,) -> bool:
1285 """Save the cluster configuration
1286
1287 Saves the cluster information to the Mongo database
1288
1289 :param cluster_uuid str: The UUID of the cluster
1290 :param config dict: A dictionary containing the cluster configuration
1291 """
1292
1293 juju_db = self.db.get_one("admin", {"_id": "juju"})
1294
1295 k8sclusters = juju_db["k8sclusters"] if "k8sclusters" in juju_db else []
1296 self.db.encrypt_decrypt_fields(
1297 config,
1298 "encrypt",
1299 ["secret", "cacert"],
1300 schema_version="1.1",
1301 salt=cluster_uuid,
1302 )
1303 k8sclusters.append({"_id": cluster_uuid, "config": config})
1304 self.db.set_one(
1305 table="admin",
1306 q_filter={"_id": "juju"},
1307 update_dict={"k8sclusters": k8sclusters},
1308 )