Code Coverage

Cobertura Coverage Report > NG-RO.osm_ng_ro >

ns_thread.py

Trend

File Coverage summary

Name | Classes | Lines | Conditionals
ns_thread.py
100%
1/1
29%
374/1287
100%
0/0

Coverage Breakdown by Class

Name | Lines | Conditionals
ns_thread.py
29%
374/1287
N/A

Source

NG-RO/osm_ng_ro/ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 #         http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
20 1 """"
21 This is the thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored in the database, in table ro_tasks.
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target that says where to store the results.
25 """
26
27 1 from copy import deepcopy
28 1 from http import HTTPStatus
29 1 import logging
30 1 from os import mkdir
31 1 import queue
32 1 from shutil import rmtree
33 1 import threading
34 1 import time
35 1 import traceback
36 1 from typing import Dict
37 1 from unittest.mock import Mock
38
39 1 from importlib_metadata import entry_points
40 1 from osm_common.dbbase import DbException
41 1 from osm_ng_ro.vim_admin import LockRenew
42 1 from osm_ro_plugin import sdnconn, vimconn
43 1 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
44 1 from osm_ro_plugin.vim_dummy import VimDummyConnector
45 1 import yaml
46
47 1 __author__ = "Alfonso Tierno"
48 1 __date__ = "$28-Sep-2017 12:07:15$"
49
50
def deep_get(target_dict, *args, **kwargs):
    """
    Walk a nested dictionary following a sequence of keys.
    Example target_dict={a: {b: 5}}; keys (a, b) give 5; keys (a, b, c) and (f, h) give the default
    :param target_dict: dictionary to be read
    :param args: successive keys used to descend into target_dict
    :param kwargs: may only carry default=value, returned when the path cannot be followed
    :return: the value found at the end of the path; otherwise the default (None when not supplied)
    """
    node = target_dict

    for step in args:
        if isinstance(node, dict) and step in node:
            # Path still valid: descend one level
            node = node[step]
        else:
            # Either we hit a non-dict value or the key is missing
            return kwargs.get("default")

    return node
65
66
class NsWorkerException(Exception):
    """Base exception for the errors raised by the task-processing code of this module."""
69
70
class FailingConnector:
    """Stand-in connector whose public VIM/SDN methods all raise the stored error.

    Every public attribute name of vimconn.VimConnector is bound to a Mock raising
    vimconn.VimConnException(error_msg); afterwards every public attribute name of
    sdnconn.SdnConnectorBase is bound to a Mock raising sdnconn.SdnConnectorError(error_msg).
    A name present in both classes keeps the SDN variant, because it is assigned last.
    """

    def __init__(self, error_msg):
        """
        :param error_msg: text carried by the exceptions raised by every mocked method
        """
        self.error_msg = error_msg

        # VIM side first ...
        for name in dir(vimconn.VimConnector):
            if name.startswith("_"):
                continue
            failing = Mock(side_effect=vimconn.VimConnException(error_msg))
            setattr(self, name, failing)

        # ... then SDN side, overriding any name shared with the VIM connector
        for name in dir(sdnconn.SdnConnectorBase):
            if name.startswith("_"):
                continue
            failing = Mock(side_effect=sdnconn.SdnConnectorError(error_msg))
            setattr(self, name, failing)
86
87
class NsWorkerExceptionNotFound(NsWorkerException):
    """NsWorkerException variant raised when a looked-up VIM element cannot be found."""
90
91
class VimInteractionBase:
    """Default VIM/SDN interaction for creating, deleting and refreshing items.

    None of the methods here contacts the VIM: each one just reports a fixed outcome.
    Subclasses override the ones that need a real call.
    """

    def __init__(self, db, my_vims, db_vims, logger):
        """
        :param db: database handler
        :param my_vims: connectors, indexed by target_id
        :param db_vims: VIM database records, indexed by target_id
        :param logger: logger used by the interaction methods
        """
        self.db, self.logger = db, logger
        self.my_vims, self.db_vims = my_vims, db_vims

    def new(self, ro_task, task_index, task_depends):
        """Report the item as being built, with nothing to update."""
        return "BUILD", {}

    def refresh(self, ro_task):
        """Skip the VIM query: FAILED when the stored vim_status is VIM_ERROR, DONE otherwise."""
        in_error = ro_task["vim_info"]["vim_status"] == "VIM_ERROR"
        return ("FAILED" if in_error else "DONE"), {}

    def delete(self, ro_task, task_index):
        """Skip the VIM deletion and assume it went fine."""
        return "DONE", {}

    def exec(self, ro_task, task_index, task_depends):
        """Nothing to execute: report DONE with neither VIM-item nor task updates."""
        return "DONE", None, None
118
119
class VimInteractionNet(VimInteractionBase):
    """VIM interaction for networks: find or create (new), poll status (refresh), remove (delete)."""

    def new(self, ro_task, task_index, task_depends):
        """Find an existing network in the VIM, or create one.

        When the task carries "find_params" the network is looked up with a filter taken from
        (in this order) find_params["filter_dict"], the VIM config "management_network_id" /
        "management_network_name" (only for find_params["mgmt"]), or find_params["name"].
        Without "find_params" the network is created from task["params"].

        :param ro_task: ro_task document; "tasks" and "target_id" are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("BUILD", vim item update) on success; ("FAILED", vim item update with
            vim_status "VIM_ERROR") when a VimConnException or NsWorkerException is caught
        """
        vim_net_id = None
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]
        mgmtnet = False
        mgmtnet_defined_in_vim = False

        try:
            # FIND
            if task.get("find_params"):
                # if management, get configuration of VIM
                if task["find_params"].get("filter_dict"):
                    vim_filter = task["find_params"]["filter_dict"]
                # management network
                elif task["find_params"].get("mgmt"):
                    mgmtnet = True
                    if deep_get(
                        self.db_vims[ro_task["target_id"]],
                        "config",
                        "management_network_id",
                    ):
                        mgmtnet_defined_in_vim = True
                        vim_filter = {
                            "id": self.db_vims[ro_task["target_id"]]["config"][
                                "management_network_id"
                            ]
                        }
                    elif deep_get(
                        self.db_vims[ro_task["target_id"]],
                        "config",
                        "management_network_name",
                    ):
                        mgmtnet_defined_in_vim = True
                        vim_filter = {
                            "name": self.db_vims[ro_task["target_id"]]["config"][
                                "management_network_name"
                            ]
                        }
                    else:
                        vim_filter = {"name": task["find_params"]["name"]}
                else:
                    raise NsWorkerExceptionNotFound(
                        "Invalid find_params for new_net {}".format(task["find_params"])
                    )

                vim_nets = target_vim.get_network_list(vim_filter)
                if not vim_nets and not task.get("params"):
                    # If there is mgmt-network in the descriptor,
                    # there is no mapping of that network to a VIM network in the descriptor,
                    # also there is no mapping in the "--config" parameter or at VIM creation;
                    # that mgmt-network will be created.
                    if mgmtnet and not mgmtnet_defined_in_vim:
                        # In this branch vim_filter only holds "name" (set just above), so the
                        # "id" alternative is reached only when that name is empty.
                        net_name = (
                            vim_filter.get("name")
                            if vim_filter.get("name")
                            else vim_filter.get("id")[:16]
                        )
                        vim_net_id, created_items = target_vim.new_network(
                            net_name, None
                        )
                        self.logger.debug(
                            "Created mgmt network vim_net_id: {}".format(vim_net_id)
                        )
                        created = True
                    else:
                        raise NsWorkerExceptionNotFound(
                            "Network not found with this criteria: '{}'".format(
                                task.get("find_params")
                            )
                        )
                elif len(vim_nets) > 1:
                    raise NsWorkerException(
                        "More than one network found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )

                # NOTE(review): when nothing is found but the task also has "params", no
                # branch above creates the network, so vim_net_id stays None and BUILD is
                # still returned - confirm this is intended.
                if vim_nets:
                    vim_net_id = vim_nets[0]["id"]
            else:
                # CREATE
                params = task["params"]
                vim_net_id, created_items = target_vim.new_network(**params)
                created = True

            ro_vim_item_update = {
                "vim_id": vim_net_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-net={} created={}".format(
                    task_id, ro_task["target_id"], vim_net_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def refresh(self, ro_task):
        """Call VIM to get network status

        :param ro_task: ro_task document; "_id", "target_id" and "vim_info" are read
        :return: (task_status, ro_vim_item_update). task_status is "DONE" for VIM status
            ACTIVE, "BUILD" for BUILD and "FAILED" for anything else or when the VIM call
            raises VimConnException. ro_vim_item_update only holds the vim_info fields
            whose value differs from what ro_task["vim_info"] stores.
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]
        net_to_refresh_list = [vim_id]

        try:
            vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-net={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        # Collect only the fields that changed with respect to the stored vim_info
        ro_vim_item_update = {}
        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            # The message is only logged when the new status is not ACTIVE
            self.logger.debug(
                "ro_task={} {} get-net={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    ro_vim_item_update.get("vim_message")
                    if ro_vim_item_update.get("vim_status") != "ACTIVE"
                    else "",
                )
            )

        return task_status, ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the network from the VIM.

        The VIM is only called when there is a vim_id or some created_items. A network the
        VIM no longer knows (VimConnNotFoundException) counts as successfully deleted.

        :param ro_task: ro_task document; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :return: ("DONE", vim item update with vim_status "DELETED") on success;
            ("FAILED", vim item update with vim_status "VIM_ERROR") on any other
            VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        net_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if net_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_network(
                    net_vim_id, ro_task["vim_info"]["created_items"]
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], net_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-net={} {}".format(
                task_id,
                ro_task["target_id"],
                net_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok
337
338
class VimInteractionVdu(VimInteractionBase):
    """VIM interaction for VDUs (VMs): create, delete, poll status and inject an ssh key."""

    max_retries_inject_ssh_key = 20  # 20 times
    time_retries_inject_ssh_key = 30  # every 30 seconds

    def new(self, ro_task, task_index, task_depends):
        """Create a VM in the VIM.

        References of the form "TASK-..." found in the net_list, image_id, flavor_id and
        affinity_group_list parameters are replaced by the ids stored in task_depends
        before calling the connector. The task parameters themselves are not modified;
        a deep copy is used.

        :param ro_task: ro_task document; "tasks" and "target_id" are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :param task_depends: maps "TASK-..." references to the VIM id of the item they created
        :return: ("BUILD", vim item update) on success; ("FAILED", vim item update with
            vim_status "VIM_ERROR") when a VimConnException or NsWorkerException is caught
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # Set before the VIM call, so a failure is reported with created=True as well
            created = True
            params = task["params"]
            params_copy = deepcopy(params)
            net_list = params_copy["net_list"]

            for net in net_list:
                # change task_id into network_id
                if "net_id" in net and net["net_id"].startswith("TASK-"):
                    network_id = task_depends[net["net_id"]]

                    if not network_id:
                        raise NsWorkerException(
                            "Cannot create VM because depends on a network not created or found "
                            "for {}".format(net["net_id"])
                        )

                    net["net_id"] = network_id

            if params_copy["image_id"].startswith("TASK-"):
                params_copy["image_id"] = task_depends[params_copy["image_id"]]

            if params_copy["flavor_id"].startswith("TASK-"):
                params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]

            affinity_group_list = params_copy["affinity_group_list"]
            for affinity_group in affinity_group_list:
                # change task_id into affinity_group_id
                if "affinity_group_id" in affinity_group and affinity_group[
                    "affinity_group_id"
                ].startswith("TASK-"):
                    affinity_group_id = task_depends[
                        affinity_group["affinity_group_id"]
                    ]

                    if not affinity_group_id:
                        # Full sentence, mirroring the network case above, so that the
                        # cause is understandable from vim_message and the logs
                        raise NsWorkerException(
                            "Cannot create VM because depends on an affinity group not "
                            "created or found for {}".format(
                                affinity_group["affinity_group_id"]
                            )
                        )

                    affinity_group["affinity_group_id"] = affinity_group_id

            vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
            # presumably the connector stores "vim_id" on each net_list entry - verify
            interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]

            # add to created items previous_created_volumes (healing)
            if task.get("previous_created_volumes"):
                for k, v in task["previous_created_volumes"].items():
                    created_items[k] = v

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
                "interfaces_vim_ids": interfaces,
                "interfaces": [],
                "interfaces_backup": [],
            }
            self.logger.debug(
                "task={} {} new-vm={} created={}".format(
                    task_id, ro_task["target_id"], vim_vm_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.debug(traceback.format_exc())
            self.logger.error(
                "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the VM from the VIM.

        The VIM is only called when there is a vim_id or some created_items. A VM the VIM
        no longer knows (VimConnNotFoundException) counts as successfully deleted.

        :param ro_task: ro_task document; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :return: ("DONE", vim item update with vim_status "DELETED") on success;
            ("FAILED", vim item update with vim_status "VIM_ERROR") on any other
            VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        vm_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            self.logger.debug(
                "delete_vminstance: vm_vim_id={} created_items={}".format(
                    vm_vim_id, ro_task["vim_info"]["created_items"]
                )
            )
            if vm_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_vminstance(
                    vm_vim_id,
                    ro_task["vim_info"]["created_items"],
                    ro_task["vim_info"].get("volumes_to_hold", []),
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-vm={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], vm_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-vm={} {}".format(
                task_id,
                ro_task["target_id"],
                vm_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def refresh(self, ro_task):
        """Call VIM to get vm status

        :param ro_task: ro_task document; "_id", "target_id", "tasks" and "vim_info" are read
        :return: (None, None) when the ro_task has no vim_id. Otherwise
            (task_status, ro_vim_item_update): task_status is "DONE" for VIM status ACTIVE,
            "BUILD" for BUILD and "FAILED" for anything else or when the VIM call raises
            VimConnException; ro_vim_item_update only holds the vim_info fields whose
            value differs from what ro_task["vim_info"] stores.
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]

        if not vim_id:
            return None, None

        vm_to_refresh_list = [vim_id]
        try:
            vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"

            # try to load and parse vim_information
            # Best effort: any failure here is logged and the refresh goes on
            try:
                vim_info_info = yaml.safe_load(vim_info["vim_info"])
                if vim_info_info.get("name"):
                    vim_info["name"] = vim_info_info["name"]
            except Exception as vim_info_error:
                self.logger.exception(
                    f"{vim_info_error} occured while getting the vim_info from yaml"
                )
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-vm={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        ro_vim_item_update = {}

        # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
        vim_interfaces = []
        if vim_info.get("interfaces"):
            # Keep the order of interfaces_vim_ids; an id the VIM does not report yields None
            for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
                iface = next(
                    (
                        iface
                        for iface in vim_info["interfaces"]
                        if vim_iface_id == iface["vim_interface_id"]
                    ),
                    None,
                )
                vim_interfaces.append(iface)

        # NOTE(review): next() has no default here, so StopIteration propagates when no
        # unfinished CREATE task exists - confirm callers guarantee there is one
        task_create = next(
            t
            for t in ro_task["tasks"]
            if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
        )
        # NOTE(review): the two assignments below fail if the selected entry is None
        # (interface not reported by the VIM) - confirm this cannot happen
        if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
            vim_interfaces[task_create["mgmt_vnf_interface"]][
                "mgmt_vnf_interface"
            ] = True

        # The VDU management interface falls back to the VNF one, and then to index 0
        mgmt_vdu_iface = task_create.get(
            "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
        )
        if vim_interfaces:
            vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True

        # Collect only the fields that changed with respect to the stored vim_info
        if ro_task["vim_info"]["interfaces"] != vim_interfaces:
            ro_vim_item_update["interfaces"] = vim_interfaces

        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            # The message is only logged when the new status is not ACTIVE
            self.logger.debug(
                "ro_task={} {} get-vm={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    ro_vim_item_update.get("vim_message")
                    if ro_vim_item_update.get("vim_status") != "ACTIVE"
                    else "",
                )
            )

        return task_status, ro_vim_item_update

    def exec(self, ro_task, task_index, task_depends):
        """Inject a user ssh key into the VM through the VIM connector.

        The private key found in the task parameters is decrypted with self.db.decrypt and
        handed to the connector as "ro_key"; "ip_address" is handed over as "ip_addr".
        On failure the action is retried until max_retries_inject_ssh_key attempts
        have been made.

        :param ro_task: ro_task document; "tasks" and "target_id" are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: (status, ro_vim_item_update, db_task_update):
            ("DONE", None, {"retries": 0}) on success;
            ("BUILD", None, {"retries": n, "next_retry": seconds}) when another attempt
            is to be made; ("FAILED", {"vim_message": error}, {"retries": 0}) once the
            retries are exhausted
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        db_task_update = {"retries": 0}
        retries = task.get("retries", 0)

        try:
            params = task["params"]
            params_copy = deepcopy(params)
            params_copy["ro_key"] = self.db.decrypt(
                params_copy.pop("private_key"),
                params_copy.pop("schema_version"),
                params_copy.pop("salt"),
            )
            params_copy["ip_addr"] = params_copy.pop("ip_address")
            target_vim.inject_user_key(**params_copy)
            self.logger.debug(
                "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
            )

            return (
                "DONE",
                None,
                db_task_update,
            )  # params_copy["key"]
        except (vimconn.VimConnException, NsWorkerException) as e:
            retries += 1

            self.logger.debug(traceback.format_exc())
            if retries < self.max_retries_inject_ssh_key:
                return (
                    "BUILD",
                    None,
                    {
                        "retries": retries,
                        "next_retry": self.time_retries_inject_ssh_key,
                    },
                )

            self.logger.error(
                "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {"vim_message": str(e)}

            return "FAILED", ro_vim_item_update, db_task_update
635
636
class VimInteractionImage(VimInteractionBase):
    """VIM interaction for images. Images are only looked up in the VIM, never created."""

    def new(self, ro_task, task_index, task_depends):
        """Find the image in the VIM using task["find_params"].

        :param ro_task: ro_task document; "tasks" and "target_id" are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("DONE", vim item update) when exactly one image matches, or when the task
            has no find_params (vim_id is then None); ("FAILED", vim item update with
            vim_status "VIM_ERROR") when zero or several images match or the VIM call
            raises VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # Must be bound even without find_params; otherwise building the update below
            # raises UnboundLocalError, which the except clause does not catch
            vim_image_id = None

            # FIND
            if task.get("find_params"):
                vim_images = target_vim.get_image_list(**task["find_params"])

                if not vim_images:
                    raise NsWorkerExceptionNotFound(
                        "Image not found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )
                elif len(vim_images) > 1:
                    raise NsWorkerException(
                        "More than one image found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )
                else:
                    vim_image_id = vim_images[0]["id"]

            ro_vim_item_update = {
                "vim_id": vim_image_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-image={} created={}".format(
                    task_id, ro_task["target_id"], vim_image_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (NsWorkerException, vimconn.VimConnException) as e:
            self.logger.error(
                "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
691
692
class VimInteractionFlavor(VimInteractionBase):
    """VIM interaction for flavors: find-or-create on new(), removal on delete()."""

    def delete(self, ro_task, task_index):
        """Delete the flavor from the VIM.

        The VIM is only called when the ro_task stores a vim_id. A flavor the VIM no longer
        knows (VimConnNotFoundException) counts as successfully deleted.

        :param ro_task: ro_task document; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :return: ("DONE", vim item update with vim_status "DELETED") on success;
            ("FAILED", vim item update with vim_status "VIM_ERROR") on any other
            VimConnException
        """
        task_id = ro_task["tasks"][task_index]["task_id"]
        flavor_vim_id = ro_task["vim_info"]["vim_id"]
        outcome_message = "DELETED"

        try:
            if flavor_vim_id:
                self.my_vims[ro_task["target_id"]].delete_flavor(flavor_vim_id)
        except vimconn.VimConnNotFoundException:
            outcome_message = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-flavor={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

        self.logger.debug(
            "task={} {} del-flavor={} {}".format(
                task_id, ro_task["target_id"], flavor_vim_id, outcome_message
            )
        )

        return "DONE", {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": outcome_message,
            "vim_id": None,
        }

    def new(self, ro_task, task_index, task_depends):
        """Find a matching flavor in the VIM and, failing that, create it.

        The lookup uses task["find_params"]["flavor_data"]; a VimConnNotFoundException from
        it is logged and treated as "not found". The creation uses
        task["params"]["flavor_data"] and only happens when no flavor id was obtained.

        :param ro_task: ro_task document; "tasks" and "target_id" are read
        :param task_index: position of the task to process inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("DONE", vim item update) on success; ("FAILED", vim item update with
            vim_status "VIM_ERROR") when a VimConnException or NsWorkerException is caught
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            vim_flavor_id = None

            # FIND
            find_params = task.get("find_params")
            if find_params:
                try:
                    vim_flavor_id = target_vim.get_flavor_id_from_data(
                        find_params["flavor_data"]
                    )
                except vimconn.VimConnNotFoundException:
                    self.logger.exception("VimConnNotFoundException occured.")

            # CREATE
            if not vim_flavor_id and task.get("params"):
                vim_flavor_id = target_vim.new_flavor(task["params"]["flavor_data"])
                created = True

            ro_vim_item_update = {
                "vim_id": vim_flavor_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": {},
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-flavor={} created={}".format(
                    task_id, ro_task["target_id"], vim_flavor_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }
785
786
class VimInteractionAffinityGroup(VimInteractionBase):
    """Create and delete affinity / anti-affinity groups at a VIM."""

    def delete(self, ro_task, task_index):
        """Delete the affinity group referenced by ro_task["vim_info"]["vim_id"].

        A group that the VIM reports as not found is treated as already deleted.

        :param ro_task: ro_task document; "tasks", "vim_info", "target_id" and
            "_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :return: tuple (status, ro_vim_item_update); status is "DONE" or "FAILED"
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            # nothing to remove at the VIM when no id was ever stored
            if affinity_group_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_affinity_group(affinity_group_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
                task_id,
                ro_task["target_id"],
                affinity_group_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """Reuse the affinity group given by the task or create a new one.

        When the task params carry a "vim-affinity-group-id", that group is
        looked up at the VIM. If it cannot be found (or no id was given) and
        affinity group data is present, a new group is created.

        :param ro_task: ro_task document; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple (status, ro_vim_item_update); status is "DONE" or "FAILED"
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            affinity_group_vim_id = None
            affinity_group_data = None

            if task.get("params"):
                affinity_group_data = task["params"].get("affinity_group_data")

            if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
                param_affinity_group_id = affinity_group_data.get(
                    "vim-affinity-group-id"
                )

                try:
                    affinity_group_vim_id = target_vim.get_affinity_group(
                        param_affinity_group_id
                    ).get("id")
                except vimconn.VimConnNotFoundException:
                    # the two literals used to be joined without a separating
                    # space, gluing the id to "could not be found"
                    self.logger.error(
                        "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {} "
                        "could not be found at VIM. Creating a new one.".format(
                            task_id, ro_task["target_id"], param_affinity_group_id
                        )
                    )

            if not affinity_group_vim_id and affinity_group_data:
                affinity_group_vim_id = target_vim.new_affinity_group(
                    affinity_group_data
                )
                created = True

            ro_vim_item_update = {
                "vim_id": affinity_group_vim_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
                    task_id, ro_task["target_id"], affinity_group_vim_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-affinity-or-anti-affinity-group:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
892
893
class VimInteractionUpdateVdu(VimInteractionBase):
    """Run an action on an existing VM through the VIM connector."""

    def exec(self, ro_task, task_index, task_depends):
        """Call ``action_vminstance`` on the VM named in the task params.

        :param ro_task: ro_task document; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple (status, ro_vim_item_update, db_task_update); status is
            "DONE" or "FAILED"
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]
        # bound up-front so a task without params no longer raises
        # UnboundLocalError when the result dict is built
        vim_vm_id = None

        try:
            if task.get("params"):
                vim_vm_id = task["params"].get("vim_vm_id")
                action = task["params"].get("action")
                # the action value is used both as key and as value of the dict
                # handed to the connector
                context = {action: action}
                target_vim.action_vminstance(vim_vm_id, context)

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} update-vdu done".format(task_id, ro_task["target_id"])
            )

            return "DONE", ro_vim_item_update, db_task_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} Update VDU:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
934
935
936 1 class VimInteractionSdnNet(VimInteractionBase):
937 1     @staticmethod
938 1     def _match_pci(port_pci, mapping):
939         """
940         Check if port_pci matches with mapping
941         mapping can have brackets to indicate that several chars are accepted. e.g
942         pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
943         :param port_pci: text
944         :param mapping: text, can contain brackets to indicate several chars are available
945         :return: True if matches, False otherwise
946         """
947 0         if not port_pci or not mapping:
948 0             return False
949 0         if port_pci == mapping:
950 0             return True
951
952 0         mapping_index = 0
953 0         pci_index = 0
954         while True:
955 0             bracket_start = mapping.find("[", mapping_index)
956
957 0             if bracket_start == -1:
958 0                 break
959
960 0             bracket_end = mapping.find("]", bracket_start)
961 0             if bracket_end == -1:
962 0                 break
963
964 0             length = bracket_start - mapping_index
965 0             if (
966                 length
967                 and port_pci[pci_index : pci_index + length]
968                 != mapping[mapping_index:bracket_start]
969             ):
970 0                 return False
971
972 0             if (
973                 port_pci[pci_index + length]
974                 not in mapping[bracket_start + 1 : bracket_end]
975             ):
976 0                 return False
977
978 0             pci_index += length + 1
979 0             mapping_index = bracket_end + 1
980
981 0         if port_pci[pci_index:] != mapping[mapping_index:]:
982 0             return False
983
984 0         return True
985
986 1     def _get_interfaces(self, vlds_to_connect, vim_account_id):
987         """
988         :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
989         :param vim_account_id:
990         :return:
991         """
992 0         interfaces = []
993
994 0         for vld in vlds_to_connect:
995 0             table, _, db_id = vld.partition(":")
996 0             db_id, _, vld = db_id.partition(":")
997 0             _, _, vld_id = vld.partition(".")
998
999 0             if table == "vnfrs":
1000 0                 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
1001 0                 iface_key = "vnf-vld-id"
1002             else:  # table == "nsrs"
1003 0                 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
1004 0                 iface_key = "ns-vld-id"
1005
1006 0             db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
1007
1008 0             for db_vnfr in db_vnfrs:
1009 0                 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
1010 0                     for iface_index, interface in enumerate(vdur["interfaces"]):
1011 0                         if interface.get(iface_key) == vld_id and interface.get(
1012                             "type"
1013                         ) in ("SR-IOV", "PCI-PASSTHROUGH"):
1014                             # only SR-IOV o PT
1015 0                             interface_ = interface.copy()
1016 0                             interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
1017                                 db_vnfr["_id"], vdu_index, iface_index
1018                             )
1019
1020 0                             if vdur.get("status") == "ERROR":
1021 0                                 interface_["status"] = "ERROR"
1022
1023 0                             interfaces.append(interface_)
1024
1025 0         return interfaces
1026
1027 1     def refresh(self, ro_task):
1028         # look for task create
1029 0         task_create_index, _ = next(
1030             i_t
1031             for i_t in enumerate(ro_task["tasks"])
1032             if i_t[1]
1033             and i_t[1]["action"] == "CREATE"
1034             and i_t[1]["status"] != "FINISHED"
1035         )
1036
1037 0         return self.new(ro_task, task_create_index, None)
1038
    def new(self, ro_task, task_index, task_depends):
        """Create, update or poll the SDN connectivity service of a ro_task.

        The ports to connect are computed from the interfaces returned by
        ``_get_interfaces`` plus the external "sdn-ports" of the task params.
        Depending on the outcome the connector is asked to create the service,
        edit it, or only report its status.

        :param ro_task: ro_task document; "tasks", "target_id" and "vim_info" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple (status, ro_vim_item_update). On success status is the
            computed sdn_status; any exception raised inside the try block is
            caught and reported as ("FAILED", {"vim_status": "VIM_ERROR", ...})
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]

        # state stored by previous executions of this ro_task
        sdn_net_id = ro_task["vim_info"]["vim_id"]

        created_items = ro_task["vim_info"].get("created_items")
        connected_ports = ro_task["vim_info"].get("connected_ports", [])
        new_connected_ports = []
        last_update = ro_task["vim_info"].get("last_update", 0)
        # a stored empty/None status is treated as "BUILD"
        sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
        error_list = []
        created = ro_task["vim_info"].get("created", False)

        try:
            # CREATE
            params = task["params"]
            vlds_to_connect = params.get("vlds", [])
            associated_vim = params.get("target_vim")
            # external additional ports
            additional_ports = params.get("sdn-ports") or ()
            # associated_vim is split on the first ":"; the text after it is
            # used as the vim account id
            _, _, vim_account_id = (
                (None, None, None)
                if associated_vim is None
                else associated_vim.partition(":")
            )

            if associated_vim:
                # get associated VIM
                if associated_vim not in self.db_vims:
                    self.db_vims[associated_vim] = self.db.get_one(
                        "vim_accounts", {"_id": vim_account_id}
                    )

                db_vim = self.db_vims[associated_vim]
            # NOTE(review): db_vim stays unbound when associated_vim is falsy; it
            # is read below for every port that has compute_node and pci, which
            # would then fail and end in the generic except at the bottom

            # look for ports to connect
            ports = self._get_interfaces(vlds_to_connect, vim_account_id)
            # print(ports)

            sdn_ports = []
            pending_ports = error_ports = 0
            vlan_used = None
            # NOTE(review): never set to True in this method (see TODO below)
            sdn_need_update = False

            for port in ports:
                # remember the last vlan seen; used as default for external ports
                vlan_used = port.get("vlan") or vlan_used

                # TODO. Do not connect if already done
                # a port without location is counted as error or pending and skipped
                if not port.get("compute_node") or not port.get("pci"):
                    if port.get("status") == "ERROR":
                        error_ports += 1
                    else:
                        pending_ports += 1
                    continue

                pmap = None
                # first "sdn-port-mapping" entry for the compute node of this port
                compute_node_mappings = next(
                    (
                        c
                        for c in db_vim["config"].get("sdn-port-mapping", ())
                        if c and c["compute_node"] == port["compute_node"]
                    ),
                    None,
                )

                if compute_node_mappings:
                    # process port_mapping pci of type 0000:af:1[01].[1357]
                    pmap = next(
                        (
                            p
                            for p in compute_node_mappings["ports"]
                            if self._match_pci(port["pci"], p.get("pci"))
                        ),
                        None,
                    )

                if not pmap:
                    # without mapping the port is an error unless the VIM config
                    # sets "mapping_not_needed"
                    if not db_vim["config"].get("mapping_not_needed"):
                        error_list.append(
                            "Port mapping not found for compute_node={} pci={}".format(
                                port["compute_node"], port["pci"]
                            )
                        )
                        continue

                    pmap = {}

                # default endpoint id, used when the mapping does not provide one
                service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
                new_port = {
                    "service_endpoint_id": pmap.get("service_endpoint_id")
                    or service_endpoint_id,
                    "service_endpoint_encapsulation_type": "dot1q"
                    if port["type"] == "SR-IOV"
                    else None,
                    "service_endpoint_encapsulation_info": {
                        "vlan": port.get("vlan"),
                        "mac": port.get("mac-address"),
                        "device_id": pmap.get("device_id") or port["compute_node"],
                        "device_interface_id": pmap.get("device_interface_id")
                        or port["pci"],
                        "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
                        "switch_port": pmap.get("switch_port"),
                        "service_mapping_info": pmap.get("service_mapping_info"),
                    },
                }

                # TODO
                # if port["modified_at"] > last_update:
                #     sdn_need_update = True
                new_connected_ports.append(port["id"])  # TODO
                sdn_ports.append(new_port)

            if error_ports:
                error_list.append(
                    "{} interfaces have not been created as VDU is on ERROR status".format(
                        error_ports
                    )
                )

            # connect external ports
            for index, additional_port in enumerate(additional_ports):
                # external ports without an endpoint id get "external-<index>"
                additional_port_id = additional_port.get(
                    "service_endpoint_id"
                ) or "external-{}".format(index)
                sdn_ports.append(
                    {
                        "service_endpoint_id": additional_port_id,
                        "service_endpoint_encapsulation_type": additional_port.get(
                            "service_endpoint_encapsulation_type", "dot1q"
                        ),
                        "service_endpoint_encapsulation_info": {
                            "vlan": additional_port.get("vlan") or vlan_used,
                            "mac": additional_port.get("mac_address"),
                            "device_id": additional_port.get("device_id"),
                            "device_interface_id": additional_port.get(
                                "device_interface_id"
                            ),
                            "switch_dpid": additional_port.get("switch_dpid")
                            or additional_port.get("switch_id"),
                            "switch_port": additional_port.get("switch_port"),
                            "service_mapping_info": additional_port.get(
                                "service_mapping_info"
                            ),
                        },
                    }
                )
                new_connected_ports.append(additional_port_id)
            sdn_info = ""

            # if there are more ports to connect or they have been modified, call create/update
            if error_list:
                # errors collected above take precedence: the connector is not called
                sdn_status = "ERROR"
                sdn_info = "; ".join(error_list)
            elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
                last_update = time.time()

                if not sdn_net_id:
                    # no service yet: it is only created with at least 2 ports
                    if len(sdn_ports) < 2:
                        sdn_status = "ACTIVE"

                        if not pending_ports:
                            self.logger.debug(
                                "task={} {} new-sdn-net done, less than 2 ports".format(
                                    task_id, ro_task["target_id"]
                                )
                            )
                    else:
                        net_type = params.get("type") or "ELAN"
                        (
                            sdn_net_id,
                            created_items,
                        ) = target_vim.create_connectivity_service(net_type, sdn_ports)
                        created = True
                        self.logger.debug(
                            "task={} {} new-sdn-net={} created={}".format(
                                task_id, ro_task["target_id"], sdn_net_id, created
                            )
                        )
                else:
                    # service already exists: update its connection points
                    created_items = target_vim.edit_connectivity_service(
                        sdn_net_id, conn_info=created_items, connection_points=sdn_ports
                    )
                    created = True
                    self.logger.debug(
                        "task={} {} update-sdn-net={} created={}".format(
                            task_id, ro_task["target_id"], sdn_net_id, created
                        )
                    )

                connected_ports = new_connected_ports
            elif sdn_net_id:
                # nothing changed: only poll the current status of the service
                wim_status_dict = target_vim.get_connectivity_service_status(
                    sdn_net_id, conn_info=created_items
                )
                sdn_status = wim_status_dict["sdn_status"]

                if wim_status_dict.get("sdn_info"):
                    sdn_info = str(wim_status_dict.get("sdn_info")) or ""

                # an error message, when present, replaces the sdn_info text
                if wim_status_dict.get("error_msg"):
                    sdn_info = wim_status_dict.get("error_msg") or ""

            if pending_ports:
                # ports still without location: report progress and do not let
                # the status reach ACTIVE yet
                if sdn_status != "ERROR":
                    sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
                        len(ports) - pending_ports, len(ports)
                    )

                if sdn_status == "ACTIVE":
                    sdn_status = "BUILD"

            ro_vim_item_update = {
                "vim_id": sdn_net_id,
                "vim_status": sdn_status,
                "created": created,
                "created_items": created_items,
                "connected_ports": connected_ports,
                "vim_details": sdn_info,
                "vim_message": None,
                "last_update": last_update,
            }

            return sdn_status, ro_vim_item_update
        except Exception as e:
            # the traceback is logged only for exceptions that are not connector errors
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
                exc_info=not isinstance(
                    e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
                ),
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
1278
1279 1     def delete(self, ro_task, task_index):
1280 0         task = ro_task["tasks"][task_index]
1281 0         task_id = task["task_id"]
1282 0         sdn_vim_id = ro_task["vim_info"].get("vim_id")
1283 0         ro_vim_item_update_ok = {
1284             "vim_status": "DELETED",
1285             "created": False,
1286             "vim_message": "DELETED",
1287             "vim_id": None,
1288         }
1289
1290 0         try:
1291 0             if sdn_vim_id:
1292 0                 target_vim = self.my_vims[ro_task["target_id"]]
1293 0                 target_vim.delete_connectivity_service(
1294                     sdn_vim_id, ro_task["vim_info"].get("created_items")
1295                 )
1296
1297 0         except Exception as e:
1298 0             if (
1299                 isinstance(e, sdnconn.SdnConnectorError)
1300                 and e.http_code == HTTPStatus.NOT_FOUND.value
1301             ):
1302 0                 ro_vim_item_update_ok["vim_message"] = "already deleted"
1303             else:
1304 0                 self.logger.error(
1305                     "ro_task={} vim={} del-sdn-net={}: {}".format(
1306                         ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
1307                     ),
1308                     exc_info=not isinstance(
1309                         e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1310                     ),
1311                 )
1312 0                 ro_vim_item_update = {
1313                     "vim_status": "VIM_ERROR",
1314                     "vim_message": "Error while deleting: {}".format(e),
1315                 }
1316
1317 0                 return "FAILED", ro_vim_item_update
1318
1319 0         self.logger.debug(
1320             "task={} {} del-sdn-net={} {}".format(
1321                 task_id,
1322                 ro_task["target_id"],
1323                 sdn_vim_id,
1324                 ro_vim_item_update_ok.get("vim_message", ""),
1325             )
1326         )
1327
1328 0         return "DONE", ro_vim_item_update_ok
1329
1330
class VimInteractionMigration(VimInteractionBase):
    """Migrate a VM to another host through the VIM connector."""

    def exec(self, ro_task, task_index, task_depends):
        """Migrate the VM named in the task params and refresh its VIM info.

        :param ro_task: ro_task document; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple (status, ro_vim_item_update, db_task_update); status is
            "DONE" or "FAILED"
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_interfaces = []
        created = False
        created_items = {}
        refreshed_vim_info = {}
        # bound up-front so a task without params no longer raises
        # UnboundLocalError when the result dict is built
        vim_vm_id = None

        try:
            if task.get("params"):
                vim_vm_id = task["params"].get("vim_vm_id")
                migrate_host = task["params"].get("migrate_host")
                _, migrated_compute_node = target_vim.migrate_instance(
                    vim_vm_id, migrate_host
                )

                if migrated_compute_node:
                    # When VM is migrated, vdu["vim_info"] needs to be updated.
                    # A missing entry for this target is handled as "no old info"
                    # instead of failing on None.
                    vdu_old_vim_info = (
                        task["params"]["vdu_vim_info"].get(ro_task["target_id"]) or {}
                    )

                    # Refresh VM to get new vim_info
                    vim_dict = target_vim.refresh_vms_status([vim_vm_id])
                    refreshed_vim_info = vim_dict[vim_vm_id]

                    if refreshed_vim_info.get("interfaces"):
                        for old_iface in vdu_old_vim_info.get("interfaces") or ():
                            # pair each previously known interface with its
                            # refreshed data, matching on vim_interface_id
                            iface = next(
                                (
                                    iface
                                    for iface in refreshed_vim_info["interfaces"]
                                    if old_iface["vim_interface_id"]
                                    == iface["vim_interface_id"]
                                ),
                                None,
                            )
                            # NOTE(review): None is appended when no match is
                            # found; presumably this keeps list positions aligned
                            # with the old interfaces - confirm with the consumer
                            vim_interfaces.append(iface)

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }

            # vim details are only propagated when the refreshed VM is not in error
            if refreshed_vim_info and refreshed_vim_info.get("status") not in (
                "ERROR",
                "VIM_ERROR",
            ):
                ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]

            if vim_interfaces:
                ro_vim_item_update["interfaces"] = vim_interfaces

            self.logger.debug(
                "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
            )

            return "DONE", ro_vim_item_update, db_task_update

        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} VM Migration:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
1410
1411
class VimInteractionResize(VimInteractionBase):
    """Resize a VM to another flavor through the VIM connector."""

    def exec(self, ro_task, task_index, task_depends):
        """Resize the VM named in the task params to the flavor in "flavor_dict".

        The flavor is searched at the VIM and created when the search fails.
        When no flavor id can be obtained the task is reported as FAILED, since
        no resize is attempted in that case.

        :param ro_task: ro_task document; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple (status, ro_vim_item_update, db_task_update); status is
            "DONE" or "FAILED"
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        created = False
        target_flavor_uuid = None
        created_items = {}
        refreshed_vim_info = {}
        target_vim = self.my_vims[ro_task["target_id"]]
        # bound up-front so a task without params no longer raises
        # UnboundLocalError when the result dict is built
        vim_vm_id = None

        try:
            if task.get("params"):
                vim_vm_id = task["params"].get("vim_vm_id")
                flavor_dict = task["params"].get("flavor_dict")
                self.logger.info("flavor_dict %s", flavor_dict)

                try:
                    target_flavor_uuid = target_vim.get_flavor_id_from_data(flavor_dict)
                except Exception as find_error:
                    self.logger.info(
                        "Cannot find any flavor matching  %s.", str(find_error)
                    )
                    try:
                        target_flavor_uuid = target_vim.new_flavor(flavor_dict)
                    except Exception as create_error:
                        self.logger.error(
                            "Error creating flavor at VIM  %s.", str(create_error)
                        )

                if target_flavor_uuid is None:
                    # previously this fell through and reported "DONE" although
                    # the VM had not been resized
                    message = "Resize not done: no flavor could be found or created at VIM"
                    self.logger.error(
                        "task={} vim={} Resize:"
                        " {}".format(task_id, ro_task["target_id"], message)
                    )

                    return (
                        "FAILED",
                        {
                            "vim_status": "VIM_ERROR",
                            "created": created,
                            "vim_message": message,
                        },
                        db_task_update,
                    )

                resized_status = target_vim.resize_instance(
                    vim_vm_id, target_flavor_uuid
                )

                if resized_status:
                    # Refresh VM to get new vim_info
                    vim_dict = target_vim.refresh_vms_status([vim_vm_id])
                    refreshed_vim_info = vim_dict[vim_vm_id]

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }

            # vim details are only propagated when the refreshed VM is not in error
            if refreshed_vim_info and refreshed_vim_info.get("status") not in (
                "ERROR",
                "VIM_ERROR",
            ):
                ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]

            self.logger.debug(
                "task={} {} resize done".format(task_id, ro_task["target_id"])
            )

            return "DONE", ro_vim_item_update, db_task_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} Resize:" " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
1479
1480
class ConfigValidate:
    """Accessors over the "period" section of the configuration dict."""

    def __init__(self, config: Dict):
        # the dict is kept by reference, so later changes to it are visible here
        self.conf = config

    def _period(self, key):
        """Return the value stored at conf["period"][key]."""
        return self.conf["period"][key]

    @property
    def active(self):
        """Value of "refresh_active", or 60 when it is neither >= 60 nor -1."""
        # default 1 min, allowed >= 60 or -1, -1 disables periodic checks
        refresh_active = self._period("refresh_active")

        if refresh_active >= 60 or refresh_active == -1:
            return refresh_active

        return 60

    @property
    def build(self):
        """Value of "refresh_build"."""
        return self._period("refresh_build")

    @property
    def image(self):
        """Value of "refresh_image"."""
        return self._period("refresh_image")

    @property
    def error(self):
        """Value of "refresh_error"."""
        return self._period("refresh_error")

    @property
    def queue_size(self):
        """Value of "queue_size"."""
        return self._period("queue_size")
1511
1512
1513 1 class NsWorker(threading.Thread):
1514 1     def __init__(self, worker_index, config, plugins, db):
1515         """
1516         :param worker_index: thread index
1517         :param config: general configuration of RO, among others the process_id with the docker id where it runs
1518         :param plugins: global shared dict with the loaded plugins
1519         :param db: database class instance to use
1520         """
1521 1         threading.Thread.__init__(self)
1522 1         self.config = config
1523 1         self.plugins = plugins
1524 1         self.plugin_name = "unknown"
1525 1         self.logger = logging.getLogger("ro.worker{}".format(worker_index))
1526 1         self.worker_index = worker_index
1527         # refresh periods for created items
1528 1         self.refresh_config = ConfigValidate(config)
1529 1         self.task_queue = queue.Queue(self.refresh_config.queue_size)
1530         # targetvim: vimplugin class
1531 1         self.my_vims = {}
1532         # targetvim: vim information from database
1533 1         self.db_vims = {}
1534         # targetvim list
1535 1         self.vim_targets = []
1536 1         self.my_id = config["process_id"] + ":" + str(worker_index)
1537 1         self.db = db
1538 1         self.item2class = {
1539             "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger),
1540             "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger),
1541             "image": VimInteractionImage(
1542                 self.db, self.my_vims, self.db_vims, self.logger
1543             ),
1544             "flavor": VimInteractionFlavor(
1545                 self.db, self.my_vims, self.db_vims, self.logger
1546             ),
1547             "sdn_net": VimInteractionSdnNet(
1548                 self.db, self.my_vims, self.db_vims, self.logger
1549             ),
1550             "update": VimInteractionUpdateVdu(
1551                 self.db, self.my_vims, self.db_vims, self.logger
1552             ),
1553             "affinity-or-anti-affinity-group": VimInteractionAffinityGroup(
1554                 self.db, self.my_vims, self.db_vims, self.logger
1555             ),
1556             "migrate": VimInteractionMigration(
1557                 self.db, self.my_vims, self.db_vims, self.logger
1558             ),
1559             "verticalscale": VimInteractionResize(
1560                 self.db, self.my_vims, self.db_vims, self.logger
1561             ),
1562         }
1563 1         self.time_last_task_processed = None
1564         # lists of tasks to delete because nsrs or vnfrs has been deleted from db
1565 1         self.tasks_to_delete = []
1566         # it is idle when there are not vim_targets associated
1567 1         self.idle = True
1568 1         self.task_locked_time = config["global"]["task_locked_time"]
1569
1570 1     def insert_task(self, task):
1571 0         try:
1572 0             self.task_queue.put(task, False)
1573 0             return None
1574 0         except queue.Full:
1575 0             raise NsWorkerException("timeout inserting a task")
1576
1577 1     def terminate(self):
1578 0         self.insert_task("exit")
1579
1580 1     def del_task(self, task):
1581 0         with self.task_lock:
1582 0             if task["status"] == "SCHEDULED":
1583 0                 task["status"] = "SUPERSEDED"
1584 0                 return True
1585             else:  # task["status"] == "processing"
1586 0                 self.task_lock.release()
1587 0                 return False
1588
1589 1     def _process_vim_config(self, target_id, db_vim):
1590         """
1591         Process vim config, creating vim configuration files as ca_cert
1592         :param target_id: vim/sdn/wim + id
1593         :param db_vim: Vim dictionary obtained from database
1594         :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1595         """
1596 0         if not db_vim.get("config"):
1597 0             return
1598
1599 0         file_name = ""
1600
1601 0         try:
1602 0             if db_vim["config"].get("ca_cert_content"):
1603 0                 file_name = "{}:{}".format(target_id, self.worker_index)
1604
1605 0                 try:
1606 0                     mkdir(file_name)
1607 0                 except FileExistsError:
1608 0                     self.logger.exception(
1609                         "FileExistsError occured while processing vim_config."
1610                     )
1611
1612 0                 file_name = file_name + "/ca_cert"
1613
1614 0                 with open(file_name, "w") as f:
1615 0                     f.write(db_vim["config"]["ca_cert_content"])
1616 0                     del db_vim["config"]["ca_cert_content"]
1617 0                     db_vim["config"]["ca_cert"] = file_name
1618 0         except Exception as e:
1619 0             raise NsWorkerException(
1620                 "Error writing to file '{}': {}".format(file_name, e)
1621             )
1622
1623 1     def _load_plugin(self, name, type="vim"):
1624         # type can be vim or sdn
1625 0         if "rovim_dummy" not in self.plugins:
1626 0             self.plugins["rovim_dummy"] = VimDummyConnector
1627
1628 0         if "rosdn_dummy" not in self.plugins:
1629 0             self.plugins["rosdn_dummy"] = SdnDummyConnector
1630
1631 0         if name in self.plugins:
1632 0             return self.plugins[name]
1633
1634 0         try:
1635 0             for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
1636 0                 self.plugins[name] = ep.load()
1637 0         except Exception as e:
1638 0             raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
1639
1640 0         if name and name not in self.plugins:
1641 0             raise NsWorkerException(
1642                 "Plugin 'osm_{n}' has not been installed".format(n=name)
1643             )
1644
1645 0         return self.plugins[name]
1646
1647 1     def _unload_vim(self, target_id):
1648         """
1649         Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1650         :param target_id: Contains type:_id; where type can be 'vim', ...
1651         :return: None.
1652         """
1653 0         try:
1654 0             self.db_vims.pop(target_id, None)
1655 0             self.my_vims.pop(target_id, None)
1656
1657 0             if target_id in self.vim_targets:
1658 0                 self.vim_targets.remove(target_id)
1659
1660 0             self.logger.info("Unloaded {}".format(target_id))
1661 0             rmtree("{}:{}".format(target_id, self.worker_index))
1662 0         except FileNotFoundError:
1663             # This is raised by rmtree if folder does not exist.
1664 0             self.logger.exception("FileNotFoundError occured while unloading VIM.")
1665 0         except Exception as e:
1666 0             self.logger.error("Cannot unload {}: {}".format(target_id, e))
1667
    def _check_vim(self, target_id):
        """
        Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLED or ERROR.
        Only the first operation found in "PROCESSING" state at _admin.operations is handled; the operation is
        locked at database (locked_at) before being processed so that other workers skip it.
        :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
        :return: None.
        """
        target, _, _id = target_id.partition(":")
        now = time.time()
        # fields to set / unset at the account record once the check is over (applied at 'finally')
        update_dict = {}
        unset_dict = {}
        # database path prefix of the operation being processed; empty until the operation is locked
        op_text = ""
        # description of the current stage, used to compose error messages
        step = ""
        # remember if the target was already loaded, to leave the worker as it was at exit
        loaded = target_id in self.vim_targets
        target_database = (
            "vim_accounts"
            if target == "vim"
            else "wim_accounts"
            if target == "wim"
            else "sdns"
        )

        try:
            step = "Getting {} from db".format(target_id)
            db_vim = self.db.get_one(target_database, {"_id": _id})

            for op_index, operation in enumerate(
                db_vim["_admin"].get("operations", ())
            ):
                if operation["operationState"] != "PROCESSING":
                    continue

                locked_at = operation.get("locked_at")

                if locked_at is not None and locked_at >= now - self.task_locked_time:
                    # some other thread is doing this operation
                    return

                # lock
                op_text = "_admin.operations.{}.".format(op_index)

                # The filter includes the locked_at value just read, so the update only
                # matches if nobody changed the lock in between.
                # NOTE(review): this writes "admin.current_operation" while the unset below
                # uses "current_operation" (neither has the "_admin." prefix used by the other
                # fields) - confirm the intended key.
                if not self.db.set_one(
                    target_database,
                    q_filter={
                        "_id": _id,
                        op_text + "operationState": "PROCESSING",
                        op_text + "locked_at": locked_at,
                    },
                    update_dict={
                        op_text + "locked_at": now,
                        "admin.current_operation": op_index,
                    },
                    fail_on_empty=False,
                ):
                    return

                unset_dict[op_text + "locked_at"] = None
                unset_dict["current_operation"] = None
                step = "Loading " + target_id
                # _load_vim returns None if ok or a descriptive text if error
                error_text = self._load_vim(target_id)

                if not error_text:
                    step = "Checking connectivity"

                    if target == "vim":
                        self.my_vims[target_id].check_vim_connectivity()
                    else:
                        self.my_vims[target_id].check_credentials()

                # success values; they are overwritten at 'finally' when error_text is not empty
                update_dict["_admin.operationalState"] = "ENABLED"
                update_dict["_admin.detailed-status"] = ""
                unset_dict[op_text + "detailed-status"] = None
                update_dict[op_text + "operationState"] = "COMPLETED"

                # only one operation is processed per call
                return

        except Exception as e:
            error_text = "{}: {}".format(step, e)
            self.logger.error("{} for {}: {}".format(step, target_id, e))

        finally:
            # Runs also on the early returns above. update_dict/unset_dict are only filled
            # after the operation has been locked, and error_text is assigned either right
            # after that or by the 'except' branch.
            if update_dict or unset_dict:
                if error_text:
                    update_dict[op_text + "operationState"] = "FAILED"
                    update_dict[op_text + "detailed-status"] = error_text
                    unset_dict.pop(op_text + "detailed-status", None)
                    update_dict["_admin.operationalState"] = "ERROR"
                    update_dict["_admin.detailed-status"] = error_text

                if op_text:
                    update_dict[op_text + "statusEnteredTime"] = now

                self.db.set_one(
                    target_database,
                    q_filter={"_id": _id},
                    update_dict=update_dict,
                    unset=unset_dict,
                    fail_on_empty=False,
                )

            # the target was loaded only for this check: unload it again
            if not loaded:
                self._unload_vim(target_id)
1769
1770 1     def _reload_vim(self, target_id):
1771 0         if target_id in self.vim_targets:
1772 0             self._load_vim(target_id)
1773         else:
1774             # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1775             # just remove it to force load again next time it is needed
1776 0             self.db_vims.pop(target_id, None)
1777
1778 1     def _load_vim(self, target_id):
1779         """
1780         Load or reload a vim_account, sdn_controller or wim_account.
1781         Read content from database, load the plugin if not loaded.
1782         In case of error loading the plugin, it load a failing VIM_connector
1783         It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1784         :param target_id: Contains type:_id; where type can be 'vim', ...
1785         :return: None if ok, descriptive text if error
1786         """
1787 0         target, _, _id = target_id.partition(":")
1788 0         target_database = (
1789             "vim_accounts"
1790             if target == "vim"
1791             else "wim_accounts"
1792             if target == "wim"
1793             else "sdns"
1794         )
1795 0         plugin_name = ""
1796 0         vim = None
1797
1798 0         try:
1799 0             step = "Getting {}={} from db".format(target, _id)
1800             # TODO process for wim, sdnc, ...
1801 0             vim = self.db.get_one(target_database, {"_id": _id})
1802
1803             # if deep_get(vim, "config", "sdn-controller"):
1804             #     step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1805             #     db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1806
1807 0             step = "Decrypting password"
1808 0             schema_version = vim.get("schema_version")
1809 0             self.db.encrypt_decrypt_fields(
1810                 vim,
1811                 "decrypt",
1812                 fields=("password", "secret"),
1813                 schema_version=schema_version,
1814                 salt=_id,
1815             )
1816 0             self._process_vim_config(target_id, vim)
1817
1818 0             if target == "vim":
1819 0                 plugin_name = "rovim_" + vim["vim_type"]
1820 0                 step = "Loading plugin '{}'".format(plugin_name)
1821 0                 vim_module_conn = self._load_plugin(plugin_name)
1822 0                 step = "Loading {}'".format(target_id)
1823 0                 self.my_vims[target_id] = vim_module_conn(
1824                     uuid=vim["_id"],
1825                     name=vim["name"],
1826                     tenant_id=vim.get("vim_tenant_id"),
1827                     tenant_name=vim.get("vim_tenant_name"),
1828                     url=vim["vim_url"],
1829                     url_admin=None,
1830                     user=vim["vim_user"],
1831                     passwd=vim["vim_password"],
1832                     config=vim.get("config") or {},
1833                     persistent_info={},
1834                 )
1835             else:  # sdn
1836 0                 plugin_name = "rosdn_" + (vim.get("type") or vim.get("wim_type"))
1837 0                 step = "Loading plugin '{}'".format(plugin_name)
1838 0                 vim_module_conn = self._load_plugin(plugin_name, "sdn")
1839 0                 step = "Loading {}'".format(target_id)
1840 0                 wim = deepcopy(vim)
1841 0                 wim_config = wim.pop("config", {}) or {}
1842 0                 wim["uuid"] = wim["_id"]
1843 0                 if "url" in wim and "wim_url" not in wim:
1844 0                     wim["wim_url"] = wim["url"]
1845 0                 elif "url" not in wim and "wim_url" in wim:
1846 0                     wim["url"] = wim["wim_url"]
1847
1848 0                 if wim.get("dpid"):
1849 0                     wim_config["dpid"] = wim.pop("dpid")
1850
1851 0                 if wim.get("switch_id"):
1852 0                     wim_config["switch_id"] = wim.pop("switch_id")
1853
1854                 # wim, wim_account, config
1855 0                 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
1856 0             self.db_vims[target_id] = vim
1857 0             self.error_status = None
1858
1859 0             self.logger.info(
1860                 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
1861             )
1862 0         except Exception as e:
1863 0             self.logger.error(
1864                 "Cannot load {} plugin={}: {} {}".format(
1865                     target_id, plugin_name, step, e
1866                 )
1867             )
1868
1869 0             self.db_vims[target_id] = vim or {}
1870 0             self.db_vims[target_id] = FailingConnector(str(e))
1871 0             error_status = "{} Error: {}".format(step, e)
1872
1873 0             return error_status
1874         finally:
1875 0             if target_id not in self.vim_targets:
1876 0                 self.vim_targets.append(target_id)
1877
1878 1     def _get_db_task(self):
1879         """
1880         Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1881         :return: None
1882         """
1883 0         now = time.time()
1884
1885 0         if not self.time_last_task_processed:
1886 0             self.time_last_task_processed = now
1887
1888 0         try:
1889             while True:
1890                 """
1891                 # Log RO tasks only when loglevel is DEBUG
1892                 if self.logger.getEffectiveLevel() == logging.DEBUG:
1893                     self._log_ro_task(
1894                         None,
1895                         None,
1896                         None,
1897                         "TASK_WF",
1898                         "task_locked_time="
1899                         + str(self.task_locked_time)
1900                         + " "
1901                         + "time_last_task_processed="
1902                         + str(self.time_last_task_processed)
1903                         + " "
1904                         + "now="
1905                         + str(now),
1906                     )
1907                 """
1908 0                 locked = self.db.set_one(
1909                     "ro_tasks",
1910                     q_filter={
1911                         "target_id": self.vim_targets,
1912                         "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1913                         "locked_at.lt": now - self.task_locked_time,
1914                         "to_check_at.lt": self.time_last_task_processed,
1915                         "to_check_at.gt": -1,
1916                     },
1917                     update_dict={"locked_by": self.my_id, "locked_at": now},
1918                     fail_on_empty=False,
1919                 )
1920
1921 0                 if locked:
1922                     # read and return
1923 0                     ro_task = self.db.get_one(
1924                         "ro_tasks",
1925                         q_filter={
1926                             "target_id": self.vim_targets,
1927                             "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1928                             "locked_at": now,
1929                         },
1930                     )
1931 0                     return ro_task
1932
1933 0                 if self.time_last_task_processed == now:
1934 0                     self.time_last_task_processed = None
1935 0                     return None
1936                 else:
1937 0                     self.time_last_task_processed = now
1938                     # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1939
1940 0         except DbException as e:
1941 0             self.logger.error("Database exception at _get_db_task: {}".format(e))
1942 0         except Exception as e:
1943 0             self.logger.critical(
1944                 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
1945             )
1946
1947 0         return None
1948
1949 1     def _get_db_all_tasks(self):
1950         """
1951         Read all content of table ro_tasks to log it
1952         :return: None
1953         """
1954 0         try:
1955             # Checking the content of the BD:
1956
1957             # read and return
1958 0             ro_task = self.db.get_list("ro_tasks")
1959 0             for rt in ro_task:
1960 0                 self._log_ro_task(rt, None, None, "TASK_WF", "GET_ALL_TASKS")
1961 0             return ro_task
1962
1963 0         except DbException as e:
1964 0             self.logger.error("Database exception at _get_db_all_tasks: {}".format(e))
1965 0         except Exception as e:
1966 0             self.logger.critical(
1967                 "Unexpected exception at _get_db_all_tasks: {}".format(e), exc_info=True
1968             )
1969
1970 0         return None
1971
1972 1     def _log_ro_task(self, ro_task, db_ro_task_update, db_ro_task_delete, mark, event):
1973         """
1974         Generate a log with the following format:
1975
1976         Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by;
1977         target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id;
1978         task_array_index;task_id;task_action;task_item;task_args
1979
1980         Example:
1981
1982         TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013;
1983         1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a
1984         ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False,
1985         'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None,
1986         'vim_details': None, 'vim_message': None, 'refresh_at': None};1;SCHEDULED;
1987         888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;
1988         CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}}
1989         """
1990 0         try:
1991 0             line = []
1992 0             i = 0
1993 0             if ro_task is not None and isinstance(ro_task, dict):
1994 0                 for t in ro_task["tasks"]:
1995 0                     line.clear()
1996 0                     line.append(mark)
1997 0                     line.append(event)
1998 0                     line.append(ro_task.get("_id", ""))
1999 0                     line.append(str(ro_task.get("locked_at", "")))
2000 0                     line.append(str(ro_task.get("modified_at", "")))
2001 0                     line.append(str(ro_task.get("created_at", "")))
2002 0                     line.append(str(ro_task.get("to_check_at", "")))
2003 0                     line.append(str(ro_task.get("locked_by", "")))
2004 0                     line.append(str(ro_task.get("target_id", "")))
2005 0                     line.append(str(ro_task.get("vim_info", {}).get("refresh_at", "")))
2006 0                     line.append(str(ro_task.get("vim_info", "")))
2007 0                     line.append(str(ro_task.get("tasks", "")))
2008 0                     if isinstance(t, dict):
2009 0                         line.append(str(t.get("status", "")))
2010 0                         line.append(str(t.get("action_id", "")))
2011 0                         line.append(str(i))
2012 0                         line.append(str(t.get("task_id", "")))
2013 0                         line.append(str(t.get("action", "")))
2014 0                         line.append(str(t.get("item", "")))
2015 0                         line.append(str(t.get("find_params", "")))
2016 0                         line.append(str(t.get("params", "")))
2017                     else:
2018 0                         line.extend([""] * 2)
2019 0                         line.append(str(i))
2020 0                         line.extend([""] * 5)
2021
2022 0                     i += 1
2023 0                     self.logger.debug(";".join(line))
2024 0             elif db_ro_task_update is not None and isinstance(db_ro_task_update, dict):
2025 0                 i = 0
2026                 while True:
2027 0                     st = "tasks.{}.status".format(i)
2028 0                     if st not in db_ro_task_update:
2029 0                         break
2030 0                     line.clear()
2031 0                     line.append(mark)
2032 0                     line.append(event)
2033 0                     line.append(db_ro_task_update.get("_id", ""))
2034 0                     line.append(str(db_ro_task_update.get("locked_at", "")))
2035 0                     line.append(str(db_ro_task_update.get("modified_at", "")))
2036 0                     line.append("")
2037 0                     line.append(str(db_ro_task_update.get("to_check_at", "")))
2038 0                     line.append(str(db_ro_task_update.get("locked_by", "")))
2039 0                     line.append("")
2040 0                     line.append(str(db_ro_task_update.get("vim_info.refresh_at", "")))
2041 0                     line.append("")
2042 0                     line.append(str(db_ro_task_update.get("vim_info", "")))
2043 0                     line.append(str(str(db_ro_task_update).count(".status")))
2044 0                     line.append(db_ro_task_update.get(st, ""))
2045 0                     line.append("")
2046 0                     line.append(str(i))
2047 0                     line.extend([""] * 3)
2048 0                     i += 1
2049 0                     self.logger.debug(";".join(line))
2050
2051 0             elif db_ro_task_delete is not None and isinstance(db_ro_task_delete, dict):
2052 0                 line.clear()
2053 0                 line.append(mark)
2054 0                 line.append(event)
2055 0                 line.append(db_ro_task_delete.get("_id", ""))
2056 0                 line.append("")
2057 0                 line.append(db_ro_task_delete.get("modified_at", ""))
2058 0                 line.extend([""] * 13)
2059 0                 self.logger.debug(";".join(line))
2060
2061             else:
2062 0                 line.clear()
2063 0                 line.append(mark)
2064 0                 line.append(event)
2065 0                 line.extend([""] * 16)
2066 0                 self.logger.debug(";".join(line))
2067
2068 0         except Exception as e:
2069 0             self.logger.error("Error logging ro_task: {}".format(e))
2070
2071 1     def _delete_task(self, ro_task, task_index, task_depends, db_update):
2072         """
2073         Determine if this task need to be done or superseded
2074         :return: None
2075         """
2076 0         my_task = ro_task["tasks"][task_index]
2077 0         task_id = my_task["task_id"]
2078 0         needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
2079             "created_items", False
2080         )
2081
2082 0         self.logger.warning("Needed delete: {}".format(needed_delete))
2083 0         if my_task["status"] == "FAILED":
2084 0             return None, None  # TODO need to be retry??
2085
2086 0         try:
2087 0             for index, task in enumerate(ro_task["tasks"]):
2088 0                 if index == task_index or not task:
2089 0                     continue  # own task
2090
2091 0                 if (
2092                     my_task["target_record"] == task["target_record"]
2093                     and task["action"] == "CREATE"
2094                 ):
2095                     # set to finished
2096 0                     db_update["tasks.{}.status".format(index)] = task[
2097                         "status"
2098                     ] = "FINISHED"
2099 0                 elif task["action"] == "CREATE" and task["status"] not in (
2100                     "FINISHED",
2101                     "SUPERSEDED",
2102                 ):
2103 0                     needed_delete = False
2104
2105 0             if needed_delete:
2106 0                 self.logger.warning(
2107                     "Deleting ro_task={} task_index={}".format(ro_task, task_index)
2108                 )
2109 0                 return self.item2class[my_task["item"]].delete(ro_task, task_index)
2110             else:
2111 0                 return "SUPERSEDED", None
2112 0         except Exception as e:
2113 0             if not isinstance(e, NsWorkerException):
2114 0                 self.logger.critical(
2115                     "Unexpected exception at _delete_task task={}: {}".format(
2116                         task_id, e
2117                     ),
2118                     exc_info=True,
2119                 )
2120
2121 0             return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e)}
2122
2123 1     def _create_task(self, ro_task, task_index, task_depends, db_update):
2124         """
2125         Determine if this task need to create something at VIM
2126         :return: None
2127         """
2128 0         my_task = ro_task["tasks"][task_index]
2129 0         task_id = my_task["task_id"]
2130 0         task_status = None
2131
2132 0         if my_task["status"] == "FAILED":
2133 0             return None, None  # TODO need to be retry??
2134 0         elif my_task["status"] == "SCHEDULED":
2135             # check if already created by another task
2136 0             for index, task in enumerate(ro_task["tasks"]):
2137 0                 if index == task_index or not task:
2138 0                     continue  # own task
2139
2140 0                 if task["action"] == "CREATE" and task["status"] not in (
2141                     "SCHEDULED",
2142                     "FINISHED",
2143                     "SUPERSEDED",
2144                 ):
2145 0                     return task["status"], "COPY_VIM_INFO"
2146
2147 0             try:
2148 0                 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
2149                     ro_task, task_index, task_depends
2150                 )
2151                 # TODO update other CREATE tasks
2152 0             except Exception as e:
2153 0                 if not isinstance(e, NsWorkerException):
2154 0                     self.logger.error(
2155                         "Error executing task={}: {}".format(task_id, e), exc_info=True
2156                     )
2157
2158 0                 task_status = "FAILED"
2159 0                 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_message": str(e)}
2160                 # TODO update    ro_vim_item_update
2161
2162 0             return task_status, ro_vim_item_update
2163         else:
2164 0             return None, None
2165
2166 1     def _get_dependency(self, task_id, ro_task=None, target_id=None):
2167         """
2168         Look for dependency task
2169         :param task_id: Can be one of
2170             1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2171             2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2172             3. task.task_id: "<action_id>:number"
2173         :param ro_task:
2174         :param target_id:
2175         :return: database ro_task plus index of task
2176         """
2177 0         if (
2178             task_id.startswith("vim:")
2179             or task_id.startswith("sdn:")
2180             or task_id.startswith("wim:")
2181         ):
2182 0             target_id, _, task_id = task_id.partition(" ")
2183
2184 0         if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
2185 0             ro_task_dependency = self.db.get_one(
2186                 "ro_tasks",
2187                 q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
2188                 fail_on_empty=False,
2189             )
2190
2191 0             if ro_task_dependency:
2192 0                 for task_index, task in enumerate(ro_task_dependency["tasks"]):
2193 0                     if task["target_record_id"] == task_id:
2194 0                         return ro_task_dependency, task_index
2195
2196         else:
2197 0             if ro_task:
2198 0                 for task_index, task in enumerate(ro_task["tasks"]):
2199 0                     if task and task["task_id"] == task_id:
2200 0                         return ro_task, task_index
2201
2202 0             ro_task_dependency = self.db.get_one(
2203                 "ro_tasks",
2204                 q_filter={
2205                     "tasks.ANYINDEX.task_id": task_id,
2206                     "tasks.ANYINDEX.target_record.ne": None,
2207                 },
2208                 fail_on_empty=False,
2209             )
2210
2211 0             self.logger.warning("ro_task_dependency={}".format(ro_task_dependency))
2212 0             if ro_task_dependency:
2213 0                 for task_index, task in enumerate(ro_task_dependency["tasks"]):
2214 0                     if task["task_id"] == task_id:
2215 0                         return ro_task_dependency, task_index
2216 0         raise NsWorkerException("Cannot get depending task {}".format(task_id))
2217
2218 1     def update_vm_refresh(self, ro_task):
2219         """Enables the VM status updates if self.refresh_config.active parameter
2220         is not -1 and then updates the DB accordingly
2221
2222         """
2223 1         try:
2224 1             self.logger.debug("Checking if VM status update config")
2225 1             next_refresh = time.time()
2226 1             next_refresh = self._get_next_refresh(ro_task, next_refresh)
2227
2228 1             if next_refresh != -1:
2229 1                 db_ro_task_update = {}
2230 1                 now = time.time()
2231 1                 next_check_at = now + (24 * 60 * 60)
2232 1                 next_check_at = min(next_check_at, next_refresh)
2233 1                 db_ro_task_update["vim_info.refresh_at"] = next_refresh
2234 1                 db_ro_task_update["to_check_at"] = next_check_at
2235
2236 1                 self.logger.debug(
2237                     "Finding tasks which to be updated to enable VM status updates"
2238                 )
2239 1                 refresh_tasks = self.db.get_list(
2240                     "ro_tasks",
2241                     q_filter={
2242                         "tasks.status": "DONE",
2243                         "to_check_at.lt": 0,
2244                     },
2245                 )
2246 1                 self.logger.debug("Updating tasks to change the to_check_at status")
2247 1                 for task in refresh_tasks:
2248 1                     q_filter = {
2249                         "_id": task["_id"],
2250                     }
2251 1                     self.db.set_one(
2252                         "ro_tasks",
2253                         q_filter=q_filter,
2254                         update_dict=db_ro_task_update,
2255                         fail_on_empty=True,
2256                     )
2257
2258 1         except Exception as e:
2259 1             self.logger.error(f"Error updating tasks to enable VM status updates: {e}")
2260
2261 1     def _get_next_refresh(self, ro_task: dict, next_refresh: float):
2262         """Decide the next_refresh according to vim type and refresh config period.
2263         Args:
2264             ro_task (dict):             ro_task details
2265             next_refresh    (float):    next refresh time as epoch format
2266
2267         Returns:
2268             next_refresh    (float)     -1 if vm updates are disabled or vim type is openstack.
2269         """
2270 1         target_vim = ro_task["target_id"]
2271 1         vim_type = self.db_vims[target_vim]["vim_type"]
2272 1         if self.refresh_config.active == -1 or vim_type == "openstack":
2273 1             next_refresh = -1
2274         else:
2275 1             next_refresh += self.refresh_config.active
2276 1         return next_refresh
2277
2278 1     def _process_pending_tasks(self, ro_task):
2279 1         ro_task_id = ro_task["_id"]
2280 1         now = time.time()
2281         # one day
2282 1         next_check_at = now + (24 * 60 * 60)
2283 1         db_ro_task_update = {}
2284
2285 1         def _update_refresh(new_status):
2286             # compute next_refresh
2287             nonlocal task
2288             nonlocal next_check_at
2289             nonlocal db_ro_task_update
2290             nonlocal ro_task
2291
2292 1             next_refresh = time.time()
2293
2294 1             if task["item"] in ("image", "flavor"):
2295 0                 next_refresh += self.refresh_config.image
2296 1             elif new_status == "BUILD":
2297 0                 next_refresh += self.refresh_config.build
2298 1             elif new_status == "DONE":
2299 1                 next_refresh = self._get_next_refresh(ro_task, next_refresh)
2300             else:
2301 1                 next_refresh += self.refresh_config.error
2302
2303 1             next_check_at = min(next_check_at, next_refresh)
2304 1             db_ro_task_update["vim_info.refresh_at"] = next_refresh
2305 1             ro_task["vim_info"]["refresh_at"] = next_refresh
2306
2307 1         try:
2308             """
2309             # Log RO tasks only when loglevel is DEBUG
2310             if self.logger.getEffectiveLevel() == logging.DEBUG:
2311                 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
2312             """
2313             # Check if vim status refresh is enabled again
2314 1             self.update_vm_refresh(ro_task)
2315             # 0: get task_status_create
2316 1             lock_object = None
2317 1             task_status_create = None
2318 1             task_create = next(
2319                 (
2320                     t
2321                     for t in ro_task["tasks"]
2322                     if t
2323                     and t["action"] == "CREATE"
2324                     and t["status"] in ("BUILD", "DONE")
2325                 ),
2326                 None,
2327             )
2328
2329 1             if task_create:
2330 1                 task_status_create = task_create["status"]
2331
2332             # 1: look for tasks in status SCHEDULED, or in status CREATE if action is  DONE or BUILD
2333 1             for task_action in ("DELETE", "CREATE", "EXEC"):
2334 1                 db_vim_update = None
2335 1                 new_status = None
2336
2337 1                 for task_index, task in enumerate(ro_task["tasks"]):
2338 1                     if not task:
2339 0                         continue  # task deleted
2340
2341 1                     task_depends = {}
2342 1                     target_update = None
2343
2344 1                     if (
2345                         (
2346                             task_action in ("DELETE", "EXEC")
2347                             and task["status"] not in ("SCHEDULED", "BUILD")
2348                         )
2349                         or task["action"] != task_action
2350                         or (
2351                             task_action == "CREATE"
2352                             and task["status"] in ("FINISHED", "SUPERSEDED")
2353                         )
2354                     ):
2355 0                         continue
2356
2357 1                     task_path = "tasks.{}.status".format(task_index)
2358 1                     try:
2359 1                         db_vim_info_update = None
2360
2361 1                         if task["status"] == "SCHEDULED":
2362                             # check if tasks that this depends on have been completed
2363 0                             dependency_not_completed = False
2364
2365 0                             for dependency_task_id in task.get("depends_on") or ():
2366 0                                 (
2367                                     dependency_ro_task,
2368                                     dependency_task_index,
2369                                 ) = self._get_dependency(
2370                                     dependency_task_id, target_id=ro_task["target_id"]
2371                                 )
2372 0                                 dependency_task = dependency_ro_task["tasks"][
2373                                     dependency_task_index
2374                                 ]
2375 0                                 self.logger.warning(
2376                                     "dependency_ro_task={} dependency_task_index={}".format(
2377                                         dependency_ro_task, dependency_task_index
2378                                     )
2379                                 )
2380
2381 0                                 if dependency_task["status"] == "SCHEDULED":
2382 0                                     dependency_not_completed = True
2383 0                                     next_check_at = min(
2384                                         next_check_at, dependency_ro_task["to_check_at"]
2385                                     )
2386                                     # must allow dependent task to be processed first
2387                                     # to do this set time after last_task_processed
2388 0                                     next_check_at = max(
2389                                         self.time_last_task_processed, next_check_at
2390                                     )
2391 0                                     break
2392 0                                 elif dependency_task["status"] == "FAILED":
2393 0                                     error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
2394                                         task["action"],
2395                                         task["item"],
2396                                         dependency_task["action"],
2397                                         dependency_task["item"],
2398                                         dependency_task_id,
2399                                         dependency_ro_task["vim_info"].get(
2400                                             "vim_message"
2401                                         ),
2402                                     )
2403 0                                     self.logger.error(
2404                                         "task={} {}".format(task["task_id"], error_text)
2405                                     )
2406 0                                     raise NsWorkerException(error_text)
2407
2408 0                                 task_depends[dependency_task_id] = dependency_ro_task[
2409                                     "vim_info"
2410                                 ]["vim_id"]
2411 0                                 task_depends[
2412                                     "TASK-{}".format(dependency_task_id)
2413                                 ] = dependency_ro_task["vim_info"]["vim_id"]
2414
2415 0                             if dependency_not_completed:
2416 0                                 self.logger.warning(
2417                                     "DEPENDENCY NOT COMPLETED {}".format(
2418                                         dependency_ro_task["vim_info"]["vim_id"]
2419                                     )
2420                                 )
2421                                 # TODO set at vim_info.vim_details that it is waiting
2422 0                                 continue
2423
2424                         # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
2425                         # the task of renew this locking. It will update database locket_at periodically
2426 1                         if not lock_object:
2427 1                             lock_object = LockRenew.add_lock_object(
2428                                 "ro_tasks", ro_task, self
2429                             )
2430
2431 1                         if task["action"] == "DELETE":
2432 0                             (
2433                                 new_status,
2434                                 db_vim_info_update,
2435                             ) = self._delete_task(
2436                                 ro_task, task_index, task_depends, db_ro_task_update
2437                             )
2438 0                             new_status = (
2439                                 "FINISHED" if new_status == "DONE" else new_status
2440                             )
2441                             # ^with FINISHED instead of DONE it will not be refreshing
2442
2443 0                             if new_status in ("FINISHED", "SUPERSEDED"):
2444 0                                 target_update = "DELETE"
2445 1                         elif task["action"] == "EXEC":
2446 0                             (
2447                                 new_status,
2448                                 db_vim_info_update,
2449                                 db_task_update,
2450                             ) = self.item2class[task["item"]].exec(
2451                                 ro_task, task_index, task_depends
2452                             )
2453 0                             new_status = (
2454                                 "FINISHED" if new_status == "DONE" else new_status
2455                             )
2456                             # ^with FINISHED instead of DONE it will not be refreshing
2457
2458 0                             if db_task_update:
2459                                 # load into database the modified db_task_update "retries" and "next_retry"
2460 0                                 if db_task_update.get("retries"):
2461 0                                     db_ro_task_update[
2462                                         "tasks.{}.retries".format(task_index)
2463                                     ] = db_task_update["retries"]
2464
2465 0                                 next_check_at = time.time() + db_task_update.get(
2466                                     "next_retry", 60
2467                                 )
2468 0                             target_update = None
2469 1                         elif task["action"] == "CREATE":
2470 1                             if task["status"] == "SCHEDULED":
2471 0                                 if task_status_create:
2472 0                                     new_status = task_status_create
2473 0                                     target_update = "COPY_VIM_INFO"
2474                                 else:
2475 0                                     new_status, db_vim_info_update = self.item2class[
2476                                         task["item"]
2477                                     ].new(ro_task, task_index, task_depends)
2478                                     # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
2479 0                                     _update_refresh(new_status)
2480                             else:
2481 1                                 refresh_at = ro_task["vim_info"]["refresh_at"]
2482 1                                 if refresh_at and refresh_at != -1 and now > refresh_at:
2483 0                                     (
2484                                         new_status,
2485                                         db_vim_info_update,
2486                                     ) = self.item2class[
2487                                         task["item"]
2488                                     ].refresh(ro_task)
2489 0                                     _update_refresh(new_status)
2490                                 else:
2491                                     # The refresh is updated to avoid set the value of "refresh_at" to
2492                                     # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2493                                     # because it can happen that in this case the task is never processed
2494 1                                     _update_refresh(task["status"])
2495
2496 0                     except Exception as e:
2497 0                         new_status = "FAILED"
2498 0                         db_vim_info_update = {
2499                             "vim_status": "VIM_ERROR",
2500                             "vim_message": str(e),
2501                         }
2502
2503 0                         if not isinstance(
2504                             e, (NsWorkerException, vimconn.VimConnException)
2505                         ):
2506 0                             self.logger.error(
2507                                 "Unexpected exception at _delete_task task={}: {}".format(
2508                                     task["task_id"], e
2509                                 ),
2510                                 exc_info=True,
2511                             )
2512
2513 1                     try:
2514 1                         if db_vim_info_update:
2515 0                             db_vim_update = db_vim_info_update.copy()
2516 0                             db_ro_task_update.update(
2517                                 {
2518                                     "vim_info." + k: v
2519                                     for k, v in db_vim_info_update.items()
2520                                 }
2521                             )
2522 0                             ro_task["vim_info"].update(db_vim_info_update)
2523
2524 1                         if new_status:
2525 0                             if task_action == "CREATE":
2526 0                                 task_status_create = new_status
2527 0                             db_ro_task_update[task_path] = new_status
2528
2529 1                         if target_update or db_vim_update:
2530 0                             if target_update == "DELETE":
2531 0                                 self._update_target(task, None)
2532 0                             elif target_update == "COPY_VIM_INFO":
2533 0                                 self._update_target(task, ro_task["vim_info"])
2534                             else:
2535 0                                 self._update_target(task, db_vim_update)
2536
2537 0                     except Exception as e:
2538 0                         if (
2539                             isinstance(e, DbException)
2540                             and e.http_code == HTTPStatus.NOT_FOUND
2541                         ):
2542                             # if the vnfrs or nsrs has been removed from database, this task must be removed
2543 0                             self.logger.debug(
2544                                 "marking to delete task={}".format(task["task_id"])
2545                             )
2546 0                             self.tasks_to_delete.append(task)
2547                         else:
2548 0                             self.logger.error(
2549                                 "Unexpected exception at _update_target task={}: {}".format(
2550                                     task["task_id"], e
2551                                 ),
2552                                 exc_info=True,
2553                             )
2554
2555 1             locked_at = ro_task["locked_at"]
2556
2557 1             if lock_object:
2558 1                 locked_at = [
2559                     lock_object["locked_at"],
2560                     lock_object["locked_at"] + self.task_locked_time,
2561                 ]
2562                 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
2563                 # contain exactly locked_at + self.task_locked_time
2564 1                 LockRenew.remove_lock_object(lock_object)
2565
2566 1             q_filter = {
2567                 "_id": ro_task["_id"],
2568                 "to_check_at": ro_task["to_check_at"],
2569                 "locked_at": locked_at,
2570             }
2571             # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2572             # outside this task (by ro_nbi) do not update it
2573 1             db_ro_task_update["locked_by"] = None
2574             # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
2575 1             db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
2576 1             db_ro_task_update["modified_at"] = now
2577 1             db_ro_task_update["to_check_at"] = next_check_at
2578
2579             """
2580             # Log RO tasks only when loglevel is DEBUG
2581             if self.logger.getEffectiveLevel() == logging.DEBUG:
2582                 db_ro_task_update_log = db_ro_task_update.copy()
2583                 db_ro_task_update_log["_id"] = q_filter["_id"]
2584                 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2585             """
2586
2587 1             if not self.db.set_one(
2588                 "ro_tasks",
2589                 update_dict=db_ro_task_update,
2590                 q_filter=q_filter,
2591                 fail_on_empty=False,
2592             ):
2593 0                 del db_ro_task_update["to_check_at"]
2594 0                 del q_filter["to_check_at"]
2595                 """
2596                 # Log RO tasks only when loglevel is DEBUG
2597                 if self.logger.getEffectiveLevel() == logging.DEBUG:
2598                     self._log_ro_task(
2599                         None,
2600                         db_ro_task_update_log,
2601                         None,
2602                         "TASK_WF",
2603                         "SET_TASK " + str(q_filter),
2604                     )
2605                 """
2606 0                 self.db.set_one(
2607                     "ro_tasks",
2608                     q_filter=q_filter,
2609                     update_dict=db_ro_task_update,
2610                     fail_on_empty=True,
2611                 )
2612 0         except DbException as e:
2613 0             self.logger.error(
2614                 "ro_task={} Error updating database {}".format(ro_task_id, e)
2615             )
2616 0         except Exception as e:
2617 0             self.logger.error(
2618                 "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
2619             )
2620
2621 1     def _update_target(self, task, ro_vim_item_update):
2622 0         table, _, temp = task["target_record"].partition(":")
2623 0         _id, _, path_vim_status = temp.partition(":")
2624 0         path_item = path_vim_status[: path_vim_status.rfind(".")]
2625 0         path_item = path_item[: path_item.rfind(".")]
2626         # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2627         # path_item: dot separated list targeting record information, e.g. "vdur.10"
2628
2629 0         if ro_vim_item_update:
2630 0             update_dict = {
2631                 path_vim_status + "." + k: v
2632                 for k, v in ro_vim_item_update.items()
2633                 if k
2634                 in (
2635                     "vim_id",
2636                     "vim_details",
2637                     "vim_message",
2638                     "vim_name",
2639                     "vim_status",
2640                     "interfaces",
2641                     "interfaces_backup",
2642                 )
2643             }
2644
2645 0             if path_vim_status.startswith("vdur."):
2646                 # for backward compatibility, add vdur.name apart from vdur.vim_name
2647 0                 if ro_vim_item_update.get("vim_name"):
2648 0                     update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
2649
2650                 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2651 0                 if ro_vim_item_update.get("vim_id"):
2652 0                     update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
2653
2654                 # update general status
2655 0                 if ro_vim_item_update.get("vim_status"):
2656 0                     update_dict[path_item + ".status"] = ro_vim_item_update[
2657                         "vim_status"
2658                     ]
2659
2660 0             if ro_vim_item_update.get("interfaces"):
2661 0                 path_interfaces = path_item + ".interfaces"
2662
2663 0                 for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
2664 0                     if iface:
2665 0                         update_dict.update(
2666                             {
2667                                 path_interfaces + ".{}.".format(i) + k: v
2668                                 for k, v in iface.items()
2669                                 if k in ("vlan", "compute_node", "pci")
2670                             }
2671                         )
2672
2673                         # put ip_address and mac_address with ip-address and mac-address
2674 0                         if iface.get("ip_address"):
2675 0                             update_dict[
2676                                 path_interfaces + ".{}.".format(i) + "ip-address"
2677                             ] = iface["ip_address"]
2678
2679 0                         if iface.get("mac_address"):
2680 0                             update_dict[
2681                                 path_interfaces + ".{}.".format(i) + "mac-address"
2682                             ] = iface["mac_address"]
2683
2684 0                         if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
2685 0                             update_dict["ip-address"] = iface.get("ip_address").split(
2686                                 ";"
2687                             )[0]
2688
2689 0                         if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
2690 0                             update_dict[path_item + ".ip-address"] = iface.get(
2691                                 "ip_address"
2692                             ).split(";")[0]
2693
2694 0             self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
2695
2696             # If interfaces exists, it backups VDU interfaces in the DB for healing operations
2697 0             if ro_vim_item_update.get("interfaces"):
2698 0                 search_key = path_vim_status + ".interfaces"
2699 0                 if update_dict.get(search_key):
2700 0                     interfaces_backup_update = {
2701                         path_vim_status + ".interfaces_backup": update_dict[search_key]
2702                     }
2703
2704 0                     self.db.set_one(
2705                         table,
2706                         q_filter={"_id": _id},
2707                         update_dict=interfaces_backup_update,
2708                     )
2709
2710         else:
2711 0             update_dict = {path_item + ".status": "DELETED"}
2712 0             self.db.set_one(
2713                 table,
2714                 q_filter={"_id": _id},
2715                 update_dict=update_dict,
2716                 unset={path_vim_status: None},
2717             )
2718
2719 1     def _process_delete_db_tasks(self):
2720         """
2721         Delete task from database because vnfrs or nsrs or both have been deleted
2722         :return: None. Uses and modify self.tasks_to_delete
2723         """
2724 0         while self.tasks_to_delete:
2725 0             task = self.tasks_to_delete[0]
2726 0             vnfrs_deleted = None
2727 0             nsr_id = task["nsr_id"]
2728
2729 0             if task["target_record"].startswith("vnfrs:"):
2730                 # check if nsrs is present
2731 0                 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
2732 0                     vnfrs_deleted = task["target_record"].split(":")[1]
2733
2734 0             try:
2735 0                 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
2736 0             except Exception as e:
2737 0                 self.logger.error(
2738                     "Error deleting task={}: {}".format(task["task_id"], e)
2739                 )
2740 0             self.tasks_to_delete.pop(0)
2741
2742 1     @staticmethod
2743 1     def delete_db_tasks(db, nsr_id, vnfrs_deleted):
2744         """
2745         Static method because it is called from osm_ng_ro.ns
2746         :param db: instance of database to use
2747         :param nsr_id: affected nsrs id
2748         :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
2749         :return: None, exception is fails
2750         """
2751 0         retries = 5
2752 0         for retry in range(retries):
2753 0             ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
2754 0             now = time.time()
2755 0             conflict = False
2756
2757 0             for ro_task in ro_tasks:
2758 0                 db_update = {}
2759 0                 to_delete_ro_task = True
2760
2761 0                 for index, task in enumerate(ro_task["tasks"]):
2762 0                     if not task:
2763 0                         pass
2764 0                     elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or (
2765                         vnfrs_deleted
2766                         and task["target_record"].startswith("vnfrs:" + vnfrs_deleted)
2767                     ):
2768 0                         db_update["tasks.{}".format(index)] = None
2769                     else:
2770                         # used by other nsr, ro_task cannot be deleted
2771 0                         to_delete_ro_task = False
2772
2773                 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
2774 0                 if to_delete_ro_task:
2775 0                     if not db.del_one(
2776                         "ro_tasks",
2777                         q_filter={
2778                             "_id": ro_task["_id"],
2779                             "modified_at": ro_task["modified_at"],
2780                         },
2781                         fail_on_empty=False,
2782                     ):
2783 0                         conflict = True
2784 0                 elif db_update:
2785 0                     db_update["modified_at"] = now
2786 0                     if not db.set_one(
2787                         "ro_tasks",
2788                         q_filter={
2789                             "_id": ro_task["_id"],
2790                             "modified_at": ro_task["modified_at"],
2791                         },
2792                         update_dict=db_update,
2793                         fail_on_empty=False,
2794                     ):
2795 0                         conflict = True
2796 0             if not conflict:
2797 0                 return
2798         else:
2799 0             raise NsWorkerException("Exceeded {} retries".format(retries))
2800
2801 1     def run(self):
2802         # load database
2803 0         self.logger.info("Starting")
2804         while True:
2805             # step 1: get commands from queue
2806 0             try:
2807 0                 if self.vim_targets:
2808 0                     task = self.task_queue.get(block=False)
2809                 else:
2810 0                     if not self.idle:
2811 0                         self.logger.debug("enters in idle state")
2812 0                     self.idle = True
2813 0                     task = self.task_queue.get(block=True)
2814 0                     self.idle = False
2815
2816 0                 if task[0] == "terminate":
2817 0                     break
2818 0                 elif task[0] == "load_vim":
2819 0                     self.logger.info("order to load vim {}".format(task[1]))
2820 0                     self._load_vim(task[1])
2821 0                 elif task[0] == "unload_vim":
2822 0                     self.logger.info("order to unload vim {}".format(task[1]))
2823 0                     self._unload_vim(task[1])
2824 0                 elif task[0] == "reload_vim":
2825 0                     self._reload_vim(task[1])
2826 0                 elif task[0] == "check_vim":
2827 0                     self.logger.info("order to check vim {}".format(task[1]))
2828 0                     self._check_vim(task[1])
2829 0                 continue
2830 0             except Exception as e:
2831 0                 if isinstance(e, queue.Empty):
2832 0                     pass
2833                 else:
2834 0                     self.logger.critical(
2835                         "Error processing task: {}".format(e), exc_info=True
2836                     )
2837
2838             # step 2: process pending_tasks, delete not needed tasks
2839 0             try:
2840 0                 if self.tasks_to_delete:
2841 0                     self._process_delete_db_tasks()
2842 0                 busy = False
2843                 """
2844                 # Log RO tasks only when loglevel is DEBUG
2845                 if self.logger.getEffectiveLevel() == logging.DEBUG:
2846                     _ = self._get_db_all_tasks()
2847                 """
2848 0                 ro_task = self._get_db_task()
2849 0                 if ro_task:
2850 0                     self.logger.warning("Task to process: {}".format(ro_task))
2851 0                     time.sleep(1)
2852 0                     self._process_pending_tasks(ro_task)
2853 0                     busy = True
2854 0                 if not busy:
2855 0                     time.sleep(5)
2856 0             except Exception as e:
2857 0                 self.logger.critical(
2858                     "Unexpected exception at run: " + str(e), exc_info=True
2859                 )
2860
2861 0         self.logger.info("Finishing")