Fix black issues
[osm/N2VC.git] / n2vc / k8s_juju_conn.py
1 # Copyright 2019 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 from typing import Union
17 import os
18 import uuid
19 import yaml
20 import tempfile
21 import binascii
22
23 from n2vc.config import EnvironConfig
24 from n2vc.exceptions import K8sException
25 from n2vc.k8s_conn import K8sConnector
26 from n2vc.kubectl import Kubectl
27 from .exceptions import MethodNotImplemented
28 from n2vc.libjuju import Libjuju
29 from n2vc.utils import obj_to_dict, obj_to_yaml
30 from n2vc.store import MotorStore
31 from n2vc.vca.cloud import Cloud
32 from n2vc.vca.connection import get_connection
33
34
# Key looked up in the Juju cloud credential attributes (see reset()) to find
# the identifier of the RBAC resources created for a cluster.
RBAC_LABEL_KEY_NAME = "rbac-id"
# Prefix of the K8s resource names ("<prefix>-<rbac_id>") created by init_env();
# init_env() also uses it as the label key attached to those resources.
RBAC_STACK_PREFIX = "juju-credential"
37
38
def generate_rbac_id():
    """
    Generate a random identifier for the RBAC resources of a cluster

    :return: 8-character lowercase hexadecimal string (4 random bytes)
    """
    random_bytes = os.urandom(4)
    return random_bytes.hex()
41
42
43 class K8sJujuConnector(K8sConnector):
44 libjuju = None
45
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = "/usr/bin/kubectl",
    juju_command: str = "/usr/bin/juju",
    log: object = None,
    loop: object = None,
    on_update_db=None,
):
    """
    Initialize the K8s Juju connector

    :param fs: file system for kubernetes and helm configuration
    :param db: Database object
    :param kubectl_command: path to kubectl executable (not used by this
        initializer)
    :param juju_command: path to juju executable (not used by this
        initializer)
    :param log: logger
    :param loop: Asyncio loop. If not given, asyncio.get_event_loop() is used
    :param on_update_db: callback forwarded to the parent K8sConnector
    """

    # parent class
    K8sConnector.__init__(
        self,
        db,
        log=log,
        on_update_db=on_update_db,
    )

    self.fs = fs
    self.loop = loop or asyncio.get_event_loop()
    self.log.debug("Initializing K8S Juju connector")

    db_uri = EnvironConfig(prefixes=["OSMLCM_", "OSMMON_"]).get("database_uri")
    self._store = MotorStore(db_uri)
    # The "loop" argument of asyncio.Lock was removed in Python 3.10 and
    # raises TypeError there; fall back to the loop-less constructor.
    try:
        self.loading_libjuju = asyncio.Lock(loop=self.loop)
    except TypeError:
        self.loading_libjuju = asyncio.Lock()
    # One lock per kdu_instance that is currently being uninstalled
    self.uninstall_locks = {}

    self.log.debug("K8S Juju connector initialized")
87
88 """Initialization"""
89
async def init_env(
    self,
    k8s_creds: str,
    namespace: str = "kube-system",
    reuse_cluster_uuid: str = None,
    **kwargs,
) -> (str, bool):
    """
    It prepares a given K8s cluster environment to run Juju bundles.

    A cluster role, a service account and a cluster role binding are created
    in the cluster, and the cluster is then registered in Juju as a k8s cloud.
    If any step fails, the resources already created are removed again.

    :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
        '.kube/config'
    :param namespace: optional namespace to be used for juju. By default,
        'kube-system' will be used (currently not used by this method)
    :param reuse_cluster_uuid: existing cluster uuid for reuse
    :param: kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: uuid of the K8s cluster and True if connector has installed some
        software in the cluster
        (on error, an exception will be raised)
    """
    libjuju = await self._get_libjuju(kwargs.get("vca_id"))

    cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4())
    kubectl = self._get_kubectl(k8s_creds)

    # CREATING RESOURCES IN K8S
    rbac_id = generate_rbac_id()
    metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
    labels = {RBAC_STACK_PREFIX: rbac_id}

    # Undo actions for the resources created so far, used to clean up
    # if it fails in the middle of the process
    cleanup_data = []
    try:
        kubectl.create_cluster_role(
            name=metadata_name,
            labels=labels,
        )
        cleanup_data.append(
            {
                "delete": kubectl.delete_cluster_role,
                "args": (metadata_name,),
            }
        )

        kubectl.create_service_account(
            name=metadata_name,
            labels=labels,
        )
        cleanup_data.append(
            {
                "delete": kubectl.delete_service_account,
                "args": (metadata_name,),
            }
        )

        kubectl.create_cluster_role_binding(
            name=metadata_name,
            labels=labels,
        )
        # The undo action of the binding is delete_cluster_role_binding
        # (the same function reset() uses), not delete_service_account
        cleanup_data.append(
            {
                "delete": kubectl.delete_cluster_role_binding,
                "args": (metadata_name,),
            }
        )
        token, client_cert_data = await kubectl.get_secret_data(
            metadata_name,
        )

        default_storage_class = kubectl.get_default_storage_class()
        await libjuju.add_k8s(
            name=cluster_uuid,
            rbac_id=rbac_id,
            token=token,
            client_cert_data=client_cert_data,
            configuration=kubectl.configuration,
            storage_class=default_storage_class,
            credential_name=self._get_credential_name(cluster_uuid),
        )
        return cluster_uuid, True
    except Exception as e:
        self.log.error("Error initializing k8scluster: {}".format(e))
        if cleanup_data:
            self.log.debug("Cleaning up created resources in k8s cluster...")
            # Remove in reverse creation order (binding, account, role), and
            # keep going on errors so a failing cleanup cannot hide the
            # original exception
            for item in reversed(cleanup_data):
                try:
                    item["delete"](*item["args"])
                except Exception as cleanup_error:
                    self.log.warning(
                        "Cannot remove resource in K8s {}".format(cleanup_error)
                    )
            self.log.debug("Cleanup finished")
        raise
183
184 """Repo Management"""
185
async def repo_add(self, name: str, url: str, _type: str = "charm"):
    """
    Add a repository (not supported by this connector)

    :param name: repository name
    :param url: repository URL
    :param _type: repository type ("charm" by default)

    :raises MethodNotImplemented: always
    """
    raise MethodNotImplemented()
193
async def repo_list(self):
    """
    List the repositories (not supported by this connector)

    :raises MethodNotImplemented: always
    """
    raise MethodNotImplemented()
196
async def repo_remove(self, name: str):
    """
    Remove a repository (not supported by this connector)

    :param name: repository name

    :raises MethodNotImplemented: always
    """
    raise MethodNotImplemented()
202
async def synchronize_repos(self, cluster_uuid: str, name: str):
    """
    Do nothing, as adding repositories (repo_add) is not implemented

    :param cluster_uuid: the cluster (ignored)
    :param name: repository name (ignored)

    :return: always None
    """
    return
208
209 """Reset"""
210
async def reset(
    self,
    cluster_uuid: str,
    force: bool = False,
    uninstall_sw: bool = False,
    **kwargs,
) -> bool:
    """Reset a cluster

    Removes the k8s cloud that represents the cluster from Juju and then
    deletes the RBAC resources (cluster role binding, service account and
    cluster role) referenced by the cloud credential, if any.

    :param cluster_uuid str: The UUID of the cluster to reset
    :param force: Force reset (not used by this method)
    :param uninstall_sw: Boolean to uninstall sw (not used by this method)
    :param: kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: Returns True if successful or raises an exception.
    """

    try:
        self.log.debug("[reset] Removing k8s cloud")
        libjuju = await self._get_libjuju(kwargs.get("vca_id"))

        # The credentials must be read before the cloud is removed
        credential_name = self._get_credential_name(cluster_uuid)
        cloud_creds = await libjuju.get_cloud_credentials(
            Cloud(cluster_uuid, credential_name)
        )

        await libjuju.remove_cloud(cluster_uuid)

        kubeconfig = self.get_credentials(cluster_uuid=cluster_uuid)
        kubectl = self._get_kubectl(kubeconfig)

        # Deletion order: binding first, then the account and the role
        rbac_deleters = (
            kubectl.delete_cluster_role_binding,
            kubectl.delete_service_account,
            kubectl.delete_cluster_role,
        )

        attrs = cloud_creds[0].result["attrs"]
        if RBAC_LABEL_KEY_NAME in attrs:
            metadata_name = f"{RBAC_STACK_PREFIX}-{attrs[RBAC_LABEL_KEY_NAME]}"
            for remove_resource in rbac_deleters:
                # Best effort: a failing deletion is only logged
                try:
                    remove_resource(metadata_name)
                except Exception as e:
                    self.log.warning("Cannot remove resource in K8s {}".format(e))

    except Exception as e:
        self.log.debug("Caught exception during reset: {}".format(e))
        raise e
    return True
265
266 """Deployment"""
267
async def install(
    self,
    cluster_uuid: str,
    kdu_model: str,
    kdu_instance: str,
    atomic: bool = True,
    timeout: float = 1800,
    params: dict = None,
    db_dict: dict = None,
    kdu_name: str = None,
    namespace: str = None,
    **kwargs,
) -> bool:
    """Install a bundle

    The bundle (kdu_model) can be given in any of the following ways:
        - "cs:..." bundle provided by the Charm Store
        - "ch:..." bundle provided by the Charm Hub (juju version >= 2.9)
        - "http..." URL where to fetch the bundle
        - anything else is treated as the path of a local bundle

    :param cluster_uuid str: The UUID of the cluster to install to
    :param kdu_model str: The name or path of a bundle to install
    :param kdu_instance: Kdu instance name (used as the Juju model name)
    :param atomic bool: Passed to the deployment as "wait"
    :param timeout int: The time, in seconds, to wait for the install
        to finish
    :param params dict: Key-value pairs of instantiation parameters
        (not used yet)
    :param db_dict: Dictionary for any additional data; mandatory, its
        "filter" entry is passed to the on_update_db callback
    :param kdu_name: Name of the KDU instance to be installed (not used)
    :param namespace: K8s namespace to use for the KDU instance (not used)
    :param kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: True if successful, otherwise an exception is raised
    """
    libjuju = await self._get_libjuju(kwargs.get("vca_id"))
    bundle = kdu_model

    if not db_dict:
        raise K8sException("db_dict must be set")
    if not bundle:
        raise K8sException("bundle must be set")

    # Remember the working directory BEFORE moving to the bundle directory,
    # otherwise there is nothing to go back to afterwards
    try:
        previous_workdir = os.getcwd()
    except FileNotFoundError:
        previous_workdir = "/app/storage"

    workdir_changed = False
    if bundle.startswith(("cs:", "ch:", "http")):
        # Charm Store, Charm Hub (only juju version >= 2.9) or URL bundle:
        # it is passed to the deployment untouched
        pass
    else:
        # Local bundle: deploy from the directory that contains it.
        # os.path.dirname is used because str.strip() removes a set of
        # characters from both ends, not a suffix.
        new_workdir = os.path.dirname(kdu_model)
        if new_workdir:
            # NOTE(review): os.chdir is process-wide, so concurrent
            # installs of local bundles can interfere with each other
            os.chdir(new_workdir)
            workdir_changed = True
        bundle = "local:{}".format(kdu_model)

    try:
        self.log.debug("Checking for model named {}".format(kdu_instance))

        # Create the new model
        self.log.debug("Adding model: {}".format(kdu_instance))
        cloud = Cloud(cluster_uuid, self._get_credential_name(cluster_uuid))
        await libjuju.add_model(kdu_instance, cloud)

        # TODO: Instantiation parameters

        self.log.debug("[install] deploying {}".format(bundle))
        await libjuju.deploy(
            bundle, model_name=kdu_instance, wait=atomic, timeout=timeout
        )
    finally:
        # Go back to the original directory even if the deployment failed
        if workdir_changed:
            os.chdir(previous_workdir)

    if self.on_update_db:
        await self.on_update_db(
            cluster_uuid,
            kdu_instance,
            filter=db_dict["filter"],
            vca_id=kwargs.get("vca_id"),
        )
    return True
356
async def scale(
    self,
    kdu_instance: str,
    scale: int,
    resource_name: str,
    total_timeout: float = 1800,
    **kwargs,
) -> bool:
    """Scale an application in a model

    :param: kdu_instance str: KDU instance name (model name)
    :param: scale int: Scale to which to set this application
    :param: resource_name str: Resource name (Application name)
    :param: total_timeout float: The time, in seconds, passed to the
        scaling operation as its total timeout
    :param kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: True if successful
    :raises K8sException: if the application cannot be scaled
    """
    vca_id = kwargs.get("vca_id")
    try:
        libjuju = await self._get_libjuju(vca_id)
        await libjuju.scale_application(
            model_name=kdu_instance,
            application_name=resource_name,
            scale=scale,
            total_timeout=total_timeout,
        )
    except Exception as e:
        # Any failure is reported to the caller as a K8sException
        error_msg = (
            f"Error scaling application {resource_name} "
            f"in kdu instance {kdu_instance}: {e}"
        )
        self.log.error(error_msg)
        raise K8sException(message=error_msg)
    else:
        return True
393
async def get_scale_count(
    self,
    resource_name: str,
    kdu_instance: str,
    **kwargs,
) -> int:
    """Get an application scale count

    The count is the number of units of the application reported by the
    status of the model.

    :param: resource_name str: Resource name (Application name)
    :param: kdu_instance str: KDU instance name (model name)
    :param kwargs: Additional parameters
        vca_id (str): VCA ID
    :return: Return application instance count
    :raises K8sException: if the count cannot be obtained
    """
    vca_id = kwargs.get("vca_id")
    try:
        libjuju = await self._get_libjuju(vca_id)
        model_status = await libjuju.get_model_status(kdu_instance)
        units = model_status.applications[resource_name].units
        return len(units)
    except Exception as e:
        # Any failure is reported to the caller as a K8sException
        error_msg = (
            f"Error getting scale count from application {resource_name} "
            f"in kdu instance {kdu_instance}: {e}"
        )
        self.log.error(error_msg)
        raise K8sException(message=error_msg)
418
async def instances_list(self, cluster_uuid: str) -> list:
    """
    Return the list of deployed releases in a cluster

    The cluster is not queried: the result is always an empty list.

    :param cluster_uuid: the cluster
    :return: empty list
    """
    deployed_releases = []
    return deployed_releases
427
async def upgrade(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    kdu_model: str = None,
    params: dict = None,
) -> str:
    """Upgrade a model (not implemented)

    :param cluster_uuid str: The UUID of the cluster to upgrade
    :param kdu_instance str: The unique name of the KDU instance
    :param kdu_model str: The name or path of the bundle to upgrade to
    :param params dict: Key-value pairs of instantiation parameters

    :return: Nothing for the time being (intended: reference to the new
        revision number of the KDU instance)
    :raises MethodNotImplemented: always
    """

    # TODO: Loop through the bundle and upgrade each charm individually
    #
    # The API doesn't have a concept of bundle upgrades, because there are
    # many possible changes: charm revision, disk, number of units, etc.
    #
    # As such, we are only supporting a limited subset of upgrades. We'll
    # upgrade the charm revision but leave storage and scale untouched.
    #
    # Scale changes should happen through OSM constructs, and changes to
    # storage would require a redeployment of the service, at least in this
    # initial release.
    raise MethodNotImplemented()
460
461 """Rollback"""
462
async def rollback(
    self, cluster_uuid: str, kdu_instance: str, revision: int = 0
) -> str:
    """Rollback a model (not implemented)

    :param cluster_uuid str: The UUID of the cluster to rollback
    :param kdu_instance str: The unique name of the KDU instance
    :param revision int: The revision to revert to. If omitted, rolls back
        the previous upgrade.

    :return: Nothing for the time being (intended: revision of the active
        KDU instance)
    :raises MethodNotImplemented: always
    """
    raise MethodNotImplemented()
480
481 """Deletion"""
482
async def uninstall(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    **kwargs,
) -> bool:
    """Uninstall a KDU instance

    Destroys the model of the KDU instance. If another call is already
    destroying the same model, this call waits for it to finish and returns
    without destroying anything.

    :param cluster_uuid str: The UUID of the cluster
    :param kdu_instance str: The unique name of the KDU instance
    :param kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: Returns True if successful, or raises an exception
    """

    self.log.debug("[uninstall] Destroying model")

    if kdu_instance not in self.uninstall_locks:
        # The "loop" argument of asyncio.Lock was removed in Python 3.10
        # and raises TypeError there; fall back to the loop-less constructor
        try:
            self.uninstall_locks[kdu_instance] = asyncio.Lock(loop=self.loop)
        except TypeError:
            self.uninstall_locks[kdu_instance] = asyncio.Lock()
    delete_lock = self.uninstall_locks[kdu_instance]

    # A locked lock means another call is already destroying this model
    will_not_delete = False
    while delete_lock.locked():
        will_not_delete = True
        await asyncio.sleep(0.1)

    if will_not_delete:
        self.log.info("Model {} deleted by another worker.".format(kdu_instance))
        return True

    try:
        async with delete_lock:
            libjuju = await self._get_libjuju(kwargs.get("vca_id"))

            await libjuju.destroy_model(kdu_instance, total_timeout=3600)
    finally:
        # A default avoids a KeyError here hiding the real error
        self.uninstall_locks.pop(kdu_instance, None)

    self.log.debug(f"[uninstall] Model {kdu_instance} destroyed")
    return True
524
async def exec_primitive(
    self,
    cluster_uuid: str = None,
    kdu_instance: str = None,
    primitive_name: str = None,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    **kwargs,
) -> str:
    """Exec primitive (Juju action)

    :param cluster_uuid str: The UUID of the cluster
    :param kdu_instance str: The unique name of the KDU instance
    :param primitive_name: Name of action that will be executed
    :param timeout: Timeout for action execution (not used by this method)
    :param params: Dictionary of all the parameters needed for the action.
        It must contain "application-name"
    :param db_dict: Dictionary for any additional data. Its "filter" entry
        is passed to the on_update_db callback, when there is one
    :param kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: Returns the output of the action
    :raises K8sException: if "application-name" is missing, the primitive
        does not exist, or the action does not complete
    """
    libjuju = await self._get_libjuju(kwargs.get("vca_id"))

    if not params or "application-name" not in params:
        # Implicit concatenation keeps source indentation out of the message
        raise K8sException(
            "Missing application-name argument, "
            "argument needed for K8s actions"
        )
    try:
        self.log.debug(
            "[exec_primitive] Getting model "
            "kdu_instance: {}".format(kdu_instance)
        )
        application_name = params["application-name"]
        actions = await libjuju.get_actions(application_name, kdu_instance)
        if primitive_name not in actions:
            raise K8sException("Primitive {} not found".format(primitive_name))
        output, status = await libjuju.execute_action(
            application_name, kdu_instance, primitive_name, **params
        )

        if status != "completed":
            raise K8sException(
                "status is not completed: {} output: {}".format(status, output)
            )
        if self.on_update_db:
            # Pass the VCA on, as install() does, so the callback refers
            # to the same VCA that executed the action
            await self.on_update_db(
                cluster_uuid,
                kdu_instance,
                filter=db_dict["filter"],
                vca_id=kwargs.get("vca_id"),
            )

        return output

    except Exception as e:
        # Every failure (including the ones raised above) is reported as
        # a K8sException that keeps the original exception as its cause
        error_msg = "Error executing primitive {}: {}".format(primitive_name, e)
        self.log.error(error_msg)
        raise K8sException(message=error_msg) from e
583
584 """Introspection"""
585
async def inspect_kdu(self, kdu_model: str) -> dict:
    """Inspect a KDU

    Loads the bundle (a YAML file) and returns its "applications" section,
    for instance:

        {
            'mariadb-k8s': {
                'charm': 'cs:~charmed-osm/mariadb-k8s-20',
                'scale': 1,
                'options': {'user': 'mano', ...},
                'series': 'kubernetes'
            }
        }

    :param kdu_model str: The path of the bundle to inspect.

    :return: The "applications" entry of the bundle.
    :raises K8sException: if the bundle file does not exist
    """
    if not os.path.exists(kdu_model):
        raise K8sException("file {} not found".format(kdu_model))

    with open(kdu_model, "r") as bundle_file:
        bundle = yaml.safe_load(bundle_file)

    # TODO: This should be returned in an agreed-upon format
    return bundle["applications"]
630
async def help_kdu(
    self,
    kdu_model: str,
) -> str:
    """View the README

    If available, returns the README of the bundle. The README is looked up
    in the directory that contains the bundle, trying "README", "README.txt"
    and "README.md" in this order.

    :param kdu_model str: The name or path of a bundle

    :return: If found, returns the contents of the README, otherwise None.
    """
    candidates = ("README", "README.txt", "README.md")
    # A bundle without directory component lives in the current directory
    path = os.path.dirname(kdu_model) or "."
    available = set(os.listdir(path))

    for candidate in candidates:
        if candidate in available:
            # Open the file inside the bundle directory, not relative to
            # the current working directory
            with open(os.path.join(path, candidate), "r") as f:
                return f.read()

    return None
654
async def status_kdu(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    complete_status: bool = False,
    yaml_format: bool = False,
    **kwargs,
) -> Union[str, dict]:
    """Get the status of the KDU

    Get the current status of the model of the KDU instance.

    :param cluster_uuid str: The UUID of the cluster
    :param kdu_instance str: The unique id of the KDU instance
    :param complete_status: To get the complete status of the model instead
        of the per-application summary
    :param yaml_format: With complete_status, return the status converted
        with obj_to_yaml instead of obj_to_dict
    :param: kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: Without complete_status, a dictionary
        {application name: {"status": application status}}.
        With complete_status, the whole model status.
    """
    libjuju = await self._get_libjuju(kwargs.get("vca_id"))
    model_status = await libjuju.get_model_status(kdu_instance)

    if complete_status:
        if yaml_format:
            return obj_to_yaml(model_status)
        return obj_to_dict(model_status)

    # Summary: only the status of each application
    return {
        name: {"status": model_status.applications[name]["status"]["status"]}
        for name in model_status.applications
    }
693
async def update_vca_status(self, vcastatus: dict, kdu_instance: str, **kwargs):
    """
    Add all configs, actions, executed actions of all applications in a model
    to vcastatus dict. The dict is updated in place.

    Errors are not propagated: they are only written to the debug log, so
    vcastatus may be left partially updated.

    :param vcastatus dict: dict containing vcastatus
    :param kdu_instance str: The unique id of the KDU instance
    :param: kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: None
    """
    libjuju = await self._get_libjuju(kwargs.get("vca_id"))
    try:
        for model_name in vcastatus:
            model_entry = vcastatus[model_name]

            # Adding executed actions
            executed_actions = await libjuju.get_executed_actions(kdu_instance)
            model_entry["executedActions"] = executed_actions

            applications = model_entry["applications"]
            for application in applications:
                # Adding application actions
                app_actions = await libjuju.get_actions(application, kdu_instance)
                applications[application]["actions"] = app_actions
                # Adding application configs
                app_configs = await libjuju.get_application_configs(
                    kdu_instance, application
                )
                applications[application]["configs"] = app_configs

    except Exception as e:
        self.log.debug("Error in updating vca status: {}".format(str(e)))
725
async def get_services(
    self, cluster_uuid: str, kdu_instance: str, namespace: str
) -> list:
    """
    Return a list of services of a kdu_instance

    The services are selected by namespace, using the kdu_instance as the
    namespace name; the "namespace" argument is not used.

    :param cluster_uuid: The UUID of the cluster
    :param kdu_instance: The unique name of the KDU instance
    :param namespace: K8s namespace (ignored)

    :return: list of services, as returned by kubectl
    """
    kubeconfig = self.get_credentials(cluster_uuid=cluster_uuid)
    kubectl = self._get_kubectl(kubeconfig)
    selector = "metadata.namespace={}".format(kdu_instance)
    return kubectl.get_services(field_selector=selector)
736
async def get_service(
    self, cluster_uuid: str, service_name: str, namespace: str
) -> object:
    """
    Return data for a specific service inside a namespace

    :param cluster_uuid: The UUID of the cluster
    :param service_name: Name of the service
    :param namespace: K8s namespace of the service

    :return: first service matching the name and the namespace (an
        IndexError is raised when there is none)
    """
    kubeconfig = self.get_credentials(cluster_uuid=cluster_uuid)
    kubectl = self._get_kubectl(kubeconfig)
    selector = "metadata.name={},metadata.namespace={}".format(
        service_name, namespace
    )
    matching_services = kubectl.get_services(field_selector=selector)
    return matching_services[0]
749
def get_credentials(self, cluster_uuid: str) -> str:
    """
    Get Cluster Kubeconfig

    Reads the cluster from the "k8sclusters" collection, decrypts the
    "password" and "secret" fields of its credentials and dumps them as YAML.

    :param cluster_uuid: The UUID of the cluster (database "_id")

    :return: credentials of the cluster as a YAML string
    :raises K8sException: if the cluster is not found in the database
    """
    k8scluster = self.db.get_one(
        "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
    )
    # With fail_on_empty=False a missing cluster presumably yields an empty
    # result; report it explicitly instead of failing on attribute access
    if not k8scluster:
        raise K8sException(
            "k8s cluster {} not found in database".format(cluster_uuid)
        )

    credentials = k8scluster.get("credentials")
    # NOTE(review): the credentials appear to be decrypted in place, as the
    # return value of the call is not used - confirm
    self.db.encrypt_decrypt_fields(
        credentials,
        "decrypt",
        ["password", "secret"],
        schema_version=k8scluster["schema_version"],
        salt=k8scluster["_id"],
    )

    return yaml.safe_dump(credentials)
767
def _get_credential_name(self, cluster_uuid: str) -> str:
    """
    Get credential name for a k8s cloud

    The cluster_uuid cannot be the credential name as it is: a credential
    name must start with a letter and the uuid may start with a number.
    That is why the name is the cluster uuid prefixed by "cred-".

    :param: cluster_uuid: Cluster UUID of the kubernetes cloud (=cloud_name)

    :return: Name to use for the credential name.
    """
    return f"cred-{cluster_uuid}"
782
def get_namespace(self, cluster_uuid: str) -> str:
    """Get the namespace UUID

    Intended to get the unique name of the namespace. It is not implemented:
    for the time being nothing is looked up and None is always returned.

    :param cluster_uuid str: The UUID of the cluster
    :returns: None
    """
    return None
794
@staticmethod
def generate_kdu_instance_name(**kwargs):
    """
    Generate the name of a KDU instance

    :param kwargs:
        db_dict (dict): dictionary whose ["filter"]["_id"] is the base name
        kdu_name (str): optional KDU name, used as prefix

    :return: "<kdu_name>-<_id>" if kdu_name is given, otherwise the _id
    """
    db_dict = kwargs.get("db_dict")
    kdu_name = kwargs.get("kdu_name")
    record_id = db_dict["filter"]["_id"]
    if not kdu_name:
        return record_id
    return f"{kdu_name}-{record_id}"
804
async def _get_libjuju(self, vca_id: str = None) -> Libjuju:
    """
    Get libjuju object

    :param: vca_id: VCA ID
        If None, get the libjuju object connected to the default VCA; it
        is created on first use and then kept in self.libjuju
        Else, get a new libjuju object with a Connection to the specified VCA

    :return: Libjuju object
    """
    if vca_id:
        # Specific VCA: a new object per call
        vca_connection = await get_connection(self._store, vca_id)
        return Libjuju(vca_connection, loop=self.loop, log=self.log, n2vc=self)

    # Default VCA: wait while another call is creating the shared object
    while self.loading_libjuju.locked():
        await asyncio.sleep(0.1)
    if not self.libjuju:
        async with self.loading_libjuju:
            default_connection = await get_connection(self._store)
            self.libjuju = Libjuju(
                default_connection, loop=self.loop, log=self.log
            )
    return self.libjuju
829
def _get_kubectl(self, credentials: str) -> Kubectl:
    """
    Get Kubectl object

    The credentials are written to a temporary kubeconfig file, which is
    handed to Kubectl and removed before returning.

    :param: credentials: Kubeconfig credentials (content of the kubeconfig)

    :return: Kubectl object configured with the credentials
    """
    # The context manager removes the file (it holds credentials) as soon as
    # Kubectl is built, instead of whenever the file object happens to be
    # garbage collected.
    # NOTE(review): this assumes Kubectl reads the file in its constructor
    with tempfile.NamedTemporaryFile(mode="w") as kubecfg_file:
        kubecfg_file.write(credentials)
        # Make sure the content is on disk before Kubectl opens the file
        kubecfg_file.flush()
        return Kubectl(config_file=kubecfg_file.name)