82ae0674aaafc66c423bbf8aed68c70c4dd95edb
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
24 from typing
import Union
32 from uuid
import uuid4
34 from n2vc
.config
import EnvironConfig
35 from n2vc
.exceptions
import K8sException
36 from n2vc
.k8s_conn
import K8sConnector
39 class K8sHelmBaseConnector(K8sConnector
):
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
47 service_account
= "osm"
53 kubectl_command
: str = "/usr/bin/kubectl",
54 helm_command
: str = "/usr/bin/helm",
60 :param fs: file system for kubernetes and helm configuration
61 :param db: database object to write current operation status
62 :param kubectl_command: path to kubectl executable
63 :param helm_command: path to helm executable
65 :param on_update_db: callback called when k8s connector updates database
69 K8sConnector
.__init
__(self
, db
=db
, log
=log
, on_update_db
=on_update_db
)
71 self
.log
.info("Initializing K8S Helm connector")
73 self
.config
= EnvironConfig()
74 # random numbers for release name generation
75 random
.seed(time
.time())
80 # exception if kubectl is not installed
81 self
.kubectl_command
= kubectl_command
82 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
84 # exception if helm is not installed
85 self
._helm
_command
= helm_command
86 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
88 # obtain stable repo url from config or apply default
89 self
._stable
_repo
_url
= self
.config
.get("stablerepourl")
90 if self
._stable
_repo
_url
== "None":
91 self
._stable
_repo
_url
= None
94 def _get_namespace_cluster_id(cluster_uuid
: str) -> (str, str):
96 Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only
97 cluster_id for backward compatibility
99 namespace
, _
, cluster_id
= cluster_uuid
.rpartition(":")
100 return namespace
, cluster_id
105 namespace
: str = "kube-system",
106 reuse_cluster_uuid
=None,
110 It prepares a given K8s cluster environment to run Charts
112 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
114 :param namespace: optional namespace to be used for helm. By default,
115 'kube-system' will be used
116 :param reuse_cluster_uuid: existing cluster uuid for reuse
117 :param kwargs: Additional parameters (None yet)
118 :return: uuid of the K8s cluster and True if connector has installed some
119 software in the cluster
120 (on error, an exception will be raised)
123 if reuse_cluster_uuid
:
124 namespace_
, cluster_id
= self
._get
_namespace
_cluster
_id
(reuse_cluster_uuid
)
125 namespace
= namespace_
or namespace
127 cluster_id
= str(uuid4())
128 cluster_uuid
= "{}:{}".format(namespace
, cluster_id
)
131 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id
, namespace
)
134 paths
, env
= self
._init
_paths
_env
(
135 cluster_name
=cluster_id
, create_if_not_exist
=True
137 mode
= stat
.S_IRUSR | stat
.S_IWUSR
138 with
open(paths
["kube_config"], "w", mode
) as f
:
140 os
.chmod(paths
["kube_config"], 0o600)
142 # Code with initialization specific of helm version
143 n2vc_installed_sw
= await self
._cluster
_init
(cluster_id
, namespace
, paths
, env
)
145 # sync fs with local data
146 self
.fs
.reverse_sync(from_path
=cluster_id
)
148 self
.log
.info("Cluster {} initialized".format(cluster_id
))
150 return cluster_uuid
, n2vc_installed_sw
153 self
, cluster_uuid
: str, name
: str, url
: str, repo_type
: str = "chart"
155 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
157 "Cluster {}, adding {} repository {}. URL: {}".format(
158 cluster_id
, repo_type
, name
, url
163 paths
, env
= self
._init
_paths
_env
(
164 cluster_name
=cluster_id
, create_if_not_exist
=True
168 self
.fs
.sync(from_path
=cluster_id
)
170 # helm repo add name url
171 command
= "env KUBECONFIG={} {} repo add {} {}".format(
172 paths
["kube_config"], self
._helm
_command
, name
, url
174 self
.log
.debug("adding repo: {}".format(command
))
175 await self
._local
_async
_exec
(
176 command
=command
, raise_exception_on_error
=True, env
=env
180 command
= "env KUBECONFIG={} {} repo update {}".format(
181 paths
["kube_config"], self
._helm
_command
, name
183 self
.log
.debug("updating repo: {}".format(command
))
184 await self
._local
_async
_exec
(
185 command
=command
, raise_exception_on_error
=False, env
=env
189 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
191 async def repo_update(self
, cluster_uuid
: str, name
: str, repo_type
: str = "chart"):
193 "Cluster {}, updating {} repository {}".format(
194 cluster_uuid
, repo_type
, name
199 paths
, env
= self
._init
_paths
_env
(
200 cluster_name
=cluster_uuid
, create_if_not_exist
=True
204 self
.fs
.sync(from_path
=cluster_uuid
)
207 command
= "{} repo update {}".format(self
._helm
_command
, name
)
208 self
.log
.debug("updating repo: {}".format(command
))
209 await self
._local
_async
_exec
(
210 command
=command
, raise_exception_on_error
=False, env
=env
214 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
216 async def repo_list(self
, cluster_uuid
: str) -> list:
218 Get the list of registered repositories
220 :return: list of registered repositories: [ (name, url) .... ]
223 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
224 self
.log
.debug("list repositories for cluster {}".format(cluster_id
))
227 paths
, env
= self
._init
_paths
_env
(
228 cluster_name
=cluster_id
, create_if_not_exist
=True
232 self
.fs
.sync(from_path
=cluster_id
)
234 command
= "env KUBECONFIG={} {} repo list --output yaml".format(
235 paths
["kube_config"], self
._helm
_command
238 # Set exception to false because if there are no repos just want an empty list
239 output
, _rc
= await self
._local
_async
_exec
(
240 command
=command
, raise_exception_on_error
=False, env
=env
244 self
.fs
.reverse_sync(from_path
=cluster_id
)
247 if output
and len(output
) > 0:
248 repos
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
249 # unify format between helm2 and helm3 setting all keys lowercase
250 return self
._lower
_keys
_list
(repos
)
256 async def repo_remove(self
, cluster_uuid
: str, name
: str):
258 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
259 self
.log
.debug("remove {} repositories for cluster {}".format(name
, cluster_id
))
262 paths
, env
= self
._init
_paths
_env
(
263 cluster_name
=cluster_id
, create_if_not_exist
=True
267 self
.fs
.sync(from_path
=cluster_id
)
269 command
= "env KUBECONFIG={} {} repo remove {}".format(
270 paths
["kube_config"], self
._helm
_command
, name
272 await self
._local
_async
_exec
(
273 command
=command
, raise_exception_on_error
=True, env
=env
277 self
.fs
.reverse_sync(from_path
=cluster_id
)
283 uninstall_sw
: bool = False,
288 Resets the Kubernetes cluster by removing the helm deployment that represents it.
290 :param cluster_uuid: The UUID of the cluster to reset
291 :param force: Boolean to force the reset
292 :param uninstall_sw: Boolean to force the reset
293 :param kwargs: Additional parameters (None yet)
294 :return: Returns True if successful or raises an exception.
296 namespace
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
298 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
299 cluster_id
, uninstall_sw
304 self
.fs
.sync(from_path
=cluster_id
)
306 # uninstall releases if needed.
308 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
309 if len(releases
) > 0:
313 kdu_instance
= r
.get("name")
314 chart
= r
.get("chart")
316 "Uninstalling {} -> {}".format(chart
, kdu_instance
)
318 await self
.uninstall(
319 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
321 except Exception as e
:
322 # will not raise exception as it was found
323 # that in some cases of previously installed helm releases it
326 "Error uninstalling release {}: {}".format(
332 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
336 False # Allow to remove k8s cluster without removing Tiller
340 await self
._uninstall
_sw
(cluster_id
, namespace
)
342 # delete cluster directory
343 self
.log
.debug("Removing directory {}".format(cluster_id
))
344 self
.fs
.file_delete(cluster_id
, ignore_non_exist
=True)
345 # Remove also local directorio if still exist
346 direct
= self
.fs
.path
+ "/" + cluster_id
347 shutil
.rmtree(direct
, ignore_errors
=True)
351 async def _install_impl(
359 timeout
: float = 300,
361 db_dict
: dict = None,
362 kdu_name
: str = None,
363 namespace
: str = None,
366 paths
, env
= self
._init
_paths
_env
(
367 cluster_name
=cluster_id
, create_if_not_exist
=True
371 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
372 cluster_id
=cluster_id
, params
=params
378 parts
= kdu_model
.split(sep
=":")
380 version
= str(parts
[1])
383 repo
= self
._split
_repo
(kdu_model
)
385 self
.repo_update(cluster_id
, repo
)
387 command
= self
._get
_install
_command
(
395 paths
["kube_config"],
398 self
.log
.debug("installing: {}".format(command
))
401 # exec helm in a task
402 exec_task
= asyncio
.ensure_future(
403 coro_or_future
=self
._local
_async
_exec
(
404 command
=command
, raise_exception_on_error
=False, env
=env
408 # write status in another task
409 status_task
= asyncio
.ensure_future(
410 coro_or_future
=self
._store
_status
(
411 cluster_id
=cluster_id
,
412 kdu_instance
=kdu_instance
,
419 # wait for execution task
420 await asyncio
.wait([exec_task
])
425 output
, rc
= exec_task
.result()
429 output
, rc
= await self
._local
_async
_exec
(
430 command
=command
, raise_exception_on_error
=False, env
=env
433 # remove temporal values yaml file
435 os
.remove(file_to_delete
)
438 await self
._store
_status
(
439 cluster_id
=cluster_id
,
440 kdu_instance
=kdu_instance
,
447 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
449 raise K8sException(msg
)
455 kdu_model
: str = None,
457 timeout
: float = 300,
459 db_dict
: dict = None,
461 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
462 self
.log
.debug("upgrading {} in cluster {}".format(kdu_model
, cluster_id
))
465 self
.fs
.sync(from_path
=cluster_id
)
467 # look for instance to obtain namespace
468 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
469 if not instance_info
:
470 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
473 paths
, env
= self
._init
_paths
_env
(
474 cluster_name
=cluster_id
, create_if_not_exist
=True
478 self
.fs
.sync(from_path
=cluster_id
)
481 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
482 cluster_id
=cluster_id
, params
=params
488 parts
= kdu_model
.split(sep
=":")
490 version
= str(parts
[1])
493 repo
= self
._split
_repo
(kdu_model
)
495 self
.repo_update(cluster_uuid
, repo
)
497 command
= self
._get
_upgrade
_command
(
500 instance_info
["namespace"],
505 paths
["kube_config"],
508 self
.log
.debug("upgrading: {}".format(command
))
512 # exec helm in a task
513 exec_task
= asyncio
.ensure_future(
514 coro_or_future
=self
._local
_async
_exec
(
515 command
=command
, raise_exception_on_error
=False, env
=env
518 # write status in another task
519 status_task
= asyncio
.ensure_future(
520 coro_or_future
=self
._store
_status
(
521 cluster_id
=cluster_id
,
522 kdu_instance
=kdu_instance
,
523 namespace
=instance_info
["namespace"],
529 # wait for execution task
530 await asyncio
.wait([exec_task
])
534 output
, rc
= exec_task
.result()
538 output
, rc
= await self
._local
_async
_exec
(
539 command
=command
, raise_exception_on_error
=False, env
=env
542 # remove temporal values yaml file
544 os
.remove(file_to_delete
)
547 await self
._store
_status
(
548 cluster_id
=cluster_id
,
549 kdu_instance
=kdu_instance
,
550 namespace
=instance_info
["namespace"],
556 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
558 raise K8sException(msg
)
561 self
.fs
.reverse_sync(from_path
=cluster_id
)
563 # return new revision number
564 instance
= await self
.get_instance_info(
565 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
568 revision
= int(instance
.get("revision"))
569 self
.log
.debug("New revision: {}".format(revision
))
579 total_timeout
: float = 1800,
582 raise NotImplementedError("Method not implemented")
584 async def get_scale_count(
590 raise NotImplementedError("Method not implemented")
593 self
, cluster_uuid
: str, kdu_instance
: str, revision
=0, db_dict
: dict = None
596 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
598 "rollback kdu_instance {} to revision {} from cluster {}".format(
599 kdu_instance
, revision
, cluster_id
604 self
.fs
.sync(from_path
=cluster_id
)
606 # look for instance to obtain namespace
607 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
608 if not instance_info
:
609 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
612 paths
, env
= self
._init
_paths
_env
(
613 cluster_name
=cluster_id
, create_if_not_exist
=True
617 self
.fs
.sync(from_path
=cluster_id
)
619 command
= self
._get
_rollback
_command
(
620 kdu_instance
, instance_info
["namespace"], revision
, paths
["kube_config"]
623 self
.log
.debug("rolling_back: {}".format(command
))
625 # exec helm in a task
626 exec_task
= asyncio
.ensure_future(
627 coro_or_future
=self
._local
_async
_exec
(
628 command
=command
, raise_exception_on_error
=False, env
=env
631 # write status in another task
632 status_task
= asyncio
.ensure_future(
633 coro_or_future
=self
._store
_status
(
634 cluster_id
=cluster_id
,
635 kdu_instance
=kdu_instance
,
636 namespace
=instance_info
["namespace"],
638 operation
="rollback",
642 # wait for execution task
643 await asyncio
.wait([exec_task
])
648 output
, rc
= exec_task
.result()
651 await self
._store
_status
(
652 cluster_id
=cluster_id
,
653 kdu_instance
=kdu_instance
,
654 namespace
=instance_info
["namespace"],
656 operation
="rollback",
660 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
662 raise K8sException(msg
)
665 self
.fs
.reverse_sync(from_path
=cluster_id
)
667 # return new revision number
668 instance
= await self
.get_instance_info(
669 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
672 revision
= int(instance
.get("revision"))
673 self
.log
.debug("New revision: {}".format(revision
))
678 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str, **kwargs
):
680 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
681 (this call should happen after all _terminate-config-primitive_ of the VNF
684 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
685 :param kdu_instance: unique name for the KDU instance to be deleted
686 :param kwargs: Additional parameters (None yet)
687 :return: True if successful
690 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
692 "uninstall kdu_instance {} from cluster {}".format(kdu_instance
, cluster_id
)
696 self
.fs
.sync(from_path
=cluster_id
)
698 # look for instance to obtain namespace
699 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
700 if not instance_info
:
701 self
.log
.warning(("kdu_instance {} not found".format(kdu_instance
)))
704 paths
, env
= self
._init
_paths
_env
(
705 cluster_name
=cluster_id
, create_if_not_exist
=True
709 self
.fs
.sync(from_path
=cluster_id
)
711 command
= self
._get
_uninstall
_command
(
712 kdu_instance
, instance_info
["namespace"], paths
["kube_config"]
714 output
, _rc
= await self
._local
_async
_exec
(
715 command
=command
, raise_exception_on_error
=True, env
=env
719 self
.fs
.reverse_sync(from_path
=cluster_id
)
721 return self
._output
_to
_table
(output
)
723 async def instances_list(self
, cluster_uuid
: str) -> list:
725 returns a list of deployed releases in a cluster
727 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
731 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
732 self
.log
.debug("list releases for cluster {}".format(cluster_id
))
735 self
.fs
.sync(from_path
=cluster_id
)
737 # execute internal command
738 result
= await self
._instances
_list
(cluster_id
)
741 self
.fs
.reverse_sync(from_path
=cluster_id
)
745 async def get_instance_info(self
, cluster_uuid
: str, kdu_instance
: str):
746 instances
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
747 for instance
in instances
:
748 if instance
.get("name") == kdu_instance
:
750 self
.log
.debug("Instance {} not found".format(kdu_instance
))
753 async def exec_primitive(
755 cluster_uuid
: str = None,
756 kdu_instance
: str = None,
757 primitive_name
: str = None,
758 timeout
: float = 300,
760 db_dict
: dict = None,
763 """Exec primitive (Juju action)
765 :param cluster_uuid: The UUID of the cluster or namespace:cluster
766 :param kdu_instance: The unique name of the KDU instance
767 :param primitive_name: Name of action that will be executed
768 :param timeout: Timeout for action execution
769 :param params: Dictionary of all the parameters needed for the action
770 :db_dict: Dictionary for any additional data
771 :param kwargs: Additional parameters (None yet)
773 :return: Returns the output of the action
776 "KDUs deployed with Helm don't support actions "
777 "different from rollback, upgrade and status"
780 async def get_services(
781 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
784 Returns a list of services defined for the specified kdu instance.
786 :param cluster_uuid: UUID of a K8s cluster known by OSM
787 :param kdu_instance: unique name for the KDU instance
788 :param namespace: K8s namespace used by the KDU instance
789 :return: If successful, it will return a list of services, Each service
790 can have the following data:
791 - `name` of the service
792 - `type` type of service in the k8 cluster
793 - `ports` List of ports offered by the service, for each port includes at least
795 - `cluster_ip` Internal ip to be used inside k8s cluster
796 - `external_ip` List of external ips (in case they are available)
799 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
801 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
802 cluster_uuid
, kdu_instance
807 paths
, env
= self
._init
_paths
_env
(
808 cluster_name
=cluster_id
, create_if_not_exist
=True
812 self
.fs
.sync(from_path
=cluster_id
)
814 # get list of services names for kdu
815 service_names
= await self
._get
_services
(
816 cluster_id
, kdu_instance
, namespace
, paths
["kube_config"]
820 for service
in service_names
:
821 service
= await self
._get
_service
(cluster_id
, service
, namespace
)
822 service_list
.append(service
)
825 self
.fs
.reverse_sync(from_path
=cluster_id
)
829 async def get_service(
830 self
, cluster_uuid
: str, service_name
: str, namespace
: str
834 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
835 service_name
, namespace
, cluster_uuid
839 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
842 self
.fs
.sync(from_path
=cluster_id
)
844 service
= await self
._get
_service
(cluster_id
, service_name
, namespace
)
847 self
.fs
.reverse_sync(from_path
=cluster_id
)
851 async def status_kdu(
852 self
, cluster_uuid
: str, kdu_instance
: str, yaml_format
: str = False, **kwargs
853 ) -> Union
[str, dict]:
855 This call would retrieve tha current state of a given KDU instance. It would be
856 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
857 values_ of the configuration parameters applied to a given instance. This call
858 would be based on the `status` call.
860 :param cluster_uuid: UUID of a K8s cluster known by OSM
861 :param kdu_instance: unique name for the KDU instance
862 :param kwargs: Additional parameters (None yet)
863 :param yaml_format: if the return shall be returned as an YAML string or as a
865 :return: If successful, it will return the following vector of arguments:
866 - K8s `namespace` in the cluster where the KDU lives
867 - `state` of the KDU instance. It can be:
874 - List of `resources` (objects) that this release consists of, sorted by kind,
875 and the status of those resources
876 - Last `deployment_time`.
880 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
881 cluster_uuid
, kdu_instance
885 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
888 self
.fs
.sync(from_path
=cluster_id
)
890 # get instance: needed to obtain namespace
891 instances
= await self
._instances
_list
(cluster_id
=cluster_id
)
892 for instance
in instances
:
893 if instance
.get("name") == kdu_instance
:
896 # instance does not exist
898 "Instance name: {} not found in cluster: {}".format(
899 kdu_instance
, cluster_id
903 status
= await self
._status
_kdu
(
904 cluster_id
=cluster_id
,
905 kdu_instance
=kdu_instance
,
906 namespace
=instance
["namespace"],
907 yaml_format
=yaml_format
,
912 self
.fs
.reverse_sync(from_path
=cluster_id
)
916 async def values_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
919 "inspect kdu_model values {} from (optional) repo: {}".format(
924 return await self
._exec
_inspect
_comand
(
925 inspect_command
="values", kdu_model
=kdu_model
, repo_url
=repo_url
928 async def help_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
931 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model
, repo_url
)
934 return await self
._exec
_inspect
_comand
(
935 inspect_command
="readme", kdu_model
=kdu_model
, repo_url
=repo_url
938 async def synchronize_repos(self
, cluster_uuid
: str):
940 self
.log
.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid
))
942 db_repo_ids
= self
._get
_helm
_chart
_repos
_ids
(cluster_uuid
)
943 db_repo_dict
= self
._get
_db
_repos
_dict
(db_repo_ids
)
945 local_repo_list
= await self
.repo_list(cluster_uuid
)
946 local_repo_dict
= {repo
["name"]: repo
["url"] for repo
in local_repo_list
}
948 deleted_repo_list
= []
951 # iterate over the list of repos in the database that should be
952 # added if not present
953 for repo_name
, db_repo
in db_repo_dict
.items():
955 # check if it is already present
956 curr_repo_url
= local_repo_dict
.get(db_repo
["name"])
957 repo_id
= db_repo
.get("_id")
958 if curr_repo_url
!= db_repo
["url"]:
961 "repo {} url changed, delete and and again".format(
965 await self
.repo_remove(cluster_uuid
, db_repo
["name"])
966 deleted_repo_list
.append(repo_id
)
969 self
.log
.debug("add repo {}".format(db_repo
["name"]))
971 cluster_uuid
, db_repo
["name"], db_repo
["url"]
973 added_repo_dict
[repo_id
] = db_repo
["name"]
974 except Exception as e
:
976 "Error adding repo id: {}, err_msg: {} ".format(
981 # Delete repos that are present but not in nbi_list
982 for repo_name
in local_repo_dict
:
983 if not db_repo_dict
.get(repo_name
) and repo_name
!= "stable":
984 self
.log
.debug("delete repo {}".format(repo_name
))
986 await self
.repo_remove(cluster_uuid
, repo_name
)
987 deleted_repo_list
.append(repo_name
)
988 except Exception as e
:
990 "Error deleting repo, name: {}, err_msg: {}".format(
995 return deleted_repo_list
, added_repo_dict
999 except Exception as e
:
1000 # Do not raise errors synchronizing repos
1001 self
.log
.error("Error synchronizing repos: {}".format(e
))
1002 raise Exception("Error synchronizing repos: {}".format(e
))
1004 def _get_db_repos_dict(self
, repo_ids
: list):
1006 for repo_id
in repo_ids
:
1007 db_repo
= self
.db
.get_one("k8srepos", {"_id": repo_id
})
1008 db_repos_dict
[db_repo
["name"]] = db_repo
1009 return db_repos_dict
1012 ####################################################################################
1013 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1014 ####################################################################################
1018 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
1020 Creates and returns base cluster and kube dirs and returns them.
1021 Also created helm3 dirs according to new directory specification, paths are
1022 not returned but assigned to helm environment variables
1024 :param cluster_name: cluster_name
1025 :return: Dictionary with config_paths and dictionary with helm environment variables
1029 async def _cluster_init(self
, cluster_id
, namespace
, paths
, env
):
1031 Implements the helm version dependent cluster initialization
1035 async def _instances_list(self
, cluster_id
):
1037 Implements the helm version dependent helm instances list
1041 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
, kubeconfig
):
1043 Implements the helm version dependent method to obtain services from a helm instance
1047 async def _status_kdu(
1051 namespace
: str = None,
1052 yaml_format
: bool = False,
1053 show_error_log
: bool = False,
1054 ) -> Union
[str, dict]:
1056 Implements the helm version dependent method to obtain status of a helm instance
1060 def _get_install_command(
1072 Obtain command to be executed to delete the indicated instance
1076 def _get_upgrade_command(
1088 Obtain command to be executed to upgrade the indicated instance
1092 def _get_rollback_command(
1093 self
, kdu_instance
, namespace
, revision
, kubeconfig
1096 Obtain command to be executed to rollback the indicated instance
1100 def _get_uninstall_command(
1101 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
1104 Obtain command to be executed to delete the indicated instance
1108 def _get_inspect_command(
1109 self
, show_command
: str, kdu_model
: str, repo_str
: str, version
: str
1112 Obtain command to be executed to obtain information about the kdu
1116 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
1118 Method call to uninstall cluster software for helm. This method is dependent
1120 For Helm v2 it will be called when Tiller must be uninstalled
1121 For Helm v3 it does nothing and does not need to be callled
1125 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
1127 Obtains the cluster repos identifiers
1131 ####################################################################################
1132 ################################### P R I V A T E ##################################
1133 ####################################################################################
1137 def _check_file_exists(filename
: str, exception_if_not_exists
: bool = False):
1138 if os
.path
.exists(filename
):
1141 msg
= "File {} does not exist".format(filename
)
1142 if exception_if_not_exists
:
1143 raise K8sException(msg
)
1146 def _remove_multiple_spaces(strobj
):
1147 strobj
= strobj
.strip()
1148 while " " in strobj
:
1149 strobj
= strobj
.replace(" ", " ")
1153 def _output_to_lines(output
: str) -> list:
1154 output_lines
= list()
1155 lines
= output
.splitlines(keepends
=False)
1159 output_lines
.append(line
)
1163 def _output_to_table(output
: str) -> list:
1164 output_table
= list()
1165 lines
= output
.splitlines(keepends
=False)
1167 line
= line
.replace("\t", " ")
1169 output_table
.append(line_list
)
1170 cells
= line
.split(sep
=" ")
1174 line_list
.append(cell
)
1178 def _parse_services(output
: str) -> list:
1179 lines
= output
.splitlines(keepends
=False)
1182 line
= line
.replace("\t", " ")
1183 cells
= line
.split(sep
=" ")
1184 if len(cells
) > 0 and cells
[0].startswith("service/"):
1185 elems
= cells
[0].split(sep
="/")
1187 services
.append(elems
[1])
1191 def _get_deep(dictionary
: dict, members
: tuple):
1196 value
= target
.get(m
)
1205 # find key:value in several lines
1207 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
1208 for line
in p_lines
:
1210 if line
.startswith(p_key
+ ":"):
1211 parts
= line
.split(":")
1212 the_value
= parts
[1].strip()
1220 def _lower_keys_list(input_list
: list):
1222 Transform the keys in a list of dictionaries to lower case and returns a new list
1227 for dictionary
in input_list
:
1228 new_dict
= dict((k
.lower(), v
) for k
, v
in dictionary
.items())
1229 new_list
.append(new_dict
)
1232 async def _local_async_exec(
1235 raise_exception_on_error
: bool = False,
1236 show_error_log
: bool = True,
1237 encode_utf8
: bool = False,
1241 command
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command
)
1243 "Executing async local command: {}, env: {}".format(command
, env
)
1247 command
= shlex
.split(command
)
1249 environ
= os
.environ
.copy()
1254 process
= await asyncio
.create_subprocess_exec(
1256 stdout
=asyncio
.subprocess
.PIPE
,
1257 stderr
=asyncio
.subprocess
.PIPE
,
1261 # wait for command terminate
1262 stdout
, stderr
= await process
.communicate()
1264 return_code
= process
.returncode
1268 output
= stdout
.decode("utf-8").strip()
1269 # output = stdout.decode()
1271 output
= stderr
.decode("utf-8").strip()
1272 # output = stderr.decode()
1274 if return_code
!= 0 and show_error_log
:
1276 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1279 self
.log
.debug("Return code: {}".format(return_code
))
1281 if raise_exception_on_error
and return_code
!= 0:
1282 raise K8sException(output
)
1285 output
= output
.encode("utf-8").strip()
1286 output
= str(output
).replace("\\n", "\n")
1288 return output
, return_code
1290 except asyncio
.CancelledError
:
1292 except K8sException
:
1294 except Exception as e
:
1295 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1297 if raise_exception_on_error
:
1298 raise K8sException(e
) from e
1302 async def _local_async_exec_pipe(
1306 raise_exception_on_error
: bool = True,
1307 show_error_log
: bool = True,
1308 encode_utf8
: bool = False,
1312 command1
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command1
)
1313 command2
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command2
)
1314 command
= "{} | {}".format(command1
, command2
)
1316 "Executing async local command: {}, env: {}".format(command
, env
)
1320 command1
= shlex
.split(command1
)
1321 command2
= shlex
.split(command2
)
1323 environ
= os
.environ
.copy()
1328 read
, write
= os
.pipe()
1329 await asyncio
.create_subprocess_exec(*command1
, stdout
=write
, env
=environ
)
1331 process_2
= await asyncio
.create_subprocess_exec(
1332 *command2
, stdin
=read
, stdout
=asyncio
.subprocess
.PIPE
, env
=environ
1335 stdout
, stderr
= await process_2
.communicate()
1337 return_code
= process_2
.returncode
1341 output
= stdout
.decode("utf-8").strip()
1342 # output = stdout.decode()
1344 output
= stderr
.decode("utf-8").strip()
1345 # output = stderr.decode()
1347 if return_code
!= 0 and show_error_log
:
1349 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1352 self
.log
.debug("Return code: {}".format(return_code
))
1354 if raise_exception_on_error
and return_code
!= 0:
1355 raise K8sException(output
)
1358 output
= output
.encode("utf-8").strip()
1359 output
= str(output
).replace("\\n", "\n")
1361 return output
, return_code
1362 except asyncio
.CancelledError
:
1364 except K8sException
:
1366 except Exception as e
:
1367 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1369 if raise_exception_on_error
:
1370 raise K8sException(e
) from e
1374 async def _get_service(self
, cluster_id
, service_name
, namespace
):
1376 Obtains the data of the specified service in the k8cluster.
1378 :param cluster_id: id of a K8s cluster known by OSM
1379 :param service_name: name of the K8s service in the specified namespace
1380 :param namespace: K8s namespace used by the KDU instance
1381 :return: If successful, it will return a service with the following data:
1382 - `name` of the service
1383 - `type` type of service in the k8 cluster
1384 - `ports` List of ports offered by the service, for each port includes at least
1385 name, port, protocol
1386 - `cluster_ip` Internal ip to be used inside k8s cluster
1387 - `external_ip` List of external ips (in case they are available)
1391 paths
, env
= self
._init
_paths
_env
(
1392 cluster_name
=cluster_id
, create_if_not_exist
=True
1395 command
= "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1396 self
.kubectl_command
, paths
["kube_config"], namespace
, service_name
1399 output
, _rc
= await self
._local
_async
_exec
(
1400 command
=command
, raise_exception_on_error
=True, env
=env
1403 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
1406 "name": service_name
,
1407 "type": self
._get
_deep
(data
, ("spec", "type")),
1408 "ports": self
._get
_deep
(data
, ("spec", "ports")),
1409 "cluster_ip": self
._get
_deep
(data
, ("spec", "clusterIP")),
1411 if service
["type"] == "LoadBalancer":
1412 ip_map_list
= self
._get
_deep
(data
, ("status", "loadBalancer", "ingress"))
1413 ip_list
= [elem
["ip"] for elem
in ip_map_list
]
1414 service
["external_ip"] = ip_list
1418 async def _exec_inspect_comand(
1419 self
, inspect_command
: str, kdu_model
: str, repo_url
: str = None
1422 Obtains information about a kdu, no cluster (no env)
1427 repo_str
= " --repo {}".format(repo_url
)
1429 idx
= kdu_model
.find("/")
1432 kdu_model
= kdu_model
[idx
:]
1435 if ":" in kdu_model
:
1436 parts
= kdu_model
.split(sep
=":")
1438 version
= "--version {}".format(str(parts
[1]))
1439 kdu_model
= parts
[0]
1441 full_command
= self
._get
_inspect
_command
(
1442 inspect_command
, kdu_model
, repo_str
, version
1444 output
, _rc
= await self
._local
_async
_exec
(
1445 command
=full_command
, encode_utf8
=True
1450 async def _store_status(
1455 namespace
: str = None,
1456 db_dict
: dict = None,
1459 Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.
1461 :param cluster_id (str): the cluster where the KDU instance is deployed
1462 :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
1463 :param kdu_instance (str): The KDU instance in relation to which the status is obtained
1464 :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
1465 :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
1466 values for the keys:
1467 - "collection": The Mongo DB collection to write to
1468 - "filter": The query filter to use in the update process
1469 - "path": The dot separated keys which targets the object to be updated
1474 detailed_status
= await self
._status
_kdu
(
1475 cluster_id
=cluster_id
,
1476 kdu_instance
=kdu_instance
,
1478 namespace
=namespace
,
1481 status
= detailed_status
.get("info").get("description")
1482 self
.log
.debug(f
"Status for KDU {kdu_instance} obtained: {status}.")
1484 # write status to db
1485 result
= await self
.write_app_status_to_db(
1488 detailed_status
=str(detailed_status
),
1489 operation
=operation
,
1493 self
.log
.info("Error writing in database. Task exiting...")
1495 except asyncio
.CancelledError
as e
:
1497 f
"Exception in method {self._store_status.__name__} (task cancelled): {e}"
1499 except Exception as e
:
1500 self
.log
.warning(f
"Exception in method {self._store_status.__name__}: {e}")
1502 # params for use in -f file
1503 # returns values file option and filename (in order to delete it at the end)
1504 def _params_to_file_option(self
, cluster_id
: str, params
: dict) -> (str, str):
1506 if params
and len(params
) > 0:
1507 self
._init
_paths
_env
(cluster_name
=cluster_id
, create_if_not_exist
=True)
1509 def get_random_number():
1510 r
= random
.randrange(start
=1, stop
=99999999)
1518 value
= params
.get(key
)
1519 if "!!yaml" in str(value
):
1520 value
= yaml
.load(value
[7:])
1521 params2
[key
] = value
1523 values_file
= get_random_number() + ".yaml"
1524 with
open(values_file
, "w") as stream
:
1525 yaml
.dump(params2
, stream
, indent
=4, default_flow_style
=False)
1527 return "-f {}".format(values_file
), values_file
1531 # params for use in --set option
1533 def _params_to_set_option(params
: dict) -> str:
1535 if params
and len(params
) > 0:
1538 value
= params
.get(key
, None)
1539 if value
is not None:
1541 params_str
+= "--set "
1545 params_str
+= "{}={}".format(key
, value
)
1549 def generate_kdu_instance_name(**kwargs
):
1550 chart_name
= kwargs
["kdu_model"]
1551 # check embeded chart (file or dir)
1552 if chart_name
.startswith("/"):
1553 # extract file or directory name
1554 chart_name
= chart_name
[chart_name
.rfind("/") + 1 :]
1556 elif "://" in chart_name
:
1557 # extract last portion of URL
1558 chart_name
= chart_name
[chart_name
.rfind("/") + 1 :]
1561 for c
in chart_name
:
1562 if c
.isalpha() or c
.isnumeric():
1569 # if does not start with alpha character, prefix 'a'
1570 if not name
[0].isalpha():
1575 def get_random_number():
1576 r
= random
.randrange(start
=1, stop
=99999999)
1578 s
= s
.rjust(10, "0")
1581 name
= name
+ get_random_number()
1584 async def _split_repo(self
, kdu_model
: str) -> str:
1586 idx
= kdu_model
.find("/")
1588 repo_name
= kdu_model
[:idx
]