Update helm repo after adding the repo
[osm/N2VC.git] / n2vc / k8s_juju_conn.py
1 # Copyright 2019 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 from typing import Union
17 import os
18 import uuid
19 import yaml
20 import tempfile
21 import binascii
22
23 from n2vc.config import EnvironConfig
24 from n2vc.definitions import RelationEndpoint
25 from n2vc.exceptions import K8sException
26 from n2vc.k8s_conn import K8sConnector
27 from n2vc.kubectl import Kubectl
28 from .exceptions import MethodNotImplemented
29 from n2vc.libjuju import Libjuju
30 from n2vc.utils import obj_to_dict, obj_to_yaml
31 from n2vc.store import MotorStore
32 from n2vc.vca.cloud import Cloud
33 from n2vc.vca.connection import get_connection
34
35
# Key looked up in the Juju credential attributes to recover the RBAC id
# when a cluster is reset (see ``reset``).
RBAC_LABEL_KEY_NAME = "rbac-id"
# Prefix of the K8s resource names ("<prefix>-<rbac_id>") created by
# ``init_env``; ``init_env`` also uses it as the label key on those resources.
RBAC_STACK_PREFIX = "juju-credential"
38
39
def generate_rbac_id():
    """Return a random identifier made of 8 lowercase hexadecimal characters.

    The identifier is the hex encoding of 4 bytes read from ``os.urandom``.
    """
    random_bytes = os.urandom(4)
    hex_encoded = binascii.hexlify(random_bytes)
    return hex_encoded.decode()
42
43
44 class K8sJujuConnector(K8sConnector):
45 libjuju = None
46
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = "/usr/bin/kubectl",
    juju_command: str = "/usr/bin/juju",
    log: object = None,
    loop: object = None,
    on_update_db=None,
):
    """
    Build the Juju-backed K8s connector.

    :param fs: file system for kubernetes and helm configuration
    :param db: Database object
    :param kubectl_command: path to kubectl executable (not read by this
        constructor)
    :param juju_command: path to juju executable (not read by this
        constructor)
    :param log: logger
    :param loop: Asyncio loop; asyncio.get_event_loop() is used when omitted
    :param on_update_db: optional callback forwarded to the parent connector
    """

    # Parent class goes first: self.log (used right below) is not assigned
    # in this method, so presumably the parent constructor provides it.
    K8sConnector.__init__(self, db, log=log, on_update_db=on_update_db)

    self.fs = fs
    self.loop = loop if loop else asyncio.get_event_loop()
    self.log.debug("Initializing K8S Juju connector")

    # The store URI is read from the environment-backed configuration.
    environ_config = EnvironConfig(prefixes=["OSMLCM_", "OSMMON_"])
    self._store = MotorStore(environ_config.get("database_uri"))
    # Lock used by _get_libjuju while the default Libjuju object is built.
    self.loading_libjuju = asyncio.Lock(loop=self.loop)
    # One lock per kdu_instance being uninstalled (see uninstall).
    self.uninstall_locks = {}

    self.log.debug("K8S Juju connector initialized")
    # TODO: Remove these commented lines:
    # self.authenticated = False
    # self.models = {}
    # self.juju_secret = ""
88
89 """Initialization"""
90
async def init_env(
    self,
    k8s_creds: str,
    namespace: str = "kube-system",
    reuse_cluster_uuid: str = None,
    **kwargs,
) -> (str, bool):
    """
    It prepares a given K8s cluster environment to run Juju bundles.

    A cluster role, a service account and a cluster role binding (all named
    "<RBAC_STACK_PREFIX>-<rbac_id>") are created in the cluster, and the
    cluster is then registered as a k8s cloud in the Juju controller. If any
    step fails, the resources created so far are removed again.

    :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
        '.kube/config'
    :param namespace: optional namespace to be used for juju. By default,
        'kube-system' will be used (currently not read by this method)
    :param reuse_cluster_uuid: existing cluster uuid for reuse
    :param: kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: uuid of the K8s cluster and True if connector has installed some
        software in the cluster
        (on error, an exception will be raised)
    """
    libjuju = await self._get_libjuju(kwargs.get("vca_id"))

    cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4())
    kubectl = self._get_kubectl(k8s_creds)

    # CREATING RESOURCES IN K8S
    rbac_id = generate_rbac_id()
    metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
    labels = {RBAC_STACK_PREFIX: rbac_id}

    # Rollback actions for the resources created so far, used to clean up
    # if the process fails in the middle.
    cleanup_data = []
    try:
        self.log.debug("Initializing K8s cluster for juju")
        kubectl.create_cluster_role(
            name=metadata_name,
            labels=labels,
        )
        self.log.debug("Cluster role created")
        cleanup_data.append(
            {
                "delete": kubectl.delete_cluster_role,
                "args": (metadata_name,),
            }
        )

        kubectl.create_service_account(
            name=metadata_name,
            labels=labels,
        )
        self.log.debug("Service account created")
        cleanup_data.append(
            {
                "delete": kubectl.delete_service_account,
                "args": (metadata_name,),
            }
        )

        kubectl.create_cluster_role_binding(
            name=metadata_name,
            labels=labels,
        )
        self.log.debug("Role binding created")
        # The rollback for the binding must delete the binding itself, not
        # the service account a second time.
        cleanup_data.append(
            {
                "delete": kubectl.delete_cluster_role_binding,
                "args": (metadata_name,),
            }
        )
        token, client_cert_data = await kubectl.get_secret_data(
            metadata_name,
        )

        default_storage_class = kubectl.get_default_storage_class()
        self.log.debug("Default storage class: {}".format(default_storage_class))
        await libjuju.add_k8s(
            name=cluster_uuid,
            rbac_id=rbac_id,
            token=token,
            client_cert_data=client_cert_data,
            configuration=kubectl.configuration,
            storage_class=default_storage_class,
            credential_name=self._get_credential_name(cluster_uuid),
        )
        self.log.debug("K8s cluster added to juju controller")
        return cluster_uuid, True
    except Exception as e:
        self.log.error("Error initializing k8scluster: {}".format(e), exc_info=True)
        if cleanup_data:
            self.log.debug("Cleaning up created resources in k8s cluster...")
            # Undo in reverse creation order (binding, service account,
            # role), the same order reset() uses.
            for item in reversed(cleanup_data):
                try:
                    item["delete"](*item["args"])
                except Exception as cleanup_error:
                    # Best effort: a failed deletion must not hide the
                    # error that triggered the cleanup.
                    self.log.warning(
                        "Cannot remove resource in K8s {}".format(cleanup_error)
                    )
            self.log.debug("Cleanup finished")
        raise
190
191 """Repo Management"""
192
async def repo_add(
    self,
    name: str,
    url: str,
    _type: str = "charm",
    cert: str = None,
    user: str = None,
    password: str = None,
):
    """Add a repository (not supported by this connector).

    None of the arguments is used.

    :raises MethodNotImplemented: always
    """
    raise MethodNotImplemented()
203
async def repo_list(self):
    """List repositories (not supported by this connector).

    :raises MethodNotImplemented: always
    """
    raise MethodNotImplemented()
206
async def repo_remove(self, name: str):
    """Remove a repository (not supported by this connector).

    :param name: repository name (unused)

    :raises MethodNotImplemented: always
    """
    raise MethodNotImplemented()
212
async def synchronize_repos(self, cluster_uuid: str, name: str):
    """
    Do nothing and return None, as repo_add is currently not implemented.

    :param cluster_uuid: cluster UUID (unused)
    :param name: repository name (unused)

    :return: None
    """
    nothing_to_synchronize = None
    return nothing_to_synchronize
218
219 """Reset"""
220
async def reset(
    self,
    cluster_uuid: str,
    force: bool = False,
    uninstall_sw: bool = False,
    **kwargs,
) -> bool:
    """Reset a cluster

    Removes the k8s cloud registered in Juju for this cluster and then tries
    to delete the RBAC resources (cluster role binding, service account and
    cluster role) recorded in the cloud credential.

    :param cluster_uuid str: The UUID of the cluster to reset
    :param force: Force reset (not read by this method)
    :param uninstall_sw: Boolean to uninstall sw (not read by this method)
    :param: kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: Returns True if successful or raises an exception.
    """

    try:
        self.log.debug("[reset] Removing k8s cloud")
        libjuju = await self._get_libjuju(kwargs.get("vca_id"))

        credential_name = self._get_credential_name(cluster_uuid)
        cloud = Cloud(cluster_uuid, credential_name)

        # The credentials are fetched before the cloud is removed; they
        # are needed afterwards to find the RBAC id.
        cloud_creds = await libjuju.get_cloud_credentials(cloud)

        await libjuju.remove_cloud(cluster_uuid)

        kubeconfig = self.get_credentials(cluster_uuid=cluster_uuid)
        kubectl = self._get_kubectl(kubeconfig)

        delete_functions = [
            kubectl.delete_cluster_role_binding,
            kubectl.delete_service_account,
            kubectl.delete_cluster_role,
        ]

        credential_attrs = cloud_creds[0].result["attrs"]
        if RBAC_LABEL_KEY_NAME in credential_attrs:
            metadata_name = "{}-{}".format(
                RBAC_STACK_PREFIX, credential_attrs[RBAC_LABEL_KEY_NAME]
            )
            for remove_resource in delete_functions:
                # Each deletion is best effort: a failure is only logged.
                try:
                    remove_resource(metadata_name)
                except Exception as delete_error:
                    self.log.warning(
                        "Cannot remove resource in K8s {}".format(delete_error)
                    )

    except Exception as e:
        self.log.debug("Caught exception during reset: {}".format(e))
        raise e
    return True
275
276 """Deployment"""
277
async def install(
    self,
    cluster_uuid: str,
    kdu_model: str,
    kdu_instance: str,
    atomic: bool = True,
    timeout: float = 1800,
    params: dict = None,
    db_dict: dict = None,
    kdu_name: str = None,
    namespace: str = None,
    **kwargs,
) -> bool:
    """Install a bundle

    Juju bundle that models the KDU, in any of the following ways:
        - <juju-repo>/<juju-bundle>
        - <juju-bundle folder under k8s_models folder in the package>
        - <juju-bundle tgz file (w/ or w/o extension) under k8s_models folder
            in the package>
        - <URL_where_to_fetch_juju_bundle>

    :param cluster_uuid str: The UUID of the cluster to install to
    :param kdu_model str: The name or path of a bundle to install
    :param kdu_instance: Kdu instance name, used as the Juju model name
    :param atomic bool: Passed to libjuju.deploy as ``wait``
    :param timeout int: The time, in seconds, to wait for the install
        to finish
    :param params dict: Key-value pairs of instantiation parameters
        (not used yet)
    :param db_dict: Must be set; its "filter" entry is handed to the
        on_update_db callback
    :param kdu_name: Name of the KDU instance to be installed (unused)
    :param namespace: K8s namespace to use for the KDU instance (unused)
    :param kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: True once the bundle has been deployed
    :raises K8sException: if db_dict or kdu_model is missing
    """
    libjuju = await self._get_libjuju(kwargs.get("vca_id"))
    bundle = kdu_model

    if not db_dict:
        raise K8sException("db_dict must be set")
    if not bundle:
        raise K8sException("bundle must be set")

    # Remember the working directory BEFORE any chdir below, otherwise
    # "restoring" it afterwards would be a no-op.
    try:
        previous_workdir = os.getcwd()
    except FileNotFoundError:
        previous_workdir = "/app/storage"

    try:
        if bundle.startswith("cs:"):
            # For Juju Bundles provided by the Charm Store
            pass
        elif bundle.startswith("ch:"):
            # For Juju Bundles provided by the Charm Hub (this only works for juju version >= 2.9)
            pass
        elif bundle.startswith("http"):
            # Download the file
            pass
        else:
            # Local bundle: deploy from the directory that contains it.
            # os.path.dirname is used because str.strip() removes a set of
            # characters from both ends, not a trailing file name.
            # NOTE: os.chdir is process-wide, so concurrent installs of
            # local bundles can interfere with each other.
            new_workdir = os.path.dirname(kdu_model)
            if new_workdir:
                os.chdir(new_workdir)
            bundle = "local:{}".format(kdu_model)

        self.log.debug("Checking for model named {}".format(kdu_instance))

        # Create the new model
        self.log.debug("Adding model: {}".format(kdu_instance))
        cloud = Cloud(cluster_uuid, self._get_credential_name(cluster_uuid))
        await libjuju.add_model(kdu_instance, cloud)

        # TODO: Instantiation parameters

        self.log.debug("[install] deploying {}".format(bundle))
        await libjuju.deploy(
            bundle, model_name=kdu_instance, wait=atomic, timeout=timeout
        )
    finally:
        # Restore the working directory even when the deployment fails.
        os.chdir(previous_workdir)

    if self.on_update_db:
        await self.on_update_db(
            cluster_uuid,
            kdu_instance,
            filter=db_dict["filter"],
            vca_id=kwargs.get("vca_id"),
        )
    return True
366
async def scale(
    self,
    kdu_instance: str,
    scale: int,
    resource_name: str,
    total_timeout: float = 1800,
    **kwargs,
) -> bool:
    """Scale an application in a model

    :param: kdu_instance str: KDU instance name (the Juju model name)
    :param: scale int: Scale to which to set the application
    :param: resource_name str: The application name in the Juju Bundle
    :param: total_timeout float: The time, in seconds, handed to
        libjuju.scale_application as its total timeout
    :param kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: If successful, returns True
    :raises K8sException: wrapping any error raised while scaling
    """

    try:
        libjuju = await self._get_libjuju(kwargs.get("vca_id"))
        await libjuju.scale_application(
            model_name=kdu_instance,
            application_name=resource_name,
            scale=scale,
            total_timeout=total_timeout,
        )
    except Exception as e:
        error_template = "Error scaling application {} in kdu instance {}: {}"
        error_msg = error_template.format(resource_name, kdu_instance, e)
        self.log.error(error_msg)
        raise K8sException(message=error_msg)
    else:
        return True
403
async def get_scale_count(
    self,
    resource_name: str,
    kdu_instance: str,
    **kwargs,
) -> int:
    """Get an application scale count

    The count is the number of units that the model status reports for the
    application.

    :param: resource_name str: The application name in the Juju Bundle
    :param: kdu_instance str: KDU instance name (the Juju model name)
    :param kwargs: Additional parameters
        vca_id (str): VCA ID
    :return: Return application instance count
    :raises K8sException: wrapping any error raised while reading the status
    """

    try:
        libjuju = await self._get_libjuju(kwargs.get("vca_id"))
        model_status = await libjuju.get_model_status(kdu_instance)
        application_units = model_status.applications[resource_name].units
        return len(application_units)
    except Exception as e:
        error_template = (
            "Error getting scale count from application {} in kdu instance {}: {}"
        )
        error_msg = error_template.format(resource_name, kdu_instance, e)
        self.log.error(error_msg)
        raise K8sException(message=error_msg)
429
async def instances_list(self, cluster_uuid: str) -> list:
    """
    Return the list of deployed releases in a cluster.

    The current implementation always returns an empty list.

    :param cluster_uuid: the cluster (unused)
    :return: an empty list
    """
    deployed_releases = []
    return deployed_releases
438
async def upgrade(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    kdu_model: str = None,
    params: dict = None,
) -> str:
    """Upgrade a model (not implemented yet)

    :param cluster_uuid str: The UUID of the cluster to upgrade
    :param kdu_instance str: The unique name of the KDU instance
    :param kdu_model str: The name or path of the bundle to upgrade to
    :param params dict: Key-value pairs of instantiation parameters

    :return: If successful, reference to the new revision number of the
        KDU instance.
    :raises MethodNotImplemented: always, for now
    """

    # TODO: Loop through the bundle and upgrade each charm individually
    #
    # The API doesn't have a concept of bundle upgrades, because there are
    # many possible changes: charm revision, disk, number of units, etc.
    #
    # As such, we are only supporting a limited subset of upgrades. We'll
    # upgrade the charm revision but leave storage and scale untouched.
    #
    # Scale changes should happen through OSM constructs, and changes to
    # storage would require a redeployment of the service, at least in this
    # initial release.
    raise MethodNotImplemented()
471
472 """Rollback"""
473
async def rollback(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    revision: int = 0,
) -> str:
    """Rollback a model (not implemented yet)

    :param cluster_uuid str: The UUID of the cluster to rollback
    :param kdu_instance str: The unique name of the KDU instance
    :param revision int: The revision to revert to. If omitted, rolls back
        the previous upgrade.

    :return: If successful, returns the revision of active KDU instance,
        or raises an exception
    :raises MethodNotImplemented: always, for now
    """
    # None of the arguments is used until rollback is supported.
    raise MethodNotImplemented()
491
492 """Deletion"""
493
async def uninstall(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    **kwargs,
) -> bool:
    """Uninstall a KDU instance

    Destroys the Juju model named after the KDU instance. Concurrent calls
    for the same instance are serialised with a per-instance lock: a call
    that finds the lock held waits for it to be released and then returns
    without destroying anything itself.

    :param cluster_uuid str: The UUID of the cluster (unused)
    :param kdu_instance str: The unique name of the KDU instance
    :param kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: Returns True if successful, or raises an exception
    """

    self.log.debug("[uninstall] Destroying model")

    # Fetch the lock for this instance, creating it on first use.
    delete_lock = self.uninstall_locks.get(kdu_instance)
    if delete_lock is None:
        delete_lock = asyncio.Lock(loop=self.loop)
        self.uninstall_locks[kdu_instance] = delete_lock

    # Poll until whoever holds the lock is done.
    waited_for_other_worker = False
    while delete_lock.locked():
        waited_for_other_worker = True
        await asyncio.sleep(0.1)

    if waited_for_other_worker:
        self.log.info("Model {} deleted by another worker.".format(kdu_instance))
        return True

    try:
        async with delete_lock:
            libjuju = await self._get_libjuju(kwargs.get("vca_id"))

            await libjuju.destroy_model(kdu_instance, total_timeout=3600)
    finally:
        # The lock entry is dropped whether or not the destroy succeeded.
        del self.uninstall_locks[kdu_instance]

    self.log.debug("[uninstall] Model {} destroyed".format(kdu_instance))
    return True
535
async def exec_primitive(
    self,
    cluster_uuid: str = None,
    kdu_instance: str = None,
    primitive_name: str = None,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    **kwargs,
) -> str:
    """Exec primitive (Juju action)

    :param cluster_uuid str: The UUID of the cluster
    :param kdu_instance str: The unique name of the KDU instance
    :param primitive_name: Name of action that will be executed
    :param timeout: Timeout for action execution (currently not forwarded
        to libjuju)
    :param params: Dictionary of all the parameters needed for the action;
        it must contain "application-name"
    :param db_dict: Dictionary for any additional data; its "filter" entry
        is handed to the on_update_db callback
    :param kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: Returns the output of the action
    :raises K8sException: if "application-name" is missing, the primitive
        does not exist, the action does not complete, or any other error
        happens while executing it
    """
    libjuju = await self._get_libjuju(kwargs.get("vca_id"))

    if not params or "application-name" not in params:
        # Adjacent literals instead of a backslash continuation, which
        # embedded the source indentation in the message.
        raise K8sException(
            "Missing application-name argument, "
            "argument needed for K8s actions"
        )
    try:
        self.log.debug(
            "[exec_primitive] Getting model "
            "kdu_instance: {}".format(kdu_instance)
        )
        application_name = params["application-name"]
        actions = await libjuju.get_actions(application_name, kdu_instance)
        if primitive_name not in actions:
            raise K8sException("Primitive {} not found".format(primitive_name))
        output, status = await libjuju.execute_action(
            application_name, kdu_instance, primitive_name, **params
        )

        if status != "completed":
            raise K8sException(
                "status is not completed: {} output: {}".format(status, output)
            )
        # db_dict is optional; without it there is no filter to hand to
        # the callback, so the update is skipped instead of turning a
        # completed action into an error.
        if self.on_update_db and db_dict:
            await self.on_update_db(
                cluster_uuid,
                kdu_instance,
                filter=db_dict["filter"],
                # Same callback arguments as install().
                vca_id=kwargs.get("vca_id"),
            )

        return output

    except Exception as e:
        error_msg = "Error executing primitive {}: {}".format(primitive_name, e)
        self.log.error(error_msg)
        raise K8sException(message=error_msg) from e
594
595 """Introspection"""
596
async def inspect_kdu(
    self,
    kdu_model: str,
) -> dict:
    """Inspect a KDU

    Loads the bundle YAML file and returns its "applications" section.

    :param kdu_model str: The path of the bundle file to inspect.

    :return: If successful, returns the "applications" mapping of the
        bundle, e.g. for a bundle such as

            description: Test bundle
            bundle: kubernetes
            applications:
              mariadb-k8s:
                charm: cs:~charmed-osm/mariadb-k8s-20
                scale: 1
                options:
                  password: manopw
                  root_password: osm4u
                  user: mano
                series: kubernetes

        the value stored under "applications".
    :raises K8sException: if the file does not exist
    """

    bundle_exists = os.path.exists(kdu_model)
    if not bundle_exists:
        raise K8sException("file {} not found".format(kdu_model))

    with open(kdu_model, "r") as bundle_file:
        raw_bundle = bundle_file.read()
    bundle = yaml.safe_load(raw_bundle)

    # TODO: This should be returned in an agreed-upon format
    return bundle["applications"]
641
async def help_kdu(
    self,
    kdu_model: str,
) -> str:
    """View the README

    If available, returns the README of the bundle. The README is looked up
    in the directory that contains ``kdu_model``, trying "README",
    "README.txt" and "README.md" in that order.

    :param kdu_model str: The name or path of a bundle

    :return: If found, returns the contents of the README, otherwise None.
    """
    # A bare file name has an empty dirname; fall back to the current
    # directory instead of failing on an empty path.
    bundle_dir = os.path.dirname(kdu_model) or "."

    for candidate in ("README", "README.txt", "README.md"):
        # The README must be opened inside the bundle directory, not
        # relative to the process working directory.
        readme_path = os.path.join(bundle_dir, candidate)
        if os.path.isfile(readme_path):
            with open(readme_path, "r") as f:
                return f.read()

    return None
665
async def status_kdu(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    complete_status: bool = False,
    yaml_format: bool = False,
    **kwargs,
) -> Union[str, dict]:
    """Get the status of the KDU

    Get the current status of the KDU instance.

    :param cluster_uuid str: The UUID of the cluster (unused)
    :param kdu_instance str: The unique id of the KDU instance (the Juju
        model name)
    :param complete_status: To get the complete_status of the KDU
    :param yaml_format: To get the status in proper format for NSR record;
        only considered when complete_status is True
    :param: kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: When complete_status is False, a dictionary mapping every
        application name to {"status": <application status>}. When
        complete_status is True, the whole model status converted with
        obj_to_yaml (yaml_format True) or obj_to_dict (yaml_format False).
    """
    libjuju = await self._get_libjuju(kwargs.get("vca_id"))
    model_status = await libjuju.get_model_status(kdu_instance)

    if complete_status:
        serialize = obj_to_yaml if yaml_format else obj_to_dict
        return serialize(model_status)

    # Summary: only the status string of each application.
    return {
        name: {"status": model_status.applications[name]["status"]["status"]}
        for name in model_status.applications
    }
704
async def add_relation(
    self,
    provider: RelationEndpoint,
    requirer: RelationEndpoint,
):
    """
    Add relation between two charmed endpoints

    When both endpoints live in the same model of the same VCA, a standard
    relation is added. Otherwise the provider endpoint is offered, the
    offer is consumed in the requirer model, and the relation is added
    there against the consumed SAAS.

    :param: provider: Provider relation endpoint
    :param: requirer: Requirer relation endpoint

    :raises K8sException: if the relation cannot be added
    """
    self.log.debug(f"adding new relation between {provider} and {requirer}")
    # Cross-model when the endpoints differ in model OR in VCA. The VCA
    # check has to compare provider against requirer.
    cross_model_relation = (
        provider.model_name != requirer.model_name
        or provider.vca_id != requirer.vca_id
    )
    try:
        if cross_model_relation:
            # Cross-model relation
            provider_libjuju = await self._get_libjuju(provider.vca_id)
            requirer_libjuju = await self._get_libjuju(requirer.vca_id)
            offer = await provider_libjuju.offer(provider)
            # NOTE(review): a falsy offer is silently ignored - confirm
            # that this is intended.
            if offer:
                saas_name = await requirer_libjuju.consume(
                    requirer.model_name, offer, provider_libjuju
                )
                await requirer_libjuju.add_relation(
                    requirer.model_name,
                    requirer.endpoint,
                    saas_name,
                )
        else:
            # Standard relation
            vca_id = provider.vca_id
            model = provider.model_name
            libjuju = await self._get_libjuju(vca_id)
            # add juju relations between two applications
            await libjuju.add_relation(
                model_name=model,
                endpoint_1=provider.endpoint,
                endpoint_2=requirer.endpoint,
            )
    except Exception as e:
        message = f"Error adding relation between {provider} and {requirer}: {e}"
        self.log.error(message)
        # Exception() accepts no keyword arguments; use the connector's
        # own exception type, as the other methods do.
        raise K8sException(message=message) from e
751
async def update_vca_status(self, vcastatus: dict, kdu_instance: str, **kwargs):
    """
    Add all configs, actions, executed actions of all applications in a model to vcastatus dict

    The dictionary is updated in place. Errors raised while collecting the
    data are only logged at debug level.

    :param vcastatus dict: dict containing vcastatus
    :param kdu_instance str: The unique id of the KDU instance
    :param: kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: None
    """
    libjuju = await self._get_libjuju(kwargs.get("vca_id"))
    try:
        for model_name in vcastatus:
            # Adding executed actions
            executed_actions = await libjuju.get_executed_actions(kdu_instance)
            vcastatus[model_name]["executedActions"] = executed_actions

            applications = vcastatus[model_name]["applications"]
            for application in applications:
                # Adding application actions
                app_actions = await libjuju.get_actions(application, kdu_instance)
                applications[application]["actions"] = app_actions
                # Adding application configs
                app_configs = await libjuju.get_application_configs(
                    kdu_instance, application
                )
                applications[application]["configs"] = app_configs

    except Exception as e:
        self.log.debug("Error in updating vca status: {}".format(str(e)))
783
async def get_services(
    self, cluster_uuid: str, kdu_instance: str, namespace: str
) -> list:
    """Return a list of services of a kdu_instance

    The services are selected by namespace, using the kdu_instance name as
    the namespace; the ``namespace`` argument itself is not read.
    """

    kubeconfig = self.get_credentials(cluster_uuid=cluster_uuid)
    kubectl = self._get_kubectl(kubeconfig)
    namespace_selector = "metadata.namespace={}".format(kdu_instance)
    return kubectl.get_services(field_selector=namespace_selector)
794
async def get_service(
    self, cluster_uuid: str, service_name: str, namespace: str
) -> object:
    """Return data for a specific service inside a namespace

    The first service matching both the name and the namespace is returned.
    """

    kubeconfig = self.get_credentials(cluster_uuid=cluster_uuid)
    kubectl = self._get_kubectl(kubeconfig)
    service_selector = "metadata.name={},metadata.namespace={}".format(
        service_name, namespace
    )
    matching_services = kubectl.get_services(field_selector=service_selector)
    return matching_services[0]
807
def get_credentials(self, cluster_uuid: str) -> str:
    """
    Get Cluster Kubeconfig

    Reads the cluster record from the "k8sclusters" collection, decrypts
    the "password" and "secret" fields of its credentials and returns the
    credentials dumped as YAML.

    :param cluster_uuid: the "_id" of the cluster record

    :return: the cluster credentials as a YAML string
    :raises K8sException: if no record exists for cluster_uuid
    """
    k8scluster = self.db.get_one(
        "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
    )
    # fail_on_empty=False is passed above, so a missing record has to be
    # handled here before the result is dereferenced.
    if not k8scluster:
        raise K8sException(
            "k8s cluster {} not found in database".format(cluster_uuid)
        )

    credentials = k8scluster.get("credentials")
    self.db.encrypt_decrypt_fields(
        credentials,
        "decrypt",
        ["password", "secret"],
        schema_version=k8scluster["schema_version"],
        salt=k8scluster["_id"],
    )

    return yaml.safe_dump(credentials)
825
def _get_credential_name(self, cluster_uuid: str) -> str:
    """
    Build the credential name of a k8s cloud.

    The cluster_uuid cannot be used directly as the credential name,
    because the name must start with a letter and cannot start with a
    number. The credential name is therefore "cred-" followed by the
    cluster uuid.

    :param: cluster_uuid: Cluster UUID of the kubernetes cloud (=cloud_name)

    :return: Name to use for the credential name.
    """
    return f"cred-{cluster_uuid}"
840
def get_namespace(
    self,
    cluster_uuid: str,
) -> str:
    """Get the namespace UUID
    Gets the namespace's unique name

    The current implementation is a placeholder that returns None.

    :param cluster_uuid str: The UUID of the cluster (unused)
    :returns: None
    """
    return None
852
@staticmethod
def generate_kdu_instance_name(**kwargs):
    """Build the KDU instance name from the keyword arguments.

    :param kwargs:
        db_dict (dict): must contain db_dict["filter"]["_id"]
        kdu_name (str): optional KDU name

    :return: "<kdu_name>-<_id>" when kdu_name is given and not empty,
        otherwise the "_id" value itself
    """
    record_id = kwargs.get("db_dict")["filter"]["_id"]
    kdu_name = kwargs.get("kdu_name")
    return "{}-{}".format(kdu_name, record_id) if kdu_name else record_id
862
async def _get_libjuju(self, vca_id: str = None) -> Libjuju:
    """
    Get libjuju object

    :param: vca_id: VCA ID
        If None, get a libjuju object with a Connection to the default VCA
        (built on first use and then kept in self.libjuju).
        Else, get a new libjuju object with a Connection to the specified
        VCA.
    """
    if vca_id:
        # Specific VCA: a fresh object is built on every call.
        vca_connection = await get_connection(self._store, vca_id)
        return Libjuju(
            vca_connection,
            loop=self.loop,
            log=self.log,
            n2vc=self,
        )

    # Default VCA: wait while another caller is building the shared
    # object, then build it only if it is still missing.
    while self.loading_libjuju.locked():
        await asyncio.sleep(0.1)
    if not self.libjuju:
        async with self.loading_libjuju:
            default_connection = await get_connection(self._store)
            self.libjuju = Libjuju(
                default_connection, loop=self.loop, log=self.log
            )
    return self.libjuju
887
def _get_kubectl(self, credentials: str) -> Kubectl:
    """
    Get Kubectl object

    The credentials are written to a temporary kubeconfig file that only
    exists while the Kubectl object is being constructed.

    :param: credentials: Kubeconfig credentials (file contents)

    :return: Kubectl object built from those credentials
    """
    # The context manager removes the temporary file deterministically
    # instead of relying on garbage collection of the file object, and the
    # file's own handle is used for writing instead of a second one.
    # NOTE(review): assumes Kubectl reads config_file inside its
    # constructor - confirm against n2vc.kubectl.
    with tempfile.NamedTemporaryFile(mode="w") as kubecfg:
        kubecfg.write(credentials)
        # Make sure the content is on disk before Kubectl opens the path.
        kubecfg.flush()
        return Kubectl(config_file=kubecfg.name)