3316087d975889111ea4166a719babfee9472110
[osm/N2VC.git] / n2vc / k8s_juju_conn.py
1 # Copyright 2019 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import concurrent
17 from .exceptions import NotImplemented
18
19 import io
20 import juju
21 # from juju.bundle import BundleHandler
22 from juju.controller import Controller
23 from juju.model import Model
24 from juju.errors import JujuAPIError, JujuError
25
26 from n2vc.k8s_conn import K8sConnector
27
28 import os
29 # import re
30 # import ssl
31 # from .vnf import N2VC
32
33 import uuid
34 import yaml
35
36
37 class K8sJujuConnector(K8sConnector):
38
39 def __init__(
40 self,
41 fs: object,
42 db: object,
43 kubectl_command: str = '/usr/bin/kubectl',
44 juju_command: str = '/usr/bin/juju',
45 log: object = None,
46 on_update_db=None,
47 ):
48 """
49
50 :param kubectl_command: path to kubectl executable
51 :param helm_command: path to helm executable
52 :param fs: file system for kubernetes and helm configuration
53 :param log: logger
54 """
55
56 # parent class
57 K8sConnector.__init__(
58 self,
59 db,
60 log=log,
61 on_update_db=on_update_db,
62 )
63
64 self.fs = fs
65 self.log.debug('Initializing K8S Juju connector')
66
67 self.authenticated = False
68 self.models = {}
69
70 self.juju_command = juju_command
71 self.juju_secret = ""
72
73 self.log.debug('K8S Juju connector initialized')
74
75 """Initialization"""
76 async def init_env(
77 self,
78 k8s_creds: str,
79 namespace: str = 'kube-system',
80 reuse_cluster_uuid: str = None,
81 ) -> (str, bool):
82 """
83 It prepares a given K8s cluster environment to run Juju bundles.
84
85 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid '.kube/config'
86 :param namespace: optional namespace to be used for juju. By default, 'kube-system' will be used
87 :param reuse_cluster_uuid: existing cluster uuid for reuse
88 :return: uuid of the K8s cluster and True if connector has installed some software in the cluster
89 (on error, an exception will be raised)
90 """
91
92 """Bootstrapping
93
94 Bootstrapping cannot be done, by design, through the API. We need to
95 use the CLI tools.
96 """
97
98 """
99 WIP: Workflow
100
101 1. Has the environment already been bootstrapped?
102 - Check the database to see if we have a record for this env
103
104 2. If this is a new env, create it
105 - Add the k8s cloud to Juju
106 - Bootstrap
107 - Record it in the database
108
109 3. Connect to the Juju controller for this cloud
110
111 """
112 # cluster_uuid = reuse_cluster_uuid
113 # if not cluster_uuid:
114 # cluster_uuid = str(uuid4())
115
116 ##################################################
117 # TODO: Pull info from db based on the namespace #
118 ##################################################
119
120 ###################################################
121 # TODO: Make it idempotent, calling add-k8s and #
122 # bootstrap whenever reuse_cluster_uuid is passed #
123 # as parameter #
124 # `init_env` is called to initialize the K8s #
125 # cluster for juju. If this initialization fails, #
126 # it can be called again by LCM with the param #
127 # reuse_cluster_uuid, e.g. to try to fix it. #
128 ###################################################
129
130 if not reuse_cluster_uuid:
131 # This is a new cluster, so bootstrap it
132
133 cluster_uuid = str(uuid.uuid4())
134
135 # Is a local k8s cluster?
136 localk8s = self.is_local_k8s(k8s_creds)
137
138 # If the k8s is external, the juju controller needs a loadbalancer
139 loadbalancer = False if localk8s else True
140
141 # Name the new k8s cloud
142 k8s_cloud = "k8s-{}".format(cluster_uuid)
143
144 self.log.debug("Adding k8s cloud {}".format(k8s_cloud))
145 await self.add_k8s(k8s_cloud, k8s_creds)
146
147 # Bootstrap Juju controller
148 self.log.debug("Bootstrapping...")
149 await self.bootstrap(k8s_cloud, cluster_uuid, loadbalancer)
150 self.log.debug("Bootstrap done.")
151
152 # Get the controller information
153
154 # Parse ~/.local/share/juju/controllers.yaml
155 # controllers.testing.api-endpoints|ca-cert|uuid
156 self.log.debug("Getting controller endpoints")
157 with open(os.path.expanduser(
158 "~/.local/share/juju/controllers.yaml"
159 )) as f:
160 controllers = yaml.load(f, Loader=yaml.Loader)
161 controller = controllers['controllers'][cluster_uuid]
162 endpoints = controller['api-endpoints']
163 self.juju_endpoint = endpoints[0]
164 self.juju_ca_cert = controller['ca-cert']
165
166 # Parse ~/.local/share/juju/accounts
167 # controllers.testing.user|password
168 self.log.debug("Getting accounts")
169 with open(os.path.expanduser(
170 "~/.local/share/juju/accounts.yaml"
171 )) as f:
172 controllers = yaml.load(f, Loader=yaml.Loader)
173 controller = controllers['controllers'][cluster_uuid]
174
175 self.juju_user = controller['user']
176 self.juju_secret = controller['password']
177
178 # raise Exception("EOL")
179
180 self.juju_public_key = None
181
182 config = {
183 'endpoint': self.juju_endpoint,
184 'username': self.juju_user,
185 'secret': self.juju_secret,
186 'cacert': self.juju_ca_cert,
187 'namespace': namespace,
188 'loadbalancer': loadbalancer,
189 }
190
191 # Store the cluster configuration so it
192 # can be used for subsequent calls
193 self.log.debug("Setting config")
194 await self.set_config(cluster_uuid, config)
195
196 else:
197 # This is an existing cluster, so get its config
198 cluster_uuid = reuse_cluster_uuid
199
200 config = self.get_config(cluster_uuid)
201
202 self.juju_endpoint = config['endpoint']
203 self.juju_user = config['username']
204 self.juju_secret = config['secret']
205 self.juju_ca_cert = config['cacert']
206 self.juju_public_key = None
207
208 # Login to the k8s cluster
209 if not self.authenticated:
210 await self.login(cluster_uuid)
211
212 # We're creating a new cluster
213 #print("Getting model {}".format(self.get_namespace(cluster_uuid), cluster_uuid=cluster_uuid))
214 #model = await self.get_model(
215 # self.get_namespace(cluster_uuid),
216 # cluster_uuid=cluster_uuid
217 #)
218
219 ## Disconnect from the model
220 #if model and model.is_connected():
221 # await model.disconnect()
222
223 return cluster_uuid, True
224
225 """Repo Management"""
226 async def repo_add(
227 self,
228 name: str,
229 url: str,
230 type: str = "charm",
231 ):
232 raise NotImplemented()
233
234 async def repo_list(self):
235 raise NotImplemented()
236
237 async def repo_remove(
238 self,
239 name: str,
240 ):
241 raise NotImplemented()
242
243 async def synchronize_repos(
244 self,
245 cluster_uuid: str,
246 name: str
247 ):
248 """
249 Returns None as currently add_repo is not implemented
250 """
251 return None
252
253 """Reset"""
254 async def reset(
255 self,
256 cluster_uuid: str,
257 force: bool = False,
258 uninstall_sw: bool = False
259 ) -> bool:
260 """Reset a cluster
261
262 Resets the Kubernetes cluster by removing the model that represents it.
263
264 :param cluster_uuid str: The UUID of the cluster to reset
265 :return: Returns True if successful or raises an exception.
266 """
267
268 try:
269 if not self.authenticated:
270 await self.login(cluster_uuid)
271
272 if self.controller.is_connected():
273 # Destroy the model
274 namespace = self.get_namespace(cluster_uuid)
275 if await self.has_model(namespace):
276 self.log.debug("[reset] Destroying model")
277 await self.controller.destroy_model(
278 namespace,
279 destroy_storage=True
280 )
281
282 # Disconnect from the controller
283 self.log.debug("[reset] Disconnecting controller")
284 await self.logout()
285
286 # Destroy the controller (via CLI)
287 self.log.debug("[reset] Destroying controller")
288 await self.destroy_controller(cluster_uuid)
289
290 self.log.debug("[reset] Removing k8s cloud")
291 k8s_cloud = "k8s-{}".format(cluster_uuid)
292 await self.remove_cloud(k8s_cloud)
293
294 except Exception as ex:
295 self.log.debug("Caught exception during reset: {}".format(ex))
296
297 return True
298
299 """Deployment"""
300
301 async def install(
302 self,
303 cluster_uuid: str,
304 kdu_model: str,
305 atomic: bool = True,
306 timeout: float = 300,
307 params: dict = None,
308 db_dict: dict = None,
309 kdu_name: str = None
310 ) -> bool:
311 """Install a bundle
312
313 :param cluster_uuid str: The UUID of the cluster to install to
314 :param kdu_model str: The name or path of a bundle to install
315 :param atomic bool: If set, waits until the model is active and resets
316 the cluster on failure.
317 :param timeout int: The time, in seconds, to wait for the install
318 to finish
319 :param params dict: Key-value pairs of instantiation parameters
320 :param kdu_name: Name of the KDU instance to be installed
321
322 :return: If successful, returns ?
323 """
324
325 if not self.authenticated:
326 self.log.debug("[install] Logging in to the controller")
327 await self.login(cluster_uuid)
328
329 ##
330 # Get or create the model, based on the NS
331 # uuid.
332 if kdu_name:
333 kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
334 else:
335 kdu_instance = db_dict["filter"]["_id"]
336
337 self.log.debug("Checking for model named {}".format(kdu_instance))
338
339 # Create the new model
340 self.log.debug("Adding model: {}".format(kdu_instance))
341 model = await self.add_model(kdu_instance, cluster_uuid=cluster_uuid)
342
343 if model:
344 # TODO: Instantiation parameters
345
346 """
347 "Juju bundle that models the KDU, in any of the following ways:
348 - <juju-repo>/<juju-bundle>
349 - <juju-bundle folder under k8s_models folder in the package>
350 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder in the package>
351 - <URL_where_to_fetch_juju_bundle>
352 """
353
354 previous_workdir = os.getcwd()
355
356 bundle = kdu_model
357 if kdu_model.startswith("cs:"):
358 bundle = kdu_model
359 elif kdu_model.startswith("http"):
360 # Download the file
361 pass
362 else:
363 new_workdir = kdu_model.strip(kdu_model.split("/")[-1])
364
365 os.chdir(new_workdir)
366
367 bundle = "local:{}".format(kdu_model)
368
369 if not bundle:
370 # Raise named exception that the bundle could not be found
371 raise Exception()
372
373 self.log.debug("[install] deploying {}".format(bundle))
374 await model.deploy(bundle)
375
376 # Get the application
377 if atomic:
378 # applications = model.applications
379 self.log.debug("[install] Applications: {}".format(model.applications))
380 for name in model.applications:
381 self.log.debug("[install] Waiting for {} to settle".format(name))
382 application = model.applications[name]
383 try:
384 # It's not enough to wait for all units to be active;
385 # the application status needs to be active as well.
386 self.log.debug("Waiting for all units to be active...")
387 await model.block_until(
388 lambda: all(
389 unit.agent_status == 'idle'
390 and application.status in ['active', 'unknown']
391 and unit.workload_status in [
392 'active', 'unknown'
393 ] for unit in application.units
394 ),
395 timeout=timeout
396 )
397 self.log.debug("All units active.")
398
399 except concurrent.futures._base.TimeoutError:
400 os.chdir(previous_workdir)
401 self.log.debug("[install] Timeout exceeded; resetting cluster")
402 await self.reset(cluster_uuid)
403 return False
404
405 # Wait for the application to be active
406 if model.is_connected():
407 self.log.debug("[install] Disconnecting model")
408 await model.disconnect()
409
410 os.chdir(previous_workdir)
411
412 return kdu_instance
413 raise Exception("Unable to install")
414
415 async def instances_list(
416 self,
417 cluster_uuid: str
418 ) -> list:
419 """
420 returns a list of deployed releases in a cluster
421
422 :param cluster_uuid: the cluster
423 :return:
424 """
425 return []
426
427 async def upgrade(
428 self,
429 cluster_uuid: str,
430 kdu_instance: str,
431 kdu_model: str = None,
432 params: dict = None,
433 ) -> str:
434 """Upgrade a model
435
436 :param cluster_uuid str: The UUID of the cluster to upgrade
437 :param kdu_instance str: The unique name of the KDU instance
438 :param kdu_model str: The name or path of the bundle to upgrade to
439 :param params dict: Key-value pairs of instantiation parameters
440
441 :return: If successful, reference to the new revision number of the
442 KDU instance.
443 """
444
445 # TODO: Loop through the bundle and upgrade each charm individually
446
447 """
448 The API doesn't have a concept of bundle upgrades, because there are
449 many possible changes: charm revision, disk, number of units, etc.
450
451 As such, we are only supporting a limited subset of upgrades. We'll
452 upgrade the charm revision but leave storage and scale untouched.
453
454 Scale changes should happen through OSM constructs, and changes to
455 storage would require a redeployment of the service, at least in this
456 initial release.
457 """
458 namespace = self.get_namespace(cluster_uuid)
459 model = await self.get_model(namespace, cluster_uuid=cluster_uuid)
460
461 with open(kdu_model, 'r') as f:
462 bundle = yaml.safe_load(f)
463
464 """
465 {
466 'description': 'Test bundle',
467 'bundle': 'kubernetes',
468 'applications': {
469 'mariadb-k8s': {
470 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
471 'scale': 1,
472 'options': {
473 'password': 'manopw',
474 'root_password': 'osm4u',
475 'user': 'mano'
476 },
477 'series': 'kubernetes'
478 }
479 }
480 }
481 """
482 # TODO: This should be returned in an agreed-upon format
483 for name in bundle['applications']:
484 self.log.debug(model.applications)
485 application = model.applications[name]
486 self.log.debug(application)
487
488 path = bundle['applications'][name]['charm']
489
490 try:
491 await application.upgrade_charm(switch=path)
492 except juju.errors.JujuError as ex:
493 if 'already running charm' in str(ex):
494 # We're already running this version
495 pass
496
497 await model.disconnect()
498
499 return True
500 raise NotImplemented()
501
502 """Rollback"""
503 async def rollback(
504 self,
505 cluster_uuid: str,
506 kdu_instance: str,
507 revision: int = 0,
508 ) -> str:
509 """Rollback a model
510
511 :param cluster_uuid str: The UUID of the cluster to rollback
512 :param kdu_instance str: The unique name of the KDU instance
513 :param revision int: The revision to revert to. If omitted, rolls back
514 the previous upgrade.
515
516 :return: If successful, returns the revision of active KDU instance,
517 or raises an exception
518 """
519 raise NotImplemented()
520
521 """Deletion"""
522 async def uninstall(
523 self,
524 cluster_uuid: str,
525 kdu_instance: str
526 ) -> bool:
527 """Uninstall a KDU instance
528
529 :param cluster_uuid str: The UUID of the cluster
530 :param kdu_instance str: The unique name of the KDU instance
531
532 :return: Returns True if successful, or raises an exception
533 """
534 if not self.authenticated:
535 self.log.debug("[uninstall] Connecting to controller")
536 await self.login(cluster_uuid)
537
538 self.log.debug("[uninstall] Destroying model")
539
540 await self.controller.destroy_models(kdu_instance)
541
542 self.log.debug("[uninstall] Model destroyed and disconnecting")
543 await self.logout()
544
545 return True
546
547 """Introspection"""
548 async def inspect_kdu(
549 self,
550 kdu_model: str,
551 ) -> dict:
552 """Inspect a KDU
553
554 Inspects a bundle and returns a dictionary of config parameters and
555 their default values.
556
557 :param kdu_model str: The name or path of the bundle to inspect.
558
559 :return: If successful, returns a dictionary of available parameters
560 and their default values.
561 """
562
563 kdu = {}
564 with open(kdu_model, 'r') as f:
565 bundle = yaml.safe_load(f)
566
567 """
568 {
569 'description': 'Test bundle',
570 'bundle': 'kubernetes',
571 'applications': {
572 'mariadb-k8s': {
573 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
574 'scale': 1,
575 'options': {
576 'password': 'manopw',
577 'root_password': 'osm4u',
578 'user': 'mano'
579 },
580 'series': 'kubernetes'
581 }
582 }
583 }
584 """
585 # TODO: This should be returned in an agreed-upon format
586 kdu = bundle['applications']
587
588 return kdu
589
590 async def help_kdu(
591 self,
592 kdu_model: str,
593 ) -> str:
594 """View the README
595
596 If available, returns the README of the bundle.
597
598 :param kdu_model str: The name or path of a bundle
599
600 :return: If found, returns the contents of the README.
601 """
602 readme = None
603
604 files = ['README', 'README.txt', 'README.md']
605 path = os.path.dirname(kdu_model)
606 for file in os.listdir(path):
607 if file in files:
608 with open(file, 'r') as f:
609 readme = f.read()
610 break
611
612 return readme
613
614 async def status_kdu(
615 self,
616 cluster_uuid: str,
617 kdu_instance: str,
618 ) -> dict:
619 """Get the status of the KDU
620
621 Get the current status of the KDU instance.
622
623 :param cluster_uuid str: The UUID of the cluster
624 :param kdu_instance str: The unique id of the KDU instance
625
626 :return: Returns a dictionary containing namespace, state, resources,
627 and deployment_time.
628 """
629 status = {}
630
631 model = await self.get_model(self.get_namespace(cluster_uuid), cluster_uuid=cluster_uuid)
632
633 # model = await self.get_model_by_uuid(cluster_uuid)
634 if model:
635 model_status = await model.get_status()
636 status = model_status.applications
637
638 for name in model_status.applications:
639 application = model_status.applications[name]
640 status[name] = {
641 'status': application['status']['status']
642 }
643
644 if model.is_connected():
645 await model.disconnect()
646
647 return status
648
649 # Private methods
650 async def add_k8s(
651 self,
652 cloud_name: str,
653 credentials: str,
654 ) -> bool:
655 """Add a k8s cloud to Juju
656
657 Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
658 Juju Controller.
659
660 :param cloud_name str: The name of the cloud to add.
661 :param credentials dict: A dictionary representing the output of
662 `kubectl config view --raw`.
663
664 :returns: True if successful, otherwise raises an exception.
665 """
666
667 cmd = [self.juju_command, "add-k8s", "--local", cloud_name]
668 self.log.debug(cmd)
669
670 process = await asyncio.create_subprocess_exec(
671 *cmd,
672 stdout=asyncio.subprocess.PIPE,
673 stderr=asyncio.subprocess.PIPE,
674 stdin=asyncio.subprocess.PIPE,
675 )
676
677 # Feed the process the credentials
678 process.stdin.write(credentials.encode("utf-8"))
679 await process.stdin.drain()
680 process.stdin.close()
681
682 stdout, stderr = await process.communicate()
683
684 return_code = process.returncode
685
686 self.log.debug("add-k8s return code: {}".format(return_code))
687
688 if return_code > 0:
689 raise Exception(stderr)
690
691 return True
692
693 async def add_model(
694 self,
695 model_name: str,
696 cluster_uuid: str,
697 ) -> juju.model.Model:
698 """Adds a model to the controller
699
700 Adds a new model to the Juju controller
701
702 :param model_name str: The name of the model to add.
703 :returns: The juju.model.Model object of the new model upon success or
704 raises an exception.
705 """
706 if not self.authenticated:
707 await self.login(cluster_uuid)
708
709 self.log.debug("Adding model '{}' to cluster_uuid '{}'".format(model_name, cluster_uuid))
710 try:
711 model = await self.controller.add_model(
712 model_name,
713 config={'authorized-keys': self.juju_public_key}
714 )
715 except Exception as ex:
716 self.log.debug(ex)
717 self.log.debug("Caught exception: {}".format(ex))
718 pass
719
720 return model
721
722 async def bootstrap(
723 self,
724 cloud_name: str,
725 cluster_uuid: str,
726 loadbalancer: bool
727 ) -> bool:
728 """Bootstrap a Kubernetes controller
729
730 Bootstrap a Juju controller inside the Kubernetes cluster
731
732 :param cloud_name str: The name of the cloud.
733 :param cluster_uuid str: The UUID of the cluster to bootstrap.
734 :param loadbalancer bool: If the controller should use loadbalancer or not.
735 :returns: True upon success or raises an exception.
736 """
737
738 if not loadbalancer:
739 cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid]
740 else:
741 """
742 For public clusters, specify that the controller service is using a LoadBalancer.
743 """
744 cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid, "--config", "controller-service-type=loadbalancer"]
745
746 self.log.debug("Bootstrapping controller {} in cloud {}".format(
747 cluster_uuid, cloud_name
748 ))
749
750 process = await asyncio.create_subprocess_exec(
751 *cmd,
752 stdout=asyncio.subprocess.PIPE,
753 stderr=asyncio.subprocess.PIPE,
754 )
755
756 stdout, stderr = await process.communicate()
757
758 return_code = process.returncode
759
760 if return_code > 0:
761 #
762 if b'already exists' not in stderr:
763 raise Exception(stderr)
764
765 return True
766
767 async def destroy_controller(
768 self,
769 cluster_uuid: str
770 ) -> bool:
771 """Destroy a Kubernetes controller
772
773 Destroy an existing Kubernetes controller.
774
775 :param cluster_uuid str: The UUID of the cluster to bootstrap.
776 :returns: True upon success or raises an exception.
777 """
778 cmd = [
779 self.juju_command,
780 "destroy-controller",
781 "--destroy-all-models",
782 "--destroy-storage",
783 "-y",
784 cluster_uuid
785 ]
786
787 process = await asyncio.create_subprocess_exec(
788 *cmd,
789 stdout=asyncio.subprocess.PIPE,
790 stderr=asyncio.subprocess.PIPE,
791 )
792
793 stdout, stderr = await process.communicate()
794
795 return_code = process.returncode
796
797 if return_code > 0:
798 #
799 if 'already exists' not in stderr:
800 raise Exception(stderr)
801
802 def get_config(
803 self,
804 cluster_uuid: str,
805 ) -> dict:
806 """Get the cluster configuration
807
808 Gets the configuration of the cluster
809
810 :param cluster_uuid str: The UUID of the cluster.
811 :return: A dict upon success, or raises an exception.
812 """
813 cluster_config = "{}/{}.yaml".format(self.fs.path, cluster_uuid)
814 if os.path.exists(cluster_config):
815 with open(cluster_config, 'r') as f:
816 config = yaml.safe_load(f.read())
817 return config
818 else:
819 raise Exception(
820 "Unable to locate configuration for cluster {}".format(
821 cluster_uuid
822 )
823 )
824
825 async def get_model(
826 self,
827 model_name: str,
828 cluster_uuid: str,
829 ) -> juju.model.Model:
830 """Get a model from the Juju Controller.
831
832 Note: Model objects returned must call disconnected() before it goes
833 out of scope.
834
835 :param model_name str: The name of the model to get
836 :return The juju.model.Model object if found, or None.
837 """
838 if not self.authenticated:
839 await self.login(cluster_uuid)
840
841 model = None
842 models = await self.controller.list_models()
843 self.log.debug(models)
844 if model_name in models:
845 self.log.debug("Found model: {}".format(model_name))
846 model = await self.controller.get_model(
847 model_name
848 )
849 return model
850
851 def get_namespace(
852 self,
853 cluster_uuid: str,
854 ) -> str:
855 """Get the namespace UUID
856 Gets the namespace's unique name
857
858 :param cluster_uuid str: The UUID of the cluster
859 :returns: The namespace UUID, or raises an exception
860 """
861 config = self.get_config(cluster_uuid)
862
863 # Make sure the name is in the config
864 if 'namespace' not in config:
865 raise Exception("Namespace not found.")
866
867 # TODO: We want to make sure this is unique to the cluster, in case
868 # the cluster is being reused.
869 # Consider pre/appending the cluster id to the namespace string
870 return config['namespace']
871
872 async def has_model(
873 self,
874 model_name: str
875 ) -> bool:
876 """Check if a model exists in the controller
877
878 Checks to see if a model exists in the connected Juju controller.
879
880 :param model_name str: The name of the model
881 :return: A boolean indicating if the model exists
882 """
883 models = await self.controller.list_models()
884
885 if model_name in models:
886 return True
887 return False
888
889 def is_local_k8s(
890 self,
891 credentials: str,
892 ) -> bool:
893 """Check if a cluster is local
894
895 Checks if a cluster is running in the local host
896
897 :param credentials dict: A dictionary containing the k8s credentials
898 :returns: A boolean if the cluster is running locally
899 """
900 creds = yaml.safe_load(credentials)
901 if os.getenv("OSMLCM_VCA_APIPROXY"):
902 host_ip = os.getenv("OSMLCM_VCA_APIPROXY")
903
904 if creds and host_ip:
905 for cluster in creds['clusters']:
906 if 'server' in cluster['cluster']:
907 if host_ip in cluster['cluster']['server']:
908 return True
909
910 return False
911
912 async def login(self, cluster_uuid):
913 """Login to the Juju controller."""
914
915 if self.authenticated:
916 return
917
918 self.connecting = True
919
920 # Test: Make sure we have the credentials loaded
921 config = self.get_config(cluster_uuid)
922
923 self.juju_endpoint = config['endpoint']
924 self.juju_user = config['username']
925 self.juju_secret = config['secret']
926 self.juju_ca_cert = config['cacert']
927 self.juju_public_key = None
928
929 self.controller = Controller()
930
931 if self.juju_secret:
932 self.log.debug(
933 "Connecting to controller... ws://{} as {}/{}".format(
934 self.juju_endpoint,
935 self.juju_user,
936 self.juju_secret,
937 )
938 )
939 try:
940 await self.controller.connect(
941 endpoint=self.juju_endpoint,
942 username=self.juju_user,
943 password=self.juju_secret,
944 cacert=self.juju_ca_cert,
945 )
946 self.authenticated = True
947 self.log.debug("JujuApi: Logged into controller")
948 except Exception as ex:
949 self.log.debug(ex)
950 self.log.debug("Caught exception: {}".format(ex))
951 pass
952 else:
953 self.log.fatal("VCA credentials not configured.")
954 self.authenticated = False
955
956 async def logout(self):
957 """Logout of the Juju controller."""
958 self.log.debug("[logout]")
959 if not self.authenticated:
960 return False
961
962 for model in self.models:
963 self.log.debug("Logging out of model {}".format(model))
964 await self.models[model].disconnect()
965
966 if self.controller:
967 self.log.debug("Disconnecting controller {}".format(
968 self.controller
969 ))
970 await self.controller.disconnect()
971 self.controller = None
972
973 self.authenticated = False
974
975 async def remove_cloud(
976 self,
977 cloud_name: str,
978 ) -> bool:
979 """Remove a k8s cloud from Juju
980
981 Removes a Kubernetes cloud from Juju.
982
983 :param cloud_name str: The name of the cloud to add.
984
985 :returns: True if successful, otherwise raises an exception.
986 """
987
988 # Remove the bootstrapped controller
989 cmd = [self.juju_command, "remove-k8s", "--client", cloud_name]
990 process = await asyncio.create_subprocess_exec(
991 *cmd,
992 stdout=asyncio.subprocess.PIPE,
993 stderr=asyncio.subprocess.PIPE,
994 )
995
996 stdout, stderr = await process.communicate()
997
998 return_code = process.returncode
999
1000 if return_code > 0:
1001 raise Exception(stderr)
1002
1003 # Remove the cloud from the local config
1004 cmd = [self.juju_command, "remove-cloud", "--client", cloud_name]
1005 process = await asyncio.create_subprocess_exec(
1006 *cmd,
1007 stdout=asyncio.subprocess.PIPE,
1008 stderr=asyncio.subprocess.PIPE,
1009 )
1010
1011 stdout, stderr = await process.communicate()
1012
1013 return_code = process.returncode
1014
1015 if return_code > 0:
1016 raise Exception(stderr)
1017
1018 return True
1019
1020 async def set_config(
1021 self,
1022 cluster_uuid: str,
1023 config: dict,
1024 ) -> bool:
1025 """Save the cluster configuration
1026
1027 Saves the cluster information to the file store
1028
1029 :param cluster_uuid str: The UUID of the cluster
1030 :param config dict: A dictionary containing the cluster configuration
1031 :returns: Boolean upon success or raises an exception.
1032 """
1033
1034 cluster_config = "{}/{}.yaml".format(self.fs.path, cluster_uuid)
1035 if not os.path.exists(cluster_config):
1036 self.log.debug("Writing config to {}".format(cluster_config))
1037 with open(cluster_config, 'w') as f:
1038 f.write(yaml.dump(config, Dumper=yaml.Dumper))
1039
1040 return True