fix issue storing status of k8s_helm_conn
[osm/N2VC.git] / n2vc / k8s_juju_conn.py
1 # Copyright 2019 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import concurrent
17 import os
18 import uuid
19 import yaml
20
21 import juju
22 from juju.controller import Controller
23 from juju.model import Model
24 from n2vc.exceptions import K8sException
25 from n2vc.k8s_conn import K8sConnector
26
27 from .exceptions import MethodNotImplemented
28
29
30 # from juju.bundle import BundleHandler
31 # import re
32 # import ssl
33 # from .vnf import N2VC
34 class K8sJujuConnector(K8sConnector):
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = "/usr/bin/kubectl",
    juju_command: str = "/usr/bin/juju",
    log: object = None,
    on_update_db=None,
):
    """Create a connector that manages K8s clusters through Juju.

    :param fs: file system object; its ``path`` attribute is where the
        per-cluster configuration files are read and written
    :param db: database object, forwarded to the parent connector
    :param kubectl_command: path to the kubectl executable (accepted for
        interface compatibility; not stored by this constructor)
    :param juju_command: path to the juju executable
    :param log: logger, forwarded to the parent connector
    :param on_update_db: callback, forwarded to the parent connector
    """
    # self.log is used below without being assigned here, so it is
    # presumably set up by the parent constructor.
    K8sConnector.__init__(self, db, log=log, on_update_db=on_update_db)
    self.fs = fs
    self.log.debug("Initializing K8S Juju connector")

    # Juju CLI location and controller password (filled in later)
    self.juju_command, self.juju_secret = juju_command, ""

    # Connection state
    self.models = {}
    self.authenticated = False

    self.log.debug("K8S Juju connector initialized")
67
68 """Initialization"""
69
async def init_env(
    self,
    k8s_creds: str,
    namespace: str = "kube-system",
    reuse_cluster_uuid: str = None,
) -> (str, bool):
    """Prepare a K8s cluster so that Juju bundles can run on it.

    Without ``reuse_cluster_uuid`` a new cluster id is generated, the
    cluster is added to Juju as a k8s cloud, a controller is bootstrapped
    and the connection details are stored with ``set_config``. With
    ``reuse_cluster_uuid`` the stored details are loaded instead. In both
    cases the connector then logs in if it is not authenticated yet.

    :param k8s_creds: credentials to access a given K8s cluster, i.e. a
        valid '.kube/config'
    :param namespace: optional namespace to be used for juju. By default,
        'kube-system' will be used
    :param reuse_cluster_uuid: existing cluster uuid for reuse
    :return: uuid of the K8s cluster and True
        (on error, an exception will be raised)
    """
    # Bootstrapping cannot be done, by design, through the API; the CLI
    # tools are used instead.
    #
    # TODO: Pull info from db based on the namespace
    # TODO: Make it idempotent, calling add-k8s and bootstrap whenever
    #       reuse_cluster_uuid is passed as parameter. `init_env` is
    #       called to initialize the K8s cluster for juju. If this
    #       initialization fails, it can be called again by LCM with the
    #       param reuse_cluster_uuid, e.g. to try to fix it.

    if reuse_cluster_uuid:
        # Existing cluster: reload what a previous call stored
        cluster_uuid = reuse_cluster_uuid
        stored = self.get_config(cluster_uuid)

        self.juju_endpoint = stored["endpoint"]
        self.juju_user = stored["username"]
        self.juju_secret = stored["secret"]
        self.juju_ca_cert = stored["cacert"]
        self.juju_public_key = None
    else:
        # New cluster: register it with Juju and bootstrap a controller
        cluster_uuid = str(uuid.uuid4())

        # An external k8s needs a load balancer for the juju controller
        loadbalancer = not self.is_local_k8s(k8s_creds)

        k8s_cloud = "k8s-{}".format(cluster_uuid)
        self.log.debug("Adding k8s cloud {}".format(k8s_cloud))
        await self.add_k8s(k8s_cloud, k8s_creds)

        self.log.debug("Bootstrapping...")
        await self.bootstrap(k8s_cloud, cluster_uuid, loadbalancer)
        self.log.debug("Bootstrap done.")

        # controllers.yaml: controllers.<name>.api-endpoints|ca-cert|uuid
        self.log.debug("Getting controller endpoints")
        with open(os.path.expanduser("~/.local/share/juju/controllers.yaml")) as f:
            controller_entry = yaml.load(f, Loader=yaml.Loader)["controllers"][
                cluster_uuid
            ]
        self.juju_endpoint = controller_entry["api-endpoints"][0]
        self.juju_ca_cert = controller_entry["ca-cert"]

        # accounts.yaml: controllers.<name>.user|password
        self.log.debug("Getting accounts")
        with open(os.path.expanduser("~/.local/share/juju/accounts.yaml")) as f:
            account_entry = yaml.load(f, Loader=yaml.Loader)["controllers"][
                cluster_uuid
            ]
        self.juju_user = account_entry["user"]
        self.juju_secret = account_entry["password"]

        self.juju_public_key = None

        # Persist the connection details for subsequent calls
        self.log.debug("Setting config")
        await self.set_config(
            cluster_uuid,
            {
                "endpoint": self.juju_endpoint,
                "username": self.juju_user,
                "secret": self.juju_secret,
                "cacert": self.juju_ca_cert,
                "namespace": namespace,
                "loadbalancer": loadbalancer,
            },
        )

    # Login to the k8s cluster
    if not self.authenticated:
        await self.login(cluster_uuid)

    return cluster_uuid, True
218
219 """Repo Management"""
220
async def repo_add(
    self, name: str, url: str, _type: str = "charm",
):
    """Add a repository (not supported by this connector).

    :param name: name of the repository
    :param url: URL of the repository
    :param _type: repository type, "charm" by default
    :raises MethodNotImplemented: always
    """
    raise MethodNotImplemented()
225
async def repo_list(self):
    """List repositories (not supported by this connector).

    :raises MethodNotImplemented: always
    """
    raise MethodNotImplemented()
228
async def repo_remove(
    self, name: str,
):
    """Remove a repository (not supported by this connector).

    :param name: name of the repository
    :raises MethodNotImplemented: always
    """
    raise MethodNotImplemented()
233
async def synchronize_repos(self, cluster_uuid: str, name: str):
    """Synchronize repositories; a no-op for this connector.

    Returns None as currently add_repo is not implemented.

    :param cluster_uuid: the cluster (unused)
    :param name: repository name (unused)
    :return: always None
    """
    return None
239
240 """Reset"""
241
async def reset(
    self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
) -> bool:
    """Reset a cluster

    Tears down what ``init_env`` set up: destroys the namespace model if it
    exists, logs out, destroys the Juju controller and removes the k8s
    cloud. The teardown is best effort: any exception is logged at debug
    level and swallowed.

    :param cluster_uuid str: The UUID of the cluster to reset
    :param force bool: Not used by this implementation
    :param uninstall_sw bool: Not used by this implementation
    :return: Always True.
    """
    try:
        if not self.authenticated:
            await self.login(cluster_uuid)

        # Nothing is torn down unless the controller is reachable
        if not self.controller.is_connected():
            return True

        model_name = self.get_namespace(cluster_uuid)
        if await self.has_model(model_name):
            self.log.debug("[reset] Destroying model")
            await self.controller.destroy_model(model_name, destroy_storage=True)

        self.log.debug("[reset] Disconnecting controller")
        await self.logout()

        # The controller can only be destroyed through the CLI
        self.log.debug("[reset] Destroying controller")
        await self.destroy_controller(cluster_uuid)

        self.log.debug("[reset] Removing k8s cloud")
        await self.remove_cloud("k8s-{}".format(cluster_uuid))
    except Exception as ex:
        self.log.debug("Caught exception during reset: {}".format(ex))

    return True
280
281 """Deployment"""
282
async def install(
    self,
    cluster_uuid: str,
    kdu_model: str,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    kdu_name: str = None,
    namespace: str = None,
) -> bool:
    """Install a bundle

    A new Juju model is created for the KDU instance and the bundle is
    deployed into it. The bundle can be given as:
    - <juju-repo>/<juju-bundle> (``cs:`` prefix)
    - <URL_where_to_fetch_juju_bundle> (downloading is not implemented;
      the URL is handed to Juju as is)
    - a path to a bundle under the k8s_models folder of the package

    :param cluster_uuid str: The UUID of the cluster to install to
    :param kdu_model str: The name or path of a bundle to install
    :param atomic bool: If set, waits until the model is active and resets
        the cluster on failure.
    :param timeout int: The time, in seconds, to wait for the install
        to finish
    :param params dict: Key-value pairs of instantiation parameters
        (not used yet)
    :param db_dict dict: Required in practice; ``db_dict["filter"]["_id"]``
        is used to build the model name
    :param kdu_name: Name of the KDU instance to be installed
    :param namespace: K8s namespace to use for the KDU instance (not used)

    :return: The name of the KDU instance (model) on success, or False
        when ``atomic`` is set and the applications did not settle within
        ``timeout`` seconds.
    :raises Exception: If the model could not be created.
    """

    if not self.authenticated:
        self.log.debug("[install] Logging in to the controller")
        await self.login(cluster_uuid)

    # The model name is derived from the record id carried in db_dict
    if kdu_name:
        kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
    else:
        kdu_instance = db_dict["filter"]["_id"]

    self.log.debug("Checking for model named {}".format(kdu_instance))

    # Create the new model
    self.log.debug("Adding model: {}".format(kdu_instance))
    model = await self.add_model(kdu_instance, cluster_uuid=cluster_uuid)
    if not model:
        raise Exception("Unable to install")

    # TODO: Instantiation parameters

    previous_workdir = os.getcwd()
    settled = True
    try:
        if kdu_model.startswith("cs:") or kdu_model.startswith("http"):
            # Charm store reference or URL (TODO: download the file)
            bundle = kdu_model
        else:
            # Local bundle: deploy from the directory that contains it.
            # os.path.dirname is used because str.strip() removes a set
            # of characters from both ends, not a path suffix.
            new_workdir = os.path.dirname(kdu_model)
            if new_workdir:
                os.chdir(new_workdir)
            bundle = "local:{}".format(kdu_model)

        self.log.debug("[install] deploying {}".format(bundle))
        await model.deploy(bundle)

        if atomic:
            self.log.debug("[install] Applications: {}".format(model.applications))
            for name in model.applications:
                self.log.debug("[install] Waiting for {} to settle".format(name))
                application = model.applications[name]
                try:
                    # It's not enough to wait for all units to be active;
                    # the application status needs to be active as well.
                    self.log.debug("Waiting for all units to be active...")
                    await model.block_until(
                        lambda: all(
                            unit.agent_status == "idle"
                            and application.status in ["active", "unknown"]
                            and unit.workload_status in ["active", "unknown"]
                            for unit in application.units
                        ),
                        timeout=timeout,
                    )
                    self.log.debug("All units active.")
                except (asyncio.TimeoutError, concurrent.futures.TimeoutError):
                    # The two timeout classes are distinct on Python
                    # 3.8-3.10, so both are caught.
                    settled = False
                    break
    finally:
        # Never leave the process in the bundle directory, even on error
        os.chdir(previous_workdir)

    if not settled:
        self.log.debug("[install] Timeout exceeded; resetting cluster")
        await self.reset(cluster_uuid)
        return False

    if model.is_connected():
        self.log.debug("[install] Disconnecting model")
        await model.disconnect()

    return kdu_instance
399
async def instances_list(self, cluster_uuid: str) -> list:
    """
    returns a list of deployed releases in a cluster

    This implementation does not query the cluster.

    :param cluster_uuid: the cluster
    :return: always an empty list
    """
    return []
408
async def upgrade(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    kdu_model: str = None,
    params: dict = None,
) -> str:
    """Upgrade a model

    The API doesn't have a concept of bundle upgrades, because there are
    many possible changes: charm revision, disk, number of units, etc.
    As such, only a limited subset of upgrades is supported: the charm
    revision of every application listed in the bundle is upgraded, while
    storage and scale are left untouched.

    :param cluster_uuid str: The UUID of the cluster to upgrade
    :param kdu_instance str: The unique name of the KDU instance
        (not used; see the review note below)
    :param kdu_model str: Path of the bundle YAML file to upgrade to
    :param params dict: Key-value pairs of instantiation parameters
        (not used)

    :return: True when every application was upgraded or was already
        running the requested charm.
    :raises K8sException: If the model cannot be found.
    :raises juju.errors.JujuError: If an upgrade fails for any reason
        other than the charm already being at the requested version.
    """
    # NOTE(review): the model is looked up by the cluster namespace, while
    # install() creates models named after the KDU instance - confirm.
    namespace = self.get_namespace(cluster_uuid)
    model = await self.get_model(namespace, cluster_uuid=cluster_uuid)
    if model is None:
        raise K8sException("Model {} not found".format(namespace))

    try:
        # Expected bundle layout:
        #   applications:
        #     <name>:
        #       charm: 'cs:~charmed-osm/mariadb-k8s-20'
        #       scale: 1
        #       options: {...}
        with open(kdu_model, "r") as f:
            bundle = yaml.safe_load(f)

        # TODO: This should be returned in an agreed-upon format
        for name, spec in bundle["applications"].items():
            self.log.debug(model.applications)
            application = model.applications[name]
            self.log.debug(application)

            try:
                await application.upgrade_charm(switch=spec["charm"])
            except juju.errors.JujuError as ex:
                # Only "already at this version" is harmless; anything
                # else is a real failure and must not be hidden.
                if "already running charm" not in str(ex):
                    raise
    finally:
        # Release the model connection on success and on failure alike
        await model.disconnect()

    return True
483
484 """Rollback"""
485
async def rollback(
    self, cluster_uuid: str, kdu_instance: str, revision: int = 0,
) -> str:
    """Rollback a model (not supported by this connector)

    :param cluster_uuid str: The UUID of the cluster to rollback
    :param kdu_instance str: The unique name of the KDU instance
    :param revision int: The revision to revert to. If omitted, rolls back
        the previous upgrade.

    :return: Never returns normally in this implementation.
    :raises MethodNotImplemented: always
    """
    raise MethodNotImplemented()
500
501 """Deletion"""
502
async def uninstall(self, cluster_uuid: str, kdu_instance: str) -> bool:
    """Uninstall a KDU instance

    Destroys the Juju model named after the KDU instance and then logs
    out of the controller.

    :param cluster_uuid str: The UUID of the cluster
    :param kdu_instance str: The unique name of the KDU instance

    :return: Returns True if successful, or raises an exception
    """
    needs_login = not self.authenticated
    if needs_login:
        self.log.debug("[uninstall] Connecting to controller")
        await self.login(cluster_uuid)

    self.log.debug("[uninstall] Destroying model")
    controller = self.controller
    await controller.destroy_models(kdu_instance)
    self.log.debug("[uninstall] Model destroyed and disconnecting")

    await self.logout()
    return True
523
async def exec_primitive(
    self,
    cluster_uuid: str = None,
    kdu_instance: str = None,
    primitive_name: str = None,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
) -> str:
    """Exec primitive (Juju action)

    Runs the action on the leader unit of the application named by
    ``params["application-name"]`` inside the model of the KDU instance.

    :param cluster_uuid str: The UUID of the cluster
    :param kdu_instance str: The unique name of the KDU instance
    :param primitive_name: Name of action that will be executed
    :param timeout: Timeout for action execution (not used)
    :param params: Dictionary of all the parameters needed for the action;
        must contain "application-name"
    :db_dict: Dictionary for any additional data (not used)

    :return: Returns the output of the action
    :raises K8sException: If "application-name" is missing, the model,
        action or leader unit cannot be found, the action does not
        complete, or any other error occurs while executing it.
    """
    if not self.authenticated:
        self.log.debug("[exec_primitive] Connecting to controller")
        await self.login(cluster_uuid)

    if not params or "application-name" not in params:
        raise K8sException(
            "Missing application-name argument, "
            "argument needed for K8s actions"
        )

    model = None
    try:
        self.log.debug(
            "[exec_primitive] Getting model "
            "kdu_instance: {}".format(kdu_instance)
        )

        model = await self.get_model(kdu_instance, cluster_uuid)
        if model is None:
            raise K8sException("Model {} not found".format(kdu_instance))

        application_name = params["application-name"]
        application = model.applications[application_name]

        actions = await application.get_actions()
        if primitive_name not in actions:
            raise K8sException("Primitive {} not found".format(primitive_name))

        # Actions are run on the leader unit only
        unit = None
        for u in application.units:
            if await u.is_leader_from_status():
                unit = u
                break

        if unit is None:
            raise K8sException("No leader unit found to execute action")

        self.log.debug("[exec_primitive] Running action: {}".format(primitive_name))
        action = await unit.run_action(primitive_name, **params)

        output = await model.get_action_output(action_uuid=action.entity_id)
        status = await model.get_action_status(uuid_or_prefix=action.entity_id)

        # An action missing from the status report is treated as failed
        status = (
            status[action.entity_id] if action.entity_id in status else "failed"
        )

        if status != "completed":
            raise K8sException(
                "status is not completed: {} output: {}".format(status, output)
            )

        return output

    except Exception as e:
        error_msg = "Error executing primitive {}: {}".format(primitive_name, e)
        self.log.error(error_msg)
        raise K8sException(message=error_msg) from e
    finally:
        # Models obtained from get_model() must be disconnected
        if model is not None and model.is_connected():
            await model.disconnect()
598
599 """Introspection"""
600
async def inspect_kdu(self, kdu_model: str,) -> dict:
    """Inspect a KDU

    Loads the bundle YAML file and returns its ``applications`` section,
    e.g.::

        {
            'mariadb-k8s': {
                'charm': 'cs:~charmed-osm/mariadb-k8s-20',
                'scale': 1,
                'options': {'password': 'manopw', 'user': 'mano'},
                'series': 'kubernetes'
            }
        }

    :param kdu_model str: The path of the bundle to inspect.

    :return: The ``applications`` mapping of the bundle.
    """
    # TODO: This should be returned in an agreed-upon format
    with open(kdu_model, "r") as bundle_file:
        parsed = yaml.safe_load(bundle_file)
    return parsed["applications"]
639
async def help_kdu(self, kdu_model: str,) -> str:
    """View the README

    Looks for a README file next to the bundle and returns its contents.
    The names are tried in this order: README, README.txt, README.md.

    :param kdu_model str: The name or path of a bundle

    :return: If found, returns the contents of the README, otherwise None.
    """
    candidates = ["README", "README.txt", "README.md"]

    # A bare file name has no directory part; look in the current one
    path = os.path.dirname(kdu_model) or "."
    present = set(os.listdir(path))

    for candidate in candidates:
        if candidate in present:
            # Open relative to the bundle directory, not the process CWD
            with open(os.path.join(path, candidate), "r") as f:
                return f.read()

    return None
660
async def status_kdu(self, cluster_uuid: str, kdu_instance: str,) -> dict:
    """Get the status of the KDU

    Get the current status of the applications in the model.

    :param cluster_uuid str: The UUID of the cluster
    :param kdu_instance str: The unique id of the KDU instance
        (not used; see the review note below)

    :return: A dictionary mapping each application name to
        ``{"status": <application status>}``; empty if the model is not
        found.
    """
    # NOTE(review): the model is looked up by the cluster namespace, while
    # install() creates models named after the KDU instance - confirm.
    model = await self.get_model(
        self.get_namespace(cluster_uuid), cluster_uuid=cluster_uuid
    )
    if not model:
        return {}

    try:
        model_status = await model.get_status()
        # Build a fresh dict instead of overwriting the entries of the
        # status object's own mapping while iterating over it.
        return {
            name: {"status": application["status"]["status"]}
            for name, application in model_status.applications.items()
        }
    finally:
        # Release the model connection even if the status query failed
        if model.is_connected():
            await model.disconnect()
691
692 # Private methods
async def add_k8s(self, cloud_name: str, credentials: str,) -> bool:
    """Add a k8s cloud to Juju

    Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
    Juju Controller. The credentials are fed to ``juju add-k8s --local``
    on its standard input.

    :param cloud_name str: The name of the cloud to add.
    :param credentials str: The kubeconfig contents, i.e. the output of
        `kubectl config view --raw`.

    :returns: True if successful, otherwise raises an exception.
    :raises Exception: With the captured stderr if the command fails.
    """

    cmd = [self.juju_command, "add-k8s", "--local", cloud_name]
    self.log.debug(cmd)

    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        stdin=asyncio.subprocess.PIPE,
    )

    # communicate() writes stdin and drains stdout/stderr concurrently,
    # so neither side can block on a full pipe.
    _stdout, stderr = await process.communicate(input=credentials.encode("utf-8"))

    return_code = process.returncode

    self.log.debug("add-k8s return code: {}".format(return_code))

    # A negative code means the process was killed by a signal
    if return_code != 0:
        raise Exception(stderr)

    return True
731
async def add_model(self, model_name: str, cluster_uuid: str,) -> Model:
    """Adds a model to the controller

    Adds a new model to the Juju controller, logging in first if needed.

    :param model_name str: The name of the model to add.
    :param cluster_uuid str: The UUID of the cluster, used to log in.
    :returns: The juju.model.Model object of the new model upon success or
        raises an exception.
    """
    if not self.authenticated:
        await self.login(cluster_uuid)

    self.log.debug(
        "Adding model '{}' to cluster_uuid '{}'".format(model_name, cluster_uuid)
    )
    try:
        if self.juju_public_key is not None:
            model = await self.controller.add_model(
                model_name, config={"authorized-keys": self.juju_public_key}
            )
        else:
            model = await self.controller.add_model(model_name)
    except Exception as ex:
        # Propagate the real failure; falling through would only raise an
        # UnboundLocalError that hides it.
        self.log.debug(ex)
        self.log.debug("Caught exception: {}".format(ex))
        raise

    return model
760
async def bootstrap(
    self, cloud_name: str, cluster_uuid: str, loadbalancer: bool
) -> bool:
    """Bootstrap a Kubernetes controller

    Runs ``juju bootstrap`` to create a Juju controller, named after the
    cluster UUID, inside the Kubernetes cluster. A failure whose stderr
    contains "already exists" is tolerated.

    :param cloud_name str: The name of the cloud.
    :param cluster_uuid str: The UUID of the cluster to bootstrap.
    :param loadbalancer bool: If the controller should use loadbalancer or not.
    :returns: True upon success or raises an exception.
    """
    cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid]
    if loadbalancer:
        # For public clusters, specify that the controller service is
        # using a LoadBalancer.
        cmd += ["--config", "controller-service-type=loadbalancer"]

    self.log.debug(
        "Bootstrapping controller {} in cloud {}".format(cluster_uuid, cloud_name)
    )

    pipe = asyncio.subprocess.PIPE
    process = await asyncio.create_subprocess_exec(*cmd, stdout=pipe, stderr=pipe)
    _stdout, stderr = await process.communicate()

    if process.returncode > 0 and b"already exists" not in stderr:
        raise Exception(stderr)

    return True
808
async def destroy_controller(self, cluster_uuid: str) -> bool:
    """Destroy a Kubernetes controller

    Runs ``juju destroy-controller`` for the controller named after the
    cluster UUID, destroying all of its models and storage.

    :param cluster_uuid str: The UUID of the cluster whose controller is
        destroyed.
    :returns: True upon success or raises an exception.
    :raises Exception: With the captured stderr if the command fails.
    """
    cmd = [
        self.juju_command,
        "destroy-controller",
        "--destroy-all-models",
        "--destroy-storage",
        "-y",
        cluster_uuid,
    ]

    process = await asyncio.create_subprocess_exec(
        *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
    )

    _stdout, stderr = await process.communicate()

    return_code = process.returncode

    if return_code > 0:
        # stderr is bytes, so it must be compared against bytes.
        # NOTE(review): "already exists" mirrors bootstrap() and looks
        # like a copy/paste; confirm which error should be tolerated.
        if b"already exists" not in stderr:
            raise Exception(stderr)

    return True
838
def get_config(self, cluster_uuid: str,) -> dict:
    """Get the cluster configuration

    Reads the YAML file ``<fs.path>/<cluster_uuid>.yaml``.

    :param cluster_uuid str: The UUID of the cluster.
    :return: A dict upon success, or raises an exception.
    """
    config_file = "{}/{}.yaml".format(self.fs.path, cluster_uuid)

    if not os.path.exists(config_file):
        raise Exception(
            "Unable to locate configuration for cluster {}".format(cluster_uuid)
        )

    with open(config_file, "r") as handle:
        return yaml.safe_load(handle.read())
856
async def get_model(self, model_name: str, cluster_uuid: str,) -> Model:
    """Get a model from the Juju Controller.

    Logs in first if needed. Note: the caller must disconnect the returned
    Model object before it goes out of scope.

    :param model_name str: The name of the model to get
    :param cluster_uuid str: The UUID of the cluster, used to log in
    :return The juju.model.Model object if found, or None.
    """
    if not self.authenticated:
        await self.login(cluster_uuid)

    known_models = await self.controller.list_models()
    if model_name not in known_models:
        return None

    self.log.debug("Found model: {}".format(model_name))
    return await self.controller.get_model(model_name)
875
def get_namespace(self, cluster_uuid: str,) -> str:
    """Get the namespace UUID

    Returns the ``namespace`` entry of the stored cluster configuration.

    :param cluster_uuid str: The UUID of the cluster
    :returns: The namespace UUID, or raises an exception
    """
    stored = self.get_config(cluster_uuid)

    # TODO: We want to make sure this is unique to the cluster, in case
    # the cluster is being reused.
    # Consider pre/appending the cluster id to the namespace string
    if "namespace" in stored:
        return stored["namespace"]

    raise Exception("Namespace not found.")
893
async def has_model(self, model_name: str) -> bool:
    """Check if a model exists in the controller

    Asks the connected Juju controller for its models and looks for the
    given name among them.

    :param model_name str: The name of the model
    :return: A boolean indicating if the model exists
    """
    known_models = await self.controller.list_models()
    return model_name in known_models
907
def is_local_k8s(self, credentials: str,) -> bool:
    """Check if a cluster is local

    A cluster counts as local when the value of the OSMLCM_VCA_APIPROXY
    environment variable appears in the ``server`` address of any cluster
    listed in the credentials.

    :param credentials str: The k8s credentials (kubeconfig) as YAML text
    :returns: A boolean if the cluster is running locally; False when the
        environment variable is not set
    """
    creds = yaml.safe_load(credentials)

    # Read once and unconditionally so the name is always bound
    host_ip = os.getenv("OSMLCM_VCA_APIPROXY")
    if not creds or not host_ip:
        return False

    for cluster in creds.get("clusters") or []:
        server = (cluster.get("cluster") or {}).get("server")
        if server and host_ip in server:
            return True

    return False
927
async def login(self, cluster_uuid):
    """Login to the Juju controller.

    Loads the stored cluster configuration and connects a new Controller
    with it. Connection failures are logged and leave
    ``self.authenticated`` False; they are not raised.

    :param cluster_uuid: The UUID of the cluster whose stored
        configuration is used to connect.
    """

    if self.authenticated:
        return

    self.connecting = True

    # Test: Make sure we have the credentials loaded
    config = self.get_config(cluster_uuid)

    self.juju_endpoint = config["endpoint"]
    self.juju_user = config["username"]
    self.juju_secret = config["secret"]
    self.juju_ca_cert = config["cacert"]
    self.juju_public_key = None

    self.controller = Controller()

    if self.juju_secret:
        # The password is deliberately kept out of the log
        self.log.debug(
            "Connecting to controller... ws://{} as {}".format(
                self.juju_endpoint, self.juju_user,
            )
        )
        try:
            await self.controller.connect(
                endpoint=self.juju_endpoint,
                username=self.juju_user,
                password=self.juju_secret,
                cacert=self.juju_ca_cert,
            )
            self.authenticated = True
            self.log.debug("JujuApi: Logged into controller")
        except Exception as ex:
            # Best effort: callers check self.authenticated afterwards
            self.log.error("Caught exception: {}".format(ex))
    else:
        self.log.fatal("VCA credentials not configured.")
        self.authenticated = False
969
async def logout(self):
    """Logout of the Juju controller.

    Disconnects every model tracked in ``self.models`` and then the
    controller itself.

    :return: False when not authenticated; otherwise None.
    """
    self.log.debug("[logout]")
    if not self.authenticated:
        return False

    for model_name, model in self.models.items():
        self.log.debug("Logging out of model {}".format(model_name))
        await model.disconnect()

    controller = self.controller
    if controller:
        self.log.debug("Disconnecting controller {}".format(controller))
        await controller.disconnect()
        self.controller = None

    self.authenticated = False
986
async def remove_cloud(self, cloud_name: str,) -> bool:
    """Remove a k8s cloud from Juju

    Runs ``juju remove-k8s --client`` followed by
    ``juju remove-cloud --client`` for the given cloud, stopping at the
    first command that fails.

    :param cloud_name str: The name of the cloud to remove.

    :returns: True if successful, otherwise raises an exception.
    """
    pipe = asyncio.subprocess.PIPE

    # First the k8s cloud itself, then its entry in the local config
    for subcommand in ("remove-k8s", "remove-cloud"):
        process = await asyncio.create_subprocess_exec(
            self.juju_command,
            subcommand,
            "--client",
            cloud_name,
            stdout=pipe,
            stderr=pipe,
        )
        _stdout, stderr = await process.communicate()

        if process.returncode > 0:
            raise Exception(stderr)

    return True
1024
async def set_config(self, cluster_uuid: str, config: dict,) -> bool:
    """Save the cluster configuration

    Writes the configuration as YAML to ``<fs.path>/<cluster_uuid>.yaml``.
    An existing file is left untouched.

    :param cluster_uuid str: The UUID of the cluster
    :param config dict: A dictionary containing the cluster configuration
    :returns: Boolean upon success or raises an exception.
    """
    target = "{}/{}.yaml".format(self.fs.path, cluster_uuid)

    if os.path.exists(target):
        return True

    self.log.debug("Writing config to {}".format(target))
    serialized = yaml.dump(config, Dumper=yaml.Dumper)
    with open(target, "w") as handle:
        handle.write(serialized)

    return True