6744015b5dfe80d5f7363459e36112b1eeff733c
[osm/N2VC.git] / n2vc / k8s_juju_conn.py
1 # Copyright 2019 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 from typing import Union
17 import os
18 import uuid
19 import yaml
20 import tempfile
21 import binascii
22
23 from n2vc.config import EnvironConfig
24 from n2vc.definitions import RelationEndpoint
25 from n2vc.exceptions import K8sException
26 from n2vc.k8s_conn import K8sConnector
27 from n2vc.kubectl import Kubectl
28 from .exceptions import MethodNotImplemented
29 from n2vc.libjuju import Libjuju
30 from n2vc.utils import obj_to_dict, obj_to_yaml
31 from n2vc.store import MotorStore
32 from n2vc.vca.cloud import Cloud
33 from n2vc.vca.connection import get_connection
34
35
36 RBAC_LABEL_KEY_NAME = "rbac-id"
37 RBAC_STACK_PREFIX = "juju-credential"
38
39
def generate_rbac_id():
    """Return a random identifier made of 8 lowercase hexadecimal characters.

    The identifier is the hex encoding of 4 bytes read from ``os.urandom``.
    """
    random_bytes = os.urandom(4)
    return random_bytes.hex()
42
43
44 class K8sJujuConnector(K8sConnector):
45 libjuju = None
46
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = "/usr/bin/kubectl",
    juju_command: str = "/usr/bin/juju",
    log: object = None,
    loop: object = None,
    on_update_db=None,
):
    """
    Build the connector and the state shared by its coroutines.

    :param fs: file system for kubernetes and helm configuration
    :param db: Database object
    :param kubectl_command: path to kubectl executable (not used by this
        constructor)
    :param juju_command: path to juju executable (not used by this
        constructor)
    :param log: logger
    :param loop: Asyncio loop; when omitted, asyncio.get_event_loop() is used
    :param on_update_db: optional callback forwarded to the parent connector
    """

    # parent class
    K8sConnector.__init__(
        self,
        db,
        log=log,
        on_update_db=on_update_db,
    )

    self.fs = fs
    self.loop = loop or asyncio.get_event_loop()
    self.log.debug("Initializing K8S Juju connector")

    db_uri = EnvironConfig(prefixes=["OSMLCM_", "OSMMON_"]).get("database_uri")
    self._store = MotorStore(db_uri)

    # Guards the lazy creation of the default Libjuju object.
    # asyncio.Lock lost its "loop" argument in Python 3.10 (passing it raises
    # TypeError); keep the explicit loop where it is still accepted.
    try:
        self.loading_libjuju = asyncio.Lock(loop=self.loop)
    except TypeError:
        self.loading_libjuju = asyncio.Lock()

    # One lock per kdu_instance currently being uninstalled.
    self.uninstall_locks = {}

    self.log.debug("K8S Juju connector initialized")
88
89 """Initialization"""
90
async def init_env(
    self,
    k8s_creds: str,
    namespace: str = "kube-system",
    reuse_cluster_uuid: str = None,
    **kwargs,
) -> (str, bool):
    """
    It prepares a given K8s cluster environment to run Juju bundles.

    A cluster role, a service account and a cluster role binding are created
    in the cluster, then the cluster is registered in the Juju controller as
    a k8s cloud. If any step fails, the resources created so far are removed
    again and the original error is re-raised.

    :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
        '.kube/config'
    :param namespace: optional namespace to be used for juju. By default,
        'kube-system' will be used (the value is not read by this method)
    :param reuse_cluster_uuid: existing cluster uuid for reuse
    :param: kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: uuid of the K8s cluster and True if connector has installed some
        software in the cluster
        (on error, an exception will be raised)
    """
    libjuju = await self._get_libjuju(kwargs.get("vca_id"))

    cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4())
    kubectl = self._get_kubectl(k8s_creds)

    # CREATING RESOURCES IN K8S
    rbac_id = generate_rbac_id()
    metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
    labels = {RBAC_STACK_PREFIX: rbac_id}

    # Undo actions for the resources created so far, in creation order.
    # They are executed in reverse order if the process fails midway.
    cleanup_data = []
    try:
        self.log.debug("Initializing K8s cluster for juju")
        kubectl.create_cluster_role(
            name=metadata_name,
            labels=labels,
        )
        self.log.debug("Cluster role created")
        cleanup_data.append(
            {
                "delete": kubectl.delete_cluster_role,
                "args": (metadata_name,),
            }
        )

        kubectl.create_service_account(
            name=metadata_name,
            labels=labels,
        )
        self.log.debug("Service account created")
        cleanup_data.append(
            {
                "delete": kubectl.delete_service_account,
                "args": (metadata_name,),
            }
        )

        kubectl.create_cluster_role_binding(
            name=metadata_name,
            labels=labels,
        )
        self.log.debug("Role binding created")
        # The undo action for the binding is the binding deletion, not a
        # second service account deletion.
        cleanup_data.append(
            {
                "delete": kubectl.delete_cluster_role_binding,
                "args": (metadata_name,),
            }
        )
        token, client_cert_data = await kubectl.get_secret_data(
            metadata_name,
        )

        default_storage_class = kubectl.get_default_storage_class()
        self.log.debug("Default storage class: {}".format(default_storage_class))
        await libjuju.add_k8s(
            name=cluster_uuid,
            rbac_id=rbac_id,
            token=token,
            client_cert_data=client_cert_data,
            configuration=kubectl.configuration,
            storage_class=default_storage_class,
            credential_name=self._get_credential_name(cluster_uuid),
        )
        self.log.debug("K8s cluster added to juju controller")
        return cluster_uuid, True
    except Exception as e:
        self.log.error("Error initializing k8scluster: {}".format(e), exc_info=True)
        if cleanup_data:
            self.log.debug("Cleaning up created resources in k8s cluster...")
            # Newest resource first: binding, then service account, then role.
            for item in reversed(cleanup_data):
                try:
                    item["delete"](*item["args"])
                except Exception as cleanup_error:
                    # Best effort: a failed cleanup must not hide the error
                    # that triggered it, nor skip the remaining deletions.
                    self.log.warning(
                        "Cannot remove resource in K8s {}".format(cleanup_error)
                    )
            self.log.debug("Cleanup finished")
        raise
190
191 """Repo Management"""
192
async def repo_add(
    self,
    name: str,
    url: str,
    _type: str = "charm",
    cert: str = None,
    user: str = None,
    password: str = None,
):
    """Repository registration is not available in this connector.

    None of the arguments is read.

    :raises MethodNotImplemented: always
    """
    raise MethodNotImplemented()
203
async def repo_list(self):
    """Repository listing is not available in this connector.

    :raises MethodNotImplemented: always
    """
    raise MethodNotImplemented()
206
async def repo_remove(
    self,
    name: str,
):
    """Repository removal is not available in this connector.

    :param name: repository name (not read)

    :raises MethodNotImplemented: always
    """
    raise MethodNotImplemented()
212
async def synchronize_repos(self, cluster_uuid: str, name: str):
    """
    No-op: repositories cannot be added in this connector (repo_add raises
    MethodNotImplemented), so there is nothing to synchronize.

    :param cluster_uuid: cluster UUID (not read)
    :param name: repository name (not read)

    :return: always None
    """
    result = None
    return result
218
219 """Reset"""
220
async def reset(
    self,
    cluster_uuid: str,
    force: bool = False,
    uninstall_sw: bool = False,
    **kwargs,
) -> bool:
    """Reset a cluster

    Removes the k8s cloud registered for the cluster from the Juju
    controller and then deletes, on a best-effort basis, the cluster role
    binding, service account and cluster role whose name is derived from the
    rbac id stored in the cloud credential.

    :param cluster_uuid str: The UUID of the cluster to reset
    :param force: Force reset (not read by this method)
    :param uninstall_sw: Boolean to uninstall sw (not read by this method)
    :param: kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: Returns True if successful or raises an exception.
    """

    try:
        self.log.debug("[reset] Removing k8s cloud")
        juju = await self._get_libjuju(kwargs.get("vca_id"))

        k8s_cloud = Cloud(cluster_uuid, self._get_credential_name(cluster_uuid))

        # The credentials must be read before the cloud disappears.
        cloud_creds = await juju.get_cloud_credentials(k8s_cloud)

        await juju.remove_cloud(cluster_uuid)

        kubeconfig = self.get_credentials(cluster_uuid=cluster_uuid)
        kubectl = self._get_kubectl(kubeconfig)

        # Deletion order: binding, then service account, then role.
        removers = (
            kubectl.delete_cluster_role_binding,
            kubectl.delete_service_account,
            kubectl.delete_cluster_role,
        )

        credential_attrs = cloud_creds[0].result["attrs"]
        if RBAC_LABEL_KEY_NAME in credential_attrs:
            rbac_id = credential_attrs[RBAC_LABEL_KEY_NAME]
            metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
            for remove_resource in removers:
                try:
                    remove_resource(metadata_name)
                except Exception as e:
                    # A resource that cannot be removed is only reported.
                    self.log.warning("Cannot remove resource in K8s {}".format(e))

    except Exception as e:
        self.log.debug("Caught exception during reset: {}".format(e))
        raise e
    return True
275
276 """Deployment"""
277
async def install(
    self,
    cluster_uuid: str,
    kdu_model: str,
    kdu_instance: str,
    atomic: bool = True,
    timeout: float = 1800,
    params: dict = None,
    db_dict: dict = None,
    kdu_name: str = None,
    namespace: str = None,
    **kwargs,
) -> bool:
    """Install a bundle

    A new Juju model named after kdu_instance is added to the k8s cloud of
    the cluster and the bundle is deployed into it.

    The bundle can be given as a Charm Store reference ("cs:..."), a Charm
    Hub reference ("ch:...", only for juju version >= 2.9), a URL
    ("http...") or a local path. For a local path the process working
    directory is switched to the directory that contains the bundle while it
    is deployed, and restored afterwards.

    :param cluster_uuid str: The UUID of the cluster to install to
    :param kdu_model str: The name or path of a bundle to install
    :param kdu_instance: Kdu instance name
    :param atomic bool: Passed to the deployment as its "wait" flag
    :param timeout int: The time, in seconds, to wait for the install
        to finish
    :param params dict: Key-value pairs of instantiation parameters
        (not used yet)
    :param kdu_name: Name of the KDU instance to be installed (not read)
    :param namespace: K8s namespace to use for the KDU instance (not read)
    :param kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: True if successful
    :raises K8sException: if db_dict or kdu_model is empty
    """
    libjuju = await self._get_libjuju(kwargs.get("vca_id"))
    bundle = kdu_model

    if not db_dict:
        raise K8sException("db_dict must be set")
    if not bundle:
        raise K8sException("bundle must be set")

    # Remember where we are BEFORE changing directory, so that the restore
    # below really goes back to the caller's working directory.
    try:
        previous_workdir = os.getcwd()
    except FileNotFoundError:
        previous_workdir = "/app/storage"

    try:
        if not bundle.startswith(("cs:", "ch:", "http")):
            # Local bundle: work from the directory that contains it.
            # os.path.dirname is used because str.strip() removes a set of
            # characters, not a suffix, and damages relative paths.
            new_workdir = os.path.dirname(kdu_model)
            if new_workdir:
                os.chdir(new_workdir)
            bundle = "local:{}".format(kdu_model)

        self.log.debug("Checking for model named {}".format(kdu_instance))

        # Create the new model
        self.log.debug("Adding model: {}".format(kdu_instance))
        cloud = Cloud(cluster_uuid, self._get_credential_name(cluster_uuid))
        await libjuju.add_model(kdu_instance, cloud)

        # TODO: Instantiation parameters

        self.log.debug("[install] deploying {}".format(bundle))
        await libjuju.deploy(
            bundle, model_name=kdu_instance, wait=atomic, timeout=timeout
        )
    finally:
        # Restore the working directory even when the deployment fails.
        os.chdir(previous_workdir)

    if self.on_update_db:
        await self.on_update_db(
            cluster_uuid,
            kdu_instance,
            filter=db_dict["filter"],
            vca_id=kwargs.get("vca_id"),
        )
    return True
366
async def scale(
    self,
    kdu_instance: str,
    scale: int,
    resource_name: str,
    total_timeout: float = 1800,
    **kwargs,
) -> bool:
    """Scale an application in a model

    :param: kdu_instance str: KDU instance name (used as the model name)
    :param: scale int: Scale to which to set the application
    :param: resource_name str: The application name in the Juju Bundle
    :param: total_timeout float: The time, in seconds, handed to the scale
        operation
    :param kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: If successful, returns True
    :raises K8sException: wrapping any error raised while scaling
    """
    vca_id = kwargs.get("vca_id")
    try:
        juju = await self._get_libjuju(vca_id)
        await juju.scale_application(
            model_name=kdu_instance,
            application_name=resource_name,
            scale=scale,
            total_timeout=total_timeout,
        )
        return True
    except Exception as e:
        error_msg = "Error scaling application {} in kdu instance {}: {}".format(
            resource_name, kdu_instance, e
        )
        self.log.error(error_msg)
        raise K8sException(message=error_msg)
403
async def get_scale_count(
    self,
    resource_name: str,
    kdu_instance: str,
    **kwargs,
) -> int:
    """Get an application scale count

    The count is the number of units listed for the application in the
    status of the model named after the KDU instance.

    :param: resource_name str: The application name in the Juju Bundle
    :param: kdu_instance str: KDU instance name
    :param kwargs: Additional parameters
        vca_id (str): VCA ID
    :return: Return application instance count
    :raises K8sException: wrapping any error raised while reading the status
    """
    try:
        juju = await self._get_libjuju(kwargs.get("vca_id"))
        model_status = await juju.get_model_status(kdu_instance)
        application = model_status.applications[resource_name]
        return len(application.units)
    except Exception as e:
        error_msg = "Error getting scale count from application {} in kdu instance {}: {}".format(
            resource_name, kdu_instance, e
        )
        self.log.error(error_msg)
        raise K8sException(message=error_msg)
429
async def instances_list(self, cluster_uuid: str) -> list:
    """
    returns a list of deployed releases in a cluster

    The current implementation does not query anything and always answers
    with a new empty list.

    :param cluster_uuid: the cluster (not read)
    :return: empty list
    """
    releases = list()
    return releases
438
async def upgrade(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    kdu_model: str = None,
    params: dict = None,
) -> str:
    """Upgrade a model (not implemented)

    :param cluster_uuid str: The UUID of the cluster to upgrade
    :param kdu_instance str: The unique name of the KDU instance
    :param kdu_model str: The name or path of the bundle to upgrade to
    :param params dict: Key-value pairs of instantiation parameters

    :return: Intended to be a reference to the new revision number of the
        KDU instance; nothing is returned at the moment.
    :raises MethodNotImplemented: always
    """

    # TODO: Loop through the bundle and upgrade each charm individually

    # The API doesn't have a concept of bundle upgrades, because there are
    # many possible changes: charm revision, disk, number of units, etc.
    #
    # As such, we are only supporting a limited subset of upgrades. We'll
    # upgrade the charm revision but leave storage and scale untouched.
    #
    # Scale changes should happen through OSM constructs, and changes to
    # storage would require a redeployment of the service, at least in this
    # initial release.
    raise MethodNotImplemented()
471
472 """Rollback"""
473
async def rollback(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    revision: int = 0,
) -> str:
    """Rollback a model (not implemented)

    :param cluster_uuid str: The UUID of the cluster to rollback
    :param kdu_instance str: The unique name of the KDU instance
    :param revision int: The revision to revert to. If omitted, rolls back
        the previous upgrade.

    :return: Intended to be the revision of the active KDU instance; nothing
        is returned at the moment.
    :raises MethodNotImplemented: always
    """
    raise MethodNotImplemented()
491
492 """Deletion"""
493
async def uninstall(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    **kwargs,
) -> bool:
    """Uninstall a KDU instance

    Destroys the Juju model named after the KDU instance. A per-instance
    lock makes concurrent calls for the same instance collapse into a single
    destroy operation: a call that finds the lock taken waits for it to be
    released and then returns without destroying anything itself.

    :param cluster_uuid str: The UUID of the cluster (not read)
    :param kdu_instance str: The unique name of the KDU instance
    :param kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: Returns True if successful, or raises an exception
    """

    self.log.debug("[uninstall] Destroying model")

    delete_lock = self.uninstall_locks.get(kdu_instance)
    if delete_lock is None:
        # asyncio.Lock lost its "loop" argument in Python 3.10 (passing it
        # raises TypeError); keep the explicit loop where still accepted.
        try:
            delete_lock = asyncio.Lock(loop=self.loop)
        except TypeError:
            delete_lock = asyncio.Lock()
        self.uninstall_locks[kdu_instance] = delete_lock

    # If somebody else holds the lock, wait until it finishes and remember
    # that the work was done by that other call.
    will_not_delete = False
    while delete_lock.locked():
        will_not_delete = True
        await asyncio.sleep(0.1)

    if will_not_delete:
        self.log.info("Model {} deleted by another worker.".format(kdu_instance))
        return True

    try:
        async with delete_lock:
            libjuju = await self._get_libjuju(kwargs.get("vca_id"))

            await libjuju.destroy_model(kdu_instance, total_timeout=3600)
    finally:
        # Tolerate an entry that is already gone instead of raising KeyError
        # on top of (or instead of) the real outcome.
        self.uninstall_locks.pop(kdu_instance, None)

    self.log.debug(f"[uninstall] Model {kdu_instance} destroyed")
    return True
535
async def upgrade_charm(
    self,
    ee_id: str = None,
    path: str = None,
    charm_id: str = None,
    charm_type: str = None,
    timeout: float = None,
) -> str:
    """Charm upgrade entry point; unsupported for Juju-bundle KDUs.

    Args:
        ee_id: Execution environment id
        path: Local path to the charm
        charm_id: charm-id
        charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
        timeout: (Float) Timeout for the ns update operation

    Raises:
        K8sException: always; none of the arguments is read.
    """
    reason = "KDUs deployed with Juju Bundle do not support charm upgrade"
    raise K8sException(reason)
559
async def exec_primitive(
    self,
    cluster_uuid: str = None,
    kdu_instance: str = None,
    primitive_name: str = None,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    **kwargs,
) -> str:
    """Exec primitive (Juju action)

    :param cluster_uuid str: The UUID of the cluster
    :param kdu_instance str: The unique name of the KDU instance
    :param primitive_name: Name of action that will be executed
    :param timeout: Timeout for action execution (not read by this method)
    :param params: Dictionary of all the parameters needed for the action.
        It must contain "application-name"; the whole dictionary is passed
        to the action as keyword arguments.
    :param db_dict: Dictionary for any additional data. When given, its
        "filter" entry is forwarded to the on_update_db callback.
    :param kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: Returns the output of the action
    :raises K8sException: if "application-name" is missing, the primitive
        does not exist, the action does not end as "completed", or any other
        error happens while executing it
    """
    libjuju = await self._get_libjuju(kwargs.get("vca_id"))

    if not params or "application-name" not in params:
        # Adjacent literals instead of a backslash continuation, which drags
        # the next line's leading whitespace into the message.
        raise K8sException(
            "Missing application-name argument, "
            "argument needed for K8s actions"
        )
    try:
        self.log.debug(
            "[exec_primitive] Getting model "
            "kdu_instance: {}".format(kdu_instance)
        )
        application_name = params["application-name"]
        actions = await libjuju.get_actions(application_name, kdu_instance)
        if primitive_name not in actions:
            raise K8sException("Primitive {} not found".format(primitive_name))
        output, status = await libjuju.execute_action(
            application_name, kdu_instance, primitive_name, **params
        )

        if status != "completed":
            raise K8sException(
                "status is not completed: {} output: {}".format(status, output)
            )
        # db_dict is optional: without it there is no filter to hand to the
        # callback, and a finished action must not be reported as failed.
        if self.on_update_db and db_dict:
            await self.on_update_db(
                cluster_uuid, kdu_instance, filter=db_dict["filter"]
            )

        return output

    except Exception as e:
        error_msg = "Error executing primitive {}: {}".format(primitive_name, e)
        self.log.error(error_msg)
        raise K8sException(message=error_msg) from e
618
619 """Introspection"""
620
async def inspect_kdu(
    self,
    kdu_model: str,
) -> dict:
    """Inspect a KDU

    Loads the bundle file as YAML and returns its "applications" section.

    :param kdu_model str: The path of the bundle file to inspect.

    :return: The content of the "applications" key of the bundle.
    :raises K8sException: if the file does not exist
    """
    if not os.path.exists(kdu_model):
        raise K8sException("file {} not found".format(kdu_model))

    with open(kdu_model, "r") as bundle_file:
        raw_bundle = bundle_file.read()
    bundle = yaml.safe_load(raw_bundle)

    # Example of a loaded bundle:
    #
    # {
    #     'description': 'Test bundle',
    #     'bundle': 'kubernetes',
    #     'applications': {
    #         'mariadb-k8s': {
    #             'charm': 'cs:~charmed-osm/mariadb-k8s-20',
    #             'scale': 1,
    #             'options': {
    #                 'password': 'manopw',
    #                 'root_password': 'osm4u',
    #                 'user': 'mano'
    #             },
    #             'series': 'kubernetes'
    #         }
    #     }
    # }
    #
    # TODO: This should be returned in an agreed-upon format
    return bundle["applications"]
665
async def help_kdu(
    self,
    kdu_model: str,
) -> str:
    """View the README

    If available, returns the README of the bundle. The README is searched
    in the directory that contains kdu_model, trying the names "README",
    "README.txt" and "README.md" in that order.

    :param kdu_model str: The name or path of a bundle

    :return: If found, returns the contents of the README; otherwise None.
    """
    # A bare file name has an empty dirname; look in the current directory.
    directory = os.path.dirname(kdu_model) or "."
    entries = set(os.listdir(directory))

    for candidate in ("README", "README.txt", "README.md"):
        if candidate in entries:
            # Open the file inside the bundle directory, not relative to the
            # process working directory.
            with open(os.path.join(directory, candidate), "r") as f:
                return f.read()

    return None
689
async def status_kdu(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    complete_status: bool = False,
    yaml_format: bool = False,
    **kwargs,
) -> Union[str, dict]:
    """Get the status of the KDU

    Reads the status of the model named after the KDU instance.

    :param cluster_uuid str: The UUID of the cluster (not read)
    :param kdu_instance str: The unique id of the KDU instance
    :param complete_status: To get the complete_status of the KDU
    :param yaml_format: To get the status in proper format for NSR record;
        only considered when complete_status is True
    :param: kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: With complete_status, the whole model status converted with
        obj_to_yaml (yaml_format True) or obj_to_dict. Otherwise a dictionary
        mapping each application name to {"status": its status value}.
    """
    juju = await self._get_libjuju(kwargs.get("vca_id"))
    model_status = await juju.get_model_status(kdu_instance)

    if complete_status:
        convert = obj_to_yaml if yaml_format else obj_to_dict
        return convert(model_status)

    applications = model_status.applications
    return {
        name: {"status": applications[name]["status"]["status"]}
        for name in applications
    }
728
async def add_relation(
    self,
    provider: RelationEndpoint,
    requirer: RelationEndpoint,
):
    """
    Add relation between two charmed endpoints

    When both endpoints live in the same model of the same VCA, a standard
    relation is added. Otherwise a cross-model relation is built: the
    provider endpoint is offered, the offer is consumed from the requirer
    model and the requirer endpoint is related to the consumed offer.

    :param: provider: Provider relation endpoint
    :param: requirer: Requirer relation endpoint

    :raises K8sException: if the relation cannot be added
    """
    self.log.debug(f"adding new relation between {provider} and {requirer}")
    # Different model OR different VCA means cross-model; the VCA ids of the
    # two endpoints must be compared with each other.
    cross_model_relation = (
        provider.model_name != requirer.model_name
        or provider.vca_id != requirer.vca_id
    )
    try:
        if cross_model_relation:
            # Cross-model relation
            provider_libjuju = await self._get_libjuju(provider.vca_id)
            requirer_libjuju = await self._get_libjuju(requirer.vca_id)
            offer = await provider_libjuju.offer(provider)
            if offer:
                saas_name = await requirer_libjuju.consume(
                    requirer.model_name, offer, provider_libjuju
                )
                await requirer_libjuju.add_relation(
                    requirer.model_name,
                    requirer.endpoint,
                    saas_name,
                )
        else:
            # Standard relation
            vca_id = provider.vca_id
            model = provider.model_name
            libjuju = await self._get_libjuju(vca_id)
            # add juju relations between two applications
            await libjuju.add_relation(
                model_name=model,
                endpoint_1=provider.endpoint,
                endpoint_2=requirer.endpoint,
            )
    except Exception as e:
        message = f"Error adding relation between {provider} and {requirer}: {e}"
        self.log.error(message)
        # Exception() accepts no keyword arguments; use the connector's own
        # exception type, as the other methods of this class do.
        raise K8sException(message=message) from e
775
async def update_vca_status(self, vcastatus: dict, kdu_instance: str, **kwargs):
    """
    Add all configs, actions, executed actions of all applications in a model to vcastatus dict

    The dictionary is updated in place. Every model entry receives an
    "executedActions" key, and every application below its "applications"
    key receives "actions" and "configs". All data is requested for the
    model named after kdu_instance. Errors raised while filling the
    dictionary are logged at debug level and not propagated.

    :param vcastatus dict: dict containing vcastatus
    :param kdu_instance str: The unique id of the KDU instance
    :param: kwargs: Additional parameters
        vca_id (str): VCA ID

    :return: None
    """
    juju = await self._get_libjuju(kwargs.get("vca_id"))
    try:
        for model_name in vcastatus:
            # Adding executed actions
            executed_actions = await juju.get_executed_actions(kdu_instance)
            vcastatus[model_name]["executedActions"] = executed_actions

            applications = vcastatus[model_name]["applications"]
            for application in applications:
                # Adding application actions
                actions = await juju.get_actions(application, kdu_instance)
                applications[application]["actions"] = actions
                # Adding application configs
                configs = await juju.get_application_configs(
                    kdu_instance, application
                )
                applications[application]["configs"] = configs

    except Exception as e:
        self.log.debug("Error in updating vca status: {}".format(str(e)))
807
async def get_services(
    self, cluster_uuid: str, kdu_instance: str, namespace: str
) -> list:
    """Return a list of services of a kdu_instance

    The services are selected by namespace, using the kdu_instance name as
    the namespace; the namespace argument itself is not read.
    """
    kubeconfig = self.get_credentials(cluster_uuid=cluster_uuid)
    kubectl = self._get_kubectl(kubeconfig)
    selector = "metadata.namespace={}".format(kdu_instance)
    return kubectl.get_services(field_selector=selector)
818
async def get_service(
    self, cluster_uuid: str, service_name: str, namespace: str
) -> object:
    """Return data for a specific service inside a namespace

    The first service matching both the name and the namespace is returned.
    """
    kubeconfig = self.get_credentials(cluster_uuid=cluster_uuid)
    kubectl = self._get_kubectl(kubeconfig)
    selector = "metadata.name={},metadata.namespace={}".format(
        service_name, namespace
    )
    matches = kubectl.get_services(field_selector=selector)
    return matches[0]
831
def get_credentials(self, cluster_uuid: str) -> str:
    """
    Get Cluster Kubeconfig

    Reads the cluster record from the "k8sclusters" collection, decrypts the
    "password" and "secret" fields of its credentials and returns the
    credentials dumped as YAML.

    :param cluster_uuid: value of the "_id" field of the cluster record

    :return: the credentials of the cluster as a YAML string
    :raises K8sException: if no record is found for the cluster
    """
    k8scluster = self.db.get_one(
        "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
    )
    # NOTE(review): presumably get_one answers None when nothing matches and
    # fail_on_empty is False - confirm against the database driver. Fail
    # with a clear message instead of an AttributeError on the lines below.
    if not k8scluster:
        raise K8sException(
            "K8s cluster {} not found in database".format(cluster_uuid)
        )

    credentials = k8scluster.get("credentials")
    self.db.encrypt_decrypt_fields(
        credentials,
        "decrypt",
        ["password", "secret"],
        schema_version=k8scluster["schema_version"],
        salt=k8scluster["_id"],
    )

    return yaml.safe_dump(credentials)
849
850 def _get_credential_name(self, cluster_uuid: str) -> str:
851 """
852 Get credential name for a k8s cloud
853
854 We cannot use the cluster_uuid for the credential name directly,
855 because it cannot start with a number, it must start with a letter.
856 Therefore, the k8s cloud credential name will be "cred-" followed
857 by the cluster uuid.
858
859 :param: cluster_uuid: Cluster UUID of the kubernetes cloud (=cloud_name)
860
861 :return: Name to use for the credential name.
862 """
863 return "cred-{}".format(cluster_uuid)
864
def get_namespace(
    self,
    cluster_uuid: str,
) -> str:
    """Get the namespace UUID

    Placeholder meant to give the namespace's unique name. It has no
    implementation yet: the argument is not read and None is returned.

    :param cluster_uuid str: The UUID of the cluster
    :returns: None
    """
    return None
876
877 @staticmethod
878 def generate_kdu_instance_name(**kwargs):
879 db_dict = kwargs.get("db_dict")
880 kdu_name = kwargs.get("kdu_name", None)
881 if kdu_name:
882 kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
883 else:
884 kdu_instance = db_dict["filter"]["_id"]
885 return kdu_instance
886
async def _get_libjuju(self, vca_id: str = None) -> Libjuju:
    """
    Get libjuju object

    :param: vca_id: VCA ID
        If None, get a libjuju object with a Connection to the default VCA.
        That object is created on first use and kept in self.libjuju.
        Else, get a new libjuju object with a Connection to the specified
        VCA.
    """
    if vca_id:
        # Specific VCA: a fresh object on every call.
        connection = await get_connection(self._store, vca_id)
        return Libjuju(
            connection,
            loop=self.loop,
            log=self.log,
            n2vc=self,
        )

    # Default VCA: wait while another coroutine is building the shared
    # object, then build it ourselves if it still does not exist.
    while self.loading_libjuju.locked():
        await asyncio.sleep(0.1)
    if not self.libjuju:
        async with self.loading_libjuju:
            connection = await get_connection(self._store)
            self.libjuju = Libjuju(connection, loop=self.loop, log=self.log)
    return self.libjuju
911
def _get_kubectl(self, credentials: str) -> Kubectl:
    """
    Get Kubectl object

    The credentials are written to a temporary kubeconfig file that only
    exists while the Kubectl object is being constructed.

    :param: credentials: Kubeconfig credentials

    :return: Kubectl object built from those credentials
    """
    # The context manager removes the file (which holds cluster secrets) as
    # soon as the object is built, without depending on garbage collection.
    # NOTE(review): assumes Kubectl reads the file inside its constructor,
    # which the previous implementation relied on as well - confirm.
    with tempfile.NamedTemporaryFile(mode="w") as kubecfg_file:
        kubecfg_file.write(credentials)
        # Make sure the content is on disk before it is read by name.
        kubecfg_file.flush()
        return Kubectl(config_file=kubecfg_file.name)