2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
24 from typing
import Union
32 from uuid
import uuid4
34 from n2vc
.config
import EnvironConfig
35 from n2vc
.exceptions
import K8sException
36 from n2vc
.k8s_conn
import K8sConnector
39 class K8sHelmBaseConnector(K8sConnector
):
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
47 service_account
= "osm"
53 kubectl_command
: str = "/usr/bin/kubectl",
54 helm_command
: str = "/usr/bin/helm",
60 :param fs: file system for kubernetes and helm configuration
61 :param db: database object to write current operation status
62 :param kubectl_command: path to kubectl executable
63 :param helm_command: path to helm executable
65 :param on_update_db: callback called when k8s connector updates database
69 K8sConnector
.__init
__(self
, db
=db
, log
=log
, on_update_db
=on_update_db
)
71 self
.log
.info("Initializing K8S Helm connector")
73 self
.config
= EnvironConfig()
74 # random numbers for release name generation
75 random
.seed(time
.time())
80 # exception if kubectl is not installed
81 self
.kubectl_command
= kubectl_command
82 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
84 # exception if helm is not installed
85 self
._helm
_command
= helm_command
86 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
88 # obtain stable repo url from config or apply default
89 self
._stable
_repo
_url
= self
.config
.get("stablerepourl")
90 if self
._stable
_repo
_url
== "None":
91 self
._stable
_repo
_url
= None
93 def _get_namespace(self
, cluster_uuid
: str) -> str:
95 Obtains the namespace used by the cluster with the uuid passed by argument
97 param: cluster_uuid: cluster's uuid
100 # first, obtain the cluster corresponding to the uuid passed by argument
101 k8scluster
= self
.db
.get_one(
102 "k8sclusters", q_filter
={"_id": cluster_uuid
}, fail_on_empty
=False
104 return k8scluster
.get("namespace")
109 namespace
: str = "kube-system",
110 reuse_cluster_uuid
=None,
114 It prepares a given K8s cluster environment to run Charts
116 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
118 :param namespace: optional namespace to be used for helm. By default,
119 'kube-system' will be used
120 :param reuse_cluster_uuid: existing cluster uuid for reuse
121 :param kwargs: Additional parameters (None yet)
122 :return: uuid of the K8s cluster and True if connector has installed some
123 software in the cluster
124 (on error, an exception will be raised)
127 if reuse_cluster_uuid
:
128 cluster_id
= reuse_cluster_uuid
130 cluster_id
= str(uuid4())
133 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id
, namespace
)
136 paths
, env
= self
._init
_paths
_env
(
137 cluster_name
=cluster_id
, create_if_not_exist
=True
139 mode
= stat
.S_IRUSR | stat
.S_IWUSR
140 with
open(paths
["kube_config"], "w", mode
) as f
:
142 os
.chmod(paths
["kube_config"], 0o600)
144 # Code with initialization specific of helm version
145 n2vc_installed_sw
= await self
._cluster
_init
(cluster_id
, namespace
, paths
, env
)
147 # sync fs with local data
148 self
.fs
.reverse_sync(from_path
=cluster_id
)
150 self
.log
.info("Cluster {} initialized".format(cluster_id
))
152 return cluster_id
, n2vc_installed_sw
155 self
, cluster_uuid
: str, name
: str, url
: str, repo_type
: str = "chart"
158 "Cluster {}, adding {} repository {}. URL: {}".format(
159 cluster_uuid
, repo_type
, name
, url
164 paths
, env
= self
._init
_paths
_env
(
165 cluster_name
=cluster_uuid
, create_if_not_exist
=True
169 self
.fs
.sync(from_path
=cluster_uuid
)
171 # helm repo add name url
172 command
= "env KUBECONFIG={} {} repo add {} {}".format(
173 paths
["kube_config"], self
._helm
_command
, name
, url
175 self
.log
.debug("adding repo: {}".format(command
))
176 await self
._local
_async
_exec
(
177 command
=command
, raise_exception_on_error
=True, env
=env
181 command
= "env KUBECONFIG={} {} repo update {}".format(
182 paths
["kube_config"], self
._helm
_command
, name
184 self
.log
.debug("updating repo: {}".format(command
))
185 await self
._local
_async
_exec
(
186 command
=command
, raise_exception_on_error
=False, env
=env
190 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
192 async def repo_update(self
, cluster_uuid
: str, name
: str, repo_type
: str = "chart"):
194 "Cluster {}, updating {} repository {}".format(
195 cluster_uuid
, repo_type
, name
200 paths
, env
= self
._init
_paths
_env
(
201 cluster_name
=cluster_uuid
, create_if_not_exist
=True
205 self
.fs
.sync(from_path
=cluster_uuid
)
208 command
= "{} repo update {}".format(self
._helm
_command
, name
)
209 self
.log
.debug("updating repo: {}".format(command
))
210 await self
._local
_async
_exec
(
211 command
=command
, raise_exception_on_error
=False, env
=env
215 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
217 async def repo_list(self
, cluster_uuid
: str) -> list:
219 Get the list of registered repositories
221 :return: list of registered repositories: [ (name, url) .... ]
224 self
.log
.debug("list repositories for cluster {}".format(cluster_uuid
))
227 paths
, env
= self
._init
_paths
_env
(
228 cluster_name
=cluster_uuid
, create_if_not_exist
=True
232 self
.fs
.sync(from_path
=cluster_uuid
)
234 command
= "env KUBECONFIG={} {} repo list --output yaml".format(
235 paths
["kube_config"], self
._helm
_command
238 # Set exception to false because if there are no repos just want an empty list
239 output
, _rc
= await self
._local
_async
_exec
(
240 command
=command
, raise_exception_on_error
=False, env
=env
244 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
247 if output
and len(output
) > 0:
248 repos
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
249 # unify format between helm2 and helm3 setting all keys lowercase
250 return self
._lower
_keys
_list
(repos
)
256 async def repo_remove(self
, cluster_uuid
: str, name
: str):
258 "remove {} repositories for cluster {}".format(name
, cluster_uuid
)
262 paths
, env
= self
._init
_paths
_env
(
263 cluster_name
=cluster_uuid
, create_if_not_exist
=True
267 self
.fs
.sync(from_path
=cluster_uuid
)
269 command
= "env KUBECONFIG={} {} repo remove {}".format(
270 paths
["kube_config"], self
._helm
_command
, name
272 await self
._local
_async
_exec
(
273 command
=command
, raise_exception_on_error
=True, env
=env
277 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
283 uninstall_sw
: bool = False,
288 Resets the Kubernetes cluster by removing the helm deployment that represents it.
290 :param cluster_uuid: The UUID of the cluster to reset
291 :param force: Boolean to force the reset
292 :param uninstall_sw: Boolean to force the reset
293 :param kwargs: Additional parameters (None yet)
294 :return: Returns True if successful or raises an exception.
296 namespace
= self
._get
_namespace
(cluster_uuid
=cluster_uuid
)
298 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
299 cluster_uuid
, uninstall_sw
304 self
.fs
.sync(from_path
=cluster_uuid
)
306 # uninstall releases if needed.
308 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
309 if len(releases
) > 0:
313 kdu_instance
= r
.get("name")
314 chart
= r
.get("chart")
316 "Uninstalling {} -> {}".format(chart
, kdu_instance
)
318 await self
.uninstall(
319 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
321 except Exception as e
:
322 # will not raise exception as it was found
323 # that in some cases of previously installed helm releases it
326 "Error uninstalling release {}: {}".format(
332 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
333 ).format(cluster_uuid
)
336 False # Allow to remove k8s cluster without removing Tiller
340 await self
._uninstall
_sw
(cluster_id
=cluster_uuid
, namespace
=namespace
)
342 # delete cluster directory
343 self
.log
.debug("Removing directory {}".format(cluster_uuid
))
344 self
.fs
.file_delete(cluster_uuid
, ignore_non_exist
=True)
# Also remove the local directory if it still exists
346 direct
= self
.fs
.path
+ "/" + cluster_uuid
347 shutil
.rmtree(direct
, ignore_errors
=True)
351 async def _install_impl(
359 timeout
: float = 300,
361 db_dict
: dict = None,
362 kdu_name
: str = None,
363 namespace
: str = None,
366 paths
, env
= self
._init
_paths
_env
(
367 cluster_name
=cluster_id
, create_if_not_exist
=True
371 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
372 cluster_id
=cluster_id
, params
=params
378 parts
= kdu_model
.split(sep
=":")
380 version
= str(parts
[1])
383 repo
= self
._split
_repo
(kdu_model
)
385 self
.repo_update(cluster_id
, repo
)
387 command
= self
._get
_install
_command
(
395 paths
["kube_config"],
398 self
.log
.debug("installing: {}".format(command
))
401 # exec helm in a task
402 exec_task
= asyncio
.ensure_future(
403 coro_or_future
=self
._local
_async
_exec
(
404 command
=command
, raise_exception_on_error
=False, env
=env
408 # write status in another task
409 status_task
= asyncio
.ensure_future(
410 coro_or_future
=self
._store
_status
(
411 cluster_id
=cluster_id
,
412 kdu_instance
=kdu_instance
,
419 # wait for execution task
420 await asyncio
.wait([exec_task
])
425 output
, rc
= exec_task
.result()
429 output
, rc
= await self
._local
_async
_exec
(
430 command
=command
, raise_exception_on_error
=False, env
=env
433 # remove temporal values yaml file
435 os
.remove(file_to_delete
)
438 await self
._store
_status
(
439 cluster_id
=cluster_id
,
440 kdu_instance
=kdu_instance
,
447 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
449 raise K8sException(msg
)
455 kdu_model
: str = None,
457 timeout
: float = 300,
459 db_dict
: dict = None,
461 self
.log
.debug("upgrading {} in cluster {}".format(kdu_model
, cluster_uuid
))
464 self
.fs
.sync(from_path
=cluster_uuid
)
466 # look for instance to obtain namespace
467 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
468 if not instance_info
:
469 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
472 paths
, env
= self
._init
_paths
_env
(
473 cluster_name
=cluster_uuid
, create_if_not_exist
=True
477 self
.fs
.sync(from_path
=cluster_uuid
)
480 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
481 cluster_id
=cluster_uuid
, params
=params
487 parts
= kdu_model
.split(sep
=":")
489 version
= str(parts
[1])
492 repo
= self
._split
_repo
(kdu_model
)
494 self
.repo_update(cluster_uuid
, repo
)
496 command
= self
._get
_upgrade
_command
(
499 instance_info
["namespace"],
504 paths
["kube_config"],
507 self
.log
.debug("upgrading: {}".format(command
))
511 # exec helm in a task
512 exec_task
= asyncio
.ensure_future(
513 coro_or_future
=self
._local
_async
_exec
(
514 command
=command
, raise_exception_on_error
=False, env
=env
517 # write status in another task
518 status_task
= asyncio
.ensure_future(
519 coro_or_future
=self
._store
_status
(
520 cluster_id
=cluster_uuid
,
521 kdu_instance
=kdu_instance
,
522 namespace
=instance_info
["namespace"],
528 # wait for execution task
529 await asyncio
.wait([exec_task
])
533 output
, rc
= exec_task
.result()
537 output
, rc
= await self
._local
_async
_exec
(
538 command
=command
, raise_exception_on_error
=False, env
=env
541 # remove temporal values yaml file
543 os
.remove(file_to_delete
)
546 await self
._store
_status
(
547 cluster_id
=cluster_uuid
,
548 kdu_instance
=kdu_instance
,
549 namespace
=instance_info
["namespace"],
555 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
557 raise K8sException(msg
)
560 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
562 # return new revision number
563 instance
= await self
.get_instance_info(
564 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
567 revision
= int(instance
.get("revision"))
568 self
.log
.debug("New revision: {}".format(revision
))
578 total_timeout
: float = 1800,
581 raise NotImplementedError("Method not implemented")
583 async def get_scale_count(
589 raise NotImplementedError("Method not implemented")
592 self
, cluster_uuid
: str, kdu_instance
: str, revision
=0, db_dict
: dict = None
595 "rollback kdu_instance {} to revision {} from cluster {}".format(
596 kdu_instance
, revision
, cluster_uuid
601 self
.fs
.sync(from_path
=cluster_uuid
)
603 # look for instance to obtain namespace
604 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
605 if not instance_info
:
606 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
609 paths
, env
= self
._init
_paths
_env
(
610 cluster_name
=cluster_uuid
, create_if_not_exist
=True
614 self
.fs
.sync(from_path
=cluster_uuid
)
616 command
= self
._get
_rollback
_command
(
617 kdu_instance
, instance_info
["namespace"], revision
, paths
["kube_config"]
620 self
.log
.debug("rolling_back: {}".format(command
))
622 # exec helm in a task
623 exec_task
= asyncio
.ensure_future(
624 coro_or_future
=self
._local
_async
_exec
(
625 command
=command
, raise_exception_on_error
=False, env
=env
628 # write status in another task
629 status_task
= asyncio
.ensure_future(
630 coro_or_future
=self
._store
_status
(
631 cluster_id
=cluster_uuid
,
632 kdu_instance
=kdu_instance
,
633 namespace
=instance_info
["namespace"],
635 operation
="rollback",
639 # wait for execution task
640 await asyncio
.wait([exec_task
])
645 output
, rc
= exec_task
.result()
648 await self
._store
_status
(
649 cluster_id
=cluster_uuid
,
650 kdu_instance
=kdu_instance
,
651 namespace
=instance_info
["namespace"],
653 operation
="rollback",
657 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
659 raise K8sException(msg
)
662 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
664 # return new revision number
665 instance
= await self
.get_instance_info(
666 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
669 revision
= int(instance
.get("revision"))
670 self
.log
.debug("New revision: {}".format(revision
))
675 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str, **kwargs
):
677 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
678 (this call should happen after all _terminate-config-primitive_ of the VNF
681 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
682 :param kdu_instance: unique name for the KDU instance to be deleted
683 :param kwargs: Additional parameters (None yet)
684 :return: True if successful
688 "uninstall kdu_instance {} from cluster {}".format(
689 kdu_instance
, cluster_uuid
694 self
.fs
.sync(from_path
=cluster_uuid
)
696 # look for instance to obtain namespace
697 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
698 if not instance_info
:
699 self
.log
.warning(("kdu_instance {} not found".format(kdu_instance
)))
702 paths
, env
= self
._init
_paths
_env
(
703 cluster_name
=cluster_uuid
, create_if_not_exist
=True
707 self
.fs
.sync(from_path
=cluster_uuid
)
709 command
= self
._get
_uninstall
_command
(
710 kdu_instance
, instance_info
["namespace"], paths
["kube_config"]
712 output
, _rc
= await self
._local
_async
_exec
(
713 command
=command
, raise_exception_on_error
=True, env
=env
717 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
719 return self
._output
_to
_table
(output
)
async def instances_list(self, cluster_uuid: str) -> list:
    """
    Return the releases deployed in a cluster.

    The local copy of the cluster directory is synchronised from the shared
    file system before the helm-version-specific listing runs, and pushed
    back afterwards.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: whatever the version-specific ``_instances_list`` returns
    """
    log_line = "list releases for cluster {}".format(cluster_uuid)
    self.log.debug(log_line)

    # bring the local directory up to date before touching helm
    self.fs.sync(from_path=cluster_uuid)

    # delegate to the helm-version-dependent implementation
    releases = await self._instances_list(cluster_uuid)

    # publish any local change back to the shared file system
    self.fs.reverse_sync(from_path=cluster_uuid)

    return releases
async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
    """
    Look up one release of a cluster by its name.

    :param cluster_uuid: cluster whose releases are listed via instances_list
    :param kdu_instance: release name, compared with each entry's "name" key
    :return: the first listed entry whose "name" equals kdu_instance, or None
        (after a debug log line) when there is no such entry
    """
    releases = await self.instances_list(cluster_uuid=cluster_uuid)
    found = next(
        (release for release in releases if release.get("name") == kdu_instance),
        None,
    )
    if found is not None:
        return found
    self.log.debug("Instance {} not found".format(kdu_instance))
    return None
750 async def exec_primitive(
752 cluster_uuid
: str = None,
753 kdu_instance
: str = None,
754 primitive_name
: str = None,
755 timeout
: float = 300,
757 db_dict
: dict = None,
760 """Exec primitive (Juju action)
762 :param cluster_uuid: The UUID of the cluster or namespace:cluster
763 :param kdu_instance: The unique name of the KDU instance
764 :param primitive_name: Name of action that will be executed
765 :param timeout: Timeout for action execution
766 :param params: Dictionary of all the parameters needed for the action
767 :db_dict: Dictionary for any additional data
768 :param kwargs: Additional parameters (None yet)
770 :return: Returns the output of the action
773 "KDUs deployed with Helm don't support actions "
774 "different from rollback, upgrade and status"
777 async def get_services(
778 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
781 Returns a list of services defined for the specified kdu instance.
783 :param cluster_uuid: UUID of a K8s cluster known by OSM
784 :param kdu_instance: unique name for the KDU instance
785 :param namespace: K8s namespace used by the KDU instance
786 :return: If successful, it will return a list of services, Each service
787 can have the following data:
788 - `name` of the service
789 - `type` type of service in the k8 cluster
790 - `ports` List of ports offered by the service, for each port includes at least
792 - `cluster_ip` Internal ip to be used inside k8s cluster
793 - `external_ip` List of external ips (in case they are available)
797 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
798 cluster_uuid
, kdu_instance
803 paths
, env
= self
._init
_paths
_env
(
804 cluster_name
=cluster_uuid
, create_if_not_exist
=True
808 self
.fs
.sync(from_path
=cluster_uuid
)
810 # get list of services names for kdu
811 service_names
= await self
._get
_services
(
812 cluster_uuid
, kdu_instance
, namespace
, paths
["kube_config"]
816 for service
in service_names
:
817 service
= await self
._get
_service
(cluster_uuid
, service
, namespace
)
818 service_list
.append(service
)
821 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
825 async def get_service(
826 self
, cluster_uuid
: str, service_name
: str, namespace
: str
830 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
831 service_name
, namespace
, cluster_uuid
836 self
.fs
.sync(from_path
=cluster_uuid
)
838 service
= await self
._get
_service
(cluster_uuid
, service_name
, namespace
)
841 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
845 async def status_kdu(
846 self
, cluster_uuid
: str, kdu_instance
: str, yaml_format
: str = False, **kwargs
847 ) -> Union
[str, dict]:
This call would retrieve the current state of a given KDU instance. It
would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
851 values_ of the configuration parameters applied to a given instance. This call
852 would be based on the `status` call.
854 :param cluster_uuid: UUID of a K8s cluster known by OSM
855 :param kdu_instance: unique name for the KDU instance
856 :param kwargs: Additional parameters (None yet)
857 :param yaml_format: if the return shall be returned as an YAML string or as a
859 :return: If successful, it will return the following vector of arguments:
860 - K8s `namespace` in the cluster where the KDU lives
861 - `state` of the KDU instance. It can be:
868 - List of `resources` (objects) that this release consists of, sorted by kind,
869 and the status of those resources
870 - Last `deployment_time`.
874 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
875 cluster_uuid
, kdu_instance
880 self
.fs
.sync(from_path
=cluster_uuid
)
882 # get instance: needed to obtain namespace
883 instances
= await self
._instances
_list
(cluster_id
=cluster_uuid
)
884 for instance
in instances
:
885 if instance
.get("name") == kdu_instance
:
888 # instance does not exist
890 "Instance name: {} not found in cluster: {}".format(
891 kdu_instance
, cluster_uuid
895 status
= await self
._status
_kdu
(
896 cluster_id
=cluster_uuid
,
897 kdu_instance
=kdu_instance
,
898 namespace
=instance
["namespace"],
899 yaml_format
=yaml_format
,
904 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Inspect the "values" of a kdu model.

    :param kdu_model: model to inspect, forwarded unchanged
    :param repo_url: optional repository url, forwarded unchanged
    :return: result of the inspect helper invoked with inspect_command="values"
    """
    trace = "inspect kdu_model values {} from (optional) repo: {}".format(
        kdu_model, repo_url
    )
    self.log.debug(trace)

    values = await self._exec_inspect_comand(
        inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
    )
    return values
async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Inspect the "readme" of a kdu model.

    :param kdu_model: model to inspect, forwarded unchanged
    :param repo_url: optional repository url, forwarded unchanged
    :return: result of the inspect helper invoked with inspect_command="readme"
    """
    trace = "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
    self.log.debug(trace)

    readme = await self._exec_inspect_comand(
        inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
    )
    return readme
930 async def synchronize_repos(self
, cluster_uuid
: str):
932 self
.log
.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid
))
934 db_repo_ids
= self
._get
_helm
_chart
_repos
_ids
(cluster_uuid
)
935 db_repo_dict
= self
._get
_db
_repos
_dict
(db_repo_ids
)
937 local_repo_list
= await self
.repo_list(cluster_uuid
)
938 local_repo_dict
= {repo
["name"]: repo
["url"] for repo
in local_repo_list
}
940 deleted_repo_list
= []
943 # iterate over the list of repos in the database that should be
944 # added if not present
945 for repo_name
, db_repo
in db_repo_dict
.items():
947 # check if it is already present
948 curr_repo_url
= local_repo_dict
.get(db_repo
["name"])
949 repo_id
= db_repo
.get("_id")
950 if curr_repo_url
!= db_repo
["url"]:
953 "repo {} url changed, delete and and again".format(
957 await self
.repo_remove(cluster_uuid
, db_repo
["name"])
958 deleted_repo_list
.append(repo_id
)
961 self
.log
.debug("add repo {}".format(db_repo
["name"]))
963 cluster_uuid
, db_repo
["name"], db_repo
["url"]
965 added_repo_dict
[repo_id
] = db_repo
["name"]
966 except Exception as e
:
968 "Error adding repo id: {}, err_msg: {} ".format(
973 # Delete repos that are present but not in nbi_list
974 for repo_name
in local_repo_dict
:
975 if not db_repo_dict
.get(repo_name
) and repo_name
!= "stable":
976 self
.log
.debug("delete repo {}".format(repo_name
))
978 await self
.repo_remove(cluster_uuid
, repo_name
)
979 deleted_repo_list
.append(repo_name
)
980 except Exception as e
:
982 "Error deleting repo, name: {}, err_msg: {}".format(
987 return deleted_repo_list
, added_repo_dict
991 except Exception as e
:
992 # Do not raise errors synchronizing repos
993 self
.log
.error("Error synchronizing repos: {}".format(e
))
994 raise Exception("Error synchronizing repos: {}".format(e
))
996 def _get_db_repos_dict(self
, repo_ids
: list):
998 for repo_id
in repo_ids
:
999 db_repo
= self
.db
.get_one("k8srepos", {"_id": repo_id
})
1000 db_repos_dict
[db_repo
["name"]] = db_repo
1001 return db_repos_dict
1004 ####################################################################################
1005 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1006 ####################################################################################
1010 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
1012 Creates and returns base cluster and kube dirs and returns them.
1013 Also created helm3 dirs according to new directory specification, paths are
1014 not returned but assigned to helm environment variables
1016 :param cluster_name: cluster_name
1017 :return: Dictionary with config_paths and dictionary with helm environment variables
1021 async def _cluster_init(self
, cluster_id
, namespace
, paths
, env
):
1023 Implements the helm version dependent cluster initialization
1027 async def _instances_list(self
, cluster_id
):
1029 Implements the helm version dependent helm instances list
1033 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
, kubeconfig
):
1035 Implements the helm version dependent method to obtain services from a helm instance
1039 async def _status_kdu(
1043 namespace
: str = None,
1044 yaml_format
: bool = False,
1045 show_error_log
: bool = False,
1046 ) -> Union
[str, dict]:
1048 Implements the helm version dependent method to obtain status of a helm instance
1052 def _get_install_command(
Obtain command to be executed to install the indicated instance
1068 def _get_upgrade_command(
1080 Obtain command to be executed to upgrade the indicated instance
1084 def _get_rollback_command(
1085 self
, kdu_instance
, namespace
, revision
, kubeconfig
1088 Obtain command to be executed to rollback the indicated instance
1092 def _get_uninstall_command(
1093 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
1096 Obtain command to be executed to delete the indicated instance
1100 def _get_inspect_command(
1101 self
, show_command
: str, kdu_model
: str, repo_str
: str, version
: str
1104 Obtain command to be executed to obtain information about the kdu
1108 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
1110 Method call to uninstall cluster software for helm. This method is dependent
1112 For Helm v2 it will be called when Tiller must be uninstalled
For Helm v3 it does nothing and does not need to be called
1117 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
1119 Obtains the cluster repos identifiers
1123 ####################################################################################
1124 ################################### P R I V A T E ##################################
1125 ####################################################################################
1129 def _check_file_exists(filename
: str, exception_if_not_exists
: bool = False):
1130 if os
.path
.exists(filename
):
1133 msg
= "File {} does not exist".format(filename
)
1134 if exception_if_not_exists
:
1135 raise K8sException(msg
)
1138 def _remove_multiple_spaces(strobj
):
1139 strobj
= strobj
.strip()
1140 while " " in strobj
:
1141 strobj
= strobj
.replace(" ", " ")
1145 def _output_to_lines(output
: str) -> list:
1146 output_lines
= list()
1147 lines
= output
.splitlines(keepends
=False)
1151 output_lines
.append(line
)
1155 def _output_to_table(output
: str) -> list:
1156 output_table
= list()
1157 lines
= output
.splitlines(keepends
=False)
1159 line
= line
.replace("\t", " ")
1161 output_table
.append(line_list
)
1162 cells
= line
.split(sep
=" ")
1166 line_list
.append(cell
)
1170 def _parse_services(output
: str) -> list:
1171 lines
= output
.splitlines(keepends
=False)
1174 line
= line
.replace("\t", " ")
1175 cells
= line
.split(sep
=" ")
1176 if len(cells
) > 0 and cells
[0].startswith("service/"):
1177 elems
= cells
[0].split(sep
="/")
1179 services
.append(elems
[1])
1183 def _get_deep(dictionary
: dict, members
: tuple):
1188 value
= target
.get(m
)
1197 # find key:value in several lines
1199 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
1200 for line
in p_lines
:
1202 if line
.startswith(p_key
+ ":"):
1203 parts
= line
.split(":")
1204 the_value
= parts
[1].strip()
1212 def _lower_keys_list(input_list
: list):
1214 Transform the keys in a list of dictionaries to lower case and returns a new list
1219 for dictionary
in input_list
:
1220 new_dict
= dict((k
.lower(), v
) for k
, v
in dictionary
.items())
1221 new_list
.append(new_dict
)
1224 async def _local_async_exec(
1227 raise_exception_on_error
: bool = False,
1228 show_error_log
: bool = True,
1229 encode_utf8
: bool = False,
1233 command
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command
)
1235 "Executing async local command: {}, env: {}".format(command
, env
)
1239 command
= shlex
.split(command
)
1241 environ
= os
.environ
.copy()
1246 process
= await asyncio
.create_subprocess_exec(
1248 stdout
=asyncio
.subprocess
.PIPE
,
1249 stderr
=asyncio
.subprocess
.PIPE
,
1253 # wait for command terminate
1254 stdout
, stderr
= await process
.communicate()
1256 return_code
= process
.returncode
1260 output
= stdout
.decode("utf-8").strip()
1261 # output = stdout.decode()
1263 output
= stderr
.decode("utf-8").strip()
1264 # output = stderr.decode()
1266 if return_code
!= 0 and show_error_log
:
1268 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1271 self
.log
.debug("Return code: {}".format(return_code
))
1273 if raise_exception_on_error
and return_code
!= 0:
1274 raise K8sException(output
)
1277 output
= output
.encode("utf-8").strip()
1278 output
= str(output
).replace("\\n", "\n")
1280 return output
, return_code
1282 except asyncio
.CancelledError
:
1284 except K8sException
:
1286 except Exception as e
:
1287 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1289 if raise_exception_on_error
:
1290 raise K8sException(e
) from e
1294 async def _local_async_exec_pipe(
1298 raise_exception_on_error
: bool = True,
1299 show_error_log
: bool = True,
1300 encode_utf8
: bool = False,
1304 command1
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command1
)
1305 command2
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command2
)
1306 command
= "{} | {}".format(command1
, command2
)
1308 "Executing async local command: {}, env: {}".format(command
, env
)
1312 command1
= shlex
.split(command1
)
1313 command2
= shlex
.split(command2
)
1315 environ
= os
.environ
.copy()
1320 read
, write
= os
.pipe()
1321 await asyncio
.create_subprocess_exec(*command1
, stdout
=write
, env
=environ
)
1323 process_2
= await asyncio
.create_subprocess_exec(
1324 *command2
, stdin
=read
, stdout
=asyncio
.subprocess
.PIPE
, env
=environ
1327 stdout
, stderr
= await process_2
.communicate()
1329 return_code
= process_2
.returncode
1333 output
= stdout
.decode("utf-8").strip()
1334 # output = stdout.decode()
1336 output
= stderr
.decode("utf-8").strip()
1337 # output = stderr.decode()
1339 if return_code
!= 0 and show_error_log
:
1341 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1344 self
.log
.debug("Return code: {}".format(return_code
))
1346 if raise_exception_on_error
and return_code
!= 0:
1347 raise K8sException(output
)
1350 output
= output
.encode("utf-8").strip()
1351 output
= str(output
).replace("\\n", "\n")
1353 return output
, return_code
1354 except asyncio
.CancelledError
:
1356 except K8sException
:
1358 except Exception as e
:
1359 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1361 if raise_exception_on_error
:
1362 raise K8sException(e
) from e
async def _get_service(self, cluster_id, service_name, namespace):
    """
    Obtains the data of the specified service in the k8cluster.

    Runs ``kubectl get service <service_name> -o=yaml`` with the cluster's
    kubeconfig and summarises the parsed YAML document.

    :param cluster_id: id of a K8s cluster known by OSM
    :param service_name: name of the K8s service in the specified namespace
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a service with the following data:
    - `name` of the service
    - `type` type of service in the k8 cluster
    - `ports` List of ports offered by the service, for each port includes at least
    name, port, protocol
    - `cluster_ip` Internal ip to be used inside k8s cluster
    - `external_ip` List of external ips (in case they are available); the key is
    only present for services of type LoadBalancer and may be an empty list
    """

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
        self.kubectl_command, paths["kube_config"], namespace, service_name
    )

    # raise_exception_on_error=True: a failing kubectl call is expected to
    # surface as an exception rather than as unparsable output
    # (presumably K8sException -- confirm against _local_async_exec)
    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    data = yaml.load(output, Loader=yaml.SafeLoader)

    service = {
        "name": service_name,
        "type": self._get_deep(data, ("spec", "type")),
        "ports": self._get_deep(data, ("spec", "ports")),
        "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
    }
    if service["type"] == "LoadBalancer":
        # A LoadBalancer that has not been provisioned yet has no ingress
        # entries, and some providers publish only a "hostname"; neither case
        # may crash the query, so fall back to an empty list and skip entries
        # without an "ip".
        ip_map_list = (
            self._get_deep(data, ("status", "loadBalancer", "ingress")) or []
        )
        service["external_ip"] = [
            elem["ip"] for elem in ip_map_list if "ip" in elem
        ]

    return service
async def _exec_inspect_comand(
    self, inspect_command: str, kdu_model: str, repo_url: str = None
):
    """
    Obtains information about a kdu, no cluster (no env)

    :param inspect_command: inspect sub-command, forwarded to _get_inspect_command
    :param kdu_model: chart reference, optionally "<repo>/<chart>" and/or
        suffixed with ":<version>"
    :param repo_url: optional repository URL, passed through as " --repo <url>"
    :return: the output of the executed inspect command
    """

    repo_str = " --repo {}".format(repo_url) if repo_url else ""

    # keep only what follows the first "/" (i.e. drop a "<repo>/" prefix)
    slash_pos = kdu_model.find("/")
    if slash_pos >= 0:
        kdu_model = kdu_model[slash_pos + 1 :]

    # a single ":" separates the chart name from its version
    version = ""
    if ":" in kdu_model:
        pieces = kdu_model.split(sep=":")
        if len(pieces) == 2:
            kdu_model, chart_version = pieces
            version = "--version {}".format(str(chart_version))

    full_command = self._get_inspect_command(
        inspect_command, kdu_model, repo_str, version
    )
    output, _rc = await self._local_async_exec(
        command=full_command, encode_utf8=True
    )

    return output
async def _store_status(
    self,
    cluster_id: str,
    operation: str,
    kdu_instance: str,
    namespace: str = None,
    db_dict: dict = None,
) -> None:
    """
    Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.

    Failures (including task cancellation) are logged as warnings and not
    propagated to the caller.

    :param cluster_id (str): the cluster where the KDU instance is deployed
    :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
    :param kdu_instance (str): The KDU instance in relation to which the status is obtained
    :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
    :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
    values for the keys:
        - "collection": The Mongo DB collection to write to
        - "filter": The query filter to use in the update process
        - "path": The dot separated keys which targets the object to be updated
    """

    try:
        # NOTE(review): yaml_format=False, db_dict= and status= below were
        # restored from a damaged listing -- confirm against the callees.
        kdu_status = await self._status_kdu(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            yaml_format=False,
            namespace=namespace,
        )

        status = kdu_status.get("info").get("description")
        self.log.debug(f"Status for KDU {kdu_instance} obtained: {status}.")

        # write status to db
        written = await self.write_app_status_to_db(
            db_dict=db_dict,
            status=str(status),
            detailed_status=str(kdu_status),
            operation=operation,
        )

        if not written:
            self.log.info("Error writing in database. Task exiting...")

    except asyncio.CancelledError as e:
        self.log.warning(
            f"Exception in method {self._store_status.__name__} (task cancelled): {e}"
        )
    except Exception as e:
        self.log.warning(f"Exception in method {self._store_status.__name__}: {e}")
# params for use in -f file
# returns values file option and filename (in order to delete it at the end)
def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
    """
    Dumps the given params into a YAML values file for use with helm "-f".

    Values whose text contains "!!yaml" are parsed as YAML (after dropping
    their first 7 characters, i.e. the "!!yaml " marker) before being dumped.
    The file is created in the current working directory with a random,
    zero-padded 10-digit name; the caller is responsible for deleting it.

    :param cluster_id: cluster whose paths/env are initialised before writing
    :param params: values to write; may be None or empty
    :return: tuple ("-f <file>", "<file>"), or ("", None) when there are no params
    """

    if not params:
        return "", None

    self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)

    def get_random_number():
        r = random.randrange(start=1, stop=99999999)
        return str(r).rjust(10, "0")

    params2 = {}
    for key, value in params.items():
        if "!!yaml" in str(value):
            # An explicit Loader is mandatory since PyYAML 6; SafeLoader also
            # prevents arbitrary object construction from user-supplied params.
            value = yaml.load(value[7:], Loader=yaml.SafeLoader)
        params2[key] = value

    values_file = get_random_number() + ".yaml"
    with open(values_file, "w") as stream:
        yaml.dump(params2, stream, indent=4, default_flow_style=False)

    return "-f {}".format(values_file), values_file
1523 # params for use in --set option
1525 def _params_to_set_option(params
: dict) -> str:
1527 if params
and len(params
) > 0:
1530 value
= params
.get(key
, None)
1531 if value
is not None:
1533 params_str
+= "--set "
1537 params_str
+= "{}={}".format(key
, value
)
def generate_kdu_instance_name(**kwargs):
    """
    Generates a KDU instance (release) name derived from the chart name.

    The chart name is reduced to its last path component when it is an
    embedded chart (absolute path) or a URL, every non-alphanumeric character
    is replaced by "-", the result is cut to 35 characters, prefixed with "a"
    if it does not start with a letter, and suffixed with "-" plus a random,
    zero-padded 10-digit number. The final name is lower-cased.

    :param kwargs: must contain "kdu_model" (str), the chart reference
    :return: the generated instance name
    :raises KeyError: if "kdu_model" is not provided
    """
    chart_name = kwargs["kdu_model"]
    # embedded chart (file or dir) or URL: keep only the last path component
    if chart_name.startswith("/") or "://" in chart_name:
        chart_name = chart_name[chart_name.rfind("/") + 1 :]

    name = "".join(
        c if c.isalpha() or c.isnumeric() else "-" for c in chart_name
    )
    name = name[:35]

    # if does not start with alpha character, prefix 'a'
    # (an empty name, e.g. from a path ending in "/", is prefixed as well
    # instead of raising IndexError)
    if not name or not name[0].isalpha():
        name = "a" + name

    def get_random_number():
        r = random.randrange(start=1, stop=99999999)
        return str(r).rjust(10, "0")

    name = name + "-" + get_random_number()
    return name.lower()
1576 def _split_version(self
, kdu_model
: str) -> (str, str):
1578 if ":" in kdu_model
:
1579 parts
= kdu_model
.split(sep
=":")
1581 version
= str(parts
[1])
1582 kdu_model
= parts
[0]
1583 return kdu_model
, version
1585 async def _split_repo(self
, kdu_model
: str) -> str:
1587 idx
= kdu_model
.find("/")
1589 repo_name
= kdu_model
[:idx
]
1592 async def _find_repo(self
, kdu_model
: str, cluster_uuid
: str) -> str:
1594 idx
= kdu_model
.find("/")
1596 repo_name
= kdu_model
[:idx
]
1597 # Find repository link
1598 local_repo_list
= await self
.repo_list(cluster_uuid
)
1599 for repo
in local_repo_list
:
1600 repo_url
= repo
["url"] if repo
["name"] == repo_name
else None