# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
# This file is part of OSM
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: nfvlabs@tid.es
import asyncio
import os
import random
import shlex
import shutil
import stat
import subprocess
import time
from uuid import uuid4

import yaml

from n2vc.exceptions import K8sException
from n2vc.k8s_conn import K8sConnector
38 class K8sHelmBaseConnector(K8sConnector
):
41 ####################################################################################
42 ################################### P U B L I C ####################################
43 ####################################################################################
46 service_account
= "osm"
47 _STABLE_REPO_URL
= "https://charts.helm.sh/stable"
53 kubectl_command
: str = "/usr/bin/kubectl",
54 helm_command
: str = "/usr/bin/helm",
57 vca_config
: dict = None,
61 :param fs: file system for kubernetes and helm configuration
62 :param db: database object to write current operation status
63 :param kubectl_command: path to kubectl executable
64 :param helm_command: path to helm executable
66 :param on_update_db: callback called when k8s connector updates database
70 K8sConnector
.__init
__(self
, db
=db
, log
=log
, on_update_db
=on_update_db
)
72 self
.log
.info("Initializing K8S Helm connector")
74 # random numbers for release name generation
75 random
.seed(time
.time())
80 # exception if kubectl is not installed
81 self
.kubectl_command
= kubectl_command
82 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
84 # exception if helm is not installed
85 self
._helm
_command
= helm_command
86 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
88 # obtain stable repo url from config or apply default
89 if not vca_config
or not vca_config
.get("stablerepourl"):
90 self
._stable
_repo
_url
= self
._STABLE
_REPO
_URL
92 self
._stable
_repo
_url
= vca_config
.get("stablerepourl")
95 def _get_namespace_cluster_id(cluster_uuid
: str) -> (str, str):
97 Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only
98 cluster_id for backward compatibility
100 namespace
, _
, cluster_id
= cluster_uuid
.rpartition(":")
101 return namespace
, cluster_id
106 namespace
: str = "kube-system",
107 reuse_cluster_uuid
=None,
111 It prepares a given K8s cluster environment to run Charts
113 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
115 :param namespace: optional namespace to be used for helm. By default,
116 'kube-system' will be used
117 :param reuse_cluster_uuid: existing cluster uuid for reuse
118 :param kwargs: Additional parameters (None yet)
119 :return: uuid of the K8s cluster and True if connector has installed some
120 software in the cluster
121 (on error, an exception will be raised)
124 if reuse_cluster_uuid
:
125 namespace_
, cluster_id
= self
._get
_namespace
_cluster
_id
(reuse_cluster_uuid
)
126 namespace
= namespace_
or namespace
128 cluster_id
= str(uuid4())
129 cluster_uuid
= "{}:{}".format(namespace
, cluster_id
)
132 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id
, namespace
)
135 paths
, env
= self
._init
_paths
_env
(
136 cluster_name
=cluster_id
, create_if_not_exist
=True
138 mode
= stat
.S_IRUSR | stat
.S_IWUSR
139 with
open(paths
["kube_config"], "w", mode
) as f
:
141 os
.chmod(paths
["kube_config"], 0o600)
143 # Code with initialization specific of helm version
144 n2vc_installed_sw
= await self
._cluster
_init
(cluster_id
, namespace
, paths
, env
)
146 # sync fs with local data
147 self
.fs
.reverse_sync(from_path
=cluster_id
)
149 self
.log
.info("Cluster {} initialized".format(cluster_id
))
151 return cluster_uuid
, n2vc_installed_sw
154 self
, cluster_uuid
: str, name
: str, url
: str, repo_type
: str = "chart"
156 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
158 "Cluster {}, adding {} repository {}. URL: {}".format(
159 cluster_id
, repo_type
, name
, url
164 self
.fs
.sync(from_path
=cluster_id
)
167 paths
, env
= self
._init
_paths
_env
(
168 cluster_name
=cluster_id
, create_if_not_exist
=True
172 command
= "{} repo update".format(self
._helm
_command
)
173 self
.log
.debug("updating repo: {}".format(command
))
174 await self
._local
_async
_exec
(
175 command
=command
, raise_exception_on_error
=False, env
=env
178 # helm repo add name url
179 command
= "{} repo add {} {}".format(self
._helm
_command
, name
, url
)
180 self
.log
.debug("adding repo: {}".format(command
))
181 await self
._local
_async
_exec
(
182 command
=command
, raise_exception_on_error
=True, env
=env
186 self
.fs
.reverse_sync(from_path
=cluster_id
)
188 async def repo_list(self
, cluster_uuid
: str) -> list:
190 Get the list of registered repositories
192 :return: list of registered repositories: [ (name, url) .... ]
195 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
196 self
.log
.debug("list repositories for cluster {}".format(cluster_id
))
199 self
.fs
.sync(from_path
=cluster_id
)
202 paths
, env
= self
._init
_paths
_env
(
203 cluster_name
=cluster_id
, create_if_not_exist
=True
206 command
= "{} repo list --output yaml".format(self
._helm
_command
)
208 # Set exception to false because if there are no repos just want an empty list
209 output
, _rc
= await self
._local
_async
_exec
(
210 command
=command
, raise_exception_on_error
=False, env
=env
214 self
.fs
.reverse_sync(from_path
=cluster_id
)
217 if output
and len(output
) > 0:
218 repos
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
219 # unify format between helm2 and helm3 setting all keys lowercase
220 return self
._lower
_keys
_list
(repos
)
async def repo_remove(self, cluster_uuid: str, name: str):
    """
    Remove a helm repository from the given cluster environment.

    :param cluster_uuid: 'namespace:cluster_id' or a bare 'cluster_id'
    :param name: name of the repository to remove
    :raises K8sException: propagated from the helm command, which is run with
        raise_exception_on_error=True
    """
    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug("remove {} repositories for cluster {}".format(name, cluster_id))

    # bring the local copy of the cluster directory up to date before running helm
    self.fs.sync(from_path=cluster_id)

    # paths is not used here; only the helm environment is needed
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} repo remove {}".format(self._helm_command, name)
    await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    # push the local changes made by helm back to the shared file system
    self.fs.reverse_sync(from_path=cluster_id)
251 uninstall_sw
: bool = False,
256 Resets the Kubernetes cluster by removing the helm deployment that represents it.
258 :param cluster_uuid: The UUID of the cluster to reset
259 :param force: Boolean to force the reset
260 :param uninstall_sw: Boolean to force the reset
261 :param kwargs: Additional parameters (None yet)
262 :return: Returns True if successful or raises an exception.
264 namespace
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
266 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
267 cluster_id
, uninstall_sw
272 self
.fs
.sync(from_path
=cluster_id
)
274 # uninstall releases if needed.
276 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
277 if len(releases
) > 0:
281 kdu_instance
= r
.get("name")
282 chart
= r
.get("chart")
284 "Uninstalling {} -> {}".format(chart
, kdu_instance
)
286 await self
.uninstall(
287 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
289 except Exception as e
:
290 # will not raise exception as it was found
291 # that in some cases of previously installed helm releases it
294 "Error uninstalling release {}: {}".format(
300 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
304 False # Allow to remove k8s cluster without removing Tiller
308 await self
._uninstall
_sw
(cluster_id
, namespace
)
310 # delete cluster directory
311 self
.log
.debug("Removing directory {}".format(cluster_id
))
312 self
.fs
.file_delete(cluster_id
, ignore_non_exist
=True)
313 # Remove also local directorio if still exist
314 direct
= self
.fs
.path
+ "/" + cluster_id
315 shutil
.rmtree(direct
, ignore_errors
=True)
319 async def _install_impl(
327 timeout
: float = 300,
329 db_dict
: dict = None,
330 kdu_name
: str = None,
331 namespace
: str = None,
334 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
335 cluster_id
=cluster_id
, params
=params
341 parts
= kdu_model
.split(sep
=":")
343 version
= str(parts
[1])
346 command
= self
._get
_install
_command
(
347 kdu_model
, kdu_instance
, namespace
, params_str
, version
, atomic
, timeout
350 self
.log
.debug("installing: {}".format(command
))
353 # exec helm in a task
354 exec_task
= asyncio
.ensure_future(
355 coro_or_future
=self
._local
_async
_exec
(
356 command
=command
, raise_exception_on_error
=False, env
=env
360 # write status in another task
361 status_task
= asyncio
.ensure_future(
362 coro_or_future
=self
._store
_status
(
363 cluster_id
=cluster_id
,
364 kdu_instance
=kdu_instance
,
372 # wait for execution task
373 await asyncio
.wait([exec_task
])
378 output
, rc
= exec_task
.result()
382 output
, rc
= await self
._local
_async
_exec
(
383 command
=command
, raise_exception_on_error
=False, env
=env
386 # remove temporal values yaml file
388 os
.remove(file_to_delete
)
391 await self
._store
_status
(
392 cluster_id
=cluster_id
,
393 kdu_instance
=kdu_instance
,
402 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
404 raise K8sException(msg
)
410 kdu_model
: str = None,
412 timeout
: float = 300,
414 db_dict
: dict = None,
416 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
417 self
.log
.debug("upgrading {} in cluster {}".format(kdu_model
, cluster_id
))
420 self
.fs
.sync(from_path
=cluster_id
)
422 # look for instance to obtain namespace
423 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
424 if not instance_info
:
425 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
428 paths
, env
= self
._init
_paths
_env
(
429 cluster_name
=cluster_id
, create_if_not_exist
=True
433 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
434 cluster_id
=cluster_id
, params
=params
440 parts
= kdu_model
.split(sep
=":")
442 version
= str(parts
[1])
445 command
= self
._get
_upgrade
_command
(
448 instance_info
["namespace"],
455 self
.log
.debug("upgrading: {}".format(command
))
459 # exec helm in a task
460 exec_task
= asyncio
.ensure_future(
461 coro_or_future
=self
._local
_async
_exec
(
462 command
=command
, raise_exception_on_error
=False, env
=env
465 # write status in another task
466 status_task
= asyncio
.ensure_future(
467 coro_or_future
=self
._store
_status
(
468 cluster_id
=cluster_id
,
469 kdu_instance
=kdu_instance
,
470 namespace
=instance_info
["namespace"],
477 # wait for execution task
478 await asyncio
.wait([exec_task
])
482 output
, rc
= exec_task
.result()
486 output
, rc
= await self
._local
_async
_exec
(
487 command
=command
, raise_exception_on_error
=False, env
=env
490 # remove temporal values yaml file
492 os
.remove(file_to_delete
)
495 await self
._store
_status
(
496 cluster_id
=cluster_id
,
497 kdu_instance
=kdu_instance
,
498 namespace
=instance_info
["namespace"],
506 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
508 raise K8sException(msg
)
511 self
.fs
.reverse_sync(from_path
=cluster_id
)
513 # return new revision number
514 instance
= await self
.get_instance_info(
515 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
518 revision
= int(instance
.get("revision"))
519 self
.log
.debug("New revision: {}".format(revision
))
529 total_timeout
: float = 1800,
532 raise NotImplementedError("Method not implemented")
534 async def get_scale_count(
540 raise NotImplementedError("Method not implemented")
543 self
, cluster_uuid
: str, kdu_instance
: str, revision
=0, db_dict
: dict = None
546 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
548 "rollback kdu_instance {} to revision {} from cluster {}".format(
549 kdu_instance
, revision
, cluster_id
554 self
.fs
.sync(from_path
=cluster_id
)
556 # look for instance to obtain namespace
557 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
558 if not instance_info
:
559 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
562 paths
, env
= self
._init
_paths
_env
(
563 cluster_name
=cluster_id
, create_if_not_exist
=True
566 command
= self
._get
_rollback
_command
(
567 kdu_instance
, instance_info
["namespace"], revision
570 self
.log
.debug("rolling_back: {}".format(command
))
572 # exec helm in a task
573 exec_task
= asyncio
.ensure_future(
574 coro_or_future
=self
._local
_async
_exec
(
575 command
=command
, raise_exception_on_error
=False, env
=env
578 # write status in another task
579 status_task
= asyncio
.ensure_future(
580 coro_or_future
=self
._store
_status
(
581 cluster_id
=cluster_id
,
582 kdu_instance
=kdu_instance
,
583 namespace
=instance_info
["namespace"],
585 operation
="rollback",
590 # wait for execution task
591 await asyncio
.wait([exec_task
])
596 output
, rc
= exec_task
.result()
599 await self
._store
_status
(
600 cluster_id
=cluster_id
,
601 kdu_instance
=kdu_instance
,
602 namespace
=instance_info
["namespace"],
604 operation
="rollback",
610 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
612 raise K8sException(msg
)
615 self
.fs
.reverse_sync(from_path
=cluster_id
)
617 # return new revision number
618 instance
= await self
.get_instance_info(
619 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
622 revision
= int(instance
.get("revision"))
623 self
.log
.debug("New revision: {}".format(revision
))
628 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str, **kwargs
):
630 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
631 (this call should happen after all _terminate-config-primitive_ of the VNF
634 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
635 :param kdu_instance: unique name for the KDU instance to be deleted
636 :param kwargs: Additional parameters (None yet)
637 :return: True if successful
640 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
642 "uninstall kdu_instance {} from cluster {}".format(kdu_instance
, cluster_id
)
646 self
.fs
.sync(from_path
=cluster_id
)
648 # look for instance to obtain namespace
649 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
650 if not instance_info
:
651 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
654 paths
, env
= self
._init
_paths
_env
(
655 cluster_name
=cluster_id
, create_if_not_exist
=True
658 command
= self
._get
_uninstall
_command
(kdu_instance
, instance_info
["namespace"])
659 output
, _rc
= await self
._local
_async
_exec
(
660 command
=command
, raise_exception_on_error
=True, env
=env
664 self
.fs
.reverse_sync(from_path
=cluster_id
)
666 return self
._output
_to
_table
(output
)
async def instances_list(self, cluster_uuid: str) -> list:
    """
    returns a list of deployed releases in a cluster

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: whatever the helm-version-specific _instances_list() yields for
        the cluster
    """
    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug("list releases for cluster {}".format(cluster_id))

    # refresh the local cluster directory before querying helm
    self.fs.sync(from_path=cluster_id)

    # execute internal command
    result = await self._instances_list(cluster_id)

    # push local state back to the shared file system
    self.fs.reverse_sync(from_path=cluster_id)

    return result
async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
    """
    Look up a deployed release by name.

    :param cluster_uuid: 'namespace:cluster_id' or a bare 'cluster_id'
    :param kdu_instance: release name to search for
    :return: the matching entry from instances_list(), or None when no entry
        has that name (callers test the result for truthiness)
    """
    instances = await self.instances_list(cluster_uuid=cluster_uuid)
    for instance in instances:
        if instance.get("name") == kdu_instance:
            return instance
    self.log.debug("Instance {} not found".format(kdu_instance))
    return None
698 async def exec_primitive(
700 cluster_uuid
: str = None,
701 kdu_instance
: str = None,
702 primitive_name
: str = None,
703 timeout
: float = 300,
705 db_dict
: dict = None,
708 """Exec primitive (Juju action)
710 :param cluster_uuid: The UUID of the cluster or namespace:cluster
711 :param kdu_instance: The unique name of the KDU instance
712 :param primitive_name: Name of action that will be executed
713 :param timeout: Timeout for action execution
714 :param params: Dictionary of all the parameters needed for the action
715 :db_dict: Dictionary for any additional data
716 :param kwargs: Additional parameters (None yet)
718 :return: Returns the output of the action
721 "KDUs deployed with Helm don't support actions "
722 "different from rollback, upgrade and status"
725 async def get_services(
726 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
729 Returns a list of services defined for the specified kdu instance.
731 :param cluster_uuid: UUID of a K8s cluster known by OSM
732 :param kdu_instance: unique name for the KDU instance
733 :param namespace: K8s namespace used by the KDU instance
734 :return: If successful, it will return a list of services, Each service
735 can have the following data:
736 - `name` of the service
737 - `type` type of service in the k8 cluster
738 - `ports` List of ports offered by the service, for each port includes at least
740 - `cluster_ip` Internal ip to be used inside k8s cluster
741 - `external_ip` List of external ips (in case they are available)
744 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
746 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
747 cluster_uuid
, kdu_instance
752 self
.fs
.sync(from_path
=cluster_id
)
754 # get list of services names for kdu
755 service_names
= await self
._get
_services
(cluster_id
, kdu_instance
, namespace
)
758 for service
in service_names
:
759 service
= await self
._get
_service
(cluster_id
, service
, namespace
)
760 service_list
.append(service
)
763 self
.fs
.reverse_sync(from_path
=cluster_id
)
767 async def get_service(
768 self
, cluster_uuid
: str, service_name
: str, namespace
: str
772 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
773 service_name
, namespace
, cluster_uuid
777 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
780 self
.fs
.sync(from_path
=cluster_id
)
782 service
= await self
._get
_service
(cluster_id
, service_name
, namespace
)
785 self
.fs
.reverse_sync(from_path
=cluster_id
)
789 async def status_kdu(self
, cluster_uuid
: str, kdu_instance
: str, **kwargs
) -> str:
791 This call would retrieve tha current state of a given KDU instance. It would be
792 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
793 values_ of the configuration parameters applied to a given instance. This call
794 would be based on the `status` call.
796 :param cluster_uuid: UUID of a K8s cluster known by OSM
797 :param kdu_instance: unique name for the KDU instance
798 :param kwargs: Additional parameters (None yet)
799 :return: If successful, it will return the following vector of arguments:
800 - K8s `namespace` in the cluster where the KDU lives
801 - `state` of the KDU instance. It can be:
808 - List of `resources` (objects) that this release consists of, sorted by kind,
809 and the status of those resources
810 - Last `deployment_time`.
814 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
815 cluster_uuid
, kdu_instance
819 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
822 self
.fs
.sync(from_path
=cluster_id
)
824 # get instance: needed to obtain namespace
825 instances
= await self
._instances
_list
(cluster_id
=cluster_id
)
826 for instance
in instances
:
827 if instance
.get("name") == kdu_instance
:
830 # instance does not exist
832 "Instance name: {} not found in cluster: {}".format(
833 kdu_instance
, cluster_id
837 status
= await self
._status
_kdu
(
838 cluster_id
=cluster_id
,
839 kdu_instance
=kdu_instance
,
840 namespace
=instance
["namespace"],
846 self
.fs
.reverse_sync(from_path
=cluster_id
)
850 async def values_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
853 "inspect kdu_model values {} from (optional) repo: {}".format(
858 return await self
._exec
_inspect
_comand
(
859 inspect_command
="values", kdu_model
=kdu_model
, repo_url
=repo_url
862 async def help_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
865 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model
, repo_url
)
868 return await self
._exec
_inspect
_comand
(
869 inspect_command
="readme", kdu_model
=kdu_model
, repo_url
=repo_url
872 async def synchronize_repos(self
, cluster_uuid
: str):
874 self
.log
.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid
))
876 db_repo_ids
= self
._get
_helm
_chart
_repos
_ids
(cluster_uuid
)
877 db_repo_dict
= self
._get
_db
_repos
_dict
(db_repo_ids
)
879 local_repo_list
= await self
.repo_list(cluster_uuid
)
880 local_repo_dict
= {repo
["name"]: repo
["url"] for repo
in local_repo_list
}
882 deleted_repo_list
= []
885 # iterate over the list of repos in the database that should be
886 # added if not present
887 for repo_name
, db_repo
in db_repo_dict
.items():
889 # check if it is already present
890 curr_repo_url
= local_repo_dict
.get(db_repo
["name"])
891 repo_id
= db_repo
.get("_id")
892 if curr_repo_url
!= db_repo
["url"]:
895 "repo {} url changed, delete and and again".format(
899 await self
.repo_remove(cluster_uuid
, db_repo
["name"])
900 deleted_repo_list
.append(repo_id
)
903 self
.log
.debug("add repo {}".format(db_repo
["name"]))
905 cluster_uuid
, db_repo
["name"], db_repo
["url"]
907 added_repo_dict
[repo_id
] = db_repo
["name"]
908 except Exception as e
:
910 "Error adding repo id: {}, err_msg: {} ".format(
915 # Delete repos that are present but not in nbi_list
916 for repo_name
in local_repo_dict
:
917 if not db_repo_dict
.get(repo_name
) and repo_name
!= "stable":
918 self
.log
.debug("delete repo {}".format(repo_name
))
920 await self
.repo_remove(cluster_uuid
, repo_name
)
921 deleted_repo_list
.append(repo_name
)
922 except Exception as e
:
924 "Error deleting repo, name: {}, err_msg: {}".format(
929 return deleted_repo_list
, added_repo_dict
933 except Exception as e
:
934 # Do not raise errors synchronizing repos
935 self
.log
.error("Error synchronizing repos: {}".format(e
))
936 raise Exception("Error synchronizing repos: {}".format(e
))
938 def _get_db_repos_dict(self
, repo_ids
: list):
940 for repo_id
in repo_ids
:
941 db_repo
= self
.db
.get_one("k8srepos", {"_id": repo_id
})
942 db_repos_dict
[db_repo
["name"]] = db_repo
946 ####################################################################################
947 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
948 ####################################################################################
952 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
954 Creates and returns base cluster and kube dirs and returns them.
955 Also created helm3 dirs according to new directory specification, paths are
956 not returned but assigned to helm environment variables
958 :param cluster_name: cluster_name
959 :return: Dictionary with config_paths and dictionary with helm environment variables
963 async def _cluster_init(self
, cluster_id
, namespace
, paths
, env
):
965 Implements the helm version dependent cluster initialization
969 async def _instances_list(self
, cluster_id
):
971 Implements the helm version dependent helm instances list
975 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
):
977 Implements the helm version dependent method to obtain services from a helm instance
981 async def _status_kdu(
985 namespace
: str = None,
986 show_error_log
: bool = False,
987 return_text
: bool = False,
990 Implements the helm version dependent method to obtain status of a helm instance
994 def _get_install_command(
995 self
, kdu_model
, kdu_instance
, namespace
, params_str
, version
, atomic
, timeout
998 Obtain command to be executed to delete the indicated instance
1002 def _get_upgrade_command(
1003 self
, kdu_model
, kdu_instance
, namespace
, params_str
, version
, atomic
, timeout
1006 Obtain command to be executed to upgrade the indicated instance
1010 def _get_rollback_command(self
, kdu_instance
, namespace
, revision
) -> str:
1012 Obtain command to be executed to rollback the indicated instance
1016 def _get_uninstall_command(self
, kdu_instance
: str, namespace
: str) -> str:
1018 Obtain command to be executed to delete the indicated instance
1022 def _get_inspect_command(
1023 self
, show_command
: str, kdu_model
: str, repo_str
: str, version
: str
1026 Obtain command to be executed to obtain information about the kdu
1030 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
1032 Method call to uninstall cluster software for helm. This method is dependent
1034 For Helm v2 it will be called when Tiller must be uninstalled
1035 For Helm v3 it does nothing and does not need to be callled
1039 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
1041 Obtains the cluster repos identifiers
1045 ####################################################################################
1046 ################################### P R I V A T E ##################################
1047 ####################################################################################
1051 def _check_file_exists(filename
: str, exception_if_not_exists
: bool = False):
1052 if os
.path
.exists(filename
):
1055 msg
= "File {} does not exist".format(filename
)
1056 if exception_if_not_exists
:
1057 raise K8sException(msg
)
1060 def _remove_multiple_spaces(strobj
):
1061 strobj
= strobj
.strip()
1062 while " " in strobj
:
1063 strobj
= strobj
.replace(" ", " ")
1067 def _output_to_lines(output
: str) -> list:
1068 output_lines
= list()
1069 lines
= output
.splitlines(keepends
=False)
1073 output_lines
.append(line
)
1077 def _output_to_table(output
: str) -> list:
1078 output_table
= list()
1079 lines
= output
.splitlines(keepends
=False)
1081 line
= line
.replace("\t", " ")
1083 output_table
.append(line_list
)
1084 cells
= line
.split(sep
=" ")
1088 line_list
.append(cell
)
1092 def _parse_services(output
: str) -> list:
1093 lines
= output
.splitlines(keepends
=False)
1096 line
= line
.replace("\t", " ")
1097 cells
= line
.split(sep
=" ")
1098 if len(cells
) > 0 and cells
[0].startswith("service/"):
1099 elems
= cells
[0].split(sep
="/")
1101 services
.append(elems
[1])
1105 def _get_deep(dictionary
: dict, members
: tuple):
1110 value
= target
.get(m
)
1119 # find key:value in several lines
1121 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
1122 for line
in p_lines
:
1124 if line
.startswith(p_key
+ ":"):
1125 parts
= line
.split(":")
1126 the_value
= parts
[1].strip()
1134 def _lower_keys_list(input_list
: list):
1136 Transform the keys in a list of dictionaries to lower case and returns a new list
1140 for dictionary
in input_list
:
1141 new_dict
= dict((k
.lower(), v
) for k
, v
in dictionary
.items())
1142 new_list
.append(new_dict
)
1145 def _local_exec(self
, command
: str) -> (str, int):
1146 command
= self
._remove
_multiple
_spaces
(command
)
1147 self
.log
.debug("Executing sync local command: {}".format(command
))
1148 # raise exception if fails
1151 output
= subprocess
.check_output(
1152 command
, shell
=True, universal_newlines
=True
1155 self
.log
.debug(output
)
1159 return output
, return_code
1161 async def _local_async_exec(
1164 raise_exception_on_error
: bool = False,
1165 show_error_log
: bool = True,
1166 encode_utf8
: bool = False,
1170 command
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command
)
1172 "Executing async local command: {}, env: {}".format(command
, env
)
1176 command
= shlex
.split(command
)
1178 environ
= os
.environ
.copy()
1183 process
= await asyncio
.create_subprocess_exec(
1185 stdout
=asyncio
.subprocess
.PIPE
,
1186 stderr
=asyncio
.subprocess
.PIPE
,
1190 # wait for command terminate
1191 stdout
, stderr
= await process
.communicate()
1193 return_code
= process
.returncode
1197 output
= stdout
.decode("utf-8").strip()
1198 # output = stdout.decode()
1200 output
= stderr
.decode("utf-8").strip()
1201 # output = stderr.decode()
1203 if return_code
!= 0 and show_error_log
:
1205 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1208 self
.log
.debug("Return code: {}".format(return_code
))
1210 if raise_exception_on_error
and return_code
!= 0:
1211 raise K8sException(output
)
1214 output
= output
.encode("utf-8").strip()
1215 output
= str(output
).replace("\\n", "\n")
1217 return output
, return_code
1219 except asyncio
.CancelledError
:
1221 except K8sException
:
1223 except Exception as e
:
1224 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1226 if raise_exception_on_error
:
1227 raise K8sException(e
) from e
1231 async def _local_async_exec_pipe(
1235 raise_exception_on_error
: bool = True,
1236 show_error_log
: bool = True,
1237 encode_utf8
: bool = False,
1241 command1
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command1
)
1242 command2
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command2
)
1243 command
= "{} | {}".format(command1
, command2
)
1245 "Executing async local command: {}, env: {}".format(command
, env
)
1249 command1
= shlex
.split(command1
)
1250 command2
= shlex
.split(command2
)
1252 environ
= os
.environ
.copy()
1257 read
, write
= os
.pipe()
1258 await asyncio
.create_subprocess_exec(*command1
, stdout
=write
, env
=environ
)
1260 process_2
= await asyncio
.create_subprocess_exec(
1261 *command2
, stdin
=read
, stdout
=asyncio
.subprocess
.PIPE
, env
=environ
1264 stdout
, stderr
= await process_2
.communicate()
1266 return_code
= process_2
.returncode
1270 output
= stdout
.decode("utf-8").strip()
1271 # output = stdout.decode()
1273 output
= stderr
.decode("utf-8").strip()
1274 # output = stderr.decode()
1276 if return_code
!= 0 and show_error_log
:
1278 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1281 self
.log
.debug("Return code: {}".format(return_code
))
1283 if raise_exception_on_error
and return_code
!= 0:
1284 raise K8sException(output
)
1287 output
= output
.encode("utf-8").strip()
1288 output
= str(output
).replace("\\n", "\n")
1290 return output
, return_code
1291 except asyncio
.CancelledError
:
1293 except K8sException
:
1295 except Exception as e
:
1296 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1298 if raise_exception_on_error
:
1299 raise K8sException(e
) from e
async def _get_service(self, cluster_id, service_name, namespace):
    """
    Obtains the data of the specified service in the k8cluster.

    :param cluster_id: id of a K8s cluster known by OSM
    :param service_name: name of the K8s service in the specified namespace
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a service with the following data:
    - `name` of the service
    - `type` type of service in the k8 cluster
    - `ports` List of ports offered by the service, for each port includes at least
    name, port, protocol
    - `cluster_ip` Internal ip to be used inside k8s cluster
    - `external_ip` List of external ips (in case they are available); only
    present for services of type LoadBalancer
    :raises K8sException: propagated from the kubectl command, which is run with
        raise_exception_on_error=True
    """
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
        self.kubectl_command, paths["kube_config"], namespace, service_name
    )

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    data = yaml.load(output, Loader=yaml.SafeLoader)

    service = {
        "name": service_name,
        "type": self._get_deep(data, ("spec", "type")),
        "ports": self._get_deep(data, ("spec", "ports")),
        "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
    }
    if service["type"] == "LoadBalancer":
        # A LoadBalancer that has not been provisioned yet has no ingress list,
        # and an ingress entry may carry only a hostname; neither case should
        # abort the query.
        # NOTE(review): assumes _get_deep yields None for a missing path - confirm.
        ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
        service["external_ip"] = [
            elem["ip"] for elem in (ip_map_list or []) if "ip" in elem
        ]

    return service
1347 async def _exec_inspect_comand(
1348 self
, inspect_command
: str, kdu_model
: str, repo_url
: str = None
1351 Obtains information about a kdu, no cluster (no env)
1356 repo_str
= " --repo {}".format(repo_url
)
1358 idx
= kdu_model
.find("/")
1361 kdu_model
= kdu_model
[idx
:]
1364 if ":" in kdu_model
:
1365 parts
= kdu_model
.split(sep
=":")
1367 version
= "--version {}".format(str(parts
[1]))
1368 kdu_model
= parts
[0]
1370 full_command
= self
._get
_inspect
_command
(
1371 inspect_command
, kdu_model
, repo_str
, version
1373 output
, _rc
= await self
._local
_async
_exec
(
1374 command
=full_command
, encode_utf8
=True
1379 async def _store_status(
1384 namespace
: str = None,
1385 check_every
: float = 10,
1386 db_dict
: dict = None,
1387 run_once
: bool = False,
1391 await asyncio
.sleep(check_every
)
1392 detailed_status
= await self
._status
_kdu
(
1393 cluster_id
=cluster_id
,
1394 kdu_instance
=kdu_instance
,
1395 namespace
=namespace
,
1398 status
= detailed_status
.get("info").get("description")
1399 self
.log
.debug("KDU {} STATUS: {}.".format(kdu_instance
, status
))
1400 # write status to db
1401 result
= await self
.write_app_status_to_db(
1404 detailed_status
=str(detailed_status
),
1405 operation
=operation
,
1408 self
.log
.info("Error writing in database. Task exiting...")
1410 except asyncio
.CancelledError
:
1411 self
.log
.debug("Task cancelled")
1413 except Exception as e
:
1415 "_store_status exception: {}".format(str(e
)), exc_info
=True
1422 # params for use in -f file
1423 # returns values file option and filename (in order to delete it at the end)
1424 def _params_to_file_option(self
, cluster_id
: str, params
: dict) -> (str, str):
1426 if params
and len(params
) > 0:
1427 self
._init
_paths
_env
(cluster_name
=cluster_id
, create_if_not_exist
=True)
1429 def get_random_number():
1430 r
= random
.randrange(start
=1, stop
=99999999)
1438 value
= params
.get(key
)
1439 if "!!yaml" in str(value
):
1440 value
= yaml
.load(value
[7:])
1441 params2
[key
] = value
1443 values_file
= get_random_number() + ".yaml"
1444 with
open(values_file
, "w") as stream
:
1445 yaml
.dump(params2
, stream
, indent
=4, default_flow_style
=False)
1447 return "-f {}".format(values_file
), values_file
1451 # params for use in --set option
1453 def _params_to_set_option(params
: dict) -> str:
1455 if params
and len(params
) > 0:
1458 value
= params
.get(key
, None)
1459 if value
is not None:
1461 params_str
+= "--set "
1465 params_str
+= "{}={}".format(key
, value
)
1469 def generate_kdu_instance_name(**kwargs
):
1470 chart_name
= kwargs
["kdu_model"]
1471 # check embeded chart (file or dir)
1472 if chart_name
.startswith("/"):
1473 # extract file or directory name
1474 chart_name
= chart_name
[chart_name
.rfind("/") + 1 :]
1476 elif "://" in chart_name
:
1477 # extract last portion of URL
1478 chart_name
= chart_name
[chart_name
.rfind("/") + 1 :]
1481 for c
in chart_name
:
1482 if c
.isalpha() or c
.isnumeric():
1489 # if does not start with alpha character, prefix 'a'
1490 if not name
[0].isalpha():
1495 def get_random_number():
1496 r
= random
.randrange(start
=1, stop
=99999999)
1498 s
= s
.rjust(10, "0")
1501 name
= name
+ get_random_number()