2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
24 from typing
import Union
32 from uuid
import uuid4
34 from n2vc
.config
import EnvironConfig
35 from n2vc
.exceptions
import K8sException
36 from n2vc
.k8s_conn
import K8sConnector
39 class K8sHelmBaseConnector(K8sConnector
):
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
47 service_account
= "osm"
53 kubectl_command
: str = "/usr/bin/kubectl",
54 helm_command
: str = "/usr/bin/helm",
60 :param fs: file system for kubernetes and helm configuration
61 :param db: database object to write current operation status
62 :param kubectl_command: path to kubectl executable
63 :param helm_command: path to helm executable
65 :param on_update_db: callback called when k8s connector updates database
69 K8sConnector
.__init
__(self
, db
=db
, log
=log
, on_update_db
=on_update_db
)
71 self
.log
.info("Initializing K8S Helm connector")
73 self
.config
= EnvironConfig()
74 # random numbers for release name generation
75 random
.seed(time
.time())
80 # exception if kubectl is not installed
81 self
.kubectl_command
= kubectl_command
82 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
84 # exception if helm is not installed
85 self
._helm
_command
= helm_command
86 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
88 # obtain stable repo url from config or apply default
89 self
._stable
_repo
_url
= self
.config
.get("stablerepourl")
90 if self
._stable
_repo
_url
== "None":
91 self
._stable
_repo
_url
= None
94 def _get_namespace_cluster_id(cluster_uuid
: str) -> (str, str):
96 Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only
97 cluster_id for backward compatibility
99 namespace
, _
, cluster_id
= cluster_uuid
.rpartition(":")
100 return namespace
, cluster_id
105 namespace
: str = "kube-system",
106 reuse_cluster_uuid
=None,
110 It prepares a given K8s cluster environment to run Charts
112 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
114 :param namespace: optional namespace to be used for helm. By default,
115 'kube-system' will be used
116 :param reuse_cluster_uuid: existing cluster uuid for reuse
117 :param kwargs: Additional parameters (None yet)
118 :return: uuid of the K8s cluster and True if connector has installed some
119 software in the cluster
120 (on error, an exception will be raised)
123 if reuse_cluster_uuid
:
124 namespace_
, cluster_id
= self
._get
_namespace
_cluster
_id
(reuse_cluster_uuid
)
125 namespace
= namespace_
or namespace
127 cluster_id
= str(uuid4())
128 cluster_uuid
= "{}:{}".format(namespace
, cluster_id
)
131 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id
, namespace
)
134 paths
, env
= self
._init
_paths
_env
(
135 cluster_name
=cluster_id
, create_if_not_exist
=True
137 mode
= stat
.S_IRUSR | stat
.S_IWUSR
138 with
open(paths
["kube_config"], "w", mode
) as f
:
140 os
.chmod(paths
["kube_config"], 0o600)
142 # Code with initialization specific of helm version
143 n2vc_installed_sw
= await self
._cluster
_init
(cluster_id
, namespace
, paths
, env
)
145 # sync fs with local data
146 self
.fs
.reverse_sync(from_path
=cluster_id
)
148 self
.log
.info("Cluster {} initialized".format(cluster_id
))
150 return cluster_uuid
, n2vc_installed_sw
153 self
, cluster_uuid
: str, name
: str, url
: str, repo_type
: str = "chart"
155 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
157 "Cluster {}, adding {} repository {}. URL: {}".format(
158 cluster_id
, repo_type
, name
, url
163 paths
, env
= self
._init
_paths
_env
(
164 cluster_name
=cluster_id
, create_if_not_exist
=True
168 self
.fs
.sync(from_path
=cluster_id
)
170 # helm repo add name url
171 command
= "env KUBECONFIG={} {} repo add {} {}".format(
172 paths
["kube_config"], self
._helm
_command
, name
, url
174 self
.log
.debug("adding repo: {}".format(command
))
175 await self
._local
_async
_exec
(
176 command
=command
, raise_exception_on_error
=True, env
=env
180 command
= "env KUBECONFIG={} {} repo update {}".format(
181 paths
["kube_config"], self
._helm
_command
, name
183 self
.log
.debug("updating repo: {}".format(command
))
184 await self
._local
_async
_exec
(
185 command
=command
, raise_exception_on_error
=False, env
=env
189 self
.fs
.reverse_sync(from_path
=cluster_id
)
191 async def repo_update(self
, cluster_id
: str, name
: str, repo_type
: str = "chart"):
193 "Cluster {}, updating {} repository {}".format(cluster_id
, repo_type
, name
)
197 paths
, env
= self
._init
_paths
_env
(
198 cluster_name
=cluster_id
, create_if_not_exist
=True
202 self
.fs
.sync(from_path
=cluster_id
)
205 command
= "{} repo update {}".format(self
._helm
_command
, name
)
206 self
.log
.debug("updating repo: {}".format(command
))
207 await self
._local
_async
_exec
(
208 command
=command
, raise_exception_on_error
=False, env
=env
212 self
.fs
.reverse_sync(from_path
=cluster_id
)
214 async def repo_list(self
, cluster_uuid
: str) -> list:
216 Get the list of registered repositories
218 :return: list of registered repositories: [ (name, url) .... ]
221 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
222 self
.log
.debug("list repositories for cluster {}".format(cluster_id
))
225 paths
, env
= self
._init
_paths
_env
(
226 cluster_name
=cluster_id
, create_if_not_exist
=True
230 self
.fs
.sync(from_path
=cluster_id
)
232 command
= "env KUBECONFIG={} {} repo list --output yaml".format(
233 paths
["kube_config"], self
._helm
_command
236 # Set exception to false because if there are no repos just want an empty list
237 output
, _rc
= await self
._local
_async
_exec
(
238 command
=command
, raise_exception_on_error
=False, env
=env
242 self
.fs
.reverse_sync(from_path
=cluster_id
)
245 if output
and len(output
) > 0:
246 repos
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
247 # unify format between helm2 and helm3 setting all keys lowercase
248 return self
._lower
_keys
_list
(repos
)
254 async def repo_remove(self
, cluster_uuid
: str, name
: str):
255 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
256 self
.log
.debug("remove {} repositories for cluster {}".format(name
, cluster_id
))
259 paths
, env
= self
._init
_paths
_env
(
260 cluster_name
=cluster_id
, create_if_not_exist
=True
264 self
.fs
.sync(from_path
=cluster_id
)
266 command
= "env KUBECONFIG={} {} repo remove {}".format(
267 paths
["kube_config"], self
._helm
_command
, name
269 await self
._local
_async
_exec
(
270 command
=command
, raise_exception_on_error
=True, env
=env
274 self
.fs
.reverse_sync(from_path
=cluster_id
)
280 uninstall_sw
: bool = False,
285 Resets the Kubernetes cluster by removing the helm deployment that represents it.
287 :param cluster_uuid: The UUID of the cluster to reset
288 :param force: Boolean to force the reset
289 :param uninstall_sw: Boolean to force the reset
290 :param kwargs: Additional parameters (None yet)
291 :return: Returns True if successful or raises an exception.
293 namespace
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
295 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
296 cluster_id
, uninstall_sw
301 self
.fs
.sync(from_path
=cluster_id
)
303 # uninstall releases if needed.
305 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
306 if len(releases
) > 0:
310 kdu_instance
= r
.get("name")
311 chart
= r
.get("chart")
313 "Uninstalling {} -> {}".format(chart
, kdu_instance
)
315 await self
.uninstall(
316 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
318 except Exception as e
:
319 # will not raise exception as it was found
320 # that in some cases of previously installed helm releases it
323 "Error uninstalling release {}: {}".format(
329 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
333 False # Allow to remove k8s cluster without removing Tiller
337 await self
._uninstall
_sw
(cluster_id
, namespace
)
339 # delete cluster directory
340 self
.log
.debug("Removing directory {}".format(cluster_id
))
341 self
.fs
.file_delete(cluster_id
, ignore_non_exist
=True)
342 # Remove also local directorio if still exist
343 direct
= self
.fs
.path
+ "/" + cluster_id
344 shutil
.rmtree(direct
, ignore_errors
=True)
348 async def _install_impl(
356 timeout
: float = 300,
358 db_dict
: dict = None,
359 kdu_name
: str = None,
360 namespace
: str = None,
363 paths
, env
= self
._init
_paths
_env
(
364 cluster_name
=cluster_id
, create_if_not_exist
=True
368 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
369 cluster_id
=cluster_id
, params
=params
375 parts
= kdu_model
.split(sep
=":")
377 version
= str(parts
[1])
380 repo
= self
._split
_repo
(kdu_model
)
382 await self
.repo_update(cluster_id
, repo
)
384 command
= self
._get
_install
_command
(
392 paths
["kube_config"],
395 self
.log
.debug("installing: {}".format(command
))
398 # exec helm in a task
399 exec_task
= asyncio
.ensure_future(
400 coro_or_future
=self
._local
_async
_exec
(
401 command
=command
, raise_exception_on_error
=False, env
=env
405 # write status in another task
406 status_task
= asyncio
.ensure_future(
407 coro_or_future
=self
._store
_status
(
408 cluster_id
=cluster_id
,
409 kdu_instance
=kdu_instance
,
416 # wait for execution task
417 await asyncio
.wait([exec_task
])
422 output
, rc
= exec_task
.result()
425 output
, rc
= await self
._local
_async
_exec
(
426 command
=command
, raise_exception_on_error
=False, env
=env
429 # remove temporal values yaml file
431 os
.remove(file_to_delete
)
434 await self
._store
_status
(
435 cluster_id
=cluster_id
,
436 kdu_instance
=kdu_instance
,
443 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
445 raise K8sException(msg
)
451 kdu_model
: str = None,
453 timeout
: float = 300,
455 db_dict
: dict = None,
457 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
458 self
.log
.debug("upgrading {} in cluster {}".format(kdu_model
, cluster_id
))
461 self
.fs
.sync(from_path
=cluster_id
)
463 # look for instance to obtain namespace
464 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
465 if not instance_info
:
466 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
469 paths
, env
= self
._init
_paths
_env
(
470 cluster_name
=cluster_id
, create_if_not_exist
=True
474 self
.fs
.sync(from_path
=cluster_id
)
477 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
478 cluster_id
=cluster_id
, params
=params
484 parts
= kdu_model
.split(sep
=":")
486 version
= str(parts
[1])
489 repo
= self
._split
_repo
(kdu_model
)
491 await self
.repo_update(cluster_id
, repo
)
493 command
= self
._get
_upgrade
_command
(
496 instance_info
["namespace"],
501 paths
["kube_config"],
504 self
.log
.debug("upgrading: {}".format(command
))
507 # exec helm in a task
508 exec_task
= asyncio
.ensure_future(
509 coro_or_future
=self
._local
_async
_exec
(
510 command
=command
, raise_exception_on_error
=False, env
=env
513 # write status in another task
514 status_task
= asyncio
.ensure_future(
515 coro_or_future
=self
._store
_status
(
516 cluster_id
=cluster_id
,
517 kdu_instance
=kdu_instance
,
518 namespace
=instance_info
["namespace"],
524 # wait for execution task
525 await asyncio
.wait([exec_task
])
529 output
, rc
= exec_task
.result()
532 output
, rc
= await self
._local
_async
_exec
(
533 command
=command
, raise_exception_on_error
=False, env
=env
536 # remove temporal values yaml file
538 os
.remove(file_to_delete
)
541 await self
._store
_status
(
542 cluster_id
=cluster_id
,
543 kdu_instance
=kdu_instance
,
544 namespace
=instance_info
["namespace"],
550 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
552 raise K8sException(msg
)
555 self
.fs
.reverse_sync(from_path
=cluster_id
)
557 # return new revision number
558 instance
= await self
.get_instance_info(
559 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
562 revision
= int(instance
.get("revision"))
563 self
.log
.debug("New revision: {}".format(revision
))
573 total_timeout
: float = 1800,
576 raise NotImplementedError("Method not implemented")
578 async def get_scale_count(
584 raise NotImplementedError("Method not implemented")
587 self
, cluster_uuid
: str, kdu_instance
: str, revision
=0, db_dict
: dict = None
589 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
591 "rollback kdu_instance {} to revision {} from cluster {}".format(
592 kdu_instance
, revision
, cluster_id
597 self
.fs
.sync(from_path
=cluster_id
)
599 # look for instance to obtain namespace
600 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
601 if not instance_info
:
602 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
605 paths
, env
= self
._init
_paths
_env
(
606 cluster_name
=cluster_id
, create_if_not_exist
=True
610 self
.fs
.sync(from_path
=cluster_id
)
612 command
= self
._get
_rollback
_command
(
613 kdu_instance
, instance_info
["namespace"], revision
, paths
["kube_config"]
616 self
.log
.debug("rolling_back: {}".format(command
))
618 # exec helm in a task
619 exec_task
= asyncio
.ensure_future(
620 coro_or_future
=self
._local
_async
_exec
(
621 command
=command
, raise_exception_on_error
=False, env
=env
624 # write status in another task
625 status_task
= asyncio
.ensure_future(
626 coro_or_future
=self
._store
_status
(
627 cluster_id
=cluster_id
,
628 kdu_instance
=kdu_instance
,
629 namespace
=instance_info
["namespace"],
631 operation
="rollback",
635 # wait for execution task
636 await asyncio
.wait([exec_task
])
641 output
, rc
= exec_task
.result()
644 await self
._store
_status
(
645 cluster_id
=cluster_id
,
646 kdu_instance
=kdu_instance
,
647 namespace
=instance_info
["namespace"],
649 operation
="rollback",
653 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
655 raise K8sException(msg
)
658 self
.fs
.reverse_sync(from_path
=cluster_id
)
660 # return new revision number
661 instance
= await self
.get_instance_info(
662 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
665 revision
= int(instance
.get("revision"))
666 self
.log
.debug("New revision: {}".format(revision
))
671 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str, **kwargs
):
673 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
674 (this call should happen after all _terminate-config-primitive_ of the VNF
677 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
678 :param kdu_instance: unique name for the KDU instance to be deleted
679 :param kwargs: Additional parameters (None yet)
680 :return: True if successful
683 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
685 "uninstall kdu_instance {} from cluster {}".format(kdu_instance
, cluster_id
)
689 self
.fs
.sync(from_path
=cluster_id
)
691 # look for instance to obtain namespace
692 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
693 if not instance_info
:
694 self
.log
.warning(("kdu_instance {} not found".format(kdu_instance
)))
697 paths
, env
= self
._init
_paths
_env
(
698 cluster_name
=cluster_id
, create_if_not_exist
=True
702 self
.fs
.sync(from_path
=cluster_id
)
704 command
= self
._get
_uninstall
_command
(
705 kdu_instance
, instance_info
["namespace"], paths
["kube_config"]
707 output
, _rc
= await self
._local
_async
_exec
(
708 command
=command
, raise_exception_on_error
=True, env
=env
712 self
.fs
.reverse_sync(from_path
=cluster_id
)
714 return self
._output
_to
_table
(output
)
716 async def instances_list(self
, cluster_uuid
: str) -> list:
718 returns a list of deployed releases in a cluster
720 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
724 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
725 self
.log
.debug("list releases for cluster {}".format(cluster_id
))
728 self
.fs
.sync(from_path
=cluster_id
)
730 # execute internal command
731 result
= await self
._instances
_list
(cluster_id
)
734 self
.fs
.reverse_sync(from_path
=cluster_id
)
738 async def get_instance_info(self
, cluster_uuid
: str, kdu_instance
: str):
739 instances
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
740 for instance
in instances
:
741 if instance
.get("name") == kdu_instance
:
743 self
.log
.debug("Instance {} not found".format(kdu_instance
))
746 async def exec_primitive(
748 cluster_uuid
: str = None,
749 kdu_instance
: str = None,
750 primitive_name
: str = None,
751 timeout
: float = 300,
753 db_dict
: dict = None,
756 """Exec primitive (Juju action)
758 :param cluster_uuid: The UUID of the cluster or namespace:cluster
759 :param kdu_instance: The unique name of the KDU instance
760 :param primitive_name: Name of action that will be executed
761 :param timeout: Timeout for action execution
762 :param params: Dictionary of all the parameters needed for the action
763 :db_dict: Dictionary for any additional data
764 :param kwargs: Additional parameters (None yet)
766 :return: Returns the output of the action
769 "KDUs deployed with Helm don't support actions "
770 "different from rollback, upgrade and status"
773 async def get_services(
774 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
777 Returns a list of services defined for the specified kdu instance.
779 :param cluster_uuid: UUID of a K8s cluster known by OSM
780 :param kdu_instance: unique name for the KDU instance
781 :param namespace: K8s namespace used by the KDU instance
782 :return: If successful, it will return a list of services, Each service
783 can have the following data:
784 - `name` of the service
785 - `type` type of service in the k8 cluster
786 - `ports` List of ports offered by the service, for each port includes at least
788 - `cluster_ip` Internal ip to be used inside k8s cluster
789 - `external_ip` List of external ips (in case they are available)
792 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
794 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
795 cluster_uuid
, kdu_instance
800 paths
, env
= self
._init
_paths
_env
(
801 cluster_name
=cluster_id
, create_if_not_exist
=True
805 self
.fs
.sync(from_path
=cluster_id
)
807 # get list of services names for kdu
808 service_names
= await self
._get
_services
(
809 cluster_id
, kdu_instance
, namespace
, paths
["kube_config"]
813 for service
in service_names
:
814 service
= await self
._get
_service
(cluster_id
, service
, namespace
)
815 service_list
.append(service
)
818 self
.fs
.reverse_sync(from_path
=cluster_id
)
822 async def get_service(
823 self
, cluster_uuid
: str, service_name
: str, namespace
: str
826 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
827 service_name
, namespace
, cluster_uuid
831 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
834 self
.fs
.sync(from_path
=cluster_id
)
836 service
= await self
._get
_service
(cluster_id
, service_name
, namespace
)
839 self
.fs
.reverse_sync(from_path
=cluster_id
)
843 async def status_kdu(
844 self
, cluster_uuid
: str, kdu_instance
: str, yaml_format
: str = False, **kwargs
845 ) -> Union
[str, dict]:
847 This call would retrieve tha current state of a given KDU instance. It would be
848 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
849 values_ of the configuration parameters applied to a given instance. This call
850 would be based on the `status` call.
852 :param cluster_uuid: UUID of a K8s cluster known by OSM
853 :param kdu_instance: unique name for the KDU instance
854 :param kwargs: Additional parameters (None yet)
855 :param yaml_format: if the return shall be returned as an YAML string or as a
857 :return: If successful, it will return the following vector of arguments:
858 - K8s `namespace` in the cluster where the KDU lives
859 - `state` of the KDU instance. It can be:
866 - List of `resources` (objects) that this release consists of, sorted by kind,
867 and the status of those resources
868 - Last `deployment_time`.
872 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
873 cluster_uuid
, kdu_instance
877 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
880 self
.fs
.sync(from_path
=cluster_id
)
882 # get instance: needed to obtain namespace
883 instances
= await self
._instances
_list
(cluster_id
=cluster_id
)
884 for instance
in instances
:
885 if instance
.get("name") == kdu_instance
:
888 # instance does not exist
890 "Instance name: {} not found in cluster: {}".format(
891 kdu_instance
, cluster_id
895 status
= await self
._status
_kdu
(
896 cluster_id
=cluster_id
,
897 kdu_instance
=kdu_instance
,
898 namespace
=instance
["namespace"],
899 yaml_format
=yaml_format
,
904 self
.fs
.reverse_sync(from_path
=cluster_id
)
908 async def values_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
910 "inspect kdu_model values {} from (optional) repo: {}".format(
915 return await self
._exec
_inspect
_comand
(
916 inspect_command
="values", kdu_model
=kdu_model
, repo_url
=repo_url
919 async def help_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
921 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model
, repo_url
)
924 return await self
._exec
_inspect
_comand
(
925 inspect_command
="readme", kdu_model
=kdu_model
, repo_url
=repo_url
928 async def synchronize_repos(self
, cluster_uuid
: str):
929 self
.log
.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid
))
931 db_repo_ids
= self
._get
_helm
_chart
_repos
_ids
(cluster_uuid
)
932 db_repo_dict
= self
._get
_db
_repos
_dict
(db_repo_ids
)
934 local_repo_list
= await self
.repo_list(cluster_uuid
)
935 local_repo_dict
= {repo
["name"]: repo
["url"] for repo
in local_repo_list
}
937 deleted_repo_list
= []
940 # iterate over the list of repos in the database that should be
941 # added if not present
942 for repo_name
, db_repo
in db_repo_dict
.items():
944 # check if it is already present
945 curr_repo_url
= local_repo_dict
.get(db_repo
["name"])
946 repo_id
= db_repo
.get("_id")
947 if curr_repo_url
!= db_repo
["url"]:
950 "repo {} url changed, delete and and again".format(
954 await self
.repo_remove(cluster_uuid
, db_repo
["name"])
955 deleted_repo_list
.append(repo_id
)
958 self
.log
.debug("add repo {}".format(db_repo
["name"]))
960 cluster_uuid
, db_repo
["name"], db_repo
["url"]
962 added_repo_dict
[repo_id
] = db_repo
["name"]
963 except Exception as e
:
965 "Error adding repo id: {}, err_msg: {} ".format(
970 # Delete repos that are present but not in nbi_list
971 for repo_name
in local_repo_dict
:
972 if not db_repo_dict
.get(repo_name
) and repo_name
!= "stable":
973 self
.log
.debug("delete repo {}".format(repo_name
))
975 await self
.repo_remove(cluster_uuid
, repo_name
)
976 deleted_repo_list
.append(repo_name
)
977 except Exception as e
:
979 "Error deleting repo, name: {}, err_msg: {}".format(
984 return deleted_repo_list
, added_repo_dict
988 except Exception as e
:
989 # Do not raise errors synchronizing repos
990 self
.log
.error("Error synchronizing repos: {}".format(e
))
991 raise Exception("Error synchronizing repos: {}".format(e
))
993 def _get_db_repos_dict(self
, repo_ids
: list):
995 for repo_id
in repo_ids
:
996 db_repo
= self
.db
.get_one("k8srepos", {"_id": repo_id
})
997 db_repos_dict
[db_repo
["name"]] = db_repo
1001 ####################################################################################
1002 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1003 ####################################################################################
1007 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
1009 Creates and returns base cluster and kube dirs and returns them.
1010 Also created helm3 dirs according to new directory specification, paths are
1011 not returned but assigned to helm environment variables
1013 :param cluster_name: cluster_name
1014 :return: Dictionary with config_paths and dictionary with helm environment variables
1018 async def _cluster_init(self
, cluster_id
, namespace
, paths
, env
):
1020 Implements the helm version dependent cluster initialization
1024 async def _instances_list(self
, cluster_id
):
1026 Implements the helm version dependent helm instances list
1030 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
, kubeconfig
):
1032 Implements the helm version dependent method to obtain services from a helm instance
1036 async def _status_kdu(
1040 namespace
: str = None,
1041 yaml_format
: bool = False,
1042 show_error_log
: bool = False,
1043 ) -> Union
[str, dict]:
1045 Implements the helm version dependent method to obtain status of a helm instance
1049 def _get_install_command(
1061 Obtain command to be executed to delete the indicated instance
1065 def _get_upgrade_command(
1077 Obtain command to be executed to upgrade the indicated instance
1081 def _get_rollback_command(
1082 self
, kdu_instance
, namespace
, revision
, kubeconfig
1085 Obtain command to be executed to rollback the indicated instance
1089 def _get_uninstall_command(
1090 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
1093 Obtain command to be executed to delete the indicated instance
1097 def _get_inspect_command(
1098 self
, show_command
: str, kdu_model
: str, repo_str
: str, version
: str
1101 Obtain command to be executed to obtain information about the kdu
1105 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
1107 Method call to uninstall cluster software for helm. This method is dependent
1109 For Helm v2 it will be called when Tiller must be uninstalled
1110 For Helm v3 it does nothing and does not need to be callled
1114 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
1116 Obtains the cluster repos identifiers
1120 ####################################################################################
1121 ################################### P R I V A T E ##################################
1122 ####################################################################################
1126 def _check_file_exists(filename
: str, exception_if_not_exists
: bool = False):
1127 if os
.path
.exists(filename
):
1130 msg
= "File {} does not exist".format(filename
)
1131 if exception_if_not_exists
:
1132 raise K8sException(msg
)
1135 def _remove_multiple_spaces(strobj
):
1136 strobj
= strobj
.strip()
1137 while " " in strobj
:
1138 strobj
= strobj
.replace(" ", " ")
1142 def _output_to_lines(output
: str) -> list:
1143 output_lines
= list()
1144 lines
= output
.splitlines(keepends
=False)
1148 output_lines
.append(line
)
1152 def _output_to_table(output
: str) -> list:
1153 output_table
= list()
1154 lines
= output
.splitlines(keepends
=False)
1156 line
= line
.replace("\t", " ")
1158 output_table
.append(line_list
)
1159 cells
= line
.split(sep
=" ")
1163 line_list
.append(cell
)
1167 def _parse_services(output
: str) -> list:
1168 lines
= output
.splitlines(keepends
=False)
1171 line
= line
.replace("\t", " ")
1172 cells
= line
.split(sep
=" ")
1173 if len(cells
) > 0 and cells
[0].startswith("service/"):
1174 elems
= cells
[0].split(sep
="/")
1176 services
.append(elems
[1])
1180 def _get_deep(dictionary
: dict, members
: tuple):
1185 value
= target
.get(m
)
1194 # find key:value in several lines
1196 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
1197 for line
in p_lines
:
1199 if line
.startswith(p_key
+ ":"):
1200 parts
= line
.split(":")
1201 the_value
= parts
[1].strip()
1209 def _lower_keys_list(input_list
: list):
1211 Transform the keys in a list of dictionaries to lower case and returns a new list
1216 for dictionary
in input_list
:
1217 new_dict
= dict((k
.lower(), v
) for k
, v
in dictionary
.items())
1218 new_list
.append(new_dict
)
1221 async def _local_async_exec(
1224 raise_exception_on_error
: bool = False,
1225 show_error_log
: bool = True,
1226 encode_utf8
: bool = False,
1229 command
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command
)
1231 "Executing async local command: {}, env: {}".format(command
, env
)
1235 command
= shlex
.split(command
)
1237 environ
= os
.environ
.copy()
1242 process
= await asyncio
.create_subprocess_exec(
1244 stdout
=asyncio
.subprocess
.PIPE
,
1245 stderr
=asyncio
.subprocess
.PIPE
,
1249 # wait for command terminate
1250 stdout
, stderr
= await process
.communicate()
1252 return_code
= process
.returncode
1256 output
= stdout
.decode("utf-8").strip()
1257 # output = stdout.decode()
1259 output
= stderr
.decode("utf-8").strip()
1260 # output = stderr.decode()
1262 if return_code
!= 0 and show_error_log
:
1264 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1267 self
.log
.debug("Return code: {}".format(return_code
))
1269 if raise_exception_on_error
and return_code
!= 0:
1270 raise K8sException(output
)
1273 output
= output
.encode("utf-8").strip()
1274 output
= str(output
).replace("\\n", "\n")
1276 return output
, return_code
1278 except asyncio
.CancelledError
:
1280 except K8sException
:
1282 except Exception as e
:
1283 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1285 if raise_exception_on_error
:
1286 raise K8sException(e
) from e
1290 async def _local_async_exec_pipe(
1294 raise_exception_on_error
: bool = True,
1295 show_error_log
: bool = True,
1296 encode_utf8
: bool = False,
1299 command1
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command1
)
1300 command2
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command2
)
1301 command
= "{} | {}".format(command1
, command2
)
1303 "Executing async local command: {}, env: {}".format(command
, env
)
1307 command1
= shlex
.split(command1
)
1308 command2
= shlex
.split(command2
)
1310 environ
= os
.environ
.copy()
1315 read
, write
= os
.pipe()
1316 await asyncio
.create_subprocess_exec(*command1
, stdout
=write
, env
=environ
)
1318 process_2
= await asyncio
.create_subprocess_exec(
1319 *command2
, stdin
=read
, stdout
=asyncio
.subprocess
.PIPE
, env
=environ
1322 stdout
, stderr
= await process_2
.communicate()
1324 return_code
= process_2
.returncode
1328 output
= stdout
.decode("utf-8").strip()
1329 # output = stdout.decode()
1331 output
= stderr
.decode("utf-8").strip()
1332 # output = stderr.decode()
1334 if return_code
!= 0 and show_error_log
:
1336 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1339 self
.log
.debug("Return code: {}".format(return_code
))
1341 if raise_exception_on_error
and return_code
!= 0:
1342 raise K8sException(output
)
1345 output
= output
.encode("utf-8").strip()
1346 output
= str(output
).replace("\\n", "\n")
1348 return output
, return_code
1349 except asyncio
.CancelledError
:
1351 except K8sException
:
1353 except Exception as e
:
1354 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1356 if raise_exception_on_error
:
1357 raise K8sException(e
) from e
1361 async def _get_service(self
, cluster_id
, service_name
, namespace
):
1363 Obtains the data of the specified service in the k8cluster.
1365 :param cluster_id: id of a K8s cluster known by OSM
1366 :param service_name: name of the K8s service in the specified namespace
1367 :param namespace: K8s namespace used by the KDU instance
1368 :return: If successful, it will return a service with the following data:
1369 - `name` of the service
1370 - `type` type of service in the k8 cluster
1371 - `ports` List of ports offered by the service, for each port includes at least
1372 name, port, protocol
1373 - `cluster_ip` Internal ip to be used inside k8s cluster
1374 - `external_ip` List of external ips (in case they are available)
1378 paths
, env
= self
._init
_paths
_env
(
1379 cluster_name
=cluster_id
, create_if_not_exist
=True
1382 command
= "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1383 self
.kubectl_command
, paths
["kube_config"], namespace
, service_name
1386 output
, _rc
= await self
._local
_async
_exec
(
1387 command
=command
, raise_exception_on_error
=True, env
=env
1390 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
1393 "name": service_name
,
1394 "type": self
._get
_deep
(data
, ("spec", "type")),
1395 "ports": self
._get
_deep
(data
, ("spec", "ports")),
1396 "cluster_ip": self
._get
_deep
(data
, ("spec", "clusterIP")),
1398 if service
["type"] == "LoadBalancer":
1399 ip_map_list
= self
._get
_deep
(data
, ("status", "loadBalancer", "ingress"))
1400 ip_list
= [elem
["ip"] for elem
in ip_map_list
]
1401 service
["external_ip"] = ip_list
1405 async def _exec_inspect_comand(
1406 self
, inspect_command
: str, kdu_model
: str, repo_url
: str = None
1409 Obtains information about a kdu, no cluster (no env)
1414 repo_str
= " --repo {}".format(repo_url
)
1416 idx
= kdu_model
.find("/")
1419 kdu_model
= kdu_model
[idx
:]
1422 if ":" in kdu_model
:
1423 parts
= kdu_model
.split(sep
=":")
1425 version
= "--version {}".format(str(parts
[1]))
1426 kdu_model
= parts
[0]
1428 full_command
= self
._get
_inspect
_command
(
1429 inspect_command
, kdu_model
, repo_str
, version
1431 output
, _rc
= await self
._local
_async
_exec
(
1432 command
=full_command
, encode_utf8
=True
1437 async def _store_status(
1442 namespace
: str = None,
1443 db_dict
: dict = None,
1446 Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.
1448 :param cluster_id (str): the cluster where the KDU instance is deployed
1449 :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
1450 :param kdu_instance (str): The KDU instance in relation to which the status is obtained
1451 :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
1452 :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
1453 values for the keys:
1454 - "collection": The Mongo DB collection to write to
1455 - "filter": The query filter to use in the update process
1456 - "path": The dot separated keys which targets the object to be updated
1461 detailed_status
= await self
._status
_kdu
(
1462 cluster_id
=cluster_id
,
1463 kdu_instance
=kdu_instance
,
1465 namespace
=namespace
,
1468 status
= detailed_status
.get("info").get("description")
1469 self
.log
.debug(f
"Status for KDU {kdu_instance} obtained: {status}.")
1471 # write status to db
1472 result
= await self
.write_app_status_to_db(
1475 detailed_status
=str(detailed_status
),
1476 operation
=operation
,
1480 self
.log
.info("Error writing in database. Task exiting...")
1482 except asyncio
.CancelledError
as e
:
1484 f
"Exception in method {self._store_status.__name__} (task cancelled): {e}"
1486 except Exception as e
:
1487 self
.log
.warning(f
"Exception in method {self._store_status.__name__}: {e}")
1489 # params for use in -f file
1490 # returns values file option and filename (in order to delete it at the end)
1491 def _params_to_file_option(self
, cluster_id
: str, params
: dict) -> (str, str):
1492 if params
and len(params
) > 0:
1493 self
._init
_paths
_env
(cluster_name
=cluster_id
, create_if_not_exist
=True)
1495 def get_random_number():
1496 r
= random
.randrange(start
=1, stop
=99999999)
1504 value
= params
.get(key
)
1505 if "!!yaml" in str(value
):
1506 value
= yaml
.load(value
[7:])
1507 params2
[key
] = value
1509 values_file
= get_random_number() + ".yaml"
1510 with
open(values_file
, "w") as stream
:
1511 yaml
.dump(params2
, stream
, indent
=4, default_flow_style
=False)
1513 return "-f {}".format(values_file
), values_file
1517 # params for use in --set option
1519 def _params_to_set_option(params
: dict) -> str:
1521 if params
and len(params
) > 0:
1524 value
= params
.get(key
, None)
1525 if value
is not None:
1527 params_str
+= "--set "
1531 params_str
+= "{}={}".format(key
, value
)
1535 def generate_kdu_instance_name(**kwargs
):
1536 chart_name
= kwargs
["kdu_model"]
1537 # check embeded chart (file or dir)
1538 if chart_name
.startswith("/"):
1539 # extract file or directory name
1540 chart_name
= chart_name
[chart_name
.rfind("/") + 1 :]
1542 elif "://" in chart_name
:
1543 # extract last portion of URL
1544 chart_name
= chart_name
[chart_name
.rfind("/") + 1 :]
1547 for c
in chart_name
:
1548 if c
.isalpha() or c
.isnumeric():
1555 # if does not start with alpha character, prefix 'a'
1556 if not name
[0].isalpha():
1561 def get_random_number():
1562 r
= random
.randrange(start
=1, stop
=99999999)
1564 s
= s
.rjust(10, "0")
1567 name
= name
+ get_random_number()
1570 def _split_repo(self
, kdu_model
: str) -> str:
1572 idx
= kdu_model
.find("/")
1574 repo_name
= kdu_model
[:idx
]