blob: 5cd5a2f54f39cafe8d2c59bd05dbe699f9cb66ae [file] [log] [blame]
tierno59d22d22018-09-25 18:10:19 +02001# -*- coding: utf-8 -*-
2
tierno2e215512018-11-28 09:37:52 +00003##
4# Copyright 2018 Telefonica S.A.
5#
6# Licensed under the Apache License, Version 2.0 (the "License"); you may
7# not use this file except in compliance with the License. You may obtain
8# a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15# License for the specific language governing permissions and limitations
16# under the License.
17##
tierno59d22d22018-09-25 18:10:19 +020018
kuused124bfe2019-06-18 12:09:24 +020019import asyncio
aticigdffa6212022-04-12 15:27:53 +030020import checksumdir
tierno59d22d22018-09-25 18:10:19 +020021from collections import OrderedDict
aticigdffa6212022-04-12 15:27:53 +030022import os
aticig9bc63ac2022-07-27 09:32:06 +030023import shutil
24import traceback
tierno79cd8ad2019-10-18 13:03:10 +000025from time import time
aticig9bc63ac2022-07-27 09:32:06 +030026
27from osm_common.fsbase import FsException
bravof922c4172020-11-24 21:21:43 -030028from osm_lcm.data_utils.database.database import Database
29from osm_lcm.data_utils.filesystem.filesystem import Filesystem
aticig9bc63ac2022-07-27 09:32:06 +030030import yaml
31from zipfile import ZipFile, BadZipfile
bravof922c4172020-11-24 21:21:43 -030032
tiernobaa51102018-12-14 13:16:18 +000033# from osm_common.dbbase import DbException
tierno59d22d22018-09-25 18:10:19 +020034
# Module authorship metadata.
__author__ = "Alfonso Tierno"
36
37
class LcmException(Exception):
    """Generic LCM error; parent of the more specific LCM exceptions of this module."""
40
41
class LcmExceptionNoMgmtIP(LcmException):
    """LCM error subtype.

    NOTE(review): judging by the name, presumably raised when no management IP
    address is available -- confirm against the callers.
    """
44
45
class LcmExceptionExit(LcmException):
    """LCM error subtype.

    NOTE(review): judging by the name, presumably used to request that LCM
    exits -- confirm against the callers.
    """
48
49
def versiontuple(v):
    """Turn a dot separated version string into a tuple that compares properly.

    Each dot separated field is cut at the first "+" and then at the first "-",
    and the remainder is left-padded with zeros up to 20 characters, so that a
    plain string comparison of the fields orders numbers correctly.
    A package version looks like 4.0.1.post11+gb3f024d.dirty-1, where 4.0.1 is the
    git tag, postXX the number of commits since that tag and +XXXXXXX the short
    git commit id.

    :param v: version string
    :return: tuple of zero-padded strings, one per dot separated field
    """
    return tuple(
        field.partition("+")[0].partition("-")[0].zfill(20) for field in v.split(".")
    )
61
62
def deep_get(target_dict, key_list, default_value=None):
    """
    Walk target_dict following key_list and return the value found at the end of the path.
    Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and
    key_list=[f,h] return default_value
    :param target_dict: dictionary to be read
    :param key_list: list of keys to follow, outermost first
    :param default_value: value returned when the path cannot be followed, because a key is
        missing or an intermediate value is not a dictionary
    :return: the value at the end of the path if it exists, default_value otherwise
    """
    node = target_dict
    for step in key_list:
        if isinstance(node, dict) and step in node:
            node = node[step]
        else:
            return default_value
    return node
77
78
def get_iterable(in_dict, in_key):
    """
    Like <dict>.get(), but any falsy value (None, False, empty, ...) or a missing key
    yields an empty tuple, so the result can always be iterated
    :param in_dict: a dictionary
    :param in_key: the key to look for at in_dict
    :return: in_dict[in_key], or () if it is falsy or not present
    """
    found = in_dict.get(in_key)
    return found if found else ()
89
90
def get_paas_id_by_nsr_id(nsr_id: str, db: object) -> str:
    """Look up the PaaS account ID stored in an NS record.

    Args:
        nsr_id (str): NS record ID, used as "_id" in the "nsrs" collection
        db (object): Database object providing get_one(collection, filter)

    Returns:
        paas_id (str): value of the "paasdatacenter" field of the NS record,
            None when the record has no such field
    """
    return db.get_one("nsrs", {"_id": nsr_id}).get("paasdatacenter")
102
103
def get_paas_type_by_paas_id(paas_id: str, db: object) -> str:
    """Look up the PaaS type of a PaaS account.

    Args:
        paas_id (str): PaaS account ID, used as "_id" in the "paas" collection
        db (object): Database object providing get_one(collection, filter)

    Returns:
        paas_type (str): value of the "paas_type" field of the account record

    Raises:
        KeyError: if the account record has no "paas_type" field
    """
    paas_account = db.get_one("paas", {"_id": paas_id})
    paas_type = paas_account["paas_type"]
    return paas_type
115
116
def check_juju_bundle_existence(vnfd: dict) -> str:
    """Look for a juju-bundle among the KDUs of a descriptor.

    Args:
        vnfd: Descriptor as a dictionary; its content may be nested under a
            top-level "vnfd" key

    Returns:
        The first non-empty "juju-bundle" value found in the "kdu" list, or
        None when no KDU declares one

    """
    if vnfd.get("vnfd"):
        vnfd = vnfd["vnfd"]

    # Inspect every KDU: the bundle is not necessarily declared by the first
    # one. A missing or null "kdu" entry is treated as "no KDUs".
    for kdu in vnfd.get("kdu") or ():
        bundle = kdu.get("juju-bundle")
        if bundle:
            return bundle
    return None
132
133
def get_charm_artifact_path(base_folder, charm_name, charm_type, revision=str()) -> str:
    """Build the artifact path of a charm inside a package folder

    Args:
        base_folder: Dictionary describing the package storage. "folder" is the
            main folder (anything after its first ":" is dropped) and the
            optional "pkg-dir" is the package directory inside it
        charm_name: Charm name
        charm_type: Type of charm. native_charm, lxc_proxy_charm and
            k8s_proxy_charm are placed under "charms", anything else under
            "helm-charts"
        revision: vnf package revision number if there is; appended to the
            main folder as ":<revision>"

    Returns:
        artifact_path: (str)

    """
    pkg_dir = base_folder.get("pkg-dir")

    root_folder = base_folder["folder"].split(":")[0]
    if revision:
        root_folder += ":" + str(revision)

    if charm_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm"):
        artifact_dir = "charms"
    else:
        artifact_dir = "helm-charts"

    if pkg_dir:
        return f"{root_folder}/{pkg_dir}/{artifact_dir}/{charm_name}"

    # For SOL004 packages
    return f"{root_folder}/Scripts/{artifact_dir}/{charm_name}"
172
173
def populate_dict(target_dict, key_list, value):
    """
    Set a value deep inside target_dict, creating the intermediate dictionaries that are
    missing. The last item of key_list is the key that receives the value.
    Example target_dict={K: J}; key_list=[a,b,c]; target_dict will be {K: J, a: {b: {c: value}}}
    :param target_dict: dictionary to be changed (modified in place)
    :param key_list: list of keys to insert at target_dict, outermost first
    :param value: value assigned to the last key
    :return: None
    """
    cursor = target_dict
    for parent_key in key_list[:-1]:
        cursor = cursor.setdefault(parent_key, {})
    cursor[key_list[-1]] = value
188
189
class LcmBase:
    """Base class of the LCM components.

    Holds the shared database and filesystem handles and provides helpers to
    update database records and to locate charm artifacts of a package.
    """

    def __init__(self, msg, logger):
        """
        The database and filesystem handles are taken from the Database() and
        Filesystem() instances; they are not passed as parameters.

        :param msg: stored as self.msg (presumably the message bus client; confirm with callers)
        :param logger: logger used by the methods of this class
        """
        self.db = Database().instance.db
        self.msg = msg
        self.fs = Filesystem().instance.fs
        self.logger = logger

    def update_db_2(self, item, _id, _desc):
        """
        Updates database with _desc information. If success _desc is cleared
        :param item: collection
        :param _id: the _id to use in the query filter
        :param _desc: dictionary with the content to update. Keys may be dot separated
            paths; "_admin.modified" is added with the current time before writing
        :return: None. Exception is raised on error
        """
        if not _desc:
            return
        _desc["_admin.modified"] = time()
        self.db.set_one(item, {"_id": _id}, _desc)
        # Emptied only after a successful write, so the caller can keep
        # accumulating pending changes in the same dictionary.
        _desc.clear()

    def check_charm_hash_changed(
        self, current_charm_path: str, target_charm_path: str
    ) -> bool:
        """Find the target charm has changed or not by checking the hash of
        old and new charm packages

        Args:
            current_charm_path (str): Existing charm package artifact path
            target_charm_path (str): Target charm package artifact path

        Returns:
            True/False (bool): if charm has changed it returns True

        Raises:
            LcmException: if any of the two artifacts is not present in the
                local filesystem

        """
        current_full_path = self.fs.path + current_charm_path
        target_full_path = self.fs.path + target_charm_path

        # Both artifacts must be available. The target is checked first so a
        # missing target is reported exactly as before; a missing current
        # artifact is now reported with its own path.
        for artifact in (target_full_path, current_full_path):
            if not os.path.exists(artifact):
                raise LcmException(
                    "Charm artifact {} does not exist in the VNF Package".format(
                        artifact
                    )
                )

        # Compare the hash of charm folders
        return checksumdir.dirhash(current_full_path) != checksumdir.dirhash(
            target_full_path
        )

    @staticmethod
    def get_charm_name(charm_metadata_file: str) -> str:
        """Get the charm name from metadata file.

        Args:
            charm_metadata_file (str): charm metadata file full path

        Returns:
            charm_name (str): value of the "name" key of the metadata file

        """
        # Read charm metadata.yaml to get the charm name
        with open(charm_metadata_file, "r") as metadata_file:
            content = yaml.safe_load(metadata_file)
        return str(content["name"])

    def _get_charm_path(
        self, nsd_package_path: str, nsd_package_name: str, charm_folder_name: str
    ) -> str:
        """Get the full path of charm folder.

        Args:
            nsd_package_path (str): NSD package path, relative to self.fs.path
            nsd_package_name (str): NSD package name
            charm_folder_name (str): folder name

        Returns:
            charm_path (str): charm folder full path
        """
        return (
            self.fs.path
            + nsd_package_path
            + "/"
            + nsd_package_name
            + "/charms/"
            + charm_folder_name
        )

    def _get_charm_metadata_file(
        self,
        charm_folder_name: str,
        nsd_package_path: str,
        nsd_package_name: str,
        charm_path: str = None,
    ) -> str:
        """Get the path of charm metadata file.

        A packed charm ("<name>.charm" zip archive) is extracted next to the
        archive, into a folder named after the archive without its suffix.

        Args:
            charm_folder_name (str): folder name
            nsd_package_path (str): NSD package path, relative to self.fs.path
            nsd_package_name (str): NSD package name
            charm_path (str): Charm full path

        Returns:
            charm_metadata_file_path (str): charm metadata file full path

        """
        packed_suffix = ".charm"
        if not charm_folder_name.endswith(packed_suffix):
            return charm_path + "/metadata.yaml"

        # Strip only the trailing ".charm": str.replace would also remove the
        # same text from the middle of the name.
        extract_path = self._get_charm_path(
            nsd_package_path,
            nsd_package_name,
            charm_folder_name[: -len(packed_suffix)],
        )
        # Extract .charm to extract path
        with ZipFile(charm_path, "r") as charm_archive:
            charm_archive.extractall(extract_path)
        return extract_path + "/metadata.yaml"

    def find_charm_name(self, db_nsr: dict, charm_folder_name: str) -> str:
        """Get the charm name from metadata.yaml of charm package.

        The local copy of the NSD package is removed and synchronized again
        from the storage before reading the charm metadata.

        Args:
            db_nsr (dict): NS record as a dictionary
            charm_folder_name (str): charm folder name

        Returns:
            charm_name (str): charm name

        Raises:
            LcmException: if charm_folder_name is empty or the charm name
                cannot be obtained
        """
        try:
            if not charm_folder_name:
                raise LcmException("charm_folder_name should be provided.")

            # Find nsd_package details: path, name
            revision = db_nsr.get("revision", "")

            # Get the NSD package path; a revisioned NSD is stored under
            # "<nsd-id>:<revision>" in its own collection
            if revision:
                nsd_package_path = db_nsr["nsd-id"] + ":" + str(revision)
                db_nsd = self.db.get_one("nsds_revisions", {"_id": nsd_package_path})
            else:
                nsd_package_path = db_nsr["nsd-id"]
                db_nsd = self.db.get_one("nsds", {"_id": nsd_package_path})

            # Get the NSD package name
            nsd_package_name = db_nsd["_admin"]["storage"]["pkg-dir"]

            # Remove the existing nsd package and sync from FsMongo
            shutil.rmtree(self.fs.path + nsd_package_path, ignore_errors=True)
            self.fs.sync(from_path=nsd_package_path)

            # Get the charm path
            charm_path = self._get_charm_path(
                nsd_package_path, nsd_package_name, charm_folder_name
            )

            # Find charm metadata file full path
            charm_metadata_file = self._get_charm_metadata_file(
                charm_folder_name, nsd_package_path, nsd_package_name, charm_path
            )

            # Return charm name
            return self.get_charm_name(charm_metadata_file)

        except (
            yaml.YAMLError,
            IOError,
            FsException,
            KeyError,
            TypeError,
            FileNotFoundError,
            BadZipfile,
        ) as error:
            self.logger.debug(traceback.format_exc())
            self.logger.error(f"{error} occured while getting the charm name")
            # Keep the original error as the explicit cause of the LCM error
            raise LcmException(error) from error
388
kuused124bfe2019-06-18 12:09:24 +0200389
class TaskRegistry(LcmBase):
    """
    Registry of the running tasks, needed to cancel them later, to look for related tasks that
    must be completed before, etc. It is stored in self.task_registry as a four level dict
        First level is the topic: ns, nsi, vim_account, wim_account, sdn, ...
        Second level is the _id
        Third level is the operation id
        Fourth level is a descriptive name, the value is the task class

    The HA (High-Availability) methods are used when more than one LCM instance is running.
    They register the current task in the external DB instead of the in-memory dict.
    The DB registry uses the following fields to distinguish a task:
        - op_type: operation type ("nslcmops" or "nsilcmops")
        - op_id: operation ID
        - worker: the worker ID for this process
    """

    # NS/NSI: "services" VIM/WIM/SDN/k8scluster/vca/PaaS/k8srepo: "accounts"
    topic_service_list = ["ns", "nsi"]
    topic_account_list = ["vim", "wim", "sdn", "k8scluster", "vca", "paas", "k8srepo"]

    # Map topic to InstanceID
    topic2instid_dict = {"ns": "nsInstanceId", "nsi": "netsliceInstanceId"}

    # Map topic to DB table name
    topic2dbtable_dict = {
        "ns": "nslcmops",
        "nsi": "nsilcmops",
        "vim": "vim_accounts",
        "wim": "wim_accounts",
        "sdn": "sdns",
        "k8scluster": "k8sclusters",
        "vca": "vca",
        "paas": "paas",
        "k8srepo": "k8srepos",
    }

    def __init__(self, worker_id=None, logger=None):
        """
        Creates an empty registry. LcmBase.__init__ is not called: only self.db and
        self.logger are set here.
        :param worker_id: the worker ID written to the DB when a task is locked by this process
        :param logger: logger used by the HA methods
        """
        self.task_registry = {
            topic: {}
            for topic in (
                "ns",
                "nsi",
                "vim_account",
                "wim_account",
                "sdn",
                "k8scluster",
                "vca",
                "paas",
                "k8srepo",
            )
        }
        self.worker_id = worker_id
        self.db = Database().instance.db
        self.logger = logger

    def register(self, topic, _id, op_id, task_name, task):
        """
        Register a new task
        :param topic: one of the keys of self.task_registry, e.g. "ns", "nsi", "vim_account", "sdn", "paas"
        :param _id: _id of the related item
        :param op_id: id of the operation of the related item
        :param task_name: Task descriptive name, as create, instantiate, terminate. Must be unique in this op_id
        :param task: Task class
        :return: none
        """
        # Operations of an item are kept in insertion order
        operations = self.task_registry[topic].setdefault(_id, OrderedDict())
        operations.setdefault(op_id, {})[task_name] = task

    def remove(self, topic, _id, op_id, task_name=None):
        """
        To be called when a task has ended. Missing tasks are ignored. Operations of this _id
        whose tasks are all done are removed as well
        :param topic: one of the keys of self.task_registry, e.g. "ns", "nsi", "vim_account", "sdn"
        :param _id: _id of the related item
        :param op_id: id of the operation of the related item
        :param task_name: Task descriptive name. If none it deletes all tasks with same _id and op_id
        :return: None
        """
        topic_items = self.task_registry[topic]
        operations = topic_items.get(_id)
        if not operations:
            return

        if not task_name:
            operations.pop(op_id, None)
        elif operations.get(op_id):
            operations[op_id].pop(task_name, None)

        # delete done tasks: drop every operation without a pending task
        for registered_op_id in list(operations):
            if all(task.done() for task in operations[registered_op_id].values()):
                del operations[registered_op_id]
        if not operations:
            del topic_items[_id]

    def lookfor_related(self, topic, _id, my_op_id=None):
        """
        Look for the not finished tasks of the most recent operation registered for _id. When
        my_op_id is supplied, the operation considered is the one registered just before it
        :param topic: one of the keys of self.task_registry
        :param _id: _id of the related item
        :param my_op_id: own operation id, to be skipped together with the operations registered after it
        :return: tuple with the comma separated names of the pending tasks and the list of those tasks
        """
        pending_tasks = []
        pending_names = []
        if _id not in self.task_registry[topic]:
            return "", pending_names

        operations = self.task_registry[topic][_id]
        skip_until = my_op_id
        for op_id in reversed(operations):
            if skip_until:
                if skip_until == op_id:
                    skip_until = None  # so that the next task is taken
                continue

            for task_name, task in operations[op_id].items():
                if task.done():
                    continue
                pending_tasks.append(task)
                pending_names.append(task_name)
            # only one operation is inspected
            break
        return ", ".join(pending_names), pending_tasks

    def cancel(self, topic, _id, target_op_id=None, target_task_name=None):
        """
        Cancel the registered tasks of a concrete ns, nsi, vim_account, sdn identified for _id.
        If target_op_id is supplied only the tasks of this operation are cancelled, and the same
        with target_task_name
        """
        operations = self.task_registry[topic].get(_id)
        if not operations:
            return
        for op_id in reversed(operations):
            if target_op_id and target_op_id != op_id:
                continue
            for task_name, task in operations[op_id].items():
                if not target_task_name or target_task_name == task_name:
                    task.cancel()

    # Is topic one of the "service" topics (NS/NSI)?
    def _is_service_type_HA(self, topic):
        return topic in self.topic_service_list

    # Is topic one of the "account" topics (VIM/WIM/SDN/k8scluster/vca/PaaS/k8srepo)?
    def _is_account_type_HA(self, topic):
        return topic in self.topic_account_list

    # Input: op_id, example: 'abc123def:3'  Output: account_id='abc123def', op_index='3'
    # (None, None) is returned when op_id is empty or does not follow that format
    def _get_account_and_op_HA(self, op_id):
        if not op_id:
            return None, None
        account_id, _, op_index = op_id.rpartition(":")
        if account_id and op_index.isdigit():
            return account_id, op_index
        return None, None

    # Get '_id' for any topic and operation
    def _get_instance_id_HA(self, topic, op_type, op_id):
        # Special operation 'ANY', for SDN account associated to a VIM account: op_id as '_id'
        # NS/NSI: Use op_id as '_id'
        if op_type == "ANY" or self._is_service_type_HA(topic):
            return op_id
        # Account topics: Split op_id to get Account ID and Operation Index, use Account ID as '_id'
        if self._is_account_type_HA(topic):
            account_id, _ = self._get_account_and_op_HA(op_id)
            return account_id
        return None

    # Set DB _filter for querying any related process state
    def _get_waitfor_filter_HA(self, db_lcmop, topic, op_type, op_id):
        # Special operation 'ANY', for SDN account associated to a VIM account: op_id as '_id'
        # In this special case, the timestamp is ignored
        if op_type == "ANY":
            return {"operationState": "PROCESSING"}

        # Otherwise, get 'startTime' timestamp for this operation
        # NS/NSI
        if self._is_service_type_HA(topic):
            now = time()
            instance_id_label = self.topic2instid_dict.get(topic)
            return {
                instance_id_label: db_lcmop.get(instance_id_label),
                "operationState": "PROCESSING",
                "startTime.lt": db_lcmop.get("startTime"),
                "_admin.modified.gt": now
                - 2 * 3600,  # ignore if two hours of inactivity
            }

        # Account topics
        if self._is_account_type_HA(topic):
            _, op_index = self._get_account_and_op_HA(op_id)
            this_operation = db_lcmop["_admin"]["operations"][int(op_index)]
            return {
                "operationState": "PROCESSING",
                "startTime.lt": this_operation.get("startTime", None),
            }

        return {}

    # Get DB params for any topic and operation
    def _get_dbparams_for_lock_HA(self, topic, op_type, op_id):
        # NS/NSI: the operation is free when no worker is written on it
        if self._is_service_type_HA(topic):
            return (
                {"_id": op_id, "_admin.worker": None},
                {"_admin.worker": self.worker_id},
            )

        # Account topics
        if self._is_account_type_HA(topic):
            account_id, op_index = self._get_account_and_op_HA(op_id)
            if not account_id:
                return None, None
            if op_type == "create":
                # Creating an account implies setting '_admin.current_operation' = 0
                op_index = 0
            worker_key = f"_admin.operations.{op_index}.worker"
            return (
                {"_id": account_id, worker_key: None},
                {worker_key: self.worker_id, "_admin.current_operation": op_index},
            )

        return {}, {}

    def lock_HA(self, topic, op_type, op_id):
        """
        Lock a task, if possible, to indicate to the HA system that
        the task will be executed in this LCM instance.
        :param topic: Can be "ns", "nsi", "vim", "wim", "paas" or "sdn"
        :param op_type: Operation type, can be "nslcmops", "nsilcmops", "create", "edit", "delete"
        :param op_id: NS, NSI: Operation ID  VIM,WIM,SDN,PaaS: Account ID + ':' + Operation Index
        :return:
            True=lock was successful => execute the task (not registered by any other LCM instance)
            False=lock failed => do NOT execute the task (already registered by another LCM instance)

        HA tasks and backward compatibility:
        If topic is "account type" (VIM/WIM/SDN/PaaS) and op_id is None, 'op_id' was not provided by NBI.
        This means that the running NBI instance does not support HA.
        In such a case this method should always return True, to always execute
        the task in this instance of LCM, without querying the DB.
        """
        is_account = self._is_account_type_HA(topic)

        # Backward compatibility for VIM/WIM/SDN/k8scluster/PaaS without op_id
        if is_account and op_id is None:
            return True

        # Try to lock this task: the update only matches when no worker owns it yet
        db_table_name = self.topic2dbtable_dict[topic]
        lock_filter, lock_update = self._get_dbparams_for_lock_HA(
            topic, op_type, op_id
        )
        lock_result = self.db.set_one(
            db_table_name,
            q_filter=lock_filter,
            update_dict=lock_update,
            fail_on_empty=False,
        )
        if lock_result is None:
            self.logger.debug(
                f"Task {topic} operation={op_id} already locked by another worker"
            )
            return False

        # Set 'detailed-status' to 'In progress' for account operations
        if is_account:
            account_id, op_index = self._get_account_and_op_HA(op_id)
            self.db.set_one(
                db_table_name,
                q_filter={"_id": account_id},
                update_dict={
                    f"_admin.operations.{op_index}.detailed-status": "In progress"
                },
                fail_on_empty=False,
            )
        return True

    def unlock_HA(self, topic, op_type, op_id, operationState, detailed_status):
        """
        Register the result of a finished account (VIM/WIM/SDN/...) operation and release its lock.
        :param topic: Can be "vim", "wim", or "sdn"
        :param op_type: Operation type, can be "create", "edit", "delete"
        :param op_id: Account ID + ':' + Operation Index
        :param operationState: value written to the 'operationState' of the operation
        :param detailed_status: value written to the 'detailed-status' of the operation
        :return: nothing
        """
        # Backward compatibility
        if not (self._is_account_type_HA(topic) and op_id):
            return

        # Get Account ID and Operation Index
        account_id, op_index = self._get_account_and_op_HA(op_id)
        db_table_name = self.topic2dbtable_dict[topic]

        # If this is a 'delete' operation, the account may have been deleted (SUCCESS) or may still exist (FAILED)
        # If the account exist, register the HA task.
        # Update DB for HA tasks
        operation_prefix = f"_admin.operations.{op_index}"
        self.db.set_one(
            db_table_name,
            q_filter={"_id": account_id},
            update_dict={
                operation_prefix + ".operationState": operationState,
                operation_prefix + ".detailed-status": detailed_status,
                operation_prefix + ".worker": None,
                "_admin.current_operation": None,
            },
            fail_on_empty=False,
        )

    async def waitfor_related_HA(self, topic, op_type, op_id=None):
        """
        Wait for any pending related HA tasks. The DB is polled until no related operation is
        in 'PROCESSING' state.
        :param topic: topic of the operation, see lock_HA
        :param op_type: operation type; the special value "ANY" waits for any operation in
            'PROCESSING' state and does not update the 'detailed-status'
        :param op_id: operation id, see lock_HA
        :return: None. LcmException is raised if the related tasks do not finish on time
        """
        is_service = self._is_service_type_HA(topic)
        is_account = self._is_account_type_HA(topic)

        # Backward compatibility
        if op_id is None and not is_service and not is_account:
            return

        # Get DB table name
        db_table_name = self.topic2dbtable_dict.get(topic)

        # Get instance ID
        _id = self._get_instance_id_HA(topic, op_type, op_id)
        db_lcmop = self.db.get_one(db_table_name, {"_id": _id}, fail_on_empty=False)
        if not db_lcmop:
            return

        # Set DB _filter for querying any related process state
        related_filter = self._get_waitfor_filter_HA(db_lcmop, topic, op_type, op_id)

        # For HA, get list of tasks from DB instead of from dictionary (in-memory) variable.
        timeout_wait_for_task = (
            3600  # Max time (seconds) to wait for a related task to finish
        )
        # A too long polling interval slows things down considerably
        interval_wait_for_task = 10  # Interval in seconds for polling related tasks
        time_left = timeout_wait_for_task
        reported_num_related_tasks = 0
        while True:
            # Get related tasks (operations within the same instance as this) which are
            # still running (operationState='PROCESSING') and which were started before this task.
            # In the case of op_type='ANY', get any related tasks with operationState='PROCESSING', ignore timestamps.
            num_related_tasks = len(
                self.db.get_list(db_table_name, q_filter=related_filter)
            )
            # If there are no related tasks, there is nothing to wait for, so return.
            if not num_related_tasks:
                return

            # If number of pending related tasks have changed,
            # update the 'detailed-status' field and log the change.
            # Do NOT update the 'detailed-status' for SDNC-associated-to-VIM operations ('ANY').
            if op_type != "ANY" and num_related_tasks != reported_num_related_tasks:
                step = f"Waiting for {num_related_tasks} related tasks to be completed."
                if is_service:
                    # NS/NSI
                    status_update = {
                        "detailed-status": step,
                        "queuePosition": num_related_tasks,
                    }
                elif is_account:
                    # Account topics
                    _, op_index = self._get_account_and_op_HA(op_id)
                    status_update = {
                        f"_admin.operations.{op_index}.detailed-status": step
                    }
                else:
                    status_update = {}
                self.logger.debug(f"Task {topic} operation={_id} {step}")
                self.db.set_one(
                    db_table_name,
                    q_filter={"_id": _id},
                    update_dict=status_update,
                    fail_on_empty=False,
                )
                reported_num_related_tasks = num_related_tasks

            time_left -= interval_wait_for_task
            if time_left < 0:
                raise LcmException(
                    "Timeout ({}) when waiting for related tasks to be completed".format(
                        timeout_wait_for_task
                    )
                )
            await asyncio.sleep(interval_wait_for_task)