Fix black issues
[osm/N2VC.git] / n2vc / k8s_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22
23 import abc
24 import asyncio
25 from typing import Union
26 import time
27
28 from n2vc.loggable import Loggable
29
30
class K8sConnector(abc.ABC, Loggable):
    """
    Abstract base class of the K8s connectors.

    It declares the interface (as abstract coroutines) that a concrete connector
    has to implement: cluster environment initialization, repository handling and
    KDU life-cycle operations (install, upgrade, scale, rollback, uninstall,
    primitives and status queries).

    The base class itself only stores the database handle and the database update
    callback, and provides the shared helper ``write_app_status_to_db``.
    """

    # ---------------------------------------------------------------------------------
    # P U B L I C
    # ---------------------------------------------------------------------------------

    @staticmethod
    def generate_kdu_instance_name(**kwargs):
        """
        Generate a name for a KDU instance.

        The base class provides no implementation and always raises; presumably
        concrete connectors override it.

        :param kwargs: connector-specific parameters
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError("Method not implemented")

    def __init__(self, db: object, log: object = None, on_update_db=None):
        """
        Initialize the generic K8s connector.

        :param db: database object to write current operation status
        :param log: logger for tracing
        :param on_update_db: callback called when k8s connector updates database.
            It may be a plain function or a coroutine function; it is called with
            (table, filter, path, update_dict)
        """

        # parent class
        Loggable.__init__(self, log=log, log_to_console=True, prefix="\nK8S")

        # the database and update callback
        self.db = db
        self.on_update_db = on_update_db

    @abc.abstractmethod
    async def init_env(
        self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None
    ) -> (str, bool):
        """
        It prepares a given K8s cluster environment to run Charts or juju Bundles on
        both sides:
            client (OSM)
            server (Tiller/Charm)

        :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
            '.kube/config'
        :param namespace: optional namespace to be used for the K8s engine (helm
            tiller, juju). By default, 'kube-system' will be used
        :param reuse_cluster_uuid: existing cluster uuid for reuse
        :return: uuid of the K8s cluster and True if connector has installed some
            software in the cluster (on error, an exception will be raised)
        """

    @abc.abstractmethod
    async def repo_add(
        self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
    ):
        """
        Add a new repository to OSM database

        :param cluster_uuid: the cluster
        :param name: name for the repo in OSM
        :param url: URL of the repo
        :param repo_type: either "chart" or "bundle"
        :return: True if successful
        """

    @abc.abstractmethod
    async def repo_list(self, cluster_uuid: str):
        """
        Get the list of registered repositories

        :param cluster_uuid: the cluster
        :return: list of registered repositories: [ (name, url) .... ]
        """

    @abc.abstractmethod
    async def repo_remove(self, cluster_uuid: str, name: str):
        """
        Remove a repository from OSM

        :param cluster_uuid: the cluster
        :param name: repo name in OSM
        :return: True if successful
        """

    @abc.abstractmethod
    async def synchronize_repos(self, cluster_uuid: str, name: str):
        """
        Synchronizes the list of repositories created in the cluster with
        the repositories added by the NBI

        :param cluster_uuid: the cluster
        :param name: not described by the original interface documentation;
            presumably a repo or cluster name - TODO confirm with implementations
        :return: List of repositories deleted from the cluster and dictionary with
            repos added
        """

    @abc.abstractmethod
    async def reset(
        self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
    ) -> bool:
        """
        Uninstalls Tiller/Charm from a known K8s cluster and removes it from the list
        of known K8s clusters. Intended to be used e.g. when the NS instance is deleted.

        :param cluster_uuid: UUID of a K8s cluster known by OSM.
        :param force: force deletion, even in case there are deployed releases
        :param uninstall_sw: flag to indicate that sw uninstallation from software is
            needed
        :return: boolean result of the operation (True is expected on success)
        """

    @abc.abstractmethod
    async def install(
        self,
        cluster_uuid: str,
        kdu_model: str,
        kdu_instance: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        kdu_name: str = None,
        namespace: str = None,
    ):
        """
        Deploys a new KDU instance. It would implicitly rely on the `install` call
        to deploy the Chart/Bundle properly parametrized (in practice, this call would
        happen before any _initial-config-primitive_ of the VNF is called).

        :param cluster_uuid: UUID of a K8s cluster known by OSM
        :param kdu_model: chart/bundle:version reference (string), which can be either
            of these options:
            - a name of chart/bundle available via the repos known by OSM
            - a path to a packaged chart/bundle
            - a path to an unpacked chart/bundle directory or a URL
        :param kdu_instance: Kdu instance name
        :param atomic: If set, installation process purges chart/bundle on fail, also
            will wait until all the K8s objects are active
        :param timeout: Time in seconds to wait for the install of the chart/bundle
            (defaults to Helm default timeout: 300s)
        :param params: dictionary of key-value pairs for instantiation parameters
            (overriding default values)
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
            path: "_admin.deployed.K8S.3"}
        :param kdu_name: Name of the KDU instance to be installed
        :param namespace: K8s namespace to use for the KDU instance
        :return: True if successful
        """

    @abc.abstractmethod
    async def upgrade(
        self,
        cluster_uuid: str,
        kdu_instance: str,
        kdu_model: str = None,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
    ):
        """
        Upgrades an existing KDU instance. It would implicitly use the `upgrade` call
        over an existing Chart/Bundle. It can be used both to upgrade the chart or to
        reconfigure it. This would be exposed as Day-2 primitive.

        :param cluster_uuid: UUID of a K8s cluster known by OSM
        :param kdu_instance: unique name for the KDU instance to be updated
        :param kdu_model: new chart/bundle:version reference
        :param atomic: rollback in case of fail and wait for pods and services are
            available
        :param timeout: Time in seconds to wait for the install of the chart/bundle
            (defaults to Helm default timeout: 300s)
        :param params: new dictionary of key-value pairs for instantiation parameters
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
            path: "_admin.deployed.K8S.3"}
        :return: reference to the new revision number of the KDU instance
        """

    @abc.abstractmethod
    async def scale(
        self,
        kdu_instance: str,
        scale: int,
        resource_name: str,
        total_timeout: float = 1800,
        **kwargs,
    ) -> bool:
        """
        Scales an application in KDU instance.

        :param: kdu_instance str: KDU instance name
        :param: scale int: Scale to which to set this application
        :param: resource_name str: Resource name (Application name)
        :param: total_timeout float: The time, in seconds, to wait for the operation
            to finish
        :param kwargs: Additional parameters

        :return: If successful, returns True
        """

    @abc.abstractmethod
    async def get_scale_count(
        self,
        resource_name: str,
        kdu_instance: str,
        **kwargs,
    ) -> int:
        """
        Get an application scale count.

        :param: resource_name str: Resource name (Application name)
        :param: kdu_instance str: KDU instance name
        :param kwargs: Additional parameters

        :return: Return application instance count
        """

    @abc.abstractmethod
    async def rollback(
        self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
    ):
        """
        Rolls back a previous update of a KDU instance. It would implicitly use the
        `rollback` call. It can be used both to rollback from a Chart/Bundle version
        update or from a reconfiguration. This would be exposed as Day-2 primitive.

        :param cluster_uuid: UUID of a K8s cluster known by OSM
        :param kdu_instance: unique name for the KDU instance
        :param revision: revision to which revert changes. If omitted, it will revert
            the last update only
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
            path: "_admin.deployed.K8S.3"}
        :return: If successful, reference to the current active revision of the KDU
            instance after the rollback
        """

    @abc.abstractmethod
    async def uninstall(self, cluster_uuid: str, kdu_instance: str):
        """
        Removes an existing KDU instance. It would implicitly use the `delete` call
        (this call would happen after all _terminate-config-primitive_ of the VNF are
        invoked).

        :param cluster_uuid: UUID of a K8s cluster known by OSM
        :param kdu_instance: unique name for the KDU instance to be deleted
        :return: True if successful
        """

    @abc.abstractmethod
    async def exec_primitive(
        self,
        cluster_uuid: str = None,
        kdu_instance: str = None,
        primitive_name: str = None,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
    ) -> str:
        """Exec primitive (Juju action)

        :param cluster_uuid str: The UUID of the cluster
        :param kdu_instance str: The unique name of the KDU instance
        :param primitive_name: Name of action that will be executed
        :param timeout: Timeout for action execution
        :param params: Dictionary of all the parameters needed for the action
        :param db_dict: Dictionary for any additional data

        :return: Returns the output of the action
        """

    @abc.abstractmethod
    async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
        """
        These calls will retrieve from the Chart/Bundle:

            - The list of configurable values and their defaults (e.g. in Charts,
              it would retrieve the contents of `values.yaml`).
            - If available, any embedded help file (e.g. `readme.md`) embedded in the
              Chart/Bundle.

        :param kdu_model: chart/bundle reference
        :param repo_url: optional, repository URL (None if tar.gz, URL in other cases,
            even stable URL)
        :return:

        If successful, it will return the available parameters and their default values
        as provided by the backend.
        """

    @abc.abstractmethod
    async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
        """
        Retrieve the help text of a Chart/Bundle.

        :param kdu_model: chart/bundle reference
        :param repo_url: optional, repository URL (None if tar.gz, URL in other cases,
            even stable URL)
        :return: If successful, it will return the contents of the 'readme.md'
        """

    @abc.abstractmethod
    async def status_kdu(
        self, cluster_uuid: str, kdu_instance: str, yaml_format: str
    ) -> Union[str, dict]:
        """
        This call would retrieve the current state of a given KDU instance. It would
        allow to retrieve the _composition_ (i.e. K8s objects) and _specific
        values_ of the configuration parameters applied to a given instance. This call
        would be based on the `status` call.

        :param cluster_uuid: UUID of a K8s cluster known by OSM
        :param kdu_instance: unique name for the KDU instance
        :param yaml_format: if the return shall be returned as an YAML string or as a
            dictionary
        :return: If successful, it will return the following vector of arguments:
        - K8s `namespace` in the cluster where the KDU lives
        - `state` of the KDU instance. It can be:
              - UNKNOWN
              - DEPLOYED
              - DELETED
              - SUPERSEDED
              - FAILED or
              - DELETING
        - List of `resources` (objects) that this release consists of, sorted by kind,
          and the status of those resources
        - Last `deployment_time`.

        """

    @abc.abstractmethod
    async def get_services(
        self, cluster_uuid: str, kdu_instance: str, namespace: str
    ) -> list:
        """
        Returns a list of services defined for the specified kdu instance.

        :param cluster_uuid: UUID of a K8s cluster known by OSM
        :param kdu_instance: unique name for the KDU instance
        :param namespace: K8s namespace used by the KDU instance
        :return: If successful, it will return a list of services. Each service
            can have the following data:
            - `name` of the service
            - `type` type of service in the k8 cluster
            - `ports` List of ports offered by the service, for each port includes at
              least name, port, protocol
            - `cluster_ip` Internal ip to be used inside k8s cluster
            - `external_ip` List of external ips (in case they are available)
        """

    @abc.abstractmethod
    async def get_service(
        self, cluster_uuid: str, service_name: str, namespace: str = None
    ) -> object:
        """
        Obtains the data of the specified service in the k8cluster.

        :param cluster_uuid: UUID of a K8s cluster known by OSM
        :param service_name: name of the K8s service in the specified namespace
        :param namespace: K8s namespace used by the KDU instance
        :return: If successful, it will return the data of the service, which can
            include:
            - `name` of the service
            - `type` type of service in the k8 cluster
            - `ports` List of ports offered by the service, for each port includes at
              least name, port, protocol
            - `cluster_ip` Internal ip to be used inside k8s cluster
            - `external_ip` List of external ips (in case they are available)
        """

    # ---------------------------------------------------------------------------------
    # P R I V A T E
    # ---------------------------------------------------------------------------------

    async def write_app_status_to_db(
        self, db_dict: dict, status: str, detailed_status: str, operation: str
    ) -> bool:
        """
        This method will write the status of the application to the database.

        The write is best-effort: any exception raised while building the update,
        writing it or running the update callback is logged and reported through
        the return value instead of being propagated.

        :param db_dict: A dictionary with the database necessary information. It shall
            contain the values for the keys:
            - "collection": The Mongo DB collection to write to
            - "filter": The query filter to use in the update process
            - "path": The dot separated keys which targets the object to be updated
        :param status: Status of the application
        :param detailed_status: Detailed status of the application
        :param operation: Operation that is being performed on the application
        :return: True if successful, False if there is no database, no db_dict, or
            the write (or the update callback) raised an exception
        """

        if not self.db:
            self.warning("No db => No database write")
            return False

        if not db_dict:
            self.warning("No db_dict => No database write")
            return False

        self.log.debug("status={}".format(status))

        try:
            the_table = db_dict["collection"]
            the_filter = db_dict["filter"]
            the_path = db_dict["path"]
            # Normalize the path so the field names can be appended directly.
            # Indexing (rather than str.endswith) is deliberate: an empty path
            # raises here and is rejected by the handler below instead of
            # producing keys that start with a dot.
            if the_path[-1] != ".":
                the_path = the_path + "."
            update_dict = {
                the_path + "operation": operation,
                the_path + "status": status,
                the_path + "detailed-status": detailed_status,
                the_path + "status-time": str(time.time()),
            }

            self.db.set_one(
                table=the_table,
                q_filter=the_filter,
                update_dict=update_dict,
                fail_on_empty=True,
            )

            # database callback: supports both coroutine and plain functions
            if self.on_update_db:
                if asyncio.iscoroutinefunction(self.on_update_db):
                    await self.on_update_db(
                        the_table, the_filter, the_path, update_dict
                    )
                else:
                    self.on_update_db(the_table, the_filter, the_path, update_dict)

            return True

        except Exception as e:
            # Logged as a warning (not info) so that a failed status write stays
            # visible with the usual production log levels.
            self.log.warning("Exception writing status to database: {}".format(e))
            return False