Fixes VCA deletion in NS termination
[osm/LCM.git] / osm_lcm / lcm_utils.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import checksumdir
21 from collections import OrderedDict
22 import os
23 from time import time
24 from osm_lcm.data_utils.database.database import Database
25 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
26
27 # from osm_common.dbbase import DbException
28
29 __author__ = "Alfonso Tierno"
30
31
class LcmException(Exception):
    """Base class of the LCM-specific exceptions declared in this module."""
34
35
class LcmExceptionNoMgmtIP(LcmException):
    """LcmException subclass.

    NOTE(review): judging by the name it signals a missing management IP;
    nothing in this module raises it, so confirm against the callers.
    """
38
39
class LcmExceptionExit(LcmException):
    """LcmException subclass.

    NOTE(review): presumably used to request termination of the LCM process;
    nothing in this module raises it, so confirm against the callers.
    """
42
43
def versiontuple(v):
    """Convert a dot-separated version string into a comparable tuple.

    A package version looks like ``4.0.1.post11+gb3f024d.dirty-1``: ``4.0.1`` is
    the git tag, ``postXX`` the number of commits since that tag and ``+XXXXXXX``
    the short commit id.

    Each dot-separated field is cut at the first "+" and then at the first "-",
    and what remains is left-padded with zeros to 20 characters, so that the
    plain string comparison of two fields orders numeric fields numerically.

    :param v: version string
    :return: tuple with one zero-padded string per dot-separated field
    """
    return tuple(
        field.partition("+")[0].partition("-")[0].zfill(20)
        for field in v.split(".")
    )
55
56
def deep_get(target_dict, key_list, default_value=None):
    """
    Walk target_dict following the nested keys of key_list and return what is found there.
    Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and
    key_list=[f,h] return default_value
    :param target_dict: dictionary to be read
    :param key_list: list of keys to follow, outermost first
    :param default_value: value returned when the path cannot be followed to the end
    :return: the value at the end of the path; default_value as soon as a level is not a
        dictionary or lacks the requested key
    """
    node = target_dict
    for step in key_list:
        if isinstance(node, dict) and step in node:
            node = node[step]
        else:
            return default_value
    return node
71
72
def get_iterable(in_dict, in_key):
    """
    Like <dict>.get(), except that a missing key or any falsy value (None, False, empty
    list, ...) yields an empty tuple, so the result can always be iterated
    :param in_dict: a dictionary
    :param in_key: the key to look for at in_dict
    :return: in_dict[in_key] when it is truthy, () otherwise
    """
    return in_dict.get(in_key) or ()
83
84
def check_juju_bundle_existence(vnfd: dict) -> str:
    """Checks the existence of juju-bundle in the descriptor

    Every KDU of the descriptor is inspected, not only the first one, so a
    bundle declared on a later KDU is also found.

    Args:
        vnfd:   Descriptor as a dictionary; it may be the descriptor itself or
                a dictionary wrapping it under the "vnfd" key

    Returns:
        The first non-empty "juju-bundle" found among the KDUs, else None

    """
    descriptor = vnfd.get("vnfd") or vnfd

    # "kdu" may be absent or explicitly None; both mean "no KDUs"
    for kdu in descriptor.get("kdu") or ():
        bundle = kdu.get("juju-bundle")
        if bundle:
            return bundle
    return None
100
101
def get_charm_artifact_path(base_folder, charm_name, charm_type, revision=str()) -> str:
    """Builds the path of a charm (or helm chart) artifact inside a package

    Args:
        base_folder:    Dictionary with the package location: "folder" is
                        mandatory, "pkg-dir" is optional
        charm_name:     Charm name
        charm_type:     Type of charm native_charm, lxc_proxy_charm or k8s_proxy_charm;
                        any other value selects the "helm-charts" folder
        revision:       vnf package revision number if there is

    Returns:
        artifact_path:  (str)

    """
    pkg_dir = base_folder.get("pkg-dir")

    # Whatever follows the first ":" of the folder is dropped and, when a
    # revision is given, replaced by that revision.
    root = base_folder["folder"].split(":")[0]
    if revision:
        root += ":" + str(revision)

    if charm_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm"):
        kind_folder = "charms"
    else:
        kind_folder = "helm-charts"

    if pkg_dir:
        return "{}/{}/{}/{}".format(root, pkg_dir, kind_folder, charm_name)

    # For SOL004 packages
    return "{}/Scripts/{}/{}".format(root, kind_folder, charm_name)
140
141
def populate_dict(target_dict, key_list, value):
    """
    Store value inside target_dict at the nested path given by key_list, creating the
    intermediate dictionaries that do not exist yet.
    Example target_dict={K: J}; key_list=[a,b,c]; target_dict will be {K: J, a: {b: {c: value}}}
    :param target_dict: dictionary to be changed in place
    :param key_list: list of keys describing the path; the last one receives the value
    :param value: content to store at the end of the path
    :return: None
    """
    node = target_dict
    for parent_key in key_list[:-1]:
        node = node.setdefault(parent_key, {})
    node[key_list[-1]] = value
156
157
class LcmBase:
    """Common base of the LCM classes: holds the database, filesystem, message
    and logger handles and offers small helpers built on top of them."""

    def __init__(self, msg, logger):
        """
        The database and filesystem handles are not parameters: they are taken
        from Database().instance.db and Filesystem().instance.fs

        :param msg: stored as self.msg (presumably the message bus client - confirm with callers)
        :param logger: logger stored as self.logger
        """
        self.db = Database().instance.db
        self.msg = msg
        self.fs = Filesystem().instance.fs
        self.logger = logger

    def update_db_2(self, item, _id, _desc):
        """
        Updates database with _desc information. If success _desc is cleared
        :param item: collection
        :param _id: the _id to use in the query filter
        :param _desc: dictionary with the content to update. Keys are dot separated keys.
            An "_admin.modified" entry with the current time is added before writing.
            An empty or None _desc makes this call a no-op
        :return: None. Whatever self.db.set_one raises is propagated, in that case _desc is not cleared
        """
        if not _desc:
            return
        _desc["_admin.modified"] = time()
        self.db.set_one(item, {"_id": _id}, _desc)
        # Emptied only after a successful write, so the caller can keep reusing the same dict
        _desc.clear()

    def check_charm_hash_changed(
        self, current_charm_path: str, target_charm_path: str
    ) -> bool:
        """Find the target charm has changed or not by checking the hash of
        old and new charm packages

        Args:
            current_charm_path (str): Existing charm package artifact path
            target_charm_path (str): Target charm package artifact path

        Both paths are appended as-is to self.fs.path.

        Returns:
            True/False (bool): if charm has changed it returns True

        Raises:
            LcmException: if any of the two artifacts does not exist. The
                message names the artifact that is actually missing (the
                target one when both are missing).

        """
        current_path = self.fs.path + current_charm_path
        target_path = self.fs.path + target_charm_path

        # Check if the charm artifacts are available. The target is checked
        # first so that it is the one reported when both are missing.
        for charm_path in (target_path, current_path):
            if not os.path.exists(charm_path):
                raise LcmException(
                    "Charm artifact {} does not exist in the VNF Package".format(
                        charm_path
                    )
                )

        # Compare the hash of charm folders
        return checksumdir.dirhash(current_path) != checksumdir.dirhash(target_path)
219
220
class TaskRegistry(LcmBase):
    """
    Implements a registry of task needed for later cancelation, look for related tasks that must be completed before
    etc. It stores a four level dict
    First level is the topic, ns, vim_account, sdn
    Second level is the _id
    Third level is the operation id
    Fourth level is a descriptive name, the value is the task class

    The HA (High-Availability) methods are used when more than one LCM instance is running.
    To register the current task in the external DB, use LcmBase as base class, to be able
    to reuse LcmBase.update_db_2()
    The DB registry uses the following fields to distinguish a task:
    - op_type: operation type ("nslcmops" or "nsilcmops")
    - op_id:   operation ID
    - worker:  the worker ID for this process

    Note that the in-memory registry is keyed by "vim_account"/"wim_account" while the HA
    methods use the topics "vim"/"wim" (see topic_account_list and topic2dbtable_dict).
    """

    # NS/NSI: "services" VIM/WIM/SDN: "accounts"
    topic_service_list = ["ns", "nsi"]
    topic_account_list = ["vim", "wim", "sdn", "k8scluster", "vca", "k8srepo"]

    # Map topic to InstanceID
    topic2instid_dict = {"ns": "nsInstanceId", "nsi": "netsliceInstanceId"}

    # Map topic to DB table name
    topic2dbtable_dict = {
        "ns": "nslcmops",
        "nsi": "nsilcmops",
        "vim": "vim_accounts",
        "wim": "wim_accounts",
        "sdn": "sdns",
        "k8scluster": "k8sclusters",
        "vca": "vca",
        "k8srepo": "k8srepos",
    }

    def __init__(self, worker_id=None, logger=None):
        """
        LcmBase.__init__ is intentionally not called here: only self.db and self.logger
        are set, self.msg and self.fs are not.

        :param worker_id: identifier written to the DB when this process locks an operation
        :param logger: logger used by the HA methods
        """
        self.task_registry = {
            "ns": {},
            "nsi": {},
            "vim_account": {},
            "wim_account": {},
            "sdn": {},
            "k8scluster": {},
            "vca": {},
            "k8srepo": {},
        }
        self.worker_id = worker_id
        self.db = Database().instance.db
        self.logger = logger

    def register(self, topic, _id, op_id, task_name, task):
        """
        Register a new task
        :param topic: Can be "ns", "nsi", "vim_account", "sdn"
        :param _id: _id of the related item
        :param op_id: id of the operation of the related item
        :param task_name: Task descriptive name, as create, instantiate, terminate. Must be unique in this op_id
        :param task: Task class
        :return: none
        """
        # Operations are kept in an OrderedDict so that they can later be walked newest-first
        operations = self.task_registry[topic].setdefault(_id, OrderedDict())
        operations.setdefault(op_id, {})[task_name] = task

    def remove(self, topic, _id, op_id, task_name=None):
        """
        When task is ended, it should be removed. It ignores missing tasks. It also removes tasks done with this _id
        :param topic: Can be "ns", "nsi", "vim_account", "sdn"
        :param _id: _id of the related item
        :param op_id: id of the operation of the related item
        :param task_name: Task descriptive name. If none it deletes all tasks with same _id and op_id
        :return: None
        """
        operations = self.task_registry[topic].get(_id)
        if not operations:
            return
        if not task_name:
            operations.pop(op_id, None)
        elif operations.get(op_id):
            operations[op_id].pop(task_name, None)

        # delete done tasks: an operation is dropped once all of its tasks are done
        # (an operation left without tasks is dropped too)
        for op_id_ in list(operations):
            if all(task.done() for task in operations[op_id_].values()):
                del operations[op_id_]
        if not operations:
            del self.task_registry[topic][_id]

    def lookfor_related(self, topic, _id, my_op_id=None):
        """
        Look for not finished tasks of the operation registered just before my_op_id for this _id.
        Operations are walked from the newest to the oldest; when my_op_id is supplied everything
        up to and including my_op_id is skipped. Only the first operation found after that is
        inspected. Without my_op_id the newest operation is the one inspected.
        :param topic: Can be "ns", "nsi", "vim_account", "sdn"
        :param _id: _id of the related item
        :param my_op_id: id of the operation asking, or None
        :return: tuple (comma separated names of the pending tasks, list of the pending tasks)
        """
        task_list = []
        task_name_list = []
        if _id not in self.task_registry[topic]:
            return "", task_list
        for op_id in reversed(self.task_registry[topic][_id]):
            if my_op_id:
                if my_op_id == op_id:
                    my_op_id = None  # so that the next task is taken
                continue

            for task_name, task in self.task_registry[topic][_id][op_id].items():
                if not task.done():
                    task_list.append(task)
                    task_name_list.append(task_name)
            break
        return ", ".join(task_name_list), task_list

    def cancel(self, topic, _id, target_op_id=None, target_task_name=None):
        """
        Cancel all active tasks of a concrete ns, nsi, vim_account, sdn identified for _id. If op_id is supplied only
        this is cancelled, and the same with task_name
        """
        if not self.task_registry[topic].get(_id):
            return
        for op_id in reversed(self.task_registry[topic][_id]):
            if target_op_id and target_op_id != op_id:
                continue
            for task_name, task in self.task_registry[topic][_id][op_id].items():
                if target_task_name and target_task_name != task_name:
                    continue
                task.cancel()

    # Is topic NS/NSI?
    def _is_service_type_HA(self, topic):
        return topic in self.topic_service_list

    # Is topic VIM/WIM/SDN?
    def _is_account_type_HA(self, topic):
        return topic in self.topic_account_list

    # Input: op_id, example: 'abc123def:3'  Output: account_id='abc123def', op_index=3
    def _get_account_and_op_HA(self, op_id):
        """
        Split op_id at its last ':' into account id and operation index.
        :return: (account_id, op_index), op_index being a string of digits. (None, None) when
            op_id is empty, has no account part or its index is not made of digits
        """
        if not op_id:
            return None, None
        account_id, _, op_index = op_id.rpartition(":")
        if not account_id or not op_index.isdigit():
            return None, None
        return account_id, op_index

    # Get '_id' for any topic and operation
    def _get_instance_id_HA(self, topic, op_type, op_id):
        _id = None
        # Special operation 'ANY', for SDN account associated to a VIM account: op_id as '_id'
        if op_type == "ANY":
            _id = op_id
        # NS/NSI: Use op_id as '_id'
        elif self._is_service_type_HA(topic):
            _id = op_id
        # VIM/SDN/WIM/K8SCLUSTER: Split op_id to get Account ID and Operation Index, use Account ID as '_id'
        elif self._is_account_type_HA(topic):
            _id, _ = self._get_account_and_op_HA(op_id)
        return _id

    # Set DB _filter for querying any related process state
    def _get_waitfor_filter_HA(self, db_lcmop, topic, op_type, op_id):
        _filter = {}
        # Special operation 'ANY', for SDN account associated to a VIM account: op_id as '_id'
        # In this special case, the timestamp is ignored
        if op_type == "ANY":
            _filter = {"operationState": "PROCESSING"}
        # Otherwise, get 'startTime' timestamp for this operation
        else:
            # NS/NSI
            if self._is_service_type_HA(topic):
                now = time()
                starttime_this_op = db_lcmop.get("startTime")
                instance_id_label = self.topic2instid_dict.get(topic)
                instance_id = db_lcmop.get(instance_id_label)
                _filter = {
                    instance_id_label: instance_id,
                    "operationState": "PROCESSING",
                    "startTime.lt": starttime_this_op,
                    "_admin.modified.gt": now
                    - 2 * 3600,  # ignore if two hours of inactivity
                }
            # VIM/WIM/SDN/K8scluster
            elif self._is_account_type_HA(topic):
                _, op_index = self._get_account_and_op_HA(op_id)
                _ops = db_lcmop["_admin"]["operations"]
                _this_op = _ops[int(op_index)]
                starttime_this_op = _this_op.get("startTime", None)
                _filter = {
                    "operationState": "PROCESSING",
                    "startTime.lt": starttime_this_op,
                }
        return _filter

    # Get DB params for any topic and operation
    def _get_dbparams_for_lock_HA(self, topic, op_type, op_id):
        """
        :return: (q_filter, update_dict) to be used for locking the operation; ({}, {}) for an
            unknown topic and (None, None) when an account op_id cannot be split
        """
        q_filter = {}
        update_dict = {}
        # NS/NSI
        if self._is_service_type_HA(topic):
            q_filter = {"_id": op_id, "_admin.worker": None}
            update_dict = {"_admin.worker": self.worker_id}
        # VIM/WIM/SDN
        elif self._is_account_type_HA(topic):
            account_id, op_index = self._get_account_and_op_HA(op_id)
            if not account_id:
                return None, None
            if op_type == "create":
                # Creating a VIM/WIM/SDN account implies setting '_admin.current_operation' = 0
                op_index = 0
            q_filter = {
                "_id": account_id,
                "_admin.operations.{}.worker".format(op_index): None,
            }
            update_dict = {
                "_admin.operations.{}.worker".format(op_index): self.worker_id,
                "_admin.current_operation": op_index,
            }
        return q_filter, update_dict

    def lock_HA(self, topic, op_type, op_id):
        """
        Lock a task, if possible, to indicate to the HA system that
        the task will be executed in this LCM instance.
        :param topic: Can be "ns", "nsi", "vim", "wim", or "sdn"
        :param op_type: Operation type, can be "nslcmops", "nsilcmops", "create", "edit", "delete"
        :param op_id: NS, NSI: Operation ID  VIM,WIM,SDN: Account ID + ':' + Operation Index
        :return:
            True=lock was successful => execute the task (not registered by any other LCM instance)
            False=lock failed => do NOT execute the task (already registered by another LCM instance)

        HA tasks and backward compatibility:
        If topic is "account type" (VIM/WIM/SDN) and op_id is None, 'op_id' was not provided by NBI.
        This means that the running NBI instance does not support HA.
        In such a case this method should always return True, to always execute
        the task in this instance of LCM, without querying the DB.
        """

        # Backward compatibility for VIM/WIM/SDN/k8scluster without op_id
        if self._is_account_type_HA(topic) and op_id is None:
            return True

        # Try to lock this task
        # NOTE(review): for an account op_id that cannot be split, q_filter and update_dict
        # are both None here and are handed to set_one as they are - confirm this is intended
        db_table_name = self.topic2dbtable_dict[topic]
        q_filter, update_dict = self._get_dbparams_for_lock_HA(topic, op_type, op_id)
        db_lock_task = self.db.set_one(
            db_table_name,
            q_filter=q_filter,
            update_dict=update_dict,
            fail_on_empty=False,
        )
        if db_lock_task is None:
            self.logger.debug(
                "Task {} operation={} already locked by another worker".format(
                    topic, op_id
                )
            )
            return False

        # Set 'detailed-status' to 'In progress' for VIM/WIM/SDN operations
        if self._is_account_type_HA(topic):
            detailed_status = "In progress"
            account_id, op_index = self._get_account_and_op_HA(op_id)
            q_filter = {"_id": account_id}
            update_dict = {
                "_admin.operations.{}.detailed-status".format(
                    op_index
                ): detailed_status
            }
            self.db.set_one(
                db_table_name,
                q_filter=q_filter,
                update_dict=update_dict,
                fail_on_empty=False,
            )
        return True

    def unlock_HA(self, topic, op_type, op_id, operationState, detailed_status):
        """
        Store the final state of a VIM/WIM/SDN operation in the DB and release its lock
        (the operation 'worker' and the account '_admin.current_operation' are set to None).
        Nothing is done for non account topics or when op_id is empty.
        :param topic: Can be "vim", "wim", or "sdn"
        :param op_type: Operation type, can be "create", "edit", "delete" (not used by this method)
        :param op_id: Account ID + ':' + Operation Index
        :param operationState: value stored as the operation 'operationState'
        :param detailed_status: value stored as the operation 'detailed-status'
        :return: nothing
        """

        # Backward compatibility
        if not self._is_account_type_HA(topic) or not op_id:
            return

        # Get Account ID and Operation Index
        account_id, op_index = self._get_account_and_op_HA(op_id)
        db_table_name = self.topic2dbtable_dict[topic]

        # If this is a 'delete' operation, the account may have been deleted (SUCCESS) or may still exist (FAILED)
        # If the account exist, register the HA task (fail_on_empty=False covers the deleted account).
        # Update DB for HA tasks
        q_filter = {"_id": account_id}
        update_dict = {
            "_admin.operations.{}.operationState".format(op_index): operationState,
            "_admin.operations.{}.detailed-status".format(op_index): detailed_status,
            "_admin.operations.{}.worker".format(op_index): None,
            "_admin.current_operation": None,
        }
        self.db.set_one(
            db_table_name,
            q_filter=q_filter,
            update_dict=update_dict,
            fail_on_empty=False,
        )

    async def waitfor_related_HA(self, topic, op_type, op_id=None):
        """
        Wait for any pending related HA tasks
        The DB is polled every 10 seconds until no related operation is in 'PROCESSING' state.
        :param topic: Can be "ns", "nsi" or any of the account topics (topic_account_list)
        :param op_type: Operation type. The special value "ANY" waits for any 'PROCESSING'
            operation, ignoring timestamps, and does not update 'detailed-status'
        :param op_id: NS, NSI: Operation ID  VIM,WIM,SDN: Account ID + ':' + Operation Index
        :return: None. LcmException is raised if related tasks are still pending after 3600 seconds
        """

        # Backward compatibility: nothing can be looked up without an op_id or for an
        # unknown topic (there is no DB table for it), so there is nothing to wait for.
        if op_id is None or not (
            self._is_service_type_HA(topic) or self._is_account_type_HA(topic)
        ):
            return

        # Get DB table name
        db_table_name = self.topic2dbtable_dict.get(topic)

        # Get instance ID
        _id = self._get_instance_id_HA(topic, op_type, op_id)
        _filter = {"_id": _id}
        db_lcmop = self.db.get_one(db_table_name, _filter, fail_on_empty=False)
        if not db_lcmop:
            return

        # Set DB _filter for querying any related process state
        _filter = self._get_waitfor_filter_HA(db_lcmop, topic, op_type, op_id)

        # For HA, get list of tasks from DB instead of from dictionary (in-memory) variable.
        timeout_wait_for_task = (
            3600  # Max time (seconds) to wait for a related task to finish
        )
        # A too long polling interval slows things down considerably
        interval_wait_for_task = 10  # Interval in seconds for polling related tasks
        time_left = timeout_wait_for_task
        old_num_related_tasks = 0
        while True:
            # Get related tasks (operations within the same instance as this) which are
            # still running (operationState='PROCESSING') and which were started before this task.
            # In the case of op_type='ANY', get any related tasks with operationState='PROCESSING', ignore timestamps.
            db_waitfor_related_task = self.db.get_list(db_table_name, q_filter=_filter)
            new_num_related_tasks = len(db_waitfor_related_task)
            # If there are no related tasks, there is nothing to wait for, so return.
            if not new_num_related_tasks:
                return
            # If number of pending related tasks have changed,
            # update the 'detailed-status' field and log the change.
            # Do NOT update the 'detailed-status' for SDNC-associated-to-VIM operations ('ANY').
            if (op_type != "ANY") and (new_num_related_tasks != old_num_related_tasks):
                step = "Waiting for {} related tasks to be completed.".format(
                    new_num_related_tasks
                )
                update_dict = {}
                q_filter = {"_id": _id}
                # NS/NSI
                if self._is_service_type_HA(topic):
                    update_dict = {
                        "detailed-status": step,
                        "queuePosition": new_num_related_tasks,
                    }
                # VIM/WIM/SDN
                elif self._is_account_type_HA(topic):
                    _, op_index = self._get_account_and_op_HA(op_id)
                    update_dict = {
                        "_admin.operations.{}.detailed-status".format(op_index): step
                    }
                self.logger.debug("Task {} operation={} {}".format(topic, _id, step))
                self.db.set_one(
                    db_table_name,
                    q_filter=q_filter,
                    update_dict=update_dict,
                    fail_on_empty=False,
                )
                old_num_related_tasks = new_num_related_tasks
            time_left -= interval_wait_for_task
            if time_left < 0:
                raise LcmException(
                    "Timeout ({}) when waiting for related tasks to be completed".format(
                        timeout_wait_for_task
                    )
                )
            await asyncio.sleep(interval_wait_for_task)