952630a5e9f11b3a4e0de2e359246cfa7d41323b
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
24 from typing
import Union
32 from uuid
import uuid4
34 from n2vc
.config
import EnvironConfig
35 from n2vc
.exceptions
import K8sException
36 from n2vc
.k8s_conn
import K8sConnector
39 class K8sHelmBaseConnector(K8sConnector
):
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
47 service_account
= "osm"
53 kubectl_command
: str = "/usr/bin/kubectl",
54 helm_command
: str = "/usr/bin/helm",
60 :param fs: file system for kubernetes and helm configuration
61 :param db: database object to write current operation status
62 :param kubectl_command: path to kubectl executable
63 :param helm_command: path to helm executable
65 :param on_update_db: callback called when k8s connector updates database
69 K8sConnector
.__init
__(self
, db
=db
, log
=log
, on_update_db
=on_update_db
)
71 self
.log
.info("Initializing K8S Helm connector")
73 self
.config
= EnvironConfig()
74 # random numbers for release name generation
75 random
.seed(time
.time())
80 # exception if kubectl is not installed
81 self
.kubectl_command
= kubectl_command
82 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
84 # exception if helm is not installed
85 self
._helm
_command
= helm_command
86 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
88 # obtain stable repo url from config or apply default
89 self
._stable
_repo
_url
= self
.config
.get("stablerepourl")
90 if self
._stable
_repo
_url
== "None":
91 self
._stable
_repo
_url
= None
93 def _get_namespace(self
, cluster_uuid
: str) -> str:
95 Obtains the namespace used by the cluster with the uuid passed by argument
97 param: cluster_uuid: cluster's uuid
100 # first, obtain the cluster corresponding to the uuid passed by argument
101 k8scluster
= self
.db
.get_one(
102 "k8sclusters", q_filter
={"_id": cluster_uuid
}, fail_on_empty
=False
104 return k8scluster
.get("namespace")
109 namespace
: str = "kube-system",
110 reuse_cluster_uuid
=None,
114 It prepares a given K8s cluster environment to run Charts
116 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
118 :param namespace: optional namespace to be used for helm. By default,
119 'kube-system' will be used
120 :param reuse_cluster_uuid: existing cluster uuid for reuse
121 :param kwargs: Additional parameters (None yet)
122 :return: uuid of the K8s cluster and True if connector has installed some
123 software in the cluster
124 (on error, an exception will be raised)
127 if reuse_cluster_uuid
:
128 cluster_id
= reuse_cluster_uuid
130 cluster_id
= str(uuid4())
133 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id
, namespace
)
136 paths
, env
= self
._init
_paths
_env
(
137 cluster_name
=cluster_id
, create_if_not_exist
=True
139 mode
= stat
.S_IRUSR | stat
.S_IWUSR
140 with
open(paths
["kube_config"], "w", mode
) as f
:
142 os
.chmod(paths
["kube_config"], 0o600)
144 # Code with initialization specific of helm version
145 n2vc_installed_sw
= await self
._cluster
_init
(cluster_id
, namespace
, paths
, env
)
147 # sync fs with local data
148 self
.fs
.reverse_sync(from_path
=cluster_id
)
150 self
.log
.info("Cluster {} initialized".format(cluster_id
))
152 return cluster_id
, n2vc_installed_sw
159 repo_type
: str = "chart",
162 password
: str = None,
165 "Cluster {}, adding {} repository {}. URL: {}".format(
166 cluster_uuid
, repo_type
, name
, url
171 paths
, env
= self
._init
_paths
_env
(
172 cluster_name
=cluster_uuid
, create_if_not_exist
=True
176 self
.fs
.sync(from_path
=cluster_uuid
)
178 # helm repo add name url
179 command
= ("env KUBECONFIG={} {} repo add {} {}").format(
180 paths
["kube_config"], self
._helm
_command
, name
, url
184 temp_cert_file
= os
.path
.join(
185 self
.fs
.path
, "{}/helmcerts/".format(cluster_uuid
), "temp.crt"
187 os
.makedirs(os
.path
.dirname(temp_cert_file
), exist_ok
=True)
188 with
open(temp_cert_file
, "w") as the_cert
:
190 command
+= " --ca-file {}".format(temp_cert_file
)
193 command
+= " --username={}".format(user
)
196 command
+= " --password={}".format(password
)
198 self
.log
.debug("adding repo: {}".format(command
))
199 await self
._local
_async
_exec
(
200 command
=command
, raise_exception_on_error
=True, env
=env
204 command
= "env KUBECONFIG={} {} repo update {}".format(
205 paths
["kube_config"], self
._helm
_command
, name
207 self
.log
.debug("updating repo: {}".format(command
))
208 await self
._local
_async
_exec
(
209 command
=command
, raise_exception_on_error
=False, env
=env
213 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
215 async def repo_update(self
, cluster_uuid
: str, name
: str, repo_type
: str = "chart"):
217 "Cluster {}, updating {} repository {}".format(
218 cluster_uuid
, repo_type
, name
223 paths
, env
= self
._init
_paths
_env
(
224 cluster_name
=cluster_uuid
, create_if_not_exist
=True
228 self
.fs
.sync(from_path
=cluster_uuid
)
231 command
= "{} repo update {}".format(self
._helm
_command
, name
)
232 self
.log
.debug("updating repo: {}".format(command
))
233 await self
._local
_async
_exec
(
234 command
=command
, raise_exception_on_error
=False, env
=env
238 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
240 async def repo_list(self
, cluster_uuid
: str) -> list:
242 Get the list of registered repositories
244 :return: list of registered repositories: [ (name, url) .... ]
247 self
.log
.debug("list repositories for cluster {}".format(cluster_uuid
))
250 paths
, env
= self
._init
_paths
_env
(
251 cluster_name
=cluster_uuid
, create_if_not_exist
=True
255 self
.fs
.sync(from_path
=cluster_uuid
)
257 command
= "env KUBECONFIG={} {} repo list --output yaml".format(
258 paths
["kube_config"], self
._helm
_command
261 # Set exception to false because if there are no repos just want an empty list
262 output
, _rc
= await self
._local
_async
_exec
(
263 command
=command
, raise_exception_on_error
=False, env
=env
267 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
270 if output
and len(output
) > 0:
271 repos
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
272 # unify format between helm2 and helm3 setting all keys lowercase
273 return self
._lower
_keys
_list
(repos
)
279 async def repo_remove(self
, cluster_uuid
: str, name
: str):
281 "remove {} repositories for cluster {}".format(name
, cluster_uuid
)
285 paths
, env
= self
._init
_paths
_env
(
286 cluster_name
=cluster_uuid
, create_if_not_exist
=True
290 self
.fs
.sync(from_path
=cluster_uuid
)
292 command
= "env KUBECONFIG={} {} repo remove {}".format(
293 paths
["kube_config"], self
._helm
_command
, name
295 await self
._local
_async
_exec
(
296 command
=command
, raise_exception_on_error
=True, env
=env
300 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
306 uninstall_sw
: bool = False,
311 Resets the Kubernetes cluster by removing the helm deployment that represents it.
313 :param cluster_uuid: The UUID of the cluster to reset
314 :param force: Boolean to force the reset
315 :param uninstall_sw: Boolean to force the reset
316 :param kwargs: Additional parameters (None yet)
317 :return: Returns True if successful or raises an exception.
319 namespace
= self
._get
_namespace
(cluster_uuid
=cluster_uuid
)
321 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
322 cluster_uuid
, uninstall_sw
327 self
.fs
.sync(from_path
=cluster_uuid
)
329 # uninstall releases if needed.
331 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
332 if len(releases
) > 0:
336 kdu_instance
= r
.get("name")
337 chart
= r
.get("chart")
339 "Uninstalling {} -> {}".format(chart
, kdu_instance
)
341 await self
.uninstall(
342 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
344 except Exception as e
:
345 # will not raise exception as it was found
346 # that in some cases of previously installed helm releases it
349 "Error uninstalling release {}: {}".format(
355 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
356 ).format(cluster_uuid
)
359 False # Allow to remove k8s cluster without removing Tiller
363 await self
._uninstall
_sw
(cluster_id
=cluster_uuid
, namespace
=namespace
)
365 # delete cluster directory
366 self
.log
.debug("Removing directory {}".format(cluster_uuid
))
367 self
.fs
.file_delete(cluster_uuid
, ignore_non_exist
=True)
368 # Remove also local directorio if still exist
369 direct
= self
.fs
.path
+ "/" + cluster_uuid
370 shutil
.rmtree(direct
, ignore_errors
=True)
374 async def _install_impl(
382 timeout
: float = 300,
384 db_dict
: dict = None,
385 kdu_name
: str = None,
386 namespace
: str = None,
389 paths
, env
= self
._init
_paths
_env
(
390 cluster_name
=cluster_id
, create_if_not_exist
=True
394 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
395 cluster_id
=cluster_id
, params
=params
399 kdu_model
, version
= self
._split
_version
(kdu_model
)
401 repo
= self
._split
_repo
(kdu_model
)
403 self
.repo_update(cluster_id
, repo
)
405 command
= self
._get
_install
_command
(
413 paths
["kube_config"],
416 self
.log
.debug("installing: {}".format(command
))
419 # exec helm in a task
420 exec_task
= asyncio
.ensure_future(
421 coro_or_future
=self
._local
_async
_exec
(
422 command
=command
, raise_exception_on_error
=False, env
=env
426 # write status in another task
427 status_task
= asyncio
.ensure_future(
428 coro_or_future
=self
._store
_status
(
429 cluster_id
=cluster_id
,
430 kdu_instance
=kdu_instance
,
438 # wait for execution task
439 await asyncio
.wait([exec_task
])
444 output
, rc
= exec_task
.result()
448 output
, rc
= await self
._local
_async
_exec
(
449 command
=command
, raise_exception_on_error
=False, env
=env
452 # remove temporal values yaml file
454 os
.remove(file_to_delete
)
457 await self
._store
_status
(
458 cluster_id
=cluster_id
,
459 kdu_instance
=kdu_instance
,
468 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
470 raise K8sException(msg
)
476 kdu_model
: str = None,
478 timeout
: float = 300,
480 db_dict
: dict = None,
482 self
.log
.debug("upgrading {} in cluster {}".format(kdu_model
, cluster_uuid
))
485 self
.fs
.sync(from_path
=cluster_uuid
)
487 # look for instance to obtain namespace
488 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
489 if not instance_info
:
490 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
493 paths
, env
= self
._init
_paths
_env
(
494 cluster_name
=cluster_uuid
, create_if_not_exist
=True
498 self
.fs
.sync(from_path
=cluster_uuid
)
501 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
502 cluster_id
=cluster_uuid
, params
=params
506 kdu_model
, version
= self
._split
_version
(kdu_model
)
508 repo
= self
._split
_repo
(kdu_model
)
510 self
.repo_update(cluster_uuid
, repo
)
512 command
= self
._get
_upgrade
_command
(
515 instance_info
["namespace"],
520 paths
["kube_config"],
523 self
.log
.debug("upgrading: {}".format(command
))
527 # exec helm in a task
528 exec_task
= asyncio
.ensure_future(
529 coro_or_future
=self
._local
_async
_exec
(
530 command
=command
, raise_exception_on_error
=False, env
=env
533 # write status in another task
534 status_task
= asyncio
.ensure_future(
535 coro_or_future
=self
._store
_status
(
536 cluster_id
=cluster_uuid
,
537 kdu_instance
=kdu_instance
,
538 namespace
=instance_info
["namespace"],
545 # wait for execution task
546 await asyncio
.wait([exec_task
])
550 output
, rc
= exec_task
.result()
554 output
, rc
= await self
._local
_async
_exec
(
555 command
=command
, raise_exception_on_error
=False, env
=env
558 # remove temporal values yaml file
560 os
.remove(file_to_delete
)
563 await self
._store
_status
(
564 cluster_id
=cluster_uuid
,
565 kdu_instance
=kdu_instance
,
566 namespace
=instance_info
["namespace"],
574 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
576 raise K8sException(msg
)
579 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
581 # return new revision number
582 instance
= await self
.get_instance_info(
583 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
586 revision
= int(instance
.get("revision"))
587 self
.log
.debug("New revision: {}".format(revision
))
597 total_timeout
: float = 1800,
598 cluster_uuid
: str = None,
599 kdu_model
: str = None,
601 db_dict
: dict = None,
604 """Scale a resource in a Helm Chart.
607 kdu_instance: KDU instance name
608 scale: Scale to which to set the resource
609 resource_name: Resource name
610 total_timeout: The time, in seconds, to wait
611 cluster_uuid: The UUID of the cluster
612 kdu_model: The chart reference
613 atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
614 The --wait flag will be set automatically if --atomic is used
615 db_dict: Dictionary for any additional data
616 kwargs: Additional parameters
619 True if successful, False otherwise
622 debug_mgs
= "scaling {} in cluster {}".format(kdu_model
, cluster_uuid
)
624 debug_mgs
= "scaling resource {} in model {} (cluster {})".format(
625 resource_name
, kdu_model
, cluster_uuid
628 self
.log
.debug(debug_mgs
)
630 # look for instance to obtain namespace
631 # get_instance_info function calls the sync command
632 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
633 if not instance_info
:
634 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
637 paths
, env
= self
._init
_paths
_env
(
638 cluster_name
=cluster_uuid
, create_if_not_exist
=True
642 kdu_model
, version
= self
._split
_version
(kdu_model
)
644 repo_url
= await self
._find
_repo
(kdu_model
, cluster_uuid
)
647 "Repository not found for kdu_model {}".format(kdu_model
)
650 _
, replica_str
= await self
._get
_replica
_count
_url
(
651 kdu_model
, repo_url
, resource_name
654 command
= self
._get
_upgrade
_scale
_command
(
657 instance_info
["namespace"],
664 paths
["kube_config"],
667 self
.log
.debug("scaling: {}".format(command
))
670 # exec helm in a task
671 exec_task
= asyncio
.ensure_future(
672 coro_or_future
=self
._local
_async
_exec
(
673 command
=command
, raise_exception_on_error
=False, env
=env
676 # write status in another task
677 status_task
= asyncio
.ensure_future(
678 coro_or_future
=self
._store
_status
(
679 cluster_id
=cluster_uuid
,
680 kdu_instance
=kdu_instance
,
681 namespace
=instance_info
["namespace"],
688 # wait for execution task
689 await asyncio
.wait([exec_task
])
693 output
, rc
= exec_task
.result()
696 output
, rc
= await self
._local
_async
_exec
(
697 command
=command
, raise_exception_on_error
=False, env
=env
701 await self
._store
_status
(
702 cluster_id
=cluster_uuid
,
703 kdu_instance
=kdu_instance
,
704 namespace
=instance_info
["namespace"],
712 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
714 raise K8sException(msg
)
717 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
721 async def get_scale_count(
729 """Get a resource scale count.
732 cluster_uuid: The UUID of the cluster
733 resource_name: Resource name
734 kdu_instance: KDU instance name
735 kdu_model: The name or path of a bundle
736 kwargs: Additional parameters
739 Resource instance count
743 "getting scale count for {} in cluster {}".format(kdu_model
, cluster_uuid
)
746 # look for instance to obtain namespace
747 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
748 if not instance_info
:
749 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
752 paths
, env
= self
._init
_paths
_env
(
753 cluster_name
=cluster_uuid
, create_if_not_exist
=True
756 replicas
= await self
._get
_replica
_count
_instance
(
757 kdu_instance
, instance_info
["namespace"], paths
["kube_config"]
760 # Get default value if scale count is not found from provided values
762 repo_url
= await self
._find
_repo
(kdu_model
, cluster_uuid
)
765 "Repository not found for kdu_model {}".format(kdu_model
)
768 replicas
, _
= await self
._get
_replica
_count
_url
(
769 kdu_model
, repo_url
, resource_name
773 msg
= "Replica count not found. Cannot be scaled"
775 raise K8sException(msg
)
780 self
, cluster_uuid
: str, kdu_instance
: str, revision
=0, db_dict
: dict = None
783 "rollback kdu_instance {} to revision {} from cluster {}".format(
784 kdu_instance
, revision
, cluster_uuid
789 self
.fs
.sync(from_path
=cluster_uuid
)
791 # look for instance to obtain namespace
792 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
793 if not instance_info
:
794 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
797 paths
, env
= self
._init
_paths
_env
(
798 cluster_name
=cluster_uuid
, create_if_not_exist
=True
802 self
.fs
.sync(from_path
=cluster_uuid
)
804 command
= self
._get
_rollback
_command
(
805 kdu_instance
, instance_info
["namespace"], revision
, paths
["kube_config"]
808 self
.log
.debug("rolling_back: {}".format(command
))
810 # exec helm in a task
811 exec_task
= asyncio
.ensure_future(
812 coro_or_future
=self
._local
_async
_exec
(
813 command
=command
, raise_exception_on_error
=False, env
=env
816 # write status in another task
817 status_task
= asyncio
.ensure_future(
818 coro_or_future
=self
._store
_status
(
819 cluster_id
=cluster_uuid
,
820 kdu_instance
=kdu_instance
,
821 namespace
=instance_info
["namespace"],
823 operation
="rollback",
828 # wait for execution task
829 await asyncio
.wait([exec_task
])
834 output
, rc
= exec_task
.result()
837 await self
._store
_status
(
838 cluster_id
=cluster_uuid
,
839 kdu_instance
=kdu_instance
,
840 namespace
=instance_info
["namespace"],
842 operation
="rollback",
848 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
850 raise K8sException(msg
)
853 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
855 # return new revision number
856 instance
= await self
.get_instance_info(
857 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
860 revision
= int(instance
.get("revision"))
861 self
.log
.debug("New revision: {}".format(revision
))
866 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str, **kwargs
):
868 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
869 (this call should happen after all _terminate-config-primitive_ of the VNF
872 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
873 :param kdu_instance: unique name for the KDU instance to be deleted
874 :param kwargs: Additional parameters (None yet)
875 :return: True if successful
879 "uninstall kdu_instance {} from cluster {}".format(
880 kdu_instance
, cluster_uuid
885 self
.fs
.sync(from_path
=cluster_uuid
)
887 # look for instance to obtain namespace
888 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
889 if not instance_info
:
890 self
.log
.warning(("kdu_instance {} not found".format(kdu_instance
)))
893 paths
, env
= self
._init
_paths
_env
(
894 cluster_name
=cluster_uuid
, create_if_not_exist
=True
898 self
.fs
.sync(from_path
=cluster_uuid
)
900 command
= self
._get
_uninstall
_command
(
901 kdu_instance
, instance_info
["namespace"], paths
["kube_config"]
903 output
, _rc
= await self
._local
_async
_exec
(
904 command
=command
, raise_exception_on_error
=True, env
=env
908 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
910 return self
._output
_to
_table
(output
)
912 async def instances_list(self
, cluster_uuid
: str) -> list:
914 returns a list of deployed releases in a cluster
916 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
920 self
.log
.debug("list releases for cluster {}".format(cluster_uuid
))
923 self
.fs
.sync(from_path
=cluster_uuid
)
925 # execute internal command
926 result
= await self
._instances
_list
(cluster_uuid
)
929 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
933 async def get_instance_info(self
, cluster_uuid
: str, kdu_instance
: str):
934 instances
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
935 for instance
in instances
:
936 if instance
.get("name") == kdu_instance
:
938 self
.log
.debug("Instance {} not found".format(kdu_instance
))
941 async def upgrade_charm(
945 charm_id
: str = None,
946 charm_type
: str = None,
947 timeout
: float = None,
949 """This method upgrade charms in VNFs
952 ee_id: Execution environment id
953 path: Local path to the charm
955 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
956 timeout: (Float) Timeout for the ns update operation
959 The output of the update operation if status equals to "completed"
961 raise K8sException("KDUs deployed with Helm do not support charm upgrade")
963 async def exec_primitive(
965 cluster_uuid
: str = None,
966 kdu_instance
: str = None,
967 primitive_name
: str = None,
968 timeout
: float = 300,
970 db_dict
: dict = None,
973 """Exec primitive (Juju action)
975 :param cluster_uuid: The UUID of the cluster or namespace:cluster
976 :param kdu_instance: The unique name of the KDU instance
977 :param primitive_name: Name of action that will be executed
978 :param timeout: Timeout for action execution
979 :param params: Dictionary of all the parameters needed for the action
980 :db_dict: Dictionary for any additional data
981 :param kwargs: Additional parameters (None yet)
983 :return: Returns the output of the action
986 "KDUs deployed with Helm don't support actions "
987 "different from rollback, upgrade and status"
990 async def get_services(
991 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
994 Returns a list of services defined for the specified kdu instance.
996 :param cluster_uuid: UUID of a K8s cluster known by OSM
997 :param kdu_instance: unique name for the KDU instance
998 :param namespace: K8s namespace used by the KDU instance
999 :return: If successful, it will return a list of services, Each service
1000 can have the following data:
1001 - `name` of the service
1002 - `type` type of service in the k8 cluster
1003 - `ports` List of ports offered by the service, for each port includes at least
1004 name, port, protocol
1005 - `cluster_ip` Internal ip to be used inside k8s cluster
1006 - `external_ip` List of external ips (in case they are available)
1010 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
1011 cluster_uuid
, kdu_instance
1016 paths
, env
= self
._init
_paths
_env
(
1017 cluster_name
=cluster_uuid
, create_if_not_exist
=True
1021 self
.fs
.sync(from_path
=cluster_uuid
)
1023 # get list of services names for kdu
1024 service_names
= await self
._get
_services
(
1025 cluster_uuid
, kdu_instance
, namespace
, paths
["kube_config"]
1029 for service
in service_names
:
1030 service
= await self
._get
_service
(cluster_uuid
, service
, namespace
)
1031 service_list
.append(service
)
1034 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
1038 async def get_service(
1039 self
, cluster_uuid
: str, service_name
: str, namespace
: str
1043 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
1044 service_name
, namespace
, cluster_uuid
1049 self
.fs
.sync(from_path
=cluster_uuid
)
1051 service
= await self
._get
_service
(cluster_uuid
, service_name
, namespace
)
1054 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
1058 async def status_kdu(
1059 self
, cluster_uuid
: str, kdu_instance
: str, yaml_format
: str = False, **kwargs
1060 ) -> Union
[str, dict]:
1062 This call would retrieve tha current state of a given KDU instance. It would be
1063 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
1064 values_ of the configuration parameters applied to a given instance. This call
1065 would be based on the `status` call.
1067 :param cluster_uuid: UUID of a K8s cluster known by OSM
1068 :param kdu_instance: unique name for the KDU instance
1069 :param kwargs: Additional parameters (None yet)
1070 :param yaml_format: if the return shall be returned as an YAML string or as a
1072 :return: If successful, it will return the following vector of arguments:
1073 - K8s `namespace` in the cluster where the KDU lives
1074 - `state` of the KDU instance. It can be:
1081 - List of `resources` (objects) that this release consists of, sorted by kind,
1082 and the status of those resources
1083 - Last `deployment_time`.
1087 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
1088 cluster_uuid
, kdu_instance
1093 self
.fs
.sync(from_path
=cluster_uuid
)
1095 # get instance: needed to obtain namespace
1096 instances
= await self
._instances
_list
(cluster_id
=cluster_uuid
)
1097 for instance
in instances
:
1098 if instance
.get("name") == kdu_instance
:
1101 # instance does not exist
1103 "Instance name: {} not found in cluster: {}".format(
1104 kdu_instance
, cluster_uuid
1108 status
= await self
._status
_kdu
(
1109 cluster_id
=cluster_uuid
,
1110 kdu_instance
=kdu_instance
,
1111 namespace
=instance
["namespace"],
1112 yaml_format
=yaml_format
,
1113 show_error_log
=True,
1117 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
1121 async def get_values_kdu(
1122 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
1125 self
.log
.debug("get kdu_instance values {}".format(kdu_instance
))
1127 return await self
._exec
_get
_command
(
1128 get_command
="values",
1129 kdu_instance
=kdu_instance
,
1130 namespace
=namespace
,
1131 kubeconfig
=kubeconfig
,
1134 async def values_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
1137 "inspect kdu_model values {} from (optional) repo: {}".format(
1142 return await self
._exec
_inspect
_command
(
1143 inspect_command
="values", kdu_model
=kdu_model
, repo_url
=repo_url
1146 async def help_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
1149 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model
, repo_url
)
1152 return await self
._exec
_inspect
_command
(
1153 inspect_command
="readme", kdu_model
=kdu_model
, repo_url
=repo_url
1156 async def synchronize_repos(self
, cluster_uuid
: str):
1158 self
.log
.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid
))
1160 db_repo_ids
= self
._get
_helm
_chart
_repos
_ids
(cluster_uuid
)
1161 db_repo_dict
= self
._get
_db
_repos
_dict
(db_repo_ids
)
1163 local_repo_list
= await self
.repo_list(cluster_uuid
)
1164 local_repo_dict
= {repo
["name"]: repo
["url"] for repo
in local_repo_list
}
1166 deleted_repo_list
= []
1167 added_repo_dict
= {}
1169 # iterate over the list of repos in the database that should be
1170 # added if not present
1171 for repo_name
, db_repo
in db_repo_dict
.items():
1173 # check if it is already present
1174 curr_repo_url
= local_repo_dict
.get(db_repo
["name"])
1175 repo_id
= db_repo
.get("_id")
1176 if curr_repo_url
!= db_repo
["url"]:
1179 "repo {} url changed, delete and and again".format(
1183 await self
.repo_remove(cluster_uuid
, db_repo
["name"])
1184 deleted_repo_list
.append(repo_id
)
1187 self
.log
.debug("add repo {}".format(db_repo
["name"]))
1188 if "ca_cert" in db_repo
:
1189 await self
.repo_add(
1193 cert
=db_repo
["ca_cert"],
1196 await self
.repo_add(
1201 added_repo_dict
[repo_id
] = db_repo
["name"]
1202 except Exception as e
:
1204 "Error adding repo id: {}, err_msg: {} ".format(
1209 # Delete repos that are present but not in nbi_list
1210 for repo_name
in local_repo_dict
:
1211 if not db_repo_dict
.get(repo_name
) and repo_name
!= "stable":
1212 self
.log
.debug("delete repo {}".format(repo_name
))
1214 await self
.repo_remove(cluster_uuid
, repo_name
)
1215 deleted_repo_list
.append(repo_name
)
1216 except Exception as e
:
1218 "Error deleting repo, name: {}, err_msg: {}".format(
1223 return deleted_repo_list
, added_repo_dict
1225 except K8sException
:
1227 except Exception as e
:
1228 # Do not raise errors synchronizing repos
1229 self
.log
.error("Error synchronizing repos: {}".format(e
))
1230 raise Exception("Error synchronizing repos: {}".format(e
))
1232 def _get_db_repos_dict(self
, repo_ids
: list):
1234 for repo_id
in repo_ids
:
1235 db_repo
= self
.db
.get_one("k8srepos", {"_id": repo_id
})
1236 db_repos_dict
[db_repo
["name"]] = db_repo
1237 return db_repos_dict
1240 ####################################################################################
1241 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1242 ####################################################################################
1246 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
1248 Creates and returns base cluster and kube dirs and returns them.
1249 Also created helm3 dirs according to new directory specification, paths are
1250 not returned but assigned to helm environment variables
1252 :param cluster_name: cluster_name
1253 :return: Dictionary with config_paths and dictionary with helm environment variables
1257 async def _cluster_init(self
, cluster_id
, namespace
, paths
, env
):
1259 Implements the helm version dependent cluster initialization
1263 async def _instances_list(self
, cluster_id
):
1265 Implements the helm version dependent helm instances list
1269 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
, kubeconfig
):
1271 Implements the helm version dependent method to obtain services from a helm instance
1275 async def _status_kdu(
1279 namespace
: str = None,
1280 yaml_format
: bool = False,
1281 show_error_log
: bool = False,
1282 ) -> Union
[str, dict]:
1284 Implements the helm version dependent method to obtain status of a helm instance
1288 def _get_install_command(
1300 Obtain command to be executed to delete the indicated instance
1304 def _get_upgrade_scale_command(
1317 """Obtain command to be executed to upgrade the indicated instance."""
1320 def _get_upgrade_command(
1332 Obtain command to be executed to upgrade the indicated instance
1336 def _get_rollback_command(
1337 self
, kdu_instance
, namespace
, revision
, kubeconfig
1340 Obtain command to be executed to rollback the indicated instance
1344 def _get_uninstall_command(
1345 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
1348 Obtain command to be executed to delete the indicated instance
1352 def _get_inspect_command(
1353 self
, show_command
: str, kdu_model
: str, repo_str
: str, version
: str
1356 Obtain command to be executed to obtain information about the kdu
1360 def _get_get_command(
1361 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
1363 """Obtain command to be executed to get information about the kdu instance."""
1366 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
1368 Method call to uninstall cluster software for helm. This method is dependent
1370 For Helm v2 it will be called when Tiller must be uninstalled
1371 For Helm v3 it does nothing and does not need to be callled
1375 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
1377 Obtains the cluster repos identifiers
1381 ####################################################################################
1382 ################################### P R I V A T E ##################################
1383 ####################################################################################
1387 def _check_file_exists(filename
: str, exception_if_not_exists
: bool = False):
1388 if os
.path
.exists(filename
):
1391 msg
= "File {} does not exist".format(filename
)
1392 if exception_if_not_exists
:
1393 raise K8sException(msg
)
1396 def _remove_multiple_spaces(strobj
):
1397 strobj
= strobj
.strip()
1398 while " " in strobj
:
1399 strobj
= strobj
.replace(" ", " ")
1403 def _output_to_lines(output
: str) -> list:
1404 output_lines
= list()
1405 lines
= output
.splitlines(keepends
=False)
1409 output_lines
.append(line
)
1413 def _output_to_table(output
: str) -> list:
1414 output_table
= list()
1415 lines
= output
.splitlines(keepends
=False)
1417 line
= line
.replace("\t", " ")
1419 output_table
.append(line_list
)
1420 cells
= line
.split(sep
=" ")
1424 line_list
.append(cell
)
1428 def _parse_services(output
: str) -> list:
1429 lines
= output
.splitlines(keepends
=False)
1432 line
= line
.replace("\t", " ")
1433 cells
= line
.split(sep
=" ")
1434 if len(cells
) > 0 and cells
[0].startswith("service/"):
1435 elems
= cells
[0].split(sep
="/")
1437 services
.append(elems
[1])
1441 def _get_deep(dictionary
: dict, members
: tuple):
1446 value
= target
.get(m
)
1455 # find key:value in several lines
1457 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
1458 for line
in p_lines
:
1460 if line
.startswith(p_key
+ ":"):
1461 parts
= line
.split(":")
1462 the_value
= parts
[1].strip()
1470 def _lower_keys_list(input_list
: list):
1472 Transform the keys in a list of dictionaries to lower case and returns a new list
1477 for dictionary
in input_list
:
1478 new_dict
= dict((k
.lower(), v
) for k
, v
in dictionary
.items())
1479 new_list
.append(new_dict
)
1482 async def _local_async_exec(
1485 raise_exception_on_error
: bool = False,
1486 show_error_log
: bool = True,
1487 encode_utf8
: bool = False,
1491 command
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command
)
1493 "Executing async local command: {}, env: {}".format(command
, env
)
1497 command
= shlex
.split(command
)
1499 environ
= os
.environ
.copy()
1504 process
= await asyncio
.create_subprocess_exec(
1506 stdout
=asyncio
.subprocess
.PIPE
,
1507 stderr
=asyncio
.subprocess
.PIPE
,
1511 # wait for command terminate
1512 stdout
, stderr
= await process
.communicate()
1514 return_code
= process
.returncode
1518 output
= stdout
.decode("utf-8").strip()
1519 # output = stdout.decode()
1521 output
= stderr
.decode("utf-8").strip()
1522 # output = stderr.decode()
1524 if return_code
!= 0 and show_error_log
:
1526 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1529 self
.log
.debug("Return code: {}".format(return_code
))
1531 if raise_exception_on_error
and return_code
!= 0:
1532 raise K8sException(output
)
1535 output
= output
.encode("utf-8").strip()
1536 output
= str(output
).replace("\\n", "\n")
1538 return output
, return_code
1540 except asyncio
.CancelledError
:
1542 except K8sException
:
1544 except Exception as e
:
1545 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1547 if raise_exception_on_error
:
1548 raise K8sException(e
) from e
1552 async def _local_async_exec_pipe(
1556 raise_exception_on_error
: bool = True,
1557 show_error_log
: bool = True,
1558 encode_utf8
: bool = False,
1562 command1
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command1
)
1563 command2
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command2
)
1564 command
= "{} | {}".format(command1
, command2
)
1566 "Executing async local command: {}, env: {}".format(command
, env
)
1570 command1
= shlex
.split(command1
)
1571 command2
= shlex
.split(command2
)
1573 environ
= os
.environ
.copy()
1578 read
, write
= os
.pipe()
1579 await asyncio
.create_subprocess_exec(*command1
, stdout
=write
, env
=environ
)
1581 process_2
= await asyncio
.create_subprocess_exec(
1582 *command2
, stdin
=read
, stdout
=asyncio
.subprocess
.PIPE
, env
=environ
1585 stdout
, stderr
= await process_2
.communicate()
1587 return_code
= process_2
.returncode
1591 output
= stdout
.decode("utf-8").strip()
1592 # output = stdout.decode()
1594 output
= stderr
.decode("utf-8").strip()
1595 # output = stderr.decode()
1597 if return_code
!= 0 and show_error_log
:
1599 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1602 self
.log
.debug("Return code: {}".format(return_code
))
1604 if raise_exception_on_error
and return_code
!= 0:
1605 raise K8sException(output
)
1608 output
= output
.encode("utf-8").strip()
1609 output
= str(output
).replace("\\n", "\n")
1611 return output
, return_code
1612 except asyncio
.CancelledError
:
1614 except K8sException
:
1616 except Exception as e
:
1617 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1619 if raise_exception_on_error
:
1620 raise K8sException(e
) from e
1624 async def _get_service(self
, cluster_id
, service_name
, namespace
):
1626 Obtains the data of the specified service in the k8cluster.
1628 :param cluster_id: id of a K8s cluster known by OSM
1629 :param service_name: name of the K8s service in the specified namespace
1630 :param namespace: K8s namespace used by the KDU instance
1631 :return: If successful, it will return a service with the following data:
1632 - `name` of the service
1633 - `type` type of service in the k8 cluster
1634 - `ports` List of ports offered by the service, for each port includes at least
1635 name, port, protocol
1636 - `cluster_ip` Internal ip to be used inside k8s cluster
1637 - `external_ip` List of external ips (in case they are available)
1641 paths
, env
= self
._init
_paths
_env
(
1642 cluster_name
=cluster_id
, create_if_not_exist
=True
1645 command
= "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1646 self
.kubectl_command
, paths
["kube_config"], namespace
, service_name
1649 output
, _rc
= await self
._local
_async
_exec
(
1650 command
=command
, raise_exception_on_error
=True, env
=env
1653 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
1656 "name": service_name
,
1657 "type": self
._get
_deep
(data
, ("spec", "type")),
1658 "ports": self
._get
_deep
(data
, ("spec", "ports")),
1659 "cluster_ip": self
._get
_deep
(data
, ("spec", "clusterIP")),
1661 if service
["type"] == "LoadBalancer":
1662 ip_map_list
= self
._get
_deep
(data
, ("status", "loadBalancer", "ingress"))
1663 ip_list
= [elem
["ip"] for elem
in ip_map_list
]
1664 service
["external_ip"] = ip_list
1668 async def _exec_get_command(
1669 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
1671 """Obtains information about the kdu instance."""
1673 full_command
= self
._get
_get
_command
(
1674 get_command
, kdu_instance
, namespace
, kubeconfig
1677 output
, _rc
= await self
._local
_async
_exec
(command
=full_command
)
1681 async def _exec_inspect_command(
1682 self
, inspect_command
: str, kdu_model
: str, repo_url
: str = None
1684 """Obtains information about a kdu, no cluster (no env)."""
1688 repo_str
= " --repo {}".format(repo_url
)
1690 idx
= kdu_model
.find("/")
1693 kdu_model
= kdu_model
[idx
:]
1695 kdu_model
, version
= self
._split
_version
(kdu_model
)
1697 version_str
= "--version {}".format(version
)
1701 full_command
= self
._get
_inspect
_command
(
1702 inspect_command
, kdu_model
, repo_str
, version_str
1705 output
, _rc
= await self
._local
_async
_exec
(command
=full_command
)
1709 async def _get_replica_count_url(
1713 resource_name
: str = None,
1715 """Get the replica count value in the Helm Chart Values.
1718 kdu_model: The name or path of a bundle
1719 repo_url: Helm Chart repository url
1720 resource_name: Resource name
1723 True if replicas, False replicaCount
1726 kdu_values
= yaml
.load(
1727 await self
.values_kdu(kdu_model
, repo_url
), Loader
=yaml
.SafeLoader
1732 "kdu_values not found for kdu_model {}".format(kdu_model
)
1736 kdu_values
= kdu_values
.get(resource_name
, None)
1739 msg
= "resource {} not found in the values in model {}".format(
1740 resource_name
, kdu_model
1743 raise K8sException(msg
)
1745 duplicate_check
= False
1750 if kdu_values
.get("replicaCount", None):
1751 replicas
= kdu_values
["replicaCount"]
1752 replica_str
= "replicaCount"
1753 elif kdu_values
.get("replicas", None):
1754 duplicate_check
= True
1755 replicas
= kdu_values
["replicas"]
1756 replica_str
= "replicas"
1760 "replicaCount or replicas not found in the resource"
1761 "{} values in model {}. Cannot be scaled".format(
1762 resource_name
, kdu_model
1767 "replicaCount or replicas not found in the values"
1768 "in model {}. Cannot be scaled".format(kdu_model
)
1771 raise K8sException(msg
)
1773 # Control if replicas and replicaCount exists at the same time
1774 msg
= "replicaCount and replicas are exists at the same time"
1776 if "replicaCount" in kdu_values
:
1778 raise K8sException(msg
)
1780 if "replicas" in kdu_values
:
1782 raise K8sException(msg
)
1784 return replicas
, replica_str
1786 async def _get_replica_count_instance(
1791 resource_name
: str = None,
1793 """Get the replica count value in the instance.
1796 kdu_instance: The name of the KDU instance
1797 namespace: KDU instance namespace
1799 resource_name: Resource name
1802 True if replicas, False replicaCount
1805 kdu_values
= yaml
.load(
1806 await self
.get_values_kdu(kdu_instance
, namespace
, kubeconfig
),
1807 Loader
=yaml
.SafeLoader
,
1814 kdu_values
.get(resource_name
, None) if resource_name
else None
1818 resource_values
.get("replicaCount", None)
1819 or resource_values
.get("replicas", None)
1823 kdu_values
.get("replicaCount", None)
1824 or kdu_values
.get("replicas", None)
1830 async def _store_status(
1835 namespace
: str = None,
1836 check_every
: float = 10,
1837 db_dict
: dict = None,
1838 run_once
: bool = False,
1842 await asyncio
.sleep(check_every
)
1843 detailed_status
= await self
._status
_kdu
(
1844 cluster_id
=cluster_id
,
1845 kdu_instance
=kdu_instance
,
1847 namespace
=namespace
,
1849 status
= detailed_status
.get("info").get("description")
1850 self
.log
.debug("KDU {} STATUS: {}.".format(kdu_instance
, status
))
1851 # write status to db
1852 result
= await self
.write_app_status_to_db(
1855 detailed_status
=str(detailed_status
),
1856 operation
=operation
,
1859 self
.log
.info("Error writing in database. Task exiting...")
1861 except asyncio
.CancelledError
:
1862 self
.log
.debug("Task cancelled")
1864 except Exception as e
:
1866 "_store_status exception: {}".format(str(e
)), exc_info
=True
1873 # params for use in -f file
1874 # returns values file option and filename (in order to delete it at the end)
1875 def _params_to_file_option(self
, cluster_id
: str, params
: dict) -> (str, str):
1877 if params
and len(params
) > 0:
1878 self
._init
_paths
_env
(cluster_name
=cluster_id
, create_if_not_exist
=True)
1880 def get_random_number():
1881 r
= random
.randrange(start
=1, stop
=99999999)
1889 value
= params
.get(key
)
1890 if "!!yaml" in str(value
):
1891 value
= yaml
.load(value
[7:])
1892 params2
[key
] = value
1894 values_file
= get_random_number() + ".yaml"
1895 with
open(values_file
, "w") as stream
:
1896 yaml
.dump(params2
, stream
, indent
=4, default_flow_style
=False)
1898 return "-f {}".format(values_file
), values_file
1902 # params for use in --set option
1904 def _params_to_set_option(params
: dict) -> str:
1906 if params
and len(params
) > 0:
1909 value
= params
.get(key
, None)
1910 if value
is not None:
1912 params_str
+= "--set "
1916 params_str
+= "{}={}".format(key
, value
)
1920 def generate_kdu_instance_name(**kwargs
):
1921 chart_name
= kwargs
["kdu_model"]
1922 # check embeded chart (file or dir)
1923 if chart_name
.startswith("/"):
1924 # extract file or directory name
1925 chart_name
= chart_name
[chart_name
.rfind("/") + 1 :]
1927 elif "://" in chart_name
:
1928 # extract last portion of URL
1929 chart_name
= chart_name
[chart_name
.rfind("/") + 1 :]
1932 for c
in chart_name
:
1933 if c
.isalpha() or c
.isnumeric():
1940 # if does not start with alpha character, prefix 'a'
1941 if not name
[0].isalpha():
1946 def get_random_number():
1947 r
= random
.randrange(start
=1, stop
=99999999)
1949 s
= s
.rjust(10, "0")
1952 name
= name
+ get_random_number()
1955 def _split_version(self
, kdu_model
: str) -> (str, str):
1957 if ":" in kdu_model
:
1958 parts
= kdu_model
.split(sep
=":")
1960 version
= str(parts
[1])
1961 kdu_model
= parts
[0]
1962 return kdu_model
, version
1964 async def _split_repo(self
, kdu_model
: str) -> str:
1966 idx
= kdu_model
.find("/")
1968 repo_name
= kdu_model
[:idx
]
1971 async def _find_repo(self
, kdu_model
: str, cluster_uuid
: str) -> str:
1973 idx
= kdu_model
.find("/")
1975 repo_name
= kdu_model
[:idx
]
1976 # Find repository link
1977 local_repo_list
= await self
.repo_list(cluster_uuid
)
1978 for repo
in local_repo_list
:
1979 repo_url
= repo
["url"] if repo
["name"] == repo_name
else None