2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
31 from uuid
import uuid4
33 from n2vc
.config
import EnvironConfig
34 from n2vc
.exceptions
import K8sException
35 from n2vc
.k8s_conn
import K8sConnector
38 class K8sHelmBaseConnector(K8sConnector
):
41 ####################################################################################
42 ################################### P U B L I C ####################################
43 ####################################################################################
46 service_account
= "osm"
52 kubectl_command
: str = "/usr/bin/kubectl",
53 helm_command
: str = "/usr/bin/helm",
59 :param fs: file system for kubernetes and helm configuration
60 :param db: database object to write current operation status
61 :param kubectl_command: path to kubectl executable
62 :param helm_command: path to helm executable
64 :param on_update_db: callback called when k8s connector updates database
68 K8sConnector
.__init
__(self
, db
=db
, log
=log
, on_update_db
=on_update_db
)
70 self
.log
.info("Initializing K8S Helm connector")
72 self
.config
= EnvironConfig()
73 # random numbers for release name generation
74 random
.seed(time
.time())
79 # exception if kubectl is not installed
80 self
.kubectl_command
= kubectl_command
81 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
83 # exception if helm is not installed
84 self
._helm
_command
= helm_command
85 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
87 # obtain stable repo url from config or apply default
88 self
._stable
_repo
_url
= self
.config
.get("stablerepourl")
89 if self
._stable
_repo
_url
== "None":
90 self
._stable
_repo
_url
= None
93 def _get_namespace_cluster_id(cluster_uuid
: str) -> (str, str):
95 Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only
96 cluster_id for backward compatibility
98 namespace
, _
, cluster_id
= cluster_uuid
.rpartition(":")
99 return namespace
, cluster_id
104 namespace
: str = "kube-system",
105 reuse_cluster_uuid
=None,
109 It prepares a given K8s cluster environment to run Charts
111 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
113 :param namespace: optional namespace to be used for helm. By default,
114 'kube-system' will be used
115 :param reuse_cluster_uuid: existing cluster uuid for reuse
116 :param kwargs: Additional parameters (None yet)
117 :return: uuid of the K8s cluster and True if connector has installed some
118 software in the cluster
119 (on error, an exception will be raised)
122 if reuse_cluster_uuid
:
123 namespace_
, cluster_id
= self
._get
_namespace
_cluster
_id
(reuse_cluster_uuid
)
124 namespace
= namespace_
or namespace
126 cluster_id
= str(uuid4())
127 cluster_uuid
= "{}:{}".format(namespace
, cluster_id
)
130 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id
, namespace
)
133 paths
, env
= self
._init
_paths
_env
(
134 cluster_name
=cluster_id
, create_if_not_exist
=True
136 mode
= stat
.S_IRUSR | stat
.S_IWUSR
137 with
open(paths
["kube_config"], "w", mode
) as f
:
139 os
.chmod(paths
["kube_config"], 0o600)
141 # Code with initialization specific of helm version
142 n2vc_installed_sw
= await self
._cluster
_init
(cluster_id
, namespace
, paths
, env
)
144 # sync fs with local data
145 self
.fs
.reverse_sync(from_path
=cluster_id
)
147 self
.log
.info("Cluster {} initialized".format(cluster_id
))
149 return cluster_uuid
, n2vc_installed_sw
152 self
, cluster_uuid
: str, name
: str, url
: str, repo_type
: str = "chart"
154 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
156 "Cluster {}, adding {} repository {}. URL: {}".format(
157 cluster_id
, repo_type
, name
, url
162 self
.fs
.sync(from_path
=cluster_id
)
165 paths
, env
= self
._init
_paths
_env
(
166 cluster_name
=cluster_id
, create_if_not_exist
=True
170 command
= "{} repo update".format(self
._helm
_command
)
171 self
.log
.debug("updating repo: {}".format(command
))
172 await self
._local
_async
_exec
(
173 command
=command
, raise_exception_on_error
=False, env
=env
176 # helm repo add name url
177 command
= "{} repo add {} {}".format(self
._helm
_command
, name
, url
)
178 self
.log
.debug("adding repo: {}".format(command
))
179 await self
._local
_async
_exec
(
180 command
=command
, raise_exception_on_error
=True, env
=env
184 self
.fs
.reverse_sync(from_path
=cluster_id
)
186 async def repo_list(self
, cluster_uuid
: str) -> list:
188 Get the list of registered repositories
190 :return: list of registered repositories: [ (name, url) .... ]
193 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
194 self
.log
.debug("list repositories for cluster {}".format(cluster_id
))
197 self
.fs
.sync(from_path
=cluster_id
)
200 paths
, env
= self
._init
_paths
_env
(
201 cluster_name
=cluster_id
, create_if_not_exist
=True
204 command
= "{} repo list --output yaml".format(self
._helm
_command
)
206 # Set exception to false because if there are no repos just want an empty list
207 output
, _rc
= await self
._local
_async
_exec
(
208 command
=command
, raise_exception_on_error
=False, env
=env
212 self
.fs
.reverse_sync(from_path
=cluster_id
)
215 if output
and len(output
) > 0:
216 repos
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
217 # unify format between helm2 and helm3 setting all keys lowercase
218 return self
._lower
_keys
_list
(repos
)
224 async def repo_remove(self
, cluster_uuid
: str, name
: str):
226 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
227 self
.log
.debug("remove {} repositories for cluster {}".format(name
, cluster_id
))
230 self
.fs
.sync(from_path
=cluster_id
)
233 paths
, env
= self
._init
_paths
_env
(
234 cluster_name
=cluster_id
, create_if_not_exist
=True
237 command
= "{} repo remove {}".format(self
._helm
_command
, name
)
238 await self
._local
_async
_exec
(
239 command
=command
, raise_exception_on_error
=True, env
=env
243 self
.fs
.reverse_sync(from_path
=cluster_id
)
249 uninstall_sw
: bool = False,
254 Resets the Kubernetes cluster by removing the helm deployment that represents it.
256 :param cluster_uuid: The UUID of the cluster to reset
257 :param force: Boolean to force the reset
258 :param uninstall_sw: Boolean to force the reset
259 :param kwargs: Additional parameters (None yet)
260 :return: Returns True if successful or raises an exception.
262 namespace
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
264 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
265 cluster_id
, uninstall_sw
270 self
.fs
.sync(from_path
=cluster_id
)
272 # uninstall releases if needed.
274 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
275 if len(releases
) > 0:
279 kdu_instance
= r
.get("name")
280 chart
= r
.get("chart")
282 "Uninstalling {} -> {}".format(chart
, kdu_instance
)
284 await self
.uninstall(
285 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
287 except Exception as e
:
288 # will not raise exception as it was found
289 # that in some cases of previously installed helm releases it
292 "Error uninstalling release {}: {}".format(
298 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
302 False # Allow to remove k8s cluster without removing Tiller
306 await self
._uninstall
_sw
(cluster_id
, namespace
)
308 # delete cluster directory
309 self
.log
.debug("Removing directory {}".format(cluster_id
))
310 self
.fs
.file_delete(cluster_id
, ignore_non_exist
=True)
311 # Remove also local directorio if still exist
312 direct
= self
.fs
.path
+ "/" + cluster_id
313 shutil
.rmtree(direct
, ignore_errors
=True)
317 async def _install_impl(
325 timeout
: float = 300,
327 db_dict
: dict = None,
328 kdu_name
: str = None,
329 namespace
: str = None,
332 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
333 cluster_id
=cluster_id
, params
=params
339 parts
= kdu_model
.split(sep
=":")
341 version
= str(parts
[1])
344 command
= self
._get
_install
_command
(
345 kdu_model
, kdu_instance
, namespace
, params_str
, version
, atomic
, timeout
348 self
.log
.debug("installing: {}".format(command
))
351 # exec helm in a task
352 exec_task
= asyncio
.ensure_future(
353 coro_or_future
=self
._local
_async
_exec
(
354 command
=command
, raise_exception_on_error
=False, env
=env
358 # write status in another task
359 status_task
= asyncio
.ensure_future(
360 coro_or_future
=self
._store
_status
(
361 cluster_id
=cluster_id
,
362 kdu_instance
=kdu_instance
,
370 # wait for execution task
371 await asyncio
.wait([exec_task
])
376 output
, rc
= exec_task
.result()
380 output
, rc
= await self
._local
_async
_exec
(
381 command
=command
, raise_exception_on_error
=False, env
=env
384 # remove temporal values yaml file
386 os
.remove(file_to_delete
)
389 await self
._store
_status
(
390 cluster_id
=cluster_id
,
391 kdu_instance
=kdu_instance
,
400 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
402 raise K8sException(msg
)
408 kdu_model
: str = None,
410 timeout
: float = 300,
412 db_dict
: dict = None,
414 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
415 self
.log
.debug("upgrading {} in cluster {}".format(kdu_model
, cluster_id
))
418 self
.fs
.sync(from_path
=cluster_id
)
420 # look for instance to obtain namespace
421 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
422 if not instance_info
:
423 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
426 paths
, env
= self
._init
_paths
_env
(
427 cluster_name
=cluster_id
, create_if_not_exist
=True
431 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
432 cluster_id
=cluster_id
, params
=params
438 parts
= kdu_model
.split(sep
=":")
440 version
= str(parts
[1])
443 command
= self
._get
_upgrade
_command
(
446 instance_info
["namespace"],
453 self
.log
.debug("upgrading: {}".format(command
))
457 # exec helm in a task
458 exec_task
= asyncio
.ensure_future(
459 coro_or_future
=self
._local
_async
_exec
(
460 command
=command
, raise_exception_on_error
=False, env
=env
463 # write status in another task
464 status_task
= asyncio
.ensure_future(
465 coro_or_future
=self
._store
_status
(
466 cluster_id
=cluster_id
,
467 kdu_instance
=kdu_instance
,
468 namespace
=instance_info
["namespace"],
475 # wait for execution task
476 await asyncio
.wait([exec_task
])
480 output
, rc
= exec_task
.result()
484 output
, rc
= await self
._local
_async
_exec
(
485 command
=command
, raise_exception_on_error
=False, env
=env
488 # remove temporal values yaml file
490 os
.remove(file_to_delete
)
493 await self
._store
_status
(
494 cluster_id
=cluster_id
,
495 kdu_instance
=kdu_instance
,
496 namespace
=instance_info
["namespace"],
504 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
506 raise K8sException(msg
)
509 self
.fs
.reverse_sync(from_path
=cluster_id
)
511 # return new revision number
512 instance
= await self
.get_instance_info(
513 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
516 revision
= int(instance
.get("revision"))
517 self
.log
.debug("New revision: {}".format(revision
))
527 total_timeout
: float = 1800,
530 raise NotImplementedError("Method not implemented")
532 async def get_scale_count(
538 raise NotImplementedError("Method not implemented")
541 self
, cluster_uuid
: str, kdu_instance
: str, revision
=0, db_dict
: dict = None
544 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
546 "rollback kdu_instance {} to revision {} from cluster {}".format(
547 kdu_instance
, revision
, cluster_id
552 self
.fs
.sync(from_path
=cluster_id
)
554 # look for instance to obtain namespace
555 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
556 if not instance_info
:
557 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
560 paths
, env
= self
._init
_paths
_env
(
561 cluster_name
=cluster_id
, create_if_not_exist
=True
564 command
= self
._get
_rollback
_command
(
565 kdu_instance
, instance_info
["namespace"], revision
568 self
.log
.debug("rolling_back: {}".format(command
))
570 # exec helm in a task
571 exec_task
= asyncio
.ensure_future(
572 coro_or_future
=self
._local
_async
_exec
(
573 command
=command
, raise_exception_on_error
=False, env
=env
576 # write status in another task
577 status_task
= asyncio
.ensure_future(
578 coro_or_future
=self
._store
_status
(
579 cluster_id
=cluster_id
,
580 kdu_instance
=kdu_instance
,
581 namespace
=instance_info
["namespace"],
583 operation
="rollback",
588 # wait for execution task
589 await asyncio
.wait([exec_task
])
594 output
, rc
= exec_task
.result()
597 await self
._store
_status
(
598 cluster_id
=cluster_id
,
599 kdu_instance
=kdu_instance
,
600 namespace
=instance_info
["namespace"],
602 operation
="rollback",
608 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
610 raise K8sException(msg
)
613 self
.fs
.reverse_sync(from_path
=cluster_id
)
615 # return new revision number
616 instance
= await self
.get_instance_info(
617 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
620 revision
= int(instance
.get("revision"))
621 self
.log
.debug("New revision: {}".format(revision
))
626 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str, **kwargs
):
628 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
629 (this call should happen after all _terminate-config-primitive_ of the VNF
632 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
633 :param kdu_instance: unique name for the KDU instance to be deleted
634 :param kwargs: Additional parameters (None yet)
635 :return: True if successful
638 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
640 "uninstall kdu_instance {} from cluster {}".format(kdu_instance
, cluster_id
)
644 self
.fs
.sync(from_path
=cluster_id
)
646 # look for instance to obtain namespace
647 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
648 if not instance_info
:
649 self
.log
.warning(("kdu_instance {} not found".format(kdu_instance
)))
652 paths
, env
= self
._init
_paths
_env
(
653 cluster_name
=cluster_id
, create_if_not_exist
=True
656 command
= self
._get
_uninstall
_command
(kdu_instance
, instance_info
["namespace"])
657 output
, _rc
= await self
._local
_async
_exec
(
658 command
=command
, raise_exception_on_error
=True, env
=env
662 self
.fs
.reverse_sync(from_path
=cluster_id
)
664 return self
._output
_to
_table
(output
)
666 async def instances_list(self
, cluster_uuid
: str) -> list:
668 returns a list of deployed releases in a cluster
670 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
674 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
675 self
.log
.debug("list releases for cluster {}".format(cluster_id
))
678 self
.fs
.sync(from_path
=cluster_id
)
680 # execute internal command
681 result
= await self
._instances
_list
(cluster_id
)
684 self
.fs
.reverse_sync(from_path
=cluster_id
)
688 async def get_instance_info(self
, cluster_uuid
: str, kdu_instance
: str):
689 instances
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
690 for instance
in instances
:
691 if instance
.get("name") == kdu_instance
:
693 self
.log
.debug("Instance {} not found".format(kdu_instance
))
696 async def exec_primitive(
698 cluster_uuid
: str = None,
699 kdu_instance
: str = None,
700 primitive_name
: str = None,
701 timeout
: float = 300,
703 db_dict
: dict = None,
706 """Exec primitive (Juju action)
708 :param cluster_uuid: The UUID of the cluster or namespace:cluster
709 :param kdu_instance: The unique name of the KDU instance
710 :param primitive_name: Name of action that will be executed
711 :param timeout: Timeout for action execution
712 :param params: Dictionary of all the parameters needed for the action
713 :db_dict: Dictionary for any additional data
714 :param kwargs: Additional parameters (None yet)
716 :return: Returns the output of the action
719 "KDUs deployed with Helm don't support actions "
720 "different from rollback, upgrade and status"
723 async def get_services(
724 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
727 Returns a list of services defined for the specified kdu instance.
729 :param cluster_uuid: UUID of a K8s cluster known by OSM
730 :param kdu_instance: unique name for the KDU instance
731 :param namespace: K8s namespace used by the KDU instance
732 :return: If successful, it will return a list of services, Each service
733 can have the following data:
734 - `name` of the service
735 - `type` type of service in the k8 cluster
736 - `ports` List of ports offered by the service, for each port includes at least
738 - `cluster_ip` Internal ip to be used inside k8s cluster
739 - `external_ip` List of external ips (in case they are available)
742 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
744 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
745 cluster_uuid
, kdu_instance
750 self
.fs
.sync(from_path
=cluster_id
)
752 # get list of services names for kdu
753 service_names
= await self
._get
_services
(cluster_id
, kdu_instance
, namespace
)
756 for service
in service_names
:
757 service
= await self
._get
_service
(cluster_id
, service
, namespace
)
758 service_list
.append(service
)
761 self
.fs
.reverse_sync(from_path
=cluster_id
)
765 async def get_service(
766 self
, cluster_uuid
: str, service_name
: str, namespace
: str
770 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
771 service_name
, namespace
, cluster_uuid
775 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
778 self
.fs
.sync(from_path
=cluster_id
)
780 service
= await self
._get
_service
(cluster_id
, service_name
, namespace
)
783 self
.fs
.reverse_sync(from_path
=cluster_id
)
787 async def status_kdu(self
, cluster_uuid
: str, kdu_instance
: str, **kwargs
) -> str:
789 This call would retrieve tha current state of a given KDU instance. It would be
790 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
791 values_ of the configuration parameters applied to a given instance. This call
792 would be based on the `status` call.
794 :param cluster_uuid: UUID of a K8s cluster known by OSM
795 :param kdu_instance: unique name for the KDU instance
796 :param kwargs: Additional parameters (None yet)
797 :return: If successful, it will return the following vector of arguments:
798 - K8s `namespace` in the cluster where the KDU lives
799 - `state` of the KDU instance. It can be:
806 - List of `resources` (objects) that this release consists of, sorted by kind,
807 and the status of those resources
808 - Last `deployment_time`.
812 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
813 cluster_uuid
, kdu_instance
817 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
820 self
.fs
.sync(from_path
=cluster_id
)
822 # get instance: needed to obtain namespace
823 instances
= await self
._instances
_list
(cluster_id
=cluster_id
)
824 for instance
in instances
:
825 if instance
.get("name") == kdu_instance
:
828 # instance does not exist
830 "Instance name: {} not found in cluster: {}".format(
831 kdu_instance
, cluster_id
835 status
= await self
._status
_kdu
(
836 cluster_id
=cluster_id
,
837 kdu_instance
=kdu_instance
,
838 namespace
=instance
["namespace"],
844 self
.fs
.reverse_sync(from_path
=cluster_id
)
848 async def values_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
851 "inspect kdu_model values {} from (optional) repo: {}".format(
856 return await self
._exec
_inspect
_comand
(
857 inspect_command
="values", kdu_model
=kdu_model
, repo_url
=repo_url
860 async def help_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
863 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model
, repo_url
)
866 return await self
._exec
_inspect
_comand
(
867 inspect_command
="readme", kdu_model
=kdu_model
, repo_url
=repo_url
870 async def synchronize_repos(self
, cluster_uuid
: str):
872 self
.log
.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid
))
874 db_repo_ids
= self
._get
_helm
_chart
_repos
_ids
(cluster_uuid
)
875 db_repo_dict
= self
._get
_db
_repos
_dict
(db_repo_ids
)
877 local_repo_list
= await self
.repo_list(cluster_uuid
)
878 local_repo_dict
= {repo
["name"]: repo
["url"] for repo
in local_repo_list
}
880 deleted_repo_list
= []
883 # iterate over the list of repos in the database that should be
884 # added if not present
885 for repo_name
, db_repo
in db_repo_dict
.items():
887 # check if it is already present
888 curr_repo_url
= local_repo_dict
.get(db_repo
["name"])
889 repo_id
= db_repo
.get("_id")
890 if curr_repo_url
!= db_repo
["url"]:
893 "repo {} url changed, delete and and again".format(
897 await self
.repo_remove(cluster_uuid
, db_repo
["name"])
898 deleted_repo_list
.append(repo_id
)
901 self
.log
.debug("add repo {}".format(db_repo
["name"]))
903 cluster_uuid
, db_repo
["name"], db_repo
["url"]
905 added_repo_dict
[repo_id
] = db_repo
["name"]
906 except Exception as e
:
908 "Error adding repo id: {}, err_msg: {} ".format(
913 # Delete repos that are present but not in nbi_list
914 for repo_name
in local_repo_dict
:
915 if not db_repo_dict
.get(repo_name
) and repo_name
!= "stable":
916 self
.log
.debug("delete repo {}".format(repo_name
))
918 await self
.repo_remove(cluster_uuid
, repo_name
)
919 deleted_repo_list
.append(repo_name
)
920 except Exception as e
:
922 "Error deleting repo, name: {}, err_msg: {}".format(
927 return deleted_repo_list
, added_repo_dict
931 except Exception as e
:
932 # Do not raise errors synchronizing repos
933 self
.log
.error("Error synchronizing repos: {}".format(e
))
934 raise Exception("Error synchronizing repos: {}".format(e
))
936 def _get_db_repos_dict(self
, repo_ids
: list):
938 for repo_id
in repo_ids
:
939 db_repo
= self
.db
.get_one("k8srepos", {"_id": repo_id
})
940 db_repos_dict
[db_repo
["name"]] = db_repo
944 ####################################################################################
945 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
946 ####################################################################################
950 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
952 Creates and returns base cluster and kube dirs and returns them.
953 Also created helm3 dirs according to new directory specification, paths are
954 not returned but assigned to helm environment variables
956 :param cluster_name: cluster_name
957 :return: Dictionary with config_paths and dictionary with helm environment variables
961 async def _cluster_init(self
, cluster_id
, namespace
, paths
, env
):
963 Implements the helm version dependent cluster initialization
967 async def _instances_list(self
, cluster_id
):
969 Implements the helm version dependent helm instances list
973 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
):
975 Implements the helm version dependent method to obtain services from a helm instance
979 async def _status_kdu(
983 namespace
: str = None,
984 show_error_log
: bool = False,
985 return_text
: bool = False,
988 Implements the helm version dependent method to obtain status of a helm instance
992 def _get_install_command(
993 self
, kdu_model
, kdu_instance
, namespace
, params_str
, version
, atomic
, timeout
996 Obtain command to be executed to delete the indicated instance
1000 def _get_upgrade_command(
1001 self
, kdu_model
, kdu_instance
, namespace
, params_str
, version
, atomic
, timeout
1004 Obtain command to be executed to upgrade the indicated instance
1008 def _get_rollback_command(self
, kdu_instance
, namespace
, revision
) -> str:
1010 Obtain command to be executed to rollback the indicated instance
1014 def _get_uninstall_command(self
, kdu_instance
: str, namespace
: str) -> str:
1016 Obtain command to be executed to delete the indicated instance
1020 def _get_inspect_command(
1021 self
, show_command
: str, kdu_model
: str, repo_str
: str, version
: str
1024 Obtain command to be executed to obtain information about the kdu
1028 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
1030 Method call to uninstall cluster software for helm. This method is dependent
1032 For Helm v2 it will be called when Tiller must be uninstalled
1033 For Helm v3 it does nothing and does not need to be callled
1037 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
1039 Obtains the cluster repos identifiers
1043 ####################################################################################
1044 ################################### P R I V A T E ##################################
1045 ####################################################################################
1049 def _check_file_exists(filename
: str, exception_if_not_exists
: bool = False):
1050 if os
.path
.exists(filename
):
1053 msg
= "File {} does not exist".format(filename
)
1054 if exception_if_not_exists
:
1055 raise K8sException(msg
)
1058 def _remove_multiple_spaces(strobj
):
1059 strobj
= strobj
.strip()
1060 while " " in strobj
:
1061 strobj
= strobj
.replace(" ", " ")
1065 def _output_to_lines(output
: str) -> list:
1066 output_lines
= list()
1067 lines
= output
.splitlines(keepends
=False)
1071 output_lines
.append(line
)
1075 def _output_to_table(output
: str) -> list:
1076 output_table
= list()
1077 lines
= output
.splitlines(keepends
=False)
1079 line
= line
.replace("\t", " ")
1081 output_table
.append(line_list
)
1082 cells
= line
.split(sep
=" ")
1086 line_list
.append(cell
)
1090 def _parse_services(output
: str) -> list:
1091 lines
= output
.splitlines(keepends
=False)
1094 line
= line
.replace("\t", " ")
1095 cells
= line
.split(sep
=" ")
1096 if len(cells
) > 0 and cells
[0].startswith("service/"):
1097 elems
= cells
[0].split(sep
="/")
1099 services
.append(elems
[1])
1103 def _get_deep(dictionary
: dict, members
: tuple):
1108 value
= target
.get(m
)
1117 # find key:value in several lines
1119 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
1120 for line
in p_lines
:
1122 if line
.startswith(p_key
+ ":"):
1123 parts
= line
.split(":")
1124 the_value
= parts
[1].strip()
1132 def _lower_keys_list(input_list
: list):
1134 Transform the keys in a list of dictionaries to lower case and returns a new list
1139 for dictionary
in input_list
:
1140 new_dict
= dict((k
.lower(), v
) for k
, v
in dictionary
.items())
1141 new_list
.append(new_dict
)
1144 async def _local_async_exec(
1147 raise_exception_on_error
: bool = False,
1148 show_error_log
: bool = True,
1149 encode_utf8
: bool = False,
1153 command
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command
)
1155 "Executing async local command: {}, env: {}".format(command
, env
)
1159 command
= shlex
.split(command
)
1161 environ
= os
.environ
.copy()
1166 process
= await asyncio
.create_subprocess_exec(
1168 stdout
=asyncio
.subprocess
.PIPE
,
1169 stderr
=asyncio
.subprocess
.PIPE
,
1173 # wait for command terminate
1174 stdout
, stderr
= await process
.communicate()
1176 return_code
= process
.returncode
1180 output
= stdout
.decode("utf-8").strip()
1181 # output = stdout.decode()
1183 output
= stderr
.decode("utf-8").strip()
1184 # output = stderr.decode()
1186 if return_code
!= 0 and show_error_log
:
1188 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1191 self
.log
.debug("Return code: {}".format(return_code
))
1193 if raise_exception_on_error
and return_code
!= 0:
1194 raise K8sException(output
)
1197 output
= output
.encode("utf-8").strip()
1198 output
= str(output
).replace("\\n", "\n")
1200 return output
, return_code
1202 except asyncio
.CancelledError
:
1204 except K8sException
:
1206 except Exception as e
:
1207 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1209 if raise_exception_on_error
:
1210 raise K8sException(e
) from e
1214 async def _local_async_exec_pipe(
1218 raise_exception_on_error
: bool = True,
1219 show_error_log
: bool = True,
1220 encode_utf8
: bool = False,
1224 command1
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command1
)
1225 command2
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command2
)
1226 command
= "{} | {}".format(command1
, command2
)
1228 "Executing async local command: {}, env: {}".format(command
, env
)
1232 command1
= shlex
.split(command1
)
1233 command2
= shlex
.split(command2
)
1235 environ
= os
.environ
.copy()
1240 read
, write
= os
.pipe()
1241 await asyncio
.create_subprocess_exec(*command1
, stdout
=write
, env
=environ
)
1243 process_2
= await asyncio
.create_subprocess_exec(
1244 *command2
, stdin
=read
, stdout
=asyncio
.subprocess
.PIPE
, env
=environ
1247 stdout
, stderr
= await process_2
.communicate()
1249 return_code
= process_2
.returncode
1253 output
= stdout
.decode("utf-8").strip()
1254 # output = stdout.decode()
1256 output
= stderr
.decode("utf-8").strip()
1257 # output = stderr.decode()
1259 if return_code
!= 0 and show_error_log
:
1261 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1264 self
.log
.debug("Return code: {}".format(return_code
))
1266 if raise_exception_on_error
and return_code
!= 0:
1267 raise K8sException(output
)
1270 output
= output
.encode("utf-8").strip()
1271 output
= str(output
).replace("\\n", "\n")
1273 return output
, return_code
1274 except asyncio
.CancelledError
:
1276 except K8sException
:
1278 except Exception as e
:
1279 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1281 if raise_exception_on_error
:
1282 raise K8sException(e
) from e
1286 async def _get_service(self
, cluster_id
, service_name
, namespace
):
1288 Obtains the data of the specified service in the k8cluster.
1290 :param cluster_id: id of a K8s cluster known by OSM
1291 :param service_name: name of the K8s service in the specified namespace
1292 :param namespace: K8s namespace used by the KDU instance
1293 :return: If successful, it will return a service with the following data:
1294 - `name` of the service
1295 - `type` type of service in the k8 cluster
1296 - `ports` List of ports offered by the service, for each port includes at least
1297 name, port, protocol
1298 - `cluster_ip` Internal ip to be used inside k8s cluster
1299 - `external_ip` List of external ips (in case they are available)
1303 paths
, env
= self
._init
_paths
_env
(
1304 cluster_name
=cluster_id
, create_if_not_exist
=True
1307 command
= "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1308 self
.kubectl_command
, paths
["kube_config"], namespace
, service_name
1311 output
, _rc
= await self
._local
_async
_exec
(
1312 command
=command
, raise_exception_on_error
=True, env
=env
1315 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
1318 "name": service_name
,
1319 "type": self
._get
_deep
(data
, ("spec", "type")),
1320 "ports": self
._get
_deep
(data
, ("spec", "ports")),
1321 "cluster_ip": self
._get
_deep
(data
, ("spec", "clusterIP")),
1323 if service
["type"] == "LoadBalancer":
1324 ip_map_list
= self
._get
_deep
(data
, ("status", "loadBalancer", "ingress"))
1325 ip_list
= [elem
["ip"] for elem
in ip_map_list
]
1326 service
["external_ip"] = ip_list
1330 async def _exec_inspect_comand(
1331 self
, inspect_command
: str, kdu_model
: str, repo_url
: str = None
1334 Obtains information about a kdu, no cluster (no env)
1339 repo_str
= " --repo {}".format(repo_url
)
1341 idx
= kdu_model
.find("/")
1344 kdu_model
= kdu_model
[idx
:]
1347 if ":" in kdu_model
:
1348 parts
= kdu_model
.split(sep
=":")
1350 version
= "--version {}".format(str(parts
[1]))
1351 kdu_model
= parts
[0]
1353 full_command
= self
._get
_inspect
_command
(
1354 inspect_command
, kdu_model
, repo_str
, version
1356 output
, _rc
= await self
._local
_async
_exec
(
1357 command
=full_command
, encode_utf8
=True
1362 async def _store_status(
1367 namespace
: str = None,
1368 check_every
: float = 10,
1369 db_dict
: dict = None,
1370 run_once
: bool = False,
1374 await asyncio
.sleep(check_every
)
1375 detailed_status
= await self
._status
_kdu
(
1376 cluster_id
=cluster_id
,
1377 kdu_instance
=kdu_instance
,
1378 namespace
=namespace
,
1381 status
= detailed_status
.get("info").get("description")
1382 self
.log
.debug("KDU {} STATUS: {}.".format(kdu_instance
, status
))
1383 # write status to db
1384 result
= await self
.write_app_status_to_db(
1387 detailed_status
=str(detailed_status
),
1388 operation
=operation
,
1391 self
.log
.info("Error writing in database. Task exiting...")
1393 except asyncio
.CancelledError
:
1394 self
.log
.debug("Task cancelled")
1396 except Exception as e
:
1398 "_store_status exception: {}".format(str(e
)), exc_info
=True
1405 # params for use in -f file
1406 # returns values file option and filename (in order to delete it at the end)
1407 def _params_to_file_option(self
, cluster_id
: str, params
: dict) -> (str, str):
1409 if params
and len(params
) > 0:
1410 self
._init
_paths
_env
(cluster_name
=cluster_id
, create_if_not_exist
=True)
1412 def get_random_number():
1413 r
= random
.randrange(start
=1, stop
=99999999)
1421 value
= params
.get(key
)
1422 if "!!yaml" in str(value
):
1423 value
= yaml
.load(value
[7:])
1424 params2
[key
] = value
1426 values_file
= get_random_number() + ".yaml"
1427 with
open(values_file
, "w") as stream
:
1428 yaml
.dump(params2
, stream
, indent
=4, default_flow_style
=False)
1430 return "-f {}".format(values_file
), values_file
1434 # params for use in --set option
1436 def _params_to_set_option(params
: dict) -> str:
1438 if params
and len(params
) > 0:
1441 value
= params
.get(key
, None)
1442 if value
is not None:
1444 params_str
+= "--set "
1448 params_str
+= "{}={}".format(key
, value
)
1452 def generate_kdu_instance_name(**kwargs
):
1453 chart_name
= kwargs
["kdu_model"]
1454 # check embeded chart (file or dir)
1455 if chart_name
.startswith("/"):
1456 # extract file or directory name
1457 chart_name
= chart_name
[chart_name
.rfind("/") + 1 :]
1459 elif "://" in chart_name
:
1460 # extract last portion of URL
1461 chart_name
= chart_name
[chart_name
.rfind("/") + 1 :]
1464 for c
in chart_name
:
1465 if c
.isalpha() or c
.isnumeric():
1472 # if does not start with alpha character, prefix 'a'
1473 if not name
[0].isalpha():
1478 def get_random_number():
1479 r
= random
.randrange(start
=1, stop
=99999999)
1481 s
= s
.rjust(10, "0")
1484 name
= name
+ get_random_number()