Get the kubeconfig credentials from MongoDB
[osm/N2VC.git] / n2vc / k8s_juju_conn.py
1 # Copyright 2019 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import concurrent
17 import os
18 import uuid
19 import yaml
20
21 import juju
22 from juju.controller import Controller
23 from juju.model import Model
24 from n2vc.exceptions import K8sException
25 from n2vc.k8s_conn import K8sConnector
26 from n2vc.kubectl import Kubectl
27 from .exceptions import MethodNotImplemented
28
29
30 # from juju.bundle import BundleHandler
31 # import re
32 # import ssl
33 # from .vnf import N2VC
34 class K8sJujuConnector(K8sConnector):
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = "/usr/bin/kubectl",
    juju_command: str = "/usr/bin/juju",
    log: object = None,
    on_update_db=None,
):
    """Set up the connector state; no controller connection is opened here.

    :param fs: file system object; its ``path`` attribute is where the
        per-cluster configuration files are read and written
    :param db: database object handed to the parent connector
    :param kubectl_command: path to kubectl executable (accepted, but not
        stored by this constructor)
    :param juju_command: path to the juju executable used for CLI calls
    :param log: logger handed to the parent connector
    :param on_update_db: callback handed to the parent connector
    """
    # The parent initializer must run first: self.log is used right below.
    K8sConnector.__init__(self, db, log=log, on_update_db=on_update_db)

    self.log.debug("Initializing K8S Juju connector")

    # CLI binary and controller secret (the secret is filled in later by
    # init_env/login).
    self.juju_command = juju_command
    self.juju_secret = ""

    # Connection bookkeeping: nothing is authenticated or cached yet.
    self.fs = fs
    self.models = {}
    self.authenticated = False

    self.log.debug("K8S Juju connector initialized")
67
68 """Initialization"""
69
async def init_env(
    self,
    k8s_creds: str,
    namespace: str = "kube-system",
    reuse_cluster_uuid: str = None,
) -> (str, bool):
    """Prepare a given K8s cluster environment to run Juju bundles.

    Bootstrapping cannot be done, by design, through the Juju API, so the
    juju CLI is used to register the cluster as a cloud and to bootstrap a
    controller in it. The controller endpoint, CA certificate and account
    are then read back from the juju client files under
    ``~/.local/share/juju`` and stored with ``set_config`` so that later
    calls can log in.

    :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
        '.kube/config'
    :param namespace: optional namespace to be used for juju. By default,
        'kube-system' will be used
    :param reuse_cluster_uuid: existing cluster uuid for reuse; when omitted
        a new uuid4 is generated
    :return: uuid of the K8s cluster and True (the second element is always
        True in this implementation)
        (on error, an exception will be raised)
    """
    # TODO: Pull info from db based on the namespace.
    # TODO: Make it idempotent, calling add-k8s and bootstrap whenever
    #       reuse_cluster_uuid is passed as parameter. `init_env` is called
    #       to initialize the K8s cluster for juju. If this initialization
    #       fails, it can be called again by LCM with the param
    #       reuse_cluster_uuid, e.g. to try to fix it.

    cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4())

    # If the k8s is external (not local), the juju controller needs a
    # loadbalancer.
    loadbalancer = not self.is_local_k8s(k8s_creds)

    # Name the new k8s cloud after the cluster.
    k8s_cloud = "k8s-{}".format(cluster_uuid)

    self.log.debug("Adding k8s cloud {}".format(k8s_cloud))
    await self.add_k8s(k8s_cloud, k8s_creds)

    # Bootstrap Juju controller; the controller is named after the cluster.
    self.log.debug("Bootstrapping...")
    await self.bootstrap(k8s_cloud, cluster_uuid, loadbalancer)
    self.log.debug("Bootstrap done.")

    # Parse ~/.local/share/juju/controllers.yaml
    # controllers.<name>.api-endpoints|ca-cert|uuid
    # Only plain mappings are read from these files, so the safe loader is
    # used instead of the full (object-constructing) yaml.Loader.
    self.log.debug("Getting controller endpoints")
    with open(os.path.expanduser("~/.local/share/juju/controllers.yaml")) as f:
        controllers = yaml.safe_load(f)
    controller = controllers["controllers"][cluster_uuid]
    self.juju_endpoint = controller["api-endpoints"][0]
    self.juju_ca_cert = controller["ca-cert"]

    # Parse ~/.local/share/juju/accounts.yaml
    # controllers.<name>.user|password
    self.log.debug("Getting accounts")
    with open(os.path.expanduser("~/.local/share/juju/accounts.yaml")) as f:
        accounts = yaml.safe_load(f)
    account = accounts["controllers"][cluster_uuid]
    self.juju_user = account["user"]
    self.juju_secret = account["password"]

    self.juju_public_key = None

    config = {
        "endpoint": self.juju_endpoint,
        "username": self.juju_user,
        "secret": self.juju_secret,
        "cacert": self.juju_ca_cert,
        "namespace": namespace,
        "loadbalancer": loadbalancer,
    }

    # Store the cluster configuration so it can be used for subsequent calls.
    self.log.debug("Setting config")
    await self.set_config(cluster_uuid, config)

    # Login to the controller that was just bootstrapped.
    if not self.authenticated:
        await self.login(cluster_uuid)

    return cluster_uuid, True
205
206 """Repo Management"""
207
async def repo_add(
    self,
    name: str,
    url: str,
    _type: str = "charm",
):
    """Add a repository (not supported by this connector).

    :raises MethodNotImplemented: always
    """
    raise MethodNotImplemented()
212
async def repo_list(self):
    """List repositories (not supported by this connector).

    :raises MethodNotImplemented: always
    """
    raise MethodNotImplemented()
215
async def repo_remove(
    self,
    name: str,
):
    """Remove a repository (not supported by this connector).

    :raises MethodNotImplemented: always
    """
    raise MethodNotImplemented()
220
async def synchronize_repos(self, cluster_uuid: str, name: str):
    """Synchronize repositories; a no-op for this connector.

    Repository handling is not implemented here, so there is nothing to
    synchronize and both arguments are ignored.

    :return: None
    """
    return None
226
227 """Reset"""
228
async def reset(
    self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
) -> bool:
    """Reset a cluster

    Destroys the model named by the cluster's configured namespace (when it
    exists), logs out, destroys the controller through the juju CLI and
    removes the k8s cloud. This is best effort: any exception raised along
    the way is caught and logged at debug level.

    :param cluster_uuid str: The UUID of the cluster to reset
    :param force bool: Not used by this implementation
    :param uninstall_sw bool: Not used by this implementation
    :return: Always True.
    """
    try:
        if not self.authenticated:
            await self.login(cluster_uuid)

        # NOTE(review): the nesting below was reconstructed from a source
        # whose indentation was lost; confirm that the CLI teardown belongs
        # inside this branch.
        if self.controller.is_connected():
            # Destroy the model, if the controller knows about it.
            model_name = self.get_namespace(cluster_uuid)
            model_exists = await self.has_model(model_name)
            if model_exists:
                self.log.debug("[reset] Destroying model")
                await self.controller.destroy_model(
                    model_name, destroy_storage=True
                )

            # Disconnect from the controller.
            self.log.debug("[reset] Disconnecting controller")
            await self.logout()

            # Destroy the controller (via CLI).
            self.log.debug("[reset] Destroying controller")
            await self.destroy_controller(cluster_uuid)

            self.log.debug("[reset] Removing k8s cloud")
            await self.remove_cloud("k8s-{}".format(cluster_uuid))

    except Exception as ex:
        self.log.debug("Caught exception during reset: {}".format(ex))

    return True
267
268 """Deployment"""
269
async def install(
    self,
    cluster_uuid: str,
    kdu_model: str,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    kdu_name: str = None,
    namespace: str = None,
) -> bool:
    """Install a bundle

    Creates a Juju model named after the KDU instance and deploys the
    bundle into it.

    :param cluster_uuid str: The UUID of the cluster to install to
    :param kdu_model str: The name or path of a bundle to install. A value
        starting with "cs:" or "http" is handed to Juju unchanged; anything
        else is treated as a local path and deployed as "local:<kdu_model>"
        from the bundle's own directory.
    :param atomic bool: If set, waits until every application has settled
        and resets the cluster on timeout.
    :param timeout int: The time, in seconds, to wait for each application
        to settle
    :param params dict: Key-value pairs of instantiation parameters
        (not used yet)
    :param db_dict dict: Must provide db_dict["filter"]["_id"], which is
        used to build the instance name
    :param kdu_name: Name of the KDU instance to be installed
    :param namespace: K8s namespace to use for the KDU instance (not used
        by this implementation)

    :return: The KDU instance name (which is also the model name) on
        success, or False when ``atomic`` is set and the wait timed out.
    :raises K8sException: if db_dict does not provide filter._id
    :raises Exception: "Unable to install" if the model could not be added
    """
    # Fail early with a clear message instead of a TypeError/KeyError deep
    # inside the method when db_dict is missing or incomplete.
    try:
        record_id = db_dict["filter"]["_id"]
    except (TypeError, KeyError) as ex:
        raise K8sException(
            "db_dict with a filter._id entry is required to install a KDU"
        ) from ex

    if not self.authenticated:
        self.log.debug("[install] Logging in to the controller")
        await self.login(cluster_uuid)

    # The model is named after the KDU name (if any) plus the record id.
    kdu_instance = "{}-{}".format(kdu_name, record_id) if kdu_name else record_id

    self.log.debug("Checking for model named {}".format(kdu_instance))

    # Create the new model
    self.log.debug("Adding model: {}".format(kdu_instance))
    model = await self.add_model(kdu_instance, cluster_uuid=cluster_uuid)
    if not model:
        raise Exception("Unable to install")

    # TODO: Instantiation parameters

    try:
        previous_workdir = os.getcwd()
    except FileNotFoundError:
        # The current directory no longer exists; fall back to a fixed one.
        previous_workdir = "/app/storage"

    try:
        # Juju bundle that models the KDU, in any of the following ways:
        #   - <juju-repo>/<juju-bundle>
        #   - <juju-bundle folder under k8s_models folder in the package>
        #   - <juju-bundle tgz file (w/ or w/o extension) under k8s_models
        #     folder in the package>
        #   - <URL_where_to_fetch_juju_bundle>
        if kdu_model.startswith(("cs:", "http")):
            # TODO: for "http" models the file should be downloaded first.
            bundle = kdu_model
        else:
            # str.strip() removes a *set of characters* from both ends, not
            # a suffix, so the directory is taken with dirname() instead.
            bundle_dir = os.path.dirname(kdu_model)
            if bundle_dir:
                os.chdir(bundle_dir)
            bundle = "local:{}".format(kdu_model)

        self.log.debug("[install] deploying {}".format(bundle))
        await model.deploy(bundle)

        if atomic:
            self.log.debug("[install] Applications: {}".format(model.applications))
            for name in model.applications:
                self.log.debug("[install] Waiting for {} to settle".format(name))
                application = model.applications[name]
                try:
                    # It's not enough to wait for all units to be active;
                    # the application status needs to be active as well.
                    self.log.debug("Waiting for all units to be active...")
                    await model.block_until(
                        lambda app=application: all(
                            unit.agent_status == "idle"
                            and app.status in ["active", "unknown"]
                            and unit.workload_status in ["active", "unknown"]
                            for unit in app.units
                        ),
                        timeout=timeout,
                    )
                    self.log.debug("All units active.")
                except (asyncio.TimeoutError, concurrent.futures.TimeoutError):
                    # These are distinct classes on Python 3.8-3.10, so both
                    # are listed to catch the timeout on every version.
                    os.chdir(previous_workdir)
                    self.log.debug("[install] Timeout exceeded; resetting cluster")
                    await self.reset(cluster_uuid)
                    return False

        if model.is_connected():
            self.log.debug("[install] Disconnecting model")
            await model.disconnect()

        return kdu_instance
    finally:
        # Always restore the working directory, also when deploy raises.
        os.chdir(previous_workdir)
388
async def instances_list(self, cluster_uuid: str) -> list:
    """Return the deployed releases in a cluster.

    Not implemented for this connector: the cluster is not queried and an
    empty list is always returned.

    :param cluster_uuid: the cluster
    :return: an empty list
    """
    return list()
397
async def upgrade(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    kdu_model: str = None,
    params: dict = None,
) -> str:
    """Upgrade a model

    The API doesn't have a concept of bundle upgrades, because there are
    many possible changes: charm revision, disk, number of units, etc.

    As such, only a limited subset of upgrades is supported: the charm
    revision of every application listed in the bundle is upgraded, while
    storage and scale are left untouched. Scale changes should happen
    through OSM constructs, and changes to storage would require a
    redeployment of the service, at least in this initial release.

    :param cluster_uuid str: The UUID of the cluster to upgrade
    :param kdu_instance str: The unique name of the KDU instance
    :param kdu_model str: Path of the bundle YAML file to upgrade to
    :param params dict: Key-value pairs of instantiation parameters
        (not used)

    :return: True once every application has been processed.
    :raises K8sException: if the model cannot be found
    :raises juju.errors.JujuError: if a charm upgrade fails for a reason
        other than the charm already being at the requested revision
    """
    # NOTE(review): the model is looked up by the cluster's configured
    # namespace, not by kdu_instance -- confirm this is the intended model.
    namespace = self.get_namespace(cluster_uuid)
    model = await self.get_model(namespace, cluster_uuid=cluster_uuid)
    if model is None:
        raise K8sException(
            "Model {} not found in cluster {}".format(namespace, cluster_uuid)
        )

    try:
        # Expected bundle layout:
        #   applications:
        #     <application-name>:
        #       charm: 'cs:~charmed-osm/mariadb-k8s-20'
        #       scale: 1
        #       options: {...}
        with open(kdu_model, "r") as f:
            bundle = yaml.safe_load(f)

        # TODO: This should be returned in an agreed-upon format
        for name, spec in bundle["applications"].items():
            self.log.debug(model.applications)
            application = model.applications[name]
            self.log.debug(application)

            try:
                await application.upgrade_charm(switch=spec["charm"])
            except juju.errors.JujuError as ex:
                # Being on the requested revision already is fine; any
                # other failure must not be silently dropped.
                if "already running charm" not in str(ex):
                    raise
    finally:
        # Release the model connection even when the upgrade fails.
        await model.disconnect()

    return True
472
473 """Rollback"""
474
async def rollback(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    revision: int = 0,
) -> str:
    """Rollback a model (not supported by this connector).

    :param cluster_uuid str: The UUID of the cluster to rollback
    :param kdu_instance str: The unique name of the KDU instance
    :param revision int: The revision to revert to. If omitted, rolls back
        the previous upgrade.

    :raises MethodNotImplemented: always
    """
    raise MethodNotImplemented()
489
490 """Deletion"""
491
async def uninstall(self, cluster_uuid: str, kdu_instance: str) -> bool:
    """Uninstall a KDU instance

    Destroys the Juju model named after the KDU instance and then logs out
    of the controller.

    :param cluster_uuid str: The UUID of the cluster
    :param kdu_instance str: The unique name of the KDU instance, which is
        also the name of the model to destroy

    :return: Returns True if successful, or raises an exception
    """
    logged_in = self.authenticated
    if not logged_in:
        self.log.debug("[uninstall] Connecting to controller")
        await self.login(cluster_uuid)

    self.log.debug("[uninstall] Destroying model")
    await self.controller.destroy_models(kdu_instance)

    # The controller connection is dropped once the model is gone.
    self.log.debug("[uninstall] Model destroyed and disconnecting")
    await self.logout()

    return True
512
async def exec_primitive(
    self,
    cluster_uuid: str = None,
    kdu_instance: str = None,
    primitive_name: str = None,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
) -> str:
    """Exec primitive (Juju action)

    Runs the action on the leader unit of the application named by
    params["application-name"], inside the model named after the KDU
    instance.

    :param cluster_uuid str: The UUID of the cluster
    :param kdu_instance str: The unique name of the KDU instance
    :param primitive_name: Name of action that will be executed
    :param timeout: Timeout for action execution (not used by this
        implementation)
    :param params: Dictionary of all the parameters needed for the action;
        must contain "application-name". The whole dictionary is passed to
        the action as keyword arguments.
    :param db_dict: Dictionary for any additional data (not used)

    :return: Returns the output of the action
    :raises K8sException: if application-name is missing, or wrapping any
        error raised while locating or running the action
    """
    if not self.authenticated:
        self.log.debug("[exec_primitive] Connecting to controller")
        await self.login(cluster_uuid)

    if not params or "application-name" not in params:
        raise K8sException(
            "Missing application-name argument, "
            "argument needed for K8s actions"
        )

    model = None
    try:
        self.log.debug(
            "[exec_primitive] Getting model "
            "kdu_instance: {}".format(kdu_instance)
        )

        model = await self.get_model(kdu_instance, cluster_uuid)

        application_name = params["application-name"]
        application = model.applications[application_name]

        actions = await application.get_actions()
        if primitive_name not in actions:
            raise K8sException("Primitive {} not found".format(primitive_name))

        # Actions are run on the leader unit only.
        unit = None
        for u in application.units:
            if await u.is_leader_from_status():
                unit = u
                break

        if unit is None:
            raise K8sException("No leader unit found to execute action")

        self.log.debug("[exec_primitive] Running action: {}".format(primitive_name))
        action = await unit.run_action(primitive_name, **params)

        output = await model.get_action_output(action_uuid=action.entity_id)
        status = await model.get_action_status(uuid_or_prefix=action.entity_id)

        # An action missing from the status report is treated as failed.
        status = status[action.entity_id] if action.entity_id in status else "failed"

        if status != "completed":
            raise K8sException(
                "status is not completed: {} output: {}".format(status, output)
            )

        return output

    except Exception as e:
        error_msg = "Error executing primitive {}: {}".format(primitive_name, e)
        self.log.error(error_msg)
        raise K8sException(message=error_msg) from e
    finally:
        # The model connection obtained above must always be released.
        if model is not None and model.is_connected():
            await model.disconnect()
587
588 """Introspection"""
589
async def inspect_kdu(self, kdu_model: str,) -> dict:
    """Inspect a KDU

    Loads the bundle YAML file and returns its "applications" section.

    :param kdu_model str: Path of the bundle YAML file to inspect.

    :return: The "applications" mapping of the bundle, e.g.
        {'mariadb-k8s': {'charm': 'cs:~charmed-osm/mariadb-k8s-20',
        'scale': 1, 'options': {...}, 'series': 'kubernetes'}}
    """
    with open(kdu_model, "r") as bundle_file:
        bundle = yaml.safe_load(bundle_file)

    # TODO: This should be returned in an agreed-upon format
    return bundle["applications"]
628
async def help_kdu(self, kdu_model: str,) -> str:
    """View the README

    If available, returns the README of the bundle. The README is looked
    for in the directory that contains ``kdu_model``, trying "README",
    "README.txt" and "README.md" in that order.

    :param kdu_model str: The name or path of a bundle

    :return: If found, returns the contents of the README, otherwise None.
    """
    # dirname() is "" for a bare file name; use the current directory then.
    bundle_dir = os.path.dirname(kdu_model) or "."

    for candidate in ("README", "README.txt", "README.md"):
        # Join with the bundle directory: a bare file name would be
        # resolved against the process working directory instead.
        readme_path = os.path.join(bundle_dir, candidate)
        if os.path.isfile(readme_path):
            with open(readme_path, "r") as f:
                return f.read()

    return None
649
async def status_kdu(self, cluster_uuid: str, kdu_instance: str,) -> dict:
    """Get the status of the KDU

    Get the current status of the applications in the model.

    :param cluster_uuid str: The UUID of the cluster
    :param kdu_instance str: The unique id of the KDU instance (not used;
        the model is looked up by the cluster's configured namespace)

    :return: Returns a dictionary mapping each application name to
        {"status": <application status>}; empty if the model is not found.
    """
    status = {}

    model = await self.get_model(
        self.get_namespace(cluster_uuid), cluster_uuid=cluster_uuid
    )

    if model:
        try:
            model_status = await model.get_status()
            applications = model_status.applications
            # Build a fresh dict instead of overwriting the entries of the
            # status object while iterating over it.
            status = {
                name: {"status": applications[name]["status"]["status"]}
                for name in applications
            }
        finally:
            # Release the connection even if the status query fails.
            if model.is_connected():
                await model.disconnect()

    return status
680
async def get_services(
    self, cluster_uuid: str, kdu_instance: str, namespace: str
) -> list:
    """Return a list of services of a kdu_instance

    The cluster kubeconfig is written to /tmp/<cluster_uuid>/config and
    handed to Kubectl, which is queried for the services whose namespace
    equals ``kdu_instance``.

    :param cluster_uuid str: The UUID of the cluster
    :param kdu_instance str: The KDU instance name, used as the K8s
        namespace to filter on
    :param namespace str: Not used by this implementation
    :return: The list returned by Kubectl.get_services
    """
    credentials = self.get_credentials(cluster_uuid=cluster_uuid)

    config_path = "/tmp/{}".format(cluster_uuid)
    config_file = "{}/config".format(config_path)

    # The kubeconfig holds decrypted credentials: keep the directory and the
    # file private to the current user. exist_ok avoids the race between an
    # existence check and the creation.
    os.makedirs(config_path, mode=0o700, exist_ok=True)
    fd = os.open(config_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as f:
        f.write(credentials)

    kubectl = Kubectl(config_file=config_file)
    return kubectl.get_services(
        field_selector="metadata.namespace={}".format(kdu_instance)
    )
700
async def get_service(
    self, cluster_uuid: str, service_name: str, namespace: str
) -> object:
    """Return data for a specific service inside a namespace

    The cluster kubeconfig is written to /tmp/<cluster_uuid>/config and
    handed to Kubectl, which is queried for the service by name and
    namespace.

    :param cluster_uuid str: The UUID of the cluster
    :param service_name str: The name of the service
    :param namespace str: The K8s namespace of the service
    :return: The first entry returned by Kubectl.get_services
    :raises IndexError: if no service matches
    """
    credentials = self.get_credentials(cluster_uuid=cluster_uuid)

    config_path = "/tmp/{}".format(cluster_uuid)
    config_file = "{}/config".format(config_path)

    # The kubeconfig holds decrypted credentials: keep the directory and the
    # file private to the current user. exist_ok avoids the race between an
    # existence check and the creation.
    os.makedirs(config_path, mode=0o700, exist_ok=True)
    fd = os.open(config_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as f:
        f.write(credentials)

    kubectl = Kubectl(config_file=config_file)

    return kubectl.get_services(
        field_selector="metadata.name={},metadata.namespace={}".format(
            service_name, namespace
        )
    )[0]
723
724 # Private methods
async def add_k8s(self, cloud_name: str, credentials: str,) -> bool:
    """Add a k8s cloud to Juju

    Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
    Juju Controller. Runs ``juju add-k8s --local <cloud_name>`` and feeds
    the credentials to it on standard input.

    :param cloud_name str: The name of the cloud to add.
    :param credentials str: The kubeconfig contents, i.e. the output of
        `kubectl config view --raw`.

    :returns: True if successful, otherwise raises an exception.
    :raises Exception: carrying the command's stderr if it exits non-zero
    """
    cmd = [self.juju_command, "add-k8s", "--local", cloud_name]
    self.log.debug(cmd)

    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        stdin=asyncio.subprocess.PIPE,
    )

    # communicate() writes the input, closes stdin and drains stdout/stderr
    # concurrently, so neither side can block on a full pipe.
    _stdout, stderr = await process.communicate(input=credentials.encode("utf-8"))

    return_code = process.returncode
    self.log.debug("add-k8s return code: {}".format(return_code))

    # A negative return code means the process was killed by a signal; that
    # is a failure too.
    if return_code != 0:
        raise Exception(stderr)

    return True
763
async def add_model(self, model_name: str, cluster_uuid: str,) -> Model:
    """Adds a model to the controller

    Logs in first if needed. When a public key is configured it is passed
    to the new model as its "authorized-keys" config.

    :param model_name str: The name of the model to add.
    :param cluster_uuid str: The UUID of the cluster, used to log in.
    :returns: The juju.model.Model object of the new model upon success, or
        None if adding the model raised (the error is logged at debug
        level and not propagated).
    """
    if not self.authenticated:
        await self.login(cluster_uuid)

    self.log.debug(
        "Adding model '{}' to cluster_uuid '{}'".format(model_name, cluster_uuid)
    )

    try:
        # Only send a model config when there is a key to authorize.
        extra_args = {}
        if self.juju_public_key is not None:
            extra_args["config"] = {"authorized-keys": self.juju_public_key}
        return await self.controller.add_model(model_name, **extra_args)
    except Exception as ex:
        self.log.debug(ex)
        self.log.debug("Caught exception: {}".format(ex))
        return None
793
async def bootstrap(
    self, cloud_name: str, cluster_uuid: str, loadbalancer: bool
) -> bool:
    """Bootstrap a Kubernetes controller

    Bootstrap a Juju controller inside the Kubernetes cluster by running
    ``juju bootstrap <cloud_name> <cluster_uuid>``. A failure whose stderr
    contains "already exists" is tolerated.

    :param cloud_name str: The name of the cloud.
    :param cluster_uuid str: The UUID of the cluster to bootstrap; it is
        also used as the controller name.
    :param loadbalancer bool: If the controller should use loadbalancer or not.
    :returns: True upon success or raises an exception.
    :raises Exception: carrying the command's stderr on any other failure
    """
    cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid]
    if loadbalancer:
        # For public clusters, specify that the controller service is using
        # a LoadBalancer.
        cmd += ["--config", "controller-service-type=loadbalancer"]

    self.log.debug(
        "Bootstrapping controller {} in cloud {}".format(cluster_uuid, cloud_name)
    )

    process = await asyncio.create_subprocess_exec(
        *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
    )
    _stdout, stderr = await process.communicate()

    # Any non-zero code is a failure, including the negative codes reported
    # when the process is killed by a signal.
    if process.returncode != 0 and b"already exists" not in stderr:
        raise Exception(stderr)

    return True
841
async def destroy_controller(self, cluster_uuid: str) -> bool:
    """Destroy a Kubernetes controller

    Destroy an existing Kubernetes controller, together with all of its
    models and storage, by running ``juju destroy-controller``.

    :param cluster_uuid str: The UUID of the cluster; it is also the name
        of the controller to destroy.
    :returns: True upon success or raises an exception.
    :raises Exception: carrying the command's stderr on failure
    """
    cmd = [
        self.juju_command,
        "destroy-controller",
        "--destroy-all-models",
        "--destroy-storage",
        "-y",
        cluster_uuid,
    ]

    process = await asyncio.create_subprocess_exec(
        *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
    )
    _stdout, stderr = await process.communicate()

    # stderr is bytes, so the marker must be bytes as well (a str operand
    # raises TypeError). Negative return codes (killed by a signal) are
    # failures too.
    # NOTE(review): "already exists" looks copied from bootstrap; confirm
    # which message should be tolerated when destroying.
    if process.returncode != 0 and b"already exists" not in stderr:
        raise Exception(stderr)

    return True
871
def get_credentials(self, cluster_uuid: str) -> str:
    """Get Cluster Kubeconfig

    Reads the cluster record from the "k8sclusters" collection, decrypts
    the "password" and "secret" fields of its credentials and returns the
    credentials serialized as YAML.

    :param cluster_uuid str: The UUID (``_id``) of the cluster record.
    :return: The kubeconfig as a YAML string.
    :raises K8sException: if no record is found for the cluster
    """
    k8scluster = self.db.get_one(
        "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
    )
    # With fail_on_empty=False a missing record presumably comes back as
    # None (TODO confirm against the db API); report it clearly.
    if not k8scluster:
        raise K8sException(
            "k8s cluster {} not found in the database".format(cluster_uuid)
        )

    credentials = k8scluster.get("credentials")

    # The return value is not used, so decryption is expected to happen in
    # place on the credentials object.
    self.db.encrypt_decrypt_fields(
        credentials,
        "decrypt",
        ["password", "secret"],
        schema_version=k8scluster["schema_version"],
        salt=k8scluster["_id"],
    )

    return yaml.safe_dump(credentials)
889
def get_config(self, cluster_uuid: str,) -> dict:
    """Get the cluster configuration

    Loads ``<fs.path>/<cluster_uuid>.yaml`` from the file store.

    :param cluster_uuid str: The UUID of the cluster.
    :return: The parsed configuration upon success.
    :raises Exception: if the configuration file does not exist.
    """
    cluster_config = "{}/{}.yaml".format(self.fs.path, cluster_uuid)

    # Guard clause: nothing to load without the file.
    if not os.path.exists(cluster_config):
        raise Exception(
            "Unable to locate configuration for cluster {}".format(cluster_uuid)
        )

    with open(cluster_config, "r") as config_file:
        raw_config = config_file.read()
        return yaml.safe_load(raw_config)
907
async def get_model(self, model_name: str, cluster_uuid: str,) -> Model:
    """Get a model from the Juju Controller.

    Note: the caller must disconnect the returned Model before it goes
    out of scope.

    :param model_name str: The name of the model to get
    :param cluster_uuid str: The UUID of the cluster, used to log in first
        when not yet authenticated
    :return The juju.model.Model object if found, or None.
    """
    if not self.authenticated:
        await self.login(cluster_uuid)

    known_models = await self.controller.list_models()
    if model_name not in known_models:
        return None

    self.log.debug("Found model: {}".format(model_name))
    return await self.controller.get_model(model_name)
926
def get_namespace(self, cluster_uuid: str,) -> str:
    """Get the namespace configured for a cluster

    Reads the "namespace" entry of the stored cluster configuration.

    :param cluster_uuid str: The UUID of the cluster
    :returns: The namespace name
    :raises Exception: if the configuration has no "namespace" entry
    """
    config = self.get_config(cluster_uuid)

    # TODO: We want to make sure this is unique to the cluster, in case
    # the cluster is being reused.
    # Consider pre/appending the cluster id to the namespace string
    if "namespace" in config:
        return config["namespace"]

    raise Exception("Namespace not found.")
944
async def has_model(self, model_name: str) -> bool:
    """Check if a model exists in the controller

    Asks the connected Juju controller for its models and looks for the
    given name among them.

    :param model_name str: The name of the model
    :return: A boolean indicating if the model exists
    """
    known_models = await self.controller.list_models()
    return model_name in known_models
958
def is_local_k8s(self, credentials: str,) -> bool:
    """Check if a cluster is local

    A cluster counts as local when the value of the OSMLCM_VCA_APIPROXY
    environment variable appears inside the "server" entry of any cluster
    listed in the kubeconfig.

    :param credentials str: The kubeconfig contents (YAML text)
    :returns: A boolean if the cluster is running locally
    """
    creds = yaml.safe_load(credentials)

    # Without parsed credentials or the environment variable there is
    # nothing to compare.
    if not creds:
        return False
    api_proxy = os.getenv("OSMLCM_VCA_APIPROXY")
    if not api_proxy:
        return False

    return any(
        api_proxy in entry["cluster"]["server"]
        for entry in creds["clusters"]
        if "server" in entry["cluster"]
    )
977
async def login(self, cluster_uuid):
    """Login to the Juju controller.

    Loads the stored configuration of the cluster and connects a new
    Controller with it. ``self.authenticated`` is set to True only when
    the connection succeeds; a connection error is logged at debug level
    and not propagated.

    :param cluster_uuid str: The UUID of the cluster whose stored
        configuration provides the endpoint and credentials.
    """
    if self.authenticated:
        return

    self.connecting = True

    # Make sure we have the credentials loaded
    config = self.get_config(cluster_uuid)

    self.juju_endpoint = config["endpoint"]
    self.juju_user = config["username"]
    self.juju_secret = config["secret"]
    self.juju_ca_cert = config["cacert"]
    self.juju_public_key = None

    self.controller = Controller()

    if self.juju_secret:
        # The secret is deliberately left out of the log message.
        self.log.debug(
            "Connecting to controller... ws://{} as {}".format(
                self.juju_endpoint, self.juju_user,
            )
        )
        try:
            await self.controller.connect(
                endpoint=self.juju_endpoint,
                username=self.juju_user,
                password=self.juju_secret,
                cacert=self.juju_ca_cert,
            )
            self.authenticated = True
            self.log.debug("JujuApi: Logged into controller")
        except Exception as ex:
            # Best effort: callers check self.authenticated afterwards.
            self.log.debug(ex)
            self.log.debug("Caught exception: {}".format(ex))
    else:
        self.log.fatal("VCA credentials not configured.")
        self.authenticated = False
1019
async def logout(self):
    """Logout of the Juju controller.

    Disconnects every model kept in ``self.models`` and then the
    controller, and clears the authenticated flag.

    :return: False when there was nothing to log out of, otherwise None.
    """
    self.log.debug("[logout]")
    if not self.authenticated:
        return False

    for model_name, model in self.models.items():
        self.log.debug("Logging out of model {}".format(model_name))
        await model.disconnect()

    controller = self.controller
    if controller:
        self.log.debug("Disconnecting controller {}".format(controller))
        await controller.disconnect()
        self.controller = None

    self.authenticated = False
1036
async def remove_cloud(self, cloud_name: str,) -> bool:
    """Remove a k8s cloud from Juju

    Runs ``juju remove-k8s --client <cloud_name>`` followed by
    ``juju remove-cloud --client <cloud_name>``; the second command is not
    run if the first one fails.

    :param cloud_name str: The name of the cloud to remove.

    :returns: True if successful, otherwise raises an exception.
    :raises Exception: carrying the stderr of the command that failed
    """
    # First drop the k8s cloud, then the cloud entry in the local client
    # config.
    for subcommand in ("remove-k8s", "remove-cloud"):
        cmd = [self.juju_command, subcommand, "--client", cloud_name]
        process = await asyncio.create_subprocess_exec(
            *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
        )
        _stdout, stderr = await process.communicate()

        # Any non-zero code is a failure, including the negative codes
        # reported when the process is killed by a signal.
        if process.returncode != 0:
            raise Exception(stderr)

    return True
1074
async def set_config(self, cluster_uuid: str, config: dict,) -> bool:
    """Save the cluster configuration

    Writes the configuration as YAML to ``<fs.path>/<cluster_uuid>.yaml``.
    An existing file is left untouched.

    :param cluster_uuid str: The UUID of the cluster
    :param config dict: A dictionary containing the cluster configuration
    :returns: True (also when the file already existed), or raises an
        exception.
    """
    cluster_config = "{}/{}.yaml".format(self.fs.path, cluster_uuid)

    # Never overwrite a configuration that is already stored.
    if os.path.exists(cluster_config):
        return True

    self.log.debug("Writing config to {}".format(cluster_config))
    with open(cluster_config, "w") as config_file:
        config_file.write(yaml.dump(config, Dumper=yaml.Dumper))

    return True