adding optional namespace to K8s install
[osm/N2VC.git] / n2vc / k8s_juju_conn.py
1 # Copyright 2019 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import concurrent
17 from .exceptions import NotImplemented
18
19 import io
20 import juju
21 # from juju.bundle import BundleHandler
22 from juju.controller import Controller
23 from juju.model import Model
24 from juju.errors import JujuAPIError, JujuError
25
26 from n2vc.k8s_conn import K8sConnector
27
28 import os
29 # import re
30 # import ssl
31 # from .vnf import N2VC
32
33 import uuid
34 import yaml
35
36
37 class K8sJujuConnector(K8sConnector):
38
    def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = '/usr/bin/kubectl',
        juju_command: str = '/usr/bin/juju',
        log: object = None,
        on_update_db=None,
    ):
        """Initialize the Juju Kubernetes connector.

        :param fs: file system object for kubernetes and juju configuration
        :param db: database object, passed to the parent K8sConnector
        :param kubectl_command: path to kubectl executable (currently unused here)
        :param juju_command: path to juju executable, used for CLI operations
        :param log: logger, passed to the parent K8sConnector
        :param on_update_db: optional callback invoked on DB updates
        """

        # parent class constructor wires up db/log/update-callback
        K8sConnector.__init__(
            self,
            db,
            log=log,
            on_update_db=on_update_db,
        )

        self.fs = fs
        self.log.debug('Initializing K8S Juju connector')

        # True once login() has successfully connected to a Juju controller
        self.authenticated = False
        # cache of connected juju models, keyed by model name (used by logout)
        self.models = {}

        self.juju_command = juju_command
        self.juju_secret = ""

        self.log.debug('K8S Juju connector initialized')
74
75 """Initialization"""
76 async def init_env(
77 self,
78 k8s_creds: str,
79 namespace: str = 'kube-system',
80 reuse_cluster_uuid: str = None,
81 ) -> (str, bool):
82 """
83 It prepares a given K8s cluster environment to run Juju bundles.
84
85 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid '.kube/config'
86 :param namespace: optional namespace to be used for juju. By default, 'kube-system' will be used
87 :param reuse_cluster_uuid: existing cluster uuid for reuse
88 :return: uuid of the K8s cluster and True if connector has installed some software in the cluster
89 (on error, an exception will be raised)
90 """
91
92 """Bootstrapping
93
94 Bootstrapping cannot be done, by design, through the API. We need to
95 use the CLI tools.
96 """
97
98 """
99 WIP: Workflow
100
101 1. Has the environment already been bootstrapped?
102 - Check the database to see if we have a record for this env
103
104 2. If this is a new env, create it
105 - Add the k8s cloud to Juju
106 - Bootstrap
107 - Record it in the database
108
109 3. Connect to the Juju controller for this cloud
110
111 """
112 # cluster_uuid = reuse_cluster_uuid
113 # if not cluster_uuid:
114 # cluster_uuid = str(uuid4())
115
116 ##################################################
117 # TODO: Pull info from db based on the namespace #
118 ##################################################
119
120 ###################################################
121 # TODO: Make it idempotent, calling add-k8s and #
122 # bootstrap whenever reuse_cluster_uuid is passed #
123 # as parameter #
124 # `init_env` is called to initialize the K8s #
125 # cluster for juju. If this initialization fails, #
126 # it can be called again by LCM with the param #
127 # reuse_cluster_uuid, e.g. to try to fix it. #
128 ###################################################
129
130 if not reuse_cluster_uuid:
131 # This is a new cluster, so bootstrap it
132
133 cluster_uuid = str(uuid.uuid4())
134
135 # Is a local k8s cluster?
136 localk8s = self.is_local_k8s(k8s_creds)
137
138 # If the k8s is external, the juju controller needs a loadbalancer
139 loadbalancer = False if localk8s else True
140
141 # Name the new k8s cloud
142 k8s_cloud = "k8s-{}".format(cluster_uuid)
143
144 self.log.debug("Adding k8s cloud {}".format(k8s_cloud))
145 await self.add_k8s(k8s_cloud, k8s_creds)
146
147 # Bootstrap Juju controller
148 self.log.debug("Bootstrapping...")
149 await self.bootstrap(k8s_cloud, cluster_uuid, loadbalancer)
150 self.log.debug("Bootstrap done.")
151
152 # Get the controller information
153
154 # Parse ~/.local/share/juju/controllers.yaml
155 # controllers.testing.api-endpoints|ca-cert|uuid
156 self.log.debug("Getting controller endpoints")
157 with open(os.path.expanduser(
158 "~/.local/share/juju/controllers.yaml"
159 )) as f:
160 controllers = yaml.load(f, Loader=yaml.Loader)
161 controller = controllers['controllers'][cluster_uuid]
162 endpoints = controller['api-endpoints']
163 self.juju_endpoint = endpoints[0]
164 self.juju_ca_cert = controller['ca-cert']
165
166 # Parse ~/.local/share/juju/accounts
167 # controllers.testing.user|password
168 self.log.debug("Getting accounts")
169 with open(os.path.expanduser(
170 "~/.local/share/juju/accounts.yaml"
171 )) as f:
172 controllers = yaml.load(f, Loader=yaml.Loader)
173 controller = controllers['controllers'][cluster_uuid]
174
175 self.juju_user = controller['user']
176 self.juju_secret = controller['password']
177
178 # raise Exception("EOL")
179
180 self.juju_public_key = None
181
182 config = {
183 'endpoint': self.juju_endpoint,
184 'username': self.juju_user,
185 'secret': self.juju_secret,
186 'cacert': self.juju_ca_cert,
187 'namespace': namespace,
188 'loadbalancer': loadbalancer,
189 }
190
191 # Store the cluster configuration so it
192 # can be used for subsequent calls
193 self.log.debug("Setting config")
194 await self.set_config(cluster_uuid, config)
195
196 else:
197 # This is an existing cluster, so get its config
198 cluster_uuid = reuse_cluster_uuid
199
200 config = self.get_config(cluster_uuid)
201
202 self.juju_endpoint = config['endpoint']
203 self.juju_user = config['username']
204 self.juju_secret = config['secret']
205 self.juju_ca_cert = config['cacert']
206 self.juju_public_key = None
207
208 # Login to the k8s cluster
209 if not self.authenticated:
210 await self.login(cluster_uuid)
211
212 # We're creating a new cluster
213 #print("Getting model {}".format(self.get_namespace(cluster_uuid), cluster_uuid=cluster_uuid))
214 #model = await self.get_model(
215 # self.get_namespace(cluster_uuid),
216 # cluster_uuid=cluster_uuid
217 #)
218
219 ## Disconnect from the model
220 #if model and model.is_connected():
221 # await model.disconnect()
222
223 return cluster_uuid, True
224
225 """Repo Management"""
226 async def repo_add(
227 self,
228 name: str,
229 url: str,
230 type: str = "charm",
231 ):
232 raise NotImplemented()
233
    async def repo_list(self):
        """List repositories. Not supported by the Juju connector; raises the
        n2vc NotImplemented exception."""
        raise NotImplemented()
236
    async def repo_remove(
        self,
        name: str,
    ):
        """Remove a repository. Not supported by the Juju connector; raises
        the n2vc NotImplemented exception."""
        raise NotImplemented()
242
    async def synchronize_repos(
        self,
        cluster_uuid: str,
        name: str
    ):
        """No-op repository synchronization.

        Always returns None, since repo management (repo_add) is not
        implemented for the Juju connector.
        """
        return None
252
253 """Reset"""
254 async def reset(
255 self,
256 cluster_uuid: str,
257 force: bool = False,
258 uninstall_sw: bool = False
259 ) -> bool:
260 """Reset a cluster
261
262 Resets the Kubernetes cluster by removing the model that represents it.
263
264 :param cluster_uuid str: The UUID of the cluster to reset
265 :return: Returns True if successful or raises an exception.
266 """
267
268 try:
269 if not self.authenticated:
270 await self.login(cluster_uuid)
271
272 if self.controller.is_connected():
273 # Destroy the model
274 namespace = self.get_namespace(cluster_uuid)
275 if await self.has_model(namespace):
276 self.log.debug("[reset] Destroying model")
277 await self.controller.destroy_model(
278 namespace,
279 destroy_storage=True
280 )
281
282 # Disconnect from the controller
283 self.log.debug("[reset] Disconnecting controller")
284 await self.logout()
285
286 # Destroy the controller (via CLI)
287 self.log.debug("[reset] Destroying controller")
288 await self.destroy_controller(cluster_uuid)
289
290 self.log.debug("[reset] Removing k8s cloud")
291 k8s_cloud = "k8s-{}".format(cluster_uuid)
292 await self.remove_cloud(k8s_cloud)
293
294 except Exception as ex:
295 self.log.debug("Caught exception during reset: {}".format(ex))
296
297 return True
298
299 """Deployment"""
300
301 async def install(
302 self,
303 cluster_uuid: str,
304 kdu_model: str,
305 atomic: bool = True,
306 timeout: float = 300,
307 params: dict = None,
308 db_dict: dict = None,
309 kdu_name: str = None,
310 namespace: str = None
311 ) -> bool:
312 """Install a bundle
313
314 :param cluster_uuid str: The UUID of the cluster to install to
315 :param kdu_model str: The name or path of a bundle to install
316 :param atomic bool: If set, waits until the model is active and resets
317 the cluster on failure.
318 :param timeout int: The time, in seconds, to wait for the install
319 to finish
320 :param params dict: Key-value pairs of instantiation parameters
321 :param kdu_name: Name of the KDU instance to be installed
322 :param namespace: K8s namespace to use for the KDU instance
323
324 :return: If successful, returns ?
325 """
326
327 if not self.authenticated:
328 self.log.debug("[install] Logging in to the controller")
329 await self.login(cluster_uuid)
330
331 ##
332 # Get or create the model, based on the NS
333 # uuid.
334 if kdu_name:
335 kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
336 else:
337 kdu_instance = db_dict["filter"]["_id"]
338
339 self.log.debug("Checking for model named {}".format(kdu_instance))
340
341 # Create the new model
342 self.log.debug("Adding model: {}".format(kdu_instance))
343 model = await self.add_model(kdu_instance, cluster_uuid=cluster_uuid)
344
345 if model:
346 # TODO: Instantiation parameters
347
348 """
349 "Juju bundle that models the KDU, in any of the following ways:
350 - <juju-repo>/<juju-bundle>
351 - <juju-bundle folder under k8s_models folder in the package>
352 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder in the package>
353 - <URL_where_to_fetch_juju_bundle>
354 """
355
356 previous_workdir = os.getcwd()
357
358 bundle = kdu_model
359 if kdu_model.startswith("cs:"):
360 bundle = kdu_model
361 elif kdu_model.startswith("http"):
362 # Download the file
363 pass
364 else:
365 new_workdir = kdu_model.strip(kdu_model.split("/")[-1])
366
367 os.chdir(new_workdir)
368
369 bundle = "local:{}".format(kdu_model)
370
371 if not bundle:
372 # Raise named exception that the bundle could not be found
373 raise Exception()
374
375 self.log.debug("[install] deploying {}".format(bundle))
376 await model.deploy(bundle)
377
378 # Get the application
379 if atomic:
380 # applications = model.applications
381 self.log.debug("[install] Applications: {}".format(model.applications))
382 for name in model.applications:
383 self.log.debug("[install] Waiting for {} to settle".format(name))
384 application = model.applications[name]
385 try:
386 # It's not enough to wait for all units to be active;
387 # the application status needs to be active as well.
388 self.log.debug("Waiting for all units to be active...")
389 await model.block_until(
390 lambda: all(
391 unit.agent_status == 'idle'
392 and application.status in ['active', 'unknown']
393 and unit.workload_status in [
394 'active', 'unknown'
395 ] for unit in application.units
396 ),
397 timeout=timeout
398 )
399 self.log.debug("All units active.")
400
401 except concurrent.futures._base.TimeoutError: # TODO use asyncio.TimeoutError
402 os.chdir(previous_workdir)
403 self.log.debug("[install] Timeout exceeded; resetting cluster")
404 await self.reset(cluster_uuid)
405 return False
406
407 # Wait for the application to be active
408 if model.is_connected():
409 self.log.debug("[install] Disconnecting model")
410 await model.disconnect()
411
412 os.chdir(previous_workdir)
413
414 return kdu_instance
415 raise Exception("Unable to install")
416
417 async def instances_list(
418 self,
419 cluster_uuid: str
420 ) -> list:
421 """
422 returns a list of deployed releases in a cluster
423
424 :param cluster_uuid: the cluster
425 :return:
426 """
427 return []
428
429 async def upgrade(
430 self,
431 cluster_uuid: str,
432 kdu_instance: str,
433 kdu_model: str = None,
434 params: dict = None,
435 ) -> str:
436 """Upgrade a model
437
438 :param cluster_uuid str: The UUID of the cluster to upgrade
439 :param kdu_instance str: The unique name of the KDU instance
440 :param kdu_model str: The name or path of the bundle to upgrade to
441 :param params dict: Key-value pairs of instantiation parameters
442
443 :return: If successful, reference to the new revision number of the
444 KDU instance.
445 """
446
447 # TODO: Loop through the bundle and upgrade each charm individually
448
449 """
450 The API doesn't have a concept of bundle upgrades, because there are
451 many possible changes: charm revision, disk, number of units, etc.
452
453 As such, we are only supporting a limited subset of upgrades. We'll
454 upgrade the charm revision but leave storage and scale untouched.
455
456 Scale changes should happen through OSM constructs, and changes to
457 storage would require a redeployment of the service, at least in this
458 initial release.
459 """
460 namespace = self.get_namespace(cluster_uuid)
461 model = await self.get_model(namespace, cluster_uuid=cluster_uuid)
462
463 with open(kdu_model, 'r') as f:
464 bundle = yaml.safe_load(f)
465
466 """
467 {
468 'description': 'Test bundle',
469 'bundle': 'kubernetes',
470 'applications': {
471 'mariadb-k8s': {
472 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
473 'scale': 1,
474 'options': {
475 'password': 'manopw',
476 'root_password': 'osm4u',
477 'user': 'mano'
478 },
479 'series': 'kubernetes'
480 }
481 }
482 }
483 """
484 # TODO: This should be returned in an agreed-upon format
485 for name in bundle['applications']:
486 self.log.debug(model.applications)
487 application = model.applications[name]
488 self.log.debug(application)
489
490 path = bundle['applications'][name]['charm']
491
492 try:
493 await application.upgrade_charm(switch=path)
494 except juju.errors.JujuError as ex:
495 if 'already running charm' in str(ex):
496 # We're already running this version
497 pass
498
499 await model.disconnect()
500
501 return True
502 raise NotImplemented()
503
504 """Rollback"""
505 async def rollback(
506 self,
507 cluster_uuid: str,
508 kdu_instance: str,
509 revision: int = 0,
510 ) -> str:
511 """Rollback a model
512
513 :param cluster_uuid str: The UUID of the cluster to rollback
514 :param kdu_instance str: The unique name of the KDU instance
515 :param revision int: The revision to revert to. If omitted, rolls back
516 the previous upgrade.
517
518 :return: If successful, returns the revision of active KDU instance,
519 or raises an exception
520 """
521 raise NotImplemented()
522
523 """Deletion"""
524 async def uninstall(
525 self,
526 cluster_uuid: str,
527 kdu_instance: str
528 ) -> bool:
529 """Uninstall a KDU instance
530
531 :param cluster_uuid str: The UUID of the cluster
532 :param kdu_instance str: The unique name of the KDU instance
533
534 :return: Returns True if successful, or raises an exception
535 """
536 if not self.authenticated:
537 self.log.debug("[uninstall] Connecting to controller")
538 await self.login(cluster_uuid)
539
540 self.log.debug("[uninstall] Destroying model")
541
542 await self.controller.destroy_models(kdu_instance)
543
544 self.log.debug("[uninstall] Model destroyed and disconnecting")
545 await self.logout()
546
547 return True
548
549 """Introspection"""
550 async def inspect_kdu(
551 self,
552 kdu_model: str,
553 ) -> dict:
554 """Inspect a KDU
555
556 Inspects a bundle and returns a dictionary of config parameters and
557 their default values.
558
559 :param kdu_model str: The name or path of the bundle to inspect.
560
561 :return: If successful, returns a dictionary of available parameters
562 and their default values.
563 """
564
565 kdu = {}
566 with open(kdu_model, 'r') as f:
567 bundle = yaml.safe_load(f)
568
569 """
570 {
571 'description': 'Test bundle',
572 'bundle': 'kubernetes',
573 'applications': {
574 'mariadb-k8s': {
575 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
576 'scale': 1,
577 'options': {
578 'password': 'manopw',
579 'root_password': 'osm4u',
580 'user': 'mano'
581 },
582 'series': 'kubernetes'
583 }
584 }
585 }
586 """
587 # TODO: This should be returned in an agreed-upon format
588 kdu = bundle['applications']
589
590 return kdu
591
592 async def help_kdu(
593 self,
594 kdu_model: str,
595 ) -> str:
596 """View the README
597
598 If available, returns the README of the bundle.
599
600 :param kdu_model str: The name or path of a bundle
601
602 :return: If found, returns the contents of the README.
603 """
604 readme = None
605
606 files = ['README', 'README.txt', 'README.md']
607 path = os.path.dirname(kdu_model)
608 for file in os.listdir(path):
609 if file in files:
610 with open(file, 'r') as f:
611 readme = f.read()
612 break
613
614 return readme
615
    async def status_kdu(
        self,
        cluster_uuid: str,
        kdu_instance: str,
    ) -> dict:
        """Get the status of the KDU

        Get the current status of the KDU instance.

        :param cluster_uuid str: The UUID of the cluster
        :param kdu_instance str: The unique id of the KDU instance
            (NOTE(review): currently unused — the model is looked up by the
            cluster's namespace instead; confirm intent.)

        :return: A mapping of application name to {'status': <status str>},
            or an empty dict if the model was not found.
        """
        status = {}

        model = await self.get_model(self.get_namespace(cluster_uuid), cluster_uuid=cluster_uuid)

        if model:
            model_status = await model.get_status()
            # NOTE(review): 'status' is first bound to the applications
            # mapping, then each entry is overwritten with a reduced dict
            # below — this mutates the mapping returned by get_status().
            status = model_status.applications

            for name in model_status.applications:
                application = model_status.applications[name]
                status[name] = {
                    'status': application['status']['status']
                }

            if model.is_connected():
                await model.disconnect()

        return status
650
    # Private methods
    async def add_k8s(
        self,
        cloud_name: str,
        credentials: str,
    ) -> bool:
        """Add a k8s cloud to Juju

        Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
        Juju Controller.

        :param cloud_name str: The name of the cloud to add.
        :param credentials str: The output of `kubectl config view --raw`,
        fed to `juju add-k8s` via stdin.

        :returns: True if successful, otherwise raises an exception.
        """

        cmd = [self.juju_command, "add-k8s", "--local", cloud_name]
        self.log.debug(cmd)

        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            stdin=asyncio.subprocess.PIPE,
        )

        # Feed the process the credentials on stdin, then close it so the
        # CLI sees EOF and proceeds.
        process.stdin.write(credentials.encode("utf-8"))
        await process.stdin.drain()
        process.stdin.close()

        stdout, stderr = await process.communicate()

        return_code = process.returncode

        self.log.debug("add-k8s return code: {}".format(return_code))

        if return_code > 0:
            raise Exception(stderr)

        return True
694
695 async def add_model(
696 self,
697 model_name: str,
698 cluster_uuid: str,
699 ) -> juju.model.Model:
700 """Adds a model to the controller
701
702 Adds a new model to the Juju controller
703
704 :param model_name str: The name of the model to add.
705 :returns: The juju.model.Model object of the new model upon success or
706 raises an exception.
707 """
708 if not self.authenticated:
709 await self.login(cluster_uuid)
710
711 self.log.debug("Adding model '{}' to cluster_uuid '{}'".format(model_name, cluster_uuid))
712 try:
713 model = await self.controller.add_model(
714 model_name,
715 config={'authorized-keys': self.juju_public_key}
716 )
717 except Exception as ex:
718 self.log.debug(ex)
719 self.log.debug("Caught exception: {}".format(ex))
720 pass
721
722 return model
723
    async def bootstrap(
        self,
        cloud_name: str,
        cluster_uuid: str,
        loadbalancer: bool
    ) -> bool:
        """Bootstrap a Kubernetes controller

        Bootstrap a Juju controller inside the Kubernetes cluster

        :param cloud_name str: The name of the cloud.
        :param cluster_uuid str: The UUID of the cluster to bootstrap; also
        used as the controller name.
        :param loadbalancer bool: If the controller should use loadbalancer or not.
        :returns: True upon success or raises an exception.
        """

        if not loadbalancer:
            cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid]
        else:
            # For public clusters, specify that the controller service is
            # using a LoadBalancer.
            cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid, "--config", "controller-service-type=loadbalancer"]

        self.log.debug("Bootstrapping controller {} in cloud {}".format(
            cluster_uuid, cloud_name
        ))

        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )

        stdout, stderr = await process.communicate()

        return_code = process.returncode

        if return_code > 0:
            # A controller that already exists is treated as success so
            # the call stays idempotent.
            if b'already exists' not in stderr:
                raise Exception(stderr)

        return True
768
769 async def destroy_controller(
770 self,
771 cluster_uuid: str
772 ) -> bool:
773 """Destroy a Kubernetes controller
774
775 Destroy an existing Kubernetes controller.
776
777 :param cluster_uuid str: The UUID of the cluster to bootstrap.
778 :returns: True upon success or raises an exception.
779 """
780 cmd = [
781 self.juju_command,
782 "destroy-controller",
783 "--destroy-all-models",
784 "--destroy-storage",
785 "-y",
786 cluster_uuid
787 ]
788
789 process = await asyncio.create_subprocess_exec(
790 *cmd,
791 stdout=asyncio.subprocess.PIPE,
792 stderr=asyncio.subprocess.PIPE,
793 )
794
795 stdout, stderr = await process.communicate()
796
797 return_code = process.returncode
798
799 if return_code > 0:
800 #
801 if 'already exists' not in stderr:
802 raise Exception(stderr)
803
804 def get_config(
805 self,
806 cluster_uuid: str,
807 ) -> dict:
808 """Get the cluster configuration
809
810 Gets the configuration of the cluster
811
812 :param cluster_uuid str: The UUID of the cluster.
813 :return: A dict upon success, or raises an exception.
814 """
815 cluster_config = "{}/{}.yaml".format(self.fs.path, cluster_uuid)
816 if os.path.exists(cluster_config):
817 with open(cluster_config, 'r') as f:
818 config = yaml.safe_load(f.read())
819 return config
820 else:
821 raise Exception(
822 "Unable to locate configuration for cluster {}".format(
823 cluster_uuid
824 )
825 )
826
827 async def get_model(
828 self,
829 model_name: str,
830 cluster_uuid: str,
831 ) -> juju.model.Model:
832 """Get a model from the Juju Controller.
833
834 Note: Model objects returned must call disconnected() before it goes
835 out of scope.
836
837 :param model_name str: The name of the model to get
838 :return The juju.model.Model object if found, or None.
839 """
840 if not self.authenticated:
841 await self.login(cluster_uuid)
842
843 model = None
844 models = await self.controller.list_models()
845 self.log.debug(models)
846 if model_name in models:
847 self.log.debug("Found model: {}".format(model_name))
848 model = await self.controller.get_model(
849 model_name
850 )
851 return model
852
853 def get_namespace(
854 self,
855 cluster_uuid: str,
856 ) -> str:
857 """Get the namespace UUID
858 Gets the namespace's unique name
859
860 :param cluster_uuid str: The UUID of the cluster
861 :returns: The namespace UUID, or raises an exception
862 """
863 config = self.get_config(cluster_uuid)
864
865 # Make sure the name is in the config
866 if 'namespace' not in config:
867 raise Exception("Namespace not found.")
868
869 # TODO: We want to make sure this is unique to the cluster, in case
870 # the cluster is being reused.
871 # Consider pre/appending the cluster id to the namespace string
872 return config['namespace']
873
874 async def has_model(
875 self,
876 model_name: str
877 ) -> bool:
878 """Check if a model exists in the controller
879
880 Checks to see if a model exists in the connected Juju controller.
881
882 :param model_name str: The name of the model
883 :return: A boolean indicating if the model exists
884 """
885 models = await self.controller.list_models()
886
887 if model_name in models:
888 return True
889 return False
890
891 def is_local_k8s(
892 self,
893 credentials: str,
894 ) -> bool:
895 """Check if a cluster is local
896
897 Checks if a cluster is running in the local host
898
899 :param credentials dict: A dictionary containing the k8s credentials
900 :returns: A boolean if the cluster is running locally
901 """
902 creds = yaml.safe_load(credentials)
903 if os.getenv("OSMLCM_VCA_APIPROXY"):
904 host_ip = os.getenv("OSMLCM_VCA_APIPROXY")
905
906 if creds and host_ip:
907 for cluster in creds['clusters']:
908 if 'server' in cluster['cluster']:
909 if host_ip in cluster['cluster']['server']:
910 return True
911
912 return False
913
    async def login(self, cluster_uuid):
        """Login to the Juju controller.

        Loads the cluster configuration from the file store and connects
        to its Juju controller. Sets self.authenticated on the outcome;
        connection errors are logged and swallowed, so callers must check
        that flag.

        :param cluster_uuid: The UUID of the cluster to log in to.
        """

        if self.authenticated:
            return

        self.connecting = True

        # Make sure we have the credentials loaded
        config = self.get_config(cluster_uuid)

        self.juju_endpoint = config['endpoint']
        self.juju_user = config['username']
        self.juju_secret = config['secret']
        self.juju_ca_cert = config['cacert']
        self.juju_public_key = None

        self.controller = Controller()

        if self.juju_secret:
            # NOTE(review): this logs the controller password at debug level.
            self.log.debug(
                "Connecting to controller... ws://{} as {}/{}".format(
                    self.juju_endpoint,
                    self.juju_user,
                    self.juju_secret,
                )
            )
            try:
                await self.controller.connect(
                    endpoint=self.juju_endpoint,
                    username=self.juju_user,
                    password=self.juju_secret,
                    cacert=self.juju_ca_cert,
                )
                self.authenticated = True
                self.log.debug("JujuApi: Logged into controller")
            except Exception as ex:
                self.log.debug(ex)
                self.log.debug("Caught exception: {}".format(ex))
                pass
        else:
            self.log.fatal("VCA credentials not configured.")
            self.authenticated = False
957
    async def logout(self):
        """Logout of the Juju controller.

        Disconnects all cached models and the controller itself, and clears
        the authenticated flag.

        :return: False if not authenticated, otherwise None.
        """
        self.log.debug("[logout]")
        if not self.authenticated:
            return False

        # Disconnect every model cached in self.models (keyed by name)
        for model in self.models:
            self.log.debug("Logging out of model {}".format(model))
            await self.models[model].disconnect()

        if self.controller:
            self.log.debug("Disconnecting controller {}".format(
                self.controller
            ))
            await self.controller.disconnect()
            self.controller = None

        self.authenticated = False
976
    async def remove_cloud(
        self,
        cloud_name: str,
    ) -> bool:
        """Remove a k8s cloud from Juju

        Removes a Kubernetes cloud from Juju.

        :param cloud_name str: The name of the cloud to remove.

        :returns: True if successful, otherwise raises an exception.
        """

        # Remove the bootstrapped controller
        cmd = [self.juju_command, "remove-k8s", "--client", cloud_name]
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )

        stdout, stderr = await process.communicate()

        return_code = process.returncode

        if return_code > 0:
            raise Exception(stderr)

        # Remove the cloud from the local config
        cmd = [self.juju_command, "remove-cloud", "--client", cloud_name]
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )

        stdout, stderr = await process.communicate()

        return_code = process.returncode

        if return_code > 0:
            raise Exception(stderr)

        return True
1021
    async def set_config(
        self,
        cluster_uuid: str,
        config: dict,
    ) -> bool:
        """Save the cluster configuration

        Saves the cluster information to the file store

        :param cluster_uuid str: The UUID of the cluster
        :param config dict: A dictionary containing the cluster configuration
        :returns: Boolean upon success or raises an exception.
        """

        cluster_config = "{}/{}.yaml".format(self.fs.path, cluster_uuid)
        # Only written if no config exists yet: an existing file is left
        # untouched (the call still returns True in that case).
        if not os.path.exists(cluster_config):
            self.log.debug("Writing config to {}".format(cluster_config))
            with open(cluster_config, 'w') as f:
                f.write(yaml.dump(config, Dumper=yaml.Dumper))

        return True