Code Coverage

Cobertura Coverage Report > NG-RO.osm_ng_ro >

ns.py

Trend

File Coverage summary

NameClassesLinesConditionals
ns.py
100%
1/1
43%
517/1189
100%
0/0

Coverage Breakdown by Class

NameLinesConditionals
ns.py
43%
517/1189
N/A

Source

NG-RO/osm_ng_ro/ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 #    http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 ##
18
19 1 from http import HTTPStatus
20 1 from itertools import product
21 1 import logging
22 1 from random import choice as random_choice
23 1 from threading import Lock
24 1 from time import time
25 1 from traceback import format_exc as traceback_format_exc
26 1 from typing import Any, Dict, List, Optional, Tuple, Type
27 1 from uuid import uuid4
28
29 1 from cryptography.hazmat.backends import default_backend as crypto_default_backend
30 1 from cryptography.hazmat.primitives import serialization as crypto_serialization
31 1 from cryptography.hazmat.primitives.asymmetric import rsa
32 1 from jinja2 import (
33     Environment,
34     select_autoescape,
35     StrictUndefined,
36     TemplateError,
37     TemplateNotFound,
38     UndefinedError,
39 )
40 1 from osm_common import (
41     dbmemory,
42     dbmongo,
43     fslocal,
44     fsmongo,
45     msgkafka,
46     msglocal,
47     version as common_version,
48 )
49 1 from osm_common.dbbase import DbBase, DbException
50 1 from osm_common.fsbase import FsBase, FsException
51 1 from osm_common.msgbase import MsgException
52 1 from osm_ng_ro.ns_thread import deep_get, NsWorker, NsWorkerException
53 1 from osm_ng_ro.validation import deploy_schema, validate_input
54 1 import yaml
55
56 1 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
57 1 min_common_version = "0.1.16"
58
59
60 1 class NsException(Exception):
61 1     def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
62 1         self.http_code = http_code
63 1         super(Exception, self).__init__(message)
64
65
66 1 def get_process_id():
67     """
68     Obtain a unique ID for this process. If running from inside docker, it will get docker ID. If not it
69     will provide a random one
70     :return: Obtained ID
71     """
72     # Try getting docker id. If fails, get pid
73 0     try:
74 0         with open("/proc/self/cgroup", "r") as f:
75 0             text_id_ = f.readline()
76 0             _, _, text_id = text_id_.rpartition("/")
77 0             text_id = text_id.replace("\n", "")[:12]
78
79 0             if text_id:
80 0                 return text_id
81 0     except Exception as error:
82 0         logging.exception(f"{error} occured while getting process id")
83
84     # Return a random id
85 0     return "".join(random_choice("0123456789abcdef") for _ in range(12))
86
87
88 1 def versiontuple(v):
89     """utility for compare dot separate versions. Fills with zeros to proper number comparison"""
90 0     filled = []
91
92 0     for point in v.split("."):
93 0         filled.append(point.zfill(8))
94
95 0     return tuple(filled)
96
97
98 1 class Ns(object):
99 1     def __init__(self):
100 1         self.db = None
101 1         self.fs = None
102 1         self.msg = None
103 1         self.config = None
104         # self.operations = None
105 1         self.logger = None
106         # ^ Getting logger inside method self.start because parent logger (ro) is not available yet.
107         # If done now it will not be linked to parent not getting its handler and level
108 1         self.map_topic = {}
109 1         self.write_lock = None
110 1         self.vims_assigned = {}
111 1         self.next_worker = 0
112 1         self.plugins = {}
113 1         self.workers = []
114 1         self.process_params_function_map = {
115             "net": Ns._process_net_params,
116             "image": Ns._process_image_params,
117             "flavor": Ns._process_flavor_params,
118             "vdu": Ns._process_vdu_params,
119             "affinity-or-anti-affinity-group": Ns._process_affinity_group_params,
120         }
121 1         self.db_path_map = {
122             "net": "vld",
123             "image": "image",
124             "flavor": "flavor",
125             "vdu": "vdur",
126             "affinity-or-anti-affinity-group": "affinity-or-anti-affinity-group",
127         }
128
129 1     def init_db(self, target_version):
130 0         pass
131
132 1     def start(self, config):
133         """
134         Connect to database, filesystem storage, and messaging
135         :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
136         :param config: Configuration of db, storage, etc
137         :return: None
138         """
139 0         self.config = config
140 0         self.config["process_id"] = get_process_id()  # used for HA identity
141 0         self.logger = logging.getLogger("ro.ns")
142
143         # check right version of common
144 0         if versiontuple(common_version) < versiontuple(min_common_version):
145 0             raise NsException(
146                 "Not compatible osm/common version '{}'. Needed '{}' or higher".format(
147                     common_version, min_common_version
148                 )
149             )
150
151 0         try:
152 0             if not self.db:
153 0                 if config["database"]["driver"] == "mongo":
154 0                     self.db = dbmongo.DbMongo()
155 0                     self.db.db_connect(config["database"])
156 0                 elif config["database"]["driver"] == "memory":
157 0                     self.db = dbmemory.DbMemory()
158 0                     self.db.db_connect(config["database"])
159                 else:
160 0                     raise NsException(
161                         "Invalid configuration param '{}' at '[database]':'driver'".format(
162                             config["database"]["driver"]
163                         )
164                     )
165
166 0             if not self.fs:
167 0                 if config["storage"]["driver"] == "local":
168 0                     self.fs = fslocal.FsLocal()
169 0                     self.fs.fs_connect(config["storage"])
170 0                 elif config["storage"]["driver"] == "mongo":
171 0                     self.fs = fsmongo.FsMongo()
172 0                     self.fs.fs_connect(config["storage"])
173 0                 elif config["storage"]["driver"] is None:
174 0                     pass
175                 else:
176 0                     raise NsException(
177                         "Invalid configuration param '{}' at '[storage]':'driver'".format(
178                             config["storage"]["driver"]
179                         )
180                     )
181
182 0             if not self.msg:
183 0                 if config["message"]["driver"] == "local":
184 0                     self.msg = msglocal.MsgLocal()
185 0                     self.msg.connect(config["message"])
186 0                 elif config["message"]["driver"] == "kafka":
187 0                     self.msg = msgkafka.MsgKafka()
188 0                     self.msg.connect(config["message"])
189                 else:
190 0                     raise NsException(
191                         "Invalid configuration param '{}' at '[message]':'driver'".format(
192                             config["message"]["driver"]
193                         )
194                     )
195
196             # TODO load workers to deal with exising database tasks
197
198 0             self.write_lock = Lock()
199 0         except (DbException, FsException, MsgException) as e:
200 0             raise NsException(str(e), http_code=e.http_code)
201
202 1     def get_assigned_vims(self):
203 0         return list(self.vims_assigned.keys())
204
205 1     def stop(self):
206 0         try:
207 0             if self.db:
208 0                 self.db.db_disconnect()
209
210 0             if self.fs:
211 0                 self.fs.fs_disconnect()
212
213 0             if self.msg:
214 0                 self.msg.disconnect()
215
216 0             self.write_lock = None
217 0         except (DbException, FsException, MsgException) as e:
218 0             raise NsException(str(e), http_code=e.http_code)
219
220 0         for worker in self.workers:
221 0             worker.insert_task(("terminate",))
222
223 1     def _create_worker(self):
224         """
225         Look for a worker thread in idle status. If not found it creates one unless the number of threads reach the
226         limit of 'server.ns_threads' configuration. If reached, it just assigns one existing thread
227         return the index of the assigned worker thread. Worker threads are storead at self.workers
228         """
229         # Look for a thread in idle status
230 0         worker_id = next(
231             (
232                 i
233                 for i in range(len(self.workers))
234                 if self.workers[i] and self.workers[i].idle
235             ),
236             None,
237         )
238
239 0         if worker_id is not None:
240             # unset idle status to avoid race conditions
241 0             self.workers[worker_id].idle = False
242         else:
243 0             worker_id = len(self.workers)
244
245 0             if worker_id < self.config["global"]["server.ns_threads"]:
246                 # create a new worker
247 0                 self.workers.append(
248                     NsWorker(worker_id, self.config, self.plugins, self.db)
249                 )
250 0                 self.workers[worker_id].start()
251             else:
252                 # reached maximum number of threads, assign VIM to an existing one
253 0                 worker_id = self.next_worker
254 0                 self.next_worker = (self.next_worker + 1) % self.config["global"][
255                     "server.ns_threads"
256                 ]
257
258 0         return worker_id
259
260 1     def assign_vim(self, target_id):
261 0         with self.write_lock:
262 0             return self._assign_vim(target_id)
263
264 1     def _assign_vim(self, target_id):
265 0         if target_id not in self.vims_assigned:
266 0             worker_id = self.vims_assigned[target_id] = self._create_worker()
267 0             self.workers[worker_id].insert_task(("load_vim", target_id))
268
269 1     def reload_vim(self, target_id):
270         # send reload_vim to the thread working with this VIM and inform all that a VIM has been changed,
271         # this is because database VIM information is cached for threads working with SDN
272 0         with self.write_lock:
273 0             for worker in self.workers:
274 0                 if worker and not worker.idle:
275 0                     worker.insert_task(("reload_vim", target_id))
276
277 1     def unload_vim(self, target_id):
278 0         with self.write_lock:
279 0             return self._unload_vim(target_id)
280
281 1     def _unload_vim(self, target_id):
282 0         if target_id in self.vims_assigned:
283 0             worker_id = self.vims_assigned[target_id]
284 0             self.workers[worker_id].insert_task(("unload_vim", target_id))
285 0             del self.vims_assigned[target_id]
286
287 1     def check_vim(self, target_id):
288 0         with self.write_lock:
289 0             if target_id in self.vims_assigned:
290 0                 worker_id = self.vims_assigned[target_id]
291             else:
292 0                 worker_id = self._create_worker()
293
294 0         worker = self.workers[worker_id]
295 0         worker.insert_task(("check_vim", target_id))
296
297 1     def unload_unused_vims(self):
298 0         with self.write_lock:
299 0             vims_to_unload = []
300
301 0             for target_id in self.vims_assigned:
302 0                 if not self.db.get_one(
303                     "ro_tasks",
304                     q_filter={
305                         "target_id": target_id,
306                         "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
307                     },
308                     fail_on_empty=False,
309                 ):
310 0                     vims_to_unload.append(target_id)
311
312 0             for target_id in vims_to_unload:
313 0                 self._unload_vim(target_id)
314
315 1     @staticmethod
316 1     def _get_cloud_init(
317         db: Type[DbBase],
318         fs: Type[FsBase],
319         location: str,
320     ) -> str:
321         """This method reads cloud init from a file.
322
323         Note: Not used as cloud init content is provided in the http body.
324
325         Args:
326             db (Type[DbBase]): [description]
327             fs (Type[FsBase]): [description]
328             location (str): can be 'vnfr_id:file:file_name' or 'vnfr_id:vdu:vdu_idex'
329
330         Raises:
331             NsException: [description]
332             NsException: [description]
333
334         Returns:
335             str: [description]
336         """
337 1         vnfd_id, _, other = location.partition(":")
338 1         _type, _, name = other.partition(":")
339 1         vnfd = db.get_one("vnfds", {"_id": vnfd_id})
340
341 1         if _type == "file":
342 1             base_folder = vnfd["_admin"]["storage"]
343 1             cloud_init_file = "{}/{}/cloud_init/{}".format(
344                 base_folder["folder"], base_folder["pkg-dir"], name
345             )
346
347 1             if not fs:
348 1                 raise NsException(
349                     "Cannot read file '{}'. Filesystem not loaded, change configuration at storage.driver".format(
350                         cloud_init_file
351                     )
352                 )
353
354 1             with fs.file_open(cloud_init_file, "r") as ci_file:
355 1                 cloud_init_content = ci_file.read()
356 1         elif _type == "vdu":
357 1             cloud_init_content = vnfd["vdu"][int(name)]["cloud-init"]
358         else:
359 1             raise NsException("Mismatch descriptor for cloud init: {}".format(location))
360
361 1         return cloud_init_content
362
363 1     @staticmethod
364 1     def _parse_jinja2(
365         cloud_init_content: str,
366         params: Dict[str, Any],
367         context: str,
368     ) -> str:
369         """Function that processes the cloud init to replace Jinja2 encoded parameters.
370
371         Args:
372             cloud_init_content (str): [description]
373             params (Dict[str, Any]): [description]
374             context (str): [description]
375
376         Raises:
377             NsException: [description]
378             NsException: [description]
379
380         Returns:
381             str: [description]
382         """
383 1         try:
384 1             env = Environment(
385                 undefined=StrictUndefined,
386                 autoescape=select_autoescape(default_for_string=True, default=True),
387             )
388 1             template = env.from_string(cloud_init_content)
389
390 1             return template.render(params or {})
391 1         except UndefinedError as e:
392 1             raise NsException(
393                 "Variable '{}' defined at vnfd='{}' must be provided in the instantiation parameters"
394                 "inside the 'additionalParamsForVnf' block".format(e, context)
395             )
396 1         except (TemplateError, TemplateNotFound) as e:
397 1             raise NsException(
398                 "Error parsing Jinja2 to cloud-init content at vnfd='{}': {}".format(
399                     context, e
400                 )
401             )
402
403 1     def _create_db_ro_nsrs(self, nsr_id, now):
404 0         try:
405 0             key = rsa.generate_private_key(
406                 backend=crypto_default_backend(), public_exponent=65537, key_size=2048
407             )
408 0             private_key = key.private_bytes(
409                 crypto_serialization.Encoding.PEM,
410                 crypto_serialization.PrivateFormat.PKCS8,
411                 crypto_serialization.NoEncryption(),
412             )
413 0             public_key = key.public_key().public_bytes(
414                 crypto_serialization.Encoding.OpenSSH,
415                 crypto_serialization.PublicFormat.OpenSSH,
416             )
417 0             private_key = private_key.decode("utf8")
418             # Change first line because Paramiko needs a explicit start with 'BEGIN RSA PRIVATE KEY'
419 0             i = private_key.find("\n")
420 0             private_key = "-----BEGIN RSA PRIVATE KEY-----" + private_key[i:]
421 0             public_key = public_key.decode("utf8")
422 0         except Exception as e:
423 0             raise NsException("Cannot create ssh-keys: {}".format(e))
424
425 0         schema_version = "1.1"
426 0         private_key_encrypted = self.db.encrypt(
427             private_key, schema_version=schema_version, salt=nsr_id
428         )
429 0         db_content = {
430             "_id": nsr_id,
431             "_admin": {
432                 "created": now,
433                 "modified": now,
434                 "schema_version": schema_version,
435             },
436             "public_key": public_key,
437             "private_key": private_key_encrypted,
438             "actions": [],
439         }
440 0         self.db.create("ro_nsrs", db_content)
441
442 0         return db_content
443
444 1     @staticmethod
445 1     def _create_task(
446         deployment_info: Dict[str, Any],
447         target_id: str,
448         item: str,
449         action: str,
450         target_record: str,
451         target_record_id: str,
452         extra_dict: Dict[str, Any] = None,
453     ) -> Dict[str, Any]:
454         """Function to create task dict from deployment information.
455
456         Args:
457             deployment_info (Dict[str, Any]): [description]
458             target_id (str): [description]
459             item (str): [description]
460             action (str): [description]
461             target_record (str): [description]
462             target_record_id (str): [description]
463             extra_dict (Dict[str, Any], optional): [description]. Defaults to None.
464
465         Returns:
466             Dict[str, Any]: [description]
467         """
468 1         task = {
469             "target_id": target_id,  # it will be removed before pushing at database
470             "action_id": deployment_info.get("action_id"),
471             "nsr_id": deployment_info.get("nsr_id"),
472             "task_id": f"{deployment_info.get('action_id')}:{deployment_info.get('task_index')}",
473             "status": "SCHEDULED",
474             "action": action,
475             "item": item,
476             "target_record": target_record,
477             "target_record_id": target_record_id,
478         }
479
480 1         if extra_dict:
481 1             task.update(extra_dict)  # params, find_params, depends_on
482
483 1         deployment_info["task_index"] = deployment_info.get("task_index", 0) + 1
484
485 1         return task
486
487 1     @staticmethod
488 1     def _create_ro_task(
489         target_id: str,
490         task: Dict[str, Any],
491     ) -> Dict[str, Any]:
492         """Function to create an RO task from task information.
493
494         Args:
495             target_id (str): [description]
496             task (Dict[str, Any]): [description]
497
498         Returns:
499             Dict[str, Any]: [description]
500         """
501 1         now = time()
502
503 1         _id = task.get("task_id")
504 1         db_ro_task = {
505             "_id": _id,
506             "locked_by": None,
507             "locked_at": 0.0,
508             "target_id": target_id,
509             "vim_info": {
510                 "created": False,
511                 "created_items": None,
512                 "vim_id": None,
513                 "vim_name": None,
514                 "vim_status": None,
515                 "vim_details": None,
516                 "vim_message": None,
517                 "refresh_at": None,
518             },
519             "modified_at": now,
520             "created_at": now,
521             "to_check_at": now,
522             "tasks": [task],
523         }
524
525 1         return db_ro_task
526
527 1     @staticmethod
528 1     def _process_image_params(
529         target_image: Dict[str, Any],
530         indata: Dict[str, Any],
531         vim_info: Dict[str, Any],
532         target_record_id: str,
533         **kwargs: Dict[str, Any],
534     ) -> Dict[str, Any]:
535         """Function to process VDU image parameters.
536
537         Args:
538             target_image (Dict[str, Any]): [description]
539             indata (Dict[str, Any]): [description]
540             vim_info (Dict[str, Any]): [description]
541             target_record_id (str): [description]
542
543         Returns:
544             Dict[str, Any]: [description]
545         """
546 1         find_params = {}
547
548 1         if target_image.get("image"):
549 1             find_params["filter_dict"] = {"name": target_image.get("image")}
550
551 1         if target_image.get("vim_image_id"):
552 1             find_params["filter_dict"] = {"id": target_image.get("vim_image_id")}
553
554 1         if target_image.get("image_checksum"):
555 1             find_params["filter_dict"] = {
556                 "checksum": target_image.get("image_checksum")
557             }
558
559 1         return {"find_params": find_params}
560
561 1     @staticmethod
562 1     def _get_resource_allocation_params(
563         quota_descriptor: Dict[str, Any],
564     ) -> Dict[str, Any]:
565         """Read the quota_descriptor from vnfd and fetch the resource allocation properties from the
566         descriptor object.
567
568         Args:
569             quota_descriptor (Dict[str, Any]): cpu/mem/vif/disk-io quota descriptor
570
571         Returns:
572             Dict[str, Any]: quota params for limit, reserve, shares from the descriptor object
573         """
574 1         quota = {}
575
576 1         if quota_descriptor.get("limit"):
577 1             quota["limit"] = int(quota_descriptor["limit"])
578
579 1         if quota_descriptor.get("reserve"):
580 1             quota["reserve"] = int(quota_descriptor["reserve"])
581
582 1         if quota_descriptor.get("shares"):
583 1             quota["shares"] = int(quota_descriptor["shares"])
584
585 1         return quota
586
587 1     @staticmethod
588 1     def _process_guest_epa_quota_params(
589         guest_epa_quota: Dict[str, Any],
590         epa_vcpu_set: bool,
591     ) -> Dict[str, Any]:
592         """Function to extract the guest epa quota parameters.
593
594         Args:
595             guest_epa_quota (Dict[str, Any]): [description]
596             epa_vcpu_set (bool): [description]
597
598         Returns:
599             Dict[str, Any]: [description]
600         """
601 1         result = {}
602
603 1         if guest_epa_quota.get("cpu-quota") and not epa_vcpu_set:
604 1             cpuquota = Ns._get_resource_allocation_params(
605                 guest_epa_quota.get("cpu-quota")
606             )
607
608 1             if cpuquota:
609 1                 result["cpu-quota"] = cpuquota
610
611 1         if guest_epa_quota.get("mem-quota"):
612 1             vduquota = Ns._get_resource_allocation_params(
613                 guest_epa_quota.get("mem-quota")
614             )
615
616 1             if vduquota:
617 1                 result["mem-quota"] = vduquota
618
619 1         if guest_epa_quota.get("disk-io-quota"):
620 1             diskioquota = Ns._get_resource_allocation_params(
621                 guest_epa_quota.get("disk-io-quota")
622             )
623
624 1             if diskioquota:
625 1                 result["disk-io-quota"] = diskioquota
626
627 1         if guest_epa_quota.get("vif-quota"):
628 1             vifquota = Ns._get_resource_allocation_params(
629                 guest_epa_quota.get("vif-quota")
630             )
631
632 1             if vifquota:
633 1                 result["vif-quota"] = vifquota
634
635 1         return result
636
637 1     @staticmethod
638 1     def _process_guest_epa_numa_params(
639         guest_epa_quota: Dict[str, Any],
640     ) -> Tuple[Dict[str, Any], bool]:
641         """[summary]
642
643         Args:
644             guest_epa_quota (Dict[str, Any]): [description]
645
646         Returns:
647             Tuple[Dict[str, Any], bool]: [description]
648         """
649 1         numa = {}
650 1         numa_list = []
651 1         epa_vcpu_set = False
652
653 1         if guest_epa_quota.get("numa-node-policy"):
654 1             numa_node_policy = guest_epa_quota.get("numa-node-policy")
655
656 1             if numa_node_policy.get("node"):
657 1                 for numa_node in numa_node_policy["node"]:
658 1                     vcpu_list = []
659 1                     if numa_node.get("id"):
660 1                         numa["id"] = int(numa_node["id"])
661
662 1                     if numa_node.get("vcpu"):
663 1                         for vcpu in numa_node.get("vcpu"):
664 1                             vcpu_id = int(vcpu.get("id"))
665 1                             vcpu_list.append(vcpu_id)
666 1                         numa["vcpu"] = vcpu_list
667
668 1                     if numa_node.get("num-cores"):
669 1                         numa["cores"] = numa_node["num-cores"]
670 1                         epa_vcpu_set = True
671
672 1                     paired_threads = numa_node.get("paired-threads", {})
673 1                     if paired_threads.get("num-paired-threads"):
674 1                         numa["paired_threads"] = int(
675                             numa_node["paired-threads"]["num-paired-threads"]
676                         )
677 1                         epa_vcpu_set = True
678
679 1                     if paired_threads.get("paired-thread-ids"):
680 1                         numa["paired-threads-id"] = []
681
682 1                         for pair in paired_threads["paired-thread-ids"]:
683 1                             numa["paired-threads-id"].append(
684                                 (
685                                     str(pair["thread-a"]),
686                                     str(pair["thread-b"]),
687                                 )
688                             )
689
690 1                     if numa_node.get("num-threads"):
691 1                         numa["threads"] = int(numa_node["num-threads"])
692 1                         epa_vcpu_set = True
693
694 1                     if numa_node.get("memory-mb"):
695 1                         numa["memory"] = max(int(int(numa_node["memory-mb"]) / 1024), 1)
696
697 1                     numa_list.append(numa)
698 1                     numa = {}
699
700 1         return numa_list, epa_vcpu_set
701
702 1     @staticmethod
703 1     def _process_guest_epa_cpu_pinning_params(
704         guest_epa_quota: Dict[str, Any],
705         vcpu_count: int,
706         epa_vcpu_set: bool,
707     ) -> Tuple[Dict[str, Any], bool]:
708         """[summary]
709
710         Args:
711             guest_epa_quota (Dict[str, Any]): [description]
712             vcpu_count (int): [description]
713             epa_vcpu_set (bool): [description]
714
715         Returns:
716             Tuple[Dict[str, Any], bool]: [description]
717         """
718 1         numa = {}
719 1         local_epa_vcpu_set = epa_vcpu_set
720
721 1         if (
722             guest_epa_quota.get("cpu-pinning-policy") == "DEDICATED"
723             and not epa_vcpu_set
724         ):
725             # Pinning policy "REQUIRE" uses threads as host should support SMT architecture
726             # Pinning policy "ISOLATE" uses cores as host should not support SMT architecture
727             # Pinning policy "PREFER" uses threads in case host supports SMT architecture
728 1             numa[
729                 "cores"
730                 if guest_epa_quota.get("cpu-thread-pinning-policy") == "ISOLATE"
731                 else "threads"
732             ] = max(vcpu_count, 1)
733 1             local_epa_vcpu_set = True
734
735 1         return numa, local_epa_vcpu_set
736
737 1     @staticmethod
738 1     def _process_epa_params(
739         target_flavor: Dict[str, Any],
740     ) -> Dict[str, Any]:
741         """[summary]
742
743         Args:
744             target_flavor (Dict[str, Any]): [description]
745
746         Returns:
747             Dict[str, Any]: [description]
748         """
749 1         extended = {}
750 1         numa = {}
751 1         numa_list = []
752
753 1         if target_flavor.get("guest-epa"):
754 1             guest_epa = target_flavor["guest-epa"]
755
756 1             numa_list, epa_vcpu_set = Ns._process_guest_epa_numa_params(
757                 guest_epa_quota=guest_epa
758             )
759
760 1             if guest_epa.get("mempage-size"):
761 1                 extended["mempage-size"] = guest_epa.get("mempage-size")
762
763 1             if guest_epa.get("cpu-pinning-policy"):
764 1                 extended["cpu-pinning-policy"] = guest_epa.get("cpu-pinning-policy")
765
766 1             if guest_epa.get("cpu-thread-pinning-policy"):
767 1                 extended["cpu-thread-pinning-policy"] = guest_epa.get(
768                     "cpu-thread-pinning-policy"
769                 )
770
771 1             if guest_epa.get("numa-node-policy"):
772 1                 if guest_epa.get("numa-node-policy").get("mem-policy"):
773 1                     extended["mem-policy"] = guest_epa.get("numa-node-policy").get(
774                         "mem-policy"
775                     )
776
777 1             tmp_numa, epa_vcpu_set = Ns._process_guest_epa_cpu_pinning_params(
778                 guest_epa_quota=guest_epa,
779                 vcpu_count=int(target_flavor.get("vcpu-count", 1)),
780                 epa_vcpu_set=epa_vcpu_set,
781             )
782 1             for numa in numa_list:
783 1                 numa.update(tmp_numa)
784
785 1             extended.update(
786                 Ns._process_guest_epa_quota_params(
787                     guest_epa_quota=guest_epa,
788                     epa_vcpu_set=epa_vcpu_set,
789                 )
790             )
791
792 1         if numa:
793 1             extended["numas"] = numa_list
794
795 1         return extended
796
797 1     @staticmethod
798 1     def _process_flavor_params(
799         target_flavor: Dict[str, Any],
800         indata: Dict[str, Any],
801         vim_info: Dict[str, Any],
802         target_record_id: str,
803         **kwargs: Dict[str, Any],
804     ) -> Dict[str, Any]:
805         """[summary]
806
807         Args:
808             target_flavor (Dict[str, Any]): [description]
809             indata (Dict[str, Any]): [description]
810             vim_info (Dict[str, Any]): [description]
811             target_record_id (str): [description]
812
813         Returns:
814             Dict[str, Any]: [description]
815         """
816 1         db = kwargs.get("db")
817 1         target_vdur = {}
818
819 1         flavor_data = {
820             "disk": int(target_flavor["storage-gb"]),
821             "ram": int(target_flavor["memory-mb"]),
822             "vcpus": int(target_flavor["vcpu-count"]),
823         }
824
825 1         for vnf in indata.get("vnf", []):
826 1             for vdur in vnf.get("vdur", []):
827 1                 if vdur.get("ns-flavor-id") == target_flavor.get("id"):
828 1                     target_vdur = vdur
829
830 1         if db and isinstance(indata.get("vnf"), list):
831 1             vnfd_id = indata.get("vnf")[0].get("vnfd-id")
832 1             vnfd = db.get_one("vnfds", {"_id": vnfd_id})
833             # check if there is persistent root disk
834 1             for vdu in vnfd.get("vdu", ()):
835 1                 if vdu["name"] == target_vdur.get("vdu-name"):
836 1                     for vsd in vnfd.get("virtual-storage-desc", ()):
837 1                         if vsd.get("id") == vdu.get("virtual-storage-desc", [[]])[0]:
838 1                             root_disk = vsd
839 1                             if (
840                                 root_disk.get("type-of-storage")
841                                 == "persistent-storage:persistent-storage"
842                             ):
843 1                                 flavor_data["disk"] = 0
844
845 1         for storage in target_vdur.get("virtual-storages", []):
846 1             if (
847                 storage.get("type-of-storage")
848                 == "etsi-nfv-descriptors:ephemeral-storage"
849             ):
850 1                 flavor_data["ephemeral"] = int(storage.get("size-of-storage", 0))
851 1             elif storage.get("type-of-storage") == "etsi-nfv-descriptors:swap-storage":
852 1                 flavor_data["swap"] = int(storage.get("size-of-storage", 0))
853
854 1         extended = Ns._process_epa_params(target_flavor)
855 1         if extended:
856 1             flavor_data["extended"] = extended
857
858 1         extra_dict = {"find_params": {"flavor_data": flavor_data}}
859 1         flavor_data_name = flavor_data.copy()
860 1         flavor_data_name["name"] = target_flavor["name"]
861 1         extra_dict["params"] = {"flavor_data": flavor_data_name}
862
863 1         return extra_dict
864
865 1     @staticmethod
866 1     def _ip_profile_to_ro(
867         ip_profile: Dict[str, Any],
868     ) -> Dict[str, Any]:
869         """[summary]
870
871         Args:
872             ip_profile (Dict[str, Any]): [description]
873
874         Returns:
875             Dict[str, Any]: [description]
876         """
877 1         if not ip_profile:
878 1             return None
879
880 1         ro_ip_profile = {
881             "ip_version": "IPv4"
882             if "v4" in ip_profile.get("ip-version", "ipv4")
883             else "IPv6",
884             "subnet_address": ip_profile.get("subnet-address"),
885             "gateway_address": ip_profile.get("gateway-address"),
886             "dhcp_enabled": ip_profile.get("dhcp-params", {}).get("enabled", False),
887             "dhcp_start_address": ip_profile.get("dhcp-params", {}).get(
888                 "start-address", None
889             ),
890             "dhcp_count": ip_profile.get("dhcp-params", {}).get("count", None),
891         }
892
893 1         if ip_profile.get("dns-server"):
894 1             ro_ip_profile["dns_address"] = ";".join(
895                 [v["address"] for v in ip_profile["dns-server"] if v.get("address")]
896             )
897
898 1         if ip_profile.get("security-group"):
899 1             ro_ip_profile["security_group"] = ip_profile["security-group"]
900
901 1         return ro_ip_profile
902
903 1     @staticmethod
904 1     def _process_net_params(
905         target_vld: Dict[str, Any],
906         indata: Dict[str, Any],
907         vim_info: Dict[str, Any],
908         target_record_id: str,
909         **kwargs: Dict[str, Any],
910     ) -> Dict[str, Any]:
911         """Function to process network parameters.
912
913         Args:
914             target_vld (Dict[str, Any]): [description]
915             indata (Dict[str, Any]): [description]
916             vim_info (Dict[str, Any]): [description]
917             target_record_id (str): [description]
918
919         Returns:
920             Dict[str, Any]: [description]
921         """
922 1         extra_dict = {}
923
924 1         if vim_info.get("sdn"):
925             # vnf_preffix = "vnfrs:{}".format(vnfr_id)
926             # ns_preffix = "nsrs:{}".format(nsr_id)
927             # remove the ending ".sdn
928 1             vld_target_record_id, _, _ = target_record_id.rpartition(".")
929 1             extra_dict["params"] = {
930                 k: vim_info[k]
931                 for k in ("sdn-ports", "target_vim", "vlds", "type")
932                 if vim_info.get(k)
933             }
934
935             # TODO needed to add target_id in the dependency.
936 1             if vim_info.get("target_vim"):
937 1                 extra_dict["depends_on"] = [
938                     f"{vim_info.get('target_vim')} {vld_target_record_id}"
939                 ]
940
941 1             return extra_dict
942
943 1         if vim_info.get("vim_network_name"):
944 1             extra_dict["find_params"] = {
945                 "filter_dict": {
946                     "name": vim_info.get("vim_network_name"),
947                 },
948             }
949 1         elif vim_info.get("vim_network_id"):
950 1             extra_dict["find_params"] = {
951                 "filter_dict": {
952                     "id": vim_info.get("vim_network_id"),
953                 },
954             }
955 1         elif target_vld.get("mgmt-network") and not vim_info.get("provider_network"):
956 1             extra_dict["find_params"] = {
957                 "mgmt": True,
958                 "name": target_vld["id"],
959             }
960         else:
961             # create
962 1             extra_dict["params"] = {
963                 "net_name": (
964                     f"{indata.get('name')[:16]}-{target_vld.get('name', target_vld.get('id'))[:16]}"
965                 ),
966                 "ip_profile": Ns._ip_profile_to_ro(vim_info.get("ip_profile")),
967                 "provider_network_profile": vim_info.get("provider_network"),
968             }
969
970 1             if not target_vld.get("underlay"):
971 1                 extra_dict["params"]["net_type"] = "bridge"
972             else:
973 1                 extra_dict["params"]["net_type"] = (
974                     "ptp" if target_vld.get("type") == "ELINE" else "data"
975                 )
976
977 1         return extra_dict
978
979 1     @staticmethod
980 1     def find_persistent_root_volumes(
981         vnfd: dict,
982         target_vdu: dict,
983         vdu_instantiation_volumes_list: list,
984         disk_list: list,
985     ) -> Dict[str, any]:
986         """Find the persistent root volumes and add them to the disk_list
987         by parsing the instantiation parameters.
988
989         Args:
990             vnfd    (dict):                                 VNF descriptor
991             target_vdu      (dict):                         processed VDU
992             vdu_instantiation_volumes_list  (list):         instantiation parameters for the each VDU as a list
993             disk_list   (list):                             to be filled up
994
995         Returns:
996             persistent_root_disk    (dict):                 Details of persistent root disk
997
998         """
999 1         persistent_root_disk = {}
1000         # There can be only one root disk, when we find it, it will return the result
1001
1002 1         for vdu, vsd in product(
1003             vnfd.get("vdu", ()), vnfd.get("virtual-storage-desc", ())
1004         ):
1005 1             if (
1006                 vdu["name"] == target_vdu["vdu-name"]
1007                 and vsd.get("id") == vdu.get("virtual-storage-desc", [[]])[0]
1008             ):
1009 1                 root_disk = vsd
1010 1                 if (
1011                     root_disk.get("type-of-storage")
1012                     == "persistent-storage:persistent-storage"
1013                 ):
1014 1                     for vdu_volume in vdu_instantiation_volumes_list:
1015 1                         if (
1016                             vdu_volume["vim-volume-id"]
1017                             and root_disk["id"] == vdu_volume["name"]
1018                         ):
1019 1                             persistent_root_disk[vsd["id"]] = {
1020                                 "vim_volume_id": vdu_volume["vim-volume-id"],
1021                                 "image_id": vdu.get("sw-image-desc"),
1022                             }
1023
1024 1                             disk_list.append(persistent_root_disk[vsd["id"]])
1025
1026 1                             return persistent_root_disk
1027
1028                     else:
1029 1                         if root_disk.get("size-of-storage"):
1030 1                             persistent_root_disk[vsd["id"]] = {
1031                                 "image_id": vdu.get("sw-image-desc"),
1032                                 "size": root_disk.get("size-of-storage"),
1033                                 "keep": Ns.is_volume_keeping_required(root_disk),
1034                             }
1035
1036 1                             disk_list.append(persistent_root_disk[vsd["id"]])
1037
1038 1                             return persistent_root_disk
1039
1040 1     @staticmethod
1041 1     def find_persistent_volumes(
1042         persistent_root_disk: dict,
1043         target_vdu: dict,
1044         vdu_instantiation_volumes_list: list,
1045         disk_list: list,
1046     ) -> None:
1047         """Find the ordinary persistent volumes and add them to the disk_list
1048         by parsing the instantiation parameters.
1049
1050         Args:
1051             persistent_root_disk:   persistent root disk dictionary
1052             target_vdu: processed VDU
1053             vdu_instantiation_volumes_list: instantiation parameters for the each VDU as a list
1054             disk_list:  to be filled up
1055
1056         """
1057         # Find the ordinary volumes which are not added to the persistent_root_disk
1058 1         persistent_disk = {}
1059 1         for disk in target_vdu.get("virtual-storages", {}):
1060 1             if (
1061                 disk.get("type-of-storage") == "persistent-storage:persistent-storage"
1062                 and disk["id"] not in persistent_root_disk.keys()
1063             ):
1064 1                 for vdu_volume in vdu_instantiation_volumes_list:
1065 1                     if vdu_volume["vim-volume-id"] and disk["id"] == vdu_volume["name"]:
1066 1                         persistent_disk[disk["id"]] = {
1067                             "vim_volume_id": vdu_volume["vim-volume-id"],
1068                         }
1069 1                         disk_list.append(persistent_disk[disk["id"]])
1070
1071                 else:
1072 1                     if disk["id"] not in persistent_disk.keys():
1073 1                         persistent_disk[disk["id"]] = {
1074                             "size": disk.get("size-of-storage"),
1075                             "keep": Ns.is_volume_keeping_required(disk),
1076                         }
1077 1                         disk_list.append(persistent_disk[disk["id"]])
1078
1079 1     @staticmethod
1080 1     def is_volume_keeping_required(virtual_storage_desc: Dict[str, Any]) -> bool:
1081         """Function to decide keeping persistent volume
1082         upon VDU deletion.
1083
1084         Args:
1085             virtual_storage_desc (Dict[str, Any]): virtual storage description dictionary
1086
1087         Returns:
1088             bool (True/False)
1089         """
1090
1091 1         if not virtual_storage_desc.get("vdu-storage-requirements"):
1092 1             return False
1093 1         for item in virtual_storage_desc.get("vdu-storage-requirements", {}):
1094 1             if item.get("key") == "keep-volume" and item.get("value") == "true":
1095 1                 return True
1096 1         return False
1097
1098 1     @staticmethod
1099 1     def _sort_vdu_interfaces(target_vdu: dict) -> None:
1100         """Sort the interfaces according to position number.
1101
1102         Args:
1103             target_vdu  (dict):     Details of VDU to be created
1104
1105         """
1106         # If the position info is provided for all the interfaces, it will be sorted
1107         # according to position number ascendingly.
1108 1         sorted_interfaces = sorted(
1109             target_vdu["interfaces"],
1110             key=lambda x: (x.get("position") is None, x.get("position")),
1111         )
1112 1         target_vdu["interfaces"] = sorted_interfaces
1113
1114 1     @staticmethod
1115 1     def _partially_locate_vdu_interfaces(target_vdu: dict) -> None:
1116         """Only place the interfaces which has specific position.
1117
1118         Args:
1119             target_vdu  (dict):     Details of VDU to be created
1120
1121         """
1122         # If the position info is provided for some interfaces but not all of them, the interfaces
1123         # which has specific position numbers will be placed and others' positions will not be taken care.
1124 1         if any(
1125             i.get("position") + 1
1126             for i in target_vdu["interfaces"]
1127             if i.get("position") is not None
1128         ):
1129 1             n = len(target_vdu["interfaces"])
1130 1             sorted_interfaces = [-1] * n
1131 1             k, m = 0, 0
1132
1133 1             while k < n:
1134 1                 if target_vdu["interfaces"][k].get("position") is not None:
1135 1                     if any(i.get("position") == 0 for i in target_vdu["interfaces"]):
1136 1                         idx = target_vdu["interfaces"][k]["position"] + 1
1137                     else:
1138 1                         idx = target_vdu["interfaces"][k]["position"]
1139 1                     sorted_interfaces[idx - 1] = target_vdu["interfaces"][k]
1140 1                 k += 1
1141
1142 1             while m < n:
1143 1                 if target_vdu["interfaces"][m].get("position") is None:
1144 1                     idy = sorted_interfaces.index(-1)
1145 1                     sorted_interfaces[idy] = target_vdu["interfaces"][m]
1146 1                 m += 1
1147
1148 1             target_vdu["interfaces"] = sorted_interfaces
1149
1150 1     @staticmethod
1151 1     def _prepare_vdu_cloud_init(
1152         target_vdu: dict, vdu2cloud_init: dict, db: object, fs: object
1153     ) -> Dict:
1154         """Fill cloud_config dict with cloud init details.
1155
1156         Args:
1157             target_vdu  (dict):         Details of VDU to be created
1158             vdu2cloud_init  (dict):     Cloud init dict
1159             db  (object):               DB object
1160             fs  (object):               FS object
1161
1162         Returns:
1163             cloud_config (dict):        Cloud config details of VDU
1164
1165         """
1166         # cloud config
1167 1         cloud_config = {}
1168
1169 1         if target_vdu.get("cloud-init"):
1170 1             if target_vdu["cloud-init"] not in vdu2cloud_init:
1171 1                 vdu2cloud_init[target_vdu["cloud-init"]] = Ns._get_cloud_init(
1172                     db=db,
1173                     fs=fs,
1174                     location=target_vdu["cloud-init"],
1175                 )
1176
1177 1             cloud_content_ = vdu2cloud_init[target_vdu["cloud-init"]]
1178 1             cloud_config["user-data"] = Ns._parse_jinja2(
1179                 cloud_init_content=cloud_content_,
1180                 params=target_vdu.get("additionalParams"),
1181                 context=target_vdu["cloud-init"],
1182             )
1183
1184 1         if target_vdu.get("boot-data-drive"):
1185 1             cloud_config["boot-data-drive"] = target_vdu.get("boot-data-drive")
1186
1187 1         return cloud_config
1188
1189 1     @staticmethod
1190 1     def _check_vld_information_of_interfaces(
1191         interface: dict, ns_preffix: str, vnf_preffix: str
1192     ) -> Optional[str]:
1193         """Prepare the net_text by the virtual link information for vnf and ns level.
1194         Args:
1195             interface   (dict):         Interface details
1196             ns_preffix  (str):          Prefix of NS
1197             vnf_preffix (str):          Prefix of VNF
1198
1199         Returns:
1200             net_text    (str):          information of net
1201
1202         """
1203 1         net_text = ""
1204 1         if interface.get("ns-vld-id"):
1205 1             net_text = ns_preffix + ":vld." + interface["ns-vld-id"]
1206 1         elif interface.get("vnf-vld-id"):
1207 1             net_text = vnf_preffix + ":vld." + interface["vnf-vld-id"]
1208
1209 1         return net_text
1210
1211 1     @staticmethod
1212 1     def _prepare_interface_port_security(interface: dict) -> None:
1213         """
1214
1215         Args:
1216             interface   (dict):     Interface details
1217
1218         """
1219 1         if "port-security-enabled" in interface:
1220 1             interface["port_security"] = interface.pop("port-security-enabled")
1221
1222 1         if "port-security-disable-strategy" in interface:
1223 1             interface["port_security_disable_strategy"] = interface.pop(
1224                 "port-security-disable-strategy"
1225             )
1226
1227 1     @staticmethod
1228 1     def _create_net_item_of_interface(interface: dict, net_text: str) -> dict:
1229         """Prepare net item including name, port security, floating ip etc.
1230
1231         Args:
1232             interface   (dict):         Interface details
1233             net_text    (str):          information of net
1234
1235         Returns:
1236             net_item    (dict):         Dict including net details
1237
1238         """
1239
1240 1         net_item = {
1241             x: v
1242             for x, v in interface.items()
1243             if x
1244             in (
1245                 "name",
1246                 "vpci",
1247                 "port_security",
1248                 "port_security_disable_strategy",
1249                 "floating_ip",
1250             )
1251         }
1252 1         net_item["net_id"] = "TASK-" + net_text
1253 1         net_item["type"] = "virtual"
1254
1255 1         return net_item
1256
1257 1     @staticmethod
1258 1     def _prepare_type_of_interface(
1259         interface: dict, tasks_by_target_record_id: dict, net_text: str, net_item: dict
1260     ) -> None:
1261         """Fill the net item type by interface type such as SR-IOV, OM-MGMT, bridge etc.
1262
1263         Args:
1264             interface   (dict):                     Interface details
1265             tasks_by_target_record_id   (dict):     Task details
1266             net_text    (str):                      information of net
1267             net_item    (dict):                     Dict including net details
1268
1269         """
1270         # TODO mac_address: used for  SR-IOV ifaces #TODO for other types
1271         # TODO floating_ip: True/False (or it can be None)
1272
1273 1         if interface.get("type") in ("SR-IOV", "PCI-PASSTHROUGH"):
1274             # Mark the net create task as type data
1275 1             if deep_get(
1276                 tasks_by_target_record_id,
1277                 net_text,
1278                 "extra_dict",
1279                 "params",
1280                 "net_type",
1281             ):
1282 1                 tasks_by_target_record_id[net_text]["extra_dict"]["params"][
1283                     "net_type"
1284                 ] = "data"
1285
1286 1             net_item["use"] = "data"
1287 1             net_item["model"] = interface["type"]
1288 1             net_item["type"] = interface["type"]
1289
1290 1         elif (
1291             interface.get("type") == "OM-MGMT"
1292             or interface.get("mgmt-interface")
1293             or interface.get("mgmt-vnf")
1294         ):
1295 1             net_item["use"] = "mgmt"
1296
1297         else:
1298             # If interface.get("type") in ("VIRTIO", "E1000", "PARAVIRT"):
1299 1             net_item["use"] = "bridge"
1300 1             net_item["model"] = interface.get("type")
1301
1302 1     @staticmethod
1303 1     def _prepare_vdu_interfaces(
1304         target_vdu: dict,
1305         extra_dict: dict,
1306         ns_preffix: str,
1307         vnf_preffix: str,
1308         logger: object,
1309         tasks_by_target_record_id: dict,
1310         net_list: list,
1311     ) -> None:
1312         """Prepare the net_item and add net_list, add mgmt interface to extra_dict.
1313
1314         Args:
1315             target_vdu  (dict):                             VDU to be created
1316             extra_dict  (dict):                             Dictionary to be filled
1317             ns_preffix  (str):                              NS prefix as string
1318             vnf_preffix (str):                              VNF prefix as string
1319             logger  (object):                               Logger Object
1320             tasks_by_target_record_id  (dict):              Task details
1321             net_list    (list):                             Net list of VDU
1322         """
1323 1         for iface_index, interface in enumerate(target_vdu["interfaces"]):
1324 1             net_text = Ns._check_vld_information_of_interfaces(
1325                 interface, ns_preffix, vnf_preffix
1326             )
1327 1             if not net_text:
1328                 # Interface not connected to any vld
1329 1                 logger.error(
1330                     "Interface {} from vdu {} not connected to any vld".format(
1331                         iface_index, target_vdu["vdu-name"]
1332                     )
1333                 )
1334 1                 continue
1335
1336 1             extra_dict["depends_on"].append(net_text)
1337
1338 1             Ns._prepare_interface_port_security(interface)
1339
1340 1             net_item = Ns._create_net_item_of_interface(interface, net_text)
1341
1342 1             Ns._prepare_type_of_interface(
1343                 interface, tasks_by_target_record_id, net_text, net_item
1344             )
1345
1346 1             if interface.get("ip-address"):
1347 1                 net_item["ip_address"] = interface["ip-address"]
1348
1349 1             if interface.get("mac-address"):
1350 1                 net_item["mac_address"] = interface["mac-address"]
1351
1352 1             net_list.append(net_item)
1353
1354 1             if interface.get("mgmt-vnf"):
1355 0                 extra_dict["mgmt_vnf_interface"] = iface_index
1356 1             elif interface.get("mgmt-interface"):
1357 1                 extra_dict["mgmt_vdu_interface"] = iface_index
1358
1359 1     @staticmethod
1360 1     def _prepare_vdu_ssh_keys(
1361         target_vdu: dict, ro_nsr_public_key: dict, cloud_config: dict
1362     ) -> None:
1363         """Add ssh keys to cloud config.
1364
1365         Args:
1366            target_vdu  (dict):                 Details of VDU to be created
1367            ro_nsr_public_key   (dict):          RO NSR public Key
1368            cloud_config  (dict):               Cloud config details
1369
1370         """
1371 1         ssh_keys = []
1372
1373 1         if target_vdu.get("ssh-keys"):
1374 1             ssh_keys += target_vdu.get("ssh-keys")
1375
1376 1         if target_vdu.get("ssh-access-required"):
1377 1             ssh_keys.append(ro_nsr_public_key)
1378
1379 1         if ssh_keys:
1380 1             cloud_config["key-pairs"] = ssh_keys
1381
1382 1     @staticmethod
1383 1     def _select_persistent_root_disk(vsd: dict, vdu: dict) -> dict:
1384         """Selects the persistent root disk if exists.
1385         Args:
1386             vsd (dict):             Virtual storage descriptors in VNFD
1387             vdu (dict):             VNF descriptor
1388
1389         Returns:
1390             root_disk   (dict):     Selected persistent root disk
1391         """
1392 1         if vsd.get("id") == vdu.get("virtual-storage-desc", [[]])[0]:
1393 1             root_disk = vsd
1394 1             if root_disk.get(
1395                 "type-of-storage"
1396             ) == "persistent-storage:persistent-storage" and root_disk.get(
1397                 "size-of-storage"
1398             ):
1399 1                 return root_disk
1400
1401 1     @staticmethod
1402 1     def _add_persistent_root_disk_to_disk_list(
1403         vnfd: dict, target_vdu: dict, persistent_root_disk: dict, disk_list: list
1404     ) -> None:
1405         """Find the persistent root disk and add to disk list.
1406
1407         Args:
1408             vnfd  (dict):                           VNF descriptor
1409             target_vdu  (dict):                     Details of VDU to be created
1410             persistent_root_disk    (dict):         Details of persistent root disk
1411             disk_list   (list):                     Disks of VDU
1412
1413         """
1414 1         for vdu in vnfd.get("vdu", ()):
1415 1             if vdu["name"] == target_vdu["vdu-name"]:
1416 1                 for vsd in vnfd.get("virtual-storage-desc", ()):
1417 1                     root_disk = Ns._select_persistent_root_disk(vsd, vdu)
1418 1                     if not root_disk:
1419 1                         continue
1420
1421 1                     persistent_root_disk[vsd["id"]] = {
1422                         "image_id": vdu.get("sw-image-desc"),
1423                         "size": root_disk["size-of-storage"],
1424                         "keep": Ns.is_volume_keeping_required(root_disk),
1425                     }
1426
1427 1                     disk_list.append(persistent_root_disk[vsd["id"]])
1428 1                     break
1429
1430 1     @staticmethod
1431 1     def _add_persistent_ordinary_disks_to_disk_list(
1432         target_vdu: dict,
1433         persistent_root_disk: dict,
1434         persistent_ordinary_disk: dict,
1435         disk_list: list,
1436     ) -> None:
1437         """Fill the disk list by adding persistent ordinary disks.
1438
1439         Args:
1440             target_vdu  (dict):                     Details of VDU to be created
1441             persistent_root_disk    (dict):         Details of persistent root disk
1442             persistent_ordinary_disk    (dict):     Details of persistent ordinary disk
1443             disk_list   (list):                     Disks of VDU
1444
1445         """
1446 1         if target_vdu.get("virtual-storages"):
1447 1             for disk in target_vdu["virtual-storages"]:
1448 1                 if (
1449                     disk.get("type-of-storage")
1450                     == "persistent-storage:persistent-storage"
1451                     and disk["id"] not in persistent_root_disk.keys()
1452                 ):
1453 1                     persistent_ordinary_disk[disk["id"]] = {
1454                         "size": disk["size-of-storage"],
1455                         "keep": Ns.is_volume_keeping_required(disk),
1456                     }
1457 1                     disk_list.append(persistent_ordinary_disk[disk["id"]])
1458
1459 1     @staticmethod
1460 1     def _prepare_vdu_affinity_group_list(
1461         target_vdu: dict, extra_dict: dict, ns_preffix: str
1462     ) -> List[Dict[str, any]]:
1463         """Process affinity group details to prepare affinity group list.
1464
1465         Args:
1466             target_vdu  (dict):     Details of VDU to be created
1467             extra_dict  (dict):     Dictionary to be filled
1468             ns_preffix  (str):      Prefix as string
1469
1470         Returns:
1471
1472             affinity_group_list (list):     Affinity group details
1473
1474         """
1475 1         affinity_group_list = []
1476
1477 1         if target_vdu.get("affinity-or-anti-affinity-group-id"):
1478 1             for affinity_group_id in target_vdu["affinity-or-anti-affinity-group-id"]:
1479 1                 affinity_group = {}
1480 1                 affinity_group_text = (
1481                     ns_preffix + ":affinity-or-anti-affinity-group." + affinity_group_id
1482                 )
1483
1484 1                 if not isinstance(extra_dict.get("depends_on"), list):
1485 1                     raise NsException("Invalid extra_dict format.")
1486
1487 1                 extra_dict["depends_on"].append(affinity_group_text)
1488 1                 affinity_group["affinity_group_id"] = "TASK-" + affinity_group_text
1489 1                 affinity_group_list.append(affinity_group)
1490
1491 1         return affinity_group_list
1492
1493 1     @staticmethod
1494 1     def _process_vdu_params(
1495         target_vdu: Dict[str, Any],
1496         indata: Dict[str, Any],
1497         vim_info: Dict[str, Any],
1498         target_record_id: str,
1499         **kwargs: Dict[str, Any],
1500     ) -> Dict[str, Any]:
1501         """Function to process VDU parameters.
1502
1503         Args:
1504             target_vdu (Dict[str, Any]): [description]
1505             indata (Dict[str, Any]): [description]
1506             vim_info (Dict[str, Any]): [description]
1507             target_record_id (str): [description]
1508
1509         Returns:
1510             Dict[str, Any]: [description]
1511         """
1512 1         vnfr_id = kwargs.get("vnfr_id")
1513 1         nsr_id = kwargs.get("nsr_id")
1514 1         vnfr = kwargs.get("vnfr")
1515 1         vdu2cloud_init = kwargs.get("vdu2cloud_init")
1516 1         tasks_by_target_record_id = kwargs.get("tasks_by_target_record_id")
1517 1         logger = kwargs.get("logger")
1518 1         db = kwargs.get("db")
1519 1         fs = kwargs.get("fs")
1520 1         ro_nsr_public_key = kwargs.get("ro_nsr_public_key")
1521
1522 1         vnf_preffix = "vnfrs:{}".format(vnfr_id)
1523 1         ns_preffix = "nsrs:{}".format(nsr_id)
1524 1         image_text = ns_preffix + ":image." + target_vdu["ns-image-id"]
1525 1         flavor_text = ns_preffix + ":flavor." + target_vdu["ns-flavor-id"]
1526 1         extra_dict = {"depends_on": [image_text, flavor_text]}
1527 1         net_list = []
1528
1529 1         persistent_root_disk = {}
1530 1         persistent_ordinary_disk = {}
1531 1         vdu_instantiation_volumes_list = []
1532 1         disk_list = []
1533 1         vnfd_id = vnfr["vnfd-id"]
1534 1         vnfd = db.get_one("vnfds", {"_id": vnfd_id})
1535
1536         # If the position info is provided for all the interfaces, it will be sorted
1537         # according to position number ascendingly.
1538 1         if all(
1539             True if i.get("position") is not None else False
1540             for i in target_vdu["interfaces"]
1541         ):
1542 1             Ns._sort_vdu_interfaces(target_vdu)
1543
1544         # If the position info is provided for some interfaces but not all of them, the interfaces
1545         # which has specific position numbers will be placed and others' positions will not be taken care.
1546         else:
1547 1             Ns._partially_locate_vdu_interfaces(target_vdu)
1548
1549         # If the position info is not provided for the interfaces, interfaces will be attached
1550         # according to the order in the VNFD.
1551 1         Ns._prepare_vdu_interfaces(
1552             target_vdu,
1553             extra_dict,
1554             ns_preffix,
1555             vnf_preffix,
1556             logger,
1557             tasks_by_target_record_id,
1558             net_list,
1559         )
1560
1561         # cloud config
1562 1         cloud_config = Ns._prepare_vdu_cloud_init(target_vdu, vdu2cloud_init, db, fs)
1563
1564         # Prepare VDU ssh keys
1565 1         Ns._prepare_vdu_ssh_keys(target_vdu, ro_nsr_public_key, cloud_config)
1566
1567 1         if target_vdu.get("additionalParams"):
1568 1             vdu_instantiation_volumes_list = (
1569                 target_vdu.get("additionalParams").get("OSM").get("vdu_volumes")
1570             )
1571
1572 1         if vdu_instantiation_volumes_list:
1573             # Find the root volumes and add to the disk_list
1574 1             persistent_root_disk = Ns.find_persistent_root_volumes(
1575                 vnfd, target_vdu, vdu_instantiation_volumes_list, disk_list
1576             )
1577
1578             # Find the ordinary volumes which are not added to the persistent_root_disk
1579             # and put them to the disk list
1580 1             Ns.find_persistent_volumes(
1581                 persistent_root_disk,
1582                 target_vdu,
1583                 vdu_instantiation_volumes_list,
1584                 disk_list,
1585             )
1586
1587         else:
1588             # Vdu_instantiation_volumes_list is empty
1589             # First get add the persistent root disks to disk_list
1590 1             Ns._add_persistent_root_disk_to_disk_list(
1591                 vnfd, target_vdu, persistent_root_disk, disk_list
1592             )
1593             # Add the persistent non-root disks to disk_list
1594 1             Ns._add_persistent_ordinary_disks_to_disk_list(
1595                 target_vdu, persistent_root_disk, persistent_ordinary_disk, disk_list
1596             )
1597
1598 1         affinity_group_list = Ns._prepare_vdu_affinity_group_list(
1599             target_vdu, extra_dict, ns_preffix
1600         )
1601
1602 1         extra_dict["params"] = {
1603             "name": "{}-{}-{}-{}".format(
1604                 indata["name"][:16],
1605                 vnfr["member-vnf-index-ref"][:16],
1606                 target_vdu["vdu-name"][:32],
1607                 target_vdu.get("count-index") or 0,
1608             ),
1609             "description": target_vdu["vdu-name"],
1610             "start": True,
1611             "image_id": "TASK-" + image_text,
1612             "flavor_id": "TASK-" + flavor_text,
1613             "affinity_group_list": affinity_group_list,
1614             "net_list": net_list,
1615             "cloud_config": cloud_config or None,
1616             "disk_list": disk_list,
1617             "availability_zone_index": None,  # TODO
1618             "availability_zone_list": None,  # TODO
1619         }
1620
1621 1         return extra_dict
1622
1623 1     @staticmethod
1624 1     def _process_affinity_group_params(
1625         target_affinity_group: Dict[str, Any],
1626         indata: Dict[str, Any],
1627         vim_info: Dict[str, Any],
1628         target_record_id: str,
1629         **kwargs: Dict[str, Any],
1630     ) -> Dict[str, Any]:
1631         """Get affinity or anti-affinity group parameters.
1632
1633         Args:
1634             target_affinity_group (Dict[str, Any]): [description]
1635             indata (Dict[str, Any]): [description]
1636             vim_info (Dict[str, Any]): [description]
1637             target_record_id (str): [description]
1638
1639         Returns:
1640             Dict[str, Any]: [description]
1641         """
1642
1643 0         extra_dict = {}
1644 0         affinity_group_data = {
1645             "name": target_affinity_group["name"],
1646             "type": target_affinity_group["type"],
1647             "scope": target_affinity_group["scope"],
1648         }
1649
1650 0         if target_affinity_group.get("vim-affinity-group-id"):
1651 0             affinity_group_data["vim-affinity-group-id"] = target_affinity_group[
1652                 "vim-affinity-group-id"
1653             ]
1654
1655 0         extra_dict["params"] = {
1656             "affinity_group_data": affinity_group_data,
1657         }
1658
1659 0         return extra_dict
1660
1661 1     @staticmethod
1662 1     def _process_recreate_vdu_params(
1663         existing_vdu: Dict[str, Any],
1664         db_nsr: Dict[str, Any],
1665         vim_info: Dict[str, Any],
1666         target_record_id: str,
1667         target_id: str,
1668         **kwargs: Dict[str, Any],
1669     ) -> Dict[str, Any]:
1670         """Function to process VDU parameters to recreate.
1671
1672         Args:
1673             existing_vdu (Dict[str, Any]): [description]
1674             db_nsr (Dict[str, Any]): [description]
1675             vim_info (Dict[str, Any]): [description]
1676             target_record_id (str): [description]
1677             target_id (str): [description]
1678
1679         Returns:
1680             Dict[str, Any]: [description]
1681         """
1682 0         vnfr = kwargs.get("vnfr")
1683 0         vdu2cloud_init = kwargs.get("vdu2cloud_init")
1684         # logger = kwargs.get("logger")
1685 0         db = kwargs.get("db")
1686 0         fs = kwargs.get("fs")
1687 0         ro_nsr_public_key = kwargs.get("ro_nsr_public_key")
1688
1689 0         extra_dict = {}
1690 0         net_list = []
1691
1692 0         vim_details = {}
1693 0         vim_details_text = existing_vdu["vim_info"][target_id].get("vim_details", None)
1694 0         if vim_details_text:
1695 0             vim_details = yaml.safe_load(f"{vim_details_text}")
1696
1697 0         for iface_index, interface in enumerate(existing_vdu["interfaces"]):
1698 0             if "port-security-enabled" in interface:
1699 0                 interface["port_security"] = interface.pop("port-security-enabled")
1700
1701 0             if "port-security-disable-strategy" in interface:
1702 0                 interface["port_security_disable_strategy"] = interface.pop(
1703                     "port-security-disable-strategy"
1704                 )
1705
1706 0             net_item = {
1707                 x: v
1708                 for x, v in interface.items()
1709                 if x
1710                 in (
1711                     "name",
1712                     "vpci",
1713                     "port_security",
1714                     "port_security_disable_strategy",
1715                     "floating_ip",
1716                 )
1717             }
1718 0             existing_ifaces = existing_vdu["vim_info"][target_id].get(
1719                 "interfaces_backup", []
1720             )
1721 0             net_id = next(
1722                 (
1723                     i["vim_net_id"]
1724                     for i in existing_ifaces
1725                     if i["ip_address"] == interface["ip-address"]
1726                 ),
1727                 None,
1728             )
1729
1730 0             net_item["net_id"] = net_id
1731 0             net_item["type"] = "virtual"
1732
1733             # TODO mac_address: used for  SR-IOV ifaces #TODO for other types
1734             # TODO floating_ip: True/False (or it can be None)
1735 0             if interface.get("type") in ("SR-IOV", "PCI-PASSTHROUGH"):
1736 0                 net_item["use"] = "data"
1737 0                 net_item["model"] = interface["type"]
1738 0                 net_item["type"] = interface["type"]
1739 0             elif (
1740                 interface.get("type") == "OM-MGMT"
1741                 or interface.get("mgmt-interface")
1742                 or interface.get("mgmt-vnf")
1743             ):
1744 0                 net_item["use"] = "mgmt"
1745             else:
1746                 # if interface.get("type") in ("VIRTIO", "E1000", "PARAVIRT"):
1747 0                 net_item["use"] = "bridge"
1748 0                 net_item["model"] = interface.get("type")
1749
1750 0             if interface.get("ip-address"):
1751 0                 net_item["ip_address"] = interface["ip-address"]
1752
1753 0             if interface.get("mac-address"):
1754 0                 net_item["mac_address"] = interface["mac-address"]
1755
1756 0             net_list.append(net_item)
1757
1758 0             if interface.get("mgmt-vnf"):
1759 0                 extra_dict["mgmt_vnf_interface"] = iface_index
1760 0             elif interface.get("mgmt-interface"):
1761 0                 extra_dict["mgmt_vdu_interface"] = iface_index
1762
1763         # cloud config
1764 0         cloud_config = {}
1765
1766 0         if existing_vdu.get("cloud-init"):
1767 0             if existing_vdu["cloud-init"] not in vdu2cloud_init:
1768 0                 vdu2cloud_init[existing_vdu["cloud-init"]] = Ns._get_cloud_init(
1769                     db=db,
1770                     fs=fs,
1771                     location=existing_vdu["cloud-init"],
1772                 )
1773
1774 0             cloud_content_ = vdu2cloud_init[existing_vdu["cloud-init"]]
1775 0             cloud_config["user-data"] = Ns._parse_jinja2(
1776                 cloud_init_content=cloud_content_,
1777                 params=existing_vdu.get("additionalParams"),
1778                 context=existing_vdu["cloud-init"],
1779             )
1780
1781 0         if existing_vdu.get("boot-data-drive"):
1782 0             cloud_config["boot-data-drive"] = existing_vdu.get("boot-data-drive")
1783
1784 0         ssh_keys = []
1785
1786 0         if existing_vdu.get("ssh-keys"):
1787 0             ssh_keys += existing_vdu.get("ssh-keys")
1788
1789 0         if existing_vdu.get("ssh-access-required"):
1790 0             ssh_keys.append(ro_nsr_public_key)
1791
1792 0         if ssh_keys:
1793 0             cloud_config["key-pairs"] = ssh_keys
1794
1795 0         disk_list = []
1796 0         for vol_id in vim_details.get("os-extended-volumes:volumes_attached", []):
1797 0             disk_list.append({"vim_id": vol_id["id"]})
1798
1799 0         affinity_group_list = []
1800
1801 0         if existing_vdu.get("affinity-or-anti-affinity-group-id"):
1802 0             affinity_group = {}
1803 0             for affinity_group_id in existing_vdu["affinity-or-anti-affinity-group-id"]:
1804 0                 for group in db_nsr.get("affinity-or-anti-affinity-group"):
1805 0                     if (
1806                         group["id"] == affinity_group_id
1807                         and group["vim_info"][target_id].get("vim_id", None) is not None
1808                     ):
1809 0                         affinity_group["affinity_group_id"] = group["vim_info"][
1810                             target_id
1811                         ].get("vim_id", None)
1812 0                         affinity_group_list.append(affinity_group)
1813
1814 0         extra_dict["params"] = {
1815             "name": "{}-{}-{}-{}".format(
1816                 db_nsr["name"][:16],
1817                 vnfr["member-vnf-index-ref"][:16],
1818                 existing_vdu["vdu-name"][:32],
1819                 existing_vdu.get("count-index") or 0,
1820             ),
1821             "description": existing_vdu["vdu-name"],
1822             "start": True,
1823             "image_id": vim_details["image"]["id"],
1824             "flavor_id": vim_details["flavor"]["id"],
1825             "affinity_group_list": affinity_group_list,
1826             "net_list": net_list,
1827             "cloud_config": cloud_config or None,
1828             "disk_list": disk_list,
1829             "availability_zone_index": None,  # TODO
1830             "availability_zone_list": None,  # TODO
1831         }
1832
1833 0         return extra_dict
1834
1835 1     def calculate_diff_items(
1836         self,
1837         indata,
1838         db_nsr,
1839         db_ro_nsr,
1840         db_nsr_update,
1841         item,
1842         tasks_by_target_record_id,
1843         action_id,
1844         nsr_id,
1845         task_index,
1846         vnfr_id=None,
1847         vnfr=None,
1848     ):
1849         """Function that returns the incremental changes (creation, deletion)
1850         related to a specific item `item` to be done. This function should be
1851         called for NS instantiation, NS termination, NS update to add a new VNF
1852         or a new VLD, remove a VNF or VLD, etc.
1853         Item can be `net`, `flavor`, `image` or `vdu`.
1854         It takes a list of target items from indata (which came from the REST API)
1855         and compares with the existing items from db_ro_nsr, identifying the
1856         incremental changes to be done. During the comparison, it calls the method
1857         `process_params` (which was passed as parameter, and is particular for each
1858         `item`)
1859
1860         Args:
1861             indata (Dict[str, Any]): deployment info
1862             db_nsr: NSR record from DB
1863             db_ro_nsr (Dict[str, Any]): record from "ro_nsrs"
1864             db_nsr_update (Dict[str, Any]): NSR info to update in DB
1865             item (str): element to process (net, vdu...)
1866             tasks_by_target_record_id (Dict[str, Any]):
1867                 [<target_record_id>, <task>]
1868             action_id (str): action id
1869             nsr_id (str): NSR id
1870             task_index (number): task index to add to task name
1871             vnfr_id (str): VNFR id
1872             vnfr (Dict[str, Any]): VNFR info
1873
1874         Returns:
1875             List: list with the incremental changes (deletes, creates) for each item
1876             number: current task index
1877         """
1878
1879 0         diff_items = []
1880 0         db_path = ""
1881 0         db_record = ""
1882 0         target_list = []
1883 0         existing_list = []
1884 0         process_params = None
1885 0         vdu2cloud_init = indata.get("cloud_init_content") or {}
1886 0         ro_nsr_public_key = db_ro_nsr["public_key"]
1887
1888         # According to the type of item, the path, the target_list,
1889         # the existing_list and the method to process params are set
1890 0         db_path = self.db_path_map[item]
1891 0         process_params = self.process_params_function_map[item]
1892 0         if item in ("net", "vdu"):
1893             # This case is specific for the NS VLD (not applied to VDU)
1894 0             if vnfr is None:
1895 0                 db_record = "nsrs:{}:{}".format(nsr_id, db_path)
1896 0                 target_list = indata.get("ns", []).get(db_path, [])
1897 0                 existing_list = db_nsr.get(db_path, [])
1898             # This case is common for VNF VLDs and VNF VDUs
1899             else:
1900 0                 db_record = "vnfrs:{}:{}".format(vnfr_id, db_path)
1901 0                 target_vnf = next(
1902                     (vnf for vnf in indata.get("vnf", ()) if vnf["_id"] == vnfr_id),
1903                     None,
1904                 )
1905 0                 target_list = target_vnf.get(db_path, []) if target_vnf else []
1906 0                 existing_list = vnfr.get(db_path, [])
1907 0         elif item in ("image", "flavor", "affinity-or-anti-affinity-group"):
1908 0             db_record = "nsrs:{}:{}".format(nsr_id, db_path)
1909 0             target_list = indata.get(item, [])
1910 0             existing_list = db_nsr.get(item, [])
1911         else:
1912 0             raise NsException("Item not supported: {}", item)
1913
1914         # ensure all the target_list elements has an "id". If not assign the index as id
1915 0         if target_list is None:
1916 0             target_list = []
1917 0         for target_index, tl in enumerate(target_list):
1918 0             if tl and not tl.get("id"):
1919 0                 tl["id"] = str(target_index)
1920
1921         # step 1 items (networks,vdus,...) to be deleted/updated
1922 0         for item_index, existing_item in enumerate(existing_list):
1923 0             target_item = next(
1924                 (t for t in target_list if t["id"] == existing_item["id"]),
1925                 None,
1926             )
1927
1928 0             for target_vim, existing_viminfo in existing_item.get(
1929                 "vim_info", {}
1930             ).items():
1931 0                 if existing_viminfo is None:
1932 0                     continue
1933
1934 0                 if target_item:
1935 0                     target_viminfo = target_item.get("vim_info", {}).get(target_vim)
1936                 else:
1937 0                     target_viminfo = None
1938
1939 0                 if target_viminfo is None:
1940                     # must be deleted
1941 0                     self._assign_vim(target_vim)
1942 0                     target_record_id = "{}.{}".format(db_record, existing_item["id"])
1943 0                     item_ = item
1944
1945 0                     if target_vim.startswith("sdn") or target_vim.startswith("wim"):
1946                         # item must be sdn-net instead of net if target_vim is a sdn
1947 0                         item_ = "sdn_net"
1948 0                         target_record_id += ".sdn"
1949
1950 0                     deployment_info = {
1951                         "action_id": action_id,
1952                         "nsr_id": nsr_id,
1953                         "task_index": task_index,
1954                     }
1955
1956 0                     diff_items.append(
1957                         {
1958                             "deployment_info": deployment_info,
1959                             "target_id": target_vim,
1960                             "item": item_,
1961                             "action": "DELETE",
1962                             "target_record": f"{db_record}.{item_index}.vim_info.{target_vim}",
1963                             "target_record_id": target_record_id,
1964                         }
1965                     )
1966 0                     task_index += 1
1967
1968         # step 2 items (networks,vdus,...) to be created
1969 0         for target_item in target_list:
1970 0             item_index = -1
1971
1972 0             for item_index, existing_item in enumerate(existing_list):
1973 0                 if existing_item["id"] == target_item["id"]:
1974 0                     break
1975             else:
1976 0                 item_index += 1
1977 0                 db_nsr_update[db_path + ".{}".format(item_index)] = target_item
1978 0                 existing_list.append(target_item)
1979 0                 existing_item = None
1980
1981 0             for target_vim, target_viminfo in target_item.get("vim_info", {}).items():
1982 0                 existing_viminfo = None
1983
1984 0                 if existing_item:
1985 0                     existing_viminfo = existing_item.get("vim_info", {}).get(target_vim)
1986
1987 0                 if existing_viminfo is not None:
1988 0                     continue
1989
1990 0                 target_record_id = "{}.{}".format(db_record, target_item["id"])
1991 0                 item_ = item
1992
1993 0                 if target_vim.startswith("sdn") or target_vim.startswith("wim"):
1994                     # item must be sdn-net instead of net if target_vim is a sdn
1995 0                     item_ = "sdn_net"
1996 0                     target_record_id += ".sdn"
1997
1998 0                 kwargs = {}
1999 0                 self.logger.debug(
2000                     "ns.calculate_diff_items target_item={}".format(target_item)
2001                 )
2002 0                 if process_params == Ns._process_flavor_params:
2003 0                     kwargs.update(
2004                         {
2005                             "db": self.db,
2006                         }
2007                     )
2008 0                     self.logger.debug(
2009                         "calculate_diff_items for flavor kwargs={}".format(kwargs)
2010                     )
2011
2012 0                 if process_params == Ns._process_vdu_params:
2013 0                     self.logger.debug("calculate_diff_items self.fs={}".format(self.fs))
2014 0                     kwargs.update(
2015                         {
2016                             "vnfr_id": vnfr_id,
2017                             "nsr_id": nsr_id,
2018                             "vnfr": vnfr,
2019                             "vdu2cloud_init": vdu2cloud_init,
2020                             "tasks_by_target_record_id": tasks_by_target_record_id,
2021                             "logger": self.logger,
2022                             "db": self.db,
2023                             "fs": self.fs,
2024                             "ro_nsr_public_key": ro_nsr_public_key,
2025                         }
2026                     )
2027 0                     self.logger.debug("calculate_diff_items kwargs={}".format(kwargs))
2028
2029 0                 extra_dict = process_params(
2030                     target_item,
2031                     indata,
2032                     target_viminfo,
2033                     target_record_id,
2034                     **kwargs,
2035                 )
2036 0                 self._assign_vim(target_vim)
2037
2038 0                 deployment_info = {
2039                     "action_id": action_id,
2040                     "nsr_id": nsr_id,
2041                     "task_index": task_index,
2042                 }
2043
2044 0                 new_item = {
2045                     "deployment_info": deployment_info,
2046                     "target_id": target_vim,
2047                     "item": item_,
2048                     "action": "CREATE",
2049                     "target_record": f"{db_record}.{item_index}.vim_info.{target_vim}",
2050                     "target_record_id": target_record_id,
2051                     "extra_dict": extra_dict,
2052                     "common_id": target_item.get("common_id", None),
2053                 }
2054 0                 diff_items.append(new_item)
2055 0                 tasks_by_target_record_id[target_record_id] = new_item
2056 0                 task_index += 1
2057
2058 0                 db_nsr_update[db_path + ".{}".format(item_index)] = target_item
2059
2060 0         return diff_items, task_index
2061
2062 1     def calculate_all_differences_to_deploy(
2063         self,
2064         indata,
2065         nsr_id,
2066         db_nsr,
2067         db_vnfrs,
2068         db_ro_nsr,
2069         db_nsr_update,
2070         db_vnfrs_update,
2071         action_id,
2072         tasks_by_target_record_id,
2073     ):
2074         """This method calculates the ordered list of items (`changes_list`)
2075         to be created and deleted.
2076
2077         Args:
2078             indata (Dict[str, Any]): deployment info
2079             nsr_id (str): NSR id
2080             db_nsr: NSR record from DB
2081             db_vnfrs: VNFRS record from DB
2082             db_ro_nsr (Dict[str, Any]): record from "ro_nsrs"
2083             db_nsr_update (Dict[str, Any]): NSR info to update in DB
2084             db_vnfrs_update (Dict[str, Any]): VNFRS info to update in DB
2085             action_id (str): action id
2086             tasks_by_target_record_id (Dict[str, Any]):
2087                 [<target_record_id>, <task>]
2088
2089         Returns:
2090             List: ordered list of items to be created and deleted.
2091         """
2092
2093 0         task_index = 0
2094         # set list with diffs:
2095 0         changes_list = []
2096
2097         # NS vld, image and flavor
2098 0         for item in ["net", "image", "flavor", "affinity-or-anti-affinity-group"]:
2099 0             self.logger.debug("process NS={} {}".format(nsr_id, item))
2100 0             diff_items, task_index = self.calculate_diff_items(
2101                 indata=indata,
2102                 db_nsr=db_nsr,
2103                 db_ro_nsr=db_ro_nsr,
2104                 db_nsr_update=db_nsr_update,
2105                 item=item,
2106                 tasks_by_target_record_id=tasks_by_target_record_id,
2107                 action_id=action_id,
2108                 nsr_id=nsr_id,
2109                 task_index=task_index,
2110                 vnfr_id=None,
2111             )
2112 0             changes_list += diff_items
2113
2114         # VNF vlds and vdus
2115 0         for vnfr_id, vnfr in db_vnfrs.items():
2116             # vnfr_id need to be set as global variable for among others nested method _process_vdu_params
2117 0             for item in ["net", "vdu"]:
2118 0                 self.logger.debug("process VNF={} {}".format(vnfr_id, item))
2119 0                 diff_items, task_index = self.calculate_diff_items(
2120                     indata=indata,
2121                     db_nsr=db_nsr,
2122                     db_ro_nsr=db_ro_nsr,
2123                     db_nsr_update=db_vnfrs_update[vnfr["_id"]],
2124                     item=item,
2125                     tasks_by_target_record_id=tasks_by_target_record_id,
2126                     action_id=action_id,
2127                     nsr_id=nsr_id,
2128                     task_index=task_index,
2129                     vnfr_id=vnfr_id,
2130                     vnfr=vnfr,
2131                 )
2132 0                 changes_list += diff_items
2133
2134 0         return changes_list
2135
2136 1     def define_all_tasks(
2137         self,
2138         changes_list,
2139         db_new_tasks,
2140         tasks_by_target_record_id,
2141     ):
2142         """Function to create all the task structures obtanied from
2143         the method calculate_all_differences_to_deploy
2144
2145         Args:
2146             changes_list (List): ordered list of items to be created or deleted
2147             db_new_tasks (List): tasks list to be created
2148             action_id (str): action id
2149             tasks_by_target_record_id (Dict[str, Any]):
2150                 [<target_record_id>, <task>]
2151
2152         """
2153
2154 0         for change in changes_list:
2155 0             task = Ns._create_task(
2156                 deployment_info=change["deployment_info"],
2157                 target_id=change["target_id"],
2158                 item=change["item"],
2159                 action=change["action"],
2160                 target_record=change["target_record"],
2161                 target_record_id=change["target_record_id"],
2162                 extra_dict=change.get("extra_dict", None),
2163             )
2164
2165 0             self.logger.debug("ns.define_all_tasks task={}".format(task))
2166 0             tasks_by_target_record_id[change["target_record_id"]] = task
2167 0             db_new_tasks.append(task)
2168
2169 0             if change.get("common_id"):
2170 0                 task["common_id"] = change["common_id"]
2171
2172 1     def upload_all_tasks(
2173         self,
2174         db_new_tasks,
2175         now,
2176     ):
2177         """Function to save all tasks in the common DB
2178
2179         Args:
2180             db_new_tasks (List): tasks list to be created
2181             now (time): current time
2182
2183         """
2184
2185 0         nb_ro_tasks = 0  # for logging
2186
2187 0         for db_task in db_new_tasks:
2188 0             target_id = db_task.pop("target_id")
2189 0             common_id = db_task.get("common_id")
2190
2191             # Do not chek tasks with vim_status DELETED
2192             # because in manual heealing there are two tasks for the same vdur:
2193             #   one with vim_status deleted and the other one with the actual VM status.
2194
2195 0             if common_id:
2196 0                 if self.db.set_one(
2197                     "ro_tasks",
2198                     q_filter={
2199                         "target_id": target_id,
2200                         "tasks.common_id": common_id,
2201                         "vim_info.vim_status.ne": "DELETED",
2202                     },
2203                     update_dict={"to_check_at": now, "modified_at": now},
2204                     push={"tasks": db_task},
2205                     fail_on_empty=False,
2206                 ):
2207 0                     continue
2208
2209 0             if not self.db.set_one(
2210                 "ro_tasks",
2211                 q_filter={
2212                     "target_id": target_id,
2213                     "tasks.target_record": db_task["target_record"],
2214                     "vim_info.vim_status.ne": "DELETED",
2215                 },
2216                 update_dict={"to_check_at": now, "modified_at": now},
2217                 push={"tasks": db_task},
2218                 fail_on_empty=False,
2219             ):
2220                 # Create a ro_task
2221 0                 self.logger.debug("Updating database, Creating ro_tasks")
2222 0                 db_ro_task = Ns._create_ro_task(target_id, db_task)
2223 0                 nb_ro_tasks += 1
2224 0                 self.db.create("ro_tasks", db_ro_task)
2225
2226 0         self.logger.debug(
2227             "Created {} ro_tasks; {} tasks - db_new_tasks={}".format(
2228                 nb_ro_tasks, len(db_new_tasks), db_new_tasks
2229             )
2230         )
2231
2232 1     def upload_recreate_tasks(
2233         self,
2234         db_new_tasks,
2235         now,
2236     ):
2237         """Function to save recreate tasks in the common DB
2238
2239         Args:
2240             db_new_tasks (List): tasks list to be created
2241             now (time): current time
2242
2243         """
2244
2245 0         nb_ro_tasks = 0  # for logging
2246
2247 0         for db_task in db_new_tasks:
2248 0             target_id = db_task.pop("target_id")
2249 0             self.logger.debug("target_id={} db_task={}".format(target_id, db_task))
2250
2251 0             action = db_task.get("action", None)
2252
2253             # Create a ro_task
2254 0             self.logger.debug("Updating database, Creating ro_tasks")
2255 0             db_ro_task = Ns._create_ro_task(target_id, db_task)
2256
2257             # If DELETE task: the associated created items should be removed
2258             # (except persistent volumes):
2259 0             if action == "DELETE":
2260 0                 db_ro_task["vim_info"]["created"] = True
2261 0                 db_ro_task["vim_info"]["created_items"] = db_task.get(
2262                     "created_items", {}
2263                 )
2264 0                 db_ro_task["vim_info"]["volumes_to_hold"] = db_task.get(
2265                     "volumes_to_hold", []
2266                 )
2267 0                 db_ro_task["vim_info"]["vim_id"] = db_task.get("vim_id", None)
2268
2269 0             nb_ro_tasks += 1
2270 0             self.logger.debug("upload_all_tasks db_ro_task={}".format(db_ro_task))
2271 0             self.db.create("ro_tasks", db_ro_task)
2272
2273 0         self.logger.debug(
2274             "Created {} ro_tasks; {} tasks - db_new_tasks={}".format(
2275                 nb_ro_tasks, len(db_new_tasks), db_new_tasks
2276             )
2277         )
2278
2279 1     def _prepare_created_items_for_healing(
2280         self,
2281         nsr_id,
2282         target_record,
2283     ):
2284 0         created_items = {}
2285         # Get created_items from ro_task
2286 0         ro_tasks = self.db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
2287 0         for ro_task in ro_tasks:
2288 0             for task in ro_task["tasks"]:
2289 0                 if (
2290                     task["target_record"] == target_record
2291                     and task["action"] == "CREATE"
2292                     and ro_task["vim_info"]["created_items"]
2293                 ):
2294 0                     created_items = ro_task["vim_info"]["created_items"]
2295 0                     break
2296
2297 0         return created_items
2298
2299 1     def _prepare_persistent_volumes_for_healing(
2300         self,
2301         target_id,
2302         existing_vdu,
2303     ):
2304         # The associated volumes of the VM shouldn't be removed
2305 0         volumes_list = []
2306 0         vim_details = {}
2307 0         vim_details_text = existing_vdu["vim_info"][target_id].get("vim_details", None)
2308 0         if vim_details_text:
2309 0             vim_details = yaml.safe_load(f"{vim_details_text}")
2310
2311 0             for vol_id in vim_details.get("os-extended-volumes:volumes_attached", []):
2312 0                 volumes_list.append(vol_id["id"])
2313
2314 0         return volumes_list
2315
2316 1     def prepare_changes_to_recreate(
2317         self,
2318         indata,
2319         nsr_id,
2320         db_nsr,
2321         db_vnfrs,
2322         db_ro_nsr,
2323         action_id,
2324         tasks_by_target_record_id,
2325     ):
2326         """This method will obtain an ordered list of items (`changes_list`)
2327         to be created and deleted to meet the recreate request.
2328         """
2329
2330 0         self.logger.debug(
2331             "ns.prepare_changes_to_recreate nsr_id={} indata={}".format(nsr_id, indata)
2332         )
2333
2334 0         task_index = 0
2335         # set list with diffs:
2336 0         changes_list = []
2337 0         db_path = self.db_path_map["vdu"]
2338 0         target_list = indata.get("healVnfData", {})
2339 0         vdu2cloud_init = indata.get("cloud_init_content") or {}
2340 0         ro_nsr_public_key = db_ro_nsr["public_key"]
2341
2342         # Check each VNF of the target
2343 0         for target_vnf in target_list:
2344             # Find this VNF in the list from DB, raise exception if vnfInstanceId is not found
2345 0             vnfr_id = target_vnf["vnfInstanceId"]
2346 0             existing_vnf = db_vnfrs.get(vnfr_id)
2347 0             db_record = "vnfrs:{}:{}".format(vnfr_id, db_path)
2348             # vim_account_id = existing_vnf.get("vim-account-id", "")
2349
2350 0             target_vdus = target_vnf.get("additionalParams", {}).get("vdu", [])
2351             # Check each VDU of this VNF
2352 0             if not target_vdus:
2353                 # Create target_vdu_list from DB, if VDUs are not specified
2354 0                 target_vdus = []
2355 0                 for existing_vdu in existing_vnf.get("vdur"):
2356 0                     vdu_name = existing_vdu.get("vdu-name", None)
2357 0                     vdu_index = existing_vdu.get("count-index", 0)
2358 0                     vdu_to_be_healed = {"vdu-id": vdu_name, "count-index": vdu_index}
2359 0                     target_vdus.append(vdu_to_be_healed)
2360 0             for target_vdu in target_vdus:
2361 0                 vdu_name = target_vdu.get("vdu-id", None)
2362                 # For multi instance VDU count-index is mandatory
2363                 # For single session VDU count-indes is 0
2364 0                 count_index = target_vdu.get("count-index", 0)
2365 0                 item_index = 0
2366 0                 existing_instance = None
2367 0                 for instance in existing_vnf.get("vdur", None):
2368 0                     if (
2369                         instance["vdu-name"] == vdu_name
2370                         and instance["count-index"] == count_index
2371                     ):
2372 0                         existing_instance = instance
2373 0                         break
2374                     else:
2375 0                         item_index += 1
2376
2377 0                 target_record_id = "{}.{}".format(db_record, existing_instance["id"])
2378
2379                 # The target VIM is the one already existing in DB to recreate
2380 0                 for target_vim, target_viminfo in existing_instance.get(
2381                     "vim_info", {}
2382                 ).items():
2383                     # step 1 vdu to be deleted
2384 0                     self._assign_vim(target_vim)
2385 0                     deployment_info = {
2386                         "action_id": action_id,
2387                         "nsr_id": nsr_id,
2388                         "task_index": task_index,
2389                     }
2390
2391 0                     target_record = f"{db_record}.{item_index}.vim_info.{target_vim}"
2392 0                     created_items = self._prepare_created_items_for_healing(
2393                         nsr_id, target_record
2394                     )
2395
2396 0                     volumes_to_hold = self._prepare_persistent_volumes_for_healing(
2397                         target_vim, existing_instance
2398                     )
2399
2400                     # Specific extra params for recreate tasks:
2401 0                     extra_dict = {
2402                         "created_items": created_items,
2403                         "vim_id": existing_instance["vim-id"],
2404                         "volumes_to_hold": volumes_to_hold,
2405                     }
2406
2407 0                     changes_list.append(
2408                         {
2409                             "deployment_info": deployment_info,
2410                             "target_id": target_vim,
2411                             "item": "vdu",
2412                             "action": "DELETE",
2413                             "target_record": target_record,
2414                             "target_record_id": target_record_id,
2415                             "extra_dict": extra_dict,
2416                         }
2417                     )
2418 0                     delete_task_id = f"{action_id}:{task_index}"
2419 0                     task_index += 1
2420
2421                     # step 2 vdu to be created
2422 0                     kwargs = {}
2423 0                     kwargs.update(
2424                         {
2425                             "vnfr_id": vnfr_id,
2426                             "nsr_id": nsr_id,
2427                             "vnfr": existing_vnf,
2428                             "vdu2cloud_init": vdu2cloud_init,
2429                             "tasks_by_target_record_id": tasks_by_target_record_id,
2430                             "logger": self.logger,
2431                             "db": self.db,
2432                             "fs": self.fs,
2433                             "ro_nsr_public_key": ro_nsr_public_key,
2434                         }
2435                     )
2436
2437 0                     extra_dict = self._process_recreate_vdu_params(
2438                         existing_instance,
2439                         db_nsr,
2440                         target_viminfo,
2441                         target_record_id,
2442                         target_vim,
2443                         **kwargs,
2444                     )
2445
2446                     # The CREATE task depens on the DELETE task
2447 0                     extra_dict["depends_on"] = [delete_task_id]
2448
2449                     # Add volumes created from created_items if any
2450                     # Ports should be deleted with delete task and automatically created with create task
2451 0                     volumes = {}
2452 0                     for k, v in created_items.items():
2453 0                         try:
2454 0                             k_item, _, k_id = k.partition(":")
2455 0                             if k_item == "volume":
2456 0                                 volumes[k] = v
2457 0                         except Exception as e:
2458 0                             self.logger.error(
2459                                 "Error evaluating created item {}: {}".format(k, e)
2460                             )
2461 0                     extra_dict["previous_created_volumes"] = volumes
2462
2463 0                     deployment_info = {
2464                         "action_id": action_id,
2465                         "nsr_id": nsr_id,
2466                         "task_index": task_index,
2467                     }
2468 0                     self._assign_vim(target_vim)
2469
2470 0                     new_item = {
2471                         "deployment_info": deployment_info,
2472                         "target_id": target_vim,
2473                         "item": "vdu",
2474                         "action": "CREATE",
2475                         "target_record": target_record,
2476                         "target_record_id": target_record_id,
2477                         "extra_dict": extra_dict,
2478                     }
2479 0                     changes_list.append(new_item)
2480 0                     tasks_by_target_record_id[target_record_id] = new_item
2481 0                     task_index += 1
2482
2483 0         return changes_list
2484
2485 1     def recreate(self, session, indata, version, nsr_id, *args, **kwargs):
2486 0         self.logger.debug("ns.recreate nsr_id={} indata={}".format(nsr_id, indata))
2487         # TODO: validate_input(indata, recreate_schema)
2488 0         action_id = indata.get("action_id", str(uuid4()))
2489         # get current deployment
2490 0         db_vnfrs = {}  # vnf's info indexed by _id
2491 0         step = ""
2492 0         logging_text = "Recreate nsr_id={} action_id={} indata={}".format(
2493             nsr_id, action_id, indata
2494         )
2495 0         self.logger.debug(logging_text + "Enter")
2496
2497 0         try:
2498 0             step = "Getting ns and vnfr record from db"
2499 0             db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2500 0             db_new_tasks = []
2501 0             tasks_by_target_record_id = {}
2502             # read from db: vnf's of this ns
2503 0             step = "Getting vnfrs from db"
2504 0             db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
2505 0             self.logger.debug("ns.recreate: db_vnfrs_list={}".format(db_vnfrs_list))
2506
2507 0             if not db_vnfrs_list:
2508 0                 raise NsException("Cannot obtain associated VNF for ns")
2509
2510 0             for vnfr in db_vnfrs_list:
2511 0                 db_vnfrs[vnfr["_id"]] = vnfr
2512
2513 0             now = time()
2514 0             db_ro_nsr = self.db.get_one("ro_nsrs", {"_id": nsr_id}, fail_on_empty=False)
2515 0             self.logger.debug("ns.recreate: db_ro_nsr={}".format(db_ro_nsr))
2516
2517 0             if not db_ro_nsr:
2518 0                 db_ro_nsr = self._create_db_ro_nsrs(nsr_id, now)
2519
2520 0             with self.write_lock:
2521                 # NS
2522 0                 step = "process NS elements"
2523 0                 changes_list = self.prepare_changes_to_recreate(
2524                     indata=indata,
2525                     nsr_id=nsr_id,
2526                     db_nsr=db_nsr,
2527                     db_vnfrs=db_vnfrs,
2528                     db_ro_nsr=db_ro_nsr,
2529                     action_id=action_id,
2530                     tasks_by_target_record_id=tasks_by_target_record_id,
2531                 )
2532
2533 0                 self.define_all_tasks(
2534                     changes_list=changes_list,
2535                     db_new_tasks=db_new_tasks,
2536                     tasks_by_target_record_id=tasks_by_target_record_id,
2537                 )
2538
2539                 # Delete all ro_tasks registered for the targets vdurs (target_record)
2540                 # If task of type CREATE exist then vim will try to get info form deleted VMs.
2541                 # So remove all task related to target record.
2542 0                 ro_tasks = self.db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
2543 0                 for change in changes_list:
2544 0                     for ro_task in ro_tasks:
2545 0                         for task in ro_task["tasks"]:
2546 0                             if task["target_record"] == change["target_record"]:
2547 0                                 self.db.del_one(
2548                                     "ro_tasks",
2549                                     q_filter={
2550                                         "_id": ro_task["_id"],
2551                                         "modified_at": ro_task["modified_at"],
2552                                     },
2553                                     fail_on_empty=False,
2554                                 )
2555
2556 0                 step = "Updating database, Appending tasks to ro_tasks"
2557 0                 self.upload_recreate_tasks(
2558                     db_new_tasks=db_new_tasks,
2559                     now=now,
2560                 )
2561
2562 0             self.logger.debug(
2563                 logging_text + "Exit. Created {} tasks".format(len(db_new_tasks))
2564             )
2565
2566 0             return (
2567                 {"status": "ok", "nsr_id": nsr_id, "action_id": action_id},
2568                 action_id,
2569                 True,
2570             )
2571 0         except Exception as e:
2572 0             if isinstance(e, (DbException, NsException)):
2573 0                 self.logger.error(
2574                     logging_text + "Exit Exception while '{}': {}".format(step, e)
2575                 )
2576             else:
2577 0                 e = traceback_format_exc()
2578 0                 self.logger.critical(
2579                     logging_text + "Exit Exception while '{}': {}".format(step, e),
2580                     exc_info=True,
2581                 )
2582
2583 0             raise NsException(e)
2584
2585 1     def deploy(self, session, indata, version, nsr_id, *args, **kwargs):
2586 0         self.logger.debug("ns.deploy nsr_id={} indata={}".format(nsr_id, indata))
2587 0         validate_input(indata, deploy_schema)
2588 0         action_id = indata.get("action_id", str(uuid4()))
2589 0         task_index = 0
2590         # get current deployment
2591 0         db_nsr_update = {}  # update operation on nsrs
2592 0         db_vnfrs_update = {}
2593 0         db_vnfrs = {}  # vnf's info indexed by _id
2594 0         step = ""
2595 0         logging_text = "Task deploy nsr_id={} action_id={} ".format(nsr_id, action_id)
2596 0         self.logger.debug(logging_text + "Enter")
2597
2598 0         try:
2599 0             step = "Getting ns and vnfr record from db"
2600 0             db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2601 0             self.logger.debug("ns.deploy: db_nsr={}".format(db_nsr))
2602 0             db_new_tasks = []
2603 0             tasks_by_target_record_id = {}
2604             # read from db: vnf's of this ns
2605 0             step = "Getting vnfrs from db"
2606 0             db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
2607
2608 0             if not db_vnfrs_list:
2609 0                 raise NsException("Cannot obtain associated VNF for ns")
2610
2611 0             for vnfr in db_vnfrs_list:
2612 0                 db_vnfrs[vnfr["_id"]] = vnfr
2613 0                 db_vnfrs_update[vnfr["_id"]] = {}
2614 0             self.logger.debug("ns.deploy db_vnfrs={}".format(db_vnfrs))
2615
2616 0             now = time()
2617 0             db_ro_nsr = self.db.get_one("ro_nsrs", {"_id": nsr_id}, fail_on_empty=False)
2618
2619 0             if not db_ro_nsr:
2620 0                 db_ro_nsr = self._create_db_ro_nsrs(nsr_id, now)
2621
2622             # check that action_id is not in the list of actions. Suffixed with :index
2623 0             if action_id in db_ro_nsr["actions"]:
2624 0                 index = 1
2625
2626                 while True:
2627 0                     new_action_id = "{}:{}".format(action_id, index)
2628
2629 0                     if new_action_id not in db_ro_nsr["actions"]:
2630 0                         action_id = new_action_id
2631 0                         self.logger.debug(
2632                             logging_text
2633                             + "Changing action_id in use to {}".format(action_id)
2634                         )
2635 0                         break
2636
2637 0                     index += 1
2638
2639 0             def _process_action(indata):
2640                 nonlocal db_new_tasks
2641                 nonlocal action_id
2642                 nonlocal nsr_id
2643                 nonlocal task_index
2644                 nonlocal db_vnfrs
2645                 nonlocal db_ro_nsr
2646
2647 0                 if indata["action"]["action"] == "inject_ssh_key":
2648 0                     key = indata["action"].get("key")
2649 0                     user = indata["action"].get("user")
2650 0                     password = indata["action"].get("password")
2651
2652 0                     for vnf in indata.get("vnf", ()):
2653 0                         if vnf["_id"] not in db_vnfrs:
2654 0                             raise NsException("Invalid vnf={}".format(vnf["_id"]))
2655
2656 0                         db_vnfr = db_vnfrs[vnf["_id"]]
2657
2658 0                         for target_vdu in vnf.get("vdur", ()):
2659 0                             vdu_index, vdur = next(
2660                                 (
2661                                     i_v
2662                                     for i_v in enumerate(db_vnfr["vdur"])
2663                                     if i_v[1]["id"] == target_vdu["id"]
2664                                 ),
2665                                 (None, None),
2666                             )
2667
2668 0                             if not vdur:
2669 0                                 raise NsException(
2670                                     "Invalid vdu vnf={}.{}".format(
2671                                         vnf["_id"], target_vdu["id"]
2672                                     )
2673                                 )
2674
2675 0                             target_vim, vim_info = next(
2676                                 k_v for k_v in vdur["vim_info"].items()
2677                             )
2678 0                             self._assign_vim(target_vim)
2679 0                             target_record = "vnfrs:{}:vdur.{}.ssh_keys".format(
2680                                 vnf["_id"], vdu_index
2681                             )
2682 0                             extra_dict = {
2683                                 "depends_on": [
2684                                     "vnfrs:{}:vdur.{}".format(vnf["_id"], vdur["id"])
2685                                 ],
2686                                 "params": {
2687                                     "ip_address": vdur.get("ip-address"),
2688                                     "user": user,
2689                                     "key": key,
2690                                     "password": password,
2691                                     "private_key": db_ro_nsr["private_key"],
2692                                     "salt": db_ro_nsr["_id"],
2693                                     "schema_version": db_ro_nsr["_admin"][
2694                                         "schema_version"
2695                                     ],
2696                                 },
2697                             }
2698
2699 0                             deployment_info = {
2700                                 "action_id": action_id,
2701                                 "nsr_id": nsr_id,
2702                                 "task_index": task_index,
2703                             }
2704
2705 0                             task = Ns._create_task(
2706                                 deployment_info=deployment_info,
2707                                 target_id=target_vim,
2708                                 item="vdu",
2709                                 action="EXEC",
2710                                 target_record=target_record,
2711                                 target_record_id=None,
2712                                 extra_dict=extra_dict,
2713                             )
2714
2715 0                             task_index = deployment_info.get("task_index")
2716
2717 0                             db_new_tasks.append(task)
2718
2719 0             with self.write_lock:
2720 0                 if indata.get("action"):
2721 0                     _process_action(indata)
2722                 else:
2723                     # compute network differences
2724                     # NS
2725 0                     step = "process NS elements"
2726 0                     changes_list = self.calculate_all_differences_to_deploy(
2727                         indata=indata,
2728                         nsr_id=nsr_id,
2729                         db_nsr=db_nsr,
2730                         db_vnfrs=db_vnfrs,
2731                         db_ro_nsr=db_ro_nsr,
2732                         db_nsr_update=db_nsr_update,
2733                         db_vnfrs_update=db_vnfrs_update,
2734                         action_id=action_id,
2735                         tasks_by_target_record_id=tasks_by_target_record_id,
2736                     )
2737 0                     self.define_all_tasks(
2738                         changes_list=changes_list,
2739                         db_new_tasks=db_new_tasks,
2740                         tasks_by_target_record_id=tasks_by_target_record_id,
2741                     )
2742
2743 0                 step = "Updating database, Appending tasks to ro_tasks"
2744 0                 self.upload_all_tasks(
2745                     db_new_tasks=db_new_tasks,
2746                     now=now,
2747                 )
2748
2749 0                 step = "Updating database, nsrs"
2750 0                 if db_nsr_update:
2751 0                     self.db.set_one("nsrs", {"_id": nsr_id}, db_nsr_update)
2752
2753 0                 for vnfr_id, db_vnfr_update in db_vnfrs_update.items():
2754 0                     if db_vnfr_update:
2755 0                         step = "Updating database, vnfrs={}".format(vnfr_id)
2756 0                         self.db.set_one("vnfrs", {"_id": vnfr_id}, db_vnfr_update)
2757
2758 0             self.logger.debug(
2759                 logging_text + "Exit. Created {} tasks".format(len(db_new_tasks))
2760             )
2761
2762 0             return (
2763                 {"status": "ok", "nsr_id": nsr_id, "action_id": action_id},
2764                 action_id,
2765                 True,
2766             )
2767 0         except Exception as e:
2768 0             if isinstance(e, (DbException, NsException)):
2769 0                 self.logger.error(
2770                     logging_text + "Exit Exception while '{}': {}".format(step, e)
2771                 )
2772             else:
2773 0                 e = traceback_format_exc()
2774 0                 self.logger.critical(
2775                     logging_text + "Exit Exception while '{}': {}".format(step, e),
2776                     exc_info=True,
2777                 )
2778
2779 0             raise NsException(e)
2780
2781 1     def delete(self, session, indata, version, nsr_id, *args, **kwargs):
2782 0         self.logger.debug("ns.delete version={} nsr_id={}".format(version, nsr_id))
2783         # self.db.del_list({"_id": ro_task["_id"], "tasks.nsr_id.ne": nsr_id})
2784
2785 0         with self.write_lock:
2786 0             try:
2787 0                 NsWorker.delete_db_tasks(self.db, nsr_id, None)
2788 0             except NsWorkerException as e:
2789 0                 raise NsException(e)
2790
2791 0         return None, None, True
2792
2793 1     def status(self, session, indata, version, nsr_id, action_id, *args, **kwargs):
2794 0         self.logger.debug(
2795             "ns.status version={} nsr_id={}, action_id={} indata={}".format(
2796                 version, nsr_id, action_id, indata
2797             )
2798         )
2799 0         task_list = []
2800 0         done = 0
2801 0         total = 0
2802 0         ro_tasks = self.db.get_list("ro_tasks", {"tasks.action_id": action_id})
2803 0         global_status = "DONE"
2804 0         details = []
2805
2806 0         for ro_task in ro_tasks:
2807 0             for task in ro_task["tasks"]:
2808 0                 if task and task["action_id"] == action_id:
2809 0                     task_list.append(task)
2810 0                     total += 1
2811
2812 0                     if task["status"] == "FAILED":
2813 0                         global_status = "FAILED"
2814 0                         error_text = "Error at {} {}: {}".format(
2815                             task["action"].lower(),
2816                             task["item"],
2817                             ro_task["vim_info"].get("vim_message") or "unknown",
2818                         )
2819 0                         details.append(error_text)
2820 0                     elif task["status"] in ("SCHEDULED", "BUILD"):
2821 0                         if global_status != "FAILED":
2822 0                             global_status = "BUILD"
2823                     else:
2824 0                         done += 1
2825
2826 0         return_data = {
2827             "status": global_status,
2828             "details": ". ".join(details)
2829             if details
2830             else "progress {}/{}".format(done, total),
2831             "nsr_id": nsr_id,
2832             "action_id": action_id,
2833             "tasks": task_list,
2834         }
2835
2836 0         return return_data, None, True
2837
2838 1     def recreate_status(
2839         self, session, indata, version, nsr_id, action_id, *args, **kwargs
2840     ):
2841 0         return self.status(session, indata, version, nsr_id, action_id, *args, **kwargs)
2842
2843 1     def cancel(self, session, indata, version, nsr_id, action_id, *args, **kwargs):
2844 0         print(
2845             "ns.cancel session={} indata={} version={} nsr_id={}, action_id={}".format(
2846                 session, indata, version, nsr_id, action_id
2847             )
2848         )
2849
2850 0         return None, None, True
2851
2852 1     def rebuild_start_stop_task(
2853         self,
2854         vdu_id,
2855         vnf_id,
2856         vdu_index,
2857         action_id,
2858         nsr_id,
2859         task_index,
2860         target_vim,
2861         extra_dict,
2862     ):
2863 1         self._assign_vim(target_vim)
2864 1         target_record = "vnfrs:{}:vdur.{}".format(vnf_id, vdu_index)
2865 1         target_record_id = "vnfrs:{}:vdur.{}".format(vnf_id, vdu_id)
2866 1         deployment_info = {
2867             "action_id": action_id,
2868             "nsr_id": nsr_id,
2869             "task_index": task_index,
2870         }
2871
2872 1         task = Ns._create_task(
2873             deployment_info=deployment_info,
2874             target_id=target_vim,
2875             item="update",
2876             action="EXEC",
2877             target_record=target_record,
2878             target_record_id=target_record_id,
2879             extra_dict=extra_dict,
2880         )
2881 1         return task
2882
2883 1     def rebuild_start_stop(
2884         self, session, action_dict, version, nsr_id, *args, **kwargs
2885     ):
2886 0         task_index = 0
2887 0         extra_dict = {}
2888 0         now = time()
2889 0         action_id = action_dict.get("action_id", str(uuid4()))
2890 0         step = ""
2891 0         logging_text = "Task deploy nsr_id={} action_id={} ".format(nsr_id, action_id)
2892 0         self.logger.debug(logging_text + "Enter")
2893
2894 0         action = list(action_dict.keys())[0]
2895 0         task_dict = action_dict.get(action)
2896 0         vim_vm_id = action_dict.get(action).get("vim_vm_id")
2897
2898 0         if action_dict.get("stop"):
2899 0             action = "shutoff"
2900 0         db_new_tasks = []
2901 0         try:
2902 0             step = "lock the operation & do task creation"
2903 0             with self.write_lock:
2904 0                 extra_dict["params"] = {
2905                     "vim_vm_id": vim_vm_id,
2906                     "action": action,
2907                 }
2908 0                 task = self.rebuild_start_stop_task(
2909                     task_dict["vdu_id"],
2910                     task_dict["vnf_id"],
2911                     task_dict["vdu_index"],
2912                     action_id,
2913                     nsr_id,
2914                     task_index,
2915                     task_dict["target_vim"],
2916                     extra_dict,
2917                 )
2918 0                 db_new_tasks.append(task)
2919 0                 step = "upload Task to db"
2920 0                 self.upload_all_tasks(
2921                     db_new_tasks=db_new_tasks,
2922                     now=now,
2923                 )
2924 0                 self.logger.debug(
2925                     logging_text + "Exit. Created {} tasks".format(len(db_new_tasks))
2926                 )
2927 0                 return (
2928                     {"status": "ok", "nsr_id": nsr_id, "action_id": action_id},
2929                     action_id,
2930                     True,
2931                 )
2932 0         except Exception as e:
2933 0             if isinstance(e, (DbException, NsException)):
2934 0                 self.logger.error(
2935                     logging_text + "Exit Exception while '{}': {}".format(step, e)
2936                 )
2937             else:
2938 0                 e = traceback_format_exc()
2939 0                 self.logger.critical(
2940                     logging_text + "Exit Exception while '{}': {}".format(step, e),
2941                     exc_info=True,
2942                 )
2943 0             raise NsException(e)
2944
2945 1     def get_deploy(self, session, indata, version, nsr_id, action_id, *args, **kwargs):
2946 0         nsrs = self.db.get_list("nsrs", {})
2947 0         return_data = []
2948
2949 0         for ns in nsrs:
2950 0             return_data.append({"_id": ns["_id"], "name": ns["name"]})
2951
2952 0         return return_data, None, True
2953
2954 1     def get_actions(self, session, indata, version, nsr_id, action_id, *args, **kwargs):
2955 0         ro_tasks = self.db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
2956 0         return_data = []
2957
2958 0         for ro_task in ro_tasks:
2959 0             for task in ro_task["tasks"]:
2960 0                 if task["action_id"] not in return_data:
2961 0                     return_data.append(task["action_id"])
2962
2963 0         return return_data, None, True
2964
2965 1     def migrate_task(
2966         self, vdu, vnf, vdu_index, action_id, nsr_id, task_index, extra_dict
2967     ):
2968 1         target_vim, vim_info = next(k_v for k_v in vdu["vim_info"].items())
2969 1         self._assign_vim(target_vim)
2970 1         target_record = "vnfrs:{}:vdur.{}".format(vnf["_id"], vdu_index)
2971 1         target_record_id = "vnfrs:{}:vdur.{}".format(vnf["_id"], vdu["id"])
2972 1         deployment_info = {
2973             "action_id": action_id,
2974             "nsr_id": nsr_id,
2975             "task_index": task_index,
2976         }
2977
2978 1         task = Ns._create_task(
2979             deployment_info=deployment_info,
2980             target_id=target_vim,
2981             item="migrate",
2982             action="EXEC",
2983             target_record=target_record,
2984             target_record_id=target_record_id,
2985             extra_dict=extra_dict,
2986         )
2987
2988 1         return task
2989
2990 1     def migrate(self, session, indata, version, nsr_id, *args, **kwargs):
2991 0         task_index = 0
2992 0         extra_dict = {}
2993 0         now = time()
2994 0         action_id = indata.get("action_id", str(uuid4()))
2995 0         step = ""
2996 0         logging_text = "Task deploy nsr_id={} action_id={} ".format(nsr_id, action_id)
2997 0         self.logger.debug(logging_text + "Enter")
2998 0         try:
2999 0             vnf_instance_id = indata["vnfInstanceId"]
3000 0             step = "Getting vnfrs from db"
3001 0             db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_instance_id})
3002 0             vdu = indata.get("vdu")
3003 0             migrateToHost = indata.get("migrateToHost")
3004 0             db_new_tasks = []
3005
3006 0             with self.write_lock:
3007 0                 if vdu is not None:
3008 0                     vdu_id = indata["vdu"]["vduId"]
3009 0                     vdu_count_index = indata["vdu"].get("vduCountIndex", 0)
3010 0                     for vdu_index, vdu in enumerate(db_vnfr["vdur"]):
3011 0                         if (
3012                             vdu["vdu-id-ref"] == vdu_id
3013                             and vdu["count-index"] == vdu_count_index
3014                         ):
3015 0                             extra_dict["params"] = {
3016                                 "vim_vm_id": vdu["vim-id"],
3017                                 "migrate_host": migrateToHost,
3018                                 "vdu_vim_info": vdu["vim_info"],
3019                             }
3020 0                             step = "Creating migration task for vdu:{}".format(vdu)
3021 0                             task = self.migrate_task(
3022                                 vdu,
3023                                 db_vnfr,
3024                                 vdu_index,
3025                                 action_id,
3026                                 nsr_id,
3027                                 task_index,
3028                                 extra_dict,
3029                             )
3030 0                             db_new_tasks.append(task)
3031 0                             task_index += 1
3032 0                             break
3033                 else:
3034 0                     for vdu_index, vdu in enumerate(db_vnfr["vdur"]):
3035 0                         extra_dict["params"] = {
3036                             "vim_vm_id": vdu["vim-id"],
3037                             "migrate_host": migrateToHost,
3038                             "vdu_vim_info": vdu["vim_info"],
3039                         }
3040 0                         step = "Creating migration task for vdu:{}".format(vdu)
3041 0                         task = self.migrate_task(
3042                             vdu,
3043                             db_vnfr,
3044                             vdu_index,
3045                             action_id,
3046                             nsr_id,
3047                             task_index,
3048                             extra_dict,
3049                         )
3050 0                         db_new_tasks.append(task)
3051 0                         task_index += 1
3052
3053 0                 self.upload_all_tasks(
3054                     db_new_tasks=db_new_tasks,
3055                     now=now,
3056                 )
3057
3058 0             self.logger.debug(
3059                 logging_text + "Exit. Created {} tasks".format(len(db_new_tasks))
3060             )
3061 0             return (
3062                 {"status": "ok", "nsr_id": nsr_id, "action_id": action_id},
3063                 action_id,
3064                 True,
3065             )
3066 0         except Exception as e:
3067 0             if isinstance(e, (DbException, NsException)):
3068 0                 self.logger.error(
3069                     logging_text + "Exit Exception while '{}': {}".format(step, e)
3070                 )
3071             else:
3072 0                 e = traceback_format_exc()
3073 0                 self.logger.critical(
3074                     logging_text + "Exit Exception while '{}': {}".format(step, e),
3075                     exc_info=True,
3076                 )
3077 0             raise NsException(e)
3078
3079 1     def verticalscale_task(
3080         self, vdu, vnf, vdu_index, action_id, nsr_id, task_index, extra_dict
3081     ):
3082 1         target_vim, vim_info = next(k_v for k_v in vdu["vim_info"].items())
3083 1         self._assign_vim(target_vim)
3084 1         target_record = "vnfrs:{}:vdur.{}".format(vnf["_id"], vdu_index)
3085 1         target_record_id = "vnfrs:{}:vdur.{}".format(vnf["_id"], vdu["id"])
3086 1         deployment_info = {
3087             "action_id": action_id,
3088             "nsr_id": nsr_id,
3089             "task_index": task_index,
3090         }
3091
3092 1         task = Ns._create_task(
3093             deployment_info=deployment_info,
3094             target_id=target_vim,
3095             item="verticalscale",
3096             action="EXEC",
3097             target_record=target_record,
3098             target_record_id=target_record_id,
3099             extra_dict=extra_dict,
3100         )
3101 1         return task
3102
3103 1     def verticalscale(self, session, indata, version, nsr_id, *args, **kwargs):
3104 0         task_index = 0
3105 0         extra_dict = {}
3106 0         now = time()
3107 0         action_id = indata.get("action_id", str(uuid4()))
3108 0         step = ""
3109 0         logging_text = "Task deploy nsr_id={} action_id={} ".format(nsr_id, action_id)
3110 0         self.logger.debug(logging_text + "Enter")
3111 0         try:
3112 0             VnfFlavorData = indata.get("changeVnfFlavorData")
3113 0             vnf_instance_id = VnfFlavorData["vnfInstanceId"]
3114 0             step = "Getting vnfrs from db"
3115 0             db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_instance_id})
3116 0             vduid = VnfFlavorData["additionalParams"]["vduid"]
3117 0             vduCountIndex = VnfFlavorData["additionalParams"]["vduCountIndex"]
3118 0             virtualMemory = VnfFlavorData["additionalParams"]["virtualMemory"]
3119 0             numVirtualCpu = VnfFlavorData["additionalParams"]["numVirtualCpu"]
3120 0             sizeOfStorage = VnfFlavorData["additionalParams"]["sizeOfStorage"]
3121 0             flavor_dict = {
3122                 "name": vduid + "-flv",
3123                 "ram": virtualMemory,
3124                 "vcpus": numVirtualCpu,
3125                 "disk": sizeOfStorage,
3126             }
3127 0             db_new_tasks = []
3128 0             step = "Creating Tasks for vertical scaling"
3129 0             with self.write_lock:
3130 0                 for vdu_index, vdu in enumerate(db_vnfr["vdur"]):
3131 0                     if (
3132                         vdu["vdu-id-ref"] == vduid
3133                         and vdu["count-index"] == vduCountIndex
3134                     ):
3135 0                         extra_dict["params"] = {
3136                             "vim_vm_id": vdu["vim-id"],
3137                             "flavor_dict": flavor_dict,
3138                         }
3139 0                         task = self.verticalscale_task(
3140                             vdu,
3141                             db_vnfr,
3142                             vdu_index,
3143                             action_id,
3144                             nsr_id,
3145                             task_index,
3146                             extra_dict,
3147                         )
3148 0                         db_new_tasks.append(task)
3149 0                         task_index += 1
3150 0                         break
3151 0                 self.upload_all_tasks(
3152                     db_new_tasks=db_new_tasks,
3153                     now=now,
3154                 )
3155 0             self.logger.debug(
3156                 logging_text + "Exit. Created {} tasks".format(len(db_new_tasks))
3157             )
3158 0             return (
3159                 {"status": "ok", "nsr_id": nsr_id, "action_id": action_id},
3160                 action_id,
3161                 True,
3162             )
3163 0         except Exception as e:
3164 0             if isinstance(e, (DbException, NsException)):
3165 0                 self.logger.error(
3166                     logging_text + "Exit Exception while '{}': {}".format(step, e)
3167                 )
3168             else:
3169 0                 e = traceback_format_exc()
3170 0                 self.logger.critical(
3171                     logging_text + "Exit Exception while '{}': {}".format(step, e),
3172                     exc_info=True,
3173                 )
3174 0             raise NsException(e)