2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
32 from uuid
import uuid4
34 from n2vc
.exceptions
import K8sException
35 from n2vc
.k8s_conn
import K8sConnector
38 class K8sHelmBaseConnector(K8sConnector
):
41 ####################################################################################
42 ################################### P U B L I C ####################################
43 ####################################################################################
45 service_account
= "osm"
46 _STABLE_REPO_URL
= "https://charts.helm.sh/stable"
52 kubectl_command
: str = "/usr/bin/kubectl",
53 helm_command
: str = "/usr/bin/helm",
56 vca_config
: dict = None,
60 :param fs: file system for kubernetes and helm configuration
61 :param db: database object to write current operation status
62 :param kubectl_command: path to kubectl executable
63 :param helm_command: path to helm executable
65 :param on_update_db: callback called when k8s connector updates database
69 K8sConnector
.__init
__(self
, db
=db
, log
=log
, on_update_db
=on_update_db
)
71 self
.log
.info("Initializing K8S Helm connector")
73 # random numbers for release name generation
74 random
.seed(time
.time())
79 # exception if kubectl is not installed
80 self
.kubectl_command
= kubectl_command
81 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
83 # exception if helm is not installed
84 self
._helm
_command
= helm_command
85 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
87 # obtain stable repo url from config or apply default
88 if not vca_config
or not vca_config
.get("stablerepourl"):
89 self
._stable
_repo
_url
= self
._STABLE
_REPO
_URL
91 self
._stable
_repo
_url
= vca_config
.get("stablerepourl")
@staticmethod
def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str):
    """
    Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only
    cluster_id for backward compatibility

    :param cluster_uuid: stored identifier, 'namespace:cluster_id' or plain 'cluster_id'
    :return: tuple (namespace, cluster_id); namespace is "" when there is no ':'
    """
    # rpartition splits on the LAST ':'; without any ':' it yields
    # ("", "", cluster_uuid), which gives the empty namespace for legacy ids.
    namespace, _, cluster_id = cluster_uuid.rpartition(":")
    return namespace, cluster_id
103 self
, k8s_creds
: str, namespace
: str = "kube-system", reuse_cluster_uuid
=None, **kwargs
,
106 It prepares a given K8s cluster environment to run Charts
108 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
110 :param namespace: optional namespace to be used for helm. By default,
111 'kube-system' will be used
112 :param reuse_cluster_uuid: existing cluster uuid for reuse
113 :param kwargs: Additional parameters (None yet)
114 :return: uuid of the K8s cluster and True if connector has installed some
115 software in the cluster
116 (on error, an exception will be raised)
119 if reuse_cluster_uuid
:
120 namespace_
, cluster_id
= self
._get
_namespace
_cluster
_id
(reuse_cluster_uuid
)
121 namespace
= namespace_
or namespace
123 cluster_id
= str(uuid4())
124 cluster_uuid
= "{}:{}".format(namespace
, cluster_id
)
126 self
.log
.debug("Initializing K8S Cluster {}. namespace: {}".format(cluster_id
, namespace
))
128 paths
, env
= self
._init
_paths
_env
(
129 cluster_name
=cluster_id
, create_if_not_exist
=True
131 mode
= stat
.S_IRUSR | stat
.S_IWUSR
132 with
open(paths
["kube_config"], "w", mode
) as f
:
134 os
.chmod(paths
["kube_config"], 0o600)
136 # Code with initialization specific of helm version
137 n2vc_installed_sw
= await self
._cluster
_init
(cluster_id
, namespace
, paths
, env
)
139 # sync fs with local data
140 self
.fs
.reverse_sync(from_path
=cluster_id
)
142 self
.log
.info("Cluster {} initialized".format(cluster_id
))
144 return cluster_uuid
, n2vc_installed_sw
147 self
, cluster_uuid
: str, name
: str, url
: str, repo_type
: str = "chart"
149 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
150 self
.log
.debug("Cluster {}, adding {} repository {}. URL: {}".format(
151 cluster_id
, repo_type
, name
, url
))
154 self
.fs
.sync(from_path
=cluster_id
)
157 paths
, env
= self
._init
_paths
_env
(
158 cluster_name
=cluster_id
, create_if_not_exist
=True
162 command
= "{} repo update".format(
165 self
.log
.debug("updating repo: {}".format(command
))
166 await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False, env
=env
)
168 # helm repo add name url
169 command
= "{} repo add {} {}".format(
170 self
._helm
_command
, name
, url
172 self
.log
.debug("adding repo: {}".format(command
))
173 await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True, env
=env
)
176 self
.fs
.reverse_sync(from_path
=cluster_id
)
178 async def repo_list(self
, cluster_uuid
: str) -> list:
180 Get the list of registered repositories
182 :return: list of registered repositories: [ (name, url) .... ]
185 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
186 self
.log
.debug("list repositories for cluster {}".format(cluster_id
))
189 self
.fs
.sync(from_path
=cluster_id
)
192 paths
, env
= self
._init
_paths
_env
(
193 cluster_name
=cluster_id
, create_if_not_exist
=True
196 command
= "{} repo list --output yaml".format(
200 # Set exception to false because if there are no repos just want an empty list
201 output
, _rc
= await self
._local
_async
_exec
(
202 command
=command
, raise_exception_on_error
=False, env
=env
206 self
.fs
.reverse_sync(from_path
=cluster_id
)
209 if output
and len(output
) > 0:
210 repos
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
211 # unify format between helm2 and helm3 setting all keys lowercase
212 return self
._lower
_keys
_list
(repos
)
async def repo_remove(self, cluster_uuid: str, name: str):
    """
    Remove a repository from the helm configuration of a cluster.

    Runs "<helm> repo remove <name>" with the environment returned by
    _init_paths_env; a failing command raises (raise_exception_on_error=True).

    :param cluster_uuid: cluster identifier, 'namespace:cluster_id' or 'cluster_id'
    :param name: name of the repository to remove
    """
    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug("remove {} repositories for cluster {}".format(name, cluster_id))

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} repo remove {}".format(self._helm_command, name)
    await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)
240 self
, cluster_uuid
: str, force
: bool = False, uninstall_sw
: bool = False, **kwargs
244 Resets the Kubernetes cluster by removing the helm deployment that represents it.
246 :param cluster_uuid: The UUID of the cluster to reset
247 :param force: Boolean to force the reset
248 :param uninstall_sw: Boolean to also uninstall the software deployed by the connector in the cluster
249 :param kwargs: Additional parameters (None yet)
250 :return: Returns True if successful or raises an exception.
252 namespace
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
253 self
.log
.debug("Resetting K8s environment. cluster uuid: {} uninstall={}"
254 .format(cluster_id
, uninstall_sw
))
257 self
.fs
.sync(from_path
=cluster_id
)
259 # uninstall releases if needed.
261 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
262 if len(releases
) > 0:
266 kdu_instance
= r
.get("name")
267 chart
= r
.get("chart")
269 "Uninstalling {} -> {}".format(chart
, kdu_instance
)
271 await self
.uninstall(
272 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
274 except Exception as e
:
275 # will not raise exception as it was found
276 # that in some cases of previously installed helm releases it
279 "Error uninstalling release {}: {}".format(kdu_instance
, e
)
283 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
286 uninstall_sw
= False # Allow to remove k8s cluster without removing Tiller
289 await self
._uninstall
_sw
(cluster_id
, namespace
)
291 # delete cluster directory
292 self
.log
.debug("Removing directory {}".format(cluster_id
))
293 self
.fs
.file_delete(cluster_id
, ignore_non_exist
=True)
294 # Also remove the local directory if it still exists
295 direct
= self
.fs
.path
+ "/" + cluster_id
296 shutil
.rmtree(direct
, ignore_errors
=True)
300 async def _install_impl(
308 timeout
: float = 300,
310 db_dict
: dict = None,
311 kdu_name
: str = None,
312 namespace
: str = None,
315 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
316 cluster_id
=cluster_id
, params
=params
322 parts
= kdu_model
.split(sep
=":")
324 version
= str(parts
[1])
327 command
= self
._get
_install
_command
(kdu_model
, kdu_instance
, namespace
,
328 params_str
, version
, atomic
, timeout
)
330 self
.log
.debug("installing: {}".format(command
))
333 # exec helm in a task
334 exec_task
= asyncio
.ensure_future(
335 coro_or_future
=self
._local
_async
_exec
(
336 command
=command
, raise_exception_on_error
=False, env
=env
340 # write status in another task
341 status_task
= asyncio
.ensure_future(
342 coro_or_future
=self
._store
_status
(
343 cluster_id
=cluster_id
,
344 kdu_instance
=kdu_instance
,
352 # wait for execution task
353 await asyncio
.wait([exec_task
])
358 output
, rc
= exec_task
.result()
362 output
, rc
= await self
._local
_async
_exec
(
363 command
=command
, raise_exception_on_error
=False, env
=env
366 # remove temporal values yaml file
368 os
.remove(file_to_delete
)
371 await self
._store
_status
(
372 cluster_id
=cluster_id
,
373 kdu_instance
=kdu_instance
,
382 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
384 raise K8sException(msg
)
390 kdu_model
: str = None,
392 timeout
: float = 300,
394 db_dict
: dict = None,
396 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
397 self
.log
.debug("upgrading {} in cluster {}".format(kdu_model
, cluster_id
))
400 self
.fs
.sync(from_path
=cluster_id
)
402 # look for instance to obtain namespace
403 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
404 if not instance_info
:
405 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
408 paths
, env
= self
._init
_paths
_env
(
409 cluster_name
=cluster_id
, create_if_not_exist
=True
413 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
414 cluster_id
=cluster_id
, params
=params
420 parts
= kdu_model
.split(sep
=":")
422 version
= str(parts
[1])
425 command
= self
._get
_upgrade
_command
(kdu_model
, kdu_instance
, instance_info
["namespace"],
426 params_str
, version
, atomic
, timeout
)
428 self
.log
.debug("upgrading: {}".format(command
))
432 # exec helm in a task
433 exec_task
= asyncio
.ensure_future(
434 coro_or_future
=self
._local
_async
_exec
(
435 command
=command
, raise_exception_on_error
=False, env
=env
438 # write status in another task
439 status_task
= asyncio
.ensure_future(
440 coro_or_future
=self
._store
_status
(
441 cluster_id
=cluster_id
,
442 kdu_instance
=kdu_instance
,
443 namespace
=instance_info
["namespace"],
450 # wait for execution task
451 await asyncio
.wait([exec_task
])
455 output
, rc
= exec_task
.result()
459 output
, rc
= await self
._local
_async
_exec
(
460 command
=command
, raise_exception_on_error
=False, env
=env
463 # remove temporal values yaml file
465 os
.remove(file_to_delete
)
468 await self
._store
_status
(
469 cluster_id
=cluster_id
,
470 kdu_instance
=kdu_instance
,
471 namespace
=instance_info
["namespace"],
479 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
481 raise K8sException(msg
)
484 self
.fs
.reverse_sync(from_path
=cluster_id
)
486 # return new revision number
487 instance
= await self
.get_instance_info(
488 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
491 revision
= int(instance
.get("revision"))
492 self
.log
.debug("New revision: {}".format(revision
))
498 self
, cluster_uuid
: str, kdu_instance
: str, revision
=0, db_dict
: dict = None
501 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
503 "rollback kdu_instance {} to revision {} from cluster {}".format(
504 kdu_instance
, revision
, cluster_id
509 self
.fs
.sync(from_path
=cluster_id
)
511 # look for instance to obtain namespace
512 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
513 if not instance_info
:
514 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
517 paths
, env
= self
._init
_paths
_env
(
518 cluster_name
=cluster_id
, create_if_not_exist
=True
521 command
= self
._get
_rollback
_command
(kdu_instance
, instance_info
["namespace"],
524 self
.log
.debug("rolling_back: {}".format(command
))
526 # exec helm in a task
527 exec_task
= asyncio
.ensure_future(
528 coro_or_future
=self
._local
_async
_exec
(
529 command
=command
, raise_exception_on_error
=False, env
=env
532 # write status in another task
533 status_task
= asyncio
.ensure_future(
534 coro_or_future
=self
._store
_status
(
535 cluster_id
=cluster_id
,
536 kdu_instance
=kdu_instance
,
537 namespace
=instance_info
["namespace"],
539 operation
="rollback",
544 # wait for execution task
545 await asyncio
.wait([exec_task
])
550 output
, rc
= exec_task
.result()
553 await self
._store
_status
(
554 cluster_id
=cluster_id
,
555 kdu_instance
=kdu_instance
,
556 namespace
=instance_info
["namespace"],
558 operation
="rollback",
564 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
566 raise K8sException(msg
)
569 self
.fs
.reverse_sync(from_path
=cluster_id
)
571 # return new revision number
572 instance
= await self
.get_instance_info(
573 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
576 revision
= int(instance
.get("revision"))
577 self
.log
.debug("New revision: {}".format(revision
))
582 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str, **kwargs
):
584 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
585 (this call should happen after all _terminate-config-primitive_ of the VNF
588 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
589 :param kdu_instance: unique name for the KDU instance to be deleted
590 :param kwargs: Additional parameters (None yet)
591 :return: True if successful
594 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
596 "uninstall kdu_instance {} from cluster {}".format(
597 kdu_instance
, cluster_id
602 self
.fs
.sync(from_path
=cluster_id
)
604 # look for instance to obtain namespace
605 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
606 if not instance_info
:
607 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
610 paths
, env
= self
._init
_paths
_env
(
611 cluster_name
=cluster_id
, create_if_not_exist
=True
614 command
= self
._get
_uninstall
_command
(kdu_instance
, instance_info
["namespace"])
615 output
, _rc
= await self
._local
_async
_exec
(
616 command
=command
, raise_exception_on_error
=True, env
=env
620 self
.fs
.reverse_sync(from_path
=cluster_id
)
622 return self
._output
_to
_table
(output
)
async def instances_list(self, cluster_uuid: str) -> list:
    """
    returns a list of deployed releases in a cluster

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: whatever the helm-version-specific _instances_list returns
    """
    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug("list releases for cluster {}".format(cluster_id))

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # execute internal command
    result = await self._instances_list(cluster_id)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)

    return result
async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
    """
    Look up one release by name among the releases listed for a cluster.

    :param cluster_uuid: cluster identifier passed through to instances_list
    :param kdu_instance: release name to search for (compared with the "name" key)
    :return: the matching instance dictionary, or None when no release matches
    """
    instances = await self.instances_list(cluster_uuid=cluster_uuid)
    for instance in instances:
        if instance.get("name") == kdu_instance:
            return instance
    # Not finding the release is not an error here; callers check for None.
    self.log.debug("Instance {} not found".format(kdu_instance))
    return None
654 async def exec_primitive(
656 cluster_uuid
: str = None,
657 kdu_instance
: str = None,
658 primitive_name
: str = None,
659 timeout
: float = 300,
661 db_dict
: dict = None,
664 """Exec primitive (Juju action)
666 :param cluster_uuid: The UUID of the cluster or namespace:cluster
667 :param kdu_instance: The unique name of the KDU instance
668 :param primitive_name: Name of action that will be executed
669 :param timeout: Timeout for action execution
670 :param params: Dictionary of all the parameters needed for the action
671 :db_dict: Dictionary for any additional data
672 :param kwargs: Additional parameters (None yet)
674 :return: Returns the output of the action
677 "KDUs deployed with Helm don't support actions "
678 "different from rollback, upgrade and status"
681 async def get_services(self
,
684 namespace
: str) -> list:
686 Returns a list of services defined for the specified kdu instance.
688 :param cluster_uuid: UUID of a K8s cluster known by OSM
689 :param kdu_instance: unique name for the KDU instance
690 :param namespace: K8s namespace used by the KDU instance
691 :return: If successful, it will return a list of services, Each service
692 can have the following data:
693 - `name` of the service
694 - `type` type of service in the k8 cluster
695 - `ports` List of ports offered by the service, for each port includes at least
697 - `cluster_ip` Internal ip to be used inside k8s cluster
698 - `external_ip` List of external ips (in case they are available)
701 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
703 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
704 cluster_uuid
, kdu_instance
709 self
.fs
.sync(from_path
=cluster_id
)
711 # get list of services names for kdu
712 service_names
= await self
._get
_services
(cluster_id
, kdu_instance
, namespace
)
715 for service
in service_names
:
716 service
= await self
._get
_service
(cluster_id
, service
, namespace
)
717 service_list
.append(service
)
720 self
.fs
.reverse_sync(from_path
=cluster_id
)
724 async def get_service(self
,
727 namespace
: str) -> object:
730 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
731 service_name
, namespace
, cluster_uuid
)
734 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
737 self
.fs
.sync(from_path
=cluster_id
)
739 service
= await self
._get
_service
(cluster_id
, service_name
, namespace
)
742 self
.fs
.reverse_sync(from_path
=cluster_id
)
746 async def status_kdu(self
, cluster_uuid
: str, kdu_instance
: str, **kwargs
) -> str:
748 This call would retrieve the current state of a given KDU instance. It
749 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
750 values_ of the configuration parameters applied to a given instance. This call
751 would be based on the `status` call.
753 :param cluster_uuid: UUID of a K8s cluster known by OSM
754 :param kdu_instance: unique name for the KDU instance
755 :param kwargs: Additional parameters (None yet)
756 :return: If successful, it will return the following vector of arguments:
757 - K8s `namespace` in the cluster where the KDU lives
758 - `state` of the KDU instance. It can be:
765 - List of `resources` (objects) that this release consists of, sorted by kind,
766 and the status of those resources
767 - Last `deployment_time`.
771 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
772 cluster_uuid
, kdu_instance
776 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
779 self
.fs
.sync(from_path
=cluster_id
)
781 # get instance: needed to obtain namespace
782 instances
= await self
._instances
_list
(cluster_id
=cluster_id
)
783 for instance
in instances
:
784 if instance
.get("name") == kdu_instance
:
787 # instance does not exist
788 raise K8sException("Instance name: {} not found in cluster: {}".format(
789 kdu_instance
, cluster_id
))
791 status
= await self
._status
_kdu
(
792 cluster_id
=cluster_id
,
793 kdu_instance
=kdu_instance
,
794 namespace
=instance
["namespace"],
800 self
.fs
.reverse_sync(from_path
=cluster_id
)
804 async def values_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
807 "inspect kdu_model values {} from (optional) repo: {}".format(
812 return await self
._exec
_inspect
_comand
(
813 inspect_command
="values", kdu_model
=kdu_model
, repo_url
=repo_url
816 async def help_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
819 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model
, repo_url
)
822 return await self
._exec
_inspect
_comand
(
823 inspect_command
="readme", kdu_model
=kdu_model
, repo_url
=repo_url
826 async def synchronize_repos(self
, cluster_uuid
: str):
828 self
.log
.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid
))
830 db_repo_ids
= self
._get
_helm
_chart
_repos
_ids
(cluster_uuid
)
831 db_repo_dict
= self
._get
_db
_repos
_dict
(db_repo_ids
)
833 local_repo_list
= await self
.repo_list(cluster_uuid
)
834 local_repo_dict
= {repo
["name"]: repo
["url"] for repo
in local_repo_list
}
836 deleted_repo_list
= []
839 # iterate over the list of repos in the database that should be
840 # added if not present
841 for repo_name
, db_repo
in db_repo_dict
.items():
843 # check if it is already present
844 curr_repo_url
= local_repo_dict
.get(db_repo
["name"])
845 repo_id
= db_repo
.get("_id")
846 if curr_repo_url
!= db_repo
["url"]:
848 self
.log
.debug("repo {} url changed, delete and and again".format(
850 await self
.repo_remove(cluster_uuid
, db_repo
["name"])
851 deleted_repo_list
.append(repo_id
)
854 self
.log
.debug("add repo {}".format(db_repo
["name"]))
855 await self
.repo_add(cluster_uuid
, db_repo
["name"], db_repo
["url"])
856 added_repo_dict
[repo_id
] = db_repo
["name"]
857 except Exception as e
:
859 "Error adding repo id: {}, err_msg: {} ".format(
864 # Delete repos that are present but not in nbi_list
865 for repo_name
in local_repo_dict
:
866 if not db_repo_dict
.get(repo_name
) and repo_name
!= "stable":
867 self
.log
.debug("delete repo {}".format(repo_name
))
869 await self
.repo_remove(cluster_uuid
, repo_name
)
870 deleted_repo_list
.append(repo_name
)
871 except Exception as e
:
873 "Error deleting repo, name: {}, err_msg: {}".format(
878 return deleted_repo_list
, added_repo_dict
882 except Exception as e
:
883 # Do not raise errors synchronizing repos
884 self
.log
.error("Error synchronizing repos: {}".format(e
))
885 raise Exception("Error synchronizing repos: {}".format(e
))
def _get_db_repos_dict(self, repo_ids: list):
    """
    Read the given repositories from the "k8srepos" collection.

    :param repo_ids: list of repository "_id" values to look up
    :return: dictionary mapping each repository "name" to its database record
    """
    db_repos_dict = {}
    for repo_id in repo_ids:
        db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
        # keyed by name: two ids sharing a name keep only the last record
        db_repos_dict[db_repo["name"]] = db_repo
    return db_repos_dict
895 ####################################################################################
896 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
897 ####################################################################################
901 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
903 Creates and returns base cluster and kube dirs and returns them.
904 Also created helm3 dirs according to new directory specification, paths are
905 not returned but assigned to helm environment variables
907 :param cluster_name: cluster_name
908 :return: Dictionary with config_paths and dictionary with helm environment variables
912 async def _cluster_init(self
, cluster_id
, namespace
, paths
, env
):
914 Implements the helm version dependent cluster initialization
918 async def _instances_list(self
, cluster_id
):
920 Implements the helm version dependent helm instances list
924 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
):
926 Implements the helm version dependent method to obtain services from a helm instance
930 async def _status_kdu(self
, cluster_id
: str, kdu_instance
: str, namespace
: str = None,
931 show_error_log
: bool = False, return_text
: bool = False):
933 Implements the helm version dependent method to obtain status of a helm instance
937 def _get_install_command(self
, kdu_model
, kdu_instance
, namespace
,
938 params_str
, version
, atomic
, timeout
) -> str:
940 Obtain command to be executed to install the indicated instance
944 def _get_upgrade_command(self
, kdu_model
, kdu_instance
, namespace
,
945 params_str
, version
, atomic
, timeout
) -> str:
947 Obtain command to be executed to upgrade the indicated instance
951 def _get_rollback_command(self
, kdu_instance
, namespace
, revision
) -> str:
953 Obtain command to be executed to rollback the indicated instance
957 def _get_uninstall_command(self
, kdu_instance
: str, namespace
: str) -> str:
959 Obtain command to be executed to delete the indicated instance
963 def _get_inspect_command(self
, show_command
: str, kdu_model
: str, repo_str
: str,
966 Obtain command to be executed to obtain information about the kdu
970 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
972 Method call to uninstall cluster software for helm. This method is dependent
974 For Helm v2 it will be called when Tiller must be uninstalled
975 For Helm v3 it does nothing and does not need to be called
979 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
981 Obtains the cluster repos identifiers
985 ####################################################################################
986 ################################### P R I V A T E ##################################
987 ####################################################################################
@staticmethod
def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
    """
    Check that a path exists on the local filesystem.

    :param filename: path to check
    :param exception_if_not_exists: raise instead of returning when the path is missing
    :return: True if the path exists, False otherwise
    :raises K8sException: if the path is missing and exception_if_not_exists is True
    """
    if os.path.exists(filename):
        return True
    if exception_if_not_exists:
        raise K8sException("File {} does not exist".format(filename))
    # explicit False (still falsy) rather than falling off the end with None
    return False
@staticmethod
def _remove_multiple_spaces(strobj):
    """
    Strip surrounding whitespace and collapse every run of space characters
    into a single space.

    Only the space character is collapsed; tabs and newlines inside the
    string are left untouched.

    :param strobj: string to normalise (typically a command line)
    :return: the normalised string
    """
    # Splitting on a literal " " produces empty chunks for each extra space;
    # dropping them and re-joining collapses the runs in a single pass and
    # cannot loop forever.
    return " ".join(chunk for chunk in strobj.strip().split(" ") if chunk)
@staticmethod
def _output_to_lines(output: str) -> list:
    """
    Split command output into a list of lines.

    NOTE(review): each line is stripped and blank lines are dropped — confirm
    this matches the intended behaviour of the original loop.

    :param output: raw text output of a command
    :return: list of non-empty, stripped lines
    """
    stripped = (line.strip() for line in output.splitlines(keepends=False))
    return [line for line in stripped if line]
@staticmethod
def _output_to_table(output: str) -> list:
    """
    Split command output into a table: one list of cells per line.

    Tabs are treated as spaces, cells are separated by spaces, and empty
    cells are dropped. A blank line still produces an (empty) row.

    :param output: raw text output of a command
    :return: list of rows, each row being a list of cell strings
    """
    output_table = []
    for line in output.splitlines(keepends=False):
        cells = (cell.strip() for cell in line.replace("\t", " ").split(sep=" "))
        output_table.append([cell for cell in cells if cell])
    return output_table
@staticmethod
def _parse_services(output: str) -> list:
    """
    Extract service names from text whose lines may start with "service/<name>".

    Only the first space/tab-separated cell of each line is inspected; lines
    whose first cell does not start with "service/" are ignored.

    :param output: raw text output of a command
    :return: list of service names, in order of appearance
    """
    services = []
    for line in output.splitlines(keepends=False):
        first_cell = line.replace("\t", " ").split(sep=" ")[0]
        if first_cell.startswith("service/"):
            # the prefix guarantees at least two elements after the split
            services.append(first_cell.split(sep="/")[1])
    return services
1045 def _get_deep(dictionary
: dict, members
: tuple):
1050 value
= target
.get(m
)
1059 # find key:value in several lines
1061 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
1062 for line
in p_lines
:
1064 if line
.startswith(p_key
+ ":"):
1065 parts
= line
.split(":")
1066 the_value
= parts
[1].strip()
@staticmethod
def _lower_keys_list(input_list: list):
    """
    Transform the keys in a list of dictionaries to lower case and returns a new list
    of dictionaries

    :param input_list: list of dictionaries with string keys; None or empty is accepted
    :return: new list of new dictionaries; the input is not modified
    """
    # A parsed YAML document can be None (e.g. output that holds no data);
    # treat that like an empty list instead of failing on iteration.
    if not input_list:
        return []
    return [
        {key.lower(): value for key, value in dictionary.items()}
        for dictionary in input_list
    ]
1085 def _local_exec(self
, command
: str) -> (str, int):
1086 command
= self
._remove
_multiple
_spaces
(command
)
1087 self
.log
.debug("Executing sync local command: {}".format(command
))
1088 # raise exception if fails
1091 output
= subprocess
.check_output(
1092 command
, shell
=True, universal_newlines
=True
1095 self
.log
.debug(output
)
1099 return output
, return_code
1101 async def _local_async_exec(
1104 raise_exception_on_error
: bool = False,
1105 show_error_log
: bool = True,
1106 encode_utf8
: bool = False,
1110 command
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command
)
1111 self
.log
.debug("Executing async local command: {}, env: {}".format(command
, env
))
1114 command
= shlex
.split(command
)
1116 environ
= os
.environ
.copy()
1121 process
= await asyncio
.create_subprocess_exec(
1122 *command
, stdout
=asyncio
.subprocess
.PIPE
, stderr
=asyncio
.subprocess
.PIPE
,
1126 # wait for command terminate
1127 stdout
, stderr
= await process
.communicate()
1129 return_code
= process
.returncode
1133 output
= stdout
.decode("utf-8").strip()
1134 # output = stdout.decode()
1136 output
= stderr
.decode("utf-8").strip()
1137 # output = stderr.decode()
1139 if return_code
!= 0 and show_error_log
:
1141 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1144 self
.log
.debug("Return code: {}".format(return_code
))
1146 if raise_exception_on_error
and return_code
!= 0:
1147 raise K8sException(output
)
1150 output
= output
.encode("utf-8").strip()
1151 output
= str(output
).replace("\\n", "\n")
1153 return output
, return_code
1155 except asyncio
.CancelledError
:
1157 except K8sException
:
1159 except Exception as e
:
1160 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1162 if raise_exception_on_error
:
1163 raise K8sException(e
) from e
1167 async def _local_async_exec_pipe(self
,
1170 raise_exception_on_error
: bool = True,
1171 show_error_log
: bool = True,
1172 encode_utf8
: bool = False,
1175 command1
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command1
)
1176 command2
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command2
)
1177 command
= "{} | {}".format(command1
, command2
)
1178 self
.log
.debug("Executing async local command: {}, env: {}".format(command
, env
))
1181 command1
= shlex
.split(command1
)
1182 command2
= shlex
.split(command2
)
1184 environ
= os
.environ
.copy()
1189 read
, write
= os
.pipe()
1190 await asyncio
.create_subprocess_exec(*command1
, stdout
=write
, env
=environ
)
1192 process_2
= await asyncio
.create_subprocess_exec(*command2
, stdin
=read
,
1193 stdout
=asyncio
.subprocess
.PIPE
,
1196 stdout
, stderr
= await process_2
.communicate()
1198 return_code
= process_2
.returncode
1202 output
= stdout
.decode("utf-8").strip()
1203 # output = stdout.decode()
1205 output
= stderr
.decode("utf-8").strip()
1206 # output = stderr.decode()
1208 if return_code
!= 0 and show_error_log
:
1210 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1213 self
.log
.debug("Return code: {}".format(return_code
))
1215 if raise_exception_on_error
and return_code
!= 0:
1216 raise K8sException(output
)
1219 output
= output
.encode("utf-8").strip()
1220 output
= str(output
).replace("\\n", "\n")
1222 return output
, return_code
1223 except asyncio
.CancelledError
:
1225 except K8sException
:
1227 except Exception as e
:
1228 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1230 if raise_exception_on_error
:
1231 raise K8sException(e
) from e
async def _get_service(self, cluster_id, service_name, namespace):
    """
    Obtains the data of the specified service in the k8cluster.

    :param cluster_id: id of a K8s cluster known by OSM
    :param service_name: name of the K8s service in the specified namespace
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a service with the following data:
    - `name` of the service
    - `type` type of service in the k8 cluster
    - `ports` List of ports offered by the service, for each port includes at least
    name, port, protocol
    - `cluster_ip` Internal ip to be used inside k8s cluster
    - `external_ip` List of external ips (in case they are available)
    """
    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
        self.kubectl_command, paths["kube_config"], namespace, service_name
    )

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    data = yaml.load(output, Loader=yaml.SafeLoader)

    service = {
        "name": service_name,
        "type": self._get_deep(data, ("spec", "type")),
        "ports": self._get_deep(data, ("spec", "ports")),
        "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
    }
    if service["type"] == "LoadBalancer":
        ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
        # A LoadBalancer without an assigned ingress yet has no such entry;
        # report an empty list of external IPs instead of failing on None.
        # NOTE(review): entries exposing only "hostname" (no "ip") would still
        # raise KeyError here — confirm whether that case must be handled.
        ip_list = [elem["ip"] for elem in ip_map_list or []]
        service["external_ip"] = ip_list

    return service
1279 async def _exec_inspect_comand(
1280 self
, inspect_command
: str, kdu_model
: str, repo_url
: str = None
1283 Obtains information about a kdu, no cluster (no env)
1288 repo_str
= " --repo {}".format(repo_url
)
1290 idx
= kdu_model
.find("/")
1293 kdu_model
= kdu_model
[idx
:]
1296 if ":" in kdu_model
:
1297 parts
= kdu_model
.split(sep
=":")
1299 version
= "--version {}".format(str(parts
[1]))
1300 kdu_model
= parts
[0]
1302 full_command
= self
._get
_inspect
_command
(inspect_command
, kdu_model
, repo_str
, version
)
1303 output
, _rc
= await self
._local
_async
_exec
(
1304 command
=full_command
, encode_utf8
=True
1309 async def _store_status(
1314 namespace
: str = None,
1315 check_every
: float = 10,
1316 db_dict
: dict = None,
1317 run_once
: bool = False,
1321 await asyncio
.sleep(check_every
)
1322 detailed_status
= await self
._status
_kdu
(
1323 cluster_id
=cluster_id
, kdu_instance
=kdu_instance
, namespace
=namespace
,
1326 status
= detailed_status
.get("info").get("description")
1327 self
.log
.debug('KDU {} STATUS: {}.'.format(kdu_instance
, status
))
1328 # write status to db
1329 result
= await self
.write_app_status_to_db(
1332 detailed_status
=str(detailed_status
),
1333 operation
=operation
,
1336 self
.log
.info("Error writing in database. Task exiting...")
1338 except asyncio
.CancelledError
:
1339 self
.log
.debug("Task cancelled")
1341 except Exception as e
:
1342 self
.log
.debug("_store_status exception: {}".format(str(e
)), exc_info
=True)
1348 # params for use in -f file
1349 # returns values file option and filename (in order to delete it at the end)
1350 def _params_to_file_option(self
, cluster_id
: str, params
: dict) -> (str, str):
1352 if params
and len(params
) > 0:
1353 self
._init
_paths
_env
(
1354 cluster_name
=cluster_id
, create_if_not_exist
=True
1357 def get_random_number():
1358 r
= random
.randrange(start
=1, stop
=99999999)
1366 value
= params
.get(key
)
1367 if "!!yaml" in str(value
):
1368 value
= yaml
.load(value
[7:])
1369 params2
[key
] = value
1371 values_file
= get_random_number() + ".yaml"
1372 with
open(values_file
, "w") as stream
:
1373 yaml
.dump(params2
, stream
, indent
=4, default_flow_style
=False)
1375 return "-f {}".format(values_file
), values_file
# params for use in --set option
@staticmethod
def _params_to_set_option(params: dict) -> str:
    """
    Build the helm "--set" option from a parameters dictionary.

    Parameters whose value is None are skipped. Values are inserted as-is,
    without any escaping.

    :param params: dictionary key -> value; may be None or empty
    :return: "--set k1=v1,k2=v2" or "" when there is nothing to set
    """
    if not params:
        return ""
    pairs = [
        "{}={}".format(key, value)
        for key, value in params.items()
        if value is not None
    ]
    return ("--set " + ",".join(pairs)) if pairs else ""
1397 def generate_kdu_instance_name(**kwargs
):
1398 chart_name
= kwargs
["kdu_model"]
1399 # check embedded chart (file or dir)
1400 if chart_name
.startswith("/"):
1401 # extract file or directory name
1402 chart_name
= chart_name
[chart_name
.rfind("/") + 1:]
1404 elif "://" in chart_name
:
1405 # extract last portion of URL
1406 chart_name
= chart_name
[chart_name
.rfind("/") + 1:]
1409 for c
in chart_name
:
1410 if c
.isalpha() or c
.isnumeric():
1417 # if does not start with alpha character, prefix 'a'
1418 if not name
[0].isalpha():
1423 def get_random_number():
1424 r
= random
.randrange(start
=1, stop
=99999999)
1426 s
= s
.rjust(10, "0")
1429 name
= name
+ get_random_number()