Add generate_kdu_instance_name method in K8sConn
[osm/N2VC.git] / n2vc / k8s_juju_conn.py
1 # Copyright 2019 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import os
17 import uuid
18 import yaml
19 import tempfile
20 import binascii
21 import base64
22
23 from n2vc.exceptions import K8sException, N2VCBadArgumentsException
24 from n2vc.k8s_conn import K8sConnector
25 from n2vc.kubectl import Kubectl, CORE_CLIENT, RBAC_CLIENT
26 from .exceptions import MethodNotImplemented
27 from n2vc.utils import base64_to_cacert
28 from n2vc.libjuju import Libjuju
29
30 from kubernetes.client.models import (
31 V1ClusterRole,
32 V1ObjectMeta,
33 V1PolicyRule,
34 V1ServiceAccount,
35 V1ClusterRoleBinding,
36 V1RoleRef,
37 V1Subject,
38 )
39
40 from typing import Dict
41
42 SERVICE_ACCOUNT_TOKEN_KEY = "token"
43 SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
44 RBAC_LABEL_KEY_NAME = "rbac-id"
45
46 ADMIN_NAMESPACE = "kube-system"
47 RBAC_STACK_PREFIX = "juju-credential"
48
49 # from juju.bundle import BundleHandler
50 # import re
51 # import ssl
52 # from .vnf import N2VC
53
54
55 def generate_rbac_id():
56 return binascii.hexlify(os.urandom(4)).decode()
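# For illustration: returns an 8-character hex string such as "a1b2c3d4",
# used below to tag the RBAC resources created for a cluster.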
57
58
59 class K8sJujuConnector(K8sConnector):
60 def __init__(
61 self,
62 fs: object,
63 db: object,
64 kubectl_command: str = "/usr/bin/kubectl",
65 juju_command: str = "/usr/bin/juju",
66 log: object = None,
67 loop: object = None,
68 on_update_db=None,
69 vca_config: dict = None,
70 ):
71 """
72 :param fs: file system for kubernetes and helm configuration
73 :param db: Database object
74 :param kubectl_command: path to kubectl executable
75 :param juju_command: path to juju executable
76 :param log: logger
77 :param loop: Asyncio loop
78 """
79
80 # parent class
81 K8sConnector.__init__(
82 self,
83 db,
84 log=log,
85 on_update_db=on_update_db,
86 )
87
88 self.fs = fs
89 self.loop = loop or asyncio.get_event_loop()
90 self.log.debug("Initializing K8S Juju connector")
91
92 required_vca_config = [
93 "host",
94 "user",
95 "secret",
96 "ca_cert",
97 ]
98 if not vca_config or not all(k in vca_config for k in required_vca_config):
99 raise N2VCBadArgumentsException(
100 message="Missing arguments in vca_config: {}".format(vca_config),
101 bad_args=required_vca_config,
102 )
103 port = vca_config["port"] if "port" in vca_config else 17070
104 url = "{}:{}".format(vca_config["host"], port)
105 enable_os_upgrade = vca_config.get("enable_os_upgrade", True)
106 apt_mirror = vca_config.get("apt_mirror", None)
107 username = vca_config["user"]
108 secret = vca_config["secret"]
109 ca_cert = base64_to_cacert(vca_config["ca_cert"])
110
111 self.libjuju = Libjuju(
112 endpoint=url,
113 api_proxy=None, # Not needed for k8s charms
114 enable_os_upgrade=enable_os_upgrade,
115 apt_mirror=apt_mirror,
116 username=username,
117 password=secret,
118 cacert=ca_cert,
119 loop=self.loop,
120 log=self.log,
121 db=self.db,
122 )
123 self.log.debug("K8S Juju connector initialized")
124 # TODO: Remove these commented lines:
125 # self.authenticated = False
126 # self.models = {}
127 # self.juju_secret = ""
128
129 """Initialization"""
130
131 async def init_env(
132 self,
133 k8s_creds: str,
134 namespace: str = "kube-system",
135 reuse_cluster_uuid: str = None,
136 ) -> (str, bool):
137 """
138 It prepares a given K8s cluster environment to run Juju bundles.
139
140 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
141 '.kube/config'
142 :param namespace: optional namespace to be used for juju. By default,
143 'kube-system' will be used
144 :param reuse_cluster_uuid: existing cluster uuid for reuse
145 :return: uuid of the K8s cluster and True if connector has installed some
146 software in the cluster
147 (on error, an exception will be raised)
148 """
149
150 # """Bootstrapping
151
152 # Bootstrapping cannot be done, by design, through the API. We need to
153 # use the CLI tools.
154 # """
155
156 # """
157 # WIP: Workflow
158
159 # 1. Has the environment already been bootstrapped?
160 # - Check the database to see if we have a record for this env
161
162 # 2. If this is a new env, create it
163 # - Add the k8s cloud to Juju
164 # - Bootstrap
165 # - Record it in the database
166
167 # 3. Connect to the Juju controller for this cloud
168
169 # """
170 # cluster_uuid = reuse_cluster_uuid
171 # if not cluster_uuid:
172 # cluster_uuid = str(uuid4())
173
174 ##################################################
175 # TODO: Pull info from db based on the namespace #
176 ##################################################
177
178 ###################################################
179 # TODO: Make it idempotent, calling add-k8s and #
180 # bootstrap whenever reuse_cluster_uuid is passed #
181 # as parameter #
182 # `init_env` is called to initialize the K8s #
183 # cluster for juju. If this initialization fails, #
184 # it can be called again by LCM with the param #
185 # reuse_cluster_uuid, e.g. to try to fix it. #
186 ###################################################
187
188 # This is a new cluster, so bootstrap it
189
190 cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4())
191
192 # Is this a local k8s cluster?
193 # localk8s = self.is_local_k8s(k8s_creds)
194
195 # If the k8s is external, the juju controller needs a loadbalancer
196 # loadbalancer = False if localk8s else True
197
198 # Name the new k8s cloud
199 # k8s_cloud = "k8s-{}".format(cluster_uuid)
200
201 # self.log.debug("Adding k8s cloud {}".format(k8s_cloud))
202 # await self.add_k8s(k8s_cloud, k8s_creds)
203
204 # Bootstrap Juju controller
205 # self.log.debug("Bootstrapping...")
206 # await self.bootstrap(k8s_cloud, cluster_uuid, loadbalancer)
207 # self.log.debug("Bootstrap done.")
208
209 # Get the controller information
210
211 # Parse ~/.local/share/juju/controllers.yaml
212 # controllers.testing.api-endpoints|ca-cert|uuid
213 # self.log.debug("Getting controller endpoints")
214 # with open(os.path.expanduser("~/.local/share/juju/controllers.yaml")) as f:
215 # controllers = yaml.load(f, Loader=yaml.Loader)
216 # controller = controllers["controllers"][cluster_uuid]
217 # endpoints = controller["api-endpoints"]
218 # juju_endpoint = endpoints[0]
219 # juju_ca_cert = controller["ca-cert"]
220
221 # Parse ~/.local/share/juju/accounts
222 # controllers.testing.user|password
223 # self.log.debug("Getting accounts")
224 # with open(os.path.expanduser("~/.local/share/juju/accounts.yaml")) as f:
225 # controllers = yaml.load(f, Loader=yaml.Loader)
226 # controller = controllers["controllers"][cluster_uuid]
227
228 # juju_user = controller["user"]
229 # juju_secret = controller["password"]
230
231 # config = {
232 # "endpoint": juju_endpoint,
233 # "username": juju_user,
234 # "secret": juju_secret,
235 # "cacert": juju_ca_cert,
236 # "loadbalancer": loadbalancer,
237 # }
238
239 # Store the cluster configuration so it
240 # can be used for subsequent calls
241 kubecfg = tempfile.NamedTemporaryFile()
242 with open(kubecfg.name, "w") as kubecfg_file:
243 kubecfg_file.write(k8s_creds)
244 kubectl = Kubectl(config_file=kubecfg.name)
245
246 # CREATING RESOURCES IN K8S
247 rbac_id = generate_rbac_id()
248 metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
249 labels = {RBAC_STACK_PREFIX: rbac_id}
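# For illustration: with rbac_id == "a1b2c3d4" this gives
# metadata_name == "juju-credential-a1b2c3d4" and
# labels == {"juju-credential": "a1b2c3d4"}.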
250
251 # Create cleanup dictionary to clean up created resources
252 # if it fails in the middle of the process
253 cleanup_data = []
254 try:
255 self._create_cluster_role(
256 kubectl,
257 name=metadata_name,
258 labels=labels,
259 )
260 cleanup_data.append(
261 {
262 "delete": self._delete_cluster_role,
263 "args": (kubectl, metadata_name),
264 }
265 )
266
267 self._create_service_account(
268 kubectl,
269 name=metadata_name,
270 labels=labels,
271 )
272 cleanup_data.append(
273 {
274 "delete": self._delete_service_account,
275 "args": (kubectl, metadata_name),
276 }
277 )
278
279 self._create_cluster_role_binding(
280 kubectl,
281 name=metadata_name,
282 labels=labels,
283 )
284 cleanup_data.append(
285 {
286 "delete": self._delete_service_account,
287 "args": (kubectl, metadata_name),
288 }
289 )
290 token, client_cert_data = await self._get_secret_data(
291 kubectl,
292 metadata_name,
293 )
294
295 default_storage_class = kubectl.get_default_storage_class()
296 await self.libjuju.add_k8s(
297 name=cluster_uuid,
298 rbac_id=rbac_id,
299 token=token,
300 client_cert_data=client_cert_data,
301 configuration=kubectl.configuration,
302 storage_class=default_storage_class,
303 credential_name=self._get_credential_name(cluster_uuid),
304 )
305 # self.log.debug("Setting config")
306 # await self.set_config(cluster_uuid, config)
307
308 # Test connection
309 # controller = await self.get_controller(cluster_uuid)
310 # await controller.disconnect()
311
312 # TODO: Remove these commented lines
313 # raise Exception("EOL")
314 # self.juju_public_key = None
315 # Login to the k8s cluster
316 # if not self.authenticated:
317 # await self.login(cluster_uuid)
318
319 # We're creating a new cluster
320 # print("Getting model {}".format(self.get_namespace(cluster_uuid),
321 # cluster_uuid=cluster_uuid))
322 # model = await self.get_model(
323 # self.get_namespace(cluster_uuid),
324 # cluster_uuid=cluster_uuid
325 # )
326
327 # Disconnect from the model
328 # if model and model.is_connected():
329 # await model.disconnect()
330
331 return cluster_uuid, True
332 except Exception as e:
333 self.log.error("Error initializing k8scluster: {}".format(e))
334 if len(cleanup_data) > 0:
335 self.log.debug("Cleaning up created resources in k8s cluster...")
336 for item in cleanup_data:
337 delete_function = item["delete"]
338 delete_args = item["args"]
339 delete_function(*delete_args)
340 self.log.debug("Cleanup finished")
341 raise e
342
343 """Repo Management"""
344
345 async def repo_add(
346 self,
347 name: str,
348 url: str,
349 _type: str = "charm",
350 ):
351 raise MethodNotImplemented()
352
353 async def repo_list(self):
354 raise MethodNotImplemented()
355
356 async def repo_remove(
357 self,
358 name: str,
359 ):
360 raise MethodNotImplemented()
361
362 async def synchronize_repos(self, cluster_uuid: str, name: str):
363 """
364 Returns None as currently add_repo is not implemented
365 """
366 return None
367
368 """Reset"""
369
370 async def reset(
371 self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
372 ) -> bool:
373 """Reset a cluster
374
375 Resets the Kubernetes cluster by removing the model that represents it.
376
377 :param cluster_uuid str: The UUID of the cluster to reset
378 :return: Returns True if successful or raises an exception.
379 """
380
381 try:
382 # Remove k8scluster from database
383 # self.log.debug("[reset] Removing k8scluster from juju database")
384 # juju_db = self.db.get_one("admin", {"_id": "juju"})
385
386 # for k in juju_db["k8sclusters"]:
387 # if k["_id"] == cluster_uuid:
388 # juju_db["k8sclusters"].remove(k)
389 # self.db.set_one(
390 # table="admin",
391 # q_filter={"_id": "juju"},
392 # update_dict={"k8sclusters": juju_db["k8sclusters"]},
393 # )
394 # break
395
396 # Destroy the controller (via CLI)
397 # self.log.debug("[reset] Destroying controller")
398 # await self.destroy_controller(cluster_uuid)
399 self.log.debug("[reset] Removing k8s cloud")
400 # k8s_cloud = "k8s-{}".format(cluster_uuid)
401 # await self.remove_cloud(k8s_cloud)
402
403 cloud_creds = await self.libjuju.get_cloud_credentials(
404 cluster_uuid,
405 self._get_credential_name(cluster_uuid),
406 )
407
408 await self.libjuju.remove_cloud(cluster_uuid)
409
410 kubecfg = self.get_credentials(cluster_uuid=cluster_uuid)
411
412 kubecfg_file = tempfile.NamedTemporaryFile()
413 with open(kubecfg_file.name, "w") as f:
414 f.write(kubecfg)
415 kubectl = Kubectl(config_file=kubecfg_file.name)
416
417 delete_functions = [
418 self._delete_cluster_role_binding,
419 self._delete_service_account,
420 self._delete_cluster_role,
421 ]
422
423 credential_attrs = cloud_creds[0].result["attrs"]
424 if RBAC_LABEL_KEY_NAME in credential_attrs:
425 rbac_id = credential_attrs[RBAC_LABEL_KEY_NAME]
426 metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
427 delete_args = (kubectl, metadata_name)
428 for delete_func in delete_functions:
429 try:
430 delete_func(*delete_args)
431 except Exception as e:
432 self.log.warning("Cannot remove resource in K8s {}".format(e))
433
434 except Exception as e:
435 self.log.debug("Caught exception during reset: {}".format(e))
436 raise e
437 return True
438 # TODO: Remove these commented lines
439 # if not self.authenticated:
440 # await self.login(cluster_uuid)
441
442 # if self.controller.is_connected():
443 # # Destroy the model
444 # namespace = self.get_namespace(cluster_uuid)
445 # if await self.has_model(namespace):
446 # self.log.debug("[reset] Destroying model")
447 # await self.controller.destroy_model(namespace, destroy_storage=True)
448
449 # # Disconnect from the controller
450 # self.log.debug("[reset] Disconnecting controller")
451 # await self.logout()
452
453 """Deployment"""
454
455 async def install(
456 self,
457 cluster_uuid: str,
458 kdu_model: str,
459 kdu_instance: str,
460 atomic: bool = True,
461 timeout: float = 1800,
462 params: dict = None,
463 db_dict: dict = None,
464 kdu_name: str = None,
465 namespace: str = None,
466 ) -> bool:
467 """Install a bundle
468
469 :param cluster_uuid str: The UUID of the cluster to install to
470 :param kdu_model str: The name or path of a bundle to install
471 :param kdu_instance: Kdu instance name
472 :param atomic bool: If set, waits until the bundle deployment is
473 active before returning.
474 :param timeout int: The time, in seconds, to wait for the install
475 to finish
476 :param params dict: Key-value pairs of instantiation parameters
477 :param kdu_name: Name of the KDU instance to be installed
478 :param namespace: K8s namespace to use for the KDU instance
479
480 :return: True if successful, or raises an exception
481 """
482 bundle = kdu_model
483
484 # controller = await self.get_controller(cluster_uuid)
485
486 ##
487 # Get or create the model, based on the NS
488 # uuid.
489
490 if not db_dict:
491 raise K8sException("db_dict must be set")
492 if not bundle:
493 raise K8sException("bundle must be set")
494
495 if bundle.startswith("cs:"):
496 pass
497 elif bundle.startswith("http"):
498 # Download the file
499 pass
500 else:
501 new_workdir = os.path.dirname(kdu_model)
502 os.chdir(new_workdir)
503 bundle = "local:{}".format(kdu_model)
504
505 self.log.debug("Checking for model named {}".format(kdu_instance))
506
507 # Create the new model
508 self.log.debug("Adding model: {}".format(kdu_instance))
509 await self.libjuju.add_model(
510 model_name=kdu_instance,
511 cloud_name=cluster_uuid,
512 credential_name=self._get_credential_name(cluster_uuid),
513 )
514
515 # if model:
516 # TODO: Instantiation parameters
517
518 """
519 "Juju bundle that models the KDU, in any of the following ways:
520 - <juju-repo>/<juju-bundle>
521 - <juju-bundle folder under k8s_models folder in the package>
522 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
523 in the package>
524 - <URL_where_to_fetch_juju_bundle>
525 """
526 try:
527 previous_workdir = os.getcwd()
528 except FileNotFoundError:
529 previous_workdir = "/app/storage"
530
531 self.log.debug("[install] deploying {}".format(bundle))
532 await self.libjuju.deploy(
533 bundle, model_name=kdu_instance, wait=atomic, timeout=timeout
534 )
535
536 # Get the application
537 # if atomic:
538 # # applications = model.applications
539 # self.log.debug("[install] Applications: {}".format(model.applications))
540 # for name in model.applications:
541 # self.log.debug("[install] Waiting for {} to settle".format(name))
542 # application = model.applications[name]
543 # try:
544 # # It's not enough to wait for all units to be active;
545 # # the application status needs to be active as well.
546 # self.log.debug("Waiting for all units to be active...")
547 # await model.block_until(
548 # lambda: all(
549 # unit.agent_status == "idle"
550 # and application.status in ["active", "unknown"]
551 # and unit.workload_status in ["active", "unknown"]
552 # for unit in application.units
553 # ),
554 # timeout=timeout,
555 # )
556 # self.log.debug("All units active.")
557
558 # # TODO use asyncio.TimeoutError
559 # except concurrent.futures._base.TimeoutError:
560 # os.chdir(previous_workdir)
561 # self.log.debug("[install] Timeout exceeded; resetting cluster")
562 # await self.reset(cluster_uuid)
563 # return False
564
565 # Wait for the application to be active
566 # if model.is_connected():
567 # self.log.debug("[install] Disconnecting model")
568 # await model.disconnect()
569 # await controller.disconnect()
570 os.chdir(previous_workdir)
571 return True
572
573 async def instances_list(self, cluster_uuid: str) -> list:
574 """
575 returns a list of deployed releases in a cluster
576
577 :param cluster_uuid: the cluster
578 :return:
579 """
580 return []
581
582 async def upgrade(
583 self,
584 cluster_uuid: str,
585 kdu_instance: str,
586 kdu_model: str = None,
587 params: dict = None,
588 ) -> str:
589 """Upgrade a model
590
591 :param cluster_uuid str: The UUID of the cluster to upgrade
592 :param kdu_instance str: The unique name of the KDU instance
593 :param kdu_model str: The name or path of the bundle to upgrade to
594 :param params dict: Key-value pairs of instantiation parameters
595
596 :return: If successful, reference to the new revision number of the
597 KDU instance.
598 """
599
600 # TODO: Loop through the bundle and upgrade each charm individually
601
602 """
603 The API doesn't have a concept of bundle upgrades, because there are
604 many possible changes: charm revision, disk, number of units, etc.
605
606 As such, we are only supporting a limited subset of upgrades. We'll
607 upgrade the charm revision but leave storage and scale untouched.
608
609 Scale changes should happen through OSM constructs, and changes to
610 storage would require a redeployment of the service, at least in this
611 initial release.
612 """
613 raise MethodNotImplemented()
614 # TODO: Remove these commented lines
615
616 # model = await self.get_model(namespace, cluster_uuid=cluster_uuid)
617
618 # model = None
619 # namespace = self.get_namespace(cluster_uuid)
620 # controller = await self.get_controller(cluster_uuid)
621
622 # try:
623 # if namespace not in await controller.list_models():
624 # raise N2VCNotFound(message="Model {} does not exist".format(namespace))
625
626 # model = await controller.get_model(namespace)
627 # with open(kdu_model, "r") as f:
628 # bundle = yaml.safe_load(f)
629
630 # """
631 # {
632 # 'description': 'Test bundle',
633 # 'bundle': 'kubernetes',
634 # 'applications': {
635 # 'mariadb-k8s': {
636 # 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
637 # 'scale': 1,
638 # 'options': {
639 # 'password': 'manopw',
640 # 'root_password': 'osm4u',
641 # 'user': 'mano'
642 # },
643 # 'series': 'kubernetes'
644 # }
645 # }
646 # }
647 # """
648 # # TODO: This should be returned in an agreed-upon format
649 # for name in bundle["applications"]:
650 # self.log.debug(model.applications)
651 # application = model.applications[name]
652 # self.log.debug(application)
653
654 # path = bundle["applications"][name]["charm"]
655
656 # try:
657 # await application.upgrade_charm(switch=path)
658 # except juju.errors.JujuError as ex:
659 # if "already running charm" in str(ex):
660 # # We're already running this version
661 # pass
662 # finally:
663 # if model:
664 # await model.disconnect()
665 # await controller.disconnect()
666 # return True
667
668 """Rollback"""
669
670 async def rollback(
671 self,
672 cluster_uuid: str,
673 kdu_instance: str,
674 revision: int = 0,
675 ) -> str:
676 """Rollback a model
677
678 :param cluster_uuid str: The UUID of the cluster to rollback
679 :param kdu_instance str: The unique name of the KDU instance
680 :param revision int: The revision to revert to. If omitted, rolls back
681 the previous upgrade.
682
683 :return: If successful, returns the revision of active KDU instance,
684 or raises an exception
685 """
686 raise MethodNotImplemented()
687
688 """Deletion"""
689
690 async def uninstall(self, cluster_uuid: str, kdu_instance: str) -> bool:
691 """Uninstall a KDU instance
692
693 :param cluster_uuid str: The UUID of the cluster
694 :param kdu_instance str: The unique name of the KDU instance
695
696 :return: Returns True if successful, or raises an exception
697 """
698
699 # controller = await self.get_controller(cluster_uuid)
700
701 self.log.debug("[uninstall] Destroying model")
702
703 await self.libjuju.destroy_model(kdu_instance, total_timeout=3600)
704
705 # self.log.debug("[uninstall] Model destroyed and disconnecting")
706 # await controller.disconnect()
707
708 return True
709 # TODO: Remove these commented lines
710 # if not self.authenticated:
711 # self.log.debug("[uninstall] Connecting to controller")
712 # await self.login(cluster_uuid)
713
714 async def exec_primitive(
715 self,
716 cluster_uuid: str = None,
717 kdu_instance: str = None,
718 primitive_name: str = None,
719 timeout: float = 300,
720 params: dict = None,
721 db_dict: dict = None,
722 ) -> str:
723 """Exec primitive (Juju action)
724
725 :param cluster_uuid str: The UUID of the cluster
726 :param kdu_instance str: The unique name of the KDU instance
727 :param primitive_name: Name of action that will be executed
728 :param timeout: Timeout for action execution
729 :param params: Dictionary of all the parameters needed for the action
730 :db_dict: Dictionary for any additional data
731
732 :return: Returns the output of the action
733 """
734
735 # controller = await self.get_controller(cluster_uuid)
736
737 if not params or "application-name" not in params:
738 raise K8sException(
739 "Missing application-name argument, \
740 argument needed for K8s actions"
741 )
742 try:
743 self.log.debug(
744 "[exec_primitive] Getting model "
745 "kdu_instance: {}".format(kdu_instance)
746 )
747 application_name = params["application-name"]
748 actions = await self.libjuju.get_actions(application_name, kdu_instance)
749 if primitive_name not in actions:
750 raise K8sException("Primitive {} not found".format(primitive_name))
751 output, status = await self.libjuju.execute_action(
752 application_name, kdu_instance, primitive_name, **params
753 )
754 # model = await self.get_model(kdu_instance, controller=controller)
755
756 # application_name = params["application-name"]
757 # application = model.applications[application_name]
758
759 # actions = await application.get_actions()
760 # if primitive_name not in actions:
761 # raise K8sException("Primitive {} not found".format(primitive_name))
762
763 # unit = None
764 # for u in application.units:
765 # if await u.is_leader_from_status():
766 # unit = u
767 # break
768
769 # if unit is None:
770 # raise K8sException("No leader unit found to execute action")
771
772 # self.log.debug("[exec_primitive] Running action: {}".format(primitive_name))
773 # action = await unit.run_action(primitive_name, **params)
774
775 # output = await model.get_action_output(action_uuid=action.entity_id)
776 # status = await model.get_action_status(uuid_or_prefix=action.entity_id)
777
778 # status = (
779 # status[action.entity_id] if action.entity_id in status else "failed"
780 # )
781
782 if status != "completed":
783 raise K8sException(
784 "status is not completed: {} output: {}".format(status, output)
785 )
786
787 return output
788
789 except Exception as e:
790 error_msg = "Error executing primitive {}: {}".format(primitive_name, e)
791 self.log.error(error_msg)
792 raise K8sException(message=error_msg)
793 # finally:
794 # await controller.disconnect()
795 # TODO: Remove these commented lines:
796 # if not self.authenticated:
797 # self.log.debug("[exec_primitive] Connecting to controller")
798 # await self.login(cluster_uuid)
799
800 """Introspection"""
801
802 async def inspect_kdu(
803 self,
804 kdu_model: str,
805 ) -> dict:
806 """Inspect a KDU
807
808 Inspects a bundle and returns a dictionary of config parameters and
809 their default values.
810
811 :param kdu_model str: The name or path of the bundle to inspect.
812
813 :return: If successful, returns a dictionary of available parameters
814 and their default values.
815 """
816
817 kdu = {}
818 if not os.path.exists(kdu_model):
819 raise K8sException("file {} not found".format(kdu_model))
820
821 with open(kdu_model, "r") as f:
822 bundle = yaml.safe_load(f.read())
823
824 """
825 {
826 'description': 'Test bundle',
827 'bundle': 'kubernetes',
828 'applications': {
829 'mariadb-k8s': {
830 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
831 'scale': 1,
832 'options': {
833 'password': 'manopw',
834 'root_password': 'osm4u',
835 'user': 'mano'
836 },
837 'series': 'kubernetes'
838 }
839 }
840 }
841 """
842 # TODO: This should be returned in an agreed-upon format
843 kdu = bundle["applications"]
844
845 return kdu
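# Illustrative usage (assumption): for the example bundle in the docstring
# above, this returns its "applications" section, e.g.
#
# kdu = await k8s_juju_conn.inspect_kdu("/path/to/bundle.yaml")
# kdu["mariadb-k8s"]["options"]["user"]  # -> "mano"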
846
847 async def help_kdu(
848 self,
849 kdu_model: str,
850 ) -> str:
851 """View the README
852
853 If available, returns the README of the bundle.
854
855 :param kdu_model str: The name or path of a bundle
856
857 :return: If found, returns the contents of the README.
858 """
859 readme = None
860
861 files = ["README", "README.txt", "README.md"]
862 path = os.path.dirname(kdu_model)
863 for file in os.listdir(path):
864 if file in files:
865 with open(os.path.join(path, file), "r") as f:
866 readme = f.read()
867 break
868
869 return readme
870
871 async def status_kdu(
872 self,
873 cluster_uuid: str,
874 kdu_instance: str,
875 ) -> dict:
876 """Get the status of the KDU
877
878 Get the current status of the KDU instance.
879
880 :param cluster_uuid str: The UUID of the cluster
881 :param kdu_instance str: The unique id of the KDU instance
882
883 :return: Returns a dictionary containing namespace, state, resources,
884 and deployment_time.
885 """
886 status = {}
887 # controller = await self.get_controller(cluster_uuid)
888 # model = await self.get_model(kdu_instance, controller=controller)
889
890 # model_status = await model.get_status()
891 # status = model_status.applications
892 model_status = await self.libjuju.get_model_status(kdu_instance)
893 for name in model_status.applications:
894 application = model_status.applications[name]
895 status[name] = {"status": application["status"]["status"]}
896
897 # await model.disconnect()
898 # await controller.disconnect()
899
900 return status
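# Illustrative return value (assumption): one entry per application in the
# Juju model, e.g. {"mariadb-k8s": {"status": "active"}}.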
901
902 async def get_services(
903 self, cluster_uuid: str, kdu_instance: str, namespace: str
904 ) -> list:
905 """Return a list of services of a kdu_instance"""
906
907 credentials = self.get_credentials(cluster_uuid=cluster_uuid)
908
909 # config_path = "/tmp/{}".format(cluster_uuid)
910 # config_file = "{}/config".format(config_path)
911
912 # if not os.path.exists(config_path):
913 # os.makedirs(config_path)
914 # with open(config_file, "w") as f:
915 # f.write(credentials)
916
917 kubecfg = tempfile.NamedTemporaryFile()
918 with open(kubecfg.name, "w") as kubecfg_file:
919 kubecfg_file.write(credentials)
920 kubectl = Kubectl(config_file=kubecfg.name)
921
922 return kubectl.get_services(
923 field_selector="metadata.namespace={}".format(kdu_instance)
924 )
925
926 async def get_service(
927 self, cluster_uuid: str, service_name: str, namespace: str
928 ) -> object:
929 """Return data for a specific service inside a namespace"""
930
931 credentials = self.get_credentials(cluster_uuid=cluster_uuid)
932
933 # config_path = "/tmp/{}".format(cluster_uuid)
934 # config_file = "{}/config".format(config_path)
935
936 # if not os.path.exists(config_path):
937 # os.makedirs(config_path)
938 # with open(config_file, "w") as f:
939 # f.write(credentials)
940
941 kubecfg = tempfile.NamedTemporaryFile()
942 with open(kubecfg.name, "w") as kubecfg_file:
943 kubecfg_file.write(credentials)
944 kubectl = Kubectl(config_file=kubecfg.name)
945
946 return kubectl.get_services(
947 field_selector="metadata.name={},metadata.namespace={}".format(
948 service_name, namespace
949 )
950 )[0]
951
952 # Private methods
953 # async def add_k8s(self, cloud_name: str, credentials: str,) -> bool:
954 # """Add a k8s cloud to Juju
955
956 # Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
957 # Juju Controller.
958
959 # :param cloud_name str: The name of the cloud to add.
960 # :param credentials dict: A dictionary representing the output of
961 # `kubectl config view --raw`.
962
963 # :returns: True if successful, otherwise raises an exception.
964 # """
965
966 # cmd = [self.juju_command, "add-k8s", "--local", cloud_name]
967 # self.log.debug(cmd)
968
969 # process = await asyncio.create_subprocess_exec(
970 # *cmd,
971 # stdout=asyncio.subprocess.PIPE,
972 # stderr=asyncio.subprocess.PIPE,
973 # stdin=asyncio.subprocess.PIPE,
974 # )
975
976 # # Feed the process the credentials
977 # process.stdin.write(credentials.encode("utf-8"))
978 # await process.stdin.drain()
979 # process.stdin.close()
980
981 # _stdout, stderr = await process.communicate()
982
983 # return_code = process.returncode
984
985 # self.log.debug("add-k8s return code: {}".format(return_code))
986
987 # if return_code > 0:
988 # raise Exception(stderr)
989
990 # return True
991
992 # async def add_model(
993 # self, model_name: str, cluster_uuid: str, controller: Controller
994 # ) -> Model:
995 # """Adds a model to the controller
996
997 # Adds a new model to the Juju controller
998
999 # :param model_name str: The name of the model to add.
1000 # :param cluster_uuid str: ID of the cluster.
1001 # :param controller: Controller object in which the model will be added
1002 # :returns: The juju.model.Model object of the new model upon success or
1003 # raises an exception.
1004 # """
1005
1006 # self.log.debug(
1007 # "Adding model '{}' to cluster_uuid '{}'".format(model_name, cluster_uuid)
1008 # )
1009 # model = None
1010 # try:
1011 # if self.juju_public_key is not None:
1012 # model = await controller.add_model(
1013 # model_name, config={"authorized-keys": self.juju_public_key}
1014 # )
1015 # else:
1016 # model = await controller.add_model(model_name)
1017 # except Exception as ex:
1018 # self.log.debug(ex)
1019 # self.log.debug("Caught exception: {}".format(ex))
1020 # pass
1021
1022 # return model
1023
1024 # async def bootstrap(
1025 # self, cloud_name: str, cluster_uuid: str, loadbalancer: bool
1026 # ) -> bool:
1027 # """Bootstrap a Kubernetes controller
1028
1029 # Bootstrap a Juju controller inside the Kubernetes cluster
1030
1031 # :param cloud_name str: The name of the cloud.
1032 # :param cluster_uuid str: The UUID of the cluster to bootstrap.
1033 # :param loadbalancer bool: If the controller should use loadbalancer or not.
1034 # :returns: True upon success or raises an exception.
1035 # """
1036
1037 # if not loadbalancer:
1038 # cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid]
1039 # else:
1040 # """
1041 # For public clusters, specify that the controller service is using a
1042 # LoadBalancer.
1043 # """
1044 # cmd = [
1045 # self.juju_command,
1046 # "bootstrap",
1047 # cloud_name,
1048 # cluster_uuid,
1049 # "--config",
1050 # "controller-service-type=loadbalancer",
1051 # ]
1052
1053 # self.log.debug(
1054 # "Bootstrapping controller {} in cloud {}".format(cluster_uuid, cloud_name)
1055 # )
1056
1057 # process = await asyncio.create_subprocess_exec(
1058 # *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1059 # )
1060
1061 # _stdout, stderr = await process.communicate()
1062
1063 # return_code = process.returncode
1064
1065 # if return_code > 0:
1066 # #
1067 # if b"already exists" not in stderr:
1068 # raise Exception(stderr)
1069
1070 # return True
1071
1072 # async def destroy_controller(self, cluster_uuid: str) -> bool:
1073 # """Destroy a Kubernetes controller
1074
1075 # Destroy an existing Kubernetes controller.
1076
1077 # :param cluster_uuid str: The UUID of the cluster to bootstrap.
1078 # :returns: True upon success or raises an exception.
1079 # """
1080 # cmd = [
1081 # self.juju_command,
1082 # "destroy-controller",
1083 # "--destroy-all-models",
1084 # "--destroy-storage",
1085 # "-y",
1086 # cluster_uuid,
1087 # ]
1088
1089 # process = await asyncio.create_subprocess_exec(
1090 # *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1091 # )
1092
1093 # _stdout, stderr = await process.communicate()
1094
1095 # return_code = process.returncode
1096
1097 # if return_code > 0:
1098 # #
1099 # if "already exists" not in stderr:
1100 # raise Exception(stderr)
1101
1102 def get_credentials(self, cluster_uuid: str) -> str:
1103 """
1104 Get Cluster Kubeconfig
1105 """
1106 k8scluster = self.db.get_one(
1107 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
1108 )
1109
1110 self.db.encrypt_decrypt_fields(
1111 k8scluster.get("credentials"),
1112 "decrypt",
1113 ["password", "secret"],
1114 schema_version=k8scluster["schema_version"],
1115 salt=k8scluster["_id"],
1116 )
1117
1118 return yaml.safe_dump(k8scluster.get("credentials"))
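# Illustrative "k8sclusters" record (assumption; only the fields read above
# are shown, values are placeholders):
#
# {
#     "_id": "<cluster_uuid>",
#     "schema_version": "<schema_version>",
#     "credentials": <kubeconfig contents as a dict>,
# }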
1119
1120 def _get_credential_name(self, cluster_uuid: str) -> str:
1121 """
1122 Get credential name for a k8s cloud
1123
1124 We cannot use the cluster_uuid directly as the credential name
1125 because credential names must start with a letter, not a number.
1126 Therefore, the k8s cloud credential name will be "cred-" followed
1127 by the cluster uuid.
1128
1129 :param: cluster_uuid: Cluster UUID of the kubernetes cloud (=cloud_name)
1130
1131 :return: Name to use for the credential name.
1132 """
1133 return "cred-{}".format(cluster_uuid)
1134
1135 # def get_config(self, cluster_uuid: str,) -> dict:
1136 # """Get the cluster configuration
1137
1138 # Gets the configuration of the cluster
1139
1140 # :param cluster_uuid str: The UUID of the cluster.
1141 # :return: A dict upon success, or raises an exception.
1142 # """
1143
1144 # juju_db = self.db.get_one("admin", {"_id": "juju"})
1145 # config = None
1146 # for k in juju_db["k8sclusters"]:
1147 # if k["_id"] == cluster_uuid:
1148 # config = k["config"]
1149 # self.db.encrypt_decrypt_fields(
1150 # config,
1151 # "decrypt",
1152 # ["secret", "cacert"],
1153 # schema_version="1.1",
1154 # salt=k["_id"],
1155 # )
1156 # break
1157 # if not config:
1158 # raise Exception(
1159 # "Unable to locate configuration for cluster {}".format(cluster_uuid)
1160 # )
1161 # return config
1162
1163 # async def get_model(self, model_name: str, controller: Controller) -> Model:
1164 # """Get a model from the Juju Controller.
1165
1166 # Note: Model objects returned must call disconnect() before they go
1167 # out of scope.
1168
1169 # :param model_name str: The name of the model to get
1170 # :param controller Controller: Controller object
1171 # :return The juju.model.Model object if found, or None.
1172 # """
1173
1174 # models = await controller.list_models()
1175 # if model_name not in models:
1176 # raise N2VCNotFound("Model {} not found".format(model_name))
1177 # self.log.debug("Found model: {}".format(model_name))
1178 # return await controller.get_model(model_name)
1179
1180 def get_namespace(
1181 self,
1182 cluster_uuid: str,
1183 ) -> str:
1184 """Get the namespace UUID
1185 Gets the namespace's unique name
1186
1187 :param cluster_uuid str: The UUID of the cluster
1188 :returns: The namespace UUID, or raises an exception
1189 """
1190 # config = self.get_config(cluster_uuid)
1191
1192 # Make sure the name is in the config
1193 # if "namespace" not in config:
1194 # raise Exception("Namespace not found.")
1195
1196 # TODO: We want to make sure this is unique to the cluster, in case
1197 # the cluster is being reused.
1198 # Consider pre/appending the cluster id to the namespace string
1199 pass
1200
1201 # TODO: Remove these lines of code
1202 # async def has_model(self, model_name: str) -> bool:
1203 # """Check if a model exists in the controller
1204
1205 # Checks to see if a model exists in the connected Juju controller.
1206
1207 # :param model_name str: The name of the model
1208 # :return: A boolean indicating if the model exists
1209 # """
1210 # models = await self.controller.list_models()
1211
1212 # if model_name in models:
1213 # return True
1214 # return False
1215
1216 # def is_local_k8s(self, credentials: str,) -> bool:
1217 # """Check if a cluster is local
1218
1219 # Checks if a cluster is running in the local host
1220
1221 # :param credentials dict: A dictionary containing the k8s credentials
1222 # :returns: A boolean if the cluster is running locally
1223 # """
1224
1225 # creds = yaml.safe_load(credentials)
1226
1227 # if creds and os.getenv("OSMLCM_VCA_APIPROXY"):
1228 # for cluster in creds["clusters"]:
1229 # if "server" in cluster["cluster"]:
1230 # if os.getenv("OSMLCM_VCA_APIPROXY") in cluster["cluster"]["server"]:
1231 # return True
1232
1233 # return False
1234
1235 # async def get_controller(self, cluster_uuid):
1236 # """Login to the Juju controller."""
1237
1238 # config = self.get_config(cluster_uuid)
1239
1240 # juju_endpoint = config["endpoint"]
1241 # juju_user = config["username"]
1242 # juju_secret = config["secret"]
1243 # juju_ca_cert = config["cacert"]
1244
1245 # controller = Controller()
1246
1247 # if juju_secret:
1248 # self.log.debug(
1249 # "Connecting to controller... ws://{} as {}".format(
1250 # juju_endpoint, juju_user,
1251 # )
1252 # )
1253 # try:
1254 # await controller.connect(
1255 # endpoint=juju_endpoint,
1256 # username=juju_user,
1257 # password=juju_secret,
1258 # cacert=juju_ca_cert,
1259 # )
1260 # self.log.debug("JujuApi: Logged into controller")
1261 # return controller
1262 # except Exception as ex:
1263 # self.log.debug(ex)
1264 # self.log.debug("Caught exception: {}".format(ex))
1265 # else:
1266 # self.log.fatal("VCA credentials not configured.")
1267
1268 # TODO: Remove these commented lines
1269 # self.authenticated = False
1270 # if self.authenticated:
1271 # return
1272
1273 # self.connecting = True
1274 # juju_public_key = None
1275 # self.authenticated = True
1276 # Test: Make sure we have the credentials loaded
1277 # async def logout(self):
1278 # """Logout of the Juju controller."""
1279 # self.log.debug("[logout]")
1280 # if not self.authenticated:
1281 # return False
1282
1283 # for model in self.models:
1284 # self.log.debug("Logging out of model {}".format(model))
1285 # await self.models[model].disconnect()
1286
1287 # if self.controller:
1288 # self.log.debug("Disconnecting controller {}".format(self.controller))
1289 # await self.controller.disconnect()
1290 # self.controller = None
1291
1292 # self.authenticated = False
1293
1294 # async def remove_cloud(self, cloud_name: str,) -> bool:
1295 # """Remove a k8s cloud from Juju
1296
1297 # Removes a Kubernetes cloud from Juju.
1298
1299 # :param cloud_name str: The name of the cloud to add.
1300
1301 # :returns: True if successful, otherwise raises an exception.
1302 # """
1303
1304 # # Remove the bootstrapped controller
1305 # cmd = [self.juju_command, "remove-k8s", "--client", cloud_name]
1306 # process = await asyncio.create_subprocess_exec(
1307 # *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1308 # )
1309
1310 # _stdout, stderr = await process.communicate()
1311
1312 # return_code = process.returncode
1313
1314 # if return_code > 0:
1315 # raise Exception(stderr)
1316
1317 # # Remove the cloud from the local config
1318 # cmd = [self.juju_command, "remove-cloud", "--client", cloud_name]
1319 # process = await asyncio.create_subprocess_exec(
1320 # *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1321 # )
1322
1323 # _stdout, stderr = await process.communicate()
1324
1325 # return_code = process.returncode
1326
1327 # if return_code > 0:
1328 # raise Exception(stderr)
1329
1330 # return True
1331
1332 # async def set_config(self, cluster_uuid: str, config: dict,) -> bool:
1333 # """Save the cluster configuration
1334
1335 # Saves the cluster information to the Mongo database
1336
1337 # :param cluster_uuid str: The UUID of the cluster
1338 # :param config dict: A dictionary containing the cluster configuration
1339 # """
1340
1341 # juju_db = self.db.get_one("admin", {"_id": "juju"})
1342
1343 # k8sclusters = juju_db["k8sclusters"] if "k8sclusters" in juju_db else []
1344 # self.db.encrypt_decrypt_fields(
1345 # config,
1346 # "encrypt",
1347 # ["secret", "cacert"],
1348 # schema_version="1.1",
1349 # salt=cluster_uuid,
1350 # )
1351 # k8sclusters.append({"_id": cluster_uuid, "config": config})
1352 # self.db.set_one(
1353 # table="admin",
1354 # q_filter={"_id": "juju"},
1355 # update_dict={"k8sclusters": k8sclusters},
1356 # )
1357
1358 # Private methods to create/delete needed resources in the
1359 # Kubernetes cluster to create the K8s cloud in Juju
1360
1361 def _create_cluster_role(
1362 self,
1363 kubectl: Kubectl,
1364 name: str,
1365 labels: Dict[str, str],
1366 ):
1367 cluster_roles = kubectl.clients[RBAC_CLIENT].list_cluster_role(
1368 field_selector="metadata.name={}".format(name)
1369 )
1370
1371 if len(cluster_roles.items) > 0:
1372 raise Exception(
1373 "Cluster role with metadata.name={} already exists".format(name)
1374 )
1375
1376 metadata = V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE)
1377 # Cluster role
1378 cluster_role = V1ClusterRole(
1379 metadata=metadata,
1380 rules=[
1381 V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
1382 V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
1383 ],
1384 )
1385
1386 kubectl.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
1387
1388 def _delete_cluster_role(self, kubectl: Kubectl, name: str):
1389 kubectl.clients[RBAC_CLIENT].delete_cluster_role(name)
1390
1391 def _create_service_account(
1392 self,
1393 kubectl: Kubectl,
1394 name: str,
1395 labels: Dict[str, str],
1396 ):
1397 service_accounts = kubectl.clients[CORE_CLIENT].list_namespaced_service_account(
1398 ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
1399 )
1400 if len(service_accounts.items) > 0:
1401 raise Exception(
1402 "Service account with metadata.name={} already exists".format(name)
1403 )
1404
1405 metadata = V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE)
1406 service_account = V1ServiceAccount(metadata=metadata)
1407
1408 kubectl.clients[CORE_CLIENT].create_namespaced_service_account(
1409 ADMIN_NAMESPACE, service_account
1410 )
1411
1412 def _delete_service_account(self, kubectl: Kubectl, name: str):
1413 kubectl.clients[CORE_CLIENT].delete_namespaced_service_account(
1414 name, ADMIN_NAMESPACE
1415 )
1416
1417 def _create_cluster_role_binding(
1418 self,
1419 kubectl: Kubectl,
1420 name: str,
1421 labels: Dict[str, str],
1422 ):
1423 role_bindings = kubectl.clients[RBAC_CLIENT].list_cluster_role_binding(
1424 field_selector="metadata.name={}".format(name)
1425 )
1426 if len(role_bindings.items) > 0:
1427 raise Exception("Generated rbac id already exists")
1428
1429 role_binding = V1ClusterRoleBinding(
1430 metadata=V1ObjectMeta(name=name, labels=labels),
1431 role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
1432 subjects=[
1433 V1Subject(kind="ServiceAccount", name=name, namespace=ADMIN_NAMESPACE)
1434 ],
1435 )
1436 kubectl.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
1437
1438 def _delete_cluster_role_binding(self, kubectl: Kubectl, name: str):
1439 kubectl.clients[RBAC_CLIENT].delete_cluster_role_binding(name)
1440
1441 async def _get_secret_data(self, kubectl: Kubectl, name: str) -> (str, str):
1442 v1_core = kubectl.clients[CORE_CLIENT]
1443
1444 retries_limit = 10
1445 secret_name = None
1446 while True:
1447 retries_limit -= 1
1448 service_accounts = v1_core.list_namespaced_service_account(
1449 ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
1450 )
1451 if len(service_accounts.items) == 0:
1452 raise Exception(
1453 "Service account not found with metadata.name={}".format(name)
1454 )
1455 service_account = service_accounts.items[0]
1456 if service_account.secrets and len(service_account.secrets) > 0:
1457 secret_name = service_account.secrets[0].name
1458 if secret_name is not None or not retries_limit:
1459 break
1460 if not secret_name:
1461 raise Exception(
1462 "Failed getting the secret from service account {}".format(name)
1463 )
1464 secret = v1_core.list_namespaced_secret(
1465 ADMIN_NAMESPACE,
1466 field_selector="metadata.name={}".format(secret_name),
1467 ).items[0]
1468
1469 token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
1470 client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]
1471
1472 return (
1473 base64.b64decode(token).decode("utf-8"),
1474 base64.b64decode(client_certificate_data).decode("utf-8"),
1475 )
1476
1477 @staticmethod
1478 def generate_kdu_instance_name(**kwargs):
1479 db_dict = kwargs.get("db_dict")
1480 kdu_name = kwargs.get("kdu_name", None)
1481 if kdu_name:
1482 kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
1483 else:
1484 kdu_instance = db_dict["filter"]["_id"]
1485 return kdu_instance
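# Illustrative usage (assumption; the id is a placeholder):
#
# db_dict = {"filter": {"_id": "0123456789abcdef"}}
# K8sJujuConnector.generate_kdu_instance_name(db_dict=db_dict, kdu_name="squid")
# # -> "squid-0123456789abcdef"
# K8sJujuConnector.generate_kdu_instance_name(db_dict=db_dict)
# # -> "0123456789abcdef"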