Synchronize helm repos on ns instantiation instead of creation
[osm/N2VC.git] / n2vc / k8s_juju_conn.py
1 # Copyright 2019 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import concurrent
17 from .exceptions import NotImplemented
18
19 import io
20 import juju
21 # from juju.bundle import BundleHandler
22 from juju.controller import Controller
23 from juju.model import Model
24 from juju.errors import JujuAPIError, JujuError
25
26 import logging
27
28 from n2vc.k8s_conn import K8sConnector
29
30 import os
31 # import re
32 # import ssl
33 # from .vnf import N2VC
34
35 import uuid
36 import yaml
37
38
39 class K8sJujuConnector(K8sConnector):
40
41 def __init__(
42 self,
43 fs: object,
44 db: object,
45 kubectl_command: str = '/usr/bin/kubectl',
46 juju_command: str = '/usr/bin/juju',
47 log=None,
48 on_update_db=None,
49 ):
50 """
51
52 :param kubectl_command: path to kubectl executable
53 :param helm_command: path to helm executable
54 :param fs: file system for kubernetes and helm configuration
55 :param log: logger
56 """
57
58 # parent class
59 K8sConnector.__init__(
60 self,
61 db,
62 log=log,
63 on_update_db=on_update_db,
64 )
65
66 self.fs = fs
67 self.info('Initializing K8S Juju connector')
68
69 self.authenticated = False
70 self.models = {}
71 self.log = logging.getLogger(__name__)
72
73 self.juju_command = juju_command
74 self.juju_secret = ""
75
76 self.info('K8S Juju connector initialized')
77
78 """Initialization"""
79 async def init_env(
80 self,
81 k8s_creds: str,
82 namespace: str = 'kube-system',
83 reuse_cluster_uuid: str = None,
84 ) -> (str, bool):
85 """
86 It prepares a given K8s cluster environment to run Juju bundles.
87
88 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid '.kube/config'
89 :param namespace: optional namespace to be used for juju. By default, 'kube-system' will be used
90 :param reuse_cluster_uuid: existing cluster uuid for reuse
91 :return: uuid of the K8s cluster and True if connector has installed some software in the cluster
92 (on error, an exception will be raised)
93 """
94
95 """Bootstrapping
96
97 Bootstrapping cannot be done, by design, through the API. We need to
98 use the CLI tools.
99 """
100
101 """
102 WIP: Workflow
103
104 1. Has the environment already been bootstrapped?
105 - Check the database to see if we have a record for this env
106
107 2. If this is a new env, create it
108 - Add the k8s cloud to Juju
109 - Bootstrap
110 - Record it in the database
111
112 3. Connect to the Juju controller for this cloud
113
114 """
115 # cluster_uuid = reuse_cluster_uuid
116 # if not cluster_uuid:
117 # cluster_uuid = str(uuid4())
118
119 ##################################################
120 # TODO: Pull info from db based on the namespace #
121 ##################################################
122
123 ###################################################
124 # TODO: Make it idempotent, calling add-k8s and #
125 # bootstrap whenever reuse_cluster_uuid is passed #
126 # as parameter #
127 # `init_env` is called to initialize the K8s #
128 # cluster for juju. If this initialization fails, #
129 # it can be called again by LCM with the param #
130 # reuse_cluster_uuid, e.g. to try to fix it. #
131 ###################################################
132
133 if not reuse_cluster_uuid:
134 # This is a new cluster, so bootstrap it
135
136 cluster_uuid = str(uuid.uuid4())
137
138 # Is a local k8s cluster?
139 localk8s = self.is_local_k8s(k8s_creds)
140
141 # If the k8s is external, the juju controller needs a loadbalancer
142 loadbalancer = False if localk8s else True
143
144 # Name the new k8s cloud
145 k8s_cloud = "k8s-{}".format(cluster_uuid)
146
147 print("Adding k8s cloud {}".format(k8s_cloud))
148 await self.add_k8s(k8s_cloud, k8s_creds)
149
150 # Bootstrap Juju controller
151 print("Bootstrapping...")
152 await self.bootstrap(k8s_cloud, cluster_uuid, loadbalancer)
153 print("Bootstrap done.")
154
155 # Get the controller information
156
157 # Parse ~/.local/share/juju/controllers.yaml
158 # controllers.testing.api-endpoints|ca-cert|uuid
159 print("Getting controller endpoints")
160 with open(os.path.expanduser(
161 "~/.local/share/juju/controllers.yaml"
162 )) as f:
163 controllers = yaml.load(f, Loader=yaml.Loader)
164 controller = controllers['controllers'][cluster_uuid]
165 endpoints = controller['api-endpoints']
166 self.juju_endpoint = endpoints[0]
167 self.juju_ca_cert = controller['ca-cert']
168
169 # Parse ~/.local/share/juju/accounts
170 # controllers.testing.user|password
171 print("Getting accounts")
172 with open(os.path.expanduser(
173 "~/.local/share/juju/accounts.yaml"
174 )) as f:
175 controllers = yaml.load(f, Loader=yaml.Loader)
176 controller = controllers['controllers'][cluster_uuid]
177
178 self.juju_user = controller['user']
179 self.juju_secret = controller['password']
180
181 print("user: {}".format(self.juju_user))
182 print("secret: {}".format(self.juju_secret))
183 print("endpoint: {}".format(self.juju_endpoint))
184 print("ca-cert: {}".format(self.juju_ca_cert))
185
186 # raise Exception("EOL")
187
188 self.juju_public_key = None
189
190 config = {
191 'endpoint': self.juju_endpoint,
192 'username': self.juju_user,
193 'secret': self.juju_secret,
194 'cacert': self.juju_ca_cert,
195 'namespace': namespace,
196 'loadbalancer': loadbalancer,
197 }
198
199 # Store the cluster configuration so it
200 # can be used for subsequent calls
201 print("Setting config")
202 await self.set_config(cluster_uuid, config)
203
204 else:
205 # This is an existing cluster, so get its config
206 cluster_uuid = reuse_cluster_uuid
207
208 config = self.get_config(cluster_uuid)
209
210 self.juju_endpoint = config['endpoint']
211 self.juju_user = config['username']
212 self.juju_secret = config['secret']
213 self.juju_ca_cert = config['cacert']
214 self.juju_public_key = None
215
216 # Login to the k8s cluster
217 if not self.authenticated:
218 await self.login(cluster_uuid)
219
220 # We're creating a new cluster
221 print("Getting model {}".format(self.get_namespace(cluster_uuid), cluster_uuid=cluster_uuid))
222 model = await self.get_model(
223 self.get_namespace(cluster_uuid),
224 cluster_uuid=cluster_uuid
225 )
226
227 # Disconnect from the model
228 if model and model.is_connected():
229 await model.disconnect()
230
231 return cluster_uuid, True
232
233 """Repo Management"""
234 async def repo_add(
235 self,
236 name: str,
237 url: str,
238 type: str = "charm",
239 ):
240 raise NotImplemented()
241
242 async def repo_list(self):
243 raise NotImplemented()
244
245 async def repo_remove(
246 self,
247 name: str,
248 ):
249 raise NotImplemented()
250
251 async def synchronize_repos(
252 self,
253 cluster_uuid: str,
254 name: str
255 ):
256 """
257 Returns None as currently add_repo is not implemented
258 """
259 return None
260
261 """Reset"""
262 async def reset(
263 self,
264 cluster_uuid: str,
265 force: bool = False,
266 uninstall_sw: bool = False
267 ) -> bool:
268 """Reset a cluster
269
270 Resets the Kubernetes cluster by removing the model that represents it.
271
272 :param cluster_uuid str: The UUID of the cluster to reset
273 :return: Returns True if successful or raises an exception.
274 """
275
276 try:
277 if not self.authenticated:
278 await self.login(cluster_uuid)
279
280 if self.controller.is_connected():
281 # Destroy the model
282 namespace = self.get_namespace(cluster_uuid)
283 if await self.has_model(namespace):
284 print("[reset] Destroying model")
285 await self.controller.destroy_model(
286 namespace,
287 destroy_storage=True
288 )
289
290 # Disconnect from the controller
291 print("[reset] Disconnecting controller")
292 await self.controller.disconnect()
293
294 # Destroy the controller (via CLI)
295 print("[reset] Destroying controller")
296 await self.destroy_controller(cluster_uuid)
297
298 print("[reset] Removing k8s cloud")
299 namespace = self.get_namespace(cluster_uuid)
300 k8s_cloud = "{}-k8s".format(namespace)
301 await self.remove_cloud(k8s_cloud)
302
303 except Exception as ex:
304 print("Caught exception during reset: {}".format(ex))
305
306 """Deployment"""
307
308 async def install(
309 self,
310 cluster_uuid: str,
311 kdu_model: str,
312 atomic: bool = True,
313 timeout: float = 300,
314 params: dict = None,
315 db_dict: dict = None,
316 kdu_name: str = None
317 ) -> bool:
318 """Install a bundle
319
320 :param cluster_uuid str: The UUID of the cluster to install to
321 :param kdu_model str: The name or path of a bundle to install
322 :param atomic bool: If set, waits until the model is active and resets
323 the cluster on failure.
324 :param timeout int: The time, in seconds, to wait for the install
325 to finish
326 :param params dict: Key-value pairs of instantiation parameters
327 :param kdu_name: Name of the KDU instance to be installed
328
329 :return: If successful, returns ?
330 """
331
332 if not self.authenticated:
333 print("[install] Logging in to the controller")
334 await self.login(cluster_uuid)
335
336 ##
337 # Get or create the model, based on the NS
338 # uuid.
339 if kdu_name:
340 kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
341 else:
342 kdu_instance = db_dict["filter"]["_id"]
343
344 self.log.debug("Checking for model named {}".format(kdu_instance))
345 model = await self.get_model(kdu_instance, cluster_uuid=cluster_uuid)
346 if not model:
347 # Create the new model
348 self.log.debug("Adding model: {}".format(kdu_instance))
349 model = await self.add_model(kdu_instance, cluster_uuid=cluster_uuid)
350
351 if model:
352 # TODO: Instantiation parameters
353
354 """
355 "Juju bundle that models the KDU, in any of the following ways:
356 - <juju-repo>/<juju-bundle>
357 - <juju-bundle folder under k8s_models folder in the package>
358 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder in the package>
359 - <URL_where_to_fetch_juju_bundle>
360 """
361
362 bundle = kdu_model
363 if kdu_model.startswith("cs:"):
364 bundle = kdu_model
365 elif kdu_model.startswith("http"):
366 # Download the file
367 pass
368 else:
369 # Local file
370
371 # if kdu_model.endswith(".tar.gz") or kdu_model.endswith(".tgz")
372 # Uncompress temporarily
373 # bundle = <uncompressed file>
374 pass
375
376 if not bundle:
377 # Raise named exception that the bundle could not be found
378 raise Exception()
379
380 print("[install] deploying {}".format(bundle))
381 await model.deploy(bundle)
382
383 # Get the application
384 if atomic:
385 # applications = model.applications
386 print("[install] Applications: {}".format(model.applications))
387 for name in model.applications:
388 print("[install] Waiting for {} to settle".format(name))
389 application = model.applications[name]
390 try:
391 # It's not enough to wait for all units to be active;
392 # the application status needs to be active as well.
393 print("Waiting for all units to be active...")
394 await model.block_until(
395 lambda: all(
396 unit.agent_status == 'idle'
397 and application.status in ['active', 'unknown']
398 and unit.workload_status in [
399 'active', 'unknown'
400 ] for unit in application.units
401 ),
402 timeout=timeout
403 )
404 print("All units active.")
405
406 except concurrent.futures._base.TimeoutError:
407 print("[install] Timeout exceeded; resetting cluster")
408 await self.reset(cluster_uuid)
409 return False
410
411 # Wait for the application to be active
412 if model.is_connected():
413 print("[install] Disconnecting model")
414 await model.disconnect()
415
416 return kdu_instance
417 raise Exception("Unable to install")
418
419 async def instances_list(
420 self,
421 cluster_uuid: str
422 ) -> list:
423 """
424 returns a list of deployed releases in a cluster
425
426 :param cluster_uuid: the cluster
427 :return:
428 """
429 return []
430
431 async def upgrade(
432 self,
433 cluster_uuid: str,
434 kdu_instance: str,
435 kdu_model: str = None,
436 params: dict = None,
437 ) -> str:
438 """Upgrade a model
439
440 :param cluster_uuid str: The UUID of the cluster to upgrade
441 :param kdu_instance str: The unique name of the KDU instance
442 :param kdu_model str: The name or path of the bundle to upgrade to
443 :param params dict: Key-value pairs of instantiation parameters
444
445 :return: If successful, reference to the new revision number of the
446 KDU instance.
447 """
448
449 # TODO: Loop through the bundle and upgrade each charm individually
450
451 """
452 The API doesn't have a concept of bundle upgrades, because there are
453 many possible changes: charm revision, disk, number of units, etc.
454
455 As such, we are only supporting a limited subset of upgrades. We'll
456 upgrade the charm revision but leave storage and scale untouched.
457
458 Scale changes should happen through OSM constructs, and changes to
459 storage would require a redeployment of the service, at least in this
460 initial release.
461 """
462 namespace = self.get_namespace(cluster_uuid)
463 model = await self.get_model(namespace, cluster_uuid=cluster_uuid)
464
465 with open(kdu_model, 'r') as f:
466 bundle = yaml.safe_load(f)
467
468 """
469 {
470 'description': 'Test bundle',
471 'bundle': 'kubernetes',
472 'applications': {
473 'mariadb-k8s': {
474 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
475 'scale': 1,
476 'options': {
477 'password': 'manopw',
478 'root_password': 'osm4u',
479 'user': 'mano'
480 },
481 'series': 'kubernetes'
482 }
483 }
484 }
485 """
486 # TODO: This should be returned in an agreed-upon format
487 for name in bundle['applications']:
488 print(model.applications)
489 application = model.applications[name]
490 print(application)
491
492 path = bundle['applications'][name]['charm']
493
494 try:
495 await application.upgrade_charm(switch=path)
496 except juju.errors.JujuError as ex:
497 if 'already running charm' in str(ex):
498 # We're already running this version
499 pass
500
501 await model.disconnect()
502
503 return True
504 raise NotImplemented()
505
506 """Rollback"""
507 async def rollback(
508 self,
509 cluster_uuid: str,
510 kdu_instance: str,
511 revision: int = 0,
512 ) -> str:
513 """Rollback a model
514
515 :param cluster_uuid str: The UUID of the cluster to rollback
516 :param kdu_instance str: The unique name of the KDU instance
517 :param revision int: The revision to revert to. If omitted, rolls back
518 the previous upgrade.
519
520 :return: If successful, returns the revision of active KDU instance,
521 or raises an exception
522 """
523 raise NotImplemented()
524
525 """Deletion"""
526 async def uninstall(
527 self,
528 cluster_uuid: str,
529 kdu_instance: str
530 ) -> bool:
531 """Uninstall a KDU instance
532
533 :param cluster_uuid str: The UUID of the cluster
534 :param kdu_instance str: The unique name of the KDU instance
535
536 :return: Returns True if successful, or raises an exception
537 """
538 await self.controller.destroy_models(kdu_instance)
539
540 return True
541
542 """Introspection"""
543 async def inspect_kdu(
544 self,
545 kdu_model: str,
546 ) -> dict:
547 """Inspect a KDU
548
549 Inspects a bundle and returns a dictionary of config parameters and
550 their default values.
551
552 :param kdu_model str: The name or path of the bundle to inspect.
553
554 :return: If successful, returns a dictionary of available parameters
555 and their default values.
556 """
557
558 kdu = {}
559 with open(kdu_model, 'r') as f:
560 bundle = yaml.safe_load(f)
561
562 """
563 {
564 'description': 'Test bundle',
565 'bundle': 'kubernetes',
566 'applications': {
567 'mariadb-k8s': {
568 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
569 'scale': 1,
570 'options': {
571 'password': 'manopw',
572 'root_password': 'osm4u',
573 'user': 'mano'
574 },
575 'series': 'kubernetes'
576 }
577 }
578 }
579 """
580 # TODO: This should be returned in an agreed-upon format
581 kdu = bundle['applications']
582
583 return kdu
584
585 async def help_kdu(
586 self,
587 kdu_model: str,
588 ) -> str:
589 """View the README
590
591 If available, returns the README of the bundle.
592
593 :param kdu_model str: The name or path of a bundle
594
595 :return: If found, returns the contents of the README.
596 """
597 readme = None
598
599 files = ['README', 'README.txt', 'README.md']
600 path = os.path.dirname(kdu_model)
601 for file in os.listdir(path):
602 if file in files:
603 with open(file, 'r') as f:
604 readme = f.read()
605 break
606
607 return readme
608
609 async def status_kdu(
610 self,
611 cluster_uuid: str,
612 kdu_instance: str,
613 ) -> dict:
614 """Get the status of the KDU
615
616 Get the current status of the KDU instance.
617
618 :param cluster_uuid str: The UUID of the cluster
619 :param kdu_instance str: The unique id of the KDU instance
620
621 :return: Returns a dictionary containing namespace, state, resources,
622 and deployment_time.
623 """
624 status = {}
625
626 model = await self.get_model(self.get_namespace(cluster_uuid), cluster_uuid=cluster_uuid)
627
628 # model = await self.get_model_by_uuid(cluster_uuid)
629 if model:
630 model_status = await model.get_status()
631 status = model_status.applications
632
633 for name in model_status.applications:
634 application = model_status.applications[name]
635 status[name] = {
636 'status': application['status']['status']
637 }
638
639 if model.is_connected():
640 await model.disconnect()
641
642 return status
643
644 # Private methods
645 async def add_k8s(
646 self,
647 cloud_name: str,
648 credentials: str,
649 ) -> bool:
650 """Add a k8s cloud to Juju
651
652 Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
653 Juju Controller.
654
655 :param cloud_name str: The name of the cloud to add.
656 :param credentials dict: A dictionary representing the output of
657 `kubectl config view --raw`.
658
659 :returns: True if successful, otherwise raises an exception.
660 """
661
662 cmd = [self.juju_command, "add-k8s", "--local", cloud_name]
663 print(cmd)
664
665 process = await asyncio.create_subprocess_exec(
666 *cmd,
667 stdout=asyncio.subprocess.PIPE,
668 stderr=asyncio.subprocess.PIPE,
669 stdin=asyncio.subprocess.PIPE,
670 )
671
672 # Feed the process the credentials
673 process.stdin.write(credentials.encode("utf-8"))
674 await process.stdin.drain()
675 process.stdin.close()
676
677 stdout, stderr = await process.communicate()
678
679 return_code = process.returncode
680
681 print("add-k8s return code: {}".format(return_code))
682
683 if return_code > 0:
684 raise Exception(stderr)
685
686 return True
687
688 async def add_model(
689 self,
690 model_name: str,
691 cluster_uuid: str,
692 ) -> juju.model.Model:
693 """Adds a model to the controller
694
695 Adds a new model to the Juju controller
696
697 :param model_name str: The name of the model to add.
698 :returns: The juju.model.Model object of the new model upon success or
699 raises an exception.
700 """
701 if not self.authenticated:
702 await self.login(cluster_uuid)
703
704 self.log.debug("Adding model '{}' to cluster_uuid '{}'".format(model_name, cluster_uuid))
705 model = await self.controller.add_model(
706 model_name,
707 config={'authorized-keys': self.juju_public_key}
708 )
709 return model
710
711 async def bootstrap(
712 self,
713 cloud_name: str,
714 cluster_uuid: str,
715 loadbalancer: bool
716 ) -> bool:
717 """Bootstrap a Kubernetes controller
718
719 Bootstrap a Juju controller inside the Kubernetes cluster
720
721 :param cloud_name str: The name of the cloud.
722 :param cluster_uuid str: The UUID of the cluster to bootstrap.
723 :param loadbalancer bool: If the controller should use loadbalancer or not.
724 :returns: True upon success or raises an exception.
725 """
726
727 if not loadbalancer:
728 cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid]
729 else:
730 """
731 For public clusters, specify that the controller service is using a LoadBalancer.
732 """
733 cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid, "--config", "controller-service-type=loadbalancer"]
734
735 print("Bootstrapping controller {} in cloud {}".format(
736 cluster_uuid, cloud_name
737 ))
738
739 process = await asyncio.create_subprocess_exec(
740 *cmd,
741 stdout=asyncio.subprocess.PIPE,
742 stderr=asyncio.subprocess.PIPE,
743 )
744
745 stdout, stderr = await process.communicate()
746
747 return_code = process.returncode
748
749 if return_code > 0:
750 #
751 if b'already exists' not in stderr:
752 raise Exception(stderr)
753
754 return True
755
756 async def destroy_controller(
757 self,
758 cluster_uuid: str
759 ) -> bool:
760 """Destroy a Kubernetes controller
761
762 Destroy an existing Kubernetes controller.
763
764 :param cluster_uuid str: The UUID of the cluster to bootstrap.
765 :returns: True upon success or raises an exception.
766 """
767 cmd = [
768 self.juju_command,
769 "destroy-controller",
770 "--destroy-all-models",
771 "--destroy-storage",
772 "-y",
773 cluster_uuid
774 ]
775
776 process = await asyncio.create_subprocess_exec(
777 *cmd,
778 stdout=asyncio.subprocess.PIPE,
779 stderr=asyncio.subprocess.PIPE,
780 )
781
782 stdout, stderr = await process.communicate()
783
784 return_code = process.returncode
785
786 if return_code > 0:
787 #
788 if 'already exists' not in stderr:
789 raise Exception(stderr)
790
791 def get_config(
792 self,
793 cluster_uuid: str,
794 ) -> dict:
795 """Get the cluster configuration
796
797 Gets the configuration of the cluster
798
799 :param cluster_uuid str: The UUID of the cluster.
800 :return: A dict upon success, or raises an exception.
801 """
802 cluster_config = "{}/{}.yaml".format(self.fs.path, cluster_uuid)
803 if os.path.exists(cluster_config):
804 with open(cluster_config, 'r') as f:
805 config = yaml.safe_load(f.read())
806 return config
807 else:
808 raise Exception(
809 "Unable to locate configuration for cluster {}".format(
810 cluster_uuid
811 )
812 )
813
814 async def get_model(
815 self,
816 model_name: str,
817 cluster_uuid: str,
818 ) -> juju.model.Model:
819 """Get a model from the Juju Controller.
820
821 Note: Model objects returned must call disconnected() before it goes
822 out of scope.
823
824 :param model_name str: The name of the model to get
825 :return The juju.model.Model object if found, or None.
826 """
827 if not self.authenticated:
828 await self.login(cluster_uuid)
829
830 model = None
831 models = await self.controller.list_models()
832 self.log.debug(models)
833 if model_name in models:
834 self.log.debug("Found model: {}".format(model_name))
835 model = await self.controller.get_model(
836 model_name
837 )
838 return model
839
840 def get_namespace(
841 self,
842 cluster_uuid: str,
843 ) -> str:
844 """Get the namespace UUID
845 Gets the namespace's unique name
846
847 :param cluster_uuid str: The UUID of the cluster
848 :returns: The namespace UUID, or raises an exception
849 """
850 config = self.get_config(cluster_uuid)
851
852 # Make sure the name is in the config
853 if 'namespace' not in config:
854 raise Exception("Namespace not found.")
855
856 # TODO: We want to make sure this is unique to the cluster, in case
857 # the cluster is being reused.
858 # Consider pre/appending the cluster id to the namespace string
859 return config['namespace']
860
861 async def has_model(
862 self,
863 model_name: str
864 ) -> bool:
865 """Check if a model exists in the controller
866
867 Checks to see if a model exists in the connected Juju controller.
868
869 :param model_name str: The name of the model
870 :return: A boolean indicating if the model exists
871 """
872 models = await self.controller.list_models()
873
874 if model_name in models:
875 return True
876 return False
877
878 def is_local_k8s(
879 self,
880 credentials: str,
881 ) -> bool:
882 """Check if a cluster is local
883
884 Checks if a cluster is running in the local host
885
886 :param credentials dict: A dictionary containing the k8s credentials
887 :returns: A boolean if the cluster is running locally
888 """
889 creds = yaml.safe_load(credentials)
890 if os.getenv("OSMLCM_VCA_APIPROXY"):
891 host_ip = os.getenv("OSMLCM_VCA_APIPROXY")
892
893 if creds and host_ip:
894 for cluster in creds['clusters']:
895 if 'server' in cluster['cluster']:
896 if host_ip in cluster['cluster']['server']:
897 return True
898
899 return False
900
901 async def login(self, cluster_uuid):
902 """Login to the Juju controller."""
903
904 if self.authenticated:
905 return
906
907 self.connecting = True
908
909 # Test: Make sure we have the credentials loaded
910 config = self.get_config(cluster_uuid)
911
912 self.juju_endpoint = config['endpoint']
913 self.juju_user = config['username']
914 self.juju_secret = config['secret']
915 self.juju_ca_cert = config['cacert']
916 self.juju_public_key = None
917
918 self.controller = Controller()
919
920 if self.juju_secret:
921 self.log.debug(
922 "Connecting to controller... ws://{} as {}/{}".format(
923 self.juju_endpoint,
924 self.juju_user,
925 self.juju_secret,
926 )
927 )
928 try:
929 await self.controller.connect(
930 endpoint=self.juju_endpoint,
931 username=self.juju_user,
932 password=self.juju_secret,
933 cacert=self.juju_ca_cert,
934 )
935 self.authenticated = True
936 self.log.debug("JujuApi: Logged into controller")
937 except Exception as ex:
938 print(ex)
939 self.log.debug("Caught exception: {}".format(ex))
940 pass
941 else:
942 self.log.fatal("VCA credentials not configured.")
943 self.authenticated = False
944
945 async def logout(self):
946 """Logout of the Juju controller."""
947 print("[logout]")
948 if not self.authenticated:
949 return False
950
951 for model in self.models:
952 print("Logging out of model {}".format(model))
953 await self.models[model].disconnect()
954
955 if self.controller:
956 self.log.debug("Disconnecting controller {}".format(
957 self.controller
958 ))
959 await self.controller.disconnect()
960 self.controller = None
961
962 self.authenticated = False
963
964 async def remove_cloud(
965 self,
966 cloud_name: str,
967 ) -> bool:
968 """Remove a k8s cloud from Juju
969
970 Removes a Kubernetes cloud from Juju.
971
972 :param cloud_name str: The name of the cloud to add.
973
974 :returns: True if successful, otherwise raises an exception.
975 """
976
977 # Remove the bootstrapped controller
978 cmd = [self.juju_command, "remove-k8s", "--client", cloud_name]
979 process = await asyncio.create_subprocess_exec(
980 *cmd,
981 stdout=asyncio.subprocess.PIPE,
982 stderr=asyncio.subprocess.PIPE,
983 )
984
985 stdout, stderr = await process.communicate()
986
987 return_code = process.returncode
988
989 if return_code > 0:
990 raise Exception(stderr)
991
992 # Remove the cloud from the local config
993 cmd = [self.juju_command, "remove-cloud", "--client", cloud_name]
994 process = await asyncio.create_subprocess_exec(
995 *cmd,
996 stdout=asyncio.subprocess.PIPE,
997 stderr=asyncio.subprocess.PIPE,
998 )
999
1000 stdout, stderr = await process.communicate()
1001
1002 return_code = process.returncode
1003
1004 if return_code > 0:
1005 raise Exception(stderr)
1006
1007 return True
1008
1009 async def set_config(
1010 self,
1011 cluster_uuid: str,
1012 config: dict,
1013 ) -> bool:
1014 """Save the cluster configuration
1015
1016 Saves the cluster information to the file store
1017
1018 :param cluster_uuid str: The UUID of the cluster
1019 :param config dict: A dictionary containing the cluster configuration
1020 :returns: Boolean upon success or raises an exception.
1021 """
1022
1023 cluster_config = "{}/{}.yaml".format(self.fs.path, cluster_uuid)
1024 if not os.path.exists(cluster_config):
1025 print("Writing config to {}".format(cluster_config))
1026 with open(cluster_config, 'w') as f:
1027 f.write(yaml.dump(config, Dumper=yaml.Dumper))
1028
1029 return True