2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
24 from typing
import Union
32 from uuid
import uuid4
34 from n2vc
.config
import EnvironConfig
35 from n2vc
.exceptions
import K8sException
36 from n2vc
.k8s_conn
import K8sConnector
39 class K8sHelmBaseConnector(K8sConnector
):
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
47 service_account
= "osm"
53 kubectl_command
: str = "/usr/bin/kubectl",
54 helm_command
: str = "/usr/bin/helm",
60 :param fs: file system for kubernetes and helm configuration
61 :param db: database object to write current operation status
62 :param kubectl_command: path to kubectl executable
63 :param helm_command: path to helm executable
65 :param on_update_db: callback called when k8s connector updates database
69 K8sConnector
.__init
__(self
, db
=db
, log
=log
, on_update_db
=on_update_db
)
71 self
.log
.info("Initializing K8S Helm connector")
73 self
.config
= EnvironConfig()
74 # random numbers for release name generation
75 random
.seed(time
.time())
80 # exception if kubectl is not installed
81 self
.kubectl_command
= kubectl_command
82 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
84 # exception if helm is not installed
85 self
._helm
_command
= helm_command
86 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
88 # obtain stable repo url from config or apply default
89 self
._stable
_repo
_url
= self
.config
.get("stablerepourl")
90 if self
._stable
_repo
_url
== "None":
91 self
._stable
_repo
_url
= None
def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str):
    """
    Split a cluster_uuid stored at database into namespace and cluster_id.

    The stored value can be either 'namespace:cluster_id' or only 'cluster_id'
    (the latter is kept for backward compatibility).

    :param cluster_uuid: identifier as stored at database
    :return: tuple (namespace, cluster_id); namespace is an empty string when
        cluster_uuid contains no ':' separator
    """
    # rpartition splits on the LAST ':', so the cluster_id never contains ':'
    # and everything before the separator is treated as the namespace
    namespace, _, cluster_id = cluster_uuid.rpartition(":")
    return namespace, cluster_id
105 namespace
: str = "kube-system",
106 reuse_cluster_uuid
=None,
110 It prepares a given K8s cluster environment to run Charts
112 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
114 :param namespace: optional namespace to be used for helm. By default,
115 'kube-system' will be used
116 :param reuse_cluster_uuid: existing cluster uuid for reuse
117 :param kwargs: Additional parameters (None yet)
118 :return: uuid of the K8s cluster and True if connector has installed some
119 software in the cluster
120 (on error, an exception will be raised)
123 if reuse_cluster_uuid
:
124 namespace_
, cluster_id
= self
._get
_namespace
_cluster
_id
(reuse_cluster_uuid
)
125 namespace
= namespace_
or namespace
127 cluster_id
= str(uuid4())
128 cluster_uuid
= "{}:{}".format(namespace
, cluster_id
)
131 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id
, namespace
)
134 paths
, env
= self
._init
_paths
_env
(
135 cluster_name
=cluster_id
, create_if_not_exist
=True
137 mode
= stat
.S_IRUSR | stat
.S_IWUSR
138 with
open(paths
["kube_config"], "w", mode
) as f
:
140 os
.chmod(paths
["kube_config"], 0o600)
142 # Code with initialization specific of helm version
143 n2vc_installed_sw
= await self
._cluster
_init
(cluster_id
, namespace
, paths
, env
)
145 # sync fs with local data
146 self
.fs
.reverse_sync(from_path
=cluster_id
)
148 self
.log
.info("Cluster {} initialized".format(cluster_id
))
150 return cluster_uuid
, n2vc_installed_sw
153 self
, cluster_uuid
: str, name
: str, url
: str, repo_type
: str = "chart"
155 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
157 "Cluster {}, adding {} repository {}. URL: {}".format(
158 cluster_id
, repo_type
, name
, url
163 paths
, env
= self
._init
_paths
_env
(
164 cluster_name
=cluster_id
, create_if_not_exist
=True
168 self
.fs
.sync(from_path
=cluster_id
)
170 # helm repo add name url
171 command
= "env KUBECONFIG={} {} repo add {} {}".format(
172 paths
["kube_config"], self
._helm
_command
, name
, url
174 self
.log
.debug("adding repo: {}".format(command
))
175 await self
._local
_async
_exec
(
176 command
=command
, raise_exception_on_error
=True, env
=env
180 command
= "env KUBECONFIG={} {} repo update {}".format(
181 paths
["kube_config"], self
._helm
_command
, name
183 self
.log
.debug("updating repo: {}".format(command
))
184 await self
._local
_async
_exec
(
185 command
=command
, raise_exception_on_error
=False, env
=env
189 self
.fs
.reverse_sync(from_path
=cluster_id
)
async def repo_update(self, cluster_id: str, name: str, repo_type: str = "chart"):
    """
    Update the local index of a helm repository registered for a cluster.

    Unlike repo_add / repo_remove, this method receives the cluster_id
    directly; no 'namespace:' prefix is stripped here.

    :param cluster_id: cluster identifier, used as directory name for the
        cluster paths and for the file system synchronization
    :param name: name of the repository to update
    :param repo_type: only used in the log message
    :return: None. A failure of the helm command is not raised, because
        raise_exception_on_error=False is passed to _local_async_exec
    """
    self.log.debug(
        "Cluster {}, updating {} repository {}".format(cluster_id, repo_type, name)
    )

    # init_env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # helm repo update
    # KUBECONFIG is set explicitly, as the other repo_* commands of this
    # class do, so helm works against this cluster's configuration
    command = "env KUBECONFIG={} {} repo update {}".format(
        paths["kube_config"], self._helm_command, name
    )
    self.log.debug("updating repo: {}".format(command))
    await self._local_async_exec(
        command=command, raise_exception_on_error=False, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)
214 async def repo_list(self
, cluster_uuid
: str) -> list:
216 Get the list of registered repositories
218 :return: list of registered repositories: [ (name, url) .... ]
221 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
222 self
.log
.debug("list repositories for cluster {}".format(cluster_id
))
225 paths
, env
= self
._init
_paths
_env
(
226 cluster_name
=cluster_id
, create_if_not_exist
=True
230 self
.fs
.sync(from_path
=cluster_id
)
232 command
= "env KUBECONFIG={} {} repo list --output yaml".format(
233 paths
["kube_config"], self
._helm
_command
236 # Set exception to false because if there are no repos just want an empty list
237 output
, _rc
= await self
._local
_async
_exec
(
238 command
=command
, raise_exception_on_error
=False, env
=env
242 self
.fs
.reverse_sync(from_path
=cluster_id
)
245 if output
and len(output
) > 0:
246 repos
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
247 # unify format between helm2 and helm3 setting all keys lowercase
248 return self
._lower
_keys
_list
(repos
)
async def repo_remove(self, cluster_uuid: str, name: str):
    """
    Remove a helm repository from the configuration of a cluster.

    :param cluster_uuid: cluster identifier, either 'namespace:cluster_id' or
        only 'cluster_id'
    :param name: name of the repository to remove
    :return: None
    :raises K8sException: propagated from _local_async_exec, which is called
        with raise_exception_on_error=True. In that case the final
        reverse_sync is not executed.
    """
    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug("remove {} repositories for cluster {}".format(name, cluster_id))

    # init env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    command = "env KUBECONFIG={} {} repo remove {}".format(
        paths["kube_config"], self._helm_command, name
    )
    await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)
281 uninstall_sw
: bool = False,
286 Resets the Kubernetes cluster by removing the helm deployment that represents it.
288 :param cluster_uuid: The UUID of the cluster to reset
289 :param force: Boolean to force the reset
290 :param uninstall_sw: Boolean to also uninstall the helm software installed in the cluster
291 :param kwargs: Additional parameters (None yet)
292 :return: Returns True if successful or raises an exception.
294 namespace
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
296 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
297 cluster_id
, uninstall_sw
302 self
.fs
.sync(from_path
=cluster_id
)
304 # uninstall releases if needed.
306 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
307 if len(releases
) > 0:
311 kdu_instance
= r
.get("name")
312 chart
= r
.get("chart")
314 "Uninstalling {} -> {}".format(chart
, kdu_instance
)
316 await self
.uninstall(
317 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
319 except Exception as e
:
320 # will not raise exception as it was found
321 # that in some cases of previously installed helm releases it
324 "Error uninstalling release {}: {}".format(
330 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
334 False # Allow to remove k8s cluster without removing Tiller
338 await self
._uninstall
_sw
(cluster_id
, namespace
)
340 # delete cluster directory
341 self
.log
.debug("Removing directory {}".format(cluster_id
))
342 self
.fs
.file_delete(cluster_id
, ignore_non_exist
=True)
343 # Also remove the local directory if it still exists
344 direct
= self
.fs
.path
+ "/" + cluster_id
345 shutil
.rmtree(direct
, ignore_errors
=True)
349 async def _install_impl(
357 timeout
: float = 300,
359 db_dict
: dict = None,
360 kdu_name
: str = None,
361 namespace
: str = None,
364 paths
, env
= self
._init
_paths
_env
(
365 cluster_name
=cluster_id
, create_if_not_exist
=True
369 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
370 cluster_id
=cluster_id
, params
=params
376 parts
= kdu_model
.split(sep
=":")
378 version
= str(parts
[1])
381 repo
= self
._split
_repo
(kdu_model
)
383 await self
.repo_update(cluster_id
, repo
)
385 command
= self
._get
_install
_command
(
393 paths
["kube_config"],
396 self
.log
.debug("installing: {}".format(command
))
399 # exec helm in a task
400 exec_task
= asyncio
.ensure_future(
401 coro_or_future
=self
._local
_async
_exec
(
402 command
=command
, raise_exception_on_error
=False, env
=env
406 # write status in another task
407 status_task
= asyncio
.ensure_future(
408 coro_or_future
=self
._store
_status
(
409 cluster_id
=cluster_id
,
410 kdu_instance
=kdu_instance
,
417 # wait for execution task
418 await asyncio
.wait([exec_task
])
423 output
, rc
= exec_task
.result()
427 output
, rc
= await self
._local
_async
_exec
(
428 command
=command
, raise_exception_on_error
=False, env
=env
431 # remove temporal values yaml file
433 os
.remove(file_to_delete
)
436 await self
._store
_status
(
437 cluster_id
=cluster_id
,
438 kdu_instance
=kdu_instance
,
445 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
447 raise K8sException(msg
)
453 kdu_model
: str = None,
455 timeout
: float = 300,
457 db_dict
: dict = None,
459 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
460 self
.log
.debug("upgrading {} in cluster {}".format(kdu_model
, cluster_id
))
463 self
.fs
.sync(from_path
=cluster_id
)
465 # look for instance to obtain namespace
466 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
467 if not instance_info
:
468 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
471 paths
, env
= self
._init
_paths
_env
(
472 cluster_name
=cluster_id
, create_if_not_exist
=True
476 self
.fs
.sync(from_path
=cluster_id
)
479 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
480 cluster_id
=cluster_id
, params
=params
486 parts
= kdu_model
.split(sep
=":")
488 version
= str(parts
[1])
491 repo
= self
._split
_repo
(kdu_model
)
493 await self
.repo_update(cluster_id
, repo
)
495 command
= self
._get
_upgrade
_command
(
498 instance_info
["namespace"],
503 paths
["kube_config"],
506 self
.log
.debug("upgrading: {}".format(command
))
510 # exec helm in a task
511 exec_task
= asyncio
.ensure_future(
512 coro_or_future
=self
._local
_async
_exec
(
513 command
=command
, raise_exception_on_error
=False, env
=env
516 # write status in another task
517 status_task
= asyncio
.ensure_future(
518 coro_or_future
=self
._store
_status
(
519 cluster_id
=cluster_id
,
520 kdu_instance
=kdu_instance
,
521 namespace
=instance_info
["namespace"],
527 # wait for execution task
528 await asyncio
.wait([exec_task
])
532 output
, rc
= exec_task
.result()
536 output
, rc
= await self
._local
_async
_exec
(
537 command
=command
, raise_exception_on_error
=False, env
=env
540 # remove temporal values yaml file
542 os
.remove(file_to_delete
)
545 await self
._store
_status
(
546 cluster_id
=cluster_id
,
547 kdu_instance
=kdu_instance
,
548 namespace
=instance_info
["namespace"],
554 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
556 raise K8sException(msg
)
559 self
.fs
.reverse_sync(from_path
=cluster_id
)
561 # return new revision number
562 instance
= await self
.get_instance_info(
563 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
566 revision
= int(instance
.get("revision"))
567 self
.log
.debug("New revision: {}".format(revision
))
577 total_timeout
: float = 1800,
580 raise NotImplementedError("Method not implemented")
582 async def get_scale_count(
588 raise NotImplementedError("Method not implemented")
591 self
, cluster_uuid
: str, kdu_instance
: str, revision
=0, db_dict
: dict = None
594 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
596 "rollback kdu_instance {} to revision {} from cluster {}".format(
597 kdu_instance
, revision
, cluster_id
602 self
.fs
.sync(from_path
=cluster_id
)
604 # look for instance to obtain namespace
605 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
606 if not instance_info
:
607 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
610 paths
, env
= self
._init
_paths
_env
(
611 cluster_name
=cluster_id
, create_if_not_exist
=True
615 self
.fs
.sync(from_path
=cluster_id
)
617 command
= self
._get
_rollback
_command
(
618 kdu_instance
, instance_info
["namespace"], revision
, paths
["kube_config"]
621 self
.log
.debug("rolling_back: {}".format(command
))
623 # exec helm in a task
624 exec_task
= asyncio
.ensure_future(
625 coro_or_future
=self
._local
_async
_exec
(
626 command
=command
, raise_exception_on_error
=False, env
=env
629 # write status in another task
630 status_task
= asyncio
.ensure_future(
631 coro_or_future
=self
._store
_status
(
632 cluster_id
=cluster_id
,
633 kdu_instance
=kdu_instance
,
634 namespace
=instance_info
["namespace"],
636 operation
="rollback",
640 # wait for execution task
641 await asyncio
.wait([exec_task
])
646 output
, rc
= exec_task
.result()
649 await self
._store
_status
(
650 cluster_id
=cluster_id
,
651 kdu_instance
=kdu_instance
,
652 namespace
=instance_info
["namespace"],
654 operation
="rollback",
658 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
660 raise K8sException(msg
)
663 self
.fs
.reverse_sync(from_path
=cluster_id
)
665 # return new revision number
666 instance
= await self
.get_instance_info(
667 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
670 revision
= int(instance
.get("revision"))
671 self
.log
.debug("New revision: {}".format(revision
))
676 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str, **kwargs
):
678 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
679 (this call should happen after all _terminate-config-primitive_ of the VNF
682 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
683 :param kdu_instance: unique name for the KDU instance to be deleted
684 :param kwargs: Additional parameters (None yet)
685 :return: True if successful
688 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
690 "uninstall kdu_instance {} from cluster {}".format(kdu_instance
, cluster_id
)
694 self
.fs
.sync(from_path
=cluster_id
)
696 # look for instance to obtain namespace
697 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
698 if not instance_info
:
699 self
.log
.warning(("kdu_instance {} not found".format(kdu_instance
)))
702 paths
, env
= self
._init
_paths
_env
(
703 cluster_name
=cluster_id
, create_if_not_exist
=True
707 self
.fs
.sync(from_path
=cluster_id
)
709 command
= self
._get
_uninstall
_command
(
710 kdu_instance
, instance_info
["namespace"], paths
["kube_config"]
712 output
, _rc
= await self
._local
_async
_exec
(
713 command
=command
, raise_exception_on_error
=True, env
=env
717 self
.fs
.reverse_sync(from_path
=cluster_id
)
719 return self
._output
_to
_table
(output
)
async def instances_list(self, cluster_uuid: str) -> list:
    """
    Return the list of releases deployed in a cluster.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: the result of the helm version dependent _instances_list for
        the cluster
    """
    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug("list releases for cluster {}".format(cluster_id))

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # execute internal command
    result = await self._instances_list(cluster_id)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)

    return result
async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
    """
    Look for a release by name among the releases deployed in a cluster.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :param kdu_instance: name of the release to look for
    :return: the first entry of instances_list whose "name" equals
        kdu_instance, or None when there is no such entry
    """
    instances = await self.instances_list(cluster_uuid=cluster_uuid)
    # "or []" keeps the lookup safe if the list comes back empty or as None
    for instance in instances or []:
        if instance.get("name") == kdu_instance:
            return instance
    self.log.debug("Instance {} not found".format(kdu_instance))
    return None
751 async def exec_primitive(
753 cluster_uuid
: str = None,
754 kdu_instance
: str = None,
755 primitive_name
: str = None,
756 timeout
: float = 300,
758 db_dict
: dict = None,
761 """Exec primitive (Juju action)
763 :param cluster_uuid: The UUID of the cluster or namespace:cluster
764 :param kdu_instance: The unique name of the KDU instance
765 :param primitive_name: Name of action that will be executed
766 :param timeout: Timeout for action execution
767 :param params: Dictionary of all the parameters needed for the action
768 :db_dict: Dictionary for any additional data
769 :param kwargs: Additional parameters (None yet)
771 :return: Returns the output of the action
774 "KDUs deployed with Helm don't support actions "
775 "different from rollback, upgrade and status"
async def get_services(
    self, cluster_uuid: str, kdu_instance: str, namespace: str
) -> list:
    """
    Returns a list of services defined for the specified kdu instance.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a list of services. Each service
        is the value returned by _get_service and can have the following data:
        - `name` of the service
        - `type` type of service in the k8 cluster
        - `ports` List of ports offered by the service
        - `cluster_ip` Internal ip to be used inside k8s cluster
        - `external_ip` List of external ips (in case they are available)
    """
    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug(
        "get_services: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    # init env, paths (only the kube config path is needed here)
    paths, _env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # get list of services names for kdu
    service_names = await self._get_services(
        cluster_id, kdu_instance, namespace, paths["kube_config"]
    )

    # services are queried one at a time, in the order they were listed
    service_list = []
    for service_name in service_names:
        service = await self._get_service(cluster_id, service_name, namespace)
        service_list.append(service)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)

    return service_list
async def get_service(
    self, cluster_uuid: str, service_name: str, namespace: str
) -> object:
    """
    Obtain the data of one service of the cluster.

    :param cluster_uuid: cluster identifier, either 'namespace:cluster_id' or
        only 'cluster_id'
    :param service_name: name of the K8s service
    :param namespace: K8s namespace where the service is looked for
    :return: the value returned by _get_service for that service
    """
    self.log.debug(
        "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
            service_name, namespace, cluster_uuid
        )
    )

    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    service = await self._get_service(cluster_id, service_name, namespace)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)

    return service
849 async def status_kdu(
850 self
, cluster_uuid
: str, kdu_instance
: str, yaml_format
: str = False, **kwargs
851 ) -> Union
[str, dict]:
853 This call retrieves the current state of a given KDU instance. It
854 allows retrieving the _composition_ (i.e. K8s objects) and _specific
855 values_ of the configuration parameters applied to a given instance. This call
856 would be based on the `status` call.
858 :param cluster_uuid: UUID of a K8s cluster known by OSM
859 :param kdu_instance: unique name for the KDU instance
860 :param kwargs: Additional parameters (None yet)
861 :param yaml_format: if the return shall be returned as an YAML string or as a
863 :return: If successful, it will return the following vector of arguments:
864 - K8s `namespace` in the cluster where the KDU lives
865 - `state` of the KDU instance. It can be:
872 - List of `resources` (objects) that this release consists of, sorted by kind,
873 and the status of those resources
874 - Last `deployment_time`.
878 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
879 cluster_uuid
, kdu_instance
883 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
886 self
.fs
.sync(from_path
=cluster_id
)
888 # get instance: needed to obtain namespace
889 instances
= await self
._instances
_list
(cluster_id
=cluster_id
)
890 for instance
in instances
:
891 if instance
.get("name") == kdu_instance
:
894 # instance does not exist
896 "Instance name: {} not found in cluster: {}".format(
897 kdu_instance
, cluster_id
901 status
= await self
._status
_kdu
(
902 cluster_id
=cluster_id
,
903 kdu_instance
=kdu_instance
,
904 namespace
=instance
["namespace"],
905 yaml_format
=yaml_format
,
910 self
.fs
.reverse_sync(from_path
=cluster_id
)
async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Inspect the values of a kdu model.

    :param kdu_model: chart reference passed to the inspect command
    :param repo_url: optional repository url where the chart is looked for
    :return: the result of _exec_inspect_comand for the "values" command
    """
    self.log.debug(
        "inspect kdu_model values {} from (optional) repo: {}".format(
            kdu_model, repo_url
        )
    )

    return await self._exec_inspect_comand(
        inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
    )
async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Inspect the readme of a kdu model.

    :param kdu_model: chart reference passed to the inspect command
    :param repo_url: optional repository url where the chart is looked for
    :return: the result of _exec_inspect_comand for the "readme" command
    """
    self.log.debug(
        "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
    )

    return await self._exec_inspect_comand(
        inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
    )
936 async def synchronize_repos(self
, cluster_uuid
: str):
938 self
.log
.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid
))
940 db_repo_ids
= self
._get
_helm
_chart
_repos
_ids
(cluster_uuid
)
941 db_repo_dict
= self
._get
_db
_repos
_dict
(db_repo_ids
)
943 local_repo_list
= await self
.repo_list(cluster_uuid
)
944 local_repo_dict
= {repo
["name"]: repo
["url"] for repo
in local_repo_list
}
946 deleted_repo_list
= []
949 # iterate over the list of repos in the database that should be
950 # added if not present
951 for repo_name
, db_repo
in db_repo_dict
.items():
953 # check if it is already present
954 curr_repo_url
= local_repo_dict
.get(db_repo
["name"])
955 repo_id
= db_repo
.get("_id")
956 if curr_repo_url
!= db_repo
["url"]:
959 "repo {} url changed, delete and and again".format(
963 await self
.repo_remove(cluster_uuid
, db_repo
["name"])
964 deleted_repo_list
.append(repo_id
)
967 self
.log
.debug("add repo {}".format(db_repo
["name"]))
969 cluster_uuid
, db_repo
["name"], db_repo
["url"]
971 added_repo_dict
[repo_id
] = db_repo
["name"]
972 except Exception as e
:
974 "Error adding repo id: {}, err_msg: {} ".format(
979 # Delete repos that are present but not in nbi_list
980 for repo_name
in local_repo_dict
:
981 if not db_repo_dict
.get(repo_name
) and repo_name
!= "stable":
982 self
.log
.debug("delete repo {}".format(repo_name
))
984 await self
.repo_remove(cluster_uuid
, repo_name
)
985 deleted_repo_list
.append(repo_name
)
986 except Exception as e
:
988 "Error deleting repo, name: {}, err_msg: {}".format(
993 return deleted_repo_list
, added_repo_dict
997 except Exception as e
:
998 # Log the error and re-raise it wrapped in a generic Exception
999 self
.log
.error("Error synchronizing repos: {}".format(e
))
1000 raise Exception("Error synchronizing repos: {}".format(e
))
def _get_db_repos_dict(self, repo_ids: list):
    """
    Read the given repositories from the database, indexed by repo name.

    :param repo_ids: list of "_id" values of entries of the "k8srepos"
        collection
    :return: dictionary {repo name: database entry}. If two entries share
        the same name, the last one read wins.
    """
    db_repos_dict = {}
    for repo_id in repo_ids:
        db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
        db_repos_dict[db_repo["name"]] = db_repo
    return db_repos_dict
1010 ####################################################################################
1011 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1012 ####################################################################################
def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
    """
    Creates the base cluster and kube dirs and returns them.
    Also creates the helm3 dirs according to the new directory specification;
    those paths are not returned but assigned to helm environment variables.

    To be implemented by the helm version specific subclasses.

    :param cluster_name: cluster_name
    :param create_if_not_exist: presumably, whether missing directories
        shall be created (not documented in this base class)
    :return: Dictionary with config_paths and dictionary with helm environment variables
    """
async def _cluster_init(self, cluster_id, namespace, paths, env):
    """
    Implements the helm version dependent cluster initialization.

    To be implemented by the helm version specific subclasses.

    :param cluster_id: identifier of the cluster being initialized
    :param namespace: namespace to be used for helm
    :param paths: dictionary with the cluster paths, as returned by _init_paths_env
    :param env: dictionary with the helm environment variables, as returned
        by _init_paths_env
    """
async def _instances_list(self, cluster_id):
    """
    Implements the helm version dependent helm instances list.

    To be implemented by the helm version specific subclasses.

    :param cluster_id: identifier of the cluster whose releases are listed
    """
async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
    """
    Implements the helm version dependent method to obtain services from a helm instance.

    To be implemented by the helm version specific subclasses.

    :param cluster_id: identifier of the cluster
    :param kdu_instance: name of the helm instance
    :param namespace: K8s namespace used by the helm instance
    :param kubeconfig: path of the kube config file of the cluster
    """
1045 async def _status_kdu(
1049 namespace
: str = None,
1050 yaml_format
: bool = False,
1051 show_error_log
: bool = False,
1052 ) -> Union
[str, dict]:
1054 Implements the helm version dependent method to obtain status of a helm instance
1058 def _get_install_command(
1070 Obtain command to be executed to install the indicated instance
1074 def _get_upgrade_command(
1086 Obtain command to be executed to upgrade the indicated instance
1090 def _get_rollback_command(
1091 self
, kdu_instance
, namespace
, revision
, kubeconfig
1094 Obtain command to be executed to rollback the indicated instance
1098 def _get_uninstall_command(
1099 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
1102 Obtain command to be executed to delete the indicated instance
1106 def _get_inspect_command(
1107 self
, show_command
: str, kdu_model
: str, repo_str
: str, version
: str
1110 Obtain command to be executed to obtain information about the kdu
async def _uninstall_sw(self, cluster_id: str, namespace: str):
    """
    Method call to uninstall cluster software for helm. This method is
    dependent on the helm version:
    For Helm v2 it will be called when Tiller must be uninstalled
    For Helm v3 it does nothing and does not need to be called

    :param cluster_id: identifier of the cluster
    :param namespace: namespace used for helm in the cluster
    """
def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
    """
    Obtains the cluster repos identifiers.

    To be implemented by the helm version specific subclasses.

    :param cluster_uuid: identifier of the cluster
    :return: list of repo identifiers; synchronize_repos passes it to
        _get_db_repos_dict
    """
1129 ####################################################################################
1130 ################################### P R I V A T E ##################################
1131 ####################################################################################
def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
    """
    Check that a file exists.

    :param filename: path to check
    :param exception_if_not_exists: raise instead of returning when the path
        does not exist
    :return: True when the path exists, False otherwise (explicit falsy
        value instead of an implicit None)
    :raises K8sException: when the path does not exist and
        exception_if_not_exists is True
    """
    if os.path.exists(filename):
        return True

    msg = "File {} does not exist".format(filename)
    if exception_if_not_exists:
        raise K8sException(msg)
    return False
def _remove_multiple_spaces(strobj):
    """
    Strip a string and collapse every run of spaces into a single space.

    Only the space character is collapsed; tabs and other whitespace inside
    the string are left untouched.

    :param strobj: string to normalize
    :return: the normalized string
    """
    # Splitting on a single space yields empty items for every extra space
    # of a run; dropping them and joining again collapses the runs in one
    # pass and always terminates.
    return " ".join(part for part in strobj.strip().split(" ") if part)
1151 def _output_to_lines(output
: str) -> list:
1152 output_lines
= list()
1153 lines
= output
.splitlines(keepends
=False)
1157 output_lines
.append(line
)
1161 def _output_to_table(output
: str) -> list:
1162 output_table
= list()
1163 lines
= output
.splitlines(keepends
=False)
1165 line
= line
.replace("\t", " ")
1167 output_table
.append(line_list
)
1168 cells
= line
.split(sep
=" ")
1172 line_list
.append(cell
)
1176 def _parse_services(output
: str) -> list:
1177 lines
= output
.splitlines(keepends
=False)
1180 line
= line
.replace("\t", " ")
1181 cells
= line
.split(sep
=" ")
1182 if len(cells
) > 0 and cells
[0].startswith("service/"):
1183 elems
= cells
[0].split(sep
="/")
1185 services
.append(elems
[1])
1189 def _get_deep(dictionary
: dict, members
: tuple):
1194 value
= target
.get(m
)
1203 # find key:value in several lines
1205 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
1206 for line
in p_lines
:
1208 if line
.startswith(p_key
+ ":"):
1209 parts
= line
.split(":")
1210 the_value
= parts
[1].strip()
def _lower_keys_list(input_list: list):
    """
    Transform the keys in a list of dictionaries to lower case and return a new list.

    The input list and its dictionaries are not modified.

    :param input_list: list of dictionaries with string keys; None or an
        empty list are accepted
    :return: new list with one new dictionary per input dictionary; an
        empty list when input_list is None or empty
    """
    if not input_list:
        return []
    return [
        {key.lower(): value for key, value in dictionary.items()}
        for dictionary in input_list
    ]
1230 async def _local_async_exec(
1233 raise_exception_on_error
: bool = False,
1234 show_error_log
: bool = True,
1235 encode_utf8
: bool = False,
1239 command
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command
)
1241 "Executing async local command: {}, env: {}".format(command
, env
)
1245 command
= shlex
.split(command
)
1247 environ
= os
.environ
.copy()
1252 process
= await asyncio
.create_subprocess_exec(
1254 stdout
=asyncio
.subprocess
.PIPE
,
1255 stderr
=asyncio
.subprocess
.PIPE
,
1259 # wait for command terminate
1260 stdout
, stderr
= await process
.communicate()
1262 return_code
= process
.returncode
1266 output
= stdout
.decode("utf-8").strip()
1267 # output = stdout.decode()
1269 output
= stderr
.decode("utf-8").strip()
1270 # output = stderr.decode()
1272 if return_code
!= 0 and show_error_log
:
1274 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1277 self
.log
.debug("Return code: {}".format(return_code
))
1279 if raise_exception_on_error
and return_code
!= 0:
1280 raise K8sException(output
)
1283 output
= output
.encode("utf-8").strip()
1284 output
= str(output
).replace("\\n", "\n")
1286 return output
, return_code
1288 except asyncio
.CancelledError
:
1290 except K8sException
:
1292 except Exception as e
:
1293 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1295 if raise_exception_on_error
:
1296 raise K8sException(e
) from e
1300 async def _local_async_exec_pipe(
1304 raise_exception_on_error
: bool = True,
1305 show_error_log
: bool = True,
1306 encode_utf8
: bool = False,
1310 command1
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command1
)
1311 command2
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command2
)
1312 command
= "{} | {}".format(command1
, command2
)
1314 "Executing async local command: {}, env: {}".format(command
, env
)
1318 command1
= shlex
.split(command1
)
1319 command2
= shlex
.split(command2
)
1321 environ
= os
.environ
.copy()
1326 read
, write
= os
.pipe()
1327 await asyncio
.create_subprocess_exec(*command1
, stdout
=write
, env
=environ
)
1329 process_2
= await asyncio
.create_subprocess_exec(
1330 *command2
, stdin
=read
, stdout
=asyncio
.subprocess
.PIPE
, env
=environ
1333 stdout
, stderr
= await process_2
.communicate()
1335 return_code
= process_2
.returncode
1339 output
= stdout
.decode("utf-8").strip()
1340 # output = stdout.decode()
1342 output
= stderr
.decode("utf-8").strip()
1343 # output = stderr.decode()
1345 if return_code
!= 0 and show_error_log
:
1347 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1350 self
.log
.debug("Return code: {}".format(return_code
))
1352 if raise_exception_on_error
and return_code
!= 0:
1353 raise K8sException(output
)
1356 output
= output
.encode("utf-8").strip()
1357 output
= str(output
).replace("\\n", "\n")
1359 return output
, return_code
1360 except asyncio
.CancelledError
:
1362 except K8sException
:
1364 except Exception as e
:
1365 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1367 if raise_exception_on_error
:
1368 raise K8sException(e
) from e
async def _get_service(self, cluster_id, service_name, namespace):
    """
    Obtains the data of the specified service in the k8cluster.

    :param cluster_id: id of a K8s cluster known by OSM
    :param service_name: name of the K8s service in the specified namespace
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a service with the following data:
    - `name` of the service
    - `type` type of service in the k8 cluster
    - `ports` List of ports offered by the service, as found at spec.ports
    - `cluster_ip` Internal ip to be used inside k8s cluster
    - `external_ip` List of external ips; only present for services of type
        LoadBalancer, and empty while no ingress ip has been assigned
    :raises K8sException: propagated from _local_async_exec, which is called
        with raise_exception_on_error=True
    """

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
        self.kubectl_command, paths["kube_config"], namespace, service_name
    )

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    data = yaml.load(output, Loader=yaml.SafeLoader)

    service = {
        "name": service_name,
        "type": self._get_deep(data, ("spec", "type")),
        "ports": self._get_deep(data, ("spec", "ports")),
        "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
    }
    if service["type"] == "LoadBalancer":
        ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
        # The ingress list may be missing (e.g. no address assigned yet), and
        # an ingress entry may carry no "ip" key; neither case should make
        # the whole query fail.
        service["external_ip"] = [
            elem["ip"] for elem in ip_map_list or [] if "ip" in elem
        ]

    return service
async def _exec_inspect_comand(
    self, inspect_command: str, kdu_model: str, repo_url: str = None
):
    """
    Obtains information about a kdu, no cluster (no env)

    :param inspect_command: inspect sub-command, forwarded unchanged to
        ``_get_inspect_command``
    :param kdu_model: chart reference; everything up to the first "/" is
        dropped, and a single ":<version>" suffix becomes a ``--version`` option
    :param repo_url: optional repository URL, passed as a ``--repo`` option
    :return: output of the inspect command as returned by ``_local_async_exec``
    """

    # repo_url and the version come from outside this method and end up in a
    # command line that is tokenised later, so they are quoted to stay a single
    # argument each. shlex.quote leaves plain values untouched.
    repo_str = ""
    if repo_url:
        repo_str = " --repo {}".format(shlex.quote(repo_url))

    # drop the "<repo>/" prefix, if any
    idx = kdu_model.find("/")
    if idx >= 0:
        kdu_model = kdu_model[idx + 1 :]

    version = ""
    if ":" in kdu_model:
        parts = kdu_model.split(sep=":")
        # only an unambiguous "<chart>:<version>" reference is split
        if len(parts) == 2:
            version = "--version {}".format(shlex.quote(str(parts[1])))
            kdu_model = parts[0]

    full_command = self._get_inspect_command(
        inspect_command, kdu_model, repo_str, version
    )
    output, _rc = await self._local_async_exec(
        command=full_command, encode_utf8=True
    )

    return output
async def _store_status(
    self,
    cluster_id: str,
    operation: str,
    kdu_instance: str,
    namespace: str = None,
    db_dict: dict = None,
) -> None:
    """
    Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.

    Errors are logged as warnings and not propagated; task cancellation is
    logged and re-raised.

    :param cluster_id (str): the cluster where the KDU instance is deployed
    :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
    :param kdu_instance (str): The KDU instance in relation to which the status is obtained
    :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
    :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
    values for the keys:
        - "collection": The Mongo DB collection to write to
        - "filter": The query filter to use in the update process
        - "path": The dot separated keys which targets the object to be updated
    Defaults to None.
    """

    try:
        detailed_status = await self._status_kdu(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            yaml_format=False,
            namespace=namespace,
        )

        status = detailed_status.get("info").get("description")
        self.log.debug(f"Status for KDU {kdu_instance} obtained: {status}.")

        # write status to db
        result = await self.write_app_status_to_db(
            db_dict=db_dict,
            status=str(status),
            detailed_status=str(detailed_status),
            operation=operation,
        )

        if not result:
            self.log.info("Error writing in database. Task exiting...")

    except asyncio.CancelledError as e:
        self.log.warning(
            f"Exception in method {self._store_status.__name__} (task cancelled): {e}"
        )
        # Swallowing the cancellation would let the enclosing task keep running
        # after it was cancelled, so it is propagated once logged.
        raise
    except Exception as e:
        # best effort: a failed status update must not break the operation
        self.log.warning(f"Exception in method {self._store_status.__name__}: {e}")
1500 # params for use in -f file
1501 # returns values file option and filename (in order to delete it at the end)
1502 def _params_to_file_option(self
, cluster_id
: str, params
: dict) -> (str, str):
1504 if params
and len(params
) > 0:
1505 self
._init
_paths
_env
(cluster_name
=cluster_id
, create_if_not_exist
=True)
1507 def get_random_number():
1508 r
= random
.randrange(start
=1, stop
=99999999)
1516 value
= params
.get(key
)
1517 if "!!yaml" in str(value
):
1518 value
= yaml
.load(value
[7:])
1519 params2
[key
] = value
1521 values_file
= get_random_number() + ".yaml"
1522 with
open(values_file
, "w") as stream
:
1523 yaml
.dump(params2
, stream
, indent
=4, default_flow_style
=False)
1525 return "-f {}".format(values_file
), values_file
1529 # params for use in --set option
1531 def _params_to_set_option(params
: dict) -> str:
1533 if params
and len(params
) > 0:
1536 value
= params
.get(key
, None)
1537 if value
is not None:
1539 params_str
+= "--set "
1543 params_str
+= "{}={}".format(key
, value
)
def generate_kdu_instance_name(**kwargs):
    """
    Generate a KDU instance name from the chart reference in ``kdu_model``.

    The chart name is reduced to its last path component when it is an absolute
    path or a URL, every character that is not an ASCII letter or digit is
    replaced by "-", the result is cut to 35 characters, prefixed with "a" if
    it does not start with a letter, and completed with "-" plus a random
    10-digit zero-padded number. The whole name is lowercased.

    :param kwargs: must contain ``kdu_model`` (str)
    :return: the generated instance name
    :raises KeyError: if ``kdu_model`` is not provided
    """
    chart_name = kwargs["kdu_model"]
    # embedded chart (file or dir) or URL: keep only the last path component
    if chart_name.startswith("/") or "://" in chart_name:
        # ignore trailing slashes so that "/charts/foo/" still yields "foo"
        chart_name = chart_name.rstrip("/")
        chart_name = chart_name[chart_name.rfind("/") + 1 :]

    # Restrict to ASCII explicitly: str.isalpha()/isnumeric() also accept
    # non-ASCII letters and digits, which would leak into the generated name.
    allowed = frozenset(
        "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
    )
    name = "".join(c if c in allowed else "-" for c in chart_name)[:35]

    # if empty or does not start with alpha character, prefix 'a'
    if not name or not name[0].isalpha():
        name = "a" + name

    random_suffix = str(random.randrange(start=1, stop=99999999)).rjust(10, "0")
    return "{}-{}".format(name, random_suffix).lower()
1582 def _split_repo(self
, kdu_model
: str) -> str:
1584 idx
= kdu_model
.find("/")
1586 repo_name
= kdu_model
[:idx
]