Add ModelConfig
[osm/N2VC.git] / n2vc / k8s_juju_conn.py
1 # Copyright 2019 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import os
17 import uuid
18 import yaml
19 import tempfile
20 import binascii
21 import base64
22
23 from n2vc.config import ModelConfig
24 from n2vc.exceptions import K8sException, N2VCBadArgumentsException
25 from n2vc.k8s_conn import K8sConnector
26 from n2vc.kubectl import Kubectl, CORE_CLIENT, RBAC_CLIENT
27 from .exceptions import MethodNotImplemented
28 from n2vc.utils import base64_to_cacert
29 from n2vc.libjuju import Libjuju
30
31 from kubernetes.client.models import (
32 V1ClusterRole,
33 V1ObjectMeta,
34 V1PolicyRule,
35 V1ServiceAccount,
36 V1ClusterRoleBinding,
37 V1RoleRef,
38 V1Subject,
39 )
40
41 from typing import Dict, Tuple
42
43 SERVICE_ACCOUNT_TOKEN_KEY = "token"
44 SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
45 RBAC_LABEL_KEY_NAME = "rbac-id"
46
47 ADMIN_NAMESPACE = "kube-system"
48 RBAC_STACK_PREFIX = "juju-credential"
49
50 # from juju.bundle import BundleHandler
51 # import re
52 # import ssl
53 # from .vnf import N2VC
54
55
56 def generate_rbac_id():
57 return binascii.hexlify(os.urandom(4)).decode()
58
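# Example (illustrative value): generate_rbac_id() could return "a1b2c3d4",
# which is then used to name the created K8s resources
# ("juju-credential-a1b2c3d4") and to label them ({"juju-credential": "a1b2c3d4"}).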
59
60 class K8sJujuConnector(K8sConnector):
61 def __init__(
62 self,
63 fs: object,
64 db: object,
65 kubectl_command: str = "/usr/bin/kubectl",
66 juju_command: str = "/usr/bin/juju",
67 log: object = None,
68 loop: object = None,
69 on_update_db=None,
70 vca_config: dict = None,
71 ):
72 """
73 :param fs: file system for kubernetes and helm configuration
74 :param db: Database object
75 :param kubectl_command: path to kubectl executable
76 :param juju_command: path to juju executable
77 :param log: logger
78 :param loop: Asyncio loop
79 """
80
81 # parent class
82 K8sConnector.__init__(
83 self,
84 db,
85 log=log,
86 on_update_db=on_update_db,
87 )
88
89 self.fs = fs
90 self.loop = loop or asyncio.get_event_loop()
91 self.log.debug("Initializing K8S Juju connector")
92
93 required_vca_config = [
94 "host",
95 "user",
96 "secret",
97 "ca_cert",
98 ]
99 if not vca_config or not all(k in vca_config for k in required_vca_config):
100 raise N2VCBadArgumentsException(
101 message="Missing arguments in vca_config: {}".format(vca_config),
102 bad_args=required_vca_config,
103 )
104 port = vca_config.get("port", 17070)
105 url = "{}:{}".format(vca_config["host"], port)
106 model_config = ModelConfig(vca_config)
107 username = vca_config["user"]
108 secret = vca_config["secret"]
109 ca_cert = base64_to_cacert(vca_config["ca_cert"])
110
111 self.libjuju = Libjuju(
112 endpoint=url,
113 api_proxy=None, # Not needed for k8s charms
114 model_config=model_config,
115 username=username,
116 password=secret,
117 cacert=ca_cert,
118 loop=self.loop,
119 log=self.log,
120 db=self.db,
121 )
122 self.log.debug("K8S Juju connector initialized")
123 # TODO: Remove these commented lines:
124 # self.authenticated = False
125 # self.models = {}
126 # self.juju_secret = ""
127
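# A minimal usage sketch (illustrative only; fs_object, db_object, logger,
# loop and the vca_config values are hypothetical, not defined in this module):
#
#     connector = K8sJujuConnector(
#         fs=fs_object,
#         db=db_object,
#         log=logger,
#         loop=loop,
#         vca_config={
#             "host": "10.0.0.10",
#             "port": 17070,
#             "user": "admin",
#             "secret": "s3cret",
#             "ca_cert": "<base64-encoded CA certificate>",
#         },
#     )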
128 """Initialization"""
129
130 async def init_env(
131 self,
132 k8s_creds: str,
133 namespace: str = "kube-system",
134 reuse_cluster_uuid: str = None,
135 ) -> Tuple[str, bool]:
136 """
137 It prepares a given K8s cluster environment to run Juju bundles.
138
139 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
140 '.kube/config'
141 :param namespace: optional namespace to be used for juju. By default,
142 'kube-system' will be used
143 :param reuse_cluster_uuid: existing cluster uuid for reuse
144 :return: uuid of the K8s cluster and True if connector has installed some
145 software in the cluster
146 (on error, an exception will be raised)
147 """
148
149 # """Bootstrapping
150
151 # Bootstrapping cannot be done, by design, through the API. We need to
152 # use the CLI tools.
153 # """
154
155 # """
156 # WIP: Workflow
157
158 # 1. Has the environment already been bootstrapped?
159 # - Check the database to see if we have a record for this env
160
161 # 2. If this is a new env, create it
162 # - Add the k8s cloud to Juju
163 # - Bootstrap
164 # - Record it in the database
165
166 # 3. Connect to the Juju controller for this cloud
167
168 # """
169 # cluster_uuid = reuse_cluster_uuid
170 # if not cluster_uuid:
171 # cluster_uuid = str(uuid4())
172
173 ##################################################
174 # TODO: Pull info from db based on the namespace #
175 ##################################################
176
177 ###################################################
178 # TODO: Make it idempotent, calling add-k8s and #
179 # bootstrap whenever reuse_cluster_uuid is passed #
180 # as parameter #
181 # `init_env` is called to initialize the K8s #
182 # cluster for juju. If this initialization fails, #
183 # it can be called again by LCM with the param #
184 # reuse_cluster_uuid, e.g. to try to fix it. #
185 ###################################################
186
187 # This is a new cluster, so bootstrap it
188
189 cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4())
190
191 # Is a local k8s cluster?
192 # localk8s = self.is_local_k8s(k8s_creds)
193
194 # If the k8s is external, the juju controller needs a loadbalancer
195 # loadbalancer = False if localk8s else True
196
197 # Name the new k8s cloud
198 # k8s_cloud = "k8s-{}".format(cluster_uuid)
199
200 # self.log.debug("Adding k8s cloud {}".format(k8s_cloud))
201 # await self.add_k8s(k8s_cloud, k8s_creds)
202
203 # Bootstrap Juju controller
204 # self.log.debug("Bootstrapping...")
205 # await self.bootstrap(k8s_cloud, cluster_uuid, loadbalancer)
206 # self.log.debug("Bootstrap done.")
207
208 # Get the controller information
209
210 # Parse ~/.local/share/juju/controllers.yaml
211 # controllers.testing.api-endpoints|ca-cert|uuid
212 # self.log.debug("Getting controller endpoints")
213 # with open(os.path.expanduser("~/.local/share/juju/controllers.yaml")) as f:
214 # controllers = yaml.load(f, Loader=yaml.Loader)
215 # controller = controllers["controllers"][cluster_uuid]
216 # endpoints = controller["api-endpoints"]
217 # juju_endpoint = endpoints[0]
218 # juju_ca_cert = controller["ca-cert"]
219
220 # Parse ~/.local/share/juju/accounts
221 # controllers.testing.user|password
222 # self.log.debug("Getting accounts")
223 # with open(os.path.expanduser("~/.local/share/juju/accounts.yaml")) as f:
224 # controllers = yaml.load(f, Loader=yaml.Loader)
225 # controller = controllers["controllers"][cluster_uuid]
226
227 # juju_user = controller["user"]
228 # juju_secret = controller["password"]
229
230 # config = {
231 # "endpoint": juju_endpoint,
232 # "username": juju_user,
233 # "secret": juju_secret,
234 # "cacert": juju_ca_cert,
235 # "loadbalancer": loadbalancer,
236 # }
237
238 # Store the cluster configuration so it
239 # can be used for subsequent calls
240 kubecfg = tempfile.NamedTemporaryFile()
241 with open(kubecfg.name, "w") as kubecfg_file:
242 kubecfg_file.write(k8s_creds)
243 kubectl = Kubectl(config_file=kubecfg.name)
244
245 # CREATING RESOURCES IN K8S
246 rbac_id = generate_rbac_id()
247 metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
248 labels = {RBAC_STACK_PREFIX: rbac_id}
249
250 # Create cleanup dictionary to clean up created resources
251 # if it fails in the middle of the process
252 cleanup_data = []
253 try:
254 self._create_cluster_role(
255 kubectl,
256 name=metadata_name,
257 labels=labels,
258 )
259 cleanup_data.append(
260 {
261 "delete": self._delete_cluster_role,
262 "args": (kubectl, metadata_name),
263 }
264 )
265
266 self._create_service_account(
267 kubectl,
268 name=metadata_name,
269 labels=labels,
270 )
271 cleanup_data.append(
272 {
273 "delete": self._delete_service_account,
274 "args": (kubectl, metadata_name),
275 }
276 )
277
278 self._create_cluster_role_binding(
279 kubectl,
280 name=metadata_name,
281 labels=labels,
282 )
283 cleanup_data.append(
284 {
285 "delete": self._delete_service_account,
286 "args": (kubectl, metadata_name),
287 }
288 )
289 token, client_cert_data = await self._get_secret_data(
290 kubectl,
291 metadata_name,
292 )
293
294 default_storage_class = kubectl.get_default_storage_class()
295 await self.libjuju.add_k8s(
296 name=cluster_uuid,
297 rbac_id=rbac_id,
298 token=token,
299 client_cert_data=client_cert_data,
300 configuration=kubectl.configuration,
301 storage_class=default_storage_class,
302 credential_name=self._get_credential_name(cluster_uuid),
303 )
304 # self.log.debug("Setting config")
305 # await self.set_config(cluster_uuid, config)
306
307 # Test connection
308 # controller = await self.get_controller(cluster_uuid)
309 # await controller.disconnect()
310
311 # TODO: Remove these commented lines
312 # raise Exception("EOL")
313 # self.juju_public_key = None
314 # Login to the k8s cluster
315 # if not self.authenticated:
316 # await self.login(cluster_uuid)
317
318 # We're creating a new cluster
319 # print("Getting model {}".format(self.get_namespace(cluster_uuid),
320 # cluster_uuid=cluster_uuid))
321 # model = await self.get_model(
322 # self.get_namespace(cluster_uuid),
323 # cluster_uuid=cluster_uuid
324 # )
325
326 # Disconnect from the model
327 # if model and model.is_connected():
328 # await model.disconnect()
329
330 return cluster_uuid, True
331 except Exception as e:
332 self.log.error("Error initializing k8scluster: {}".format(e))
333 if len(cleanup_data) > 0:
334 self.log.debug("Cleaning up created resources in k8s cluster...")
335 for item in cleanup_data:
336 delete_function = item["delete"]
337 delete_args = item["args"]
338 delete_function(*delete_args)
339 self.log.debug("Cleanup finished")
340 raise e
341
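# Illustrative call (assumes `kubeconfig_str` holds the content of a valid
# kubeconfig file; the variable name is hypothetical):
#
#     cluster_uuid, installed_sw = await connector.init_env(
#         k8s_creds=kubeconfig_str,
#         namespace="kube-system",
#     )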
342 """Repo Management"""
343
344 async def repo_add(
345 self,
346 name: str,
347 url: str,
348 _type: str = "charm",
349 ):
350 raise MethodNotImplemented()
351
352 async def repo_list(self):
353 raise MethodNotImplemented()
354
355 async def repo_remove(
356 self,
357 name: str,
358 ):
359 raise MethodNotImplemented()
360
361 async def synchronize_repos(self, cluster_uuid: str, name: str):
362 """
363 Returns None as currently add_repo is not implemented
364 """
365 return None
366
367 """Reset"""
368
369 async def reset(
370 self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
371 ) -> bool:
372 """Reset a cluster
373
374 Resets the Kubernetes cluster by removing the model that represents it.
375
376 :param cluster_uuid str: The UUID of the cluster to reset
377 :return: Returns True if successful or raises an exception.
378 """
379
380 try:
381 # Remove k8scluster from database
382 # self.log.debug("[reset] Removing k8scluster from juju database")
383 # juju_db = self.db.get_one("admin", {"_id": "juju"})
384
385 # for k in juju_db["k8sclusters"]:
386 # if k["_id"] == cluster_uuid:
387 # juju_db["k8sclusters"].remove(k)
388 # self.db.set_one(
389 # table="admin",
390 # q_filter={"_id": "juju"},
391 # update_dict={"k8sclusters": juju_db["k8sclusters"]},
392 # )
393 # break
394
395 # Destroy the controller (via CLI)
396 # self.log.debug("[reset] Destroying controller")
397 # await self.destroy_controller(cluster_uuid)
398 self.log.debug("[reset] Removing k8s cloud")
399 # k8s_cloud = "k8s-{}".format(cluster_uuid)
400 # await self.remove_cloud(k8s_cloud)
401
402 cloud_creds = await self.libjuju.get_cloud_credentials(
403 cluster_uuid,
404 self._get_credential_name(cluster_uuid),
405 )
406
407 await self.libjuju.remove_cloud(cluster_uuid)
408
409 kubecfg = self.get_credentials(cluster_uuid=cluster_uuid)
410
411 kubecfg_file = tempfile.NamedTemporaryFile()
412 with open(kubecfg_file.name, "w") as f:
413 f.write(kubecfg)
414 kubectl = Kubectl(config_file=kubecfg_file.name)
415
416 delete_functions = [
417 self._delete_cluster_role_binding,
418 self._delete_service_account,
419 self._delete_cluster_role,
420 ]
421
422 credential_attrs = cloud_creds[0].result["attrs"]
423 if RBAC_LABEL_KEY_NAME in credential_attrs:
424 rbac_id = credential_attrs[RBAC_LABEL_KEY_NAME]
425 metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
426 delete_args = (kubectl, metadata_name)
427 for delete_func in delete_functions:
428 try:
429 delete_func(*delete_args)
430 except Exception as e:
431 self.log.warning("Cannot remove resource in K8s {}".format(e))
432
433 except Exception as e:
434 self.log.debug("Caught exception during reset: {}".format(e))
435 raise e
436 return True
437 # TODO: Remove these commented lines
438 # if not self.authenticated:
439 # await self.login(cluster_uuid)
440
441 # if self.controller.is_connected():
442 # # Destroy the model
443 # namespace = self.get_namespace(cluster_uuid)
444 # if await self.has_model(namespace):
445 # self.log.debug("[reset] Destroying model")
446 # await self.controller.destroy_model(namespace, destroy_storage=True)
447
448 # # Disconnect from the controller
449 # self.log.debug("[reset] Disconnecting controller")
450 # await self.logout()
451
452 """Deployment"""
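# Illustrative call: removes the Juju cloud/credential for the cluster and
# deletes the RBAC resources created by init_env (cluster_uuid is the value
# returned by init_env):
#
#     await connector.reset(cluster_uuid)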
453
454 async def install(
455 self,
456 cluster_uuid: str,
457 kdu_model: str,
458 kdu_instance: str,
459 atomic: bool = True,
460 timeout: float = 1800,
461 params: dict = None,
462 db_dict: dict = None,
463 kdu_name: str = None,
464 namespace: str = None,
465 ) -> bool:
466 """Install a bundle
467
468 :param cluster_uuid str: The UUID of the cluster to install to
469 :param kdu_model str: The name or path of a bundle to install
470 :param kdu_instance: Kdu instance name
471 :param atomic bool: If set, waits until the model is active and resets
472 the cluster on failure.
473 :param timeout int: The time, in seconds, to wait for the install
474 to finish
475 :param params dict: Key-value pairs of instantiation parameters
476 :param kdu_name: Name of the KDU instance to be installed
477 :param namespace: K8s namespace to use for the KDU instance
478
479 :return: True if the bundle was deployed successfully; otherwise an exception is raised.
480 """
481 bundle = kdu_model
482
483 # controller = await self.get_controller(cluster_uuid)
484
485 ##
486 # Get or create the model, based on the NS
487 # uuid.
488
489 if not db_dict:
490 raise K8sException("db_dict must be set")
491 if not bundle:
492 raise K8sException("bundle must be set")
493
494 if bundle.startswith("cs:"):
495 pass
496 elif bundle.startswith("http"):
497 # Download the file
498 pass
499 else:
500 new_workdir = os.path.dirname(kdu_model)
501 os.chdir(new_workdir)
502 bundle = "local:{}".format(kdu_model)
503
504 self.log.debug("Checking for model named {}".format(kdu_instance))
505
506 # Create the new model
507 self.log.debug("Adding model: {}".format(kdu_instance))
508 await self.libjuju.add_model(
509 model_name=kdu_instance,
510 cloud_name=cluster_uuid,
511 credential_name=self._get_credential_name(cluster_uuid),
512 )
513
514 # if model:
515 # TODO: Instantiation parameters
516
517 """
518 "Juju bundle that models the KDU, in any of the following ways:
519 - <juju-repo>/<juju-bundle>
520 - <juju-bundle folder under k8s_models folder in the package>
521 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
522 in the package>
523 - <URL_where_to_fetch_juju_bundle>
524 """
525 try:
526 previous_workdir = os.getcwd()
527 except FileNotFoundError:
528 previous_workdir = "/app/storage"
529
530 self.log.debug("[install] deploying {}".format(bundle))
531 await self.libjuju.deploy(
532 bundle, model_name=kdu_instance, wait=atomic, timeout=timeout
533 )
534
535 # Get the application
536 # if atomic:
537 # # applications = model.applications
538 # self.log.debug("[install] Applications: {}".format(model.applications))
539 # for name in model.applications:
540 # self.log.debug("[install] Waiting for {} to settle".format(name))
541 # application = model.applications[name]
542 # try:
543 # # It's not enough to wait for all units to be active;
544 # # the application status needs to be active as well.
545 # self.log.debug("Waiting for all units to be active...")
546 # await model.block_until(
547 # lambda: all(
548 # unit.agent_status == "idle"
549 # and application.status in ["active", "unknown"]
550 # and unit.workload_status in ["active", "unknown"]
551 # for unit in application.units
552 # ),
553 # timeout=timeout,
554 # )
555 # self.log.debug("All units active.")
556
557 # # TODO use asyncio.TimeoutError
558 # except concurrent.futures._base.TimeoutError:
559 # os.chdir(previous_workdir)
560 # self.log.debug("[install] Timeout exceeded; resetting cluster")
561 # await self.reset(cluster_uuid)
562 # return False
563
564 # Wait for the application to be active
565 # if model.is_connected():
566 # self.log.debug("[install] Disconnecting model")
567 # await model.disconnect()
568 # await controller.disconnect()
569 os.chdir(previous_workdir)
570 return True
571
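# Illustrative call (the bundle path and db_dict filter are hypothetical;
# kdu_instance is typically built with generate_kdu_instance_name below):
#
#     await connector.install(
#         cluster_uuid=cluster_uuid,
#         kdu_model="/app/storage/packages/my_knf/k8s_models/my-bundle",
#         kdu_instance="my-bundle-0123-abcd",
#         atomic=True,
#         timeout=1800,
#         db_dict={"filter": {"_id": "0123-abcd"}},
#     )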
572 async def instances_list(self, cluster_uuid: str) -> list:
573 """
574 returns a list of deployed releases in a cluster
575
576 :param cluster_uuid: the cluster
577 :return:
578 """
579 return []
580
581 async def upgrade(
582 self,
583 cluster_uuid: str,
584 kdu_instance: str,
585 kdu_model: str = None,
586 params: dict = None,
587 ) -> str:
588 """Upgrade a model
589
590 :param cluster_uuid str: The UUID of the cluster to upgrade
591 :param kdu_instance str: The unique name of the KDU instance
592 :param kdu_model str: The name or path of the bundle to upgrade to
593 :param params dict: Key-value pairs of instantiation parameters
594
595 :return: If successful, reference to the new revision number of the
596 KDU instance.
597 """
598
599 # TODO: Loop through the bundle and upgrade each charm individually
600
601 """
602 The API doesn't have a concept of bundle upgrades, because there are
603 many possible changes: charm revision, disk, number of units, etc.
604
605 As such, we are only supporting a limited subset of upgrades. We'll
606 upgrade the charm revision but leave storage and scale untouched.
607
608 Scale changes should happen through OSM constructs, and changes to
609 storage would require a redeployment of the service, at least in this
610 initial release.
611 """
612 raise MethodNotImplemented()
613 # TODO: Remove these commented lines
614
615 # model = await self.get_model(namespace, cluster_uuid=cluster_uuid)
616
617 # model = None
618 # namespace = self.get_namespace(cluster_uuid)
619 # controller = await self.get_controller(cluster_uuid)
620
621 # try:
622 # if namespace not in await controller.list_models():
623 # raise N2VCNotFound(message="Model {} does not exist".format(namespace))
624
625 # model = await controller.get_model(namespace)
626 # with open(kdu_model, "r") as f:
627 # bundle = yaml.safe_load(f)
628
629 # """
630 # {
631 # 'description': 'Test bundle',
632 # 'bundle': 'kubernetes',
633 # 'applications': {
634 # 'mariadb-k8s': {
635 # 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
636 # 'scale': 1,
637 # 'options': {
638 # 'password': 'manopw',
639 # 'root_password': 'osm4u',
640 # 'user': 'mano'
641 # },
642 # 'series': 'kubernetes'
643 # }
644 # }
645 # }
646 # """
647 # # TODO: This should be returned in an agreed-upon format
648 # for name in bundle["applications"]:
649 # self.log.debug(model.applications)
650 # application = model.applications[name]
651 # self.log.debug(application)
652
653 # path = bundle["applications"][name]["charm"]
654
655 # try:
656 # await application.upgrade_charm(switch=path)
657 # except juju.errors.JujuError as ex:
658 # if "already running charm" in str(ex):
659 # # We're already running this version
660 # pass
661 # finally:
662 # if model:
663 # await model.disconnect()
664 # await controller.disconnect()
665 # return True
666
667 """Rollback"""
668
669 async def rollback(
670 self,
671 cluster_uuid: str,
672 kdu_instance: str,
673 revision: int = 0,
674 ) -> str:
675 """Rollback a model
676
677 :param cluster_uuid str: The UUID of the cluster to rollback
678 :param kdu_instance str: The unique name of the KDU instance
679 :param revision int: The revision to revert to. If omitted, rolls back
680 the previous upgrade.
681
682 :return: If successful, returns the revision of active KDU instance,
683 or raises an exception
684 """
685 raise MethodNotImplemented()
686
687 """Deletion"""
688
689 async def uninstall(self, cluster_uuid: str, kdu_instance: str) -> bool:
690 """Uninstall a KDU instance
691
692 :param cluster_uuid str: The UUID of the cluster
693 :param kdu_instance str: The unique name of the KDU instance
694
695 :return: Returns True if successful, or raises an exception
696 """
697
698 # controller = await self.get_controller(cluster_uuid)
699
700 self.log.debug("[uninstall] Destroying model")
701
702 await self.libjuju.destroy_model(kdu_instance, total_timeout=3600)
703
704 # self.log.debug("[uninstall] Model destroyed and disconnecting")
705 # await controller.disconnect()
706
707 return True
708 # TODO: Remove these commented lines
709 # if not self.authenticated:
710 # self.log.debug("[uninstall] Connecting to controller")
711 # await self.login(cluster_uuid)
712
713 async def exec_primitive(
714 self,
715 cluster_uuid: str = None,
716 kdu_instance: str = None,
717 primitive_name: str = None,
718 timeout: float = 300,
719 params: dict = None,
720 db_dict: dict = None,
721 ) -> str:
722 """Exec primitive (Juju action)
723
724 :param cluster_uuid str: The UUID of the cluster
725 :param kdu_instance str: The unique name of the KDU instance
726 :param primitive_name: Name of action that will be executed
727 :param timeout: Timeout for action execution
728 :param params: Dictionary of all the parameters needed for the action
729 :param db_dict: Dictionary for any additional data
730
731 :return: Returns the output of the action
732 """
733
734 # controller = await self.get_controller(cluster_uuid)
735
736 if not params or "application-name" not in params:
737 raise K8sException(
738 "Missing application-name argument, \
739 argument needed for K8s actions"
740 )
741 try:
742 self.log.debug(
743 "[exec_primitive] Getting model "
744 "kdu_instance: {}".format(kdu_instance)
745 )
746 application_name = params["application-name"]
747 actions = await self.libjuju.get_actions(application_name, kdu_instance)
748 if primitive_name not in actions:
749 raise K8sException("Primitive {} not found".format(primitive_name))
750 output, status = await self.libjuju.execute_action(
751 application_name, kdu_instance, primitive_name, **params
752 )
753 # model = await self.get_model(kdu_instance, controller=controller)
754
755 # application_name = params["application-name"]
756 # application = model.applications[application_name]
757
758 # actions = await application.get_actions()
759 # if primitive_name not in actions:
760 # raise K8sException("Primitive {} not found".format(primitive_name))
761
762 # unit = None
763 # for u in application.units:
764 # if await u.is_leader_from_status():
765 # unit = u
766 # break
767
768 # if unit is None:
769 # raise K8sException("No leader unit found to execute action")
770
771 # self.log.debug("[exec_primitive] Running action: {}".format(primitive_name))
772 # action = await unit.run_action(primitive_name, **params)
773
774 # output = await model.get_action_output(action_uuid=action.entity_id)
775 # status = await model.get_action_status(uuid_or_prefix=action.entity_id)
776
777 # status = (
778 # status[action.entity_id] if action.entity_id in status else "failed"
779 # )
780
781 if status != "completed":
782 raise K8sException(
783 "status is not completed: {} output: {}".format(status, output)
784 )
785
786 return output
787
788 except Exception as e:
789 error_msg = "Error executing primitive {}: {}".format(primitive_name, e)
790 self.log.error(error_msg)
791 raise K8sException(message=error_msg)
792 # finally:
793 # await controller.disconnect()
794 # TODO: Remove these commented lines:
795 # if not self.authenticated:
796 # self.log.debug("[exec_primitive] Connecting to controller")
797 # await self.login(cluster_uuid)
798
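# Illustrative call (application and action names are hypothetical; note that
# "application-name" must always be present in params for K8s actions):
#
#     output = await connector.exec_primitive(
#         cluster_uuid=cluster_uuid,
#         kdu_instance=kdu_instance,
#         primitive_name="backup",
#         params={"application-name": "mariadb-k8s"},
#     )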
799 """Introspection"""
800
801 async def inspect_kdu(
802 self,
803 kdu_model: str,
804 ) -> dict:
805 """Inspect a KDU
806
807 Inspects a bundle and returns a dictionary of config parameters and
808 their default values.
809
810 :param kdu_model str: The name or path of the bundle to inspect.
811
812 :return: If successful, returns a dictionary of available parameters
813 and their default values.
814 """
815
816 kdu = {}
817 if not os.path.exists(kdu_model):
818 raise K8sException("file {} not found".format(kdu_model))
819
820 with open(kdu_model, "r") as f:
821 bundle = yaml.safe_load(f.read())
822
823 """
824 {
825 'description': 'Test bundle',
826 'bundle': 'kubernetes',
827 'applications': {
828 'mariadb-k8s': {
829 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
830 'scale': 1,
831 'options': {
832 'password': 'manopw',
833 'root_password': 'osm4u',
834 'user': 'mano'
835 },
836 'series': 'kubernetes'
837 }
838 }
839 }
840 """
841 # TODO: This should be returned in an agreed-upon format
842 kdu = bundle["applications"]
843
844 return kdu
845
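# Illustrative call (the path is hypothetical): for the bundle shown in the
# docstring above, inspect_kdu returns its 'applications' mapping, e.g.
#
#     kdu = await connector.inspect_kdu("/path/to/bundle.yaml")
#     # kdu == {"mariadb-k8s": {"charm": "cs:~charmed-osm/mariadb-k8s-20",
#     #                         "scale": 1, "options": {...}, "series": "kubernetes"}}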
846 async def help_kdu(
847 self,
848 kdu_model: str,
849 ) -> str:
850 """View the README
851
852 If available, returns the README of the bundle.
853
854 :param kdu_model str: The name or path of a bundle
855
856 :return: If found, returns the contents of the README.
857 """
858 readme = None
859
860 files = ["README", "README.txt", "README.md"]
861 path = os.path.dirname(kdu_model)
862 for file in os.listdir(path):
863 if file in files:
864 with open(os.path.join(path, file), "r") as f:
865 readme = f.read()
866 break
867
868 return readme
869
870 async def status_kdu(
871 self,
872 cluster_uuid: str,
873 kdu_instance: str,
874 ) -> dict:
875 """Get the status of the KDU
876
877 Get the current status of the KDU instance.
878
879 :param cluster_uuid str: The UUID of the cluster
880 :param kdu_instance str: The unique id of the KDU instance
881
882 :return: Returns a dictionary containing namespace, state, resources,
883 and deployment_time.
884 """
885 status = {}
886 # controller = await self.get_controller(cluster_uuid)
887 # model = await self.get_model(kdu_instance, controller=controller)
888
889 # model_status = await model.get_status()
890 # status = model_status.applications
891 model_status = await self.libjuju.get_model_status(kdu_instance)
892 for name in model_status.applications:
893 application = model_status.applications[name]
894 status[name] = {"status": application["status"]["status"]}
895
896 # await model.disconnect()
897 # await controller.disconnect()
898
899 return status
900
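# Illustrative return value (application name and status are hypothetical):
#
#     await connector.status_kdu(cluster_uuid, kdu_instance)
#     # -> {"mariadb-k8s": {"status": "active"}}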
901 async def get_services(
902 self, cluster_uuid: str, kdu_instance: str, namespace: str
903 ) -> list:
904 """Return a list of services of a kdu_instance"""
905
906 credentials = self.get_credentials(cluster_uuid=cluster_uuid)
907
908 # config_path = "/tmp/{}".format(cluster_uuid)
909 # config_file = "{}/config".format(config_path)
910
911 # if not os.path.exists(config_path):
912 # os.makedirs(config_path)
913 # with open(config_file, "w") as f:
914 # f.write(credentials)
915
916 kubecfg = tempfile.NamedTemporaryFile()
917 with open(kubecfg.name, "w") as kubecfg_file:
918 kubecfg_file.write(credentials)
919 kubectl = Kubectl(config_file=kubecfg.name)
920
921 return kubectl.get_services(
922 field_selector="metadata.namespace={}".format(kdu_instance)
923 )
924
925 async def get_service(
926 self, cluster_uuid: str, service_name: str, namespace: str
927 ) -> object:
928 """Return data for a specific service inside a namespace"""
929
930 credentials = self.get_credentials(cluster_uuid=cluster_uuid)
931
932 # config_path = "/tmp/{}".format(cluster_uuid)
933 # config_file = "{}/config".format(config_path)
934
935 # if not os.path.exists(config_path):
936 # os.makedirs(config_path)
937 # with open(config_file, "w") as f:
938 # f.write(credentials)
939
940 kubecfg = tempfile.NamedTemporaryFile()
941 with open(kubecfg.name, "w") as kubecfg_file:
942 kubecfg_file.write(credentials)
943 kubectl = Kubectl(config_file=kubecfg.name)
944
945 return kubectl.get_services(
946 field_selector="metadata.name={},metadata.namespace={}".format(
947 service_name, namespace
948 )
949 )[0]
950
951 # Private methods
952 # async def add_k8s(self, cloud_name: str, credentials: str,) -> bool:
953 # """Add a k8s cloud to Juju
954
955 # Adds a Kubernetes cloud to Juju, so it can be bootstrapped with a
956 # Juju Controller.
957
958 # :param cloud_name str: The name of the cloud to add.
959 # :param credentials dict: A dictionary representing the output of
960 # `kubectl config view --raw`.
961
962 # :returns: True if successful, otherwise raises an exception.
963 # """
964
965 # cmd = [self.juju_command, "add-k8s", "--local", cloud_name]
966 # self.log.debug(cmd)
967
968 # process = await asyncio.create_subprocess_exec(
969 # *cmd,
970 # stdout=asyncio.subprocess.PIPE,
971 # stderr=asyncio.subprocess.PIPE,
972 # stdin=asyncio.subprocess.PIPE,
973 # )
974
975 # # Feed the process the credentials
976 # process.stdin.write(credentials.encode("utf-8"))
977 # await process.stdin.drain()
978 # process.stdin.close()
979
980 # _stdout, stderr = await process.communicate()
981
982 # return_code = process.returncode
983
984 # self.log.debug("add-k8s return code: {}".format(return_code))
985
986 # if return_code > 0:
987 # raise Exception(stderr)
988
989 # return True
990
991 # async def add_model(
992 # self, model_name: str, cluster_uuid: str, controller: Controller
993 # ) -> Model:
994 # """Adds a model to the controller
995
996 # Adds a new model to the Juju controller
997
998 # :param model_name str: The name of the model to add.
999 # :param cluster_uuid str: ID of the cluster.
1000 # :param controller: Controller object in which the model will be added
1001 # :returns: The juju.model.Model object of the new model upon success or
1002 # raises an exception.
1003 # """
1004
1005 # self.log.debug(
1006 # "Adding model '{}' to cluster_uuid '{}'".format(model_name, cluster_uuid)
1007 # )
1008 # model = None
1009 # try:
1010 # if self.juju_public_key is not None:
1011 # model = await controller.add_model(
1012 # model_name, config={"authorized-keys": self.juju_public_key}
1013 # )
1014 # else:
1015 # model = await controller.add_model(model_name)
1016 # except Exception as ex:
1017 # self.log.debug(ex)
1018 # self.log.debug("Caught exception: {}".format(ex))
1019 # pass
1020
1021 # return model
1022
1023 # async def bootstrap(
1024 # self, cloud_name: str, cluster_uuid: str, loadbalancer: bool
1025 # ) -> bool:
1026 # """Bootstrap a Kubernetes controller
1027
1028 # Bootstrap a Juju controller inside the Kubernetes cluster
1029
1030 # :param cloud_name str: The name of the cloud.
1031 # :param cluster_uuid str: The UUID of the cluster to bootstrap.
1032 # :param loadbalancer bool: If the controller should use loadbalancer or not.
1033 # :returns: True upon success or raises an exception.
1034 # """
1035
1036 # if not loadbalancer:
1037 # cmd = [self.juju_command, "bootstrap", cloud_name, cluster_uuid]
1038 # else:
1039 # """
1040 # For public clusters, specify that the controller service is using a
1041 # LoadBalancer.
1042 # """
1043 # cmd = [
1044 # self.juju_command,
1045 # "bootstrap",
1046 # cloud_name,
1047 # cluster_uuid,
1048 # "--config",
1049 # "controller-service-type=loadbalancer",
1050 # ]
1051
1052 # self.log.debug(
1053 # "Bootstrapping controller {} in cloud {}".format(cluster_uuid, cloud_name)
1054 # )
1055
1056 # process = await asyncio.create_subprocess_exec(
1057 # *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1058 # )
1059
1060 # _stdout, stderr = await process.communicate()
1061
1062 # return_code = process.returncode
1063
1064 # if return_code > 0:
1065 # #
1066 # if b"already exists" not in stderr:
1067 # raise Exception(stderr)
1068
1069 # return True
1070
1071 # async def destroy_controller(self, cluster_uuid: str) -> bool:
1072 # """Destroy a Kubernetes controller
1073
1074 # Destroy an existing Kubernetes controller.
1075
1076 # :param cluster_uuid str: The UUID of the cluster to bootstrap.
1077 # :returns: True upon success or raises an exception.
1078 # """
1079 # cmd = [
1080 # self.juju_command,
1081 # "destroy-controller",
1082 # "--destroy-all-models",
1083 # "--destroy-storage",
1084 # "-y",
1085 # cluster_uuid,
1086 # ]
1087
1088 # process = await asyncio.create_subprocess_exec(
1089 # *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1090 # )
1091
1092 # _stdout, stderr = await process.communicate()
1093
1094 # return_code = process.returncode
1095
1096 # if return_code > 0:
1097 # #
1098 # if "already exists" not in stderr:
1099 # raise Exception(stderr)
1100
1101 def get_credentials(self, cluster_uuid: str) -> str:
1102 """
1103 Get Cluster Kubeconfig
1104 """
1105 k8scluster = self.db.get_one(
1106 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
1107 )
1108
1109 self.db.encrypt_decrypt_fields(
1110 k8scluster.get("credentials"),
1111 "decrypt",
1112 ["password", "secret"],
1113 schema_version=k8scluster["schema_version"],
1114 salt=k8scluster["_id"],
1115 )
1116
1117 return yaml.safe_dump(k8scluster.get("credentials"))
1118
1119 def _get_credential_name(self, cluster_uuid: str) -> str:
1120 """
1121 Get credential name for a k8s cloud
1122
1123 We cannot use the cluster_uuid for the credential name directly,
1124 because a credential name cannot start with a number; it must start with a letter.
1125 Therefore, the k8s cloud credential name will be "cred-" followed
1126 by the cluster uuid.
1127
1128 :param: cluster_uuid: Cluster UUID of the kubernetes cloud (=cloud_name)
1129
1130 :return: Name to use for the credential name.
1131 """
1132 return "cred-{}".format(cluster_uuid)
1133
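# Example (illustrative UUID): for cluster_uuid
# "f48163a6-05af-4bfb-8965-235b8e1ab5cf" the credential name would be
# "cred-f48163a6-05af-4bfb-8965-235b8e1ab5cf".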
1134 # def get_config(self, cluster_uuid: str,) -> dict:
1135 # """Get the cluster configuration
1136
1137 # Gets the configuration of the cluster
1138
1139 # :param cluster_uuid str: The UUID of the cluster.
1140 # :return: A dict upon success, or raises an exception.
1141 # """
1142
1143 # juju_db = self.db.get_one("admin", {"_id": "juju"})
1144 # config = None
1145 # for k in juju_db["k8sclusters"]:
1146 # if k["_id"] == cluster_uuid:
1147 # config = k["config"]
1148 # self.db.encrypt_decrypt_fields(
1149 # config,
1150 # "decrypt",
1151 # ["secret", "cacert"],
1152 # schema_version="1.1",
1153 # salt=k["_id"],
1154 # )
1155 # break
1156 # if not config:
1157 # raise Exception(
1158 # "Unable to locate configuration for cluster {}".format(cluster_uuid)
1159 # )
1160 # return config
1161
1162 # async def get_model(self, model_name: str, controller: Controller) -> Model:
1163 # """Get a model from the Juju Controller.
1164
1165 # Note: Model objects returned must call disconnected() before it goes
1166 # out of scope.
1167
1168 # :param model_name str: The name of the model to get
1169 # :param controller Controller: Controller object
1170 # :return The juju.model.Model object if found, or None.
1171 # """
1172
1173 # models = await controller.list_models()
1174 # if model_name not in models:
1175 # raise N2VCNotFound("Model {} not found".format(model_name))
1176 # self.log.debug("Found model: {}".format(model_name))
1177 # return await controller.get_model(model_name)
1178
1179 def get_namespace(
1180 self,
1181 cluster_uuid: str,
1182 ) -> str:
1183 """Get the namespace UUID
1184 Gets the namespace's unique name
1185
1186 :param cluster_uuid str: The UUID of the cluster
1187 :returns: The namespace UUID (currently not implemented; returns None)
1188 """
1189 # config = self.get_config(cluster_uuid)
1190
1191 # Make sure the name is in the config
1192 # if "namespace" not in config:
1193 # raise Exception("Namespace not found.")
1194
1195 # TODO: We want to make sure this is unique to the cluster, in case
1196 # the cluster is being reused.
1197 # Consider pre/appending the cluster id to the namespace string
1198 pass
1199
1200 # TODO: Remove these lines of code
1201 # async def has_model(self, model_name: str) -> bool:
1202 # """Check if a model exists in the controller
1203
1204 # Checks to see if a model exists in the connected Juju controller.
1205
1206 # :param model_name str: The name of the model
1207 # :return: A boolean indicating if the model exists
1208 # """
1209 # models = await self.controller.list_models()
1210
1211 # if model_name in models:
1212 # return True
1213 # return False
1214
1215 # def is_local_k8s(self, credentials: str,) -> bool:
1216 # """Check if a cluster is local
1217
1218 # Checks if a cluster is running in the local host
1219
1220 # :param credentials dict: A dictionary containing the k8s credentials
1221 # :returns: A boolean if the cluster is running locally
1222 # """
1223
1224 # creds = yaml.safe_load(credentials)
1225
1226 # if creds and os.getenv("OSMLCM_VCA_APIPROXY"):
1227 # for cluster in creds["clusters"]:
1228 # if "server" in cluster["cluster"]:
1229 # if os.getenv("OSMLCM_VCA_APIPROXY") in cluster["cluster"]["server"]:
1230 # return True
1231
1232 # return False
1233
1234 # async def get_controller(self, cluster_uuid):
1235 # """Login to the Juju controller."""
1236
1237 # config = self.get_config(cluster_uuid)
1238
1239 # juju_endpoint = config["endpoint"]
1240 # juju_user = config["username"]
1241 # juju_secret = config["secret"]
1242 # juju_ca_cert = config["cacert"]
1243
1244 # controller = Controller()
1245
1246 # if juju_secret:
1247 # self.log.debug(
1248 # "Connecting to controller... ws://{} as {}".format(
1249 # juju_endpoint, juju_user,
1250 # )
1251 # )
1252 # try:
1253 # await controller.connect(
1254 # endpoint=juju_endpoint,
1255 # username=juju_user,
1256 # password=juju_secret,
1257 # cacert=juju_ca_cert,
1258 # )
1259 # self.log.debug("JujuApi: Logged into controller")
1260 # return controller
1261 # except Exception as ex:
1262 # self.log.debug(ex)
1263 # self.log.debug("Caught exception: {}".format(ex))
1264 # else:
1265 # self.log.fatal("VCA credentials not configured.")
1266
1267 # TODO: Remove these commented lines
1268 # self.authenticated = False
1269 # if self.authenticated:
1270 # return
1271
1272 # self.connecting = True
1273 # juju_public_key = None
1274 # self.authenticated = True
1275 # Test: Make sure we have the credentials loaded
1276 # async def logout(self):
1277 # """Logout of the Juju controller."""
1278 # self.log.debug("[logout]")
1279 # if not self.authenticated:
1280 # return False
1281
1282 # for model in self.models:
1283 # self.log.debug("Logging out of model {}".format(model))
1284 # await self.models[model].disconnect()
1285
1286 # if self.controller:
1287 # self.log.debug("Disconnecting controller {}".format(self.controller))
1288 # await self.controller.disconnect()
1289 # self.controller = None
1290
1291 # self.authenticated = False
1292
1293 # async def remove_cloud(self, cloud_name: str,) -> bool:
1294 # """Remove a k8s cloud from Juju
1295
1296 # Removes a Kubernetes cloud from Juju.
1297
1298 # :param cloud_name str: The name of the cloud to add.
1299
1300 # :returns: True if successful, otherwise raises an exception.
1301 # """
1302
1303 # # Remove the bootstrapped controller
1304 # cmd = [self.juju_command, "remove-k8s", "--client", cloud_name]
1305 # process = await asyncio.create_subprocess_exec(
1306 # *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1307 # )
1308
1309 # _stdout, stderr = await process.communicate()
1310
1311 # return_code = process.returncode
1312
1313 # if return_code > 0:
1314 # raise Exception(stderr)
1315
1316 # # Remove the cloud from the local config
1317 # cmd = [self.juju_command, "remove-cloud", "--client", cloud_name]
1318 # process = await asyncio.create_subprocess_exec(
1319 # *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1320 # )
1321
1322 # _stdout, stderr = await process.communicate()
1323
1324 # return_code = process.returncode
1325
1326 # if return_code > 0:
1327 # raise Exception(stderr)
1328
1329 # return True
1330
1331 # async def set_config(self, cluster_uuid: str, config: dict,) -> bool:
1332 # """Save the cluster configuration
1333
1334 # Saves the cluster information to the Mongo database
1335
1336 # :param cluster_uuid str: The UUID of the cluster
1337 # :param config dict: A dictionary containing the cluster configuration
1338 # """
1339
1340 # juju_db = self.db.get_one("admin", {"_id": "juju"})
1341
1342 # k8sclusters = juju_db["k8sclusters"] if "k8sclusters" in juju_db else []
1343 # self.db.encrypt_decrypt_fields(
1344 # config,
1345 # "encrypt",
1346 # ["secret", "cacert"],
1347 # schema_version="1.1",
1348 # salt=cluster_uuid,
1349 # )
1350 # k8sclusters.append({"_id": cluster_uuid, "config": config})
1351 # self.db.set_one(
1352 # table="admin",
1353 # q_filter={"_id": "juju"},
1354 # update_dict={"k8sclusters": k8sclusters},
1355 # )
1356
1357 # Private methods to create/delete needed resources in the
1358 # Kubernetes cluster to create the K8s cloud in Juju
1359
1360 def _create_cluster_role(
1361 self,
1362 kubectl: Kubectl,
1363 name: str,
1364 labels: Dict[str, str],
1365 ):
1366 cluster_roles = kubectl.clients[RBAC_CLIENT].list_cluster_role(
1367 field_selector="metadata.name={}".format(name)
1368 )
1369
1370 if len(cluster_roles.items) > 0:
1371 raise Exception(
1372 "Cluster role with metadata.name={} already exists".format(name)
1373 )
1374
1375 metadata = V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE)
1376 # Cluster role
1377 cluster_role = V1ClusterRole(
1378 metadata=metadata,
1379 rules=[
1380 V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
1381 V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
1382 ],
1383 )
1384
1385 kubectl.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
1386
1387 def _delete_cluster_role(self, kubectl: Kubectl, name: str):
1388 kubectl.clients[RBAC_CLIENT].delete_cluster_role(name)
1389
1390 def _create_service_account(
1391 self,
1392 kubectl: Kubectl,
1393 name: str,
1394 labels: Dict[str, str],
1395 ):
1396 service_accounts = kubectl.clients[CORE_CLIENT].list_namespaced_service_account(
1397 ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
1398 )
1399 if len(service_accounts.items) > 0:
1400 raise Exception(
1401 "Service account with metadata.name={} already exists".format(name)
1402 )
1403
1404 metadata = V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE)
1405 service_account = V1ServiceAccount(metadata=metadata)
1406
1407 kubectl.clients[CORE_CLIENT].create_namespaced_service_account(
1408 ADMIN_NAMESPACE, service_account
1409 )
1410
1411 def _delete_service_account(self, kubectl: Kubectl, name: str):
1412 kubectl.clients[CORE_CLIENT].delete_namespaced_service_account(
1413 name, ADMIN_NAMESPACE
1414 )
1415
1416 def _create_cluster_role_binding(
1417 self,
1418 kubectl: Kubectl,
1419 name: str,
1420 labels: Dict[str, str],
1421 ):
1422 role_bindings = kubectl.clients[RBAC_CLIENT].list_cluster_role_binding(
1423 field_selector="metadata.name={}".format(name)
1424 )
1425 if len(role_bindings.items) > 0:
1426 raise Exception("Generated rbac id already exists")
1427
1428 role_binding = V1ClusterRoleBinding(
1429 metadata=V1ObjectMeta(name=name, labels=labels),
1430 role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
1431 subjects=[
1432 V1Subject(kind="ServiceAccount", name=name, namespace=ADMIN_NAMESPACE)
1433 ],
1434 )
1435 kubectl.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
1436
1437 def _delete_cluster_role_binding(self, kubectl: Kubectl, name: str):
1438 kubectl.clients[RBAC_CLIENT].delete_cluster_role_binding(name)
1439
1440 async def _get_secret_data(self, kubectl: Kubectl, name: str) -> Tuple[str, str]:
1441 v1_core = kubectl.clients[CORE_CLIENT]
1442
1443 retries_limit = 10
1444 secret_name = None
1445 while True:
1446 retries_limit -= 1
1447 service_accounts = v1_core.list_namespaced_service_account(
1448 ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
1449 )
1450 if len(service_accounts.items) == 0:
1451 raise Exception(
1452 "Service account not found with metadata.name={}".format(name)
1453 )
1454 service_account = service_accounts.items[0]
1455 if service_account.secrets and len(service_account.secrets) > 0:
1456 secret_name = service_account.secrets[0].name
1457 if secret_name is not None or not retries_limit:
1458 break
1459 if not secret_name:
1460 raise Exception(
1461 "Failed getting the secret from service account {}".format(name)
1462 )
1463 secret = v1_core.list_namespaced_secret(
1464 ADMIN_NAMESPACE,
1465 field_selector="metadata.name={}".format(secret_name),
1466 ).items[0]
1467
1468 token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
1469 client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]
1470
1471 return (
1472 base64.b64decode(token).decode("utf-8"),
1473 base64.b64decode(client_certificate_data).decode("utf-8"),
1474 )
1475
1476 @staticmethod
1477 def generate_kdu_instance_name(**kwargs):
1478 db_dict = kwargs.get("db_dict")
1479 kdu_name = kwargs.get("kdu_name", None)
1480 if kdu_name:
1481 kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
1482 else:
1483 kdu_instance = db_dict["filter"]["_id"]
1484 return kdu_instance
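# Illustrative examples (the _id value is hypothetical):
#
#     K8sJujuConnector.generate_kdu_instance_name(
#         db_dict={"filter": {"_id": "0123-abcd"}}, kdu_name="mysql"
#     )  # -> "mysql-0123-abcd"
#
#     K8sJujuConnector.generate_kdu_instance_name(
#         db_dict={"filter": {"_id": "0123-abcd"}}
#     )  # -> "0123-abcd"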