2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
32 from uuid
import uuid4
34 from n2vc
.exceptions
import K8sException
35 from n2vc
.k8s_conn
import K8sConnector
38 class K8sHelmBaseConnector(K8sConnector
):
41 ####################################################################################
42 ################################### P U B L I C ####################################
43 ####################################################################################
45 service_account
= "osm"
51 kubectl_command
: str = "/usr/bin/kubectl",
52 helm_command
: str = "/usr/bin/helm",
58 :param fs: file system for kubernetes and helm configuration
59 :param db: database object to write current operation status
60 :param kubectl_command: path to kubectl executable
61 :param helm_command: path to helm executable
63 :param on_update_db: callback called when k8s connector updates database
67 K8sConnector
.__init
__(self
, db
=db
, log
=log
, on_update_db
=on_update_db
)
69 self
.log
.info("Initializing K8S Helm connector")
71 # random numbers for release name generation
72 random
.seed(time
.time())
77 # exception if kubectl is not installed
78 self
.kubectl_command
= kubectl_command
79 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
81 # exception if helm is not installed
82 self
._helm
_command
= helm_command
83 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
86 def _get_namespace_cluster_id(cluster_uuid
: str) -> (str, str):
88 Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only
89 cluster_id for backward compatibility
91 namespace
, _
, cluster_id
= cluster_uuid
.rpartition(':')
92 return namespace
, cluster_id
95 self
, k8s_creds
: str, namespace
: str = "kube-system", reuse_cluster_uuid
=None
98 It prepares a given K8s cluster environment to run Charts
100 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
102 :param namespace: optional namespace to be used for helm. By default,
103 'kube-system' will be used
104 :param reuse_cluster_uuid: existing cluster uuid for reuse
105 :return: uuid of the K8s cluster and True if connector has installed some
106 software in the cluster
107 (on error, an exception will be raised)
110 if reuse_cluster_uuid
:
111 namespace_
, cluster_id
= self
._get
_namespace
_cluster
_id
(reuse_cluster_uuid
)
112 namespace
= namespace_
or namespace
114 cluster_id
= str(uuid4())
115 cluster_uuid
= "{}:{}".format(namespace
, cluster_id
)
117 self
.log
.debug("Initializing K8S Cluster {}. namespace: {}".format(cluster_id
, namespace
))
119 paths
, env
= self
._init
_paths
_env
(
120 cluster_name
=cluster_id
, create_if_not_exist
=True
122 mode
= stat
.S_IRUSR | stat
.S_IWUSR
123 with
open(paths
["kube_config"], "w", mode
) as f
:
125 os
.chmod(paths
["kube_config"], 0o600)
127 # Code with initialization specific of helm version
128 n2vc_installed_sw
= await self
._cluster
_init
(cluster_id
, namespace
, paths
, env
)
130 # sync fs with local data
131 self
.fs
.reverse_sync(from_path
=cluster_id
)
133 self
.log
.info("Cluster {} initialized".format(cluster_id
))
135 return cluster_uuid
, n2vc_installed_sw
138 self
, cluster_uuid
: str, name
: str, url
: str, repo_type
: str = "chart"
140 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
141 self
.log
.debug("Cluster {}, adding {} repository {}. URL: {}".format(
142 cluster_id
, repo_type
, name
, url
))
145 self
.fs
.sync(from_path
=cluster_id
)
148 paths
, env
= self
._init
_paths
_env
(
149 cluster_name
=cluster_id
, create_if_not_exist
=True
153 command
= "{} repo update".format(
156 self
.log
.debug("updating repo: {}".format(command
))
157 await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False, env
=env
)
159 # helm repo add name url
160 command
= "{} repo add {} {}".format(
161 self
._helm
_command
, name
, url
163 self
.log
.debug("adding repo: {}".format(command
))
164 await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True, env
=env
)
167 self
.fs
.reverse_sync(from_path
=cluster_id
)
async def repo_list(self, cluster_uuid: str) -> list:
    """
    Get the list of helm repositories registered for a cluster.

    :param cluster_uuid: cluster id, optionally prefixed as 'namespace:cluster_id'
    :return: list of registered repositories as dictionaries with lower-case
        keys; an empty list when the helm command fails or prints nothing
    """
    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug("list repositories for cluster {}".format(cluster_id))

    # sync before running helm; presumably refreshes the local cluster dir
    self.fs.sync(from_path=cluster_id)

    _paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )
    list_command = "{} repo list --output yaml".format(self._helm_command)

    # Errors are not raised: when there are no repos we just want an empty list
    output, _rc = await self._local_async_exec(
        command=list_command, raise_exception_on_error=False, env=env
    )

    self.fs.reverse_sync(from_path=cluster_id)

    if _rc != 0 or not output:
        return []

    repos = yaml.load(output, Loader=yaml.SafeLoader)
    # unify format between helm2 and helm3 setting all keys lowercase
    return self._lower_keys_list(repos)
async def repo_remove(self, cluster_uuid: str, name: str):
    """
    Remove a helm repository from the configuration of a cluster.

    :param cluster_uuid: cluster id, optionally prefixed as 'namespace:cluster_id'
    :param name: name of the repository to remove
    :return: None; the helm command is executed with raise_exception_on_error=True
    """
    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    debug_msg = "remove {} repositories for cluster {}".format(name, cluster_id)
    self.log.debug(debug_msg)

    # sync before running helm; presumably refreshes the local cluster dir
    self.fs.sync(from_path=cluster_id)

    _paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    remove_command = "{} repo remove {}".format(self._helm_command, name)
    await self._local_async_exec(
        command=remove_command, raise_exception_on_error=True, env=env
    )

    self.fs.reverse_sync(from_path=cluster_id)
231 self
, cluster_uuid
: str, force
: bool = False, uninstall_sw
: bool = False
234 namespace
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
235 self
.log
.debug("Resetting K8s environment. cluster uuid: {} uninstall={}"
236 .format(cluster_id
, uninstall_sw
))
239 self
.fs
.sync(from_path
=cluster_id
)
241 # uninstall releases if needed.
243 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
244 if len(releases
) > 0:
248 kdu_instance
= r
.get("name")
249 chart
= r
.get("chart")
251 "Uninstalling {} -> {}".format(chart
, kdu_instance
)
253 await self
.uninstall(
254 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
256 except Exception as e
:
257 # will not raise exception as it was found
258 # that in some cases of previously installed helm releases it
261 "Error uninstalling release {}: {}".format(kdu_instance
, e
)
265 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
268 uninstall_sw
= False # Allow to remove k8s cluster without removing Tiller
271 await self
._uninstall
_sw
(cluster_id
, namespace
)
273 # delete cluster directory
274 self
.log
.debug("Removing directory {}".format(cluster_id
))
275 self
.fs
.file_delete(cluster_id
, ignore_non_exist
=True)
# Also remove the local directory if it still exists
277 direct
= self
.fs
.path
+ "/" + cluster_id
278 shutil
.rmtree(direct
, ignore_errors
=True)
287 timeout
: float = 300,
289 db_dict
: dict = None,
290 kdu_name
: str = None,
291 namespace
: str = None,
293 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
294 self
.log
.debug("installing {} in cluster {}".format(kdu_model
, cluster_id
))
297 self
.fs
.sync(from_path
=cluster_id
)
300 paths
, env
= self
._init
_paths
_env
(
301 cluster_name
=cluster_id
, create_if_not_exist
=True
305 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
306 cluster_id
=cluster_id
, params
=params
312 parts
= kdu_model
.split(sep
=":")
314 version
= str(parts
[1])
317 # generate a name for the release. Then, check if already exists
319 while kdu_instance
is None:
320 kdu_instance
= self
._generate
_release
_name
(kdu_model
)
322 result
= await self
._status
_kdu
(
323 cluster_id
=cluster_id
,
324 kdu_instance
=kdu_instance
,
326 show_error_log
=False,
328 if result
is not None:
329 # instance already exists: generate a new one
334 command
= self
._get
_install
_command
(kdu_model
, kdu_instance
, namespace
,
335 params_str
, version
, atomic
, timeout
)
337 self
.log
.debug("installing: {}".format(command
))
340 # exec helm in a task
341 exec_task
= asyncio
.ensure_future(
342 coro_or_future
=self
._local
_async
_exec
(
343 command
=command
, raise_exception_on_error
=False, env
=env
347 # write status in another task
348 status_task
= asyncio
.ensure_future(
349 coro_or_future
=self
._store
_status
(
350 cluster_id
=cluster_id
,
351 kdu_instance
=kdu_instance
,
359 # wait for execution task
360 await asyncio
.wait([exec_task
])
365 output
, rc
= exec_task
.result()
369 output
, rc
= await self
._local
_async
_exec
(
370 command
=command
, raise_exception_on_error
=False, env
=env
373 # remove temporal values yaml file
375 os
.remove(file_to_delete
)
378 await self
._store
_status
(
379 cluster_id
=cluster_id
,
380 kdu_instance
=kdu_instance
,
389 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
391 raise K8sException(msg
)
394 self
.fs
.reverse_sync(from_path
=cluster_id
)
396 self
.log
.debug("Returning kdu_instance {}".format(kdu_instance
))
403 kdu_model
: str = None,
405 timeout
: float = 300,
407 db_dict
: dict = None,
409 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
410 self
.log
.debug("upgrading {} in cluster {}".format(kdu_model
, cluster_id
))
413 self
.fs
.sync(from_path
=cluster_id
)
415 # look for instance to obtain namespace
416 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
417 if not instance_info
:
418 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
421 paths
, env
= self
._init
_paths
_env
(
422 cluster_name
=cluster_id
, create_if_not_exist
=True
426 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
427 cluster_id
=cluster_id
, params
=params
433 parts
= kdu_model
.split(sep
=":")
435 version
= str(parts
[1])
438 command
= self
._get
_upgrade
_command
(kdu_model
, kdu_instance
, instance_info
["namespace"],
439 params_str
, version
, atomic
, timeout
)
441 self
.log
.debug("upgrading: {}".format(command
))
445 # exec helm in a task
446 exec_task
= asyncio
.ensure_future(
447 coro_or_future
=self
._local
_async
_exec
(
448 command
=command
, raise_exception_on_error
=False, env
=env
451 # write status in another task
452 status_task
= asyncio
.ensure_future(
453 coro_or_future
=self
._store
_status
(
454 cluster_id
=cluster_id
,
455 kdu_instance
=kdu_instance
,
456 namespace
=instance_info
["namespace"],
463 # wait for execution task
464 await asyncio
.wait([exec_task
])
468 output
, rc
= exec_task
.result()
472 output
, rc
= await self
._local
_async
_exec
(
473 command
=command
, raise_exception_on_error
=False, env
=env
476 # remove temporal values yaml file
478 os
.remove(file_to_delete
)
481 await self
._store
_status
(
482 cluster_id
=cluster_id
,
483 kdu_instance
=kdu_instance
,
484 namespace
=instance_info
["namespace"],
492 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
494 raise K8sException(msg
)
497 self
.fs
.reverse_sync(from_path
=cluster_id
)
499 # return new revision number
500 instance
= await self
.get_instance_info(
501 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
504 revision
= int(instance
.get("revision"))
505 self
.log
.debug("New revision: {}".format(revision
))
511 self
, cluster_uuid
: str, kdu_instance
: str, revision
=0, db_dict
: dict = None
514 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
516 "rollback kdu_instance {} to revision {} from cluster {}".format(
517 kdu_instance
, revision
, cluster_id
522 self
.fs
.sync(from_path
=cluster_id
)
524 # look for instance to obtain namespace
525 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
526 if not instance_info
:
527 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
530 paths
, env
= self
._init
_paths
_env
(
531 cluster_name
=cluster_id
, create_if_not_exist
=True
534 command
= self
._get
_rollback
_command
(kdu_instance
, instance_info
["namespace"],
537 self
.log
.debug("rolling_back: {}".format(command
))
539 # exec helm in a task
540 exec_task
= asyncio
.ensure_future(
541 coro_or_future
=self
._local
_async
_exec
(
542 command
=command
, raise_exception_on_error
=False, env
=env
545 # write status in another task
546 status_task
= asyncio
.ensure_future(
547 coro_or_future
=self
._store
_status
(
548 cluster_id
=cluster_id
,
549 kdu_instance
=kdu_instance
,
550 namespace
=instance_info
["namespace"],
552 operation
="rollback",
557 # wait for execution task
558 await asyncio
.wait([exec_task
])
563 output
, rc
= exec_task
.result()
566 await self
._store
_status
(
567 cluster_id
=cluster_id
,
568 kdu_instance
=kdu_instance
,
569 namespace
=instance_info
["namespace"],
571 operation
="rollback",
577 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
579 raise K8sException(msg
)
582 self
.fs
.reverse_sync(from_path
=cluster_id
)
584 # return new revision number
585 instance
= await self
.get_instance_info(
586 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
589 revision
= int(instance
.get("revision"))
590 self
.log
.debug("New revision: {}".format(revision
))
async def uninstall(self, cluster_uuid: str, kdu_instance: str):
    """
    Remove an existing KDU instance from a cluster.

    The helm-version-specific command is obtained from _get_uninstall_command
    (helm `delete` or `uninstall`). This call should happen after all
    _terminate-config-primitive_ of the VNF.

    :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
    :param kdu_instance: unique name for the KDU instance to be deleted
    :return: output of the uninstall command as parsed by _output_to_table
    :raises K8sException: if kdu_instance is not found in the cluster
    """
    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    debug_msg = "uninstall kdu_instance {} from cluster {}".format(
        kdu_instance, cluster_id
    )
    self.log.debug(debug_msg)

    # sync before running helm; presumably refreshes the local cluster dir
    self.fs.sync(from_path=cluster_id)

    # look for instance to obtain namespace
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    _paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    uninstall_command = self._get_uninstall_command(
        kdu_instance, instance_info["namespace"]
    )
    output, _rc = await self._local_async_exec(
        command=uninstall_command, raise_exception_on_error=True, env=env
    )

    self.fs.reverse_sync(from_path=cluster_id)

    return self._output_to_table(output)
async def instances_list(self, cluster_uuid: str) -> list:
    """
    Return the deployed releases in a cluster.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: whatever the helm-version-specific _instances_list returns
        for the cluster id
    """
    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    debug_msg = "list releases for cluster {}".format(cluster_id)
    self.log.debug(debug_msg)

    # sync before running helm; presumably refreshes the local cluster dir
    self.fs.sync(from_path=cluster_id)

    # delegate to the helm-version-specific implementation
    releases = await self._instances_list(cluster_id)

    self.fs.reverse_sync(from_path=cluster_id)

    return releases
async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
    """
    Look up a release by name among the releases of a cluster.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :param kdu_instance: name of the release to look for
    :return: the first entry of instances_list whose "name" equals
        kdu_instance, or None (after a debug log) when there is none
    """
    releases = await self.instances_list(cluster_uuid=cluster_uuid)
    match = next(
        (release for release in releases if release.get("name") == kdu_instance),
        None,
    )
    if match is None:
        self.log.debug("Instance {} not found".format(kdu_instance))
    return match
666 async def exec_primitive(
668 cluster_uuid
: str = None,
669 kdu_instance
: str = None,
670 primitive_name
: str = None,
671 timeout
: float = 300,
673 db_dict
: dict = None,
675 """Exec primitive (Juju action)
677 :param cluster_uuid: The UUID of the cluster or namespace:cluster
678 :param kdu_instance: The unique name of the KDU instance
679 :param primitive_name: Name of action that will be executed
680 :param timeout: Timeout for action execution
681 :param params: Dictionary of all the parameters needed for the action
682 :db_dict: Dictionary for any additional data
684 :return: Returns the output of the action
687 "KDUs deployed with Helm don't support actions "
688 "different from rollback, upgrade and status"
async def get_services(self,
                       cluster_uuid: str,
                       kdu_instance: str,
                       namespace: str) -> list:
    """
    Return the services defined for the specified kdu instance.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance
    :param namespace: K8s namespace used by the KDU instance
    :return: list with one entry per service, each being the result of
        _get_service, which carries:
        - `name` of the service
        - `type` type of service in the k8 cluster
        - `ports` List of ports offered by the service
        - `cluster_ip` Internal ip to be used inside k8s cluster
        - `external_ip` List of external ips (in case they are available)
    """
    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    debug_msg = "get_services: cluster_uuid: {}, kdu_instance: {}".format(
        cluster_uuid, kdu_instance
    )
    self.log.debug(debug_msg)

    # sync before querying; presumably refreshes the local cluster dir
    self.fs.sync(from_path=cluster_id)

    # get list of services names for kdu
    service_names = await self._get_services(cluster_id, kdu_instance, namespace)

    # resolve the names one by one, in order
    service_list = [
        await self._get_service(cluster_id, service_name, namespace)
        for service_name in service_names
    ]

    self.fs.reverse_sync(from_path=cluster_id)

    return service_list
async def get_service(self,
                      cluster_uuid: str,
                      service_name: str,
                      namespace: str) -> object:
    """
    Return the data of one service of a cluster.

    :param cluster_uuid: cluster id, optionally prefixed as 'namespace:cluster_id'
    :param service_name: name of the K8s service in the specified namespace
    :param namespace: K8s namespace where the service lives
    :return: the result of _get_service for that service
    """
    debug_msg = "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
        service_name, namespace, cluster_uuid)
    self.log.debug(debug_msg)

    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)

    # sync before querying; presumably refreshes the local cluster dir
    self.fs.sync(from_path=cluster_id)

    service_data = await self._get_service(cluster_id, service_name, namespace)

    self.fs.reverse_sync(from_path=cluster_id)

    return service_data
756 async def status_kdu(self
, cluster_uuid
: str, kdu_instance
: str) -> str:
759 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
760 cluster_uuid
, kdu_instance
764 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
767 self
.fs
.sync(from_path
=cluster_id
)
769 # get instance: needed to obtain namespace
770 instances
= await self
._instances
_list
(cluster_id
=cluster_id
)
771 for instance
in instances
:
772 if instance
.get("name") == kdu_instance
:
775 # instance does not exist
776 raise K8sException("Instance name: {} not found in cluster: {}".format(
777 kdu_instance
, cluster_id
))
779 status
= await self
._status
_kdu
(
780 cluster_id
=cluster_id
,
781 kdu_instance
=kdu_instance
,
782 namespace
=instance
["namespace"],
788 self
.fs
.reverse_sync(from_path
=cluster_id
)
async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Inspect the values of a kdu model.

    :param kdu_model: chart reference passed on to the inspect command
    :param repo_url: optional repository URL passed on to the inspect command
    :return: the result of _exec_inspect_comand with inspect_command="values"
    """
    debug_msg = "inspect kdu_model values {} from (optional) repo: {}".format(
        kdu_model, repo_url
    )
    self.log.debug(debug_msg)

    values_output = await self._exec_inspect_comand(
        inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
    )
    return values_output
async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Inspect the readme of a kdu model.

    :param kdu_model: chart reference passed on to the inspect command
    :param repo_url: optional repository URL passed on to the inspect command
    :return: the result of _exec_inspect_comand with inspect_command="readme"
    """
    debug_msg = "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
    self.log.debug(debug_msg)

    readme_output = await self._exec_inspect_comand(
        inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
    )
    return readme_output
814 async def synchronize_repos(self
, cluster_uuid
: str):
816 self
.log
.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid
))
818 db_repo_ids
= self
._get
_helm
_chart
_repos
_ids
(cluster_uuid
)
819 db_repo_dict
= self
._get
_db
_repos
_dict
(db_repo_ids
)
821 local_repo_list
= await self
.repo_list(cluster_uuid
)
822 local_repo_dict
= {repo
["name"]: repo
["url"] for repo
in local_repo_list
}
824 deleted_repo_list
= []
827 # iterate over the list of repos in the database that should be
828 # added if not present
829 for repo_name
, db_repo
in db_repo_dict
.items():
831 # check if it is already present
832 curr_repo_url
= local_repo_dict
.get(db_repo
["name"])
833 repo_id
= db_repo
.get("_id")
834 if curr_repo_url
!= db_repo
["url"]:
836 self
.log
.debug("repo {} url changed, delete and and again".format(
838 await self
.repo_remove(cluster_uuid
, db_repo
["name"])
839 deleted_repo_list
.append(repo_id
)
842 self
.log
.debug("add repo {}".format(db_repo
["name"]))
843 await self
.repo_add(cluster_uuid
, db_repo
["name"], db_repo
["url"])
844 added_repo_dict
[repo_id
] = db_repo
["name"]
845 except Exception as e
:
847 "Error adding repo id: {}, err_msg: {} ".format(
852 # Delete repos that are present but not in nbi_list
853 for repo_name
in local_repo_dict
:
854 if not db_repo_dict
.get(repo_name
) and repo_name
!= "stable":
855 self
.log
.debug("delete repo {}".format(repo_name
))
857 await self
.repo_remove(cluster_uuid
, repo_name
)
858 deleted_repo_list
.append(repo_name
)
859 except Exception as e
:
861 "Error deleting repo, name: {}, err_msg: {}".format(
866 return deleted_repo_list
, added_repo_dict
870 except Exception as e
:
871 # Do not raise errors synchronizing repos
872 self
.log
.error("Error synchronizing repos: {}".format(e
))
873 raise Exception("Error synchronizing repos: {}".format(e
))
875 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
877 cluster_filter
= {"_admin.helm-chart.id": cluster_uuid
}
878 cluster
= self
.db
.get_one("k8sclusters", cluster_filter
)
880 repo_ids
= cluster
.get("_admin").get("helm_chart_repos") or []
884 "k8cluster with helm-id : {} not found".format(cluster_uuid
)
887 def _get_db_repos_dict(self
, repo_ids
: list):
889 for repo_id
in repo_ids
:
890 db_repo
= self
.db
.get_one("k8srepos", {"_id": repo_id
})
891 db_repos_dict
[db_repo
["name"]] = db_repo
895 ####################################################################################
896 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
897 ####################################################################################
def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
    """
    Create the base cluster and kube directories and return them.
    Helm3 directories are also created according to the new directory
    specification; those paths are not returned but assigned to helm
    environment variables.

    To be implemented by the helm-version-specific subclass.

    :param cluster_name: cluster_name
    :param create_if_not_exist: presumably creates missing directories when
        True (callers in this class always pass True) - confirm in subclasses
    :return: Dictionary with config_paths and dictionary with helm environment variables
    """
async def _cluster_init(self, cluster_id, namespace, paths, env):
    """
    Implements the helm version dependent cluster initialization.

    To be implemented by the helm-version-specific subclass.

    :param cluster_id: id of the cluster being initialized
    :param namespace: namespace to be used for helm
    :param paths: paths dictionary as returned by _init_paths_env
    :param env: environment dictionary as returned by _init_paths_env
    :return: value reported by the caller as "n2vc_installed_sw", i.e. whether
        the connector has installed some software in the cluster
    """
async def _instances_list(self, cluster_id):
    """
    Implements the helm version dependent helm instances list.

    To be implemented by the helm-version-specific subclass.

    :param cluster_id: id of the cluster whose releases are listed
    :return: iterable of release dictionaries; callers in this class read the
        "name" and "namespace" keys of each entry
    """
async def _get_services(self, cluster_id, kdu_instance, namespace):
    """
    Implements the helm version dependent method to obtain services from a helm instance.

    To be implemented by the helm-version-specific subclass.

    :param cluster_id: id of the cluster
    :param kdu_instance: name of the helm instance
    :param namespace: K8s namespace used by the instance
    :return: iterable of service names; the caller passes each one to _get_service
    """
async def _status_kdu(self, cluster_id: str, kdu_instance: str, namespace: str = None,
                      show_error_log: bool = False, return_text: bool = False):
    """
    Implements the helm version dependent method to obtain status of a helm instance.

    To be implemented by the helm-version-specific subclass.

    :param cluster_id: id of the cluster
    :param kdu_instance: name of the helm instance
    :param namespace: K8s namespace of the instance
    :param show_error_log: presumably controls error logging of the underlying
        command - confirm in subclasses
    :param return_text: presumably selects a text result instead of a parsed
        one - confirm in subclasses
    :return: status of the instance; callers in this class treat None as
        "instance does not exist"
    """
def _get_install_command(self, kdu_model, kdu_instance, namespace,
                         params_str, version, atomic, timeout) -> str:
    """
    Obtain command to be executed to install the indicated instance.

    To be implemented by the helm-version-specific subclass.

    :param kdu_model: chart to install
    :param kdu_instance: release name for the new instance
    :param namespace: K8s namespace for the instance
    :param params_str: values option as built by _params_to_file_option
    :param version: chart version taken from the kdu_model string
    :param atomic: atomic flag forwarded from the install call
    :param timeout: timeout forwarded from the install call
    :return: command line to execute
    """
def _get_upgrade_command(self, kdu_model, kdu_instance, namespace,
                         params_str, version, atomic, timeout) -> str:
    """
    Obtain command to be executed to upgrade the indicated instance.

    To be implemented by the helm-version-specific subclass.

    :param kdu_model: chart to upgrade to
    :param kdu_instance: name of the release being upgraded
    :param namespace: K8s namespace of the instance
    :param params_str: values option as built by _params_to_file_option
    :param version: chart version taken from the kdu_model string
    :param atomic: atomic flag forwarded from the upgrade call
    :param timeout: timeout forwarded from the upgrade call
    :return: command line to execute
    """
def _get_rollback_command(self, kdu_instance, namespace, revision) -> str:
    """
    Obtain command to be executed to rollback the indicated instance.

    To be implemented by the helm-version-specific subclass.

    :param kdu_instance: name of the release to roll back
    :param namespace: K8s namespace of the instance
    :param revision: revision to roll back to
    :return: command line to execute
    """
def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
    """
    Obtain command to be executed to delete the indicated instance.

    To be implemented by the helm-version-specific subclass.

    :param kdu_instance: name of the release to delete
    :param namespace: K8s namespace of the instance
    :return: command line to execute
    """
963 def _get_inspect_command(self
, show_command
: str, kdu_model
: str, repo_str
: str,
966 Obtain command to be executed to obtain information about the kdu
async def _uninstall_sw(self, cluster_id: str, namespace: str):
    """
    Method call to uninstall cluster software for helm. This method is
    dependent on the helm version.
    For Helm v2 it will be called when Tiller must be uninstalled.
    For Helm v3 it does nothing and does not need to be called.

    :param cluster_id: id of the cluster
    :param namespace: namespace obtained from the stored cluster uuid
    """
979 ####################################################################################
980 ################################### P R I V A T E ##################################
981 ####################################################################################
985 def _check_file_exists(filename
: str, exception_if_not_exists
: bool = False):
986 if os
.path
.exists(filename
):
989 msg
= "File {} does not exist".format(filename
)
990 if exception_if_not_exists
:
991 raise K8sException(msg
)
994 def _remove_multiple_spaces(strobj
):
995 strobj
= strobj
.strip()
997 strobj
= strobj
.replace(" ", " ")
1001 def _output_to_lines(output
: str) -> list:
1002 output_lines
= list()
1003 lines
= output
.splitlines(keepends
=False)
1007 output_lines
.append(line
)
1011 def _output_to_table(output
: str) -> list:
1012 output_table
= list()
1013 lines
= output
.splitlines(keepends
=False)
1015 line
= line
.replace("\t", " ")
1017 output_table
.append(line_list
)
1018 cells
= line
.split(sep
=" ")
1022 line_list
.append(cell
)
1026 def _parse_services(output
: str) -> list:
1027 lines
= output
.splitlines(keepends
=False)
1030 line
= line
.replace("\t", " ")
1031 cells
= line
.split(sep
=" ")
1032 if len(cells
) > 0 and cells
[0].startswith("service/"):
1033 elems
= cells
[0].split(sep
="/")
1035 services
.append(elems
[1])
1039 def _get_deep(dictionary
: dict, members
: tuple):
1044 value
= target
.get(m
)
1053 # find key:value in several lines
1055 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
1056 for line
in p_lines
:
1058 if line
.startswith(p_key
+ ":"):
1059 parts
= line
.split(":")
1060 the_value
= parts
[1].strip()
1068 def _lower_keys_list(input_list
: list):
1070 Transform the keys in a list of dictionaries to lower case and returns a new list
1074 for dictionary
in input_list
:
1075 new_dict
= dict((k
.lower(), v
) for k
, v
in dictionary
.items())
1076 new_list
.append(new_dict
)
1079 def _local_exec(self
, command
: str) -> (str, int):
1080 command
= self
._remove
_multiple
_spaces
(command
)
1081 self
.log
.debug("Executing sync local command: {}".format(command
))
1082 # raise exception if fails
1085 output
= subprocess
.check_output(
1086 command
, shell
=True, universal_newlines
=True
1089 self
.log
.debug(output
)
1093 return output
, return_code
1095 async def _local_async_exec(
1098 raise_exception_on_error
: bool = False,
1099 show_error_log
: bool = True,
1100 encode_utf8
: bool = False,
1104 command
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command
)
1105 self
.log
.debug("Executing async local command: {}, env: {}".format(command
, env
))
1108 command
= shlex
.split(command
)
1110 environ
= os
.environ
.copy()
1115 process
= await asyncio
.create_subprocess_exec(
1116 *command
, stdout
=asyncio
.subprocess
.PIPE
, stderr
=asyncio
.subprocess
.PIPE
,
1120 # wait for command terminate
1121 stdout
, stderr
= await process
.communicate()
1123 return_code
= process
.returncode
1127 output
= stdout
.decode("utf-8").strip()
1128 # output = stdout.decode()
1130 output
= stderr
.decode("utf-8").strip()
1131 # output = stderr.decode()
1133 if return_code
!= 0 and show_error_log
:
1135 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1138 self
.log
.debug("Return code: {}".format(return_code
))
1140 if raise_exception_on_error
and return_code
!= 0:
1141 raise K8sException(output
)
1144 output
= output
.encode("utf-8").strip()
1145 output
= str(output
).replace("\\n", "\n")
1147 return output
, return_code
1149 except asyncio
.CancelledError
:
1151 except K8sException
:
1153 except Exception as e
:
1154 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1156 if raise_exception_on_error
:
1157 raise K8sException(e
) from e
1161 async def _local_async_exec_pipe(self
,
1164 raise_exception_on_error
: bool = True,
1165 show_error_log
: bool = True,
1166 encode_utf8
: bool = False,
1169 command1
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command1
)
1170 command2
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command2
)
1171 command
= "{} | {}".format(command1
, command2
)
1172 self
.log
.debug("Executing async local command: {}, env: {}".format(command
, env
))
1175 command1
= shlex
.split(command1
)
1176 command2
= shlex
.split(command2
)
1178 environ
= os
.environ
.copy()
1183 read
, write
= os
.pipe()
1184 await asyncio
.create_subprocess_exec(*command1
, stdout
=write
, env
=environ
)
1186 process_2
= await asyncio
.create_subprocess_exec(*command2
, stdin
=read
,
1187 stdout
=asyncio
.subprocess
.PIPE
,
1190 stdout
, stderr
= await process_2
.communicate()
1192 return_code
= process_2
.returncode
1196 output
= stdout
.decode("utf-8").strip()
1197 # output = stdout.decode()
1199 output
= stderr
.decode("utf-8").strip()
1200 # output = stderr.decode()
1202 if return_code
!= 0 and show_error_log
:
1204 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1207 self
.log
.debug("Return code: {}".format(return_code
))
1209 if raise_exception_on_error
and return_code
!= 0:
1210 raise K8sException(output
)
1213 output
= output
.encode("utf-8").strip()
1214 output
= str(output
).replace("\\n", "\n")
1216 return output
, return_code
1217 except asyncio
.CancelledError
:
1219 except K8sException
:
1221 except Exception as e
:
1222 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1224 if raise_exception_on_error
:
1225 raise K8sException(e
) from e
async def _get_service(self, cluster_id, service_name, namespace):
    """
    Obtains the data of the specified service in the k8cluster.

    Runs `kubectl get service -o=yaml` with the cluster kubeconfig and picks
    the relevant fields from the YAML output.

    :param cluster_id: id of a K8s cluster known by OSM
    :param service_name: name of the K8s service in the specified namespace
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a service with the following data:
    - `name` of the service
    - `type` type of service in the k8 cluster
    - `ports` List of ports offered by the service, for each port includes at least
    name, port, protocol
    - `cluster_ip` Internal ip to be used inside k8s cluster
    - `external_ip` List of external ips (only set for LoadBalancer services;
      empty while no ingress entry has been reported)
    :raises K8sException: presumably raised by _local_async_exec when kubectl
        fails, since raise_exception_on_error=True is passed
    """
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
        self.kubectl_command, paths["kube_config"], namespace, service_name
    )

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    data = yaml.load(output, Loader=yaml.SafeLoader)

    service = {
        "name": service_name,
        "type": self._get_deep(data, ("spec", "type")),
        "ports": self._get_deep(data, ("spec", "ports")),
        "cluster_ip": self._get_deep(data, ("spec", "clusterIP"))
    }
    if service["type"] == "LoadBalancer":
        # the ingress list may be absent (lookup yields None) while the load
        # balancer is still being provisioned; report no external ips then
        # instead of failing when iterating None
        ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress")) or []
        service["external_ip"] = [elem["ip"] for elem in ip_map_list]

    return service
1273 async def _exec_inspect_comand(
1274 self
, inspect_command
: str, kdu_model
: str, repo_url
: str = None
1277 Obtains information about a kdu, no cluster (no env)
1282 repo_str
= " --repo {}".format(repo_url
)
1284 idx
= kdu_model
.find("/")
1287 kdu_model
= kdu_model
[idx
:]
1290 if ":" in kdu_model
:
1291 parts
= kdu_model
.split(sep
=":")
1293 version
= "--version {}".format(str(parts
[1]))
1294 kdu_model
= parts
[0]
1296 full_command
= self
._get
_inspect
_command
(inspect_command
, kdu_model
, repo_str
, version
)
1297 output
, _rc
= await self
._local
_async
_exec
(
1298 command
=full_command
, encode_utf8
=True
1303 async def _store_status(
1308 namespace
: str = None,
1309 check_every
: float = 10,
1310 db_dict
: dict = None,
1311 run_once
: bool = False,
1315 await asyncio
.sleep(check_every
)
1316 detailed_status
= await self
._status
_kdu
(
1317 cluster_id
=cluster_id
, kdu_instance
=kdu_instance
, namespace
=namespace
,
1320 status
= detailed_status
.get("info").get("description")
1321 self
.log
.debug('KDU {} STATUS: {}.'.format(kdu_instance
, status
))
1322 # write status to db
1323 result
= await self
.write_app_status_to_db(
1326 detailed_status
=str(detailed_status
),
1327 operation
=operation
,
1330 self
.log
.info("Error writing in database. Task exiting...")
1332 except asyncio
.CancelledError
:
1333 self
.log
.debug("Task cancelled")
1335 except Exception as e
:
1336 self
.log
.debug("_store_status exception: {}".format(str(e
)), exc_info
=True)
1342 # params for use in -f file
1343 # returns values file option and filename (in order to delete it at the end)
1344 def _params_to_file_option(self
, cluster_id
: str, params
: dict) -> (str, str):
1346 if params
and len(params
) > 0:
1347 self
._init
_paths
_env
(
1348 cluster_name
=cluster_id
, create_if_not_exist
=True
1351 def get_random_number():
1352 r
= random
.randrange(start
=1, stop
=99999999)
1360 value
= params
.get(key
)
1361 if "!!yaml" in str(value
):
1362 value
= yaml
.load(value
[7:])
1363 params2
[key
] = value
1365 values_file
= get_random_number() + ".yaml"
1366 with
open(values_file
, "w") as stream
:
1367 yaml
.dump(params2
, stream
, indent
=4, default_flow_style
=False)
1369 return "-f {}".format(values_file
), values_file
1373 # params for use in --set option
1375 def _params_to_set_option(params
: dict) -> str:
1377 if params
and len(params
) > 0:
1380 value
= params
.get(key
, None)
1381 if value
is not None:
1383 params_str
+= "--set "
1387 params_str
+= "{}={}".format(key
, value
)
1391 def _generate_release_name(chart_name
: str):
# check embedded chart (file or dir)
1393 if chart_name
.startswith("/"):
1394 # extract file or directory name
1395 chart_name
= chart_name
[chart_name
.rfind("/") + 1:]
1397 elif "://" in chart_name
:
1398 # extract last portion of URL
1399 chart_name
= chart_name
[chart_name
.rfind("/") + 1:]
1402 for c
in chart_name
:
1403 if c
.isalpha() or c
.isnumeric():
1410 # if does not start with alpha character, prefix 'a'
1411 if not name
[0].isalpha():
1416 def get_random_number():
1417 r
= random
.randrange(start
=1, stop
=99999999)
1419 s
= s
.rjust(10, "0")
1422 name
= name
+ get_random_number()