1 # Copyright 2019 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
21 from juju
.controller
import Controller
22 from juju
.model
import Model
23 from n2vc
.exceptions
import K8sException
, JujuError
24 from n2vc
.k8s_conn
import K8sConnector
25 from n2vc
.kubectl
import Kubectl
26 from .exceptions
import MethodNotImplemented
, N2VCNotFound
27 from n2vc
.utils
import obj_to_dict
, obj_to_yaml
30 # from juju.bundle import BundleHandler
33 # from .vnf import N2VC
34 class K8sJujuConnector(K8sConnector
):
39 kubectl_command
: str = "/usr/bin/kubectl",
40 juju_command
: str = "/usr/bin/juju",
46 :param kubectl_command: path to kubectl executable
47 :param helm_command: path to helm executable
48 :param fs: file system for kubernetes and helm configuration
53 K8sConnector
.__init
__(
54 self
, db
, log
=log
, on_update_db
=on_update_db
,
58 self
.log
.debug("Initializing K8S Juju connector")
60 self
.juju_command
= juju_command
61 self
.juju_public_key
= None
63 self
.log
.debug("K8S Juju connector initialized")
64 # TODO: Remove these commented lines:
65 # self.authenticated = False
67 # self.juju_secret = ""
74 namespace
: str = "kube-system",
75 reuse_cluster_uuid
: str = None,
78 It prepares a given K8s cluster environment to run Juju bundles.
80 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
82 :param namespace: optional namespace to be used for juju. By default,
83 'kube-system' will be used
84 :param reuse_cluster_uuid: existing cluster uuid for reuse
85 :return: uuid of the K8s cluster and True if connector has installed some
86 software in the cluster
87 (on error, an exception will be raised)
92 Bootstrapping cannot be done, by design, through the API. We need to
99 1. Has the environment already been bootstrapped?
100 - Check the database to see if we have a record for this env
102 2. If this is a new env, create it
103 - Add the k8s cloud to Juju
105 - Record it in the database
107 3. Connect to the Juju controller for this cloud
110 # cluster_uuid = reuse_cluster_uuid
111 # if not cluster_uuid:
112 # cluster_uuid = str(uuid4())
114 ##################################################
115 # TODO: Pull info from db based on the namespace #
116 ##################################################
118 ###################################################
119 # TODO: Make it idempotent, calling add-k8s and #
120 # bootstrap whenever reuse_cluster_uuid is passed #
122 # `init_env` is called to initialize the K8s #
123 # cluster for juju. If this initialization fails, #
124 # it can be called again by LCM with the param #
125 # reuse_cluster_uuid, e.g. to try to fix it. #
126 ###################################################
128 # This is a new cluster, so bootstrap it
130 cluster_uuid
= reuse_cluster_uuid
or str(uuid
.uuid4())
132 # Is a local k8s cluster?
133 localk8s
= self
.is_local_k8s(k8s_creds
)
135 # If the k8s is external, the juju controller needs a loadbalancer
136 loadbalancer
= False if localk8s
else True
138 # Name the new k8s cloud
139 k8s_cloud
= "k8s-{}".format(cluster_uuid
)
141 self
.log
.debug("Adding k8s cloud {}".format(k8s_cloud
))
142 await self
.add_k8s(k8s_cloud
, k8s_creds
)
144 # Bootstrap Juju controller
145 self
.log
.debug("Bootstrapping...")
146 await self
.bootstrap(k8s_cloud
, cluster_uuid
, loadbalancer
)
147 self
.log
.debug("Bootstrap done.")
149 # Get the controller information
151 # Parse ~/.local/share/juju/controllers.yaml
152 # controllers.testing.api-endpoints|ca-cert|uuid
153 self
.log
.debug("Getting controller endpoints")
154 with
open(os
.path
.expanduser("~/.local/share/juju/controllers.yaml")) as f
:
155 controllers
= yaml
.load(f
, Loader
=yaml
.Loader
)
156 controller
= controllers
["controllers"][cluster_uuid
]
157 endpoints
= controller
["api-endpoints"]
158 juju_endpoint
= endpoints
[0]
159 juju_ca_cert
= controller
["ca-cert"]
161 # Parse ~/.local/share/juju/accounts
162 # controllers.testing.user|password
163 self
.log
.debug("Getting accounts")
164 with
open(os
.path
.expanduser("~/.local/share/juju/accounts.yaml")) as f
:
165 controllers
= yaml
.load(f
, Loader
=yaml
.Loader
)
166 controller
= controllers
["controllers"][cluster_uuid
]
168 juju_user
= controller
["user"]
169 juju_secret
= controller
["password"]
172 "endpoint": juju_endpoint
,
173 "username": juju_user
,
174 "secret": juju_secret
,
175 "cacert": juju_ca_cert
,
176 "loadbalancer": loadbalancer
,
179 # Store the cluster configuration so it
180 # can be used for subsequent calls
181 self
.log
.debug("Setting config")
182 await self
.set_config(cluster_uuid
, config
)
185 controller
= await self
.get_controller(cluster_uuid
)
186 await controller
.disconnect()
188 # TODO: Remove these commented lines
189 # raise Exception("EOL")
190 # self.juju_public_key = None
191 # Login to the k8s cluster
192 # if not self.authenticated:
193 # await self.login(cluster_uuid)
195 # We're creating a new cluster
196 # print("Getting model {}".format(self.get_namespace(cluster_uuid),
197 # cluster_uuid=cluster_uuid))
198 # model = await self.get_model(
199 # self.get_namespace(cluster_uuid),
200 # cluster_uuid=cluster_uuid
203 # Disconnect from the model
204 # if model and model.is_connected():
205 # await model.disconnect()
207 return cluster_uuid
, True
209 """Repo Management"""
212 self
, name
: str, url
: str, _type
: str = "charm",
214 raise MethodNotImplemented()
216 async def repo_list(self
):
217 raise MethodNotImplemented()
219 async def repo_remove(
222 raise MethodNotImplemented()
224 async def synchronize_repos(self
, cluster_uuid
: str, name
: str):
226 Returns None as currently add_repo is not implemented
233 self
, cluster_uuid
: str, force
: bool = False, uninstall_sw
: bool = False
237 Resets the Kubernetes cluster by removing the model that represents it.
239 :param cluster_uuid str: The UUID of the cluster to reset
240 :return: Returns True if successful or raises an exception.
245 # Remove k8scluster from database
246 self
.log
.debug("[reset] Removing k8scluster from juju database")
247 juju_db
= self
.db
.get_one("admin", {"_id": "juju"})
249 for k
in juju_db
["k8sclusters"]:
250 if k
["_id"] == cluster_uuid
:
251 juju_db
["k8sclusters"].remove(k
)
254 q_filter
={"_id": "juju"},
255 update_dict
={"k8sclusters": juju_db
["k8sclusters"]},
259 # Destroy the controller (via CLI)
260 self
.log
.debug("[reset] Destroying controller")
261 await self
.destroy_controller(cluster_uuid
)
263 self
.log
.debug("[reset] Removing k8s cloud")
264 k8s_cloud
= "k8s-{}".format(cluster_uuid
)
265 await self
.remove_cloud(k8s_cloud
)
267 except Exception as ex
:
268 self
.log
.debug("Caught exception during reset: {}".format(ex
))
270 # TODO: Remove these commented lines
271 # if not self.authenticated:
272 # await self.login(cluster_uuid)
274 # if self.controller.is_connected():
275 # # Destroy the model
276 # namespace = self.get_namespace(cluster_uuid)
277 # if await self.has_model(namespace):
278 # self.log.debug("[reset] Destroying model")
279 # await self.controller.destroy_model(namespace, destroy_storage=True)
281 # # Disconnect from the controller
282 # self.log.debug("[reset] Disconnecting controller")
283 # await self.logout()
292 timeout
: float = 300,
294 db_dict
: dict = None,
295 kdu_name
: str = None,
296 namespace
: str = None,
300 :param cluster_uuid str: The UUID of the cluster to install to
301 :param kdu_model str: The name or path of a bundle to install
302 :param atomic bool: If set, waits until the model is active and resets
303 the cluster on failure.
304 :param timeout int: The time, in seconds, to wait for the install
306 :param params dict: Key-value pairs of instantiation parameters
307 :param kdu_name: Name of the KDU instance to be installed
308 :param namespace: K8s namespace to use for the KDU instance
310 :return: If successful, returns ?
313 controller
= await self
.get_controller(cluster_uuid
)
316 # Get or create the model, based on the NS
319 kdu_instance
= "{}-{}".format(kdu_name
, db_dict
["filter"]["_id"])
321 kdu_instance
= db_dict
["filter"]["_id"]
323 self
.log
.debug("Checking for model named {}".format(kdu_instance
))
325 if self
.on_update_db
:
326 await self
.on_update_db(cluster_uuid
, kdu_instance
, filter=db_dict
["filter"])
327 except Exception as e
:
328 self
.log
.debug("Error in updating vca status {}".format(str(e
)))
330 # Create the new model
331 self
.log
.debug("Adding model: {}".format(kdu_instance
))
332 model
= await self
.add_model(
333 kdu_instance
, cluster_uuid
=cluster_uuid
, controller
=controller
337 # TODO: Instantiation parameters
340 "Juju bundle that models the KDU, in any of the following ways:
341 - <juju-repo>/<juju-bundle>
342 - <juju-bundle folder under k8s_models folder in the package>
343 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
345 - <URL_where_to_fetch_juju_bundle>
348 previous_workdir
= os
.getcwd()
349 except FileNotFoundError
:
350 previous_workdir
= "/app/storage"
353 if kdu_model
.startswith("cs:"):
355 elif kdu_model
.startswith("http"):
359 new_workdir
= kdu_model
.strip(kdu_model
.split("/")[-1])
361 os
.chdir(new_workdir
)
363 bundle
= "local:{}".format(kdu_model
)
366 # Raise named exception that the bundle could not be found
369 self
.log
.debug("[install] deploying {}".format(bundle
))
370 await model
.deploy(bundle
)
372 # Get the application
374 # applications = model.applications
375 self
.log
.debug("[install] Applications: {}".format(model
.applications
))
376 for name
in model
.applications
:
377 self
.log
.debug("[install] Waiting for {} to settle".format(name
))
378 application
= model
.applications
[name
]
380 # It's not enough to wait for all units to be active;
381 # the application status needs to be active as well.
382 self
.log
.debug("Waiting for all units to be active...")
383 await model
.block_until(
385 unit
.agent_status
== "idle"
386 and application
.status
in ["active", "unknown"]
387 and unit
.workload_status
in ["active", "unknown"]
388 for unit
in application
.units
392 self
.log
.debug("All units active.")
394 # TODO use asyncio.TimeoutError
395 except concurrent
.futures
._base
.TimeoutError
:
396 os
.chdir(previous_workdir
)
397 self
.log
.debug("[install] Timeout exceeded; resetting cluster")
398 await self
.reset(cluster_uuid
)
401 # Wait for the application to be active
402 if model
.is_connected():
403 self
.log
.debug("[install] Disconnecting model")
404 await model
.disconnect()
405 await controller
.disconnect()
406 os
.chdir(previous_workdir
)
407 if self
.on_update_db
:
408 await self
.on_update_db(cluster_uuid
, kdu_instance
, filter=db_dict
["filter"])
410 raise Exception("Unable to install")
412 async def instances_list(self
, cluster_uuid
: str) -> list:
414 returns a list of deployed releases in a cluster
416 :param cluster_uuid: the cluster
425 kdu_model
: str = None,
430 :param cluster_uuid str: The UUID of the cluster to upgrade
431 :param kdu_instance str: The unique name of the KDU instance
432 :param kdu_model str: The name or path of the bundle to upgrade to
433 :param params dict: Key-value pairs of instantiation parameters
435 :return: If successful, reference to the new revision number of the
439 # TODO: Loop through the bundle and upgrade each charm individually
442 The API doesn't have a concept of bundle upgrades, because there are
443 many possible changes: charm revision, disk, number of units, etc.
445 As such, we are only supporting a limited subset of upgrades. We'll
446 upgrade the charm revision but leave storage and scale untouched.
448 Scale changes should happen through OSM constructs, and changes to
449 storage would require a redeployment of the service, at least in this
452 raise MethodNotImplemented()
453 # TODO: Remove these commented lines
455 # model = await self.get_model(namespace, cluster_uuid=cluster_uuid)
458 # namespace = self.get_namespace(cluster_uuid)
459 # controller = await self.get_controller(cluster_uuid)
462 # if namespace not in await controller.list_models():
463 # raise N2VCNotFound(message="Model {} does not exist".format(namespace))
465 # model = await controller.get_model(namespace)
466 # with open(kdu_model, "r") as f:
467 # bundle = yaml.safe_load(f)
471 # 'description': 'Test bundle',
472 # 'bundle': 'kubernetes',
475 # 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
478 # 'password': 'manopw',
479 # 'root_password': 'osm4u',
482 # 'series': 'kubernetes'
487 # # TODO: This should be returned in an agreed-upon format
488 # for name in bundle["applications"]:
489 # self.log.debug(model.applications)
490 # application = model.applications[name]
491 # self.log.debug(application)
493 # path = bundle["applications"][name]["charm"]
496 # await application.upgrade_charm(switch=path)
497 # except juju.errors.JujuError as ex:
498 # if "already running charm" in str(ex):
499 # # We're already running this version
503 # await model.disconnect()
504 # await controller.disconnect()
510 self
, cluster_uuid
: str, kdu_instance
: str, revision
: int = 0,
514 :param cluster_uuid str: The UUID of the cluster to rollback
515 :param kdu_instance str: The unique name of the KDU instance
516 :param revision int: The revision to revert to. If omitted, rolls back
517 the previous upgrade.
519 :return: If successful, returns the revision of active KDU instance,
520 or raises an exception
522 raise MethodNotImplemented()
526 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str) -> bool:
527 """Uninstall a KDU instance
529 :param cluster_uuid str: The UUID of the cluster
530 :param kdu_instance str: The unique name of the KDU instance
532 :return: Returns True if successful, or raises an exception
535 controller
= await self
.get_controller(cluster_uuid
)
537 self
.log
.debug("[uninstall] Destroying model")
539 await controller
.destroy_models(kdu_instance
)
541 self
.log
.debug("[uninstall] Model destroyed and disconnecting")
542 await controller
.disconnect()
545 # TODO: Remove these commented lines
546 # if not self.authenticated:
547 # self.log.debug("[uninstall] Connecting to controller")
548 # await self.login(cluster_uuid)
550 async def exec_primitive(
552 cluster_uuid
: str = None,
553 kdu_instance
: str = None,
554 primitive_name
: str = None,
555 timeout
: float = 300,
557 db_dict
: dict = None,
559 """Exec primitive (Juju action)
561 :param cluster_uuid str: The UUID of the cluster
562 :param kdu_instance str: The unique name of the KDU instance
563 :param primitive_name: Name of action that will be executed
564 :param timeout: Timeout for action execution
565 :param params: Dictionary of all the parameters needed for the action
566 :db_dict: Dictionary for any additional data
568 :return: Returns the output of the action
571 controller
= await self
.get_controller(cluster_uuid
)
573 if not params
or "application-name" not in params
:
575 "Missing application-name argument, \
576 argument needed for K8s actions"
580 "[exec_primitive] Getting model "
581 "kdu_instance: {}".format(kdu_instance
)
584 model
= await self
.get_model(kdu_instance
, controller
=controller
)
586 application_name
= params
["application-name"]
587 application
= model
.applications
[application_name
]
589 actions
= await application
.get_actions()
590 if primitive_name
not in actions
:
591 raise K8sException("Primitive {} not found".format(primitive_name
))
594 for u
in application
.units
:
595 if await u
.is_leader_from_status():
600 raise K8sException("No leader unit found to execute action")
602 self
.log
.debug("[exec_primitive] Running action: {}".format(primitive_name
))
603 action
= await unit
.run_action(primitive_name
, **params
)
605 output
= await model
.get_action_output(action_uuid
=action
.entity_id
)
606 status
= await model
.get_action_status(uuid_or_prefix
=action
.entity_id
)
609 status
[action
.entity_id
] if action
.entity_id
in status
else "failed"
612 if status
!= "completed":
614 "status: {}, output: {}".format(status
, output
)
616 if self
.on_update_db
:
617 await self
.on_update_db(cluster_uuid
, kdu_instance
, filter=db_dict
["filter"])
621 except Exception as e
:
622 error_msg
= "Error executing primitive {}: {}".format(primitive_name
, e
)
623 self
.log
.error(error_msg
)
624 raise K8sException(message
=error_msg
)
626 await controller
.disconnect()
627 # TODO: Remove these commented lines:
628 # if not self.authenticated:
629 # self.log.debug("[exec_primitive] Connecting to controller")
630 # await self.login(cluster_uuid)
634 async def inspect_kdu(self
, kdu_model
: str,) -> dict:
637 Inspects a bundle and returns a dictionary of config parameters and
638 their default values.
640 :param kdu_model str: The name or path of the bundle to inspect.
642 :return: If successful, returns a dictionary of available parameters
643 and their default values.
647 with
open(kdu_model
, "r") as f
:
648 bundle
= yaml
.safe_load(f
)
652 'description': 'Test bundle',
653 'bundle': 'kubernetes',
656 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
659 'password': 'manopw',
660 'root_password': 'osm4u',
663 'series': 'kubernetes'
668 # TODO: This should be returned in an agreed-upon format
669 kdu
= bundle
["applications"]
673 async def help_kdu(self
, kdu_model
: str,) -> str:
676 If available, returns the README of the bundle.
678 :param kdu_model str: The name or path of a bundle
680 :return: If found, returns the contents of the README.
684 files
= ["README", "README.txt", "README.md"]
685 path
= os
.path
.dirname(kdu_model
)
686 for file in os
.listdir(path
):
688 with
open(file, "r") as f
:
694 async def status_kdu(
698 complete_status
: bool = False,
699 yaml_format
: bool = False
701 """Get the status of the KDU
703 Get the current status of the KDU instance.
705 :param cluster_uuid str: The UUID of the cluster
706 :param kdu_instance str: The unique id of the KDU instance
707 :param complete_status: To get the complete_status of the KDU
708 :param yaml_format: To get the status in proper format for NSR record
710 :return: Returns a dictionary containing namespace, state, resources,
711 and deployment_time and returns complete_status if complete_status is True
714 controller
= await self
.get_controller(cluster_uuid
)
716 model
= await self
.get_model(kdu_instance
, controller
=controller
)
717 model_status
= await model
.get_status()
718 status
= model_status
.applications
720 if not complete_status
:
721 for name
in model_status
.applications
:
722 application
= model_status
.applications
[name
]
723 status
[name
] = {"status": application
["status"]["status"]}
726 return obj_to_yaml(model_status
)
728 return obj_to_dict(model_status
)
729 except Exception as e
:
730 self
.log
.debug("Error in getting model_status for kdu_instance: {}. Error: {}"
731 .format(kdu_instance
, str(e
)))
734 await model
.disconnect()
735 await controller
.disconnect()
738 async def get_application_actions(
740 application_name
: str,
746 Get available actions for an application
748 :param application_name str: Application name
749 :model_name str: Model name
750 :param cluster_uuid str: The UUID of the cluster
751 :param kdu_instance str: The unique id of the KDU instance
753 :return: Returns a dictionary which has action list of the Application
756 application_actions
= {}
757 controller
= await self
.get_controller(cluster_uuid
)
759 model
= await self
.get_model(kdu_instance
, controller
=controller
)
760 application
= model
.applications
[application_name
]
761 application_actions
= await application
.get_actions()
762 except Exception as e
:
763 raise JujuError("Error in getting actions for application: {} in model: {}. Error: {}"
764 .format(application_name
, model_name
, str(e
)))
767 await model
.disconnect()
768 await controller
.disconnect()
770 return application_actions
772 async def get_application_configs(
774 application_name
: str,
780 Get available configs for an application.
782 :param application_name str: Application name
783 :model_name str: Model name
784 :param cluster_uuid str: The UUID of the cluster
785 :param kdu_instance str: The unique id of the KDU instance
787 :return: Returns a dictionary which has config list of the Application
790 application_configs
= {}
791 controller
= await self
.get_controller(cluster_uuid
)
793 model
= await self
.get_model(kdu_instance
, controller
=controller
)
794 application
= model
.applications
[application_name
]
795 application_configs
= await application
.get_config()
796 except Exception as e
:
797 raise JujuError("Error in getting configs for application: {} in model: {}. Error: {}"
798 .format(application_name
, model_name
, str(e
)))
801 await model
.disconnect()
802 await controller
.disconnect()
803 return application_configs
805 async def get_executed_actions(
812 Get executed/history of actions for a model.
813 :model_name str: Model name
814 :param cluster_uuid str: The UUID of the cluster
815 :param kdu_instance str: The unique id of the KDU instance
817 :return: List of executed actions for a model.
820 executed_actions
= []
821 controller
= await self
.get_controller(cluster_uuid
)
823 model
= await self
.get_model(kdu_instance
, controller
=controller
)
824 # Get all unique action names
826 for application
in model
.applications
:
827 application_actions
= await self
.get_application_actions(application
, model
,
828 cluster_uuid
, kdu_instance
)
829 actions
.update(application_actions
)
830 # Get status of all actions
831 for application_action
in actions
:
832 application_action_status_list
= \
833 await model
.get_action_status(name
=application_action
)
834 for action_id
, action_status
in application_action_status_list
.items():
835 executed_action
= {"id": action_id
,
836 "action": application_action
,
837 "status": action_status
}
838 # Get action output by id
839 action_status
= await model
.get_action_output(executed_action
["id"])
840 for k
, v
in action_status
.items():
841 executed_action
[k
] = v
842 executed_actions
.append(executed_action
)
843 except Exception as e
:
844 raise JujuError("Error in getting executed actions for model: {}. Error: {}"
845 .format(model_name
, str(e
)))
848 await model
.disconnect()
849 await controller
.disconnect()
850 return executed_actions
852 async def update_vca_status(self
, vcastatus
: dict, cluster_uuid
: str, kdu_instance
: str):
854 Add all configs, actions, executed actions of all applications in a model to vcastatus dict
856 :param vcastatus dict: dict containing vcastatus
857 :param cluster_uuid str: The UUID of the cluster
858 :param kdu_instance str: The unique id of the KDU instance
862 for model_name
in vcastatus
:
863 # Adding executed actions
864 vcastatus
[model_name
]["executedActions"] = \
865 await self
.get_executed_actions(model_name
, cluster_uuid
, kdu_instance
)
867 for application
in vcastatus
[model_name
]["applications"]:
868 # Adding application actions
869 vcastatus
[model_name
]["applications"][application
]["actions"] = \
870 await self
.get_application_actions(application
, model_name
,
871 cluster_uuid
, kdu_instance
)
872 # Adding application configs
873 vcastatus
[model_name
]["applications"][application
]["configs"] = \
874 await self
.get_application_configs(application
, model_name
,
875 cluster_uuid
, kdu_instance
)
876 except Exception as e
:
877 self
.log
.debug("Error in updating vca status: {}".format(str(e
)))
879 async def get_services(
880 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
882 """Return a list of services of a kdu_instance"""
884 credentials
= self
.get_credentials(cluster_uuid
=cluster_uuid
)
886 config_path
= "/tmp/{}".format(cluster_uuid
)
887 config_file
= "{}/config".format(config_path
)
889 if not os
.path
.exists(config_path
):
890 os
.makedirs(config_path
)
891 with
open(config_file
, "w") as f
:
894 kubectl
= Kubectl(config_file
=config_file
)
895 return kubectl
.get_services(
896 field_selector
="metadata.namespace={}".format(kdu_instance
)
899 async def get_service(
900 self
, cluster_uuid
: str, service_name
: str, namespace
: str
902 """Return data for a specific service inside a namespace"""
904 credentials
= self
.get_credentials(cluster_uuid
=cluster_uuid
)
906 config_path
= "/tmp/{}".format(cluster_uuid
)
907 config_file
= "{}/config".format(config_path
)
909 if not os
.path
.exists(config_path
):
910 os
.makedirs(config_path
)
911 with
open(config_file
, "w") as f
:
914 kubectl
= Kubectl(config_file
=config_file
)
916 return kubectl
.get_services(
917 field_selector
="metadata.name={},metadata.namespace={}".format(
918 service_name
, namespace
923 async def add_k8s(self
, cloud_name
: str, credentials
: str,) -> bool:
924 """Add a k8s cloud to Juju
926 Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
929 :param cloud_name str: The name of the cloud to add.
930 :param credentials dict: A dictionary representing the output of
931 `kubectl config view --raw`.
933 :returns: True if successful, otherwise raises an exception.
936 cmd
= [self
.juju_command
, "add-k8s", "--local", cloud_name
]
939 process
= await asyncio
.create_subprocess_exec(
941 stdout
=asyncio
.subprocess
.PIPE
,
942 stderr
=asyncio
.subprocess
.PIPE
,
943 stdin
=asyncio
.subprocess
.PIPE
,
946 # Feed the process the credentials
947 process
.stdin
.write(credentials
.encode("utf-8"))
948 await process
.stdin
.drain()
949 process
.stdin
.close()
951 _stdout
, stderr
= await process
.communicate()
953 return_code
= process
.returncode
955 self
.log
.debug("add-k8s return code: {}".format(return_code
))
958 raise Exception(stderr
)
963 self
, model_name
: str, cluster_uuid
: str, controller
: Controller
965 """Adds a model to the controller
967 Adds a new model to the Juju controller
969 :param model_name str: The name of the model to add.
970 :param cluster_uuid str: ID of the cluster.
971 :param controller: Controller object in which the model will be added
972 :returns: The juju.model.Model object of the new model upon success or
977 "Adding model '{}' to cluster_uuid '{}'".format(model_name
, cluster_uuid
)
981 if self
.juju_public_key
is not None:
982 model
= await controller
.add_model(
983 model_name
, config
={"authorized-keys": self
.juju_public_key
}
986 model
= await controller
.add_model(model_name
)
987 except Exception as ex
:
989 self
.log
.debug("Caught exception: {}".format(ex
))
995 self
, cloud_name
: str, cluster_uuid
: str, loadbalancer
: bool
997 """Bootstrap a Kubernetes controller
999 Bootstrap a Juju controller inside the Kubernetes cluster
1001 :param cloud_name str: The name of the cloud.
1002 :param cluster_uuid str: The UUID of the cluster to bootstrap.
1003 :param loadbalancer bool: If the controller should use loadbalancer or not.
1004 :returns: True upon success or raises an exception.
1007 if not loadbalancer
:
1008 cmd
= [self
.juju_command
, "bootstrap", cloud_name
, cluster_uuid
]
1011 For public clusters, specify that the controller service is using a
1020 "controller-service-type=loadbalancer",
1024 "Bootstrapping controller {} in cloud {}".format(cluster_uuid
, cloud_name
)
1027 process
= await asyncio
.create_subprocess_exec(
1028 *cmd
, stdout
=asyncio
.subprocess
.PIPE
, stderr
=asyncio
.subprocess
.PIPE
,
1031 _stdout
, stderr
= await process
.communicate()
1033 return_code
= process
.returncode
1037 if b
"already exists" not in stderr
:
1038 raise Exception(stderr
)
1042 async def destroy_controller(self
, cluster_uuid
: str) -> bool:
1043 """Destroy a Kubernetes controller
1045 Destroy an existing Kubernetes controller.
1047 :param cluster_uuid str: The UUID of the cluster to bootstrap.
1048 :returns: True upon success or raises an exception.
1052 "destroy-controller",
1053 "--destroy-all-models",
1054 "--destroy-storage",
1059 process
= await asyncio
.create_subprocess_exec(
1060 *cmd
, stdout
=asyncio
.subprocess
.PIPE
, stderr
=asyncio
.subprocess
.PIPE
,
1063 _stdout
, stderr
= await process
.communicate()
1065 return_code
= process
.returncode
1069 if "already exists" not in stderr
:
1070 raise Exception(stderr
)
1072 def get_credentials(self
, cluster_uuid
: str) -> str:
1074 Get Cluster Kubeconfig
1076 k8scluster
= self
.db
.get_one(
1077 "k8sclusters", q_filter
={"_id": cluster_uuid
}, fail_on_empty
=False
1080 self
.db
.encrypt_decrypt_fields(
1081 k8scluster
.get("credentials"),
1083 ["password", "secret"],
1084 schema_version
=k8scluster
["schema_version"],
1085 salt
=k8scluster
["_id"],
1088 return yaml
.safe_dump(k8scluster
.get("credentials"))
1090 def get_config(self
, cluster_uuid
: str,) -> dict:
1091 """Get the cluster configuration
1093 Gets the configuration of the cluster
1095 :param cluster_uuid str: The UUID of the cluster.
1096 :return: A dict upon success, or raises an exception.
1099 juju_db
= self
.db
.get_one("admin", {"_id": "juju"})
1101 for k
in juju_db
["k8sclusters"]:
1102 if k
["_id"] == cluster_uuid
:
1103 config
= k
["config"]
1104 self
.db
.encrypt_decrypt_fields(
1107 ["secret", "cacert"],
1108 schema_version
="1.1",
1114 "Unable to locate configuration for cluster {}".format(cluster_uuid
)
1118 async def get_model(self
, model_name
: str, controller
: Controller
) -> Model
:
1119 """Get a model from the Juju Controller.
1121 Note: Model objects returned must call disconnected() before it goes
1124 :param model_name str: The name of the model to get
1125 :param controller Controller: Controller object
1126 :return The juju.model.Model object if found, or None.
1129 models
= await controller
.list_models()
1130 if model_name
not in models
:
1131 raise N2VCNotFound("Model {} not found".format(model_name
))
1132 self
.log
.debug("Found model: {}".format(model_name
))
1133 return await controller
.get_model(model_name
)
1135 def get_namespace(self
, cluster_uuid
: str,) -> str:
1136 """Get the namespace UUID
1137 Gets the namespace's unique name
1139 :param cluster_uuid str: The UUID of the cluster
1140 :returns: The namespace UUID, or raises an exception
1142 config
= self
.get_config(cluster_uuid
)
1144 # Make sure the name is in the config
1145 if "namespace" not in config
:
1146 raise Exception("Namespace not found.")
1148 # TODO: We want to make sure this is unique to the cluster, in case
1149 # the cluster is being reused.
1150 # Consider pre/appending the cluster id to the namespace string
1151 return config
["namespace"]
1153 # TODO: Remove these lines of code
1154 # async def has_model(self, model_name: str) -> bool:
1155 # """Check if a model exists in the controller
1157 # Checks to see if a model exists in the connected Juju controller.
1159 # :param model_name str: The name of the model
1160 # :return: A boolean indicating if the model exists
1162 # models = await self.controller.list_models()
1164 # if model_name in models:
1168 def is_local_k8s(self
, credentials
: str,) -> bool:
1169 """Check if a cluster is local
1171 Checks if a cluster is running in the local host
1173 :param credentials dict: A dictionary containing the k8s credentials
1174 :returns: A boolean if the cluster is running locally
1177 creds
= yaml
.safe_load(credentials
)
1179 if creds
and os
.getenv("OSMLCM_VCA_APIPROXY"):
1180 for cluster
in creds
["clusters"]:
1181 if "server" in cluster
["cluster"]:
1182 if os
.getenv("OSMLCM_VCA_APIPROXY") in cluster
["cluster"]["server"]:
1187 async def get_controller(self
, cluster_uuid
):
1188 """Login to the Juju controller."""
1190 config
= self
.get_config(cluster_uuid
)
1192 juju_endpoint
= config
["endpoint"]
1193 juju_user
= config
["username"]
1194 juju_secret
= config
["secret"]
1195 juju_ca_cert
= config
["cacert"]
1197 controller
= Controller()
1201 "Connecting to controller... ws://{} as {}".format(
1202 juju_endpoint
, juju_user
,
1206 await controller
.connect(
1207 endpoint
=juju_endpoint
,
1209 password
=juju_secret
,
1210 cacert
=juju_ca_cert
,
1212 self
.log
.debug("JujuApi: Logged into controller")
1214 except Exception as ex
:
1216 self
.log
.debug("Caught exception: {}".format(ex
))
1218 self
.log
.fatal("VCA credentials not configured.")
1220 # TODO: Remove these commented lines
1221 # self.authenticated = False
1222 # if self.authenticated:
1225 # self.connecting = True
1226 # juju_public_key = None
1227 # self.authenticated = True
1228 # Test: Make sure we have the credentials loaded
1229 # async def logout(self):
1230 # """Logout of the Juju controller."""
1231 # self.log.debug("[logout]")
1232 # if not self.authenticated:
1235 # for model in self.models:
1236 # self.log.debug("Logging out of model {}".format(model))
1237 # await self.models[model].disconnect()
1239 # if self.controller:
1240 # self.log.debug("Disconnecting controller {}".format(self.controller))
1241 # await self.controller.disconnect()
1242 # self.controller = None
1244 # self.authenticated = False
1246 async def remove_cloud(self
, cloud_name
: str,) -> bool:
1247 """Remove a k8s cloud from Juju
1249 Removes a Kubernetes cloud from Juju.
1251 :param cloud_name str: The name of the cloud to add.
1253 :returns: True if successful, otherwise raises an exception.
1256 # Remove the bootstrapped controller
1257 cmd
= [self
.juju_command
, "remove-k8s", "--client", cloud_name
]
1258 process
= await asyncio
.create_subprocess_exec(
1259 *cmd
, stdout
=asyncio
.subprocess
.PIPE
, stderr
=asyncio
.subprocess
.PIPE
,
1262 _stdout
, stderr
= await process
.communicate()
1264 return_code
= process
.returncode
1267 raise Exception(stderr
)
1269 # Remove the cloud from the local config
1270 cmd
= [self
.juju_command
, "remove-cloud", "--client", cloud_name
]
1271 process
= await asyncio
.create_subprocess_exec(
1272 *cmd
, stdout
=asyncio
.subprocess
.PIPE
, stderr
=asyncio
.subprocess
.PIPE
,
1275 _stdout
, stderr
= await process
.communicate()
1277 return_code
= process
.returncode
1280 raise Exception(stderr
)
1284 async def set_config(self
, cluster_uuid
: str, config
: dict,) -> bool:
1285 """Save the cluster configuration
1287 Saves the cluster information to the Mongo database
1289 :param cluster_uuid str: The UUID of the cluster
1290 :param config dict: A dictionary containing the cluster configuration
1293 juju_db
= self
.db
.get_one("admin", {"_id": "juju"})
1295 k8sclusters
= juju_db
["k8sclusters"] if "k8sclusters" in juju_db
else []
1296 self
.db
.encrypt_decrypt_fields(
1299 ["secret", "cacert"],
1300 schema_version
="1.1",
1303 k8sclusters
.append({"_id": cluster_uuid
, "config": config
})
1306 q_filter
={"_id": "juju"},
1307 update_dict
={"k8sclusters": k8sclusters
},