k8s_juju_conn.py: fix cloud name for k8s
[osm/N2VC.git] / n2vc / k8s_juju_conn.py
1 # Copyright 2019 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import concurrent
17 from .exceptions import NotImplemented
18
19 import io
20 import juju
21 # from juju.bundle import BundleHandler
22 from juju.controller import Controller
23 from juju.model import Model
24 from juju.errors import JujuAPIError, JujuError
25
26 import logging
27
28 from n2vc.k8s_conn import K8sConnector
29
30 import os
31 # import re
32 # import ssl
33 # from .vnf import N2VC
34
35 import uuid
36 import yaml
37
38
39 class K8sJujuConnector(K8sConnector):
40
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = '/usr/bin/kubectl',
    juju_command: str = '/usr/bin/juju',
    log=None,
    on_update_db=None,
):
    """Create a Juju-backed K8s connector.

    :param fs: file system object; its ``path`` attribute is where the
        per-cluster configuration files are stored
    :param db: database object handed to the parent connector
    :param kubectl_command: path to the kubectl executable (accepted but
        not used by this constructor)
    :param juju_command: path to the juju executable used for CLI calls
    :param log: logger handed to the parent connector
    :param on_update_db: callback handed to the parent connector
    """

    # Let the parent connector set itself up first.
    K8sConnector.__init__(self, db, log=log, on_update_db=on_update_db)

    self.fs = fs
    self.info('Initializing K8S Juju connector')

    # Connection state: nothing is logged in or cached yet.
    self.models = {}
    self.authenticated = False

    # From here on this connector logs through its own module logger.
    self.log = logging.getLogger(__name__)

    # Juju CLI location and the (not yet known) controller password.
    self.juju_secret = ""
    self.juju_command = juju_command

    self.info('K8S Juju connector initialized')
77
78 """Initialization"""
async def init_env(
    self,
    k8s_creds: str,
    namespace: str = 'kube-system',
    reuse_cluster_uuid: str = None,
) -> (str, bool):
    """Prepare a K8s cluster environment to run Juju bundles.

    For a new cluster (no ``reuse_cluster_uuid``) the cluster is added to
    Juju as a k8s cloud, a controller is bootstrapped through the Juju CLI
    (bootstrapping cannot be done through the API), the controller
    endpoint and account are read from the Juju client files, and the
    resulting configuration is stored with ``set_config``.

    For an existing cluster the stored configuration is loaded instead.

    :param k8s_creds: credentials to access a given K8s cluster, i.e. a
        valid '.kube/config'
    :param namespace: optional namespace to be used for juju. By default,
        'kube-system' will be used
    :param reuse_cluster_uuid: existing cluster uuid for reuse
    :return: uuid of the K8s cluster and True (on error, an exception
        will be raised)
    """

    # TODO: Pull info from db based on the namespace
    # TODO: Make it idempotent, calling add-k8s and bootstrap whenever
    #       reuse_cluster_uuid is passed. If initialization fails, LCM can
    #       call init_env again with reuse_cluster_uuid to try to fix it.

    if not reuse_cluster_uuid:
        # This is a new cluster, so bootstrap it
        cluster_uuid = str(uuid.uuid4())

        # If the k8s is external, the juju controller needs a loadbalancer
        loadbalancer = not self.is_local_k8s(k8s_creds)

        # Name the new k8s cloud
        k8s_cloud = "k8s-{}".format(cluster_uuid)

        print("Adding k8s cloud {}".format(k8s_cloud))
        await self.add_k8s(k8s_cloud, k8s_creds)

        # Bootstrap Juju controller
        print("Bootstrapping...")
        await self.bootstrap(k8s_cloud, cluster_uuid, loadbalancer)
        print("Bootstrap done.")

        # The controller is registered in the Juju client files under the
        # cluster uuid (it is the controller name passed to bootstrap).
        print("Getting controller endpoints")
        with open(os.path.expanduser(
            "~/.local/share/juju/controllers.yaml"
        )) as f:
            # safe_load, like the rest of this file: plain data only.
            controllers = yaml.safe_load(f)
        controller = controllers['controllers'][cluster_uuid]
        self.juju_endpoint = controller['api-endpoints'][0]
        self.juju_ca_cert = controller['ca-cert']

        print("Getting accounts")
        with open(os.path.expanduser(
            "~/.local/share/juju/accounts.yaml"
        )) as f:
            accounts = yaml.safe_load(f)
        account = accounts['controllers'][cluster_uuid]

        self.juju_user = account['user']
        self.juju_secret = account['password']

        # The controller password is deliberately NOT printed.
        print("user: {}".format(self.juju_user))
        print("endpoint: {}".format(self.juju_endpoint))
        print("ca-cert: {}".format(self.juju_ca_cert))

        self.juju_public_key = None

        config = {
            'endpoint': self.juju_endpoint,
            'username': self.juju_user,
            'secret': self.juju_secret,
            'cacert': self.juju_ca_cert,
            'namespace': namespace,
            'loadbalancer': loadbalancer,
        }

        # Store the cluster configuration so it
        # can be used for subsequent calls
        print("Setting config")
        await self.set_config(cluster_uuid, config)

    else:
        # This is an existing cluster, so get its config
        cluster_uuid = reuse_cluster_uuid

        config = self.get_config(cluster_uuid)

        self.juju_endpoint = config['endpoint']
        self.juju_user = config['username']
        self.juju_secret = config['secret']
        self.juju_ca_cert = config['cacert']
        self.juju_public_key = None

    # Login to the k8s cluster
    if not self.authenticated:
        await self.login(cluster_uuid)

    # Look up the model named after the stored namespace.
    model_name = self.get_namespace(cluster_uuid)
    print("Getting model {}".format(model_name))
    model = await self.get_model(model_name, cluster_uuid=cluster_uuid)

    # Disconnect from the model
    if model and model.is_connected():
        await model.disconnect()

    return cluster_uuid, True
232
233 """Repo Management"""
async def repo_add(
    self,
    name: str,
    url: str,
    type: str = "charm",
):
    """Add a repository (not supported by this connector).

    :param name str: The name of the repository
    :param url str: The URL of the repository
    :param type str: The repository type, "charm" by default

    Always raises the ``NotImplemented`` exception imported from the
    package's ``exceptions`` module.
    """
    raise NotImplemented()
241
async def repo_list(self):
    """List repositories (not supported by this connector).

    Always raises the ``NotImplemented`` exception imported from the
    package's ``exceptions`` module.
    """
    raise NotImplemented()
244
async def repo_remove(
    self,
    name: str,
):
    """Remove a repository (not supported by this connector).

    :param name str: The name of the repository

    Always raises the ``NotImplemented`` exception imported from the
    package's ``exceptions`` module.
    """
    raise NotImplemented()
250
251 """Reset"""
async def reset(
    self,
    cluster_uuid: str,
    force: bool = False,
    uninstall_sw: bool = False
) -> bool:
    """Reset a cluster

    Resets the Kubernetes cluster by destroying the model named after the
    cluster namespace, destroying the Juju controller (via the CLI) and
    removing the k8s cloud from the Juju client.

    This is best effort: any exception is reported on stdout and not
    propagated.

    :param cluster_uuid str: The UUID of the cluster to reset
    :param force bool: Accepted for interface compatibility; not used
    :param uninstall_sw bool: Accepted for interface compatibility; not
        used
    :return: True if every step succeeded, False if an exception was
        caught.
    """

    try:
        if not self.authenticated:
            await self.login(cluster_uuid)

        # NOTE(review): the statement nesting below was reconstructed from
        # a copy of the file without indentation; all teardown steps are
        # assumed to be guarded by the is_connected() check - confirm.
        if self.controller.is_connected():
            # Destroy the model
            namespace = self.get_namespace(cluster_uuid)
            if await self.has_model(namespace):
                print("[reset] Destroying model")
                await self.controller.destroy_model(
                    namespace,
                    destroy_storage=True
                )

            # Disconnect from the controller
            print("[reset] Disconnecting controller")
            await self.controller.disconnect()
            # The session is gone; make later calls log in again instead
            # of reusing the disconnected controller object.
            self.authenticated = False

            # Destroy the controller (via CLI)
            print("[reset] Destroying controller")
            await self.destroy_controller(cluster_uuid)

            # The cloud was registered by init_env as "k8s-<cluster uuid>",
            # so it has to be removed under that same name.
            print("[reset] Removing k8s cloud")
            k8s_cloud = "k8s-{}".format(cluster_uuid)
            await self.remove_cloud(k8s_cloud)

    except Exception as ex:
        print("Caught exception during reset: {}".format(ex))
        return False

    return True
295
296 """Deployment"""
297
async def install(
    self,
    cluster_uuid: str,
    kdu_model: str,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None
) -> bool:
    """Install a bundle

    The bundle is deployed into a model named after
    ``db_dict["filter"]["_id"]``; the model is created if it does not
    exist yet.

    :param cluster_uuid str: The UUID of the cluster to install to
    :param kdu_model str: The name or path of a bundle to install
    :param atomic bool: If set, waits until every application has settled
        and resets the cluster on timeout.
    :param timeout int: The time, in seconds, to wait for each application
        to settle
    :param params dict: Key-value pairs of instantiation parameters
        (not used yet)
    :param db_dict dict: Must contain ``["filter"]["_id"]``, which is used
        as the model name

    :return: True on success, False if waiting timed out (the cluster is
        reset in that case). Raises an exception if no model could be
        obtained or no bundle was given.
    """
    # asyncio normally loads this submodule already; importing it here
    # makes the public TimeoutError name below reliable.
    import concurrent.futures

    if not self.authenticated:
        print("[install] Logging in to the controller")
        await self.login(cluster_uuid)

    ##
    # Get or create the model, based on the NS
    # uuid.
    model_name = db_dict["filter"]["_id"]

    self.log.debug("Checking for model named {}".format(model_name))
    model = await self.get_model(model_name, cluster_uuid=cluster_uuid)
    if not model:
        # Create the new model
        self.log.debug("Adding model: {}".format(model_name))
        model = await self.add_model(model_name, cluster_uuid=cluster_uuid)

    if not model:
        raise Exception("Unable to install")

    # TODO: Instantiation parameters

    # The KDU model may be given as a charm store reference ("cs:..."),
    # a URL, or a local folder/tgz under the package's k8s_models folder.
    # TODO: download "http" bundles and unpack local tarballs; for now
    # every form is handed to Juju unchanged.
    bundle = kdu_model

    timed_out = False
    try:
        if not bundle:
            # TODO: Raise named exception that the bundle could not be found
            raise Exception("No bundle given to install")

        print("[install] deploying {}".format(bundle))
        await model.deploy(bundle)

        if atomic:
            print("[install] Applications: {}".format(model.applications))
            for name in model.applications:
                print("[install] Waiting for {} to settle".format(name))
                application = model.applications[name]
                try:
                    # It's not enough to wait for all units to be active;
                    # the application status needs to be active as well.
                    print("Waiting for all units to be active...")
                    await model.block_until(
                        lambda: all(
                            unit.agent_status == 'idle'
                            and application.status in ['active', 'unknown']
                            and unit.workload_status in [
                                'active', 'unknown'
                            ] for unit in application.units
                        ),
                        timeout=timeout
                    )
                    print("All units active.")
                except (asyncio.TimeoutError,
                        concurrent.futures.TimeoutError):
                    # asyncio.TimeoutError is a distinct class from
                    # concurrent.futures.TimeoutError on Python 3.8-3.10,
                    # so both are caught.
                    timed_out = True
                    break
    finally:
        # Always release the model connection, also on failure/timeout.
        if model.is_connected():
            print("[install] Disconnecting model")
            await model.disconnect()

    if timed_out:
        print("[install] Timeout exceeded; resetting cluster")
        await self.reset(cluster_uuid)
        return False

    return True
403
async def instances_list(
    self,
    cluster_uuid: str
) -> list:
    """Return the deployed releases in a cluster.

    Listing is not implemented: the result is always an empty list.

    :param cluster_uuid: the cluster
    :return: an empty list
    """
    releases = []
    return releases
415
async def upgrade(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    kdu_model: str = None,
    params: dict = None,
) -> str:
    """Upgrade a model

    The API doesn't have a concept of bundle upgrades, because there are
    many possible changes: charm revision, disk, number of units, etc.

    As such, only a limited subset of upgrades is supported: the charm of
    every application listed in the bundle is switched to the charm named
    in the bundle; storage and scale are left untouched. Scale changes
    should happen through OSM constructs, and changes to storage would
    require a redeployment of the service, at least in this initial
    release.

    :param cluster_uuid str: The UUID of the cluster to upgrade
    :param kdu_instance str: The unique name of the KDU instance
        (not used)
    :param kdu_model str: Path of the bundle YAML file to upgrade to
    :param params dict: Key-value pairs of instantiation parameters
        (not used)

    :return: True once every application has been processed. Raises an
        exception if the model cannot be found or a charm upgrade fails
        for a reason other than "already running charm".
    """

    namespace = self.get_namespace(cluster_uuid)
    model = await self.get_model(namespace, cluster_uuid=cluster_uuid)
    if not model:
        raise Exception(
            "Model {} not found in cluster {}".format(namespace, cluster_uuid)
        )

    try:
        # The bundle is a mapping with an 'applications' section, e.g.
        # {'applications': {'mariadb-k8s': {'charm': 'cs:~charmed-osm/mariadb-k8s-20', ...}}}
        with open(kdu_model, 'r') as f:
            bundle = yaml.safe_load(f)

        # TODO: This should be returned in an agreed-upon format
        for name in bundle['applications']:
            self.log.debug(model.applications)
            application = model.applications[name]
            self.log.debug(application)

            path = bundle['applications'][name]['charm']

            try:
                await application.upgrade_charm(switch=path)
            except juju.errors.JujuError as ex:
                # Being on the requested charm already is fine; any other
                # error must not be silently ignored.
                if 'already running charm' not in str(ex):
                    raise
    finally:
        # Release the model connection on success and on failure.
        await model.disconnect()

    return True
490
491 """Rollback"""
async def rollback(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    revision: int = 0,
) -> str:
    """Rollback a model (not supported by this connector).

    :param cluster_uuid str: The UUID of the cluster to rollback
    :param kdu_instance str: The unique name of the KDU instance
    :param revision int: The revision to revert to; defaults to 0

    Always raises the ``NotImplemented`` exception imported from the
    package's ``exceptions`` module.
    """
    raise NotImplemented()
509
510 """Deletion"""
async def uninstall(
    self,
    cluster_uuid: str,
    kdu_instance: str,
) -> bool:
    """Uninstall a KDU instance

    Destroys the application named ``kdu_instance`` in the model named
    after the cluster namespace.

    :param cluster_uuid str: The UUID of the cluster to uninstall
    :param kdu_instance str: The unique name of the KDU instance

    :return: True if the application was destroyed, False if the model
        does not exist. Raises an exception if the application is not
        part of the model.
    """
    removed = False

    model = await self.get_model(
        self.get_namespace(cluster_uuid),
        cluster_uuid=cluster_uuid
    )

    if model:
        try:
            if kdu_instance not in model.applications:
                # TODO: Raise a named exception
                raise Exception("Application not found.")

            # Destroy the application
            await model.applications[kdu_instance].destroy()

            # TODO: Verify removal

            removed = True
        finally:
            # Models returned by get_model must be disconnected before
            # they go out of scope.
            if model.is_connected():
                await model.disconnect()

    return removed
543
544 """Introspection"""
async def inspect_kdu(
    self,
    kdu_model: str,
) -> dict:
    """Inspect a KDU

    Reads a bundle YAML file and returns its 'applications' section,
    which maps each application name to its definition (charm, scale,
    options, series, ...), for example:

        {'mariadb-k8s': {'charm': 'cs:~charmed-osm/mariadb-k8s-20',
                         'scale': 1,
                         'options': {'user': 'mano'},
                         'series': 'kubernetes'}}

    :param kdu_model str: The path of the bundle file to inspect.

    :return: The 'applications' mapping of the bundle.
    """
    with open(kdu_model, 'r') as bundle_file:
        parsed = yaml.safe_load(bundle_file)

    # TODO: This should be returned in an agreed-upon format
    return parsed['applications']
586
async def help_kdu(
    self,
    kdu_model: str,
) -> str:
    """View the README

    Looks for a README file next to the bundle, i.e. in the directory
    that contains ``kdu_model``. The names 'README', 'README.txt' and
    'README.md' are tried in that order and the first one found wins.

    :param kdu_model str: The name or path of a bundle

    :return: The contents of the README if found, otherwise None.
    """
    readme = None

    # dirname() is '' for a bare file name; that means the current
    # directory.
    path = os.path.dirname(kdu_model) or '.'

    for candidate in ('README', 'README.txt', 'README.md'):
        # Open the file inside the bundle directory, not relative to the
        # process working directory.
        readme_path = os.path.join(path, candidate)
        if os.path.isfile(readme_path):
            with open(readme_path, 'r') as f:
                readme = f.read()
            break

    return readme
610
async def status_kdu(
    self,
    cluster_uuid: str,
    kdu_instance: str,
) -> dict:
    """Get the status of the KDU

    Queries the status of the model named after the cluster namespace and
    reports the status of every application in it.

    :param cluster_uuid str: The UUID of the cluster
    :param kdu_instance str: The unique id of the KDU instance (not used;
        all applications of the model are reported)

    :return: A dictionary mapping each application name to
        ``{'status': <application status>}``; empty if the model does
        not exist.
    """
    status = {}

    model = await self.get_model(
        self.get_namespace(cluster_uuid),
        cluster_uuid=cluster_uuid
    )

    if model:
        try:
            model_status = await model.get_status()
            applications = model_status.applications

            # Build a fresh dict instead of rewriting the entries of the
            # status object while iterating over it.
            status = {
                name: {'status': applications[name]['status']['status']}
                for name in applications
            }
        finally:
            # Release the model connection even if the query failed.
            if model.is_connected():
                await model.disconnect()

    return status
645
646 # Private methods
async def add_k8s(
    self,
    cloud_name: str,
    credentials: str,
) -> bool:
    """Add a k8s cloud to Juju

    Runs ``juju add-k8s --local <cloud_name>`` and feeds the credentials
    to the command on its standard input, so the cloud can afterwards be
    bootstrapped with a Juju Controller.

    :param cloud_name str: The name of the cloud to add.
    :param credentials str: The kubeconfig text, i.e. the output of
        `kubectl config view --raw`.

    :returns: True if successful, otherwise raises an exception carrying
        the command's stderr.
    """

    cmd = [self.juju_command, "add-k8s", "--local", cloud_name]
    print(cmd)

    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        stdin=asyncio.subprocess.PIPE,
    )

    # Let communicate() write the credentials and close stdin while it
    # drains stdout/stderr; writing to stdin by hand first can deadlock
    # once a pipe buffer fills up.
    stdout, stderr = await process.communicate(
        input=credentials.encode("utf-8")
    )

    return_code = process.returncode

    print("add-k8s return code: {}".format(return_code))

    if return_code > 0:
        raise Exception(stderr)

    return True
689
async def add_model(
    self,
    model_name: str,
    cluster_uuid: str,
) -> juju.model.Model:
    """Adds a model to the controller

    Logs in first if necessary, then asks the Juju controller to create
    the model, passing the connector's public key as 'authorized-keys'.

    :param model_name str: The name of the model to add.
    :param cluster_uuid str: The UUID of the cluster, used to log in.
    :returns: The juju.model.Model object of the new model upon success or
        raises an exception.
    """
    if not self.authenticated:
        await self.login(cluster_uuid)

    self.log.debug("Adding model '{}' to cluster_uuid '{}'".format(model_name, cluster_uuid))

    model_config = {'authorized-keys': self.juju_public_key}
    return await self.controller.add_model(model_name, config=model_config)
712
async def bootstrap(
    self,
    cloud_name: str,
    cluster_uuid: str,
    loadbalancer: bool
) -> bool:
    """Bootstrap a Kubernetes controller

    Runs ``juju bootstrap <cloud_name> <cluster_uuid>`` so that a Juju
    controller named after the cluster uuid is created in the cloud.

    :param cloud_name str: The name of the cloud.
    :param cluster_uuid str: The UUID of the cluster to bootstrap.
    :param loadbalancer bool: If the controller should use loadbalancer or not.
    :returns: True upon success (or when stderr reports 'already exists'),
        otherwise raises an exception carrying the command's stderr.
    """

    cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid]
    if loadbalancer:
        # For public clusters, specify that the controller service is
        # using a LoadBalancer.
        cmd += ["--config", "controller-service-type=loadbalancer"]

    print("Bootstrapping controller {} in cloud {}".format(
        cluster_uuid, cloud_name
    ))

    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    _, stderr = await process.communicate()

    # A failure is tolerated only when stderr says 'already exists'.
    failed = process.returncode > 0
    if failed and b'already exists' not in stderr:
        raise Exception(stderr)

    return True
757
async def destroy_controller(
    self,
    cluster_uuid: str
) -> bool:
    """Destroy a Kubernetes controller

    Runs ``juju destroy-controller --destroy-all-models --destroy-storage
    -y <cluster_uuid>`` to destroy an existing Kubernetes controller.

    :param cluster_uuid str: The UUID of the cluster whose controller is
        destroyed (the controller is named after it).
    :returns: True upon success, otherwise raises an exception carrying
        the command's stderr.
    """
    cmd = [
        self.juju_command,
        "destroy-controller",
        "--destroy-all-models",
        "--destroy-storage",
        "-y",
        cluster_uuid
    ]

    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    stdout, stderr = await process.communicate()

    return_code = process.returncode

    if return_code > 0:
        # stderr is bytes, so it has to be searched with a bytes pattern.
        # NOTE(review): tolerating 'already exists' looks copied from
        # bootstrap(); confirm which message, if any, should be ignored
        # when destroying a controller.
        if b'already exists' not in stderr:
            raise Exception(stderr)

    return True
792
def get_config(
    self,
    cluster_uuid: str,
) -> dict:
    """Get the cluster configuration

    Loads the YAML file "<fs.path>/<cluster_uuid>.yaml" from the file
    store.

    :param cluster_uuid str: The UUID of the cluster.
    :return: The parsed configuration upon success; raises an exception
        if the file does not exist.
    """
    cluster_config = "{}/{}.yaml".format(self.fs.path, cluster_uuid)

    # Guard clause: without a stored file there is nothing to load.
    if not os.path.exists(cluster_config):
        raise Exception(
            "Unable to locate configuration for cluster {}".format(
                cluster_uuid
            )
        )

    with open(cluster_config, 'r') as stored:
        return yaml.safe_load(stored.read())
815
async def get_model(
    self,
    model_name: str,
    cluster_uuid: str,
) -> juju.model.Model:
    """Get a model from the Juju Controller.

    Logs in first if necessary, then looks the name up in the
    controller's list of models.

    Note: a returned Model object must be disconnected by the caller
    before it goes out of scope.

    :param model_name str: The name of the model to get
    :param cluster_uuid str: The UUID of the cluster, used to log in.
    :return The juju.model.Model object if found, or None.
    """
    if not self.authenticated:
        await self.login(cluster_uuid)

    known_models = await self.controller.list_models()
    self.log.debug(known_models)

    if model_name not in known_models:
        return None

    self.log.debug("Found model: {}".format(model_name))
    return await self.controller.get_model(model_name)
841
def get_namespace(
    self,
    cluster_uuid: str,
) -> str:
    """Get the namespace UUID

    Returns the 'namespace' entry of the stored cluster configuration.

    :param cluster_uuid str: The UUID of the cluster
    :returns: The namespace, or raises an exception if the configuration
        has no 'namespace' entry
    """
    settings = self.get_config(cluster_uuid)

    # TODO: We want to make sure this is unique to the cluster, in case
    # the cluster is being reused.
    # Consider pre/appending the cluster id to the namespace string
    if 'namespace' in settings:
        return settings['namespace']

    raise Exception("Namespace not found.")
862
async def has_model(
    self,
    model_name: str
) -> bool:
    """Check if a model exists in the controller

    Asks the connected Juju controller for its list of models and tests
    whether the given name is among them.

    :param model_name str: The name of the model
    :return: A boolean indicating if the model exists
    """
    known_models = await self.controller.list_models()
    return model_name in known_models
879
def is_local_k8s(
    self,
    credentials: str,
) -> bool:
    """Check if a cluster is local

    A cluster counts as local when the value of the OSMLCM_VCA_APIPROXY
    environment variable appears in the 'server' URL of one of the
    clusters listed in the credentials.

    :param credentials str: The k8s credentials as YAML text (kubeconfig)
    :returns: True if a cluster server matches, otherwise False (also
        when the environment variable is unset or the credentials list
        no clusters)
    """
    creds = yaml.safe_load(credentials)

    # Read the variable unconditionally so host_ip is always bound, even
    # when the variable is not set.
    host_ip = os.getenv("OSMLCM_VCA_APIPROXY")

    if not creds or not host_ip:
        return False

    for cluster in creds.get('clusters') or []:
        # Tolerate entries without a 'cluster' or 'server' section.
        server = (cluster.get('cluster') or {}).get('server')
        if server and host_ip in server:
            return True

    return False
902
async def login(self, cluster_uuid):
    """Login to the Juju controller.

    Loads the stored cluster configuration, creates a new Controller
    object and connects to it with the stored endpoint, user, password
    and CA certificate. Does nothing if already authenticated.

    A failed connection is reported and swallowed; callers can tell by
    ``self.authenticated`` still being False afterwards.

    :param cluster_uuid: The UUID of the cluster whose configuration is
        used to log in.
    """

    if self.authenticated:
        return

    self.connecting = True

    # Make sure we have the credentials loaded
    config = self.get_config(cluster_uuid)

    self.juju_endpoint = config['endpoint']
    self.juju_user = config['username']
    self.juju_secret = config['secret']
    self.juju_ca_cert = config['cacert']
    self.juju_public_key = None

    self.controller = Controller()

    if self.juju_secret:
        # The password is intentionally left out of the log message.
        self.log.debug(
            "Connecting to controller... ws://{} as {}".format(
                self.juju_endpoint,
                self.juju_user,
            )
        )
        try:
            await self.controller.connect(
                endpoint=self.juju_endpoint,
                username=self.juju_user,
                password=self.juju_secret,
                cacert=self.juju_ca_cert,
            )
            self.authenticated = True
            self.log.debug("JujuApi: Logged into controller")
        except Exception as ex:
            # Best effort: report the failure and leave
            # self.authenticated as False.
            print(ex)
            self.log.debug("Caught exception: {}".format(ex))
    else:
        self.log.critical("VCA credentials not configured.")
        self.authenticated = False
946
async def logout(self):
    """Logout of the Juju controller.

    Disconnects every model kept in ``self.models`` and then the
    controller, and marks the connector as not authenticated.

    :return: False if the connector was not authenticated; otherwise
        nothing is returned.
    """
    print("[logout]")
    if not self.authenticated:
        return False

    for model_name, model in self.models.items():
        print("Logging out of model {}".format(model_name))
        await model.disconnect()

    controller = self.controller
    if controller:
        self.log.debug("Disconnecting controller {}".format(
            controller
        ))
        await controller.disconnect()
        self.controller = None

    self.authenticated = False
965
async def remove_cloud(
    self,
    cloud_name: str,
) -> bool:
    """Remove a k8s cloud from Juju

    Runs ``juju remove-k8s --client <cloud_name>`` followed by
    ``juju remove-cloud --client <cloud_name>``. The second command is
    only run if the first one succeeded.

    :param cloud_name str: The name of the cloud to remove.

    :returns: True if successful, otherwise raises an exception carrying
        the failing command's stderr.
    """

    for subcommand in ("remove-k8s", "remove-cloud"):
        cmd = [self.juju_command, subcommand, "--client", cloud_name]
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        _, stderr = await process.communicate()

        if process.returncode > 0:
            raise Exception(stderr)

    return True
1010
async def set_config(
    self,
    cluster_uuid: str,
    config: dict,
) -> bool:
    """Save the cluster configuration

    Writes the configuration as YAML to "<fs.path>/<cluster_uuid>.yaml"
    in the file store. An existing file is left untouched.

    :param cluster_uuid str: The UUID of the cluster
    :param config dict: A dictionary containing the cluster configuration
    :returns: True, whether or not the file had to be written; raises an
        exception if writing fails.
    """

    cluster_config = "{}/{}.yaml".format(self.fs.path, cluster_uuid)

    # A configuration that is already stored is never overwritten.
    if os.path.exists(cluster_config):
        return True

    print("Writing config to {}".format(cluster_config))
    with open(cluster_config, 'w') as target:
        target.write(yaml.dump(config, Dumper=yaml.Dumper))

    return True