Implement get_service and get_services methods for K8sJujuConnector
[osm/N2VC.git] / n2vc / k8s_juju_conn.py
1 # Copyright 2019 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import concurrent.futures
17 import os
18 import uuid
19 import yaml
20
21 import juju
22 from juju.controller import Controller
23 from juju.model import Model
24 from n2vc.exceptions import K8sException
25 from n2vc.k8s_conn import K8sConnector
26 from n2vc.kubectl import Kubectl
27 from .exceptions import MethodNotImplemented
28
29
30 # from juju.bundle import BundleHandler
31 # import re
32 # import ssl
33 # from .vnf import N2VC
34 class K8sJujuConnector(K8sConnector):
35 def __init__(
36 self,
37 fs: object,
38 db: object,
39 kubectl_command: str = "/usr/bin/kubectl",
40 juju_command: str = "/usr/bin/juju",
41 log: object = None,
42 on_update_db=None,
43 ):
44         """
45         :param fs: file system for kubernetes and juju configuration
46         :param db: database object to store current operation status
47         :param kubectl_command: path to kubectl executable
48         :param juju_command: path to juju executable
49         :param log: logger
50         """
51
52 # parent class
53 K8sConnector.__init__(
54 self, db, log=log, on_update_db=on_update_db,
55 )
56
57 self.fs = fs
58 self.log.debug("Initializing K8S Juju connector")
59
60 self.authenticated = False
61 self.models = {}
62
63 self.juju_command = juju_command
64 self.juju_secret = ""
65
66 self.log.debug("K8S Juju connector initialized")
67
68 """Initialization"""
69
70 async def init_env(
71 self,
72 k8s_creds: str,
73 namespace: str = "kube-system",
74 reuse_cluster_uuid: str = None,
75 ) -> (str, bool):
76 """
77 It prepares a given K8s cluster environment to run Juju bundles.
78
79 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
80 '.kube/config'
81 :param namespace: optional namespace to be used for juju. By default,
82 'kube-system' will be used
83 :param reuse_cluster_uuid: existing cluster uuid for reuse
84 :return: uuid of the K8s cluster and True if connector has installed some
85 software in the cluster
86 (on error, an exception will be raised)
87 """
88
89 """Bootstrapping
90
91 Bootstrapping cannot be done, by design, through the API. We need to
92 use the CLI tools.
93 """
94
95 """
96 WIP: Workflow
97
98 1. Has the environment already been bootstrapped?
99 - Check the database to see if we have a record for this env
100
101 2. If this is a new env, create it
102 - Add the k8s cloud to Juju
103 - Bootstrap
104 - Record it in the database
105
106 3. Connect to the Juju controller for this cloud
107
108 """
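        # For reference only (a sketch, not executed here), the manual CLI flow
        # for a new cluster would be roughly:
        #
        #   cat kubeconfig | juju add-k8s k8s-<cluster_uuid> --local
        #   juju bootstrap k8s-<cluster_uuid> <cluster_uuid>
        #
        # add_k8s() and bootstrap() below wrap these commands via subprocess.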
109 # cluster_uuid = reuse_cluster_uuid
110 # if not cluster_uuid:
111 # cluster_uuid = str(uuid4())
112
113 ##################################################
114 # TODO: Pull info from db based on the namespace #
115 ##################################################
116
117 ###################################################
118 # TODO: Make it idempotent, calling add-k8s and #
119 # bootstrap whenever reuse_cluster_uuid is passed #
120 # as parameter #
121 # `init_env` is called to initialize the K8s #
122 # cluster for juju. If this initialization fails, #
123 # it can be called again by LCM with the param #
124 # reuse_cluster_uuid, e.g. to try to fix it. #
125 ###################################################
126
127 if not reuse_cluster_uuid:
128 # This is a new cluster, so bootstrap it
129
130 cluster_uuid = str(uuid.uuid4())
131
132             # Is this a local k8s cluster?
133 localk8s = self.is_local_k8s(k8s_creds)
134
135 # If the k8s is external, the juju controller needs a loadbalancer
136             loadbalancer = not localk8s
137
138 # Name the new k8s cloud
139 k8s_cloud = "k8s-{}".format(cluster_uuid)
140
141 self.log.debug("Adding k8s cloud {}".format(k8s_cloud))
142 await self.add_k8s(k8s_cloud, k8s_creds)
143
144 # Bootstrap Juju controller
145 self.log.debug("Bootstrapping...")
146 await self.bootstrap(k8s_cloud, cluster_uuid, loadbalancer)
147 self.log.debug("Bootstrap done.")
148
149 # Get the controller information
150
151 # Parse ~/.local/share/juju/controllers.yaml
152 # controllers.testing.api-endpoints|ca-cert|uuid
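            # Expected controllers.yaml layout (abridged; values are illustrative):
            #
            #   controllers:
            #     <cluster_uuid>:
            #       api-endpoints: ["10.0.0.1:17070"]
            #       ca-cert: |
            #         -----BEGIN CERTIFICATE-----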
153 self.log.debug("Getting controller endpoints")
154 with open(os.path.expanduser("~/.local/share/juju/controllers.yaml")) as f:
155                 controllers = yaml.safe_load(f)
156 controller = controllers["controllers"][cluster_uuid]
157 endpoints = controller["api-endpoints"]
158 self.juju_endpoint = endpoints[0]
159 self.juju_ca_cert = controller["ca-cert"]
160
161             # Parse ~/.local/share/juju/accounts.yaml
162 # controllers.testing.user|password
163 self.log.debug("Getting accounts")
164 with open(os.path.expanduser("~/.local/share/juju/accounts.yaml")) as f:
165                 controllers = yaml.safe_load(f)
166 controller = controllers["controllers"][cluster_uuid]
167
168 self.juju_user = controller["user"]
169 self.juju_secret = controller["password"]
170
171 # raise Exception("EOL")
172
173 self.juju_public_key = None
174
175 config = {
176 "endpoint": self.juju_endpoint,
177 "username": self.juju_user,
178 "secret": self.juju_secret,
179 "cacert": self.juju_ca_cert,
180 "namespace": namespace,
181 "loadbalancer": loadbalancer,
182 }
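            # This dict is persisted by set_config() as "<fs.path>/<cluster_uuid>.yaml"
            # and read back by get_config() whenever the cluster is reused
            # (reuse_cluster_uuid).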
183
184 # Store the cluster configuration so it
185 # can be used for subsequent calls
186 self.log.debug("Setting config")
187 await self.set_config(cluster_uuid, config)
188
189 else:
190 # This is an existing cluster, so get its config
191 cluster_uuid = reuse_cluster_uuid
192
193 config = self.get_config(cluster_uuid)
194
195 self.juju_endpoint = config["endpoint"]
196 self.juju_user = config["username"]
197 self.juju_secret = config["secret"]
198 self.juju_ca_cert = config["cacert"]
199 self.juju_public_key = None
200
201 # Login to the k8s cluster
202 if not self.authenticated:
203 await self.login(cluster_uuid)
204
205 # We're creating a new cluster
206 # print("Getting model {}".format(self.get_namespace(cluster_uuid),
207 # cluster_uuid=cluster_uuid))
208 # model = await self.get_model(
209 # self.get_namespace(cluster_uuid),
210 # cluster_uuid=cluster_uuid
211 # )
212
213 # Disconnect from the model
214 # if model and model.is_connected():
215 # await model.disconnect()
216
217 return cluster_uuid, True
218
219 """Repo Management"""
220
221 async def repo_add(
222 self, name: str, url: str, _type: str = "charm",
223 ):
224 raise MethodNotImplemented()
225
226 async def repo_list(self):
227 raise MethodNotImplemented()
228
229 async def repo_remove(
230 self, name: str,
231 ):
232 raise MethodNotImplemented()
233
234 async def synchronize_repos(self, cluster_uuid: str, name: str):
235 """
236         Returns None, as repo_add is not currently implemented
237 """
238 return None
239
240 """Reset"""
241
242 async def reset(
243 self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
244 ) -> bool:
245 """Reset a cluster
246
247 Resets the Kubernetes cluster by removing the model that represents it.
248
249 :param cluster_uuid str: The UUID of the cluster to reset
250 :return: Returns True if successful or raises an exception.
251 """
252
253 try:
254 if not self.authenticated:
255 await self.login(cluster_uuid)
256
257 if self.controller.is_connected():
258 # Destroy the model
259 namespace = self.get_namespace(cluster_uuid)
260 if await self.has_model(namespace):
261 self.log.debug("[reset] Destroying model")
262 await self.controller.destroy_model(namespace, destroy_storage=True)
263
264 # Disconnect from the controller
265 self.log.debug("[reset] Disconnecting controller")
266 await self.logout()
267
268 # Destroy the controller (via CLI)
269 self.log.debug("[reset] Destroying controller")
270 await self.destroy_controller(cluster_uuid)
271
272 self.log.debug("[reset] Removing k8s cloud")
273 k8s_cloud = "k8s-{}".format(cluster_uuid)
274 await self.remove_cloud(k8s_cloud)
275
276 except Exception as ex:
277 self.log.debug("Caught exception during reset: {}".format(ex))
278
279 return True
280
281 """Deployment"""
282
283 async def install(
284 self,
285 cluster_uuid: str,
286 kdu_model: str,
287 atomic: bool = True,
288 timeout: float = 300,
289 params: dict = None,
290 db_dict: dict = None,
291 kdu_name: str = None,
292 namespace: str = None,
293 ) -> bool:
294 """Install a bundle
295
296 :param cluster_uuid str: The UUID of the cluster to install to
297 :param kdu_model str: The name or path of a bundle to install
298 :param atomic bool: If set, waits until the model is active and resets
299 the cluster on failure.
300         :param timeout float: The time, in seconds, to wait for the install to finish
301         :param params dict: Key-value pairs of instantiation parameters
302         :param db_dict dict: Dictionary with the database filter ("_id" names the model)
303         :param kdu_name: Name of the KDU instance to be installed
304         :param namespace: K8s namespace to use for the KDU instance
305 
306         :return: If successful, returns the KDU instance name (the Juju model name)
307         """
308
309 if not self.authenticated:
310 self.log.debug("[install] Logging in to the controller")
311 await self.login(cluster_uuid)
312
313 ##
314 # Get or create the model, based on the NS
315 # uuid.
316 if kdu_name:
317 kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
318 else:
319 kdu_instance = db_dict["filter"]["_id"]
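        # Illustrative example (values assumed): kdu_name "mysql" and a filter
        # _id of "0123-abcd" yield the model name "mysql-0123-abcd".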
320
321 self.log.debug("Checking for model named {}".format(kdu_instance))
322
323 # Create the new model
324 self.log.debug("Adding model: {}".format(kdu_instance))
325 model = await self.add_model(kdu_instance, cluster_uuid=cluster_uuid)
326
327 if model:
328 # TODO: Instantiation parameters
329
330 """
331             Juju bundle that models the KDU, in any of the following ways:
332 - <juju-repo>/<juju-bundle>
333 - <juju-bundle folder under k8s_models folder in the package>
334 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
335 in the package>
336 - <URL_where_to_fetch_juju_bundle>
337 """
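            # Illustrative kdu_model values and how they are handled below
            # (paths and names are assumptions, not taken from a real package):
            #   "cs:~charmed-osm/mariadb-k8s"   -> deployed as-is from the charm store
            #   "/app/storage/pkg/bundle.yaml"  -> cwd changes to its folder and the
            #                                      bundle is deployed as "local:<path>"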
338 try:
339 previous_workdir = os.getcwd()
340 except FileNotFoundError:
341 previous_workdir = "/app/storage"
342
343 bundle = kdu_model
344 if kdu_model.startswith("cs:"):
345 bundle = kdu_model
346 elif kdu_model.startswith("http"):
347 # Download the file
348 pass
349 else:
350                 new_workdir = os.path.dirname(kdu_model)
351
352 os.chdir(new_workdir)
353
354 bundle = "local:{}".format(kdu_model)
355
356 if not bundle:
357 # Raise named exception that the bundle could not be found
358                 raise Exception("Unable to locate bundle for {}".format(kdu_model))
359
360 self.log.debug("[install] deploying {}".format(bundle))
361 await model.deploy(bundle)
362
363 # Get the application
364 if atomic:
365 # applications = model.applications
366 self.log.debug("[install] Applications: {}".format(model.applications))
367 for name in model.applications:
368 self.log.debug("[install] Waiting for {} to settle".format(name))
369 application = model.applications[name]
370 try:
371 # It's not enough to wait for all units to be active;
372 # the application status needs to be active as well.
373 self.log.debug("Waiting for all units to be active...")
374 await model.block_until(
375 lambda: all(
376 unit.agent_status == "idle"
377 and application.status in ["active", "unknown"]
378 and unit.workload_status in ["active", "unknown"]
379 for unit in application.units
380 ),
381 timeout=timeout,
382 )
383 self.log.debug("All units active.")
384
385 # TODO use asyncio.TimeoutError
386                 except concurrent.futures.TimeoutError:
387 os.chdir(previous_workdir)
388 self.log.debug("[install] Timeout exceeded; resetting cluster")
389 await self.reset(cluster_uuid)
390 return False
391
392 # Wait for the application to be active
393 if model.is_connected():
394 self.log.debug("[install] Disconnecting model")
395 await model.disconnect()
396
397 os.chdir(previous_workdir)
398
399 return kdu_instance
400 raise Exception("Unable to install")
401
402 async def instances_list(self, cluster_uuid: str) -> list:
403 """
404 returns a list of deployed releases in a cluster
405
406 :param cluster_uuid: the cluster
407 :return:
408 """
409 return []
410
411 async def upgrade(
412 self,
413 cluster_uuid: str,
414 kdu_instance: str,
415 kdu_model: str = None,
416 params: dict = None,
417 ) -> str:
418 """Upgrade a model
419
420 :param cluster_uuid str: The UUID of the cluster to upgrade
421 :param kdu_instance str: The unique name of the KDU instance
422 :param kdu_model str: The name or path of the bundle to upgrade to
423 :param params dict: Key-value pairs of instantiation parameters
424
425 :return: If successful, reference to the new revision number of the
426 KDU instance.
427 """
428
429 # TODO: Loop through the bundle and upgrade each charm individually
430
431 """
432 The API doesn't have a concept of bundle upgrades, because there are
433 many possible changes: charm revision, disk, number of units, etc.
434
435 As such, we are only supporting a limited subset of upgrades. We'll
436 upgrade the charm revision but leave storage and scale untouched.
437
438 Scale changes should happen through OSM constructs, and changes to
439 storage would require a redeployment of the service, at least in this
440 initial release.
441 """
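        # In practice, the loop below calls application.upgrade_charm(switch=<charm-url>)
        # for every application in the bundle, e.g. switching to
        # "cs:~charmed-osm/mariadb-k8s-20" (value taken from the sample bundle shown below).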
442 namespace = self.get_namespace(cluster_uuid)
443 model = await self.get_model(namespace, cluster_uuid=cluster_uuid)
444
445 with open(kdu_model, "r") as f:
446 bundle = yaml.safe_load(f)
447
448 """
449 {
450 'description': 'Test bundle',
451 'bundle': 'kubernetes',
452 'applications': {
453 'mariadb-k8s': {
454 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
455 'scale': 1,
456 'options': {
457 'password': 'manopw',
458 'root_password': 'osm4u',
459 'user': 'mano'
460 },
461 'series': 'kubernetes'
462 }
463 }
464 }
465 """
466 # TODO: This should be returned in an agreed-upon format
467 for name in bundle["applications"]:
468 self.log.debug(model.applications)
469 application = model.applications[name]
470 self.log.debug(application)
471
472 path = bundle["applications"][name]["charm"]
473
474 try:
475 await application.upgrade_charm(switch=path)
476             except juju.errors.JujuError as ex:
477                 if "already running charm" not in str(ex):
478                     # Only "already running charm" errors are expected here
479                     raise
480
481 await model.disconnect()
482
483 return True
484 raise MethodNotImplemented()
485
486 """Rollback"""
487
488 async def rollback(
489 self, cluster_uuid: str, kdu_instance: str, revision: int = 0,
490 ) -> str:
491 """Rollback a model
492
493 :param cluster_uuid str: The UUID of the cluster to rollback
494 :param kdu_instance str: The unique name of the KDU instance
495 :param revision int: The revision to revert to. If omitted, rolls back
496 the previous upgrade.
497
498 :return: If successful, returns the revision of active KDU instance,
499 or raises an exception
500 """
501 raise MethodNotImplemented()
502
503 """Deletion"""
504
505 async def uninstall(self, cluster_uuid: str, kdu_instance: str) -> bool:
506 """Uninstall a KDU instance
507
508 :param cluster_uuid str: The UUID of the cluster
509 :param kdu_instance str: The unique name of the KDU instance
510
511 :return: Returns True if successful, or raises an exception
512 """
513 if not self.authenticated:
514 self.log.debug("[uninstall] Connecting to controller")
515 await self.login(cluster_uuid)
516
517 self.log.debug("[uninstall] Destroying model")
518
519 await self.controller.destroy_models(kdu_instance)
520
521 self.log.debug("[uninstall] Model destroyed and disconnecting")
522 await self.logout()
523
524 return True
525
526 async def exec_primitive(
527 self,
528 cluster_uuid: str = None,
529 kdu_instance: str = None,
530 primitive_name: str = None,
531 timeout: float = 300,
532 params: dict = None,
533 db_dict: dict = None,
534 ) -> str:
535 """Exec primitive (Juju action)
536
537 :param cluster_uuid str: The UUID of the cluster
538 :param kdu_instance str: The unique name of the KDU instance
539 :param primitive_name: Name of action that will be executed
540 :param timeout: Timeout for action execution
541 :param params: Dictionary of all the parameters needed for the action
542         :param db_dict: Dictionary for any additional data
543
544 :return: Returns the output of the action
545 """
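        # Hedged usage sketch (action and application names are illustrative):
        #
        #   output = await self.exec_primitive(
        #       cluster_uuid, kdu_instance, "backup",
        #       params={"application-name": "mariadb-k8s"},
        #   )
        #
        # "application-name" is mandatory: it selects the application whose
        # leader unit will run the action.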
546 if not self.authenticated:
547 self.log.debug("[exec_primitive] Connecting to controller")
548 await self.login(cluster_uuid)
549
550 if not params or "application-name" not in params:
551             raise K8sException(
552                 "Missing application-name argument, "
553                 "argument needed for K8s actions"
554             )
555 try:
556 self.log.debug(
557 "[exec_primitive] Getting model "
558 "kdu_instance: {}".format(kdu_instance)
559 )
560
561 model = await self.get_model(kdu_instance, cluster_uuid)
562
563 application_name = params["application-name"]
564 application = model.applications[application_name]
565
566 actions = await application.get_actions()
567 if primitive_name not in actions:
568 raise K8sException("Primitive {} not found".format(primitive_name))
569
570 unit = None
571 for u in application.units:
572 if await u.is_leader_from_status():
573 unit = u
574 break
575
576 if unit is None:
577 raise K8sException("No leader unit found to execute action")
578
579 self.log.debug("[exec_primitive] Running action: {}".format(primitive_name))
580 action = await unit.run_action(primitive_name, **params)
581
582 output = await model.get_action_output(action_uuid=action.entity_id)
583 status = await model.get_action_status(uuid_or_prefix=action.entity_id)
584
585 status = (
586 status[action.entity_id] if action.entity_id in status else "failed"
587 )
588
589 if status != "completed":
590 raise K8sException(
591 "status is not completed: {} output: {}".format(status, output)
592 )
593
594 return output
595
596 except Exception as e:
597 error_msg = "Error executing primitive {}: {}".format(primitive_name, e)
598 self.log.error(error_msg)
599 raise K8sException(message=error_msg)
600
601 """Introspection"""
602
603 async def inspect_kdu(self, kdu_model: str,) -> dict:
604 """Inspect a KDU
605
606 Inspects a bundle and returns a dictionary of config parameters and
607 their default values.
608
609 :param kdu_model str: The name or path of the bundle to inspect.
610
611 :return: If successful, returns a dictionary of available parameters
612 and their default values.
613 """
614
615 kdu = {}
616 with open(kdu_model, "r") as f:
617 bundle = yaml.safe_load(f)
618
619 """
620 {
621 'description': 'Test bundle',
622 'bundle': 'kubernetes',
623 'applications': {
624 'mariadb-k8s': {
625 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
626 'scale': 1,
627 'options': {
628 'password': 'manopw',
629 'root_password': 'osm4u',
630 'user': 'mano'
631 },
632 'series': 'kubernetes'
633 }
634 }
635 }
636 """
637 # TODO: This should be returned in an agreed-upon format
638 kdu = bundle["applications"]
639
640 return kdu
641
642 async def help_kdu(self, kdu_model: str,) -> str:
643 """View the README
644
645 If available, returns the README of the bundle.
646
647 :param kdu_model str: The name or path of a bundle
648
649 :return: If found, returns the contents of the README.
650 """
651 readme = None
652
653 files = ["README", "README.txt", "README.md"]
654 path = os.path.dirname(kdu_model)
655 for file in os.listdir(path):
656 if file in files:
657                 with open(os.path.join(path, file), "r") as f:
658 readme = f.read()
659 break
660
661 return readme
662
663 async def status_kdu(self, cluster_uuid: str, kdu_instance: str,) -> dict:
664 """Get the status of the KDU
665
666 Get the current status of the KDU instance.
667
668 :param cluster_uuid str: The UUID of the cluster
669 :param kdu_instance str: The unique id of the KDU instance
670
671 :return: Returns a dictionary containing namespace, state, resources,
672 and deployment_time.
673 """
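        # The returned dict maps application names to their status, e.g.
        # (illustrative): {"mariadb-k8s": {"status": "active"}}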
674 status = {}
675
676 model = await self.get_model(
677 self.get_namespace(cluster_uuid), cluster_uuid=cluster_uuid
678 )
679
680 # model = await self.get_model_by_uuid(cluster_uuid)
681 if model:
682 model_status = await model.get_status()
683             # Build a simple {application-name: {"status": ...}} mapping
684
685 for name in model_status.applications:
686 application = model_status.applications[name]
687 status[name] = {"status": application["status"]["status"]}
688
689 if model.is_connected():
690 await model.disconnect()
691
692 return status
693
694 async def get_services(
695 self, cluster_uuid: str, kdu_instance: str, namespace: str
696 ) -> list:
697 """Return a list of services of a kdu_instance"""
698
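        # kdu_instance (the Juju model name) is used as the Kubernetes namespace
        # for the field selector below. Hedged usage sketch (values illustrative):
        #
        #   services = await self.get_services(cluster_uuid, kdu_instance, namespace)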
699 config_file = self.get_config_file(cluster_uuid=cluster_uuid)
700 kubectl = Kubectl(config_file=config_file)
701 return kubectl.get_services(
702 field_selector="metadata.namespace={}".format(kdu_instance)
703 )
704
705 async def get_service(
706 self, cluster_uuid: str, service_name: str, namespace: str
707 ) -> object:
708 """Return data for a specific service inside a namespace"""
709
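        # Filters by both service name and namespace; Kubectl.get_services()
        # returns a list, so the first (and only expected) match is returned.
        # Hedged usage sketch (values illustrative):
        #
        #   service = await self.get_service(cluster_uuid, "mariadb-k8s", namespace)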
710 config_file = self.get_config_file(cluster_uuid=cluster_uuid)
711 kubectl = Kubectl(config_file=config_file)
712
713 return kubectl.get_services(
714 field_selector="metadata.name={},metadata.namespace={}".format(
715 service_name, namespace
716 )
717 )[0]
718
719 # Private methods
720 async def add_k8s(self, cloud_name: str, credentials: str,) -> bool:
721 """Add a k8s cloud to Juju
722
723 Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
724 Juju Controller.
725
726 :param cloud_name str: The name of the cloud to add.
727         :param credentials str: The cluster kubeconfig content, i.e. the output of
728             `kubectl config view --raw`.
729
730 :returns: True if successful, otherwise raises an exception.
731 """
732
733 cmd = [self.juju_command, "add-k8s", "--local", cloud_name]
734 self.log.debug(cmd)
735
736 process = await asyncio.create_subprocess_exec(
737 *cmd,
738 stdout=asyncio.subprocess.PIPE,
739 stderr=asyncio.subprocess.PIPE,
740 stdin=asyncio.subprocess.PIPE,
741 )
742
743 # Feed the process the credentials
744 process.stdin.write(credentials.encode("utf-8"))
745 await process.stdin.drain()
746 process.stdin.close()
747
748 _stdout, stderr = await process.communicate()
749
750 return_code = process.returncode
751
752 self.log.debug("add-k8s return code: {}".format(return_code))
753
754 if return_code > 0:
755 raise Exception(stderr)
756
757 return True
758
759 async def add_model(self, model_name: str, cluster_uuid: str,) -> Model:
760 """Adds a model to the controller
761
762 Adds a new model to the Juju controller
763
764 :param model_name str: The name of the model to add.
765 :returns: The juju.model.Model object of the new model upon success or
766 raises an exception.
767 """
768 if not self.authenticated:
769 await self.login(cluster_uuid)
770
771 self.log.debug(
772 "Adding model '{}' to cluster_uuid '{}'".format(model_name, cluster_uuid)
773 )
774         model = None
775         try:
776             if self.juju_public_key is not None:
777                 model = await self.controller.add_model(
778                     model_name, config={"authorized-keys": self.juju_public_key}
779                 )
780             else:
781                 model = await self.controller.add_model(model_name)
782         except Exception as ex:
783             self.log.debug("Caught exception adding model: {}".format(ex))
784             # model stays None so the caller can detect the failure
785
786 return model
787
788 async def bootstrap(
789 self, cloud_name: str, cluster_uuid: str, loadbalancer: bool
790 ) -> bool:
791 """Bootstrap a Kubernetes controller
792
793 Bootstrap a Juju controller inside the Kubernetes cluster
794
795 :param cloud_name str: The name of the cloud.
796 :param cluster_uuid str: The UUID of the cluster to bootstrap.
797 :param loadbalancer bool: If the controller should use loadbalancer or not.
798 :returns: True upon success or raises an exception.
799 """
800
801 if not loadbalancer:
802 cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid]
803 else:
804 """
805 For public clusters, specify that the controller service is using a
806 LoadBalancer.
807 """
808 cmd = [
809 self.juju_command,
810 "bootstrap",
811 cloud_name,
812 cluster_uuid,
813 "--config",
814 "controller-service-type=loadbalancer",
815 ]
816
817 self.log.debug(
818 "Bootstrapping controller {} in cloud {}".format(cluster_uuid, cloud_name)
819 )
820
821 process = await asyncio.create_subprocess_exec(
822 *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
823 )
824
825 _stdout, stderr = await process.communicate()
826
827 return_code = process.returncode
828
829 if return_code > 0:
830 #
831 if b"already exists" not in stderr:
832 raise Exception(stderr)
833
834 return True
835
836 async def destroy_controller(self, cluster_uuid: str) -> bool:
837 """Destroy a Kubernetes controller
838
839 Destroy an existing Kubernetes controller.
840
841 :param cluster_uuid str: The UUID of the cluster to bootstrap.
842 :returns: True upon success or raises an exception.
843 """
844 cmd = [
845 self.juju_command,
846 "destroy-controller",
847 "--destroy-all-models",
848 "--destroy-storage",
849 "-y",
850 cluster_uuid,
851 ]
852
853 process = await asyncio.create_subprocess_exec(
854 *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
855 )
856
857 _stdout, stderr = await process.communicate()
858
859 return_code = process.returncode
860
861 if return_code > 0:
862 #
863             if b"already exists" not in stderr:
864 raise Exception(stderr)
865
866 def get_config_file(self, cluster_uuid: str) -> str:
867 """
868 Get Cluster Kubeconfig location
869 """
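        # e.g. "/app/storage/<cluster_uuid>/.kube/config" when self.fs.path is
        # "/app/storage" (illustrative value)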
870 return "{}/{}/.kube/config".format(self.fs.path, cluster_uuid)
871
872 def get_config(self, cluster_uuid: str,) -> dict:
873 """Get the cluster configuration
874
875 Gets the configuration of the cluster
876
877 :param cluster_uuid str: The UUID of the cluster.
878 :return: A dict upon success, or raises an exception.
879 """
880 cluster_config = "{}/{}.yaml".format(self.fs.path, cluster_uuid)
881 if os.path.exists(cluster_config):
882 with open(cluster_config, "r") as f:
883 config = yaml.safe_load(f.read())
884 return config
885 else:
886 raise Exception(
887 "Unable to locate configuration for cluster {}".format(cluster_uuid)
888 )
889
890 async def get_model(self, model_name: str, cluster_uuid: str,) -> Model:
891 """Get a model from the Juju Controller.
892
893         Note: Model objects returned must call disconnect() before they go
894         out of scope.
895
896 :param model_name str: The name of the model to get
897 :return The juju.model.Model object if found, or None.
898 """
899 if not self.authenticated:
900 await self.login(cluster_uuid)
901
902 model = None
903 models = await self.controller.list_models()
904 if model_name in models:
905 self.log.debug("Found model: {}".format(model_name))
906 model = await self.controller.get_model(model_name)
907 return model
908
909 def get_namespace(self, cluster_uuid: str,) -> str:
910         """Get the namespace
911         Gets the namespace used for the given cluster
912 
913         :param cluster_uuid str: The UUID of the cluster
914         :returns: The namespace name, or raises an exception
915 """
916 config = self.get_config(cluster_uuid)
917
918 # Make sure the name is in the config
919 if "namespace" not in config:
920 raise Exception("Namespace not found.")
921
922 # TODO: We want to make sure this is unique to the cluster, in case
923 # the cluster is being reused.
924 # Consider pre/appending the cluster id to the namespace string
925 return config["namespace"]
926
927 async def has_model(self, model_name: str) -> bool:
928 """Check if a model exists in the controller
929
930 Checks to see if a model exists in the connected Juju controller.
931
932 :param model_name str: The name of the model
933 :return: A boolean indicating if the model exists
934 """
935 models = await self.controller.list_models()
936
937 if model_name in models:
938 return True
939 return False
940
941 def is_local_k8s(self, credentials: str,) -> bool:
942 """Check if a cluster is local
943
944 Checks if a cluster is running in the local host
945
946         :param credentials str: The k8s credentials (kubeconfig content)
947         :returns: True if the cluster is running in the local host
948 """
949         creds = yaml.safe_load(credentials)
950         # host_ip is None when OSMLCM_VCA_APIPROXY is unset, so the check below fails safely
951         host_ip = os.getenv("OSMLCM_VCA_APIPROXY")
952
953 if creds and host_ip:
954 for cluster in creds["clusters"]:
955 if "server" in cluster["cluster"]:
956 if host_ip in cluster["cluster"]["server"]:
957 return True
958
959 return False
960
961 async def login(self, cluster_uuid):
962 """Login to the Juju controller."""
963
964 if self.authenticated:
965 return
966
967 self.connecting = True
968
969 # Test: Make sure we have the credentials loaded
970 config = self.get_config(cluster_uuid)
971
972 self.juju_endpoint = config["endpoint"]
973 self.juju_user = config["username"]
974 self.juju_secret = config["secret"]
975 self.juju_ca_cert = config["cacert"]
976 self.juju_public_key = None
977
978 self.controller = Controller()
979
980 if self.juju_secret:
981 self.log.debug(
982 "Connecting to controller... ws://{} as {}/{}".format(
983 self.juju_endpoint, self.juju_user, self.juju_secret,
984 )
985 )
986 try:
987 await self.controller.connect(
988 endpoint=self.juju_endpoint,
989 username=self.juju_user,
990 password=self.juju_secret,
991 cacert=self.juju_ca_cert,
992 )
993 self.authenticated = True
994 self.log.debug("JujuApi: Logged into controller")
995 except Exception as ex:
996 self.log.debug(ex)
997 self.log.debug("Caught exception: {}".format(ex))
998 pass
999 else:
1000 self.log.fatal("VCA credentials not configured.")
1001 self.authenticated = False
1002
1003 async def logout(self):
1004 """Logout of the Juju controller."""
1005 self.log.debug("[logout]")
1006 if not self.authenticated:
1007 return False
1008
1009 for model in self.models:
1010 self.log.debug("Logging out of model {}".format(model))
1011 await self.models[model].disconnect()
1012
1013 if self.controller:
1014 self.log.debug("Disconnecting controller {}".format(self.controller))
1015 await self.controller.disconnect()
1016 self.controller = None
1017
1018 self.authenticated = False
1019
1020 async def remove_cloud(self, cloud_name: str,) -> bool:
1021 """Remove a k8s cloud from Juju
1022
1023 Removes a Kubernetes cloud from Juju.
1024
1025         :param cloud_name str: The name of the cloud to remove.
1026
1027 :returns: True if successful, otherwise raises an exception.
1028 """
1029
1030 # Remove the bootstrapped controller
1031 cmd = [self.juju_command, "remove-k8s", "--client", cloud_name]
1032 process = await asyncio.create_subprocess_exec(
1033 *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1034 )
1035
1036 _stdout, stderr = await process.communicate()
1037
1038 return_code = process.returncode
1039
1040 if return_code > 0:
1041 raise Exception(stderr)
1042
1043 # Remove the cloud from the local config
1044 cmd = [self.juju_command, "remove-cloud", "--client", cloud_name]
1045 process = await asyncio.create_subprocess_exec(
1046 *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1047 )
1048
1049 _stdout, stderr = await process.communicate()
1050
1051 return_code = process.returncode
1052
1053 if return_code > 0:
1054 raise Exception(stderr)
1055
1056 return True
1057
1058 async def set_config(self, cluster_uuid: str, config: dict,) -> bool:
1059 """Save the cluster configuration
1060
1061 Saves the cluster information to the file store
1062
1063 :param cluster_uuid str: The UUID of the cluster
1064 :param config dict: A dictionary containing the cluster configuration
1065 :returns: Boolean upon success or raises an exception.
1066 """
1067
1068 cluster_config = "{}/{}.yaml".format(self.fs.path, cluster_uuid)
1069 if not os.path.exists(cluster_config):
1070 self.log.debug("Writing config to {}".format(cluster_config))
1071 with open(cluster_config, "w") as f:
1072 f.write(yaml.dump(config, Dumper=yaml.Dumper))
1073
1074 return True