2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
32 from uuid
import uuid4
34 from n2vc
.exceptions
import K8sException
35 from n2vc
.k8s_conn
import K8sConnector
38 class K8sHelmBaseConnector(K8sConnector
):
41 ####################################################################################
42 ################################### P U B L I C ####################################
43 ####################################################################################
45 service_account
= "osm"
46 _STABLE_REPO_URL
= "https://charts.helm.sh/stable"
52 kubectl_command
: str = "/usr/bin/kubectl",
53 helm_command
: str = "/usr/bin/helm",
56 vca_config
: dict = None,
60 :param fs: file system for kubernetes and helm configuration
61 :param db: database object to write current operation status
62 :param kubectl_command: path to kubectl executable
63 :param helm_command: path to helm executable
65 :param on_update_db: callback called when k8s connector updates database
69 K8sConnector
.__init
__(self
, db
=db
, log
=log
, on_update_db
=on_update_db
)
71 self
.log
.info("Initializing K8S Helm connector")
73 # random numbers for release name generation
74 random
.seed(time
.time())
79 # exception if kubectl is not installed
80 self
.kubectl_command
= kubectl_command
81 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
83 # exception if helm is not installed
84 self
._helm
_command
= helm_command
85 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
87 # obtain stable repo url from config or apply default
88 if not vca_config
or not vca_config
.get("stablerepourl"):
89 self
._stable
_repo
_url
= self
._STABLE
_REPO
_URL
91 self
._stable
_repo
_url
= vca_config
.get("stablerepourl")
94 def _get_namespace_cluster_id(cluster_uuid
: str) -> (str, str):
96 Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only
97 cluster_id for backward compatibility
99 namespace
, _
, cluster_id
= cluster_uuid
.rpartition(':')
100 return namespace
, cluster_id
103 self
, k8s_creds
: str, namespace
: str = "kube-system", reuse_cluster_uuid
=None, **kwargs
,
106 It prepares a given K8s cluster environment to run Charts
108 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
110 :param namespace: optional namespace to be used for helm. By default,
111 'kube-system' will be used
112 :param reuse_cluster_uuid: existing cluster uuid for reuse
113 :param kwargs: Additional parameters (None yet)
114 :return: uuid of the K8s cluster and True if connector has installed some
115 software in the cluster
116 (on error, an exception will be raised)
119 if reuse_cluster_uuid
:
120 namespace_
, cluster_id
= self
._get
_namespace
_cluster
_id
(reuse_cluster_uuid
)
121 namespace
= namespace_
or namespace
123 cluster_id
= str(uuid4())
124 cluster_uuid
= "{}:{}".format(namespace
, cluster_id
)
126 self
.log
.debug("Initializing K8S Cluster {}. namespace: {}".format(cluster_id
, namespace
))
128 paths
, env
= self
._init
_paths
_env
(
129 cluster_name
=cluster_id
, create_if_not_exist
=True
131 mode
= stat
.S_IRUSR | stat
.S_IWUSR
132 with
open(paths
["kube_config"], "w", mode
) as f
:
134 os
.chmod(paths
["kube_config"], 0o600)
136 # Code with initialization specific of helm version
137 n2vc_installed_sw
= await self
._cluster
_init
(cluster_id
, namespace
, paths
, env
)
139 # sync fs with local data
140 self
.fs
.reverse_sync(from_path
=cluster_id
)
142 self
.log
.info("Cluster {} initialized".format(cluster_id
))
144 return cluster_uuid
, n2vc_installed_sw
147 self
, cluster_uuid
: str, name
: str, url
: str, repo_type
: str = "chart"
149 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
150 self
.log
.debug("Cluster {}, adding {} repository {}. URL: {}".format(
151 cluster_id
, repo_type
, name
, url
))
154 self
.fs
.sync(from_path
=cluster_id
)
157 paths
, env
= self
._init
_paths
_env
(
158 cluster_name
=cluster_id
, create_if_not_exist
=True
162 command
= "{} repo update".format(
165 self
.log
.debug("updating repo: {}".format(command
))
166 await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False, env
=env
)
168 # helm repo add name url
169 command
= "{} repo add {} {}".format(
170 self
._helm
_command
, name
, url
172 self
.log
.debug("adding repo: {}".format(command
))
173 await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True, env
=env
)
176 self
.fs
.reverse_sync(from_path
=cluster_id
)
178 async def repo_list(self
, cluster_uuid
: str) -> list:
180 Get the list of registered repositories
182 :return: list of registered repositories: [ (name, url) .... ]
185 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
186 self
.log
.debug("list repositories for cluster {}".format(cluster_id
))
189 self
.fs
.sync(from_path
=cluster_id
)
192 paths
, env
= self
._init
_paths
_env
(
193 cluster_name
=cluster_id
, create_if_not_exist
=True
196 command
= "{} repo list --output yaml".format(
200 # Set exception to false because if there are no repos just want an empty list
201 output
, _rc
= await self
._local
_async
_exec
(
202 command
=command
, raise_exception_on_error
=False, env
=env
206 self
.fs
.reverse_sync(from_path
=cluster_id
)
209 if output
and len(output
) > 0:
210 repos
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
211 # unify format between helm2 and helm3 setting all keys lowercase
212 return self
._lower
_keys
_list
(repos
)
218 async def repo_remove(self
, cluster_uuid
: str, name
: str):
220 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
221 self
.log
.debug("remove {} repositories for cluster {}".format(name
, cluster_id
))
224 self
.fs
.sync(from_path
=cluster_id
)
227 paths
, env
= self
._init
_paths
_env
(
228 cluster_name
=cluster_id
, create_if_not_exist
=True
231 command
= "{} repo remove {}".format(
232 self
._helm
_command
, name
234 await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True, env
=env
)
237 self
.fs
.reverse_sync(from_path
=cluster_id
)
240 self
, cluster_uuid
: str, force
: bool = False, uninstall_sw
: bool = False, **kwargs
244 Resets the Kubernetes cluster by removing the helm deployment that represents it.
246 :param cluster_uuid: The UUID of the cluster to reset
247 :param force: Boolean to force the reset
248 :param uninstall_sw: Boolean to force the reset
249 :param kwargs: Additional parameters (None yet)
250 :return: Returns True if successful or raises an exception.
252 namespace
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
253 self
.log
.debug("Resetting K8s environment. cluster uuid: {} uninstall={}"
254 .format(cluster_id
, uninstall_sw
))
257 self
.fs
.sync(from_path
=cluster_id
)
259 # uninstall releases if needed.
261 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
262 if len(releases
) > 0:
266 kdu_instance
= r
.get("name")
267 chart
= r
.get("chart")
269 "Uninstalling {} -> {}".format(chart
, kdu_instance
)
271 await self
.uninstall(
272 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
274 except Exception as e
:
275 # will not raise exception as it was found
276 # that in some cases of previously installed helm releases it
279 "Error uninstalling release {}: {}".format(kdu_instance
, e
)
283 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
286 uninstall_sw
= False # Allow to remove k8s cluster without removing Tiller
289 await self
._uninstall
_sw
(cluster_id
, namespace
)
291 # delete cluster directory
292 self
.log
.debug("Removing directory {}".format(cluster_id
))
293 self
.fs
.file_delete(cluster_id
, ignore_non_exist
=True)
294 # Remove also local directorio if still exist
295 direct
= self
.fs
.path
+ "/" + cluster_id
296 shutil
.rmtree(direct
, ignore_errors
=True)
300 async def _install_impl(
308 timeout
: float = 300,
310 db_dict
: dict = None,
311 kdu_name
: str = None,
312 namespace
: str = None,
315 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
316 cluster_id
=cluster_id
, params
=params
322 parts
= kdu_model
.split(sep
=":")
324 version
= str(parts
[1])
327 command
= self
._get
_install
_command
(kdu_model
, kdu_instance
, namespace
,
328 params_str
, version
, atomic
, timeout
)
330 self
.log
.debug("installing: {}".format(command
))
333 # exec helm in a task
334 exec_task
= asyncio
.ensure_future(
335 coro_or_future
=self
._local
_async
_exec
(
336 command
=command
, raise_exception_on_error
=False, env
=env
340 # write status in another task
341 status_task
= asyncio
.ensure_future(
342 coro_or_future
=self
._store
_status
(
343 cluster_id
=cluster_id
,
344 kdu_instance
=kdu_instance
,
352 # wait for execution task
353 await asyncio
.wait([exec_task
])
358 output
, rc
= exec_task
.result()
362 output
, rc
= await self
._local
_async
_exec
(
363 command
=command
, raise_exception_on_error
=False, env
=env
366 # remove temporal values yaml file
368 os
.remove(file_to_delete
)
371 await self
._store
_status
(
372 cluster_id
=cluster_id
,
373 kdu_instance
=kdu_instance
,
382 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
384 raise K8sException(msg
)
390 kdu_model
: str = None,
392 timeout
: float = 300,
394 db_dict
: dict = None,
396 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
397 self
.log
.debug("upgrading {} in cluster {}".format(kdu_model
, cluster_id
))
400 self
.fs
.sync(from_path
=cluster_id
)
402 # look for instance to obtain namespace
403 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
404 if not instance_info
:
405 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
408 paths
, env
= self
._init
_paths
_env
(
409 cluster_name
=cluster_id
, create_if_not_exist
=True
413 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
414 cluster_id
=cluster_id
, params
=params
420 parts
= kdu_model
.split(sep
=":")
422 version
= str(parts
[1])
425 command
= self
._get
_upgrade
_command
(kdu_model
, kdu_instance
, instance_info
["namespace"],
426 params_str
, version
, atomic
, timeout
)
428 self
.log
.debug("upgrading: {}".format(command
))
432 # exec helm in a task
433 exec_task
= asyncio
.ensure_future(
434 coro_or_future
=self
._local
_async
_exec
(
435 command
=command
, raise_exception_on_error
=False, env
=env
438 # write status in another task
439 status_task
= asyncio
.ensure_future(
440 coro_or_future
=self
._store
_status
(
441 cluster_id
=cluster_id
,
442 kdu_instance
=kdu_instance
,
443 namespace
=instance_info
["namespace"],
450 # wait for execution task
451 await asyncio
.wait([exec_task
])
455 output
, rc
= exec_task
.result()
459 output
, rc
= await self
._local
_async
_exec
(
460 command
=command
, raise_exception_on_error
=False, env
=env
463 # remove temporal values yaml file
465 os
.remove(file_to_delete
)
468 await self
._store
_status
(
469 cluster_id
=cluster_id
,
470 kdu_instance
=kdu_instance
,
471 namespace
=instance_info
["namespace"],
479 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
481 raise K8sException(msg
)
484 self
.fs
.reverse_sync(from_path
=cluster_id
)
486 # return new revision number
487 instance
= await self
.get_instance_info(
488 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
491 revision
= int(instance
.get("revision"))
492 self
.log
.debug("New revision: {}".format(revision
))
502 total_timeout
: float = 1800,
505 raise NotImplementedError("Method not implemented")
507 async def get_scale_count(
513 raise NotImplementedError("Method not implemented")
516 self
, cluster_uuid
: str, kdu_instance
: str, revision
=0, db_dict
: dict = None
519 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
521 "rollback kdu_instance {} to revision {} from cluster {}".format(
522 kdu_instance
, revision
, cluster_id
527 self
.fs
.sync(from_path
=cluster_id
)
529 # look for instance to obtain namespace
530 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
531 if not instance_info
:
532 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
535 paths
, env
= self
._init
_paths
_env
(
536 cluster_name
=cluster_id
, create_if_not_exist
=True
539 command
= self
._get
_rollback
_command
(kdu_instance
, instance_info
["namespace"],
542 self
.log
.debug("rolling_back: {}".format(command
))
544 # exec helm in a task
545 exec_task
= asyncio
.ensure_future(
546 coro_or_future
=self
._local
_async
_exec
(
547 command
=command
, raise_exception_on_error
=False, env
=env
550 # write status in another task
551 status_task
= asyncio
.ensure_future(
552 coro_or_future
=self
._store
_status
(
553 cluster_id
=cluster_id
,
554 kdu_instance
=kdu_instance
,
555 namespace
=instance_info
["namespace"],
557 operation
="rollback",
562 # wait for execution task
563 await asyncio
.wait([exec_task
])
568 output
, rc
= exec_task
.result()
571 await self
._store
_status
(
572 cluster_id
=cluster_id
,
573 kdu_instance
=kdu_instance
,
574 namespace
=instance_info
["namespace"],
576 operation
="rollback",
582 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
584 raise K8sException(msg
)
587 self
.fs
.reverse_sync(from_path
=cluster_id
)
589 # return new revision number
590 instance
= await self
.get_instance_info(
591 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
594 revision
= int(instance
.get("revision"))
595 self
.log
.debug("New revision: {}".format(revision
))
600 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str, **kwargs
):
602 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
603 (this call should happen after all _terminate-config-primitive_ of the VNF
606 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
607 :param kdu_instance: unique name for the KDU instance to be deleted
608 :param kwargs: Additional parameters (None yet)
609 :return: True if successful
612 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
614 "uninstall kdu_instance {} from cluster {}".format(
615 kdu_instance
, cluster_id
620 self
.fs
.sync(from_path
=cluster_id
)
622 # look for instance to obtain namespace
623 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
624 if not instance_info
:
625 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
628 paths
, env
= self
._init
_paths
_env
(
629 cluster_name
=cluster_id
, create_if_not_exist
=True
632 command
= self
._get
_uninstall
_command
(kdu_instance
, instance_info
["namespace"])
633 output
, _rc
= await self
._local
_async
_exec
(
634 command
=command
, raise_exception_on_error
=True, env
=env
638 self
.fs
.reverse_sync(from_path
=cluster_id
)
640 return self
._output
_to
_table
(output
)
642 async def instances_list(self
, cluster_uuid
: str) -> list:
644 returns a list of deployed releases in a cluster
646 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
650 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
651 self
.log
.debug("list releases for cluster {}".format(cluster_id
))
654 self
.fs
.sync(from_path
=cluster_id
)
656 # execute internal command
657 result
= await self
._instances
_list
(cluster_id
)
660 self
.fs
.reverse_sync(from_path
=cluster_id
)
664 async def get_instance_info(self
, cluster_uuid
: str, kdu_instance
: str):
665 instances
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
666 for instance
in instances
:
667 if instance
.get("name") == kdu_instance
:
669 self
.log
.debug("Instance {} not found".format(kdu_instance
))
672 async def exec_primitive(
674 cluster_uuid
: str = None,
675 kdu_instance
: str = None,
676 primitive_name
: str = None,
677 timeout
: float = 300,
679 db_dict
: dict = None,
682 """Exec primitive (Juju action)
684 :param cluster_uuid: The UUID of the cluster or namespace:cluster
685 :param kdu_instance: The unique name of the KDU instance
686 :param primitive_name: Name of action that will be executed
687 :param timeout: Timeout for action execution
688 :param params: Dictionary of all the parameters needed for the action
689 :db_dict: Dictionary for any additional data
690 :param kwargs: Additional parameters (None yet)
692 :return: Returns the output of the action
695 "KDUs deployed with Helm don't support actions "
696 "different from rollback, upgrade and status"
699 async def get_services(self
,
702 namespace
: str) -> list:
704 Returns a list of services defined for the specified kdu instance.
706 :param cluster_uuid: UUID of a K8s cluster known by OSM
707 :param kdu_instance: unique name for the KDU instance
708 :param namespace: K8s namespace used by the KDU instance
709 :return: If successful, it will return a list of services, Each service
710 can have the following data:
711 - `name` of the service
712 - `type` type of service in the k8 cluster
713 - `ports` List of ports offered by the service, for each port includes at least
715 - `cluster_ip` Internal ip to be used inside k8s cluster
716 - `external_ip` List of external ips (in case they are available)
719 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
721 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
722 cluster_uuid
, kdu_instance
727 self
.fs
.sync(from_path
=cluster_id
)
729 # get list of services names for kdu
730 service_names
= await self
._get
_services
(cluster_id
, kdu_instance
, namespace
)
733 for service
in service_names
:
734 service
= await self
._get
_service
(cluster_id
, service
, namespace
)
735 service_list
.append(service
)
738 self
.fs
.reverse_sync(from_path
=cluster_id
)
742 async def get_service(self
,
745 namespace
: str) -> object:
748 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
749 service_name
, namespace
, cluster_uuid
)
752 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
755 self
.fs
.sync(from_path
=cluster_id
)
757 service
= await self
._get
_service
(cluster_id
, service_name
, namespace
)
760 self
.fs
.reverse_sync(from_path
=cluster_id
)
764 async def status_kdu(self
, cluster_uuid
: str, kdu_instance
: str, **kwargs
) -> str:
766 This call would retrieve tha current state of a given KDU instance. It would be
767 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
768 values_ of the configuration parameters applied to a given instance. This call
769 would be based on the `status` call.
771 :param cluster_uuid: UUID of a K8s cluster known by OSM
772 :param kdu_instance: unique name for the KDU instance
773 :param kwargs: Additional parameters (None yet)
774 :return: If successful, it will return the following vector of arguments:
775 - K8s `namespace` in the cluster where the KDU lives
776 - `state` of the KDU instance. It can be:
783 - List of `resources` (objects) that this release consists of, sorted by kind,
784 and the status of those resources
785 - Last `deployment_time`.
789 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
790 cluster_uuid
, kdu_instance
794 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
797 self
.fs
.sync(from_path
=cluster_id
)
799 # get instance: needed to obtain namespace
800 instances
= await self
._instances
_list
(cluster_id
=cluster_id
)
801 for instance
in instances
:
802 if instance
.get("name") == kdu_instance
:
805 # instance does not exist
806 raise K8sException("Instance name: {} not found in cluster: {}".format(
807 kdu_instance
, cluster_id
))
809 status
= await self
._status
_kdu
(
810 cluster_id
=cluster_id
,
811 kdu_instance
=kdu_instance
,
812 namespace
=instance
["namespace"],
818 self
.fs
.reverse_sync(from_path
=cluster_id
)
822 async def values_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
825 "inspect kdu_model values {} from (optional) repo: {}".format(
830 return await self
._exec
_inspect
_comand
(
831 inspect_command
="values", kdu_model
=kdu_model
, repo_url
=repo_url
834 async def help_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
837 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model
, repo_url
)
840 return await self
._exec
_inspect
_comand
(
841 inspect_command
="readme", kdu_model
=kdu_model
, repo_url
=repo_url
844 async def synchronize_repos(self
, cluster_uuid
: str):
846 self
.log
.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid
))
848 db_repo_ids
= self
._get
_helm
_chart
_repos
_ids
(cluster_uuid
)
849 db_repo_dict
= self
._get
_db
_repos
_dict
(db_repo_ids
)
851 local_repo_list
= await self
.repo_list(cluster_uuid
)
852 local_repo_dict
= {repo
["name"]: repo
["url"] for repo
in local_repo_list
}
854 deleted_repo_list
= []
857 # iterate over the list of repos in the database that should be
858 # added if not present
859 for repo_name
, db_repo
in db_repo_dict
.items():
861 # check if it is already present
862 curr_repo_url
= local_repo_dict
.get(db_repo
["name"])
863 repo_id
= db_repo
.get("_id")
864 if curr_repo_url
!= db_repo
["url"]:
866 self
.log
.debug("repo {} url changed, delete and and again".format(
868 await self
.repo_remove(cluster_uuid
, db_repo
["name"])
869 deleted_repo_list
.append(repo_id
)
872 self
.log
.debug("add repo {}".format(db_repo
["name"]))
873 await self
.repo_add(cluster_uuid
, db_repo
["name"], db_repo
["url"])
874 added_repo_dict
[repo_id
] = db_repo
["name"]
875 except Exception as e
:
877 "Error adding repo id: {}, err_msg: {} ".format(
882 # Delete repos that are present but not in nbi_list
883 for repo_name
in local_repo_dict
:
884 if not db_repo_dict
.get(repo_name
) and repo_name
!= "stable":
885 self
.log
.debug("delete repo {}".format(repo_name
))
887 await self
.repo_remove(cluster_uuid
, repo_name
)
888 deleted_repo_list
.append(repo_name
)
889 except Exception as e
:
891 "Error deleting repo, name: {}, err_msg: {}".format(
896 return deleted_repo_list
, added_repo_dict
900 except Exception as e
:
901 # Do not raise errors synchronizing repos
902 self
.log
.error("Error synchronizing repos: {}".format(e
))
903 raise Exception("Error synchronizing repos: {}".format(e
))
905 def _get_db_repos_dict(self
, repo_ids
: list):
907 for repo_id
in repo_ids
:
908 db_repo
= self
.db
.get_one("k8srepos", {"_id": repo_id
})
909 db_repos_dict
[db_repo
["name"]] = db_repo
913 ####################################################################################
914 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
915 ####################################################################################
919 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
921 Creates and returns base cluster and kube dirs and returns them.
922 Also created helm3 dirs according to new directory specification, paths are
923 not returned but assigned to helm environment variables
925 :param cluster_name: cluster_name
926 :return: Dictionary with config_paths and dictionary with helm environment variables
930 async def _cluster_init(self
, cluster_id
, namespace
, paths
, env
):
932 Implements the helm version dependent cluster initialization
936 async def _instances_list(self
, cluster_id
):
938 Implements the helm version dependent helm instances list
942 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
):
944 Implements the helm version dependent method to obtain services from a helm instance
948 async def _status_kdu(self
, cluster_id
: str, kdu_instance
: str, namespace
: str = None,
949 show_error_log
: bool = False, return_text
: bool = False):
951 Implements the helm version dependent method to obtain status of a helm instance
955 def _get_install_command(self
, kdu_model
, kdu_instance
, namespace
,
956 params_str
, version
, atomic
, timeout
) -> str:
958 Obtain command to be executed to delete the indicated instance
962 def _get_upgrade_command(self
, kdu_model
, kdu_instance
, namespace
,
963 params_str
, version
, atomic
, timeout
) -> str:
965 Obtain command to be executed to upgrade the indicated instance
969 def _get_rollback_command(self
, kdu_instance
, namespace
, revision
) -> str:
971 Obtain command to be executed to rollback the indicated instance
975 def _get_uninstall_command(self
, kdu_instance
: str, namespace
: str) -> str:
977 Obtain command to be executed to delete the indicated instance
981 def _get_inspect_command(self
, show_command
: str, kdu_model
: str, repo_str
: str,
984 Obtain command to be executed to obtain information about the kdu
988 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
990 Method call to uninstall cluster software for helm. This method is dependent
992 For Helm v2 it will be called when Tiller must be uninstalled
993 For Helm v3 it does nothing and does not need to be callled
997 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
999 Obtains the cluster repos identifiers
1003 ####################################################################################
1004 ################################### P R I V A T E ##################################
1005 ####################################################################################
1009 def _check_file_exists(filename
: str, exception_if_not_exists
: bool = False):
1010 if os
.path
.exists(filename
):
1013 msg
= "File {} does not exist".format(filename
)
1014 if exception_if_not_exists
:
1015 raise K8sException(msg
)
1018 def _remove_multiple_spaces(strobj
):
1019 strobj
= strobj
.strip()
1020 while " " in strobj
:
1021 strobj
= strobj
.replace(" ", " ")
1025 def _output_to_lines(output
: str) -> list:
1026 output_lines
= list()
1027 lines
= output
.splitlines(keepends
=False)
1031 output_lines
.append(line
)
1035 def _output_to_table(output
: str) -> list:
1036 output_table
= list()
1037 lines
= output
.splitlines(keepends
=False)
1039 line
= line
.replace("\t", " ")
1041 output_table
.append(line_list
)
1042 cells
= line
.split(sep
=" ")
1046 line_list
.append(cell
)
1050 def _parse_services(output
: str) -> list:
1051 lines
= output
.splitlines(keepends
=False)
1054 line
= line
.replace("\t", " ")
1055 cells
= line
.split(sep
=" ")
1056 if len(cells
) > 0 and cells
[0].startswith("service/"):
1057 elems
= cells
[0].split(sep
="/")
1059 services
.append(elems
[1])
1063 def _get_deep(dictionary
: dict, members
: tuple):
1068 value
= target
.get(m
)
1077 # find key:value in several lines
1079 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
1080 for line
in p_lines
:
1082 if line
.startswith(p_key
+ ":"):
1083 parts
= line
.split(":")
1084 the_value
= parts
[1].strip()
1092 def _lower_keys_list(input_list
: list):
1094 Transform the keys in a list of dictionaries to lower case and returns a new list
1098 for dictionary
in input_list
:
1099 new_dict
= dict((k
.lower(), v
) for k
, v
in dictionary
.items())
1100 new_list
.append(new_dict
)
1103 def _local_exec(self
, command
: str) -> (str, int):
1104 command
= self
._remove
_multiple
_spaces
(command
)
1105 self
.log
.debug("Executing sync local command: {}".format(command
))
1106 # raise exception if fails
1109 output
= subprocess
.check_output(
1110 command
, shell
=True, universal_newlines
=True
1113 self
.log
.debug(output
)
1117 return output
, return_code
1119 async def _local_async_exec(
1122 raise_exception_on_error
: bool = False,
1123 show_error_log
: bool = True,
1124 encode_utf8
: bool = False,
1128 command
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command
)
1129 self
.log
.debug("Executing async local command: {}, env: {}".format(command
, env
))
1132 command
= shlex
.split(command
)
1134 environ
= os
.environ
.copy()
1139 process
= await asyncio
.create_subprocess_exec(
1140 *command
, stdout
=asyncio
.subprocess
.PIPE
, stderr
=asyncio
.subprocess
.PIPE
,
1144 # wait for command terminate
1145 stdout
, stderr
= await process
.communicate()
1147 return_code
= process
.returncode
1151 output
= stdout
.decode("utf-8").strip()
1152 # output = stdout.decode()
1154 output
= stderr
.decode("utf-8").strip()
1155 # output = stderr.decode()
1157 if return_code
!= 0 and show_error_log
:
1159 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1162 self
.log
.debug("Return code: {}".format(return_code
))
1164 if raise_exception_on_error
and return_code
!= 0:
1165 raise K8sException(output
)
1168 output
= output
.encode("utf-8").strip()
1169 output
= str(output
).replace("\\n", "\n")
1171 return output
, return_code
1173 except asyncio
.CancelledError
:
1175 except K8sException
:
1177 except Exception as e
:
1178 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1180 if raise_exception_on_error
:
1181 raise K8sException(e
) from e
1185 async def _local_async_exec_pipe(self
,
1188 raise_exception_on_error
: bool = True,
1189 show_error_log
: bool = True,
1190 encode_utf8
: bool = False,
1193 command1
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command1
)
1194 command2
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command2
)
1195 command
= "{} | {}".format(command1
, command2
)
1196 self
.log
.debug("Executing async local command: {}, env: {}".format(command
, env
))
1199 command1
= shlex
.split(command1
)
1200 command2
= shlex
.split(command2
)
1202 environ
= os
.environ
.copy()
1207 read
, write
= os
.pipe()
1208 await asyncio
.create_subprocess_exec(*command1
, stdout
=write
, env
=environ
)
1210 process_2
= await asyncio
.create_subprocess_exec(*command2
, stdin
=read
,
1211 stdout
=asyncio
.subprocess
.PIPE
,
1214 stdout
, stderr
= await process_2
.communicate()
1216 return_code
= process_2
.returncode
1220 output
= stdout
.decode("utf-8").strip()
1221 # output = stdout.decode()
1223 output
= stderr
.decode("utf-8").strip()
1224 # output = stderr.decode()
1226 if return_code
!= 0 and show_error_log
:
1228 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1231 self
.log
.debug("Return code: {}".format(return_code
))
1233 if raise_exception_on_error
and return_code
!= 0:
1234 raise K8sException(output
)
1237 output
= output
.encode("utf-8").strip()
1238 output
= str(output
).replace("\\n", "\n")
1240 return output
, return_code
1241 except asyncio
.CancelledError
:
1243 except K8sException
:
1245 except Exception as e
:
1246 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1248 if raise_exception_on_error
:
1249 raise K8sException(e
) from e
1253 async def _get_service(self
, cluster_id
, service_name
, namespace
):
1255 Obtains the data of the specified service in the k8cluster.
1257 :param cluster_id: id of a K8s cluster known by OSM
1258 :param service_name: name of the K8s service in the specified namespace
1259 :param namespace: K8s namespace used by the KDU instance
1260 :return: If successful, it will return a service with the following data:
1261 - `name` of the service
1262 - `type` type of service in the k8 cluster
1263 - `ports` List of ports offered by the service, for each port includes at least
1264 name, port, protocol
1265 - `cluster_ip` Internal ip to be used inside k8s cluster
1266 - `external_ip` List of external ips (in case they are available)
1270 paths
, env
= self
._init
_paths
_env
(
1271 cluster_name
=cluster_id
, create_if_not_exist
=True
1274 command
= "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1275 self
.kubectl_command
, paths
["kube_config"], namespace
, service_name
1278 output
, _rc
= await self
._local
_async
_exec
(
1279 command
=command
, raise_exception_on_error
=True, env
=env
1282 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
1285 "name": service_name
,
1286 "type": self
._get
_deep
(data
, ("spec", "type")),
1287 "ports": self
._get
_deep
(data
, ("spec", "ports")),
1288 "cluster_ip": self
._get
_deep
(data
, ("spec", "clusterIP"))
1290 if service
["type"] == "LoadBalancer":
1291 ip_map_list
= self
._get
_deep
(data
, ("status", "loadBalancer", "ingress"))
1292 ip_list
= [elem
["ip"] for elem
in ip_map_list
]
1293 service
["external_ip"] = ip_list
1297 async def _exec_inspect_comand(
1298 self
, inspect_command
: str, kdu_model
: str, repo_url
: str = None
1301 Obtains information about a kdu, no cluster (no env)
1306 repo_str
= " --repo {}".format(repo_url
)
1308 idx
= kdu_model
.find("/")
1311 kdu_model
= kdu_model
[idx
:]
1314 if ":" in kdu_model
:
1315 parts
= kdu_model
.split(sep
=":")
1317 version
= "--version {}".format(str(parts
[1]))
1318 kdu_model
= parts
[0]
1320 full_command
= self
._get
_inspect
_command
(inspect_command
, kdu_model
, repo_str
, version
)
1321 output
, _rc
= await self
._local
_async
_exec
(
1322 command
=full_command
, encode_utf8
=True
1327 async def _store_status(
1332 namespace
: str = None,
1333 check_every
: float = 10,
1334 db_dict
: dict = None,
1335 run_once
: bool = False,
1339 await asyncio
.sleep(check_every
)
1340 detailed_status
= await self
._status
_kdu
(
1341 cluster_id
=cluster_id
, kdu_instance
=kdu_instance
, namespace
=namespace
,
1344 status
= detailed_status
.get("info").get("description")
1345 self
.log
.debug('KDU {} STATUS: {}.'.format(kdu_instance
, status
))
1346 # write status to db
1347 result
= await self
.write_app_status_to_db(
1350 detailed_status
=str(detailed_status
),
1351 operation
=operation
,
1354 self
.log
.info("Error writing in database. Task exiting...")
1356 except asyncio
.CancelledError
:
1357 self
.log
.debug("Task cancelled")
1359 except Exception as e
:
1360 self
.log
.debug("_store_status exception: {}".format(str(e
)), exc_info
=True)
1366 # params for use in -f file
1367 # returns values file option and filename (in order to delete it at the end)
1368 def _params_to_file_option(self
, cluster_id
: str, params
: dict) -> (str, str):
1370 if params
and len(params
) > 0:
1371 self
._init
_paths
_env
(
1372 cluster_name
=cluster_id
, create_if_not_exist
=True
1375 def get_random_number():
1376 r
= random
.randrange(start
=1, stop
=99999999)
1384 value
= params
.get(key
)
1385 if "!!yaml" in str(value
):
1386 value
= yaml
.load(value
[7:])
1387 params2
[key
] = value
1389 values_file
= get_random_number() + ".yaml"
1390 with
open(values_file
, "w") as stream
:
1391 yaml
.dump(params2
, stream
, indent
=4, default_flow_style
=False)
1393 return "-f {}".format(values_file
), values_file
1397 # params for use in --set option
1399 def _params_to_set_option(params
: dict) -> str:
1401 if params
and len(params
) > 0:
1404 value
= params
.get(key
, None)
1405 if value
is not None:
1407 params_str
+= "--set "
1411 params_str
+= "{}={}".format(key
, value
)
1415 def generate_kdu_instance_name(**kwargs
):
1416 chart_name
= kwargs
["kdu_model"]
1417 # check embeded chart (file or dir)
1418 if chart_name
.startswith("/"):
1419 # extract file or directory name
1420 chart_name
= chart_name
[chart_name
.rfind("/") + 1:]
1422 elif "://" in chart_name
:
1423 # extract last portion of URL
1424 chart_name
= chart_name
[chart_name
.rfind("/") + 1:]
1427 for c
in chart_name
:
1428 if c
.isalpha() or c
.isnumeric():
1435 # if does not start with alpha character, prefix 'a'
1436 if not name
[0].isalpha():
1441 def get_random_number():
1442 r
= random
.randrange(start
=1, stop
=99999999)
1444 s
= s
.rjust(10, "0")
1447 name
= name
+ get_random_number()