2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
32 from uuid
import uuid4
34 from n2vc
.exceptions
import K8sException
35 from n2vc
.k8s_conn
import K8sConnector
38 class K8sHelmBaseConnector(K8sConnector
):
41 ####################################################################################
42 ################################### P U B L I C ####################################
43 ####################################################################################
45 service_account
= "osm"
46 _STABLE_REPO_URL
= "https://charts.helm.sh/stable"
52 kubectl_command
: str = "/usr/bin/kubectl",
53 helm_command
: str = "/usr/bin/helm",
56 vca_config
: dict = None,
60 :param fs: file system for kubernetes and helm configuration
61 :param db: database object to write current operation status
62 :param kubectl_command: path to kubectl executable
63 :param helm_command: path to helm executable
65 :param on_update_db: callback called when k8s connector updates database
69 K8sConnector
.__init
__(self
, db
=db
, log
=log
, on_update_db
=on_update_db
)
71 self
.log
.info("Initializing K8S Helm connector")
73 # random numbers for release name generation
74 random
.seed(time
.time())
79 # exception if kubectl is not installed
80 self
.kubectl_command
= kubectl_command
81 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
83 # exception if helm is not installed
84 self
._helm
_command
= helm_command
85 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
87 # obtain stable repo url from config or apply default
88 if not vca_config
or not vca_config
.get("stablerepourl"):
89 self
._stable
_repo
_url
= self
._STABLE
_REPO
_URL
91 self
._stable
_repo
_url
= vca_config
.get("stablerepourl")
94 def _get_namespace_cluster_id(cluster_uuid
: str) -> (str, str):
96 Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only
97 cluster_id for backward compatibility
99 namespace
, _
, cluster_id
= cluster_uuid
.rpartition(':')
100 return namespace
, cluster_id
103 self
, k8s_creds
: str, namespace
: str = "kube-system", reuse_cluster_uuid
=None
106 It prepares a given K8s cluster environment to run Charts
108 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
110 :param namespace: optional namespace to be used for helm. By default,
111 'kube-system' will be used
112 :param reuse_cluster_uuid: existing cluster uuid for reuse
113 :return: uuid of the K8s cluster and True if connector has installed some
114 software in the cluster
115 (on error, an exception will be raised)
118 if reuse_cluster_uuid
:
119 namespace_
, cluster_id
= self
._get
_namespace
_cluster
_id
(reuse_cluster_uuid
)
120 namespace
= namespace_
or namespace
122 cluster_id
= str(uuid4())
123 cluster_uuid
= "{}:{}".format(namespace
, cluster_id
)
125 self
.log
.debug("Initializing K8S Cluster {}. namespace: {}".format(cluster_id
, namespace
))
127 paths
, env
= self
._init
_paths
_env
(
128 cluster_name
=cluster_id
, create_if_not_exist
=True
130 mode
= stat
.S_IRUSR | stat
.S_IWUSR
131 with
open(paths
["kube_config"], "w", mode
) as f
:
133 os
.chmod(paths
["kube_config"], 0o600)
135 # Code with initialization specific of helm version
136 n2vc_installed_sw
= await self
._cluster
_init
(cluster_id
, namespace
, paths
, env
)
138 # sync fs with local data
139 self
.fs
.reverse_sync(from_path
=cluster_id
)
141 self
.log
.info("Cluster {} initialized".format(cluster_id
))
143 return cluster_uuid
, n2vc_installed_sw
146 self
, cluster_uuid
: str, name
: str, url
: str, repo_type
: str = "chart"
148 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
149 self
.log
.debug("Cluster {}, adding {} repository {}. URL: {}".format(
150 cluster_id
, repo_type
, name
, url
))
153 self
.fs
.sync(from_path
=cluster_id
)
156 paths
, env
= self
._init
_paths
_env
(
157 cluster_name
=cluster_id
, create_if_not_exist
=True
161 command
= "{} repo update".format(
164 self
.log
.debug("updating repo: {}".format(command
))
165 await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False, env
=env
)
167 # helm repo add name url
168 command
= "{} repo add {} {}".format(
169 self
._helm
_command
, name
, url
171 self
.log
.debug("adding repo: {}".format(command
))
172 await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True, env
=env
)
175 self
.fs
.reverse_sync(from_path
=cluster_id
)
177 async def repo_list(self
, cluster_uuid
: str) -> list:
179 Get the list of registered repositories
181 :return: list of registered repositories: [ (name, url) .... ]
184 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
185 self
.log
.debug("list repositories for cluster {}".format(cluster_id
))
188 self
.fs
.sync(from_path
=cluster_id
)
191 paths
, env
= self
._init
_paths
_env
(
192 cluster_name
=cluster_id
, create_if_not_exist
=True
195 command
= "{} repo list --output yaml".format(
199 # Set exception to false because if there are no repos just want an empty list
200 output
, _rc
= await self
._local
_async
_exec
(
201 command
=command
, raise_exception_on_error
=False, env
=env
205 self
.fs
.reverse_sync(from_path
=cluster_id
)
208 if output
and len(output
) > 0:
209 repos
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
210 # unify format between helm2 and helm3 setting all keys lowercase
211 return self
._lower
_keys
_list
(repos
)
217 async def repo_remove(self
, cluster_uuid
: str, name
: str):
219 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
220 self
.log
.debug("remove {} repositories for cluster {}".format(name
, cluster_id
))
223 self
.fs
.sync(from_path
=cluster_id
)
226 paths
, env
= self
._init
_paths
_env
(
227 cluster_name
=cluster_id
, create_if_not_exist
=True
230 command
= "{} repo remove {}".format(
231 self
._helm
_command
, name
233 await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True, env
=env
)
236 self
.fs
.reverse_sync(from_path
=cluster_id
)
239 self
, cluster_uuid
: str, force
: bool = False, uninstall_sw
: bool = False
242 namespace
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
243 self
.log
.debug("Resetting K8s environment. cluster uuid: {} uninstall={}"
244 .format(cluster_id
, uninstall_sw
))
247 self
.fs
.sync(from_path
=cluster_id
)
249 # uninstall releases if needed.
251 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
252 if len(releases
) > 0:
256 kdu_instance
= r
.get("name")
257 chart
= r
.get("chart")
259 "Uninstalling {} -> {}".format(chart
, kdu_instance
)
261 await self
.uninstall(
262 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
264 except Exception as e
:
265 # will not raise exception as it was found
266 # that in some cases of previously installed helm releases it
269 "Error uninstalling release {}: {}".format(kdu_instance
, e
)
273 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
276 uninstall_sw
= False # Allow to remove k8s cluster without removing Tiller
279 await self
._uninstall
_sw
(cluster_id
, namespace
)
281 # delete cluster directory
282 self
.log
.debug("Removing directory {}".format(cluster_id
))
283 self
.fs
.file_delete(cluster_id
, ignore_non_exist
=True)
284 # Also remove the local directory if it still exists
285 direct
= self
.fs
.path
+ "/" + cluster_id
286 shutil
.rmtree(direct
, ignore_errors
=True)
290 async def _install_impl(
298 timeout
: float = 300,
300 db_dict
: dict = None,
301 kdu_name
: str = None,
302 namespace
: str = None,
305 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
306 cluster_id
=cluster_id
, params
=params
312 parts
= kdu_model
.split(sep
=":")
314 version
= str(parts
[1])
317 command
= self
._get
_install
_command
(kdu_model
, kdu_instance
, namespace
,
318 params_str
, version
, atomic
, timeout
)
320 self
.log
.debug("installing: {}".format(command
))
323 # exec helm in a task
324 exec_task
= asyncio
.ensure_future(
325 coro_or_future
=self
._local
_async
_exec
(
326 command
=command
, raise_exception_on_error
=False, env
=env
330 # write status in another task
331 status_task
= asyncio
.ensure_future(
332 coro_or_future
=self
._store
_status
(
333 cluster_id
=cluster_id
,
334 kdu_instance
=kdu_instance
,
342 # wait for execution task
343 await asyncio
.wait([exec_task
])
348 output
, rc
= exec_task
.result()
352 output
, rc
= await self
._local
_async
_exec
(
353 command
=command
, raise_exception_on_error
=False, env
=env
356 # remove temporary values yaml file
358 os
.remove(file_to_delete
)
361 await self
._store
_status
(
362 cluster_id
=cluster_id
,
363 kdu_instance
=kdu_instance
,
372 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
374 raise K8sException(msg
)
380 kdu_model
: str = None,
382 timeout
: float = 300,
384 db_dict
: dict = None,
386 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
387 self
.log
.debug("upgrading {} in cluster {}".format(kdu_model
, cluster_id
))
390 self
.fs
.sync(from_path
=cluster_id
)
392 # look for instance to obtain namespace
393 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
394 if not instance_info
:
395 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
398 paths
, env
= self
._init
_paths
_env
(
399 cluster_name
=cluster_id
, create_if_not_exist
=True
403 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
404 cluster_id
=cluster_id
, params
=params
410 parts
= kdu_model
.split(sep
=":")
412 version
= str(parts
[1])
415 command
= self
._get
_upgrade
_command
(kdu_model
, kdu_instance
, instance_info
["namespace"],
416 params_str
, version
, atomic
, timeout
)
418 self
.log
.debug("upgrading: {}".format(command
))
422 # exec helm in a task
423 exec_task
= asyncio
.ensure_future(
424 coro_or_future
=self
._local
_async
_exec
(
425 command
=command
, raise_exception_on_error
=False, env
=env
428 # write status in another task
429 status_task
= asyncio
.ensure_future(
430 coro_or_future
=self
._store
_status
(
431 cluster_id
=cluster_id
,
432 kdu_instance
=kdu_instance
,
433 namespace
=instance_info
["namespace"],
440 # wait for execution task
441 await asyncio
.wait([exec_task
])
445 output
, rc
= exec_task
.result()
449 output
, rc
= await self
._local
_async
_exec
(
450 command
=command
, raise_exception_on_error
=False, env
=env
453 # remove temporary values yaml file
455 os
.remove(file_to_delete
)
458 await self
._store
_status
(
459 cluster_id
=cluster_id
,
460 kdu_instance
=kdu_instance
,
461 namespace
=instance_info
["namespace"],
469 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
471 raise K8sException(msg
)
474 self
.fs
.reverse_sync(from_path
=cluster_id
)
476 # return new revision number
477 instance
= await self
.get_instance_info(
478 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
481 revision
= int(instance
.get("revision"))
482 self
.log
.debug("New revision: {}".format(revision
))
488 self
, cluster_uuid
: str, kdu_instance
: str, revision
=0, db_dict
: dict = None
491 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
493 "rollback kdu_instance {} to revision {} from cluster {}".format(
494 kdu_instance
, revision
, cluster_id
499 self
.fs
.sync(from_path
=cluster_id
)
501 # look for instance to obtain namespace
502 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
503 if not instance_info
:
504 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
507 paths
, env
= self
._init
_paths
_env
(
508 cluster_name
=cluster_id
, create_if_not_exist
=True
511 command
= self
._get
_rollback
_command
(kdu_instance
, instance_info
["namespace"],
514 self
.log
.debug("rolling_back: {}".format(command
))
516 # exec helm in a task
517 exec_task
= asyncio
.ensure_future(
518 coro_or_future
=self
._local
_async
_exec
(
519 command
=command
, raise_exception_on_error
=False, env
=env
522 # write status in another task
523 status_task
= asyncio
.ensure_future(
524 coro_or_future
=self
._store
_status
(
525 cluster_id
=cluster_id
,
526 kdu_instance
=kdu_instance
,
527 namespace
=instance_info
["namespace"],
529 operation
="rollback",
534 # wait for execution task
535 await asyncio
.wait([exec_task
])
540 output
, rc
= exec_task
.result()
543 await self
._store
_status
(
544 cluster_id
=cluster_id
,
545 kdu_instance
=kdu_instance
,
546 namespace
=instance_info
["namespace"],
548 operation
="rollback",
554 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
556 raise K8sException(msg
)
559 self
.fs
.reverse_sync(from_path
=cluster_id
)
561 # return new revision number
562 instance
= await self
.get_instance_info(
563 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
566 revision
= int(instance
.get("revision"))
567 self
.log
.debug("New revision: {}".format(revision
))
572 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str):
574 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
575 (this call should happen after all _terminate-config-primitive_ of the VNF
578 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
579 :param kdu_instance: unique name for the KDU instance to be deleted
580 :return: True if successful
583 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
585 "uninstall kdu_instance {} from cluster {}".format(
586 kdu_instance
, cluster_id
591 self
.fs
.sync(from_path
=cluster_id
)
593 # look for instance to obtain namespace
594 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
595 if not instance_info
:
596 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
599 paths
, env
= self
._init
_paths
_env
(
600 cluster_name
=cluster_id
, create_if_not_exist
=True
603 command
= self
._get
_uninstall
_command
(kdu_instance
, instance_info
["namespace"])
604 output
, _rc
= await self
._local
_async
_exec
(
605 command
=command
, raise_exception_on_error
=True, env
=env
609 self
.fs
.reverse_sync(from_path
=cluster_id
)
611 return self
._output
_to
_table
(output
)
613 async def instances_list(self
, cluster_uuid
: str) -> list:
615 returns a list of deployed releases in a cluster
617 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
621 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
622 self
.log
.debug("list releases for cluster {}".format(cluster_id
))
625 self
.fs
.sync(from_path
=cluster_id
)
627 # execute internal command
628 result
= await self
._instances
_list
(cluster_id
)
631 self
.fs
.reverse_sync(from_path
=cluster_id
)
635 async def get_instance_info(self
, cluster_uuid
: str, kdu_instance
: str):
636 instances
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
637 for instance
in instances
:
638 if instance
.get("name") == kdu_instance
:
640 self
.log
.debug("Instance {} not found".format(kdu_instance
))
643 async def exec_primitive(
645 cluster_uuid
: str = None,
646 kdu_instance
: str = None,
647 primitive_name
: str = None,
648 timeout
: float = 300,
650 db_dict
: dict = None,
652 """Exec primitive (Juju action)
654 :param cluster_uuid: The UUID of the cluster or namespace:cluster
655 :param kdu_instance: The unique name of the KDU instance
656 :param primitive_name: Name of action that will be executed
657 :param timeout: Timeout for action execution
658 :param params: Dictionary of all the parameters needed for the action
659 :param db_dict: Dictionary for any additional data
661 :return: Returns the output of the action
664 "KDUs deployed with Helm don't support actions "
665 "different from rollback, upgrade and status"
668 async def get_services(self
,
671 namespace
: str) -> list:
673 Returns a list of services defined for the specified kdu instance.
675 :param cluster_uuid: UUID of a K8s cluster known by OSM
676 :param kdu_instance: unique name for the KDU instance
677 :param namespace: K8s namespace used by the KDU instance
678 :return: If successful, it will return a list of services, Each service
679 can have the following data:
680 - `name` of the service
681 - `type` type of service in the k8 cluster
682 - `ports` List of ports offered by the service, for each port includes at least
684 - `cluster_ip` Internal ip to be used inside k8s cluster
685 - `external_ip` List of external ips (in case they are available)
688 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
690 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
691 cluster_uuid
, kdu_instance
696 self
.fs
.sync(from_path
=cluster_id
)
698 # get list of services names for kdu
699 service_names
= await self
._get
_services
(cluster_id
, kdu_instance
, namespace
)
702 for service
in service_names
:
703 service
= await self
._get
_service
(cluster_id
, service
, namespace
)
704 service_list
.append(service
)
707 self
.fs
.reverse_sync(from_path
=cluster_id
)
711 async def get_service(self
,
714 namespace
: str) -> object:
717 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
718 service_name
, namespace
, cluster_uuid
)
721 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
724 self
.fs
.sync(from_path
=cluster_id
)
726 service
= await self
._get
_service
(cluster_id
, service_name
, namespace
)
729 self
.fs
.reverse_sync(from_path
=cluster_id
)
733 async def status_kdu(self
, cluster_uuid
: str, kdu_instance
: str) -> str:
736 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
737 cluster_uuid
, kdu_instance
741 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
744 self
.fs
.sync(from_path
=cluster_id
)
746 # get instance: needed to obtain namespace
747 instances
= await self
._instances
_list
(cluster_id
=cluster_id
)
748 for instance
in instances
:
749 if instance
.get("name") == kdu_instance
:
752 # instance does not exist
753 raise K8sException("Instance name: {} not found in cluster: {}".format(
754 kdu_instance
, cluster_id
))
756 status
= await self
._status
_kdu
(
757 cluster_id
=cluster_id
,
758 kdu_instance
=kdu_instance
,
759 namespace
=instance
["namespace"],
765 self
.fs
.reverse_sync(from_path
=cluster_id
)
769 async def values_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
772 "inspect kdu_model values {} from (optional) repo: {}".format(
777 return await self
._exec
_inspect
_comand
(
778 inspect_command
="values", kdu_model
=kdu_model
, repo_url
=repo_url
781 async def help_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
784 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model
, repo_url
)
787 return await self
._exec
_inspect
_comand
(
788 inspect_command
="readme", kdu_model
=kdu_model
, repo_url
=repo_url
791 async def synchronize_repos(self
, cluster_uuid
: str):
793 self
.log
.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid
))
795 db_repo_ids
= self
._get
_helm
_chart
_repos
_ids
(cluster_uuid
)
796 db_repo_dict
= self
._get
_db
_repos
_dict
(db_repo_ids
)
798 local_repo_list
= await self
.repo_list(cluster_uuid
)
799 local_repo_dict
= {repo
["name"]: repo
["url"] for repo
in local_repo_list
}
801 deleted_repo_list
= []
804 # iterate over the list of repos in the database that should be
805 # added if not present
806 for repo_name
, db_repo
in db_repo_dict
.items():
808 # check if it is already present
809 curr_repo_url
= local_repo_dict
.get(db_repo
["name"])
810 repo_id
= db_repo
.get("_id")
811 if curr_repo_url
!= db_repo
["url"]:
813 self
.log
.debug("repo {} url changed, delete and and again".format(
815 await self
.repo_remove(cluster_uuid
, db_repo
["name"])
816 deleted_repo_list
.append(repo_id
)
819 self
.log
.debug("add repo {}".format(db_repo
["name"]))
820 await self
.repo_add(cluster_uuid
, db_repo
["name"], db_repo
["url"])
821 added_repo_dict
[repo_id
] = db_repo
["name"]
822 except Exception as e
:
824 "Error adding repo id: {}, err_msg: {} ".format(
829 # Delete repos that are present but not in nbi_list
830 for repo_name
in local_repo_dict
:
831 if not db_repo_dict
.get(repo_name
) and repo_name
!= "stable":
832 self
.log
.debug("delete repo {}".format(repo_name
))
834 await self
.repo_remove(cluster_uuid
, repo_name
)
835 deleted_repo_list
.append(repo_name
)
836 except Exception as e
:
838 "Error deleting repo, name: {}, err_msg: {}".format(
843 return deleted_repo_list
, added_repo_dict
847 except Exception as e
:
848 # Log the failure and re-raise it wrapped in a generic Exception
849 self
.log
.error("Error synchronizing repos: {}".format(e
))
850 raise Exception("Error synchronizing repos: {}".format(e
))
852 def _get_db_repos_dict(self
, repo_ids
: list):
854 for repo_id
in repo_ids
:
855 db_repo
= self
.db
.get_one("k8srepos", {"_id": repo_id
})
856 db_repos_dict
[db_repo
["name"]] = db_repo
860 ####################################################################################
861 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
862 ####################################################################################
866 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
868 Creates the base cluster and kube dirs and returns them.
869 Also creates helm3 dirs according to new directory specification, paths are
870 not returned but assigned to helm environment variables
872 :param cluster_name: cluster_name
873 :return: Dictionary with config_paths and dictionary with helm environment variables
877 async def _cluster_init(self
, cluster_id
, namespace
, paths
, env
):
879 Implements the helm version dependent cluster initialization
883 async def _instances_list(self
, cluster_id
):
885 Implements the helm version dependent helm instances list
889 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
):
891 Implements the helm version dependent method to obtain services from a helm instance
895 async def _status_kdu(self
, cluster_id
: str, kdu_instance
: str, namespace
: str = None,
896 show_error_log
: bool = False, return_text
: bool = False):
898 Implements the helm version dependent method to obtain status of a helm instance
902 def _get_install_command(self
, kdu_model
, kdu_instance
, namespace
,
903 params_str
, version
, atomic
, timeout
) -> str:
905 Obtain command to be executed to install the indicated instance
909 def _get_upgrade_command(self
, kdu_model
, kdu_instance
, namespace
,
910 params_str
, version
, atomic
, timeout
) -> str:
912 Obtain command to be executed to upgrade the indicated instance
916 def _get_rollback_command(self
, kdu_instance
, namespace
, revision
) -> str:
918 Obtain command to be executed to rollback the indicated instance
922 def _get_uninstall_command(self
, kdu_instance
: str, namespace
: str) -> str:
924 Obtain command to be executed to delete the indicated instance
928 def _get_inspect_command(self
, show_command
: str, kdu_model
: str, repo_str
: str,
931 Obtain command to be executed to obtain information about the kdu
935 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
937 Method call to uninstall cluster software for helm. This method is dependent
939 For Helm v2 it will be called when Tiller must be uninstalled
940 For Helm v3 it does nothing and does not need to be called
944 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
946 Obtains the cluster repos identifiers
950 ####################################################################################
951 ################################### P R I V A T E ##################################
952 ####################################################################################
956 def _check_file_exists(filename
: str, exception_if_not_exists
: bool = False):
957 if os
.path
.exists(filename
):
960 msg
= "File {} does not exist".format(filename
)
961 if exception_if_not_exists
:
962 raise K8sException(msg
)
965 def _remove_multiple_spaces(strobj
):
966 strobj
= strobj
.strip()
968 strobj
= strobj
.replace(" ", " ")
972 def _output_to_lines(output
: str) -> list:
973 output_lines
= list()
974 lines
= output
.splitlines(keepends
=False)
978 output_lines
.append(line
)
982 def _output_to_table(output
: str) -> list:
983 output_table
= list()
984 lines
= output
.splitlines(keepends
=False)
986 line
= line
.replace("\t", " ")
988 output_table
.append(line_list
)
989 cells
= line
.split(sep
=" ")
993 line_list
.append(cell
)
997 def _parse_services(output
: str) -> list:
998 lines
= output
.splitlines(keepends
=False)
1001 line
= line
.replace("\t", " ")
1002 cells
= line
.split(sep
=" ")
1003 if len(cells
) > 0 and cells
[0].startswith("service/"):
1004 elems
= cells
[0].split(sep
="/")
1006 services
.append(elems
[1])
1010 def _get_deep(dictionary
: dict, members
: tuple):
1015 value
= target
.get(m
)
1024 # find key:value in several lines
1026 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
1027 for line
in p_lines
:
1029 if line
.startswith(p_key
+ ":"):
1030 parts
= line
.split(":")
1031 the_value
= parts
[1].strip()
1039 def _lower_keys_list(input_list
: list):
1041 Transform the keys in a list of dictionaries to lower case and returns a new list
1045 for dictionary
in input_list
:
1046 new_dict
= dict((k
.lower(), v
) for k
, v
in dictionary
.items())
1047 new_list
.append(new_dict
)
1050 def _local_exec(self
, command
: str) -> (str, int):
1051 command
= self
._remove
_multiple
_spaces
(command
)
1052 self
.log
.debug("Executing sync local command: {}".format(command
))
1053 # raise exception if fails
1056 output
= subprocess
.check_output(
1057 command
, shell
=True, universal_newlines
=True
1060 self
.log
.debug(output
)
1064 return output
, return_code
1066 async def _local_async_exec(
1069 raise_exception_on_error
: bool = False,
1070 show_error_log
: bool = True,
1071 encode_utf8
: bool = False,
1075 command
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command
)
1076 self
.log
.debug("Executing async local command: {}, env: {}".format(command
, env
))
1079 command
= shlex
.split(command
)
1081 environ
= os
.environ
.copy()
1086 process
= await asyncio
.create_subprocess_exec(
1087 *command
, stdout
=asyncio
.subprocess
.PIPE
, stderr
=asyncio
.subprocess
.PIPE
,
1091 # wait for command terminate
1092 stdout
, stderr
= await process
.communicate()
1094 return_code
= process
.returncode
1098 output
= stdout
.decode("utf-8").strip()
1099 # output = stdout.decode()
1101 output
= stderr
.decode("utf-8").strip()
1102 # output = stderr.decode()
1104 if return_code
!= 0 and show_error_log
:
1106 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1109 self
.log
.debug("Return code: {}".format(return_code
))
1111 if raise_exception_on_error
and return_code
!= 0:
1112 raise K8sException(output
)
1115 output
= output
.encode("utf-8").strip()
1116 output
= str(output
).replace("\\n", "\n")
1118 return output
, return_code
1120 except asyncio
.CancelledError
:
1122 except K8sException
:
1124 except Exception as e
:
1125 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1127 if raise_exception_on_error
:
1128 raise K8sException(e
) from e
1132 async def _local_async_exec_pipe(self
,
1135 raise_exception_on_error
: bool = True,
1136 show_error_log
: bool = True,
1137 encode_utf8
: bool = False,
1140 command1
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command1
)
1141 command2
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command2
)
1142 command
= "{} | {}".format(command1
, command2
)
1143 self
.log
.debug("Executing async local command: {}, env: {}".format(command
, env
))
1146 command1
= shlex
.split(command1
)
1147 command2
= shlex
.split(command2
)
1149 environ
= os
.environ
.copy()
1154 read
, write
= os
.pipe()
1155 await asyncio
.create_subprocess_exec(*command1
, stdout
=write
, env
=environ
)
1157 process_2
= await asyncio
.create_subprocess_exec(*command2
, stdin
=read
,
1158 stdout
=asyncio
.subprocess
.PIPE
,
1161 stdout
, stderr
= await process_2
.communicate()
1163 return_code
= process_2
.returncode
1167 output
= stdout
.decode("utf-8").strip()
1168 # output = stdout.decode()
1170 output
= stderr
.decode("utf-8").strip()
1171 # output = stderr.decode()
1173 if return_code
!= 0 and show_error_log
:
1175 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1178 self
.log
.debug("Return code: {}".format(return_code
))
1180 if raise_exception_on_error
and return_code
!= 0:
1181 raise K8sException(output
)
1184 output
= output
.encode("utf-8").strip()
1185 output
= str(output
).replace("\\n", "\n")
1187 return output
, return_code
1188 except asyncio
.CancelledError
:
1190 except K8sException
:
1192 except Exception as e
:
1193 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1195 if raise_exception_on_error
:
1196 raise K8sException(e
) from e
1200 async def _get_service(self
, cluster_id
, service_name
, namespace
):
1202 Obtains the data of the specified service in the k8cluster.
1204 :param cluster_id: id of a K8s cluster known by OSM
1205 :param service_name: name of the K8s service in the specified namespace
1206 :param namespace: K8s namespace used by the KDU instance
1207 :return: If successful, it will return a service with the following data:
1208 - `name` of the service
1209 - `type` type of service in the k8 cluster
1210 - `ports` List of ports offered by the service, for each port includes at least
1211 name, port, protocol
1212 - `cluster_ip` Internal ip to be used inside k8s cluster
1213 - `external_ip` List of external ips (in case they are available)
1217 paths
, env
= self
._init
_paths
_env
(
1218 cluster_name
=cluster_id
, create_if_not_exist
=True
1221 command
= "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1222 self
.kubectl_command
, paths
["kube_config"], namespace
, service_name
1225 output
, _rc
= await self
._local
_async
_exec
(
1226 command
=command
, raise_exception_on_error
=True, env
=env
1229 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
1232 "name": service_name
,
1233 "type": self
._get
_deep
(data
, ("spec", "type")),
1234 "ports": self
._get
_deep
(data
, ("spec", "ports")),
1235 "cluster_ip": self
._get
_deep
(data
, ("spec", "clusterIP"))
1237 if service
["type"] == "LoadBalancer":
1238 ip_map_list
= self
._get
_deep
(data
, ("status", "loadBalancer", "ingress"))
1239 ip_list
= [elem
["ip"] for elem
in ip_map_list
]
1240 service
["external_ip"] = ip_list
1244 async def _exec_inspect_comand(
1245 self
, inspect_command
: str, kdu_model
: str, repo_url
: str = None
1248 Obtains information about a kdu, no cluster (no env)
1253 repo_str
= " --repo {}".format(repo_url
)
1255 idx
= kdu_model
.find("/")
1258 kdu_model
= kdu_model
[idx
:]
1261 if ":" in kdu_model
:
1262 parts
= kdu_model
.split(sep
=":")
1264 version
= "--version {}".format(str(parts
[1]))
1265 kdu_model
= parts
[0]
1267 full_command
= self
._get
_inspect
_command
(inspect_command
, kdu_model
, repo_str
, version
)
1268 output
, _rc
= await self
._local
_async
_exec
(
1269 command
=full_command
, encode_utf8
=True
1274 async def _store_status(
1279 namespace
: str = None,
1280 check_every
: float = 10,
1281 db_dict
: dict = None,
1282 run_once
: bool = False,
1286 await asyncio
.sleep(check_every
)
1287 detailed_status
= await self
._status
_kdu
(
1288 cluster_id
=cluster_id
, kdu_instance
=kdu_instance
, namespace
=namespace
,
1291 status
= detailed_status
.get("info").get("description")
1292 self
.log
.debug('KDU {} STATUS: {}.'.format(kdu_instance
, status
))
1293 # write status to db
1294 result
= await self
.write_app_status_to_db(
1297 detailed_status
=str(detailed_status
),
1298 operation
=operation
,
1301 self
.log
.info("Error writing in database. Task exiting...")
1303 except asyncio
.CancelledError
:
1304 self
.log
.debug("Task cancelled")
1306 except Exception as e
:
1307 self
.log
.debug("_store_status exception: {}".format(str(e
)), exc_info
=True)
1313 # params for use in -f file
1314 # returns values file option and filename (in order to delete it at the end)
1315 def _params_to_file_option(self
, cluster_id
: str, params
: dict) -> (str, str):
1317 if params
and len(params
) > 0:
1318 self
._init
_paths
_env
(
1319 cluster_name
=cluster_id
, create_if_not_exist
=True
1322 def get_random_number():
1323 r
= random
.randrange(start
=1, stop
=99999999)
1331 value
= params
.get(key
)
1332 if "!!yaml" in str(value
):
1333 value
= yaml
.load(value
[7:])
1334 params2
[key
] = value
1336 values_file
= get_random_number() + ".yaml"
1337 with
open(values_file
, "w") as stream
:
1338 yaml
.dump(params2
, stream
, indent
=4, default_flow_style
=False)
1340 return "-f {}".format(values_file
), values_file
1344 # params for use in --set option
1346 def _params_to_set_option(params
: dict) -> str:
1348 if params
and len(params
) > 0:
1351 value
= params
.get(key
, None)
1352 if value
is not None:
1354 params_str
+= "--set "
1358 params_str
+= "{}={}".format(key
, value
)
1362 def generate_kdu_instance_name(**kwargs
):
1363 chart_name
= kwargs
["kdu_model"]
1364 # check embedded chart (file or dir)
1365 if chart_name
.startswith("/"):
1366 # extract file or directory name
1367 chart_name
= chart_name
[chart_name
.rfind("/") + 1:]
1369 elif "://" in chart_name
:
1370 # extract last portion of URL
1371 chart_name
= chart_name
[chart_name
.rfind("/") + 1:]
1374 for c
in chart_name
:
1375 if c
.isalpha() or c
.isnumeric():
1382 # if does not start with alpha character, prefix 'a'
1383 if not name
[0].isalpha():
1388 def get_random_number():
1389 r
= random
.randrange(start
=1, stop
=99999999)
1391 s
= s
.rjust(10, "0")
1394 name
= name
+ get_random_number()