Code Coverage

Cobertura Coverage Report > NG-RO.osm_ng_ro >

ns_thread.py

Trend

Classes100%
 
Lines30%
   
Conditionals100%
 

File Coverage summary

NameClassesLinesConditionals
ns_thread.py
100%
1/1
30%
386/1285
100%
0/0

Coverage Breakdown by Class

NameLinesConditionals
ns_thread.py
30%
386/1285
N/A

Source

NG-RO/osm_ng_ro/ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 #         http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
20 1 """"
21 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored at database in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
25 """
26
27 1 from copy import deepcopy
28 1 from http import HTTPStatus
29 1 import logging
30 1 from os import makedirs
31 1 from os import path
32 1 import queue
33 1 from shutil import rmtree
34 1 import threading
35 1 import time
36 1 import traceback
37 1 from typing import Dict
38 1 from unittest.mock import Mock
39
40 1 from importlib_metadata import entry_points
41 1 from osm_common.dbbase import DbException
42 1 from osm_ng_ro.vim_admin import LockRenew
43 1 from osm_ro_plugin import sdnconn
44 1 from osm_ro_plugin import vimconn
45 1 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
46 1 from osm_ro_plugin.vim_dummy import VimDummyConnector
47 1 import yaml
48
49 1 __author__ = "Alfonso Tierno"
50 1 __date__ = "$28-Sep-2017 12:07:15$"
51
52
53 1 def deep_get(target_dict, *args, **kwargs):
54     """
55     Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
56     Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
57     :param target_dict: dictionary to be read
58     :param args: list of keys to read from  target_dict
59     :param kwargs: only can contain default=value to return if key is not present in the nested dictionary
60     :return: The wanted value if exist, None or default otherwise
61     """
62 1     for key in args:
63 1         if not isinstance(target_dict, dict) or key not in target_dict:
64 1             return kwargs.get("default")
65 1         target_dict = target_dict[key]
66 1     return target_dict
67
68
69 1 class NsWorkerException(Exception):
70 1     pass
71
72
73 1 class FailingConnector:
74 1     def __init__(self, error_msg):
75 0         self.error_msg = error_msg
76
77 0         for method in dir(vimconn.VimConnector):
78 0             if method[0] != "_":
79 0                 setattr(
80                     self, method, Mock(side_effect=vimconn.VimConnException(error_msg))
81                 )
82
83 0         for method in dir(sdnconn.SdnConnectorBase):
84 0             if method[0] != "_":
85 0                 setattr(
86                     self, method, Mock(side_effect=sdnconn.SdnConnectorError(error_msg))
87                 )
88
89
90 1 class NsWorkerExceptionNotFound(NsWorkerException):
91 1     pass
92
93
94 1 class VimInteractionBase:
95     """Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ...
96     It implements methods that does nothing and return ok"""
97
98 1     def __init__(self, db, my_vims, db_vims, logger):
99 1         self.db = db
100 1         self.logger = logger
101 1         self.my_vims = my_vims
102 1         self.db_vims = db_vims
103
104 1     def new(self, ro_task, task_index, task_depends):
105 0         return "BUILD", {}
106
107 1     def refresh(self, ro_task):
108         """skip calling VIM to get image, flavor status. Assumes ok"""
109 0         if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
110 0             return "FAILED", {}
111
112 0         return "DONE", {}
113
114 1     def delete(self, ro_task, task_index):
115         """skip calling VIM to delete image. Assumes ok"""
116 0         return "DONE", {}
117
118 1     def exec(self, ro_task, task_index, task_depends):
119 0         return "DONE", None, None
120
121
122 1 class VimInteractionNet(VimInteractionBase):
123 1     def new(self, ro_task, task_index, task_depends):
124 1         vim_net_id = None
125 1         task = ro_task["tasks"][task_index]
126 1         task_id = task["task_id"]
127 1         created = False
128 1         created_items = {}
129 1         target_vim = self.my_vims[ro_task["target_id"]]
130 1         mgmtnet = False
131 1         mgmtnet_defined_in_vim = False
132
133 1         try:
134             # FIND
135 1             if task.get("find_params"):
136                 # if management, get configuration of VIM
137 1                 if task["find_params"].get("filter_dict"):
138 1                     vim_filter = task["find_params"]["filter_dict"]
139                 # management network
140 1                 elif task["find_params"].get("mgmt"):
141 1                     mgmtnet = True
142 1                     if deep_get(
143                         self.db_vims[ro_task["target_id"]],
144                         "config",
145                         "management_network_id",
146                     ):
147 1                         mgmtnet_defined_in_vim = True
148 1                         vim_filter = {
149                             "id": self.db_vims[ro_task["target_id"]]["config"][
150                                 "management_network_id"
151                             ]
152                         }
153 1                     elif deep_get(
154                         self.db_vims[ro_task["target_id"]],
155                         "config",
156                         "management_network_name",
157                     ):
158 1                         mgmtnet_defined_in_vim = True
159 1                         vim_filter = {
160                             "name": self.db_vims[ro_task["target_id"]]["config"][
161                                 "management_network_name"
162                             ]
163                         }
164                     else:
165 1                         vim_filter = {"name": task["find_params"]["name"]}
166                 else:
167 1                     raise NsWorkerExceptionNotFound(
168                         "Invalid find_params for new_net {}".format(task["find_params"])
169                     )
170
171 1                 vim_nets = target_vim.get_network_list(vim_filter)
172 1                 if not vim_nets and not task.get("params"):
173                     # If there is mgmt-network in the descriptor,
174                     # there is no mapping of that network to a VIM network in the descriptor,
175                     # also there is no mapping in the "--config" parameter or at VIM creation;
176                     # that mgmt-network will be created.
177 1                     if mgmtnet and not mgmtnet_defined_in_vim:
178 1                         net_name = (
179                             vim_filter.get("name")
180                             if vim_filter.get("name")
181                             else vim_filter.get("id")[:16]
182                         )
183 1                         vim_net_id, created_items = target_vim.new_network(
184                             net_name, None
185                         )
186 1                         self.logger.debug(
187                             "Created mgmt network vim_net_id: {}".format(vim_net_id)
188                         )
189 1                         created = True
190                     else:
191 1                         raise NsWorkerExceptionNotFound(
192                             "Network not found with this criteria: '{}'".format(
193                                 task.get("find_params")
194                             )
195                         )
196 1                 elif len(vim_nets) > 1:
197 1                     raise NsWorkerException(
198                         "More than one network found with this criteria: '{}'".format(
199                             task["find_params"]
200                         )
201                     )
202
203 1                 if vim_nets:
204 1                     vim_net_id = vim_nets[0]["id"]
205             else:
206                 # CREATE
207 1                 params = task["params"]
208 1                 vim_net_id, created_items = target_vim.new_network(**params)
209 1                 created = True
210
211 1             ro_vim_item_update = {
212                 "vim_id": vim_net_id,
213                 "vim_status": "BUILD",
214                 "created": created,
215                 "created_items": created_items,
216                 "vim_details": None,
217                 "vim_message": None,
218             }
219 1             self.logger.debug(
220                 "task={} {} new-net={} created={}".format(
221                     task_id, ro_task["target_id"], vim_net_id, created
222                 )
223             )
224
225 1             return "BUILD", ro_vim_item_update
226 1         except (vimconn.VimConnException, NsWorkerException) as e:
227 1             self.logger.error(
228                 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
229             )
230 1             ro_vim_item_update = {
231                 "vim_status": "VIM_ERROR",
232                 "created": created,
233                 "vim_message": str(e),
234             }
235
236 1             return "FAILED", ro_vim_item_update
237
238 1     def refresh(self, ro_task):
239         """Call VIM to get network status"""
240 1         ro_task_id = ro_task["_id"]
241 1         target_vim = self.my_vims[ro_task["target_id"]]
242 1         vim_id = ro_task["vim_info"]["vim_id"]
243 1         net_to_refresh_list = [vim_id]
244
245 1         try:
246 1             vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
247 1             vim_info = vim_dict[vim_id]
248
249 1             if vim_info["status"] == "ACTIVE":
250 1                 task_status = "DONE"
251 1             elif vim_info["status"] == "BUILD":
252 1                 task_status = "BUILD"
253             else:
254 1                 task_status = "FAILED"
255 1         except vimconn.VimConnException as e:
256             # Mark all tasks at VIM_ERROR status
257 1             self.logger.error(
258                 "ro_task={} vim={} get-net={}: {}".format(
259                     ro_task_id, ro_task["target_id"], vim_id, e
260                 )
261             )
262 1             vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
263 1             task_status = "FAILED"
264
265 1         ro_vim_item_update = {}
266 1         if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
267 1             ro_vim_item_update["vim_status"] = vim_info["status"]
268
269 1         if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
270 1             ro_vim_item_update["vim_name"] = vim_info.get("name")
271
272 1         if vim_info["status"] in ("ERROR", "VIM_ERROR"):
273 1             if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
274 1                 ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
275 1         elif vim_info["status"] == "DELETED":
276 1             ro_vim_item_update["vim_id"] = None
277 1             ro_vim_item_update["vim_message"] = "Deleted externally"
278         else:
279 1             if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
280 1                 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
281
282 1         if ro_vim_item_update:
283 1             self.logger.debug(
284                 "ro_task={} {} get-net={}: status={} {}".format(
285                     ro_task_id,
286                     ro_task["target_id"],
287                     vim_id,
288                     ro_vim_item_update.get("vim_status"),
289                     ro_vim_item_update.get("vim_message")
290                     if ro_vim_item_update.get("vim_status") != "ACTIVE"
291                     else "",
292                 )
293             )
294
295 1         return task_status, ro_vim_item_update
296
297 1     def delete(self, ro_task, task_index):
298 0         task = ro_task["tasks"][task_index]
299 0         task_id = task["task_id"]
300 0         net_vim_id = ro_task["vim_info"]["vim_id"]
301 0         ro_vim_item_update_ok = {
302             "vim_status": "DELETED",
303             "created": False,
304             "vim_message": "DELETED",
305             "vim_id": None,
306         }
307
308 0         try:
309 0             if net_vim_id or ro_task["vim_info"]["created_items"]:
310 0                 target_vim = self.my_vims[ro_task["target_id"]]
311 0                 target_vim.delete_network(
312                     net_vim_id, ro_task["vim_info"]["created_items"]
313                 )
314 0         except vimconn.VimConnNotFoundException:
315 0             ro_vim_item_update_ok["vim_message"] = "already deleted"
316 0         except vimconn.VimConnException as e:
317 0             self.logger.error(
318                 "ro_task={} vim={} del-net={}: {}".format(
319                     ro_task["_id"], ro_task["target_id"], net_vim_id, e
320                 )
321             )
322 0             ro_vim_item_update = {
323                 "vim_status": "VIM_ERROR",
324                 "vim_message": "Error while deleting: {}".format(e),
325             }
326
327 0             return "FAILED", ro_vim_item_update
328
329 0         self.logger.debug(
330             "task={} {} del-net={} {}".format(
331                 task_id,
332                 ro_task["target_id"],
333                 net_vim_id,
334                 ro_vim_item_update_ok.get("vim_message", ""),
335             )
336         )
337
338 0         return "DONE", ro_vim_item_update_ok
339
340
341 1 class VimInteractionVdu(VimInteractionBase):
342 1     max_retries_inject_ssh_key = 20  # 20 times
343 1     time_retries_inject_ssh_key = 30  # wevery 30 seconds
344
345 1     def new(self, ro_task, task_index, task_depends):
346 0         task = ro_task["tasks"][task_index]
347 0         task_id = task["task_id"]
348 0         created = False
349 0         created_items = {}
350 0         target_vim = self.my_vims[ro_task["target_id"]]
351
352 0         try:
353 0             created = True
354 0             params = task["params"]
355 0             params_copy = deepcopy(params)
356 0             net_list = params_copy["net_list"]
357
358 0             for net in net_list:
359                 # change task_id into network_id
360 0                 if "net_id" in net and net["net_id"].startswith("TASK-"):
361 0                     network_id = task_depends[net["net_id"]]
362
363 0                     if not network_id:
364 0                         raise NsWorkerException(
365                             "Cannot create VM because depends on a network not created or found "
366                             "for {}".format(net["net_id"])
367                         )
368
369 0                     net["net_id"] = network_id
370
371 0             if params_copy["image_id"].startswith("TASK-"):
372 0                 params_copy["image_id"] = task_depends[params_copy["image_id"]]
373
374 0             if params_copy["flavor_id"].startswith("TASK-"):
375 0                 params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]
376
377 0             affinity_group_list = params_copy["affinity_group_list"]
378 0             for affinity_group in affinity_group_list:
379                 # change task_id into affinity_group_id
380 0                 if "affinity_group_id" in affinity_group and affinity_group[
381                     "affinity_group_id"
382                 ].startswith("TASK-"):
383 0                     affinity_group_id = task_depends[
384                         affinity_group["affinity_group_id"]
385                     ]
386
387 0                     if not affinity_group_id:
388 0                         raise NsWorkerException(
389                             "found for {}".format(affinity_group["affinity_group_id"])
390                         )
391
392 0                     affinity_group["affinity_group_id"] = affinity_group_id
393
394 0             vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
395 0             interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]
396
397             # add to created items previous_created_volumes (healing)
398 0             if task.get("previous_created_volumes"):
399 0                 for k, v in task["previous_created_volumes"].items():
400 0                     created_items[k] = v
401
402 0             ro_vim_item_update = {
403                 "vim_id": vim_vm_id,
404                 "vim_status": "BUILD",
405                 "created": created,
406                 "created_items": created_items,
407                 "vim_details": None,
408                 "vim_message": None,
409                 "interfaces_vim_ids": interfaces,
410                 "interfaces": [],
411                 "interfaces_backup": [],
412             }
413 0             self.logger.debug(
414                 "task={} {} new-vm={} created={}".format(
415                     task_id, ro_task["target_id"], vim_vm_id, created
416                 )
417             )
418
419 0             return "BUILD", ro_vim_item_update
420 0         except (vimconn.VimConnException, NsWorkerException) as e:
421 0             self.logger.debug(traceback.format_exc())
422 0             self.logger.error(
423                 "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
424             )
425 0             ro_vim_item_update = {
426                 "vim_status": "VIM_ERROR",
427                 "created": created,
428                 "vim_message": str(e),
429             }
430
431 0             return "FAILED", ro_vim_item_update
432
433 1     def delete(self, ro_task, task_index):
434 0         task = ro_task["tasks"][task_index]
435 0         task_id = task["task_id"]
436 0         vm_vim_id = ro_task["vim_info"]["vim_id"]
437 0         ro_vim_item_update_ok = {
438             "vim_status": "DELETED",
439             "created": False,
440             "vim_message": "DELETED",
441             "vim_id": None,
442         }
443
444 0         try:
445 0             self.logger.debug(
446                 "delete_vminstance: vm_vim_id={} created_items={}".format(
447                     vm_vim_id, ro_task["vim_info"]["created_items"]
448                 )
449             )
450 0             if vm_vim_id or ro_task["vim_info"]["created_items"]:
451 0                 target_vim = self.my_vims[ro_task["target_id"]]
452 0                 target_vim.delete_vminstance(
453                     vm_vim_id,
454                     ro_task["vim_info"]["created_items"],
455                     ro_task["vim_info"].get("volumes_to_hold", []),
456                 )
457 0         except vimconn.VimConnNotFoundException:
458 0             ro_vim_item_update_ok["vim_message"] = "already deleted"
459 0         except vimconn.VimConnException as e:
460 0             self.logger.error(
461                 "ro_task={} vim={} del-vm={}: {}".format(
462                     ro_task["_id"], ro_task["target_id"], vm_vim_id, e
463                 )
464             )
465 0             ro_vim_item_update = {
466                 "vim_status": "VIM_ERROR",
467                 "vim_message": "Error while deleting: {}".format(e),
468             }
469
470 0             return "FAILED", ro_vim_item_update
471
472 0         self.logger.debug(
473             "task={} {} del-vm={} {}".format(
474                 task_id,
475                 ro_task["target_id"],
476                 vm_vim_id,
477                 ro_vim_item_update_ok.get("vim_message", ""),
478             )
479         )
480
481 0         return "DONE", ro_vim_item_update_ok
482
483 1     def refresh(self, ro_task):
484         """Call VIM to get vm status"""
485 0         ro_task_id = ro_task["_id"]
486 0         target_vim = self.my_vims[ro_task["target_id"]]
487 0         vim_id = ro_task["vim_info"]["vim_id"]
488
489 0         if not vim_id:
490 0             return None, None
491
492 0         vm_to_refresh_list = [vim_id]
493 0         try:
494 0             vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
495 0             vim_info = vim_dict[vim_id]
496
497 0             if vim_info["status"] == "ACTIVE":
498 0                 task_status = "DONE"
499 0             elif vim_info["status"] == "BUILD":
500 0                 task_status = "BUILD"
501             else:
502 0                 task_status = "FAILED"
503
504             # try to load and parse vim_information
505 0             try:
506 0                 vim_info_info = yaml.safe_load(vim_info["vim_info"])
507 0                 if vim_info_info.get("name"):
508 0                     vim_info["name"] = vim_info_info["name"]
509 0             except Exception as vim_info_error:
510 0                 self.logger.exception(
511                     f"{vim_info_error} occured while getting the vim_info from yaml"
512                 )
513 0         except vimconn.VimConnException as e:
514             # Mark all tasks at VIM_ERROR status
515 0             self.logger.error(
516                 "ro_task={} vim={} get-vm={}: {}".format(
517                     ro_task_id, ro_task["target_id"], vim_id, e
518                 )
519             )
520 0             vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
521 0             task_status = "FAILED"
522
523 0         ro_vim_item_update = {}
524
525         # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
526 0         vim_interfaces = []
527 0         if vim_info.get("interfaces"):
528 0             for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
529 0                 iface = next(
530                     (
531                         iface
532                         for iface in vim_info["interfaces"]
533                         if vim_iface_id == iface["vim_interface_id"]
534                     ),
535                     None,
536                 )
537                 # if iface:
538                 #     iface.pop("vim_info", None)
539 0                 vim_interfaces.append(iface)
540
541 0         task_create = next(
542             t
543             for t in ro_task["tasks"]
544             if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
545         )
546 0         if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
547 0             vim_interfaces[task_create["mgmt_vnf_interface"]][
548                 "mgmt_vnf_interface"
549             ] = True
550
551 0         mgmt_vdu_iface = task_create.get(
552             "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
553         )
554 0         if vim_interfaces:
555 0             vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True
556
557 0         if ro_task["vim_info"]["interfaces"] != vim_interfaces:
558 0             ro_vim_item_update["interfaces"] = vim_interfaces
559
560 0         if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
561 0             ro_vim_item_update["vim_status"] = vim_info["status"]
562
563 0         if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
564 0             ro_vim_item_update["vim_name"] = vim_info.get("name")
565
566 0         if vim_info["status"] in ("ERROR", "VIM_ERROR"):
567 0             if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
568 0                 ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
569 0         elif vim_info["status"] == "DELETED":
570 0             ro_vim_item_update["vim_id"] = None
571 0             ro_vim_item_update["vim_message"] = "Deleted externally"
572         else:
573 0             if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
574 0                 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
575
576 0         if ro_vim_item_update:
577 0             self.logger.debug(
578                 "ro_task={} {} get-vm={}: status={} {}".format(
579                     ro_task_id,
580                     ro_task["target_id"],
581                     vim_id,
582                     ro_vim_item_update.get("vim_status"),
583                     ro_vim_item_update.get("vim_message")
584                     if ro_vim_item_update.get("vim_status") != "ACTIVE"
585                     else "",
586                 )
587             )
588
589 0         return task_status, ro_vim_item_update
590
591 1     def exec(self, ro_task, task_index, task_depends):
592 0         task = ro_task["tasks"][task_index]
593 0         task_id = task["task_id"]
594 0         target_vim = self.my_vims[ro_task["target_id"]]
595 0         db_task_update = {"retries": 0}
596 0         retries = task.get("retries", 0)
597
598 0         try:
599 0             params = task["params"]
600 0             params_copy = deepcopy(params)
601 0             params_copy["ro_key"] = self.db.decrypt(
602                 params_copy.pop("private_key"),
603                 params_copy.pop("schema_version"),
604                 params_copy.pop("salt"),
605             )
606 0             params_copy["ip_addr"] = params_copy.pop("ip_address")
607 0             target_vim.inject_user_key(**params_copy)
608 0             self.logger.debug(
609                 "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
610             )
611
612 0             return (
613                 "DONE",
614                 None,
615                 db_task_update,
616             )  # params_copy["key"]
617 0         except (vimconn.VimConnException, NsWorkerException) as e:
618 0             retries += 1
619
620 0             self.logger.debug(traceback.format_exc())
621 0             if retries < self.max_retries_inject_ssh_key:
622 0                 return (
623                     "BUILD",
624                     None,
625                     {
626                         "retries": retries,
627                         "next_retry": self.time_retries_inject_ssh_key,
628                     },
629                 )
630
631 0             self.logger.error(
632                 "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
633             )
634 0             ro_vim_item_update = {"vim_message": str(e)}
635
636 0             return "FAILED", ro_vim_item_update, db_task_update
637
638
639 1 class VimInteractionImage(VimInteractionBase):
640 1     def new(self, ro_task, task_index, task_depends):
641 0         task = ro_task["tasks"][task_index]
642 0         task_id = task["task_id"]
643 0         created = False
644 0         created_items = {}
645 0         target_vim = self.my_vims[ro_task["target_id"]]
646
647 0         try:
648             # FIND
649 0             if task.get("find_params"):
650 0                 vim_images = target_vim.get_image_list(**task["find_params"])
651
652 0                 if not vim_images:
653 0                     raise NsWorkerExceptionNotFound(
654                         "Image not found with this criteria: '{}'".format(
655                             task["find_params"]
656                         )
657                     )
658 0                 elif len(vim_images) > 1:
659 0                     raise NsWorkerException(
660                         "More than one image found with this criteria: '{}'".format(
661                             task["find_params"]
662                         )
663                     )
664                 else:
665 0                     vim_image_id = vim_images[0]["id"]
666
667 0             ro_vim_item_update = {
668                 "vim_id": vim_image_id,
669                 "vim_status": "DONE",
670                 "created": created,
671                 "created_items": created_items,
672                 "vim_details": None,
673                 "vim_message": None,
674             }
675 0             self.logger.debug(
676                 "task={} {} new-image={} created={}".format(
677                     task_id, ro_task["target_id"], vim_image_id, created
678                 )
679             )
680
681 0             return "DONE", ro_vim_item_update
682 0         except (NsWorkerException, vimconn.VimConnException) as e:
683 0             self.logger.error(
684                 "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
685             )
686 0             ro_vim_item_update = {
687                 "vim_status": "VIM_ERROR",
688                 "created": created,
689                 "vim_message": str(e),
690             }
691
692 0             return "FAILED", ro_vim_item_update
693
694
695 1 class VimInteractionFlavor(VimInteractionBase):
696 1     def delete(self, ro_task, task_index):
697 0         task = ro_task["tasks"][task_index]
698 0         task_id = task["task_id"]
699 0         flavor_vim_id = ro_task["vim_info"]["vim_id"]
700 0         ro_vim_item_update_ok = {
701             "vim_status": "DELETED",
702             "created": False,
703             "vim_message": "DELETED",
704             "vim_id": None,
705         }
706
707 0         try:
708 0             if flavor_vim_id:
709 0                 target_vim = self.my_vims[ro_task["target_id"]]
710 0                 target_vim.delete_flavor(flavor_vim_id)
711 0         except vimconn.VimConnNotFoundException:
712 0             ro_vim_item_update_ok["vim_message"] = "already deleted"
713 0         except vimconn.VimConnException as e:
714 0             self.logger.error(
715                 "ro_task={} vim={} del-flavor={}: {}".format(
716                     ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
717                 )
718             )
719 0             ro_vim_item_update = {
720                 "vim_status": "VIM_ERROR",
721                 "vim_message": "Error while deleting: {}".format(e),
722             }
723
724 0             return "FAILED", ro_vim_item_update
725
726 0         self.logger.debug(
727             "task={} {} del-flavor={} {}".format(
728                 task_id,
729                 ro_task["target_id"],
730                 flavor_vim_id,
731                 ro_vim_item_update_ok.get("vim_message", ""),
732             )
733         )
734
735 0         return "DONE", ro_vim_item_update_ok
736
737 1     def new(self, ro_task, task_index, task_depends):
738 0         task = ro_task["tasks"][task_index]
739 0         task_id = task["task_id"]
740 0         created = False
741 0         created_items = {}
742 0         target_vim = self.my_vims[ro_task["target_id"]]
743
744 0         try:
745             # FIND
746 0             vim_flavor_id = None
747
748 0             if task.get("find_params"):
749 0                 try:
750 0                     flavor_data = task["find_params"]["flavor_data"]
751 0                     vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
752 0                 except vimconn.VimConnNotFoundException:
753 0                     self.logger.warning("VimConnNotFoundException occured.")
754
755 0             if not vim_flavor_id and task.get("params"):
756                 # CREATE
757 0                 flavor_data = task["params"]["flavor_data"]
758 0                 vim_flavor_id = target_vim.new_flavor(flavor_data)
759 0                 created = True
760
761 0             ro_vim_item_update = {
762                 "vim_id": vim_flavor_id,
763                 "vim_status": "DONE",
764                 "created": created,
765                 "created_items": created_items,
766                 "vim_details": None,
767                 "vim_message": None,
768             }
769 0             self.logger.debug(
770                 "task={} {} new-flavor={} created={}".format(
771                     task_id, ro_task["target_id"], vim_flavor_id, created
772                 )
773             )
774
775 0             return "DONE", ro_vim_item_update
776 0         except (vimconn.VimConnException, NsWorkerException) as e:
777 0             self.logger.error(
778                 "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
779             )
780 0             ro_vim_item_update = {
781                 "vim_status": "VIM_ERROR",
782                 "created": created,
783                 "vim_message": str(e),
784             }
785
786 0             return "FAILED", ro_vim_item_update
787
788
789 1 class VimInteractionAffinityGroup(VimInteractionBase):
790 1     def delete(self, ro_task, task_index):
791 1         task = ro_task["tasks"][task_index]
792 1         task_id = task["task_id"]
793 1         affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
794 1         ro_vim_item_update_ok = {
795             "vim_status": "DELETED",
796             "created": False,
797             "vim_message": "DELETED",
798             "vim_id": None,
799         }
800
801 1         try:
802 1             if affinity_group_vim_id:
803 1                 target_vim = self.my_vims[ro_task["target_id"]]
804 1                 target_vim.delete_affinity_group(affinity_group_vim_id)
805 0         except vimconn.VimConnNotFoundException:
806 0             ro_vim_item_update_ok["vim_message"] = "already deleted"
807 0         except vimconn.VimConnException as e:
808 0             self.logger.error(
809                 "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
810                     ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
811                 )
812             )
813 0             ro_vim_item_update = {
814                 "vim_status": "VIM_ERROR",
815                 "vim_message": "Error while deleting: {}".format(e),
816             }
817
818 0             return "FAILED", ro_vim_item_update
819
820 1         self.logger.debug(
821             "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
822                 task_id,
823                 ro_task["target_id"],
824                 affinity_group_vim_id,
825                 ro_vim_item_update_ok.get("vim_message", ""),
826             )
827         )
828
829 1         return "DONE", ro_vim_item_update_ok
830
831 1     def new(self, ro_task, task_index, task_depends):
832 1         task = ro_task["tasks"][task_index]
833 1         task_id = task["task_id"]
834 1         created = False
835 1         created_items = {}
836 1         target_vim = self.my_vims[ro_task["target_id"]]
837
838 1         try:
839 1             affinity_group_vim_id = None
840 1             affinity_group_data = None
841
842 1             if task.get("params"):
843 1                 affinity_group_data = task["params"].get("affinity_group_data")
844
845 1             if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
846 0                 try:
847 0                     param_affinity_group_id = task["params"]["affinity_group_data"].get(
848                         "vim-affinity-group-id"
849                     )
850 0                     affinity_group_vim_id = target_vim.get_affinity_group(
851                         param_affinity_group_id
852                     ).get("id")
853 0                 except vimconn.VimConnNotFoundException:
854 0                     self.logger.error(
855                         "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {}"
856                         "could not be found at VIM. Creating a new one.".format(
857                             task_id, ro_task["target_id"], param_affinity_group_id
858                         )
859                     )
860
861 1             if not affinity_group_vim_id and affinity_group_data:
862 1                 affinity_group_vim_id = target_vim.new_affinity_group(
863                     affinity_group_data
864                 )
865 1                 created = True
866
867 1             ro_vim_item_update = {
868                 "vim_id": affinity_group_vim_id,
869                 "vim_status": "DONE",
870                 "created": created,
871                 "created_items": created_items,
872                 "vim_details": None,
873                 "vim_message": None,
874             }
875 1             self.logger.debug(
876                 "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
877                     task_id, ro_task["target_id"], affinity_group_vim_id, created
878                 )
879             )
880
881 1             return "DONE", ro_vim_item_update
882 0         except (vimconn.VimConnException, NsWorkerException) as e:
883 0             self.logger.error(
884                 "task={} vim={} new-affinity-or-anti-affinity-group:"
885                 " {}".format(task_id, ro_task["target_id"], e)
886             )
887 0             ro_vim_item_update = {
888                 "vim_status": "VIM_ERROR",
889                 "created": created,
890                 "vim_message": str(e),
891             }
892
893 0             return "FAILED", ro_vim_item_update
894
895
896 1 class VimInteractionUpdateVdu(VimInteractionBase):
897 1     def exec(self, ro_task, task_index, task_depends):
898 0         task = ro_task["tasks"][task_index]
899 0         task_id = task["task_id"]
900 0         db_task_update = {"retries": 0}
901 0         created = False
902 0         created_items = {}
903 0         target_vim = self.my_vims[ro_task["target_id"]]
904
905 0         try:
906 0             if task.get("params"):
907 0                 vim_vm_id = task["params"].get("vim_vm_id")
908 0                 action = task["params"].get("action")
909 0                 context = {action: action}
910 0                 target_vim.action_vminstance(vim_vm_id, context)
911                 # created = True
912 0             ro_vim_item_update = {
913                 "vim_id": vim_vm_id,
914                 "vim_status": "DONE",
915                 "created": created,
916                 "created_items": created_items,
917                 "vim_details": None,
918                 "vim_message": None,
919             }
920 0             self.logger.debug(
921                 "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
922             )
923 0             return "DONE", ro_vim_item_update, db_task_update
924 0         except (vimconn.VimConnException, NsWorkerException) as e:
925 0             self.logger.error(
926                 "task={} vim={} VM Migration:"
927                 " {}".format(task_id, ro_task["target_id"], e)
928             )
929 0             ro_vim_item_update = {
930                 "vim_status": "VIM_ERROR",
931                 "created": created,
932                 "vim_message": str(e),
933             }
934
935 0             return "FAILED", ro_vim_item_update, db_task_update
936
937
938 1 class VimInteractionSdnNet(VimInteractionBase):
939 1     @staticmethod
940 1     def _match_pci(port_pci, mapping):
941         """
942         Check if port_pci matches with mapping
943         mapping can have brackets to indicate that several chars are accepted. e.g
944         pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
945         :param port_pci: text
946         :param mapping: text, can contain brackets to indicate several chars are available
947         :return: True if matches, False otherwise
948         """
949 0         if not port_pci or not mapping:
950 0             return False
951 0         if port_pci == mapping:
952 0             return True
953
954 0         mapping_index = 0
955 0         pci_index = 0
956         while True:
957 0             bracket_start = mapping.find("[", mapping_index)
958
959 0             if bracket_start == -1:
960 0                 break
961
962 0             bracket_end = mapping.find("]", bracket_start)
963 0             if bracket_end == -1:
964 0                 break
965
966 0             length = bracket_start - mapping_index
967 0             if (
968                 length
969                 and port_pci[pci_index : pci_index + length]
970                 != mapping[mapping_index:bracket_start]
971             ):
972 0                 return False
973
974 0             if (
975                 port_pci[pci_index + length]
976                 not in mapping[bracket_start + 1 : bracket_end]
977             ):
978 0                 return False
979
980 0             pci_index += length + 1
981 0             mapping_index = bracket_end + 1
982
983 0         if port_pci[pci_index:] != mapping[mapping_index:]:
984 0             return False
985
986 0         return True
987
988 1     def _get_interfaces(self, vlds_to_connect, vim_account_id):
989         """
990         :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
991         :param vim_account_id:
992         :return:
993         """
994 0         interfaces = []
995
996 0         for vld in vlds_to_connect:
997 0             table, _, db_id = vld.partition(":")
998 0             db_id, _, vld = db_id.partition(":")
999 0             _, _, vld_id = vld.partition(".")
1000
1001 0             if table == "vnfrs":
1002 0                 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
1003 0                 iface_key = "vnf-vld-id"
1004             else:  # table == "nsrs"
1005 0                 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
1006 0                 iface_key = "ns-vld-id"
1007
1008 0             db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
1009
1010 0             for db_vnfr in db_vnfrs:
1011 0                 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
1012 0                     for iface_index, interface in enumerate(vdur["interfaces"]):
1013 0                         if interface.get(iface_key) == vld_id and interface.get(
1014                             "type"
1015                         ) in ("SR-IOV", "PCI-PASSTHROUGH"):
1016                             # only SR-IOV o PT
1017 0                             interface_ = interface.copy()
1018 0                             interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
1019                                 db_vnfr["_id"], vdu_index, iface_index
1020                             )
1021
1022 0                             if vdur.get("status") == "ERROR":
1023 0                                 interface_["status"] = "ERROR"
1024
1025 0                             interfaces.append(interface_)
1026
1027 0         return interfaces
1028
1029 1     def refresh(self, ro_task):
1030         # look for task create
1031 0         task_create_index, _ = next(
1032             i_t
1033             for i_t in enumerate(ro_task["tasks"])
1034             if i_t[1]
1035             and i_t[1]["action"] == "CREATE"
1036             and i_t[1]["status"] != "FINISHED"
1037         )
1038
1039 0         return self.new(ro_task, task_create_index, None)
1040
1041 1     def new(self, ro_task, task_index, task_depends):
1042
1043 0         task = ro_task["tasks"][task_index]
1044 0         task_id = task["task_id"]
1045 0         target_vim = self.my_vims[ro_task["target_id"]]
1046
1047 0         sdn_net_id = ro_task["vim_info"]["vim_id"]
1048
1049 0         created_items = ro_task["vim_info"].get("created_items")
1050 0         connected_ports = ro_task["vim_info"].get("connected_ports", [])
1051 0         new_connected_ports = []
1052 0         last_update = ro_task["vim_info"].get("last_update", 0)
1053 0         sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
1054 0         error_list = []
1055 0         created = ro_task["vim_info"].get("created", False)
1056
1057 0         try:
1058             # CREATE
1059 0             params = task["params"]
1060 0             vlds_to_connect = params.get("vlds", [])
1061 0             associated_vim = params.get("target_vim")
1062             # external additional ports
1063 0             additional_ports = params.get("sdn-ports") or ()
1064 0             _, _, vim_account_id = (
1065                 (None, None, None)
1066                 if associated_vim is None
1067                 else associated_vim.partition(":")
1068             )
1069
1070 0             if associated_vim:
1071                 # get associated VIM
1072 0                 if associated_vim not in self.db_vims:
1073 0                     self.db_vims[associated_vim] = self.db.get_one(
1074                         "vim_accounts", {"_id": vim_account_id}
1075                     )
1076
1077 0                 db_vim = self.db_vims[associated_vim]
1078
1079             # look for ports to connect
1080 0             ports = self._get_interfaces(vlds_to_connect, vim_account_id)
1081             # print(ports)
1082
1083 0             sdn_ports = []
1084 0             pending_ports = error_ports = 0
1085 0             vlan_used = None
1086 0             sdn_need_update = False
1087
1088 0             for port in ports:
1089 0                 vlan_used = port.get("vlan") or vlan_used
1090
1091                 # TODO. Do not connect if already done
1092 0                 if not port.get("compute_node") or not port.get("pci"):
1093 0                     if port.get("status") == "ERROR":
1094 0                         error_ports += 1
1095                     else:
1096 0                         pending_ports += 1
1097 0                     continue
1098
1099 0                 pmap = None
1100 0                 compute_node_mappings = next(
1101                     (
1102                         c
1103                         for c in db_vim["config"].get("sdn-port-mapping", ())
1104                         if c and c["compute_node"] == port["compute_node"]
1105                     ),
1106                     None,
1107                 )
1108
1109 0                 if compute_node_mappings:
1110                     # process port_mapping pci of type 0000:af:1[01].[1357]
1111 0                     pmap = next(
1112                         (
1113                             p
1114                             for p in compute_node_mappings["ports"]
1115                             if self._match_pci(port["pci"], p.get("pci"))
1116                         ),
1117                         None,
1118                     )
1119
1120 0                 if not pmap:
1121 0                     if not db_vim["config"].get("mapping_not_needed"):
1122 0                         error_list.append(
1123                             "Port mapping not found for compute_node={} pci={}".format(
1124                                 port["compute_node"], port["pci"]
1125                             )
1126                         )
1127 0                         continue
1128
1129 0                     pmap = {}
1130
1131 0                 service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
1132 0                 new_port = {
1133                     "service_endpoint_id": pmap.get("service_endpoint_id")
1134                     or service_endpoint_id,
1135                     "service_endpoint_encapsulation_type": "dot1q"
1136                     if port["type"] == "SR-IOV"
1137                     else None,
1138                     "service_endpoint_encapsulation_info": {
1139                         "vlan": port.get("vlan"),
1140                         "mac": port.get("mac-address"),
1141                         "device_id": pmap.get("device_id") or port["compute_node"],
1142                         "device_interface_id": pmap.get("device_interface_id")
1143                         or port["pci"],
1144                         "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
1145                         "switch_port": pmap.get("switch_port"),
1146                         "service_mapping_info": pmap.get("service_mapping_info"),
1147                     },
1148                 }
1149
1150                 # TODO
1151                 # if port["modified_at"] > last_update:
1152                 #     sdn_need_update = True
1153 0                 new_connected_ports.append(port["id"])  # TODO
1154 0                 sdn_ports.append(new_port)
1155
1156 0             if error_ports:
1157 0                 error_list.append(
1158                     "{} interfaces have not been created as VDU is on ERROR status".format(
1159                         error_ports
1160                     )
1161                 )
1162
1163             # connect external ports
1164 0             for index, additional_port in enumerate(additional_ports):
1165 0                 additional_port_id = additional_port.get(
1166                     "service_endpoint_id"
1167                 ) or "external-{}".format(index)
1168 0                 sdn_ports.append(
1169                     {
1170                         "service_endpoint_id": additional_port_id,
1171                         "service_endpoint_encapsulation_type": additional_port.get(
1172                             "service_endpoint_encapsulation_type", "dot1q"
1173                         ),
1174                         "service_endpoint_encapsulation_info": {
1175                             "vlan": additional_port.get("vlan") or vlan_used,
1176                             "mac": additional_port.get("mac_address"),
1177                             "device_id": additional_port.get("device_id"),
1178                             "device_interface_id": additional_port.get(
1179                                 "device_interface_id"
1180                             ),
1181                             "switch_dpid": additional_port.get("switch_dpid")
1182                             or additional_port.get("switch_id"),
1183                             "switch_port": additional_port.get("switch_port"),
1184                             "service_mapping_info": additional_port.get(
1185                                 "service_mapping_info"
1186                             ),
1187                         },
1188                     }
1189                 )
1190 0                 new_connected_ports.append(additional_port_id)
1191 0             sdn_info = ""
1192
1193             # if there are more ports to connect or they have been modified, call create/update
1194 0             if error_list:
1195 0                 sdn_status = "ERROR"
1196 0                 sdn_info = "; ".join(error_list)
1197 0             elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
1198 0                 last_update = time.time()
1199
1200 0                 if not sdn_net_id:
1201 0                     if len(sdn_ports) < 2:
1202 0                         sdn_status = "ACTIVE"
1203
1204 0                         if not pending_ports:
1205 0                             self.logger.debug(
1206                                 "task={} {} new-sdn-net done, less than 2 ports".format(
1207                                     task_id, ro_task["target_id"]
1208                                 )
1209                             )
1210                     else:
1211 0                         net_type = params.get("type") or "ELAN"
1212 0                         (
1213                             sdn_net_id,
1214                             created_items,
1215                         ) = target_vim.create_connectivity_service(net_type, sdn_ports)
1216 0                         created = True
1217 0                         self.logger.debug(
1218                             "task={} {} new-sdn-net={} created={}".format(
1219                                 task_id, ro_task["target_id"], sdn_net_id, created
1220                             )
1221                         )
1222                 else:
1223 0                     created_items = target_vim.edit_connectivity_service(
1224                         sdn_net_id, conn_info=created_items, connection_points=sdn_ports
1225                     )
1226 0                     created = True
1227 0                     self.logger.debug(
1228                         "task={} {} update-sdn-net={} created={}".format(
1229                             task_id, ro_task["target_id"], sdn_net_id, created
1230                         )
1231                     )
1232
1233 0                 connected_ports = new_connected_ports
1234 0             elif sdn_net_id:
1235 0                 wim_status_dict = target_vim.get_connectivity_service_status(
1236                     sdn_net_id, conn_info=created_items
1237                 )
1238 0                 sdn_status = wim_status_dict["sdn_status"]
1239
1240 0                 if wim_status_dict.get("sdn_info"):
1241 0                     sdn_info = str(wim_status_dict.get("sdn_info")) or ""
1242
1243 0                 if wim_status_dict.get("error_msg"):
1244 0                     sdn_info = wim_status_dict.get("error_msg") or ""
1245
1246 0             if pending_ports:
1247 0                 if sdn_status != "ERROR":
1248 0                     sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1249                         len(ports) - pending_ports, len(ports)
1250                     )
1251
1252 0                 if sdn_status == "ACTIVE":
1253 0                     sdn_status = "BUILD"
1254
1255 0             ro_vim_item_update = {
1256                 "vim_id": sdn_net_id,
1257                 "vim_status": sdn_status,
1258                 "created": created,
1259                 "created_items": created_items,
1260                 "connected_ports": connected_ports,
1261                 "vim_details": sdn_info,
1262                 "vim_message": None,
1263                 "last_update": last_update,
1264             }
1265
1266 0             return sdn_status, ro_vim_item_update
1267 0         except Exception as e:
1268 0             self.logger.error(
1269                 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
1270                 exc_info=not isinstance(
1271                     e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1272                 ),
1273             )
1274 0             ro_vim_item_update = {
1275                 "vim_status": "VIM_ERROR",
1276                 "created": created,
1277                 "vim_message": str(e),
1278             }
1279
1280 0             return "FAILED", ro_vim_item_update
1281
1282 1     def delete(self, ro_task, task_index):
1283 0         task = ro_task["tasks"][task_index]
1284 0         task_id = task["task_id"]
1285 0         sdn_vim_id = ro_task["vim_info"].get("vim_id")
1286 0         ro_vim_item_update_ok = {
1287             "vim_status": "DELETED",
1288             "created": False,
1289             "vim_message": "DELETED",
1290             "vim_id": None,
1291         }
1292
1293 0         try:
1294 0             if sdn_vim_id:
1295 0                 target_vim = self.my_vims[ro_task["target_id"]]
1296 0                 target_vim.delete_connectivity_service(
1297                     sdn_vim_id, ro_task["vim_info"].get("created_items")
1298                 )
1299
1300 0         except Exception as e:
1301 0             if (
1302                 isinstance(e, sdnconn.SdnConnectorError)
1303                 and e.http_code == HTTPStatus.NOT_FOUND.value
1304             ):
1305 0                 ro_vim_item_update_ok["vim_message"] = "already deleted"
1306             else:
1307 0                 self.logger.error(
1308                     "ro_task={} vim={} del-sdn-net={}: {}".format(
1309                         ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
1310                     ),
1311                     exc_info=not isinstance(
1312                         e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1313                     ),
1314                 )
1315 0                 ro_vim_item_update = {
1316                     "vim_status": "VIM_ERROR",
1317                     "vim_message": "Error while deleting: {}".format(e),
1318                 }
1319
1320 0                 return "FAILED", ro_vim_item_update
1321
1322 0         self.logger.debug(
1323             "task={} {} del-sdn-net={} {}".format(
1324                 task_id,
1325                 ro_task["target_id"],
1326                 sdn_vim_id,
1327                 ro_vim_item_update_ok.get("vim_message", ""),
1328             )
1329         )
1330
1331 0         return "DONE", ro_vim_item_update_ok
1332
1333
1334 1 class VimInteractionMigration(VimInteractionBase):
1335 1     def exec(self, ro_task, task_index, task_depends):
1336 1         task = ro_task["tasks"][task_index]
1337 1         task_id = task["task_id"]
1338 1         db_task_update = {"retries": 0}
1339 1         target_vim = self.my_vims[ro_task["target_id"]]
1340 1         vim_interfaces = []
1341 1         created = False
1342 1         created_items = {}
1343 1         refreshed_vim_info = {}
1344
1345 1         try:
1346 1             if task.get("params"):
1347 1                 vim_vm_id = task["params"].get("vim_vm_id")
1348 1                 migrate_host = task["params"].get("migrate_host")
1349 1                 _, migrated_compute_node = target_vim.migrate_instance(
1350                     vim_vm_id, migrate_host
1351                 )
1352
1353 1                 if migrated_compute_node:
1354                     # When VM is migrated, vdu["vim_info"] needs to be updated
1355 1                     vdu_old_vim_info = task["params"]["vdu_vim_info"].get(
1356                         ro_task["target_id"]
1357                     )
1358
1359                     # Refresh VM to get new vim_info
1360 1                     vm_to_refresh_list = [vim_vm_id]
1361 1                     vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
1362 1                     refreshed_vim_info = vim_dict[vim_vm_id]
1363
1364 1                     if refreshed_vim_info.get("interfaces"):
1365 1                         for old_iface in vdu_old_vim_info.get("interfaces"):
1366 0                             iface = next(
1367                                 (
1368                                     iface
1369                                     for iface in refreshed_vim_info["interfaces"]
1370                                     if old_iface["vim_interface_id"]
1371                                     == iface["vim_interface_id"]
1372                                 ),
1373                                 None,
1374                             )
1375 0                             vim_interfaces.append(iface)
1376
1377 1             ro_vim_item_update = {
1378                 "vim_id": vim_vm_id,
1379                 "vim_status": "ACTIVE",
1380                 "created": created,
1381                 "created_items": created_items,
1382                 "vim_details": None,
1383                 "vim_message": None,
1384             }
1385
1386 1             if refreshed_vim_info and refreshed_vim_info.get("status") not in (
1387                 "ERROR",
1388                 "VIM_ERROR",
1389             ):
1390 1                 ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]
1391
1392 1             if vim_interfaces:
1393 0                 ro_vim_item_update["interfaces"] = vim_interfaces
1394
1395 1             self.logger.debug(
1396                 "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
1397             )
1398
1399 1             return "DONE", ro_vim_item_update, db_task_update
1400
1401 0         except (vimconn.VimConnException, NsWorkerException) as e:
1402 0             self.logger.error(
1403                 "task={} vim={} VM Migration:"
1404                 " {}".format(task_id, ro_task["target_id"], e)
1405             )
1406 0             ro_vim_item_update = {
1407                 "vim_status": "VIM_ERROR",
1408                 "created": created,
1409                 "vim_message": str(e),
1410             }
1411
1412 0             return "FAILED", ro_vim_item_update, db_task_update
1413
1414
1415 1 class VimInteractionResize(VimInteractionBase):
1416 1     def exec(self, ro_task, task_index, task_depends):
1417 1         task = ro_task["tasks"][task_index]
1418 1         task_id = task["task_id"]
1419 1         db_task_update = {"retries": 0}
1420 1         created = False
1421 1         target_flavor_uuid = None
1422 1         created_items = {}
1423 1         refreshed_vim_info = {}
1424 1         target_vim = self.my_vims[ro_task["target_id"]]
1425
1426 1         try:
1427 1             if task.get("params"):
1428 1                 vim_vm_id = task["params"].get("vim_vm_id")
1429 1                 flavor_dict = task["params"].get("flavor_dict")
1430 1                 self.logger.info("flavor_dict %s", flavor_dict)
1431
1432 1                 try:
1433 1                     target_flavor_uuid = target_vim.get_flavor_id_from_data(flavor_dict)
1434 0                 except Exception as e:
1435 0                     self.logger.info("Cannot find any flavor matching  %s.", str(e))
1436 0                     try:
1437 0                         target_flavor_uuid = target_vim.new_flavor(flavor_dict)
1438 0                     except Exception as e:
1439 0                         self.logger.error("Error creating flavor at VIM  %s.", str(e))
1440
1441 1                 if target_flavor_uuid is not None:
1442 1                     resized_status = target_vim.resize_instance(
1443                         vim_vm_id, target_flavor_uuid
1444                     )
1445
1446 1                     if resized_status:
1447                         # Refresh VM to get new vim_info
1448 1                         vm_to_refresh_list = [vim_vm_id]
1449 1                         vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
1450 1                         refreshed_vim_info = vim_dict[vim_vm_id]
1451
1452 1             ro_vim_item_update = {
1453                 "vim_id": vim_vm_id,
1454                 "vim_status": "DONE",
1455                 "created": created,
1456                 "created_items": created_items,
1457                 "vim_details": None,
1458                 "vim_message": None,
1459             }
1460
1461 1             if refreshed_vim_info and refreshed_vim_info.get("status") not in (
1462                 "ERROR",
1463                 "VIM_ERROR",
1464             ):
1465 1                 ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]
1466
1467 1             self.logger.debug(
1468                 "task={} {} resize done".format(task_id, ro_task["target_id"])
1469             )
1470 1             return "DONE", ro_vim_item_update, db_task_update
1471 0         except (vimconn.VimConnException, NsWorkerException) as e:
1472 0             self.logger.error(
1473                 "task={} vim={} Resize:" " {}".format(task_id, ro_task["target_id"], e)
1474             )
1475 0             ro_vim_item_update = {
1476                 "vim_status": "VIM_ERROR",
1477                 "created": created,
1478                 "vim_message": str(e),
1479             }
1480
1481 0             return "FAILED", ro_vim_item_update, db_task_update
1482
1483
1484 1 class ConfigValidate:
1485 1     def __init__(self, config: Dict):
1486 1         self.conf = config
1487
1488 1     @property
1489 1     def active(self):
1490         # default 1 min, allowed >= 60 or -1, -1 disables periodic checks
1491 1         if (
1492             self.conf["period"]["refresh_active"] >= 60
1493             or self.conf["period"]["refresh_active"] == -1
1494         ):
1495 1             return self.conf["period"]["refresh_active"]
1496
1497 1         return 60
1498
1499 1     @property
1500 1     def build(self):
1501 1         return self.conf["period"]["refresh_build"]
1502
1503 1     @property
1504 1     def image(self):
1505 1         return self.conf["period"]["refresh_image"]
1506
1507 1     @property
1508 1     def error(self):
1509 1         return self.conf["period"]["refresh_error"]
1510
1511 1     @property
1512 1     def queue_size(self):
1513 1         return self.conf["period"]["queue_size"]
1514
1515
1516 1 class NsWorker(threading.Thread):
1517 1     def __init__(self, worker_index, config, plugins, db):
1518         """
1519         :param worker_index: thread index
1520         :param config: general configuration of RO, among others the process_id with the docker id where it runs
1521         :param plugins: global shared dict with the loaded plugins
1522         :param db: database class instance to use
1523         """
1524 1         threading.Thread.__init__(self)
1525 1         self.config = config
1526 1         self.plugins = plugins
1527 1         self.plugin_name = "unknown"
1528 1         self.logger = logging.getLogger("ro.worker{}".format(worker_index))
1529 1         self.worker_index = worker_index
1530         # refresh periods for created items
1531 1         self.refresh_config = ConfigValidate(config)
1532 1         self.task_queue = queue.Queue(self.refresh_config.queue_size)
1533         # targetvim: vimplugin class
1534 1         self.my_vims = {}
1535         # targetvim: vim information from database
1536 1         self.db_vims = {}
1537         # targetvim list
1538 1         self.vim_targets = []
1539 1         self.my_id = config["process_id"] + ":" + str(worker_index)
1540 1         self.db = db
1541 1         self.item2class = {
1542             "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger),
1543             "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger),
1544             "image": VimInteractionImage(
1545                 self.db, self.my_vims, self.db_vims, self.logger
1546             ),
1547             "flavor": VimInteractionFlavor(
1548                 self.db, self.my_vims, self.db_vims, self.logger
1549             ),
1550             "sdn_net": VimInteractionSdnNet(
1551                 self.db, self.my_vims, self.db_vims, self.logger
1552             ),
1553             "update": VimInteractionUpdateVdu(
1554                 self.db, self.my_vims, self.db_vims, self.logger
1555             ),
1556             "affinity-or-anti-affinity-group": VimInteractionAffinityGroup(
1557                 self.db, self.my_vims, self.db_vims, self.logger
1558             ),
1559             "migrate": VimInteractionMigration(
1560                 self.db, self.my_vims, self.db_vims, self.logger
1561             ),
1562             "verticalscale": VimInteractionResize(
1563                 self.db, self.my_vims, self.db_vims, self.logger
1564             ),
1565         }
1566 1         self.time_last_task_processed = None
1567         # lists of tasks to delete because nsrs or vnfrs has been deleted from db
1568 1         self.tasks_to_delete = []
1569         # it is idle when there are not vim_targets associated
1570 1         self.idle = True
1571 1         self.task_locked_time = config["global"]["task_locked_time"]
1572
1573 1     def insert_task(self, task):
1574 0         try:
1575 0             self.task_queue.put(task, False)
1576 0             return None
1577 0         except queue.Full:
1578 0             raise NsWorkerException("timeout inserting a task")
1579
1580 1     def terminate(self):
1581 0         self.insert_task("exit")
1582
1583 1     def del_task(self, task):
1584 0         with self.task_lock:
1585 0             if task["status"] == "SCHEDULED":
1586 0                 task["status"] = "SUPERSEDED"
1587 0                 return True
1588             else:  # task["status"] == "processing"
1589 0                 self.task_lock.release()
1590 0                 return False
1591
1592 1     def _process_vim_config(self, target_id: str, db_vim: dict) -> None:
1593         """
1594         Process vim config, creating vim configuration files as ca_cert
1595         :param target_id: vim/sdn/wim + id
1596         :param db_vim: Vim dictionary obtained from database
1597         :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1598         """
1599 1         if not db_vim.get("config"):
1600 0             return
1601
1602 1         file_name = ""
1603 1         work_dir = "/app/osm_ro/certs"
1604
1605 1         try:
1606 1             if db_vim["config"].get("ca_cert_content"):
1607 1                 file_name = f"{work_dir}/{target_id}:{self.worker_index}"
1608
1609 1                 if not path.isdir(file_name):
1610 1                     makedirs(file_name)
1611
1612 1                 file_name = file_name + "/ca_cert"
1613
1614 1                 with open(file_name, "w") as f:
1615 1                     f.write(db_vim["config"]["ca_cert_content"])
1616 1                     del db_vim["config"]["ca_cert_content"]
1617 1                     db_vim["config"]["ca_cert"] = file_name
1618 1         except Exception as e:
1619 1             raise NsWorkerException(
1620                 "Error writing to file '{}': {}".format(file_name, e)
1621             )
1622
1623 1     def _load_plugin(self, name, type="vim"):
1624         # type can be vim or sdn
1625 0         if "rovim_dummy" not in self.plugins:
1626 0             self.plugins["rovim_dummy"] = VimDummyConnector
1627
1628 0         if "rosdn_dummy" not in self.plugins:
1629 0             self.plugins["rosdn_dummy"] = SdnDummyConnector
1630
1631 0         if name in self.plugins:
1632 0             return self.plugins[name]
1633
1634 0         try:
1635 0             for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
1636 0                 self.plugins[name] = ep.load()
1637 0         except Exception as e:
1638 0             raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
1639
1640 0         if name and name not in self.plugins:
1641 0             raise NsWorkerException(
1642                 "Plugin 'osm_{n}' has not been installed".format(n=name)
1643             )
1644
1645 0         return self.plugins[name]
1646
1647 1     def _unload_vim(self, target_id):
1648         """
1649         Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1650         :param target_id: Contains type:_id; where type can be 'vim', ...
1651         :return: None.
1652         """
1653 0         try:
1654 0             self.db_vims.pop(target_id, None)
1655 0             self.my_vims.pop(target_id, None)
1656
1657 0             if target_id in self.vim_targets:
1658 0                 self.vim_targets.remove(target_id)
1659
1660 0             self.logger.info("Unloaded {}".format(target_id))
1661 0             rmtree("{}:{}".format(target_id, self.worker_index))
1662 0         except FileNotFoundError:
1663             # This is raised by rmtree if folder does not exist.
1664 0             self.logger.exception("FileNotFoundError occured while unloading VIM.")
1665 0         except Exception as e:
1666 0             self.logger.error("Cannot unload {}: {}".format(target_id, e))
1667
1668 1     def _check_vim(self, target_id):
1669         """
1670         Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1671         :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1672         :return: None.
1673         """
1674 0         target, _, _id = target_id.partition(":")
1675 0         now = time.time()
1676 0         update_dict = {}
1677 0         unset_dict = {}
1678 0         op_text = ""
1679 0         step = ""
1680 0         loaded = target_id in self.vim_targets
1681 0         target_database = (
1682             "vim_accounts"
1683             if target == "vim"
1684             else "wim_accounts"
1685             if target == "wim"
1686             else "sdns"
1687         )
1688
1689 0         try:
1690 0             step = "Getting {} from db".format(target_id)
1691 0             db_vim = self.db.get_one(target_database, {"_id": _id})
1692
1693 0             for op_index, operation in enumerate(
1694                 db_vim["_admin"].get("operations", ())
1695             ):
1696 0                 if operation["operationState"] != "PROCESSING":
1697 0                     continue
1698
1699 0                 locked_at = operation.get("locked_at")
1700
1701 0                 if locked_at is not None and locked_at >= now - self.task_locked_time:
1702                     # some other thread is doing this operation
1703 0                     return
1704
1705                 # lock
1706 0                 op_text = "_admin.operations.{}.".format(op_index)
1707
1708 0                 if not self.db.set_one(
1709                     target_database,
1710                     q_filter={
1711                         "_id": _id,
1712                         op_text + "operationState": "PROCESSING",
1713                         op_text + "locked_at": locked_at,
1714                     },
1715                     update_dict={
1716                         op_text + "locked_at": now,
1717                         "admin.current_operation": op_index,
1718                     },
1719                     fail_on_empty=False,
1720                 ):
1721 0                     return
1722
1723 0                 unset_dict[op_text + "locked_at"] = None
1724 0                 unset_dict["current_operation"] = None
1725 0                 step = "Loading " + target_id
1726 0                 error_text = self._load_vim(target_id)
1727
1728 0                 if not error_text:
1729 0                     step = "Checking connectivity"
1730
1731 0                     if target == "vim":
1732 0                         self.my_vims[target_id].check_vim_connectivity()
1733                     else:
1734 0                         self.my_vims[target_id].check_credentials()
1735
1736 0                 update_dict["_admin.operationalState"] = "ENABLED"
1737 0                 update_dict["_admin.detailed-status"] = ""
1738 0                 unset_dict[op_text + "detailed-status"] = None
1739 0                 update_dict[op_text + "operationState"] = "COMPLETED"
1740
1741 0                 return
1742
1743 0         except Exception as e:
1744 0             error_text = "{}: {}".format(step, e)
1745 0             self.logger.error("{} for {}: {}".format(step, target_id, e))
1746
1747         finally:
1748 0             if update_dict or unset_dict:
1749 0                 if error_text:
1750 0                     update_dict[op_text + "operationState"] = "FAILED"
1751 0                     update_dict[op_text + "detailed-status"] = error_text
1752 0                     unset_dict.pop(op_text + "detailed-status", None)
1753 0                     update_dict["_admin.operationalState"] = "ERROR"
1754 0                     update_dict["_admin.detailed-status"] = error_text
1755
1756 0                 if op_text:
1757 0                     update_dict[op_text + "statusEnteredTime"] = now
1758
1759 0                 self.db.set_one(
1760                     target_database,
1761                     q_filter={"_id": _id},
1762                     update_dict=update_dict,
1763                     unset=unset_dict,
1764                     fail_on_empty=False,
1765                 )
1766
1767 0             if not loaded:
1768 0                 self._unload_vim(target_id)
1769
1770 1     def _reload_vim(self, target_id):
1771 0         if target_id in self.vim_targets:
1772 0             self._load_vim(target_id)
1773         else:
1774             # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1775             # just remove it to force load again next time it is needed
1776 0             self.db_vims.pop(target_id, None)
1777
1778 1     def _load_vim(self, target_id):
1779         """
1780         Load or reload a vim_account, sdn_controller or wim_account.
1781         Read content from database, load the plugin if not loaded.
1782         In case of error loading the plugin, it load a failing VIM_connector
1783         It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1784         :param target_id: Contains type:_id; where type can be 'vim', ...
1785         :return: None if ok, descriptive text if error
1786         """
1787 0         target, _, _id = target_id.partition(":")
1788 0         target_database = (
1789             "vim_accounts"
1790             if target == "vim"
1791             else "wim_accounts"
1792             if target == "wim"
1793             else "sdns"
1794         )
1795 0         plugin_name = ""
1796 0         vim = None
1797
1798 0         try:
1799 0             step = "Getting {}={} from db".format(target, _id)
1800             # TODO process for wim, sdnc, ...
1801 0             vim = self.db.get_one(target_database, {"_id": _id})
1802
1803             # if deep_get(vim, "config", "sdn-controller"):
1804             #     step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1805             #     db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1806
1807 0             step = "Decrypting password"
1808 0             schema_version = vim.get("schema_version")
1809 0             self.db.encrypt_decrypt_fields(
1810                 vim,
1811                 "decrypt",
1812                 fields=("password", "secret"),
1813                 schema_version=schema_version,
1814                 salt=_id,
1815             )
1816 0             self._process_vim_config(target_id, vim)
1817
1818 0             if target == "vim":
1819 0                 plugin_name = "rovim_" + vim["vim_type"]
1820 0                 step = "Loading plugin '{}'".format(plugin_name)
1821 0                 vim_module_conn = self._load_plugin(plugin_name)
1822 0                 step = "Loading {}'".format(target_id)
1823 0                 self.my_vims[target_id] = vim_module_conn(
1824                     uuid=vim["_id"],
1825                     name=vim["name"],
1826                     tenant_id=vim.get("vim_tenant_id"),
1827                     tenant_name=vim.get("vim_tenant_name"),
1828                     url=vim["vim_url"],
1829                     url_admin=None,
1830                     user=vim["vim_user"],
1831                     passwd=vim["vim_password"],
1832                     config=vim.get("config") or {},
1833                     persistent_info={},
1834                 )
1835             else:  # sdn
1836 0                 plugin_name = "rosdn_" + (vim.get("type") or vim.get("wim_type"))
1837 0                 step = "Loading plugin '{}'".format(plugin_name)
1838 0                 vim_module_conn = self._load_plugin(plugin_name, "sdn")
1839 0                 step = "Loading {}'".format(target_id)
1840 0                 wim = deepcopy(vim)
1841 0                 wim_config = wim.pop("config", {}) or {}
1842 0                 wim["uuid"] = wim["_id"]
1843 0                 if "url" in wim and "wim_url" not in wim:
1844 0                     wim["wim_url"] = wim["url"]
1845 0                 elif "url" not in wim and "wim_url" in wim:
1846 0                     wim["url"] = wim["wim_url"]
1847
1848 0                 if wim.get("dpid"):
1849 0                     wim_config["dpid"] = wim.pop("dpid")
1850
1851 0                 if wim.get("switch_id"):
1852 0                     wim_config["switch_id"] = wim.pop("switch_id")
1853
1854                 # wim, wim_account, config
1855 0                 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
1856 0             self.db_vims[target_id] = vim
1857 0             self.error_status = None
1858
1859 0             self.logger.info(
1860                 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
1861             )
1862 0         except Exception as e:
1863 0             self.logger.error(
1864                 "Cannot load {} plugin={}: {} {}".format(
1865                     target_id, plugin_name, step, e
1866                 )
1867             )
1868
1869 0             self.db_vims[target_id] = vim or {}
1870 0             self.db_vims[target_id] = FailingConnector(str(e))
1871 0             error_status = "{} Error: {}".format(step, e)
1872
1873 0             return error_status
1874         finally:
1875 0             if target_id not in self.vim_targets:
1876 0                 self.vim_targets.append(target_id)
1877
1878 1     def _get_db_task(self):
1879         """
1880         Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1881         :return: None
1882         """
1883 0         now = time.time()
1884
1885 0         if not self.time_last_task_processed:
1886 0             self.time_last_task_processed = now
1887
1888 0         try:
1889             while True:
1890                 """
1891                 # Log RO tasks only when loglevel is DEBUG
1892                 if self.logger.getEffectiveLevel() == logging.DEBUG:
1893                     self._log_ro_task(
1894                         None,
1895                         None,
1896                         None,
1897                         "TASK_WF",
1898                         "task_locked_time="
1899                         + str(self.task_locked_time)
1900                         + " "
1901                         + "time_last_task_processed="
1902                         + str(self.time_last_task_processed)
1903                         + " "
1904                         + "now="
1905                         + str(now),
1906                     )
1907                 """
1908 0                 locked = self.db.set_one(
1909                     "ro_tasks",
1910                     q_filter={
1911                         "target_id": self.vim_targets,
1912                         "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1913                         "locked_at.lt": now - self.task_locked_time,
1914                         "to_check_at.lt": self.time_last_task_processed,
1915                         "to_check_at.gt": -1,
1916                     },
1917                     update_dict={"locked_by": self.my_id, "locked_at": now},
1918                     fail_on_empty=False,
1919                 )
1920
1921 0                 if locked:
1922                     # read and return
1923 0                     ro_task = self.db.get_one(
1924                         "ro_tasks",
1925                         q_filter={
1926                             "target_id": self.vim_targets,
1927                             "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1928                             "locked_at": now,
1929                         },
1930                     )
1931 0                     return ro_task
1932
1933 0                 if self.time_last_task_processed == now:
1934 0                     self.time_last_task_processed = None
1935 0                     return None
1936                 else:
1937 0                     self.time_last_task_processed = now
1938                     # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1939
1940 0         except DbException as e:
1941 0             self.logger.error("Database exception at _get_db_task: {}".format(e))
1942 0         except Exception as e:
1943 0             self.logger.critical(
1944                 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
1945             )
1946
1947 0         return None
1948
1949 1     def _get_db_all_tasks(self):
1950         """
1951         Read all content of table ro_tasks to log it
1952         :return: None
1953         """
1954 0         try:
1955             # Checking the content of the BD:
1956
1957             # read and return
1958 0             ro_task = self.db.get_list("ro_tasks")
1959 0             for rt in ro_task:
1960 0                 self._log_ro_task(rt, None, None, "TASK_WF", "GET_ALL_TASKS")
1961 0             return ro_task
1962
1963 0         except DbException as e:
1964 0             self.logger.error("Database exception at _get_db_all_tasks: {}".format(e))
1965 0         except Exception as e:
1966 0             self.logger.critical(
1967                 "Unexpected exception at _get_db_all_tasks: {}".format(e), exc_info=True
1968             )
1969
1970 0         return None
1971
1972 1     def _log_ro_task(self, ro_task, db_ro_task_update, db_ro_task_delete, mark, event):
1973         """
1974         Generate a log with the following format:
1975
1976         Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by;
1977         target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id;
1978         task_array_index;task_id;task_action;task_item;task_args
1979
1980         Example:
1981
1982         TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013;
1983         1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a
1984         ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False,
1985         'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None,
1986         'vim_details': None, 'vim_message': None, 'refresh_at': None};1;SCHEDULED;
1987         888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;
1988         CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}}
1989         """
1990 0         try:
1991 0             line = []
1992 0             i = 0
1993 0             if ro_task is not None and isinstance(ro_task, dict):
1994 0                 for t in ro_task["tasks"]:
1995 0                     line.clear()
1996 0                     line.append(mark)
1997 0                     line.append(event)
1998 0                     line.append(ro_task.get("_id", ""))
1999 0                     line.append(str(ro_task.get("locked_at", "")))
2000 0                     line.append(str(ro_task.get("modified_at", "")))
2001 0                     line.append(str(ro_task.get("created_at", "")))
2002 0                     line.append(str(ro_task.get("to_check_at", "")))
2003 0                     line.append(str(ro_task.get("locked_by", "")))
2004 0                     line.append(str(ro_task.get("target_id", "")))
2005 0                     line.append(str(ro_task.get("vim_info", {}).get("refresh_at", "")))
2006 0                     line.append(str(ro_task.get("vim_info", "")))
2007 0                     line.append(str(ro_task.get("tasks", "")))
2008 0                     if isinstance(t, dict):
2009 0                         line.append(str(t.get("status", "")))
2010 0                         line.append(str(t.get("action_id", "")))
2011 0                         line.append(str(i))
2012 0                         line.append(str(t.get("task_id", "")))
2013 0                         line.append(str(t.get("action", "")))
2014 0                         line.append(str(t.get("item", "")))
2015 0                         line.append(str(t.get("find_params", "")))
2016 0                         line.append(str(t.get("params", "")))
2017                     else:
2018 0                         line.extend([""] * 2)
2019 0                         line.append(str(i))
2020 0                         line.extend([""] * 5)
2021
2022 0                     i += 1
2023 0                     self.logger.debug(";".join(line))
2024 0             elif db_ro_task_update is not None and isinstance(db_ro_task_update, dict):
2025 0                 i = 0
2026                 while True:
2027 0                     st = "tasks.{}.status".format(i)
2028 0                     if st not in db_ro_task_update:
2029 0                         break
2030 0                     line.clear()
2031 0                     line.append(mark)
2032 0                     line.append(event)
2033 0                     line.append(db_ro_task_update.get("_id", ""))
2034 0                     line.append(str(db_ro_task_update.get("locked_at", "")))
2035 0                     line.append(str(db_ro_task_update.get("modified_at", "")))
2036 0                     line.append("")
2037 0                     line.append(str(db_ro_task_update.get("to_check_at", "")))
2038 0                     line.append(str(db_ro_task_update.get("locked_by", "")))
2039 0                     line.append("")
2040 0                     line.append(str(db_ro_task_update.get("vim_info.refresh_at", "")))
2041 0                     line.append("")
2042 0                     line.append(str(db_ro_task_update.get("vim_info", "")))
2043 0                     line.append(str(str(db_ro_task_update).count(".status")))
2044 0                     line.append(db_ro_task_update.get(st, ""))
2045 0                     line.append("")
2046 0                     line.append(str(i))
2047 0                     line.extend([""] * 3)
2048 0                     i += 1
2049 0                     self.logger.debug(";".join(line))
2050
2051 0             elif db_ro_task_delete is not None and isinstance(db_ro_task_delete, dict):
2052 0                 line.clear()
2053 0                 line.append(mark)
2054 0                 line.append(event)
2055 0                 line.append(db_ro_task_delete.get("_id", ""))
2056 0                 line.append("")
2057 0                 line.append(db_ro_task_delete.get("modified_at", ""))
2058 0                 line.extend([""] * 13)
2059 0                 self.logger.debug(";".join(line))
2060
2061             else:
2062 0                 line.clear()
2063 0                 line.append(mark)
2064 0                 line.append(event)
2065 0                 line.extend([""] * 16)
2066 0                 self.logger.debug(";".join(line))
2067
2068 0         except Exception as e:
2069 0             self.logger.error("Error logging ro_task: {}".format(e))
2070
2071 1     def _delete_task(self, ro_task, task_index, task_depends, db_update):
2072         """
2073         Determine if this task need to be done or superseded
2074         :return: None
2075         """
2076 0         my_task = ro_task["tasks"][task_index]
2077 0         task_id = my_task["task_id"]
2078 0         needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
2079             "created_items", False
2080         )
2081
2082 0         self.logger.debug("Needed delete: {}".format(needed_delete))
2083 0         if my_task["status"] == "FAILED":
2084 0             return None, None  # TODO need to be retry??
2085
2086 0         try:
2087 0             for index, task in enumerate(ro_task["tasks"]):
2088 0                 if index == task_index or not task:
2089 0                     continue  # own task
2090
2091 0                 if (
2092                     my_task["target_record"] == task["target_record"]
2093                     and task["action"] == "CREATE"
2094                 ):
2095                     # set to finished
2096 0                     db_update["tasks.{}.status".format(index)] = task[
2097                         "status"
2098                     ] = "FINISHED"
2099 0                 elif task["action"] == "CREATE" and task["status"] not in (
2100                     "FINISHED",
2101                     "SUPERSEDED",
2102                 ):
2103 0                     needed_delete = False
2104
2105 0             if needed_delete:
2106 0                 self.logger.debug(
2107                     "Deleting ro_task={} task_index={}".format(ro_task, task_index)
2108                 )
2109 0                 return self.item2class[my_task["item"]].delete(ro_task, task_index)
2110             else:
2111 0                 return "SUPERSEDED", None
2112 0         except Exception as e:
2113 0             if not isinstance(e, NsWorkerException):
2114 0                 self.logger.critical(
2115                     "Unexpected exception at _delete_task task={}: {}".format(
2116                         task_id, e
2117                     ),
2118                     exc_info=True,
2119                 )
2120
2121 0             return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e)}
2122
2123 1     def _create_task(self, ro_task, task_index, task_depends, db_update):
2124         """
2125         Determine if this task need to create something at VIM
2126         :return: None
2127         """
2128 0         my_task = ro_task["tasks"][task_index]
2129 0         task_id = my_task["task_id"]
2130 0         task_status = None
2131
2132 0         if my_task["status"] == "FAILED":
2133 0             return None, None  # TODO need to be retry??
2134 0         elif my_task["status"] == "SCHEDULED":
2135             # check if already created by another task
2136 0             for index, task in enumerate(ro_task["tasks"]):
2137 0                 if index == task_index or not task:
2138 0                     continue  # own task
2139
2140 0                 if task["action"] == "CREATE" and task["status"] not in (
2141                     "SCHEDULED",
2142                     "FINISHED",
2143                     "SUPERSEDED",
2144                 ):
2145 0                     return task["status"], "COPY_VIM_INFO"
2146
2147 0             try:
2148 0                 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
2149                     ro_task, task_index, task_depends
2150                 )
2151                 # TODO update other CREATE tasks
2152 0             except Exception as e:
2153 0                 if not isinstance(e, NsWorkerException):
2154 0                     self.logger.error(
2155                         "Error executing task={}: {}".format(task_id, e), exc_info=True
2156                     )
2157
2158 0                 task_status = "FAILED"
2159 0                 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_message": str(e)}
2160                 # TODO update    ro_vim_item_update
2161
2162 0             return task_status, ro_vim_item_update
2163         else:
2164 0             return None, None
2165
2166 1     def _get_dependency(self, task_id, ro_task=None, target_id=None):
2167         """
2168         Look for dependency task
2169         :param task_id: Can be one of
2170             1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2171             2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2172             3. task.task_id: "<action_id>:number"
2173         :param ro_task:
2174         :param target_id:
2175         :return: database ro_task plus index of task
2176         """
2177 0         if (
2178             task_id.startswith("vim:")
2179             or task_id.startswith("sdn:")
2180             or task_id.startswith("wim:")
2181         ):
2182 0             target_id, _, task_id = task_id.partition(" ")
2183
2184 0         if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
2185 0             ro_task_dependency = self.db.get_one(
2186                 "ro_tasks",
2187                 q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
2188                 fail_on_empty=False,
2189             )
2190
2191 0             if ro_task_dependency:
2192 0                 for task_index, task in enumerate(ro_task_dependency["tasks"]):
2193 0                     if task["target_record_id"] == task_id:
2194 0                         return ro_task_dependency, task_index
2195
2196         else:
2197 0             if ro_task:
2198 0                 for task_index, task in enumerate(ro_task["tasks"]):
2199 0                     if task and task["task_id"] == task_id:
2200 0                         return ro_task, task_index
2201
2202 0             ro_task_dependency = self.db.get_one(
2203                 "ro_tasks",
2204                 q_filter={
2205                     "tasks.ANYINDEX.task_id": task_id,
2206                     "tasks.ANYINDEX.target_record.ne": None,
2207                 },
2208                 fail_on_empty=False,
2209             )
2210
2211 0             self.logger.debug("ro_task_dependency={}".format(ro_task_dependency))
2212 0             if ro_task_dependency:
2213 0                 for task_index, task in enumerate(ro_task_dependency["tasks"]):
2214 0                     if task["task_id"] == task_id:
2215 0                         return ro_task_dependency, task_index
2216 0         raise NsWorkerException("Cannot get depending task {}".format(task_id))
2217
2218 1     def update_vm_refresh(self):
2219         """Enables the VM status updates if self.refresh_config.active parameter
2220         is not -1 and than updates the DB accordingly
2221
2222         """
2223 1         try:
2224 1             self.logger.debug("Checking if VM status update config")
2225 1             next_refresh = time.time()
2226 1             if self.refresh_config.active == -1:
2227 1                 next_refresh = -1
2228             else:
2229 1                 next_refresh += self.refresh_config.active
2230
2231 1             if next_refresh != -1:
2232 1                 db_ro_task_update = {}
2233 1                 now = time.time()
2234 1                 next_check_at = now + (24 * 60 * 60)
2235 1                 next_check_at = min(next_check_at, next_refresh)
2236 1                 db_ro_task_update["vim_info.refresh_at"] = next_refresh
2237 1                 db_ro_task_update["to_check_at"] = next_check_at
2238
2239 1                 self.logger.debug(
2240                     "Finding tasks which to be updated to enable VM status updates"
2241                 )
2242 1                 refresh_tasks = self.db.get_list(
2243                     "ro_tasks",
2244                     q_filter={
2245                         "tasks.status": "DONE",
2246                         "to_check_at.lt": 0,
2247                     },
2248                 )
2249 1                 self.logger.debug("Updating tasks to change the to_check_at status")
2250 1                 for task in refresh_tasks:
2251 1                     q_filter = {
2252                         "_id": task["_id"],
2253                     }
2254 1                     self.db.set_one(
2255                         "ro_tasks",
2256                         q_filter=q_filter,
2257                         update_dict=db_ro_task_update,
2258                         fail_on_empty=True,
2259                     )
2260
2261 0         except Exception as e:
2262 0             self.logger.error(f"Error updating tasks to enable VM status updates: {e}")
2263
2264 1     def _process_pending_tasks(self, ro_task):
2265 1         ro_task_id = ro_task["_id"]
2266 1         now = time.time()
2267         # one day
2268 1         next_check_at = now + (24 * 60 * 60)
2269 1         db_ro_task_update = {}
2270
2271 1         def _update_refresh(new_status):
2272             # compute next_refresh
2273             nonlocal task
2274             nonlocal next_check_at
2275             nonlocal db_ro_task_update
2276             nonlocal ro_task
2277
2278 1             next_refresh = time.time()
2279
2280 1             if task["item"] in ("image", "flavor"):
2281 0                 next_refresh += self.refresh_config.image
2282 1             elif new_status == "BUILD":
2283 0                 next_refresh += self.refresh_config.build
2284 1             elif new_status == "DONE":
2285 1                 if self.refresh_config.active == -1:
2286 1                     next_refresh = -1
2287                 else:
2288 1                     next_refresh += self.refresh_config.active
2289             else:
2290 1                 next_refresh += self.refresh_config.error
2291
2292 1             next_check_at = min(next_check_at, next_refresh)
2293 1             db_ro_task_update["vim_info.refresh_at"] = next_refresh
2294 1             ro_task["vim_info"]["refresh_at"] = next_refresh
2295
2296 1         try:
2297             """
2298             # Log RO tasks only when loglevel is DEBUG
2299             if self.logger.getEffectiveLevel() == logging.DEBUG:
2300                 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
2301             """
2302             # Check if vim status refresh is enabled again
2303 1             self.update_vm_refresh()
2304             # 0: get task_status_create
2305 1             lock_object = None
2306 1             task_status_create = None
2307 1             task_create = next(
2308                 (
2309                     t
2310                     for t in ro_task["tasks"]
2311                     if t
2312                     and t["action"] == "CREATE"
2313                     and t["status"] in ("BUILD", "DONE")
2314                 ),
2315                 None,
2316             )
2317
2318 1             if task_create:
2319 1                 task_status_create = task_create["status"]
2320
2321             # 1: look for tasks in status SCHEDULED, or in status CREATE if action is  DONE or BUILD
2322 1             for task_action in ("DELETE", "CREATE", "EXEC"):
2323 1                 db_vim_update = None
2324 1                 new_status = None
2325
2326 1                 for task_index, task in enumerate(ro_task["tasks"]):
2327 1                     if not task:
2328 0                         continue  # task deleted
2329
2330 1                     task_depends = {}
2331 1                     target_update = None
2332
2333 1                     if (
2334                         (
2335                             task_action in ("DELETE", "EXEC")
2336                             and task["status"] not in ("SCHEDULED", "BUILD")
2337                         )
2338                         or task["action"] != task_action
2339                         or (
2340                             task_action == "CREATE"
2341                             and task["status"] in ("FINISHED", "SUPERSEDED")
2342                         )
2343                     ):
2344 0                         continue
2345
2346 1                     task_path = "tasks.{}.status".format(task_index)
2347 1                     try:
2348 1                         db_vim_info_update = None
2349
2350 1                         if task["status"] == "SCHEDULED":
2351                             # check if tasks that this depends on have been completed
2352 0                             dependency_not_completed = False
2353
2354 0                             for dependency_task_id in task.get("depends_on") or ():
2355 0                                 (
2356                                     dependency_ro_task,
2357                                     dependency_task_index,
2358                                 ) = self._get_dependency(
2359                                     dependency_task_id, target_id=ro_task["target_id"]
2360                                 )
2361 0                                 dependency_task = dependency_ro_task["tasks"][
2362                                     dependency_task_index
2363                                 ]
2364 0                                 self.logger.debug(
2365                                     "dependency_ro_task={} dependency_task_index={}".format(
2366                                         dependency_ro_task, dependency_task_index
2367                                     )
2368                                 )
2369
2370 0                                 if dependency_task["status"] == "SCHEDULED":
2371 0                                     dependency_not_completed = True
2372 0                                     next_check_at = min(
2373                                         next_check_at, dependency_ro_task["to_check_at"]
2374                                     )
2375                                     # must allow dependent task to be processed first
2376                                     # to do this set time after last_task_processed
2377 0                                     next_check_at = max(
2378                                         self.time_last_task_processed, next_check_at
2379                                     )
2380 0                                     break
2381 0                                 elif dependency_task["status"] == "FAILED":
2382 0                                     error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
2383                                         task["action"],
2384                                         task["item"],
2385                                         dependency_task["action"],
2386                                         dependency_task["item"],
2387                                         dependency_task_id,
2388                                         dependency_ro_task["vim_info"].get(
2389                                             "vim_message"
2390                                         ),
2391                                     )
2392 0                                     self.logger.error(
2393                                         "task={} {}".format(task["task_id"], error_text)
2394                                     )
2395 0                                     raise NsWorkerException(error_text)
2396
2397 0                                 task_depends[dependency_task_id] = dependency_ro_task[
2398                                     "vim_info"
2399                                 ]["vim_id"]
2400 0                                 task_depends[
2401                                     "TASK-{}".format(dependency_task_id)
2402                                 ] = dependency_ro_task["vim_info"]["vim_id"]
2403
2404 0                             if dependency_not_completed:
2405 0                                 self.logger.warning(
2406                                     "DEPENDENCY NOT COMPLETED {}".format(
2407                                         dependency_ro_task["vim_info"]["vim_id"]
2408                                     )
2409                                 )
2410                                 # TODO set at vim_info.vim_details that it is waiting
2411 0                                 continue
2412
2413                         # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
2414                         # the task of renew this locking. It will update database locket_at periodically
2415 1                         if not lock_object:
2416 1                             lock_object = LockRenew.add_lock_object(
2417                                 "ro_tasks", ro_task, self
2418                             )
2419
2420 1                         if task["action"] == "DELETE":
2421 0                             (new_status, db_vim_info_update,) = self._delete_task(
2422                                 ro_task, task_index, task_depends, db_ro_task_update
2423                             )
2424 0                             new_status = (
2425                                 "FINISHED" if new_status == "DONE" else new_status
2426                             )
2427                             # ^with FINISHED instead of DONE it will not be refreshing
2428
2429 0                             if new_status in ("FINISHED", "SUPERSEDED"):
2430 0                                 target_update = "DELETE"
2431 1                         elif task["action"] == "EXEC":
2432 0                             (
2433                                 new_status,
2434                                 db_vim_info_update,
2435                                 db_task_update,
2436                             ) = self.item2class[task["item"]].exec(
2437                                 ro_task, task_index, task_depends
2438                             )
2439 0                             new_status = (
2440                                 "FINISHED" if new_status == "DONE" else new_status
2441                             )
2442                             # ^with FINISHED instead of DONE it will not be refreshing
2443
2444 0                             if db_task_update:
2445                                 # load into database the modified db_task_update "retries" and "next_retry"
2446 0                                 if db_task_update.get("retries"):
2447 0                                     db_ro_task_update[
2448                                         "tasks.{}.retries".format(task_index)
2449                                     ] = db_task_update["retries"]
2450
2451 0                                 next_check_at = time.time() + db_task_update.get(
2452                                     "next_retry", 60
2453                                 )
2454 0                             target_update = None
2455 1                         elif task["action"] == "CREATE":
2456 1                             if task["status"] == "SCHEDULED":
2457 0                                 if task_status_create:
2458 0                                     new_status = task_status_create
2459 0                                     target_update = "COPY_VIM_INFO"
2460                                 else:
2461 0                                     new_status, db_vim_info_update = self.item2class[
2462                                         task["item"]
2463                                     ].new(ro_task, task_index, task_depends)
2464                                     # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
2465 0                                     _update_refresh(new_status)
2466                             else:
2467 1                                 refresh_at = ro_task["vim_info"]["refresh_at"]
2468 1                                 if refresh_at and refresh_at != -1 and now > refresh_at:
2469 0                                     (new_status, db_vim_info_update,) = self.item2class[
2470                                         task["item"]
2471                                     ].refresh(ro_task)
2472 0                                     _update_refresh(new_status)
2473                                 else:
2474                                     # The refresh is updated to avoid set the value of "refresh_at" to
2475                                     # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2476                                     # because it can happen that in this case the task is never processed
2477 1                                     _update_refresh(task["status"])
2478
2479 0                     except Exception as e:
2480 0                         new_status = "FAILED"
2481 0                         db_vim_info_update = {
2482                             "vim_status": "VIM_ERROR",
2483                             "vim_message": str(e),
2484                         }
2485
2486 0                         if not isinstance(
2487                             e, (NsWorkerException, vimconn.VimConnException)
2488                         ):
2489 0                             self.logger.error(
2490                                 "Unexpected exception at _delete_task task={}: {}".format(
2491                                     task["task_id"], e
2492                                 ),
2493                                 exc_info=True,
2494                             )
2495
2496 1                     try:
2497 1                         if db_vim_info_update:
2498 0                             db_vim_update = db_vim_info_update.copy()
2499 0                             db_ro_task_update.update(
2500                                 {
2501                                     "vim_info." + k: v
2502                                     for k, v in db_vim_info_update.items()
2503                                 }
2504                             )
2505 0                             ro_task["vim_info"].update(db_vim_info_update)
2506
2507 1                         if new_status:
2508 0                             if task_action == "CREATE":
2509 0                                 task_status_create = new_status
2510 0                             db_ro_task_update[task_path] = new_status
2511
2512 1                         if target_update or db_vim_update:
2513 0                             if target_update == "DELETE":
2514 0                                 self._update_target(task, None)
2515 0                             elif target_update == "COPY_VIM_INFO":
2516 0                                 self._update_target(task, ro_task["vim_info"])
2517                             else:
2518 0                                 self._update_target(task, db_vim_update)
2519
2520 0                     except Exception as e:
2521 0                         if (
2522                             isinstance(e, DbException)
2523                             and e.http_code == HTTPStatus.NOT_FOUND
2524                         ):
2525                             # if the vnfrs or nsrs has been removed from database, this task must be removed
2526 0                             self.logger.debug(
2527                                 "marking to delete task={}".format(task["task_id"])
2528                             )
2529 0                             self.tasks_to_delete.append(task)
2530                         else:
2531 0                             self.logger.error(
2532                                 "Unexpected exception at _update_target task={}: {}".format(
2533                                     task["task_id"], e
2534                                 ),
2535                                 exc_info=True,
2536                             )
2537
2538 1             locked_at = ro_task["locked_at"]
2539
2540 1             if lock_object:
2541 1                 locked_at = [
2542                     lock_object["locked_at"],
2543                     lock_object["locked_at"] + self.task_locked_time,
2544                 ]
2545                 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
2546                 # contain exactly locked_at + self.task_locked_time
2547 1                 LockRenew.remove_lock_object(lock_object)
2548
2549 1             q_filter = {
2550                 "_id": ro_task["_id"],
2551                 "to_check_at": ro_task["to_check_at"],
2552                 "locked_at": locked_at,
2553             }
2554             # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2555             # outside this task (by ro_nbi) do not update it
2556 1             db_ro_task_update["locked_by"] = None
2557             # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
2558 1             db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
2559 1             db_ro_task_update["modified_at"] = now
2560 1             db_ro_task_update["to_check_at"] = next_check_at
2561
2562             """
2563             # Log RO tasks only when loglevel is DEBUG
2564             if self.logger.getEffectiveLevel() == logging.DEBUG:
2565                 db_ro_task_update_log = db_ro_task_update.copy()
2566                 db_ro_task_update_log["_id"] = q_filter["_id"]
2567                 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2568             """
2569
2570 1             if not self.db.set_one(
2571                 "ro_tasks",
2572                 update_dict=db_ro_task_update,
2573                 q_filter=q_filter,
2574                 fail_on_empty=False,
2575             ):
2576 0                 del db_ro_task_update["to_check_at"]
2577 0                 del q_filter["to_check_at"]
2578                 """
2579                 # Log RO tasks only when loglevel is DEBUG
2580                 if self.logger.getEffectiveLevel() == logging.DEBUG:
2581                     self._log_ro_task(
2582                         None,
2583                         db_ro_task_update_log,
2584                         None,
2585                         "TASK_WF",
2586                         "SET_TASK " + str(q_filter),
2587                     )
2588                 """
2589 0                 self.db.set_one(
2590                     "ro_tasks",
2591                     q_filter=q_filter,
2592                     update_dict=db_ro_task_update,
2593                     fail_on_empty=True,
2594                 )
2595 0         except DbException as e:
2596 0             self.logger.error(
2597                 "ro_task={} Error updating database {}".format(ro_task_id, e)
2598             )
2599 0         except Exception as e:
2600 0             self.logger.error(
2601                 "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
2602             )
2603
2604 1     def _update_target(self, task, ro_vim_item_update):
2605 0         table, _, temp = task["target_record"].partition(":")
2606 0         _id, _, path_vim_status = temp.partition(":")
2607 0         path_item = path_vim_status[: path_vim_status.rfind(".")]
2608 0         path_item = path_item[: path_item.rfind(".")]
2609         # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2610         # path_item: dot separated list targeting record information, e.g. "vdur.10"
2611
2612 0         if ro_vim_item_update:
2613 0             update_dict = {
2614                 path_vim_status + "." + k: v
2615                 for k, v in ro_vim_item_update.items()
2616                 if k
2617                 in (
2618                     "vim_id",
2619                     "vim_details",
2620                     "vim_message",
2621                     "vim_name",
2622                     "vim_status",
2623                     "interfaces",
2624                     "interfaces_backup",
2625                 )
2626             }
2627
2628 0             if path_vim_status.startswith("vdur."):
2629                 # for backward compatibility, add vdur.name apart from vdur.vim_name
2630 0                 if ro_vim_item_update.get("vim_name"):
2631 0                     update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
2632
2633                 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2634 0                 if ro_vim_item_update.get("vim_id"):
2635 0                     update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
2636
2637                 # update general status
2638 0                 if ro_vim_item_update.get("vim_status"):
2639 0                     update_dict[path_item + ".status"] = ro_vim_item_update[
2640                         "vim_status"
2641                     ]
2642
2643 0             if ro_vim_item_update.get("interfaces"):
2644 0                 path_interfaces = path_item + ".interfaces"
2645
2646 0                 for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
2647 0                     if iface:
2648 0                         update_dict.update(
2649                             {
2650                                 path_interfaces + ".{}.".format(i) + k: v
2651                                 for k, v in iface.items()
2652                                 if k in ("vlan", "compute_node", "pci")
2653                             }
2654                         )
2655
2656                         # put ip_address and mac_address with ip-address and mac-address
2657 0                         if iface.get("ip_address"):
2658 0                             update_dict[
2659                                 path_interfaces + ".{}.".format(i) + "ip-address"
2660                             ] = iface["ip_address"]
2661
2662 0                         if iface.get("mac_address"):
2663 0                             update_dict[
2664                                 path_interfaces + ".{}.".format(i) + "mac-address"
2665                             ] = iface["mac_address"]
2666
2667 0                         if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
2668 0                             update_dict["ip-address"] = iface.get("ip_address").split(
2669                                 ";"
2670                             )[0]
2671
2672 0                         if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
2673 0                             update_dict[path_item + ".ip-address"] = iface.get(
2674                                 "ip_address"
2675                             ).split(";")[0]
2676
2677 0             self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
2678
2679             # If interfaces exists, it backups VDU interfaces in the DB for healing operations
2680 0             if ro_vim_item_update.get("interfaces"):
2681 0                 search_key = path_vim_status + ".interfaces"
2682 0                 if update_dict.get(search_key):
2683 0                     interfaces_backup_update = {
2684                         path_vim_status + ".interfaces_backup": update_dict[search_key]
2685                     }
2686
2687 0                     self.db.set_one(
2688                         table,
2689                         q_filter={"_id": _id},
2690                         update_dict=interfaces_backup_update,
2691                     )
2692
2693         else:
2694 0             update_dict = {path_item + ".status": "DELETED"}
2695 0             self.db.set_one(
2696                 table,
2697                 q_filter={"_id": _id},
2698                 update_dict=update_dict,
2699                 unset={path_vim_status: None},
2700             )
2701
2702 1     def _process_delete_db_tasks(self):
2703         """
2704         Delete task from database because vnfrs or nsrs or both have been deleted
2705         :return: None. Uses and modify self.tasks_to_delete
2706         """
2707 0         while self.tasks_to_delete:
2708 0             task = self.tasks_to_delete[0]
2709 0             vnfrs_deleted = None
2710 0             nsr_id = task["nsr_id"]
2711
2712 0             if task["target_record"].startswith("vnfrs:"):
2713                 # check if nsrs is present
2714 0                 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
2715 0                     vnfrs_deleted = task["target_record"].split(":")[1]
2716
2717 0             try:
2718 0                 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
2719 0             except Exception as e:
2720 0                 self.logger.error(
2721                     "Error deleting task={}: {}".format(task["task_id"], e)
2722                 )
2723 0             self.tasks_to_delete.pop(0)
2724
2725 1     @staticmethod
2726 1     def delete_db_tasks(db, nsr_id, vnfrs_deleted):
2727         """
2728         Static method because it is called from osm_ng_ro.ns
2729         :param db: instance of database to use
2730         :param nsr_id: affected nsrs id
2731         :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
2732         :return: None, exception is fails
2733         """
2734 0         retries = 5
2735 0         for retry in range(retries):
2736 0             ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
2737 0             now = time.time()
2738 0             conflict = False
2739
2740 0             for ro_task in ro_tasks:
2741 0                 db_update = {}
2742 0                 to_delete_ro_task = True
2743
2744 0                 for index, task in enumerate(ro_task["tasks"]):
2745 0                     if not task:
2746 0                         pass
2747 0                     elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or (
2748                         vnfrs_deleted
2749                         and task["target_record"].startswith("vnfrs:" + vnfrs_deleted)
2750                     ):
2751 0                         db_update["tasks.{}".format(index)] = None
2752                     else:
2753                         # used by other nsr, ro_task cannot be deleted
2754 0                         to_delete_ro_task = False
2755
2756                 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
2757 0                 if to_delete_ro_task:
2758 0                     if not db.del_one(
2759                         "ro_tasks",
2760                         q_filter={
2761                             "_id": ro_task["_id"],
2762                             "modified_at": ro_task["modified_at"],
2763                         },
2764                         fail_on_empty=False,
2765                     ):
2766 0                         conflict = True
2767 0                 elif db_update:
2768 0                     db_update["modified_at"] = now
2769 0                     if not db.set_one(
2770                         "ro_tasks",
2771                         q_filter={
2772                             "_id": ro_task["_id"],
2773                             "modified_at": ro_task["modified_at"],
2774                         },
2775                         update_dict=db_update,
2776                         fail_on_empty=False,
2777                     ):
2778 0                         conflict = True
2779 0             if not conflict:
2780 0                 return
2781         else:
2782 0             raise NsWorkerException("Exceeded {} retries".format(retries))
2783
2784 1     def run(self):
2785         # load database
2786 0         self.logger.info("Starting")
2787         while True:
2788             # step 1: get commands from queue
2789 0             try:
2790 0                 if self.vim_targets:
2791 0                     task = self.task_queue.get(block=False)
2792                 else:
2793 0                     if not self.idle:
2794 0                         self.logger.debug("enters in idle state")
2795 0                     self.idle = True
2796 0                     task = self.task_queue.get(block=True)
2797 0                     self.idle = False
2798
2799 0                 if task[0] == "terminate":
2800 0                     break
2801 0                 elif task[0] == "load_vim":
2802 0                     self.logger.info("order to load vim {}".format(task[1]))
2803 0                     self._load_vim(task[1])
2804 0                 elif task[0] == "unload_vim":
2805 0                     self.logger.info("order to unload vim {}".format(task[1]))
2806 0                     self._unload_vim(task[1])
2807 0                 elif task[0] == "reload_vim":
2808 0                     self._reload_vim(task[1])
2809 0                 elif task[0] == "check_vim":
2810 0                     self.logger.info("order to check vim {}".format(task[1]))
2811 0                     self._check_vim(task[1])
2812 0                 continue
2813 0             except Exception as e:
2814 0                 if isinstance(e, queue.Empty):
2815 0                     pass
2816                 else:
2817 0                     self.logger.critical(
2818                         "Error processing task: {}".format(e), exc_info=True
2819                     )
2820
2821             # step 2: process pending_tasks, delete not needed tasks
2822 0             try:
2823 0                 if self.tasks_to_delete:
2824 0                     self._process_delete_db_tasks()
2825 0                 busy = False
2826                 """
2827                 # Log RO tasks only when loglevel is DEBUG
2828                 if self.logger.getEffectiveLevel() == logging.DEBUG:
2829                     _ = self._get_db_all_tasks()
2830                 """
2831 0                 ro_task = self._get_db_task()
2832 0                 if ro_task:
2833 0                     self.logger.debug("Task to process: {}".format(ro_task))
2834 0                     time.sleep(1)
2835 0                     self._process_pending_tasks(ro_task)
2836 0                     busy = True
2837 0                 if not busy:
2838 0                     time.sleep(5)
2839 0             except Exception as e:
2840 0                 self.logger.critical(
2841                     "Unexpected exception at run: " + str(e), exc_info=True
2842                 )
2843
2844 0         self.logger.info("Finishing")