Pin black version in tox.ini to 23.12.1
[osm/N2VC.git] / n2vc / k8s_juju_conn.py
1 # Copyright 2019 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 from typing import Union
17 import os
18 import uuid
19 import yaml
20 import tempfile
21 import binascii
22
23 from n2vc.config import EnvironConfig
24 from n2vc.definitions import RelationEndpoint
25 from n2vc.exceptions import K8sException
26 from n2vc.k8s_conn import K8sConnector
27 from n2vc.kubectl import Kubectl
28 from .exceptions import MethodNotImplemented
29 from n2vc.libjuju import Libjuju
30 from n2vc.utils import obj_to_dict, obj_to_yaml
31 from n2vc.store import MotorStore
32 from n2vc.vca.cloud import Cloud
33 from n2vc.vca.connection import get_connection
34
35
36 RBAC_LABEL_KEY_NAME = "rbac-id"
37 RBAC_STACK_PREFIX = "juju-credential"
38
39
def generate_rbac_id():
    """Return a random identifier made of 8 lowercase hexadecimal characters.

    The identifier is derived from 4 bytes read from ``os.urandom``.
    """
    random_bytes = os.urandom(4)
    return random_bytes.hex()
42
43
44 class K8sJujuConnector(K8sConnector):
45 libjuju = None
46
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = "/usr/bin/kubectl",
    juju_command: str = "/usr/bin/juju",
    log: object = None,
    loop: object = None,
    on_update_db=None,
):
    """
    Create a K8s connector that manages KDUs through Juju.

    :param fs: file system for kubernetes and helm configuration
    :param db: Database object
    :param kubectl_command: path to kubectl executable (not used by this
        constructor)
    :param juju_command: path to juju executable (not used by this
        constructor)
    :param log: logger
    :param loop: Asyncio loop; when omitted, ``asyncio.get_event_loop()``
        is used
    :param on_update_db: callback forwarded to the parent connector
    """

    # parent class
    K8sConnector.__init__(self, db, log=log, on_update_db=on_update_db)

    self.fs = fs
    self.loop = loop or asyncio.get_event_loop()
    self.log.debug("Initializing K8S Juju connector")

    db_uri = EnvironConfig(prefixes=["OSMLCM_", "OSMMON_"]).get("database_uri")
    self._store = MotorStore(db_uri)
    # Guards the lazy creation of the default Libjuju object.
    # The ``loop`` argument of asyncio primitives was removed in
    # Python 3.10 (passing it raises TypeError), so fall back to the
    # loop-less constructor on newer interpreters.
    try:
        self.loading_libjuju = asyncio.Lock(loop=self.loop)
    except TypeError:
        self.loading_libjuju = asyncio.Lock()
    # One lock per model being uninstalled, keyed by model name
    self.uninstall_locks = {}

    self.log.debug("K8S Juju connector initialized")
83
84 """Initialization"""
85
async def init_env(
    self,
    k8s_creds: str,
    namespace: str = "kube-system",
    reuse_cluster_uuid: str = None,
    **kwargs,
) -> (str, bool):
    """
    It prepares a given K8s cluster environment to run Juju bundles.

    A cluster role, a service account and a cluster role binding are created
    in the cluster, and the cluster is then registered in the Juju
    controller. If any step fails, the resources created so far are removed
    (best effort) and the original exception is re-raised.

    :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
        '.kube/config'
    :param namespace: optional namespace to be used for juju. By default,
        'kube-system' will be used (currently not read by this method)
    :param reuse_cluster_uuid: existing cluster uuid for reuse
    :param: kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: uuid of the K8s cluster and True if connector has installed some
        software in the cluster
        (on error, an exception will be raised)
    """
    libjuju = await self._get_libjuju(kwargs.get("vca_id"))

    cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4())
    kubectl = self._get_kubectl(k8s_creds)

    # CREATING RESOURCES IN K8S
    rbac_id = generate_rbac_id()
    metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
    labels = {RBAC_STACK_PREFIX: rbac_id}

    # (delete_function, args) pairs for every resource created so far, used
    # to clean up if the process fails in the middle
    cleanup_data = []
    try:
        self.log.debug("Initializing K8s cluster for juju")
        kubectl.create_cluster_role(name=metadata_name, labels=labels)
        self.log.debug("Cluster role created")
        cleanup_data.append((kubectl.delete_cluster_role, (metadata_name,)))

        kubectl.create_service_account(name=metadata_name, labels=labels)
        self.log.debug("Service account created")
        cleanup_data.append((kubectl.delete_service_account, (metadata_name,)))

        kubectl.create_cluster_role_binding(name=metadata_name, labels=labels)
        self.log.debug("Role binding created")
        cleanup_data.append(
            (kubectl.delete_cluster_role_binding, (metadata_name,))
        )
        token, client_cert_data = await kubectl.get_secret_data(metadata_name)

        default_storage_class = kubectl.get_default_storage_class()
        self.log.debug("Default storage class: {}".format(default_storage_class))
        await libjuju.add_k8s(
            name=cluster_uuid,
            rbac_id=rbac_id,
            token=token,
            client_cert_data=client_cert_data,
            configuration=kubectl.configuration,
            storage_class=default_storage_class,
            credential_name=self._get_credential_name(cluster_uuid),
        )
        self.log.debug("K8s cluster added to juju controller")
        return cluster_uuid, True
    except Exception as e:
        self.log.error("Error initializing k8scluster: {}".format(e), exc_info=True)
        if cleanup_data:
            self.log.debug("Cleaning up created resources in k8s cluster...")
            # Undo in reverse creation order (binding, account, role), and
            # never let a failing deletion hide the original error or stop
            # the remaining deletions.
            for delete_function, delete_args in reversed(cleanup_data):
                try:
                    delete_function(*delete_args)
                except Exception as cleanup_error:
                    self.log.warning(
                        "Cannot remove resource in K8s {}".format(cleanup_error)
                    )
            self.log.debug("Cleanup finished")
        # bare raise keeps the original traceback
        raise
168
169 """Repo Management"""
170
async def repo_add(
    self,
    name: str,
    url: str,
    _type: str = "charm",
    cert: str = None,
    user: str = None,
    password: str = None,
):
    """
    Add a repository. Not implemented by this connector.

    None of the arguments is read.

    :raises MethodNotImplemented: always
    """
    raise MethodNotImplemented()
181
async def repo_list(self):
    """
    List repositories. Not implemented by this connector.

    :raises MethodNotImplemented: always
    """
    raise MethodNotImplemented()
184
async def repo_remove(self, name: str):
    """
    Remove a repository. Not implemented by this connector.

    :param name: repository name (not read)
    :raises MethodNotImplemented: always
    """
    raise MethodNotImplemented()
187
async def synchronize_repos(self, cluster_uuid: str, name: str):
    """
    Returns None, as repo_add is currently not implemented

    :param cluster_uuid: cluster identifier (not read)
    :param name: repository name (not read)
    :return: always None
    """
    return None
193
194 """Reset"""
195
async def reset(
    self,
    cluster_uuid: str,
    force: bool = False,
    uninstall_sw: bool = False,
    **kwargs,
) -> bool:
    """Reset a cluster

    Removes the k8s cloud registered for the cluster from the Juju
    controller and then deletes, on a best-effort basis, the cluster role
    binding, service account and cluster role named after the credential's
    rbac id.

    :param cluster_uuid str: The UUID of the cluster to reset
    :param force: Force reset (not read by this method)
    :param uninstall_sw: Boolean to uninstall sw (not read by this method)
    :param: kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: Returns True if successful or raises an exception.
    """
    try:
        self.log.debug("[reset] Removing k8s cloud")
        libjuju = await self._get_libjuju(kwargs.get("vca_id"))

        # The credentials must be read before the cloud is removed
        k8s_cloud = Cloud(cluster_uuid, self._get_credential_name(cluster_uuid))
        cloud_creds = await libjuju.get_cloud_credentials(k8s_cloud)

        await libjuju.remove_cloud(cluster_uuid)

        kubeconfig = self.get_credentials(cluster_uuid=cluster_uuid)
        kubectl = self._get_kubectl(kubeconfig)

        # Binding first, then the account and the role it refers to
        rbac_deleters = (
            kubectl.delete_cluster_role_binding,
            kubectl.delete_service_account,
            kubectl.delete_cluster_role,
        )

        credential_attrs = cloud_creds[0].result["attrs"]
        if RBAC_LABEL_KEY_NAME in credential_attrs:
            metadata_name = "{}-{}".format(
                RBAC_STACK_PREFIX, credential_attrs[RBAC_LABEL_KEY_NAME]
            )
            for delete_resource in rbac_deleters:
                try:
                    delete_resource(metadata_name)
                except Exception as delete_error:
                    # A failed deletion must not abort the reset
                    self.log.warning(
                        "Cannot remove resource in K8s {}".format(delete_error)
                    )

    except Exception as reset_error:
        self.log.debug("Caught exception during reset: {}".format(reset_error))
        raise reset_error
    return True
250
251 """Deployment"""
252
async def install(
    self,
    cluster_uuid: str,
    kdu_model: str,
    kdu_instance: str,
    atomic: bool = True,
    timeout: float = 1800,
    params: dict = None,
    db_dict: dict = None,
    kdu_name: str = None,
    namespace: str = None,
    **kwargs,
) -> bool:
    """Install a bundle

    The bundle (``kdu_model``) can be given in any of the following ways:
        - a Charm Store reference (``cs:`` prefix)
        - a Charm Hub reference (``ch:`` prefix, juju version >= 2.9)
        - a URL (``http`` prefix)
        - a path to a local bundle; in this case the process temporarily
          changes its working directory to the folder holding the bundle

    :param cluster_uuid str: The UUID of the cluster to install to
    :param kdu_model str: The name or path of a bundle to install
    :param kdu_instance: Kdu instance name
    :param atomic bool: Forwarded to ``libjuju.deploy`` as ``wait``
    :param timeout int: The time, in seconds, to wait for the install
        to finish
    :param params dict: Key-value pairs of instantiation parameters; only
        the ``overlay`` key is read
    :param db_dict dict: Mandatory; its ``filter`` entry is passed to the
        ``on_update_db`` callback
    :param kdu_name: Name of the KDU instance to be installed (not read)
    :param namespace: K8s namespace (Juju model) to use for the KDU
        instance; defaults to ``kdu_instance``
    :param kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: True if successful
    :raises K8sException: if ``db_dict`` or ``kdu_model`` is empty
    """
    libjuju = await self._get_libjuju(kwargs.get("vca_id"))
    bundle = kdu_model

    if not db_dict:
        raise K8sException("db_dict must be set")
    if not bundle:
        raise K8sException("bundle must be set")

    # Remember where we are BEFORE any chdir, so the restore below really
    # goes back to the original directory.
    try:
        previous_workdir = os.getcwd()
    except FileNotFoundError:
        # the current directory no longer exists
        previous_workdir = "/app/storage"

    # "cs:" -> Charm Store, "ch:" -> Charm Hub, "http" -> remote file:
    # nothing to prepare locally for those.
    if not bundle.startswith(("cs:", "ch:", "http")):
        # Local bundle: work from the folder that contains it.
        # os.path.dirname is used because str.strip() removes a *set of
        # characters* from both ends, not a suffix, and mangles paths such
        # as "bundles/bundle".
        new_workdir = os.path.dirname(kdu_model)
        if new_workdir:
            os.chdir(new_workdir)
        bundle = "local:{}".format(kdu_model)

    try:
        # default namespace to kdu_instance
        if not namespace:
            namespace = kdu_instance

        self.log.debug("Checking for model named {}".format(namespace))

        # Create the new model
        self.log.debug("Adding model: {}".format(namespace))
        cloud = Cloud(cluster_uuid, self._get_credential_name(cluster_uuid))
        await libjuju.add_model(namespace, cloud)

        # TODO: Instantiation parameters

        self.log.debug("[install] deploying {}".format(bundle))
        instantiation_params = params.get("overlay") if params else None
        await libjuju.deploy(
            bundle,
            model_name=namespace,
            wait=atomic,
            timeout=timeout,
            instantiation_params=instantiation_params,
        )
    finally:
        # Restore the working directory even when the deployment fails
        os.chdir(previous_workdir)

    # update information in the database (first, the VCA status, and then, the namespace)
    if self.on_update_db:
        await self.on_update_db(
            cluster_uuid,
            kdu_instance,
            filter=db_dict["filter"],
            vca_id=kwargs.get("vca_id"),
        )

    self.db.set_one(
        table="nsrs",
        q_filter={"_admin.deployed.K8s.kdu-instance": kdu_instance},
        update_dict={"_admin.deployed.K8s.$.namespace": namespace},
    )

    return True
359
360 async def scale(
361 self,
362 kdu_instance: str,
363 scale: int,
364 resource_name: str,
365 total_timeout: float = 1800,
366 namespace: str = None,
367 **kwargs,
368 ) -> bool:
369 """Scale an application in a model
370
371 :param: kdu_instance str: KDU instance name
372 :param: scale int: Scale to which to set the application
373 :param: resource_name str: The application name in the Juju Bundle
374 :param: timeout float: The time, in seconds, to wait for the install
375 to finish
376 :param namespace str: The namespace (model) where the Bundle was deployed
377 :param kwargs: Additional parameters
378 vca_id (str): VCA ID
379
380 :return: If successful, returns True
381 """
382
383 model_name = self._obtain_namespace(
384 kdu_instance=kdu_instance, namespace=namespace
385 )
386 try:
387 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
388 await libjuju.scale_application(
389 model_name=model_name,
390 application_name=resource_name,
391 scale=scale,
392 total_timeout=total_timeout,
393 )
394 except Exception as e:
395 error_msg = "Error scaling application {} of the model {} of the kdu instance {}: {}".format(
396 resource_name, model_name, kdu_instance, e
397 )
398 self.log.error(error_msg)
399 raise K8sException(message=error_msg)
400 return True
401
402 async def get_scale_count(
403 self, resource_name: str, kdu_instance: str, namespace: str = None, **kwargs
404 ) -> int:
405 """Get an application scale count
406
407 :param: resource_name str: The application name in the Juju Bundle
408 :param: kdu_instance str: KDU instance name
409 :param namespace str: The namespace (model) where the Bundle was deployed
410 :param kwargs: Additional parameters
411 vca_id (str): VCA ID
412 :return: Return application instance count
413 """
414
415 model_name = self._obtain_namespace(
416 kdu_instance=kdu_instance, namespace=namespace
417 )
418 try:
419 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
420 status = await libjuju.get_model_status(model_name=model_name)
421 return len(status.applications[resource_name].units)
422 except Exception as e:
423 error_msg = (
424 f"Error getting scale count from application {resource_name} of the model {model_name} of "
425 f"the kdu instance {kdu_instance}: {e}"
426 )
427 self.log.error(error_msg)
428 raise K8sException(message=error_msg)
429
async def instances_list(self, cluster_uuid: str) -> list:
    """
    returns a list of deployed releases in a cluster

    This implementation does not query the cluster: it always returns an
    empty list.

    :param cluster_uuid: the cluster (not read)
    :return: an empty list
    """
    return []
438
async def upgrade(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    kdu_model: str = None,
    params: dict = None,
) -> str:
    """Upgrade a model

    Not implemented: this method always raises ``MethodNotImplemented``.
    The parameters and return value below describe the intended contract.

    :param cluster_uuid str: The UUID of the cluster to upgrade
    :param kdu_instance str: The unique name of the KDU instance
    :param kdu_model str: The name or path of the bundle to upgrade to
    :param params dict: Key-value pairs of instantiation parameters

    :return: If successful, reference to the new revision number of the
        KDU instance.
    :raises MethodNotImplemented: always
    """

    # TODO: Loop through the bundle and upgrade each charm individually

    """
    The API doesn't have a concept of bundle upgrades, because there are
    many possible changes: charm revision, disk, number of units, etc.

    As such, we are only supporting a limited subset of upgrades. We'll
    upgrade the charm revision but leave storage and scale untouched.

    Scale changes should happen through OSM constructs, and changes to
    storage would require a redeployment of the service, at least in this
    initial release.
    """
    raise MethodNotImplemented()
471
472 """Rollback"""
473
async def rollback(
    self, cluster_uuid: str, kdu_instance: str, revision: int = 0
) -> str:
    """Rollback a model

    Not implemented: this method always raises ``MethodNotImplemented``.
    The parameters and return value below describe the intended contract.

    :param cluster_uuid str: The UUID of the cluster to rollback
    :param kdu_instance str: The unique name of the KDU instance
    :param revision int: The revision to revert to. If omitted, rolls back
        the previous upgrade.

    :return: If successful, returns the revision of active KDU instance,
        or raises an exception
    :raises MethodNotImplemented: always
    """
    raise MethodNotImplemented()
488
489 """Deletion"""
490
491 async def uninstall(
492 self, cluster_uuid: str, kdu_instance: str, namespace: str = None, **kwargs
493 ) -> bool:
494 """Uninstall a KDU instance
495
496 :param cluster_uuid str: The UUID of the cluster
497 :param kdu_instance str: The unique name of the KDU instance
498 :param namespace str: The namespace (model) where the Bundle was deployed
499 :param kwargs: Additional parameters
500 vca_id (str): VCA ID
501
502 :return: Returns True if successful, or raises an exception
503 """
504 model_name = self._obtain_namespace(
505 kdu_instance=kdu_instance, namespace=namespace
506 )
507
508 self.log.debug(f"[uninstall] Destroying model: {model_name}")
509
510 will_not_delete = False
511 if model_name not in self.uninstall_locks:
512 self.uninstall_locks[model_name] = asyncio.Lock(loop=self.loop)
513 delete_lock = self.uninstall_locks[model_name]
514
515 while delete_lock.locked():
516 will_not_delete = True
517 await asyncio.sleep(0.1)
518
519 if will_not_delete:
520 self.log.info("Model {} deleted by another worker.".format(model_name))
521 return True
522
523 try:
524 async with delete_lock:
525 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
526
527 await libjuju.destroy_model(model_name, total_timeout=3600)
528 finally:
529 self.uninstall_locks.pop(model_name)
530
531 self.log.debug(f"[uninstall] Model {model_name} destroyed")
532 return True
533
async def upgrade_charm(
    self,
    ee_id: str = None,
    path: str = None,
    charm_id: str = None,
    charm_type: str = None,
    timeout: float = None,
) -> str:
    """This method upgrade charms in VNFs

    Not supported for KDUs deployed with a Juju Bundle: the method always
    raises and none of the arguments is read.

    Args:
        ee_id:  Execution environment id
        path:   Local path to the charm
        charm_id:   charm-id
        charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
        timeout: (Float)    Timeout for the ns update operation

    Raises:
        K8sException: always
    """
    raise K8sException(
        "KDUs deployed with Juju Bundle do not support charm upgrade"
    )
557
async def exec_primitive(
    self,
    cluster_uuid: str = None,
    kdu_instance: str = None,
    primitive_name: str = None,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    namespace: str = None,
    **kwargs,
) -> str:
    """Exec primitive (Juju action)

    :param cluster_uuid str: The UUID of the cluster
    :param kdu_instance str: The unique name of the KDU instance
    :param primitive_name: Name of action that will be executed
    :param timeout: Timeout for action execution (not read by this method)
    :param params: Dictionary of all the parameters needed for the action;
        it must contain the ``application-name`` key
    :param db_dict: Dictionary for any additional data; its ``filter`` entry
        is passed to the ``on_update_db`` callback
    :param namespace str: The namespace (model) where the Bundle was deployed
    :param kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: Returns the output of the action
    :raises K8sException: if ``application-name`` is missing, the primitive
        does not exist, or the action does not end as "completed"
    """
    libjuju = await self._get_libjuju(kwargs.get("vca_id"))

    namespace = self._obtain_namespace(
        kdu_instance=kdu_instance, namespace=namespace
    )

    if not params or "application-name" not in params:
        # Single-line literal: a backslash continuation inside the string
        # would embed the source indentation in the message.
        raise K8sException(
            "Missing application-name argument, argument needed for K8s actions"
        )
    try:
        self.log.debug(
            "[exec_primitive] Getting model "
            "{} for the kdu_instance: {}".format(namespace, kdu_instance)
        )
        application_name = params["application-name"]
        actions = await libjuju.get_actions(
            application_name=application_name, model_name=namespace
        )
        if primitive_name not in actions:
            raise K8sException("Primitive {} not found".format(primitive_name))
        output, status = await libjuju.execute_action(
            application_name=application_name,
            model_name=namespace,
            action_name=primitive_name,
            **params,
        )

        if status != "completed":
            raise K8sException(
                "status is not completed: {} output: {}".format(status, output)
            )
        # db_dict defaults to None; without it there is no filter to pass,
        # and failing here would report an error for an action that has
        # already completed.
        if self.on_update_db and db_dict:
            await self.on_update_db(
                cluster_uuid=cluster_uuid,
                kdu_instance=kdu_instance,
                filter=db_dict["filter"],
            )

        return output

    except Exception as e:
        error_msg = "Error executing primitive {}: {}".format(primitive_name, e)
        self.log.error(error_msg)
        # keep the original exception as the cause for easier debugging
        raise K8sException(message=error_msg) from e
629
630 """Introspection"""
631
async def inspect_kdu(self, kdu_model: str) -> dict:
    """Inspect a KDU

    Loads the bundle YAML file and returns its ``applications`` section.

    :param kdu_model str: The path of the bundle file to inspect.

    :return: If successful, returns the content of the ``applications`` key
        of the bundle.
    :raises K8sException: if the file does not exist
    """
    if not os.path.exists(kdu_model):
        raise K8sException("file {} not found".format(kdu_model))

    with open(kdu_model, "r") as bundle_file:
        content = bundle_file.read()
    bundle = yaml.safe_load(content)

    # Example of a loaded bundle:
    # {
    #     'description': 'Test bundle',
    #     'bundle': 'kubernetes',
    #     'applications': {
    #         'mariadb-k8s': {
    #             'charm': 'cs:~charmed-osm/mariadb-k8s-20',
    #             'scale': 1,
    #             'options': {
    #                 'password': 'manopw',
    #                 'root_password': 'osm4u',
    #                 'user': 'mano'
    #             },
    #             'series': 'kubernetes'
    #         }
    #     }
    # }

    # TODO: This should be returned in an agreed-upon format
    return bundle["applications"]
673
async def help_kdu(self, kdu_model: str) -> str:
    """View the README

    If available, returns the README of the bundle. The README is looked up
    in the folder that contains ``kdu_model``, trying the names ``README``,
    ``README.txt`` and ``README.md`` in that order.

    :param kdu_model str: The name or path of a bundle

    :return: If found, returns the contents of the README, otherwise None.
    """
    candidates = ("README", "README.txt", "README.md")
    # An empty dirname means the bundle lives in the current directory
    path = os.path.dirname(kdu_model) or "."
    entries = set(os.listdir(path))

    for candidate in candidates:
        if candidate in entries:
            # Open relative to the bundle folder, not to the process cwd
            with open(os.path.join(path, candidate), "r") as f:
                return f.read()

    return None
694
695 async def status_kdu(
696 self,
697 cluster_uuid: str,
698 kdu_instance: str,
699 complete_status: bool = False,
700 yaml_format: bool = False,
701 namespace: str = None,
702 **kwargs,
703 ) -> Union[str, dict]:
704 """Get the status of the KDU
705
706 Get the current status of the KDU instance.
707
708 :param cluster_uuid str: The UUID of the cluster
709 :param kdu_instance str: The unique id of the KDU instance
710 :param complete_status: To get the complete_status of the KDU
711 :param yaml_format: To get the status in proper format for NSR record
712 :param namespace str: The namespace (model) where the Bundle was deployed
713 :param: kwargs: Additional parameters
714 vca_id (str): VCA ID
715
716 :return: Returns a dictionary containing namespace, state, resources,
717 and deployment_time and returns complete_status if complete_status is True
718 """
719 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
720 status = {}
721
722 model_name = self._obtain_namespace(
723 kdu_instance=kdu_instance, namespace=namespace
724 )
725 model_status = await libjuju.get_model_status(model_name=model_name)
726
727 if not complete_status:
728 for name in model_status.applications:
729 application = model_status.applications[name]
730 status[name] = {"status": application["status"]["status"]}
731 else:
732 if yaml_format:
733 return obj_to_yaml(model_status)
734 else:
735 return obj_to_dict(model_status)
736
737 return status
738
async def add_relation(
    self, provider: RelationEndpoint, requirer: RelationEndpoint
):
    """
    Add relation between two charmed endpoints

    When both endpoints live in the same model of the same VCA a standard
    relation is added. Otherwise a cross-model relation is built: the
    provider endpoint is offered, the offer is consumed from the requirer's
    model and the requirer endpoint is related to the consumed offer.

    :param: provider: Provider relation endpoint
    :param: requirer: Requirer relation endpoint
    :raises K8sException: if the relation cannot be added
    """
    self.log.debug(f"adding new relation between {provider} and {requirer}")
    cross_model_relation = (
        provider.model_name != requirer.model_name
        or provider.vca_id != requirer.vca_id
    )
    try:
        if cross_model_relation:
            # Cross-model relation
            provider_libjuju = await self._get_libjuju(provider.vca_id)
            requirer_libjuju = await self._get_libjuju(requirer.vca_id)
            offer = await provider_libjuju.offer(provider)
            # Nothing is related when no offer is returned
            if offer:
                saas_name = await requirer_libjuju.consume(
                    requirer.model_name, offer, provider_libjuju
                )
                await requirer_libjuju.add_relation(
                    requirer.model_name, requirer.endpoint, saas_name
                )
        else:
            # Standard relation
            vca_id = provider.vca_id
            model = provider.model_name
            libjuju = await self._get_libjuju(vca_id)
            # add juju relations between two applications
            await libjuju.add_relation(
                model_name=model,
                endpoint_1=provider.endpoint,
                endpoint_2=requirer.endpoint,
            )
    except Exception as e:
        message = f"Error adding relation between {provider} and {requirer}: {e}"
        self.log.error(message)
        # The builtin Exception accepts no keyword arguments, so
        # Exception(message=...) would itself fail with a TypeError and hide
        # the real error. K8sException is what the other methods of this
        # connector raise with a ``message`` keyword.
        raise K8sException(message=message) from e
781
782 async def update_vca_status(
783 self, vcastatus: dict, kdu_instance: str, namespace: str = None, **kwargs
784 ):
785 """
786 Add all configs, actions, executed actions of all applications in a model to vcastatus dict
787
788 :param vcastatus dict: dict containing vcastatus
789 :param kdu_instance str: The unique id of the KDU instance
790 :param namespace str: The namespace (model) where the Bundle was deployed
791 :param: kwargs: Additional parameters
792 vca_id (str): VCA ID
793
794 :return: None
795 """
796
797 model_name = self._obtain_namespace(
798 kdu_instance=kdu_instance, namespace=namespace
799 )
800
801 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
802 try:
803 for vca_model_name in vcastatus:
804 # Adding executed actions
805 vcastatus[vca_model_name][
806 "executedActions"
807 ] = await libjuju.get_executed_actions(model_name=model_name)
808
809 for application in vcastatus[vca_model_name]["applications"]:
810 # Adding application actions
811 vcastatus[vca_model_name]["applications"][application][
812 "actions"
813 ] = {}
814 # Adding application configs
815 vcastatus[vca_model_name]["applications"][application][
816 "configs"
817 ] = await libjuju.get_application_configs(
818 model_name=model_name, application_name=application
819 )
820
821 except Exception as e:
822 self.log.debug("Error in updating vca status: {}".format(str(e)))
823
824 async def get_services(
825 self, cluster_uuid: str, kdu_instance: str, namespace: str
826 ) -> list:
827 """Return a list of services of a kdu_instance"""
828
829 namespace = self._obtain_namespace(
830 kdu_instance=kdu_instance, namespace=namespace
831 )
832
833 credentials = self.get_credentials(cluster_uuid=cluster_uuid)
834 kubectl = self._get_kubectl(credentials)
835 return kubectl.get_services(
836 field_selector="metadata.namespace={}".format(namespace)
837 )
838
839 async def get_service(
840 self, cluster_uuid: str, service_name: str, namespace: str
841 ) -> object:
842 """Return data for a specific service inside a namespace"""
843
844 credentials = self.get_credentials(cluster_uuid=cluster_uuid)
845 kubectl = self._get_kubectl(credentials)
846 return kubectl.get_services(
847 field_selector="metadata.name={},metadata.namespace={}".format(
848 service_name, namespace
849 )
850 )[0]
851
def get_credentials(self, cluster_uuid: str) -> str:
    """
    Get Cluster Kubeconfig

    Reads the cluster record from the "k8sclusters" collection, decrypts
    the "password" and "secret" fields of its credentials and returns the
    credentials dumped as YAML.

    :param cluster_uuid: value of the "_id" field of the cluster record
    :return: the cluster credentials as a YAML string
    """
    # NOTE(review): with fail_on_empty=False the lookup presumably returns
    # None for an unknown cluster, which would fail on the next line -
    # confirm against the database API.
    k8scluster = self.db.get_one(
        "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
    )
    credentials = k8scluster.get("credentials")

    self.db.encrypt_decrypt_fields(
        credentials,
        "decrypt",
        ["password", "secret"],
        schema_version=k8scluster["schema_version"],
        salt=k8scluster["_id"],
    )

    return yaml.safe_dump(credentials)
869
870 def _get_credential_name(self, cluster_uuid: str) -> str:
871 """
872 Get credential name for a k8s cloud
873
874 We cannot use the cluster_uuid for the credential name directly,
875 because it cannot start with a number, it must start with a letter.
876 Therefore, the k8s cloud credential name will be "cred-" followed
877 by the cluster uuid.
878
879 :param: cluster_uuid: Cluster UUID of the kubernetes cloud (=cloud_name)
880
881 :return: Name to use for the credential name.
882 """
883 return "cred-{}".format(cluster_uuid)
884
def get_namespace(self, cluster_uuid: str) -> str:
    """Get the namespace UUID
    Gets the namespace's unique name

    Placeholder: the body is empty, so the method currently returns None
    and never raises.

    :param cluster_uuid str: The UUID of the cluster (not read)
    :returns: The namespace UUID, or raises an exception (intended
        contract; not implemented)
    """
    pass
893
894 @staticmethod
895 def generate_kdu_instance_name(**kwargs):
896 db_dict = kwargs.get("db_dict")
897 kdu_name = kwargs.get("kdu_name", None)
898 if kdu_name:
899 kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
900 else:
901 kdu_instance = db_dict["filter"]["_id"]
902 return kdu_instance
903
async def _get_libjuju(self, vca_id: str = None) -> Libjuju:
    """
    Get libjuju object

    :param: vca_id: VCA ID
                    If None, get a libjuju object with a Connection to the default VCA
                    Else, get a libjuju object with a Connection to the specified VCA

    :return: the Libjuju object. The one for the default VCA is created
        once and stored in ``self.libjuju``; for a specific VCA a new
        object is built on every call.
    """
    if vca_id:
        specific_connection = await get_connection(self._store, vca_id)
        return Libjuju(specific_connection, loop=self.loop, log=self.log, n2vc=self)

    # Default VCA: wait while another coroutine is building the object
    while self.loading_libjuju.locked():
        await asyncio.sleep(0.1)
    if not self.libjuju:
        async with self.loading_libjuju:
            default_connection = await get_connection(self._store)
            self.libjuju = Libjuju(default_connection, loop=self.loop, log=self.log)
    return self.libjuju
923
def _get_kubectl(self, credentials: str) -> Kubectl:
    """
    Get Kubectl object

    The credentials are written to a temporary kubeconfig file that only
    exists while the Kubectl object is being constructed.

    :param: credentials: Kubeconfig credentials (content of a kubeconfig
        file)
    :return: Kubectl object created from those credentials
    """
    # The context manager removes the file deterministically instead of
    # relying on garbage collection of the NamedTemporaryFile object, and
    # the file is written through its own handle rather than reopened by
    # name.
    # NOTE(review): assumes Kubectl reads config_file during construction -
    # confirm against n2vc.kubectl.
    with tempfile.NamedTemporaryFile(mode="w") as kubecfg_file:
        kubecfg_file.write(credentials)
        # make sure the content is on disk before Kubectl reads it
        kubecfg_file.flush()
        return Kubectl(config_file=kubecfg_file.name)
934
935 def _obtain_namespace(self, kdu_instance: str, namespace: str = None) -> str:
936 """
937 Obtain the namespace/model name to use in the instantiation of a Juju Bundle in K8s. The default namespace is
938 the kdu_instance name. However, if the user passes the namespace where he wants to deploy the bundle,
939 that namespace will be used.
940
941 :param kdu_instance: the default KDU instance name
942 :param namespace: the namespace passed by the User
943 """
944
945 # deault the namespace/model name to the kdu_instance name TODO -> this should be the real return... But
946 # once the namespace is not passed in most methods, I had to do this in another way. But I think this should
947 # be the procedure in the future return namespace if namespace else kdu_instance
948
949 # TODO -> has referred above, this should be avoided in the future, this is temporary, in order to avoid
950 # compatibility issues
951 return (
952 namespace
953 if namespace
954 else self._obtain_namespace_from_db(kdu_instance=kdu_instance)
955 )
956
957 def _obtain_namespace_from_db(self, kdu_instance: str) -> str:
958 db_nsrs = self.db.get_one(
959 table="nsrs", q_filter={"_admin.deployed.K8s.kdu-instance": kdu_instance}
960 )
961 for k8s in db_nsrs["_admin"]["deployed"]["K8s"]:
962 if k8s.get("kdu-instance") == kdu_instance:
963 return k8s.get("namespace")
964 return ""