Code Coverage

Cobertura Coverage Report > osm_lcm >

lcm_utils.py

Trend

File Coverage summary

NameClassesLinesConditionals
lcm_utils.py
100%
1/1
48%
146/303
100%
0/0

Coverage Breakdown by Class

NameLinesConditionals
lcm_utils.py
48%
146/303
N/A

Source

osm_lcm/lcm_utils.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 #         http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 1 import asyncio
20 1 import checksumdir
21 1 from collections import OrderedDict
22 1 import hashlib
23 1 import os
24 1 import shutil
25 1 import traceback
26 1 from time import time
27
28 1 from osm_common.fsbase import FsException
29 1 from osm_lcm.data_utils.database.database import Database
30 1 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
31 1 import yaml
32 1 from zipfile import ZipFile, BadZipfile
33
34 # from osm_common.dbbase import DbException
35
36 1 __author__ = "Alfonso Tierno"
37
38
39 1 class LcmException(Exception):
40 1     pass
41
42
43 1 class LcmExceptionNoMgmtIP(LcmException):
44 1     pass
45
46
47 1 class LcmExceptionExit(LcmException):
48 1     pass
49
50
51 1 def versiontuple(v):
52     """utility for compare dot separate versions. Fills with zeros to proper number comparison
53     package version will be something like 4.0.1.post11+gb3f024d.dirty-1. Where 4.0.1 is the git tag, postXX is the
54     number of commits from this tag, and +XXXXXXX is the git commit short id. Total length is 16 with until 999 commits
55     """
56 1     filled = []
57 1     for point in v.split("."):
58 1         point, _, _ = point.partition("+")
59 1         point, _, _ = point.partition("-")
60 1         filled.append(point.zfill(20))
61 1     return tuple(filled)
62
63
64 1 def deep_get(target_dict, key_list, default_value=None):
65     """
66     Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
67     Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
68     :param target_dict: dictionary to be read
69     :param key_list: list of keys to read from  target_dict
70     :param default_value: value to return if key is not present in the nested dictionary
71     :return: The wanted value if exist, None otherwise
72     """
73 1     for key in key_list:
74 1         if not isinstance(target_dict, dict) or key not in target_dict:
75 1             return default_value
76 1         target_dict = target_dict[key]
77 1     return target_dict
78
79
80 1 def get_iterable(in_dict, in_key):
81     """
82     Similar to <dict>.get(), but if value is None, False, ..., An empty tuple is returned instead
83     :param in_dict: a dictionary
84     :param in_key: the key to look for at in_dict
85     :return: in_dict[in_var] or () if it is None or not present
86     """
87 1     if not in_dict.get(in_key):
88 1         return ()
89 1     return in_dict[in_key]
90
91
92 1 def check_juju_bundle_existence(vnfd: dict) -> str:
93     """Checks the existence of juju-bundle in the descriptor
94
95     Args:
96         vnfd:   Descriptor as a dictionary
97
98     Returns:
99         Juju bundle if dictionary has juju-bundle else None
100
101     """
102 1     if vnfd.get("vnfd"):
103 0         vnfd = vnfd["vnfd"]
104
105 1     for kdu in vnfd.get("kdu", []):
106 1         return kdu.get("juju-bundle", None)
107
108
109 1 def get_charm_artifact_path(base_folder, charm_name, charm_type, revision=str()) -> str:
110     """Finds the charm artifact paths
111
112     Args:
113         base_folder:    Main folder which will be looked up for charm
114         charm_name:   Charm name
115         charm_type:   Type of charm native_charm, lxc_proxy_charm or k8s_proxy_charm
116         revision:   vnf package revision number if there is
117
118     Returns:
119         artifact_path: (str)
120
121     """
122 1     extension = ""
123 1     if revision:
124 1         extension = ":" + str(revision)
125
126 1     if base_folder.get("pkg-dir"):
127 1         artifact_path = "{}/{}/{}/{}".format(
128             base_folder["folder"].split(":")[0] + extension,
129             base_folder["pkg-dir"],
130             "charms"
131             if charm_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
132             else "helm-charts",
133             charm_name,
134         )
135
136     else:
137         # For SOL004 packages
138 1         artifact_path = "{}/Scripts/{}/{}".format(
139             base_folder["folder"].split(":")[0] + extension,
140             "charms"
141             if charm_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
142             else "helm-charts",
143             charm_name,
144         )
145
146 1     return artifact_path
147
148
149 1 def populate_dict(target_dict, key_list, value):
150     """
151     Update target_dict creating nested dictionaries with the key_list. Last key_list item is asigned the value.
152     Example target_dict={K: J}; key_list=[a,b,c];  target_dict will be {K: J, a: {b: {c: value}}}
153     :param target_dict: dictionary to be changed
154     :param key_list: list of keys to insert at target_dict
155     :param value:
156     :return: None
157     """
158 0     for key in key_list[0:-1]:
159 0         if key not in target_dict:
160 0             target_dict[key] = {}
161 0         target_dict = target_dict[key]
162 0     target_dict[key_list[-1]] = value
163
164
165 1 def get_ee_id_parts(ee_id):
166     """
167     Parses ee_id stored at database that can be either 'version:namespace.helm_id' or only
168     namespace.helm_id for backward compatibility
169     If exists helm version can be helm-v3 or helm (helm-v2 old version)
170     """
171 1     version, _, part_id = ee_id.rpartition(":")
172 1     namespace, _, helm_id = part_id.rpartition(".")
173 1     return version, namespace, helm_id
174
175
176 1 class LcmBase:
177 1     def __init__(self, msg, logger):
178         """
179
180         :param db: database connection
181         """
182 1         self.db = Database().instance.db
183 1         self.msg = msg
184 1         self.fs = Filesystem().instance.fs
185 1         self.logger = logger
186
187 1     def update_db_2(self, item, _id, _desc):
188         """
189         Updates database with _desc information. If success _desc is cleared
190         :param item: collection
191         :param _id: the _id to use in the query filter
192         :param _desc: dictionary with the content to update. Keys are dot separated keys for
193         :return: None. Exception is raised on error
194         """
195 1         if not _desc:
196 1             return
197 1         now = time()
198 1         _desc["_admin.modified"] = now
199 1         self.db.set_one(item, {"_id": _id}, _desc)
200 1         _desc.clear()
201         # except DbException as e:
202         #     self.logger.error("Updating {} _id={} with '{}'. Error: {}".format(item, _id, _desc, e))
203
204 1     @staticmethod
205 1     def calculate_charm_hash(zipped_file):
206         """Calculate the hash of charm files which ends with .charm
207
208         Args:
209             zipped_file (str): Existing charm package full path
210
211         Returns:
212             hex digest  (str): The hash of the charm file
213         """
214 1         filehash = hashlib.md5()
215 1         with open(zipped_file, mode="rb") as file:
216 1             contents = file.read()
217 1             filehash.update(contents)
218 1             return filehash.hexdigest()
219
220 1     @staticmethod
221 1     def compare_charm_hash(current_charm, target_charm):
222         """Compare the existing charm and the target charm if the charms
223         are given as zip files ends with .charm
224
225         Args:
226             current_charm (str): Existing charm package full path
227             target_charm  (str): Target charm package full path
228
229         Returns:
230             True/False (bool): if charm has changed it returns True
231         """
232 1         return LcmBase.calculate_charm_hash(
233             current_charm
234         ) != LcmBase.calculate_charm_hash(target_charm)
235
236 1     @staticmethod
237 1     def compare_charmdir_hash(current_charm_dir, target_charm_dir):
238         """Compare the existing charm and the target charm if the charms
239         are given as directories
240
241         Args:
242             current_charm_dir (str): Existing charm package directory path
243             target_charm_dir  (str): Target charm package directory path
244
245         Returns:
246             True/False (bool): if charm has changed it returns True
247         """
248 1         return checksumdir.dirhash(current_charm_dir) != checksumdir.dirhash(
249             target_charm_dir
250         )
251
252 1     def check_charm_hash_changed(
253         self, current_charm_path: str, target_charm_path: str
254     ) -> bool:
255         """Find the target charm has changed or not by checking the hash of
256         old and new charm packages
257
258         Args:
259             current_charm_path (str): Existing charm package artifact path
260             target_charm_path  (str): Target charm package artifact path
261
262         Returns:
263             True/False (bool): if charm has changed it returns True
264
265         """
266 1         try:
267             # Check if the charm artifacts are available
268 1             current_charm = self.fs.path + current_charm_path
269 1             target_charm = self.fs.path + target_charm_path
270
271 1             if os.path.exists(current_charm) and os.path.exists(target_charm):
272                 # Compare the hash of .charm files
273 1                 if current_charm.endswith(".charm"):
274 0                     return LcmBase.compare_charm_hash(current_charm, target_charm)
275
276                 # Compare the hash of charm folders
277 1                 return LcmBase.compare_charmdir_hash(current_charm, target_charm)
278
279             else:
280 1                 raise LcmException(
281                     "Charm artifact {} does not exist in the VNF Package".format(
282                         self.fs.path + target_charm_path
283                     )
284                 )
285 1         except (IOError, OSError, TypeError) as error:
286 0             self.logger.debug(traceback.format_exc())
287 0             self.logger.error(f"{error} occured while checking the charm hashes")
288 0             raise LcmException(error)
289
290 1     @staticmethod
291 1     def get_charm_name(charm_metadata_file: str) -> str:
292         """Get the charm name from metadata file.
293
294         Args:
295             charm_metadata_file    (str):  charm metadata file full path
296
297         Returns:
298             charm_name    (str):  charm name
299
300         """
301         # Read charm metadata.yaml to get the charm name
302 1         with open(charm_metadata_file, "r") as metadata_file:
303 1             content = yaml.safe_load(metadata_file)
304 1             charm_name = content["name"]
305 1             return str(charm_name)
306
307 1     def _get_charm_path(
308         self, nsd_package_path: str, nsd_package_name: str, charm_folder_name: str
309     ) -> str:
310         """Get the full path of charm folder.
311
312         Args:
313             nsd_package_path    (str):  NSD package full path
314             nsd_package_name    (str):  NSD package name
315             charm_folder_name   (str):  folder name
316
317         Returns:
318             charm_path    (str):  charm folder full path
319         """
320 1         charm_path = (
321             self.fs.path
322             + nsd_package_path
323             + "/"
324             + nsd_package_name
325             + "/charms/"
326             + charm_folder_name
327         )
328 1         return charm_path
329
330 1     def _get_charm_metadata_file(
331         self,
332         charm_folder_name: str,
333         nsd_package_path: str,
334         nsd_package_name: str,
335         charm_path: str = None,
336     ) -> str:
337         """Get the path of charm metadata file.
338
339         Args:
340             charm_folder_name   (str):  folder name
341             nsd_package_path    (str):  NSD package full path
342             nsd_package_name    (str):  NSD package name
343             charm_path  (str):  Charm full path
344
345         Returns:
346             charm_metadata_file_path    (str):  charm metadata file full path
347
348         """
349         # Locate the charm metadata.yaml
350 1         if charm_folder_name.endswith(".charm"):
351 1             extract_path = (
352                 self.fs.path
353                 + nsd_package_path
354                 + "/"
355                 + nsd_package_name
356                 + "/charms/"
357                 + charm_folder_name.replace(".charm", "")
358             )
359             # Extract .charm to extract path
360 1             with ZipFile(charm_path, "r") as zipfile:
361 1                 zipfile.extractall(extract_path)
362 1             return extract_path + "/metadata.yaml"
363         else:
364 1             return charm_path + "/metadata.yaml"
365
366 1     def find_charm_name(self, db_nsr: dict, charm_folder_name: str) -> str:
367         """Get the charm name from metadata.yaml of charm package.
368
369         Args:
370             db_nsr  (dict): NS record as a dictionary
371             charm_folder_name   (str): charm folder name
372
373         Returns:
374              charm_name (str):  charm name
375         """
376 1         try:
377 1             if not charm_folder_name:
378 1                 raise LcmException("charm_folder_name should be provided.")
379
380             # Find nsd_package details: path, name
381 1             revision = db_nsr.get("revision", "")
382
383             # Get the NSD package path
384 1             if revision:
385 0                 nsd_package_path = db_nsr["nsd-id"] + ":" + str(revision)
386 0                 db_nsd = self.db.get_one("nsds_revisions", {"_id": nsd_package_path})
387
388             else:
389 1                 nsd_package_path = db_nsr["nsd-id"]
390
391 1                 db_nsd = self.db.get_one("nsds", {"_id": nsd_package_path})
392
393             # Get the NSD package name
394 1             nsd_package_name = db_nsd["_admin"]["storage"]["pkg-dir"]
395
396             # Remove the existing nsd package and sync from FsMongo
397 1             shutil.rmtree(self.fs.path + nsd_package_path, ignore_errors=True)
398 1             self.fs.sync(from_path=nsd_package_path)
399
400             # Get the charm path
401 1             charm_path = self._get_charm_path(
402                 nsd_package_path, nsd_package_name, charm_folder_name
403             )
404
405             # Find charm metadata file full path
406 1             charm_metadata_file = self._get_charm_metadata_file(
407                 charm_folder_name, nsd_package_path, nsd_package_name, charm_path
408             )
409
410             # Return charm name
411 1             return self.get_charm_name(charm_metadata_file)
412
413 1         except (
414             yaml.YAMLError,
415             IOError,
416             FsException,
417             KeyError,
418             TypeError,
419             FileNotFoundError,
420             BadZipfile,
421         ) as error:
422 1             self.logger.debug(traceback.format_exc())
423 1             self.logger.error(f"{error} occured while getting the charm name")
424 1             raise LcmException(error)
425
426
427 1 class TaskRegistry(LcmBase):
428     """
429     Implements a registry of task needed for later cancelation, look for related tasks that must be completed before
430     etc. It stores a four level dict
431     First level is the topic, ns, vim_account, sdn
432     Second level is the _id
433     Third level is the operation id
434     Fourth level is a descriptive name, the value is the task class
435
436     The HA (High-Availability) methods are used when more than one LCM instance is running.
437     To register the current task in the external DB, use LcmBase as base class, to be able
438     to reuse LcmBase.update_db_2()
439     The DB registry uses the following fields to distinguish a task:
440     - op_type: operation type ("nslcmops" or "nsilcmops")
441     - op_id:   operation ID
442     - worker:  the worker ID for this process
443     """
444
445     # NS/NSI: "services" VIM/WIM/SDN: "accounts"
446 1     topic_service_list = ["ns", "nsi"]
447 1     topic_account_list = ["vim", "wim", "sdn", "k8scluster", "vca", "k8srepo"]
448
449     # Map topic to InstanceID
450 1     topic2instid_dict = {"ns": "nsInstanceId", "nsi": "netsliceInstanceId"}
451
452     # Map topic to DB table name
453 1     topic2dbtable_dict = {
454         "ns": "nslcmops",
455         "nsi": "nsilcmops",
456         "vim": "vim_accounts",
457         "wim": "wim_accounts",
458         "sdn": "sdns",
459         "k8scluster": "k8sclusters",
460         "vca": "vca",
461         "k8srepo": "k8srepos",
462     }
463
464 1     def __init__(self, worker_id=None, logger=None):
465 1         self.task_registry = {
466             "ns": {},
467             "nsi": {},
468             "vim_account": {},
469             "wim_account": {},
470             "sdn": {},
471             "k8scluster": {},
472             "vca": {},
473             "k8srepo": {},
474         }
475 1         self.worker_id = worker_id
476 1         self.db = Database().instance.db
477 1         self.logger = logger
478
479 1     def register(self, topic, _id, op_id, task_name, task):
480         """
481         Register a new task
482         :param topic: Can be "ns", "nsi", "vim_account", "sdn"
483         :param _id: _id of the related item
484         :param op_id: id of the operation of the related item
485         :param task_name: Task descriptive name, as create, instantiate, terminate. Must be unique in this op_id
486         :param task: Task class
487         :return: none
488         """
489 0         if _id not in self.task_registry[topic]:
490 0             self.task_registry[topic][_id] = OrderedDict()
491 0         if op_id not in self.task_registry[topic][_id]:
492 0             self.task_registry[topic][_id][op_id] = {task_name: task}
493         else:
494 0             self.task_registry[topic][_id][op_id][task_name] = task
495         # print("registering task", topic, _id, op_id, task_name, task)
496
497 1     def remove(self, topic, _id, op_id, task_name=None):
498         """
499         When task is ended, it should be removed. It ignores missing tasks. It also removes tasks done with this _id
500         :param topic: Can be "ns", "nsi", "vim_account", "sdn"
501         :param _id: _id of the related item
502         :param op_id: id of the operation of the related item
503         :param task_name: Task descriptive name. If none it deletes all tasks with same _id and op_id
504         :return: None
505         """
506 0         if not self.task_registry[topic].get(_id):
507 0             return
508 0         if not task_name:
509 0             self.task_registry[topic][_id].pop(op_id, None)
510 0         elif self.task_registry[topic][_id].get(op_id):
511 0             self.task_registry[topic][_id][op_id].pop(task_name, None)
512
513         # delete done tasks
514 0         for op_id_ in list(self.task_registry[topic][_id]):
515 0             for name, task in self.task_registry[topic][_id][op_id_].items():
516 0                 if not task.done():
517 0                     break
518             else:
519 0                 del self.task_registry[topic][_id][op_id_]
520 0         if not self.task_registry[topic][_id]:
521 0             del self.task_registry[topic][_id]
522
523 1     def lookfor_related(self, topic, _id, my_op_id=None):
524 0         task_list = []
525 0         task_name_list = []
526 0         if _id not in self.task_registry[topic]:
527 0             return "", task_name_list
528 0         for op_id in reversed(self.task_registry[topic][_id]):
529 0             if my_op_id:
530 0                 if my_op_id == op_id:
531 0                     my_op_id = None  # so that the next task is taken
532 0                 continue
533
534 0             for task_name, task in self.task_registry[topic][_id][op_id].items():
535 0                 if not task.done():
536 0                     task_list.append(task)
537 0                     task_name_list.append(task_name)
538 0             break
539 0         return ", ".join(task_name_list), task_list
540
541 1     def cancel(self, topic, _id, target_op_id=None, target_task_name=None):
542         """
543         Cancel all active tasks of a concrete ns, nsi, vim_account, sdn identified for _id. If op_id is supplied only
544         this is cancelled, and the same with task_name
545         """
546 0         if not self.task_registry[topic].get(_id):
547 0             return
548 0         for op_id in reversed(self.task_registry[topic][_id]):
549 0             if target_op_id and target_op_id != op_id:
550 0                 continue
551 0             for task_name, task in self.task_registry[topic][_id][op_id].items():
552 0                 if target_task_name and target_task_name != task_name:
553 0                     continue
554                 # result =
555 0                 task.cancel()
556                 # if result:
557                 #     self.logger.debug("{} _id={} order_id={} task={} cancelled".format(topic, _id, op_id, task_name))
558
559     # Is topic NS/NSI?
560 1     def _is_service_type_HA(self, topic):
561 0         return topic in self.topic_service_list
562
563     # Is topic VIM/WIM/SDN?
564 1     def _is_account_type_HA(self, topic):
565 0         return topic in self.topic_account_list
566
567     # Input: op_id, example: 'abc123def:3' Output: account_id='abc123def', op_index=3
568 1     def _get_account_and_op_HA(self, op_id):
569 0         if not op_id:
570 0             return None, None
571 0         account_id, _, op_index = op_id.rpartition(":")
572 0         if not account_id or not op_index.isdigit():
573 0             return None, None
574 0         return account_id, op_index
575
576     # Get '_id' for any topic and operation
577 1     def _get_instance_id_HA(self, topic, op_type, op_id):
578 0         _id = None
579         # Special operation 'ANY', for SDN account associated to a VIM account: op_id as '_id'
580 0         if op_type == "ANY":
581 0             _id = op_id
582         # NS/NSI: Use op_id as '_id'
583 0         elif self._is_service_type_HA(topic):
584 0             _id = op_id
585         # VIM/SDN/WIM/K8SCLUSTER: Split op_id to get Account ID and Operation Index, use Account ID as '_id'
586 0         elif self._is_account_type_HA(topic):
587 0             _id, _ = self._get_account_and_op_HA(op_id)
588 0         return _id
589
590     # Set DB _filter for querying any related process state
591 1     def _get_waitfor_filter_HA(self, db_lcmop, topic, op_type, op_id):
592 0         _filter = {}
593         # Special operation 'ANY', for SDN account associated to a VIM account: op_id as '_id'
594         # In this special case, the timestamp is ignored
595 0         if op_type == "ANY":
596 0             _filter = {"operationState": "PROCESSING"}
597         # Otherwise, get 'startTime' timestamp for this operation
598         else:
599             # NS/NSI
600 0             if self._is_service_type_HA(topic):
601 0                 now = time()
602 0                 starttime_this_op = db_lcmop.get("startTime")
603 0                 instance_id_label = self.topic2instid_dict.get(topic)
604 0                 instance_id = db_lcmop.get(instance_id_label)
605 0                 _filter = {
606                     instance_id_label: instance_id,
607                     "operationState": "PROCESSING",
608                     "startTime.lt": starttime_this_op,
609                     "_admin.modified.gt": now
610                     - 2 * 3600,  # ignore if tow hours of inactivity
611                 }
612             # VIM/WIM/SDN/K8scluster
613 0             elif self._is_account_type_HA(topic):
614 0                 _, op_index = self._get_account_and_op_HA(op_id)
615 0                 _ops = db_lcmop["_admin"]["operations"]
616 0                 _this_op = _ops[int(op_index)]
617 0                 starttime_this_op = _this_op.get("startTime", None)
618 0                 _filter = {
619                     "operationState": "PROCESSING",
620                     "startTime.lt": starttime_this_op,
621                 }
622 0         return _filter
623
624     # Get DB params for any topic and operation
625 1     def _get_dbparams_for_lock_HA(self, topic, op_type, op_id):
626 0         q_filter = {}
627 0         update_dict = {}
628         # NS/NSI
629 0         if self._is_service_type_HA(topic):
630 0             q_filter = {"_id": op_id, "_admin.worker": None}
631 0             update_dict = {"_admin.worker": self.worker_id}
632         # VIM/WIM/SDN
633 0         elif self._is_account_type_HA(topic):
634 0             account_id, op_index = self._get_account_and_op_HA(op_id)
635 0             if not account_id:
636 0                 return None, None
637 0             if op_type == "create":
638                 # Creating a VIM/WIM/SDN account implies setting '_admin.current_operation' = 0
639 0                 op_index = 0
640 0             q_filter = {
641                 "_id": account_id,
642                 "_admin.operations.{}.worker".format(op_index): None,
643             }
644 0             update_dict = {
645                 "_admin.operations.{}.worker".format(op_index): self.worker_id,
646                 "_admin.current_operation": op_index,
647             }
648 0         return q_filter, update_dict
649
650 1     def lock_HA(self, topic, op_type, op_id):
651         """
652         Lock a task, if possible, to indicate to the HA system that
653         the task will be executed in this LCM instance.
654         :param topic: Can be "ns", "nsi", "vim", "wim", or "sdn"
655         :param op_type: Operation type, can be "nslcmops", "nsilcmops", "create", "edit", "delete"
656         :param op_id: NS, NSI: Operation ID  VIM,WIM,SDN: Account ID + ':' + Operation Index
657         :return:
658         True=lock was successful => execute the task (not registered by any other LCM instance)
659         False=lock failed => do NOT execute the task (already registered by another LCM instance)
660
661         HA tasks and backward compatibility:
662         If topic is "account type" (VIM/WIM/SDN) and op_id is None, 'op_id' was not provided by NBI.
663         This means that the running NBI instance does not support HA.
664         In such a case this method should always return True, to always execute
665         the task in this instance of LCM, without querying the DB.
666         """
667
668         # Backward compatibility for VIM/WIM/SDN/k8scluster without op_id
669 0         if self._is_account_type_HA(topic) and op_id is None:
670 0             return True
671
672         # Try to lock this task
673 0         db_table_name = self.topic2dbtable_dict[topic]
674 0         q_filter, update_dict = self._get_dbparams_for_lock_HA(topic, op_type, op_id)
675 0         db_lock_task = self.db.set_one(
676             db_table_name,
677             q_filter=q_filter,
678             update_dict=update_dict,
679             fail_on_empty=False,
680         )
681 0         if db_lock_task is None:
682 0             self.logger.debug(
683                 "Task {} operation={} already locked by another worker".format(
684                     topic, op_id
685                 )
686             )
687 0             return False
688         else:
689             # Set 'detailed-status' to 'In progress' for VIM/WIM/SDN operations
690 0             if self._is_account_type_HA(topic):
691 0                 detailed_status = "In progress"
692 0                 account_id, op_index = self._get_account_and_op_HA(op_id)
693 0                 q_filter = {"_id": account_id}
694 0                 update_dict = {
695                     "_admin.operations.{}.detailed-status".format(
696                         op_index
697                     ): detailed_status
698                 }
699 0                 self.db.set_one(
700                     db_table_name,
701                     q_filter=q_filter,
702                     update_dict=update_dict,
703                     fail_on_empty=False,
704                 )
705 0             return True
706
707 1     def unlock_HA(self, topic, op_type, op_id, operationState, detailed_status):
708         """
709         Register a task, done when finished a VIM/WIM/SDN 'create' operation.
710         :param topic: Can be "vim", "wim", or "sdn"
711         :param op_type: Operation type, can be "create", "edit", "delete"
712         :param op_id: Account ID + ':' + Operation Index
713         :return: nothing
714         """
715
716         # Backward compatibility
717 0         if not self._is_account_type_HA(topic) or not op_id:
718 0             return
719
720         # Get Account ID and Operation Index
721 0         account_id, op_index = self._get_account_and_op_HA(op_id)
722 0         db_table_name = self.topic2dbtable_dict[topic]
723
724         # If this is a 'delete' operation, the account may have been deleted (SUCCESS) or may still exist (FAILED)
725         # If the account exist, register the HA task.
726         # Update DB for HA tasks
727 0         q_filter = {"_id": account_id}
728 0         update_dict = {
729             "_admin.operations.{}.operationState".format(op_index): operationState,
730             "_admin.operations.{}.detailed-status".format(op_index): detailed_status,
731             "_admin.operations.{}.worker".format(op_index): None,
732             "_admin.current_operation": None,
733         }
734 0         self.db.set_one(
735             db_table_name,
736             q_filter=q_filter,
737             update_dict=update_dict,
738             fail_on_empty=False,
739         )
740 0         return
741
742 1     async def waitfor_related_HA(self, topic, op_type, op_id=None):
743         """
744         Wait for any pending related HA tasks
745         """
746
747         # Backward compatibility
748 0         if not (
749             self._is_service_type_HA(topic) or self._is_account_type_HA(topic)
750         ) and (op_id is None):
751 0             return
752
753         # Get DB table name
754 0         db_table_name = self.topic2dbtable_dict.get(topic)
755
756         # Get instance ID
757 0         _id = self._get_instance_id_HA(topic, op_type, op_id)
758 0         _filter = {"_id": _id}
759 0         db_lcmop = self.db.get_one(db_table_name, _filter, fail_on_empty=False)
760 0         if not db_lcmop:
761 0             return
762
763         # Set DB _filter for querying any related process state
764 0         _filter = self._get_waitfor_filter_HA(db_lcmop, topic, op_type, op_id)
765
766         # For HA, get list of tasks from DB instead of from dictionary (in-memory) variable.
767 0         timeout_wait_for_task = (
768             3600  # Max time (seconds) to wait for a related task to finish
769         )
770         # interval_wait_for_task = 30    #  A too long polling interval slows things down considerably
771 0         interval_wait_for_task = 10  # Interval in seconds for polling related tasks
772 0         time_left = timeout_wait_for_task
773 0         old_num_related_tasks = 0
774         while True:
775             # Get related tasks (operations within the same instance as this) which are
776             # still running (operationState='PROCESSING') and which were started before this task.
777             # In the case of op_type='ANY', get any related tasks with operationState='PROCESSING', ignore timestamps.
778 0             db_waitfor_related_task = self.db.get_list(db_table_name, q_filter=_filter)
779 0             new_num_related_tasks = len(db_waitfor_related_task)
780             # If there are no related tasks, there is nothing to wait for, so return.
781 0             if not new_num_related_tasks:
782 0                 return
783             # If number of pending related tasks have changed,
784             # update the 'detailed-status' field and log the change.
785             # Do NOT update the 'detailed-status' for SDNC-associated-to-VIM operations ('ANY').
786 0             if (op_type != "ANY") and (new_num_related_tasks != old_num_related_tasks):
787 0                 step = "Waiting for {} related tasks to be completed.".format(
788                     new_num_related_tasks
789                 )
790 0                 update_dict = {}
791 0                 q_filter = {"_id": _id}
792                 # NS/NSI
793 0                 if self._is_service_type_HA(topic):
794 0                     update_dict = {
795                         "detailed-status": step,
796                         "queuePosition": new_num_related_tasks,
797                     }
798                 # VIM/WIM/SDN
799 0                 elif self._is_account_type_HA(topic):
800 0                     _, op_index = self._get_account_and_op_HA(op_id)
801 0                     update_dict = {
802                         "_admin.operations.{}.detailed-status".format(op_index): step
803                     }
804 0                 self.logger.debug("Task {} operation={} {}".format(topic, _id, step))
805 0                 self.db.set_one(
806                     db_table_name,
807                     q_filter=q_filter,
808                     update_dict=update_dict,
809                     fail_on_empty=False,
810                 )
811 0                 old_num_related_tasks = new_num_related_tasks
812 0             time_left -= interval_wait_for_task
813 0             if time_left < 0:
814 0                 raise LcmException(
815                     "Timeout ({}) when waiting for related tasks to be completed".format(
816                         timeout_wait_for_task
817                     )
818                 )
819 0             await asyncio.sleep(interval_wait_for_task)
820
821 0         return