Code Coverage

Cobertura Coverage Report > NG-RO.osm_ng_ro >

ns_thread.py

Trend

File Coverage summary

NameClassesLinesConditionals
ns_thread.py
100%
1/1
30%
390/1284
100%
0/0

Coverage Breakdown by Class

NameLinesConditionals
ns_thread.py
30%
390/1284
N/A

Source

NG-RO/osm_ng_ro/ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 #         http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
20 1 """"
21 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored at database in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
25 """
26
27 1 from copy import deepcopy
28 1 from http import HTTPStatus
29 1 import logging
30 1 from os import makedirs
31 1 from os import path
32 1 import queue
33 1 import threading
34 1 import time
35 1 import traceback
36 1 from typing import Dict
37 1 from unittest.mock import Mock
38
39 1 from importlib_metadata import entry_points
40 1 from osm_common.dbbase import DbException
41 1 from osm_ng_ro.vim_admin import LockRenew
42 1 from osm_ro_plugin import sdnconn
43 1 from osm_ro_plugin import vimconn
44 1 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
45 1 from osm_ro_plugin.vim_dummy import VimDummyConnector
46 1 import yaml
47
48 1 __author__ = "Alfonso Tierno"
49 1 __date__ = "$28-Sep-2017 12:07:15$"
50
51
52 1 def deep_get(target_dict, *args, **kwargs):
53     """
54     Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
55     Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
56     :param target_dict: dictionary to be read
57     :param args: list of keys to read from  target_dict
58     :param kwargs: only can contain default=value to return if key is not present in the nested dictionary
59     :return: The wanted value if exist, None or default otherwise
60     """
61 1     for key in args:
62 1         if not isinstance(target_dict, dict) or key not in target_dict:
63 1             return kwargs.get("default")
64 1         target_dict = target_dict[key]
65 1     return target_dict
66
67
68 1 class NsWorkerException(Exception):
69 1     pass
70
71
72 1 class FailingConnector:
73 1     def __init__(self, error_msg):
74 0         self.error_msg = error_msg
75
76 0         for method in dir(vimconn.VimConnector):
77 0             if method[0] != "_":
78 0                 setattr(
79                     self, method, Mock(side_effect=vimconn.VimConnException(error_msg))
80                 )
81
82 0         for method in dir(sdnconn.SdnConnectorBase):
83 0             if method[0] != "_":
84 0                 setattr(
85                     self, method, Mock(side_effect=sdnconn.SdnConnectorError(error_msg))
86                 )
87
88
89 1 class NsWorkerExceptionNotFound(NsWorkerException):
90 1     pass
91
92
93 1 class VimInteractionBase:
94     """Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ...
95     It implements methods that does nothing and return ok"""
96
97 1     def __init__(self, db, my_vims, db_vims, logger):
98 1         self.db = db
99 1         self.logger = logger
100 1         self.my_vims = my_vims
101 1         self.db_vims = db_vims
102
103 1     def new(self, ro_task, task_index, task_depends):
104 0         return "BUILD", {}
105
106 1     def refresh(self, ro_task):
107         """skip calling VIM to get image, flavor status. Assumes ok"""
108 0         if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
109 0             return "FAILED", {}
110
111 0         return "DONE", {}
112
113 1     def delete(self, ro_task, task_index):
114         """skip calling VIM to delete image. Assumes ok"""
115 0         return "DONE", {}
116
117 1     def exec(self, ro_task, task_index, task_depends):
118 0         return "DONE", None, None
119
120
121 1 class VimInteractionNet(VimInteractionBase):
122 1     def new(self, ro_task, task_index, task_depends):
123 1         vim_net_id = None
124 1         task = ro_task["tasks"][task_index]
125 1         task_id = task["task_id"]
126 1         created = False
127 1         created_items = {}
128 1         target_vim = self.my_vims[ro_task["target_id"]]
129 1         mgmtnet = False
130 1         mgmtnet_defined_in_vim = False
131
132 1         try:
133             # FIND
134 1             if task.get("find_params"):
135                 # if management, get configuration of VIM
136 1                 if task["find_params"].get("filter_dict"):
137 1                     vim_filter = task["find_params"]["filter_dict"]
138                 # management network
139 1                 elif task["find_params"].get("mgmt"):
140 1                     mgmtnet = True
141 1                     if deep_get(
142                         self.db_vims[ro_task["target_id"]],
143                         "config",
144                         "management_network_id",
145                     ):
146 1                         mgmtnet_defined_in_vim = True
147 1                         vim_filter = {
148                             "id": self.db_vims[ro_task["target_id"]]["config"][
149                                 "management_network_id"
150                             ]
151                         }
152 1                     elif deep_get(
153                         self.db_vims[ro_task["target_id"]],
154                         "config",
155                         "management_network_name",
156                     ):
157 1                         mgmtnet_defined_in_vim = True
158 1                         vim_filter = {
159                             "name": self.db_vims[ro_task["target_id"]]["config"][
160                                 "management_network_name"
161                             ]
162                         }
163                     else:
164 1                         vim_filter = {"name": task["find_params"]["name"]}
165                 else:
166 1                     raise NsWorkerExceptionNotFound(
167                         "Invalid find_params for new_net {}".format(task["find_params"])
168                     )
169
170 1                 vim_nets = target_vim.get_network_list(vim_filter)
171 1                 if not vim_nets and not task.get("params"):
172                     # If there is mgmt-network in the descriptor,
173                     # there is no mapping of that network to a VIM network in the descriptor,
174                     # also there is no mapping in the "--config" parameter or at VIM creation;
175                     # that mgmt-network will be created.
176 1                     if mgmtnet and not mgmtnet_defined_in_vim:
177 1                         net_name = (
178                             vim_filter.get("name")
179                             if vim_filter.get("name")
180                             else vim_filter.get("id")[:16]
181                         )
182 1                         vim_net_id, created_items = target_vim.new_network(
183                             net_name, None
184                         )
185 1                         self.logger.debug(
186                             "Created mgmt network vim_net_id: {}".format(vim_net_id)
187                         )
188 1                         created = True
189                     else:
190 1                         raise NsWorkerExceptionNotFound(
191                             "Network not found with this criteria: '{}'".format(
192                                 task.get("find_params")
193                             )
194                         )
195 1                 elif len(vim_nets) > 1:
196 1                     raise NsWorkerException(
197                         "More than one network found with this criteria: '{}'".format(
198                             task["find_params"]
199                         )
200                     )
201
202 1                 if vim_nets:
203 1                     vim_net_id = vim_nets[0]["id"]
204             else:
205                 # CREATE
206 1                 params = task["params"]
207 1                 vim_net_id, created_items = target_vim.new_network(**params)
208 1                 created = True
209
210 1             ro_vim_item_update = {
211                 "vim_id": vim_net_id,
212                 "vim_status": "BUILD",
213                 "created": created,
214                 "created_items": created_items,
215                 "vim_details": None,
216                 "vim_message": None,
217             }
218 1             self.logger.debug(
219                 "task={} {} new-net={} created={}".format(
220                     task_id, ro_task["target_id"], vim_net_id, created
221                 )
222             )
223
224 1             return "BUILD", ro_vim_item_update
225 1         except (vimconn.VimConnException, NsWorkerException) as e:
226 1             self.logger.error(
227                 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
228             )
229 1             ro_vim_item_update = {
230                 "vim_status": "VIM_ERROR",
231                 "created": created,
232                 "vim_message": str(e),
233             }
234
235 1             return "FAILED", ro_vim_item_update
236
237 1     def refresh(self, ro_task):
238         """Call VIM to get network status"""
239 1         ro_task_id = ro_task["_id"]
240 1         target_vim = self.my_vims[ro_task["target_id"]]
241 1         vim_id = ro_task["vim_info"]["vim_id"]
242 1         net_to_refresh_list = [vim_id]
243
244 1         try:
245 1             vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
246 1             vim_info = vim_dict[vim_id]
247
248 1             if vim_info["status"] == "ACTIVE":
249 1                 task_status = "DONE"
250 1             elif vim_info["status"] == "BUILD":
251 1                 task_status = "BUILD"
252             else:
253 1                 task_status = "FAILED"
254 1         except vimconn.VimConnException as e:
255             # Mark all tasks at VIM_ERROR status
256 1             self.logger.error(
257                 "ro_task={} vim={} get-net={}: {}".format(
258                     ro_task_id, ro_task["target_id"], vim_id, e
259                 )
260             )
261 1             vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
262 1             task_status = "FAILED"
263
264 1         ro_vim_item_update = {}
265 1         if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
266 1             ro_vim_item_update["vim_status"] = vim_info["status"]
267
268 1         if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
269 1             ro_vim_item_update["vim_name"] = vim_info.get("name")
270
271 1         if vim_info["status"] in ("ERROR", "VIM_ERROR"):
272 1             if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
273 1                 ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
274 1         elif vim_info["status"] == "DELETED":
275 1             ro_vim_item_update["vim_id"] = None
276 1             ro_vim_item_update["vim_message"] = "Deleted externally"
277         else:
278 1             if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
279 1                 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
280
281 1         if ro_vim_item_update:
282 1             self.logger.debug(
283                 "ro_task={} {} get-net={}: status={} {}".format(
284                     ro_task_id,
285                     ro_task["target_id"],
286                     vim_id,
287                     ro_vim_item_update.get("vim_status"),
288                     ro_vim_item_update.get("vim_message")
289                     if ro_vim_item_update.get("vim_status") != "ACTIVE"
290                     else "",
291                 )
292             )
293
294 1         return task_status, ro_vim_item_update
295
296 1     def delete(self, ro_task, task_index):
297 0         task = ro_task["tasks"][task_index]
298 0         task_id = task["task_id"]
299 0         net_vim_id = ro_task["vim_info"]["vim_id"]
300 0         ro_vim_item_update_ok = {
301             "vim_status": "DELETED",
302             "created": False,
303             "vim_message": "DELETED",
304             "vim_id": None,
305         }
306
307 0         try:
308 0             if net_vim_id or ro_task["vim_info"]["created_items"]:
309 0                 target_vim = self.my_vims[ro_task["target_id"]]
310 0                 target_vim.delete_network(
311                     net_vim_id, ro_task["vim_info"]["created_items"]
312                 )
313 0         except vimconn.VimConnNotFoundException:
314 0             ro_vim_item_update_ok["vim_message"] = "already deleted"
315 0         except vimconn.VimConnException as e:
316 0             self.logger.error(
317                 "ro_task={} vim={} del-net={}: {}".format(
318                     ro_task["_id"], ro_task["target_id"], net_vim_id, e
319                 )
320             )
321 0             ro_vim_item_update = {
322                 "vim_status": "VIM_ERROR",
323                 "vim_message": "Error while deleting: {}".format(e),
324             }
325
326 0             return "FAILED", ro_vim_item_update
327
328 0         self.logger.debug(
329             "task={} {} del-net={} {}".format(
330                 task_id,
331                 ro_task["target_id"],
332                 net_vim_id,
333                 ro_vim_item_update_ok.get("vim_message", ""),
334             )
335         )
336
337 0         return "DONE", ro_vim_item_update_ok
338
339
340 1 class VimInteractionVdu(VimInteractionBase):
341 1     max_retries_inject_ssh_key = 20  # 20 times
342 1     time_retries_inject_ssh_key = 30  # wevery 30 seconds
343
344 1     def new(self, ro_task, task_index, task_depends):
345 0         task = ro_task["tasks"][task_index]
346 0         task_id = task["task_id"]
347 0         created = False
348 0         created_items = {}
349 0         target_vim = self.my_vims[ro_task["target_id"]]
350
351 0         try:
352 0             created = True
353 0             params = task["params"]
354 0             params_copy = deepcopy(params)
355 0             net_list = params_copy["net_list"]
356
357 0             for net in net_list:
358                 # change task_id into network_id
359 0                 if "net_id" in net and net["net_id"].startswith("TASK-"):
360 0                     network_id = task_depends[net["net_id"]]
361
362 0                     if not network_id:
363 0                         raise NsWorkerException(
364                             "Cannot create VM because depends on a network not created or found "
365                             "for {}".format(net["net_id"])
366                         )
367
368 0                     net["net_id"] = network_id
369
370 0             if params_copy["image_id"].startswith("TASK-"):
371 0                 params_copy["image_id"] = task_depends[params_copy["image_id"]]
372
373 0             if params_copy["flavor_id"].startswith("TASK-"):
374 0                 params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]
375
376 0             affinity_group_list = params_copy["affinity_group_list"]
377 0             for affinity_group in affinity_group_list:
378                 # change task_id into affinity_group_id
379 0                 if "affinity_group_id" in affinity_group and affinity_group[
380                     "affinity_group_id"
381                 ].startswith("TASK-"):
382 0                     affinity_group_id = task_depends[
383                         affinity_group["affinity_group_id"]
384                     ]
385
386 0                     if not affinity_group_id:
387 0                         raise NsWorkerException(
388                             "found for {}".format(affinity_group["affinity_group_id"])
389                         )
390
391 0                     affinity_group["affinity_group_id"] = affinity_group_id
392
393 0             vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
394 0             interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]
395
396             # add to created items previous_created_volumes (healing)
397 0             if task.get("previous_created_volumes"):
398 0                 for k, v in task["previous_created_volumes"].items():
399 0                     created_items[k] = v
400
401 0             ro_vim_item_update = {
402                 "vim_id": vim_vm_id,
403                 "vim_status": "BUILD",
404                 "created": created,
405                 "created_items": created_items,
406                 "vim_details": None,
407                 "vim_message": None,
408                 "interfaces_vim_ids": interfaces,
409                 "interfaces": [],
410                 "interfaces_backup": [],
411             }
412 0             self.logger.debug(
413                 "task={} {} new-vm={} created={}".format(
414                     task_id, ro_task["target_id"], vim_vm_id, created
415                 )
416             )
417
418 0             return "BUILD", ro_vim_item_update
419 0         except (vimconn.VimConnException, NsWorkerException) as e:
420 0             self.logger.debug(traceback.format_exc())
421 0             self.logger.error(
422                 "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
423             )
424 0             ro_vim_item_update = {
425                 "vim_status": "VIM_ERROR",
426                 "created": created,
427                 "vim_message": str(e),
428             }
429
430 0             return "FAILED", ro_vim_item_update
431
432 1     def delete(self, ro_task, task_index):
433 0         task = ro_task["tasks"][task_index]
434 0         task_id = task["task_id"]
435 0         vm_vim_id = ro_task["vim_info"]["vim_id"]
436 0         ro_vim_item_update_ok = {
437             "vim_status": "DELETED",
438             "created": False,
439             "vim_message": "DELETED",
440             "vim_id": None,
441         }
442
443 0         try:
444 0             self.logger.debug(
445                 "delete_vminstance: vm_vim_id={} created_items={}".format(
446                     vm_vim_id, ro_task["vim_info"]["created_items"]
447                 )
448             )
449 0             if vm_vim_id or ro_task["vim_info"]["created_items"]:
450 0                 target_vim = self.my_vims[ro_task["target_id"]]
451 0                 target_vim.delete_vminstance(
452                     vm_vim_id,
453                     ro_task["vim_info"]["created_items"],
454                     ro_task["vim_info"].get("volumes_to_hold", []),
455                 )
456 0         except vimconn.VimConnNotFoundException:
457 0             ro_vim_item_update_ok["vim_message"] = "already deleted"
458 0         except vimconn.VimConnException as e:
459 0             self.logger.error(
460                 "ro_task={} vim={} del-vm={}: {}".format(
461                     ro_task["_id"], ro_task["target_id"], vm_vim_id, e
462                 )
463             )
464 0             ro_vim_item_update = {
465                 "vim_status": "VIM_ERROR",
466                 "vim_message": "Error while deleting: {}".format(e),
467             }
468
469 0             return "FAILED", ro_vim_item_update
470
471 0         self.logger.debug(
472             "task={} {} del-vm={} {}".format(
473                 task_id,
474                 ro_task["target_id"],
475                 vm_vim_id,
476                 ro_vim_item_update_ok.get("vim_message", ""),
477             )
478         )
479
480 0         return "DONE", ro_vim_item_update_ok
481
482 1     def refresh(self, ro_task):
483         """Call VIM to get vm status"""
484 0         ro_task_id = ro_task["_id"]
485 0         target_vim = self.my_vims[ro_task["target_id"]]
486 0         vim_id = ro_task["vim_info"]["vim_id"]
487
488 0         if not vim_id:
489 0             return None, None
490
491 0         vm_to_refresh_list = [vim_id]
492 0         try:
493 0             vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
494 0             vim_info = vim_dict[vim_id]
495
496 0             if vim_info["status"] == "ACTIVE":
497 0                 task_status = "DONE"
498 0             elif vim_info["status"] == "BUILD":
499 0                 task_status = "BUILD"
500             else:
501 0                 task_status = "FAILED"
502
503             # try to load and parse vim_information
504 0             try:
505 0                 vim_info_info = yaml.safe_load(vim_info["vim_info"])
506 0                 if vim_info_info.get("name"):
507 0                     vim_info["name"] = vim_info_info["name"]
508 0             except Exception as vim_info_error:
509 0                 self.logger.exception(
510                     f"{vim_info_error} occured while getting the vim_info from yaml"
511                 )
512 0         except vimconn.VimConnException as e:
513             # Mark all tasks at VIM_ERROR status
514 0             self.logger.error(
515                 "ro_task={} vim={} get-vm={}: {}".format(
516                     ro_task_id, ro_task["target_id"], vim_id, e
517                 )
518             )
519 0             vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
520 0             task_status = "FAILED"
521
522 0         ro_vim_item_update = {}
523
524         # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
525 0         vim_interfaces = []
526 0         if vim_info.get("interfaces"):
527 0             for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
528 0                 iface = next(
529                     (
530                         iface
531                         for iface in vim_info["interfaces"]
532                         if vim_iface_id == iface["vim_interface_id"]
533                     ),
534                     None,
535                 )
536                 # if iface:
537                 #     iface.pop("vim_info", None)
538 0                 vim_interfaces.append(iface)
539
540 0         task_create = next(
541             t
542             for t in ro_task["tasks"]
543             if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
544         )
545 0         if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
546 0             vim_interfaces[task_create["mgmt_vnf_interface"]][
547                 "mgmt_vnf_interface"
548             ] = True
549
550 0         mgmt_vdu_iface = task_create.get(
551             "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
552         )
553 0         if vim_interfaces:
554 0             vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True
555
556 0         if ro_task["vim_info"]["interfaces"] != vim_interfaces:
557 0             ro_vim_item_update["interfaces"] = vim_interfaces
558
559 0         if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
560 0             ro_vim_item_update["vim_status"] = vim_info["status"]
561
562 0         if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
563 0             ro_vim_item_update["vim_name"] = vim_info.get("name")
564
565 0         if vim_info["status"] in ("ERROR", "VIM_ERROR"):
566 0             if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
567 0                 ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
568 0         elif vim_info["status"] == "DELETED":
569 0             ro_vim_item_update["vim_id"] = None
570 0             ro_vim_item_update["vim_message"] = "Deleted externally"
571         else:
572 0             if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
573 0                 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
574
575 0         if ro_vim_item_update:
576 0             self.logger.debug(
577                 "ro_task={} {} get-vm={}: status={} {}".format(
578                     ro_task_id,
579                     ro_task["target_id"],
580                     vim_id,
581                     ro_vim_item_update.get("vim_status"),
582                     ro_vim_item_update.get("vim_message")
583                     if ro_vim_item_update.get("vim_status") != "ACTIVE"
584                     else "",
585                 )
586             )
587
588 0         return task_status, ro_vim_item_update
589
590 1     def exec(self, ro_task, task_index, task_depends):
591 0         task = ro_task["tasks"][task_index]
592 0         task_id = task["task_id"]
593 0         target_vim = self.my_vims[ro_task["target_id"]]
594 0         db_task_update = {"retries": 0}
595 0         retries = task.get("retries", 0)
596
597 0         try:
598 0             params = task["params"]
599 0             params_copy = deepcopy(params)
600 0             params_copy["ro_key"] = self.db.decrypt(
601                 params_copy.pop("private_key"),
602                 params_copy.pop("schema_version"),
603                 params_copy.pop("salt"),
604             )
605 0             params_copy["ip_addr"] = params_copy.pop("ip_address")
606 0             target_vim.inject_user_key(**params_copy)
607 0             self.logger.debug(
608                 "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
609             )
610
611 0             return (
612                 "DONE",
613                 None,
614                 db_task_update,
615             )  # params_copy["key"]
616 0         except (vimconn.VimConnException, NsWorkerException) as e:
617 0             retries += 1
618
619 0             self.logger.debug(traceback.format_exc())
620 0             if retries < self.max_retries_inject_ssh_key:
621 0                 return (
622                     "BUILD",
623                     None,
624                     {
625                         "retries": retries,
626                         "next_retry": self.time_retries_inject_ssh_key,
627                     },
628                 )
629
630 0             self.logger.error(
631                 "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
632             )
633 0             ro_vim_item_update = {"vim_message": str(e)}
634
635 0             return "FAILED", ro_vim_item_update, db_task_update
636
637
638 1 class VimInteractionImage(VimInteractionBase):
639 1     def new(self, ro_task, task_index, task_depends):
640 0         task = ro_task["tasks"][task_index]
641 0         task_id = task["task_id"]
642 0         created = False
643 0         created_items = {}
644 0         target_vim = self.my_vims[ro_task["target_id"]]
645
646 0         try:
647             # FIND
648 0             if task.get("find_params"):
649 0                 vim_images = target_vim.get_image_list(**task["find_params"])
650
651 0                 if not vim_images:
652 0                     raise NsWorkerExceptionNotFound(
653                         "Image not found with this criteria: '{}'".format(
654                             task["find_params"]
655                         )
656                     )
657 0                 elif len(vim_images) > 1:
658 0                     raise NsWorkerException(
659                         "More than one image found with this criteria: '{}'".format(
660                             task["find_params"]
661                         )
662                     )
663                 else:
664 0                     vim_image_id = vim_images[0]["id"]
665
666 0             ro_vim_item_update = {
667                 "vim_id": vim_image_id,
668                 "vim_status": "DONE",
669                 "created": created,
670                 "created_items": created_items,
671                 "vim_details": None,
672                 "vim_message": None,
673             }
674 0             self.logger.debug(
675                 "task={} {} new-image={} created={}".format(
676                     task_id, ro_task["target_id"], vim_image_id, created
677                 )
678             )
679
680 0             return "DONE", ro_vim_item_update
681 0         except (NsWorkerException, vimconn.VimConnException) as e:
682 0             self.logger.error(
683                 "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
684             )
685 0             ro_vim_item_update = {
686                 "vim_status": "VIM_ERROR",
687                 "created": created,
688                 "vim_message": str(e),
689             }
690
691 0             return "FAILED", ro_vim_item_update
692
693
694 1 class VimInteractionFlavor(VimInteractionBase):
695 1     def delete(self, ro_task, task_index):
696 0         task = ro_task["tasks"][task_index]
697 0         task_id = task["task_id"]
698 0         flavor_vim_id = ro_task["vim_info"]["vim_id"]
699 0         ro_vim_item_update_ok = {
700             "vim_status": "DELETED",
701             "created": False,
702             "vim_message": "DELETED",
703             "vim_id": None,
704         }
705
706 0         try:
707 0             if flavor_vim_id:
708 0                 target_vim = self.my_vims[ro_task["target_id"]]
709 0                 target_vim.delete_flavor(flavor_vim_id)
710 0         except vimconn.VimConnNotFoundException:
711 0             ro_vim_item_update_ok["vim_message"] = "already deleted"
712 0         except vimconn.VimConnException as e:
713 0             self.logger.error(
714                 "ro_task={} vim={} del-flavor={}: {}".format(
715                     ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
716                 )
717             )
718 0             ro_vim_item_update = {
719                 "vim_status": "VIM_ERROR",
720                 "vim_message": "Error while deleting: {}".format(e),
721             }
722
723 0             return "FAILED", ro_vim_item_update
724
725 0         self.logger.debug(
726             "task={} {} del-flavor={} {}".format(
727                 task_id,
728                 ro_task["target_id"],
729                 flavor_vim_id,
730                 ro_vim_item_update_ok.get("vim_message", ""),
731             )
732         )
733
734 0         return "DONE", ro_vim_item_update_ok
735
736 1     def new(self, ro_task, task_index, task_depends):
737 0         task = ro_task["tasks"][task_index]
738 0         task_id = task["task_id"]
739 0         created = False
740 0         created_items = {}
741 0         target_vim = self.my_vims[ro_task["target_id"]]
742
743 0         try:
744             # FIND
745 0             vim_flavor_id = None
746
747 0             if task.get("find_params"):
748 0                 try:
749 0                     flavor_data = task["find_params"]["flavor_data"]
750 0                     vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
751 0                 except vimconn.VimConnNotFoundException as flavor_not_found_msg:
752 0                     self.logger.warning(
753                         f"VimConnNotFoundException occured: {flavor_not_found_msg}"
754                     )
755
756 0             if not vim_flavor_id and task.get("params"):
757                 # CREATE
758 0                 flavor_data = task["params"]["flavor_data"]
759 0                 vim_flavor_id = target_vim.new_flavor(flavor_data)
760 0                 created = True
761
762 0             ro_vim_item_update = {
763                 "vim_id": vim_flavor_id,
764                 "vim_status": "DONE",
765                 "created": created,
766                 "created_items": created_items,
767                 "vim_details": None,
768                 "vim_message": None,
769             }
770 0             self.logger.debug(
771                 "task={} {} new-flavor={} created={}".format(
772                     task_id, ro_task["target_id"], vim_flavor_id, created
773                 )
774             )
775
776 0             return "DONE", ro_vim_item_update
777 0         except (vimconn.VimConnException, NsWorkerException) as e:
778 0             self.logger.error(
779                 "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
780             )
781 0             ro_vim_item_update = {
782                 "vim_status": "VIM_ERROR",
783                 "created": created,
784                 "vim_message": str(e),
785             }
786
787 0             return "FAILED", ro_vim_item_update
788
789
790 1 class VimInteractionAffinityGroup(VimInteractionBase):
791 1     def delete(self, ro_task, task_index):
792 1         task = ro_task["tasks"][task_index]
793 1         task_id = task["task_id"]
794 1         affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
795 1         ro_vim_item_update_ok = {
796             "vim_status": "DELETED",
797             "created": False,
798             "vim_message": "DELETED",
799             "vim_id": None,
800         }
801
802 1         try:
803 1             if affinity_group_vim_id:
804 1                 target_vim = self.my_vims[ro_task["target_id"]]
805 1                 target_vim.delete_affinity_group(affinity_group_vim_id)
806 0         except vimconn.VimConnNotFoundException:
807 0             ro_vim_item_update_ok["vim_message"] = "already deleted"
808 0         except vimconn.VimConnException as e:
809 0             self.logger.error(
810                 "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
811                     ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
812                 )
813             )
814 0             ro_vim_item_update = {
815                 "vim_status": "VIM_ERROR",
816                 "vim_message": "Error while deleting: {}".format(e),
817             }
818
819 0             return "FAILED", ro_vim_item_update
820
821 1         self.logger.debug(
822             "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
823                 task_id,
824                 ro_task["target_id"],
825                 affinity_group_vim_id,
826                 ro_vim_item_update_ok.get("vim_message", ""),
827             )
828         )
829
830 1         return "DONE", ro_vim_item_update_ok
831
832 1     def new(self, ro_task, task_index, task_depends):
833 1         task = ro_task["tasks"][task_index]
834 1         task_id = task["task_id"]
835 1         created = False
836 1         created_items = {}
837 1         target_vim = self.my_vims[ro_task["target_id"]]
838
839 1         try:
840 1             affinity_group_vim_id = None
841 1             affinity_group_data = None
842
843 1             if task.get("params"):
844 1                 affinity_group_data = task["params"].get("affinity_group_data")
845
846 1             if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
847 0                 try:
848 0                     param_affinity_group_id = task["params"]["affinity_group_data"].get(
849                         "vim-affinity-group-id"
850                     )
851 0                     affinity_group_vim_id = target_vim.get_affinity_group(
852                         param_affinity_group_id
853                     ).get("id")
854 0                 except vimconn.VimConnNotFoundException:
855 0                     self.logger.error(
856                         "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {}"
857                         "could not be found at VIM. Creating a new one.".format(
858                             task_id, ro_task["target_id"], param_affinity_group_id
859                         )
860                     )
861
862 1             if not affinity_group_vim_id and affinity_group_data:
863 1                 affinity_group_vim_id = target_vim.new_affinity_group(
864                     affinity_group_data
865                 )
866 1                 created = True
867
868 1             ro_vim_item_update = {
869                 "vim_id": affinity_group_vim_id,
870                 "vim_status": "DONE",
871                 "created": created,
872                 "created_items": created_items,
873                 "vim_details": None,
874                 "vim_message": None,
875             }
876 1             self.logger.debug(
877                 "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
878                     task_id, ro_task["target_id"], affinity_group_vim_id, created
879                 )
880             )
881
882 1             return "DONE", ro_vim_item_update
883 0         except (vimconn.VimConnException, NsWorkerException) as e:
884 0             self.logger.error(
885                 "task={} vim={} new-affinity-or-anti-affinity-group:"
886                 " {}".format(task_id, ro_task["target_id"], e)
887             )
888 0             ro_vim_item_update = {
889                 "vim_status": "VIM_ERROR",
890                 "created": created,
891                 "vim_message": str(e),
892             }
893
894 0             return "FAILED", ro_vim_item_update
895
896
897 1 class VimInteractionUpdateVdu(VimInteractionBase):
898 1     def exec(self, ro_task, task_index, task_depends):
899 0         task = ro_task["tasks"][task_index]
900 0         task_id = task["task_id"]
901 0         db_task_update = {"retries": 0}
902 0         created = False
903 0         created_items = {}
904 0         target_vim = self.my_vims[ro_task["target_id"]]
905
906 0         try:
907 0             if task.get("params"):
908 0                 vim_vm_id = task["params"].get("vim_vm_id")
909 0                 action = task["params"].get("action")
910 0                 context = {action: action}
911 0                 target_vim.action_vminstance(vim_vm_id, context)
912                 # created = True
913 0             ro_vim_item_update = {
914                 "vim_id": vim_vm_id,
915                 "vim_status": "DONE",
916                 "created": created,
917                 "created_items": created_items,
918                 "vim_details": None,
919                 "vim_message": None,
920             }
921 0             self.logger.debug(
922                 "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
923             )
924 0             return "DONE", ro_vim_item_update, db_task_update
925 0         except (vimconn.VimConnException, NsWorkerException) as e:
926 0             self.logger.error(
927                 "task={} vim={} VM Migration:"
928                 " {}".format(task_id, ro_task["target_id"], e)
929             )
930 0             ro_vim_item_update = {
931                 "vim_status": "VIM_ERROR",
932                 "created": created,
933                 "vim_message": str(e),
934             }
935
936 0             return "FAILED", ro_vim_item_update, db_task_update
937
938
939 1 class VimInteractionSdnNet(VimInteractionBase):
940 1     @staticmethod
941 1     def _match_pci(port_pci, mapping):
942         """
943         Check if port_pci matches with mapping
944         mapping can have brackets to indicate that several chars are accepted. e.g
945         pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
946         :param port_pci: text
947         :param mapping: text, can contain brackets to indicate several chars are available
948         :return: True if matches, False otherwise
949         """
950 0         if not port_pci or not mapping:
951 0             return False
952 0         if port_pci == mapping:
953 0             return True
954
955 0         mapping_index = 0
956 0         pci_index = 0
957         while True:
958 0             bracket_start = mapping.find("[", mapping_index)
959
960 0             if bracket_start == -1:
961 0                 break
962
963 0             bracket_end = mapping.find("]", bracket_start)
964 0             if bracket_end == -1:
965 0                 break
966
967 0             length = bracket_start - mapping_index
968 0             if (
969                 length
970                 and port_pci[pci_index : pci_index + length]
971                 != mapping[mapping_index:bracket_start]
972             ):
973 0                 return False
974
975 0             if (
976                 port_pci[pci_index + length]
977                 not in mapping[bracket_start + 1 : bracket_end]
978             ):
979 0                 return False
980
981 0             pci_index += length + 1
982 0             mapping_index = bracket_end + 1
983
984 0         if port_pci[pci_index:] != mapping[mapping_index:]:
985 0             return False
986
987 0         return True
988
989 1     def _get_interfaces(self, vlds_to_connect, vim_account_id):
990         """
991         :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
992         :param vim_account_id:
993         :return:
994         """
995 0         interfaces = []
996
997 0         for vld in vlds_to_connect:
998 0             table, _, db_id = vld.partition(":")
999 0             db_id, _, vld = db_id.partition(":")
1000 0             _, _, vld_id = vld.partition(".")
1001
1002 0             if table == "vnfrs":
1003 0                 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
1004 0                 iface_key = "vnf-vld-id"
1005             else:  # table == "nsrs"
1006 0                 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
1007 0                 iface_key = "ns-vld-id"
1008
1009 0             db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
1010
1011 0             for db_vnfr in db_vnfrs:
1012 0                 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
1013 0                     for iface_index, interface in enumerate(vdur["interfaces"]):
1014 0                         if interface.get(iface_key) == vld_id and interface.get(
1015                             "type"
1016                         ) in ("SR-IOV", "PCI-PASSTHROUGH"):
1017                             # only SR-IOV o PT
1018 0                             interface_ = interface.copy()
1019 0                             interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
1020                                 db_vnfr["_id"], vdu_index, iface_index
1021                             )
1022
1023 0                             if vdur.get("status") == "ERROR":
1024 0                                 interface_["status"] = "ERROR"
1025
1026 0                             interfaces.append(interface_)
1027
1028 0         return interfaces
1029
1030 1     def refresh(self, ro_task):
1031         # look for task create
1032 0         task_create_index, _ = next(
1033             i_t
1034             for i_t in enumerate(ro_task["tasks"])
1035             if i_t[1]
1036             and i_t[1]["action"] == "CREATE"
1037             and i_t[1]["status"] != "FINISHED"
1038         )
1039
1040 0         return self.new(ro_task, task_create_index, None)
1041
1042 1     def new(self, ro_task, task_index, task_depends):
1043 0         task = ro_task["tasks"][task_index]
1044 0         task_id = task["task_id"]
1045 0         target_vim = self.my_vims[ro_task["target_id"]]
1046
1047 0         sdn_net_id = ro_task["vim_info"]["vim_id"]
1048
1049 0         created_items = ro_task["vim_info"].get("created_items")
1050 0         connected_ports = ro_task["vim_info"].get("connected_ports", [])
1051 0         new_connected_ports = []
1052 0         last_update = ro_task["vim_info"].get("last_update", 0)
1053 0         sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
1054 0         error_list = []
1055 0         created = ro_task["vim_info"].get("created", False)
1056
1057 0         try:
1058             # CREATE
1059 0             params = task["params"]
1060 0             vlds_to_connect = params.get("vlds", [])
1061 0             associated_vim = params.get("target_vim")
1062             # external additional ports
1063 0             additional_ports = params.get("sdn-ports") or ()
1064 0             _, _, vim_account_id = (
1065                 (None, None, None)
1066                 if associated_vim is None
1067                 else associated_vim.partition(":")
1068             )
1069
1070 0             if associated_vim:
1071                 # get associated VIM
1072 0                 if associated_vim not in self.db_vims:
1073 0                     self.db_vims[associated_vim] = self.db.get_one(
1074                         "vim_accounts", {"_id": vim_account_id}
1075                     )
1076
1077 0                 db_vim = self.db_vims[associated_vim]
1078
1079             # look for ports to connect
1080 0             ports = self._get_interfaces(vlds_to_connect, vim_account_id)
1081             # print(ports)
1082
1083 0             sdn_ports = []
1084 0             pending_ports = error_ports = 0
1085 0             vlan_used = None
1086 0             sdn_need_update = False
1087
1088 0             for port in ports:
1089 0                 vlan_used = port.get("vlan") or vlan_used
1090
1091                 # TODO. Do not connect if already done
1092 0                 if not port.get("compute_node") or not port.get("pci"):
1093 0                     if port.get("status") == "ERROR":
1094 0                         error_ports += 1
1095                     else:
1096 0                         pending_ports += 1
1097 0                     continue
1098
1099 0                 pmap = None
1100 0                 compute_node_mappings = next(
1101                     (
1102                         c
1103                         for c in db_vim["config"].get("sdn-port-mapping", ())
1104                         if c and c["compute_node"] == port["compute_node"]
1105                     ),
1106                     None,
1107                 )
1108
1109 0                 if compute_node_mappings:
1110                     # process port_mapping pci of type 0000:af:1[01].[1357]
1111 0                     pmap = next(
1112                         (
1113                             p
1114                             for p in compute_node_mappings["ports"]
1115                             if self._match_pci(port["pci"], p.get("pci"))
1116                         ),
1117                         None,
1118                     )
1119
1120 0                 if not pmap:
1121 0                     if not db_vim["config"].get("mapping_not_needed"):
1122 0                         error_list.append(
1123                             "Port mapping not found for compute_node={} pci={}".format(
1124                                 port["compute_node"], port["pci"]
1125                             )
1126                         )
1127 0                         continue
1128
1129 0                     pmap = {}
1130
1131 0                 service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
1132 0                 new_port = {
1133                     "service_endpoint_id": pmap.get("service_endpoint_id")
1134                     or service_endpoint_id,
1135                     "service_endpoint_encapsulation_type": "dot1q"
1136                     if port["type"] == "SR-IOV"
1137                     else None,
1138                     "service_endpoint_encapsulation_info": {
1139                         "vlan": port.get("vlan"),
1140                         "mac": port.get("mac-address"),
1141                         "device_id": pmap.get("device_id") or port["compute_node"],
1142                         "device_interface_id": pmap.get("device_interface_id")
1143                         or port["pci"],
1144                         "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
1145                         "switch_port": pmap.get("switch_port"),
1146                         "service_mapping_info": pmap.get("service_mapping_info"),
1147                     },
1148                 }
1149
1150                 # TODO
1151                 # if port["modified_at"] > last_update:
1152                 #     sdn_need_update = True
1153 0                 new_connected_ports.append(port["id"])  # TODO
1154 0                 sdn_ports.append(new_port)
1155
1156 0             if error_ports:
1157 0                 error_list.append(
1158                     "{} interfaces have not been created as VDU is on ERROR status".format(
1159                         error_ports
1160                     )
1161                 )
1162
1163             # connect external ports
1164 0             for index, additional_port in enumerate(additional_ports):
1165 0                 additional_port_id = additional_port.get(
1166                     "service_endpoint_id"
1167                 ) or "external-{}".format(index)
1168 0                 sdn_ports.append(
1169                     {
1170                         "service_endpoint_id": additional_port_id,
1171                         "service_endpoint_encapsulation_type": additional_port.get(
1172                             "service_endpoint_encapsulation_type", "dot1q"
1173                         ),
1174                         "service_endpoint_encapsulation_info": {
1175                             "vlan": additional_port.get("vlan") or vlan_used,
1176                             "mac": additional_port.get("mac_address"),
1177                             "device_id": additional_port.get("device_id"),
1178                             "device_interface_id": additional_port.get(
1179                                 "device_interface_id"
1180                             ),
1181                             "switch_dpid": additional_port.get("switch_dpid")
1182                             or additional_port.get("switch_id"),
1183                             "switch_port": additional_port.get("switch_port"),
1184                             "service_mapping_info": additional_port.get(
1185                                 "service_mapping_info"
1186                             ),
1187                         },
1188                     }
1189                 )
1190 0                 new_connected_ports.append(additional_port_id)
1191 0             sdn_info = ""
1192
1193             # if there are more ports to connect or they have been modified, call create/update
1194 0             if error_list:
1195 0                 sdn_status = "ERROR"
1196 0                 sdn_info = "; ".join(error_list)
1197 0             elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
1198 0                 last_update = time.time()
1199
1200 0                 if not sdn_net_id:
1201 0                     if len(sdn_ports) < 2:
1202 0                         sdn_status = "ACTIVE"
1203
1204 0                         if not pending_ports:
1205 0                             self.logger.debug(
1206                                 "task={} {} new-sdn-net done, less than 2 ports".format(
1207                                     task_id, ro_task["target_id"]
1208                                 )
1209                             )
1210                     else:
1211 0                         net_type = params.get("type") or "ELAN"
1212 0                         (
1213                             sdn_net_id,
1214                             created_items,
1215                         ) = target_vim.create_connectivity_service(net_type, sdn_ports)
1216 0                         created = True
1217 0                         self.logger.debug(
1218                             "task={} {} new-sdn-net={} created={}".format(
1219                                 task_id, ro_task["target_id"], sdn_net_id, created
1220                             )
1221                         )
1222                 else:
1223 0                     created_items = target_vim.edit_connectivity_service(
1224                         sdn_net_id, conn_info=created_items, connection_points=sdn_ports
1225                     )
1226 0                     created = True
1227 0                     self.logger.debug(
1228                         "task={} {} update-sdn-net={} created={}".format(
1229                             task_id, ro_task["target_id"], sdn_net_id, created
1230                         )
1231                     )
1232
1233 0                 connected_ports = new_connected_ports
1234 0             elif sdn_net_id:
1235 0                 wim_status_dict = target_vim.get_connectivity_service_status(
1236                     sdn_net_id, conn_info=created_items
1237                 )
1238 0                 sdn_status = wim_status_dict["sdn_status"]
1239
1240 0                 if wim_status_dict.get("sdn_info"):
1241 0                     sdn_info = str(wim_status_dict.get("sdn_info")) or ""
1242
1243 0                 if wim_status_dict.get("error_msg"):
1244 0                     sdn_info = wim_status_dict.get("error_msg") or ""
1245
1246 0             if pending_ports:
1247 0                 if sdn_status != "ERROR":
1248 0                     sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1249                         len(ports) - pending_ports, len(ports)
1250                     )
1251
1252 0                 if sdn_status == "ACTIVE":
1253 0                     sdn_status = "BUILD"
1254
1255 0             ro_vim_item_update = {
1256                 "vim_id": sdn_net_id,
1257                 "vim_status": sdn_status,
1258                 "created": created,
1259                 "created_items": created_items,
1260                 "connected_ports": connected_ports,
1261                 "vim_details": sdn_info,
1262                 "vim_message": None,
1263                 "last_update": last_update,
1264             }
1265
1266 0             return sdn_status, ro_vim_item_update
1267 0         except Exception as e:
1268 0             self.logger.error(
1269                 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
1270                 exc_info=not isinstance(
1271                     e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1272                 ),
1273             )
1274 0             ro_vim_item_update = {
1275                 "vim_status": "VIM_ERROR",
1276                 "created": created,
1277                 "vim_message": str(e),
1278             }
1279
1280 0             return "FAILED", ro_vim_item_update
1281
1282 1     def delete(self, ro_task, task_index):
1283 0         task = ro_task["tasks"][task_index]
1284 0         task_id = task["task_id"]
1285 0         sdn_vim_id = ro_task["vim_info"].get("vim_id")
1286 0         ro_vim_item_update_ok = {
1287             "vim_status": "DELETED",
1288             "created": False,
1289             "vim_message": "DELETED",
1290             "vim_id": None,
1291         }
1292
1293 0         try:
1294 0             if sdn_vim_id:
1295 0                 target_vim = self.my_vims[ro_task["target_id"]]
1296 0                 target_vim.delete_connectivity_service(
1297                     sdn_vim_id, ro_task["vim_info"].get("created_items")
1298                 )
1299
1300 0         except Exception as e:
1301 0             if (
1302                 isinstance(e, sdnconn.SdnConnectorError)
1303                 and e.http_code == HTTPStatus.NOT_FOUND.value
1304             ):
1305 0                 ro_vim_item_update_ok["vim_message"] = "already deleted"
1306             else:
1307 0                 self.logger.error(
1308                     "ro_task={} vim={} del-sdn-net={}: {}".format(
1309                         ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
1310                     ),
1311                     exc_info=not isinstance(
1312                         e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1313                     ),
1314                 )
1315 0                 ro_vim_item_update = {
1316                     "vim_status": "VIM_ERROR",
1317                     "vim_message": "Error while deleting: {}".format(e),
1318                 }
1319
1320 0                 return "FAILED", ro_vim_item_update
1321
1322 0         self.logger.debug(
1323             "task={} {} del-sdn-net={} {}".format(
1324                 task_id,
1325                 ro_task["target_id"],
1326                 sdn_vim_id,
1327                 ro_vim_item_update_ok.get("vim_message", ""),
1328             )
1329         )
1330
1331 0         return "DONE", ro_vim_item_update_ok
1332
1333
1334 1 class VimInteractionMigration(VimInteractionBase):
1335 1     def exec(self, ro_task, task_index, task_depends):
1336 1         task = ro_task["tasks"][task_index]
1337 1         task_id = task["task_id"]
1338 1         db_task_update = {"retries": 0}
1339 1         target_vim = self.my_vims[ro_task["target_id"]]
1340 1         vim_interfaces = []
1341 1         created = False
1342 1         created_items = {}
1343 1         refreshed_vim_info = {}
1344
1345 1         try:
1346 1             if task.get("params"):
1347 1                 vim_vm_id = task["params"].get("vim_vm_id")
1348 1                 migrate_host = task["params"].get("migrate_host")
1349 1                 _, migrated_compute_node = target_vim.migrate_instance(
1350                     vim_vm_id, migrate_host
1351                 )
1352
1353 1                 if migrated_compute_node:
1354                     # When VM is migrated, vdu["vim_info"] needs to be updated
1355 1                     vdu_old_vim_info = task["params"]["vdu_vim_info"].get(
1356                         ro_task["target_id"]
1357                     )
1358
1359                     # Refresh VM to get new vim_info
1360 1                     vm_to_refresh_list = [vim_vm_id]
1361 1                     vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
1362 1                     refreshed_vim_info = vim_dict[vim_vm_id]
1363
1364 1                     if refreshed_vim_info.get("interfaces"):
1365 1                         for old_iface in vdu_old_vim_info.get("interfaces"):
1366 0                             iface = next(
1367                                 (
1368                                     iface
1369                                     for iface in refreshed_vim_info["interfaces"]
1370                                     if old_iface["vim_interface_id"]
1371                                     == iface["vim_interface_id"]
1372                                 ),
1373                                 None,
1374                             )
1375 0                             vim_interfaces.append(iface)
1376
1377 1             ro_vim_item_update = {
1378                 "vim_id": vim_vm_id,
1379                 "vim_status": "ACTIVE",
1380                 "created": created,
1381                 "created_items": created_items,
1382                 "vim_details": None,
1383                 "vim_message": None,
1384             }
1385
1386 1             if refreshed_vim_info and refreshed_vim_info.get("status") not in (
1387                 "ERROR",
1388                 "VIM_ERROR",
1389             ):
1390 1                 ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]
1391
1392 1             if vim_interfaces:
1393 0                 ro_vim_item_update["interfaces"] = vim_interfaces
1394
1395 1             self.logger.debug(
1396                 "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
1397             )
1398
1399 1             return "DONE", ro_vim_item_update, db_task_update
1400
1401 0         except (vimconn.VimConnException, NsWorkerException) as e:
1402 0             self.logger.error(
1403                 "task={} vim={} VM Migration:"
1404                 " {}".format(task_id, ro_task["target_id"], e)
1405             )
1406 0             ro_vim_item_update = {
1407                 "vim_status": "VIM_ERROR",
1408                 "created": created,
1409                 "vim_message": str(e),
1410             }
1411
1412 0             return "FAILED", ro_vim_item_update, db_task_update
1413
1414
1415 1 class VimInteractionResize(VimInteractionBase):
1416 1     def exec(self, ro_task, task_index, task_depends):
1417 1         task = ro_task["tasks"][task_index]
1418 1         task_id = task["task_id"]
1419 1         db_task_update = {"retries": 0}
1420 1         created = False
1421 1         target_flavor_uuid = None
1422 1         created_items = {}
1423 1         refreshed_vim_info = {}
1424 1         target_vim = self.my_vims[ro_task["target_id"]]
1425
1426 1         try:
1427 1             if task.get("params"):
1428 1                 vim_vm_id = task["params"].get("vim_vm_id")
1429 1                 flavor_dict = task["params"].get("flavor_dict")
1430 1                 self.logger.info("flavor_dict %s", flavor_dict)
1431
1432 1                 try:
1433 1                     target_flavor_uuid = target_vim.get_flavor_id_from_data(flavor_dict)
1434 0                 except Exception as e:
1435 0                     self.logger.info("Cannot find any flavor matching  %s.", str(e))
1436 0                     try:
1437 0                         target_flavor_uuid = target_vim.new_flavor(flavor_dict)
1438 0                     except Exception as e:
1439 0                         self.logger.error("Error creating flavor at VIM  %s.", str(e))
1440
1441 1                 if target_flavor_uuid is not None:
1442 1                     resized_status = target_vim.resize_instance(
1443                         vim_vm_id, target_flavor_uuid
1444                     )
1445
1446 1                     if resized_status:
1447                         # Refresh VM to get new vim_info
1448 1                         vm_to_refresh_list = [vim_vm_id]
1449 1                         vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
1450 1                         refreshed_vim_info = vim_dict[vim_vm_id]
1451
1452 1             ro_vim_item_update = {
1453                 "vim_id": vim_vm_id,
1454                 "vim_status": "DONE",
1455                 "created": created,
1456                 "created_items": created_items,
1457                 "vim_details": None,
1458                 "vim_message": None,
1459             }
1460
1461 1             if refreshed_vim_info and refreshed_vim_info.get("status") not in (
1462                 "ERROR",
1463                 "VIM_ERROR",
1464             ):
1465 1                 ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]
1466
1467 1             self.logger.debug(
1468                 "task={} {} resize done".format(task_id, ro_task["target_id"])
1469             )
1470 1             return "DONE", ro_vim_item_update, db_task_update
1471 0         except (vimconn.VimConnException, NsWorkerException) as e:
1472 0             self.logger.error(
1473                 "task={} vim={} Resize:" " {}".format(task_id, ro_task["target_id"], e)
1474             )
1475 0             ro_vim_item_update = {
1476                 "vim_status": "VIM_ERROR",
1477                 "created": created,
1478                 "vim_message": str(e),
1479             }
1480
1481 0             return "FAILED", ro_vim_item_update, db_task_update
1482
1483
1484 1 class ConfigValidate:
1485 1     def __init__(self, config: Dict):
1486 1         self.conf = config
1487
1488 1     @property
1489 1     def active(self):
1490         # default 1 min, allowed >= 60 or -1, -1 disables periodic checks
1491 1         if (
1492             self.conf["period"]["refresh_active"] >= 60
1493             or self.conf["period"]["refresh_active"] == -1
1494         ):
1495 1             return self.conf["period"]["refresh_active"]
1496
1497 1         return 60
1498
1499 1     @property
1500 1     def build(self):
1501 1         return self.conf["period"]["refresh_build"]
1502
1503 1     @property
1504 1     def image(self):
1505 1         return self.conf["period"]["refresh_image"]
1506
1507 1     @property
1508 1     def error(self):
1509 1         return self.conf["period"]["refresh_error"]
1510
1511 1     @property
1512 1     def queue_size(self):
1513 1         return self.conf["period"]["queue_size"]
1514
1515
1516 1 class NsWorker(threading.Thread):
1517 1     def __init__(self, worker_index, config, plugins, db):
1518         """
1519         :param worker_index: thread index
1520         :param config: general configuration of RO, among others the process_id with the docker id where it runs
1521         :param plugins: global shared dict with the loaded plugins
1522         :param db: database class instance to use
1523         """
1524 1         threading.Thread.__init__(self)
1525 1         self.config = config
1526 1         self.plugins = plugins
1527 1         self.plugin_name = "unknown"
1528 1         self.logger = logging.getLogger("ro.worker{}".format(worker_index))
1529 1         self.worker_index = worker_index
1530         # refresh periods for created items
1531 1         self.refresh_config = ConfigValidate(config)
1532 1         self.task_queue = queue.Queue(self.refresh_config.queue_size)
1533         # targetvim: vimplugin class
1534 1         self.my_vims = {}
1535         # targetvim: vim information from database
1536 1         self.db_vims = {}
1537         # targetvim list
1538 1         self.vim_targets = []
1539 1         self.my_id = config["process_id"] + ":" + str(worker_index)
1540 1         self.db = db
1541 1         self.item2class = {
1542             "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger),
1543             "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger),
1544             "image": VimInteractionImage(
1545                 self.db, self.my_vims, self.db_vims, self.logger
1546             ),
1547             "flavor": VimInteractionFlavor(
1548                 self.db, self.my_vims, self.db_vims, self.logger
1549             ),
1550             "sdn_net": VimInteractionSdnNet(
1551                 self.db, self.my_vims, self.db_vims, self.logger
1552             ),
1553             "update": VimInteractionUpdateVdu(
1554                 self.db, self.my_vims, self.db_vims, self.logger
1555             ),
1556             "affinity-or-anti-affinity-group": VimInteractionAffinityGroup(
1557                 self.db, self.my_vims, self.db_vims, self.logger
1558             ),
1559             "migrate": VimInteractionMigration(
1560                 self.db, self.my_vims, self.db_vims, self.logger
1561             ),
1562             "verticalscale": VimInteractionResize(
1563                 self.db, self.my_vims, self.db_vims, self.logger
1564             ),
1565         }
1566 1         self.time_last_task_processed = None
1567         # lists of tasks to delete because nsrs or vnfrs has been deleted from db
1568 1         self.tasks_to_delete = []
1569         # it is idle when there are not vim_targets associated
1570 1         self.idle = True
1571 1         self.task_locked_time = config["global"]["task_locked_time"]
1572
1573 1     def insert_task(self, task):
1574 0         try:
1575 0             self.task_queue.put(task, False)
1576 0             return None
1577 0         except queue.Full:
1578 0             raise NsWorkerException("timeout inserting a task")
1579
1580 1     def terminate(self):
1581 0         self.insert_task("exit")
1582
1583 1     def del_task(self, task):
1584 0         with self.task_lock:
1585 0             if task["status"] == "SCHEDULED":
1586 0                 task["status"] = "SUPERSEDED"
1587 0                 return True
1588             else:  # task["status"] == "processing"
1589 0                 self.task_lock.release()
1590 0                 return False
1591
1592 1     def _process_vim_config(self, target_id: str, db_vim: dict) -> None:
1593         """
1594         Process vim config, creating vim configuration files as ca_cert
1595         :param target_id: vim/sdn/wim + id
1596         :param db_vim: Vim dictionary obtained from database
1597         :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1598         """
1599 1         if not db_vim.get("config"):
1600 0             return
1601
1602 1         file_name = ""
1603 1         work_dir = "/app/osm_ro/certs"
1604
1605 1         try:
1606 1             if db_vim["config"].get("ca_cert_content"):
1607 1                 file_name = f"{work_dir}/{target_id}:{self.worker_index}"
1608
1609 1                 if not path.isdir(file_name):
1610 1                     makedirs(file_name)
1611
1612 1                 file_name = file_name + "/ca_cert"
1613
1614 1                 with open(file_name, "w") as f:
1615 1                     f.write(db_vim["config"]["ca_cert_content"])
1616 1                     del db_vim["config"]["ca_cert_content"]
1617 1                     db_vim["config"]["ca_cert"] = file_name
1618 1         except Exception as e:
1619 1             raise NsWorkerException(
1620                 "Error writing to file '{}': {}".format(file_name, e)
1621             )
1622
1623 1     def _load_plugin(self, name, type="vim"):
1624         # type can be vim or sdn
1625 0         if "rovim_dummy" not in self.plugins:
1626 0             self.plugins["rovim_dummy"] = VimDummyConnector
1627
1628 0         if "rosdn_dummy" not in self.plugins:
1629 0             self.plugins["rosdn_dummy"] = SdnDummyConnector
1630
1631 0         if name in self.plugins:
1632 0             return self.plugins[name]
1633
1634 0         try:
1635 0             for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
1636 0                 self.plugins[name] = ep.load()
1637 0         except Exception as e:
1638 0             raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
1639
1640 0         if name and name not in self.plugins:
1641 0             raise NsWorkerException(
1642                 "Plugin 'osm_{n}' has not been installed".format(n=name)
1643             )
1644
1645 0         return self.plugins[name]
1646
1647 1     def _unload_vim(self, target_id):
1648         """
1649         Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1650         :param target_id: Contains type:_id; where type can be 'vim', ...
1651         :return: None.
1652         """
1653 0         try:
1654 0             self.db_vims.pop(target_id, None)
1655 0             self.my_vims.pop(target_id, None)
1656
1657 0             if target_id in self.vim_targets:
1658 0                 self.vim_targets.remove(target_id)
1659
1660 0             self.logger.info("Unloaded {}".format(target_id))
1661 0         except Exception as e:
1662 0             self.logger.error("Cannot unload {}: {}".format(target_id, e))
1663
1664 1     def _check_vim(self, target_id):
1665         """
1666         Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1667         :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1668         :return: None.
1669         """
1670 0         target, _, _id = target_id.partition(":")
1671 0         now = time.time()
1672 0         update_dict = {}
1673 0         unset_dict = {}
1674 0         op_text = ""
1675 0         step = ""
1676 0         loaded = target_id in self.vim_targets
1677 0         target_database = (
1678             "vim_accounts"
1679             if target == "vim"
1680             else "wim_accounts"
1681             if target == "wim"
1682             else "sdns"
1683         )
1684
1685 0         try:
1686 0             step = "Getting {} from db".format(target_id)
1687 0             db_vim = self.db.get_one(target_database, {"_id": _id})
1688
1689 0             for op_index, operation in enumerate(
1690                 db_vim["_admin"].get("operations", ())
1691             ):
1692 0                 if operation["operationState"] != "PROCESSING":
1693 0                     continue
1694
1695 0                 locked_at = operation.get("locked_at")
1696
1697 0                 if locked_at is not None and locked_at >= now - self.task_locked_time:
1698                     # some other thread is doing this operation
1699 0                     return
1700
1701                 # lock
1702 0                 op_text = "_admin.operations.{}.".format(op_index)
1703
1704 0                 if not self.db.set_one(
1705                     target_database,
1706                     q_filter={
1707                         "_id": _id,
1708                         op_text + "operationState": "PROCESSING",
1709                         op_text + "locked_at": locked_at,
1710                     },
1711                     update_dict={
1712                         op_text + "locked_at": now,
1713                         "admin.current_operation": op_index,
1714                     },
1715                     fail_on_empty=False,
1716                 ):
1717 0                     return
1718
1719 0                 unset_dict[op_text + "locked_at"] = None
1720 0                 unset_dict["current_operation"] = None
1721 0                 step = "Loading " + target_id
1722 0                 error_text = self._load_vim(target_id)
1723
1724 0                 if not error_text:
1725 0                     step = "Checking connectivity"
1726
1727 0                     if target == "vim":
1728 0                         self.my_vims[target_id].check_vim_connectivity()
1729                     else:
1730 0                         self.my_vims[target_id].check_credentials()
1731
1732 0                 update_dict["_admin.operationalState"] = "ENABLED"
1733 0                 update_dict["_admin.detailed-status"] = ""
1734 0                 unset_dict[op_text + "detailed-status"] = None
1735 0                 update_dict[op_text + "operationState"] = "COMPLETED"
1736
1737 0                 return
1738
1739 0         except Exception as e:
1740 0             error_text = "{}: {}".format(step, e)
1741 0             self.logger.error("{} for {}: {}".format(step, target_id, e))
1742
1743         finally:
1744 0             if update_dict or unset_dict:
1745 0                 if error_text:
1746 0                     update_dict[op_text + "operationState"] = "FAILED"
1747 0                     update_dict[op_text + "detailed-status"] = error_text
1748 0                     unset_dict.pop(op_text + "detailed-status", None)
1749 0                     update_dict["_admin.operationalState"] = "ERROR"
1750 0                     update_dict["_admin.detailed-status"] = error_text
1751
1752 0                 if op_text:
1753 0                     update_dict[op_text + "statusEnteredTime"] = now
1754
1755 0                 self.db.set_one(
1756                     target_database,
1757                     q_filter={"_id": _id},
1758                     update_dict=update_dict,
1759                     unset=unset_dict,
1760                     fail_on_empty=False,
1761                 )
1762
1763 0             if not loaded:
1764 0                 self._unload_vim(target_id)
1765
1766 1     def _reload_vim(self, target_id):
1767 0         if target_id in self.vim_targets:
1768 0             self._load_vim(target_id)
1769         else:
1770             # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1771             # just remove it to force load again next time it is needed
1772 0             self.db_vims.pop(target_id, None)
1773
1774 1     def _load_vim(self, target_id):
1775         """
1776         Load or reload a vim_account, sdn_controller or wim_account.
1777         Read content from database, load the plugin if not loaded.
1778         In case of error loading the plugin, it load a failing VIM_connector
1779         It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1780         :param target_id: Contains type:_id; where type can be 'vim', ...
1781         :return: None if ok, descriptive text if error
1782         """
1783 0         target, _, _id = target_id.partition(":")
1784 0         target_database = (
1785             "vim_accounts"
1786             if target == "vim"
1787             else "wim_accounts"
1788             if target == "wim"
1789             else "sdns"
1790         )
1791 0         plugin_name = ""
1792 0         vim = None
1793
1794 0         try:
1795 0             step = "Getting {}={} from db".format(target, _id)
1796             # TODO process for wim, sdnc, ...
1797 0             vim = self.db.get_one(target_database, {"_id": _id})
1798
1799             # if deep_get(vim, "config", "sdn-controller"):
1800             #     step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1801             #     db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1802
1803 0             step = "Decrypting password"
1804 0             schema_version = vim.get("schema_version")
1805 0             self.db.encrypt_decrypt_fields(
1806                 vim,
1807                 "decrypt",
1808                 fields=("password", "secret"),
1809                 schema_version=schema_version,
1810                 salt=_id,
1811             )
1812 0             self._process_vim_config(target_id, vim)
1813
1814 0             if target == "vim":
1815 0                 plugin_name = "rovim_" + vim["vim_type"]
1816 0                 step = "Loading plugin '{}'".format(plugin_name)
1817 0                 vim_module_conn = self._load_plugin(plugin_name)
1818 0                 step = "Loading {}'".format(target_id)
1819 0                 self.my_vims[target_id] = vim_module_conn(
1820                     uuid=vim["_id"],
1821                     name=vim["name"],
1822                     tenant_id=vim.get("vim_tenant_id"),
1823                     tenant_name=vim.get("vim_tenant_name"),
1824                     url=vim["vim_url"],
1825                     url_admin=None,
1826                     user=vim["vim_user"],
1827                     passwd=vim["vim_password"],
1828                     config=vim.get("config") or {},
1829                     persistent_info={},
1830                 )
1831             else:  # sdn
1832 0                 plugin_name = "rosdn_" + (vim.get("type") or vim.get("wim_type"))
1833 0                 step = "Loading plugin '{}'".format(plugin_name)
1834 0                 vim_module_conn = self._load_plugin(plugin_name, "sdn")
1835 0                 step = "Loading {}'".format(target_id)
1836 0                 wim = deepcopy(vim)
1837 0                 wim_config = wim.pop("config", {}) or {}
1838 0                 wim["uuid"] = wim["_id"]
1839 0                 if "url" in wim and "wim_url" not in wim:
1840 0                     wim["wim_url"] = wim["url"]
1841 0                 elif "url" not in wim and "wim_url" in wim:
1842 0                     wim["url"] = wim["wim_url"]
1843
1844 0                 if wim.get("dpid"):
1845 0                     wim_config["dpid"] = wim.pop("dpid")
1846
1847 0                 if wim.get("switch_id"):
1848 0                     wim_config["switch_id"] = wim.pop("switch_id")
1849
1850                 # wim, wim_account, config
1851 0                 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
1852 0             self.db_vims[target_id] = vim
1853 0             self.error_status = None
1854
1855 0             self.logger.info(
1856                 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
1857             )
1858 0         except Exception as e:
1859 0             self.logger.error(
1860                 "Cannot load {} plugin={}: {} {}".format(
1861                     target_id, plugin_name, step, e
1862                 )
1863             )
1864
1865 0             self.db_vims[target_id] = vim or {}
1866 0             self.db_vims[target_id] = FailingConnector(str(e))
1867 0             error_status = "{} Error: {}".format(step, e)
1868
1869 0             return error_status
1870         finally:
1871 0             if target_id not in self.vim_targets:
1872 0                 self.vim_targets.append(target_id)
1873
1874 1     def _get_db_task(self):
1875         """
1876         Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1877         :return: None
1878         """
1879 0         now = time.time()
1880
1881 0         if not self.time_last_task_processed:
1882 0             self.time_last_task_processed = now
1883
1884 0         try:
1885             while True:
1886                 """
1887                 # Log RO tasks only when loglevel is DEBUG
1888                 if self.logger.getEffectiveLevel() == logging.DEBUG:
1889                     self._log_ro_task(
1890                         None,
1891                         None,
1892                         None,
1893                         "TASK_WF",
1894                         "task_locked_time="
1895                         + str(self.task_locked_time)
1896                         + " "
1897                         + "time_last_task_processed="
1898                         + str(self.time_last_task_processed)
1899                         + " "
1900                         + "now="
1901                         + str(now),
1902                     )
1903                 """
1904 0                 locked = self.db.set_one(
1905                     "ro_tasks",
1906                     q_filter={
1907                         "target_id": self.vim_targets,
1908                         "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1909                         "locked_at.lt": now - self.task_locked_time,
1910                         "to_check_at.lt": self.time_last_task_processed,
1911                         "to_check_at.gt": -1,
1912                     },
1913                     update_dict={"locked_by": self.my_id, "locked_at": now},
1914                     fail_on_empty=False,
1915                 )
1916
1917 0                 if locked:
1918                     # read and return
1919 0                     ro_task = self.db.get_one(
1920                         "ro_tasks",
1921                         q_filter={
1922                             "target_id": self.vim_targets,
1923                             "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1924                             "locked_at": now,
1925                         },
1926                     )
1927 0                     return ro_task
1928
1929 0                 if self.time_last_task_processed == now:
1930 0                     self.time_last_task_processed = None
1931 0                     return None
1932                 else:
1933 0                     self.time_last_task_processed = now
1934                     # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1935
1936 0         except DbException as e:
1937 0             self.logger.error("Database exception at _get_db_task: {}".format(e))
1938 0         except Exception as e:
1939 0             self.logger.critical(
1940                 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
1941             )
1942
1943 0         return None
1944
1945 1     def _get_db_all_tasks(self):
1946         """
1947         Read all content of table ro_tasks to log it
1948         :return: None
1949         """
1950 0         try:
1951             # Checking the content of the BD:
1952
1953             # read and return
1954 0             ro_task = self.db.get_list("ro_tasks")
1955 0             for rt in ro_task:
1956 0                 self._log_ro_task(rt, None, None, "TASK_WF", "GET_ALL_TASKS")
1957 0             return ro_task
1958
1959 0         except DbException as e:
1960 0             self.logger.error("Database exception at _get_db_all_tasks: {}".format(e))
1961 0         except Exception as e:
1962 0             self.logger.critical(
1963                 "Unexpected exception at _get_db_all_tasks: {}".format(e), exc_info=True
1964             )
1965
1966 0         return None
1967
1968 1     def _log_ro_task(self, ro_task, db_ro_task_update, db_ro_task_delete, mark, event):
1969         """
1970         Generate a log with the following format:
1971
1972         Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by;
1973         target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id;
1974         task_array_index;task_id;task_action;task_item;task_args
1975
1976         Example:
1977
1978         TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013;
1979         1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a
1980         ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False,
1981         'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None,
1982         'vim_details': None, 'vim_message': None, 'refresh_at': None};1;SCHEDULED;
1983         888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;
1984         CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}}
1985         """
1986 0         try:
1987 0             line = []
1988 0             i = 0
1989 0             if ro_task is not None and isinstance(ro_task, dict):
1990 0                 for t in ro_task["tasks"]:
1991 0                     line.clear()
1992 0                     line.append(mark)
1993 0                     line.append(event)
1994 0                     line.append(ro_task.get("_id", ""))
1995 0                     line.append(str(ro_task.get("locked_at", "")))
1996 0                     line.append(str(ro_task.get("modified_at", "")))
1997 0                     line.append(str(ro_task.get("created_at", "")))
1998 0                     line.append(str(ro_task.get("to_check_at", "")))
1999 0                     line.append(str(ro_task.get("locked_by", "")))
2000 0                     line.append(str(ro_task.get("target_id", "")))
2001 0                     line.append(str(ro_task.get("vim_info", {}).get("refresh_at", "")))
2002 0                     line.append(str(ro_task.get("vim_info", "")))
2003 0                     line.append(str(ro_task.get("tasks", "")))
2004 0                     if isinstance(t, dict):
2005 0                         line.append(str(t.get("status", "")))
2006 0                         line.append(str(t.get("action_id", "")))
2007 0                         line.append(str(i))
2008 0                         line.append(str(t.get("task_id", "")))
2009 0                         line.append(str(t.get("action", "")))
2010 0                         line.append(str(t.get("item", "")))
2011 0                         line.append(str(t.get("find_params", "")))
2012 0                         line.append(str(t.get("params", "")))
2013                     else:
2014 0                         line.extend([""] * 2)
2015 0                         line.append(str(i))
2016 0                         line.extend([""] * 5)
2017
2018 0                     i += 1
2019 0                     self.logger.debug(";".join(line))
2020 0             elif db_ro_task_update is not None and isinstance(db_ro_task_update, dict):
2021 0                 i = 0
2022                 while True:
2023 0                     st = "tasks.{}.status".format(i)
2024 0                     if st not in db_ro_task_update:
2025 0                         break
2026 0                     line.clear()
2027 0                     line.append(mark)
2028 0                     line.append(event)
2029 0                     line.append(db_ro_task_update.get("_id", ""))
2030 0                     line.append(str(db_ro_task_update.get("locked_at", "")))
2031 0                     line.append(str(db_ro_task_update.get("modified_at", "")))
2032 0                     line.append("")
2033 0                     line.append(str(db_ro_task_update.get("to_check_at", "")))
2034 0                     line.append(str(db_ro_task_update.get("locked_by", "")))
2035 0                     line.append("")
2036 0                     line.append(str(db_ro_task_update.get("vim_info.refresh_at", "")))
2037 0                     line.append("")
2038 0                     line.append(str(db_ro_task_update.get("vim_info", "")))
2039 0                     line.append(str(str(db_ro_task_update).count(".status")))
2040 0                     line.append(db_ro_task_update.get(st, ""))
2041 0                     line.append("")
2042 0                     line.append(str(i))
2043 0                     line.extend([""] * 3)
2044 0                     i += 1
2045 0                     self.logger.debug(";".join(line))
2046
2047 0             elif db_ro_task_delete is not None and isinstance(db_ro_task_delete, dict):
2048 0                 line.clear()
2049 0                 line.append(mark)
2050 0                 line.append(event)
2051 0                 line.append(db_ro_task_delete.get("_id", ""))
2052 0                 line.append("")
2053 0                 line.append(db_ro_task_delete.get("modified_at", ""))
2054 0                 line.extend([""] * 13)
2055 0                 self.logger.debug(";".join(line))
2056
2057             else:
2058 0                 line.clear()
2059 0                 line.append(mark)
2060 0                 line.append(event)
2061 0                 line.extend([""] * 16)
2062 0                 self.logger.debug(";".join(line))
2063
2064 0         except Exception as e:
2065 0             self.logger.error("Error logging ro_task: {}".format(e))
2066
2067 1     def _delete_task(self, ro_task, task_index, task_depends, db_update):
2068         """
2069         Determine if this task need to be done or superseded
2070         :return: None
2071         """
2072 0         my_task = ro_task["tasks"][task_index]
2073 0         task_id = my_task["task_id"]
2074 0         needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
2075             "created_items", False
2076         )
2077
2078 0         self.logger.debug("Needed delete: {}".format(needed_delete))
2079 0         if my_task["status"] == "FAILED":
2080 0             return None, None  # TODO need to be retry??
2081
2082 0         try:
2083 0             for index, task in enumerate(ro_task["tasks"]):
2084 0                 if index == task_index or not task:
2085 0                     continue  # own task
2086
2087 0                 if (
2088                     my_task["target_record"] == task["target_record"]
2089                     and task["action"] == "CREATE"
2090                 ):
2091                     # set to finished
2092 0                     db_update["tasks.{}.status".format(index)] = task[
2093                         "status"
2094                     ] = "FINISHED"
2095 0                 elif task["action"] == "CREATE" and task["status"] not in (
2096                     "FINISHED",
2097                     "SUPERSEDED",
2098                 ):
2099 0                     needed_delete = False
2100
2101 0             if needed_delete:
2102 0                 self.logger.debug(
2103                     "Deleting ro_task={} task_index={}".format(ro_task, task_index)
2104                 )
2105 0                 return self.item2class[my_task["item"]].delete(ro_task, task_index)
2106             else:
2107 0                 return "SUPERSEDED", None
2108 0         except Exception as e:
2109 0             if not isinstance(e, NsWorkerException):
2110 0                 self.logger.critical(
2111                     "Unexpected exception at _delete_task task={}: {}".format(
2112                         task_id, e
2113                     ),
2114                     exc_info=True,
2115                 )
2116
2117 0             return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e)}
2118
2119 1     def _create_task(self, ro_task, task_index, task_depends, db_update):
2120         """
2121         Determine if this task need to create something at VIM
2122         :return: None
2123         """
2124 0         my_task = ro_task["tasks"][task_index]
2125 0         task_id = my_task["task_id"]
2126 0         task_status = None
2127
2128 0         if my_task["status"] == "FAILED":
2129 0             return None, None  # TODO need to be retry??
2130 0         elif my_task["status"] == "SCHEDULED":
2131             # check if already created by another task
2132 0             for index, task in enumerate(ro_task["tasks"]):
2133 0                 if index == task_index or not task:
2134 0                     continue  # own task
2135
2136 0                 if task["action"] == "CREATE" and task["status"] not in (
2137                     "SCHEDULED",
2138                     "FINISHED",
2139                     "SUPERSEDED",
2140                 ):
2141 0                     return task["status"], "COPY_VIM_INFO"
2142
2143 0             try:
2144 0                 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
2145                     ro_task, task_index, task_depends
2146                 )
2147                 # TODO update other CREATE tasks
2148 0             except Exception as e:
2149 0                 if not isinstance(e, NsWorkerException):
2150 0                     self.logger.error(
2151                         "Error executing task={}: {}".format(task_id, e), exc_info=True
2152                     )
2153
2154 0                 task_status = "FAILED"
2155 0                 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_message": str(e)}
2156                 # TODO update    ro_vim_item_update
2157
2158 0             return task_status, ro_vim_item_update
2159         else:
2160 0             return None, None
2161
2162 1     def _get_dependency(self, task_id, ro_task=None, target_id=None):
2163         """
2164         Look for dependency task
2165         :param task_id: Can be one of
2166             1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2167             2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2168             3. task.task_id: "<action_id>:number"
2169         :param ro_task:
2170         :param target_id:
2171         :return: database ro_task plus index of task
2172         """
2173 0         if (
2174             task_id.startswith("vim:")
2175             or task_id.startswith("sdn:")
2176             or task_id.startswith("wim:")
2177         ):
2178 0             target_id, _, task_id = task_id.partition(" ")
2179
2180 0         if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
2181 0             ro_task_dependency = self.db.get_one(
2182                 "ro_tasks",
2183                 q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
2184                 fail_on_empty=False,
2185             )
2186
2187 0             if ro_task_dependency:
2188 0                 for task_index, task in enumerate(ro_task_dependency["tasks"]):
2189 0                     if task["target_record_id"] == task_id:
2190 0                         return ro_task_dependency, task_index
2191
2192         else:
2193 0             if ro_task:
2194 0                 for task_index, task in enumerate(ro_task["tasks"]):
2195 0                     if task and task["task_id"] == task_id:
2196 0                         return ro_task, task_index
2197
2198 0             ro_task_dependency = self.db.get_one(
2199                 "ro_tasks",
2200                 q_filter={
2201                     "tasks.ANYINDEX.task_id": task_id,
2202                     "tasks.ANYINDEX.target_record.ne": None,
2203                 },
2204                 fail_on_empty=False,
2205             )
2206
2207 0             self.logger.debug("ro_task_dependency={}".format(ro_task_dependency))
2208 0             if ro_task_dependency:
2209 0                 for task_index, task in enumerate(ro_task_dependency["tasks"]):
2210 0                     if task["task_id"] == task_id:
2211 0                         return ro_task_dependency, task_index
2212 0         raise NsWorkerException("Cannot get depending task {}".format(task_id))
2213
2214 1     def update_vm_refresh(self, ro_task):
2215         """Enables the VM status updates if self.refresh_config.active parameter
2216         is not -1 and then updates the DB accordingly
2217
2218         """
2219 1         try:
2220 1             self.logger.debug("Checking if VM status update config")
2221 1             next_refresh = time.time()
2222 1             next_refresh = self._get_next_refresh(ro_task, next_refresh)
2223
2224 1             if next_refresh != -1:
2225 1                 db_ro_task_update = {}
2226 1                 now = time.time()
2227 1                 next_check_at = now + (24 * 60 * 60)
2228 1                 next_check_at = min(next_check_at, next_refresh)
2229 1                 db_ro_task_update["vim_info.refresh_at"] = next_refresh
2230 1                 db_ro_task_update["to_check_at"] = next_check_at
2231
2232 1                 self.logger.debug(
2233                     "Finding tasks which to be updated to enable VM status updates"
2234                 )
2235 1                 refresh_tasks = self.db.get_list(
2236                     "ro_tasks",
2237                     q_filter={
2238                         "tasks.status": "DONE",
2239                         "to_check_at.lt": 0,
2240                     },
2241                 )
2242 1                 self.logger.debug("Updating tasks to change the to_check_at status")
2243 1                 for task in refresh_tasks:
2244 1                     q_filter = {
2245                         "_id": task["_id"],
2246                     }
2247 1                     self.db.set_one(
2248                         "ro_tasks",
2249                         q_filter=q_filter,
2250                         update_dict=db_ro_task_update,
2251                         fail_on_empty=True,
2252                     )
2253
2254 1         except Exception as e:
2255 1             self.logger.error(f"Error updating tasks to enable VM status updates: {e}")
2256
2257 1     def _get_next_refresh(self, ro_task: dict, next_refresh: float):
2258         """Decide the next_refresh according to vim type and refresh config period.
2259         Args:
2260             ro_task (dict):             ro_task details
2261             next_refresh    (float):    next refresh time as epoch format
2262
2263         Returns:
2264             next_refresh    (float)     -1 if vm updates are disabled or vim type is openstack.
2265         """
2266 1         target_vim = ro_task["target_id"]
2267 1         vim_type = self.db_vims[target_vim]["vim_type"]
2268 1         if self.refresh_config.active == -1 or vim_type == "openstack":
2269 1             next_refresh = -1
2270         else:
2271 1             next_refresh += self.refresh_config.active
2272 1         return next_refresh
2273
2274 1     def _process_pending_tasks(self, ro_task):
2275 1         ro_task_id = ro_task["_id"]
2276 1         now = time.time()
2277         # one day
2278 1         next_check_at = now + (24 * 60 * 60)
2279 1         db_ro_task_update = {}
2280
2281 1         def _update_refresh(new_status):
2282             # compute next_refresh
2283             nonlocal task
2284             nonlocal next_check_at
2285             nonlocal db_ro_task_update
2286             nonlocal ro_task
2287
2288 1             next_refresh = time.time()
2289
2290 1             if task["item"] in ("image", "flavor"):
2291 0                 next_refresh += self.refresh_config.image
2292 1             elif new_status == "BUILD":
2293 0                 next_refresh += self.refresh_config.build
2294 1             elif new_status == "DONE":
2295 1                 next_refresh = self._get_next_refresh(ro_task, next_refresh)
2296             else:
2297 1                 next_refresh += self.refresh_config.error
2298
2299 1             next_check_at = min(next_check_at, next_refresh)
2300 1             db_ro_task_update["vim_info.refresh_at"] = next_refresh
2301 1             ro_task["vim_info"]["refresh_at"] = next_refresh
2302
2303 1         try:
2304             """
2305             # Log RO tasks only when loglevel is DEBUG
2306             if self.logger.getEffectiveLevel() == logging.DEBUG:
2307                 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
2308             """
2309             # Check if vim status refresh is enabled again
2310 1             self.update_vm_refresh(ro_task)
2311             # 0: get task_status_create
2312 1             lock_object = None
2313 1             task_status_create = None
2314 1             task_create = next(
2315                 (
2316                     t
2317                     for t in ro_task["tasks"]
2318                     if t
2319                     and t["action"] == "CREATE"
2320                     and t["status"] in ("BUILD", "DONE")
2321                 ),
2322                 None,
2323             )
2324
2325 1             if task_create:
2326 1                 task_status_create = task_create["status"]
2327
2328             # 1: look for tasks in status SCHEDULED, or in status CREATE if action is  DONE or BUILD
2329 1             for task_action in ("DELETE", "CREATE", "EXEC"):
2330 1                 db_vim_update = None
2331 1                 new_status = None
2332
2333 1                 for task_index, task in enumerate(ro_task["tasks"]):
2334 1                     if not task:
2335 0                         continue  # task deleted
2336
2337 1                     task_depends = {}
2338 1                     target_update = None
2339
2340 1                     if (
2341                         (
2342                             task_action in ("DELETE", "EXEC")
2343                             and task["status"] not in ("SCHEDULED", "BUILD")
2344                         )
2345                         or task["action"] != task_action
2346                         or (
2347                             task_action == "CREATE"
2348                             and task["status"] in ("FINISHED", "SUPERSEDED")
2349                         )
2350                     ):
2351 0                         continue
2352
2353 1                     task_path = "tasks.{}.status".format(task_index)
2354 1                     try:
2355 1                         db_vim_info_update = None
2356
2357 1                         if task["status"] == "SCHEDULED":
2358                             # check if tasks that this depends on have been completed
2359 0                             dependency_not_completed = False
2360
2361 0                             for dependency_task_id in task.get("depends_on") or ():
2362 0                                 (
2363                                     dependency_ro_task,
2364                                     dependency_task_index,
2365                                 ) = self._get_dependency(
2366                                     dependency_task_id, target_id=ro_task["target_id"]
2367                                 )
2368 0                                 dependency_task = dependency_ro_task["tasks"][
2369                                     dependency_task_index
2370                                 ]
2371 0                                 self.logger.debug(
2372                                     "dependency_ro_task={} dependency_task_index={}".format(
2373                                         dependency_ro_task, dependency_task_index
2374                                     )
2375                                 )
2376
2377 0                                 if dependency_task["status"] == "SCHEDULED":
2378 0                                     dependency_not_completed = True
2379 0                                     next_check_at = min(
2380                                         next_check_at, dependency_ro_task["to_check_at"]
2381                                     )
2382                                     # must allow dependent task to be processed first
2383                                     # to do this set time after last_task_processed
2384 0                                     next_check_at = max(
2385                                         self.time_last_task_processed, next_check_at
2386                                     )
2387 0                                     break
2388 0                                 elif dependency_task["status"] == "FAILED":
2389 0                                     error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
2390                                         task["action"],
2391                                         task["item"],
2392                                         dependency_task["action"],
2393                                         dependency_task["item"],
2394                                         dependency_task_id,
2395                                         dependency_ro_task["vim_info"].get(
2396                                             "vim_message"
2397                                         ),
2398                                     )
2399 0                                     self.logger.error(
2400                                         "task={} {}".format(task["task_id"], error_text)
2401                                     )
2402 0                                     raise NsWorkerException(error_text)
2403
2404 0                                 task_depends[dependency_task_id] = dependency_ro_task[
2405                                     "vim_info"
2406                                 ]["vim_id"]
2407 0                                 task_depends[
2408                                     "TASK-{}".format(dependency_task_id)
2409                                 ] = dependency_ro_task["vim_info"]["vim_id"]
2410
2411 0                             if dependency_not_completed:
2412 0                                 self.logger.warning(
2413                                     "DEPENDENCY NOT COMPLETED {}".format(
2414                                         dependency_ro_task["vim_info"]["vim_id"]
2415                                     )
2416                                 )
2417                                 # TODO set at vim_info.vim_details that it is waiting
2418 0                                 continue
2419
2420                         # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
2421                         # the task of renew this locking. It will update database locket_at periodically
2422 1                         if not lock_object:
2423 1                             lock_object = LockRenew.add_lock_object(
2424                                 "ro_tasks", ro_task, self
2425                             )
2426
2427 1                         if task["action"] == "DELETE":
2428 0                             (
2429                                 new_status,
2430                                 db_vim_info_update,
2431                             ) = self._delete_task(
2432                                 ro_task, task_index, task_depends, db_ro_task_update
2433                             )
2434 0                             new_status = (
2435                                 "FINISHED" if new_status == "DONE" else new_status
2436                             )
2437                             # ^with FINISHED instead of DONE it will not be refreshing
2438
2439 0                             if new_status in ("FINISHED", "SUPERSEDED"):
2440 0                                 target_update = "DELETE"
2441 1                         elif task["action"] == "EXEC":
2442 0                             (
2443                                 new_status,
2444                                 db_vim_info_update,
2445                                 db_task_update,
2446                             ) = self.item2class[task["item"]].exec(
2447                                 ro_task, task_index, task_depends
2448                             )
2449 0                             new_status = (
2450                                 "FINISHED" if new_status == "DONE" else new_status
2451                             )
2452                             # ^with FINISHED instead of DONE it will not be refreshing
2453
2454 0                             if db_task_update:
2455                                 # load into database the modified db_task_update "retries" and "next_retry"
2456 0                                 if db_task_update.get("retries"):
2457 0                                     db_ro_task_update[
2458                                         "tasks.{}.retries".format(task_index)
2459                                     ] = db_task_update["retries"]
2460
2461 0                                 next_check_at = time.time() + db_task_update.get(
2462                                     "next_retry", 60
2463                                 )
2464 0                             target_update = None
2465 1                         elif task["action"] == "CREATE":
2466 1                             if task["status"] == "SCHEDULED":
2467 0                                 if task_status_create:
2468 0                                     new_status = task_status_create
2469 0                                     target_update = "COPY_VIM_INFO"
2470                                 else:
2471 0                                     new_status, db_vim_info_update = self.item2class[
2472                                         task["item"]
2473                                     ].new(ro_task, task_index, task_depends)
2474                                     # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
2475 0                                     _update_refresh(new_status)
2476                             else:
2477 1                                 refresh_at = ro_task["vim_info"]["refresh_at"]
2478 1                                 if refresh_at and refresh_at != -1 and now > refresh_at:
2479 0                                     (
2480                                         new_status,
2481                                         db_vim_info_update,
2482                                     ) = self.item2class[
2483                                         task["item"]
2484                                     ].refresh(ro_task)
2485 0                                     _update_refresh(new_status)
2486                                 else:
2487                                     # The refresh is updated to avoid set the value of "refresh_at" to
2488                                     # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2489                                     # because it can happen that in this case the task is never processed
2490 1                                     _update_refresh(task["status"])
2491
2492 0                     except Exception as e:
2493 0                         new_status = "FAILED"
2494 0                         db_vim_info_update = {
2495                             "vim_status": "VIM_ERROR",
2496                             "vim_message": str(e),
2497                         }
2498
2499 0                         if not isinstance(
2500                             e, (NsWorkerException, vimconn.VimConnException)
2501                         ):
2502 0                             self.logger.error(
2503                                 "Unexpected exception at _delete_task task={}: {}".format(
2504                                     task["task_id"], e
2505                                 ),
2506                                 exc_info=True,
2507                             )
2508
2509 1                     try:
2510 1                         if db_vim_info_update:
2511 0                             db_vim_update = db_vim_info_update.copy()
2512 0                             db_ro_task_update.update(
2513                                 {
2514                                     "vim_info." + k: v
2515                                     for k, v in db_vim_info_update.items()
2516                                 }
2517                             )
2518 0                             ro_task["vim_info"].update(db_vim_info_update)
2519
2520 1                         if new_status:
2521 0                             if task_action == "CREATE":
2522 0                                 task_status_create = new_status
2523 0                             db_ro_task_update[task_path] = new_status
2524
2525 1                         if target_update or db_vim_update:
2526 0                             if target_update == "DELETE":
2527 0                                 self._update_target(task, None)
2528 0                             elif target_update == "COPY_VIM_INFO":
2529 0                                 self._update_target(task, ro_task["vim_info"])
2530                             else:
2531 0                                 self._update_target(task, db_vim_update)
2532
2533 0                     except Exception as e:
2534 0                         if (
2535                             isinstance(e, DbException)
2536                             and e.http_code == HTTPStatus.NOT_FOUND
2537                         ):
2538                             # if the vnfrs or nsrs has been removed from database, this task must be removed
2539 0                             self.logger.debug(
2540                                 "marking to delete task={}".format(task["task_id"])
2541                             )
2542 0                             self.tasks_to_delete.append(task)
2543                         else:
2544 0                             self.logger.error(
2545                                 "Unexpected exception at _update_target task={}: {}".format(
2546                                     task["task_id"], e
2547                                 ),
2548                                 exc_info=True,
2549                             )
2550
2551 1             locked_at = ro_task["locked_at"]
2552
2553 1             if lock_object:
2554 1                 locked_at = [
2555                     lock_object["locked_at"],
2556                     lock_object["locked_at"] + self.task_locked_time,
2557                 ]
2558                 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
2559                 # contain exactly locked_at + self.task_locked_time
2560 1                 LockRenew.remove_lock_object(lock_object)
2561
2562 1             q_filter = {
2563                 "_id": ro_task["_id"],
2564                 "to_check_at": ro_task["to_check_at"],
2565                 "locked_at": locked_at,
2566             }
2567             # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2568             # outside this task (by ro_nbi) do not update it
2569 1             db_ro_task_update["locked_by"] = None
2570             # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
2571 1             db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
2572 1             db_ro_task_update["modified_at"] = now
2573 1             db_ro_task_update["to_check_at"] = next_check_at
2574
2575             """
2576             # Log RO tasks only when loglevel is DEBUG
2577             if self.logger.getEffectiveLevel() == logging.DEBUG:
2578                 db_ro_task_update_log = db_ro_task_update.copy()
2579                 db_ro_task_update_log["_id"] = q_filter["_id"]
2580                 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2581             """
2582
2583 1             if not self.db.set_one(
2584                 "ro_tasks",
2585                 update_dict=db_ro_task_update,
2586                 q_filter=q_filter,
2587                 fail_on_empty=False,
2588             ):
2589 0                 del db_ro_task_update["to_check_at"]
2590 0                 del q_filter["to_check_at"]
2591                 """
2592                 # Log RO tasks only when loglevel is DEBUG
2593                 if self.logger.getEffectiveLevel() == logging.DEBUG:
2594                     self._log_ro_task(
2595                         None,
2596                         db_ro_task_update_log,
2597                         None,
2598                         "TASK_WF",
2599                         "SET_TASK " + str(q_filter),
2600                     )
2601                 """
2602 0                 self.db.set_one(
2603                     "ro_tasks",
2604                     q_filter=q_filter,
2605                     update_dict=db_ro_task_update,
2606                     fail_on_empty=True,
2607                 )
2608 0         except DbException as e:
2609 0             self.logger.error(
2610                 "ro_task={} Error updating database {}".format(ro_task_id, e)
2611             )
2612 0         except Exception as e:
2613 0             self.logger.error(
2614                 "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
2615             )
2616
2617 1     def _update_target(self, task, ro_vim_item_update):
2618 0         table, _, temp = task["target_record"].partition(":")
2619 0         _id, _, path_vim_status = temp.partition(":")
2620 0         path_item = path_vim_status[: path_vim_status.rfind(".")]
2621 0         path_item = path_item[: path_item.rfind(".")]
2622         # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2623         # path_item: dot separated list targeting record information, e.g. "vdur.10"
2624
2625 0         if ro_vim_item_update:
2626 0             update_dict = {
2627                 path_vim_status + "." + k: v
2628                 for k, v in ro_vim_item_update.items()
2629                 if k
2630                 in (
2631                     "vim_id",
2632                     "vim_details",
2633                     "vim_message",
2634                     "vim_name",
2635                     "vim_status",
2636                     "interfaces",
2637                     "interfaces_backup",
2638                 )
2639             }
2640
2641 0             if path_vim_status.startswith("vdur."):
2642                 # for backward compatibility, add vdur.name apart from vdur.vim_name
2643 0                 if ro_vim_item_update.get("vim_name"):
2644 0                     update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
2645
2646                 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2647 0                 if ro_vim_item_update.get("vim_id"):
2648 0                     update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
2649
2650                 # update general status
2651 0                 if ro_vim_item_update.get("vim_status"):
2652 0                     update_dict[path_item + ".status"] = ro_vim_item_update[
2653                         "vim_status"
2654                     ]
2655
2656 0             if ro_vim_item_update.get("interfaces"):
2657 0                 path_interfaces = path_item + ".interfaces"
2658
2659 0                 for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
2660 0                     if iface:
2661 0                         update_dict.update(
2662                             {
2663                                 path_interfaces + ".{}.".format(i) + k: v
2664                                 for k, v in iface.items()
2665                                 if k in ("vlan", "compute_node", "pci")
2666                             }
2667                         )
2668
2669                         # put ip_address and mac_address with ip-address and mac-address
2670 0                         if iface.get("ip_address"):
2671 0                             update_dict[
2672                                 path_interfaces + ".{}.".format(i) + "ip-address"
2673                             ] = iface["ip_address"]
2674
2675 0                         if iface.get("mac_address"):
2676 0                             update_dict[
2677                                 path_interfaces + ".{}.".format(i) + "mac-address"
2678                             ] = iface["mac_address"]
2679
2680 0                         if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
2681 0                             update_dict["ip-address"] = iface.get("ip_address").split(
2682                                 ";"
2683                             )[0]
2684
2685 0                         if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
2686 0                             update_dict[path_item + ".ip-address"] = iface.get(
2687                                 "ip_address"
2688                             ).split(";")[0]
2689
2690 0             self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
2691
2692             # If interfaces exists, it backups VDU interfaces in the DB for healing operations
2693 0             if ro_vim_item_update.get("interfaces"):
2694 0                 search_key = path_vim_status + ".interfaces"
2695 0                 if update_dict.get(search_key):
2696 0                     interfaces_backup_update = {
2697                         path_vim_status + ".interfaces_backup": update_dict[search_key]
2698                     }
2699
2700 0                     self.db.set_one(
2701                         table,
2702                         q_filter={"_id": _id},
2703                         update_dict=interfaces_backup_update,
2704                     )
2705
2706         else:
2707 0             update_dict = {path_item + ".status": "DELETED"}
2708 0             self.db.set_one(
2709                 table,
2710                 q_filter={"_id": _id},
2711                 update_dict=update_dict,
2712                 unset={path_vim_status: None},
2713             )
2714
2715 1     def _process_delete_db_tasks(self):
2716         """
2717         Delete task from database because vnfrs or nsrs or both have been deleted
2718         :return: None. Uses and modify self.tasks_to_delete
2719         """
2720 0         while self.tasks_to_delete:
2721 0             task = self.tasks_to_delete[0]
2722 0             vnfrs_deleted = None
2723 0             nsr_id = task["nsr_id"]
2724
2725 0             if task["target_record"].startswith("vnfrs:"):
2726                 # check if nsrs is present
2727 0                 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
2728 0                     vnfrs_deleted = task["target_record"].split(":")[1]
2729
2730 0             try:
2731 0                 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
2732 0             except Exception as e:
2733 0                 self.logger.error(
2734                     "Error deleting task={}: {}".format(task["task_id"], e)
2735                 )
2736 0             self.tasks_to_delete.pop(0)
2737
2738 1     @staticmethod
2739 1     def delete_db_tasks(db, nsr_id, vnfrs_deleted):
2740         """
2741         Static method because it is called from osm_ng_ro.ns
2742         :param db: instance of database to use
2743         :param nsr_id: affected nsrs id
2744         :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
2745         :return: None, exception is fails
2746         """
2747 0         retries = 5
2748 0         for retry in range(retries):
2749 0             ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
2750 0             now = time.time()
2751 0             conflict = False
2752
2753 0             for ro_task in ro_tasks:
2754 0                 db_update = {}
2755 0                 to_delete_ro_task = True
2756
2757 0                 for index, task in enumerate(ro_task["tasks"]):
2758 0                     if not task:
2759 0                         pass
2760 0                     elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or (
2761                         vnfrs_deleted
2762                         and task["target_record"].startswith("vnfrs:" + vnfrs_deleted)
2763                     ):
2764 0                         db_update["tasks.{}".format(index)] = None
2765                     else:
2766                         # used by other nsr, ro_task cannot be deleted
2767 0                         to_delete_ro_task = False
2768
2769                 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
2770 0                 if to_delete_ro_task:
2771 0                     if not db.del_one(
2772                         "ro_tasks",
2773                         q_filter={
2774                             "_id": ro_task["_id"],
2775                             "modified_at": ro_task["modified_at"],
2776                         },
2777                         fail_on_empty=False,
2778                     ):
2779 0                         conflict = True
2780 0                 elif db_update:
2781 0                     db_update["modified_at"] = now
2782 0                     if not db.set_one(
2783                         "ro_tasks",
2784                         q_filter={
2785                             "_id": ro_task["_id"],
2786                             "modified_at": ro_task["modified_at"],
2787                         },
2788                         update_dict=db_update,
2789                         fail_on_empty=False,
2790                     ):
2791 0                         conflict = True
2792 0             if not conflict:
2793 0                 return
2794         else:
2795 0             raise NsWorkerException("Exceeded {} retries".format(retries))
2796
2797 1     def run(self):
2798         # load database
2799 0         self.logger.info("Starting")
2800         while True:
2801             # step 1: get commands from queue
2802 0             try:
2803 0                 if self.vim_targets:
2804 0                     task = self.task_queue.get(block=False)
2805                 else:
2806 0                     if not self.idle:
2807 0                         self.logger.debug("enters in idle state")
2808 0                     self.idle = True
2809 0                     task = self.task_queue.get(block=True)
2810 0                     self.idle = False
2811
2812 0                 if task[0] == "terminate":
2813 0                     break
2814 0                 elif task[0] == "load_vim":
2815 0                     self.logger.info("order to load vim {}".format(task[1]))
2816 0                     self._load_vim(task[1])
2817 0                 elif task[0] == "unload_vim":
2818 0                     self.logger.info("order to unload vim {}".format(task[1]))
2819 0                     self._unload_vim(task[1])
2820 0                 elif task[0] == "reload_vim":
2821 0                     self._reload_vim(task[1])
2822 0                 elif task[0] == "check_vim":
2823 0                     self.logger.info("order to check vim {}".format(task[1]))
2824 0                     self._check_vim(task[1])
2825 0                 continue
2826 0             except Exception as e:
2827 0                 if isinstance(e, queue.Empty):
2828 0                     pass
2829                 else:
2830 0                     self.logger.critical(
2831                         "Error processing task: {}".format(e), exc_info=True
2832                     )
2833
2834             # step 2: process pending_tasks, delete not needed tasks
2835 0             try:
2836 0                 if self.tasks_to_delete:
2837 0                     self._process_delete_db_tasks()
2838 0                 busy = False
2839                 """
2840                 # Log RO tasks only when loglevel is DEBUG
2841                 if self.logger.getEffectiveLevel() == logging.DEBUG:
2842                     _ = self._get_db_all_tasks()
2843                 """
2844 0                 ro_task = self._get_db_task()
2845 0                 if ro_task:
2846 0                     self.logger.debug("Task to process: {}".format(ro_task))
2847 0                     time.sleep(1)
2848 0                     self._process_pending_tasks(ro_task)
2849 0                     busy = True
2850 0                 if not busy:
2851 0                     time.sleep(5)
2852 0             except Exception as e:
2853 0                 self.logger.critical(
2854                     "Unexpected exception at run: " + str(e), exc_info=True
2855                 )
2856
2857 0         self.logger.info("Finishing")