1 # Copyright 2019 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
21 from juju
.controller
import Controller
22 from juju
.model
import Model
23 from n2vc
.exceptions
import K8sException
24 from n2vc
.k8s_conn
import K8sConnector
25 from n2vc
.kubectl
import Kubectl
26 from .exceptions
import MethodNotImplemented
, N2VCNotFound
29 # from juju.bundle import BundleHandler
32 # from .vnf import N2VC
33 class K8sJujuConnector(K8sConnector
):
38 kubectl_command
: str = "/usr/bin/kubectl",
39 juju_command
: str = "/usr/bin/juju",
43 vca_config
: dict = None,
47 :param kubectl_command: path to kubectl executable
48 :param helm_command: path to helm executable
49 :param fs: file system for kubernetes and helm configuration
54 K8sConnector
.__init
__(
55 self
, db
, log
=log
, on_update_db
=on_update_db
,
59 self
.log
.debug("Initializing K8S Juju connector")
61 self
.juju_command
= juju_command
62 self
.juju_public_key
= None
64 self
.log
.debug("K8S Juju connector initialized")
65 # TODO: Remove these commented lines:
66 # self.authenticated = False
68 # self.juju_secret = ""
75 namespace
: str = "kube-system",
76 reuse_cluster_uuid
: str = None,
79 It prepares a given K8s cluster environment to run Juju bundles.
81 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
83 :param namespace: optional namespace to be used for juju. By default,
84 'kube-system' will be used
85 :param reuse_cluster_uuid: existing cluster uuid for reuse
86 :return: uuid of the K8s cluster and True if connector has installed some
87 software in the cluster
88 (on error, an exception will be raised)
93 Bootstrapping cannot be done, by design, through the API. We need to
100 1. Has the environment already been bootstrapped?
101 - Check the database to see if we have a record for this env
103 2. If this is a new env, create it
104 - Add the k8s cloud to Juju
106 - Record it in the database
108 3. Connect to the Juju controller for this cloud
111 # cluster_uuid = reuse_cluster_uuid
112 # if not cluster_uuid:
113 # cluster_uuid = str(uuid4())
115 ##################################################
116 # TODO: Pull info from db based on the namespace #
117 ##################################################
119 ###################################################
120 # TODO: Make it idempotent, calling add-k8s and #
121 # bootstrap whenever reuse_cluster_uuid is passed #
123 # `init_env` is called to initialize the K8s #
124 # cluster for juju. If this initialization fails, #
125 # it can be called again by LCM with the param #
126 # reuse_cluster_uuid, e.g. to try to fix it. #
127 ###################################################
129 # This is a new cluster, so bootstrap it
131 cluster_uuid
= reuse_cluster_uuid
or str(uuid
.uuid4())
133 # Is a local k8s cluster?
134 localk8s
= self
.is_local_k8s(k8s_creds
)
136 # If the k8s is external, the juju controller needs a loadbalancer
137 loadbalancer
= False if localk8s
else True
139 # Name the new k8s cloud
140 k8s_cloud
= "k8s-{}".format(cluster_uuid
)
142 self
.log
.debug("Adding k8s cloud {}".format(k8s_cloud
))
143 await self
.add_k8s(k8s_cloud
, k8s_creds
)
145 # Bootstrap Juju controller
146 self
.log
.debug("Bootstrapping...")
147 await self
.bootstrap(k8s_cloud
, cluster_uuid
, loadbalancer
)
148 self
.log
.debug("Bootstrap done.")
150 # Get the controller information
152 # Parse ~/.local/share/juju/controllers.yaml
153 # controllers.testing.api-endpoints|ca-cert|uuid
154 self
.log
.debug("Getting controller endpoints")
155 with
open(os
.path
.expanduser("~/.local/share/juju/controllers.yaml")) as f
:
156 controllers
= yaml
.load(f
, Loader
=yaml
.Loader
)
157 controller
= controllers
["controllers"][cluster_uuid
]
158 endpoints
= controller
["api-endpoints"]
159 juju_endpoint
= endpoints
[0]
160 juju_ca_cert
= controller
["ca-cert"]
162 # Parse ~/.local/share/juju/accounts
163 # controllers.testing.user|password
164 self
.log
.debug("Getting accounts")
165 with
open(os
.path
.expanduser("~/.local/share/juju/accounts.yaml")) as f
:
166 controllers
= yaml
.load(f
, Loader
=yaml
.Loader
)
167 controller
= controllers
["controllers"][cluster_uuid
]
169 juju_user
= controller
["user"]
170 juju_secret
= controller
["password"]
173 "endpoint": juju_endpoint
,
174 "username": juju_user
,
175 "secret": juju_secret
,
176 "cacert": juju_ca_cert
,
177 "loadbalancer": loadbalancer
,
180 # Store the cluster configuration so it
181 # can be used for subsequent calls
182 self
.log
.debug("Setting config")
183 await self
.set_config(cluster_uuid
, config
)
186 controller
= await self
.get_controller(cluster_uuid
)
187 await controller
.disconnect()
189 # TODO: Remove these commented lines
190 # raise Exception("EOL")
191 # self.juju_public_key = None
192 # Login to the k8s cluster
193 # if not self.authenticated:
194 # await self.login(cluster_uuid)
196 # We're creating a new cluster
197 # print("Getting model {}".format(self.get_namespace(cluster_uuid),
198 # cluster_uuid=cluster_uuid))
199 # model = await self.get_model(
200 # self.get_namespace(cluster_uuid),
201 # cluster_uuid=cluster_uuid
204 # Disconnect from the model
205 # if model and model.is_connected():
206 # await model.disconnect()
208 return cluster_uuid
, True
210 """Repo Management"""
213 self
, name
: str, url
: str, _type
: str = "charm",
215 raise MethodNotImplemented()
217 async def repo_list(self
):
218 raise MethodNotImplemented()
220 async def repo_remove(
223 raise MethodNotImplemented()
225 async def synchronize_repos(self
, cluster_uuid
: str, name
: str):
227 Returns None as currently add_repo is not implemented
234 self
, cluster_uuid
: str, force
: bool = False, uninstall_sw
: bool = False
238 Resets the Kubernetes cluster by removing the model that represents it.
240 :param cluster_uuid str: The UUID of the cluster to reset
241 :return: Returns True if successful or raises an exception.
246 # Remove k8scluster from database
247 self
.log
.debug("[reset] Removing k8scluster from juju database")
248 juju_db
= self
.db
.get_one("admin", {"_id": "juju"})
250 for k
in juju_db
["k8sclusters"]:
251 if k
["_id"] == cluster_uuid
:
252 juju_db
["k8sclusters"].remove(k
)
255 q_filter
={"_id": "juju"},
256 update_dict
={"k8sclusters": juju_db
["k8sclusters"]},
260 # Destroy the controller (via CLI)
261 self
.log
.debug("[reset] Destroying controller")
262 await self
.destroy_controller(cluster_uuid
)
264 self
.log
.debug("[reset] Removing k8s cloud")
265 k8s_cloud
= "k8s-{}".format(cluster_uuid
)
266 await self
.remove_cloud(k8s_cloud
)
268 except Exception as ex
:
269 self
.log
.debug("Caught exception during reset: {}".format(ex
))
271 # TODO: Remove these commented lines
272 # if not self.authenticated:
273 # await self.login(cluster_uuid)
275 # if self.controller.is_connected():
276 # # Destroy the model
277 # namespace = self.get_namespace(cluster_uuid)
278 # if await self.has_model(namespace):
279 # self.log.debug("[reset] Destroying model")
280 # await self.controller.destroy_model(namespace, destroy_storage=True)
282 # # Disconnect from the controller
283 # self.log.debug("[reset] Disconnecting controller")
284 # await self.logout()
293 timeout
: float = 300,
295 db_dict
: dict = None,
296 kdu_name
: str = None,
297 namespace
: str = None,
301 :param cluster_uuid str: The UUID of the cluster to install to
302 :param kdu_model str: The name or path of a bundle to install
303 :param atomic bool: If set, waits until the model is active and resets
304 the cluster on failure.
305 :param timeout int: The time, in seconds, to wait for the install
307 :param params dict: Key-value pairs of instantiation parameters
308 :param kdu_name: Name of the KDU instance to be installed
309 :param namespace: K8s namespace to use for the KDU instance
311 :return: If successful, returns ?
314 controller
= await self
.get_controller(cluster_uuid
)
317 # Get or create the model, based on the NS
320 kdu_instance
= "{}-{}".format(kdu_name
, db_dict
["filter"]["_id"])
322 kdu_instance
= db_dict
["filter"]["_id"]
324 self
.log
.debug("Checking for model named {}".format(kdu_instance
))
326 # Create the new model
327 self
.log
.debug("Adding model: {}".format(kdu_instance
))
328 model
= await self
.add_model(
329 kdu_instance
, cluster_uuid
=cluster_uuid
, controller
=controller
333 # TODO: Instantiation parameters
336 "Juju bundle that models the KDU, in any of the following ways:
337 - <juju-repo>/<juju-bundle>
338 - <juju-bundle folder under k8s_models folder in the package>
339 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
341 - <URL_where_to_fetch_juju_bundle>
344 previous_workdir
= os
.getcwd()
345 except FileNotFoundError
:
346 previous_workdir
= "/app/storage"
349 if kdu_model
.startswith("cs:"):
351 elif kdu_model
.startswith("http"):
355 new_workdir
= kdu_model
.strip(kdu_model
.split("/")[-1])
357 os
.chdir(new_workdir
)
359 bundle
= "local:{}".format(kdu_model
)
362 # Raise named exception that the bundle could not be found
365 self
.log
.debug("[install] deploying {}".format(bundle
))
366 await model
.deploy(bundle
)
368 # Get the application
370 # applications = model.applications
371 self
.log
.debug("[install] Applications: {}".format(model
.applications
))
372 for name
in model
.applications
:
373 self
.log
.debug("[install] Waiting for {} to settle".format(name
))
374 application
= model
.applications
[name
]
376 # It's not enough to wait for all units to be active;
377 # the application status needs to be active as well.
378 self
.log
.debug("Waiting for all units to be active...")
379 await model
.block_until(
381 unit
.agent_status
== "idle"
382 and application
.status
in ["active", "unknown"]
383 and unit
.workload_status
in ["active", "unknown"]
384 for unit
in application
.units
388 self
.log
.debug("All units active.")
390 # TODO use asyncio.TimeoutError
391 except concurrent
.futures
._base
.TimeoutError
:
392 os
.chdir(previous_workdir
)
393 self
.log
.debug("[install] Timeout exceeded; resetting cluster")
394 await self
.reset(cluster_uuid
)
397 # Wait for the application to be active
398 if model
.is_connected():
399 self
.log
.debug("[install] Disconnecting model")
400 await model
.disconnect()
401 await controller
.disconnect()
402 os
.chdir(previous_workdir
)
405 raise Exception("Unable to install")
407 async def instances_list(self
, cluster_uuid
: str) -> list:
409 returns a list of deployed releases in a cluster
411 :param cluster_uuid: the cluster
420 kdu_model
: str = None,
425 :param cluster_uuid str: The UUID of the cluster to upgrade
426 :param kdu_instance str: The unique name of the KDU instance
427 :param kdu_model str: The name or path of the bundle to upgrade to
428 :param params dict: Key-value pairs of instantiation parameters
430 :return: If successful, reference to the new revision number of the
434 # TODO: Loop through the bundle and upgrade each charm individually
437 The API doesn't have a concept of bundle upgrades, because there are
438 many possible changes: charm revision, disk, number of units, etc.
440 As such, we are only supporting a limited subset of upgrades. We'll
441 upgrade the charm revision but leave storage and scale untouched.
443 Scale changes should happen through OSM constructs, and changes to
444 storage would require a redeployment of the service, at least in this
447 raise MethodNotImplemented()
448 # TODO: Remove these commented lines
450 # model = await self.get_model(namespace, cluster_uuid=cluster_uuid)
453 # namespace = self.get_namespace(cluster_uuid)
454 # controller = await self.get_controller(cluster_uuid)
457 # if namespace not in await controller.list_models():
458 # raise N2VCNotFound(message="Model {} does not exist".format(namespace))
460 # model = await controller.get_model(namespace)
461 # with open(kdu_model, "r") as f:
462 # bundle = yaml.safe_load(f)
466 # 'description': 'Test bundle',
467 # 'bundle': 'kubernetes',
470 # 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
473 # 'password': 'manopw',
474 # 'root_password': 'osm4u',
477 # 'series': 'kubernetes'
482 # # TODO: This should be returned in an agreed-upon format
483 # for name in bundle["applications"]:
484 # self.log.debug(model.applications)
485 # application = model.applications[name]
486 # self.log.debug(application)
488 # path = bundle["applications"][name]["charm"]
491 # await application.upgrade_charm(switch=path)
492 # except juju.errors.JujuError as ex:
493 # if "already running charm" in str(ex):
494 # # We're already running this version
498 # await model.disconnect()
499 # await controller.disconnect()
505 self
, cluster_uuid
: str, kdu_instance
: str, revision
: int = 0,
509 :param cluster_uuid str: The UUID of the cluster to rollback
510 :param kdu_instance str: The unique name of the KDU instance
511 :param revision int: The revision to revert to. If omitted, rolls back
512 the previous upgrade.
514 :return: If successful, returns the revision of active KDU instance,
515 or raises an exception
517 raise MethodNotImplemented()
521 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str) -> bool:
522 """Uninstall a KDU instance
524 :param cluster_uuid str: The UUID of the cluster
525 :param kdu_instance str: The unique name of the KDU instance
527 :return: Returns True if successful, or raises an exception
530 controller
= await self
.get_controller(cluster_uuid
)
532 self
.log
.debug("[uninstall] Destroying model")
534 await controller
.destroy_models(kdu_instance
)
536 self
.log
.debug("[uninstall] Model destroyed and disconnecting")
537 await controller
.disconnect()
540 # TODO: Remove these commented lines
541 # if not self.authenticated:
542 # self.log.debug("[uninstall] Connecting to controller")
543 # await self.login(cluster_uuid)
545 async def exec_primitive(
547 cluster_uuid
: str = None,
548 kdu_instance
: str = None,
549 primitive_name
: str = None,
550 timeout
: float = 300,
552 db_dict
: dict = None,
554 """Exec primitive (Juju action)
556 :param cluster_uuid str: The UUID of the cluster
557 :param kdu_instance str: The unique name of the KDU instance
558 :param primitive_name: Name of action that will be executed
559 :param timeout: Timeout for action execution
560 :param params: Dictionary of all the parameters needed for the action
561 :db_dict: Dictionary for any additional data
563 :return: Returns the output of the action
566 controller
= await self
.get_controller(cluster_uuid
)
568 if not params
or "application-name" not in params
:
570 "Missing application-name argument, \
571 argument needed for K8s actions"
575 "[exec_primitive] Getting model "
576 "kdu_instance: {}".format(kdu_instance
)
579 model
= await self
.get_model(kdu_instance
, controller
=controller
)
581 application_name
= params
["application-name"]
582 application
= model
.applications
[application_name
]
584 actions
= await application
.get_actions()
585 if primitive_name
not in actions
:
586 raise K8sException("Primitive {} not found".format(primitive_name
))
589 for u
in application
.units
:
590 if await u
.is_leader_from_status():
595 raise K8sException("No leader unit found to execute action")
597 self
.log
.debug("[exec_primitive] Running action: {}".format(primitive_name
))
598 action
= await unit
.run_action(primitive_name
, **params
)
600 output
= await model
.get_action_output(action_uuid
=action
.entity_id
)
601 status
= await model
.get_action_status(uuid_or_prefix
=action
.entity_id
)
604 status
[action
.entity_id
] if action
.entity_id
in status
else "failed"
607 if status
!= "completed":
609 "status is not completed: {} output: {}".format(status
, output
)
614 except Exception as e
:
615 error_msg
= "Error executing primitive {}: {}".format(primitive_name
, e
)
616 self
.log
.error(error_msg
)
617 raise K8sException(message
=error_msg
)
619 await controller
.disconnect()
620 # TODO: Remove these commented lines:
621 # if not self.authenticated:
622 # self.log.debug("[exec_primitive] Connecting to controller")
623 # await self.login(cluster_uuid)
627 async def inspect_kdu(self
, kdu_model
: str,) -> dict:
630 Inspects a bundle and returns a dictionary of config parameters and
631 their default values.
633 :param kdu_model str: The name or path of the bundle to inspect.
635 :return: If successful, returns a dictionary of available parameters
636 and their default values.
640 with
open(kdu_model
, "r") as f
:
641 bundle
= yaml
.safe_load(f
)
645 'description': 'Test bundle',
646 'bundle': 'kubernetes',
649 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
652 'password': 'manopw',
653 'root_password': 'osm4u',
656 'series': 'kubernetes'
661 # TODO: This should be returned in an agreed-upon format
662 kdu
= bundle
["applications"]
666 async def help_kdu(self
, kdu_model
: str,) -> str:
669 If available, returns the README of the bundle.
671 :param kdu_model str: The name or path of a bundle
673 :return: If found, returns the contents of the README.
677 files
= ["README", "README.txt", "README.md"]
678 path
= os
.path
.dirname(kdu_model
)
679 for file in os
.listdir(path
):
681 with
open(file, "r") as f
:
687 async def status_kdu(self
, cluster_uuid
: str, kdu_instance
: str,) -> dict:
688 """Get the status of the KDU
690 Get the current status of the KDU instance.
692 :param cluster_uuid str: The UUID of the cluster
693 :param kdu_instance str: The unique id of the KDU instance
695 :return: Returns a dictionary containing namespace, state, resources,
699 controller
= await self
.get_controller(cluster_uuid
)
700 model
= await self
.get_model(kdu_instance
, controller
=controller
)
702 model_status
= await model
.get_status()
703 status
= model_status
.applications
705 for name
in model_status
.applications
:
706 application
= model_status
.applications
[name
]
707 status
[name
] = {"status": application
["status"]["status"]}
709 await model
.disconnect()
710 await controller
.disconnect()
714 async def get_services(
715 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
717 """Return a list of services of a kdu_instance"""
719 credentials
= self
.get_credentials(cluster_uuid
=cluster_uuid
)
721 config_path
= "/tmp/{}".format(cluster_uuid
)
722 config_file
= "{}/config".format(config_path
)
724 if not os
.path
.exists(config_path
):
725 os
.makedirs(config_path
)
726 with
open(config_file
, "w") as f
:
729 kubectl
= Kubectl(config_file
=config_file
)
730 return kubectl
.get_services(
731 field_selector
="metadata.namespace={}".format(kdu_instance
)
734 async def get_service(
735 self
, cluster_uuid
: str, service_name
: str, namespace
: str
737 """Return data for a specific service inside a namespace"""
739 credentials
= self
.get_credentials(cluster_uuid
=cluster_uuid
)
741 config_path
= "/tmp/{}".format(cluster_uuid
)
742 config_file
= "{}/config".format(config_path
)
744 if not os
.path
.exists(config_path
):
745 os
.makedirs(config_path
)
746 with
open(config_file
, "w") as f
:
749 kubectl
= Kubectl(config_file
=config_file
)
751 return kubectl
.get_services(
752 field_selector
="metadata.name={},metadata.namespace={}".format(
753 service_name
, namespace
758 async def add_k8s(self
, cloud_name
: str, credentials
: str,) -> bool:
759 """Add a k8s cloud to Juju
761 Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
764 :param cloud_name str: The name of the cloud to add.
765 :param credentials dict: A dictionary representing the output of
766 `kubectl config view --raw`.
768 :returns: True if successful, otherwise raises an exception.
771 cmd
= [self
.juju_command
, "add-k8s", "--local", cloud_name
]
774 process
= await asyncio
.create_subprocess_exec(
776 stdout
=asyncio
.subprocess
.PIPE
,
777 stderr
=asyncio
.subprocess
.PIPE
,
778 stdin
=asyncio
.subprocess
.PIPE
,
781 # Feed the process the credentials
782 process
.stdin
.write(credentials
.encode("utf-8"))
783 await process
.stdin
.drain()
784 process
.stdin
.close()
786 _stdout
, stderr
= await process
.communicate()
788 return_code
= process
.returncode
790 self
.log
.debug("add-k8s return code: {}".format(return_code
))
793 raise Exception(stderr
)
798 self
, model_name
: str, cluster_uuid
: str, controller
: Controller
800 """Adds a model to the controller
802 Adds a new model to the Juju controller
804 :param model_name str: The name of the model to add.
805 :param cluster_uuid str: ID of the cluster.
806 :param controller: Controller object in which the model will be added
807 :returns: The juju.model.Model object of the new model upon success or
812 "Adding model '{}' to cluster_uuid '{}'".format(model_name
, cluster_uuid
)
816 if self
.juju_public_key
is not None:
817 model
= await controller
.add_model(
818 model_name
, config
={"authorized-keys": self
.juju_public_key
}
821 model
= await controller
.add_model(model_name
)
822 except Exception as ex
:
824 self
.log
.debug("Caught exception: {}".format(ex
))
830 self
, cloud_name
: str, cluster_uuid
: str, loadbalancer
: bool
832 """Bootstrap a Kubernetes controller
834 Bootstrap a Juju controller inside the Kubernetes cluster
836 :param cloud_name str: The name of the cloud.
837 :param cluster_uuid str: The UUID of the cluster to bootstrap.
838 :param loadbalancer bool: If the controller should use loadbalancer or not.
839 :returns: True upon success or raises an exception.
843 cmd
= [self
.juju_command
, "bootstrap", cloud_name
, cluster_uuid
]
846 For public clusters, specify that the controller service is using a
855 "controller-service-type=loadbalancer",
859 "Bootstrapping controller {} in cloud {}".format(cluster_uuid
, cloud_name
)
862 process
= await asyncio
.create_subprocess_exec(
863 *cmd
, stdout
=asyncio
.subprocess
.PIPE
, stderr
=asyncio
.subprocess
.PIPE
,
866 _stdout
, stderr
= await process
.communicate()
868 return_code
= process
.returncode
872 if b
"already exists" not in stderr
:
873 raise Exception(stderr
)
877 async def destroy_controller(self
, cluster_uuid
: str) -> bool:
878 """Destroy a Kubernetes controller
880 Destroy an existing Kubernetes controller.
882 :param cluster_uuid str: The UUID of the cluster to bootstrap.
883 :returns: True upon success or raises an exception.
887 "destroy-controller",
888 "--destroy-all-models",
894 process
= await asyncio
.create_subprocess_exec(
895 *cmd
, stdout
=asyncio
.subprocess
.PIPE
, stderr
=asyncio
.subprocess
.PIPE
,
898 _stdout
, stderr
= await process
.communicate()
900 return_code
= process
.returncode
904 if "already exists" not in stderr
:
905 raise Exception(stderr
)
907 def get_credentials(self
, cluster_uuid
: str) -> str:
909 Get Cluster Kubeconfig
911 k8scluster
= self
.db
.get_one(
912 "k8sclusters", q_filter
={"_id": cluster_uuid
}, fail_on_empty
=False
915 self
.db
.encrypt_decrypt_fields(
916 k8scluster
.get("credentials"),
918 ["password", "secret"],
919 schema_version
=k8scluster
["schema_version"],
920 salt
=k8scluster
["_id"],
923 return yaml
.safe_dump(k8scluster
.get("credentials"))
925 def get_config(self
, cluster_uuid
: str,) -> dict:
926 """Get the cluster configuration
928 Gets the configuration of the cluster
930 :param cluster_uuid str: The UUID of the cluster.
931 :return: A dict upon success, or raises an exception.
934 juju_db
= self
.db
.get_one("admin", {"_id": "juju"})
936 for k
in juju_db
["k8sclusters"]:
937 if k
["_id"] == cluster_uuid
:
939 self
.db
.encrypt_decrypt_fields(
942 ["secret", "cacert"],
943 schema_version
="1.1",
949 "Unable to locate configuration for cluster {}".format(cluster_uuid
)
953 async def get_model(self
, model_name
: str, controller
: Controller
) -> Model
:
954 """Get a model from the Juju Controller.
956 Note: Model objects returned must call disconnected() before it goes
959 :param model_name str: The name of the model to get
960 :param controller Controller: Controller object
961 :return The juju.model.Model object if found, or None.
964 models
= await controller
.list_models()
965 if model_name
not in models
:
966 raise N2VCNotFound("Model {} not found".format(model_name
))
967 self
.log
.debug("Found model: {}".format(model_name
))
968 return await controller
.get_model(model_name
)
970 def get_namespace(self
, cluster_uuid
: str,) -> str:
971 """Get the namespace UUID
972 Gets the namespace's unique name
974 :param cluster_uuid str: The UUID of the cluster
975 :returns: The namespace UUID, or raises an exception
977 config
= self
.get_config(cluster_uuid
)
979 # Make sure the name is in the config
980 if "namespace" not in config
:
981 raise Exception("Namespace not found.")
983 # TODO: We want to make sure this is unique to the cluster, in case
984 # the cluster is being reused.
985 # Consider pre/appending the cluster id to the namespace string
986 return config
["namespace"]
988 # TODO: Remove these lines of code
989 # async def has_model(self, model_name: str) -> bool:
990 # """Check if a model exists in the controller
992 # Checks to see if a model exists in the connected Juju controller.
994 # :param model_name str: The name of the model
995 # :return: A boolean indicating if the model exists
997 # models = await self.controller.list_models()
999 # if model_name in models:
1003 def is_local_k8s(self
, credentials
: str,) -> bool:
1004 """Check if a cluster is local
1006 Checks if a cluster is running in the local host
1008 :param credentials dict: A dictionary containing the k8s credentials
1009 :returns: A boolean if the cluster is running locally
1012 creds
= yaml
.safe_load(credentials
)
1014 if creds
and os
.getenv("OSMLCM_VCA_APIPROXY"):
1015 for cluster
in creds
["clusters"]:
1016 if "server" in cluster
["cluster"]:
1017 if os
.getenv("OSMLCM_VCA_APIPROXY") in cluster
["cluster"]["server"]:
1022 async def get_controller(self
, cluster_uuid
):
1023 """Login to the Juju controller."""
1025 config
= self
.get_config(cluster_uuid
)
1027 juju_endpoint
= config
["endpoint"]
1028 juju_user
= config
["username"]
1029 juju_secret
= config
["secret"]
1030 juju_ca_cert
= config
["cacert"]
1032 controller
= Controller()
1036 "Connecting to controller... ws://{} as {}".format(
1037 juju_endpoint
, juju_user
,
1041 await controller
.connect(
1042 endpoint
=juju_endpoint
,
1044 password
=juju_secret
,
1045 cacert
=juju_ca_cert
,
1047 self
.log
.debug("JujuApi: Logged into controller")
1049 except Exception as ex
:
1051 self
.log
.debug("Caught exception: {}".format(ex
))
1053 self
.log
.fatal("VCA credentials not configured.")
1055 # TODO: Remove these commented lines
1056 # self.authenticated = False
1057 # if self.authenticated:
1060 # self.connecting = True
1061 # juju_public_key = None
1062 # self.authenticated = True
1063 # Test: Make sure we have the credentials loaded
1064 # async def logout(self):
1065 # """Logout of the Juju controller."""
1066 # self.log.debug("[logout]")
1067 # if not self.authenticated:
1070 # for model in self.models:
1071 # self.log.debug("Logging out of model {}".format(model))
1072 # await self.models[model].disconnect()
1074 # if self.controller:
1075 # self.log.debug("Disconnecting controller {}".format(self.controller))
1076 # await self.controller.disconnect()
1077 # self.controller = None
1079 # self.authenticated = False
1081 async def remove_cloud(self
, cloud_name
: str,) -> bool:
1082 """Remove a k8s cloud from Juju
1084 Removes a Kubernetes cloud from Juju.
1086 :param cloud_name str: The name of the cloud to add.
1088 :returns: True if successful, otherwise raises an exception.
1091 # Remove the bootstrapped controller
1092 cmd
= [self
.juju_command
, "remove-k8s", "--client", cloud_name
]
1093 process
= await asyncio
.create_subprocess_exec(
1094 *cmd
, stdout
=asyncio
.subprocess
.PIPE
, stderr
=asyncio
.subprocess
.PIPE
,
1097 _stdout
, stderr
= await process
.communicate()
1099 return_code
= process
.returncode
1102 raise Exception(stderr
)
1104 # Remove the cloud from the local config
1105 cmd
= [self
.juju_command
, "remove-cloud", "--client", cloud_name
]
1106 process
= await asyncio
.create_subprocess_exec(
1107 *cmd
, stdout
=asyncio
.subprocess
.PIPE
, stderr
=asyncio
.subprocess
.PIPE
,
1110 _stdout
, stderr
= await process
.communicate()
1112 return_code
= process
.returncode
1115 raise Exception(stderr
)
1119 async def set_config(self
, cluster_uuid
: str, config
: dict,) -> bool:
1120 """Save the cluster configuration
1122 Saves the cluster information to the Mongo database
1124 :param cluster_uuid str: The UUID of the cluster
1125 :param config dict: A dictionary containing the cluster configuration
1128 juju_db
= self
.db
.get_one("admin", {"_id": "juju"})
1130 k8sclusters
= juju_db
["k8sclusters"] if "k8sclusters" in juju_db
else []
1131 self
.db
.encrypt_decrypt_fields(
1134 ["secret", "cacert"],
1135 schema_version
="1.1",
1138 k8sclusters
.append({"_id": cluster_uuid
, "config": config
})
1141 q_filter
={"_id": "juju"},
1142 update_dict
={"k8sclusters": k8sclusters
},