Enable lint, flake8 and unit tests
[osm/N2VC.git] / n2vc / k8s_juju_conn.py
1 # Copyright 2019 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import concurrent
17 import os
18 import uuid
19
20 import juju
21 from juju.controller import Controller
22 from n2vc.exceptions import K8sException
23 from n2vc.k8s_conn import K8sConnector
24 import yaml
25
26 from .exceptions import MethodNotImplemented
27
28
29 # from juju.bundle import BundleHandler
30 # import re
31 # import ssl
32 # from .vnf import N2VC
33 class K8sJujuConnector(K8sConnector):
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = "/usr/bin/kubectl",
    juju_command: str = "/usr/bin/juju",
    log: object = None,
    on_update_db=None,
):
    """Create a K8s connector that works through Juju.

    :param fs: file system object, kept on the instance as ``self.fs``
    :param db: database object, passed on to ``K8sConnector``
    :param kubectl_command: path to kubectl executable (accepted, but not
                            stored by this constructor)
    :param juju_command: path to juju executable
    :param log: logger, passed on to ``K8sConnector``
    :param on_update_db: callback, passed on to ``K8sConnector``
    """

    # Parent class first: self.log is used right after this call and is
    # not assigned anywhere in this constructor.
    K8sConnector.__init__(self, db, log=log, on_update_db=on_update_db)

    self.fs = fs
    self.log.debug("Initializing K8S Juju connector")

    # No controller session and no cached models yet.
    self.models = {}
    self.authenticated = False

    # Juju CLI location; the controller secret stays empty until a
    # cluster configuration is loaded.
    self.juju_secret = ""
    self.juju_command = juju_command

    self.log.debug("K8S Juju connector initialized")
66
67 """Initialization"""
68
async def init_env(
    self,
    k8s_creds: str,
    namespace: str = "kube-system",
    reuse_cluster_uuid: str = None,
) -> (str, bool):
    """
    Prepare a K8s cluster environment so it can run Juju bundles.

    New cluster (no ``reuse_cluster_uuid``): the k8s cloud is added to Juju
    and a controller is bootstrapped through the CLI, because bootstrapping
    cannot be done, by design, through the API. The controller endpoint,
    CA certificate, user and password are then read from Juju's local
    ``controllers.yaml`` / ``accounts.yaml`` and stored with ``set_config``.

    Reused cluster: the stored configuration is read with ``get_config``.

    In both cases the connector logs in to the controller if it is not
    authenticated yet.

    :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
                      '.kube/config'
    :param namespace: optional namespace to be used for juju. By default,
                      'kube-system' will be used
    :param reuse_cluster_uuid: existing cluster uuid for reuse
    :return: uuid of the K8s cluster and True (the second element is always
             True in this implementation)
             (on error, an exception will be raised)
    """

    # TODO: Pull info from db based on the namespace
    #
    # TODO: Make it idempotent, calling add-k8s and bootstrap whenever
    # reuse_cluster_uuid is passed as parameter. `init_env` is called to
    # initialize the K8s cluster for juju. If this initialization fails,
    # it can be called again by LCM with the param reuse_cluster_uuid,
    # e.g. to try to fix it.

    if reuse_cluster_uuid:
        # Existing cluster: restore what was saved when it was created.
        cluster_uuid = reuse_cluster_uuid

        stored = self.get_config(cluster_uuid)

        self.juju_endpoint = stored["endpoint"]
        self.juju_user = stored["username"]
        self.juju_secret = stored["secret"]
        self.juju_ca_cert = stored["cacert"]
        self.juju_public_key = None
    else:
        # New cluster: bootstrap it under a freshly generated uuid.
        cluster_uuid = str(uuid.uuid4())

        # A controller on an external k8s needs a loadbalancer; a local
        # one does not.
        loadbalancer = not self.is_local_k8s(k8s_creds)

        cloud_name = "k8s-{}".format(cluster_uuid)

        self.log.debug("Adding k8s cloud {}".format(cloud_name))
        await self.add_k8s(cloud_name, k8s_creds)

        self.log.debug("Bootstrapping...")
        await self.bootstrap(cloud_name, cluster_uuid, loadbalancer)
        self.log.debug("Bootstrap done.")

        # controllers.<uuid>.api-endpoints|ca-cert
        self.log.debug("Getting controller endpoints")
        with open(os.path.expanduser("~/.local/share/juju/controllers.yaml")) as f:
            controllers_doc = yaml.load(f, Loader=yaml.Loader)
        controller_entry = controllers_doc["controllers"][cluster_uuid]
        self.juju_endpoint = controller_entry["api-endpoints"][0]
        self.juju_ca_cert = controller_entry["ca-cert"]

        # controllers.<uuid>.user|password
        self.log.debug("Getting accounts")
        with open(os.path.expanduser("~/.local/share/juju/accounts.yaml")) as f:
            accounts_doc = yaml.load(f, Loader=yaml.Loader)
        account_entry = accounts_doc["controllers"][cluster_uuid]
        self.juju_user = account_entry["user"]
        self.juju_secret = account_entry["password"]

        self.juju_public_key = None

        # Store the cluster configuration so it can be used for
        # subsequent calls.
        self.log.debug("Setting config")
        await self.set_config(
            cluster_uuid,
            {
                "endpoint": self.juju_endpoint,
                "username": self.juju_user,
                "secret": self.juju_secret,
                "cacert": self.juju_ca_cert,
                "namespace": namespace,
                "loadbalancer": loadbalancer,
            },
        )

    # Login to the k8s cluster
    if not self.authenticated:
        await self.login(cluster_uuid)

    return cluster_uuid, True
217
218 """Repo Management"""
219
async def repo_add(self, name: str, url: str, _type: str = "charm"):
    """Add a repository (not supported by this connector).

    :param name str: Name of the repository
    :param url str: Location of the repository
    :param _type str: Repository type, "charm" by default

    Always raises MethodNotImplemented.
    """
    raise MethodNotImplemented()
224
async def repo_list(self):
    """List repositories (not supported by this connector).

    Always raises MethodNotImplemented.
    """
    raise MethodNotImplemented()
227
async def repo_remove(self, name: str):
    """Remove a repository (not supported by this connector).

    :param name str: Name of the repository

    Always raises MethodNotImplemented.
    """
    raise MethodNotImplemented()
232
async def synchronize_repos(self, cluster_uuid: str, name: str):
    """
    Does nothing and returns None, because adding repositories is
    currently not implemented.

    :param cluster_uuid str: The UUID of the cluster (unused)
    :param name str: Name of the repository (unused)
    """
    return None
238
239 """Reset"""
240
async def reset(
    self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
) -> bool:
    """Reset a cluster

    Logs in if needed and, when the controller is connected, destroys the
    model named after the cluster namespace (if it exists), logs out,
    destroys the controller through the CLI and removes the k8s cloud.

    Any exception raised on the way is logged at debug level and
    otherwise ignored, so this method always returns True.

    :param cluster_uuid str: The UUID of the cluster to reset
    :param force bool: Not used by this implementation
    :param uninstall_sw bool: Not used by this implementation
    :return: True
    """

    try:
        if not self.authenticated:
            await self.login(cluster_uuid)

        if self.controller.is_connected():
            # Destroy the model, if the controller still knows it
            model_name = self.get_namespace(cluster_uuid)
            model_exists = await self.has_model(model_name)
            if model_exists:
                self.log.debug("[reset] Destroying model")
                await self.controller.destroy_model(model_name, destroy_storage=True)

            # Disconnect from the controller
            self.log.debug("[reset] Disconnecting controller")
            await self.logout()

            # Destroy the controller (via CLI)
            self.log.debug("[reset] Destroying controller")
            await self.destroy_controller(cluster_uuid)

            self.log.debug("[reset] Removing k8s cloud")
            await self.remove_cloud("k8s-{}".format(cluster_uuid))

    except Exception as error:
        # Best effort: report the problem and carry on.
        self.log.debug("Caught exception during reset: {}".format(error))

    return True
279
280 """Deployment"""
281
async def install(
    self,
    cluster_uuid: str,
    kdu_model: str,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    kdu_name: str = None,
    namespace: str = None,
) -> bool:
    """Install a bundle

    A new Juju model is added for the KDU instance and the bundle is
    deployed into it. ``kdu_model`` may be:
    - a charm store reference ("cs:...")
    - a URL ("http...") (passed to deploy as-is; nothing is downloaded here)
    - a path to a local bundle; the process changes into the bundle's
      directory while deploying and the previous working directory is
      always restored afterwards.

    :param cluster_uuid str: The UUID of the cluster to install to
    :param kdu_model str: The name or path of a bundle to install
    :param atomic bool: If set, waits until the model is active and resets
                        the cluster on failure.
    :param timeout int: The time, in seconds, to wait for the install
                        to finish
    :param params dict: Key-value pairs of instantiation parameters
                        (not used yet)
    :param db_dict dict: Must contain ["filter"]["_id"], which is used to
                         build the name of the KDU instance
    :param kdu_name: Name of the KDU instance to be installed
    :param namespace: K8s namespace to use for the KDU instance (not used)

    :return: The name of the KDU instance (which is also the model name)
             on success, or False if waiting for the applications timed
             out and the cluster was reset. Raises an exception if the
             model could not be added.
    """

    if not self.authenticated:
        self.log.debug("[install] Logging in to the controller")
        await self.login(cluster_uuid)

    # The model is named after the NS id, prefixed by the KDU name if any.
    if kdu_name:
        kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
    else:
        kdu_instance = db_dict["filter"]["_id"]

    self.log.debug("Checking for model named {}".format(kdu_instance))

    # Create the new model
    self.log.debug("Adding model: {}".format(kdu_instance))
    model = await self.add_model(kdu_instance, cluster_uuid=cluster_uuid)

    if not model:
        raise Exception("Unable to install")

    # TODO: Instantiation parameters

    previous_workdir = os.getcwd()
    try:
        if kdu_model.startswith("cs:") or kdu_model.startswith("http"):
            # TODO: for "http" the file should be downloaded first
            bundle = kdu_model
        else:
            # Deploy from the directory that holds the bundle.
            # os.path.dirname() is used because str.strip() removes
            # *characters*, not a suffix, and yields "" for a bare name.
            new_workdir = os.path.dirname(kdu_model)
            if new_workdir:
                os.chdir(new_workdir)

            bundle = "local:{}".format(kdu_model)

        self.log.debug("[install] deploying {}".format(bundle))
        await model.deploy(bundle)

        if atomic:
            self.log.debug("[install] Applications: {}".format(model.applications))
            for name in model.applications:
                self.log.debug("[install] Waiting for {} to settle".format(name))
                application = model.applications[name]
                try:
                    # It's not enough to wait for all units to be active;
                    # the application status needs to be active as well.
                    self.log.debug("Waiting for all units to be active...")
                    await model.block_until(
                        lambda: all(
                            unit.agent_status == "idle"
                            and application.status in ["active", "unknown"]
                            and unit.workload_status in ["active", "unknown"]
                            for unit in application.units
                        ),
                        timeout=timeout,
                    )
                    self.log.debug("All units active.")

                # The two timeout classes are distinct on some Python
                # versions, so both are handled.
                except (asyncio.TimeoutError, concurrent.futures.TimeoutError):
                    os.chdir(previous_workdir)
                    self.log.debug("[install] Timeout exceeded; resetting cluster")
                    await self.reset(cluster_uuid)
                    return False

        if model.is_connected():
            self.log.debug("[install] Disconnecting model")
            await model.disconnect()

        return kdu_instance
    finally:
        # Never leave the process in the bundle directory, even when
        # deploying or waiting raised.
        os.chdir(previous_workdir)
398
async def instances_list(self, cluster_uuid: str) -> list:
    """
    Returns the deployed releases of a cluster. This implementation does
    not query anything and always answers with an empty list.

    :param cluster_uuid: the cluster (unused)
    :return: an empty list
    """
    releases = []
    return releases
407
async def upgrade(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    kdu_model: str = None,
    params: dict = None,
) -> str:
    """Upgrade a model

    The API doesn't have a concept of bundle upgrades, because there are
    many possible changes: charm revision, disk, number of units, etc.

    As such, only a limited subset of upgrades is supported: the charm
    revision of every application listed in the bundle is switched, while
    storage and scale are left untouched. Scale changes should happen
    through OSM constructs, and changes to storage would require a
    redeployment of the service, at least in this initial release.

    The model looked up is the one named after the cluster namespace.

    :param cluster_uuid str: The UUID of the cluster to upgrade
    :param kdu_instance str: The unique name of the KDU instance (unused)
    :param kdu_model str: The path of the bundle file to upgrade to
    :param params dict: Key-value pairs of instantiation parameters (unused)

    :return: True if successful. Raises K8sException if the model does not
             exist, and re-raises any JujuError other than "already
             running charm".
    """

    # TODO: This should be returned in an agreed-upon format

    namespace = self.get_namespace(cluster_uuid)
    model = await self.get_model(namespace, cluster_uuid=cluster_uuid)
    if model is None:
        raise K8sException(
            "Model {} not found in cluster {}".format(namespace, cluster_uuid)
        )

    try:
        # Expected shape: {"applications": {<name>: {"charm": <url>, ...}}}
        with open(kdu_model, "r") as f:
            bundle = yaml.safe_load(f)

        for name, spec in bundle["applications"].items():
            self.log.debug(model.applications)
            application = model.applications[name]
            self.log.debug(application)

            try:
                await application.upgrade_charm(switch=spec["charm"])
            except juju.errors.JujuError as ex:
                # Already running this version is fine; anything else is
                # a real failure and must not be hidden.
                if "already running charm" not in str(ex):
                    raise
    finally:
        # The model connection has to be closed on every path.
        await model.disconnect()

    return True
482
483 """Rollback"""
484
async def rollback(self, cluster_uuid: str, kdu_instance: str, revision: int = 0) -> str:
    """Rollback a model (not supported by this connector)

    :param cluster_uuid str: The UUID of the cluster to rollback
    :param kdu_instance str: The unique name of the KDU instance
    :param revision int: The revision to revert to. If omitted, rolls back
                         the previous upgrade.

    :return: Nothing; MethodNotImplemented is always raised.
    """
    raise MethodNotImplemented()
499
500 """Deletion"""
501
async def uninstall(self, cluster_uuid: str, kdu_instance: str) -> bool:
    """Uninstall a KDU instance

    Destroys the model named ``kdu_instance`` and then logs out of the
    controller.

    :param cluster_uuid str: The UUID of the cluster
    :param kdu_instance str: The unique name of the KDU instance

    :return: Returns True if successful, or raises an exception
    """
    already_connected = self.authenticated
    if not already_connected:
        self.log.debug("[uninstall] Connecting to controller")
        await self.login(cluster_uuid)

    self.log.debug("[uninstall] Destroying model")
    controller = self.controller
    await controller.destroy_models(kdu_instance)

    self.log.debug("[uninstall] Model destroyed and disconnecting")
    await self.logout()

    return True
522
async def exec_primitive(
    self,
    cluster_uuid: str = None,
    kdu_instance: str = None,
    primitive_name: str = None,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
) -> str:
    """Exec primitive (Juju action)

    Runs the action on the leader unit of the application named by
    ``params["application-name"]`` inside the model ``kdu_instance``.

    :param cluster_uuid str: The UUID of the cluster
    :param kdu_instance str: The unique name of the KDU instance
    :param primitive_name: Name of action that will be executed
    :param timeout: Timeout for action execution (currently unused)
    :param params: Dictionary of all the parameters needed for the action;
                   must contain "application-name". The whole dictionary
                   is passed to the action.
    :param db_dict: Dictionary for any additional data (unused)

    :return: Returns the output of the action
    :raises K8sException: if "application-name" is missing, or if anything
                          fails while running the action (the original
                          error is chained as the cause)
    """
    if not self.authenticated:
        self.log.debug("[exec_primitive] Connecting to controller")
        await self.login(cluster_uuid)

    if not params or "application-name" not in params:
        raise K8sException(
            "Missing application-name argument, argument needed for K8s actions"
        )

    model = None
    try:
        self.log.debug(
            "[exec_primitive] Getting model "
            "kdu_instance: {}".format(kdu_instance)
        )

        model = await self.get_model(kdu_instance, cluster_uuid)
        if model is None:
            raise K8sException("Model {} not found".format(kdu_instance))

        application_name = params["application-name"]
        application = model.applications[application_name]

        actions = await application.get_actions()
        if primitive_name not in actions:
            raise K8sException("Primitive {} not found".format(primitive_name))

        # Actions are run on the leader unit only.
        unit = None
        for u in application.units:
            if await u.is_leader_from_status():
                unit = u
                break

        if unit is None:
            raise K8sException("No leader unit found to execute action")

        self.log.debug("[exec_primitive] Running action: {}".format(primitive_name))
        action = await unit.run_action(primitive_name, **params)

        output = await model.get_action_output(action_uuid=action.entity_id)
        status = await model.get_action_status(uuid_or_prefix=action.entity_id)

        # An action missing from the status report counts as failed.
        status = status.get(action.entity_id, "failed")

        if status != "completed":
            raise K8sException(
                "status is not completed: {} output: {}".format(status, output)
            )

        return output

    except Exception as e:
        error_msg = "Error executing primitive {}: {}".format(primitive_name, e)
        self.log.error(error_msg)
        raise K8sException(message=error_msg) from e
    finally:
        # The model connection obtained above has to be released on every
        # path, otherwise it is leaked.
        if model is not None and model.is_connected():
            await model.disconnect()
597
598 """Introspection"""
599
async def inspect_kdu(self, kdu_model: str,) -> dict:
    """Inspect a KDU

    Reads a bundle file and hands back its "applications" section, e.g.

        {
            'mariadb-k8s': {
                'charm': 'cs:~charmed-osm/mariadb-k8s-20',
                'scale': 1,
                'options': {'password': 'manopw', 'user': 'mano'},
                'series': 'kubernetes'
            }
        }

    :param kdu_model str: The path of the bundle file to inspect.

    :return: The "applications" mapping of the bundle.
    """

    with open(kdu_model, "r") as bundle_file:
        parsed = yaml.safe_load(bundle_file)

    # TODO: This should be returned in an agreed-upon format
    return parsed["applications"]
638
async def help_kdu(self, kdu_model: str,) -> str:
    """View the README

    If available, returns the README of the bundle. The README is looked
    for in the directory that contains ``kdu_model``; the candidates are
    tried in the order README, README.txt, README.md.

    :param kdu_model str: The name or path of a bundle

    :return: If found, returns the contents of the README, otherwise None.
    """
    # A bare file name has no directory part: fall back to the current
    # directory instead of listing "".
    bundle_dir = os.path.dirname(kdu_model) or os.curdir

    for candidate in ("README", "README.txt", "README.md"):
        # The README lives next to the bundle, so it must be opened
        # relative to the bundle directory, not the working directory.
        readme_path = os.path.join(bundle_dir, candidate)
        if os.path.isfile(readme_path):
            with open(readme_path, "r") as f:
                return f.read()

    return None
659
async def status_kdu(self, cluster_uuid: str, kdu_instance: str,) -> dict:
    """Get the status of the KDU

    Reads the status of the model named after the cluster namespace and
    reduces it to the status string of each application.

    :param cluster_uuid str: The UUID of the cluster
    :param kdu_instance str: The unique id of the KDU instance (unused)

    :return: Returns a dictionary {application name: {"status": <status>}};
             empty if the model does not exist.
    """
    status = {}

    model = await self.get_model(
        self.get_namespace(cluster_uuid), cluster_uuid=cluster_uuid
    )
    if not model:
        return status

    try:
        model_status = await model.get_status()
        # Build a fresh dictionary rather than rewriting the entries of
        # the status object while walking over it.
        status = {
            name: {"status": application["status"]["status"]}
            for name, application in model_status.applications.items()
        }
    finally:
        # Release the model connection even if reading the status failed.
        if model.is_connected():
            await model.disconnect()

    return status
690
691 # Private methods
async def add_k8s(self, cloud_name: str, credentials: str,) -> bool:
    """Add a k8s cloud to Juju

    Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
    Juju Controller. Runs ``juju add-k8s --local <cloud_name>`` and feeds
    the credentials to it on standard input.

    :param cloud_name str: The name of the cloud to add.
    :param credentials str: The k8s credentials, i.e. the output of
                            `kubectl config view --raw`.

    :returns: True if successful, otherwise raises an exception carrying
              the command's stderr.
    """

    cmd = [self.juju_command, "add-k8s", "--local", cloud_name]
    self.log.debug(cmd)

    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        stdin=asyncio.subprocess.PIPE,
    )

    # Feed the process the credentials. communicate() writes the input,
    # closes stdin and collects the output in one go.
    _stdout, stderr = await process.communicate(input=credentials.encode("utf-8"))

    return_code = process.returncode

    self.log.debug("add-k8s return code: {}".format(return_code))

    # Any non-zero code is a failure; a negative one means the process
    # was terminated by a signal.
    if return_code != 0:
        raise Exception(stderr)

    return True
730
async def add_model(self, model_name: str, cluster_uuid: str,) -> juju.model.Model:
    """Adds a model to the controller

    Adds a new model to the Juju controller, logging in first if needed.

    :param model_name str: The name of the model to add.
    :param cluster_uuid str: The UUID of the cluster, used to log in.
    :returns: The juju.model.Model object of the new model upon success,
              or None if the controller refused to add it (the error is
              logged).
    """
    if not self.authenticated:
        await self.login(cluster_uuid)

    self.log.debug(
        "Adding model '{}' to cluster_uuid '{}'".format(model_name, cluster_uuid)
    )

    # Defined up front so that a failed add_model() yields None instead of
    # an UnboundLocalError at the return statement.
    model = None
    try:
        model = await self.controller.add_model(
            model_name, config={"authorized-keys": self.juju_public_key}
        )
    except Exception as ex:
        self.log.error("Caught exception: {}".format(ex))

    return model
756
async def bootstrap(
    self, cloud_name: str, cluster_uuid: str, loadbalancer: bool
) -> bool:
    """Bootstrap a Kubernetes controller

    Bootstrap a Juju controller inside the Kubernetes cluster by running
    ``juju bootstrap <cloud_name> <cluster_uuid>``.

    :param cloud_name str: The name of the cloud.
    :param cluster_uuid str: The UUID of the cluster to bootstrap; it is
                             used as the controller name.
    :param loadbalancer bool: If the controller should use loadbalancer or not.
    :returns: True upon success, or if the command failed because the
              controller already exists; raises an exception carrying
              stderr for any other failure.
    """

    cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid]
    if loadbalancer:
        # For public clusters, specify that the controller service is
        # using a LoadBalancer.
        cmd += ["--config", "controller-service-type=loadbalancer"]

    self.log.debug(
        "Bootstrapping controller {} in cloud {}".format(cluster_uuid, cloud_name)
    )

    process = await asyncio.create_subprocess_exec(
        *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
    )

    _stdout, stderr = await process.communicate()

    # Any non-zero code is a failure (negative means killed by a signal).
    # An "already exists" failure is tolerated.
    if process.returncode != 0 and b"already exists" not in stderr:
        raise Exception(stderr)

    return True
804
async def destroy_controller(self, cluster_uuid: str) -> bool:
    """Destroy a Kubernetes controller

    Destroy an existing Kubernetes controller, together with all its
    models and storage, by running ``juju destroy-controller``.

    :param cluster_uuid str: The UUID of the cluster whose controller is
                             destroyed (the controller is named after it).
    :returns: True upon success or raises an exception carrying stderr.
    """
    cmd = [
        self.juju_command,
        "destroy-controller",
        "--destroy-all-models",
        "--destroy-storage",
        "-y",
        cluster_uuid,
    ]

    process = await asyncio.create_subprocess_exec(
        *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
    )

    _stdout, stderr = await process.communicate()

    # stderr is bytes, so it has to be searched with a bytes pattern.
    # NOTE(review): tolerating "already exists" mirrors bootstrap() and
    # looks unintended for a destroy command - confirm which message, if
    # any, should be ignored here.
    if process.returncode != 0 and b"already exists" not in stderr:
        raise Exception(stderr)

    return True
834
def get_config(self, cluster_uuid: str,) -> dict:
    """Get the cluster configuration

    Reads the configuration of the cluster from the YAML file
    ``<fs.path>/<cluster_uuid>.yaml``.

    :param cluster_uuid str: The UUID of the cluster.
    :return: The parsed configuration, or raises an exception if the file
             does not exist.
    """
    cluster_config = "{}/{}.yaml".format(self.fs.path, cluster_uuid)

    # Guard first: without the file there is nothing to parse.
    if not os.path.exists(cluster_config):
        raise Exception(
            "Unable to locate configuration for cluster {}".format(cluster_uuid)
        )

    with open(cluster_config, "r") as config_file:
        return yaml.safe_load(config_file.read())
852
async def get_model(self, model_name: str, cluster_uuid: str,) -> juju.model.Model:
    """Get a model from the Juju Controller.

    Note: the caller must disconnect the returned Model before it goes
    out of scope.

    :param model_name str: The name of the model to get
    :param cluster_uuid str: The UUID of the cluster, used to log in when
                             the connector is not authenticated yet
    :return The juju.model.Model object if found, or None.
    """
    if not self.authenticated:
        await self.login(cluster_uuid)

    known_models = await self.controller.list_models()
    if model_name not in known_models:
        return None

    self.log.debug("Found model: {}".format(model_name))
    return await self.controller.get_model(model_name)
871
def get_namespace(self, cluster_uuid: str,) -> str:
    """Get the namespace of a cluster

    Looks up the "namespace" entry of the stored cluster configuration.

    :param cluster_uuid str: The UUID of the cluster
    :returns: The namespace, or raises an exception if the configuration
              has no such entry
    """
    stored = self.get_config(cluster_uuid)

    # TODO: We want to make sure this is unique to the cluster, in case
    # the cluster is being reused.
    # Consider pre/appending the cluster id to the namespace string
    if "namespace" in stored:
        return stored["namespace"]

    raise Exception("Namespace not found.")
889
async def has_model(self, model_name: str) -> bool:
    """Check if a model exists in the controller

    Asks the connected Juju controller for its models and looks for the
    given name among them.

    :param model_name str: The name of the model
    :return: A boolean indicating if the model exists
    """
    known_models = await self.controller.list_models()
    return True if model_name in known_models else False
903
def is_local_k8s(self, credentials: str,) -> bool:
    """Check if a cluster is local

    A cluster counts as local when the address found in the environment
    variable OSMLCM_VCA_APIPROXY appears in the "server" entry of one of
    the clusters listed in the credentials.

    :param credentials str: The k8s credentials as a YAML document
    :returns: True if the cluster is running locally; False otherwise,
              including when OSMLCM_VCA_APIPROXY is not set
    """
    host_ip = os.getenv("OSMLCM_VCA_APIPROXY")
    if not host_ip:
        # Without the host address nothing can be matched.
        return False

    creds = yaml.safe_load(credentials)
    if not isinstance(creds, dict):
        return False

    for cluster in creds.get("clusters") or []:
        server = (cluster.get("cluster") or {}).get("server") or ""
        if host_ip in server:
            return True

    return False
923
async def login(self, cluster_uuid):
    """Login to the Juju controller.

    Loads the stored configuration of the cluster, creates a new
    Controller object and connects it with the stored endpoint, user,
    password and CA certificate. On success ``self.authenticated`` is set
    to True.

    A failed connection is logged and leaves ``self.authenticated`` False;
    it is not raised to the caller. Does nothing if already authenticated.

    :param cluster_uuid str: The UUID of the cluster whose stored
                             configuration holds the credentials
    """

    if self.authenticated:
        return

    self.connecting = True

    # Make sure we have the credentials loaded
    config = self.get_config(cluster_uuid)

    self.juju_endpoint = config["endpoint"]
    self.juju_user = config["username"]
    self.juju_secret = config["secret"]
    self.juju_ca_cert = config["cacert"]
    self.juju_public_key = None

    self.controller = Controller()

    if not self.juju_secret:
        self.log.fatal("VCA credentials not configured.")
        self.authenticated = False
        return

    # The password is deliberately kept out of the log.
    self.log.debug(
        "Connecting to controller... ws://{} as {}".format(
            self.juju_endpoint, self.juju_user,
        )
    )
    try:
        await self.controller.connect(
            endpoint=self.juju_endpoint,
            username=self.juju_user,
            password=self.juju_secret,
            cacert=self.juju_ca_cert,
        )
        self.authenticated = True
        self.log.debug("JujuApi: Logged into controller")
    except Exception as ex:
        # Callers rely on self.authenticated, so the failure is reported
        # but not raised.
        self.log.error("Caught exception: {}".format(ex))
965
async def logout(self):
    """Logout of the Juju controller.

    Disconnects every model kept in ``self.models`` and then the
    controller itself, and marks the connector as not authenticated.

    :return: False if the connector was not authenticated; nothing (None)
             otherwise.
    """
    self.log.debug("[logout]")
    if not self.authenticated:
        return False

    for model_name, model in self.models.items():
        self.log.debug("Logging out of model {}".format(model_name))
        await model.disconnect()

    controller = self.controller
    if controller:
        self.log.debug("Disconnecting controller {}".format(controller))
        await controller.disconnect()
        self.controller = None

    self.authenticated = False
982
async def remove_cloud(self, cloud_name: str,) -> bool:
    """Remove a k8s cloud from Juju

    Removes a Kubernetes cloud from Juju by running, in this order,
    ``juju remove-k8s --client <cloud_name>`` and
    ``juju remove-cloud --client <cloud_name>``.

    :param cloud_name str: The name of the cloud to remove.

    :returns: True if successful, otherwise raises an exception carrying
              the stderr of the command that failed.
    """

    # First the k8s cloud definition, then the cloud entry in the local
    # client configuration.
    for subcommand in ("remove-k8s", "remove-cloud"):
        cmd = [self.juju_command, subcommand, "--client", cloud_name]
        process = await asyncio.create_subprocess_exec(
            *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
        )

        _stdout, stderr = await process.communicate()

        # Any non-zero code is a failure (negative means the process was
        # killed by a signal); the second command is then not attempted.
        if process.returncode != 0:
            raise Exception(stderr)

    return True
1020
async def set_config(self, cluster_uuid: str, config: dict,) -> bool:
    """Save the cluster configuration

    Writes the cluster information as YAML to
    ``<fs.path>/<cluster_uuid>.yaml``. A file that already exists is left
    untouched.

    :param cluster_uuid str: The UUID of the cluster
    :param config dict: A dictionary containing the cluster configuration
    :returns: True, whether or not the file was written; raises an
              exception if writing fails.
    """

    cluster_config = "{}/{}.yaml".format(self.fs.path, cluster_uuid)

    # Never overwrite a configuration that is already stored.
    if os.path.exists(cluster_config):
        return True

    self.log.debug("Writing config to {}".format(cluster_config))
    with open(cluster_config, "w") as config_file:
        config_file.write(yaml.dump(config, Dumper=yaml.Dumper))

    return True