Pin black version in tox.ini to 23.12.1
[osm/N2VC.git] / n2vc / k8s_juju_conn.py
1 # Copyright 2019 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 from typing import Union
17 import os
18 import uuid
19 import yaml
20 import tempfile
21 import binascii
22
23 from n2vc.config import EnvironConfig
24 from n2vc.definitions import RelationEndpoint
25 from n2vc.exceptions import K8sException
26 from n2vc.k8s_conn import K8sConnector
27 from n2vc.kubectl import Kubectl
28 from .exceptions import MethodNotImplemented
29 from n2vc.libjuju import Libjuju
30 from n2vc.utils import obj_to_dict, obj_to_yaml
31 from n2vc.store import MotorStore
32 from n2vc.vca.cloud import Cloud
33 from n2vc.vca.connection import get_connection
34
35
36 RBAC_LABEL_KEY_NAME = "rbac-id"
37 RBAC_STACK_PREFIX = "juju-credential"
38
39
40 def generate_rbac_id():
41 return binascii.hexlify(os.urandom(4)).decode()
42
43
44 class K8sJujuConnector(K8sConnector):
45 libjuju = None
46
47 def __init__(
48 self,
49 fs: object,
50 db: object,
51 kubectl_command: str = "/usr/bin/kubectl",
52 juju_command: str = "/usr/bin/juju",
53 log: object = None,
54 on_update_db=None,
55 ):
56 """
57 :param fs: file system for kubernetes and helm configuration
58 :param db: Database object
59 :param kubectl_command: path to kubectl executable
60 :param helm_command: path to helm executable
61 :param log: logger
62 """
63
64 # parent class
65 K8sConnector.__init__(self, db, log=log, on_update_db=on_update_db)
66
67 self.fs = fs
68 self.log.debug("Initializing K8S Juju connector")
69
70 db_uri = EnvironConfig(prefixes=["OSMLCM_", "OSMMON_"]).get("database_uri")
71 self._store = MotorStore(db_uri)
72 self.loading_libjuju = asyncio.Lock()
73 self.uninstall_locks = {}
74
75 self.log.debug("K8S Juju connector initialized")
76 # TODO: Remove these commented lines:
77 # self.authenticated = False
78 # self.models = {}
79 # self.juju_secret = ""
80
81 """Initialization"""
82
83 async def init_env(
84 self,
85 k8s_creds: str,
86 namespace: str = "kube-system",
87 reuse_cluster_uuid: str = None,
88 **kwargs,
89 ) -> (str, bool):
90 """
91 It prepares a given K8s cluster environment to run Juju bundles.
92
93 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
94 '.kube/config'
95 :param namespace: optional namespace to be used for juju. By default,
96 'kube-system' will be used
97 :param reuse_cluster_uuid: existing cluster uuid for reuse
98 :param: kwargs: Additional parameters
99 vca_id (str): VCA ID
100
101 :return: uuid of the K8s cluster and True if connector has installed some
102 software in the cluster
103 (on error, an exception will be raised)
104 """
105 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
106
107 cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4())
108 kubectl = self._get_kubectl(k8s_creds)
109
110 # CREATING RESOURCES IN K8S
111 rbac_id = generate_rbac_id()
112 metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
113 labels = {RBAC_STACK_PREFIX: rbac_id}
114
115 # Create cleanup dictionary to clean up created resources
116 # if it fails in the middle of the process
117 cleanup_data = []
118 try:
119 self.log.debug("Initializing K8s cluster for juju")
120 kubectl.create_cluster_role(name=metadata_name, labels=labels)
121 self.log.debug("Cluster role created")
122 cleanup_data.append(
123 {"delete": kubectl.delete_cluster_role, "args": (metadata_name,)}
124 )
125
126 kubectl.create_service_account(name=metadata_name, labels=labels)
127 self.log.debug("Service account created")
128 cleanup_data.append(
129 {"delete": kubectl.delete_service_account, "args": (metadata_name,)}
130 )
131
132 kubectl.create_cluster_role_binding(name=metadata_name, labels=labels)
133 self.log.debug("Role binding created")
134 cleanup_data.append(
135 {
136 "delete": kubectl.delete_cluster_role_binding,
137 "args": (metadata_name,),
138 }
139 )
140 token, client_cert_data = await kubectl.get_secret_data(metadata_name)
141
142 default_storage_class = kubectl.get_default_storage_class()
143 self.log.debug("Default storage class: {}".format(default_storage_class))
144 await libjuju.add_k8s(
145 name=cluster_uuid,
146 rbac_id=rbac_id,
147 token=token,
148 client_cert_data=client_cert_data,
149 configuration=kubectl.configuration,
150 storage_class=default_storage_class,
151 credential_name=self._get_credential_name(cluster_uuid),
152 )
153 self.log.debug("K8s cluster added to juju controller")
154 return cluster_uuid, True
155 except Exception as e:
156 self.log.error("Error initializing k8scluster: {}".format(e), exc_info=True)
157 if len(cleanup_data) > 0:
158 self.log.debug("Cleaning up created resources in k8s cluster...")
159 for item in cleanup_data:
160 delete_function = item["delete"]
161 delete_args = item["args"]
162 delete_function(*delete_args)
163 self.log.debug("Cleanup finished")
164 raise e
165
166 """Repo Management"""
167
168 async def repo_add(
169 self,
170 name: str,
171 url: str,
172 _type: str = "charm",
173 cert: str = None,
174 user: str = None,
175 password: str = None,
176 ):
177 raise MethodNotImplemented()
178
179 async def repo_list(self):
180 raise MethodNotImplemented()
181
182 async def repo_remove(self, name: str):
183 raise MethodNotImplemented()
184
185 async def synchronize_repos(self, cluster_uuid: str, name: str):
186 """
187 Returns None as currently add_repo is not implemented
188 """
189 return None
190
191 """Reset"""
192
193 async def reset(
194 self,
195 cluster_uuid: str,
196 force: bool = False,
197 uninstall_sw: bool = False,
198 **kwargs,
199 ) -> bool:
200 """Reset a cluster
201
202 Resets the Kubernetes cluster by removing the model that represents it.
203
204 :param cluster_uuid str: The UUID of the cluster to reset
205 :param force: Force reset
206 :param uninstall_sw: Boolean to uninstall sw
207 :param: kwargs: Additional parameters
208 vca_id (str): VCA ID
209
210 :return: Returns True if successful or raises an exception.
211 """
212
213 try:
214 self.log.debug("[reset] Removing k8s cloud")
215 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
216
217 cloud = Cloud(cluster_uuid, self._get_credential_name(cluster_uuid))
218
219 cloud_creds = await libjuju.get_cloud_credentials(cloud)
220
221 await libjuju.remove_cloud(cluster_uuid)
222
223 credentials = self.get_credentials(cluster_uuid=cluster_uuid)
224
225 kubectl = self._get_kubectl(credentials)
226
227 delete_functions = [
228 kubectl.delete_cluster_role_binding,
229 kubectl.delete_service_account,
230 kubectl.delete_cluster_role,
231 ]
232
233 credential_attrs = cloud_creds[0].result["attrs"]
234 if RBAC_LABEL_KEY_NAME in credential_attrs:
235 rbac_id = credential_attrs[RBAC_LABEL_KEY_NAME]
236 metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
237 for delete_func in delete_functions:
238 try:
239 delete_func(metadata_name)
240 except Exception as e:
241 self.log.warning("Cannot remove resource in K8s {}".format(e))
242
243 except Exception as e:
244 self.log.debug("Caught exception during reset: {}".format(e))
245 raise e
246 return True
247
248 """Deployment"""
249
250 async def install(
251 self,
252 cluster_uuid: str,
253 kdu_model: str,
254 kdu_instance: str,
255 atomic: bool = True,
256 timeout: float = 1800,
257 params: dict = None,
258 db_dict: dict = None,
259 kdu_name: str = None,
260 namespace: str = None,
261 **kwargs,
262 ) -> bool:
263 """Install a bundle
264
265 :param cluster_uuid str: The UUID of the cluster to install to
266 :param kdu_model str: The name or path of a bundle to install
267 :param kdu_instance: Kdu instance name
268 :param atomic bool: If set, waits until the model is active and resets
269 the cluster on failure.
270 :param timeout int: The time, in seconds, to wait for the install
271 to finish
272 :param params dict: Key-value pairs of instantiation parameters
273 :param kdu_name: Name of the KDU instance to be installed
274 :param namespace: K8s namespace to use for the KDU instance
275 :param kwargs: Additional parameters
276 vca_id (str): VCA ID
277
278 :return: If successful, returns ?
279 """
280 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
281 bundle = kdu_model
282
283 if not db_dict:
284 raise K8sException("db_dict must be set")
285 if not bundle:
286 raise K8sException("bundle must be set")
287
288 if bundle.startswith("cs:"):
289 # For Juju Bundles provided by the Charm Store
290 pass
291 elif bundle.startswith("ch:"):
292 # For Juju Bundles provided by the Charm Hub (this only works for juju version >= 2.9)
293 pass
294 elif bundle.startswith("http"):
295 # Download the file
296 pass
297 else:
298 new_workdir = kdu_model.strip(kdu_model.split("/")[-1])
299 os.chdir(new_workdir)
300 bundle = "local:{}".format(kdu_model)
301
302 # default namespace to kdu_instance
303 if not namespace:
304 namespace = kdu_instance
305
306 self.log.debug("Checking for model named {}".format(namespace))
307
308 # Create the new model
309 self.log.debug("Adding model: {}".format(namespace))
310 cloud = Cloud(cluster_uuid, self._get_credential_name(cluster_uuid))
311 await libjuju.add_model(namespace, cloud)
312
313 # if model:
314 # TODO: Instantiation parameters
315
316 """
317 "Juju bundle that models the KDU, in any of the following ways:
318 - <juju-repo>/<juju-bundle>
319 - <juju-bundle folder under k8s_models folder in the package>
320 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
321 in the package>
322 - <URL_where_to_fetch_juju_bundle>
323 """
324 try:
325 previous_workdir = os.getcwd()
326 except FileNotFoundError:
327 previous_workdir = "/app/storage"
328
329 self.log.debug("[install] deploying {}".format(bundle))
330 instantiation_params = params.get("overlay") if params else None
331 await libjuju.deploy(
332 bundle,
333 model_name=namespace,
334 wait=atomic,
335 timeout=timeout,
336 instantiation_params=instantiation_params,
337 )
338 os.chdir(previous_workdir)
339
340 # update information in the database (first, the VCA status, and then, the namespace)
341 if self.on_update_db:
342 await self.on_update_db(
343 cluster_uuid,
344 kdu_instance,
345 filter=db_dict["filter"],
346 vca_id=kwargs.get("vca_id"),
347 )
348
349 self.db.set_one(
350 table="nsrs",
351 q_filter={"_admin.deployed.K8s.kdu-instance": kdu_instance},
352 update_dict={"_admin.deployed.K8s.$.namespace": namespace},
353 )
354
355 return True
356
357 async def scale(
358 self,
359 kdu_instance: str,
360 scale: int,
361 resource_name: str,
362 total_timeout: float = 1800,
363 namespace: str = None,
364 **kwargs,
365 ) -> bool:
366 """Scale an application in a model
367
368 :param: kdu_instance str: KDU instance name
369 :param: scale int: Scale to which to set the application
370 :param: resource_name str: The application name in the Juju Bundle
371 :param: timeout float: The time, in seconds, to wait for the install
372 to finish
373 :param namespace str: The namespace (model) where the Bundle was deployed
374 :param kwargs: Additional parameters
375 vca_id (str): VCA ID
376
377 :return: If successful, returns True
378 """
379
380 model_name = self._obtain_namespace(
381 kdu_instance=kdu_instance, namespace=namespace
382 )
383 try:
384 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
385 await libjuju.scale_application(
386 model_name=model_name,
387 application_name=resource_name,
388 scale=scale,
389 total_timeout=total_timeout,
390 )
391 except Exception as e:
392 error_msg = "Error scaling application {} of the model {} of the kdu instance {}: {}".format(
393 resource_name, model_name, kdu_instance, e
394 )
395 self.log.error(error_msg)
396 raise K8sException(message=error_msg)
397 return True
398
399 async def get_scale_count(
400 self, resource_name: str, kdu_instance: str, namespace: str = None, **kwargs
401 ) -> int:
402 """Get an application scale count
403
404 :param: resource_name str: The application name in the Juju Bundle
405 :param: kdu_instance str: KDU instance name
406 :param namespace str: The namespace (model) where the Bundle was deployed
407 :param kwargs: Additional parameters
408 vca_id (str): VCA ID
409 :return: Return application instance count
410 """
411
412 model_name = self._obtain_namespace(
413 kdu_instance=kdu_instance, namespace=namespace
414 )
415 try:
416 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
417 status = await libjuju.get_model_status(model_name=model_name)
418 return len(status.applications[resource_name].units)
419 except Exception as e:
420 error_msg = (
421 f"Error getting scale count from application {resource_name} of the model {model_name} of "
422 f"the kdu instance {kdu_instance}: {e}"
423 )
424 self.log.error(error_msg)
425 raise K8sException(message=error_msg)
426
427 async def instances_list(self, cluster_uuid: str) -> list:
428 """
429 returns a list of deployed releases in a cluster
430
431 :param cluster_uuid: the cluster
432 :return:
433 """
434 return []
435
436 async def upgrade(
437 self,
438 cluster_uuid: str,
439 kdu_instance: str,
440 kdu_model: str = None,
441 params: dict = None,
442 ) -> str:
443 """Upgrade a model
444
445 :param cluster_uuid str: The UUID of the cluster to upgrade
446 :param kdu_instance str: The unique name of the KDU instance
447 :param kdu_model str: The name or path of the bundle to upgrade to
448 :param params dict: Key-value pairs of instantiation parameters
449
450 :return: If successful, reference to the new revision number of the
451 KDU instance.
452 """
453
454 # TODO: Loop through the bundle and upgrade each charm individually
455
456 """
457 The API doesn't have a concept of bundle upgrades, because there are
458 many possible changes: charm revision, disk, number of units, etc.
459
460 As such, we are only supporting a limited subset of upgrades. We'll
461 upgrade the charm revision but leave storage and scale untouched.
462
463 Scale changes should happen through OSM constructs, and changes to
464 storage would require a redeployment of the service, at least in this
465 initial release.
466 """
467 raise MethodNotImplemented()
468
469 """Rollback"""
470
471 async def rollback(
472 self, cluster_uuid: str, kdu_instance: str, revision: int = 0
473 ) -> str:
474 """Rollback a model
475
476 :param cluster_uuid str: The UUID of the cluster to rollback
477 :param kdu_instance str: The unique name of the KDU instance
478 :param revision int: The revision to revert to. If omitted, rolls back
479 the previous upgrade.
480
481 :return: If successful, returns the revision of active KDU instance,
482 or raises an exception
483 """
484 raise MethodNotImplemented()
485
486 """Deletion"""
487
488 async def uninstall(
489 self, cluster_uuid: str, kdu_instance: str, namespace: str = None, **kwargs
490 ) -> bool:
491 """Uninstall a KDU instance
492
493 :param cluster_uuid str: The UUID of the cluster
494 :param kdu_instance str: The unique name of the KDU instance
495 :param namespace str: The namespace (model) where the Bundle was deployed
496 :param kwargs: Additional parameters
497 vca_id (str): VCA ID
498
499 :return: Returns True if successful, or raises an exception
500 """
501 model_name = self._obtain_namespace(
502 kdu_instance=kdu_instance, namespace=namespace
503 )
504
505 self.log.debug(f"[uninstall] Destroying model: {model_name}")
506
507 will_not_delete = False
508 if model_name not in self.uninstall_locks:
509 self.uninstall_locks[model_name] = asyncio.Lock()
510 delete_lock = self.uninstall_locks[model_name]
511
512 while delete_lock.locked():
513 will_not_delete = True
514 await asyncio.sleep(0.1)
515
516 if will_not_delete:
517 self.log.info("Model {} deleted by another worker.".format(model_name))
518 return True
519
520 try:
521 async with delete_lock:
522 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
523
524 await libjuju.destroy_model(model_name, total_timeout=3600)
525 finally:
526 self.uninstall_locks.pop(model_name)
527
528 self.log.debug(f"[uninstall] Model {model_name} destroyed")
529 return True
530
531 async def upgrade_charm(
532 self,
533 ee_id: str = None,
534 path: str = None,
535 charm_id: str = None,
536 charm_type: str = None,
537 timeout: float = None,
538 ) -> str:
539 """This method upgrade charms in VNFs
540
541 Args:
542 ee_id: Execution environment id
543 path: Local path to the charm
544 charm_id: charm-id
545 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
546 timeout: (Float) Timeout for the ns update operation
547
548 Returns:
549 The output of the update operation if status equals to "completed"
550 """
551 raise K8sException(
552 "KDUs deployed with Juju Bundle do not support charm upgrade"
553 )
554
555 async def exec_primitive(
556 self,
557 cluster_uuid: str = None,
558 kdu_instance: str = None,
559 primitive_name: str = None,
560 timeout: float = 300,
561 params: dict = None,
562 db_dict: dict = None,
563 namespace: str = None,
564 **kwargs,
565 ) -> str:
566 """Exec primitive (Juju action)
567
568 :param cluster_uuid str: The UUID of the cluster
569 :param kdu_instance str: The unique name of the KDU instance
570 :param primitive_name: Name of action that will be executed
571 :param timeout: Timeout for action execution
572 :param params: Dictionary of all the parameters needed for the action
573 :param db_dict: Dictionary for any additional data
574 :param namespace str: The namespace (model) where the Bundle was deployed
575 :param kwargs: Additional parameters
576 vca_id (str): VCA ID
577
578 :return: Returns the output of the action
579 """
580 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
581
582 namespace = self._obtain_namespace(
583 kdu_instance=kdu_instance, namespace=namespace
584 )
585
586 if not params or "application-name" not in params:
587 raise K8sException(
588 "Missing application-name argument, \
589 argument needed for K8s actions"
590 )
591 try:
592 self.log.debug(
593 "[exec_primitive] Getting model "
594 "{} for the kdu_instance: {}".format(namespace, kdu_instance)
595 )
596 application_name = params["application-name"]
597 actions = await libjuju.get_actions(
598 application_name=application_name, model_name=namespace
599 )
600 if primitive_name not in actions:
601 raise K8sException("Primitive {} not found".format(primitive_name))
602 output, status = await libjuju.execute_action(
603 application_name=application_name,
604 model_name=namespace,
605 action_name=primitive_name,
606 **params,
607 )
608
609 if status != "completed":
610 raise K8sException(
611 "status is not completed: {} output: {}".format(status, output)
612 )
613 if self.on_update_db:
614 await self.on_update_db(
615 cluster_uuid=cluster_uuid,
616 kdu_instance=kdu_instance,
617 filter=db_dict["filter"],
618 )
619
620 return output
621
622 except Exception as e:
623 error_msg = "Error executing primitive {}: {}".format(primitive_name, e)
624 self.log.error(error_msg)
625 raise K8sException(message=error_msg)
626
627 """Introspection"""
628
629 async def inspect_kdu(self, kdu_model: str) -> dict:
630 """Inspect a KDU
631
632 Inspects a bundle and returns a dictionary of config parameters and
633 their default values.
634
635 :param kdu_model str: The name or path of the bundle to inspect.
636
637 :return: If successful, returns a dictionary of available parameters
638 and their default values.
639 """
640
641 kdu = {}
642 if not os.path.exists(kdu_model):
643 raise K8sException("file {} not found".format(kdu_model))
644
645 with open(kdu_model, "r") as f:
646 bundle = yaml.safe_load(f.read())
647
648 """
649 {
650 'description': 'Test bundle',
651 'bundle': 'kubernetes',
652 'applications': {
653 'mariadb-k8s': {
654 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
655 'scale': 1,
656 'options': {
657 'password': 'manopw',
658 'root_password': 'osm4u',
659 'user': 'mano'
660 },
661 'series': 'kubernetes'
662 }
663 }
664 }
665 """
666 # TODO: This should be returned in an agreed-upon format
667 kdu = bundle["applications"]
668
669 return kdu
670
671 async def help_kdu(self, kdu_model: str) -> str:
672 """View the README
673
674 If available, returns the README of the bundle.
675
676 :param kdu_model str: The name or path of a bundle
677 f
678 :return: If found, returns the contents of the README.
679 """
680 readme = None
681
682 files = ["README", "README.txt", "README.md"]
683 path = os.path.dirname(kdu_model)
684 for file in os.listdir(path):
685 if file in files:
686 with open(file, "r") as f:
687 readme = f.read()
688 break
689
690 return readme
691
692 async def status_kdu(
693 self,
694 cluster_uuid: str,
695 kdu_instance: str,
696 complete_status: bool = False,
697 yaml_format: bool = False,
698 namespace: str = None,
699 **kwargs,
700 ) -> Union[str, dict]:
701 """Get the status of the KDU
702
703 Get the current status of the KDU instance.
704
705 :param cluster_uuid str: The UUID of the cluster
706 :param kdu_instance str: The unique id of the KDU instance
707 :param complete_status: To get the complete_status of the KDU
708 :param yaml_format: To get the status in proper format for NSR record
709 :param namespace str: The namespace (model) where the Bundle was deployed
710 :param: kwargs: Additional parameters
711 vca_id (str): VCA ID
712
713 :return: Returns a dictionary containing namespace, state, resources,
714 and deployment_time and returns complete_status if complete_status is True
715 """
716 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
717 status = {}
718
719 model_name = self._obtain_namespace(
720 kdu_instance=kdu_instance, namespace=namespace
721 )
722 model_status = await libjuju.get_model_status(model_name=model_name)
723
724 if not complete_status:
725 for name in model_status.applications:
726 application = model_status.applications[name]
727 status[name] = {"status": application["status"]["status"]}
728 else:
729 if yaml_format:
730 return obj_to_yaml(model_status)
731 else:
732 return obj_to_dict(model_status)
733
734 return status
735
736 async def add_relation(
737 self, provider: RelationEndpoint, requirer: RelationEndpoint
738 ):
739 """
740 Add relation between two charmed endpoints
741
742 :param: provider: Provider relation endpoint
743 :param: requirer: Requirer relation endpoint
744 """
745 self.log.debug(f"adding new relation between {provider} and {requirer}")
746 cross_model_relation = (
747 provider.model_name != requirer.model_name
748 or provider.vca_id != requirer.vca_id
749 )
750 try:
751 if cross_model_relation:
752 # Cross-model relation
753 provider_libjuju = await self._get_libjuju(provider.vca_id)
754 requirer_libjuju = await self._get_libjuju(requirer.vca_id)
755 offer = await provider_libjuju.offer(provider)
756 if offer:
757 saas_name = await requirer_libjuju.consume(
758 requirer.model_name, offer, provider_libjuju
759 )
760 await requirer_libjuju.add_relation(
761 requirer.model_name, requirer.endpoint, saas_name
762 )
763 else:
764 # Standard relation
765 vca_id = provider.vca_id
766 model = provider.model_name
767 libjuju = await self._get_libjuju(vca_id)
768 # add juju relations between two applications
769 await libjuju.add_relation(
770 model_name=model,
771 endpoint_1=provider.endpoint,
772 endpoint_2=requirer.endpoint,
773 )
774 except Exception as e:
775 message = f"Error adding relation between {provider} and {requirer}: {e}"
776 self.log.error(message)
777 raise Exception(message=message)
778
779 async def update_vca_status(
780 self, vcastatus: dict, kdu_instance: str, namespace: str = None, **kwargs
781 ):
782 """
783 Add all configs, actions, executed actions of all applications in a model to vcastatus dict
784
785 :param vcastatus dict: dict containing vcastatus
786 :param kdu_instance str: The unique id of the KDU instance
787 :param namespace str: The namespace (model) where the Bundle was deployed
788 :param: kwargs: Additional parameters
789 vca_id (str): VCA ID
790
791 :return: None
792 """
793
794 model_name = self._obtain_namespace(
795 kdu_instance=kdu_instance, namespace=namespace
796 )
797
798 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
799 try:
800 for vca_model_name in vcastatus:
801 # Adding executed actions
802 vcastatus[vca_model_name][
803 "executedActions"
804 ] = await libjuju.get_executed_actions(model_name=model_name)
805
806 for application in vcastatus[vca_model_name]["applications"]:
807 # Adding application actions
808 vcastatus[vca_model_name]["applications"][application][
809 "actions"
810 ] = {}
811 # Adding application configs
812 vcastatus[vca_model_name]["applications"][application][
813 "configs"
814 ] = await libjuju.get_application_configs(
815 model_name=model_name, application_name=application
816 )
817
818 except Exception as e:
819 self.log.debug("Error in updating vca status: {}".format(str(e)))
820
821 async def get_services(
822 self, cluster_uuid: str, kdu_instance: str, namespace: str
823 ) -> list:
824 """Return a list of services of a kdu_instance"""
825
826 namespace = self._obtain_namespace(
827 kdu_instance=kdu_instance, namespace=namespace
828 )
829
830 credentials = self.get_credentials(cluster_uuid=cluster_uuid)
831 kubectl = self._get_kubectl(credentials)
832 return kubectl.get_services(
833 field_selector="metadata.namespace={}".format(namespace)
834 )
835
836 async def get_service(
837 self, cluster_uuid: str, service_name: str, namespace: str
838 ) -> object:
839 """Return data for a specific service inside a namespace"""
840
841 credentials = self.get_credentials(cluster_uuid=cluster_uuid)
842 kubectl = self._get_kubectl(credentials)
843 return kubectl.get_services(
844 field_selector="metadata.name={},metadata.namespace={}".format(
845 service_name, namespace
846 )
847 )[0]
848
849 def get_credentials(self, cluster_uuid: str) -> str:
850 """
851 Get Cluster Kubeconfig
852 """
853 k8scluster = self.db.get_one(
854 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
855 )
856
857 self.db.encrypt_decrypt_fields(
858 k8scluster.get("credentials"),
859 "decrypt",
860 ["password", "secret"],
861 schema_version=k8scluster["schema_version"],
862 salt=k8scluster["_id"],
863 )
864
865 return yaml.safe_dump(k8scluster.get("credentials"))
866
867 def _get_credential_name(self, cluster_uuid: str) -> str:
868 """
869 Get credential name for a k8s cloud
870
871 We cannot use the cluster_uuid for the credential name directly,
872 because it cannot start with a number, it must start with a letter.
873 Therefore, the k8s cloud credential name will be "cred-" followed
874 by the cluster uuid.
875
876 :param: cluster_uuid: Cluster UUID of the kubernetes cloud (=cloud_name)
877
878 :return: Name to use for the credential name.
879 """
880 return "cred-{}".format(cluster_uuid)
881
882 def get_namespace(self, cluster_uuid: str) -> str:
883 """Get the namespace UUID
884 Gets the namespace's unique name
885
886 :param cluster_uuid str: The UUID of the cluster
887 :returns: The namespace UUID, or raises an exception
888 """
889 pass
890
891 @staticmethod
892 def generate_kdu_instance_name(**kwargs):
893 db_dict = kwargs.get("db_dict")
894 kdu_name = kwargs.get("kdu_name", None)
895 if kdu_name:
896 kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
897 else:
898 kdu_instance = db_dict["filter"]["_id"]
899 return kdu_instance
900
901 async def _get_libjuju(self, vca_id: str = None) -> Libjuju:
902 """
903 Get libjuju object
904
905 :param: vca_id: VCA ID
906 If None, get a libjuju object with a Connection to the default VCA
907 Else, geta libjuju object with a Connection to the specified VCA
908 """
909 if not vca_id:
910 while self.loading_libjuju.locked():
911 await asyncio.sleep(0.1)
912 if not self.libjuju:
913 async with self.loading_libjuju:
914 vca_connection = await get_connection(self._store)
915 self.libjuju = Libjuju(vca_connection, log=self.log)
916 return self.libjuju
917 else:
918 vca_connection = await get_connection(self._store, vca_id)
919 return Libjuju(vca_connection, log=self.log, n2vc=self)
920
921 def _get_kubectl(self, credentials: str) -> Kubectl:
922 """
923 Get Kubectl object
924
925 :param: kubeconfig_credentials: Kubeconfig credentials
926 """
927 kubecfg = tempfile.NamedTemporaryFile()
928 with open(kubecfg.name, "w") as kubecfg_file:
929 kubecfg_file.write(credentials)
930 return Kubectl(config_file=kubecfg.name)
931
932 def _obtain_namespace(self, kdu_instance: str, namespace: str = None) -> str:
933 """
934 Obtain the namespace/model name to use in the instantiation of a Juju Bundle in K8s. The default namespace is
935 the kdu_instance name. However, if the user passes the namespace where he wants to deploy the bundle,
936 that namespace will be used.
937
938 :param kdu_instance: the default KDU instance name
939 :param namespace: the namespace passed by the User
940 """
941
942 # deault the namespace/model name to the kdu_instance name TODO -> this should be the real return... But
943 # once the namespace is not passed in most methods, I had to do this in another way. But I think this should
944 # be the procedure in the future return namespace if namespace else kdu_instance
945
946 # TODO -> has referred above, this should be avoided in the future, this is temporary, in order to avoid
947 # compatibility issues
948 return (
949 namespace
950 if namespace
951 else self._obtain_namespace_from_db(kdu_instance=kdu_instance)
952 )
953
954 def _obtain_namespace_from_db(self, kdu_instance: str) -> str:
955 db_nsrs = self.db.get_one(
956 table="nsrs", q_filter={"_admin.deployed.K8s.kdu-instance": kdu_instance}
957 )
958 for k8s in db_nsrs["_admin"]["deployed"]["K8s"]:
959 if k8s.get("kdu-instance") == kdu_instance:
960 return k8s.get("namespace")
961 return ""