Fix bug 1002
[osm/N2VC.git] / n2vc / k8s_juju_conn.py
1 # Copyright 2019 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import concurrent
17 from .exceptions import NotImplemented
18
19 import io
20 import juju
21 # from juju.bundle import BundleHandler
22 from juju.controller import Controller
23 from juju.model import Model
24 from juju.errors import JujuAPIError, JujuError
25
26 import logging
27
28 from n2vc.k8s_conn import K8sConnector
29
30 import os
31 # import re
32 # import ssl
33 # from .vnf import N2VC
34
35 import uuid
36 import yaml
37
38
39 class K8sJujuConnector(K8sConnector):
40
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = '/usr/bin/kubectl',
    juju_command: str = '/usr/bin/juju',
    log=None,
    on_update_db=None,
):
    """Create a Juju-backed Kubernetes connector.

    :param fs: file system object; kept on the instance as ``self.fs``
    :param db: database object, passed through to ``K8sConnector``
    :param kubectl_command: path to the kubectl executable (accepted for
        interface compatibility; it is not stored by this constructor)
    :param juju_command: path to the juju executable used for CLI calls
    :param log: logger to use; a module-level logger is used when omitted
    :param on_update_db: callback passed through to ``K8sConnector``
    """

    # parent class
    K8sConnector.__init__(
        self,
        db,
        log=log,
        on_update_db=on_update_db,
    )

    self.fs = fs
    self.info('Initializing K8S Juju connector')

    self.authenticated = False
    self.models = {}
    # Populated by login(); declared here so the attribute always exists.
    self.controller = None
    # Honour the injected logger instead of unconditionally replacing it.
    self.log = log if log is not None else logging.getLogger(__name__)

    self.juju_command = juju_command
    self.juju_secret = ""

    self.info('K8S Juju connector initialized')
77
78 """Initialization"""
async def init_env(
    self,
    k8s_creds: str,
    namespace: str = 'kube-system',
    reuse_cluster_uuid: str = None,
) -> (str, bool):
    """Initialize a Kubernetes environment.

    Bootstrapping cannot be done, by design, through the API, so a new
    cluster is prepared with the juju CLI:

    1. add the k8s cloud to Juju,
    2. bootstrap a controller named after a freshly generated UUID,
    3. read the controller endpoint, CA certificate and account from the
       local juju client files and persist them with ``set_config``.

    When ``reuse_cluster_uuid`` is given, the previously stored
    configuration is loaded instead. In both cases the connector then logs
    in and looks up the model named after the configured namespace.

    :param k8s_creds str: The Kubernetes cluster configuration (kubeconfig
        YAML text)
    :param namespace str: The Kubernetes namespace to initialize
    :param reuse_cluster_uuid str: UUID of an already initialized cluster

    :return: A tuple ``(cluster_uuid, True)``, or raises an exception
    """

    ##################################################
    # TODO: Pull info from db based on the namespace #
    ##################################################

    if not reuse_cluster_uuid:
        # This is a new cluster, so bootstrap it
        cluster_uuid = str(uuid.uuid4())

        # If the k8s is external, the juju controller needs a loadbalancer
        loadbalancer = not self.is_local_k8s(k8s_creds)

        # Name the new k8s cloud
        k8s_cloud = "{}-k8s".format(namespace)

        self.log.debug("Adding k8s cloud {}".format(k8s_cloud))
        await self.add_k8s(k8s_cloud, k8s_creds)

        self.log.debug("Bootstrapping...")
        await self.bootstrap(k8s_cloud, cluster_uuid, loadbalancer)
        self.log.debug("Bootstrap done.")

        def _controller_entry(filename):
            # Both client files are keyed controllers.<name>.*; the
            # controller was bootstrapped under the name cluster_uuid.
            path = os.path.expanduser(
                "~/.local/share/juju/{}".format(filename)
            )
            with open(path) as f:
                # Plain data files: safe_load is sufficient and avoids
                # constructing arbitrary Python objects.
                return yaml.safe_load(f)['controllers'][cluster_uuid]

        self.log.debug("Getting controller endpoints")
        controller = _controller_entry("controllers.yaml")
        self.juju_endpoint = controller['api-endpoints'][0]
        self.juju_ca_cert = controller['ca-cert']

        self.log.debug("Getting accounts")
        account = _controller_entry("accounts.yaml")
        self.juju_user = account['user']
        self.juju_secret = account['password']

        # Never echo the secret or certificate; log only what is needed
        # to identify the connection.
        self.log.debug("user: {}".format(self.juju_user))
        self.log.debug("endpoint: {}".format(self.juju_endpoint))

        self.juju_public_key = None

        config = {
            'endpoint': self.juju_endpoint,
            'username': self.juju_user,
            'secret': self.juju_secret,
            'cacert': self.juju_ca_cert,
            'namespace': namespace,
            'loadbalancer': loadbalancer,
        }

        # Store the cluster configuration so it
        # can be used for subsequent calls
        self.log.debug("Setting config")
        await self.set_config(cluster_uuid, config)

    else:
        # This is an existing cluster, so get its config
        cluster_uuid = reuse_cluster_uuid

        config = self.get_config(cluster_uuid)

        self.juju_endpoint = config['endpoint']
        self.juju_user = config['username']
        self.juju_secret = config['secret']
        self.juju_ca_cert = config['cacert']
        self.juju_public_key = None

    # Login to the k8s cluster
    if not self.authenticated:
        await self.login(cluster_uuid)

    model_name = self.get_namespace(cluster_uuid)
    self.log.debug("Getting model {}".format(model_name))
    model = await self.get_model(model_name, cluster_uuid=cluster_uuid)

    # Disconnect from the model
    if model and model.is_connected():
        await model.disconnect()

    return cluster_uuid, True
221
222 """Repo Management"""
async def repo_add(self, name: str, url: str, type: str = "charm"):
    """Add a repository; not supported by this connector.

    :param name str: The name of the repository
    :param url str: The location of the repository
    :param type str: The kind of repository

    :raises NotImplemented: always
    """
    raise NotImplemented()
230
async def repo_list(self):
    """List repositories; not supported by this connector.

    :raises NotImplemented: always
    """
    raise NotImplemented()
233
async def repo_remove(self, name: str):
    """Remove a repository; not supported by this connector.

    :param name str: The name of the repository

    :raises NotImplemented: always
    """
    raise NotImplemented()
239
240 """Reset"""
async def reset(
    self,
    cluster_uuid: str,
    force: bool = False,
    uninstall_sw: bool = False
) -> bool:
    """Reset a cluster

    Resets the Kubernetes cluster by destroying the model that represents
    it, destroying the Juju controller (via the CLI) and removing the k8s
    cloud from the local Juju client.

    This is best-effort: any failure is logged rather than propagated.

    :param cluster_uuid str: The UUID of the cluster to reset
    :param force bool: Accepted for interface compatibility; unused here
    :param uninstall_sw bool: Accepted for interface compatibility; unused
    :return: True if every step completed, False if any step failed.
    """

    try:
        if not self.authenticated:
            await self.login(cluster_uuid)

        namespace = self.get_namespace(cluster_uuid)

        if self.controller.is_connected():
            # Destroy the model
            if await self.has_model(namespace):
                self.log.debug("[reset] Destroying model")
                await self.controller.destroy_model(
                    namespace,
                    destroy_storage=True
                )

            # Disconnect from the controller
            self.log.debug("[reset] Disconnecting controller")
            await self.controller.disconnect()

        # The controller is about to be destroyed, so the cached login
        # state must not be reused by later calls.
        self.authenticated = False

        # Destroy the controller (via CLI)
        self.log.debug("[reset] Destroying controller")
        await self.destroy_controller(cluster_uuid)

        self.log.debug("[reset] Removing k8s cloud")
        await self.remove_cloud("{}-k8s".format(namespace))

    except Exception as ex:
        self.log.debug("Caught exception during reset: {}".format(ex))
        return False

    return True
284
285 """Deployment"""
286
async def install(
    self,
    cluster_uuid: str,
    kdu_model: str,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None
) -> bool:
    """Install a bundle

    The bundle is deployed into the model named after
    ``db_dict["filter"]["_id"]``; the model is created if it does not
    exist yet.

    :param cluster_uuid str: The UUID of the cluster to install to
    :param kdu_model str: The name or path of a bundle to install
    :param atomic bool: If set, waits until every application has settled
        and resets the cluster if the wait times out.
    :param timeout int: The time, in seconds, to wait for each application
        to settle
    :param params dict: Key-value pairs of instantiation parameters
        (not applied yet)
    :param db_dict dict: Must contain ``["filter"]["_id"]``, used as the
        model name

    :return: True on success, False if the atomic wait timed out
    :raises ValueError: if ``db_dict`` lacks ``["filter"]["_id"]``
    :raises Exception: if no model could be obtained or the bundle is empty
    """

    if not self.authenticated:
        self.log.debug("[install] Logging in to the controller")
        await self.login(cluster_uuid)

    ##
    # Get or create the model, based on the NS
    # uuid.
    try:
        model_name = db_dict["filter"]["_id"]
    except (TypeError, KeyError) as ex:
        raise ValueError(
            'db_dict must provide ["filter"]["_id"] to name the model'
        ) from ex

    self.log.debug("Checking for model named {}".format(model_name))
    model = await self.get_model(model_name, cluster_uuid=cluster_uuid)
    if not model:
        # Create the new model
        self.log.debug("Adding model: {}".format(model_name))
        model = await self.add_model(model_name, cluster_uuid=cluster_uuid)

    if not model:
        raise Exception("Unable to install")

    try:
        # TODO: Instantiation parameters

        # TODO: kdu_model may eventually be a charm store reference
        # ("cs:..."), a URL to download, or a local folder/tarball to
        # unpack. For now it is handed to Juju unchanged in every case.
        bundle = kdu_model
        if not bundle:
            raise Exception("No bundle given to install")

        self.log.debug("[install] deploying {}".format(bundle))
        await model.deploy(bundle)

        if atomic:
            self.log.debug(
                "[install] Applications: {}".format(model.applications)
            )
            for name in model.applications:
                self.log.debug(
                    "[install] Waiting for {} to settle".format(name)
                )
                application = model.applications[name]
                try:
                    # It's not enough to wait for all units to be active;
                    # the application status needs to be active as well.
                    await model.block_until(
                        lambda app=application: all(
                            unit.agent_status == 'idle'
                            and app.status in ['active', 'unknown']
                            and unit.workload_status in [
                                'active', 'unknown'
                            ] for unit in app.units
                        ),
                        timeout=timeout
                    )
                    self.log.debug("All units active.")

                # asyncio.TimeoutError and concurrent.futures.TimeoutError
                # are distinct classes on some Python versions; catch both.
                except (asyncio.TimeoutError,
                        concurrent.futures.TimeoutError):
                    self.log.debug(
                        "[install] Timeout exceeded; resetting cluster"
                    )
                    # Release the model connection before the controller
                    # it belongs to is torn down.
                    if model.is_connected():
                        await model.disconnect()
                    await self.reset(cluster_uuid)
                    return False
    finally:
        # Model objects must be disconnected on every exit path.
        if model.is_connected():
            self.log.debug("[install] Disconnecting model")
            await model.disconnect()

    return True
392
async def instances_list(self, cluster_uuid: str) -> list:
    """List the releases deployed in a cluster.

    Currently a stub: the cluster is not queried.

    :param cluster_uuid: the cluster
    :return: always an empty list
    """
    releases = []
    return releases
404
async def upgrade(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    kdu_model: str = None,
    params: dict = None,
) -> str:
    """Upgrade a model

    The API doesn't have a concept of bundle upgrades, because there are
    many possible changes: charm revision, disk, number of units, etc.

    As such, only a limited subset of upgrades is supported: the charm
    revision of every application listed in the bundle is switched to the
    one named in the bundle, while storage and scale are left untouched.

    The bundle is a YAML file shaped like::

        applications:
          mariadb-k8s:
            charm: 'cs:~charmed-osm/mariadb-k8s-20'
            scale: 1

    :param cluster_uuid str: The UUID of the cluster to upgrade
    :param kdu_instance str: The unique name of the KDU instance
    :param kdu_model str: The path of the bundle file to upgrade to
    :param params dict: Key-value pairs of instantiation parameters
        (not applied yet)

    :return: True once every application has been processed.
    :raises Exception: if the model for the cluster cannot be found
    :raises juju.errors.JujuError: if a charm upgrade fails for any reason
        other than the charm already being at the requested revision
    """
    namespace = self.get_namespace(cluster_uuid)
    model = await self.get_model(namespace, cluster_uuid=cluster_uuid)
    if not model:
        raise Exception("Model {} not found.".format(namespace))

    try:
        with open(kdu_model, 'r') as f:
            bundle = yaml.safe_load(f)

        # TODO: This should be returned in an agreed-upon format
        for name, spec in bundle['applications'].items():
            application = model.applications[name]

            try:
                await application.upgrade_charm(switch=spec['charm'])
            except juju.errors.JujuError as ex:
                # Already running this version is not a failure; anything
                # else must reach the caller instead of being swallowed.
                if 'already running charm' not in str(ex):
                    raise
    finally:
        await model.disconnect()

    return True
479
480 """Rollback"""
async def rollback(
    self, cluster_uuid: str, kdu_instance: str, revision: int = 0
) -> str:
    """Roll a KDU instance back to an earlier revision.

    Not supported by this connector.

    :param cluster_uuid str: The UUID of the cluster to rollback
    :param kdu_instance str: The unique name of the KDU instance
    :param revision int: The revision to revert to

    :raises NotImplemented: always
    """
    raise NotImplemented()
498
499 """Deletion"""
async def uninstall(
    self,
    cluster_uuid: str,
    kdu_instance: str,
) -> bool:
    """Uninstall a KDU instance

    Destroys the application named ``kdu_instance`` in the model that is
    named after the cluster's namespace.

    :param cluster_uuid str: The UUID of the cluster to uninstall
    :param kdu_instance str: The unique name of the KDU instance

    :return: True if the application was destroyed, False if the model
        does not exist
    :raises Exception: if the application is not present in the model
    """
    model = await self.get_model(
        self.get_namespace(cluster_uuid),
        cluster_uuid=cluster_uuid
    )
    if not model:
        return False

    try:
        if kdu_instance not in model.applications:
            # TODO: Raise a named exception
            raise Exception("Application not found.")

        # Destroy the application
        await model.applications[kdu_instance].destroy()

        # TODO: Verify removal
    finally:
        # Model objects must be disconnected before going out of scope.
        if model.is_connected():
            await model.disconnect()

    return True
532
533 """Introspection"""
async def inspect_kdu(self, kdu_model: str) -> dict:
    """Inspect a KDU

    Parses the bundle file and hands back its ``applications`` section,
    which carries each application's charm, scale and options, e.g.::

        {
            'mariadb-k8s': {
                'charm': 'cs:~charmed-osm/mariadb-k8s-20',
                'scale': 1,
                'options': {'user': 'mano'},
                'series': 'kubernetes'
            }
        }

    :param kdu_model str: The path of the bundle file to inspect.

    :return: The ``applications`` mapping of the bundle.
    """
    with open(kdu_model, 'r') as bundle_file:
        parsed_bundle = yaml.safe_load(bundle_file)

    # TODO: This should be returned in an agreed-upon format
    return parsed_bundle['applications']
575
async def help_kdu(
    self,
    kdu_model: str,
) -> str:
    """View the README

    Looks in the directory containing ``kdu_model`` for a file named
    ``README``, ``README.txt`` or ``README.md`` (checked in that order)
    and returns the contents of the first one found.

    :param kdu_model str: The path of a bundle

    :return: The contents of the README, or None if there is none.
    """
    # A bare file name has an empty dirname; look next to it in the
    # current directory instead of failing on os.listdir('').
    bundle_dir = os.path.dirname(kdu_model) or os.curdir

    for candidate in ('README', 'README.txt', 'README.md'):
        # The README lives beside the bundle, not in the process's CWD.
        readme_path = os.path.join(bundle_dir, candidate)
        if os.path.isfile(readme_path):
            with open(readme_path, 'r') as f:
                return f.read()

    return None
599
async def status_kdu(
    self,
    cluster_uuid: str,
    kdu_instance: str,
) -> dict:
    """Get the status of the KDU

    Queries the model named after the cluster's namespace and reports the
    status of every application in it.

    :param cluster_uuid str: The UUID of the cluster
    :param kdu_instance str: The unique id of the KDU instance (currently
        not used to filter the result)

    :return: A dictionary mapping each application name to
        ``{'status': <status string>}``; empty if the model is not found.
    """
    status = {}

    model = await self.get_model(
        self.get_namespace(cluster_uuid),
        cluster_uuid=cluster_uuid
    )

    if model:
        try:
            model_status = await model.get_status()
            applications = model_status.applications
            # Build a fresh dict rather than writing into the status
            # object that is being read.
            status = {
                name: {'status': applications[name]['status']['status']}
                for name in applications
            }
        finally:
            if model.is_connected():
                await model.disconnect()

    return status
634
635 # Private methods
async def add_k8s(
    self,
    cloud_name: str,
    credentials: str,
) -> bool:
    """Add a k8s cloud to Juju

    Runs ``juju add-k8s --local <cloud_name>`` and feeds it the cluster
    credentials on standard input, so the cloud can be bootstrapped with a
    Juju Controller.

    :param cloud_name str: The name of the cloud to add.
    :param credentials str: The kubeconfig text, i.e. the output of
        `kubectl config view --raw`.

    :returns: True if successful, otherwise raises an exception carrying
        the command's stderr.
    """

    cmd = [self.juju_command, "add-k8s", "--local", cloud_name]
    self.log.debug(cmd)

    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        stdin=asyncio.subprocess.PIPE,
    )

    # communicate() writes the input, closes stdin and drains both output
    # pipes concurrently, so a full pipe cannot stall the exchange.
    _, stderr = await process.communicate(
        input=credentials.encode("utf-8")
    )

    return_code = process.returncode
    self.log.debug("add-k8s return code: {}".format(return_code))

    # Negative codes mean the process was killed by a signal.
    if return_code != 0:
        raise Exception(stderr)

    return True
678
async def add_model(
    self, model_name: str, cluster_uuid: str
) -> juju.model.Model:
    """Create a new model on the Juju controller.

    Logs in to the controller first when not yet authenticated.

    :param model_name str: The name of the model to add.
    :param cluster_uuid str: The UUID of the cluster, used for the login.
    :returns: The juju.model.Model object of the new model upon success or
        raises an exception.
    """
    if not self.authenticated:
        await self.login(cluster_uuid)

    message = "Adding model '{}' to cluster_uuid '{}'".format(
        model_name, cluster_uuid
    )
    self.log.debug(message)

    model_config = {'authorized-keys': self.juju_public_key}
    return await self.controller.add_model(model_name, config=model_config)
701
async def bootstrap(
    self,
    cloud_name: str,
    cluster_uuid: str,
    loadbalancer: bool
) -> bool:
    """Bootstrap a Kubernetes controller

    Runs ``juju bootstrap <cloud_name> <cluster_uuid>`` to create a Juju
    controller inside the Kubernetes cluster. A failure whose stderr
    contains "already exists" is tolerated.

    :param cloud_name str: The name of the cloud.
    :param cluster_uuid str: The UUID of the cluster to bootstrap; used as
        the controller name.
    :param loadbalancer bool: If the controller should use loadbalancer or
        not.
    :returns: True upon success or raises an exception carrying the
        command's stderr.
    """

    cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid]
    if loadbalancer:
        # For public clusters, specify that the controller service is
        # using a LoadBalancer.
        cmd += ["--config", "controller-service-type=loadbalancer"]

    self.log.debug("Bootstrapping controller {} in cloud {}".format(
        cluster_uuid, cloud_name
    ))

    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    _, stderr = await process.communicate()

    # Any non-zero code is a failure; negative codes mean the process was
    # killed by a signal.
    if process.returncode != 0 and b'already exists' not in stderr:
        raise Exception(stderr)

    return True
746
async def destroy_controller(
    self,
    cluster_uuid: str
) -> bool:
    """Destroy a Kubernetes controller

    Runs ``juju destroy-controller --destroy-all-models --destroy-storage
    -y <cluster_uuid>`` to destroy an existing controller.

    :param cluster_uuid str: The UUID of the cluster whose controller is
        destroyed.
    :returns: True upon success or raises an exception carrying the
        command's stderr.
    """
    cmd = [
        self.juju_command,
        "destroy-controller",
        "--destroy-all-models",
        "--destroy-storage",
        "-y",
        cluster_uuid
    ]

    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    _, stderr = await process.communicate()

    # stderr is bytes, so it must be compared against a bytes literal;
    # a str literal would raise TypeError and mask the real error.
    if process.returncode != 0 and b'already exists' not in stderr:
        raise Exception(stderr)

    return True
781
def get_config(self, cluster_uuid: str) -> dict:
    """Load the stored configuration of a cluster.

    The configuration is read from ``<fs.path>/<cluster_uuid>.yaml``.

    :param cluster_uuid str: The UUID of the cluster.
    :return: A dict upon success, or raises an exception when the file
        does not exist.
    """
    config_path = "{}/{}.yaml".format(self.fs.path, cluster_uuid)

    if not os.path.exists(config_path):
        raise Exception(
            "Unable to locate configuration for cluster {}".format(
                cluster_uuid
            )
        )

    with open(config_path, 'r') as handle:
        raw_yaml = handle.read()
    return yaml.safe_load(raw_yaml)
804
async def get_model(
    self, model_name: str, cluster_uuid: str
) -> juju.model.Model:
    """Look up a model on the Juju controller by name.

    Logs in to the controller first when not yet authenticated.

    Note: the caller must disconnect the returned model before it goes
    out of scope.

    :param model_name str: The name of the model to get
    :param cluster_uuid str: The UUID of the cluster, used for the login
    :return The juju.model.Model object if found, or None.
    """
    if not self.authenticated:
        await self.login(cluster_uuid)

    known_models = await self.controller.list_models()
    self.log.debug(known_models)

    if model_name not in known_models:
        return None

    self.log.debug("Found model: {}".format(model_name))
    return await self.controller.get_model(model_name)
830
def get_namespace(self, cluster_uuid: str) -> str:
    """Return the namespace recorded in a cluster's configuration.

    :param cluster_uuid str: The UUID of the cluster
    :returns: The value stored under ``namespace`` in the cluster
        configuration, or raises an exception when it is absent
    """
    stored = self.get_config(cluster_uuid)

    # TODO: We want to make sure this is unique to the cluster, in case
    # the cluster is being reused.
    # Consider pre/appending the cluster id to the namespace string
    if 'namespace' in stored:
        return stored['namespace']

    raise Exception("Namespace not found.")
851
async def has_model(self, model_name: str) -> bool:
    """Tell whether the connected controller knows a model.

    Asks the Juju controller for its list of models and tests membership.

    :param model_name str: The name of the model
    :return: A boolean indicating if the model exists
    """
    available = await self.controller.list_models()
    return model_name in available
868
def is_local_k8s(
    self,
    credentials: str,
) -> bool:
    """Check if a cluster is local

    A cluster counts as local when the address in the
    ``OSMLCM_VCA_APIPROXY`` environment variable appears in the ``server``
    URL of any cluster listed in the credentials.

    :param credentials str: The k8s credentials as kubeconfig YAML text
    :returns: True if the cluster is running locally; False otherwise,
        including when ``OSMLCM_VCA_APIPROXY`` is not set
    """
    # Read the variable once; without it there is nothing to compare
    # against, so the cluster cannot be identified as local.
    host_ip = os.getenv("OSMLCM_VCA_APIPROXY")
    if not host_ip:
        return False

    creds = yaml.safe_load(credentials)
    if not creds:
        return False

    for cluster in creds.get('clusters') or []:
        server = (cluster.get('cluster') or {}).get('server')
        if server and host_ip in server:
            return True

    return False
891
async def login(self, cluster_uuid):
    """Login to the Juju controller.

    Loads the stored configuration of the cluster, creates a fresh
    ``Controller`` and connects to it. A connection failure is logged, not
    raised; callers must check ``self.authenticated`` afterwards.

    :param cluster_uuid str: The UUID of the cluster whose controller to
        connect to
    """

    if self.authenticated:
        return

    self.connecting = True

    # Test: Make sure we have the credentials loaded
    config = self.get_config(cluster_uuid)

    self.juju_endpoint = config['endpoint']
    self.juju_user = config['username']
    self.juju_secret = config['secret']
    self.juju_ca_cert = config['cacert']
    self.juju_public_key = None

    self.controller = Controller()

    if not self.juju_secret:
        self.log.fatal("VCA credentials not configured.")
        self.authenticated = False
        return

    # The password is deliberately left out of the log message.
    self.log.debug(
        "Connecting to controller... ws://{} as {}".format(
            self.juju_endpoint,
            self.juju_user,
        )
    )
    try:
        await self.controller.connect(
            endpoint=self.juju_endpoint,
            username=self.juju_user,
            password=self.juju_secret,
            cacert=self.juju_ca_cert,
        )
        self.authenticated = True
        self.log.debug("JujuApi: Logged into controller")
    except Exception as ex:
        # Best effort: the failure is visible through self.authenticated.
        self.log.error("Caught exception: {}".format(ex))
935
async def logout(self):
    """Logout of the Juju controller.

    Disconnects every model tracked in ``self.models``, then the
    controller itself, and clears the authenticated flag.

    :return: False when there is no authenticated session; otherwise
        nothing is returned.
    """
    print("[logout]")
    if not self.authenticated:
        return False

    for model_name, connected_model in self.models.items():
        print("Logging out of model {}".format(model_name))
        await connected_model.disconnect()

    controller = self.controller
    if controller:
        self.log.debug("Disconnecting controller {}".format(controller))
        await controller.disconnect()
        self.controller = None

    self.authenticated = False
954
async def remove_cloud(
    self,
    cloud_name: str,
) -> bool:
    """Remove a k8s cloud from Juju

    Runs ``juju remove-k8s --client <cloud_name>`` followed by
    ``juju remove-cloud --client <cloud_name>``; the second command is
    only run if the first one succeeds.

    :param cloud_name str: The name of the cloud to remove.

    :returns: True if successful, otherwise raises an exception carrying
        the failing command's stderr.
    """

    for subcommand in ("remove-k8s", "remove-cloud"):
        cmd = [self.juju_command, subcommand, "--client", cloud_name]
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )

        _, stderr = await process.communicate()

        # Any non-zero code is a failure; negative codes mean the
        # process was killed by a signal.
        if process.returncode != 0:
            raise Exception(stderr)

    return True
999
async def set_config(self, cluster_uuid: str, config: dict) -> bool:
    """Persist the configuration of a cluster.

    The configuration is dumped as YAML to
    ``<fs.path>/<cluster_uuid>.yaml``. An existing file is left untouched.

    :param cluster_uuid str: The UUID of the cluster
    :param config dict: A dictionary containing the cluster configuration
    :returns: Boolean upon success or raises an exception.
    """
    target = "{}/{}.yaml".format(self.fs.path, cluster_uuid)

    if os.path.exists(target):
        return True

    print("Writing config to {}".format(target))
    serialized = yaml.dump(config, Dumper=yaml.Dumper)
    with open(target, 'w') as handle:
        handle.write(serialized)

    return True