2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
32 from uuid
import uuid4
34 from n2vc
.exceptions
import K8sException
35 from n2vc
.k8s_conn
import K8sConnector
38 class K8sHelmBaseConnector(K8sConnector
):
41 ####################################################################################
42 ################################### P U B L I C ####################################
43 ####################################################################################
45 service_account
= "osm"
51 kubectl_command
: str = "/usr/bin/kubectl",
52 helm_command
: str = "/usr/bin/helm",
58 :param fs: file system for kubernetes and helm configuration
59 :param db: database object to write current operation status
60 :param kubectl_command: path to kubectl executable
61 :param helm_command: path to helm executable
63 :param on_update_db: callback called when k8s connector updates database
67 K8sConnector
.__init
__(self
, db
=db
, log
=log
, on_update_db
=on_update_db
)
69 self
.log
.info("Initializing K8S Helm connector")
71 # random numbers for release name generation
72 random
.seed(time
.time())
77 # exception if kubectl is not installed
78 self
.kubectl_command
= kubectl_command
79 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
81 # exception if helm is not installed
82 self
._helm
_command
= helm_command
83 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
86 def _get_namespace_cluster_id(cluster_uuid
: str) -> (str, str):
88 Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only
89 cluster_id for backward compatibility
91 namespace
, _
, cluster_id
= cluster_uuid
.rpartition(':')
92 return namespace
, cluster_id
95 self
, k8s_creds
: str, namespace
: str = "kube-system", reuse_cluster_uuid
=None
98 It prepares a given K8s cluster environment to run Charts
100 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
102 :param namespace: optional namespace to be used for helm. By default,
103 'kube-system' will be used
104 :param reuse_cluster_uuid: existing cluster uuid for reuse
105 :return: uuid of the K8s cluster and True if connector has installed some
106 software in the cluster
107 (on error, an exception will be raised)
110 if reuse_cluster_uuid
:
111 namespace_
, cluster_id
= self
._get
_namespace
_cluster
_id
(reuse_cluster_uuid
)
112 namespace
= namespace_
or namespace
114 cluster_id
= str(uuid4())
115 cluster_uuid
= "{}:{}".format(namespace
, cluster_id
)
117 self
.log
.debug("Initializing K8S Cluster {}. namespace: {}".format(cluster_id
, namespace
))
119 paths
, env
= self
._init
_paths
_env
(
120 cluster_name
=cluster_id
, create_if_not_exist
=True
122 mode
= stat
.S_IRUSR | stat
.S_IWUSR
123 with
open(paths
["kube_config"], "w", mode
) as f
:
125 os
.chmod(paths
["kube_config"], 0o600)
127 # Code with initialization specific of helm version
128 n2vc_installed_sw
= await self
._cluster
_init
(cluster_id
, namespace
, paths
, env
)
130 # sync fs with local data
131 self
.fs
.reverse_sync(from_path
=cluster_id
)
133 self
.log
.info("Cluster {} initialized".format(cluster_id
))
135 return cluster_uuid
, n2vc_installed_sw
138 self
, cluster_uuid
: str, name
: str, url
: str, repo_type
: str = "chart"
140 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
141 self
.log
.debug("Cluster {}, adding {} repository {}. URL: {}".format(
142 cluster_id
, repo_type
, name
, url
))
145 self
.fs
.sync(from_path
=cluster_id
)
148 paths
, env
= self
._init
_paths
_env
(
149 cluster_name
=cluster_id
, create_if_not_exist
=True
153 command
= "{} repo update".format(
156 self
.log
.debug("updating repo: {}".format(command
))
157 await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False, env
=env
)
159 # helm repo add name url
160 command
= "{} repo add {} {}".format(
161 self
._helm
_command
, name
, url
163 self
.log
.debug("adding repo: {}".format(command
))
164 await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True, env
=env
)
167 self
.fs
.reverse_sync(from_path
=cluster_id
)
169 async def repo_list(self
, cluster_uuid
: str) -> list:
171 Get the list of registered repositories
173 :return: list of registered repositories: [ (name, url) .... ]
176 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
177 self
.log
.debug("list repositories for cluster {}".format(cluster_id
))
180 self
.fs
.sync(from_path
=cluster_id
)
183 paths
, env
= self
._init
_paths
_env
(
184 cluster_name
=cluster_id
, create_if_not_exist
=True
187 command
= "{} repo list --output yaml".format(
191 # Set exception to false because if there are no repos just want an empty list
192 output
, _rc
= await self
._local
_async
_exec
(
193 command
=command
, raise_exception_on_error
=False, env
=env
197 self
.fs
.reverse_sync(from_path
=cluster_id
)
200 if output
and len(output
) > 0:
201 repos
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
202 # unify format between helm2 and helm3 setting all keys lowercase
203 return self
._lower
_keys
_list
(repos
)
209 async def repo_remove(self
, cluster_uuid
: str, name
: str):
211 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
212 self
.log
.debug("remove {} repositories for cluster {}".format(name
, cluster_id
))
215 self
.fs
.sync(from_path
=cluster_id
)
218 paths
, env
= self
._init
_paths
_env
(
219 cluster_name
=cluster_id
, create_if_not_exist
=True
222 command
= "{} repo remove {}".format(
223 self
._helm
_command
, name
225 await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True, env
=env
)
228 self
.fs
.reverse_sync(from_path
=cluster_id
)
231 self
, cluster_uuid
: str, force
: bool = False, uninstall_sw
: bool = False
234 namespace
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
235 self
.log
.debug("Resetting K8s environment. cluster uuid: {} uninstall={}"
236 .format(cluster_id
, uninstall_sw
))
239 self
.fs
.sync(from_path
=cluster_id
)
241 # uninstall releases if needed.
243 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
244 if len(releases
) > 0:
248 kdu_instance
= r
.get("name")
249 chart
= r
.get("chart")
251 "Uninstalling {} -> {}".format(chart
, kdu_instance
)
253 await self
.uninstall(
254 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
256 except Exception as e
:
257 # will not raise exception as it was found
258 # that in some cases of previously installed helm releases it
261 "Error uninstalling release {}: {}".format(kdu_instance
, e
)
265 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
268 uninstall_sw
= False # Allow to remove k8s cluster without removing Tiller
271 await self
._uninstall
_sw
(cluster_id
, namespace
)
273 # delete cluster directory
274 self
.log
.debug("Removing directory {}".format(cluster_id
))
275 self
.fs
.file_delete(cluster_id
, ignore_non_exist
=True)
276 # Remove also local directorio if still exist
277 direct
= self
.fs
.path
+ "/" + cluster_id
278 shutil
.rmtree(direct
, ignore_errors
=True)
282 async def _install_impl(
289 timeout
: float = 300,
291 db_dict
: dict = None,
292 kdu_name
: str = None,
293 namespace
: str = None,
296 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
297 cluster_id
=cluster_id
, params
=params
303 parts
= kdu_model
.split(sep
=":")
305 version
= str(parts
[1])
308 # generate a name for the release. Then, check if already exists
310 while kdu_instance
is None:
311 kdu_instance
= self
._generate
_release
_name
(kdu_model
)
313 result
= await self
._status
_kdu
(
314 cluster_id
=cluster_id
,
315 kdu_instance
=kdu_instance
,
317 show_error_log
=False,
319 if result
is not None:
320 # instance already exists: generate a new one
325 command
= self
._get
_install
_command
(kdu_model
, kdu_instance
, namespace
,
326 params_str
, version
, atomic
, timeout
)
328 self
.log
.debug("installing: {}".format(command
))
331 # exec helm in a task
332 exec_task
= asyncio
.ensure_future(
333 coro_or_future
=self
._local
_async
_exec
(
334 command
=command
, raise_exception_on_error
=False, env
=env
338 # write status in another task
339 status_task
= asyncio
.ensure_future(
340 coro_or_future
=self
._store
_status
(
341 cluster_id
=cluster_id
,
342 kdu_instance
=kdu_instance
,
350 # wait for execution task
351 await asyncio
.wait([exec_task
])
356 output
, rc
= exec_task
.result()
360 output
, rc
= await self
._local
_async
_exec
(
361 command
=command
, raise_exception_on_error
=False, env
=env
364 # remove temporal values yaml file
366 os
.remove(file_to_delete
)
369 await self
._store
_status
(
370 cluster_id
=cluster_id
,
371 kdu_instance
=kdu_instance
,
380 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
382 raise K8sException(msg
)
390 kdu_model
: str = None,
392 timeout
: float = 300,
394 db_dict
: dict = None,
396 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
397 self
.log
.debug("upgrading {} in cluster {}".format(kdu_model
, cluster_id
))
400 self
.fs
.sync(from_path
=cluster_id
)
402 # look for instance to obtain namespace
403 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
404 if not instance_info
:
405 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
408 paths
, env
= self
._init
_paths
_env
(
409 cluster_name
=cluster_id
, create_if_not_exist
=True
413 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
414 cluster_id
=cluster_id
, params
=params
420 parts
= kdu_model
.split(sep
=":")
422 version
= str(parts
[1])
425 command
= self
._get
_upgrade
_command
(kdu_model
, kdu_instance
, instance_info
["namespace"],
426 params_str
, version
, atomic
, timeout
)
428 self
.log
.debug("upgrading: {}".format(command
))
432 # exec helm in a task
433 exec_task
= asyncio
.ensure_future(
434 coro_or_future
=self
._local
_async
_exec
(
435 command
=command
, raise_exception_on_error
=False, env
=env
438 # write status in another task
439 status_task
= asyncio
.ensure_future(
440 coro_or_future
=self
._store
_status
(
441 cluster_id
=cluster_id
,
442 kdu_instance
=kdu_instance
,
443 namespace
=instance_info
["namespace"],
450 # wait for execution task
451 await asyncio
.wait([exec_task
])
455 output
, rc
= exec_task
.result()
459 output
, rc
= await self
._local
_async
_exec
(
460 command
=command
, raise_exception_on_error
=False, env
=env
463 # remove temporal values yaml file
465 os
.remove(file_to_delete
)
468 await self
._store
_status
(
469 cluster_id
=cluster_id
,
470 kdu_instance
=kdu_instance
,
471 namespace
=instance_info
["namespace"],
479 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
481 raise K8sException(msg
)
484 self
.fs
.reverse_sync(from_path
=cluster_id
)
486 # return new revision number
487 instance
= await self
.get_instance_info(
488 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
491 revision
= int(instance
.get("revision"))
492 self
.log
.debug("New revision: {}".format(revision
))
498 self
, cluster_uuid
: str, kdu_instance
: str, revision
=0, db_dict
: dict = None
501 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
503 "rollback kdu_instance {} to revision {} from cluster {}".format(
504 kdu_instance
, revision
, cluster_id
509 self
.fs
.sync(from_path
=cluster_id
)
511 # look for instance to obtain namespace
512 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
513 if not instance_info
:
514 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
517 paths
, env
= self
._init
_paths
_env
(
518 cluster_name
=cluster_id
, create_if_not_exist
=True
521 command
= self
._get
_rollback
_command
(kdu_instance
, instance_info
["namespace"],
524 self
.log
.debug("rolling_back: {}".format(command
))
526 # exec helm in a task
527 exec_task
= asyncio
.ensure_future(
528 coro_or_future
=self
._local
_async
_exec
(
529 command
=command
, raise_exception_on_error
=False, env
=env
532 # write status in another task
533 status_task
= asyncio
.ensure_future(
534 coro_or_future
=self
._store
_status
(
535 cluster_id
=cluster_id
,
536 kdu_instance
=kdu_instance
,
537 namespace
=instance_info
["namespace"],
539 operation
="rollback",
544 # wait for execution task
545 await asyncio
.wait([exec_task
])
550 output
, rc
= exec_task
.result()
553 await self
._store
_status
(
554 cluster_id
=cluster_id
,
555 kdu_instance
=kdu_instance
,
556 namespace
=instance_info
["namespace"],
558 operation
="rollback",
564 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
566 raise K8sException(msg
)
569 self
.fs
.reverse_sync(from_path
=cluster_id
)
571 # return new revision number
572 instance
= await self
.get_instance_info(
573 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
576 revision
= int(instance
.get("revision"))
577 self
.log
.debug("New revision: {}".format(revision
))
582 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str):
584 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
585 (this call should happen after all _terminate-config-primitive_ of the VNF
588 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
589 :param kdu_instance: unique name for the KDU instance to be deleted
590 :return: True if successful
593 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
595 "uninstall kdu_instance {} from cluster {}".format(
596 kdu_instance
, cluster_id
601 self
.fs
.sync(from_path
=cluster_id
)
603 # look for instance to obtain namespace
604 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
605 if not instance_info
:
606 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
609 paths
, env
= self
._init
_paths
_env
(
610 cluster_name
=cluster_id
, create_if_not_exist
=True
613 command
= self
._get
_uninstall
_command
(kdu_instance
, instance_info
["namespace"])
614 output
, _rc
= await self
._local
_async
_exec
(
615 command
=command
, raise_exception_on_error
=True, env
=env
619 self
.fs
.reverse_sync(from_path
=cluster_id
)
621 return self
._output
_to
_table
(output
)
623 async def instances_list(self
, cluster_uuid
: str) -> list:
625 returns a list of deployed releases in a cluster
627 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
631 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
632 self
.log
.debug("list releases for cluster {}".format(cluster_id
))
635 self
.fs
.sync(from_path
=cluster_id
)
637 # execute internal command
638 result
= await self
._instances
_list
(cluster_id
)
641 self
.fs
.reverse_sync(from_path
=cluster_id
)
645 async def get_instance_info(self
, cluster_uuid
: str, kdu_instance
: str):
646 instances
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
647 for instance
in instances
:
648 if instance
.get("name") == kdu_instance
:
650 self
.log
.debug("Instance {} not found".format(kdu_instance
))
653 async def exec_primitive(
655 cluster_uuid
: str = None,
656 kdu_instance
: str = None,
657 primitive_name
: str = None,
658 timeout
: float = 300,
660 db_dict
: dict = None,
662 """Exec primitive (Juju action)
664 :param cluster_uuid: The UUID of the cluster or namespace:cluster
665 :param kdu_instance: The unique name of the KDU instance
666 :param primitive_name: Name of action that will be executed
667 :param timeout: Timeout for action execution
668 :param params: Dictionary of all the parameters needed for the action
669 :db_dict: Dictionary for any additional data
671 :return: Returns the output of the action
674 "KDUs deployed with Helm don't support actions "
675 "different from rollback, upgrade and status"
678 async def get_services(self
,
681 namespace
: str) -> list:
683 Returns a list of services defined for the specified kdu instance.
685 :param cluster_uuid: UUID of a K8s cluster known by OSM
686 :param kdu_instance: unique name for the KDU instance
687 :param namespace: K8s namespace used by the KDU instance
688 :return: If successful, it will return a list of services, Each service
689 can have the following data:
690 - `name` of the service
691 - `type` type of service in the k8 cluster
692 - `ports` List of ports offered by the service, for each port includes at least
694 - `cluster_ip` Internal ip to be used inside k8s cluster
695 - `external_ip` List of external ips (in case they are available)
698 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
700 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
701 cluster_uuid
, kdu_instance
706 self
.fs
.sync(from_path
=cluster_id
)
708 # get list of services names for kdu
709 service_names
= await self
._get
_services
(cluster_id
, kdu_instance
, namespace
)
712 for service
in service_names
:
713 service
= await self
._get
_service
(cluster_id
, service
, namespace
)
714 service_list
.append(service
)
717 self
.fs
.reverse_sync(from_path
=cluster_id
)
721 async def get_service(self
,
724 namespace
: str) -> object:
727 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
728 service_name
, namespace
, cluster_uuid
)
731 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
734 self
.fs
.sync(from_path
=cluster_id
)
736 service
= await self
._get
_service
(cluster_id
, service_name
, namespace
)
739 self
.fs
.reverse_sync(from_path
=cluster_id
)
743 async def status_kdu(self
, cluster_uuid
: str, kdu_instance
: str) -> str:
746 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
747 cluster_uuid
, kdu_instance
751 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
754 self
.fs
.sync(from_path
=cluster_id
)
756 # get instance: needed to obtain namespace
757 instances
= await self
._instances
_list
(cluster_id
=cluster_id
)
758 for instance
in instances
:
759 if instance
.get("name") == kdu_instance
:
762 # instance does not exist
763 raise K8sException("Instance name: {} not found in cluster: {}".format(
764 kdu_instance
, cluster_id
))
766 status
= await self
._status
_kdu
(
767 cluster_id
=cluster_id
,
768 kdu_instance
=kdu_instance
,
769 namespace
=instance
["namespace"],
775 self
.fs
.reverse_sync(from_path
=cluster_id
)
779 async def values_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
782 "inspect kdu_model values {} from (optional) repo: {}".format(
787 return await self
._exec
_inspect
_comand
(
788 inspect_command
="values", kdu_model
=kdu_model
, repo_url
=repo_url
791 async def help_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
794 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model
, repo_url
)
797 return await self
._exec
_inspect
_comand
(
798 inspect_command
="readme", kdu_model
=kdu_model
, repo_url
=repo_url
801 async def synchronize_repos(self
, cluster_uuid
: str):
803 self
.log
.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid
))
805 db_repo_ids
= self
._get
_helm
_chart
_repos
_ids
(cluster_uuid
)
806 db_repo_dict
= self
._get
_db
_repos
_dict
(db_repo_ids
)
808 local_repo_list
= await self
.repo_list(cluster_uuid
)
809 local_repo_dict
= {repo
["name"]: repo
["url"] for repo
in local_repo_list
}
811 deleted_repo_list
= []
814 # iterate over the list of repos in the database that should be
815 # added if not present
816 for repo_name
, db_repo
in db_repo_dict
.items():
818 # check if it is already present
819 curr_repo_url
= local_repo_dict
.get(db_repo
["name"])
820 repo_id
= db_repo
.get("_id")
821 if curr_repo_url
!= db_repo
["url"]:
823 self
.log
.debug("repo {} url changed, delete and and again".format(
825 await self
.repo_remove(cluster_uuid
, db_repo
["name"])
826 deleted_repo_list
.append(repo_id
)
829 self
.log
.debug("add repo {}".format(db_repo
["name"]))
830 await self
.repo_add(cluster_uuid
, db_repo
["name"], db_repo
["url"])
831 added_repo_dict
[repo_id
] = db_repo
["name"]
832 except Exception as e
:
834 "Error adding repo id: {}, err_msg: {} ".format(
839 # Delete repos that are present but not in nbi_list
840 for repo_name
in local_repo_dict
:
841 if not db_repo_dict
.get(repo_name
) and repo_name
!= "stable":
842 self
.log
.debug("delete repo {}".format(repo_name
))
844 await self
.repo_remove(cluster_uuid
, repo_name
)
845 deleted_repo_list
.append(repo_name
)
846 except Exception as e
:
848 "Error deleting repo, name: {}, err_msg: {}".format(
853 return deleted_repo_list
, added_repo_dict
857 except Exception as e
:
858 # Do not raise errors synchronizing repos
859 self
.log
.error("Error synchronizing repos: {}".format(e
))
860 raise Exception("Error synchronizing repos: {}".format(e
))
862 def _get_db_repos_dict(self
, repo_ids
: list):
864 for repo_id
in repo_ids
:
865 db_repo
= self
.db
.get_one("k8srepos", {"_id": repo_id
})
866 db_repos_dict
[db_repo
["name"]] = db_repo
870 ####################################################################################
871 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
872 ####################################################################################
876 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
878 Creates and returns base cluster and kube dirs and returns them.
879 Also created helm3 dirs according to new directory specification, paths are
880 not returned but assigned to helm environment variables
882 :param cluster_name: cluster_name
883 :return: Dictionary with config_paths and dictionary with helm environment variables
887 async def _cluster_init(self
, cluster_id
, namespace
, paths
, env
):
889 Implements the helm version dependent cluster initialization
893 async def _instances_list(self
, cluster_id
):
895 Implements the helm version dependent helm instances list
899 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
):
901 Implements the helm version dependent method to obtain services from a helm instance
905 async def _status_kdu(self
, cluster_id
: str, kdu_instance
: str, namespace
: str = None,
906 show_error_log
: bool = False, return_text
: bool = False):
908 Implements the helm version dependent method to obtain status of a helm instance
912 def _get_install_command(self
, kdu_model
, kdu_instance
, namespace
,
913 params_str
, version
, atomic
, timeout
) -> str:
915 Obtain command to be executed to delete the indicated instance
919 def _get_upgrade_command(self
, kdu_model
, kdu_instance
, namespace
,
920 params_str
, version
, atomic
, timeout
) -> str:
922 Obtain command to be executed to upgrade the indicated instance
926 def _get_rollback_command(self
, kdu_instance
, namespace
, revision
) -> str:
928 Obtain command to be executed to rollback the indicated instance
932 def _get_uninstall_command(self
, kdu_instance
: str, namespace
: str) -> str:
934 Obtain command to be executed to delete the indicated instance
938 def _get_inspect_command(self
, show_command
: str, kdu_model
: str, repo_str
: str,
941 Obtain command to be executed to obtain information about the kdu
945 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
947 Method call to uninstall cluster software for helm. This method is dependent
949 For Helm v2 it will be called when Tiller must be uninstalled
950 For Helm v3 it does nothing and does not need to be callled
954 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
956 Obtains the cluster repos identifiers
960 ####################################################################################
961 ################################### P R I V A T E ##################################
962 ####################################################################################
966 def _check_file_exists(filename
: str, exception_if_not_exists
: bool = False):
967 if os
.path
.exists(filename
):
970 msg
= "File {} does not exist".format(filename
)
971 if exception_if_not_exists
:
972 raise K8sException(msg
)
975 def _remove_multiple_spaces(strobj
):
976 strobj
= strobj
.strip()
978 strobj
= strobj
.replace(" ", " ")
982 def _output_to_lines(output
: str) -> list:
983 output_lines
= list()
984 lines
= output
.splitlines(keepends
=False)
988 output_lines
.append(line
)
992 def _output_to_table(output
: str) -> list:
993 output_table
= list()
994 lines
= output
.splitlines(keepends
=False)
996 line
= line
.replace("\t", " ")
998 output_table
.append(line_list
)
999 cells
= line
.split(sep
=" ")
1003 line_list
.append(cell
)
1007 def _parse_services(output
: str) -> list:
1008 lines
= output
.splitlines(keepends
=False)
1011 line
= line
.replace("\t", " ")
1012 cells
= line
.split(sep
=" ")
1013 if len(cells
) > 0 and cells
[0].startswith("service/"):
1014 elems
= cells
[0].split(sep
="/")
1016 services
.append(elems
[1])
1020 def _get_deep(dictionary
: dict, members
: tuple):
1025 value
= target
.get(m
)
1034 # find key:value in several lines
1036 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
1037 for line
in p_lines
:
1039 if line
.startswith(p_key
+ ":"):
1040 parts
= line
.split(":")
1041 the_value
= parts
[1].strip()
1049 def _lower_keys_list(input_list
: list):
1051 Transform the keys in a list of dictionaries to lower case and returns a new list
1055 for dictionary
in input_list
:
1056 new_dict
= dict((k
.lower(), v
) for k
, v
in dictionary
.items())
1057 new_list
.append(new_dict
)
1060 def _local_exec(self
, command
: str) -> (str, int):
1061 command
= self
._remove
_multiple
_spaces
(command
)
1062 self
.log
.debug("Executing sync local command: {}".format(command
))
1063 # raise exception if fails
1066 output
= subprocess
.check_output(
1067 command
, shell
=True, universal_newlines
=True
1070 self
.log
.debug(output
)
1074 return output
, return_code
1076 async def _local_async_exec(
1079 raise_exception_on_error
: bool = False,
1080 show_error_log
: bool = True,
1081 encode_utf8
: bool = False,
1085 command
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command
)
1086 self
.log
.debug("Executing async local command: {}, env: {}".format(command
, env
))
1089 command
= shlex
.split(command
)
1091 environ
= os
.environ
.copy()
1096 process
= await asyncio
.create_subprocess_exec(
1097 *command
, stdout
=asyncio
.subprocess
.PIPE
, stderr
=asyncio
.subprocess
.PIPE
,
1101 # wait for command terminate
1102 stdout
, stderr
= await process
.communicate()
1104 return_code
= process
.returncode
1108 output
= stdout
.decode("utf-8").strip()
1109 # output = stdout.decode()
1111 output
= stderr
.decode("utf-8").strip()
1112 # output = stderr.decode()
1114 if return_code
!= 0 and show_error_log
:
1116 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1119 self
.log
.debug("Return code: {}".format(return_code
))
1121 if raise_exception_on_error
and return_code
!= 0:
1122 raise K8sException(output
)
1125 output
= output
.encode("utf-8").strip()
1126 output
= str(output
).replace("\\n", "\n")
1128 return output
, return_code
1130 except asyncio
.CancelledError
:
1132 except K8sException
:
1134 except Exception as e
:
1135 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1137 if raise_exception_on_error
:
1138 raise K8sException(e
) from e
1142 async def _local_async_exec_pipe(self
,
1145 raise_exception_on_error
: bool = True,
1146 show_error_log
: bool = True,
1147 encode_utf8
: bool = False,
1150 command1
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command1
)
1151 command2
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command2
)
1152 command
= "{} | {}".format(command1
, command2
)
1153 self
.log
.debug("Executing async local command: {}, env: {}".format(command
, env
))
1156 command1
= shlex
.split(command1
)
1157 command2
= shlex
.split(command2
)
1159 environ
= os
.environ
.copy()
1164 read
, write
= os
.pipe()
1165 await asyncio
.create_subprocess_exec(*command1
, stdout
=write
, env
=environ
)
1167 process_2
= await asyncio
.create_subprocess_exec(*command2
, stdin
=read
,
1168 stdout
=asyncio
.subprocess
.PIPE
,
1171 stdout
, stderr
= await process_2
.communicate()
1173 return_code
= process_2
.returncode
1177 output
= stdout
.decode("utf-8").strip()
1178 # output = stdout.decode()
1180 output
= stderr
.decode("utf-8").strip()
1181 # output = stderr.decode()
1183 if return_code
!= 0 and show_error_log
:
1185 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1188 self
.log
.debug("Return code: {}".format(return_code
))
1190 if raise_exception_on_error
and return_code
!= 0:
1191 raise K8sException(output
)
1194 output
= output
.encode("utf-8").strip()
1195 output
= str(output
).replace("\\n", "\n")
1197 return output
, return_code
1198 except asyncio
.CancelledError
:
1200 except K8sException
:
1202 except Exception as e
:
1203 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1205 if raise_exception_on_error
:
1206 raise K8sException(e
) from e
1210 async def _get_service(self
, cluster_id
, service_name
, namespace
):
1212 Obtains the data of the specified service in the k8cluster.
1214 :param cluster_id: id of a K8s cluster known by OSM
1215 :param service_name: name of the K8s service in the specified namespace
1216 :param namespace: K8s namespace used by the KDU instance
1217 :return: If successful, it will return a service with the following data:
1218 - `name` of the service
1219 - `type` type of service in the k8 cluster
1220 - `ports` List of ports offered by the service, for each port includes at least
1221 name, port, protocol
1222 - `cluster_ip` Internal ip to be used inside k8s cluster
1223 - `external_ip` List of external ips (in case they are available)
1227 paths
, env
= self
._init
_paths
_env
(
1228 cluster_name
=cluster_id
, create_if_not_exist
=True
1231 command
= "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1232 self
.kubectl_command
, paths
["kube_config"], namespace
, service_name
1235 output
, _rc
= await self
._local
_async
_exec
(
1236 command
=command
, raise_exception_on_error
=True, env
=env
1239 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
1242 "name": service_name
,
1243 "type": self
._get
_deep
(data
, ("spec", "type")),
1244 "ports": self
._get
_deep
(data
, ("spec", "ports")),
1245 "cluster_ip": self
._get
_deep
(data
, ("spec", "clusterIP"))
1247 if service
["type"] == "LoadBalancer":
1248 ip_map_list
= self
._get
_deep
(data
, ("status", "loadBalancer", "ingress"))
1249 ip_list
= [elem
["ip"] for elem
in ip_map_list
]
1250 service
["external_ip"] = ip_list
1254 async def _exec_inspect_comand(
1255 self
, inspect_command
: str, kdu_model
: str, repo_url
: str = None
1258 Obtains information about a kdu, no cluster (no env)
1263 repo_str
= " --repo {}".format(repo_url
)
1265 idx
= kdu_model
.find("/")
1268 kdu_model
= kdu_model
[idx
:]
1271 if ":" in kdu_model
:
1272 parts
= kdu_model
.split(sep
=":")
1274 version
= "--version {}".format(str(parts
[1]))
1275 kdu_model
= parts
[0]
1277 full_command
= self
._get
_inspect
_command
(inspect_command
, kdu_model
, repo_str
, version
)
1278 output
, _rc
= await self
._local
_async
_exec
(
1279 command
=full_command
, encode_utf8
=True
1284 async def _store_status(
1289 namespace
: str = None,
1290 check_every
: float = 10,
1291 db_dict
: dict = None,
1292 run_once
: bool = False,
1296 await asyncio
.sleep(check_every
)
1297 detailed_status
= await self
._status
_kdu
(
1298 cluster_id
=cluster_id
, kdu_instance
=kdu_instance
, namespace
=namespace
,
1301 status
= detailed_status
.get("info").get("description")
1302 self
.log
.debug('KDU {} STATUS: {}.'.format(kdu_instance
, status
))
1303 # write status to db
1304 result
= await self
.write_app_status_to_db(
1307 detailed_status
=str(detailed_status
),
1308 operation
=operation
,
1311 self
.log
.info("Error writing in database. Task exiting...")
1313 except asyncio
.CancelledError
:
1314 self
.log
.debug("Task cancelled")
1316 except Exception as e
:
1317 self
.log
.debug("_store_status exception: {}".format(str(e
)), exc_info
=True)
1323 # params for use in -f file
1324 # returns values file option and filename (in order to delete it at the end)
1325 def _params_to_file_option(self
, cluster_id
: str, params
: dict) -> (str, str):
1327 if params
and len(params
) > 0:
1328 self
._init
_paths
_env
(
1329 cluster_name
=cluster_id
, create_if_not_exist
=True
1332 def get_random_number():
1333 r
= random
.randrange(start
=1, stop
=99999999)
1341 value
= params
.get(key
)
1342 if "!!yaml" in str(value
):
1343 value
= yaml
.load(value
[7:])
1344 params2
[key
] = value
1346 values_file
= get_random_number() + ".yaml"
1347 with
open(values_file
, "w") as stream
:
1348 yaml
.dump(params2
, stream
, indent
=4, default_flow_style
=False)
1350 return "-f {}".format(values_file
), values_file
1354 # params for use in --set option
1356 def _params_to_set_option(params
: dict) -> str:
1358 if params
and len(params
) > 0:
1361 value
= params
.get(key
, None)
1362 if value
is not None:
1364 params_str
+= "--set "
1368 params_str
+= "{}={}".format(key
, value
)
1372 def _generate_release_name(chart_name
: str):
1373 # check embeded chart (file or dir)
1374 if chart_name
.startswith("/"):
1375 # extract file or directory name
1376 chart_name
= chart_name
[chart_name
.rfind("/") + 1:]
1378 elif "://" in chart_name
:
1379 # extract last portion of URL
1380 chart_name
= chart_name
[chart_name
.rfind("/") + 1:]
1383 for c
in chart_name
:
1384 if c
.isalpha() or c
.isnumeric():
1391 # if does not start with alpha character, prefix 'a'
1392 if not name
[0].isalpha():
1397 def get_random_number():
1398 r
= random
.randrange(start
=1, stop
=99999999)
1400 s
= s
.rjust(10, "0")
1403 name
= name
+ get_random_number()