Code Coverage

Cobertura Coverage Report > n2vc >

k8s_conn.py

Trend

File Coverage summary

NameClassesLinesConditionals
k8s_conn.py
100%
1/1
68%
51/75
100%
0/0

Coverage Breakdown by Class

NameLinesConditionals
k8s_conn.py
68%
51/75
N/A

Source

n2vc/k8s_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 #    http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22
23 1 import abc
24 1 import asyncio
25 1 from typing import Union
26 1 import time
27
28 1 from n2vc.loggable import Loggable
29
30
31 1 class K8sConnector(abc.ABC, Loggable):
32     """
33     ####################################################################################
34     ################################### P U B L I C ####################################
35     ####################################################################################
36     """
37
38 1     @staticmethod
39 1     def generate_kdu_instance_name(**kwargs):
40 0         raise NotImplementedError("Method not implemented")
41
42 1     def __init__(self, db: object, log: object = None, on_update_db=None):
43         """
44
45         :param db: database object to write current operation status
46         :param log: logger for tracing
47         :param on_update_db: callback called when k8s connector updates database
48         """
49
50         # parent class
51 1         Loggable.__init__(self, log=log, log_to_console=True, prefix="\nK8S")
52
53         # self.log.info('Initializing generic K8S connector')
54
55         # the database and update callback
56 1         self.db = db
57 1         self.on_update_db = on_update_db
58
59         # self.log.info('K8S generic connector initialized')
60
61 1     @abc.abstractmethod
62 1     async def init_env(
63         self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None
64     ) -> (str, bool):
65         """
66         It prepares a given K8s cluster environment to run Charts or juju Bundles on
67         both sides:
68             client (OSM)
69             server (Tiller/Charm)
70
71         :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
72         '.kube/config'
73         :param namespace: optional namespace to be used for the K8s engine (helm
74         tiller, juju). By default, 'kube-system' will be used
75         :param reuse_cluster_uuid: existing cluster uuid for reuse
76         :return: uuid of the K8s cluster and True if connector has installed some
77         software in the cluster (on error, an exception will be raised)
78         """
79
80 1     @abc.abstractmethod
81 1     async def repo_add(
82         self,
83         cluster_uuid: str,
84         name: str,
85         url: str,
86         repo_type: str = "chart",
87         cert: str = None,
88         user: str = None,
89         password: str = None,
90     ):
91         """
92         Add a new repository to OSM database
93
94         :param cluster_uuid: the cluster
95         :param name: name for the repo in OSM
96         :param url: URL of the repo
97         :param repo_type: either "chart" or "bundle"
98         :return: True if successful
99         """
100
101 1     @abc.abstractmethod
102 1     async def repo_list(self, cluster_uuid: str):
103         """
104         Get the list of registered repositories
105
106         :param cluster_uuid: the cluster
107         :return: list of registered repositories: [ (name, url) .... ]
108         """
109
110 1     @abc.abstractmethod
111 1     async def repo_remove(self, cluster_uuid: str, name: str):
112         """
113         Remove a repository from OSM
114
115         :param name: repo name in OSM
116         :param cluster_uuid: the cluster
117         :return: True if successful
118         """
119
120 1     @abc.abstractmethod
121 1     async def synchronize_repos(self, cluster_uuid: str, name: str):
122         """
123         Synchronizes the list of repositories created in the cluster with
124         the repositories added by the NBI
125
126         :param cluster_uuid: the cluster
127         :return: List of repositories deleted from the cluster and dictionary with
128         repos added
129         """
130
131 1     @abc.abstractmethod
132 1     async def reset(
133         self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
134     ) -> bool:
135         """
136         Uninstalls Tiller/Charm from a known K8s cluster and removes it from the list
137         of known K8s clusters. Intended to be used e.g. when the NS instance is deleted.
138
139         :param cluster_uuid: UUID of a K8s cluster known by OSM.
140         :param force: force deletion, even in case there are deployed releases
141         :param uninstall_sw: flag to indicate that sw uninstallation from software is
142         needed
143         :return: str: kdu_instance generated by helm
144         """
145
146 1     @abc.abstractmethod
147 1     async def install(
148         self,
149         cluster_uuid: str,
150         kdu_model: str,
151         kdu_instance: str,
152         atomic: bool = True,
153         timeout: float = 300,
154         params: dict = None,
155         db_dict: dict = None,
156         kdu_name: str = None,
157         namespace: str = None,
158     ):
159         """
160         Deploys of a new KDU instance. It would implicitly rely on the `install` call
161         to deploy the Chart/Bundle properly parametrized (in practice, this call would
162         happen before any _initial-config-primitive_of the VNF is called).
163
164         :param cluster_uuid: UUID of a K8s cluster known by OSM
165         :param kdu_model: chart/bundle:version reference (string), which can be either
166             of these options:
167             - a name of chart/bundle available via the repos known by OSM
168             - a path to a packaged chart/bundle
169             - a path to an unpacked chart/bundle directory or a URL
170         :param kdu_instance: Kdu instance name
171         :param atomic: If set, installation process purges chart/bundle on fail, also
172             will wait until all the K8s objects are active
173         :param timeout: Time in seconds to wait for the install of the chart/bundle
174             (defaults to Helm default timeout: 300s)
175         :param params: dictionary of key-value pairs for instantiation parameters
176             (overriding default values)
177         :param dict db_dict: where to write into database when the status changes.
178                         It contains a dict with {collection: <str>, filter: {},
179                         path: <str>},
180                             e.g. {collection: "nsrs", filter:
181                             {_id: <nsd-id>, path: "_admin.deployed.K8S.3"}
182         :param kdu_name: Name of the KDU instance to be installed
183         :param namespace: K8s namespace to use for the KDU instance
184         :return: True if successful
185         """
186
187 1     @abc.abstractmethod
188 1     async def upgrade(
189         self,
190         cluster_uuid: str,
191         kdu_instance: str,
192         kdu_model: str = None,
193         atomic: bool = True,
194         timeout: float = 300,
195         params: dict = None,
196         db_dict: dict = None,
197         force: bool = False,
198     ):
199         """
200         Upgrades an existing KDU instance. It would implicitly use the `upgrade` call
201         over an existing Chart/Bundle. It can be used both to upgrade the chart or to
202         reconfigure it. This would be exposed as Day-2 primitive.
203
204         :param cluster_uuid: UUID of a K8s cluster known by OSM
205         :param kdu_instance: unique name for the KDU instance to be updated
206         :param kdu_model: new chart/bundle:version reference
207         :param atomic: rollback in case of fail and wait for pods and services are
208             available
209         :param timeout: Time in seconds to wait for the install of the chart/bundle
210             (defaults to Helm default timeout: 300s)
211         :param params: new dictionary of key-value pairs for instantiation parameters
212         :param dict db_dict: where to write into database when the status changes.
213                         It contains a dict with {collection: <str>, filter: {},
214                         path: <str>},
215                             e.g. {collection: "nsrs", filter:
216                             {_id: <nsd-id>, path: "_admin.deployed.K8S.3"}
217         :param force: force recreation of resources if necessary
218         :return: reference to the new revision number of the KDU instance
219         """
220
221 1     @abc.abstractmethod
222 1     async def scale(
223         self,
224         kdu_instance: str,
225         scale: int,
226         resource_name: str,
227         total_timeout: float = 1800,
228         cluster_uuid: str = None,
229         kdu_model: str = None,
230         atomic: bool = True,
231         db_dict: dict = None,
232         **kwargs,
233     ) -> bool:
234         """Scale a resource in a KDU instance.
235
236         Args:
237             kdu_instance: KDU instance name
238             scale: Scale to which to set the resource
239             resource_name: Resource name
240             total_timeout: The time, in seconds, to wait for the install
241                 to finish
242             cluster_uuid: The UUID of the cluster
243             kdu_model: The chart/bundle reference
244             atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
245                 The --wait flag will be set automatically if --atomic is used
246             db_dict: Dictionary for any additional data
247             kwargs: Additional parameters
248                 vca_id (str): VCA ID
249
250         Returns:
251             True if successful, False otherwise
252         """
253
254 1     @abc.abstractmethod
255 1     async def get_scale_count(
256         self,
257         resource_name: str,
258         kdu_instance: str,
259         cluster_uuid: str,
260         kdu_model: str,
261         timeout: float = 300,
262         **kwargs,
263     ) -> int:
264         """Get a resource scale count in a KDU instance.
265
266         Args:
267             resource_name: Resource name
268             kdu_instance: KDU instance name
269             cluster_uuid: The UUID of the cluster
270             kdu_model:    chart/bundle reference
271             timeout:  The time, in seconds, to wait
272             kwargs: Additional parameters
273
274         Returns:
275             Resource instance count
276         """
277
278 1     @abc.abstractmethod
279 1     async def rollback(
280         self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
281     ):
282         """
283         Rolls back a previous update of a KDU instance. It would implicitly use the
284         `rollback` call. It can be used both to rollback from a Chart/Bundle version
285         update or from a reconfiguration. This would be exposed as Day-2 primitive.
286
287         :param cluster_uuid: UUID of a K8s cluster known by OSM
288         :param kdu_instance: unique name for the KDU instance
289         :param revision: revision to which revert changes. If omitted, it will revert
290             the last update only
291         :param dict db_dict: where to write into database when the status changes.
292                         It contains a dict with {collection: <str>, filter: {},
293                         path: <str>},
294                             e.g. {collection: "nsrs", filter:
295                             {_id: <nsd-id>, path: "_admin.deployed.K8S.3"}
296         :return:If successful, reference to the current active revision of the KDU
297             instance after the rollback
298         """
299
300 1     @abc.abstractmethod
301 1     async def uninstall(self, cluster_uuid: str, kdu_instance: str):
302         """
303         Removes an existing KDU instance. It would implicitly use the `delete` call
304         (this call would happen after all _terminate-config-primitive_ of the VNF are
305         invoked).
306
307         :param cluster_uuid: UUID of a K8s cluster known by OSM
308         :param kdu_instance: unique name for the KDU instance to be deleted
309         :return: True if successful
310         """
311
312 1     @abc.abstractmethod
313 1     async def exec_primitive(
314         self,
315         cluster_uuid: str = None,
316         kdu_instance: str = None,
317         primitive_name: str = None,
318         timeout: float = 300,
319         params: dict = None,
320         db_dict: dict = None,
321     ) -> str:
322         """Exec primitive (Juju action)
323
324         :param cluster_uuid str: The UUID of the cluster
325         :param kdu_instance str: The unique name of the KDU instance
326         :param primitive_name: Name of action that will be executed
327         :param timeout: Timeout for action execution
328         :param params: Dictionary of all the parameters needed for the action
329         :db_dict: Dictionary for any additional data
330
331         :return: Returns the output of the action
332         """
333
334 1     @abc.abstractmethod
335 1     async def upgrade_charm(
336         self,
337         ee_id: str = None,
338         path: str = None,
339         charm_id: str = None,
340         charm_type: str = None,
341         timeout: float = None,
342     ) -> str:
343         """This method upgrade charms in VNFs
344
345         Args:
346             ee_id:  Execution environment id
347             path:   Local path to the charm
348             charm_id:   charm-id
349             charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
350             timeout: (Float)    Timeout for the ns update operation
351
352         Returns:
353             The output of the update operation if status equals to "completed"
354         """
355
356 1     @abc.abstractmethod
357 1     async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
358         """
359         These calls will retrieve from the Chart/Bundle:
360
361             - The list of configurable values and their defaults (e.g. in Charts,
362                 it would retrieve the contents of `values.yaml`).
363             - If available, any embedded help file (e.g. `readme.md`) embedded in the
364                 Chart/Bundle.
365
366         :param kdu_model: chart/bundle reference
367         :param repo_url: optional, reposotory URL (None if tar.gz, URl in other cases,
368             even stable URL)
369         :return:
370
371         If successful, it will return the available parameters and their default values
372         as provided by the backend.
373         """
374
375 1     @abc.abstractmethod
376 1     async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
377         """
378
379         :param kdu_model: chart/bundle reference
380         :param repo_url: optional, reposotory URL (None if tar.gz, URl in other cases,
381             even stable URL)
382         :return: If successful, it will return the contents of the 'readme.md'
383         """
384
385 1     @abc.abstractmethod
386 1     async def status_kdu(
387         self, cluster_uuid: str, kdu_instance: str, yaml_format: str
388     ) -> Union[str, dict]:
389         """
390         This call would retrieve tha current state of a given KDU instance. It would be
391         would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
392         values_ of the configuration parameters applied to a given instance. This call
393         would be based on the `status` call.
394
395         :param cluster_uuid: UUID of a K8s cluster known by OSM
396         :param kdu_instance: unique name for the KDU instance
397         :param yaml_format: if the return shall be returned as an YAML string or as a
398                                 dictionary
399         :return: If successful, it will return the following vector of arguments:
400         - K8s `namespace` in the cluster where the KDU lives
401         - `state` of the KDU instance. It can be:
402               - UNKNOWN
403               - DEPLOYED
404               - DELETED
405               - SUPERSEDED
406               - FAILED or
407               - DELETING
408         - List of `resources` (objects) that this release consists of, sorted by kind,
409           and the status of those resources
410         - Last `deployment_time`.
411
412         """
413
414 1     @abc.abstractmethod
415 1     async def get_services(
416         self, cluster_uuid: str, kdu_instance: str, namespace: str
417     ) -> list:
418         """
419         Returns a list of services defined for the specified kdu instance.
420
421         :param cluster_uuid: UUID of a K8s cluster known by OSM
422         :param kdu_instance: unique name for the KDU instance
423         :param namespace: K8s namespace used by the KDU instance
424         :return: If successful, it will return a list of services, Each service
425         can have the following data:
426         - `name` of the service
427         - `type` type of service in the k8 cluster
428         - `ports` List of ports offered by the service, for each port includes at least
429         name, port, protocol
430         - `cluster_ip` Internal ip to be used inside k8s cluster
431         - `external_ip` List of external ips (in case they are available)
432         """
433
434 1     @abc.abstractmethod
435 1     async def get_service(
436         self, cluster_uuid: str, service_name: str, namespace: str = None
437     ) -> object:
438         """
439         Obtains the data of the specified service in the k8cluster.
440
441         :param cluster_uuid: UUID of a K8s cluster known by OSM
442         :param service_name: name of the K8s service in the specified namespace
443         :param namespace: K8s namespace used by the KDU instance
444         :return: If successful, it will return a list of services, Each service can have
445         the following data:
446         - `name` of the service
447         - `type` type of service in the k8 cluster
448         - `ports` List of ports offered by the service, for each port includes at least
449         name, port, protocol
450         - `cluster_ip` Internal ip to be used inside k8s cluster
451         - `external_ip` List of external ips (in case they are available)
452         """
453
454     """
455     ####################################################################################
456     ################################### P R I V A T E ##################################
457     ####################################################################################
458     """
459
460 1     async def write_app_status_to_db(
461         self, db_dict: dict, status: str, detailed_status: str, operation: str
462     ) -> bool:
463         """
464         This method will write the status of the application to the database.
465
466         :param db_dict: A dictionary with the database necessary information. It shall contain the values for the keys:
467             - "collection": The Mongo DB collection to write to
468             - "filter": The query filter to use in the update process
469             - "path": The dot separated keys which targets the object to be updated
470         :param status: Status of the application
471         :param detailed_status: Detailed status of the application
472         :param operation: Operation that is being performed on the application
473         :return: True if successful
474         """
475
476 0         if not self.db:
477 0             self.warning("No db => No database write")
478 0             return False
479
480 0         if not db_dict:
481 0             self.warning("No db_dict => No database write")
482 0             return False
483
484 0         self.log.debug("status={}".format(status))
485
486 0         try:
487 0             the_table = db_dict["collection"]
488 0             the_filter = db_dict["filter"]
489 0             the_path = db_dict["path"]
490 0             if not the_path[-1] == ".":
491 0                 the_path = the_path + "."
492 0             update_dict = {
493                 the_path + "operation": operation,
494                 the_path + "status": status,
495                 the_path + "detailed-status": detailed_status,
496                 the_path + "status-time": str(time.time()),
497             }
498
499 0             self.db.set_one(
500                 table=the_table,
501                 q_filter=the_filter,
502                 update_dict=update_dict,
503                 fail_on_empty=True,
504             )
505
506             # database callback
507 0             if self.on_update_db:
508 0                 if asyncio.iscoroutinefunction(self.on_update_db):
509 0                     await self.on_update_db(
510                         the_table, the_filter, the_path, update_dict
511                     )
512                 else:
513 0                     self.on_update_db(the_table, the_filter, the_path, update_dict)
514
515 0             return True
516
517 0         except Exception as e:
518 0             self.log.info("Exception writing status to database: {}".format(e))
519 0             return False