2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
24 from typing
import Union
32 from uuid
import uuid4
34 from n2vc
.config
import EnvironConfig
35 from n2vc
.exceptions
import K8sException
36 from n2vc
.k8s_conn
import K8sConnector
39 class K8sHelmBaseConnector(K8sConnector
):
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
47 service_account
= "osm"
53 kubectl_command
: str = "/usr/bin/kubectl",
54 helm_command
: str = "/usr/bin/helm",
60 :param fs: file system for kubernetes and helm configuration
61 :param db: database object to write current operation status
62 :param kubectl_command: path to kubectl executable
63 :param helm_command: path to helm executable
65 :param on_update_db: callback called when k8s connector updates database
69 K8sConnector
.__init
__(self
, db
=db
, log
=log
, on_update_db
=on_update_db
)
71 self
.log
.info("Initializing K8S Helm connector")
73 self
.config
= EnvironConfig()
74 # random numbers for release name generation
75 random
.seed(time
.time())
80 # exception if kubectl is not installed
81 self
.kubectl_command
= kubectl_command
82 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
84 # exception if helm is not installed
85 self
._helm
_command
= helm_command
86 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
88 # obtain stable repo url from config or apply default
89 self
._stable
_repo
_url
= self
.config
.get("stablerepourl")
90 if self
._stable
_repo
_url
== "None":
91 self
._stable
_repo
_url
= None
94 def _get_namespace_cluster_id(cluster_uuid
: str) -> (str, str):
96 Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only
97 cluster_id for backward compatibility
99 namespace
, _
, cluster_id
= cluster_uuid
.rpartition(":")
100 return namespace
, cluster_id
105 namespace
: str = "kube-system",
106 reuse_cluster_uuid
=None,
110 It prepares a given K8s cluster environment to run Charts
112 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
114 :param namespace: optional namespace to be used for helm. By default,
115 'kube-system' will be used
116 :param reuse_cluster_uuid: existing cluster uuid for reuse
117 :param kwargs: Additional parameters (None yet)
118 :return: uuid of the K8s cluster and True if connector has installed some
119 software in the cluster
120 (on error, an exception will be raised)
123 if reuse_cluster_uuid
:
124 namespace_
, cluster_id
= self
._get
_namespace
_cluster
_id
(reuse_cluster_uuid
)
125 namespace
= namespace_
or namespace
127 cluster_id
= str(uuid4())
128 cluster_uuid
= "{}:{}".format(namespace
, cluster_id
)
131 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id
, namespace
)
134 paths
, env
= self
._init
_paths
_env
(
135 cluster_name
=cluster_id
, create_if_not_exist
=True
137 mode
= stat
.S_IRUSR | stat
.S_IWUSR
138 with
open(paths
["kube_config"], "w", mode
) as f
:
140 os
.chmod(paths
["kube_config"], 0o600)
142 # Code with initialization specific of helm version
143 n2vc_installed_sw
= await self
._cluster
_init
(cluster_id
, namespace
, paths
, env
)
145 # sync fs with local data
146 self
.fs
.reverse_sync(from_path
=cluster_id
)
148 self
.log
.info("Cluster {} initialized".format(cluster_id
))
150 return cluster_uuid
, n2vc_installed_sw
153 self
, cluster_uuid
: str, name
: str, url
: str, repo_type
: str = "chart"
155 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
157 "Cluster {}, adding {} repository {}. URL: {}".format(
158 cluster_id
, repo_type
, name
, url
163 paths
, env
= self
._init
_paths
_env
(
164 cluster_name
=cluster_id
, create_if_not_exist
=True
168 self
.fs
.sync(from_path
=cluster_id
)
170 # helm repo add name url
171 command
= "env KUBECONFIG={} {} repo add {} {}".format(
172 paths
["kube_config"], self
._helm
_command
, name
, url
174 self
.log
.debug("adding repo: {}".format(command
))
175 await self
._local
_async
_exec
(
176 command
=command
, raise_exception_on_error
=True, env
=env
180 command
= "env KUBECONFIG={} {} repo update {}".format(
181 paths
["kube_config"], self
._helm
_command
, name
183 self
.log
.debug("updating repo: {}".format(command
))
184 await self
._local
_async
_exec
(
185 command
=command
, raise_exception_on_error
=False, env
=env
189 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
191 async def repo_update(self
, cluster_uuid
: str, name
: str, repo_type
: str = "chart"):
193 "Cluster {}, updating {} repository {}".format(
194 cluster_uuid
, repo_type
, name
199 paths
, env
= self
._init
_paths
_env
(
200 cluster_name
=cluster_uuid
, create_if_not_exist
=True
204 self
.fs
.sync(from_path
=cluster_uuid
)
207 command
= "{} repo update {}".format(self
._helm
_command
, name
)
208 self
.log
.debug("updating repo: {}".format(command
))
209 await self
._local
_async
_exec
(
210 command
=command
, raise_exception_on_error
=False, env
=env
214 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
216 async def repo_list(self
, cluster_uuid
: str) -> list:
218 Get the list of registered repositories
220 :return: list of registered repositories: [ (name, url) .... ]
223 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
224 self
.log
.debug("list repositories for cluster {}".format(cluster_id
))
227 paths
, env
= self
._init
_paths
_env
(
228 cluster_name
=cluster_id
, create_if_not_exist
=True
232 self
.fs
.sync(from_path
=cluster_id
)
234 command
= "env KUBECONFIG={} {} repo list --output yaml".format(
235 paths
["kube_config"], self
._helm
_command
238 # Set exception to false because if there are no repos just want an empty list
239 output
, _rc
= await self
._local
_async
_exec
(
240 command
=command
, raise_exception_on_error
=False, env
=env
244 self
.fs
.reverse_sync(from_path
=cluster_id
)
247 if output
and len(output
) > 0:
248 repos
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
249 # unify format between helm2 and helm3 setting all keys lowercase
250 return self
._lower
_keys
_list
(repos
)
256 async def repo_remove(self
, cluster_uuid
: str, name
: str):
258 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
259 self
.log
.debug("remove {} repositories for cluster {}".format(name
, cluster_id
))
262 paths
, env
= self
._init
_paths
_env
(
263 cluster_name
=cluster_id
, create_if_not_exist
=True
267 self
.fs
.sync(from_path
=cluster_id
)
269 command
= "env KUBECONFIG={} {} repo remove {}".format(
270 paths
["kube_config"], self
._helm
_command
, name
272 await self
._local
_async
_exec
(
273 command
=command
, raise_exception_on_error
=True, env
=env
277 self
.fs
.reverse_sync(from_path
=cluster_id
)
283 uninstall_sw
: bool = False,
288 Resets the Kubernetes cluster by removing the helm deployment that represents it.
290 :param cluster_uuid: The UUID of the cluster to reset
291 :param force: Boolean to force the reset
292 :param uninstall_sw: Boolean to force the reset
293 :param kwargs: Additional parameters (None yet)
294 :return: Returns True if successful or raises an exception.
296 namespace
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
298 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
299 cluster_id
, uninstall_sw
304 self
.fs
.sync(from_path
=cluster_id
)
306 # uninstall releases if needed.
308 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
309 if len(releases
) > 0:
313 kdu_instance
= r
.get("name")
314 chart
= r
.get("chart")
316 "Uninstalling {} -> {}".format(chart
, kdu_instance
)
318 await self
.uninstall(
319 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
321 except Exception as e
:
322 # will not raise exception as it was found
323 # that in some cases of previously installed helm releases it
326 "Error uninstalling release {}: {}".format(
332 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
336 False # Allow to remove k8s cluster without removing Tiller
340 await self
._uninstall
_sw
(cluster_id
, namespace
)
342 # delete cluster directory
343 self
.log
.debug("Removing directory {}".format(cluster_id
))
344 self
.fs
.file_delete(cluster_id
, ignore_non_exist
=True)
345 # Remove also local directorio if still exist
346 direct
= self
.fs
.path
+ "/" + cluster_id
347 shutil
.rmtree(direct
, ignore_errors
=True)
351 async def _install_impl(
359 timeout
: float = 300,
361 db_dict
: dict = None,
362 kdu_name
: str = None,
363 namespace
: str = None,
366 paths
, env
= self
._init
_paths
_env
(
367 cluster_name
=cluster_id
, create_if_not_exist
=True
371 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
372 cluster_id
=cluster_id
, params
=params
378 parts
= kdu_model
.split(sep
=":")
380 version
= str(parts
[1])
383 repo
= self
._split
_repo
(kdu_model
)
385 self
.repo_update(cluster_id
, repo
)
387 command
= self
._get
_install
_command
(
395 paths
["kube_config"],
398 self
.log
.debug("installing: {}".format(command
))
401 # exec helm in a task
402 exec_task
= asyncio
.ensure_future(
403 coro_or_future
=self
._local
_async
_exec
(
404 command
=command
, raise_exception_on_error
=False, env
=env
408 # write status in another task
409 status_task
= asyncio
.ensure_future(
410 coro_or_future
=self
._store
_status
(
411 cluster_id
=cluster_id
,
412 kdu_instance
=kdu_instance
,
420 # wait for execution task
421 await asyncio
.wait([exec_task
])
426 output
, rc
= exec_task
.result()
430 output
, rc
= await self
._local
_async
_exec
(
431 command
=command
, raise_exception_on_error
=False, env
=env
434 # remove temporal values yaml file
436 os
.remove(file_to_delete
)
439 await self
._store
_status
(
440 cluster_id
=cluster_id
,
441 kdu_instance
=kdu_instance
,
450 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
452 raise K8sException(msg
)
458 kdu_model
: str = None,
460 timeout
: float = 300,
462 db_dict
: dict = None,
464 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
465 self
.log
.debug("upgrading {} in cluster {}".format(kdu_model
, cluster_id
))
468 self
.fs
.sync(from_path
=cluster_id
)
470 # look for instance to obtain namespace
471 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
472 if not instance_info
:
473 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
476 paths
, env
= self
._init
_paths
_env
(
477 cluster_name
=cluster_id
, create_if_not_exist
=True
481 self
.fs
.sync(from_path
=cluster_id
)
484 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
485 cluster_id
=cluster_id
, params
=params
491 parts
= kdu_model
.split(sep
=":")
493 version
= str(parts
[1])
496 repo
= self
._split
_repo
(kdu_model
)
498 self
.repo_update(cluster_uuid
, repo
)
500 command
= self
._get
_upgrade
_command
(
503 instance_info
["namespace"],
508 paths
["kube_config"],
511 self
.log
.debug("upgrading: {}".format(command
))
515 # exec helm in a task
516 exec_task
= asyncio
.ensure_future(
517 coro_or_future
=self
._local
_async
_exec
(
518 command
=command
, raise_exception_on_error
=False, env
=env
521 # write status in another task
522 status_task
= asyncio
.ensure_future(
523 coro_or_future
=self
._store
_status
(
524 cluster_id
=cluster_id
,
525 kdu_instance
=kdu_instance
,
526 namespace
=instance_info
["namespace"],
533 # wait for execution task
534 await asyncio
.wait([exec_task
])
538 output
, rc
= exec_task
.result()
542 output
, rc
= await self
._local
_async
_exec
(
543 command
=command
, raise_exception_on_error
=False, env
=env
546 # remove temporal values yaml file
548 os
.remove(file_to_delete
)
551 await self
._store
_status
(
552 cluster_id
=cluster_id
,
553 kdu_instance
=kdu_instance
,
554 namespace
=instance_info
["namespace"],
562 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
564 raise K8sException(msg
)
567 self
.fs
.reverse_sync(from_path
=cluster_id
)
569 # return new revision number
570 instance
= await self
.get_instance_info(
571 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
574 revision
= int(instance
.get("revision"))
575 self
.log
.debug("New revision: {}".format(revision
))
585 total_timeout
: float = 1800,
588 raise NotImplementedError("Method not implemented")
590 async def get_scale_count(
596 raise NotImplementedError("Method not implemented")
599 self
, cluster_uuid
: str, kdu_instance
: str, revision
=0, db_dict
: dict = None
602 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
604 "rollback kdu_instance {} to revision {} from cluster {}".format(
605 kdu_instance
, revision
, cluster_id
610 self
.fs
.sync(from_path
=cluster_id
)
612 # look for instance to obtain namespace
613 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
614 if not instance_info
:
615 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
618 paths
, env
= self
._init
_paths
_env
(
619 cluster_name
=cluster_id
, create_if_not_exist
=True
623 self
.fs
.sync(from_path
=cluster_id
)
625 command
= self
._get
_rollback
_command
(
626 kdu_instance
, instance_info
["namespace"], revision
, paths
["kube_config"]
629 self
.log
.debug("rolling_back: {}".format(command
))
631 # exec helm in a task
632 exec_task
= asyncio
.ensure_future(
633 coro_or_future
=self
._local
_async
_exec
(
634 command
=command
, raise_exception_on_error
=False, env
=env
637 # write status in another task
638 status_task
= asyncio
.ensure_future(
639 coro_or_future
=self
._store
_status
(
640 cluster_id
=cluster_id
,
641 kdu_instance
=kdu_instance
,
642 namespace
=instance_info
["namespace"],
644 operation
="rollback",
649 # wait for execution task
650 await asyncio
.wait([exec_task
])
655 output
, rc
= exec_task
.result()
658 await self
._store
_status
(
659 cluster_id
=cluster_id
,
660 kdu_instance
=kdu_instance
,
661 namespace
=instance_info
["namespace"],
663 operation
="rollback",
669 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
671 raise K8sException(msg
)
674 self
.fs
.reverse_sync(from_path
=cluster_id
)
676 # return new revision number
677 instance
= await self
.get_instance_info(
678 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
681 revision
= int(instance
.get("revision"))
682 self
.log
.debug("New revision: {}".format(revision
))
687 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str, **kwargs
):
689 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
690 (this call should happen after all _terminate-config-primitive_ of the VNF
693 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
694 :param kdu_instance: unique name for the KDU instance to be deleted
695 :param kwargs: Additional parameters (None yet)
696 :return: True if successful
699 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
701 "uninstall kdu_instance {} from cluster {}".format(kdu_instance
, cluster_id
)
705 self
.fs
.sync(from_path
=cluster_id
)
707 # look for instance to obtain namespace
708 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
709 if not instance_info
:
710 self
.log
.warning(("kdu_instance {} not found".format(kdu_instance
)))
713 paths
, env
= self
._init
_paths
_env
(
714 cluster_name
=cluster_id
, create_if_not_exist
=True
718 self
.fs
.sync(from_path
=cluster_id
)
720 command
= self
._get
_uninstall
_command
(
721 kdu_instance
, instance_info
["namespace"], paths
["kube_config"]
723 output
, _rc
= await self
._local
_async
_exec
(
724 command
=command
, raise_exception_on_error
=True, env
=env
728 self
.fs
.reverse_sync(from_path
=cluster_id
)
730 return self
._output
_to
_table
(output
)
732 async def instances_list(self
, cluster_uuid
: str) -> list:
734 returns a list of deployed releases in a cluster
736 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
740 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
741 self
.log
.debug("list releases for cluster {}".format(cluster_id
))
744 self
.fs
.sync(from_path
=cluster_id
)
746 # execute internal command
747 result
= await self
._instances
_list
(cluster_id
)
750 self
.fs
.reverse_sync(from_path
=cluster_id
)
754 async def get_instance_info(self
, cluster_uuid
: str, kdu_instance
: str):
755 instances
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
756 for instance
in instances
:
757 if instance
.get("name") == kdu_instance
:
759 self
.log
.debug("Instance {} not found".format(kdu_instance
))
762 async def exec_primitive(
764 cluster_uuid
: str = None,
765 kdu_instance
: str = None,
766 primitive_name
: str = None,
767 timeout
: float = 300,
769 db_dict
: dict = None,
772 """Exec primitive (Juju action)
774 :param cluster_uuid: The UUID of the cluster or namespace:cluster
775 :param kdu_instance: The unique name of the KDU instance
776 :param primitive_name: Name of action that will be executed
777 :param timeout: Timeout for action execution
778 :param params: Dictionary of all the parameters needed for the action
779 :db_dict: Dictionary for any additional data
780 :param kwargs: Additional parameters (None yet)
782 :return: Returns the output of the action
785 "KDUs deployed with Helm don't support actions "
786 "different from rollback, upgrade and status"
789 async def get_services(
790 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
793 Returns a list of services defined for the specified kdu instance.
795 :param cluster_uuid: UUID of a K8s cluster known by OSM
796 :param kdu_instance: unique name for the KDU instance
797 :param namespace: K8s namespace used by the KDU instance
798 :return: If successful, it will return a list of services, Each service
799 can have the following data:
800 - `name` of the service
801 - `type` type of service in the k8 cluster
802 - `ports` List of ports offered by the service, for each port includes at least
804 - `cluster_ip` Internal ip to be used inside k8s cluster
805 - `external_ip` List of external ips (in case they are available)
808 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
810 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
811 cluster_uuid
, kdu_instance
816 paths
, env
= self
._init
_paths
_env
(
817 cluster_name
=cluster_id
, create_if_not_exist
=True
821 self
.fs
.sync(from_path
=cluster_id
)
823 # get list of services names for kdu
824 service_names
= await self
._get
_services
(
825 cluster_id
, kdu_instance
, namespace
, paths
["kube_config"]
829 for service
in service_names
:
830 service
= await self
._get
_service
(cluster_id
, service
, namespace
)
831 service_list
.append(service
)
834 self
.fs
.reverse_sync(from_path
=cluster_id
)
838 async def get_service(
839 self
, cluster_uuid
: str, service_name
: str, namespace
: str
843 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
844 service_name
, namespace
, cluster_uuid
848 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
851 self
.fs
.sync(from_path
=cluster_id
)
853 service
= await self
._get
_service
(cluster_id
, service_name
, namespace
)
856 self
.fs
.reverse_sync(from_path
=cluster_id
)
860 async def status_kdu(
861 self
, cluster_uuid
: str, kdu_instance
: str, yaml_format
: str = False, **kwargs
862 ) -> Union
[str, dict]:
864 This call would retrieve tha current state of a given KDU instance. It would be
865 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
866 values_ of the configuration parameters applied to a given instance. This call
867 would be based on the `status` call.
869 :param cluster_uuid: UUID of a K8s cluster known by OSM
870 :param kdu_instance: unique name for the KDU instance
871 :param kwargs: Additional parameters (None yet)
872 :param yaml_format: if the return shall be returned as an YAML string or as a
874 :return: If successful, it will return the following vector of arguments:
875 - K8s `namespace` in the cluster where the KDU lives
876 - `state` of the KDU instance. It can be:
883 - List of `resources` (objects) that this release consists of, sorted by kind,
884 and the status of those resources
885 - Last `deployment_time`.
889 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
890 cluster_uuid
, kdu_instance
894 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
897 self
.fs
.sync(from_path
=cluster_id
)
899 # get instance: needed to obtain namespace
900 instances
= await self
._instances
_list
(cluster_id
=cluster_id
)
901 for instance
in instances
:
902 if instance
.get("name") == kdu_instance
:
905 # instance does not exist
907 "Instance name: {} not found in cluster: {}".format(
908 kdu_instance
, cluster_id
912 status
= await self
._status
_kdu
(
913 cluster_id
=cluster_id
,
914 kdu_instance
=kdu_instance
,
915 namespace
=instance
["namespace"],
916 yaml_format
=yaml_format
,
921 self
.fs
.reverse_sync(from_path
=cluster_id
)
925 async def values_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
928 "inspect kdu_model values {} from (optional) repo: {}".format(
933 return await self
._exec
_inspect
_comand
(
934 inspect_command
="values", kdu_model
=kdu_model
, repo_url
=repo_url
937 async def help_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
940 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model
, repo_url
)
943 return await self
._exec
_inspect
_comand
(
944 inspect_command
="readme", kdu_model
=kdu_model
, repo_url
=repo_url
947 async def synchronize_repos(self
, cluster_uuid
: str):
949 self
.log
.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid
))
951 db_repo_ids
= self
._get
_helm
_chart
_repos
_ids
(cluster_uuid
)
952 db_repo_dict
= self
._get
_db
_repos
_dict
(db_repo_ids
)
954 local_repo_list
= await self
.repo_list(cluster_uuid
)
955 local_repo_dict
= {repo
["name"]: repo
["url"] for repo
in local_repo_list
}
957 deleted_repo_list
= []
960 # iterate over the list of repos in the database that should be
961 # added if not present
962 for repo_name
, db_repo
in db_repo_dict
.items():
964 # check if it is already present
965 curr_repo_url
= local_repo_dict
.get(db_repo
["name"])
966 repo_id
= db_repo
.get("_id")
967 if curr_repo_url
!= db_repo
["url"]:
970 "repo {} url changed, delete and and again".format(
974 await self
.repo_remove(cluster_uuid
, db_repo
["name"])
975 deleted_repo_list
.append(repo_id
)
978 self
.log
.debug("add repo {}".format(db_repo
["name"]))
980 cluster_uuid
, db_repo
["name"], db_repo
["url"]
982 added_repo_dict
[repo_id
] = db_repo
["name"]
983 except Exception as e
:
985 "Error adding repo id: {}, err_msg: {} ".format(
990 # Delete repos that are present but not in nbi_list
991 for repo_name
in local_repo_dict
:
992 if not db_repo_dict
.get(repo_name
) and repo_name
!= "stable":
993 self
.log
.debug("delete repo {}".format(repo_name
))
995 await self
.repo_remove(cluster_uuid
, repo_name
)
996 deleted_repo_list
.append(repo_name
)
997 except Exception as e
:
999 "Error deleting repo, name: {}, err_msg: {}".format(
1004 return deleted_repo_list
, added_repo_dict
1006 except K8sException
:
1008 except Exception as e
:
1009 # Do not raise errors synchronizing repos
1010 self
.log
.error("Error synchronizing repos: {}".format(e
))
1011 raise Exception("Error synchronizing repos: {}".format(e
))
1013 def _get_db_repos_dict(self
, repo_ids
: list):
1015 for repo_id
in repo_ids
:
1016 db_repo
= self
.db
.get_one("k8srepos", {"_id": repo_id
})
1017 db_repos_dict
[db_repo
["name"]] = db_repo
1018 return db_repos_dict
1021 ####################################################################################
1022 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1023 ####################################################################################
1027 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
1029 Creates and returns base cluster and kube dirs and returns them.
1030 Also created helm3 dirs according to new directory specification, paths are
1031 not returned but assigned to helm environment variables
1033 :param cluster_name: cluster_name
1034 :return: Dictionary with config_paths and dictionary with helm environment variables
1038 async def _cluster_init(self
, cluster_id
, namespace
, paths
, env
):
1040 Implements the helm version dependent cluster initialization
1044 async def _instances_list(self
, cluster_id
):
1046 Implements the helm version dependent helm instances list
1050 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
, kubeconfig
):
1052 Implements the helm version dependent method to obtain services from a helm instance
1056 async def _status_kdu(
1060 namespace
: str = None,
1061 yaml_format
: bool = False,
1062 show_error_log
: bool = False,
1063 ) -> Union
[str, dict]:
1065 Implements the helm version dependent method to obtain status of a helm instance
1069 def _get_install_command(
1081 Obtain command to be executed to delete the indicated instance
1085 def _get_upgrade_command(
1097 Obtain command to be executed to upgrade the indicated instance
1101 def _get_rollback_command(
1102 self
, kdu_instance
, namespace
, revision
, kubeconfig
1105 Obtain command to be executed to rollback the indicated instance
1109 def _get_uninstall_command(
1110 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
1113 Obtain command to be executed to delete the indicated instance
1117 def _get_inspect_command(
1118 self
, show_command
: str, kdu_model
: str, repo_str
: str, version
: str
1121 Obtain command to be executed to obtain information about the kdu
1125 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
1127 Method call to uninstall cluster software for helm. This method is dependent
1129 For Helm v2 it will be called when Tiller must be uninstalled
1130 For Helm v3 it does nothing and does not need to be callled
1134 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
1136 Obtains the cluster repos identifiers
1140 ####################################################################################
1141 ################################### P R I V A T E ##################################
1142 ####################################################################################
1146 def _check_file_exists(filename
: str, exception_if_not_exists
: bool = False):
1147 if os
.path
.exists(filename
):
1150 msg
= "File {} does not exist".format(filename
)
1151 if exception_if_not_exists
:
1152 raise K8sException(msg
)
1155 def _remove_multiple_spaces(strobj
):
1156 strobj
= strobj
.strip()
1157 while " " in strobj
:
1158 strobj
= strobj
.replace(" ", " ")
1162 def _output_to_lines(output
: str) -> list:
1163 output_lines
= list()
1164 lines
= output
.splitlines(keepends
=False)
1168 output_lines
.append(line
)
1172 def _output_to_table(output
: str) -> list:
1173 output_table
= list()
1174 lines
= output
.splitlines(keepends
=False)
1176 line
= line
.replace("\t", " ")
1178 output_table
.append(line_list
)
1179 cells
= line
.split(sep
=" ")
1183 line_list
.append(cell
)
1187 def _parse_services(output
: str) -> list:
1188 lines
= output
.splitlines(keepends
=False)
1191 line
= line
.replace("\t", " ")
1192 cells
= line
.split(sep
=" ")
1193 if len(cells
) > 0 and cells
[0].startswith("service/"):
1194 elems
= cells
[0].split(sep
="/")
1196 services
.append(elems
[1])
1200 def _get_deep(dictionary
: dict, members
: tuple):
1205 value
= target
.get(m
)
1214 # find key:value in several lines
1216 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
1217 for line
in p_lines
:
1219 if line
.startswith(p_key
+ ":"):
1220 parts
= line
.split(":")
1221 the_value
= parts
[1].strip()
1229 def _lower_keys_list(input_list
: list):
1231 Transform the keys in a list of dictionaries to lower case and returns a new list
1236 for dictionary
in input_list
:
1237 new_dict
= dict((k
.lower(), v
) for k
, v
in dictionary
.items())
1238 new_list
.append(new_dict
)
1241 async def _local_async_exec(
1244 raise_exception_on_error
: bool = False,
1245 show_error_log
: bool = True,
1246 encode_utf8
: bool = False,
1250 command
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command
)
1252 "Executing async local command: {}, env: {}".format(command
, env
)
1256 command
= shlex
.split(command
)
1258 environ
= os
.environ
.copy()
1263 process
= await asyncio
.create_subprocess_exec(
1265 stdout
=asyncio
.subprocess
.PIPE
,
1266 stderr
=asyncio
.subprocess
.PIPE
,
1270 # wait for command terminate
1271 stdout
, stderr
= await process
.communicate()
1273 return_code
= process
.returncode
1277 output
= stdout
.decode("utf-8").strip()
1278 # output = stdout.decode()
1280 output
= stderr
.decode("utf-8").strip()
1281 # output = stderr.decode()
1283 if return_code
!= 0 and show_error_log
:
1285 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1288 self
.log
.debug("Return code: {}".format(return_code
))
1290 if raise_exception_on_error
and return_code
!= 0:
1291 raise K8sException(output
)
1294 output
= output
.encode("utf-8").strip()
1295 output
= str(output
).replace("\\n", "\n")
1297 return output
, return_code
1299 except asyncio
.CancelledError
:
1301 except K8sException
:
1303 except Exception as e
:
1304 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1306 if raise_exception_on_error
:
1307 raise K8sException(e
) from e
1311 async def _local_async_exec_pipe(
1315 raise_exception_on_error
: bool = True,
1316 show_error_log
: bool = True,
1317 encode_utf8
: bool = False,
1321 command1
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command1
)
1322 command2
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command2
)
1323 command
= "{} | {}".format(command1
, command2
)
1325 "Executing async local command: {}, env: {}".format(command
, env
)
1329 command1
= shlex
.split(command1
)
1330 command2
= shlex
.split(command2
)
1332 environ
= os
.environ
.copy()
1337 read
, write
= os
.pipe()
1338 await asyncio
.create_subprocess_exec(*command1
, stdout
=write
, env
=environ
)
1340 process_2
= await asyncio
.create_subprocess_exec(
1341 *command2
, stdin
=read
, stdout
=asyncio
.subprocess
.PIPE
, env
=environ
1344 stdout
, stderr
= await process_2
.communicate()
1346 return_code
= process_2
.returncode
1350 output
= stdout
.decode("utf-8").strip()
1351 # output = stdout.decode()
1353 output
= stderr
.decode("utf-8").strip()
1354 # output = stderr.decode()
1356 if return_code
!= 0 and show_error_log
:
1358 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1361 self
.log
.debug("Return code: {}".format(return_code
))
1363 if raise_exception_on_error
and return_code
!= 0:
1364 raise K8sException(output
)
1367 output
= output
.encode("utf-8").strip()
1368 output
= str(output
).replace("\\n", "\n")
1370 return output
, return_code
1371 except asyncio
.CancelledError
:
1373 except K8sException
:
1375 except Exception as e
:
1376 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1378 if raise_exception_on_error
:
1379 raise K8sException(e
) from e
1383 async def _get_service(self
, cluster_id
, service_name
, namespace
):
1385 Obtains the data of the specified service in the k8cluster.
1387 :param cluster_id: id of a K8s cluster known by OSM
1388 :param service_name: name of the K8s service in the specified namespace
1389 :param namespace: K8s namespace used by the KDU instance
1390 :return: If successful, it will return a service with the following data:
1391 - `name` of the service
1392 - `type` type of service in the k8 cluster
1393 - `ports` List of ports offered by the service, for each port includes at least
1394 name, port, protocol
1395 - `cluster_ip` Internal ip to be used inside k8s cluster
1396 - `external_ip` List of external ips (in case they are available)
1400 paths
, env
= self
._init
_paths
_env
(
1401 cluster_name
=cluster_id
, create_if_not_exist
=True
1404 command
= "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1405 self
.kubectl_command
, paths
["kube_config"], namespace
, service_name
1408 output
, _rc
= await self
._local
_async
_exec
(
1409 command
=command
, raise_exception_on_error
=True, env
=env
1412 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
1415 "name": service_name
,
1416 "type": self
._get
_deep
(data
, ("spec", "type")),
1417 "ports": self
._get
_deep
(data
, ("spec", "ports")),
1418 "cluster_ip": self
._get
_deep
(data
, ("spec", "clusterIP")),
1420 if service
["type"] == "LoadBalancer":
1421 ip_map_list
= self
._get
_deep
(data
, ("status", "loadBalancer", "ingress"))
1422 ip_list
= [elem
["ip"] for elem
in ip_map_list
]
1423 service
["external_ip"] = ip_list
1427 async def _exec_inspect_comand(
1428 self
, inspect_command
: str, kdu_model
: str, repo_url
: str = None
1431 Obtains information about a kdu, no cluster (no env)
1436 repo_str
= " --repo {}".format(repo_url
)
1438 idx
= kdu_model
.find("/")
1441 kdu_model
= kdu_model
[idx
:]
1444 if ":" in kdu_model
:
1445 parts
= kdu_model
.split(sep
=":")
1447 version
= "--version {}".format(str(parts
[1]))
1448 kdu_model
= parts
[0]
1450 full_command
= self
._get
_inspect
_command
(
1451 inspect_command
, kdu_model
, repo_str
, version
1453 output
, _rc
= await self
._local
_async
_exec
(
1454 command
=full_command
, encode_utf8
=True
1459 async def _store_status(
1464 namespace
: str = None,
1465 check_every
: float = 10,
1466 db_dict
: dict = None,
1467 run_once
: bool = False,
1471 await asyncio
.sleep(check_every
)
1472 detailed_status
= await self
._status
_kdu
(
1473 cluster_id
=cluster_id
,
1474 kdu_instance
=kdu_instance
,
1476 namespace
=namespace
,
1478 status
= detailed_status
.get("info").get("description")
1479 self
.log
.debug("KDU {} STATUS: {}.".format(kdu_instance
, status
))
1480 # write status to db
1481 result
= await self
.write_app_status_to_db(
1484 detailed_status
=str(detailed_status
),
1485 operation
=operation
,
1488 self
.log
.info("Error writing in database. Task exiting...")
1490 except asyncio
.CancelledError
:
1491 self
.log
.debug("Task cancelled")
1493 except Exception as e
:
1495 "_store_status exception: {}".format(str(e
)), exc_info
=True
1502 # params for use in -f file
1503 # returns values file option and filename (in order to delete it at the end)
1504 def _params_to_file_option(self
, cluster_id
: str, params
: dict) -> (str, str):
1506 if params
and len(params
) > 0:
1507 self
._init
_paths
_env
(cluster_name
=cluster_id
, create_if_not_exist
=True)
1509 def get_random_number():
1510 r
= random
.randrange(start
=1, stop
=99999999)
1518 value
= params
.get(key
)
1519 if "!!yaml" in str(value
):
1520 value
= yaml
.load(value
[7:])
1521 params2
[key
] = value
1523 values_file
= get_random_number() + ".yaml"
1524 with
open(values_file
, "w") as stream
:
1525 yaml
.dump(params2
, stream
, indent
=4, default_flow_style
=False)
1527 return "-f {}".format(values_file
), values_file
1531 # params for use in --set option
1533 def _params_to_set_option(params
: dict) -> str:
1535 if params
and len(params
) > 0:
1538 value
= params
.get(key
, None)
1539 if value
is not None:
1541 params_str
+= "--set "
1545 params_str
+= "{}={}".format(key
, value
)
1549 def generate_kdu_instance_name(**kwargs
):
1550 chart_name
= kwargs
["kdu_model"]
1551 # check embeded chart (file or dir)
1552 if chart_name
.startswith("/"):
1553 # extract file or directory name
1554 chart_name
= chart_name
[chart_name
.rfind("/") + 1 :]
1556 elif "://" in chart_name
:
1557 # extract last portion of URL
1558 chart_name
= chart_name
[chart_name
.rfind("/") + 1 :]
1561 for c
in chart_name
:
1562 if c
.isalpha() or c
.isnumeric():
1569 # if does not start with alpha character, prefix 'a'
1570 if not name
[0].isalpha():
1575 def get_random_number():
1576 r
= random
.randrange(start
=1, stop
=99999999)
1578 s
= s
.rjust(10, "0")
1581 name
= name
+ get_random_number()
1584 async def _split_repo(self
, kdu_model
: str) -> str:
1586 idx
= kdu_model
.find("/")
1588 repo_name
= kdu_model
[:idx
]