Bug 1651 fix
[osm/N2VC.git] / n2vc / k8s_juju_conn.py
1 # Copyright 2019 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import os
17 import uuid
18 import yaml
19 import tempfile
20 import binascii
21
22 from n2vc.config import EnvironConfig
23 from n2vc.exceptions import K8sException
24 from n2vc.k8s_conn import K8sConnector
25 from n2vc.kubectl import Kubectl
26 from .exceptions import MethodNotImplemented
27 from n2vc.libjuju import Libjuju
28 from n2vc.utils import obj_to_dict, obj_to_yaml
29 from n2vc.store import MotorStore
30 from n2vc.vca.cloud import Cloud
31 from n2vc.vca.connection import get_connection
32
33
34 RBAC_LABEL_KEY_NAME = "rbac-id"
35 RBAC_STACK_PREFIX = "juju-credential"
36
37
38 def generate_rbac_id():
39 return binascii.hexlify(os.urandom(4)).decode()
40
41
42 class K8sJujuConnector(K8sConnector):
43 libjuju = None
44
45 def __init__(
46 self,
47 fs: object,
48 db: object,
49 kubectl_command: str = "/usr/bin/kubectl",
50 juju_command: str = "/usr/bin/juju",
51 log: object = None,
52 loop: object = None,
53 on_update_db=None,
54 ):
55 """
56 :param fs: file system for kubernetes and helm configuration
57 :param db: Database object
58 :param kubectl_command: path to kubectl executable
59 :param helm_command: path to helm executable
60 :param log: logger
61 :param: loop: Asyncio loop
62 """
63
64 # parent class
65 K8sConnector.__init__(
66 self,
67 db,
68 log=log,
69 on_update_db=on_update_db,
70 )
71
72 self.fs = fs
73 self.loop = loop or asyncio.get_event_loop()
74 self.log.debug("Initializing K8S Juju connector")
75
76 db_uri = EnvironConfig(prefixes=["OSMLCM_", "OSMMON_"]).get("database_uri")
77 self._store = MotorStore(db_uri)
78 self.loading_libjuju = asyncio.Lock(loop=self.loop)
79
80 self.log.debug("K8S Juju connector initialized")
81 # TODO: Remove these commented lines:
82 # self.authenticated = False
83 # self.models = {}
84 # self.juju_secret = ""
85
86 """Initialization"""
87
88 async def init_env(
89 self,
90 k8s_creds: str,
91 namespace: str = "kube-system",
92 reuse_cluster_uuid: str = None,
93 **kwargs,
94 ) -> (str, bool):
95 """
96 It prepares a given K8s cluster environment to run Juju bundles.
97
98 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
99 '.kube/config'
100 :param namespace: optional namespace to be used for juju. By default,
101 'kube-system' will be used
102 :param reuse_cluster_uuid: existing cluster uuid for reuse
103 :param: kwargs: Additional parameters
104 vca_id (str): VCA ID
105
106 :return: uuid of the K8s cluster and True if connector has installed some
107 software in the cluster
108 (on error, an exception will be raised)
109 """
110 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
111
112 cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4())
113 kubectl = self._get_kubectl(k8s_creds)
114
115 # CREATING RESOURCES IN K8S
116 rbac_id = generate_rbac_id()
117 metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
118 labels = {RBAC_STACK_PREFIX: rbac_id}
119
120 # Create cleanup dictionary to clean up created resources
121 # if it fails in the middle of the process
122 cleanup_data = []
123 try:
124 kubectl.create_cluster_role(
125 name=metadata_name,
126 labels=labels,
127 )
128 cleanup_data.append(
129 {
130 "delete": kubectl.delete_cluster_role,
131 "args": (metadata_name),
132 }
133 )
134
135 kubectl.create_service_account(
136 name=metadata_name,
137 labels=labels,
138 )
139 cleanup_data.append(
140 {
141 "delete": kubectl.delete_service_account,
142 "args": (metadata_name),
143 }
144 )
145
146 kubectl.create_cluster_role_binding(
147 name=metadata_name,
148 labels=labels,
149 )
150 cleanup_data.append(
151 {
152 "delete": kubectl.delete_service_account,
153 "args": (metadata_name),
154 }
155 )
156 token, client_cert_data = await kubectl.get_secret_data(
157 metadata_name,
158 )
159
160 default_storage_class = kubectl.get_default_storage_class()
161 await libjuju.add_k8s(
162 name=cluster_uuid,
163 rbac_id=rbac_id,
164 token=token,
165 client_cert_data=client_cert_data,
166 configuration=kubectl.configuration,
167 storage_class=default_storage_class,
168 credential_name=self._get_credential_name(cluster_uuid),
169 )
170 return cluster_uuid, True
171 except Exception as e:
172 self.log.error("Error initializing k8scluster: {}".format(e))
173 if len(cleanup_data) > 0:
174 self.log.debug("Cleaning up created resources in k8s cluster...")
175 for item in cleanup_data:
176 delete_function = item["delete"]
177 delete_args = item["args"]
178 delete_function(*delete_args)
179 self.log.debug("Cleanup finished")
180 raise e
181
182 """Repo Management"""
183
184 async def repo_add(
185 self,
186 name: str,
187 url: str,
188 _type: str = "charm",
189 ):
190 raise MethodNotImplemented()
191
192 async def repo_list(self):
193 raise MethodNotImplemented()
194
195 async def repo_remove(
196 self,
197 name: str,
198 ):
199 raise MethodNotImplemented()
200
201 async def synchronize_repos(self, cluster_uuid: str, name: str):
202 """
203 Returns None as currently add_repo is not implemented
204 """
205 return None
206
207 """Reset"""
208
209 async def reset(
210 self,
211 cluster_uuid: str,
212 force: bool = False,
213 uninstall_sw: bool = False,
214 **kwargs,
215 ) -> bool:
216 """Reset a cluster
217
218 Resets the Kubernetes cluster by removing the model that represents it.
219
220 :param cluster_uuid str: The UUID of the cluster to reset
221 :param force: Force reset
222 :param uninstall_sw: Boolean to uninstall sw
223 :param: kwargs: Additional parameters
224 vca_id (str): VCA ID
225
226 :return: Returns True if successful or raises an exception.
227 """
228
229 try:
230 self.log.debug("[reset] Removing k8s cloud")
231 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
232
233 cloud = Cloud(cluster_uuid, self._get_credential_name(cluster_uuid))
234
235 cloud_creds = await libjuju.get_cloud_credentials(cloud)
236
237 await libjuju.remove_cloud(cluster_uuid)
238
239 credentials = self.get_credentials(cluster_uuid=cluster_uuid)
240
241 kubectl = self._get_kubectl(credentials)
242
243 delete_functions = [
244 kubectl.delete_cluster_role_binding,
245 kubectl.delete_service_account,
246 kubectl.delete_cluster_role,
247 ]
248
249 credential_attrs = cloud_creds[0].result["attrs"]
250 if RBAC_LABEL_KEY_NAME in credential_attrs:
251 rbac_id = credential_attrs[RBAC_LABEL_KEY_NAME]
252 metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
253 for delete_func in delete_functions:
254 try:
255 delete_func(metadata_name)
256 except Exception as e:
257 self.log.warning("Cannot remove resource in K8s {}".format(e))
258
259 except Exception as e:
260 self.log.debug("Caught exception during reset: {}".format(e))
261 raise e
262 return True
263
264 """Deployment"""
265
266 async def install(
267 self,
268 cluster_uuid: str,
269 kdu_model: str,
270 kdu_instance: str,
271 atomic: bool = True,
272 timeout: float = 1800,
273 params: dict = None,
274 db_dict: dict = None,
275 kdu_name: str = None,
276 namespace: str = None,
277 **kwargs,
278 ) -> bool:
279 """Install a bundle
280
281 :param cluster_uuid str: The UUID of the cluster to install to
282 :param kdu_model str: The name or path of a bundle to install
283 :param kdu_instance: Kdu instance name
284 :param atomic bool: If set, waits until the model is active and resets
285 the cluster on failure.
286 :param timeout int: The time, in seconds, to wait for the install
287 to finish
288 :param params dict: Key-value pairs of instantiation parameters
289 :param kdu_name: Name of the KDU instance to be installed
290 :param namespace: K8s namespace to use for the KDU instance
291 :param kwargs: Additional parameters
292 vca_id (str): VCA ID
293
294 :return: If successful, returns ?
295 """
296 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
297 bundle = kdu_model
298
299 if not db_dict:
300 raise K8sException("db_dict must be set")
301 if not bundle:
302 raise K8sException("bundle must be set")
303
304 if bundle.startswith("cs:"):
305 pass
306 elif bundle.startswith("http"):
307 # Download the file
308 pass
309 else:
310 new_workdir = kdu_model.strip(kdu_model.split("/")[-1])
311 os.chdir(new_workdir)
312 bundle = "local:{}".format(kdu_model)
313
314 self.log.debug("Checking for model named {}".format(kdu_instance))
315
316 # Create the new model
317 self.log.debug("Adding model: {}".format(kdu_instance))
318 cloud = Cloud(cluster_uuid, self._get_credential_name(cluster_uuid))
319 await libjuju.add_model(kdu_instance, cloud)
320
321 # if model:
322 # TODO: Instantiation parameters
323
324 """
325 "Juju bundle that models the KDU, in any of the following ways:
326 - <juju-repo>/<juju-bundle>
327 - <juju-bundle folder under k8s_models folder in the package>
328 - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
329 in the package>
330 - <URL_where_to_fetch_juju_bundle>
331 """
332 try:
333 previous_workdir = os.getcwd()
334 except FileNotFoundError:
335 previous_workdir = "/app/storage"
336
337 self.log.debug("[install] deploying {}".format(bundle))
338 await libjuju.deploy(
339 bundle, model_name=kdu_instance, wait=atomic, timeout=timeout
340 )
341 os.chdir(previous_workdir)
342 if self.on_update_db:
343 await self.on_update_db(
344 cluster_uuid,
345 kdu_instance,
346 filter=db_dict["filter"],
347 vca_id=kwargs.get("vca_id"),
348 )
349 return True
350
351 async def scale(
352 self,
353 kdu_instance: str,
354 scale: int,
355 resource_name: str,
356 total_timeout: float = 1800,
357 **kwargs,
358 ) -> bool:
359 """Scale an application in a model
360
361 :param: kdu_instance str: KDU instance name
362 :param: scale int: Scale to which to set this application
363 :param: resource_name str: Resource name (Application name)
364 :param: timeout float: The time, in seconds, to wait for the install
365 to finish
366 :param kwargs: Additional parameters
367 vca_id (str): VCA ID
368
369 :return: If successful, returns True
370 """
371
372 try:
373 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
374 await libjuju.scale_application(
375 model_name=kdu_instance,
376 application_name=resource_name,
377 scale=scale,
378 total_timeout=total_timeout,
379 )
380 except Exception as e:
381 error_msg = "Error scaling application {} in kdu instance {}: {}".format(
382 resource_name, kdu_instance, e
383 )
384 self.log.error(error_msg)
385 raise K8sException(message=error_msg)
386 return True
387
388 async def get_scale_count(
389 self,
390 resource_name: str,
391 kdu_instance: str,
392 **kwargs,
393 ) -> int:
394 """Get an application scale count
395
396 :param: resource_name str: Resource name (Application name)
397 :param: kdu_instance str: KDU instance name
398 :param kwargs: Additional parameters
399 vca_id (str): VCA ID
400 :return: Return application instance count
401 """
402 try:
403 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
404 status = await libjuju.get_model_status(kdu_instance)
405 return len(status.applications[resource_name].units)
406 except Exception as e:
407 error_msg = "Error getting scale count from application {} in kdu instance {}: {}".format(
408 resource_name, kdu_instance, e
409 )
410 self.log.error(error_msg)
411 raise K8sException(message=error_msg)
412
413 async def instances_list(self, cluster_uuid: str) -> list:
414 """
415 returns a list of deployed releases in a cluster
416
417 :param cluster_uuid: the cluster
418 :return:
419 """
420 return []
421
422 async def upgrade(
423 self,
424 cluster_uuid: str,
425 kdu_instance: str,
426 kdu_model: str = None,
427 params: dict = None,
428 ) -> str:
429 """Upgrade a model
430
431 :param cluster_uuid str: The UUID of the cluster to upgrade
432 :param kdu_instance str: The unique name of the KDU instance
433 :param kdu_model str: The name or path of the bundle to upgrade to
434 :param params dict: Key-value pairs of instantiation parameters
435
436 :return: If successful, reference to the new revision number of the
437 KDU instance.
438 """
439
440 # TODO: Loop through the bundle and upgrade each charm individually
441
442 """
443 The API doesn't have a concept of bundle upgrades, because there are
444 many possible changes: charm revision, disk, number of units, etc.
445
446 As such, we are only supporting a limited subset of upgrades. We'll
447 upgrade the charm revision but leave storage and scale untouched.
448
449 Scale changes should happen through OSM constructs, and changes to
450 storage would require a redeployment of the service, at least in this
451 initial release.
452 """
453 raise MethodNotImplemented()
454
455 """Rollback"""
456
457 async def rollback(
458 self,
459 cluster_uuid: str,
460 kdu_instance: str,
461 revision: int = 0,
462 ) -> str:
463 """Rollback a model
464
465 :param cluster_uuid str: The UUID of the cluster to rollback
466 :param kdu_instance str: The unique name of the KDU instance
467 :param revision int: The revision to revert to. If omitted, rolls back
468 the previous upgrade.
469
470 :return: If successful, returns the revision of active KDU instance,
471 or raises an exception
472 """
473 raise MethodNotImplemented()
474
475 """Deletion"""
476
477 async def uninstall(
478 self,
479 cluster_uuid: str,
480 kdu_instance: str,
481 **kwargs,
482 ) -> bool:
483 """Uninstall a KDU instance
484
485 :param cluster_uuid str: The UUID of the cluster
486 :param kdu_instance str: The unique name of the KDU instance
487 :param kwargs: Additional parameters
488 vca_id (str): VCA ID
489
490 :return: Returns True if successful, or raises an exception
491 """
492
493 self.log.debug("[uninstall] Destroying model")
494 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
495
496 await libjuju.destroy_model(kdu_instance, total_timeout=3600)
497
498 # self.log.debug("[uninstall] Model destroyed and disconnecting")
499 # await controller.disconnect()
500
501 return True
502 # TODO: Remove these commented lines
503 # if not self.authenticated:
504 # self.log.debug("[uninstall] Connecting to controller")
505 # await self.login(cluster_uuid)
506
507 async def exec_primitive(
508 self,
509 cluster_uuid: str = None,
510 kdu_instance: str = None,
511 primitive_name: str = None,
512 timeout: float = 300,
513 params: dict = None,
514 db_dict: dict = None,
515 **kwargs,
516 ) -> str:
517 """Exec primitive (Juju action)
518
519 :param cluster_uuid str: The UUID of the cluster
520 :param kdu_instance str: The unique name of the KDU instance
521 :param primitive_name: Name of action that will be executed
522 :param timeout: Timeout for action execution
523 :param params: Dictionary of all the parameters needed for the action
524 :param db_dict: Dictionary for any additional data
525 :param kwargs: Additional parameters
526 vca_id (str): VCA ID
527
528 :return: Returns the output of the action
529 """
530 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
531
532 if not params or "application-name" not in params:
533 raise K8sException(
534 "Missing application-name argument, \
535 argument needed for K8s actions"
536 )
537 try:
538 self.log.debug(
539 "[exec_primitive] Getting model "
540 "kdu_instance: {}".format(kdu_instance)
541 )
542 application_name = params["application-name"]
543 actions = await libjuju.get_actions(application_name, kdu_instance)
544 if primitive_name not in actions:
545 raise K8sException("Primitive {} not found".format(primitive_name))
546 output, status = await libjuju.execute_action(
547 application_name, kdu_instance, primitive_name, **params
548 )
549
550 if status != "completed":
551 raise K8sException(
552 "status is not completed: {} output: {}".format(status, output)
553 )
554 if self.on_update_db:
555 await self.on_update_db(
556 cluster_uuid, kdu_instance, filter=db_dict["filter"]
557 )
558
559 return output
560
561 except Exception as e:
562 error_msg = "Error executing primitive {}: {}".format(primitive_name, e)
563 self.log.error(error_msg)
564 raise K8sException(message=error_msg)
565
566 """Introspection"""
567
568 async def inspect_kdu(
569 self,
570 kdu_model: str,
571 ) -> dict:
572 """Inspect a KDU
573
574 Inspects a bundle and returns a dictionary of config parameters and
575 their default values.
576
577 :param kdu_model str: The name or path of the bundle to inspect.
578
579 :return: If successful, returns a dictionary of available parameters
580 and their default values.
581 """
582
583 kdu = {}
584 if not os.path.exists(kdu_model):
585 raise K8sException("file {} not found".format(kdu_model))
586
587 with open(kdu_model, "r") as f:
588 bundle = yaml.safe_load(f.read())
589
590 """
591 {
592 'description': 'Test bundle',
593 'bundle': 'kubernetes',
594 'applications': {
595 'mariadb-k8s': {
596 'charm': 'cs:~charmed-osm/mariadb-k8s-20',
597 'scale': 1,
598 'options': {
599 'password': 'manopw',
600 'root_password': 'osm4u',
601 'user': 'mano'
602 },
603 'series': 'kubernetes'
604 }
605 }
606 }
607 """
608 # TODO: This should be returned in an agreed-upon format
609 kdu = bundle["applications"]
610
611 return kdu
612
613 async def help_kdu(
614 self,
615 kdu_model: str,
616 ) -> str:
617 """View the README
618
619 If available, returns the README of the bundle.
620
621 :param kdu_model str: The name or path of a bundle
622
623 :return: If found, returns the contents of the README.
624 """
625 readme = None
626
627 files = ["README", "README.txt", "README.md"]
628 path = os.path.dirname(kdu_model)
629 for file in os.listdir(path):
630 if file in files:
631 with open(file, "r") as f:
632 readme = f.read()
633 break
634
635 return readme
636
637 async def status_kdu(
638 self,
639 cluster_uuid: str,
640 kdu_instance: str,
641 complete_status: bool = False,
642 yaml_format: bool = False,
643 **kwargs,
644 ) -> dict:
645 """Get the status of the KDU
646
647 Get the current status of the KDU instance.
648
649 :param cluster_uuid str: The UUID of the cluster
650 :param kdu_instance str: The unique id of the KDU instance
651 :param complete_status: To get the complete_status of the KDU
652 :param yaml_format: To get the status in proper format for NSR record
653 :param: kwargs: Additional parameters
654 vca_id (str): VCA ID
655
656 :return: Returns a dictionary containing namespace, state, resources,
657 and deployment_time and returns complete_status if complete_status is True
658 """
659 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
660 status = {}
661
662 model_status = await libjuju.get_model_status(kdu_instance)
663
664 if not complete_status:
665 for name in model_status.applications:
666 application = model_status.applications[name]
667 status[name] = {"status": application["status"]["status"]}
668 else:
669 if yaml_format:
670 return obj_to_yaml(model_status)
671 else:
672 return obj_to_dict(model_status)
673
674 return status
675
676 async def update_vca_status(self, vcastatus: dict, kdu_instance: str, **kwargs):
677 """
678 Add all configs, actions, executed actions of all applications in a model to vcastatus dict
679
680 :param vcastatus dict: dict containing vcastatus
681 :param kdu_instance str: The unique id of the KDU instance
682 :param: kwargs: Additional parameters
683 vca_id (str): VCA ID
684
685 :return: None
686 """
687 libjuju = await self._get_libjuju(kwargs.get("vca_id"))
688 try:
689 for model_name in vcastatus:
690 # Adding executed actions
691 vcastatus[model_name][
692 "executedActions"
693 ] = await libjuju.get_executed_actions(kdu_instance)
694
695 for application in vcastatus[model_name]["applications"]:
696 # Adding application actions
697 vcastatus[model_name]["applications"][application][
698 "actions"
699 ] = await libjuju.get_actions(application, kdu_instance)
700 # Adding application configs
701 vcastatus[model_name]["applications"][application][
702 "configs"
703 ] = await libjuju.get_application_configs(kdu_instance, application)
704
705 except Exception as e:
706 self.log.debug("Error in updating vca status: {}".format(str(e)))
707
708 async def get_services(
709 self, cluster_uuid: str, kdu_instance: str, namespace: str
710 ) -> list:
711 """Return a list of services of a kdu_instance"""
712
713 credentials = self.get_credentials(cluster_uuid=cluster_uuid)
714 kubectl = self._get_kubectl(credentials)
715 return kubectl.get_services(
716 field_selector="metadata.namespace={}".format(kdu_instance)
717 )
718
719 async def get_service(
720 self, cluster_uuid: str, service_name: str, namespace: str
721 ) -> object:
722 """Return data for a specific service inside a namespace"""
723
724 credentials = self.get_credentials(cluster_uuid=cluster_uuid)
725 kubectl = self._get_kubectl(credentials)
726 return kubectl.get_services(
727 field_selector="metadata.name={},metadata.namespace={}".format(
728 service_name, namespace
729 )
730 )[0]
731
732 def get_credentials(self, cluster_uuid: str) -> str:
733 """
734 Get Cluster Kubeconfig
735 """
736 k8scluster = self.db.get_one(
737 "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
738 )
739
740 self.db.encrypt_decrypt_fields(
741 k8scluster.get("credentials"),
742 "decrypt",
743 ["password", "secret"],
744 schema_version=k8scluster["schema_version"],
745 salt=k8scluster["_id"],
746 )
747
748 return yaml.safe_dump(k8scluster.get("credentials"))
749
750 def _get_credential_name(self, cluster_uuid: str) -> str:
751 """
752 Get credential name for a k8s cloud
753
754 We cannot use the cluster_uuid for the credential name directly,
755 because it cannot start with a number, it must start with a letter.
756 Therefore, the k8s cloud credential name will be "cred-" followed
757 by the cluster uuid.
758
759 :param: cluster_uuid: Cluster UUID of the kubernetes cloud (=cloud_name)
760
761 :return: Name to use for the credential name.
762 """
763 return "cred-{}".format(cluster_uuid)
764
765 def get_namespace(
766 self,
767 cluster_uuid: str,
768 ) -> str:
769 """Get the namespace UUID
770 Gets the namespace's unique name
771
772 :param cluster_uuid str: The UUID of the cluster
773 :returns: The namespace UUID, or raises an exception
774 """
775 pass
776
777 @staticmethod
778 def generate_kdu_instance_name(**kwargs):
779 db_dict = kwargs.get("db_dict")
780 kdu_name = kwargs.get("kdu_name", None)
781 if kdu_name:
782 kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
783 else:
784 kdu_instance = db_dict["filter"]["_id"]
785 return kdu_instance
786
787 async def _get_libjuju(self, vca_id: str = None) -> Libjuju:
788 """
789 Get libjuju object
790
791 :param: vca_id: VCA ID
792 If None, get a libjuju object with a Connection to the default VCA
793 Else, geta libjuju object with a Connection to the specified VCA
794 """
795 if not vca_id:
796 while self.loading_libjuju.locked():
797 await asyncio.sleep(0.1)
798 if not self.libjuju:
799 async with self.loading_libjuju:
800 vca_connection = await get_connection(self._store)
801 self.libjuju = Libjuju(vca_connection, loop=self.loop, log=self.log)
802 return self.libjuju
803 else:
804 vca_connection = await get_connection(self._store, vca_id)
805 return Libjuju(
806 vca_connection,
807 loop=self.loop,
808 log=self.log,
809 n2vc=self,
810 )
811
812 def _get_kubectl(self, credentials: str) -> Kubectl:
813 """
814 Get Kubectl object
815
816 :param: kubeconfig_credentials: Kubeconfig credentials
817 """
818 kubecfg = tempfile.NamedTemporaryFile()
819 with open(kubecfg.name, "w") as kubecfg_file:
820 kubecfg_file.write(credentials)
821 return Kubectl(config_file=kubecfg.name)