895e82be1077389abc62a5dda6b28043d1d720cb
[osm/N2VC.git] / n2vc / k8s_juju_conn.py
1 # Copyright 2019 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import concurrent
17 from .exceptions import NotImplemented
18
19 import io
20 import juju
21 # from juju.bundle import BundleHandler
22 from juju.controller import Controller
23 from juju.model import Model
24 from juju.errors import JujuAPIError, JujuError
25
26 from n2vc.k8s_conn import K8sConnector
27
28 import os
29 # import re
30 # import ssl
31 # from .vnf import N2VC
32
33 import uuid
34 import yaml
35
36
37 class K8sJujuConnector(K8sConnector):
38
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = '/usr/bin/kubectl',
    juju_command: str = '/usr/bin/juju',
    log: object = None,
    on_update_db=None,
):
    """Create a Juju-backed K8s connector.

    :param fs: file system object; its ``path`` attribute is the directory
               where per-cluster configuration files are read and written
    :param db: database object, passed through to the parent connector
    :param kubectl_command: path to kubectl executable (accepted but not
                            used by this constructor)
    :param juju_command: path to juju executable
    :param log: logger, passed through to the parent connector
    :param on_update_db: callback passed through to the parent connector
    """

    # parent class
    K8sConnector.__init__(
        self,
        db,
        log=log,
        on_update_db=on_update_db,
    )

    self.fs = fs
    self.log.debug('Initializing K8S Juju connector')

    self.authenticated = False
    self.models = {}

    # Assigned by login(); defined here so that methods which read it
    # before a login (has_model, logout, reset) see None rather than a
    # missing attribute.
    self.controller = None

    self.juju_command = juju_command
    self.juju_secret = ""

    self.log.debug('K8S Juju connector initialized')
74
75 """Initialization"""
async def init_env(
    self,
    k8s_creds: str,
    namespace: str = 'kube-system',
    reuse_cluster_uuid: str = None,
) -> (str, bool):
    """
    Prepare a K8s cluster environment so that Juju bundles can run on it.

    Bootstrapping cannot be done, by design, through the API, so for a new
    cluster the juju CLI is used to add the k8s cloud and bootstrap a
    controller. The controller endpoint, CA certificate and account are then
    read back from the juju client files and stored with set_config(). When
    reuse_cluster_uuid is given, the stored configuration is loaded instead
    and nothing is bootstrapped.

    :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid '.kube/config'
    :param namespace: optional namespace to be used for juju. By default, 'kube-system' will be used
    :param reuse_cluster_uuid: existing cluster uuid for reuse
    :return: uuid of the K8s cluster and True
             (on error, an exception will be raised)
    """

    # TODO: Pull info from db based on the namespace
    # TODO: Make it idempotent, calling add-k8s and bootstrap whenever
    #       reuse_cluster_uuid is passed as parameter. `init_env` is called
    #       to initialize the K8s cluster for juju; if that fails, LCM can
    #       call it again with reuse_cluster_uuid, e.g. to try to fix it.

    if reuse_cluster_uuid:
        # Existing cluster: reload what was stored when it was created
        cluster_uuid = reuse_cluster_uuid

        stored = self.get_config(cluster_uuid)

        self.juju_endpoint = stored['endpoint']
        self.juju_user = stored['username']
        self.juju_secret = stored['secret']
        self.juju_ca_cert = stored['cacert']
        self.juju_public_key = None
    else:
        # New cluster: register it with juju and bootstrap a controller
        cluster_uuid = str(uuid.uuid4())

        # An external k8s needs a loadbalancer in front of the controller;
        # a cluster running on the local host does not.
        loadbalancer = not self.is_local_k8s(k8s_creds)

        k8s_cloud = "k8s-{}".format(cluster_uuid)

        self.log.debug("Adding k8s cloud {}".format(k8s_cloud))
        await self.add_k8s(k8s_cloud, k8s_creds)

        self.log.debug("Bootstrapping...")
        await self.bootstrap(k8s_cloud, cluster_uuid, loadbalancer)
        self.log.debug("Bootstrap done.")

        # controllers.yaml: controllers.<name>.api-endpoints|ca-cert|uuid
        self.log.debug("Getting controller endpoints")
        controllers_path = os.path.expanduser(
            "~/.local/share/juju/controllers.yaml"
        )
        with open(controllers_path) as controllers_file:
            known = yaml.load(controllers_file, Loader=yaml.Loader)
            entry = known['controllers'][cluster_uuid]
            self.juju_endpoint = entry['api-endpoints'][0]
            self.juju_ca_cert = entry['ca-cert']

        # accounts.yaml: controllers.<name>.user|password
        self.log.debug("Getting accounts")
        accounts_path = os.path.expanduser(
            "~/.local/share/juju/accounts.yaml"
        )
        with open(accounts_path) as accounts_file:
            known = yaml.load(accounts_file, Loader=yaml.Loader)
            account = known['controllers'][cluster_uuid]

            self.juju_user = account['user']
            self.juju_secret = account['password']

        self.juju_public_key = None

        # Persist the connection details for subsequent calls
        self.log.debug("Setting config")
        await self.set_config(
            cluster_uuid,
            {
                'endpoint': self.juju_endpoint,
                'username': self.juju_user,
                'secret': self.juju_secret,
                'cacert': self.juju_ca_cert,
                'namespace': namespace,
                'loadbalancer': loadbalancer,
            },
        )

    # Login to the k8s cluster
    if not self.authenticated:
        await self.login(cluster_uuid)

    return cluster_uuid, True
224
225 """Repo Management"""
async def repo_add(
    self,
    name: str,
    url: str,
    type: str = "charm",
):
    """Add a repository (not supported by this connector)

    :param name str: Name of the repository
    :param url str: URL of the repository
    :param type str: Type of the repository

    :raises: the module's NotImplemented exception, always
    """
    raise NotImplemented()
233
async def repo_list(self):
    """List repositories (not supported by this connector)

    :raises: the module's NotImplemented exception, always
    """
    raise NotImplemented()
236
async def repo_remove(
    self,
    name: str,
):
    """Remove a repository (not supported by this connector)

    :param name str: Name of the repository

    :raises: the module's NotImplemented exception, always
    """
    raise NotImplemented()
242
async def synchronize_repos(
    self,
    cluster_uuid: str,
    name: str
):
    """Synchronize repositories (no-op)

    Repositories cannot be added through this connector, so there is
    nothing to synchronize.

    :param cluster_uuid str: The UUID of the cluster (ignored)
    :param name str: The name of the repository (ignored)

    :return: Always None
    """
    return None
252
253 """Reset"""
async def reset(
    self,
    cluster_uuid: str,
    force: bool = False,
    uninstall_sw: bool = False
) -> bool:
    """Reset a cluster

    Best-effort teardown: destroys the model named after the cluster's
    namespace (if present), logs out, destroys the controller through the
    CLI and removes the k8s cloud. Any exception raised on the way is
    logged and swallowed.

    :param cluster_uuid str: The UUID of the cluster to reset
    :param force bool: Not used
    :param uninstall_sw bool: Not used
    :return: Always True
    """

    try:
        if not self.authenticated:
            await self.login(cluster_uuid)

        if self.controller.is_connected():
            # Destroy the model that represents the cluster
            model_name = self.get_namespace(cluster_uuid)
            model_exists = await self.has_model(model_name)
            if model_exists:
                self.log.debug("[reset] Destroying model")
                await self.controller.destroy_model(
                    model_name,
                    destroy_storage=True
                )

            # Drop the API connection before tearing down the controller
            self.log.debug("[reset] Disconnecting controller")
            await self.logout()

            # The controller itself can only be destroyed via the CLI
            self.log.debug("[reset] Destroying controller")
            await self.destroy_controller(cluster_uuid)

            self.log.debug("[reset] Removing k8s cloud")
            await self.remove_cloud("k8s-{}".format(cluster_uuid))

    except Exception as err:
        self.log.debug("Caught exception during reset: {}".format(err))

    return True
298
299 """Deployment"""
300
async def install(
    self,
    cluster_uuid: str,
    kdu_model: str,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    kdu_name: str = None
) -> bool:
    """Install a bundle

    Creates a model named after the KDU instance and deploys the bundle
    into it.

    :param cluster_uuid str: The UUID of the cluster to install to
    :param kdu_model str: The name or path of a bundle to install
    :param atomic bool: If set, waits until every application has settled
                        and resets the cluster if the timeout expires.
    :param timeout int: The time, in seconds, to wait for each application
                        to settle
    :param params dict: Key-value pairs of instantiation parameters
                        (not used yet)
    :param db_dict dict: Must provide db_dict["filter"]["_id"], which is
                         used to build the model name; required in practice
                         despite the None default
    :param kdu_name: Name of the KDU instance to be installed

    :return: The KDU instance (model) name on success, or False when
             `atomic` is set and the timeout expired. Raises an exception
             if the model could not be created or no bundle was given.
    """

    if not self.authenticated:
        self.log.debug("[install] Logging in to the controller")
        await self.login(cluster_uuid)

    # The model is named after the NS record id, prefixed with the KDU
    # name when one is given.
    record_id = db_dict["filter"]["_id"]
    if kdu_name:
        kdu_instance = "{}-{}".format(kdu_name, record_id)
    else:
        kdu_instance = record_id

    self.log.debug("Checking for model named {}".format(kdu_instance))

    # Create the new model
    self.log.debug("Adding model: {}".format(kdu_instance))
    model = await self.add_model(kdu_instance, cluster_uuid=cluster_uuid)

    if not model:
        raise Exception("Unable to install")

    # TODO: Instantiation parameters

    # A KDU bundle may be referenced as <juju-repo>/<juju-bundle>, as a
    # bundle folder or tgz under k8s_models in the package, or as a URL.
    # Only handing the reference to Juju unchanged is implemented.
    # TODO: download "http..." bundles and unpack local .tar.gz/.tgz files.
    bundle = kdu_model
    if not bundle:
        raise Exception(
            "No bundle given for KDU instance {}".format(kdu_instance)
        )

    self.log.debug("[install] deploying {}".format(bundle))
    await model.deploy(bundle)

    if atomic:
        self.log.debug("[install] Applications: {}".format(model.applications))
        for name in model.applications:
            self.log.debug("[install] Waiting for {} to settle".format(name))
            application = model.applications[name]
            try:
                # It's not enough to wait for all units to be active;
                # the application status needs to be active as well.
                self.log.debug("Waiting for all units to be active...")
                await model.block_until(
                    lambda: all(
                        unit.agent_status == 'idle'
                        and application.status in ['active', 'unknown']
                        and unit.workload_status in [
                            'active', 'unknown'
                        ] for unit in application.units
                    ),
                    timeout=timeout
                )
                self.log.debug("All units active.")

            # asyncio.TimeoutError and concurrent.futures.TimeoutError are
            # distinct classes on Python 3.8-3.10, so catch both.
            except (asyncio.TimeoutError, concurrent.futures.TimeoutError):
                self.log.debug("[install] Timeout exceeded; resetting cluster")
                await self.reset(cluster_uuid)
                return False

    if model.is_connected():
        self.log.debug("[install] Disconnecting model")
        await model.disconnect()

    return kdu_instance
410
async def instances_list(
    self,
    cluster_uuid: str
) -> list:
    """List the deployed releases in a cluster

    Listing is not implemented; the result is always empty.

    :param cluster_uuid str: The UUID of the cluster (ignored)
    :return: An empty list
    """
    releases = []
    return releases
422
async def upgrade(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    kdu_model: str = None,
    params: dict = None,
) -> str:
    """Upgrade a model

    The API doesn't have a concept of bundle upgrades, because there are
    many possible changes: charm revision, disk, number of units, etc.
    Only a limited subset is supported: each application listed in the
    bundle is switched to the charm revision named there, while storage
    and scale are left untouched. Scale changes should happen through OSM
    constructs, and changes to storage would require a redeployment of the
    service, at least in this initial release.

    :param cluster_uuid str: The UUID of the cluster to upgrade
    :param kdu_instance str: The unique name of the KDU instance (not used;
                             the model is looked up by the cluster's
                             namespace)
    :param kdu_model str: Path of the bundle YAML file to upgrade to
    :param params dict: Key-value pairs of instantiation parameters
                        (not used)

    :return: True if successful. Raises an exception if the model cannot
             be found or a charm upgrade fails for any reason other than
             the charm already running the requested revision.
    """
    # TODO: This should be returned in an agreed-upon format

    namespace = self.get_namespace(cluster_uuid)
    model = await self.get_model(namespace, cluster_uuid=cluster_uuid)
    if not model:
        raise Exception(
            "Unable to find model {} to upgrade".format(namespace)
        )

    try:
        # Expected bundle layout:
        # {
        #     'description': 'Test bundle',
        #     'bundle': 'kubernetes',
        #     'applications': {
        #         'mariadb-k8s': {
        #             'charm': 'cs:~charmed-osm/mariadb-k8s-20',
        #             'scale': 1,
        #             'options': {...},
        #             'series': 'kubernetes'
        #         }
        #     }
        # }
        with open(kdu_model, 'r') as f:
            bundle = yaml.safe_load(f)

        for name in bundle['applications']:
            self.log.debug(model.applications)
            application = model.applications[name]
            self.log.debug(application)

            path = bundle['applications'][name]['charm']

            try:
                await application.upgrade_charm(switch=path)
            except juju.errors.JujuError as ex:
                # Being on the requested revision already is fine; any
                # other failure must reach the caller.
                if 'already running charm' not in str(ex):
                    raise
    finally:
        # Release the model connection on failure as well as on success
        await model.disconnect()

    return True
497
498 """Rollback"""
async def rollback(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    revision: int = 0,
) -> str:
    """Rollback a model (not supported by this connector)

    :param cluster_uuid str: The UUID of the cluster to rollback
    :param kdu_instance str: The unique name of the KDU instance
    :param revision int: The revision to revert to

    :raises: the module's NotImplemented exception, always
    """
    raise NotImplemented()
516
517 """Deletion"""
async def uninstall(
    self,
    cluster_uuid: str,
    kdu_instance: str
) -> bool:
    """Uninstall a KDU instance

    Destroys the model named after the KDU instance and then logs out of
    the controller.

    :param cluster_uuid str: The UUID of the cluster
    :param kdu_instance str: The unique name of the KDU instance, which is
                             also the name of the model to destroy

    :return: Returns True if successful, or raises an exception
    """
    if not self.authenticated:
        self.log.debug("[uninstall] Connecting to controller")
        await self.login(cluster_uuid)

    self.log.debug("[uninstall] Destroying model")

    # Read the controller only after the login above has set it
    controller = self.controller
    await controller.destroy_models(kdu_instance)

    self.log.debug("[uninstall] Model destroyed and disconnecting")
    await self.logout()

    return True
542
543 """Introspection"""
async def inspect_kdu(
    self,
    kdu_model: str,
) -> dict:
    """Inspect a KDU

    Loads the bundle YAML file and returns its 'applications' section,
    e.g. for a bundle such as

        description: Test bundle
        bundle: kubernetes
        applications:
          mariadb-k8s:
            charm: cs:~charmed-osm/mariadb-k8s-20
            scale: 1
            options: {password: manopw, root_password: osm4u, user: mano}
            series: kubernetes

    the returned dictionary has the single key 'mariadb-k8s'.

    :param kdu_model str: The path of the bundle file to inspect.

    :return: The dictionary stored under the bundle's 'applications' key.
    """
    # TODO: This should be returned in an agreed-upon format
    with open(kdu_model, 'r') as bundle_file:
        bundle = yaml.safe_load(bundle_file)

    return bundle['applications']
585
async def help_kdu(
    self,
    kdu_model: str,
) -> str:
    """View the README

    Looks in the directory that contains the bundle for a file named
    README, README.txt or README.md and returns the contents of the first
    one found (in directory listing order).

    :param kdu_model str: The path of a bundle

    :return: The contents of the README, or None if there is none.
    """
    readme_names = ('README', 'README.txt', 'README.md')

    # A bare file name has an empty dirname; look in the current directory
    # instead of letting os.listdir('') fail.
    bundle_dir = os.path.dirname(kdu_model) or os.curdir

    for entry in os.listdir(bundle_dir):
        if entry in readme_names:
            # listdir() yields bare names, so anchor them to the bundle
            # directory rather than the process working directory.
            with open(os.path.join(bundle_dir, entry), 'r') as f:
                return f.read()

    return None
609
async def status_kdu(
    self,
    cluster_uuid: str,
    kdu_instance: str,
) -> dict:
    """Get the status of the KDU

    Reads the status of the model named after the cluster's namespace and
    reports the status of each of its applications.

    :param cluster_uuid str: The UUID of the cluster
    :param kdu_instance str: The unique id of the KDU instance (not used;
                             the model is looked up by the cluster's
                             namespace)

    :return: A dictionary mapping each application name to
             {'status': <application status>}; empty if the model is not
             found.
    """
    status = {}

    model = await self.get_model(self.get_namespace(cluster_uuid), cluster_uuid=cluster_uuid)

    if model:
        try:
            model_status = await model.get_status()

            # Build a fresh dict so the status object returned by the
            # model is not overwritten in place.
            for name in model_status.applications:
                application = model_status.applications[name]
                status[name] = {
                    'status': application['status']['status']
                }
        finally:
            # Release the connection even if reading the status failed
            if model.is_connected():
                await model.disconnect()

    return status
644
645 # Private methods
async def add_k8s(
    self,
    cloud_name: str,
    credentials: str,
) -> bool:
    """Add a k8s cloud to Juju

    Runs `juju add-k8s --local <cloud_name>` and feeds it the credentials
    on standard input, so the cloud can be bootstrapped with a Juju
    Controller.

    :param cloud_name str: The name of the cloud to add.
    :param credentials str: The kubeconfig text, i.e. the output of
        `kubectl config view --raw`.

    :returns: True if successful, otherwise raises an exception carrying
              the command's stderr.
    """

    cmd = [self.juju_command, "add-k8s", "--local", cloud_name]
    self.log.debug(cmd)

    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        stdin=asyncio.subprocess.PIPE,
    )

    # communicate() writes the credentials, closes stdin and collects
    # stdout/stderr concurrently, so a child that produces output before
    # reading its input cannot stall the write.
    stdout, stderr = await process.communicate(
        input=credentials.encode("utf-8")
    )

    return_code = process.returncode

    self.log.debug("add-k8s return code: {}".format(return_code))

    # A negative return code means the child was killed by a signal;
    # that is a failure too.
    if return_code != 0:
        raise Exception(stderr)

    return True
688
async def add_model(
    self,
    model_name: str,
    cluster_uuid: str,
) -> juju.model.Model:
    """Adds a model to the controller

    Adds a new model to the Juju controller, logging in first if needed.

    :param model_name str: The name of the model to add.
    :param cluster_uuid str: The UUID of the cluster, used for the login.
    :returns: The juju.model.Model object of the new model upon success,
              or None if the controller refused to add it (the error is
              logged).
    """
    if not self.authenticated:
        await self.login(cluster_uuid)

    self.log.debug("Adding model '{}' to cluster_uuid '{}'".format(model_name, cluster_uuid))

    # Pre-set the result so that a failed add_model() yields None rather
    # than an UnboundLocalError on the return below.
    model = None
    try:
        model = await self.controller.add_model(
            model_name,
            config={'authorized-keys': self.juju_public_key}
        )
    except Exception as ex:
        self.log.debug("Caught exception: {}".format(ex))

    return model
717
async def bootstrap(
    self,
    cloud_name: str,
    cluster_uuid: str,
    loadbalancer: bool
) -> bool:
    """Bootstrap a Kubernetes controller

    Bootstrap a Juju controller inside the Kubernetes cluster by running
    `juju bootstrap <cloud_name> <cluster_uuid>`.

    :param cloud_name str: The name of the cloud.
    :param cluster_uuid str: The UUID of the cluster to bootstrap; also
                             used as the controller name.
    :param loadbalancer bool: If the controller should use loadbalancer or not.
    :returns: True upon success, or when the command fails only because
              the controller already exists; otherwise raises an exception
              carrying the command's stderr.
    """

    cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid]
    if loadbalancer:
        # For public clusters, specify that the controller service is
        # using a LoadBalancer.
        cmd += ["--config", "controller-service-type=loadbalancer"]

    self.log.debug("Bootstrapping controller {} in cloud {}".format(
        cluster_uuid, cloud_name
    ))

    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    stdout, stderr = await process.communicate()

    # Any non-zero code is a failure, including the negative codes that
    # report a child killed by a signal. An existing controller is
    # tolerated so the call can be repeated.
    if process.returncode != 0 and b'already exists' not in stderr:
        raise Exception(stderr)

    return True
762
async def destroy_controller(
    self,
    cluster_uuid: str
) -> bool:
    """Destroy a Kubernetes controller

    Destroy an existing Kubernetes controller, together with all of its
    models and storage, by running `juju destroy-controller`.

    :param cluster_uuid str: The UUID of the cluster, which is also the
                             name of the controller to destroy.
    :returns: True upon success or raises an exception carrying the
              command's stderr.
    """
    cmd = [
        self.juju_command,
        "destroy-controller",
        "--destroy-all-models",
        "--destroy-storage",
        "-y",
        cluster_uuid
    ]

    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    stdout, stderr = await process.communicate()

    # stderr is bytes, so it must be searched with a bytes pattern.
    # NOTE(review): tolerating 'already exists' mirrors bootstrap(); it is
    # unclear which destroy-controller failure it is meant to cover.
    if process.returncode != 0 and b'already exists' not in stderr:
        raise Exception(stderr)

    return True
797
def get_config(
    self,
    cluster_uuid: str,
) -> dict:
    """Get the cluster configuration

    Loads the YAML file `<fs.path>/<cluster_uuid>.yaml` from the file
    store.

    :param cluster_uuid str: The UUID of the cluster.
    :return: The parsed configuration, or raises an exception if the file
             does not exist.
    """
    config_path = "{}/{}.yaml".format(self.fs.path, cluster_uuid)

    if not os.path.exists(config_path):
        raise Exception(
            "Unable to locate configuration for cluster {}".format(
                cluster_uuid
            )
        )

    with open(config_path, 'r') as config_file:
        raw = config_file.read()
        return yaml.safe_load(raw)
820
async def get_model(
    self,
    model_name: str,
    cluster_uuid: str,
) -> juju.model.Model:
    """Get a model from the Juju Controller.

    Logs in first if needed, then looks the model up by name.

    Note: The caller must disconnect() the returned model before it goes
    out of scope.

    :param model_name str: The name of the model to get
    :param cluster_uuid str: The UUID of the cluster, used for the login
    :return The juju.model.Model object if found, or None.
    """
    if not self.authenticated:
        await self.login(cluster_uuid)

    available = await self.controller.list_models()
    self.log.debug(available)

    if model_name not in available:
        return None

    self.log.debug("Found model: {}".format(model_name))
    found = await self.controller.get_model(
        model_name
    )
    return found
846
def get_namespace(
    self,
    cluster_uuid: str,
) -> str:
    """Get the namespace of a cluster

    Reads the 'namespace' entry of the stored cluster configuration.

    :param cluster_uuid str: The UUID of the cluster
    :returns: The namespace name, or raises an exception if the
              configuration has no 'namespace' entry
    """
    stored = self.get_config(cluster_uuid)

    # TODO: We want to make sure this is unique to the cluster, in case
    # the cluster is being reused.
    # Consider pre/appending the cluster id to the namespace string
    if 'namespace' in stored:
        return stored['namespace']

    raise Exception("Namespace not found.")
867
async def has_model(
    self,
    model_name: str
) -> bool:
    """Check if a model exists in the controller

    Asks the connected Juju controller for its list of models and checks
    whether the given name is among them.

    :param model_name str: The name of the model
    :return: A boolean indicating if the model exists
    """
    known_models = await self.controller.list_models()
    return model_name in known_models
884
def is_local_k8s(
    self,
    credentials: str,
) -> bool:
    """Check if a cluster is local

    A cluster counts as local when the value of the OSMLCM_VCA_APIPROXY
    environment variable appears in the 'server' address of one of the
    clusters listed in the credentials.

    :param credentials str: The kubeconfig YAML text with the k8s
                            credentials
    :returns: True if one of the clusters' server addresses contains the
              OSMLCM_VCA_APIPROXY value; False otherwise, including when
              the variable is unset or empty.
    """
    creds = yaml.safe_load(credentials)

    # Read the variable unconditionally so host_ip is always bound; an
    # unset or empty value means no cluster can match.
    host_ip = os.getenv("OSMLCM_VCA_APIPROXY")

    if not host_ip or not isinstance(creds, dict):
        return False

    for cluster in creds.get('clusters') or []:
        server = cluster.get('cluster', {}).get('server')
        if server and host_ip in server:
            return True

    return False
907
async def login(self, cluster_uuid):
    """Login to the Juju controller.

    Loads the stored configuration of the cluster, creates a new
    Controller object and connects it with the stored endpoint, user,
    password and CA certificate. Does nothing if already authenticated.

    A failed connection is logged, not raised: callers can tell by
    `self.authenticated`, which is only set to True after a successful
    connect.

    :param cluster_uuid str: The UUID of the cluster whose controller to
                             log in to.
    """

    if self.authenticated:
        return

    self.connecting = True

    # Make sure we have the credentials loaded
    config = self.get_config(cluster_uuid)

    self.juju_endpoint = config['endpoint']
    self.juju_user = config['username']
    self.juju_secret = config['secret']
    self.juju_ca_cert = config['cacert']
    self.juju_public_key = None

    self.controller = Controller()

    if self.juju_secret:
        # The password is deliberately kept out of the log
        self.log.debug(
            "Connecting to controller... ws://{} as {}".format(
                self.juju_endpoint,
                self.juju_user,
            )
        )
        try:
            await self.controller.connect(
                endpoint=self.juju_endpoint,
                username=self.juju_user,
                password=self.juju_secret,
                cacert=self.juju_ca_cert,
            )
            self.authenticated = True
            self.log.debug("JujuApi: Logged into controller")
        except Exception as ex:
            self.log.debug("Caught exception: {}".format(ex))
    else:
        self.log.fatal("VCA credentials not configured.")
        self.authenticated = False
951
async def logout(self):
    """Logout of the Juju controller.

    Disconnects every model held in `self.models`, then the controller,
    and marks the connector as not authenticated.

    :return: False if there was no authenticated session; otherwise None.
    """
    self.log.debug("[logout]")
    if not self.authenticated:
        return False

    for model_name, model in self.models.items():
        self.log.debug("Logging out of model {}".format(model_name))
        await model.disconnect()

    controller = self.controller
    if controller:
        self.log.debug("Disconnecting controller {}".format(
            controller
        ))
        await controller.disconnect()
        self.controller = None

    self.authenticated = False
970
async def remove_cloud(
    self,
    cloud_name: str,
) -> bool:
    """Remove a k8s cloud from Juju

    Runs `juju remove-k8s --client <cloud_name>` followed by
    `juju remove-cloud --client <cloud_name>`, stopping at the first
    command that fails.

    :param cloud_name str: The name of the cloud to remove.

    :returns: True if successful, otherwise raises an exception carrying
              the failing command's stderr.
    """

    # First drop the k8s cloud definition, then remove the cloud from the
    # local client configuration.
    for subcommand in ("remove-k8s", "remove-cloud"):
        cmd = [self.juju_command, subcommand, "--client", cloud_name]
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )

        stdout, stderr = await process.communicate()

        # Negative codes (child killed by a signal) are failures as well
        if process.returncode != 0:
            raise Exception(stderr)

    return True
1015
async def set_config(
    self,
    cluster_uuid: str,
    config: dict,
) -> bool:
    """Save the cluster configuration

    Writes the configuration as YAML to `<fs.path>/<cluster_uuid>.yaml`
    in the file store. An existing file is left untouched.

    :param cluster_uuid str: The UUID of the cluster
    :param config dict: A dictionary containing the cluster configuration
    :returns: True, whether or not a file was written; raises an exception
              if writing fails.
    """

    config_path = "{}/{}.yaml".format(self.fs.path, cluster_uuid)

    # Never overwrite a configuration that was stored earlier
    if os.path.exists(config_path):
        return True

    self.log.debug("Writing config to {}".format(config_path))
    with open(config_path, 'w') as config_file:
        serialized = yaml.dump(config, Dumper=yaml.Dumper)
        config_file.write(serialized)

    return True