2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
24 from typing
import Union
32 from uuid
import uuid4
34 from n2vc
.config
import EnvironConfig
35 from n2vc
.exceptions
import K8sException
36 from n2vc
.k8s_conn
import K8sConnector
39 class K8sHelmBaseConnector(K8sConnector
):
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
47 service_account
= "osm"
53 kubectl_command
: str = "/usr/bin/kubectl",
54 helm_command
: str = "/usr/bin/helm",
60 :param fs: file system for kubernetes and helm configuration
61 :param db: database object to write current operation status
62 :param kubectl_command: path to kubectl executable
63 :param helm_command: path to helm executable
65 :param on_update_db: callback called when k8s connector updates database
69 K8sConnector
.__init
__(self
, db
=db
, log
=log
, on_update_db
=on_update_db
)
71 self
.log
.info("Initializing K8S Helm connector")
73 self
.config
= EnvironConfig()
74 # random numbers for release name generation
75 random
.seed(time
.time())
80 # exception if kubectl is not installed
81 self
.kubectl_command
= kubectl_command
82 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
84 # exception if helm is not installed
85 self
._helm
_command
= helm_command
86 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
88 # obtain stable repo url from config or apply default
89 self
._stable
_repo
_url
= self
.config
.get("stablerepourl")
90 if self
._stable
_repo
_url
== "None":
91 self
._stable
_repo
_url
= None
93 def _get_namespace(self
, cluster_uuid
: str) -> str:
95 Obtains the namespace used by the cluster with the uuid passed by argument
97 param: cluster_uuid: cluster's uuid
100 # first, obtain the cluster corresponding to the uuid passed by argument
101 k8scluster
= self
.db
.get_one(
102 "k8sclusters", q_filter
={"_id": cluster_uuid
}, fail_on_empty
=False
104 return k8scluster
.get("namespace")
109 namespace
: str = "kube-system",
110 reuse_cluster_uuid
=None,
114 It prepares a given K8s cluster environment to run Charts
116 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
118 :param namespace: optional namespace to be used for helm. By default,
119 'kube-system' will be used
120 :param reuse_cluster_uuid: existing cluster uuid for reuse
121 :param kwargs: Additional parameters (None yet)
122 :return: uuid of the K8s cluster and True if connector has installed some
123 software in the cluster
124 (on error, an exception will be raised)
127 if reuse_cluster_uuid
:
128 cluster_id
= reuse_cluster_uuid
130 cluster_id
= str(uuid4())
133 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id
, namespace
)
136 paths
, env
= self
._init
_paths
_env
(
137 cluster_name
=cluster_id
, create_if_not_exist
=True
139 mode
= stat
.S_IRUSR | stat
.S_IWUSR
140 with
open(paths
["kube_config"], "w", mode
) as f
:
142 os
.chmod(paths
["kube_config"], 0o600)
144 # Code with initialization specific of helm version
145 n2vc_installed_sw
= await self
._cluster
_init
(cluster_id
, namespace
, paths
, env
)
147 # sync fs with local data
148 self
.fs
.reverse_sync(from_path
=cluster_id
)
150 self
.log
.info("Cluster {} initialized".format(cluster_id
))
152 return cluster_id
, n2vc_installed_sw
159 repo_type
: str = "chart",
162 password
: str = None,
165 "Cluster {}, adding {} repository {}. URL: {}".format(
166 cluster_uuid
, repo_type
, name
, url
171 paths
, env
= self
._init
_paths
_env
(
172 cluster_name
=cluster_uuid
, create_if_not_exist
=True
176 self
.fs
.sync(from_path
=cluster_uuid
)
178 # helm repo add name url
179 command
= ("env KUBECONFIG={} {} repo add {} {}").format(
180 paths
["kube_config"], self
._helm
_command
, name
, url
184 temp_cert_file
= os
.path
.join(
185 self
.fs
.path
, "{}/helmcerts/".format(cluster_uuid
), "temp.crt"
187 os
.makedirs(os
.path
.dirname(temp_cert_file
), exist_ok
=True)
188 with
open(temp_cert_file
, "w") as the_cert
:
190 command
+= " --ca-file {}".format(temp_cert_file
)
193 command
+= " --username={}".format(user
)
196 command
+= " --password={}".format(password
)
198 self
.log
.debug("adding repo: {}".format(command
))
199 await self
._local
_async
_exec
(
200 command
=command
, raise_exception_on_error
=True, env
=env
204 command
= "env KUBECONFIG={} {} repo update {}".format(
205 paths
["kube_config"], self
._helm
_command
, name
207 self
.log
.debug("updating repo: {}".format(command
))
208 await self
._local
_async
_exec
(
209 command
=command
, raise_exception_on_error
=False, env
=env
213 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"):
    """
    Runs "helm repo update" for one repository of a cluster

    :param cluster_uuid: cluster identifier; used to resolve the cluster paths
        and as the path handed to the file system sync calls
    :param name: repository name appended to the "repo update" command
    :param repo_type: only used in the log message
    :return: None. The helm command is executed with
        raise_exception_on_error=False, so a failing update is not raised here
    """
    self.log.debug(
        "Cluster {}, updating {} repository {}".format(
            cluster_uuid, repo_type, name
        )
    )

    # init_env
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # helm repo update
    # The kubeconfig of this cluster is passed explicitly, exactly as the other
    # repo commands of this class do (repo add/list/remove)
    command = "env KUBECONFIG={} {} repo update {}".format(
        paths["kube_config"], self._helm_command, name
    )
    self.log.debug("updating repo: {}".format(command))
    await self._local_async_exec(
        command=command, raise_exception_on_error=False, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)
240 async def repo_list(self
, cluster_uuid
: str) -> list:
242 Get the list of registered repositories
244 :return: list of registered repositories: [ (name, url) .... ]
247 self
.log
.debug("list repositories for cluster {}".format(cluster_uuid
))
250 paths
, env
= self
._init
_paths
_env
(
251 cluster_name
=cluster_uuid
, create_if_not_exist
=True
255 self
.fs
.sync(from_path
=cluster_uuid
)
257 command
= "env KUBECONFIG={} {} repo list --output yaml".format(
258 paths
["kube_config"], self
._helm
_command
261 # Set exception to false because if there are no repos just want an empty list
262 output
, _rc
= await self
._local
_async
_exec
(
263 command
=command
, raise_exception_on_error
=False, env
=env
267 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
270 if output
and len(output
) > 0:
271 repos
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
272 # unify format between helm2 and helm3 setting all keys lowercase
273 return self
._lower
_keys
_list
(repos
)
async def repo_remove(self, cluster_uuid: str, name: str):
    """
    Runs "helm repo remove" for one repository of a cluster

    :param cluster_uuid: cluster identifier; used to resolve the cluster paths
        and as the path handed to the file system sync calls
    :param name: repository name appended to the "repo remove" command
    :return: None. The command runs with raise_exception_on_error=True
    """
    log_text = "remove {} repositories for cluster {}".format(name, cluster_uuid)
    self.log.debug(log_text)

    # resolve the paths and helm environment of this cluster
    cluster_paths, helm_env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # bring the local directory up to date before touching helm
    self.fs.sync(from_path=cluster_uuid)

    remove_cmd = "env KUBECONFIG={} {} repo remove {}".format(
        cluster_paths["kube_config"], self._helm_command, name
    )
    await self._local_async_exec(
        command=remove_cmd, raise_exception_on_error=True, env=helm_env
    )

    # push the local changes back
    self.fs.reverse_sync(from_path=cluster_uuid)
306 uninstall_sw
: bool = False,
311 Resets the Kubernetes cluster by removing the helm deployment that represents it.
313 :param cluster_uuid: The UUID of the cluster to reset
314 :param force: Boolean to force the reset
315 :param uninstall_sw: Boolean to force the reset
316 :param kwargs: Additional parameters (None yet)
317 :return: Returns True if successful or raises an exception.
319 namespace
= self
._get
_namespace
(cluster_uuid
=cluster_uuid
)
321 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
322 cluster_uuid
, uninstall_sw
327 self
.fs
.sync(from_path
=cluster_uuid
)
329 # uninstall releases if needed.
331 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
332 if len(releases
) > 0:
336 kdu_instance
= r
.get("name")
337 chart
= r
.get("chart")
339 "Uninstalling {} -> {}".format(chart
, kdu_instance
)
341 await self
.uninstall(
342 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
344 except Exception as e
:
345 # will not raise exception as it was found
346 # that in some cases of previously installed helm releases it
349 "Error uninstalling release {}: {}".format(
355 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
356 ).format(cluster_uuid
)
359 False # Allow to remove k8s cluster without removing Tiller
363 await self
._uninstall
_sw
(cluster_id
=cluster_uuid
, namespace
=namespace
)
365 # delete cluster directory
366 self
.log
.debug("Removing directory {}".format(cluster_uuid
))
367 self
.fs
.file_delete(cluster_uuid
, ignore_non_exist
=True)
368 # Also remove the local directory if it still exists
369 direct
= self
.fs
.path
+ "/" + cluster_uuid
370 shutil
.rmtree(direct
, ignore_errors
=True)
374 def _is_helm_chart_a_file(self
, chart_name
: str):
375 return chart_name
.count("/") > 1
377 async def _install_impl(
385 timeout
: float = 300,
387 db_dict
: dict = None,
388 kdu_name
: str = None,
389 namespace
: str = None,
392 paths
, env
= self
._init
_paths
_env
(
393 cluster_name
=cluster_id
, create_if_not_exist
=True
397 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
398 cluster_id
=cluster_id
, params
=params
402 kdu_model
, version
= self
._split
_version
(kdu_model
)
404 repo
= self
._split
_repo
(kdu_model
)
406 self
.repo_update(cluster_id
, repo
)
408 command
= self
._get
_install
_command
(
416 paths
["kube_config"],
419 self
.log
.debug("installing: {}".format(command
))
422 # exec helm in a task
423 exec_task
= asyncio
.ensure_future(
424 coro_or_future
=self
._local
_async
_exec
(
425 command
=command
, raise_exception_on_error
=False, env
=env
429 # write status in another task
430 status_task
= asyncio
.ensure_future(
431 coro_or_future
=self
._store
_status
(
432 cluster_id
=cluster_id
,
433 kdu_instance
=kdu_instance
,
441 # wait for execution task
442 await asyncio
.wait([exec_task
])
447 output
, rc
= exec_task
.result()
451 output
, rc
= await self
._local
_async
_exec
(
452 command
=command
, raise_exception_on_error
=False, env
=env
455 # remove temporal values yaml file
457 os
.remove(file_to_delete
)
460 await self
._store
_status
(
461 cluster_id
=cluster_id
,
462 kdu_instance
=kdu_instance
,
471 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
473 raise K8sException(msg
)
479 kdu_model
: str = None,
481 timeout
: float = 300,
483 db_dict
: dict = None,
485 self
.log
.debug("upgrading {} in cluster {}".format(kdu_model
, cluster_uuid
))
488 self
.fs
.sync(from_path
=cluster_uuid
)
490 # look for instance to obtain namespace
491 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
492 if not instance_info
:
493 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
496 paths
, env
= self
._init
_paths
_env
(
497 cluster_name
=cluster_uuid
, create_if_not_exist
=True
501 self
.fs
.sync(from_path
=cluster_uuid
)
504 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
505 cluster_id
=cluster_uuid
, params
=params
509 kdu_model
, version
= self
._split
_version
(kdu_model
)
511 repo
= self
._split
_repo
(kdu_model
)
513 self
.repo_update(cluster_uuid
, repo
)
515 command
= self
._get
_upgrade
_command
(
518 instance_info
["namespace"],
523 paths
["kube_config"],
526 self
.log
.debug("upgrading: {}".format(command
))
530 # exec helm in a task
531 exec_task
= asyncio
.ensure_future(
532 coro_or_future
=self
._local
_async
_exec
(
533 command
=command
, raise_exception_on_error
=False, env
=env
536 # write status in another task
537 status_task
= asyncio
.ensure_future(
538 coro_or_future
=self
._store
_status
(
539 cluster_id
=cluster_uuid
,
540 kdu_instance
=kdu_instance
,
541 namespace
=instance_info
["namespace"],
548 # wait for execution task
549 await asyncio
.wait([exec_task
])
553 output
, rc
= exec_task
.result()
557 output
, rc
= await self
._local
_async
_exec
(
558 command
=command
, raise_exception_on_error
=False, env
=env
561 # remove temporal values yaml file
563 os
.remove(file_to_delete
)
566 await self
._store
_status
(
567 cluster_id
=cluster_uuid
,
568 kdu_instance
=kdu_instance
,
569 namespace
=instance_info
["namespace"],
577 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
579 raise K8sException(msg
)
582 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
584 # return new revision number
585 instance
= await self
.get_instance_info(
586 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
589 revision
= int(instance
.get("revision"))
590 self
.log
.debug("New revision: {}".format(revision
))
600 total_timeout
: float = 1800,
601 cluster_uuid
: str = None,
602 kdu_model
: str = None,
604 db_dict
: dict = None,
607 """Scale a resource in a Helm Chart.
610 kdu_instance: KDU instance name
611 scale: Scale to which to set the resource
612 resource_name: Resource name
613 total_timeout: The time, in seconds, to wait
614 cluster_uuid: The UUID of the cluster
615 kdu_model: The chart reference
616 atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
617 The --wait flag will be set automatically if --atomic is used
618 db_dict: Dictionary for any additional data
619 kwargs: Additional parameters
622 True if successful, False otherwise
625 debug_mgs
= "scaling {} in cluster {}".format(kdu_model
, cluster_uuid
)
627 debug_mgs
= "scaling resource {} in model {} (cluster {})".format(
628 resource_name
, kdu_model
, cluster_uuid
631 self
.log
.debug(debug_mgs
)
633 # look for instance to obtain namespace
634 # get_instance_info function calls the sync command
635 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
636 if not instance_info
:
637 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
640 paths
, env
= self
._init
_paths
_env
(
641 cluster_name
=cluster_uuid
, create_if_not_exist
=True
645 kdu_model
, version
= self
._split
_version
(kdu_model
)
647 repo_url
= await self
._find
_repo
(kdu_model
, cluster_uuid
)
650 "Repository not found for kdu_model {}".format(kdu_model
)
653 _
, replica_str
= await self
._get
_replica
_count
_url
(
654 kdu_model
, repo_url
, resource_name
657 command
= self
._get
_upgrade
_scale
_command
(
660 instance_info
["namespace"],
667 paths
["kube_config"],
670 self
.log
.debug("scaling: {}".format(command
))
673 # exec helm in a task
674 exec_task
= asyncio
.ensure_future(
675 coro_or_future
=self
._local
_async
_exec
(
676 command
=command
, raise_exception_on_error
=False, env
=env
679 # write status in another task
680 status_task
= asyncio
.ensure_future(
681 coro_or_future
=self
._store
_status
(
682 cluster_id
=cluster_uuid
,
683 kdu_instance
=kdu_instance
,
684 namespace
=instance_info
["namespace"],
691 # wait for execution task
692 await asyncio
.wait([exec_task
])
696 output
, rc
= exec_task
.result()
699 output
, rc
= await self
._local
_async
_exec
(
700 command
=command
, raise_exception_on_error
=False, env
=env
704 await self
._store
_status
(
705 cluster_id
=cluster_uuid
,
706 kdu_instance
=kdu_instance
,
707 namespace
=instance_info
["namespace"],
715 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
717 raise K8sException(msg
)
720 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
724 async def get_scale_count(
732 """Get a resource scale count.
735 cluster_uuid: The UUID of the cluster
736 resource_name: Resource name
737 kdu_instance: KDU instance name
738 kdu_model: The name or path of a bundle
739 kwargs: Additional parameters
742 Resource instance count
746 "getting scale count for {} in cluster {}".format(kdu_model
, cluster_uuid
)
749 # look for instance to obtain namespace
750 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
751 if not instance_info
:
752 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
755 paths
, env
= self
._init
_paths
_env
(
756 cluster_name
=cluster_uuid
, create_if_not_exist
=True
759 replicas
= await self
._get
_replica
_count
_instance
(
760 kdu_instance
, instance_info
["namespace"], paths
["kube_config"]
763 # Get default value if scale count is not found from provided values
765 repo_url
= await self
._find
_repo
(kdu_model
, cluster_uuid
)
768 "Repository not found for kdu_model {}".format(kdu_model
)
771 replicas
, _
= await self
._get
_replica
_count
_url
(
772 kdu_model
, repo_url
, resource_name
776 msg
= "Replica count not found. Cannot be scaled"
778 raise K8sException(msg
)
783 self
, cluster_uuid
: str, kdu_instance
: str, revision
=0, db_dict
: dict = None
786 "rollback kdu_instance {} to revision {} from cluster {}".format(
787 kdu_instance
, revision
, cluster_uuid
792 self
.fs
.sync(from_path
=cluster_uuid
)
794 # look for instance to obtain namespace
795 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
796 if not instance_info
:
797 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
800 paths
, env
= self
._init
_paths
_env
(
801 cluster_name
=cluster_uuid
, create_if_not_exist
=True
805 self
.fs
.sync(from_path
=cluster_uuid
)
807 command
= self
._get
_rollback
_command
(
808 kdu_instance
, instance_info
["namespace"], revision
, paths
["kube_config"]
811 self
.log
.debug("rolling_back: {}".format(command
))
813 # exec helm in a task
814 exec_task
= asyncio
.ensure_future(
815 coro_or_future
=self
._local
_async
_exec
(
816 command
=command
, raise_exception_on_error
=False, env
=env
819 # write status in another task
820 status_task
= asyncio
.ensure_future(
821 coro_or_future
=self
._store
_status
(
822 cluster_id
=cluster_uuid
,
823 kdu_instance
=kdu_instance
,
824 namespace
=instance_info
["namespace"],
826 operation
="rollback",
831 # wait for execution task
832 await asyncio
.wait([exec_task
])
837 output
, rc
= exec_task
.result()
840 await self
._store
_status
(
841 cluster_id
=cluster_uuid
,
842 kdu_instance
=kdu_instance
,
843 namespace
=instance_info
["namespace"],
845 operation
="rollback",
851 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
853 raise K8sException(msg
)
856 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
858 # return new revision number
859 instance
= await self
.get_instance_info(
860 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
863 revision
= int(instance
.get("revision"))
864 self
.log
.debug("New revision: {}".format(revision
))
869 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str, **kwargs
):
871 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
872 (this call should happen after all _terminate-config-primitive_ of the VNF
875 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
876 :param kdu_instance: unique name for the KDU instance to be deleted
877 :param kwargs: Additional parameters (None yet)
878 :return: True if successful
882 "uninstall kdu_instance {} from cluster {}".format(
883 kdu_instance
, cluster_uuid
888 self
.fs
.sync(from_path
=cluster_uuid
)
890 # look for instance to obtain namespace
891 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
892 if not instance_info
:
893 self
.log
.warning(("kdu_instance {} not found".format(kdu_instance
)))
896 paths
, env
= self
._init
_paths
_env
(
897 cluster_name
=cluster_uuid
, create_if_not_exist
=True
901 self
.fs
.sync(from_path
=cluster_uuid
)
903 command
= self
._get
_uninstall
_command
(
904 kdu_instance
, instance_info
["namespace"], paths
["kube_config"]
906 output
, _rc
= await self
._local
_async
_exec
(
907 command
=command
, raise_exception_on_error
=True, env
=env
911 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
913 return self
._output
_to
_table
(output
)
async def instances_list(self, cluster_uuid: str) -> list:
    """
    returns a list of deployed releases in a cluster

    The helm version dependent work is delegated to _instances_list(); this
    method only wraps it between a file system sync and reverse sync.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: whatever _instances_list() returned for the cluster
    """
    self.log.debug("list releases for cluster {}".format(cluster_uuid))

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # execute internal command
    releases = await self._instances_list(cluster_uuid)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return releases
async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
    """
    Looks up one release of a cluster by its name

    :param cluster_uuid: cluster identifier handed to instances_list()
    :param kdu_instance: release name compared against each entry's "name"
    :return: the first entry whose "name" equals kdu_instance, or None when
        no entry matches (a debug message is logged in that case)
    """
    releases = await self.instances_list(cluster_uuid=cluster_uuid)

    # A private sentinel marks "nothing matched"
    not_found = object()
    match = next(
        (release for release in releases if release.get("name") == kdu_instance),
        not_found,
    )
    if match is not not_found:
        return match

    self.log.debug("Instance {} not found".format(kdu_instance))
    return None
944 async def upgrade_charm(
948 charm_id
: str = None,
949 charm_type
: str = None,
950 timeout
: float = None,
952 """This method upgrade charms in VNFs
955 ee_id: Execution environment id
956 path: Local path to the charm
958 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
959 timeout: (Float) Timeout for the ns update operation
962 The output of the update operation if status equals to "completed"
964 raise K8sException("KDUs deployed with Helm do not support charm upgrade")
966 async def exec_primitive(
968 cluster_uuid
: str = None,
969 kdu_instance
: str = None,
970 primitive_name
: str = None,
971 timeout
: float = 300,
973 db_dict
: dict = None,
976 """Exec primitive (Juju action)
978 :param cluster_uuid: The UUID of the cluster or namespace:cluster
979 :param kdu_instance: The unique name of the KDU instance
980 :param primitive_name: Name of action that will be executed
981 :param timeout: Timeout for action execution
982 :param params: Dictionary of all the parameters needed for the action
983 :db_dict: Dictionary for any additional data
984 :param kwargs: Additional parameters (None yet)
986 :return: Returns the output of the action
989 "KDUs deployed with Helm don't support actions "
990 "different from rollback, upgrade and status"
993 async def get_services(
994 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
997 Returns a list of services defined for the specified kdu instance.
999 :param cluster_uuid: UUID of a K8s cluster known by OSM
1000 :param kdu_instance: unique name for the KDU instance
1001 :param namespace: K8s namespace used by the KDU instance
1002 :return: If successful, it will return a list of services, Each service
1003 can have the following data:
1004 - `name` of the service
1005 - `type` type of service in the k8 cluster
1006 - `ports` List of ports offered by the service, for each port includes at least
1007 name, port, protocol
1008 - `cluster_ip` Internal ip to be used inside k8s cluster
1009 - `external_ip` List of external ips (in case they are available)
1013 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
1014 cluster_uuid
, kdu_instance
1019 paths
, env
= self
._init
_paths
_env
(
1020 cluster_name
=cluster_uuid
, create_if_not_exist
=True
1024 self
.fs
.sync(from_path
=cluster_uuid
)
1026 # get list of services names for kdu
1027 service_names
= await self
._get
_services
(
1028 cluster_uuid
, kdu_instance
, namespace
, paths
["kube_config"]
1032 for service
in service_names
:
1033 service
= await self
._get
_service
(cluster_uuid
, service
, namespace
)
1034 service_list
.append(service
)
1037 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
1041 async def get_service(
1042 self
, cluster_uuid
: str, service_name
: str, namespace
: str
1046 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
1047 service_name
, namespace
, cluster_uuid
1052 self
.fs
.sync(from_path
=cluster_uuid
)
1054 service
= await self
._get
_service
(cluster_uuid
, service_name
, namespace
)
1057 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
1061 async def status_kdu(
1062 self
, cluster_uuid
: str, kdu_instance
: str, yaml_format
: str = False, **kwargs
1063 ) -> Union
[str, dict]:
1065 This call would retrieve tha current state of a given KDU instance. It would be
1066 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
1067 values_ of the configuration parameters applied to a given instance. This call
1068 would be based on the `status` call.
1070 :param cluster_uuid: UUID of a K8s cluster known by OSM
1071 :param kdu_instance: unique name for the KDU instance
1072 :param kwargs: Additional parameters (None yet)
1073 :param yaml_format: if the return shall be returned as an YAML string or as a
1075 :return: If successful, it will return the following vector of arguments:
1076 - K8s `namespace` in the cluster where the KDU lives
1077 - `state` of the KDU instance. It can be:
1084 - List of `resources` (objects) that this release consists of, sorted by kind,
1085 and the status of those resources
1086 - Last `deployment_time`.
1090 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
1091 cluster_uuid
, kdu_instance
1096 self
.fs
.sync(from_path
=cluster_uuid
)
1098 # get instance: needed to obtain namespace
1099 instances
= await self
._instances
_list
(cluster_id
=cluster_uuid
)
1100 for instance
in instances
:
1101 if instance
.get("name") == kdu_instance
:
1104 # instance does not exist
1106 "Instance name: {} not found in cluster: {}".format(
1107 kdu_instance
, cluster_uuid
1111 status
= await self
._status
_kdu
(
1112 cluster_id
=cluster_uuid
,
1113 kdu_instance
=kdu_instance
,
1114 namespace
=instance
["namespace"],
1115 yaml_format
=yaml_format
,
1116 show_error_log
=True,
1120 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
1124 async def get_values_kdu(
1125 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
1128 self
.log
.debug("get kdu_instance values {}".format(kdu_instance
))
1130 return await self
._exec
_get
_command
(
1131 get_command
="values",
1132 kdu_instance
=kdu_instance
,
1133 namespace
=namespace
,
1134 kubeconfig
=kubeconfig
,
1137 async def values_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
1140 "inspect kdu_model values {} from (optional) repo: {}".format(
1145 return await self
._exec
_inspect
_command
(
1146 inspect_command
="values", kdu_model
=kdu_model
, repo_url
=repo_url
async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Retrieves the readme of a kdu model through the inspect command helper

    :param kdu_model: chart reference forwarded to _exec_inspect_command()
    :param repo_url: optional repository url forwarded unchanged
    :return: whatever _exec_inspect_command() returns for "readme"
    """
    log_text = "inspect kdu_model {} readme.md from repo: {}".format(
        kdu_model, repo_url
    )
    self.log.debug(log_text)

    readme = await self._exec_inspect_command(
        inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
    )
    return readme
1159 async def synchronize_repos(self
, cluster_uuid
: str):
1161 self
.log
.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid
))
1163 db_repo_ids
= self
._get
_helm
_chart
_repos
_ids
(cluster_uuid
)
1164 db_repo_dict
= self
._get
_db
_repos
_dict
(db_repo_ids
)
1166 local_repo_list
= await self
.repo_list(cluster_uuid
)
1167 local_repo_dict
= {repo
["name"]: repo
["url"] for repo
in local_repo_list
}
1169 deleted_repo_list
= []
1170 added_repo_dict
= {}
1172 # iterate over the list of repos in the database that should be
1173 # added if not present
1174 for repo_name
, db_repo
in db_repo_dict
.items():
1176 # check if it is already present
1177 curr_repo_url
= local_repo_dict
.get(db_repo
["name"])
1178 repo_id
= db_repo
.get("_id")
1179 if curr_repo_url
!= db_repo
["url"]:
1182 "repo {} url changed, delete and and again".format(
1186 await self
.repo_remove(cluster_uuid
, db_repo
["name"])
1187 deleted_repo_list
.append(repo_id
)
1190 self
.log
.debug("add repo {}".format(db_repo
["name"]))
1191 if "ca_cert" in db_repo
:
1192 await self
.repo_add(
1196 cert
=db_repo
["ca_cert"],
1199 await self
.repo_add(
1204 added_repo_dict
[repo_id
] = db_repo
["name"]
1205 except Exception as e
:
1207 "Error adding repo id: {}, err_msg: {} ".format(
1212 # Delete repos that are present but not in nbi_list
1213 for repo_name
in local_repo_dict
:
1214 if not db_repo_dict
.get(repo_name
) and repo_name
!= "stable":
1215 self
.log
.debug("delete repo {}".format(repo_name
))
1217 await self
.repo_remove(cluster_uuid
, repo_name
)
1218 deleted_repo_list
.append(repo_name
)
1219 except Exception as e
:
1221 "Error deleting repo, name: {}, err_msg: {}".format(
1226 return deleted_repo_list
, added_repo_dict
1228 except K8sException
:
1230 except Exception as e
:
1231 # Do not raise errors synchronizing repos
1232 self
.log
.error("Error synchronizing repos: {}".format(e
))
1233 raise Exception("Error synchronizing repos: {}".format(e
))
1235 def _get_db_repos_dict(self
, repo_ids
: list):
1237 for repo_id
in repo_ids
:
1238 db_repo
= self
.db
.get_one("k8srepos", {"_id": repo_id
})
1239 db_repos_dict
[db_repo
["name"]] = db_repo
1240 return db_repos_dict
1243 ####################################################################################
1244 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1245 ####################################################################################
def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
    """
    Creates the base cluster and kube dirs and returns them.
    Also creates helm3 dirs according to the new directory specification; those
    paths are not returned but assigned to helm environment variables

    To be implemented by the helm version specific subclasses. Callers in this
    class unpack the result as "paths, env" and read paths["kube_config"].

    :param cluster_name: cluster_name
    :param create_if_not_exist: presumably asks for missing directories to be
        created -- TODO confirm in the subclasses
    :return: Dictionary with config_paths and dictionary with helm environment variables
    """
async def _cluster_init(self, cluster_id, namespace, paths, env):
    """
    Implements the helm version dependent cluster initialization

    To be implemented by the helm version specific subclasses.

    :param cluster_id: identifier of the cluster being initialized
    :param namespace: namespace to be used for helm
    :param paths: paths dictionary obtained from _init_paths_env()
    :param env: helm environment obtained from _init_paths_env()
    :return: value that the cluster initialization of this class hands back to
        its caller as "connector has installed some software in the cluster"
    """
async def _instances_list(self, cluster_id):
    """
    Implements the helm version dependent helm instances list

    To be implemented by the helm version specific subclasses.

    :param cluster_id: identifier of the cluster whose releases are listed
    :return: iterable of release entries; callers in this class read the
        "name" and "namespace" keys of each entry
    """
async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
    """
    Implements the helm version dependent method to obtain services from a helm instance

    To be implemented by the helm version specific subclasses.

    :param cluster_id: identifier of the cluster
    :param kdu_instance: name of the KDU instance
    :param namespace: K8s namespace used by the KDU instance
    :param kubeconfig: kube config path; the caller in this class passes
        paths["kube_config"]
    :return: iterable of service names; the caller in this class passes each
        item on to _get_service()
    """
1278 async def _status_kdu(
1282 namespace
: str = None,
1283 yaml_format
: bool = False,
1284 show_error_log
: bool = False,
1285 ) -> Union
[str, dict]:
1287 Implements the helm version dependent method to obtain status of a helm instance
1291 def _get_install_command(
1303 Obtain command to be executed to delete the indicated instance
1307 def _get_upgrade_scale_command(
1320 """Obtain command to be executed to upgrade the indicated instance."""
1323 def _get_upgrade_command(
1335 Obtain command to be executed to upgrade the indicated instance
1339 def _get_rollback_command(
1340 self
, kdu_instance
, namespace
, revision
, kubeconfig
1343 Obtain command to be executed to rollback the indicated instance
1347 def _get_uninstall_command(
1348 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
1351 Obtain command to be executed to delete the indicated instance
1355 def _get_inspect_command(
1356 self
, show_command
: str, kdu_model
: str, repo_str
: str, version
: str
1359 Obtain command to be executed to obtain information about the kdu
1363 def _get_get_command(
1364 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
1366 """Obtain command to be executed to get information about the kdu instance."""
async def _uninstall_sw(self, cluster_id: str, namespace: str):
    """
    Method call to uninstall cluster software for helm. This method is dependent
    on the helm version
    For Helm v2 it will be called when Tiller must be uninstalled
    For Helm v3 it does nothing and does not need to be called

    :param cluster_id: identifier of the cluster
    :param namespace: namespace of the cluster, as obtained by the caller
    """
def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
    """
    Obtains the cluster repos identifiers

    To be implemented by the helm version specific subclasses.

    :param cluster_uuid: identifier of the cluster
    :return: list of repository identifiers; the caller in this class hands it
        to _get_db_repos_dict(), which looks each one up as an "_id"
    """
1384 ####################################################################################
1385 ################################### P R I V A T E ##################################
1386 ####################################################################################
1390 def _check_file_exists(filename
: str, exception_if_not_exists
: bool = False):
1391 if os
.path
.exists(filename
):
1394 msg
= "File {} does not exist".format(filename
)
1395 if exception_if_not_exists
:
1396 raise K8sException(msg
)
1399 def _remove_multiple_spaces(strobj
):
1400 strobj
= strobj
.strip()
1401 while " " in strobj
:
1402 strobj
= strobj
.replace(" ", " ")
1406 def _output_to_lines(output
: str) -> list:
1407 output_lines
= list()
1408 lines
= output
.splitlines(keepends
=False)
1412 output_lines
.append(line
)
1416 def _output_to_table(output
: str) -> list:
1417 output_table
= list()
1418 lines
= output
.splitlines(keepends
=False)
1420 line
= line
.replace("\t", " ")
1422 output_table
.append(line_list
)
1423 cells
= line
.split(sep
=" ")
1427 line_list
.append(cell
)
1431 def _parse_services(output
: str) -> list:
1432 lines
= output
.splitlines(keepends
=False)
1435 line
= line
.replace("\t", " ")
1436 cells
= line
.split(sep
=" ")
1437 if len(cells
) > 0 and cells
[0].startswith("service/"):
1438 elems
= cells
[0].split(sep
="/")
1440 services
.append(elems
[1])
1444 def _get_deep(dictionary
: dict, members
: tuple):
1449 value
= target
.get(m
)
1458 # find key:value in several lines
1460 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
1461 for line
in p_lines
:
1463 if line
.startswith(p_key
+ ":"):
1464 parts
= line
.split(":")
1465 the_value
= parts
[1].strip()
1473 def _lower_keys_list(input_list
: list):
1475 Transform the keys in a list of dictionaries to lower case and returns a new list
1480 for dictionary
in input_list
:
1481 new_dict
= dict((k
.lower(), v
) for k
, v
in dictionary
.items())
1482 new_list
.append(new_dict
)
1485 async def _local_async_exec(
1488 raise_exception_on_error
: bool = False,
1489 show_error_log
: bool = True,
1490 encode_utf8
: bool = False,
1494 command
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command
)
1496 "Executing async local command: {}, env: {}".format(command
, env
)
1500 command
= shlex
.split(command
)
1502 environ
= os
.environ
.copy()
1507 process
= await asyncio
.create_subprocess_exec(
1509 stdout
=asyncio
.subprocess
.PIPE
,
1510 stderr
=asyncio
.subprocess
.PIPE
,
1514 # wait for command terminate
1515 stdout
, stderr
= await process
.communicate()
1517 return_code
= process
.returncode
1521 output
= stdout
.decode("utf-8").strip()
1522 # output = stdout.decode()
1524 output
= stderr
.decode("utf-8").strip()
1525 # output = stderr.decode()
1527 if return_code
!= 0 and show_error_log
:
1529 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1532 self
.log
.debug("Return code: {}".format(return_code
))
1534 if raise_exception_on_error
and return_code
!= 0:
1535 raise K8sException(output
)
1538 output
= output
.encode("utf-8").strip()
1539 output
= str(output
).replace("\\n", "\n")
1541 return output
, return_code
1543 except asyncio
.CancelledError
:
1545 except K8sException
:
1547 except Exception as e
:
1548 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1550 if raise_exception_on_error
:
1551 raise K8sException(e
) from e
1555 async def _local_async_exec_pipe(
1559 raise_exception_on_error
: bool = True,
1560 show_error_log
: bool = True,
1561 encode_utf8
: bool = False,
1565 command1
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command1
)
1566 command2
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command2
)
1567 command
= "{} | {}".format(command1
, command2
)
1569 "Executing async local command: {}, env: {}".format(command
, env
)
1573 command1
= shlex
.split(command1
)
1574 command2
= shlex
.split(command2
)
1576 environ
= os
.environ
.copy()
1581 read
, write
= os
.pipe()
1582 await asyncio
.create_subprocess_exec(*command1
, stdout
=write
, env
=environ
)
1584 process_2
= await asyncio
.create_subprocess_exec(
1585 *command2
, stdin
=read
, stdout
=asyncio
.subprocess
.PIPE
, env
=environ
1588 stdout
, stderr
= await process_2
.communicate()
1590 return_code
= process_2
.returncode
1594 output
= stdout
.decode("utf-8").strip()
1595 # output = stdout.decode()
1597 output
= stderr
.decode("utf-8").strip()
1598 # output = stderr.decode()
1600 if return_code
!= 0 and show_error_log
:
1602 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1605 self
.log
.debug("Return code: {}".format(return_code
))
1607 if raise_exception_on_error
and return_code
!= 0:
1608 raise K8sException(output
)
1611 output
= output
.encode("utf-8").strip()
1612 output
= str(output
).replace("\\n", "\n")
1614 return output
, return_code
1615 except asyncio
.CancelledError
:
1617 except K8sException
:
1619 except Exception as e
:
1620 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1622 if raise_exception_on_error
:
1623 raise K8sException(e
) from e
async def _get_service(self, cluster_id, service_name, namespace):
    """
    Obtains the data of the specified service in the K8s cluster.

    :param cluster_id: id of a K8s cluster known by OSM
    :param service_name: name of the K8s service in the specified namespace
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a service with the following data:
    - `name` of the service
    - `type` type of service in the k8 cluster
    - `ports` List of ports offered by the service, for each port includes at least
    name, port, protocol
    - `cluster_ip` Internal ip to be used inside k8s cluster
    - `external_ip` List of external ips (in case they are available); only
    present for services of type LoadBalancer
    """
    # kubeconfig path and environment for this cluster
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
        self.kubectl_command, paths["kube_config"], namespace, service_name
    )

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    data = yaml.load(output, Loader=yaml.SafeLoader)

    service = {
        "name": service_name,
        "type": self._get_deep(data, ("spec", "type")),
        "ports": self._get_deep(data, ("spec", "ports")),
        "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
    }
    if service["type"] == "LoadBalancer":
        # The ingress list is absent while the load balancer is still being
        # provisioned, and an entry may carry only a hostname instead of an ip.
        ip_map_list = (
            self._get_deep(data, ("status", "loadBalancer", "ingress")) or []
        )
        service["external_ip"] = [
            elem["ip"] for elem in ip_map_list if elem.get("ip")
        ]

    return service
async def _exec_get_command(
    self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
):
    """
    Obtains information about the kdu instance.

    Builds the command through _get_get_command and runs it locally.

    :param get_command: passed through to _get_get_command
    :param kdu_instance: name of the KDU instance
    :param namespace: namespace of the KDU instance
    :param kubeconfig: kubeconfig value passed through to _get_get_command
    :return: output of the executed command; the return code is discarded
    """
    full_command = self._get_get_command(
        get_command, kdu_instance, namespace, kubeconfig
    )

    output, _rc = await self._local_async_exec(command=full_command)

    return output
1684 async def _exec_inspect_command(
1685 self
, inspect_command
: str, kdu_model
: str, repo_url
: str = None
1687 """Obtains information about a kdu, no cluster (no env)."""
1691 repo_str
= " --repo {}".format(repo_url
)
1693 idx
= kdu_model
.find("/")
1696 kdu_model
= kdu_model
[idx
:]
1698 kdu_model
, version
= self
._split
_version
(kdu_model
)
1700 version_str
= "--version {}".format(version
)
1704 full_command
= self
._get
_inspect
_command
(
1705 inspect_command
, kdu_model
, repo_str
, version_str
1708 output
, _rc
= await self
._local
_async
_exec
(command
=full_command
)
1712 async def _get_replica_count_url(
1716 resource_name
: str = None,
1718 """Get the replica count value in the Helm Chart Values.
1721 kdu_model: The name or path of a bundle
1722 repo_url: Helm Chart repository url
1723 resource_name: Resource name
1726 True if replicas, False replicaCount
1729 kdu_values
= yaml
.load(
1730 await self
.values_kdu(kdu_model
, repo_url
), Loader
=yaml
.SafeLoader
1735 "kdu_values not found for kdu_model {}".format(kdu_model
)
1739 kdu_values
= kdu_values
.get(resource_name
, None)
1742 msg
= "resource {} not found in the values in model {}".format(
1743 resource_name
, kdu_model
1746 raise K8sException(msg
)
1748 duplicate_check
= False
1753 if kdu_values
.get("replicaCount", None):
1754 replicas
= kdu_values
["replicaCount"]
1755 replica_str
= "replicaCount"
1756 elif kdu_values
.get("replicas", None):
1757 duplicate_check
= True
1758 replicas
= kdu_values
["replicas"]
1759 replica_str
= "replicas"
1763 "replicaCount or replicas not found in the resource"
1764 "{} values in model {}. Cannot be scaled".format(
1765 resource_name
, kdu_model
1770 "replicaCount or replicas not found in the values"
1771 "in model {}. Cannot be scaled".format(kdu_model
)
1774 raise K8sException(msg
)
1776 # Control if replicas and replicaCount exists at the same time
1777 msg
= "replicaCount and replicas are exists at the same time"
1779 if "replicaCount" in kdu_values
:
1781 raise K8sException(msg
)
1783 if "replicas" in kdu_values
:
1785 raise K8sException(msg
)
1787 return replicas
, replica_str
1789 async def _get_replica_count_instance(
1794 resource_name
: str = None,
1796 """Get the replica count value in the instance.
1799 kdu_instance: The name of the KDU instance
1800 namespace: KDU instance namespace
1802 resource_name: Resource name
1805 True if replicas, False replicaCount
1808 kdu_values
= yaml
.load(
1809 await self
.get_values_kdu(kdu_instance
, namespace
, kubeconfig
),
1810 Loader
=yaml
.SafeLoader
,
1817 kdu_values
.get(resource_name
, None) if resource_name
else None
1821 resource_values
.get("replicaCount", None)
1822 or resource_values
.get("replicas", None)
1826 kdu_values
.get("replicaCount", None)
1827 or kdu_values
.get("replicas", None)
1833 async def _store_status(
1838 namespace
: str = None,
1839 check_every
: float = 10,
1840 db_dict
: dict = None,
1841 run_once
: bool = False,
1845 await asyncio
.sleep(check_every
)
1846 detailed_status
= await self
._status
_kdu
(
1847 cluster_id
=cluster_id
,
1848 kdu_instance
=kdu_instance
,
1850 namespace
=namespace
,
1852 status
= detailed_status
.get("info").get("description")
1853 self
.log
.debug("KDU {} STATUS: {}.".format(kdu_instance
, status
))
1854 # write status to db
1855 result
= await self
.write_app_status_to_db(
1858 detailed_status
=str(detailed_status
),
1859 operation
=operation
,
1862 self
.log
.info("Error writing in database. Task exiting...")
1864 except asyncio
.CancelledError
:
1865 self
.log
.debug("Task cancelled")
1867 except Exception as e
:
1869 "_store_status exception: {}".format(str(e
)), exc_info
=True
1876 # params for use in -f file
1877 # returns values file option and filename (in order to delete it at the end)
1878 def _params_to_file_option(self
, cluster_id
: str, params
: dict) -> (str, str):
1880 if params
and len(params
) > 0:
1881 self
._init
_paths
_env
(cluster_name
=cluster_id
, create_if_not_exist
=True)
1883 def get_random_number():
1884 r
= random
.randrange(start
=1, stop
=99999999)
1892 value
= params
.get(key
)
1893 if "!!yaml" in str(value
):
1894 value
= yaml
.safe_load(value
[7:])
1895 params2
[key
] = value
1897 values_file
= get_random_number() + ".yaml"
1898 with
open(values_file
, "w") as stream
:
1899 yaml
.dump(params2
, stream
, indent
=4, default_flow_style
=False)
1901 return "-f {}".format(values_file
), values_file
1905 # params for use in --set option
1907 def _params_to_set_option(params
: dict) -> str:
1909 if params
and len(params
) > 0:
1912 value
= params
.get(key
, None)
1913 if value
is not None:
1915 params_str
+= "--set "
1919 params_str
+= "{}={}".format(key
, value
)
1923 def generate_kdu_instance_name(**kwargs
):
1924 chart_name
= kwargs
["kdu_model"]
1925 # check embedded chart (file or dir)
1926 if chart_name
.startswith("/"):
1927 # extract file or directory name
1928 chart_name
= chart_name
[chart_name
.rfind("/") + 1 :]
1930 elif "://" in chart_name
:
1931 # extract last portion of URL
1932 chart_name
= chart_name
[chart_name
.rfind("/") + 1 :]
1935 for c
in chart_name
:
1936 if c
.isalpha() or c
.isnumeric():
1943 # if does not start with alpha character, prefix 'a'
1944 if not name
[0].isalpha():
1949 def get_random_number():
1950 r
= random
.randrange(start
=1, stop
=99999999)
1952 s
= s
.rjust(10, "0")
1955 name
= name
+ get_random_number()
def _split_version(self, kdu_model: str) -> (str, str):
    """
    Separate an optional ":<version>" suffix from a chart reference.

    References identified as chart files by _is_helm_chart_a_file are returned
    untouched, as are references that do not split into exactly two parts.

    :param kdu_model: chart reference, optionally followed by ":<version>"
    :return: tuple (kdu_model without the version suffix, version or None)
    """
    version = None
    if not self._is_helm_chart_a_file(kdu_model) and ":" in kdu_model:
        parts = kdu_model.split(sep=":")
        # more than one ":" is ambiguous, so the reference is left as given
        if len(parts) == 2:
            version = str(parts[1])
            kdu_model = parts[0]
    return kdu_model, version
1967 async def _split_repo(self
, kdu_model
: str) -> str:
1969 idx
= kdu_model
.find("/")
1971 repo_name
= kdu_model
[:idx
]
1974 async def _find_repo(self
, kdu_model
: str, cluster_uuid
: str) -> str:
1976 idx
= kdu_model
.find("/")
1978 repo_name
= kdu_model
[:idx
]
1979 # Find repository link
1980 local_repo_list
= await self
.repo_list(cluster_uuid
)
1981 for repo
in local_repo_list
:
1982 repo_url
= repo
["url"] if repo
["name"] == repo_name
else None