Bug 1995 fixed: possibility of defining the K8s namespace for Juju Bundles
[osm/N2VC.git] / n2vc / k8s_juju_conn.py
1 # Copyright 2019 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 from typing import Union
17 import os
18 import uuid
19 import yaml
20 import tempfile
21 import binascii
22
23 from n2vc.config import EnvironConfig
24 from n2vc.definitions import RelationEndpoint
25 from n2vc.exceptions import K8sException
26 from n2vc.k8s_conn import K8sConnector
27 from n2vc.kubectl import Kubectl
28 from .exceptions import MethodNotImplemented
29 from n2vc.libjuju import Libjuju
30 from n2vc.utils import obj_to_dict, obj_to_yaml
31 from n2vc.store import MotorStore
32 from n2vc.vca.cloud import Cloud
33 from n2vc.vca.connection import get_connection
34
35
36 RBAC_LABEL_KEY_NAME = "rbac-id"
37 RBAC_STACK_PREFIX = "juju-credential"
38
39
40 def generate_rbac_id():
41 return binascii.hexlify(os.urandom(4)).decode()
42
43
44 class K8sJujuConnector(K8sConnector):
45 libjuju = None
46
47 def __init__(
48 self,
49 fs: object,
50 db: object,
51 kubectl_command: str = "/usr/bin/kubectl",
52 juju_command: str = "/usr/bin/juju",
53 log: object = None,
54 loop: object = None,
55 on_update_db=None,
56 ):
57 """
58 :param fs: file system for kubernetes and helm configuration
59 :param db: Database object
60 :param kubectl_command: path to kubectl executable
61 :param helm_command: path to helm executable
62 :param log: logger
63 :param: loop: Asyncio loop
64 """
65
66 # parent class
67 K8sConnector.__init__(
68 self,
69 db,
70 log=log,
71 on_update_db=on_update_db,
72 )
73
74 self.fs = fs
75 self.loop = loop or asyncio.get_event_loop()
76 self.log.debug("Initializing K8S Juju connector")
77
78 db_uri = EnvironConfig(prefixes=["OSMLCM_", "OSMMON_"]).get("database_uri")
79 self._store = MotorStore(db_uri)
80 self.loading_libjuju = asyncio.Lock(loop=self.loop)
81 self.uninstall_locks = {}
82
83 self.log.debug("K8S Juju connector initialized")
84 # TODO: Remove these commented lines:
85 # self.authenticated = False
86 # self.models = {}
87 # self.juju_secret = ""
88
89 """Initialization"""
90
91 async def init_env(
92 self,
93 k8s_creds: str,
94 namespace: str = "kube-system",
95 reuse_cluster_uuid: str = None,
96 **kwargs,
97 ) -> (str, bool):
98 """
99 It prepares a given K8s cluster environment to run Juju bundles.
100
101 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
102 '.kube/config'
103 :param namespace: optional namespace to be used for juju. By default,
104 'kube-system' will be used
105 :param reuse_cluster_uuid: existing cluster uuid for reuse
106 :param: kwargs: Additional parameters
107 vca_id (str): VCA ID
108
109 :return: uuid of the K8s cluster and True if connector has installed some
110 software in the cluster
111 (on error, an exception will be raised)
112 """
113 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
114
115 cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4())
116 kubectl = self._get_kubectl(k8s_creds)
117
118 # CREATING RESOURCES IN K8S
119 rbac_id = generate_rbac_id()
120 metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
121 labels = {RBAC_STACK_PREFIX: rbac_id}
122
123 # Create cleanup dictionary to clean up created resources
124 # if it fails in the middle of the process
125 cleanup_data = []
126 try:
127 self.log.debug("Initializing K8s cluster for juju")
128 kubectl.create_cluster_role(
129 name=metadata_name,
130 labels=labels,
131 )
132 self.log.debug("Cluster role created")
133 cleanup_data.append(
134 {
135 "delete": kubectl.delete_cluster_role,
136 "args": (metadata_name,),
137 }
138 )
139
140 kubectl.create_service_account(
141 name=metadata_name,
142 labels=labels,
143 )
144 self.log.debug("Service account created")
145 cleanup_data.append(
146 {
147 "delete": kubectl.delete_service_account,
148 "args": (metadata_name,),
149 }
150 )
151
152 kubectl.create_cluster_role_binding(
153 name=metadata_name,
154 labels=labels,
155 )
156 self.log.debug("Role binding created")
157 cleanup_data.append(
158 {
159 "delete": kubectl.delete_service_account,
160 "args": (metadata_name,),
161 }
162 )
163 token, client_cert_data = await kubectl.get_secret_data(
164 metadata_name,
165 )
166
167 default_storage_class = kubectl.get_default_storage_class()
168 self.log.debug("Default storage class: {}".format(default_storage_class))
169 await libjuju.add_k8s(
170 name=cluster_uuid,
171 rbac_id=rbac_id,
172 token=token,
173 client_cert_data=client_cert_data,
174 configuration=kubectl.configuration,
175 storage_class=default_storage_class,
176 credential_name=self._get_credential_name(cluster_uuid),
177 )
178 self.log.debug("K8s cluster added to juju controller")
179 return cluster_uuid, True
180 except Exception as e:
181 self.log.error("Error initializing k8scluster: {}".format(e), exc_info=True)
182 if len(cleanup_data) > 0:
183 self.log.debug("Cleaning up created resources in k8s cluster...")
184 for item in cleanup_data:
185 delete_function = item["delete"]
186 delete_args = item["args"]
187 delete_function(*delete_args)
188 self.log.debug("Cleanup finished")
189 raise e
190
191 """Repo Management"""
192
193 async def repo_add(
194 self,
195 name: str,
196 url: str,
197 _type: str = "charm",
198 cert: str = None,
199 user: str = None,
200 password: str = None,
201 ):
202 raise MethodNotImplemented()
203
204 async def repo_list(self):
205 raise MethodNotImplemented()
206
207 async def repo_remove(
208 self,
209 name: str,
210 ):
211 raise MethodNotImplemented()
212
213 async def synchronize_repos(self, cluster_uuid: str, name: str):
214 """
215 Returns None as currently add_repo is not implemented
216 """
217 return None
218
219 """Reset"""
220
221 async def reset(
222 self,
223 cluster_uuid: str,
224 force: bool = False,
225 uninstall_sw: bool = False,
226 **kwargs,
227 ) -> bool:
228 """Reset a cluster
229
230 Resets the Kubernetes cluster by removing the model that represents it.
231
232 :param cluster_uuid str: The UUID of the cluster to reset
233 :param force: Force reset
234 :param uninstall_sw: Boolean to uninstall sw
235 :param: kwargs: Additional parameters
236 vca_id (str): VCA ID
237
238 :return: Returns True if successful or raises an exception.
239 """
240
241 try:
242 self.log.debug("[reset] Removing k8s cloud")
243 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
244
245 cloud = Cloud(cluster_uuid, self._get_credential_name(cluster_uuid))
246
247 cloud_creds = await libjuju.get_cloud_credentials(cloud)
248
249 await libjuju.remove_cloud(cluster_uuid)
250
251 credentials = self.get_credentials(cluster_uuid=cluster_uuid)
252
253 kubectl = self._get_kubectl(credentials)
254
255 delete_functions = [
256 kubectl.delete_cluster_role_binding,
257 kubectl.delete_service_account,
258 kubectl.delete_cluster_role,
259 ]
260
261 credential_attrs = cloud_creds[0].result["attrs"]
262 if RBAC_LABEL_KEY_NAME in credential_attrs:
263 rbac_id = credential_attrs[RBAC_LABEL_KEY_NAME]
264 metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
265 for delete_func in delete_functions:
266 try:
267 delete_func(metadata_name)
268 except Exception as e:
269 self.log.warning("Cannot remove resource in K8s {}".format(e))
270
271 except Exception as e:
272 self.log.debug("Caught exception during reset: {}".format(e))
273 raise e
274 return True
275
276 """Deployment"""
277
278 async def install(
279 self,
280 cluster_uuid: str,
281 kdu_model: str,
282 kdu_instance: str,
283 atomic: bool = True,
284 timeout: float = 1800,
285 params: dict = None,
286 db_dict: dict = None,
287 kdu_name: str = None,
288 namespace: str = None,
289 **kwargs,
290 ) -> bool:
291 """Install a bundle
292
293 :param cluster_uuid str: The UUID of the cluster to install to
294 :param kdu_model str: The name or path of a bundle to install
295 :param kdu_instance: Kdu instance name
296 :param atomic bool: If set, waits until the model is active and resets
297 the cluster on failure.
298 :param timeout int: The time, in seconds, to wait for the install
299 to finish
300 :param params dict: Key-value pairs of instantiation parameters
301 :param kdu_name: Name of the KDU instance to be installed
302 :param namespace: K8s namespace to use for the KDU instance
303 :param kwargs: Additional parameters
304 vca_id (str): VCA ID
305
306 :return: If successful, returns ?
307 """
308 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
309 bundle = kdu_model
310
311 if not db_dict:
312 raise K8sException("db_dict must be set")
313 if not bundle:
314 raise K8sException("bundle must be set")
315
316 if bundle.startswith("cs:"):
317 # For Juju Bundles provided by the Charm Store
318 pass
319 elif bundle.startswith("ch:"):
320 # For Juju Bundles provided by the Charm Hub (this only works for juju version >= 2.9)
321 pass
322 elif bundle.startswith("http"):
323 # Download the file
324 pass
325 else:
326 new_workdir = kdu_model.strip(kdu_model.split("/")[-1])
327 os.chdir(new_workdir)
328 bundle = "local:{}".format(kdu_model)
329
330 # default namespace to kdu_instance
331 if not namespace:
332 namespace = kdu_instance
333
334 self.log.debug("Checking for model named {}".format(namespace))
335
336 # Create the new model
337 self.log.debug("Adding model: {}".format(namespace))
338 cloud = Cloud(cluster_uuid, self._get_credential_name(cluster_uuid))
339 await libjuju.add_model(namespace, cloud)
340
341 # if model:
342 # TODO: Instantiation parameters
343
344 """
345 "Juju bundle that models the KDU, in any of the following ways:
346 - <juju-repo>/<juju-bundle>
347 - <juju-bundle folder under k8s_models folder in the package>
348 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
349 in the package>
350 - <URL_where_to_fetch_juju_bundle>
351 """
352 try:
353 previous_workdir = os.getcwd()
354 except FileNotFoundError:
355 previous_workdir = "/app/storage"
356
357 self.log.debug("[install] deploying {}".format(bundle))
358 await libjuju.deploy(bundle, model_name=namespace, wait=atomic, timeout=timeout)
359 os.chdir(previous_workdir)
360
361 # update information in the database (first, the VCA status, and then, the namespace)
362 if self.on_update_db:
363 await self.on_update_db(
364 cluster_uuid,
365 kdu_instance,
366 filter=db_dict["filter"],
367 vca_id=kwargs.get("vca_id"),
368 )
369
370 self.db.set_one(
371 table="nsrs",
372 q_filter={"_admin.deployed.K8s.kdu-instance": kdu_instance},
373 update_dict={"_admin.deployed.K8s.$.namespace": namespace},
374 )
375
376 return True
377
378 async def scale(
379 self,
380 kdu_instance: str,
381 scale: int,
382 resource_name: str,
383 total_timeout: float = 1800,
384 namespace: str = None,
385 **kwargs,
386 ) -> bool:
387 """Scale an application in a model
388
389 :param: kdu_instance str: KDU instance name
390 :param: scale int: Scale to which to set the application
391 :param: resource_name str: The application name in the Juju Bundle
392 :param: timeout float: The time, in seconds, to wait for the install
393 to finish
394 :param namespace str: The namespace (model) where the Bundle was deployed
395 :param kwargs: Additional parameters
396 vca_id (str): VCA ID
397
398 :return: If successful, returns True
399 """
400
401 model_name = self._obtain_namespace(
402 kdu_instance=kdu_instance, namespace=namespace
403 )
404 try:
405 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
406 await libjuju.scale_application(
407 model_name=model_name,
408 application_name=resource_name,
409 scale=scale,
410 total_timeout=total_timeout,
411 )
412 except Exception as e:
413 error_msg = "Error scaling application {} of the model {} of the kdu instance {}: {}".format(
414 resource_name, model_name, kdu_instance, e
415 )
416 self.log.error(error_msg)
417 raise K8sException(message=error_msg)
418 return True
419
420 async def get_scale_count(
421 self,
422 resource_name: str,
423 kdu_instance: str,
424 namespace: str = None,
425 **kwargs,
426 ) -> int:
427 """Get an application scale count
428
429 :param: resource_name str: The application name in the Juju Bundle
430 :param: kdu_instance str: KDU instance name
431 :param namespace str: The namespace (model) where the Bundle was deployed
432 :param kwargs: Additional parameters
433 vca_id (str): VCA ID
434 :return: Return application instance count
435 """
436
437 model_name = self._obtain_namespace(
438 kdu_instance=kdu_instance, namespace=namespace
439 )
440 try:
441 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
442 status = await libjuju.get_model_status(model_name=model_name)
443 return len(status.applications[resource_name].units)
444 except Exception as e:
445 error_msg = (
446 f"Error getting scale count from application {resource_name} of the model {model_name} of "
447 f"the kdu instance {kdu_instance}: {e}"
448 )
449 self.log.error(error_msg)
450 raise K8sException(message=error_msg)
451
452 async def instances_list(self, cluster_uuid: str) -> list:
453 """
454 returns a list of deployed releases in a cluster
455
456 :param cluster_uuid: the cluster
457 :return:
458 """
459 return []
460
461 async def upgrade(
462 self,
463 cluster_uuid: str,
464 kdu_instance: str,
465 kdu_model: str = None,
466 params: dict = None,
467 ) -> str:
468 """Upgrade a model
469
470 :param cluster_uuid str: The UUID of the cluster to upgrade
471 :param kdu_instance str: The unique name of the KDU instance
472 :param kdu_model str: The name or path of the bundle to upgrade to
473 :param params dict: Key-value pairs of instantiation parameters
474
475 :return: If successful, reference to the new revision number of the
476 KDU instance.
477 """
478
479 # TODO: Loop through the bundle and upgrade each charm individually
480
481 """
482 The API doesn't have a concept of bundle upgrades, because there are
483 many possible changes: charm revision, disk, number of units, etc.
484
485 As such, we are only supporting a limited subset of upgrades. We'll
486 upgrade the charm revision but leave storage and scale untouched.
487
488 Scale changes should happen through OSM constructs, and changes to
489 storage would require a redeployment of the service, at least in this
490 initial release.
491 """
492 raise MethodNotImplemented()
493
494 """Rollback"""
495
496 async def rollback(
497 self,
498 cluster_uuid: str,
499 kdu_instance: str,
500 revision: int = 0,
501 ) -> str:
502 """Rollback a model
503
504 :param cluster_uuid str: The UUID of the cluster to rollback
505 :param kdu_instance str: The unique name of the KDU instance
506 :param revision int: The revision to revert to. If omitted, rolls back
507 the previous upgrade.
508
509 :return: If successful, returns the revision of active KDU instance,
510 or raises an exception
511 """
512 raise MethodNotImplemented()
513
514 """Deletion"""
515
516 async def uninstall(
517 self,
518 cluster_uuid: str,
519 kdu_instance: str,
520 namespace: str = None,
521 **kwargs,
522 ) -> bool:
523 """Uninstall a KDU instance
524
525 :param cluster_uuid str: The UUID of the cluster
526 :param kdu_instance str: The unique name of the KDU instance
527 :param namespace str: The namespace (model) where the Bundle was deployed
528 :param kwargs: Additional parameters
529 vca_id (str): VCA ID
530
531 :return: Returns True if successful, or raises an exception
532 """
533 model_name = self._obtain_namespace(
534 kdu_instance=kdu_instance, namespace=namespace
535 )
536
537 self.log.debug(f"[uninstall] Destroying model: {model_name}")
538
539 will_not_delete = False
540 if model_name not in self.uninstall_locks:
541 self.uninstall_locks[model_name] = asyncio.Lock(loop=self.loop)
542 delete_lock = self.uninstall_locks[model_name]
543
544 while delete_lock.locked():
545 will_not_delete = True
546 await asyncio.sleep(0.1)
547
548 if will_not_delete:
549 self.log.info("Model {} deleted by another worker.".format(model_name))
550 return True
551
552 try:
553 async with delete_lock:
554 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
555
556 await libjuju.destroy_model(model_name, total_timeout=3600)
557 finally:
558 self.uninstall_locks.pop(model_name)
559
560 self.log.debug(f"[uninstall] Model {model_name} destroyed")
561 return True
562
563 async def upgrade_charm(
564 self,
565 ee_id: str = None,
566 path: str = None,
567 charm_id: str = None,
568 charm_type: str = None,
569 timeout: float = None,
570 ) -> str:
571 """This method upgrade charms in VNFs
572
573 Args:
574 ee_id: Execution environment id
575 path: Local path to the charm
576 charm_id: charm-id
577 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
578 timeout: (Float) Timeout for the ns update operation
579
580 Returns:
581 The output of the update operation if status equals to "completed"
582 """
583 raise K8sException(
584 "KDUs deployed with Juju Bundle do not support charm upgrade"
585 )
586
587 async def exec_primitive(
588 self,
589 cluster_uuid: str = None,
590 kdu_instance: str = None,
591 primitive_name: str = None,
592 timeout: float = 300,
593 params: dict = None,
594 db_dict: dict = None,
595 namespace: str = None,
596 **kwargs,
597 ) -> str:
598 """Exec primitive (Juju action)
599
600 :param cluster_uuid str: The UUID of the cluster
601 :param kdu_instance str: The unique name of the KDU instance
602 :param primitive_name: Name of action that will be executed
603 :param timeout: Timeout for action execution
604 :param params: Dictionary of all the parameters needed for the action
605 :param db_dict: Dictionary for any additional data
606 :param namespace str: The namespace (model) where the Bundle was deployed
607 :param kwargs: Additional parameters
608 vca_id (str): VCA ID
609
610 :return: Returns the output of the action
611 """
612 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
613
614 namespace = self._obtain_namespace(
615 kdu_instance=kdu_instance, namespace=namespace
616 )
617
618 if not params or "application-name" not in params:
619 raise K8sException(
620 "Missing application-name argument, \
621 argument needed for K8s actions"
622 )
623 try:
624 self.log.debug(
625 "[exec_primitive] Getting model "
626 "{} for the kdu_instance: {}".format(namespace, kdu_instance)
627 )
628 application_name = params["application-name"]
629 actions = await libjuju.get_actions(
630 application_name=application_name, model_name=namespace
631 )
632 if primitive_name not in actions:
633 raise K8sException("Primitive {} not found".format(primitive_name))
634 output, status = await libjuju.execute_action(
635 application_name=application_name,
636 model_name=namespace,
637 action_name=primitive_name,
638 **params,
639 )
640
641 if status != "completed":
642 raise K8sException(
643 "status is not completed: {} output: {}".format(status, output)
644 )
645 if self.on_update_db:
646 await self.on_update_db(
647 cluster_uuid=cluster_uuid,
648 kdu_instance=kdu_instance,
649 filter=db_dict["filter"],
650 )
651
652 return output
653
654 except Exception as e:
655 error_msg = "Error executing primitive {}: {}".format(primitive_name, e)
656 self.log.error(error_msg)
657 raise K8sException(message=error_msg)
658
659 """Introspection"""
660
661 async def inspect_kdu(
662 self,
663 kdu_model: str,
664 ) -> dict:
665 """Inspect a KDU
666
667 Inspects a bundle and returns a dictionary of config parameters and
668 their default values.
669
670 :param kdu_model str: The name or path of the bundle to inspect.
671
672 :return: If successful, returns a dictionary of available parameters
673 and their default values.
674 """
675
676 kdu = {}
677 if not os.path.exists(kdu_model):
678 raise K8sException("file {} not found".format(kdu_model))
679
680 with open(kdu_model, "r") as f:
681 bundle = yaml.safe_load(f.read())
682
683 """
684 {
685 'description': 'Test bundle',
686 'bundle': 'kubernetes',
687 'applications': {
688 'mariadb-k8s': {
689 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
690 'scale': 1,
691 'options': {
692 'password': 'manopw',
693 'root_password': 'osm4u',
694 'user': 'mano'
695 },
696 'series': 'kubernetes'
697 }
698 }
699 }
700 """
701 # TODO: This should be returned in an agreed-upon format
702 kdu = bundle["applications"]
703
704 return kdu
705
706 async def help_kdu(
707 self,
708 kdu_model: str,
709 ) -> str:
710 """View the README
711
712 If available, returns the README of the bundle.
713
714 :param kdu_model str: The name or path of a bundle
715 f
716 :return: If found, returns the contents of the README.
717 """
718 readme = None
719
720 files = ["README", "README.txt", "README.md"]
721 path = os.path.dirname(kdu_model)
722 for file in os.listdir(path):
723 if file in files:
724 with open(file, "r") as f:
725 readme = f.read()
726 break
727
728 return readme
729
730 async def status_kdu(
731 self,
732 cluster_uuid: str,
733 kdu_instance: str,
734 complete_status: bool = False,
735 yaml_format: bool = False,
736 namespace: str = None,
737 **kwargs,
738 ) -> Union[str, dict]:
739 """Get the status of the KDU
740
741 Get the current status of the KDU instance.
742
743 :param cluster_uuid str: The UUID of the cluster
744 :param kdu_instance str: The unique id of the KDU instance
745 :param complete_status: To get the complete_status of the KDU
746 :param yaml_format: To get the status in proper format for NSR record
747 :param namespace str: The namespace (model) where the Bundle was deployed
748 :param: kwargs: Additional parameters
749 vca_id (str): VCA ID
750
751 :return: Returns a dictionary containing namespace, state, resources,
752 and deployment_time and returns complete_status if complete_status is True
753 """
754 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
755 status = {}
756
757 model_name = self._obtain_namespace(
758 kdu_instance=kdu_instance, namespace=namespace
759 )
760 model_status = await libjuju.get_model_status(model_name=model_name)
761
762 if not complete_status:
763 for name in model_status.applications:
764 application = model_status.applications[name]
765 status[name] = {"status": application["status"]["status"]}
766 else:
767 if yaml_format:
768 return obj_to_yaml(model_status)
769 else:
770 return obj_to_dict(model_status)
771
772 return status
773
774 async def add_relation(
775 self,
776 provider: RelationEndpoint,
777 requirer: RelationEndpoint,
778 ):
779 """
780 Add relation between two charmed endpoints
781
782 :param: provider: Provider relation endpoint
783 :param: requirer: Requirer relation endpoint
784 """
785 self.log.debug(f"adding new relation between {provider} and {requirer}")
786 cross_model_relation = (
787 provider.model_name != requirer.model_name
788 or requirer.vca_id != requirer.vca_id
789 )
790 try:
791 if cross_model_relation:
792 # Cross-model relation
793 provider_libjuju = await self._get_libjuju(provider.vca_id)
794 requirer_libjuju = await self._get_libjuju(requirer.vca_id)
795 offer = await provider_libjuju.offer(provider)
796 if offer:
797 saas_name = await requirer_libjuju.consume(
798 requirer.model_name, offer, provider_libjuju
799 )
800 await requirer_libjuju.add_relation(
801 requirer.model_name,
802 requirer.endpoint,
803 saas_name,
804 )
805 else:
806 # Standard relation
807 vca_id = provider.vca_id
808 model = provider.model_name
809 libjuju = await self._get_libjuju(vca_id)
810 # add juju relations between two applications
811 await libjuju.add_relation(
812 model_name=model,
813 endpoint_1=provider.endpoint,
814 endpoint_2=requirer.endpoint,
815 )
816 except Exception as e:
817 message = f"Error adding relation between {provider} and {requirer}: {e}"
818 self.log.error(message)
819 raise Exception(message=message)
820
821 async def update_vca_status(
822 self, vcastatus: dict, kdu_instance: str, namespace: str = None, **kwargs
823 ):
824 """
825 Add all configs, actions, executed actions of all applications in a model to vcastatus dict
826
827 :param vcastatus dict: dict containing vcastatus
828 :param kdu_instance str: The unique id of the KDU instance
829 :param namespace str: The namespace (model) where the Bundle was deployed
830 :param: kwargs: Additional parameters
831 vca_id (str): VCA ID
832
833 :return: None
834 """
835
836 model_name = self._obtain_namespace(
837 kdu_instance=kdu_instance, namespace=namespace
838 )
839
840 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
841 try:
842 for vca_model_name in vcastatus:
843 # Adding executed actions
844 vcastatus[vca_model_name][
845 "executedActions"
846 ] = await libjuju.get_executed_actions(model_name=model_name)
847
848 for application in vcastatus[vca_model_name]["applications"]:
849 # Adding application actions
850 vcastatus[vca_model_name]["applications"][application][
851 "actions"
852 ] = {}
853 # Adding application configs
854 vcastatus[vca_model_name]["applications"][application][
855 "configs"
856 ] = await libjuju.get_application_configs(
857 model_name=model_name, application_name=application
858 )
859
860 except Exception as e:
861 self.log.debug("Error in updating vca status: {}".format(str(e)))
862
863 async def get_services(
864 self, cluster_uuid: str, kdu_instance: str, namespace: str
865 ) -> list:
866 """Return a list of services of a kdu_instance"""
867
868 namespace = self._obtain_namespace(
869 kdu_instance=kdu_instance, namespace=namespace
870 )
871
872 credentials = self.get_credentials(cluster_uuid=cluster_uuid)
873 kubectl = self._get_kubectl(credentials)
874 return kubectl.get_services(
875 field_selector="metadata.namespace={}".format(namespace)
876 )
877
878 async def get_service(
879 self, cluster_uuid: str, service_name: str, namespace: str
880 ) -> object:
881 """Return data for a specific service inside a namespace"""
882
883 credentials = self.get_credentials(cluster_uuid=cluster_uuid)
884 kubectl = self._get_kubectl(credentials)
885 return kubectl.get_services(
886 field_selector="metadata.name={},metadata.namespace={}".format(
887 service_name, namespace
888 )
889 )[0]
890
891 def get_credentials(self, cluster_uuid: str) -> str:
892 """
893 Get Cluster Kubeconfig
894 """
895 k8scluster = self.db.get_one(
896 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
897 )
898
899 self.db.encrypt_decrypt_fields(
900 k8scluster.get("credentials"),
901 "decrypt",
902 ["password", "secret"],
903 schema_version=k8scluster["schema_version"],
904 salt=k8scluster["_id"],
905 )
906
907 return yaml.safe_dump(k8scluster.get("credentials"))
908
909 def _get_credential_name(self, cluster_uuid: str) -> str:
910 """
911 Get credential name for a k8s cloud
912
913 We cannot use the cluster_uuid for the credential name directly,
914 because it cannot start with a number, it must start with a letter.
915 Therefore, the k8s cloud credential name will be "cred-" followed
916 by the cluster uuid.
917
918 :param: cluster_uuid: Cluster UUID of the kubernetes cloud (=cloud_name)
919
920 :return: Name to use for the credential name.
921 """
922 return "cred-{}".format(cluster_uuid)
923
924 def get_namespace(
925 self,
926 cluster_uuid: str,
927 ) -> str:
928 """Get the namespace UUID
929 Gets the namespace's unique name
930
931 :param cluster_uuid str: The UUID of the cluster
932 :returns: The namespace UUID, or raises an exception
933 """
934 pass
935
936 @staticmethod
937 def generate_kdu_instance_name(**kwargs):
938 db_dict = kwargs.get("db_dict")
939 kdu_name = kwargs.get("kdu_name", None)
940 if kdu_name:
941 kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
942 else:
943 kdu_instance = db_dict["filter"]["_id"]
944 return kdu_instance
945
946 async def _get_libjuju(self, vca_id: str = None) -> Libjuju:
947 """
948 Get libjuju object
949
950 :param: vca_id: VCA ID
951 If None, get a libjuju object with a Connection to the default VCA
952 Else, geta libjuju object with a Connection to the specified VCA
953 """
954 if not vca_id:
955 while self.loading_libjuju.locked():
956 await asyncio.sleep(0.1)
957 if not self.libjuju:
958 async with self.loading_libjuju:
959 vca_connection = await get_connection(self._store)
960 self.libjuju = Libjuju(vca_connection, loop=self.loop, log=self.log)
961 return self.libjuju
962 else:
963 vca_connection = await get_connection(self._store, vca_id)
964 return Libjuju(
965 vca_connection,
966 loop=self.loop,
967 log=self.log,
968 n2vc=self,
969 )
970
971 def _get_kubectl(self, credentials: str) -> Kubectl:
972 """
973 Get Kubectl object
974
975 :param: kubeconfig_credentials: Kubeconfig credentials
976 """
977 kubecfg = tempfile.NamedTemporaryFile()
978 with open(kubecfg.name, "w") as kubecfg_file:
979 kubecfg_file.write(credentials)
980 return Kubectl(config_file=kubecfg.name)
981
982 def _obtain_namespace(self, kdu_instance: str, namespace: str = None) -> str:
983 """
984 Obtain the namespace/model name to use in the instantiation of a Juju Bundle in K8s. The default namespace is
985 the kdu_instance name. However, if the user passes the namespace where he wants to deploy the bundle,
986 that namespace will be used.
987
988 :param kdu_instance: the default KDU instance name
989 :param namespace: the namespace passed by the User
990 """
991
992 # deault the namespace/model name to the kdu_instance name TODO -> this should be the real return... But
993 # once the namespace is not passed in most methods, I had to do this in another way. But I think this should
994 # be the procedure in the future return namespace if namespace else kdu_instance
995
996 # TODO -> has referred above, this should be avoided in the future, this is temporary, in order to avoid
997 # compatibility issues
998 return (
999 namespace
1000 if namespace
1001 else self._obtain_namespace_from_db(kdu_instance=kdu_instance)
1002 )
1003
1004 def _obtain_namespace_from_db(self, kdu_instance: str) -> str:
1005 db_nsrs = self.db.get_one(
1006 table="nsrs", q_filter={"_admin.deployed.K8s.kdu-instance": kdu_instance}
1007 )
1008 for k8s in db_nsrs["_admin"]["deployed"]["K8s"]:
1009 if k8s.get("kdu-instance") == kdu_instance:
1010 return k8s.get("namespace")
1011 return ""