2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
24 from typing
import Union
32 from uuid
import uuid4
34 from n2vc
.config
import EnvironConfig
35 from n2vc
.exceptions
import K8sException
36 from n2vc
.k8s_conn
import K8sConnector
39 class K8sHelmBaseConnector(K8sConnector
):
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
47 service_account
= "osm"
53 kubectl_command
: str = "/usr/bin/kubectl",
54 helm_command
: str = "/usr/bin/helm",
60 :param fs: file system for kubernetes and helm configuration
61 :param db: database object to write current operation status
62 :param kubectl_command: path to kubectl executable
63 :param helm_command: path to helm executable
65 :param on_update_db: callback called when k8s connector updates database
69 K8sConnector
.__init
__(self
, db
=db
, log
=log
, on_update_db
=on_update_db
)
71 self
.log
.info("Initializing K8S Helm connector")
73 self
.config
= EnvironConfig()
74 # random numbers for release name generation
75 random
.seed(time
.time())
80 # exception if kubectl is not installed
81 self
.kubectl_command
= kubectl_command
82 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
84 # exception if helm is not installed
85 self
._helm
_command
= helm_command
86 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
88 # obtain stable repo url from config or apply default
89 self
._stable
_repo
_url
= self
.config
.get("stablerepourl")
90 if self
._stable
_repo
_url
== "None":
91 self
._stable
_repo
_url
= None
93 def _get_namespace(self
, cluster_uuid
: str) -> str:
95 Obtains the namespace used by the cluster with the uuid passed by argument
97 param: cluster_uuid: cluster's uuid
100 # first, obtain the cluster corresponding to the uuid passed by argument
101 k8scluster
= self
.db
.get_one(
102 "k8sclusters", q_filter
={"_id": cluster_uuid
}, fail_on_empty
=False
104 return k8scluster
.get("namespace")
109 namespace
: str = "kube-system",
110 reuse_cluster_uuid
=None,
114 It prepares a given K8s cluster environment to run Charts
116 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
118 :param namespace: optional namespace to be used for helm. By default,
119 'kube-system' will be used
120 :param reuse_cluster_uuid: existing cluster uuid for reuse
121 :param kwargs: Additional parameters (None yet)
122 :return: uuid of the K8s cluster and True if connector has installed some
123 software in the cluster
124 (on error, an exception will be raised)
127 if reuse_cluster_uuid
:
128 cluster_id
= reuse_cluster_uuid
130 cluster_id
= str(uuid4())
133 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id
, namespace
)
136 paths
, env
= self
._init
_paths
_env
(
137 cluster_name
=cluster_id
, create_if_not_exist
=True
139 mode
= stat
.S_IRUSR | stat
.S_IWUSR
140 with
open(paths
["kube_config"], "w", mode
) as f
:
142 os
.chmod(paths
["kube_config"], 0o600)
144 # Code with initialization specific of helm version
145 n2vc_installed_sw
= await self
._cluster
_init
(cluster_id
, namespace
, paths
, env
)
147 # sync fs with local data
148 self
.fs
.reverse_sync(from_path
=cluster_id
)
150 self
.log
.info("Cluster {} initialized".format(cluster_id
))
152 return cluster_id
, n2vc_installed_sw
155 self
, cluster_uuid
: str, name
: str, url
: str, repo_type
: str = "chart"
158 "Cluster {}, adding {} repository {}. URL: {}".format(
159 cluster_uuid
, repo_type
, name
, url
164 paths
, env
= self
._init
_paths
_env
(
165 cluster_name
=cluster_uuid
, create_if_not_exist
=True
169 self
.fs
.sync(from_path
=cluster_uuid
)
172 command
= "env KUBECONFIG={} {} repo update".format(
173 paths
["kube_config"], self
._helm
_command
175 self
.log
.debug("updating repo: {}".format(command
))
176 await self
._local
_async
_exec
(
177 command
=command
, raise_exception_on_error
=False, env
=env
180 # helm repo add name url
181 command
= "env KUBECONFIG={} {} repo add {} {}".format(
182 paths
["kube_config"], self
._helm
_command
, name
, url
184 self
.log
.debug("adding repo: {}".format(command
))
185 await self
._local
_async
_exec
(
186 command
=command
, raise_exception_on_error
=True, env
=env
190 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
192 async def repo_list(self
, cluster_uuid
: str) -> list:
194 Get the list of registered repositories
196 :return: list of registered repositories: [ (name, url) .... ]
199 self
.log
.debug("list repositories for cluster {}".format(cluster_uuid
))
202 paths
, env
= self
._init
_paths
_env
(
203 cluster_name
=cluster_uuid
, create_if_not_exist
=True
207 self
.fs
.sync(from_path
=cluster_uuid
)
209 command
= "env KUBECONFIG={} {} repo list --output yaml".format(
210 paths
["kube_config"], self
._helm
_command
213 # Set exception to false because if there are no repos just want an empty list
214 output
, _rc
= await self
._local
_async
_exec
(
215 command
=command
, raise_exception_on_error
=False, env
=env
219 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
222 if output
and len(output
) > 0:
223 repos
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
224 # unify format between helm2 and helm3 setting all keys lowercase
225 return self
._lower
_keys
_list
(repos
)
231 async def repo_remove(self
, cluster_uuid
: str, name
: str):
233 "remove {} repositories for cluster {}".format(name
, cluster_uuid
)
237 paths
, env
= self
._init
_paths
_env
(
238 cluster_name
=cluster_uuid
, create_if_not_exist
=True
242 self
.fs
.sync(from_path
=cluster_uuid
)
244 command
= "env KUBECONFIG={} {} repo remove {}".format(
245 paths
["kube_config"], self
._helm
_command
, name
247 await self
._local
_async
_exec
(
248 command
=command
, raise_exception_on_error
=True, env
=env
252 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
258 uninstall_sw
: bool = False,
263 Resets the Kubernetes cluster by removing the helm deployment that represents it.
265 :param cluster_uuid: The UUID of the cluster to reset
266 :param force: Boolean to force the reset
267 :param uninstall_sw: Boolean to force the reset
268 :param kwargs: Additional parameters (None yet)
269 :return: Returns True if successful or raises an exception.
271 namespace
= self
._get
_namespace
(cluster_uuid
=cluster_uuid
)
273 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
274 cluster_uuid
, uninstall_sw
279 self
.fs
.sync(from_path
=cluster_uuid
)
281 # uninstall releases if needed.
283 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
284 if len(releases
) > 0:
288 kdu_instance
= r
.get("name")
289 chart
= r
.get("chart")
291 "Uninstalling {} -> {}".format(chart
, kdu_instance
)
293 await self
.uninstall(
294 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
296 except Exception as e
:
297 # will not raise exception as it was found
298 # that in some cases of previously installed helm releases it
301 "Error uninstalling release {}: {}".format(
307 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
308 ).format(cluster_uuid
)
311 False # Allow to remove k8s cluster without removing Tiller
315 await self
._uninstall
_sw
(cluster_id
=cluster_uuid
, namespace
=namespace
)
317 # delete cluster directory
318 self
.log
.debug("Removing directory {}".format(cluster_uuid
))
319 self
.fs
.file_delete(cluster_uuid
, ignore_non_exist
=True)
320 # Remove also local directorio if still exist
321 direct
= self
.fs
.path
+ "/" + cluster_uuid
322 shutil
.rmtree(direct
, ignore_errors
=True)
326 async def _install_impl(
334 timeout
: float = 300,
336 db_dict
: dict = None,
337 kdu_name
: str = None,
338 namespace
: str = None,
341 paths
, env
= self
._init
_paths
_env
(
342 cluster_name
=cluster_id
, create_if_not_exist
=True
346 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
347 cluster_id
=cluster_id
, params
=params
353 parts
= kdu_model
.split(sep
=":")
355 version
= str(parts
[1])
358 command
= self
._get
_install
_command
(
366 paths
["kube_config"],
369 self
.log
.debug("installing: {}".format(command
))
372 # exec helm in a task
373 exec_task
= asyncio
.ensure_future(
374 coro_or_future
=self
._local
_async
_exec
(
375 command
=command
, raise_exception_on_error
=False, env
=env
379 # write status in another task
380 status_task
= asyncio
.ensure_future(
381 coro_or_future
=self
._store
_status
(
382 cluster_id
=cluster_id
,
383 kdu_instance
=kdu_instance
,
391 # wait for execution task
392 await asyncio
.wait([exec_task
])
397 output
, rc
= exec_task
.result()
401 output
, rc
= await self
._local
_async
_exec
(
402 command
=command
, raise_exception_on_error
=False, env
=env
405 # remove temporal values yaml file
407 os
.remove(file_to_delete
)
410 await self
._store
_status
(
411 cluster_id
=cluster_id
,
412 kdu_instance
=kdu_instance
,
421 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
423 raise K8sException(msg
)
429 kdu_model
: str = None,
431 timeout
: float = 300,
433 db_dict
: dict = None,
435 self
.log
.debug("upgrading {} in cluster {}".format(kdu_model
, cluster_uuid
))
438 self
.fs
.sync(from_path
=cluster_uuid
)
440 # look for instance to obtain namespace
441 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
442 if not instance_info
:
443 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
446 paths
, env
= self
._init
_paths
_env
(
447 cluster_name
=cluster_uuid
, create_if_not_exist
=True
451 self
.fs
.sync(from_path
=cluster_uuid
)
454 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
455 cluster_id
=cluster_uuid
, params
=params
461 parts
= kdu_model
.split(sep
=":")
463 version
= str(parts
[1])
466 command
= self
._get
_upgrade
_command
(
469 instance_info
["namespace"],
474 paths
["kube_config"],
477 self
.log
.debug("upgrading: {}".format(command
))
481 # exec helm in a task
482 exec_task
= asyncio
.ensure_future(
483 coro_or_future
=self
._local
_async
_exec
(
484 command
=command
, raise_exception_on_error
=False, env
=env
487 # write status in another task
488 status_task
= asyncio
.ensure_future(
489 coro_or_future
=self
._store
_status
(
490 cluster_id
=cluster_uuid
,
491 kdu_instance
=kdu_instance
,
492 namespace
=instance_info
["namespace"],
499 # wait for execution task
500 await asyncio
.wait([exec_task
])
504 output
, rc
= exec_task
.result()
508 output
, rc
= await self
._local
_async
_exec
(
509 command
=command
, raise_exception_on_error
=False, env
=env
512 # remove temporal values yaml file
514 os
.remove(file_to_delete
)
517 await self
._store
_status
(
518 cluster_id
=cluster_uuid
,
519 kdu_instance
=kdu_instance
,
520 namespace
=instance_info
["namespace"],
528 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
530 raise K8sException(msg
)
533 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
535 # return new revision number
536 instance
= await self
.get_instance_info(
537 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
540 revision
= int(instance
.get("revision"))
541 self
.log
.debug("New revision: {}".format(revision
))
551 total_timeout
: float = 1800,
554 raise NotImplementedError("Method not implemented")
556 async def get_scale_count(
562 raise NotImplementedError("Method not implemented")
565 self
, cluster_uuid
: str, kdu_instance
: str, revision
=0, db_dict
: dict = None
568 "rollback kdu_instance {} to revision {} from cluster {}".format(
569 kdu_instance
, revision
, cluster_uuid
574 self
.fs
.sync(from_path
=cluster_uuid
)
576 # look for instance to obtain namespace
577 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
578 if not instance_info
:
579 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
582 paths
, env
= self
._init
_paths
_env
(
583 cluster_name
=cluster_uuid
, create_if_not_exist
=True
587 self
.fs
.sync(from_path
=cluster_uuid
)
589 command
= self
._get
_rollback
_command
(
590 kdu_instance
, instance_info
["namespace"], revision
, paths
["kube_config"]
593 self
.log
.debug("rolling_back: {}".format(command
))
595 # exec helm in a task
596 exec_task
= asyncio
.ensure_future(
597 coro_or_future
=self
._local
_async
_exec
(
598 command
=command
, raise_exception_on_error
=False, env
=env
601 # write status in another task
602 status_task
= asyncio
.ensure_future(
603 coro_or_future
=self
._store
_status
(
604 cluster_id
=cluster_uuid
,
605 kdu_instance
=kdu_instance
,
606 namespace
=instance_info
["namespace"],
608 operation
="rollback",
613 # wait for execution task
614 await asyncio
.wait([exec_task
])
619 output
, rc
= exec_task
.result()
622 await self
._store
_status
(
623 cluster_id
=cluster_uuid
,
624 kdu_instance
=kdu_instance
,
625 namespace
=instance_info
["namespace"],
627 operation
="rollback",
633 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
635 raise K8sException(msg
)
638 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
640 # return new revision number
641 instance
= await self
.get_instance_info(
642 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
645 revision
= int(instance
.get("revision"))
646 self
.log
.debug("New revision: {}".format(revision
))
651 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str, **kwargs
):
653 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
654 (this call should happen after all _terminate-config-primitive_ of the VNF
657 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
658 :param kdu_instance: unique name for the KDU instance to be deleted
659 :param kwargs: Additional parameters (None yet)
660 :return: True if successful
664 "uninstall kdu_instance {} from cluster {}".format(
665 kdu_instance
, cluster_uuid
670 self
.fs
.sync(from_path
=cluster_uuid
)
672 # look for instance to obtain namespace
673 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
674 if not instance_info
:
675 self
.log
.warning(("kdu_instance {} not found".format(kdu_instance
)))
678 paths
, env
= self
._init
_paths
_env
(
679 cluster_name
=cluster_uuid
, create_if_not_exist
=True
683 self
.fs
.sync(from_path
=cluster_uuid
)
685 command
= self
._get
_uninstall
_command
(
686 kdu_instance
, instance_info
["namespace"], paths
["kube_config"]
688 output
, _rc
= await self
._local
_async
_exec
(
689 command
=command
, raise_exception_on_error
=True, env
=env
693 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
695 return self
._output
_to
_table
(output
)
697 async def instances_list(self
, cluster_uuid
: str) -> list:
699 returns a list of deployed releases in a cluster
701 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
705 self
.log
.debug("list releases for cluster {}".format(cluster_uuid
))
708 self
.fs
.sync(from_path
=cluster_uuid
)
710 # execute internal command
711 result
= await self
._instances
_list
(cluster_uuid
)
714 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
718 async def get_instance_info(self
, cluster_uuid
: str, kdu_instance
: str):
719 instances
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
720 for instance
in instances
:
721 if instance
.get("name") == kdu_instance
:
723 self
.log
.debug("Instance {} not found".format(kdu_instance
))
726 async def exec_primitive(
728 cluster_uuid
: str = None,
729 kdu_instance
: str = None,
730 primitive_name
: str = None,
731 timeout
: float = 300,
733 db_dict
: dict = None,
736 """Exec primitive (Juju action)
738 :param cluster_uuid: The UUID of the cluster or namespace:cluster
739 :param kdu_instance: The unique name of the KDU instance
740 :param primitive_name: Name of action that will be executed
741 :param timeout: Timeout for action execution
742 :param params: Dictionary of all the parameters needed for the action
743 :db_dict: Dictionary for any additional data
744 :param kwargs: Additional parameters (None yet)
746 :return: Returns the output of the action
749 "KDUs deployed with Helm don't support actions "
750 "different from rollback, upgrade and status"
753 async def get_services(
754 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
757 Returns a list of services defined for the specified kdu instance.
759 :param cluster_uuid: UUID of a K8s cluster known by OSM
760 :param kdu_instance: unique name for the KDU instance
761 :param namespace: K8s namespace used by the KDU instance
762 :return: If successful, it will return a list of services, Each service
763 can have the following data:
764 - `name` of the service
765 - `type` type of service in the k8 cluster
766 - `ports` List of ports offered by the service, for each port includes at least
768 - `cluster_ip` Internal ip to be used inside k8s cluster
769 - `external_ip` List of external ips (in case they are available)
773 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
774 cluster_uuid
, kdu_instance
779 paths
, env
= self
._init
_paths
_env
(
780 cluster_name
=cluster_uuid
, create_if_not_exist
=True
784 self
.fs
.sync(from_path
=cluster_uuid
)
786 # get list of services names for kdu
787 service_names
= await self
._get
_services
(
788 cluster_uuid
, kdu_instance
, namespace
, paths
["kube_config"]
792 for service
in service_names
:
793 service
= await self
._get
_service
(cluster_uuid
, service
, namespace
)
794 service_list
.append(service
)
797 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
801 async def get_service(
802 self
, cluster_uuid
: str, service_name
: str, namespace
: str
806 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
807 service_name
, namespace
, cluster_uuid
812 self
.fs
.sync(from_path
=cluster_uuid
)
814 service
= await self
._get
_service
(cluster_uuid
, service_name
, namespace
)
817 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
821 async def status_kdu(
822 self
, cluster_uuid
: str, kdu_instance
: str, yaml_format
: str = False, **kwargs
823 ) -> Union
[str, dict]:
825 This call would retrieve tha current state of a given KDU instance. It would be
826 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
827 values_ of the configuration parameters applied to a given instance. This call
828 would be based on the `status` call.
830 :param cluster_uuid: UUID of a K8s cluster known by OSM
831 :param kdu_instance: unique name for the KDU instance
832 :param kwargs: Additional parameters (None yet)
833 :param yaml_format: if the return shall be returned as an YAML string or as a
835 :return: If successful, it will return the following vector of arguments:
836 - K8s `namespace` in the cluster where the KDU lives
837 - `state` of the KDU instance. It can be:
844 - List of `resources` (objects) that this release consists of, sorted by kind,
845 and the status of those resources
846 - Last `deployment_time`.
850 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
851 cluster_uuid
, kdu_instance
856 self
.fs
.sync(from_path
=cluster_uuid
)
858 # get instance: needed to obtain namespace
859 instances
= await self
._instances
_list
(cluster_id
=cluster_uuid
)
860 for instance
in instances
:
861 if instance
.get("name") == kdu_instance
:
864 # instance does not exist
866 "Instance name: {} not found in cluster: {}".format(
867 kdu_instance
, cluster_uuid
871 status
= await self
._status
_kdu
(
872 cluster_id
=cluster_uuid
,
873 kdu_instance
=kdu_instance
,
874 namespace
=instance
["namespace"],
875 yaml_format
=yaml_format
,
880 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
884 async def values_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
887 "inspect kdu_model values {} from (optional) repo: {}".format(
892 return await self
._exec
_inspect
_comand
(
893 inspect_command
="values", kdu_model
=kdu_model
, repo_url
=repo_url
896 async def help_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
899 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model
, repo_url
)
902 return await self
._exec
_inspect
_comand
(
903 inspect_command
="readme", kdu_model
=kdu_model
, repo_url
=repo_url
906 async def synchronize_repos(self
, cluster_uuid
: str):
908 self
.log
.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid
))
910 db_repo_ids
= self
._get
_helm
_chart
_repos
_ids
(cluster_uuid
)
911 db_repo_dict
= self
._get
_db
_repos
_dict
(db_repo_ids
)
913 local_repo_list
= await self
.repo_list(cluster_uuid
)
914 local_repo_dict
= {repo
["name"]: repo
["url"] for repo
in local_repo_list
}
916 deleted_repo_list
= []
919 # iterate over the list of repos in the database that should be
920 # added if not present
921 for repo_name
, db_repo
in db_repo_dict
.items():
923 # check if it is already present
924 curr_repo_url
= local_repo_dict
.get(db_repo
["name"])
925 repo_id
= db_repo
.get("_id")
926 if curr_repo_url
!= db_repo
["url"]:
929 "repo {} url changed, delete and and again".format(
933 await self
.repo_remove(cluster_uuid
, db_repo
["name"])
934 deleted_repo_list
.append(repo_id
)
937 self
.log
.debug("add repo {}".format(db_repo
["name"]))
939 cluster_uuid
, db_repo
["name"], db_repo
["url"]
941 added_repo_dict
[repo_id
] = db_repo
["name"]
942 except Exception as e
:
944 "Error adding repo id: {}, err_msg: {} ".format(
949 # Delete repos that are present but not in nbi_list
950 for repo_name
in local_repo_dict
:
951 if not db_repo_dict
.get(repo_name
) and repo_name
!= "stable":
952 self
.log
.debug("delete repo {}".format(repo_name
))
954 await self
.repo_remove(cluster_uuid
, repo_name
)
955 deleted_repo_list
.append(repo_name
)
956 except Exception as e
:
958 "Error deleting repo, name: {}, err_msg: {}".format(
963 return deleted_repo_list
, added_repo_dict
967 except Exception as e
:
968 # Do not raise errors synchronizing repos
969 self
.log
.error("Error synchronizing repos: {}".format(e
))
970 raise Exception("Error synchronizing repos: {}".format(e
))
972 def _get_db_repos_dict(self
, repo_ids
: list):
974 for repo_id
in repo_ids
:
975 db_repo
= self
.db
.get_one("k8srepos", {"_id": repo_id
})
976 db_repos_dict
[db_repo
["name"]] = db_repo
980 ####################################################################################
981 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
982 ####################################################################################
986 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
988 Creates and returns base cluster and kube dirs and returns them.
989 Also created helm3 dirs according to new directory specification, paths are
990 not returned but assigned to helm environment variables
992 :param cluster_name: cluster_name
993 :return: Dictionary with config_paths and dictionary with helm environment variables
997 async def _cluster_init(self
, cluster_id
, namespace
, paths
, env
):
999 Implements the helm version dependent cluster initialization
1003 async def _instances_list(self
, cluster_id
):
1005 Implements the helm version dependent helm instances list
1009 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
, kubeconfig
):
1011 Implements the helm version dependent method to obtain services from a helm instance
1015 async def _status_kdu(
1019 namespace
: str = None,
1020 yaml_format
: bool = False,
1021 show_error_log
: bool = False,
1022 ) -> Union
[str, dict]:
1024 Implements the helm version dependent method to obtain status of a helm instance
1028 def _get_install_command(
1040 Obtain command to be executed to delete the indicated instance
1044 def _get_upgrade_command(
1056 Obtain command to be executed to upgrade the indicated instance
1060 def _get_rollback_command(
1061 self
, kdu_instance
, namespace
, revision
, kubeconfig
1064 Obtain command to be executed to rollback the indicated instance
1068 def _get_uninstall_command(
1069 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
1072 Obtain command to be executed to delete the indicated instance
1076 def _get_inspect_command(
1077 self
, show_command
: str, kdu_model
: str, repo_str
: str, version
: str
1080 Obtain command to be executed to obtain information about the kdu
1084 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
1086 Method call to uninstall cluster software for helm. This method is dependent
1088 For Helm v2 it will be called when Tiller must be uninstalled
1089 For Helm v3 it does nothing and does not need to be callled
1093 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
1095 Obtains the cluster repos identifiers
1099 ####################################################################################
1100 ################################### P R I V A T E ##################################
1101 ####################################################################################
1105 def _check_file_exists(filename
: str, exception_if_not_exists
: bool = False):
1106 if os
.path
.exists(filename
):
1109 msg
= "File {} does not exist".format(filename
)
1110 if exception_if_not_exists
:
1111 raise K8sException(msg
)
1114 def _remove_multiple_spaces(strobj
):
1115 strobj
= strobj
.strip()
1116 while " " in strobj
:
1117 strobj
= strobj
.replace(" ", " ")
1121 def _output_to_lines(output
: str) -> list:
1122 output_lines
= list()
1123 lines
= output
.splitlines(keepends
=False)
1127 output_lines
.append(line
)
1131 def _output_to_table(output
: str) -> list:
1132 output_table
= list()
1133 lines
= output
.splitlines(keepends
=False)
1135 line
= line
.replace("\t", " ")
1137 output_table
.append(line_list
)
1138 cells
= line
.split(sep
=" ")
1142 line_list
.append(cell
)
1146 def _parse_services(output
: str) -> list:
1147 lines
= output
.splitlines(keepends
=False)
1150 line
= line
.replace("\t", " ")
1151 cells
= line
.split(sep
=" ")
1152 if len(cells
) > 0 and cells
[0].startswith("service/"):
1153 elems
= cells
[0].split(sep
="/")
1155 services
.append(elems
[1])
1159 def _get_deep(dictionary
: dict, members
: tuple):
1164 value
= target
.get(m
)
1173 # find key:value in several lines
1175 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
1176 for line
in p_lines
:
1178 if line
.startswith(p_key
+ ":"):
1179 parts
= line
.split(":")
1180 the_value
= parts
[1].strip()
1188 def _lower_keys_list(input_list
: list):
1190 Transform the keys in a list of dictionaries to lower case and returns a new list
1195 for dictionary
in input_list
:
1196 new_dict
= dict((k
.lower(), v
) for k
, v
in dictionary
.items())
1197 new_list
.append(new_dict
)
1200 async def _local_async_exec(
1203 raise_exception_on_error
: bool = False,
1204 show_error_log
: bool = True,
1205 encode_utf8
: bool = False,
1209 command
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command
)
1211 "Executing async local command: {}, env: {}".format(command
, env
)
1215 command
= shlex
.split(command
)
1217 environ
= os
.environ
.copy()
1222 process
= await asyncio
.create_subprocess_exec(
1224 stdout
=asyncio
.subprocess
.PIPE
,
1225 stderr
=asyncio
.subprocess
.PIPE
,
1229 # wait for command terminate
1230 stdout
, stderr
= await process
.communicate()
1232 return_code
= process
.returncode
1236 output
= stdout
.decode("utf-8").strip()
1237 # output = stdout.decode()
1239 output
= stderr
.decode("utf-8").strip()
1240 # output = stderr.decode()
1242 if return_code
!= 0 and show_error_log
:
1244 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1247 self
.log
.debug("Return code: {}".format(return_code
))
1249 if raise_exception_on_error
and return_code
!= 0:
1250 raise K8sException(output
)
1253 output
= output
.encode("utf-8").strip()
1254 output
= str(output
).replace("\\n", "\n")
1256 return output
, return_code
1258 except asyncio
.CancelledError
:
1260 except K8sException
:
1262 except Exception as e
:
1263 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1265 if raise_exception_on_error
:
1266 raise K8sException(e
) from e
1270 async def _local_async_exec_pipe(
1274 raise_exception_on_error
: bool = True,
1275 show_error_log
: bool = True,
1276 encode_utf8
: bool = False,
1280 command1
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command1
)
1281 command2
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command2
)
1282 command
= "{} | {}".format(command1
, command2
)
1284 "Executing async local command: {}, env: {}".format(command
, env
)
1288 command1
= shlex
.split(command1
)
1289 command2
= shlex
.split(command2
)
1291 environ
= os
.environ
.copy()
1296 read
, write
= os
.pipe()
1297 await asyncio
.create_subprocess_exec(*command1
, stdout
=write
, env
=environ
)
1299 process_2
= await asyncio
.create_subprocess_exec(
1300 *command2
, stdin
=read
, stdout
=asyncio
.subprocess
.PIPE
, env
=environ
1303 stdout
, stderr
= await process_2
.communicate()
1305 return_code
= process_2
.returncode
1309 output
= stdout
.decode("utf-8").strip()
1310 # output = stdout.decode()
1312 output
= stderr
.decode("utf-8").strip()
1313 # output = stderr.decode()
1315 if return_code
!= 0 and show_error_log
:
1317 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1320 self
.log
.debug("Return code: {}".format(return_code
))
1322 if raise_exception_on_error
and return_code
!= 0:
1323 raise K8sException(output
)
1326 output
= output
.encode("utf-8").strip()
1327 output
= str(output
).replace("\\n", "\n")
1329 return output
, return_code
1330 except asyncio
.CancelledError
:
1332 except K8sException
:
1334 except Exception as e
:
1335 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1337 if raise_exception_on_error
:
1338 raise K8sException(e
) from e
1342 async def _get_service(self
, cluster_id
, service_name
, namespace
):
1344 Obtains the data of the specified service in the k8cluster.
1346 :param cluster_id: id of a K8s cluster known by OSM
1347 :param service_name: name of the K8s service in the specified namespace
1348 :param namespace: K8s namespace used by the KDU instance
1349 :return: If successful, it will return a service with the following data:
1350 - `name` of the service
1351 - `type` type of service in the k8 cluster
1352 - `ports` List of ports offered by the service, for each port includes at least
1353 name, port, protocol
1354 - `cluster_ip` Internal ip to be used inside k8s cluster
1355 - `external_ip` List of external ips (in case they are available)
1359 paths
, env
= self
._init
_paths
_env
(
1360 cluster_name
=cluster_id
, create_if_not_exist
=True
1363 command
= "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1364 self
.kubectl_command
, paths
["kube_config"], namespace
, service_name
1367 output
, _rc
= await self
._local
_async
_exec
(
1368 command
=command
, raise_exception_on_error
=True, env
=env
1371 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
1374 "name": service_name
,
1375 "type": self
._get
_deep
(data
, ("spec", "type")),
1376 "ports": self
._get
_deep
(data
, ("spec", "ports")),
1377 "cluster_ip": self
._get
_deep
(data
, ("spec", "clusterIP")),
1379 if service
["type"] == "LoadBalancer":
1380 ip_map_list
= self
._get
_deep
(data
, ("status", "loadBalancer", "ingress"))
1381 ip_list
= [elem
["ip"] for elem
in ip_map_list
]
1382 service
["external_ip"] = ip_list
1386 async def _exec_inspect_comand(
1387 self
, inspect_command
: str, kdu_model
: str, repo_url
: str = None
1390 Obtains information about a kdu, no cluster (no env)
1395 repo_str
= " --repo {}".format(repo_url
)
1397 idx
= kdu_model
.find("/")
1400 kdu_model
= kdu_model
[idx
:]
1403 if ":" in kdu_model
:
1404 parts
= kdu_model
.split(sep
=":")
1406 version
= "--version {}".format(str(parts
[1]))
1407 kdu_model
= parts
[0]
1409 full_command
= self
._get
_inspect
_command
(
1410 inspect_command
, kdu_model
, repo_str
, version
1412 output
, _rc
= await self
._local
_async
_exec
(
1413 command
=full_command
, encode_utf8
=True
1418 async def _store_status(
1423 namespace
: str = None,
1424 check_every
: float = 10,
1425 db_dict
: dict = None,
1426 run_once
: bool = False,
1430 await asyncio
.sleep(check_every
)
1431 detailed_status
= await self
._status
_kdu
(
1432 cluster_id
=cluster_id
,
1433 kdu_instance
=kdu_instance
,
1435 namespace
=namespace
,
1437 status
= detailed_status
.get("info").get("description")
1438 self
.log
.debug("KDU {} STATUS: {}.".format(kdu_instance
, status
))
1439 # write status to db
1440 result
= await self
.write_app_status_to_db(
1443 detailed_status
=str(detailed_status
),
1444 operation
=operation
,
1447 self
.log
.info("Error writing in database. Task exiting...")
1449 except asyncio
.CancelledError
:
1450 self
.log
.debug("Task cancelled")
1452 except Exception as e
:
1454 "_store_status exception: {}".format(str(e
)), exc_info
=True
1461 # params for use in -f file
1462 # returns values file option and filename (in order to delete it at the end)
1463 def _params_to_file_option(self
, cluster_id
: str, params
: dict) -> (str, str):
1465 if params
and len(params
) > 0:
1466 self
._init
_paths
_env
(cluster_name
=cluster_id
, create_if_not_exist
=True)
1468 def get_random_number():
1469 r
= random
.randrange(start
=1, stop
=99999999)
1477 value
= params
.get(key
)
1478 if "!!yaml" in str(value
):
1479 value
= yaml
.load(value
[7:])
1480 params2
[key
] = value
1482 values_file
= get_random_number() + ".yaml"
1483 with
open(values_file
, "w") as stream
:
1484 yaml
.dump(params2
, stream
, indent
=4, default_flow_style
=False)
1486 return "-f {}".format(values_file
), values_file
1490 # params for use in --set option
1492 def _params_to_set_option(params
: dict) -> str:
1494 if params
and len(params
) > 0:
1497 value
= params
.get(key
, None)
1498 if value
is not None:
1500 params_str
+= "--set "
1504 params_str
+= "{}={}".format(key
, value
)
1508 def generate_kdu_instance_name(**kwargs
):
1509 chart_name
= kwargs
["kdu_model"]
1510 # check embeded chart (file or dir)
1511 if chart_name
.startswith("/"):
1512 # extract file or directory name
1513 chart_name
= chart_name
[chart_name
.rfind("/") + 1 :]
1515 elif "://" in chart_name
:
1516 # extract last portion of URL
1517 chart_name
= chart_name
[chart_name
.rfind("/") + 1 :]
1520 for c
in chart_name
:
1521 if c
.isalpha() or c
.isnumeric():
1528 # if does not start with alpha character, prefix 'a'
1529 if not name
[0].isalpha():
1534 def get_random_number():
1535 r
= random
.randrange(start
=1, stop
=99999999)
1537 s
= s
.rjust(10, "0")
1540 name
= name
+ get_random_number()