703bd737cc5ced06b2021ae00267ca45d1baca79
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
31 from uuid
import uuid4
33 from n2vc
.config
import EnvironConfig
34 from n2vc
.exceptions
import K8sException
35 from n2vc
.k8s_conn
import K8sConnector
38 class K8sHelmBaseConnector(K8sConnector
):
41 ####################################################################################
42 ################################### P U B L I C ####################################
43 ####################################################################################
46 service_account
= "osm"
52 kubectl_command
: str = "/usr/bin/kubectl",
53 helm_command
: str = "/usr/bin/helm",
59 :param fs: file system for kubernetes and helm configuration
60 :param db: database object to write current operation status
61 :param kubectl_command: path to kubectl executable
62 :param helm_command: path to helm executable
64 :param on_update_db: callback called when k8s connector updates database
68 K8sConnector
.__init
__(self
, db
=db
, log
=log
, on_update_db
=on_update_db
)
70 self
.log
.info("Initializing K8S Helm connector")
72 self
.config
= EnvironConfig()
73 # random numbers for release name generation
74 random
.seed(time
.time())
79 # exception if kubectl is not installed
80 self
.kubectl_command
= kubectl_command
81 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
83 # exception if helm is not installed
84 self
._helm
_command
= helm_command
85 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
87 # obtain stable repo url from config or apply default
88 self
._stable
_repo
_url
= self
.config
.get("stablerepourl")
89 if self
._stable
_repo
_url
== "None":
90 self
._stable
_repo
_url
= None
92 def _get_namespace(self
, cluster_uuid
: str) -> str:
94 Obtains the namespace used by the cluster with the uuid passed by argument
96 param: cluster_uuid: cluster's uuid
99 # first, obtain the cluster corresponding to the uuid passed by argument
100 k8scluster
= self
.db
.get_one(
101 "k8sclusters", q_filter
={"_id": cluster_uuid
}, fail_on_empty
=False
103 return k8scluster
.get("namespace")
108 namespace
: str = "kube-system",
109 reuse_cluster_uuid
=None,
113 It prepares a given K8s cluster environment to run Charts
115 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
117 :param namespace: optional namespace to be used for helm. By default,
118 'kube-system' will be used
119 :param reuse_cluster_uuid: existing cluster uuid for reuse
120 :param kwargs: Additional parameters (None yet)
121 :return: uuid of the K8s cluster and True if connector has installed some
122 software in the cluster
123 (on error, an exception will be raised)
126 if reuse_cluster_uuid
:
127 cluster_id
= reuse_cluster_uuid
129 cluster_id
= str(uuid4())
132 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id
, namespace
)
135 paths
, env
= self
._init
_paths
_env
(
136 cluster_name
=cluster_id
, create_if_not_exist
=True
138 mode
= stat
.S_IRUSR | stat
.S_IWUSR
139 with
open(paths
["kube_config"], "w", mode
) as f
:
141 os
.chmod(paths
["kube_config"], 0o600)
143 # Code with initialization specific of helm version
144 n2vc_installed_sw
= await self
._cluster
_init
(cluster_id
, namespace
, paths
, env
)
146 # sync fs with local data
147 self
.fs
.reverse_sync(from_path
=cluster_id
)
149 self
.log
.info("Cluster {} initialized".format(cluster_id
))
151 return cluster_id
, n2vc_installed_sw
154 self
, cluster_uuid
: str, name
: str, url
: str, repo_type
: str = "chart"
157 "Cluster {}, adding {} repository {}. URL: {}".format(
158 cluster_uuid
, repo_type
, name
, url
163 paths
, env
= self
._init
_paths
_env
(
164 cluster_name
=cluster_uuid
, create_if_not_exist
=True
168 self
.fs
.sync(from_path
=cluster_uuid
)
171 command
= "env KUBECONFIG={} {} repo update".format(
172 paths
["kube_config"], self
._helm
_command
174 self
.log
.debug("updating repo: {}".format(command
))
175 await self
._local
_async
_exec
(
176 command
=command
, raise_exception_on_error
=False, env
=env
179 # helm repo add name url
180 command
= "env KUBECONFIG={} {} repo add {} {}".format(
181 paths
["kube_config"], self
._helm
_command
, name
, url
183 self
.log
.debug("adding repo: {}".format(command
))
184 await self
._local
_async
_exec
(
185 command
=command
, raise_exception_on_error
=True, env
=env
189 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
191 async def repo_list(self
, cluster_uuid
: str) -> list:
193 Get the list of registered repositories
195 :return: list of registered repositories: [ (name, url) .... ]
198 self
.log
.debug("list repositories for cluster {}".format(cluster_uuid
))
201 paths
, env
= self
._init
_paths
_env
(
202 cluster_name
=cluster_uuid
, create_if_not_exist
=True
206 self
.fs
.sync(from_path
=cluster_uuid
)
208 command
= "env KUBECONFIG={} {} repo list --output yaml".format(
209 paths
["kube_config"], self
._helm
_command
212 # Set exception to false because if there are no repos just want an empty list
213 output
, _rc
= await self
._local
_async
_exec
(
214 command
=command
, raise_exception_on_error
=False, env
=env
218 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
221 if output
and len(output
) > 0:
222 repos
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
223 # unify format between helm2 and helm3 setting all keys lowercase
224 return self
._lower
_keys
_list
(repos
)
230 async def repo_remove(self
, cluster_uuid
: str, name
: str):
232 "remove {} repositories for cluster {}".format(name
, cluster_uuid
)
236 paths
, env
= self
._init
_paths
_env
(
237 cluster_name
=cluster_uuid
, create_if_not_exist
=True
241 self
.fs
.sync(from_path
=cluster_uuid
)
243 command
= "env KUBECONFIG={} {} repo remove {}".format(
244 paths
["kube_config"], self
._helm
_command
, name
246 await self
._local
_async
_exec
(
247 command
=command
, raise_exception_on_error
=True, env
=env
251 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
257 uninstall_sw
: bool = False,
262 Resets the Kubernetes cluster by removing the helm deployment that represents it.
264 :param cluster_uuid: The UUID of the cluster to reset
265 :param force: Boolean to force the reset
266 :param uninstall_sw: Boolean to force the reset
267 :param kwargs: Additional parameters (None yet)
268 :return: Returns True if successful or raises an exception.
270 namespace
= self
._get
_namespace
(cluster_uuid
=cluster_uuid
)
272 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
273 cluster_uuid
, uninstall_sw
278 self
.fs
.sync(from_path
=cluster_uuid
)
280 # uninstall releases if needed.
282 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
283 if len(releases
) > 0:
287 kdu_instance
= r
.get("name")
288 chart
= r
.get("chart")
290 "Uninstalling {} -> {}".format(chart
, kdu_instance
)
292 await self
.uninstall(
293 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
295 except Exception as e
:
296 # will not raise exception as it was found
297 # that in some cases of previously installed helm releases it
300 "Error uninstalling release {}: {}".format(
306 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
307 ).format(cluster_uuid
)
310 False # Allow to remove k8s cluster without removing Tiller
314 await self
._uninstall
_sw
(cluster_id
=cluster_uuid
, namespace
=namespace
)
316 # delete cluster directory
317 self
.log
.debug("Removing directory {}".format(cluster_uuid
))
318 self
.fs
.file_delete(cluster_uuid
, ignore_non_exist
=True)
319 # Remove also local directorio if still exist
320 direct
= self
.fs
.path
+ "/" + cluster_uuid
321 shutil
.rmtree(direct
, ignore_errors
=True)
325 async def _install_impl(
333 timeout
: float = 300,
335 db_dict
: dict = None,
336 kdu_name
: str = None,
337 namespace
: str = None,
340 paths
, env
= self
._init
_paths
_env
(
341 cluster_name
=cluster_id
, create_if_not_exist
=True
345 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
346 cluster_id
=cluster_id
, params
=params
350 kdu_model
, version
= self
._split
_version
(kdu_model
)
352 command
= self
._get
_install
_command
(
360 paths
["kube_config"],
363 self
.log
.debug("installing: {}".format(command
))
366 # exec helm in a task
367 exec_task
= asyncio
.ensure_future(
368 coro_or_future
=self
._local
_async
_exec
(
369 command
=command
, raise_exception_on_error
=False, env
=env
373 # write status in another task
374 status_task
= asyncio
.ensure_future(
375 coro_or_future
=self
._store
_status
(
376 cluster_id
=cluster_id
,
377 kdu_instance
=kdu_instance
,
385 # wait for execution task
386 await asyncio
.wait([exec_task
])
391 output
, rc
= exec_task
.result()
395 output
, rc
= await self
._local
_async
_exec
(
396 command
=command
, raise_exception_on_error
=False, env
=env
399 # remove temporal values yaml file
401 os
.remove(file_to_delete
)
404 await self
._store
_status
(
405 cluster_id
=cluster_id
,
406 kdu_instance
=kdu_instance
,
415 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
417 raise K8sException(msg
)
423 kdu_model
: str = None,
425 timeout
: float = 300,
427 db_dict
: dict = None,
429 self
.log
.debug("upgrading {} in cluster {}".format(kdu_model
, cluster_uuid
))
432 self
.fs
.sync(from_path
=cluster_uuid
)
434 # look for instance to obtain namespace
435 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
436 if not instance_info
:
437 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
440 paths
, env
= self
._init
_paths
_env
(
441 cluster_name
=cluster_uuid
, create_if_not_exist
=True
445 self
.fs
.sync(from_path
=cluster_uuid
)
448 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
449 cluster_id
=cluster_uuid
, params
=params
453 kdu_model
, version
= self
._split
_version
(kdu_model
)
455 command
= self
._get
_upgrade
_command
(
458 instance_info
["namespace"],
463 paths
["kube_config"],
466 self
.log
.debug("upgrading: {}".format(command
))
470 # exec helm in a task
471 exec_task
= asyncio
.ensure_future(
472 coro_or_future
=self
._local
_async
_exec
(
473 command
=command
, raise_exception_on_error
=False, env
=env
476 # write status in another task
477 status_task
= asyncio
.ensure_future(
478 coro_or_future
=self
._store
_status
(
479 cluster_id
=cluster_uuid
,
480 kdu_instance
=kdu_instance
,
481 namespace
=instance_info
["namespace"],
488 # wait for execution task
489 await asyncio
.wait([exec_task
])
493 output
, rc
= exec_task
.result()
497 output
, rc
= await self
._local
_async
_exec
(
498 command
=command
, raise_exception_on_error
=False, env
=env
501 # remove temporal values yaml file
503 os
.remove(file_to_delete
)
506 await self
._store
_status
(
507 cluster_id
=cluster_uuid
,
508 kdu_instance
=kdu_instance
,
509 namespace
=instance_info
["namespace"],
517 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
519 raise K8sException(msg
)
522 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
524 # return new revision number
525 instance
= await self
.get_instance_info(
526 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
529 revision
= int(instance
.get("revision"))
530 self
.log
.debug("New revision: {}".format(revision
))
540 total_timeout
: float = 1800,
541 cluster_uuid
: str = None,
542 kdu_model
: str = None,
544 db_dict
: dict = None,
547 """Scale a resource in a Helm Chart.
550 kdu_instance: KDU instance name
551 scale: Scale to which to set the resource
552 resource_name: Resource name
553 total_timeout: The time, in seconds, to wait
554 cluster_uuid: The UUID of the cluster
555 kdu_model: The chart reference
556 atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
557 The --wait flag will be set automatically if --atomic is used
558 db_dict: Dictionary for any additional data
559 kwargs: Additional parameters
562 True if successful, False otherwise
565 debug_mgs
= "scaling {} in cluster {}".format(kdu_model
, cluster_uuid
)
567 debug_mgs
= "scaling resource {} in model {} (cluster {})".format(
568 resource_name
, kdu_model
, cluster_uuid
571 self
.log
.debug(debug_mgs
)
573 # look for instance to obtain namespace
574 # get_instance_info function calls the sync command
575 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
576 if not instance_info
:
577 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
580 paths
, env
= self
._init
_paths
_env
(
581 cluster_name
=cluster_uuid
, create_if_not_exist
=True
585 kdu_model
, version
= self
._split
_version
(kdu_model
)
587 repo_url
= await self
._find
_repo
(kdu_model
, cluster_uuid
)
590 "Repository not found for kdu_model {}".format(kdu_model
)
593 _
, replica_str
= await self
._get
_replica
_count
_url
(
594 kdu_model
, repo_url
, resource_name
597 command
= self
._get
_upgrade
_scale
_command
(
600 instance_info
["namespace"],
607 paths
["kube_config"],
610 self
.log
.debug("scaling: {}".format(command
))
613 # exec helm in a task
614 exec_task
= asyncio
.ensure_future(
615 coro_or_future
=self
._local
_async
_exec
(
616 command
=command
, raise_exception_on_error
=False, env
=env
619 # write status in another task
620 status_task
= asyncio
.ensure_future(
621 coro_or_future
=self
._store
_status
(
622 cluster_id
=cluster_uuid
,
623 kdu_instance
=kdu_instance
,
624 namespace
=instance_info
["namespace"],
631 # wait for execution task
632 await asyncio
.wait([exec_task
])
636 output
, rc
= exec_task
.result()
639 output
, rc
= await self
._local
_async
_exec
(
640 command
=command
, raise_exception_on_error
=False, env
=env
644 await self
._store
_status
(
645 cluster_id
=cluster_uuid
,
646 kdu_instance
=kdu_instance
,
647 namespace
=instance_info
["namespace"],
655 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
657 raise K8sException(msg
)
660 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
664 async def get_scale_count(
672 """Get a resource scale count.
675 cluster_uuid: The UUID of the cluster
676 resource_name: Resource name
677 kdu_instance: KDU instance name
678 kdu_model: The name or path of a bundle
679 kwargs: Additional parameters
682 Resource instance count
686 "getting scale count for {} in cluster {}".format(kdu_model
, cluster_uuid
)
689 # look for instance to obtain namespace
690 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
691 if not instance_info
:
692 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
695 paths
, env
= self
._init
_paths
_env
(
696 cluster_name
=cluster_uuid
, create_if_not_exist
=True
699 replicas
= await self
._get
_replica
_count
_instance
(
700 kdu_instance
, instance_info
["namespace"], paths
["kube_config"]
703 # Get default value if scale count is not found from provided values
705 repo_url
= await self
._find
_repo
(kdu_model
, cluster_uuid
)
708 "Repository not found for kdu_model {}".format(kdu_model
)
711 replicas
, _
= await self
._get
_replica
_count
_url
(
712 kdu_model
, repo_url
, resource_name
716 msg
= "Replica count not found. Cannot be scaled"
718 raise K8sException(msg
)
723 self
, cluster_uuid
: str, kdu_instance
: str, revision
=0, db_dict
: dict = None
726 "rollback kdu_instance {} to revision {} from cluster {}".format(
727 kdu_instance
, revision
, cluster_uuid
732 self
.fs
.sync(from_path
=cluster_uuid
)
734 # look for instance to obtain namespace
735 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
736 if not instance_info
:
737 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
740 paths
, env
= self
._init
_paths
_env
(
741 cluster_name
=cluster_uuid
, create_if_not_exist
=True
745 self
.fs
.sync(from_path
=cluster_uuid
)
747 command
= self
._get
_rollback
_command
(
748 kdu_instance
, instance_info
["namespace"], revision
, paths
["kube_config"]
751 self
.log
.debug("rolling_back: {}".format(command
))
753 # exec helm in a task
754 exec_task
= asyncio
.ensure_future(
755 coro_or_future
=self
._local
_async
_exec
(
756 command
=command
, raise_exception_on_error
=False, env
=env
759 # write status in another task
760 status_task
= asyncio
.ensure_future(
761 coro_or_future
=self
._store
_status
(
762 cluster_id
=cluster_uuid
,
763 kdu_instance
=kdu_instance
,
764 namespace
=instance_info
["namespace"],
766 operation
="rollback",
771 # wait for execution task
772 await asyncio
.wait([exec_task
])
777 output
, rc
= exec_task
.result()
780 await self
._store
_status
(
781 cluster_id
=cluster_uuid
,
782 kdu_instance
=kdu_instance
,
783 namespace
=instance_info
["namespace"],
785 operation
="rollback",
791 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
793 raise K8sException(msg
)
796 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
798 # return new revision number
799 instance
= await self
.get_instance_info(
800 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
803 revision
= int(instance
.get("revision"))
804 self
.log
.debug("New revision: {}".format(revision
))
809 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str, **kwargs
):
811 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
812 (this call should happen after all _terminate-config-primitive_ of the VNF
815 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
816 :param kdu_instance: unique name for the KDU instance to be deleted
817 :param kwargs: Additional parameters (None yet)
818 :return: True if successful
822 "uninstall kdu_instance {} from cluster {}".format(
823 kdu_instance
, cluster_uuid
828 self
.fs
.sync(from_path
=cluster_uuid
)
830 # look for instance to obtain namespace
831 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
832 if not instance_info
:
833 self
.log
.warning(("kdu_instance {} not found".format(kdu_instance
)))
836 paths
, env
= self
._init
_paths
_env
(
837 cluster_name
=cluster_uuid
, create_if_not_exist
=True
841 self
.fs
.sync(from_path
=cluster_uuid
)
843 command
= self
._get
_uninstall
_command
(
844 kdu_instance
, instance_info
["namespace"], paths
["kube_config"]
846 output
, _rc
= await self
._local
_async
_exec
(
847 command
=command
, raise_exception_on_error
=True, env
=env
851 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
853 return self
._output
_to
_table
(output
)
855 async def instances_list(self
, cluster_uuid
: str) -> list:
857 returns a list of deployed releases in a cluster
859 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
863 self
.log
.debug("list releases for cluster {}".format(cluster_uuid
))
866 self
.fs
.sync(from_path
=cluster_uuid
)
868 # execute internal command
869 result
= await self
._instances
_list
(cluster_uuid
)
872 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
876 async def get_instance_info(self
, cluster_uuid
: str, kdu_instance
: str):
877 instances
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
878 for instance
in instances
:
879 if instance
.get("name") == kdu_instance
:
881 self
.log
.debug("Instance {} not found".format(kdu_instance
))
884 async def exec_primitive(
886 cluster_uuid
: str = None,
887 kdu_instance
: str = None,
888 primitive_name
: str = None,
889 timeout
: float = 300,
891 db_dict
: dict = None,
894 """Exec primitive (Juju action)
896 :param cluster_uuid: The UUID of the cluster or namespace:cluster
897 :param kdu_instance: The unique name of the KDU instance
898 :param primitive_name: Name of action that will be executed
899 :param timeout: Timeout for action execution
900 :param params: Dictionary of all the parameters needed for the action
901 :db_dict: Dictionary for any additional data
902 :param kwargs: Additional parameters (None yet)
904 :return: Returns the output of the action
907 "KDUs deployed with Helm don't support actions "
908 "different from rollback, upgrade and status"
911 async def get_services(
912 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
915 Returns a list of services defined for the specified kdu instance.
917 :param cluster_uuid: UUID of a K8s cluster known by OSM
918 :param kdu_instance: unique name for the KDU instance
919 :param namespace: K8s namespace used by the KDU instance
920 :return: If successful, it will return a list of services, Each service
921 can have the following data:
922 - `name` of the service
923 - `type` type of service in the k8 cluster
924 - `ports` List of ports offered by the service, for each port includes at least
926 - `cluster_ip` Internal ip to be used inside k8s cluster
927 - `external_ip` List of external ips (in case they are available)
931 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
932 cluster_uuid
, kdu_instance
937 paths
, env
= self
._init
_paths
_env
(
938 cluster_name
=cluster_uuid
, create_if_not_exist
=True
942 self
.fs
.sync(from_path
=cluster_uuid
)
944 # get list of services names for kdu
945 service_names
= await self
._get
_services
(
946 cluster_uuid
, kdu_instance
, namespace
, paths
["kube_config"]
950 for service
in service_names
:
951 service
= await self
._get
_service
(cluster_uuid
, service
, namespace
)
952 service_list
.append(service
)
955 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
959 async def get_service(
960 self
, cluster_uuid
: str, service_name
: str, namespace
: str
964 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
965 service_name
, namespace
, cluster_uuid
970 self
.fs
.sync(from_path
=cluster_uuid
)
972 service
= await self
._get
_service
(cluster_uuid
, service_name
, namespace
)
975 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
979 async def status_kdu(self
, cluster_uuid
: str, kdu_instance
: str, **kwargs
) -> str:
981 This call would retrieve tha current state of a given KDU instance. It would be
982 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
983 values_ of the configuration parameters applied to a given instance. This call
984 would be based on the `status` call.
986 :param cluster_uuid: UUID of a K8s cluster known by OSM
987 :param kdu_instance: unique name for the KDU instance
988 :param kwargs: Additional parameters (None yet)
989 :return: If successful, it will return the following vector of arguments:
990 - K8s `namespace` in the cluster where the KDU lives
991 - `state` of the KDU instance. It can be:
998 - List of `resources` (objects) that this release consists of, sorted by kind,
999 and the status of those resources
1000 - Last `deployment_time`.
1004 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
1005 cluster_uuid
, kdu_instance
1010 self
.fs
.sync(from_path
=cluster_uuid
)
1012 # get instance: needed to obtain namespace
1013 instances
= await self
._instances
_list
(cluster_id
=cluster_uuid
)
1014 for instance
in instances
:
1015 if instance
.get("name") == kdu_instance
:
1018 # instance does not exist
1020 "Instance name: {} not found in cluster: {}".format(
1021 kdu_instance
, cluster_uuid
1025 status
= await self
._status
_kdu
(
1026 cluster_id
=cluster_uuid
,
1027 kdu_instance
=kdu_instance
,
1028 namespace
=instance
["namespace"],
1029 show_error_log
=True,
1034 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
1038 async def get_values_kdu(
1039 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
1042 self
.log
.debug("get kdu_instance values {}".format(kdu_instance
))
1044 return await self
._exec
_get
_command
(
1045 get_command
="values",
1046 kdu_instance
=kdu_instance
,
1047 namespace
=namespace
,
1048 kubeconfig
=kubeconfig
,
1051 async def values_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
1054 "inspect kdu_model values {} from (optional) repo: {}".format(
1059 return await self
._exec
_inspect
_command
(
1060 inspect_command
="values", kdu_model
=kdu_model
, repo_url
=repo_url
1063 async def help_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
1066 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model
, repo_url
)
1069 return await self
._exec
_inspect
_command
(
1070 inspect_command
="readme", kdu_model
=kdu_model
, repo_url
=repo_url
1073 async def synchronize_repos(self
, cluster_uuid
: str):
1075 self
.log
.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid
))
1077 db_repo_ids
= self
._get
_helm
_chart
_repos
_ids
(cluster_uuid
)
1078 db_repo_dict
= self
._get
_db
_repos
_dict
(db_repo_ids
)
1080 local_repo_list
= await self
.repo_list(cluster_uuid
)
1081 local_repo_dict
= {repo
["name"]: repo
["url"] for repo
in local_repo_list
}
1083 deleted_repo_list
= []
1084 added_repo_dict
= {}
1086 # iterate over the list of repos in the database that should be
1087 # added if not present
1088 for repo_name
, db_repo
in db_repo_dict
.items():
1090 # check if it is already present
1091 curr_repo_url
= local_repo_dict
.get(db_repo
["name"])
1092 repo_id
= db_repo
.get("_id")
1093 if curr_repo_url
!= db_repo
["url"]:
1096 "repo {} url changed, delete and and again".format(
1100 await self
.repo_remove(cluster_uuid
, db_repo
["name"])
1101 deleted_repo_list
.append(repo_id
)
1104 self
.log
.debug("add repo {}".format(db_repo
["name"]))
1105 await self
.repo_add(
1106 cluster_uuid
, db_repo
["name"], db_repo
["url"]
1108 added_repo_dict
[repo_id
] = db_repo
["name"]
1109 except Exception as e
:
1111 "Error adding repo id: {}, err_msg: {} ".format(
1116 # Delete repos that are present but not in nbi_list
1117 for repo_name
in local_repo_dict
:
1118 if not db_repo_dict
.get(repo_name
) and repo_name
!= "stable":
1119 self
.log
.debug("delete repo {}".format(repo_name
))
1121 await self
.repo_remove(cluster_uuid
, repo_name
)
1122 deleted_repo_list
.append(repo_name
)
1123 except Exception as e
:
1125 "Error deleting repo, name: {}, err_msg: {}".format(
1130 return deleted_repo_list
, added_repo_dict
1132 except K8sException
:
1134 except Exception as e
:
1135 # Do not raise errors synchronizing repos
1136 self
.log
.error("Error synchronizing repos: {}".format(e
))
1137 raise Exception("Error synchronizing repos: {}".format(e
))
1139 def _get_db_repos_dict(self
, repo_ids
: list):
1141 for repo_id
in repo_ids
:
1142 db_repo
= self
.db
.get_one("k8srepos", {"_id": repo_id
})
1143 db_repos_dict
[db_repo
["name"]] = db_repo
1144 return db_repos_dict
1147 ####################################################################################
1148 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1149 ####################################################################################
1153 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
1155 Creates and returns base cluster and kube dirs and returns them.
1156 Also created helm3 dirs according to new directory specification, paths are
1157 not returned but assigned to helm environment variables
1159 :param cluster_name: cluster_name
1160 :return: Dictionary with config_paths and dictionary with helm environment variables
1164 async def _cluster_init(self
, cluster_id
, namespace
, paths
, env
):
1166 Implements the helm version dependent cluster initialization
1170 async def _instances_list(self
, cluster_id
):
1172 Implements the helm version dependent helm instances list
1176 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
, kubeconfig
):
1178 Implements the helm version dependent method to obtain services from a helm instance
1182 async def _status_kdu(
1186 namespace
: str = None,
1187 show_error_log
: bool = False,
1188 return_text
: bool = False,
1191 Implements the helm version dependent method to obtain status of a helm instance
1195 def _get_install_command(
1207 Obtain command to be executed to delete the indicated instance
1211 def _get_upgrade_scale_command(
1224 """Obtain command to be executed to upgrade the indicated instance."""
1227 def _get_upgrade_command(
1239 Obtain command to be executed to upgrade the indicated instance
1243 def _get_rollback_command(
1244 self
, kdu_instance
, namespace
, revision
, kubeconfig
1247 Obtain command to be executed to rollback the indicated instance
1251 def _get_uninstall_command(
1252 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
1255 Obtain command to be executed to delete the indicated instance
1259 def _get_inspect_command(
1260 self
, show_command
: str, kdu_model
: str, repo_str
: str, version
: str
1263 Obtain command to be executed to obtain information about the kdu
1267 def _get_get_command(
1268 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
1270 """Obtain command to be executed to get information about the kdu instance."""
1273 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
1275 Method call to uninstall cluster software for helm. This method is dependent
1277 For Helm v2 it will be called when Tiller must be uninstalled
1278 For Helm v3 it does nothing and does not need to be callled
1282 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
1284 Obtains the cluster repos identifiers
1288 ####################################################################################
1289 ################################### P R I V A T E ##################################
1290 ####################################################################################
1294 def _check_file_exists(filename
: str, exception_if_not_exists
: bool = False):
1295 if os
.path
.exists(filename
):
1298 msg
= "File {} does not exist".format(filename
)
1299 if exception_if_not_exists
:
1300 raise K8sException(msg
)
1303 def _remove_multiple_spaces(strobj
):
1304 strobj
= strobj
.strip()
1305 while " " in strobj
:
1306 strobj
= strobj
.replace(" ", " ")
1310 def _output_to_lines(output
: str) -> list:
1311 output_lines
= list()
1312 lines
= output
.splitlines(keepends
=False)
1316 output_lines
.append(line
)
1320 def _output_to_table(output
: str) -> list:
1321 output_table
= list()
1322 lines
= output
.splitlines(keepends
=False)
1324 line
= line
.replace("\t", " ")
1326 output_table
.append(line_list
)
1327 cells
= line
.split(sep
=" ")
1331 line_list
.append(cell
)
1335 def _parse_services(output
: str) -> list:
1336 lines
= output
.splitlines(keepends
=False)
1339 line
= line
.replace("\t", " ")
1340 cells
= line
.split(sep
=" ")
1341 if len(cells
) > 0 and cells
[0].startswith("service/"):
1342 elems
= cells
[0].split(sep
="/")
1344 services
.append(elems
[1])
1348 def _get_deep(dictionary
: dict, members
: tuple):
1353 value
= target
.get(m
)
1362 # find key:value in several lines
1364 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
1365 for line
in p_lines
:
1367 if line
.startswith(p_key
+ ":"):
1368 parts
= line
.split(":")
1369 the_value
= parts
[1].strip()
1377 def _lower_keys_list(input_list
: list):
1379 Transform the keys in a list of dictionaries to lower case and returns a new list
1384 for dictionary
in input_list
:
1385 new_dict
= dict((k
.lower(), v
) for k
, v
in dictionary
.items())
1386 new_list
.append(new_dict
)
1389 async def _local_async_exec(
1392 raise_exception_on_error
: bool = False,
1393 show_error_log
: bool = True,
1394 encode_utf8
: bool = False,
1398 command
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command
)
1400 "Executing async local command: {}, env: {}".format(command
, env
)
1404 command
= shlex
.split(command
)
1406 environ
= os
.environ
.copy()
1411 process
= await asyncio
.create_subprocess_exec(
1413 stdout
=asyncio
.subprocess
.PIPE
,
1414 stderr
=asyncio
.subprocess
.PIPE
,
1418 # wait for command terminate
1419 stdout
, stderr
= await process
.communicate()
1421 return_code
= process
.returncode
1425 output
= stdout
.decode("utf-8").strip()
1426 # output = stdout.decode()
1428 output
= stderr
.decode("utf-8").strip()
1429 # output = stderr.decode()
1431 if return_code
!= 0 and show_error_log
:
1433 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1436 self
.log
.debug("Return code: {}".format(return_code
))
1438 if raise_exception_on_error
and return_code
!= 0:
1439 raise K8sException(output
)
1442 output
= output
.encode("utf-8").strip()
1443 output
= str(output
).replace("\\n", "\n")
1445 return output
, return_code
1447 except asyncio
.CancelledError
:
1449 except K8sException
:
1451 except Exception as e
:
1452 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1454 if raise_exception_on_error
:
1455 raise K8sException(e
) from e
1459 async def _local_async_exec_pipe(
1463 raise_exception_on_error
: bool = True,
1464 show_error_log
: bool = True,
1465 encode_utf8
: bool = False,
1469 command1
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command1
)
1470 command2
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command2
)
1471 command
= "{} | {}".format(command1
, command2
)
1473 "Executing async local command: {}, env: {}".format(command
, env
)
1477 command1
= shlex
.split(command1
)
1478 command2
= shlex
.split(command2
)
1480 environ
= os
.environ
.copy()
1485 read
, write
= os
.pipe()
1486 await asyncio
.create_subprocess_exec(*command1
, stdout
=write
, env
=environ
)
1488 process_2
= await asyncio
.create_subprocess_exec(
1489 *command2
, stdin
=read
, stdout
=asyncio
.subprocess
.PIPE
, env
=environ
1492 stdout
, stderr
= await process_2
.communicate()
1494 return_code
= process_2
.returncode
1498 output
= stdout
.decode("utf-8").strip()
1499 # output = stdout.decode()
1501 output
= stderr
.decode("utf-8").strip()
1502 # output = stderr.decode()
1504 if return_code
!= 0 and show_error_log
:
1506 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1509 self
.log
.debug("Return code: {}".format(return_code
))
1511 if raise_exception_on_error
and return_code
!= 0:
1512 raise K8sException(output
)
1515 output
= output
.encode("utf-8").strip()
1516 output
= str(output
).replace("\\n", "\n")
1518 return output
, return_code
1519 except asyncio
.CancelledError
:
1521 except K8sException
:
1523 except Exception as e
:
1524 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1526 if raise_exception_on_error
:
1527 raise K8sException(e
) from e
1531 async def _get_service(self
, cluster_id
, service_name
, namespace
):
1533 Obtains the data of the specified service in the k8cluster.
1535 :param cluster_id: id of a K8s cluster known by OSM
1536 :param service_name: name of the K8s service in the specified namespace
1537 :param namespace: K8s namespace used by the KDU instance
1538 :return: If successful, it will return a service with the following data:
1539 - `name` of the service
1540 - `type` type of service in the k8 cluster
1541 - `ports` List of ports offered by the service, for each port includes at least
1542 name, port, protocol
1543 - `cluster_ip` Internal ip to be used inside k8s cluster
1544 - `external_ip` List of external ips (in case they are available)
1548 paths
, env
= self
._init
_paths
_env
(
1549 cluster_name
=cluster_id
, create_if_not_exist
=True
1552 command
= "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1553 self
.kubectl_command
, paths
["kube_config"], namespace
, service_name
1556 output
, _rc
= await self
._local
_async
_exec
(
1557 command
=command
, raise_exception_on_error
=True, env
=env
1560 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
1563 "name": service_name
,
1564 "type": self
._get
_deep
(data
, ("spec", "type")),
1565 "ports": self
._get
_deep
(data
, ("spec", "ports")),
1566 "cluster_ip": self
._get
_deep
(data
, ("spec", "clusterIP")),
1568 if service
["type"] == "LoadBalancer":
1569 ip_map_list
= self
._get
_deep
(data
, ("status", "loadBalancer", "ingress"))
1570 ip_list
= [elem
["ip"] for elem
in ip_map_list
]
1571 service
["external_ip"] = ip_list
1575 async def _exec_get_command(
1576 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
1578 """Obtains information about the kdu instance."""
1580 full_command
= self
._get
_get
_command
(
1581 get_command
, kdu_instance
, namespace
, kubeconfig
1584 output
, _rc
= await self
._local
_async
_exec
(command
=full_command
)
1588 async def _exec_inspect_command(
1589 self
, inspect_command
: str, kdu_model
: str, repo_url
: str = None
1591 """Obtains information about a kdu, no cluster (no env)."""
1595 repo_str
= " --repo {}".format(repo_url
)
1597 idx
= kdu_model
.find("/")
1600 kdu_model
= kdu_model
[idx
:]
1602 kdu_model
, version
= self
._split
_version
(kdu_model
)
1604 version_str
= "--version {}".format(version
)
1608 full_command
= self
._get
_inspect
_command
(
1609 inspect_command
, kdu_model
, repo_str
, version_str
1612 output
, _rc
= await self
._local
_async
_exec
(command
=full_command
)
1616 async def _get_replica_count_url(
1620 resource_name
: str = None,
1622 """Get the replica count value in the Helm Chart Values.
1625 kdu_model: The name or path of a bundle
1626 repo_url: Helm Chart repository url
1627 resource_name: Resource name
1630 True if replicas, False replicaCount
1633 kdu_values
= yaml
.load(
1634 await self
.values_kdu(kdu_model
, repo_url
), Loader
=yaml
.SafeLoader
1639 "kdu_values not found for kdu_model {}".format(kdu_model
)
1643 kdu_values
= kdu_values
.get(resource_name
, None)
1646 msg
= "resource {} not found in the values in model {}".format(
1647 resource_name
, kdu_model
1650 raise K8sException(msg
)
1652 duplicate_check
= False
1657 if kdu_values
.get("replicaCount", None):
1658 replicas
= kdu_values
["replicaCount"]
1659 replica_str
= "replicaCount"
1660 elif kdu_values
.get("replicas", None):
1661 duplicate_check
= True
1662 replicas
= kdu_values
["replicas"]
1663 replica_str
= "replicas"
1667 "replicaCount or replicas not found in the resource"
1668 "{} values in model {}. Cannot be scaled".format(
1669 resource_name
, kdu_model
1674 "replicaCount or replicas not found in the values"
1675 "in model {}. Cannot be scaled".format(kdu_model
)
1678 raise K8sException(msg
)
1680 # Control if replicas and replicaCount exists at the same time
1681 msg
= "replicaCount and replicas are exists at the same time"
1683 if "replicaCount" in kdu_values
:
1685 raise K8sException(msg
)
1687 if "replicas" in kdu_values
:
1689 raise K8sException(msg
)
1691 return replicas
, replica_str
1693 async def _get_replica_count_instance(
1698 resource_name
: str = None,
1700 """Get the replica count value in the instance.
1703 kdu_instance: The name of the KDU instance
1704 namespace: KDU instance namespace
1706 resource_name: Resource name
1709 True if replicas, False replicaCount
1712 kdu_values
= yaml
.load(
1713 await self
.get_values_kdu(kdu_instance
, namespace
, kubeconfig
),
1714 Loader
=yaml
.SafeLoader
,
1721 kdu_values
.get(resource_name
, None) if resource_name
else None
1725 resource_values
.get("replicaCount", None)
1726 or resource_values
.get("replicas", None)
1730 kdu_values
.get("replicaCount", None)
1731 or kdu_values
.get("replicas", None)
1737 async def _store_status(
1742 namespace
: str = None,
1743 check_every
: float = 10,
1744 db_dict
: dict = None,
1745 run_once
: bool = False,
1749 await asyncio
.sleep(check_every
)
1750 detailed_status
= await self
._status
_kdu
(
1751 cluster_id
=cluster_id
,
1752 kdu_instance
=kdu_instance
,
1753 namespace
=namespace
,
1756 status
= detailed_status
.get("info").get("description")
1757 self
.log
.debug("KDU {} STATUS: {}.".format(kdu_instance
, status
))
1758 # write status to db
1759 result
= await self
.write_app_status_to_db(
1762 detailed_status
=str(detailed_status
),
1763 operation
=operation
,
1766 self
.log
.info("Error writing in database. Task exiting...")
1768 except asyncio
.CancelledError
:
1769 self
.log
.debug("Task cancelled")
1771 except Exception as e
:
1773 "_store_status exception: {}".format(str(e
)), exc_info
=True
1780 # params for use in -f file
1781 # returns values file option and filename (in order to delete it at the end)
1782 def _params_to_file_option(self
, cluster_id
: str, params
: dict) -> (str, str):
1784 if params
and len(params
) > 0:
1785 self
._init
_paths
_env
(cluster_name
=cluster_id
, create_if_not_exist
=True)
1787 def get_random_number():
1788 r
= random
.randrange(start
=1, stop
=99999999)
1796 value
= params
.get(key
)
1797 if "!!yaml" in str(value
):
1798 value
= yaml
.load(value
[7:])
1799 params2
[key
] = value
1801 values_file
= get_random_number() + ".yaml"
1802 with
open(values_file
, "w") as stream
:
1803 yaml
.dump(params2
, stream
, indent
=4, default_flow_style
=False)
1805 return "-f {}".format(values_file
), values_file
1809 # params for use in --set option
1811 def _params_to_set_option(params
: dict) -> str:
1813 if params
and len(params
) > 0:
1816 value
= params
.get(key
, None)
1817 if value
is not None:
1819 params_str
+= "--set "
1823 params_str
+= "{}={}".format(key
, value
)
1827 def generate_kdu_instance_name(**kwargs
):
1828 chart_name
= kwargs
["kdu_model"]
1829 # check embeded chart (file or dir)
1830 if chart_name
.startswith("/"):
1831 # extract file or directory name
1832 chart_name
= chart_name
[chart_name
.rfind("/") + 1 :]
1834 elif "://" in chart_name
:
1835 # extract last portion of URL
1836 chart_name
= chart_name
[chart_name
.rfind("/") + 1 :]
1839 for c
in chart_name
:
1840 if c
.isalpha() or c
.isnumeric():
1847 # if does not start with alpha character, prefix 'a'
1848 if not name
[0].isalpha():
1853 def get_random_number():
1854 r
= random
.randrange(start
=1, stop
=99999999)
1856 s
= s
.rjust(10, "0")
1859 name
= name
+ get_random_number()
1862 def _split_version(self
, kdu_model
: str) -> (str, str):
1864 if ":" in kdu_model
:
1865 parts
= kdu_model
.split(sep
=":")
1867 version
= str(parts
[1])
1868 kdu_model
= parts
[0]
1869 return kdu_model
, version
1871 async def _find_repo(self
, kdu_model
: str, cluster_uuid
: str) -> str:
1873 idx
= kdu_model
.find("/")
1875 repo_name
= kdu_model
[:idx
]
1876 # Find repository link
1877 local_repo_list
= await self
.repo_list(cluster_uuid
)
1878 for repo
in local_repo_list
:
1879 repo_url
= repo
["url"] if repo
["name"] == repo_name
else None