3d58385ede494b7e0378c0e3605a1bad2bb4808d
[osm/N2VC.git] / n2vc / k8s_juju_conn.py
1 # Copyright 2019 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import os
17 import uuid
18 import yaml
19 import tempfile
20 import binascii
21 import base64
22
23 from n2vc.exceptions import K8sException, N2VCBadArgumentsException
24 from n2vc.k8s_conn import K8sConnector
25 from n2vc.kubectl import Kubectl, CORE_CLIENT, RBAC_CLIENT
26 from .exceptions import MethodNotImplemented
27 from n2vc.utils import base64_to_cacert
28 from n2vc.libjuju import Libjuju
29
30 from kubernetes.client.models import (
31 V1ClusterRole,
32 V1ObjectMeta,
33 V1PolicyRule,
34 V1ServiceAccount,
35 V1ClusterRoleBinding,
36 V1RoleRef,
37 V1Subject,
38 )
39
40 from typing import Dict
41
42 SERVICE_ACCOUNT_TOKEN_KEY = "token"
43 SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
44 RBAC_LABEL_KEY_NAME = "rbac-id"
45
46 ADMIN_NAMESPACE = "kube-system"
47 RBAC_STACK_PREFIX = "juju-credential"
48
49 # from juju.bundle import BundleHandler
50 # import re
51 # import ssl
52 # from .vnf import N2VC
53
54
55 def generate_rbac_id():
56 return binascii.hexlify(os.urandom(4)).decode()
57
58
59 class K8sJujuConnector(K8sConnector):
60 def __init__(
61 self,
62 fs: object,
63 db: object,
64 kubectl_command: str = "/usr/bin/kubectl",
65 juju_command: str = "/usr/bin/juju",
66 log: object = None,
67 loop: object = None,
68 on_update_db=None,
69 vca_config: dict = None,
70 ):
71 """
72 :param fs: file system for kubernetes and helm configuration
73 :param db: Database object
74 :param kubectl_command: path to kubectl executable
75 :param helm_command: path to helm executable
76 :param log: logger
77 :param: loop: Asyncio loop
78 """
79
80 # parent class
81 K8sConnector.__init__(
82 self,
83 db,
84 log=log,
85 on_update_db=on_update_db,
86 )
87
88 self.fs = fs
89 self.loop = loop or asyncio.get_event_loop()
90 self.log.debug("Initializing K8S Juju connector")
91
92 required_vca_config = [
93 "host",
94 "user",
95 "secret",
96 "ca_cert",
97 ]
98 if not vca_config or not all(k in vca_config for k in required_vca_config):
99 raise N2VCBadArgumentsException(
100 message="Missing arguments in vca_config: {}".format(vca_config),
101 bad_args=required_vca_config,
102 )
103 port = vca_config["port"] if "port" in vca_config else 17070
104 url = "{}:{}".format(vca_config["host"], port)
105 enable_os_upgrade = vca_config.get("enable_os_upgrade", True)
106 apt_mirror = vca_config.get("apt_mirror", None)
107 username = vca_config["user"]
108 secret = vca_config["secret"]
109 ca_cert = base64_to_cacert(vca_config["ca_cert"])
110
111 self.libjuju = Libjuju(
112 endpoint=url,
113 api_proxy=None, # Not needed for k8s charms
114 enable_os_upgrade=enable_os_upgrade,
115 apt_mirror=apt_mirror,
116 username=username,
117 password=secret,
118 cacert=ca_cert,
119 loop=self.loop,
120 log=self.log,
121 db=self.db,
122 )
123 self.log.debug("K8S Juju connector initialized")
124 # TODO: Remove these commented lines:
125 # self.authenticated = False
126 # self.models = {}
127 # self.juju_secret = ""
128
129 """Initialization"""
130
131 async def init_env(
132 self,
133 k8s_creds: str,
134 namespace: str = "kube-system",
135 reuse_cluster_uuid: str = None,
136 ) -> (str, bool):
137 """
138 It prepares a given K8s cluster environment to run Juju bundles.
139
140 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
141 '.kube/config'
142 :param namespace: optional namespace to be used for juju. By default,
143 'kube-system' will be used
144 :param reuse_cluster_uuid: existing cluster uuid for reuse
145 :return: uuid of the K8s cluster and True if connector has installed some
146 software in the cluster
147 (on error, an exception will be raised)
148 """
149
150 # """Bootstrapping
151
152 # Bootstrapping cannot be done, by design, through the API. We need to
153 # use the CLI tools.
154 # """
155
156 # """
157 # WIP: Workflow
158
159 # 1. Has the environment already been bootstrapped?
160 # - Check the database to see if we have a record for this env
161
162 # 2. If this is a new env, create it
163 # - Add the k8s cloud to Juju
164 # - Bootstrap
165 # - Record it in the database
166
167 # 3. Connect to the Juju controller for this cloud
168
169 # """
170 # cluster_uuid = reuse_cluster_uuid
171 # if not cluster_uuid:
172 # cluster_uuid = str(uuid4())
173
174 ##################################################
175 # TODO: Pull info from db based on the namespace #
176 ##################################################
177
178 ###################################################
179 # TODO: Make it idempotent, calling add-k8s and #
180 # bootstrap whenever reuse_cluster_uuid is passed #
181 # as parameter #
182 # `init_env` is called to initialize the K8s #
183 # cluster for juju. If this initialization fails, #
184 # it can be called again by LCM with the param #
185 # reuse_cluster_uuid, e.g. to try to fix it. #
186 ###################################################
187
188 # This is a new cluster, so bootstrap it
189
190 cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4())
191
192 # Is a local k8s cluster?
193 # localk8s = self.is_local_k8s(k8s_creds)
194
195 # If the k8s is external, the juju controller needs a loadbalancer
196 # loadbalancer = False if localk8s else True
197
198 # Name the new k8s cloud
199 # k8s_cloud = "k8s-{}".format(cluster_uuid)
200
201 # self.log.debug("Adding k8s cloud {}".format(k8s_cloud))
202 # await self.add_k8s(k8s_cloud, k8s_creds)
203
204 # Bootstrap Juju controller
205 # self.log.debug("Bootstrapping...")
206 # await self.bootstrap(k8s_cloud, cluster_uuid, loadbalancer)
207 # self.log.debug("Bootstrap done.")
208
209 # Get the controller information
210
211 # Parse ~/.local/share/juju/controllers.yaml
212 # controllers.testing.api-endpoints|ca-cert|uuid
213 # self.log.debug("Getting controller endpoints")
214 # with open(os.path.expanduser("~/.local/share/juju/controllers.yaml")) as f:
215 # controllers = yaml.load(f, Loader=yaml.Loader)
216 # controller = controllers["controllers"][cluster_uuid]
217 # endpoints = controller["api-endpoints"]
218 # juju_endpoint = endpoints[0]
219 # juju_ca_cert = controller["ca-cert"]
220
221 # Parse ~/.local/share/juju/accounts
222 # controllers.testing.user|password
223 # self.log.debug("Getting accounts")
224 # with open(os.path.expanduser("~/.local/share/juju/accounts.yaml")) as f:
225 # controllers = yaml.load(f, Loader=yaml.Loader)
226 # controller = controllers["controllers"][cluster_uuid]
227
228 # juju_user = controller["user"]
229 # juju_secret = controller["password"]
230
231 # config = {
232 # "endpoint": juju_endpoint,
233 # "username": juju_user,
234 # "secret": juju_secret,
235 # "cacert": juju_ca_cert,
236 # "loadbalancer": loadbalancer,
237 # }
238
239 # Store the cluster configuration so it
240 # can be used for subsequent calls
241 kubecfg = tempfile.NamedTemporaryFile()
242 with open(kubecfg.name, "w") as kubecfg_file:
243 kubecfg_file.write(k8s_creds)
244 kubectl = Kubectl(config_file=kubecfg.name)
245
246 # CREATING RESOURCES IN K8S
247 rbac_id = generate_rbac_id()
248 metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
249 labels = {RBAC_STACK_PREFIX: rbac_id}
250
251 # Create cleanup dictionary to clean up created resources
252 # if it fails in the middle of the process
253 cleanup_data = []
254 try:
255 self._create_cluster_role(
256 kubectl,
257 name=metadata_name,
258 labels=labels,
259 )
260 cleanup_data.append(
261 {
262 "delete": self._delete_cluster_role,
263 "args": (kubectl, metadata_name),
264 }
265 )
266
267 self._create_service_account(
268 kubectl,
269 name=metadata_name,
270 labels=labels,
271 )
272 cleanup_data.append(
273 {
274 "delete": self._delete_service_account,
275 "args": (kubectl, metadata_name),
276 }
277 )
278
279 self._create_cluster_role_binding(
280 kubectl,
281 name=metadata_name,
282 labels=labels,
283 )
284 cleanup_data.append(
285 {
286 "delete": self._delete_service_account,
287 "args": (kubectl, metadata_name),
288 }
289 )
290 token, client_cert_data = await self._get_secret_data(
291 kubectl,
292 metadata_name,
293 )
294
295 default_storage_class = kubectl.get_default_storage_class()
296 await self.libjuju.add_k8s(
297 name=cluster_uuid,
298 rbac_id=rbac_id,
299 token=token,
300 client_cert_data=client_cert_data,
301 configuration=kubectl.configuration,
302 storage_class=default_storage_class,
303 credential_name=self._get_credential_name(cluster_uuid),
304 )
305 # self.log.debug("Setting config")
306 # await self.set_config(cluster_uuid, config)
307
308 # Test connection
309 # controller = await self.get_controller(cluster_uuid)
310 # await controller.disconnect()
311
312 # TODO: Remove these commented lines
313 # raise Exception("EOL")
314 # self.juju_public_key = None
315 # Login to the k8s cluster
316 # if not self.authenticated:
317 # await self.login(cluster_uuid)
318
319 # We're creating a new cluster
320 # print("Getting model {}".format(self.get_namespace(cluster_uuid),
321 # cluster_uuid=cluster_uuid))
322 # model = await self.get_model(
323 # self.get_namespace(cluster_uuid),
324 # cluster_uuid=cluster_uuid
325 # )
326
327 # Disconnect from the model
328 # if model and model.is_connected():
329 # await model.disconnect()
330
331 return cluster_uuid, True
332 except Exception as e:
333 self.log.error("Error initializing k8scluster: {}".format(e))
334 if len(cleanup_data) > 0:
335 self.log.debug("Cleaning up created resources in k8s cluster...")
336 for item in cleanup_data:
337 delete_function = item["delete"]
338 delete_args = item["args"]
339 delete_function(*delete_args)
340 self.log.debug("Cleanup finished")
341 raise e
342
343 """Repo Management"""
344
345 async def repo_add(
346 self,
347 name: str,
348 url: str,
349 _type: str = "charm",
350 ):
351 raise MethodNotImplemented()
352
353 async def repo_list(self):
354 raise MethodNotImplemented()
355
356 async def repo_remove(
357 self,
358 name: str,
359 ):
360 raise MethodNotImplemented()
361
362 async def synchronize_repos(self, cluster_uuid: str, name: str):
363 """
364 Returns None as currently add_repo is not implemented
365 """
366 return None
367
368 """Reset"""
369
370 async def reset(
371 self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
372 ) -> bool:
373 """Reset a cluster
374
375 Resets the Kubernetes cluster by removing the model that represents it.
376
377 :param cluster_uuid str: The UUID of the cluster to reset
378 :return: Returns True if successful or raises an exception.
379 """
380
381 try:
382 # Remove k8scluster from database
383 # self.log.debug("[reset] Removing k8scluster from juju database")
384 # juju_db = self.db.get_one("admin", {"_id": "juju"})
385
386 # for k in juju_db["k8sclusters"]:
387 # if k["_id"] == cluster_uuid:
388 # juju_db["k8sclusters"].remove(k)
389 # self.db.set_one(
390 # table="admin",
391 # q_filter={"_id": "juju"},
392 # update_dict={"k8sclusters": juju_db["k8sclusters"]},
393 # )
394 # break
395
396 # Destroy the controller (via CLI)
397 # self.log.debug("[reset] Destroying controller")
398 # await self.destroy_controller(cluster_uuid)
399 self.log.debug("[reset] Removing k8s cloud")
400 # k8s_cloud = "k8s-{}".format(cluster_uuid)
401 # await self.remove_cloud(k8s_cloud)
402
403 cloud_creds = await self.libjuju.get_cloud_credentials(
404 cluster_uuid,
405 self._get_credential_name(cluster_uuid),
406 )
407
408 await self.libjuju.remove_cloud(cluster_uuid)
409
410 kubecfg = self.get_credentials(cluster_uuid=cluster_uuid)
411
412 kubecfg_file = tempfile.NamedTemporaryFile()
413 with open(kubecfg_file.name, "w") as f:
414 f.write(kubecfg)
415 kubectl = Kubectl(config_file=kubecfg_file.name)
416
417 delete_functions = [
418 self._delete_cluster_role_binding,
419 self._delete_service_account,
420 self._delete_cluster_role,
421 ]
422
423 credential_attrs = cloud_creds[0].result["attrs"]
424 if RBAC_LABEL_KEY_NAME in credential_attrs:
425 rbac_id = credential_attrs[RBAC_LABEL_KEY_NAME]
426 metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
427 delete_args = (kubectl, metadata_name)
428 for delete_func in delete_functions:
429 try:
430 delete_func(*delete_args)
431 except Exception as e:
432 self.log.warning("Cannot remove resource in K8s {}".format(e))
433
434 except Exception as e:
435 self.log.debug("Caught exception during reset: {}".format(e))
436 raise e
437 return True
438 # TODO: Remove these commented lines
439 # if not self.authenticated:
440 # await self.login(cluster_uuid)
441
442 # if self.controller.is_connected():
443 # # Destroy the model
444 # namespace = self.get_namespace(cluster_uuid)
445 # if await self.has_model(namespace):
446 # self.log.debug("[reset] Destroying model")
447 # await self.controller.destroy_model(namespace, destroy_storage=True)
448
449 # # Disconnect from the controller
450 # self.log.debug("[reset] Disconnecting controller")
451 # await self.logout()
452
453 """Deployment"""
454
455 async def install(
456 self,
457 cluster_uuid: str,
458 kdu_model: str,
459 atomic: bool = True,
460 timeout: float = 1800,
461 params: dict = None,
462 db_dict: dict = None,
463 kdu_name: str = None,
464 namespace: str = None,
465 ) -> bool:
466 """Install a bundle
467
468 :param cluster_uuid str: The UUID of the cluster to install to
469 :param kdu_model str: The name or path of a bundle to install
470 :param atomic bool: If set, waits until the model is active and resets
471 the cluster on failure.
472 :param timeout int: The time, in seconds, to wait for the install
473 to finish
474 :param params dict: Key-value pairs of instantiation parameters
475 :param kdu_name: Name of the KDU instance to be installed
476 :param namespace: K8s namespace to use for the KDU instance
477
478 :return: If successful, returns ?
479 """
480 bundle = kdu_model
481
482 # controller = await self.get_controller(cluster_uuid)
483
484 ##
485 # Get or create the model, based on the NS
486 # uuid.
487
488 if not db_dict:
489 raise K8sException("db_dict must be set")
490 if not bundle:
491 raise K8sException("bundle must be set")
492
493 if bundle.startswith("cs:"):
494 pass
495 elif bundle.startswith("http"):
496 # Download the file
497 pass
498 else:
499 new_workdir = kdu_model.strip(kdu_model.split("/")[-1])
500 os.chdir(new_workdir)
501 bundle = "local:{}".format(kdu_model)
502
503 if kdu_name:
504 kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
505 else:
506 kdu_instance = db_dict["filter"]["_id"]
507
508 self.log.debug("Checking for model named {}".format(kdu_instance))
509
510 # Create the new model
511 self.log.debug("Adding model: {}".format(kdu_instance))
512 await self.libjuju.add_model(
513 model_name=kdu_instance,
514 cloud_name=cluster_uuid,
515 credential_name=self._get_credential_name(cluster_uuid),
516 )
517
518 # if model:
519 # TODO: Instantiation parameters
520
521 """
522 "Juju bundle that models the KDU, in any of the following ways:
523 - <juju-repo>/<juju-bundle>
524 - <juju-bundle folder under k8s_models folder in the package>
525 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
526 in the package>
527 - <URL_where_to_fetch_juju_bundle>
528 """
529 try:
530 previous_workdir = os.getcwd()
531 except FileNotFoundError:
532 previous_workdir = "/app/storage"
533
534 self.log.debug("[install] deploying {}".format(bundle))
535 await self.libjuju.deploy(
536 bundle, model_name=kdu_instance, wait=atomic, timeout=timeout
537 )
538
539 # Get the application
540 # if atomic:
541 # # applications = model.applications
542 # self.log.debug("[install] Applications: {}".format(model.applications))
543 # for name in model.applications:
544 # self.log.debug("[install] Waiting for {} to settle".format(name))
545 # application = model.applications[name]
546 # try:
547 # # It's not enough to wait for all units to be active;
548 # # the application status needs to be active as well.
549 # self.log.debug("Waiting for all units to be active...")
550 # await model.block_until(
551 # lambda: all(
552 # unit.agent_status == "idle"
553 # and application.status in ["active", "unknown"]
554 # and unit.workload_status in ["active", "unknown"]
555 # for unit in application.units
556 # ),
557 # timeout=timeout,
558 # )
559 # self.log.debug("All units active.")
560
561 # # TODO use asyncio.TimeoutError
562 # except concurrent.futures._base.TimeoutError:
563 # os.chdir(previous_workdir)
564 # self.log.debug("[install] Timeout exceeded; resetting cluster")
565 # await self.reset(cluster_uuid)
566 # return False
567
568 # Wait for the application to be active
569 # if model.is_connected():
570 # self.log.debug("[install] Disconnecting model")
571 # await model.disconnect()
572 # await controller.disconnect()
573 os.chdir(previous_workdir)
574
575 return kdu_instance
576
577 async def instances_list(self, cluster_uuid: str) -> list:
578 """
579 returns a list of deployed releases in a cluster
580
581 :param cluster_uuid: the cluster
582 :return:
583 """
584 return []
585
586 async def upgrade(
587 self,
588 cluster_uuid: str,
589 kdu_instance: str,
590 kdu_model: str = None,
591 params: dict = None,
592 ) -> str:
593 """Upgrade a model
594
595 :param cluster_uuid str: The UUID of the cluster to upgrade
596 :param kdu_instance str: The unique name of the KDU instance
597 :param kdu_model str: The name or path of the bundle to upgrade to
598 :param params dict: Key-value pairs of instantiation parameters
599
600 :return: If successful, reference to the new revision number of the
601 KDU instance.
602 """
603
604 # TODO: Loop through the bundle and upgrade each charm individually
605
606 """
607 The API doesn't have a concept of bundle upgrades, because there are
608 many possible changes: charm revision, disk, number of units, etc.
609
610 As such, we are only supporting a limited subset of upgrades. We'll
611 upgrade the charm revision but leave storage and scale untouched.
612
613 Scale changes should happen through OSM constructs, and changes to
614 storage would require a redeployment of the service, at least in this
615 initial release.
616 """
617 raise MethodNotImplemented()
618 # TODO: Remove these commented lines
619
620 # model = await self.get_model(namespace, cluster_uuid=cluster_uuid)
621
622 # model = None
623 # namespace = self.get_namespace(cluster_uuid)
624 # controller = await self.get_controller(cluster_uuid)
625
626 # try:
627 # if namespace not in await controller.list_models():
628 # raise N2VCNotFound(message="Model {} does not exist".format(namespace))
629
630 # model = await controller.get_model(namespace)
631 # with open(kdu_model, "r") as f:
632 # bundle = yaml.safe_load(f)
633
634 # """
635 # {
636 # 'description': 'Test bundle',
637 # 'bundle': 'kubernetes',
638 # 'applications': {
639 # 'mariadb-k8s': {
640 # 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
641 # 'scale': 1,
642 # 'options': {
643 # 'password': 'manopw',
644 # 'root_password': 'osm4u',
645 # 'user': 'mano'
646 # },
647 # 'series': 'kubernetes'
648 # }
649 # }
650 # }
651 # """
652 # # TODO: This should be returned in an agreed-upon format
653 # for name in bundle["applications"]:
654 # self.log.debug(model.applications)
655 # application = model.applications[name]
656 # self.log.debug(application)
657
658 # path = bundle["applications"][name]["charm"]
659
660 # try:
661 # await application.upgrade_charm(switch=path)
662 # except juju.errors.JujuError as ex:
663 # if "already running charm" in str(ex):
664 # # We're already running this version
665 # pass
666 # finally:
667 # if model:
668 # await model.disconnect()
669 # await controller.disconnect()
670 # return True
671
672 """Rollback"""
673
674 async def rollback(
675 self,
676 cluster_uuid: str,
677 kdu_instance: str,
678 revision: int = 0,
679 ) -> str:
680 """Rollback a model
681
682 :param cluster_uuid str: The UUID of the cluster to rollback
683 :param kdu_instance str: The unique name of the KDU instance
684 :param revision int: The revision to revert to. If omitted, rolls back
685 the previous upgrade.
686
687 :return: If successful, returns the revision of active KDU instance,
688 or raises an exception
689 """
690 raise MethodNotImplemented()
691
692 """Deletion"""
693
694 async def uninstall(self, cluster_uuid: str, kdu_instance: str) -> bool:
695 """Uninstall a KDU instance
696
697 :param cluster_uuid str: The UUID of the cluster
698 :param kdu_instance str: The unique name of the KDU instance
699
700 :return: Returns True if successful, or raises an exception
701 """
702
703 # controller = await self.get_controller(cluster_uuid)
704
705 self.log.debug("[uninstall] Destroying model")
706
707 await self.libjuju.destroy_model(kdu_instance, total_timeout=3600)
708
709 # self.log.debug("[uninstall] Model destroyed and disconnecting")
710 # await controller.disconnect()
711
712 return True
713 # TODO: Remove these commented lines
714 # if not self.authenticated:
715 # self.log.debug("[uninstall] Connecting to controller")
716 # await self.login(cluster_uuid)
717
718 async def exec_primitive(
719 self,
720 cluster_uuid: str = None,
721 kdu_instance: str = None,
722 primitive_name: str = None,
723 timeout: float = 300,
724 params: dict = None,
725 db_dict: dict = None,
726 ) -> str:
727 """Exec primitive (Juju action)
728
729 :param cluster_uuid str: The UUID of the cluster
730 :param kdu_instance str: The unique name of the KDU instance
731 :param primitive_name: Name of action that will be executed
732 :param timeout: Timeout for action execution
733 :param params: Dictionary of all the parameters needed for the action
734 :db_dict: Dictionary for any additional data
735
736 :return: Returns the output of the action
737 """
738
739 # controller = await self.get_controller(cluster_uuid)
740
741 if not params or "application-name" not in params:
742 raise K8sException(
743 "Missing application-name argument, \
744 argument needed for K8s actions"
745 )
746 try:
747 self.log.debug(
748 "[exec_primitive] Getting model "
749 "kdu_instance: {}".format(kdu_instance)
750 )
751 application_name = params["application-name"]
752 actions = await self.libjuju.get_actions(application_name, kdu_instance)
753 if primitive_name not in actions:
754 raise K8sException("Primitive {} not found".format(primitive_name))
755 output, status = await self.libjuju.execute_action(
756 application_name, kdu_instance, primitive_name, **params
757 )
758 # model = await self.get_model(kdu_instance, controller=controller)
759
760 # application_name = params["application-name"]
761 # application = model.applications[application_name]
762
763 # actions = await application.get_actions()
764 # if primitive_name not in actions:
765 # raise K8sException("Primitive {} not found".format(primitive_name))
766
767 # unit = None
768 # for u in application.units:
769 # if await u.is_leader_from_status():
770 # unit = u
771 # break
772
773 # if unit is None:
774 # raise K8sException("No leader unit found to execute action")
775
776 # self.log.debug("[exec_primitive] Running action: {}".format(primitive_name))
777 # action = await unit.run_action(primitive_name, **params)
778
779 # output = await model.get_action_output(action_uuid=action.entity_id)
780 # status = await model.get_action_status(uuid_or_prefix=action.entity_id)
781
782 # status = (
783 # status[action.entity_id] if action.entity_id in status else "failed"
784 # )
785
786 if status != "completed":
787 raise K8sException(
788 "status is not completed: {} output: {}".format(status, output)
789 )
790
791 return output
792
793 except Exception as e:
794 error_msg = "Error executing primitive {}: {}".format(primitive_name, e)
795 self.log.error(error_msg)
796 raise K8sException(message=error_msg)
797 # finally:
798 # await controller.disconnect()
799 # TODO: Remove these commented lines:
800 # if not self.authenticated:
801 # self.log.debug("[exec_primitive] Connecting to controller")
802 # await self.login(cluster_uuid)
803
804 """Introspection"""
805
806 async def inspect_kdu(
807 self,
808 kdu_model: str,
809 ) -> dict:
810 """Inspect a KDU
811
812 Inspects a bundle and returns a dictionary of config parameters and
813 their default values.
814
815 :param kdu_model str: The name or path of the bundle to inspect.
816
817 :return: If successful, returns a dictionary of available parameters
818 and their default values.
819 """
820
821 kdu = {}
822 if not os.path.exists(kdu_model):
823 raise K8sException("file {} not found".format(kdu_model))
824
825 with open(kdu_model, "r") as f:
826 bundle = yaml.safe_load(f.read())
827
828 """
829 {
830 'description': 'Test bundle',
831 'bundle': 'kubernetes',
832 'applications': {
833 'mariadb-k8s': {
834 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
835 'scale': 1,
836 'options': {
837 'password': 'manopw',
838 'root_password': 'osm4u',
839 'user': 'mano'
840 },
841 'series': 'kubernetes'
842 }
843 }
844 }
845 """
846 # TODO: This should be returned in an agreed-upon format
847 kdu = bundle["applications"]
848
849 return kdu
850
851 async def help_kdu(
852 self,
853 kdu_model: str,
854 ) -> str:
855 """View the README
856
857 If available, returns the README of the bundle.
858
859 :param kdu_model str: The name or path of a bundle
860
861 :return: If found, returns the contents of the README.
862 """
863 readme = None
864
865 files = ["README", "README.txt", "README.md"]
866 path = os.path.dirname(kdu_model)
867 for file in os.listdir(path):
868 if file in files:
869 with open(file, "r") as f:
870 readme = f.read()
871 break
872
873 return readme
874
875 async def status_kdu(
876 self,
877 cluster_uuid: str,
878 kdu_instance: str,
879 ) -> dict:
880 """Get the status of the KDU
881
882 Get the current status of the KDU instance.
883
884 :param cluster_uuid str: The UUID of the cluster
885 :param kdu_instance str: The unique id of the KDU instance
886
887 :return: Returns a dictionary containing namespace, state, resources,
888 and deployment_time.
889 """
890 status = {}
891 # controller = await self.get_controller(cluster_uuid)
892 # model = await self.get_model(kdu_instance, controller=controller)
893
894 # model_status = await model.get_status()
895 # status = model_status.applications
896 model_status = await self.libjuju.get_model_status(kdu_instance)
897 for name in model_status.applications:
898 application = model_status.applications[name]
899 status[name] = {"status": application["status"]["status"]}
900
901 # await model.disconnect()
902 # await controller.disconnect()
903
904 return status
905
906 async def get_services(
907 self, cluster_uuid: str, kdu_instance: str, namespace: str
908 ) -> list:
909 """Return a list of services of a kdu_instance"""
910
911 credentials = self.get_credentials(cluster_uuid=cluster_uuid)
912
913 # config_path = "/tmp/{}".format(cluster_uuid)
914 # config_file = "{}/config".format(config_path)
915
916 # if not os.path.exists(config_path):
917 # os.makedirs(config_path)
918 # with open(config_file, "w") as f:
919 # f.write(credentials)
920
921 kubecfg = tempfile.NamedTemporaryFile()
922 with open(kubecfg.name, "w") as kubecfg_file:
923 kubecfg_file.write(credentials)
924 kubectl = Kubectl(config_file=kubecfg.name)
925
926 return kubectl.get_services(
927 field_selector="metadata.namespace={}".format(kdu_instance)
928 )
929
930 async def get_service(
931 self, cluster_uuid: str, service_name: str, namespace: str
932 ) -> object:
933 """Return data for a specific service inside a namespace"""
934
935 credentials = self.get_credentials(cluster_uuid=cluster_uuid)
936
937 # config_path = "/tmp/{}".format(cluster_uuid)
938 # config_file = "{}/config".format(config_path)
939
940 # if not os.path.exists(config_path):
941 # os.makedirs(config_path)
942 # with open(config_file, "w") as f:
943 # f.write(credentials)
944
945 kubecfg = tempfile.NamedTemporaryFile()
946 with open(kubecfg.name, "w") as kubecfg_file:
947 kubecfg_file.write(credentials)
948 kubectl = Kubectl(config_file=kubecfg.name)
949
950 return kubectl.get_services(
951 field_selector="metadata.name={},metadata.namespace={}".format(
952 service_name, namespace
953 )
954 )[0]
955
956 # Private methods
957 # async def add_k8s(self, cloud_name: str, credentials: str,) -> bool:
958 # """Add a k8s cloud to Juju
959
960 # Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
961 # Juju Controller.
962
963 # :param cloud_name str: The name of the cloud to add.
964 # :param credentials dict: A dictionary representing the output of
965 # `kubectl config view --raw`.
966
967 # :returns: True if successful, otherwise raises an exception.
968 # """
969
970 # cmd = [self.juju_command, "add-k8s", "--local", cloud_name]
971 # self.log.debug(cmd)
972
973 # process = await asyncio.create_subprocess_exec(
974 # *cmd,
975 # stdout=asyncio.subprocess.PIPE,
976 # stderr=asyncio.subprocess.PIPE,
977 # stdin=asyncio.subprocess.PIPE,
978 # )
979
980 # # Feed the process the credentials
981 # process.stdin.write(credentials.encode("utf-8"))
982 # await process.stdin.drain()
983 # process.stdin.close()
984
985 # _stdout, stderr = await process.communicate()
986
987 # return_code = process.returncode
988
989 # self.log.debug("add-k8s return code: {}".format(return_code))
990
991 # if return_code > 0:
992 # raise Exception(stderr)
993
994 # return True
995
996 # async def add_model(
997 # self, model_name: str, cluster_uuid: str, controller: Controller
998 # ) -> Model:
999 # """Adds a model to the controller
1000
1001 # Adds a new model to the Juju controller
1002
1003 # :param model_name str: The name of the model to add.
1004 # :param cluster_uuid str: ID of the cluster.
1005 # :param controller: Controller object in which the model will be added
1006 # :returns: The juju.model.Model object of the new model upon success or
1007 # raises an exception.
1008 # """
1009
1010 # self.log.debug(
1011 # "Adding model '{}' to cluster_uuid '{}'".format(model_name, cluster_uuid)
1012 # )
1013 # model = None
1014 # try:
1015 # if self.juju_public_key is not None:
1016 # model = await controller.add_model(
1017 # model_name, config={"authorized-keys": self.juju_public_key}
1018 # )
1019 # else:
1020 # model = await controller.add_model(model_name)
1021 # except Exception as ex:
1022 # self.log.debug(ex)
1023 # self.log.debug("Caught exception: {}".format(ex))
1024 # pass
1025
1026 # return model
1027
1028 # async def bootstrap(
1029 # self, cloud_name: str, cluster_uuid: str, loadbalancer: bool
1030 # ) -> bool:
1031 # """Bootstrap a Kubernetes controller
1032
1033 # Bootstrap a Juju controller inside the Kubernetes cluster
1034
1035 # :param cloud_name str: The name of the cloud.
1036 # :param cluster_uuid str: The UUID of the cluster to bootstrap.
1037 # :param loadbalancer bool: If the controller should use loadbalancer or not.
1038 # :returns: True upon success or raises an exception.
1039 # """
1040
1041 # if not loadbalancer:
1042 # cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid]
1043 # else:
1044 # """
1045 # For public clusters, specify that the controller service is using a
1046 # LoadBalancer.
1047 # """
1048 # cmd = [
1049 # self.juju_command,
1050 # "bootstrap",
1051 # cloud_name,
1052 # cluster_uuid,
1053 # "--config",
1054 # "controller-service-type=loadbalancer",
1055 # ]
1056
1057 # self.log.debug(
1058 # "Bootstrapping controller {} in cloud {}".format(cluster_uuid, cloud_name)
1059 # )
1060
1061 # process = await asyncio.create_subprocess_exec(
1062 # *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1063 # )
1064
1065 # _stdout, stderr = await process.communicate()
1066
1067 # return_code = process.returncode
1068
1069 # if return_code > 0:
1070 # #
1071 # if b"already exists" not in stderr:
1072 # raise Exception(stderr)
1073
1074 # return True
1075
1076 # async def destroy_controller(self, cluster_uuid: str) -> bool:
1077 # """Destroy a Kubernetes controller
1078
1079 # Destroy an existing Kubernetes controller.
1080
1081 # :param cluster_uuid str: The UUID of the cluster to bootstrap.
1082 # :returns: True upon success or raises an exception.
1083 # """
1084 # cmd = [
1085 # self.juju_command,
1086 # "destroy-controller",
1087 # "--destroy-all-models",
1088 # "--destroy-storage",
1089 # "-y",
1090 # cluster_uuid,
1091 # ]
1092
1093 # process = await asyncio.create_subprocess_exec(
1094 # *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1095 # )
1096
1097 # _stdout, stderr = await process.communicate()
1098
1099 # return_code = process.returncode
1100
1101 # if return_code > 0:
1102 # #
1103 # if "already exists" not in stderr:
1104 # raise Exception(stderr)
1105
1106 def get_credentials(self, cluster_uuid: str) -> str:
1107 """
1108 Get Cluster Kubeconfig
1109 """
1110 k8scluster = self.db.get_one(
1111 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
1112 )
1113
1114 self.db.encrypt_decrypt_fields(
1115 k8scluster.get("credentials"),
1116 "decrypt",
1117 ["password", "secret"],
1118 schema_version=k8scluster["schema_version"],
1119 salt=k8scluster["_id"],
1120 )
1121
1122 return yaml.safe_dump(k8scluster.get("credentials"))
1123
1124 def _get_credential_name(self, cluster_uuid: str) -> str:
1125 """
1126 Get credential name for a k8s cloud
1127
1128 We cannot use the cluster_uuid for the credential name directly,
1129 because it cannot start with a number, it must start with a letter.
1130 Therefore, the k8s cloud credential name will be "cred-" followed
1131 by the cluster uuid.
1132
1133 :param: cluster_uuid: Cluster UUID of the kubernetes cloud (=cloud_name)
1134
1135 :return: Name to use for the credential name.
1136 """
1137 return "cred-{}".format(cluster_uuid)
1138
1139 # def get_config(self, cluster_uuid: str,) -> dict:
1140 # """Get the cluster configuration
1141
1142 # Gets the configuration of the cluster
1143
1144 # :param cluster_uuid str: The UUID of the cluster.
1145 # :return: A dict upon success, or raises an exception.
1146 # """
1147
1148 # juju_db = self.db.get_one("admin", {"_id": "juju"})
1149 # config = None
1150 # for k in juju_db["k8sclusters"]:
1151 # if k["_id"] == cluster_uuid:
1152 # config = k["config"]
1153 # self.db.encrypt_decrypt_fields(
1154 # config,
1155 # "decrypt",
1156 # ["secret", "cacert"],
1157 # schema_version="1.1",
1158 # salt=k["_id"],
1159 # )
1160 # break
1161 # if not config:
1162 # raise Exception(
1163 # "Unable to locate configuration for cluster {}".format(cluster_uuid)
1164 # )
1165 # return config
1166
1167 # async def get_model(self, model_name: str, controller: Controller) -> Model:
1168 # """Get a model from the Juju Controller.
1169
1170 # Note: Model objects returned must call disconnected() before it goes
1171 # out of scope.
1172
1173 # :param model_name str: The name of the model to get
1174 # :param controller Controller: Controller object
1175 # :return The juju.model.Model object if found, or None.
1176 # """
1177
1178 # models = await controller.list_models()
1179 # if model_name not in models:
1180 # raise N2VCNotFound("Model {} not found".format(model_name))
1181 # self.log.debug("Found model: {}".format(model_name))
1182 # return await controller.get_model(model_name)
1183
1184 def get_namespace(
1185 self,
1186 cluster_uuid: str,
1187 ) -> str:
1188 """Get the namespace UUID
1189 Gets the namespace's unique name
1190
1191 :param cluster_uuid str: The UUID of the cluster
1192 :returns: The namespace UUID, or raises an exception
1193 """
1194 # config = self.get_config(cluster_uuid)
1195
1196 # Make sure the name is in the config
1197 # if "namespace" not in config:
1198 # raise Exception("Namespace not found.")
1199
1200 # TODO: We want to make sure this is unique to the cluster, in case
1201 # the cluster is being reused.
1202 # Consider pre/appending the cluster id to the namespace string
1203 pass
1204
1205 # TODO: Remove these lines of code
1206 # async def has_model(self, model_name: str) -> bool:
1207 # """Check if a model exists in the controller
1208
1209 # Checks to see if a model exists in the connected Juju controller.
1210
1211 # :param model_name str: The name of the model
1212 # :return: A boolean indicating if the model exists
1213 # """
1214 # models = await self.controller.list_models()
1215
1216 # if model_name in models:
1217 # return True
1218 # return False
1219
1220 # def is_local_k8s(self, credentials: str,) -> bool:
1221 # """Check if a cluster is local
1222
1223 # Checks if a cluster is running in the local host
1224
1225 # :param credentials dict: A dictionary containing the k8s credentials
1226 # :returns: A boolean if the cluster is running locally
1227 # """
1228
1229 # creds = yaml.safe_load(credentials)
1230
1231 # if creds and os.getenv("OSMLCM_VCA_APIPROXY"):
1232 # for cluster in creds["clusters"]:
1233 # if "server" in cluster["cluster"]:
1234 # if os.getenv("OSMLCM_VCA_APIPROXY") in cluster["cluster"]["server"]:
1235 # return True
1236
1237 # return False
1238
1239 # async def get_controller(self, cluster_uuid):
1240 # """Login to the Juju controller."""
1241
1242 # config = self.get_config(cluster_uuid)
1243
1244 # juju_endpoint = config["endpoint"]
1245 # juju_user = config["username"]
1246 # juju_secret = config["secret"]
1247 # juju_ca_cert = config["cacert"]
1248
1249 # controller = Controller()
1250
1251 # if juju_secret:
1252 # self.log.debug(
1253 # "Connecting to controller... ws://{} as {}".format(
1254 # juju_endpoint, juju_user,
1255 # )
1256 # )
1257 # try:
1258 # await controller.connect(
1259 # endpoint=juju_endpoint,
1260 # username=juju_user,
1261 # password=juju_secret,
1262 # cacert=juju_ca_cert,
1263 # )
1264 # self.log.debug("JujuApi: Logged into controller")
1265 # return controller
1266 # except Exception as ex:
1267 # self.log.debug(ex)
1268 # self.log.debug("Caught exception: {}".format(ex))
1269 # else:
1270 # self.log.fatal("VCA credentials not configured.")
1271
1272 # TODO: Remove these commented lines
1273 # self.authenticated = False
1274 # if self.authenticated:
1275 # return
1276
1277 # self.connecting = True
1278 # juju_public_key = None
1279 # self.authenticated = True
1280 # Test: Make sure we have the credentials loaded
1281 # async def logout(self):
1282 # """Logout of the Juju controller."""
1283 # self.log.debug("[logout]")
1284 # if not self.authenticated:
1285 # return False
1286
1287 # for model in self.models:
1288 # self.log.debug("Logging out of model {}".format(model))
1289 # await self.models[model].disconnect()
1290
1291 # if self.controller:
1292 # self.log.debug("Disconnecting controller {}".format(self.controller))
1293 # await self.controller.disconnect()
1294 # self.controller = None
1295
1296 # self.authenticated = False
1297
1298 # async def remove_cloud(self, cloud_name: str,) -> bool:
1299 # """Remove a k8s cloud from Juju
1300
1301 # Removes a Kubernetes cloud from Juju.
1302
1303 # :param cloud_name str: The name of the cloud to add.
1304
1305 # :returns: True if successful, otherwise raises an exception.
1306 # """
1307
1308 # # Remove the bootstrapped controller
1309 # cmd = [self.juju_command, "remove-k8s", "--client", cloud_name]
1310 # process = await asyncio.create_subprocess_exec(
1311 # *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1312 # )
1313
1314 # _stdout, stderr = await process.communicate()
1315
1316 # return_code = process.returncode
1317
1318 # if return_code > 0:
1319 # raise Exception(stderr)
1320
1321 # # Remove the cloud from the local config
1322 # cmd = [self.juju_command, "remove-cloud", "--client", cloud_name]
1323 # process = await asyncio.create_subprocess_exec(
1324 # *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1325 # )
1326
1327 # _stdout, stderr = await process.communicate()
1328
1329 # return_code = process.returncode
1330
1331 # if return_code > 0:
1332 # raise Exception(stderr)
1333
1334 # return True
1335
1336 # async def set_config(self, cluster_uuid: str, config: dict,) -> bool:
1337 # """Save the cluster configuration
1338
1339 # Saves the cluster information to the Mongo database
1340
1341 # :param cluster_uuid str: The UUID of the cluster
1342 # :param config dict: A dictionary containing the cluster configuration
1343 # """
1344
1345 # juju_db = self.db.get_one("admin", {"_id": "juju"})
1346
1347 # k8sclusters = juju_db["k8sclusters"] if "k8sclusters" in juju_db else []
1348 # self.db.encrypt_decrypt_fields(
1349 # config,
1350 # "encrypt",
1351 # ["secret", "cacert"],
1352 # schema_version="1.1",
1353 # salt=cluster_uuid,
1354 # )
1355 # k8sclusters.append({"_id": cluster_uuid, "config": config})
1356 # self.db.set_one(
1357 # table="admin",
1358 # q_filter={"_id": "juju"},
1359 # update_dict={"k8sclusters": k8sclusters},
1360 # )
1361
1362 # Private methods to create/delete needed resources in the
1363 # Kubernetes cluster to create the K8s cloud in Juju
1364
1365 def _create_cluster_role(
1366 self,
1367 kubectl: Kubectl,
1368 name: str,
1369 labels: Dict[str, str],
1370 ):
1371 cluster_roles = kubectl.clients[RBAC_CLIENT].list_cluster_role(
1372 field_selector="metadata.name={}".format(name)
1373 )
1374
1375 if len(cluster_roles.items) > 0:
1376 raise Exception(
1377 "Cluster role with metadata.name={} already exists".format(name)
1378 )
1379
1380 metadata = V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE)
1381 # Cluster role
1382 cluster_role = V1ClusterRole(
1383 metadata=metadata,
1384 rules=[
1385 V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
1386 V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
1387 ],
1388 )
1389
1390 kubectl.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
1391
1392 def _delete_cluster_role(self, kubectl: Kubectl, name: str):
1393 kubectl.clients[RBAC_CLIENT].delete_cluster_role(name)
1394
1395 def _create_service_account(
1396 self,
1397 kubectl: Kubectl,
1398 name: str,
1399 labels: Dict[str, str],
1400 ):
1401 service_accounts = kubectl.clients[CORE_CLIENT].list_namespaced_service_account(
1402 ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
1403 )
1404 if len(service_accounts.items) > 0:
1405 raise Exception(
1406 "Service account with metadata.name={} already exists".format(name)
1407 )
1408
1409 metadata = V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE)
1410 service_account = V1ServiceAccount(metadata=metadata)
1411
1412 kubectl.clients[CORE_CLIENT].create_namespaced_service_account(
1413 ADMIN_NAMESPACE, service_account
1414 )
1415
1416 def _delete_service_account(self, kubectl: Kubectl, name: str):
1417 kubectl.clients[CORE_CLIENT].delete_namespaced_service_account(
1418 name, ADMIN_NAMESPACE
1419 )
1420
1421 def _create_cluster_role_binding(
1422 self,
1423 kubectl: Kubectl,
1424 name: str,
1425 labels: Dict[str, str],
1426 ):
1427 role_bindings = kubectl.clients[RBAC_CLIENT].list_cluster_role_binding(
1428 field_selector="metadata.name={}".format(name)
1429 )
1430 if len(role_bindings.items) > 0:
1431 raise Exception("Generated rbac id already exists")
1432
1433 role_binding = V1ClusterRoleBinding(
1434 metadata=V1ObjectMeta(name=name, labels=labels),
1435 role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
1436 subjects=[
1437 V1Subject(kind="ServiceAccount", name=name, namespace=ADMIN_NAMESPACE)
1438 ],
1439 )
1440 kubectl.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
1441
1442 def _delete_cluster_role_binding(self, kubectl: Kubectl, name: str):
1443 kubectl.clients[RBAC_CLIENT].delete_cluster_role_binding(name)
1444
1445 async def _get_secret_data(self, kubectl: Kubectl, name: str) -> (str, str):
1446 v1_core = kubectl.clients[CORE_CLIENT]
1447
1448 retries_limit = 10
1449 secret_name = None
1450 while True:
1451 retries_limit -= 1
1452 service_accounts = v1_core.list_namespaced_service_account(
1453 ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
1454 )
1455 if len(service_accounts.items) == 0:
1456 raise Exception(
1457 "Service account not found with metadata.name={}".format(name)
1458 )
1459 service_account = service_accounts.items[0]
1460 if service_account.secrets and len(service_account.secrets) > 0:
1461 secret_name = service_account.secrets[0].name
1462 if secret_name is not None or not retries_limit:
1463 break
1464 if not secret_name:
1465 raise Exception(
1466 "Failed getting the secret from service account {}".format(name)
1467 )
1468 secret = v1_core.list_namespaced_secret(
1469 ADMIN_NAMESPACE,
1470 field_selector="metadata.name={}".format(secret_name),
1471 ).items[0]
1472
1473 token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
1474 client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]
1475
1476 return (
1477 base64.b64decode(token).decode("utf-8"),
1478 base64.b64decode(client_certificate_data).decode("utf-8"),
1479 )