1de44cd6a2e4a1a134d482645f25d7c66df1ea2b
[osm/N2VC.git] / n2vc / k8s_juju_conn.py
1 # Copyright 2019 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import concurrent
17 import os
18 import uuid
19 import yaml
20
21 import juju
22 from juju.controller import Controller
23 from juju.model import Model
24 from n2vc.exceptions import K8sException
25 from n2vc.k8s_conn import K8sConnector
26 from n2vc.kubectl import Kubectl
27 from .exceptions import MethodNotImplemented
28
29
30 # from juju.bundle import BundleHandler
31 # import re
32 # import ssl
33 # from .vnf import N2VC
34 class K8sJujuConnector(K8sConnector):
35 def __init__(
36 self,
37 fs: object,
38 db: object,
39 kubectl_command: str = "/usr/bin/kubectl",
40 juju_command: str = "/usr/bin/juju",
41 log: object = None,
42 on_update_db=None,
43 ):
44 """
45
46 :param kubectl_command: path to kubectl executable
47 :param helm_command: path to helm executable
48 :param fs: file system for kubernetes and helm configuration
49 :param log: logger
50 """
51
52 # parent class
53 K8sConnector.__init__(
54 self, db, log=log, on_update_db=on_update_db,
55 )
56
57 self.fs = fs
58 self.log.debug("Initializing K8S Juju connector")
59
60 self.authenticated = False
61 self.models = {}
62
63 self.juju_command = juju_command
64 self.juju_secret = ""
65
66 self.log.debug("K8S Juju connector initialized")
67
68 """Initialization"""
69
70 async def init_env(
71 self,
72 k8s_creds: str,
73 namespace: str = "kube-system",
74 reuse_cluster_uuid: str = None,
75 ) -> (str, bool):
76 """
77 It prepares a given K8s cluster environment to run Juju bundles.
78
79 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
80 '.kube/config'
81 :param namespace: optional namespace to be used for juju. By default,
82 'kube-system' will be used
83 :param reuse_cluster_uuid: existing cluster uuid for reuse
84 :return: uuid of the K8s cluster and True if connector has installed some
85 software in the cluster
86 (on error, an exception will be raised)
87 """
88
89 """Bootstrapping
90
91 Bootstrapping cannot be done, by design, through the API. We need to
92 use the CLI tools.
93 """
94
95 """
96 WIP: Workflow
97
98 1. Has the environment already been bootstrapped?
99 - Check the database to see if we have a record for this env
100
101 2. If this is a new env, create it
102 - Add the k8s cloud to Juju
103 - Bootstrap
104 - Record it in the database
105
106 3. Connect to the Juju controller for this cloud
107
108 """
109 # cluster_uuid = reuse_cluster_uuid
110 # if not cluster_uuid:
111 # cluster_uuid = str(uuid4())
112
113 ##################################################
114 # TODO: Pull info from db based on the namespace #
115 ##################################################
116
117 ###################################################
118 # TODO: Make it idempotent, calling add-k8s and #
119 # bootstrap whenever reuse_cluster_uuid is passed #
120 # as parameter #
121 # `init_env` is called to initialize the K8s #
122 # cluster for juju. If this initialization fails, #
123 # it can be called again by LCM with the param #
124 # reuse_cluster_uuid, e.g. to try to fix it. #
125 ###################################################
126
127 # This is a new cluster, so bootstrap it
128
129 cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4())
130
131 # Is a local k8s cluster?
132 localk8s = self.is_local_k8s(k8s_creds)
133
134 # If the k8s is external, the juju controller needs a loadbalancer
135 loadbalancer = False if localk8s else True
136
137 # Name the new k8s cloud
138 k8s_cloud = "k8s-{}".format(cluster_uuid)
139
140 self.log.debug("Adding k8s cloud {}".format(k8s_cloud))
141 await self.add_k8s(k8s_cloud, k8s_creds)
142
143 # Bootstrap Juju controller
144 self.log.debug("Bootstrapping...")
145 await self.bootstrap(k8s_cloud, cluster_uuid, loadbalancer)
146 self.log.debug("Bootstrap done.")
147
148 # Get the controller information
149
150 # Parse ~/.local/share/juju/controllers.yaml
151 # controllers.testing.api-endpoints|ca-cert|uuid
152 self.log.debug("Getting controller endpoints")
153 with open(os.path.expanduser("~/.local/share/juju/controllers.yaml")) as f:
154 controllers = yaml.load(f, Loader=yaml.Loader)
155 controller = controllers["controllers"][cluster_uuid]
156 endpoints = controller["api-endpoints"]
157 self.juju_endpoint = endpoints[0]
158 self.juju_ca_cert = controller["ca-cert"]
159
160 # Parse ~/.local/share/juju/accounts
161 # controllers.testing.user|password
162 self.log.debug("Getting accounts")
163 with open(os.path.expanduser("~/.local/share/juju/accounts.yaml")) as f:
164 controllers = yaml.load(f, Loader=yaml.Loader)
165 controller = controllers["controllers"][cluster_uuid]
166
167 self.juju_user = controller["user"]
168 self.juju_secret = controller["password"]
169
170 # raise Exception("EOL")
171
172 self.juju_public_key = None
173
174 config = {
175 "endpoint": self.juju_endpoint,
176 "username": self.juju_user,
177 "secret": self.juju_secret,
178 "cacert": self.juju_ca_cert,
179 "namespace": namespace,
180 "loadbalancer": loadbalancer,
181 }
182
183 # Store the cluster configuration so it
184 # can be used for subsequent calls
185 self.log.debug("Setting config")
186 await self.set_config(cluster_uuid, config)
187
188 # Login to the k8s cluster
189 if not self.authenticated:
190 await self.login(cluster_uuid)
191
192 # We're creating a new cluster
193 # print("Getting model {}".format(self.get_namespace(cluster_uuid),
194 # cluster_uuid=cluster_uuid))
195 # model = await self.get_model(
196 # self.get_namespace(cluster_uuid),
197 # cluster_uuid=cluster_uuid
198 # )
199
200 # Disconnect from the model
201 # if model and model.is_connected():
202 # await model.disconnect()
203
204 return cluster_uuid, True
205
206 """Repo Management"""
207
208 async def repo_add(
209 self, name: str, url: str, _type: str = "charm",
210 ):
211 raise MethodNotImplemented()
212
213 async def repo_list(self):
214 raise MethodNotImplemented()
215
216 async def repo_remove(
217 self, name: str,
218 ):
219 raise MethodNotImplemented()
220
221 async def synchronize_repos(self, cluster_uuid: str, name: str):
222 """
223 Returns None as currently add_repo is not implemented
224 """
225 return None
226
227 """Reset"""
228
229 async def reset(
230 self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
231 ) -> bool:
232 """Reset a cluster
233
234 Resets the Kubernetes cluster by removing the model that represents it.
235
236 :param cluster_uuid str: The UUID of the cluster to reset
237 :return: Returns True if successful or raises an exception.
238 """
239
240 try:
241 if not self.authenticated:
242 await self.login(cluster_uuid)
243
244 if self.controller.is_connected():
245 # Destroy the model
246 namespace = self.get_namespace(cluster_uuid)
247 if await self.has_model(namespace):
248 self.log.debug("[reset] Destroying model")
249 await self.controller.destroy_model(namespace, destroy_storage=True)
250
251 # Disconnect from the controller
252 self.log.debug("[reset] Disconnecting controller")
253 await self.logout()
254
255 # Destroy the controller (via CLI)
256 self.log.debug("[reset] Destroying controller")
257 await self.destroy_controller(cluster_uuid)
258
259 self.log.debug("[reset] Removing k8s cloud")
260 k8s_cloud = "k8s-{}".format(cluster_uuid)
261 await self.remove_cloud(k8s_cloud)
262
263 except Exception as ex:
264 self.log.debug("Caught exception during reset: {}".format(ex))
265
266 return True
267
268 """Deployment"""
269
270 async def install(
271 self,
272 cluster_uuid: str,
273 kdu_model: str,
274 atomic: bool = True,
275 timeout: float = 300,
276 params: dict = None,
277 db_dict: dict = None,
278 kdu_name: str = None,
279 namespace: str = None,
280 ) -> bool:
281 """Install a bundle
282
283 :param cluster_uuid str: The UUID of the cluster to install to
284 :param kdu_model str: The name or path of a bundle to install
285 :param atomic bool: If set, waits until the model is active and resets
286 the cluster on failure.
287 :param timeout int: The time, in seconds, to wait for the install
288 to finish
289 :param params dict: Key-value pairs of instantiation parameters
290 :param kdu_name: Name of the KDU instance to be installed
291 :param namespace: K8s namespace to use for the KDU instance
292
293 :return: If successful, returns ?
294 """
295
296 if not self.authenticated:
297 self.log.debug("[install] Logging in to the controller")
298 await self.login(cluster_uuid)
299
300 ##
301 # Get or create the model, based on the NS
302 # uuid.
303 if kdu_name:
304 kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
305 else:
306 kdu_instance = db_dict["filter"]["_id"]
307
308 self.log.debug("Checking for model named {}".format(kdu_instance))
309
310 # Create the new model
311 self.log.debug("Adding model: {}".format(kdu_instance))
312 model = await self.add_model(kdu_instance, cluster_uuid=cluster_uuid)
313
314 if model:
315 # TODO: Instantiation parameters
316
317 """
318 "Juju bundle that models the KDU, in any of the following ways:
319 - <juju-repo>/<juju-bundle>
320 - <juju-bundle folder under k8s_models folder in the package>
321 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
322 in the package>
323 - <URL_where_to_fetch_juju_bundle>
324 """
325 try:
326 previous_workdir = os.getcwd()
327 except FileNotFoundError:
328 previous_workdir = "/app/storage"
329
330 bundle = kdu_model
331 if kdu_model.startswith("cs:"):
332 bundle = kdu_model
333 elif kdu_model.startswith("http"):
334 # Download the file
335 pass
336 else:
337 new_workdir = kdu_model.strip(kdu_model.split("/")[-1])
338
339 os.chdir(new_workdir)
340
341 bundle = "local:{}".format(kdu_model)
342
343 if not bundle:
344 # Raise named exception that the bundle could not be found
345 raise Exception()
346
347 self.log.debug("[install] deploying {}".format(bundle))
348 await model.deploy(bundle)
349
350 # Get the application
351 if atomic:
352 # applications = model.applications
353 self.log.debug("[install] Applications: {}".format(model.applications))
354 for name in model.applications:
355 self.log.debug("[install] Waiting for {} to settle".format(name))
356 application = model.applications[name]
357 try:
358 # It's not enough to wait for all units to be active;
359 # the application status needs to be active as well.
360 self.log.debug("Waiting for all units to be active...")
361 await model.block_until(
362 lambda: all(
363 unit.agent_status == "idle"
364 and application.status in ["active", "unknown"]
365 and unit.workload_status in ["active", "unknown"]
366 for unit in application.units
367 ),
368 timeout=timeout,
369 )
370 self.log.debug("All units active.")
371
372 # TODO use asyncio.TimeoutError
373 except concurrent.futures._base.TimeoutError:
374 os.chdir(previous_workdir)
375 self.log.debug("[install] Timeout exceeded; resetting cluster")
376 await self.reset(cluster_uuid)
377 return False
378
379 # Wait for the application to be active
380 if model.is_connected():
381 self.log.debug("[install] Disconnecting model")
382 await model.disconnect()
383
384 os.chdir(previous_workdir)
385
386 return kdu_instance
387 raise Exception("Unable to install")
388
389 async def instances_list(self, cluster_uuid: str) -> list:
390 """
391 returns a list of deployed releases in a cluster
392
393 :param cluster_uuid: the cluster
394 :return:
395 """
396 return []
397
398 async def upgrade(
399 self,
400 cluster_uuid: str,
401 kdu_instance: str,
402 kdu_model: str = None,
403 params: dict = None,
404 ) -> str:
405 """Upgrade a model
406
407 :param cluster_uuid str: The UUID of the cluster to upgrade
408 :param kdu_instance str: The unique name of the KDU instance
409 :param kdu_model str: The name or path of the bundle to upgrade to
410 :param params dict: Key-value pairs of instantiation parameters
411
412 :return: If successful, reference to the new revision number of the
413 KDU instance.
414 """
415
416 # TODO: Loop through the bundle and upgrade each charm individually
417
418 """
419 The API doesn't have a concept of bundle upgrades, because there are
420 many possible changes: charm revision, disk, number of units, etc.
421
422 As such, we are only supporting a limited subset of upgrades. We'll
423 upgrade the charm revision but leave storage and scale untouched.
424
425 Scale changes should happen through OSM constructs, and changes to
426 storage would require a redeployment of the service, at least in this
427 initial release.
428 """
429 namespace = self.get_namespace(cluster_uuid)
430 model = await self.get_model(namespace, cluster_uuid=cluster_uuid)
431
432 with open(kdu_model, "r") as f:
433 bundle = yaml.safe_load(f)
434
435 """
436 {
437 'description': 'Test bundle',
438 'bundle': 'kubernetes',
439 'applications': {
440 'mariadb-k8s': {
441 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
442 'scale': 1,
443 'options': {
444 'password': 'manopw',
445 'root_password': 'osm4u',
446 'user': 'mano'
447 },
448 'series': 'kubernetes'
449 }
450 }
451 }
452 """
453 # TODO: This should be returned in an agreed-upon format
454 for name in bundle["applications"]:
455 self.log.debug(model.applications)
456 application = model.applications[name]
457 self.log.debug(application)
458
459 path = bundle["applications"][name]["charm"]
460
461 try:
462 await application.upgrade_charm(switch=path)
463 except juju.errors.JujuError as ex:
464 if "already running charm" in str(ex):
465 # We're already running this version
466 pass
467
468 await model.disconnect()
469
470 return True
471 raise MethodNotImplemented()
472
473 """Rollback"""
474
475 async def rollback(
476 self, cluster_uuid: str, kdu_instance: str, revision: int = 0,
477 ) -> str:
478 """Rollback a model
479
480 :param cluster_uuid str: The UUID of the cluster to rollback
481 :param kdu_instance str: The unique name of the KDU instance
482 :param revision int: The revision to revert to. If omitted, rolls back
483 the previous upgrade.
484
485 :return: If successful, returns the revision of active KDU instance,
486 or raises an exception
487 """
488 raise MethodNotImplemented()
489
490 """Deletion"""
491
492 async def uninstall(self, cluster_uuid: str, kdu_instance: str) -> bool:
493 """Uninstall a KDU instance
494
495 :param cluster_uuid str: The UUID of the cluster
496 :param kdu_instance str: The unique name of the KDU instance
497
498 :return: Returns True if successful, or raises an exception
499 """
500 if not self.authenticated:
501 self.log.debug("[uninstall] Connecting to controller")
502 await self.login(cluster_uuid)
503
504 self.log.debug("[uninstall] Destroying model")
505
506 await self.controller.destroy_models(kdu_instance)
507
508 self.log.debug("[uninstall] Model destroyed and disconnecting")
509 await self.logout()
510
511 return True
512
513 async def exec_primitive(
514 self,
515 cluster_uuid: str = None,
516 kdu_instance: str = None,
517 primitive_name: str = None,
518 timeout: float = 300,
519 params: dict = None,
520 db_dict: dict = None,
521 ) -> str:
522 """Exec primitive (Juju action)
523
524 :param cluster_uuid str: The UUID of the cluster
525 :param kdu_instance str: The unique name of the KDU instance
526 :param primitive_name: Name of action that will be executed
527 :param timeout: Timeout for action execution
528 :param params: Dictionary of all the parameters needed for the action
529 :db_dict: Dictionary for any additional data
530
531 :return: Returns the output of the action
532 """
533 if not self.authenticated:
534 self.log.debug("[exec_primitive] Connecting to controller")
535 await self.login(cluster_uuid)
536
537 if not params or "application-name" not in params:
538 raise K8sException(
539 "Missing application-name argument, \
540 argument needed for K8s actions"
541 )
542 try:
543 self.log.debug(
544 "[exec_primitive] Getting model "
545 "kdu_instance: {}".format(kdu_instance)
546 )
547
548 model = await self.get_model(kdu_instance, cluster_uuid)
549
550 application_name = params["application-name"]
551 application = model.applications[application_name]
552
553 actions = await application.get_actions()
554 if primitive_name not in actions:
555 raise K8sException("Primitive {} not found".format(primitive_name))
556
557 unit = None
558 for u in application.units:
559 if await u.is_leader_from_status():
560 unit = u
561 break
562
563 if unit is None:
564 raise K8sException("No leader unit found to execute action")
565
566 self.log.debug("[exec_primitive] Running action: {}".format(primitive_name))
567 action = await unit.run_action(primitive_name, **params)
568
569 output = await model.get_action_output(action_uuid=action.entity_id)
570 status = await model.get_action_status(uuid_or_prefix=action.entity_id)
571
572 status = (
573 status[action.entity_id] if action.entity_id in status else "failed"
574 )
575
576 if status != "completed":
577 raise K8sException(
578 "status is not completed: {} output: {}".format(status, output)
579 )
580
581 return output
582
583 except Exception as e:
584 error_msg = "Error executing primitive {}: {}".format(primitive_name, e)
585 self.log.error(error_msg)
586 raise K8sException(message=error_msg)
587
588 """Introspection"""
589
590 async def inspect_kdu(self, kdu_model: str,) -> dict:
591 """Inspect a KDU
592
593 Inspects a bundle and returns a dictionary of config parameters and
594 their default values.
595
596 :param kdu_model str: The name or path of the bundle to inspect.
597
598 :return: If successful, returns a dictionary of available parameters
599 and their default values.
600 """
601
602 kdu = {}
603 with open(kdu_model, "r") as f:
604 bundle = yaml.safe_load(f)
605
606 """
607 {
608 'description': 'Test bundle',
609 'bundle': 'kubernetes',
610 'applications': {
611 'mariadb-k8s': {
612 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
613 'scale': 1,
614 'options': {
615 'password': 'manopw',
616 'root_password': 'osm4u',
617 'user': 'mano'
618 },
619 'series': 'kubernetes'
620 }
621 }
622 }
623 """
624 # TODO: This should be returned in an agreed-upon format
625 kdu = bundle["applications"]
626
627 return kdu
628
629 async def help_kdu(self, kdu_model: str,) -> str:
630 """View the README
631
632 If available, returns the README of the bundle.
633
634 :param kdu_model str: The name or path of a bundle
635
636 :return: If found, returns the contents of the README.
637 """
638 readme = None
639
640 files = ["README", "README.txt", "README.md"]
641 path = os.path.dirname(kdu_model)
642 for file in os.listdir(path):
643 if file in files:
644 with open(file, "r") as f:
645 readme = f.read()
646 break
647
648 return readme
649
650 async def status_kdu(self, cluster_uuid: str, kdu_instance: str,) -> dict:
651 """Get the status of the KDU
652
653 Get the current status of the KDU instance.
654
655 :param cluster_uuid str: The UUID of the cluster
656 :param kdu_instance str: The unique id of the KDU instance
657
658 :return: Returns a dictionary containing namespace, state, resources,
659 and deployment_time.
660 """
661 status = {}
662
663 model = await self.get_model(
664 self.get_namespace(cluster_uuid), cluster_uuid=cluster_uuid
665 )
666
667 # model = await self.get_model_by_uuid(cluster_uuid)
668 if model:
669 model_status = await model.get_status()
670 status = model_status.applications
671
672 for name in model_status.applications:
673 application = model_status.applications[name]
674 status[name] = {"status": application["status"]["status"]}
675
676 if model.is_connected():
677 await model.disconnect()
678
679 return status
680
681 async def get_services(
682 self, cluster_uuid: str, kdu_instance: str, namespace: str
683 ) -> list:
684 """Return a list of services of a kdu_instance"""
685
686 config_file = self.get_config_file(cluster_uuid=cluster_uuid)
687 kubectl = Kubectl(config_file=config_file)
688 return kubectl.get_services(
689 field_selector="metadata.namespace={}".format(kdu_instance)
690 )
691
692 async def get_service(
693 self, cluster_uuid: str, service_name: str, namespace: str
694 ) -> object:
695 """Return data for a specific service inside a namespace"""
696
697 config_file = self.get_config_file(cluster_uuid=cluster_uuid)
698 kubectl = Kubectl(config_file=config_file)
699
700 return kubectl.get_services(
701 field_selector="metadata.name={},metadata.namespace={}".format(
702 service_name, namespace
703 )
704 )[0]
705
706 # Private methods
707 async def add_k8s(self, cloud_name: str, credentials: str,) -> bool:
708 """Add a k8s cloud to Juju
709
710 Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
711 Juju Controller.
712
713 :param cloud_name str: The name of the cloud to add.
714 :param credentials dict: A dictionary representing the output of
715 `kubectl config view --raw`.
716
717 :returns: True if successful, otherwise raises an exception.
718 """
719
720 cmd = [self.juju_command, "add-k8s", "--local", cloud_name]
721 self.log.debug(cmd)
722
723 process = await asyncio.create_subprocess_exec(
724 *cmd,
725 stdout=asyncio.subprocess.PIPE,
726 stderr=asyncio.subprocess.PIPE,
727 stdin=asyncio.subprocess.PIPE,
728 )
729
730 # Feed the process the credentials
731 process.stdin.write(credentials.encode("utf-8"))
732 await process.stdin.drain()
733 process.stdin.close()
734
735 _stdout, stderr = await process.communicate()
736
737 return_code = process.returncode
738
739 self.log.debug("add-k8s return code: {}".format(return_code))
740
741 if return_code > 0:
742 raise Exception(stderr)
743
744 return True
745
746 async def add_model(self, model_name: str, cluster_uuid: str,) -> Model:
747 """Adds a model to the controller
748
749 Adds a new model to the Juju controller
750
751 :param model_name str: The name of the model to add.
752 :returns: The juju.model.Model object of the new model upon success or
753 raises an exception.
754 """
755 if not self.authenticated:
756 await self.login(cluster_uuid)
757
758 self.log.debug(
759 "Adding model '{}' to cluster_uuid '{}'".format(model_name, cluster_uuid)
760 )
761 try:
762 if self.juju_public_key is not None:
763 model = await self.controller.add_model(
764 model_name, config={"authorized-keys": self.juju_public_key}
765 )
766 else:
767 model = await self.controller.add_model(model_name)
768 except Exception as ex:
769 self.log.debug(ex)
770 self.log.debug("Caught exception: {}".format(ex))
771 pass
772
773 return model
774
775 async def bootstrap(
776 self, cloud_name: str, cluster_uuid: str, loadbalancer: bool
777 ) -> bool:
778 """Bootstrap a Kubernetes controller
779
780 Bootstrap a Juju controller inside the Kubernetes cluster
781
782 :param cloud_name str: The name of the cloud.
783 :param cluster_uuid str: The UUID of the cluster to bootstrap.
784 :param loadbalancer bool: If the controller should use loadbalancer or not.
785 :returns: True upon success or raises an exception.
786 """
787
788 if not loadbalancer:
789 cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid]
790 else:
791 """
792 For public clusters, specify that the controller service is using a
793 LoadBalancer.
794 """
795 cmd = [
796 self.juju_command,
797 "bootstrap",
798 cloud_name,
799 cluster_uuid,
800 "--config",
801 "controller-service-type=loadbalancer",
802 ]
803
804 self.log.debug(
805 "Bootstrapping controller {} in cloud {}".format(cluster_uuid, cloud_name)
806 )
807
808 process = await asyncio.create_subprocess_exec(
809 *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
810 )
811
812 _stdout, stderr = await process.communicate()
813
814 return_code = process.returncode
815
816 if return_code > 0:
817 #
818 if b"already exists" not in stderr:
819 raise Exception(stderr)
820
821 return True
822
823 async def destroy_controller(self, cluster_uuid: str) -> bool:
824 """Destroy a Kubernetes controller
825
826 Destroy an existing Kubernetes controller.
827
828 :param cluster_uuid str: The UUID of the cluster to bootstrap.
829 :returns: True upon success or raises an exception.
830 """
831 cmd = [
832 self.juju_command,
833 "destroy-controller",
834 "--destroy-all-models",
835 "--destroy-storage",
836 "-y",
837 cluster_uuid,
838 ]
839
840 process = await asyncio.create_subprocess_exec(
841 *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
842 )
843
844 _stdout, stderr = await process.communicate()
845
846 return_code = process.returncode
847
848 if return_code > 0:
849 #
850 if "already exists" not in stderr:
851 raise Exception(stderr)
852
853 def get_config_file(self, cluster_uuid: str) -> str:
854 """
855 Get Cluster Kubeconfig location
856 """
857 return "{}/{}/.kube/config".format(self.fs.path, cluster_uuid)
858
859 def get_config(self, cluster_uuid: str,) -> dict:
860 """Get the cluster configuration
861
862 Gets the configuration of the cluster
863
864 :param cluster_uuid str: The UUID of the cluster.
865 :return: A dict upon success, or raises an exception.
866 """
867 cluster_config = "{}/{}.yaml".format(self.fs.path, cluster_uuid)
868 if os.path.exists(cluster_config):
869 with open(cluster_config, "r") as f:
870 config = yaml.safe_load(f.read())
871 return config
872 else:
873 raise Exception(
874 "Unable to locate configuration for cluster {}".format(cluster_uuid)
875 )
876
877 async def get_model(self, model_name: str, cluster_uuid: str,) -> Model:
878 """Get a model from the Juju Controller.
879
880 Note: Model objects returned must call disconnected() before it goes
881 out of scope.
882
883 :param model_name str: The name of the model to get
884 :return The juju.model.Model object if found, or None.
885 """
886 if not self.authenticated:
887 await self.login(cluster_uuid)
888
889 model = None
890 models = await self.controller.list_models()
891 if model_name in models:
892 self.log.debug("Found model: {}".format(model_name))
893 model = await self.controller.get_model(model_name)
894 return model
895
896 def get_namespace(self, cluster_uuid: str,) -> str:
897 """Get the namespace UUID
898 Gets the namespace's unique name
899
900 :param cluster_uuid str: The UUID of the cluster
901 :returns: The namespace UUID, or raises an exception
902 """
903 config = self.get_config(cluster_uuid)
904
905 # Make sure the name is in the config
906 if "namespace" not in config:
907 raise Exception("Namespace not found.")
908
909 # TODO: We want to make sure this is unique to the cluster, in case
910 # the cluster is being reused.
911 # Consider pre/appending the cluster id to the namespace string
912 return config["namespace"]
913
914 async def has_model(self, model_name: str) -> bool:
915 """Check if a model exists in the controller
916
917 Checks to see if a model exists in the connected Juju controller.
918
919 :param model_name str: The name of the model
920 :return: A boolean indicating if the model exists
921 """
922 models = await self.controller.list_models()
923
924 if model_name in models:
925 return True
926 return False
927
928 def is_local_k8s(self, credentials: str,) -> bool:
929 """Check if a cluster is local
930
931 Checks if a cluster is running in the local host
932
933 :param credentials dict: A dictionary containing the k8s credentials
934 :returns: A boolean if the cluster is running locally
935 """
936
937 creds = yaml.safe_load(credentials)
938
939 if creds and os.getenv("OSMLCM_VCA_APIPROXY"):
940 for cluster in creds["clusters"]:
941 if "server" in cluster["cluster"]:
942 if os.getenv("OSMLCM_VCA_APIPROXY") in cluster["cluster"]["server"]:
943 return True
944
945 return False
946
947 async def login(self, cluster_uuid):
948 """Login to the Juju controller."""
949
950 if self.authenticated:
951 return
952
953 self.connecting = True
954
955 # Test: Make sure we have the credentials loaded
956 config = self.get_config(cluster_uuid)
957
958 self.juju_endpoint = config["endpoint"]
959 self.juju_user = config["username"]
960 self.juju_secret = config["secret"]
961 self.juju_ca_cert = config["cacert"]
962 self.juju_public_key = None
963
964 self.controller = Controller()
965
966 if self.juju_secret:
967 self.log.debug(
968 "Connecting to controller... ws://{} as {}/{}".format(
969 self.juju_endpoint, self.juju_user, self.juju_secret,
970 )
971 )
972 try:
973 await self.controller.connect(
974 endpoint=self.juju_endpoint,
975 username=self.juju_user,
976 password=self.juju_secret,
977 cacert=self.juju_ca_cert,
978 )
979 self.authenticated = True
980 self.log.debug("JujuApi: Logged into controller")
981 except Exception as ex:
982 self.log.debug(ex)
983 self.log.debug("Caught exception: {}".format(ex))
984 pass
985 else:
986 self.log.fatal("VCA credentials not configured.")
987 self.authenticated = False
988
989 async def logout(self):
990 """Logout of the Juju controller."""
991 self.log.debug("[logout]")
992 if not self.authenticated:
993 return False
994
995 for model in self.models:
996 self.log.debug("Logging out of model {}".format(model))
997 await self.models[model].disconnect()
998
999 if self.controller:
1000 self.log.debug("Disconnecting controller {}".format(self.controller))
1001 await self.controller.disconnect()
1002 self.controller = None
1003
1004 self.authenticated = False
1005
1006 async def remove_cloud(self, cloud_name: str,) -> bool:
1007 """Remove a k8s cloud from Juju
1008
1009 Removes a Kubernetes cloud from Juju.
1010
1011 :param cloud_name str: The name of the cloud to add.
1012
1013 :returns: True if successful, otherwise raises an exception.
1014 """
1015
1016 # Remove the bootstrapped controller
1017 cmd = [self.juju_command, "remove-k8s", "--client", cloud_name]
1018 process = await asyncio.create_subprocess_exec(
1019 *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1020 )
1021
1022 _stdout, stderr = await process.communicate()
1023
1024 return_code = process.returncode
1025
1026 if return_code > 0:
1027 raise Exception(stderr)
1028
1029 # Remove the cloud from the local config
1030 cmd = [self.juju_command, "remove-cloud", "--client", cloud_name]
1031 process = await asyncio.create_subprocess_exec(
1032 *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1033 )
1034
1035 _stdout, stderr = await process.communicate()
1036
1037 return_code = process.returncode
1038
1039 if return_code > 0:
1040 raise Exception(stderr)
1041
1042 return True
1043
1044 async def set_config(self, cluster_uuid: str, config: dict,) -> bool:
1045 """Save the cluster configuration
1046
1047 Saves the cluster information to the file store
1048
1049 :param cluster_uuid str: The UUID of the cluster
1050 :param config dict: A dictionary containing the cluster configuration
1051 :returns: Boolean upon success or raises an exception.
1052 """
1053
1054 cluster_config = "{}/{}.yaml".format(self.fs.path, cluster_uuid)
1055 if not os.path.exists(cluster_config):
1056 self.log.debug("Writing config to {}".format(cluster_config))
1057 with open(cluster_config, "w") as f:
1058 f.write(yaml.dump(config, Dumper=yaml.Dumper))
1059
1060 return True