2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
32 from uuid
import uuid4
34 from n2vc
.config
import EnvironConfig
35 from n2vc
.exceptions
import K8sException
36 from n2vc
.k8s_conn
import K8sConnector
39 class K8sHelmBaseConnector(K8sConnector
):
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
47 service_account
= "osm"
53 kubectl_command
: str = "/usr/bin/kubectl",
54 helm_command
: str = "/usr/bin/helm",
60 :param fs: file system for kubernetes and helm configuration
61 :param db: database object to write current operation status
62 :param kubectl_command: path to kubectl executable
63 :param helm_command: path to helm executable
65 :param on_update_db: callback called when k8s connector updates database
69 K8sConnector
.__init
__(self
, db
=db
, log
=log
, on_update_db
=on_update_db
)
71 self
.log
.info("Initializing K8S Helm connector")
73 self
.config
= EnvironConfig()
74 # random numbers for release name generation
75 random
.seed(time
.time())
80 # exception if kubectl is not installed
81 self
.kubectl_command
= kubectl_command
82 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
84 # exception if helm is not installed
85 self
._helm
_command
= helm_command
86 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
88 # obtain stable repo url from config or apply default
89 self
._stable
_repo
_url
= self
.config
.get("stablerepourl")
90 if self
._stable
_repo
_url
== "None":
91 self
._stable
_repo
_url
= None
94 def _get_namespace_cluster_id(cluster_uuid
: str) -> (str, str):
96 Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only
97 cluster_id for backward compatibility
99 namespace
, _
, cluster_id
= cluster_uuid
.rpartition(":")
100 return namespace
, cluster_id
105 namespace
: str = "kube-system",
106 reuse_cluster_uuid
=None,
110 It prepares a given K8s cluster environment to run Charts
112 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
114 :param namespace: optional namespace to be used for helm. By default,
115 'kube-system' will be used
116 :param reuse_cluster_uuid: existing cluster uuid for reuse
117 :param kwargs: Additional parameters (None yet)
118 :return: uuid of the K8s cluster and True if connector has installed some
119 software in the cluster
120 (on error, an exception will be raised)
123 if reuse_cluster_uuid
:
124 namespace_
, cluster_id
= self
._get
_namespace
_cluster
_id
(reuse_cluster_uuid
)
125 namespace
= namespace_
or namespace
127 cluster_id
= str(uuid4())
128 cluster_uuid
= "{}:{}".format(namespace
, cluster_id
)
131 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id
, namespace
)
134 paths
, env
= self
._init
_paths
_env
(
135 cluster_name
=cluster_id
, create_if_not_exist
=True
137 mode
= stat
.S_IRUSR | stat
.S_IWUSR
138 with
open(paths
["kube_config"], "w", mode
) as f
:
140 os
.chmod(paths
["kube_config"], 0o600)
142 # Code with initialization specific of helm version
143 n2vc_installed_sw
= await self
._cluster
_init
(cluster_id
, namespace
, paths
, env
)
145 # sync fs with local data
146 self
.fs
.reverse_sync(from_path
=cluster_id
)
148 self
.log
.info("Cluster {} initialized".format(cluster_id
))
150 return cluster_uuid
, n2vc_installed_sw
153 self
, cluster_uuid
: str, name
: str, url
: str, repo_type
: str = "chart"
155 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
157 "Cluster {}, adding {} repository {}. URL: {}".format(
158 cluster_id
, repo_type
, name
, url
163 self
.fs
.sync(from_path
=cluster_id
)
166 paths
, env
= self
._init
_paths
_env
(
167 cluster_name
=cluster_id
, create_if_not_exist
=True
171 command
= "{} repo update".format(self
._helm
_command
)
172 self
.log
.debug("updating repo: {}".format(command
))
173 await self
._local
_async
_exec
(
174 command
=command
, raise_exception_on_error
=False, env
=env
177 # helm repo add name url
178 command
= "{} repo add {} {}".format(self
._helm
_command
, name
, url
)
179 self
.log
.debug("adding repo: {}".format(command
))
180 await self
._local
_async
_exec
(
181 command
=command
, raise_exception_on_error
=True, env
=env
185 self
.fs
.reverse_sync(from_path
=cluster_id
)
async def repo_list(self, cluster_uuid: str) -> list:
    """
    Get the list of registered repositories

    :param cluster_uuid: cluster id, optionally prefixed as 'namespace:cluster_id'
    :return: list of registered repositories, one dictionary per repository
        with lower-case keys (callers read "name" and "url"); an empty list
        when helm reports no repositories or the command fails
    """

    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug("list repositories for cluster {}".format(cluster_id))

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # config filename (only the helm environment is used by this command)
    _paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} repo list --output yaml".format(self._helm_command)

    # Set exception to false because if there are no repos just want an empty list
    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=False, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)

    # A failed command or an empty answer both mean "nothing registered":
    # never hand error text to the YAML parser and never return None.
    if _rc != 0 or not output:
        return []

    repos = yaml.load(output, Loader=yaml.SafeLoader)
    # unify format between helm2 and helm3 setting all keys lowercase
    return self._lower_keys_list(repos)
async def repo_remove(self, cluster_uuid: str, name: str):
    """
    Remove a repository from the helm environment of a cluster

    :param cluster_uuid: cluster id, optionally prefixed as 'namespace:cluster_id'
    :param name: name of the repository to remove
    :return: None. The helm command is run with raise_exception_on_error=True.
    """

    _namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    log_message = "remove {} repositories for cluster {}".format(name, cluster_id)
    self.log.debug(log_message)

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # init env, paths
    _paths, helm_env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    remove_command = "{} repo remove {}".format(self._helm_command, name)
    await self._local_async_exec(
        command=remove_command, raise_exception_on_error=True, env=helm_env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)
250 uninstall_sw
: bool = False,
255 Resets the Kubernetes cluster by removing the helm deployment that represents it.
257 :param cluster_uuid: The UUID of the cluster to reset
258 :param force: Boolean to force the reset
        :param uninstall_sw: Boolean to request uninstalling the helm software from the cluster
260 :param kwargs: Additional parameters (None yet)
261 :return: Returns True if successful or raises an exception.
263 namespace
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
265 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
266 cluster_id
, uninstall_sw
271 self
.fs
.sync(from_path
=cluster_id
)
273 # uninstall releases if needed.
275 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
276 if len(releases
) > 0:
280 kdu_instance
= r
.get("name")
281 chart
= r
.get("chart")
283 "Uninstalling {} -> {}".format(chart
, kdu_instance
)
285 await self
.uninstall(
286 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
288 except Exception as e
:
289 # will not raise exception as it was found
290 # that in some cases of previously installed helm releases it
293 "Error uninstalling release {}: {}".format(
299 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
303 False # Allow to remove k8s cluster without removing Tiller
307 await self
._uninstall
_sw
(cluster_id
, namespace
)
309 # delete cluster directory
310 self
.log
.debug("Removing directory {}".format(cluster_id
))
311 self
.fs
.file_delete(cluster_id
, ignore_non_exist
=True)
        # Also remove the local directory if it still exists
313 direct
= self
.fs
.path
+ "/" + cluster_id
314 shutil
.rmtree(direct
, ignore_errors
=True)
318 async def _install_impl(
326 timeout
: float = 300,
328 db_dict
: dict = None,
329 kdu_name
: str = None,
330 namespace
: str = None,
333 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
334 cluster_id
=cluster_id
, params
=params
340 parts
= kdu_model
.split(sep
=":")
342 version
= str(parts
[1])
345 command
= self
._get
_install
_command
(
346 kdu_model
, kdu_instance
, namespace
, params_str
, version
, atomic
, timeout
349 self
.log
.debug("installing: {}".format(command
))
352 # exec helm in a task
353 exec_task
= asyncio
.ensure_future(
354 coro_or_future
=self
._local
_async
_exec
(
355 command
=command
, raise_exception_on_error
=False, env
=env
359 # write status in another task
360 status_task
= asyncio
.ensure_future(
361 coro_or_future
=self
._store
_status
(
362 cluster_id
=cluster_id
,
363 kdu_instance
=kdu_instance
,
371 # wait for execution task
372 await asyncio
.wait([exec_task
])
377 output
, rc
= exec_task
.result()
381 output
, rc
= await self
._local
_async
_exec
(
382 command
=command
, raise_exception_on_error
=False, env
=env
385 # remove temporal values yaml file
387 os
.remove(file_to_delete
)
390 await self
._store
_status
(
391 cluster_id
=cluster_id
,
392 kdu_instance
=kdu_instance
,
401 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
403 raise K8sException(msg
)
409 kdu_model
: str = None,
411 timeout
: float = 300,
413 db_dict
: dict = None,
415 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
416 self
.log
.debug("upgrading {} in cluster {}".format(kdu_model
, cluster_id
))
419 self
.fs
.sync(from_path
=cluster_id
)
421 # look for instance to obtain namespace
422 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
423 if not instance_info
:
424 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
427 paths
, env
= self
._init
_paths
_env
(
428 cluster_name
=cluster_id
, create_if_not_exist
=True
432 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
433 cluster_id
=cluster_id
, params
=params
439 parts
= kdu_model
.split(sep
=":")
441 version
= str(parts
[1])
444 command
= self
._get
_upgrade
_command
(
447 instance_info
["namespace"],
454 self
.log
.debug("upgrading: {}".format(command
))
458 # exec helm in a task
459 exec_task
= asyncio
.ensure_future(
460 coro_or_future
=self
._local
_async
_exec
(
461 command
=command
, raise_exception_on_error
=False, env
=env
464 # write status in another task
465 status_task
= asyncio
.ensure_future(
466 coro_or_future
=self
._store
_status
(
467 cluster_id
=cluster_id
,
468 kdu_instance
=kdu_instance
,
469 namespace
=instance_info
["namespace"],
476 # wait for execution task
477 await asyncio
.wait([exec_task
])
481 output
, rc
= exec_task
.result()
485 output
, rc
= await self
._local
_async
_exec
(
486 command
=command
, raise_exception_on_error
=False, env
=env
489 # remove temporal values yaml file
491 os
.remove(file_to_delete
)
494 await self
._store
_status
(
495 cluster_id
=cluster_id
,
496 kdu_instance
=kdu_instance
,
497 namespace
=instance_info
["namespace"],
505 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
507 raise K8sException(msg
)
510 self
.fs
.reverse_sync(from_path
=cluster_id
)
512 # return new revision number
513 instance
= await self
.get_instance_info(
514 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
517 revision
= int(instance
.get("revision"))
518 self
.log
.debug("New revision: {}".format(revision
))
528 total_timeout
: float = 1800,
531 raise NotImplementedError("Method not implemented")
533 async def get_scale_count(
539 raise NotImplementedError("Method not implemented")
542 self
, cluster_uuid
: str, kdu_instance
: str, revision
=0, db_dict
: dict = None
545 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
547 "rollback kdu_instance {} to revision {} from cluster {}".format(
548 kdu_instance
, revision
, cluster_id
553 self
.fs
.sync(from_path
=cluster_id
)
555 # look for instance to obtain namespace
556 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
557 if not instance_info
:
558 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
561 paths
, env
= self
._init
_paths
_env
(
562 cluster_name
=cluster_id
, create_if_not_exist
=True
565 command
= self
._get
_rollback
_command
(
566 kdu_instance
, instance_info
["namespace"], revision
569 self
.log
.debug("rolling_back: {}".format(command
))
571 # exec helm in a task
572 exec_task
= asyncio
.ensure_future(
573 coro_or_future
=self
._local
_async
_exec
(
574 command
=command
, raise_exception_on_error
=False, env
=env
577 # write status in another task
578 status_task
= asyncio
.ensure_future(
579 coro_or_future
=self
._store
_status
(
580 cluster_id
=cluster_id
,
581 kdu_instance
=kdu_instance
,
582 namespace
=instance_info
["namespace"],
584 operation
="rollback",
589 # wait for execution task
590 await asyncio
.wait([exec_task
])
595 output
, rc
= exec_task
.result()
598 await self
._store
_status
(
599 cluster_id
=cluster_id
,
600 kdu_instance
=kdu_instance
,
601 namespace
=instance_info
["namespace"],
603 operation
="rollback",
609 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
611 raise K8sException(msg
)
614 self
.fs
.reverse_sync(from_path
=cluster_id
)
616 # return new revision number
617 instance
= await self
.get_instance_info(
618 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
621 revision
= int(instance
.get("revision"))
622 self
.log
.debug("New revision: {}".format(revision
))
627 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str, **kwargs
):
629 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
630 (this call should happen after all _terminate-config-primitive_ of the VNF
633 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
634 :param kdu_instance: unique name for the KDU instance to be deleted
635 :param kwargs: Additional parameters (None yet)
636 :return: True if successful
639 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
641 "uninstall kdu_instance {} from cluster {}".format(kdu_instance
, cluster_id
)
645 self
.fs
.sync(from_path
=cluster_id
)
647 # look for instance to obtain namespace
648 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
649 if not instance_info
:
650 self
.log
.warning(("kdu_instance {} not found".format(kdu_instance
)))
653 paths
, env
= self
._init
_paths
_env
(
654 cluster_name
=cluster_id
, create_if_not_exist
=True
657 command
= self
._get
_uninstall
_command
(kdu_instance
, instance_info
["namespace"])
658 output
, _rc
= await self
._local
_async
_exec
(
659 command
=command
, raise_exception_on_error
=True, env
=env
663 self
.fs
.reverse_sync(from_path
=cluster_id
)
665 return self
._output
_to
_table
(output
)
async def instances_list(self, cluster_uuid: str) -> list:
    """
    returns a list of deployed releases in a cluster

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: the release list produced by the helm-version dependent
        `_instances_list` for this cluster
    """

    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug("list releases for cluster {}".format(cluster_id))

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # execute internal command
    result = await self._instances_list(cluster_id)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)

    # callers iterate the releases and take their length, so hand them back
    return result
async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
    """
    Look up a deployed release by its name

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :param kdu_instance: name of the release to look for
    :return: the entry of `instances_list` whose "name" equals kdu_instance,
        or None when the cluster has no release with that name
    """
    instances = await self.instances_list(cluster_uuid=cluster_uuid)
    for instance in instances:
        if instance.get("name") == kdu_instance:
            # callers read instance_info["namespace"] from this entry
            return instance
    self.log.debug("Instance {} not found".format(kdu_instance))
    return None
697 async def exec_primitive(
699 cluster_uuid
: str = None,
700 kdu_instance
: str = None,
701 primitive_name
: str = None,
702 timeout
: float = 300,
704 db_dict
: dict = None,
707 """Exec primitive (Juju action)
709 :param cluster_uuid: The UUID of the cluster or namespace:cluster
710 :param kdu_instance: The unique name of the KDU instance
711 :param primitive_name: Name of action that will be executed
712 :param timeout: Timeout for action execution
713 :param params: Dictionary of all the parameters needed for the action
        :param db_dict: Dictionary for any additional data
715 :param kwargs: Additional parameters (None yet)
717 :return: Returns the output of the action
720 "KDUs deployed with Helm don't support actions "
721 "different from rollback, upgrade and status"
724 async def get_services(
725 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
728 Returns a list of services defined for the specified kdu instance.
730 :param cluster_uuid: UUID of a K8s cluster known by OSM
731 :param kdu_instance: unique name for the KDU instance
732 :param namespace: K8s namespace used by the KDU instance
733 :return: If successful, it will return a list of services, Each service
734 can have the following data:
735 - `name` of the service
736 - `type` type of service in the k8 cluster
737 - `ports` List of ports offered by the service, for each port includes at least
739 - `cluster_ip` Internal ip to be used inside k8s cluster
740 - `external_ip` List of external ips (in case they are available)
743 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
745 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
746 cluster_uuid
, kdu_instance
751 self
.fs
.sync(from_path
=cluster_id
)
753 # get list of services names for kdu
754 service_names
= await self
._get
_services
(cluster_id
, kdu_instance
, namespace
)
757 for service
in service_names
:
758 service
= await self
._get
_service
(cluster_id
, service
, namespace
)
759 service_list
.append(service
)
762 self
.fs
.reverse_sync(from_path
=cluster_id
)
766 async def get_service(
767 self
, cluster_uuid
: str, service_name
: str, namespace
: str
771 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
772 service_name
, namespace
, cluster_uuid
776 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
779 self
.fs
.sync(from_path
=cluster_id
)
781 service
= await self
._get
_service
(cluster_id
, service_name
, namespace
)
784 self
.fs
.reverse_sync(from_path
=cluster_id
)
788 async def status_kdu(self
, cluster_uuid
: str, kdu_instance
: str, **kwargs
) -> str:
        This call retrieves the current state of a given KDU instance. It
        allows retrieving the _composition_ (i.e. K8s objects) and _specific
792 values_ of the configuration parameters applied to a given instance. This call
793 would be based on the `status` call.
795 :param cluster_uuid: UUID of a K8s cluster known by OSM
796 :param kdu_instance: unique name for the KDU instance
797 :param kwargs: Additional parameters (None yet)
798 :return: If successful, it will return the following vector of arguments:
799 - K8s `namespace` in the cluster where the KDU lives
800 - `state` of the KDU instance. It can be:
807 - List of `resources` (objects) that this release consists of, sorted by kind,
808 and the status of those resources
809 - Last `deployment_time`.
813 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
814 cluster_uuid
, kdu_instance
818 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
821 self
.fs
.sync(from_path
=cluster_id
)
823 # get instance: needed to obtain namespace
824 instances
= await self
._instances
_list
(cluster_id
=cluster_id
)
825 for instance
in instances
:
826 if instance
.get("name") == kdu_instance
:
829 # instance does not exist
831 "Instance name: {} not found in cluster: {}".format(
832 kdu_instance
, cluster_id
836 status
= await self
._status
_kdu
(
837 cluster_id
=cluster_id
,
838 kdu_instance
=kdu_instance
,
839 namespace
=instance
["namespace"],
845 self
.fs
.reverse_sync(from_path
=cluster_id
)
849 async def values_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
852 "inspect kdu_model values {} from (optional) repo: {}".format(
857 return await self
._exec
_inspect
_comand
(
858 inspect_command
="values", kdu_model
=kdu_model
, repo_url
=repo_url
861 async def help_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
864 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model
, repo_url
)
867 return await self
._exec
_inspect
_comand
(
868 inspect_command
="readme", kdu_model
=kdu_model
, repo_url
=repo_url
871 async def synchronize_repos(self
, cluster_uuid
: str):
873 self
.log
.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid
))
875 db_repo_ids
= self
._get
_helm
_chart
_repos
_ids
(cluster_uuid
)
876 db_repo_dict
= self
._get
_db
_repos
_dict
(db_repo_ids
)
878 local_repo_list
= await self
.repo_list(cluster_uuid
)
879 local_repo_dict
= {repo
["name"]: repo
["url"] for repo
in local_repo_list
}
881 deleted_repo_list
= []
884 # iterate over the list of repos in the database that should be
885 # added if not present
886 for repo_name
, db_repo
in db_repo_dict
.items():
888 # check if it is already present
889 curr_repo_url
= local_repo_dict
.get(db_repo
["name"])
890 repo_id
= db_repo
.get("_id")
891 if curr_repo_url
!= db_repo
["url"]:
894 "repo {} url changed, delete and and again".format(
898 await self
.repo_remove(cluster_uuid
, db_repo
["name"])
899 deleted_repo_list
.append(repo_id
)
902 self
.log
.debug("add repo {}".format(db_repo
["name"]))
904 cluster_uuid
, db_repo
["name"], db_repo
["url"]
906 added_repo_dict
[repo_id
] = db_repo
["name"]
907 except Exception as e
:
909 "Error adding repo id: {}, err_msg: {} ".format(
914 # Delete repos that are present but not in nbi_list
915 for repo_name
in local_repo_dict
:
916 if not db_repo_dict
.get(repo_name
) and repo_name
!= "stable":
917 self
.log
.debug("delete repo {}".format(repo_name
))
919 await self
.repo_remove(cluster_uuid
, repo_name
)
920 deleted_repo_list
.append(repo_name
)
921 except Exception as e
:
923 "Error deleting repo, name: {}, err_msg: {}".format(
928 return deleted_repo_list
, added_repo_dict
932 except Exception as e
:
933 # Do not raise errors synchronizing repos
934 self
.log
.error("Error synchronizing repos: {}".format(e
))
935 raise Exception("Error synchronizing repos: {}".format(e
))
937 def _get_db_repos_dict(self
, repo_ids
: list):
939 for repo_id
in repo_ids
:
940 db_repo
= self
.db
.get_one("k8srepos", {"_id": repo_id
})
941 db_repos_dict
[db_repo
["name"]] = db_repo
945 ####################################################################################
946 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
947 ####################################################################################
def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
    """
    Create the base cluster and kube directories and return them.
    The helm3 directories are also created according to the new directory
    specification; their paths are not returned but assigned to helm
    environment variables.

    Helm-version dependent hook: this base declaration has no implementation.

    :param cluster_name: cluster_name
    :param create_if_not_exist: presumably whether missing directories are
        created -- confirm against the subclass implementations
    :return: Dictionary with config_paths and dictionary with helm environment variables
    """
async def _cluster_init(self, cluster_id, namespace, paths, env):
    """
    Implements the helm version dependent cluster initialization

    Called by init_env once the kube config file has been written.

    :param cluster_id: id of the cluster being initialized
    :param namespace: namespace to be used for helm
    :param paths: paths dictionary returned by _init_paths_env
    :param env: helm environment returned by _init_paths_env
    :return: value that init_env returns as its "installed software" flag
    """
async def _instances_list(self, cluster_id):
    """
    Implements the helm version dependent helm instances list

    :param cluster_id: id of the cluster whose releases are listed
    :return: iterable of release dictionaries; callers in this class read
        their "name" and "namespace" entries
    """
async def _get_services(self, cluster_id, kdu_instance, namespace):
    """
    Implements the helm version dependent method to obtain services from a helm instance

    :param cluster_id: id of the cluster
    :param kdu_instance: name of the KDU instance
    :param namespace: K8s namespace used by the KDU instance
    :return: iterable of service names; get_services passes each one to _get_service
    """
980 async def _status_kdu(
984 namespace
: str = None,
985 show_error_log
: bool = False,
986 return_text
: bool = False,
989 Implements the helm version dependent method to obtain status of a helm instance
def _get_install_command(
    self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
) -> str:
    """
    Obtain command to be executed to install the indicated instance

    :param kdu_model: chart to install
    :param kdu_instance: name of the release to create
    :param namespace: K8s namespace for the release
    :param params_str: values option built by _params_to_file_option
    :param version: chart version taken from the part of kdu_model after ":"
    :param atomic: passed through from the install call
        (NOTE(review): meaning not visible here -- confirm in subclasses)
    :param timeout: timeout for the install operation
    :return: command line that is handed to _local_async_exec
    """
def _get_upgrade_command(
    self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
) -> str:
    """
    Obtain command to be executed to upgrade the indicated instance

    :param kdu_model: chart to upgrade to
    :param kdu_instance: name of the release to upgrade
    :param namespace: K8s namespace of the release
    :param params_str: values option built by _params_to_file_option
    :param version: chart version taken from the part of kdu_model after ":"
    :param atomic: passed through from the upgrade call
        (NOTE(review): meaning not visible here -- confirm in subclasses)
    :param timeout: timeout for the upgrade operation
    :return: command line that is handed to _local_async_exec
    """
def _get_rollback_command(self, kdu_instance, namespace, revision) -> str:
    """
    Obtain command to be executed to rollback the indicated instance

    :param kdu_instance: name of the release to roll back
    :param namespace: K8s namespace of the release
    :param revision: revision number to roll back to
    :return: command line that is handed to _local_async_exec
    """
def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
    """
    Obtain command to be executed to delete the indicated instance

    :param kdu_instance: name of the release to delete
    :param namespace: K8s namespace of the release
    :return: command line that is handed to _local_async_exec
    """
def _get_inspect_command(
    self, show_command: str, kdu_model: str, repo_str: str, version: str
) -> str:
    """
    Obtain command to be executed to obtain information about the kdu

    :param show_command: what to inspect; this class passes "values" or "readme"
    :param kdu_model: chart name, without the ":version" suffix
    :param repo_str: repository option, built by the caller as " --repo <url>"
    :param version: version option, built by the caller as "--version <version>"
    :return: command line that is handed to _local_async_exec
    """
async def _uninstall_sw(self, cluster_id: str, namespace: str):
    """
    Method call to uninstall cluster software for helm. This method is dependent
    on the helm version:
    For Helm v2 it will be called when Tiller must be uninstalled
    For Helm v3 it does nothing and does not need to be called

    :param cluster_id: id of the cluster
    :param namespace: namespace that was used for helm in this cluster
    """
def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
    """
    Obtains the cluster repos identifiers

    :param cluster_uuid: cluster identifier as passed to synchronize_repos
    :return: list of repository ids; each one is looked up by "_id" in the
        "k8srepos" collection by _get_db_repos_dict
    """
1044 ####################################################################################
1045 ################################### P R I V A T E ##################################
1046 ####################################################################################
1050 def _check_file_exists(filename
: str, exception_if_not_exists
: bool = False):
1051 if os
.path
.exists(filename
):
1054 msg
= "File {} does not exist".format(filename
)
1055 if exception_if_not_exists
:
1056 raise K8sException(msg
)
1059 def _remove_multiple_spaces(strobj
):
1060 strobj
= strobj
.strip()
1061 while " " in strobj
:
1062 strobj
= strobj
.replace(" ", " ")
1066 def _output_to_lines(output
: str) -> list:
1067 output_lines
= list()
1068 lines
= output
.splitlines(keepends
=False)
1072 output_lines
.append(line
)
1076 def _output_to_table(output
: str) -> list:
1077 output_table
= list()
1078 lines
= output
.splitlines(keepends
=False)
1080 line
= line
.replace("\t", " ")
1082 output_table
.append(line_list
)
1083 cells
= line
.split(sep
=" ")
1087 line_list
.append(cell
)
1091 def _parse_services(output
: str) -> list:
1092 lines
= output
.splitlines(keepends
=False)
1095 line
= line
.replace("\t", " ")
1096 cells
= line
.split(sep
=" ")
1097 if len(cells
) > 0 and cells
[0].startswith("service/"):
1098 elems
= cells
[0].split(sep
="/")
1100 services
.append(elems
[1])
1104 def _get_deep(dictionary
: dict, members
: tuple):
1109 value
= target
.get(m
)
1118 # find key:value in several lines
1120 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
1121 for line
in p_lines
:
1123 if line
.startswith(p_key
+ ":"):
1124 parts
= line
.split(":")
1125 the_value
= parts
[1].strip()
1133 def _lower_keys_list(input_list
: list):
1135 Transform the keys in a list of dictionaries to lower case and returns a new list
1140 for dictionary
in input_list
:
1141 new_dict
= dict((k
.lower(), v
) for k
, v
in dictionary
.items())
1142 new_list
.append(new_dict
)
1145 def _local_exec(self
, command
: str) -> (str, int):
1146 command
= self
._remove
_multiple
_spaces
(command
)
1147 self
.log
.debug("Executing sync local command: {}".format(command
))
1148 # raise exception if fails
1151 output
= subprocess
.check_output(
1152 command
, shell
=True, universal_newlines
=True
1155 self
.log
.debug(output
)
1159 return output
, return_code
1161 async def _local_async_exec(
1164 raise_exception_on_error
: bool = False,
1165 show_error_log
: bool = True,
1166 encode_utf8
: bool = False,
1170 command
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command
)
1172 "Executing async local command: {}, env: {}".format(command
, env
)
1176 command
= shlex
.split(command
)
1178 environ
= os
.environ
.copy()
1183 process
= await asyncio
.create_subprocess_exec(
1185 stdout
=asyncio
.subprocess
.PIPE
,
1186 stderr
=asyncio
.subprocess
.PIPE
,
1190 # wait for command terminate
1191 stdout
, stderr
= await process
.communicate()
1193 return_code
= process
.returncode
1197 output
= stdout
.decode("utf-8").strip()
1198 # output = stdout.decode()
1200 output
= stderr
.decode("utf-8").strip()
1201 # output = stderr.decode()
1203 if return_code
!= 0 and show_error_log
:
1205 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1208 self
.log
.debug("Return code: {}".format(return_code
))
1210 if raise_exception_on_error
and return_code
!= 0:
1211 raise K8sException(output
)
1214 output
= output
.encode("utf-8").strip()
1215 output
= str(output
).replace("\\n", "\n")
1217 return output
, return_code
1219 except asyncio
.CancelledError
:
1221 except K8sException
:
1223 except Exception as e
:
1224 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1226 if raise_exception_on_error
:
1227 raise K8sException(e
) from e
1231 async def _local_async_exec_pipe(
1235 raise_exception_on_error
: bool = True,
1236 show_error_log
: bool = True,
1237 encode_utf8
: bool = False,
1241 command1
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command1
)
1242 command2
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command2
)
1243 command
= "{} | {}".format(command1
, command2
)
1245 "Executing async local command: {}, env: {}".format(command
, env
)
1249 command1
= shlex
.split(command1
)
1250 command2
= shlex
.split(command2
)
1252 environ
= os
.environ
.copy()
1257 read
, write
= os
.pipe()
1258 await asyncio
.create_subprocess_exec(*command1
, stdout
=write
, env
=environ
)
1260 process_2
= await asyncio
.create_subprocess_exec(
1261 *command2
, stdin
=read
, stdout
=asyncio
.subprocess
.PIPE
, env
=environ
1264 stdout
, stderr
= await process_2
.communicate()
1266 return_code
= process_2
.returncode
1270 output
= stdout
.decode("utf-8").strip()
1271 # output = stdout.decode()
1273 output
= stderr
.decode("utf-8").strip()
1274 # output = stderr.decode()
1276 if return_code
!= 0 and show_error_log
:
1278 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1281 self
.log
.debug("Return code: {}".format(return_code
))
1283 if raise_exception_on_error
and return_code
!= 0:
1284 raise K8sException(output
)
1287 output
= output
.encode("utf-8").strip()
1288 output
= str(output
).replace("\\n", "\n")
1290 return output
, return_code
1291 except asyncio
.CancelledError
:
1293 except K8sException
:
1295 except Exception as e
:
1296 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1298 if raise_exception_on_error
:
1299 raise K8sException(e
) from e
async def _get_service(self, cluster_id, service_name, namespace):
    """
    Obtains the data of the specified service in the k8cluster.

    :param cluster_id: id of a K8s cluster known by OSM
    :param service_name: name of the K8s service in the specified namespace
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a service with the following data:
    - `name` of the service
    - `type` type of service in the k8 cluster
    - `ports` List of ports offered by the service, for each port includes at least
    name, port, protocol
    - `cluster_ip` Internal ip to be used inside k8s cluster
    - `external_ip` List of external ips (in case they are available);
    only present for services of type LoadBalancer
    """

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
        self.kubectl_command, paths["kube_config"], namespace, service_name
    )

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    data = yaml.load(output, Loader=yaml.SafeLoader)

    service = {
        "name": service_name,
        "type": self._get_deep(data, ("spec", "type")),
        "ports": self._get_deep(data, ("spec", "ports")),
        "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
    }
    if service["type"] == "LoadBalancer":
        # A load balancer that has not been provisioned yet has no ingress
        # list, and an ingress entry may carry only a hostname: treat both as
        # "no external ip" instead of failing.
        # NOTE(review): assumes _get_deep yields a falsy value for a missing
        # path -- confirm against its implementation.
        ip_map_list = (
            self._get_deep(data, ("status", "loadBalancer", "ingress")) or []
        )
        ip_list = [elem["ip"] for elem in ip_map_list if "ip" in elem]
        service["external_ip"] = ip_list

    return service
1347 async def _exec_inspect_comand(
1348 self
, inspect_command
: str, kdu_model
: str, repo_url
: str = None
1351 Obtains information about a kdu, no cluster (no env)
1356 repo_str
= " --repo {}".format(repo_url
)
1358 idx
= kdu_model
.find("/")
1361 kdu_model
= kdu_model
[idx
:]
1364 if ":" in kdu_model
:
1365 parts
= kdu_model
.split(sep
=":")
1367 version
= "--version {}".format(str(parts
[1]))
1368 kdu_model
= parts
[0]
1370 full_command
= self
._get
_inspect
_command
(
1371 inspect_command
, kdu_model
, repo_str
, version
1373 output
, _rc
= await self
._local
_async
_exec
(
1374 command
=full_command
, encode_utf8
=True
1379 async def _store_status(
1384 namespace
: str = None,
1385 check_every
: float = 10,
1386 db_dict
: dict = None,
1387 run_once
: bool = False,
1391 await asyncio
.sleep(check_every
)
1392 detailed_status
= await self
._status
_kdu
(
1393 cluster_id
=cluster_id
,
1394 kdu_instance
=kdu_instance
,
1395 namespace
=namespace
,
1398 status
= detailed_status
.get("info").get("description")
1399 self
.log
.debug("KDU {} STATUS: {}.".format(kdu_instance
, status
))
1400 # write status to db
1401 result
= await self
.write_app_status_to_db(
1404 detailed_status
=str(detailed_status
),
1405 operation
=operation
,
1408 self
.log
.info("Error writing in database. Task exiting...")
1410 except asyncio
.CancelledError
:
1411 self
.log
.debug("Task cancelled")
1413 except Exception as e
:
1415 "_store_status exception: {}".format(str(e
)), exc_info
=True
1422 # params for use in -f file
1423 # returns values file option and filename (in order to delete it at the end)
1424 def _params_to_file_option(self
, cluster_id
: str, params
: dict) -> (str, str):
1426 if params
and len(params
) > 0:
1427 self
._init
_paths
_env
(cluster_name
=cluster_id
, create_if_not_exist
=True)
1429 def get_random_number():
1430 r
= random
.randrange(start
=1, stop
=99999999)
1438 value
= params
.get(key
)
1439 if "!!yaml" in str(value
):
1440 value
= yaml
.load(value
[7:])
1441 params2
[key
] = value
1443 values_file
= get_random_number() + ".yaml"
1444 with
open(values_file
, "w") as stream
:
1445 yaml
.dump(params2
, stream
, indent
=4, default_flow_style
=False)
1447 return "-f {}".format(values_file
), values_file
1451 # params for use in --set option
1453 def _params_to_set_option(params
: dict) -> str:
1455 if params
and len(params
) > 0:
1458 value
= params
.get(key
, None)
1459 if value
is not None:
1461 params_str
+= "--set "
1465 params_str
+= "{}={}".format(key
, value
)
1469 def generate_kdu_instance_name(**kwargs
):
1470 chart_name
= kwargs
["kdu_model"]
        # check embedded chart (file or dir)
1472 if chart_name
.startswith("/"):
1473 # extract file or directory name
1474 chart_name
= chart_name
[chart_name
.rfind("/") + 1:]
1476 elif "://" in chart_name
:
1477 # extract last portion of URL
1478 chart_name
= chart_name
[chart_name
.rfind("/") + 1:]
1481 for c
in chart_name
:
1482 if c
.isalpha() or c
.isnumeric():
1489 # if does not start with alpha character, prefix 'a'
1490 if not name
[0].isalpha():
1495 def get_random_number():
1496 r
= random
.randrange(start
=1, stop
=99999999)
1498 s
= s
.rjust(10, "0")
1501 name
= name
+ get_random_number()