K8s action support
[osm/N2VC.git] / n2vc / k8s_juju_conn.py
1 # Copyright 2019 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import concurrent
17 from .exceptions import NotImplemented
18
19 import io
20 import juju
21 # from juju.bundle import BundleHandler
22 from juju.controller import Controller
23 from juju.model import Model
24 from juju.errors import JujuAPIError, JujuError
25 from n2vc.exceptions import K8sException
26
27 from n2vc.k8s_conn import K8sConnector
28
29 import os
30 # import re
31 # import ssl
32 # from .vnf import N2VC
33
34 import uuid
35 import yaml
36
37
38 class K8sJujuConnector(K8sConnector):
39
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = '/usr/bin/kubectl',
    juju_command: str = '/usr/bin/juju',
    log: object = None,
    on_update_db=None,
):
    """Create a Juju-backed K8s connector.

    :param fs: file system object; its ``path`` attribute is the directory
        in which the per-cluster configuration files are kept
    :param db: database object, handed to the parent connector
    :param kubectl_command: path to kubectl executable (accepted but not
        stored or used by this connector)
    :param juju_command: path to juju executable used for the CLI calls
    :param log: logger, handed to the parent connector
    :param on_update_db: callback, handed to the parent connector
    """

    # The parent constructor runs first: self.log is used immediately
    # afterwards (presumably set up by K8sConnector -- confirm there).
    K8sConnector.__init__(self, db, log=log, on_update_db=on_update_db)

    self.fs = fs
    self.log.debug('Initializing K8S Juju connector')

    # No controller session yet; login() flips this flag on success.
    self.authenticated = False
    # Models to disconnect on logout(), keyed by model name.
    self.models = {}

    self.juju_command = juju_command
    self.juju_secret = ""

    self.log.debug('K8S Juju connector initialized')
75
76 """Initialization"""
async def init_env(
    self,
    k8s_creds: str,
    namespace: str = 'kube-system',
    reuse_cluster_uuid: str = None,
) -> (str, bool):
    """Prepare a K8s cluster environment to run Juju bundles.

    Without ``reuse_cluster_uuid`` a new cluster UUID is generated, the
    cluster is added to Juju as cloud ``k8s-<uuid>``, a controller named
    after the UUID is bootstrapped, and the controller endpoint, account
    and CA certificate are read back from the Juju client files and saved
    through ``set_config``. With ``reuse_cluster_uuid`` the saved
    configuration of that cluster is loaded instead. In both cases the
    connector then logs in to the controller unless already authenticated.

    :param k8s_creds: credentials to access a given K8s cluster, i.e. a
        valid '.kube/config'
    :param namespace: optional namespace to be used for juju. By default,
        'kube-system' will be used
    :param reuse_cluster_uuid: existing cluster uuid for reuse
    :return: uuid of the K8s cluster and True (the second element is
        always True); on error, an exception will be raised
    """

    # TODO: Pull info from db based on the namespace
    # TODO: Make it idempotent, calling add-k8s and bootstrap whenever
    # reuse_cluster_uuid is passed as parameter. `init_env` is called to
    # initialize the K8s cluster for juju. If this initialization fails,
    # it can be called again by LCM with the param reuse_cluster_uuid,
    # e.g. to try to fix it.

    if not reuse_cluster_uuid:
        # This is a new cluster, so bootstrap it. Bootstrapping cannot be
        # done, by design, through the API; the CLI tools are used.
        cluster_uuid = str(uuid.uuid4())

        # If the k8s is external, the juju controller needs a loadbalancer
        loadbalancer = not self.is_local_k8s(k8s_creds)

        # Name the new k8s cloud
        k8s_cloud = "k8s-{}".format(cluster_uuid)

        self.log.debug("Adding k8s cloud {}".format(k8s_cloud))
        await self.add_k8s(k8s_cloud, k8s_creds)

        # Bootstrap Juju controller
        self.log.debug("Bootstrapping...")
        await self.bootstrap(k8s_cloud, cluster_uuid, loadbalancer)
        self.log.debug("Bootstrap done.")

        # Controller information lives in the Juju client files. They are
        # plain YAML, so safe_load is sufficient (and matches get_config).
        self.log.debug("Getting controller endpoints")
        with open(os.path.expanduser(
            "~/.local/share/juju/controllers.yaml"
        )) as f:
            controllers = yaml.safe_load(f)
        controller = controllers['controllers'][cluster_uuid]
        self.juju_endpoint = controller['api-endpoints'][0]
        self.juju_ca_cert = controller['ca-cert']

        self.log.debug("Getting accounts")
        with open(os.path.expanduser(
            "~/.local/share/juju/accounts.yaml"
        )) as f:
            accounts = yaml.safe_load(f)
        account = accounts['controllers'][cluster_uuid]
        self.juju_user = account['user']
        self.juju_secret = account['password']

        self.juju_public_key = None

        config = {
            'endpoint': self.juju_endpoint,
            'username': self.juju_user,
            'secret': self.juju_secret,
            'cacert': self.juju_ca_cert,
            'namespace': namespace,
            'loadbalancer': loadbalancer,
        }

        # Store the cluster configuration so it
        # can be used for subsequent calls
        self.log.debug("Setting config")
        await self.set_config(cluster_uuid, config)

    else:
        # This is an existing cluster, so get its config
        cluster_uuid = reuse_cluster_uuid

        config = self.get_config(cluster_uuid)

        self.juju_endpoint = config['endpoint']
        self.juju_user = config['username']
        self.juju_secret = config['secret']
        self.juju_ca_cert = config['cacert']
        self.juju_public_key = None

    # Login to the k8s cluster
    if not self.authenticated:
        await self.login(cluster_uuid)

    return cluster_uuid, True
225
226 """Repo Management"""
async def repo_add(
    self,
    name: str,
    url: str,
    type: str = "charm",
):
    """Add a repository (not supported by this connector).

    :param name str: name of the repository
    :param url str: location of the repository
    :param type str: kind of repository, "charm" by default
    :raises: the package's ``NotImplemented`` exception, unconditionally
    """
    raise NotImplemented()
234
async def repo_list(self):
    """List repositories (not supported by this connector).

    :raises: the package's ``NotImplemented`` exception, unconditionally
    """
    raise NotImplemented()
237
async def repo_remove(
    self,
    name: str,
):
    """Remove a repository (not supported by this connector).

    :param name str: name of the repository
    :raises: the package's ``NotImplemented`` exception, unconditionally
    """
    raise NotImplemented()
243
async def synchronize_repos(
    self,
    cluster_uuid: str,
    name: str
):
    """Synchronize repositories; a no-op for this connector.

    Repositories cannot be added through this connector, so there is
    nothing to synchronize.

    :param cluster_uuid str: The UUID of the cluster (unused)
    :param name str: name of the repository (unused)
    :return: always None
    """
    return None
253
254 """Reset"""
async def reset(
    self,
    cluster_uuid: str,
    force: bool = False,
    uninstall_sw: bool = False
) -> bool:
    """Reset a cluster

    Logs in if needed and, when the controller is connected, destroys the
    cluster's namespace model (if it exists), logs out, destroys the
    controller through the CLI and removes the ``k8s-<uuid>`` cloud.

    This is best effort: any exception raised along the way is logged at
    debug level and swallowed.

    :param cluster_uuid str: The UUID of the cluster to reset
    :param force bool: unused
    :param uninstall_sw bool: unused
    :return: Always True.
    """

    try:
        if not self.authenticated:
            await self.login(cluster_uuid)

        if self.controller.is_connected():
            # The model to remove is the one named after the namespace
            model_name = self.get_namespace(cluster_uuid)
            model_exists = await self.has_model(model_name)
            if model_exists:
                self.log.debug("[reset] Destroying model")
                await self.controller.destroy_model(
                    model_name,
                    destroy_storage=True
                )

            # The API session must be closed before the CLI teardown
            self.log.debug("[reset] Disconnecting controller")
            await self.logout()

            # Controller teardown goes through the juju CLI
            self.log.debug("[reset] Destroying controller")
            await self.destroy_controller(cluster_uuid)

            self.log.debug("[reset] Removing k8s cloud")
            await self.remove_cloud("k8s-{}".format(cluster_uuid))

    except Exception as err:
        self.log.debug("Caught exception during reset: {}".format(err))

    return True
299
300 """Deployment"""
301
async def install(
    self,
    cluster_uuid: str,
    kdu_model: str,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    kdu_name: str = None,
    namespace: str = None
) -> bool:
    """Install a bundle

    A new model is created for the KDU instance, named
    ``<kdu_name>-<db_dict["filter"]["_id"]>`` (or just the id when no
    ``kdu_name`` is given), and the bundle is deployed into it.

    ``kdu_model`` is the Juju bundle that models the KDU. Values starting
    with "cs:" or "http" are handed to the deploy call unchanged; anything
    else is treated as a local path: the process working directory is
    temporarily switched to the bundle's directory and the bundle is
    deployed as ``local:<kdu_model>``. The working directory is always
    restored before returning.

    :param cluster_uuid str: The UUID of the cluster to install to
    :param kdu_model str: The name or path of a bundle to install
    :param atomic bool: If set, waits until every application of the model
        has settled and resets the cluster on timeout.
    :param timeout int: The time, in seconds, to wait for each application
        to settle
    :param params dict: Key-value pairs of instantiation parameters
        (currently unused)
    :param db_dict dict: Must provide ``db_dict["filter"]["_id"]``
    :param kdu_name: Name of the KDU instance to be installed
    :param namespace: K8s namespace to use for the KDU instance (currently
        unused)

    :return: The KDU instance name on success, or False when ``atomic`` is
        set and the timeout expired. Raises an exception if no bundle is
        given or the model cannot be created.
    """
    if not kdu_model:
        raise Exception("No bundle (kdu_model) was specified")

    if not self.authenticated:
        self.log.debug("[install] Logging in to the controller")
        await self.login(cluster_uuid)

    # Get or create the model, based on the NS uuid.
    if kdu_name:
        kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
    else:
        kdu_instance = db_dict["filter"]["_id"]

    self.log.debug("Checking for model named {}".format(kdu_instance))

    # Create the new model
    self.log.debug("Adding model: {}".format(kdu_instance))
    model = await self.add_model(kdu_instance, cluster_uuid=cluster_uuid)

    if not model:
        raise Exception("Unable to install")

    # TODO: Instantiation parameters

    previous_workdir = os.getcwd()
    try:
        if kdu_model.startswith("cs:") or kdu_model.startswith("http"):
            # TODO: bundles given as URL are not downloaded yet
            bundle = kdu_model
        else:
            # dirname() instead of str.strip(): strip() removes a *set of
            # characters* from both ends and mangles relative paths. A bare
            # file name has no directory part, hence the "." fallback.
            new_workdir = os.path.dirname(kdu_model) or "."
            os.chdir(new_workdir)
            bundle = "local:{}".format(kdu_model)

        self.log.debug("[install] deploying {}".format(bundle))
        await model.deploy(bundle)

        if atomic:
            self.log.debug("[install] Applications: {}".format(model.applications))
            for name in model.applications:
                self.log.debug("[install] Waiting for {} to settle".format(name))
                application = model.applications[name]
                try:
                    # It's not enough to wait for all units to be active;
                    # the application status needs to be active as well.
                    self.log.debug("Waiting for all units to be active...")
                    await model.block_until(
                        lambda: all(
                            unit.agent_status == 'idle'
                            and application.status in ['active', 'unknown']
                            and unit.workload_status in ['active', 'unknown']
                            for unit in application.units
                        ),
                        timeout=timeout
                    )
                    self.log.debug("All units active.")

                # asyncio.TimeoutError and concurrent.futures.TimeoutError
                # are distinct classes on Python 3.8-3.10; catch both.
                except (asyncio.TimeoutError, concurrent.futures.TimeoutError):
                    os.chdir(previous_workdir)
                    self.log.debug("[install] Timeout exceeded; resetting cluster")
                    await self.reset(cluster_uuid)
                    return False

        if model.is_connected():
            self.log.debug("[install] Disconnecting model")
            await model.disconnect()
    finally:
        # Never leave the process in the bundle directory, even when the
        # deployment raises.
        os.chdir(previous_workdir)

    return kdu_instance
417
async def instances_list(
    self,
    cluster_uuid: str
) -> list:
    """List the deployed releases of a cluster.

    Not implemented for this connector yet: the result is always empty.

    :param cluster_uuid: the cluster (unused)
    :return: an empty list
    """
    return []
429
async def upgrade(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    kdu_model: str = None,
    params: dict = None,
) -> str:
    """Upgrade a model

    The API doesn't have a concept of bundle upgrades, because there are
    many possible changes: charm revision, disk, number of units, etc.

    As such, only a limited subset of upgrades is supported: the charm of
    every application listed in the bundle is switched to the charm the
    bundle names, while storage and scale are left untouched. Scale
    changes should happen through OSM constructs, and changes to storage
    would require a redeployment of the service, at least in this initial
    release.

    The bundle is a YAML file whose ``applications`` mapping provides a
    ``charm`` entry per application, e.g.
    ``{'applications': {'mariadb-k8s': {'charm': 'cs:~charmed-osm/mariadb-k8s-20'}}}``.

    NOTE(review): the model upgraded is the one named after the cluster's
    namespace; ``kdu_instance`` is not used to select it -- confirm this
    is intended.

    :param cluster_uuid str: The UUID of the cluster to upgrade
    :param kdu_instance str: The unique name of the KDU instance (unused)
    :param kdu_model str: The path of the bundle to upgrade to
    :param params dict: Key-value pairs of instantiation parameters
        (unused)

    :return: True if successful.
    :raises K8sException: if no bundle is given or the model is not found
    :raises JujuError: if a charm upgrade fails for any reason other than
        the application already running the requested charm
    """
    if not kdu_model:
        raise K8sException("kdu_model is required to upgrade a KDU instance")

    # Read the bundle before connecting, so that an unreadable bundle
    # cannot leave a model connection open.
    with open(kdu_model, 'r') as f:
        bundle = yaml.safe_load(f)

    namespace = self.get_namespace(cluster_uuid)
    model = await self.get_model(namespace, cluster_uuid=cluster_uuid)
    if model is None:
        raise K8sException("Model {} not found".format(namespace))

    # TODO: This should be returned in an agreed-upon format
    try:
        for name in bundle['applications']:
            self.log.debug(model.applications)
            application = model.applications[name]
            self.log.debug(application)

            path = bundle['applications'][name]['charm']

            try:
                await application.upgrade_charm(switch=path)
            except JujuError as ex:
                # Already running this version is fine; anything else is
                # a real failure and must not be hidden.
                if 'already running charm' not in str(ex):
                    raise
    finally:
        await model.disconnect()

    return True
504
505 """Rollback"""
async def rollback(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    revision: int = 0,
) -> str:
    """Rollback a model (not supported by this connector).

    :param cluster_uuid str: The UUID of the cluster to rollback
    :param kdu_instance str: The unique name of the KDU instance
    :param revision int: The revision to revert to

    :raises: the package's ``NotImplemented`` exception, unconditionally
    """
    raise NotImplemented()
523
524 """Deletion"""
async def uninstall(
    self,
    cluster_uuid: str,
    kdu_instance: str
) -> bool:
    """Uninstall a KDU instance

    Destroys the model named after the KDU instance and then logs out of
    the controller.

    :param cluster_uuid str: The UUID of the cluster
    :param kdu_instance str: The unique name of the KDU instance

    :return: Returns True if successful, or raises an exception
    """
    needs_login = not self.authenticated
    if needs_login:
        self.log.debug("[uninstall] Connecting to controller")
        await self.login(cluster_uuid)

    self.log.debug("[uninstall] Destroying model")

    # The model carries the same name as the KDU instance
    controller = self.controller
    await controller.destroy_models(kdu_instance)

    self.log.debug("[uninstall] Model destroyed and disconnecting")
    await self.logout()

    return True
549
async def exec_primitive(
    self,
    cluster_uuid: str = None,
    kdu_instance: str = None,
    primitive_name: str = None,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
) -> str:
    """Exec primitive (Juju action)

    Runs the action ``primitive_name`` on the leader unit of the
    application named by ``params["application-name"]``, inside the model
    named after the KDU instance. The model connection is closed again
    before returning, whether the action succeeded or not.

    :param cluster_uuid str: The UUID of the cluster
    :param kdu_instance str: The unique name of the KDU instance
    :param primitive_name: Name of action that will be executed
    :param timeout: Timeout for action execution (currently unused)
    :param params: Dictionary of all the parameters needed for the action;
        must contain "application-name". The whole dictionary is passed
        to the action as keyword arguments.
    :param db_dict: Dictionary for any additional data (currently unused)

    :return: Returns the output of the action
    :raises K8sException: if "application-name" is missing, or wrapping
        any error raised while locating or running the action (including
        an action whose final status is not "completed")
    """
    if not self.authenticated:
        self.log.debug("[exec_primitive] Connecting to controller")
        await self.login(cluster_uuid)

    if not params or "application-name" not in params:
        # Adjacent literals instead of a backslash continuation, which
        # embedded the source indentation in the message.
        raise K8sException(
            "Missing application-name argument, "
            "argument needed for K8s actions"
        )

    model = None
    try:
        self.log.debug("[exec_primitive] Getting model "
                       "kdu_instance: {}".format(kdu_instance))

        model = await self.get_model(kdu_instance, cluster_uuid)
        if model is None:
            raise K8sException("Model {} not found".format(kdu_instance))

        application_name = params["application-name"]
        application = model.applications[application_name]

        actions = await application.get_actions()
        if primitive_name not in actions:
            raise K8sException("Primitive {} not found".format(primitive_name))

        # Actions are run on the leader unit only
        unit = None
        for u in application.units:
            if await u.is_leader_from_status():
                unit = u
                break

        if unit is None:
            raise K8sException("No leader unit found to execute action")

        self.log.debug("[exec_primitive] Running action: {}".format(primitive_name))
        action = await unit.run_action(primitive_name, **params)

        output = await model.get_action_output(action_uuid=action.entity_id)
        status = await model.get_action_status(uuid_or_prefix=action.entity_id)

        # An action missing from the status report counts as failed
        status = (
            status[action.entity_id] if action.entity_id in status else "failed"
        )

        if status != "completed":
            raise K8sException("status is not completed: {} output: {}".format(status, output))

        return output

    except Exception as e:
        error_msg = "Error executing primitive {}: {}".format(primitive_name, e)
        self.log.error(error_msg)
        raise K8sException(message=error_msg) from e

    finally:
        # Do not leak the model connection
        if model is not None and model.is_connected():
            await model.disconnect()
618
619 """Introspection"""
async def inspect_kdu(
    self,
    kdu_model: str,
) -> dict:
    """Inspect a KDU

    Parses the bundle file as YAML and returns its ``applications``
    mapping, e.g.
    ``{'mariadb-k8s': {'charm': 'cs:~charmed-osm/mariadb-k8s-20',
    'scale': 1, 'options': {...}, 'series': 'kubernetes'}}``.

    :param kdu_model str: The path of the bundle to inspect.

    :return: If successful, returns the ``applications`` section of the
        bundle.
    """
    with open(kdu_model, 'r') as bundle_file:
        parsed_bundle = yaml.safe_load(bundle_file)

    # TODO: This should be returned in an agreed-upon format
    return parsed_bundle['applications']
661
async def help_kdu(
    self,
    kdu_model: str,
) -> str:
    """View the README

    Looks in the directory that contains ``kdu_model`` for a file named
    README, README.txt or README.md (checked in that order) and returns
    the contents of the first one found.

    :param kdu_model str: The name or path of a bundle

    :return: If found, returns the contents of the README, otherwise None.
    """
    candidates = ['README', 'README.txt', 'README.md']

    # A bare file name has no directory part: look in the current directory
    bundle_dir = os.path.dirname(kdu_model) or "."
    present = set(os.listdir(bundle_dir))

    for candidate in candidates:
        if candidate in present:
            # Open relative to the bundle directory, not the process cwd
            with open(os.path.join(bundle_dir, candidate), 'r') as f:
                return f.read()

    return None
685
async def status_kdu(
    self,
    cluster_uuid: str,
    kdu_instance: str,
) -> dict:
    """Get the status of the KDU

    Reads the status of the model named after the cluster's namespace and
    reduces it to one entry per application. The model connection is
    closed before returning, also when reading the status fails.

    NOTE(review): ``kdu_instance`` is not used to select the model --
    confirm this is intended.

    :param cluster_uuid str: The UUID of the cluster
    :param kdu_instance str: The unique id of the KDU instance (unused)

    :return: Returns a dictionary mapping each application name to
        ``{'status': <application status>}``; empty if the model is not
        found.
    """
    status = {}

    model = await self.get_model(self.get_namespace(cluster_uuid), cluster_uuid=cluster_uuid)

    if model:
        try:
            model_status = await model.get_status()
            applications = model_status.applications
            # Build a fresh mapping rather than overwriting the status
            # object that is being iterated.
            status = {
                name: {'status': applications[name]['status']['status']}
                for name in applications
            }
        finally:
            if model.is_connected():
                await model.disconnect()

    return status
720
721 # Private methods
async def add_k8s(
    self,
    cloud_name: str,
    credentials: str,
) -> bool:
    """Add a k8s cloud to Juju

    Runs ``juju add-k8s --local <cloud_name>`` and feeds the credentials
    to it on standard input, so the cloud can later be bootstrapped with a
    Juju Controller.

    :param cloud_name str: The name of the cloud to add.
    :param credentials str: The kubeconfig text, i.e. the output of
        `kubectl config view --raw`.

    :returns: True if successful, otherwise raises an exception carrying
        the command's stderr.
    """

    cmd = [self.juju_command, "add-k8s", "--local", cloud_name]
    self.log.debug(cmd)

    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        stdin=asyncio.subprocess.PIPE,
    )

    # communicate() writes the input, closes stdin and tolerates a child
    # that exits before reading all of it.
    stdout, stderr = await process.communicate(
        input=credentials.encode("utf-8")
    )

    return_code = process.returncode

    self.log.debug("add-k8s return code: {}".format(return_code))

    # Non-zero covers regular failures and (negative) death by signal
    if return_code != 0:
        raise Exception(stderr)

    return True
764
async def add_model(
    self,
    model_name: str,
    cluster_uuid: str,
) -> juju.model.Model:
    """Adds a model to the controller

    Adds a new model to the Juju controller, logging in first if needed.

    :param model_name str: The name of the model to add.
    :param cluster_uuid str: The UUID of the cluster, used for the login.
    :returns: The juju.model.Model object of the new model upon success,
        or None if the controller refused to create it (the error is
        logged).
    """
    if not self.authenticated:
        await self.login(cluster_uuid)

    self.log.debug("Adding model '{}' to cluster_uuid '{}'".format(model_name, cluster_uuid))

    # Pre-set so that a failed creation yields None rather than an
    # UnboundLocalError on return; callers test the result for truthiness.
    model = None
    try:
        model = await self.controller.add_model(
            model_name,
            config={'authorized-keys': self.juju_public_key}
        )
    except Exception as ex:
        self.log.debug(ex)
        self.log.debug("Caught exception: {}".format(ex))

    return model
793
async def bootstrap(
    self,
    cloud_name: str,
    cluster_uuid: str,
    loadbalancer: bool
) -> bool:
    """Bootstrap a Kubernetes controller

    Bootstrap a Juju controller inside the Kubernetes cluster by running
    ``juju bootstrap <cloud_name> <cluster_uuid>``.

    :param cloud_name str: The name of the cloud.
    :param cluster_uuid str: The UUID of the cluster to bootstrap; it is
        also used as the controller name.
    :param loadbalancer bool: If the controller should use loadbalancer or not.
    :returns: True upon success, or when the command fails with an
        "already exists" error; otherwise raises an exception carrying the
        command's stderr.
    """

    cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid]
    if loadbalancer:
        # For public clusters, specify that the controller service is
        # using a LoadBalancer.
        cmd += ["--config", "controller-service-type=loadbalancer"]

    self.log.debug("Bootstrapping controller {} in cloud {}".format(
        cluster_uuid, cloud_name
    ))

    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    stdout, stderr = await process.communicate()

    # Non-zero covers regular failures and (negative) death by signal.
    # A controller that already exists is not treated as an error.
    if process.returncode != 0 and b'already exists' not in stderr:
        raise Exception(stderr)

    return True
838
async def destroy_controller(
    self,
    cluster_uuid: str
) -> bool:
    """Destroy a Kubernetes controller

    Destroy an existing Kubernetes controller, together with all of its
    models and storage, by running ``juju destroy-controller``.

    :param cluster_uuid str: The UUID of the cluster; it is also the name
        of the controller to destroy.
    :returns: True upon success or raises an exception carrying the
        command's stderr.
    """
    cmd = [
        self.juju_command,
        "destroy-controller",
        "--destroy-all-models",
        "--destroy-storage",
        "-y",
        cluster_uuid
    ]

    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    stdout, stderr = await process.communicate()

    # stderr is bytes, so it must be searched with a bytes pattern.
    # NOTE(review): tolerating "already exists" mirrors bootstrap();
    # confirm which error, if any, should be tolerated when destroying.
    if process.returncode != 0 and b'already exists' not in stderr:
        raise Exception(stderr)

    return True
873
def get_config(
    self,
    cluster_uuid: str,
) -> dict:
    """Get the cluster configuration

    Loads ``<fs.path>/<cluster_uuid>.yaml``, the file in which the
    configuration of the cluster is stored.

    :param cluster_uuid str: The UUID of the cluster.
    :return: The parsed configuration upon success, or raises an exception
        if the file does not exist.
    """
    config_path = "{}/{}.yaml".format(self.fs.path, cluster_uuid)

    if not os.path.exists(config_path):
        raise Exception(
            "Unable to locate configuration for cluster {}".format(
                cluster_uuid
            )
        )

    with open(config_path, 'r') as config_file:
        raw_config = config_file.read()
    return yaml.safe_load(raw_config)
896
async def get_model(
    self,
    model_name: str,
    cluster_uuid: str,
) -> juju.model.Model:
    """Get a model from the Juju Controller.

    Logs in to the controller first if needed.

    Note: the caller must disconnect the returned model before it goes
    out of scope.

    :param model_name str: The name of the model to get
    :param cluster_uuid str: The UUID of the cluster, used for the login
    :return The juju.model.Model object if found, or None.
    """
    if not self.authenticated:
        await self.login(cluster_uuid)

    known_models = await self.controller.list_models()
    if model_name not in known_models:
        return None

    self.log.debug("Found model: {}".format(model_name))
    return await self.controller.get_model(model_name)
921
def get_namespace(
    self,
    cluster_uuid: str,
) -> str:
    """Get the namespace of a cluster

    Reads the ``namespace`` entry of the cluster's stored configuration.

    :param cluster_uuid str: The UUID of the cluster
    :returns: The namespace, or raises an exception if the configuration
        has none
    """
    cluster_config = self.get_config(cluster_uuid)

    # TODO: We want to make sure this is unique to the cluster, in case
    # the cluster is being reused.
    # Consider pre/appending the cluster id to the namespace string
    if 'namespace' in cluster_config:
        return cluster_config['namespace']

    raise Exception("Namespace not found.")
942
async def has_model(
    self,
    model_name: str
) -> bool:
    """Check if a model exists in the controller

    Asks the connected Juju controller for its models and looks for the
    given name among them.

    :param model_name str: The name of the model
    :return: A boolean indicating if the model exists
    """
    known_models = await self.controller.list_models()
    return model_name in known_models
959
def is_local_k8s(
    self,
    credentials: str,
) -> bool:
    """Check if a cluster is local

    A cluster is considered local when the value of the
    ``OSMLCM_VCA_APIPROXY`` environment variable appears in the ``server``
    entry of at least one cluster of the given credentials.

    :param credentials str: The k8s credentials as YAML text (kubeconfig)
    :returns: True if the cluster is running locally; False otherwise,
        including when the environment variable is not set
    """
    creds = yaml.safe_load(credentials)

    # getenv() yields None when the variable is unset, so host_ip is
    # always bound.
    host_ip = os.getenv("OSMLCM_VCA_APIPROXY")

    if not creds or not host_ip:
        return False

    return any(
        'server' in cluster['cluster']
        and host_ip in cluster['cluster']['server']
        for cluster in creds.get('clusters') or []
    )
982
async def login(self, cluster_uuid):
    """Login to the Juju controller.

    Does nothing when already authenticated. Otherwise the stored
    configuration of the cluster is loaded and a connection to its
    controller is attempted. Connection errors are logged, not raised:
    callers must check ``self.authenticated`` to learn the outcome.

    :param cluster_uuid str: The UUID of the cluster whose controller to
        log in to.
    """

    if self.authenticated:
        return

    self.connecting = True

    # Make sure we have the credentials loaded
    config = self.get_config(cluster_uuid)

    self.juju_endpoint = config['endpoint']
    self.juju_user = config['username']
    self.juju_secret = config['secret']
    self.juju_ca_cert = config['cacert']
    self.juju_public_key = None

    self.controller = Controller()

    if self.juju_secret:
        # The secret is deliberately kept out of the log
        self.log.debug(
            "Connecting to controller... ws://{} as {}".format(
                self.juju_endpoint,
                self.juju_user,
            )
        )
        try:
            await self.controller.connect(
                endpoint=self.juju_endpoint,
                username=self.juju_user,
                password=self.juju_secret,
                cacert=self.juju_ca_cert,
            )
            self.authenticated = True
            self.log.debug("JujuApi: Logged into controller")
        except Exception as ex:
            # Best effort: the failure is reported through the log and
            # through self.authenticated staying False.
            self.log.error("Caught exception: {}".format(ex))
    else:
        self.log.fatal("VCA credentials not configured.")
        self.authenticated = False
1026
async def logout(self):
    """Logout of the Juju controller.

    Disconnects every model held in ``self.models``, then the controller
    itself, and marks the connector as not authenticated.

    :return: False if the connector was not authenticated, else None.
    """
    self.log.debug("[logout]")
    if not self.authenticated:
        return False

    for model_name, model in self.models.items():
        self.log.debug("Logging out of model {}".format(model_name))
        await model.disconnect()

    controller = self.controller
    if controller:
        self.log.debug("Disconnecting controller {}".format(
            controller
        ))
        await controller.disconnect()
        self.controller = None

    self.authenticated = False
1045
async def remove_cloud(
    self,
    cloud_name: str,
) -> bool:
    """Remove a k8s cloud from Juju

    Runs ``juju remove-k8s --client <cloud_name>`` followed by
    ``juju remove-cloud --client <cloud_name>``, stopping at the first
    command that fails.

    :param cloud_name str: The name of the cloud to remove.

    :returns: True if successful, otherwise raises an exception carrying
        the failing command's stderr.
    """

    # First the k8s cloud definition, then the cloud entry in the local
    # client configuration.
    for subcommand in ("remove-k8s", "remove-cloud"):
        cmd = [self.juju_command, subcommand, "--client", cloud_name]
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )

        stdout, stderr = await process.communicate()

        # Non-zero covers regular failures and (negative) death by signal
        if process.returncode != 0:
            raise Exception(stderr)

    return True
1090
async def set_config(
    self,
    cluster_uuid: str,
    config: dict,
) -> bool:
    """Save the cluster configuration

    Dumps the configuration as YAML into ``<fs.path>/<cluster_uuid>.yaml``.
    An existing file is left untouched.

    :param cluster_uuid str: The UUID of the cluster
    :param config dict: A dictionary containing the cluster configuration
    :returns: True, whether or not the file was written; raises an
        exception if writing fails.
    """

    config_path = "{}/{}.yaml".format(self.fs.path, cluster_uuid)

    # Never overwrite a configuration that is already stored
    if os.path.exists(config_path):
        return True

    self.log.debug("Writing config to {}".format(config_path))
    with open(config_path, 'w') as config_file:
        serialized = yaml.dump(config, Dumper=yaml.Dumper)
        config_file.write(serialized)

    return True
1110
1111 return True