Disable the check of the release notes
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
20 """
21 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored at database in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
25 """
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import makedirs
31 from os import path
32 import queue
33 import threading
34 import time
35 import traceback
36 from typing import Dict
37 from unittest.mock import Mock
38
39 from importlib_metadata import entry_points
40 from osm_common.dbbase import DbException
41 from osm_ng_ro.vim_admin import LockRenew
42 from osm_ro_plugin import sdnconn
43 from osm_ro_plugin import vimconn
44 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
45 from osm_ro_plugin.vim_dummy import VimDummyConnector
46 import yaml
47
48 __author__ = "Alfonso Tierno"
49 __date__ = "$28-Sep-2017 12:07:15$"
50
51
52 def deep_get(target_dict, *args, **kwargs):
53 """
54 Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
55 Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
56 :param target_dict: dictionary to be read
57 :param args: list of keys to read from target_dict
58 :param kwargs: only can contain default=value to return if key is not present in the nested dictionary
59 :return: The wanted value if exists, None or default otherwise
60 """
61 for key in args:
62 if not isinstance(target_dict, dict) or key not in target_dict:
63 return kwargs.get("default")
64 target_dict = target_dict[key]
65 return target_dict
66
67
68 class NsWorkerException(Exception):
69 pass
70
71
72 class FailingConnector:
73 def __init__(self, error_msg):
74 self.error_msg = error_msg
75
76 for method in dir(vimconn.VimConnector):
77 if method[0] != "_":
78 setattr(
79 self, method, Mock(side_effect=vimconn.VimConnException(error_msg))
80 )
81
82 for method in dir(sdnconn.SdnConnectorBase):
83 if method[0] != "_":
84 setattr(
85 self, method, Mock(side_effect=sdnconn.SdnConnectorError(error_msg))
86 )
87
88
89 class NsWorkerExceptionNotFound(NsWorkerException):
90 pass
91
92
93 class VimInteractionBase:
94 """Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ...
95 It implements methods that does nothing and return ok"""
96
97 def __init__(self, db, my_vims, db_vims, logger):
98 self.db = db
99 self.logger = logger
100 self.my_vims = my_vims
101 self.db_vims = db_vims
102
103 def new(self, ro_task, task_index, task_depends):
104 return "BUILD", {}
105
106 def refresh(self, ro_task):
107 """skip calling VIM to get image, flavor status. Assumes ok"""
108 if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
109 return "FAILED", {}
110
111 return "DONE", {}
112
113 def delete(self, ro_task, task_index):
114 """skip calling VIM to delete image. Assumes ok"""
115 return "DONE", {}
116
117 def exec(self, ro_task, task_index, task_depends):
118 return "DONE", None, None
119
120
121 class VimInteractionNet(VimInteractionBase):
122 def new(self, ro_task, task_index, task_depends):
123 vim_net_id = None
124 task = ro_task["tasks"][task_index]
125 task_id = task["task_id"]
126 created = False
127 created_items = {}
128 target_vim = self.my_vims[ro_task["target_id"]]
129 mgmtnet = False
130 mgmtnet_defined_in_vim = False
131
132 try:
133 # FIND
134 if task.get("find_params"):
135 # if management, get configuration of VIM
136 if task["find_params"].get("filter_dict"):
137 vim_filter = task["find_params"]["filter_dict"]
138 # management network
139 elif task["find_params"].get("mgmt"):
140 mgmtnet = True
141 if deep_get(
142 self.db_vims[ro_task["target_id"]],
143 "config",
144 "management_network_id",
145 ):
146 mgmtnet_defined_in_vim = True
147 vim_filter = {
148 "id": self.db_vims[ro_task["target_id"]]["config"][
149 "management_network_id"
150 ]
151 }
152 elif deep_get(
153 self.db_vims[ro_task["target_id"]],
154 "config",
155 "management_network_name",
156 ):
157 mgmtnet_defined_in_vim = True
158 vim_filter = {
159 "name": self.db_vims[ro_task["target_id"]]["config"][
160 "management_network_name"
161 ]
162 }
163 else:
164 vim_filter = {"name": task["find_params"]["name"]}
165 else:
166 raise NsWorkerExceptionNotFound(
167 "Invalid find_params for new_net {}".format(task["find_params"])
168 )
169
170 vim_nets = target_vim.get_network_list(vim_filter)
171 if not vim_nets and not task.get("params"):
172 # If there is mgmt-network in the descriptor,
173 # there is no mapping of that network to a VIM network in the descriptor,
174 # also there is no mapping in the "--config" parameter or at VIM creation;
175 # that mgmt-network will be created.
176 if mgmtnet and not mgmtnet_defined_in_vim:
177 net_name = (
178 vim_filter.get("name")
179 if vim_filter.get("name")
180 else vim_filter.get("id")[:16]
181 )
182 vim_net_id, created_items = target_vim.new_network(
183 net_name, None
184 )
185 self.logger.debug(
186 "Created mgmt network vim_net_id: {}".format(vim_net_id)
187 )
188 created = True
189 else:
190 raise NsWorkerExceptionNotFound(
191 "Network not found with this criteria: '{}'".format(
192 task.get("find_params")
193 )
194 )
195 elif len(vim_nets) > 1:
196 raise NsWorkerException(
197 "More than one network found with this criteria: '{}'".format(
198 task["find_params"]
199 )
200 )
201
202 if vim_nets:
203 vim_net_id = vim_nets[0]["id"]
204 else:
205 # CREATE
206 params = task["params"]
207 vim_net_id, created_items = target_vim.new_network(**params)
208 created = True
209
210 ro_vim_item_update = {
211 "vim_id": vim_net_id,
212 "vim_status": "BUILD",
213 "created": created,
214 "created_items": created_items,
215 "vim_details": None,
216 "vim_message": None,
217 }
218 self.logger.debug(
219 "task={} {} new-net={} created={}".format(
220 task_id, ro_task["target_id"], vim_net_id, created
221 )
222 )
223
224 return "BUILD", ro_vim_item_update
225 except (vimconn.VimConnException, NsWorkerException) as e:
226 self.logger.error(
227 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
228 )
229 ro_vim_item_update = {
230 "vim_status": "VIM_ERROR",
231 "created": created,
232 "vim_message": str(e),
233 }
234
235 return "FAILED", ro_vim_item_update
236
237 def refresh(self, ro_task):
238 """Call VIM to get network status"""
239 ro_task_id = ro_task["_id"]
240 target_vim = self.my_vims[ro_task["target_id"]]
241 vim_id = ro_task["vim_info"]["vim_id"]
242 net_to_refresh_list = [vim_id]
243
244 try:
245 vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
246 vim_info = vim_dict[vim_id]
247
248 if vim_info["status"] == "ACTIVE":
249 task_status = "DONE"
250 elif vim_info["status"] == "BUILD":
251 task_status = "BUILD"
252 else:
253 task_status = "FAILED"
254 except vimconn.VimConnException as e:
255 # Mark all tasks at VIM_ERROR status
256 self.logger.error(
257 "ro_task={} vim={} get-net={}: {}".format(
258 ro_task_id, ro_task["target_id"], vim_id, e
259 )
260 )
261 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
262 task_status = "FAILED"
263
264 ro_vim_item_update = {}
265 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
266 ro_vim_item_update["vim_status"] = vim_info["status"]
267
268 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
269 ro_vim_item_update["vim_name"] = vim_info.get("name")
270
271 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
272 if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
273 ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
274 elif vim_info["status"] == "DELETED":
275 ro_vim_item_update["vim_id"] = None
276 ro_vim_item_update["vim_message"] = "Deleted externally"
277 else:
278 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
279 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
280
281 if ro_vim_item_update:
282 self.logger.debug(
283 "ro_task={} {} get-net={}: status={} {}".format(
284 ro_task_id,
285 ro_task["target_id"],
286 vim_id,
287 ro_vim_item_update.get("vim_status"),
288 (
289 ro_vim_item_update.get("vim_message")
290 if ro_vim_item_update.get("vim_status") != "ACTIVE"
291 else ""
292 ),
293 )
294 )
295
296 return task_status, ro_vim_item_update
297
298 def delete(self, ro_task, task_index):
299 task = ro_task["tasks"][task_index]
300 task_id = task["task_id"]
301 net_vim_id = ro_task["vim_info"]["vim_id"]
302 ro_vim_item_update_ok = {
303 "vim_status": "DELETED",
304 "created": False,
305 "vim_message": "DELETED",
306 "vim_id": None,
307 }
308
309 try:
310 if net_vim_id or ro_task["vim_info"]["created_items"]:
311 target_vim = self.my_vims[ro_task["target_id"]]
312 target_vim.delete_network(
313 net_vim_id, ro_task["vim_info"]["created_items"]
314 )
315 except vimconn.VimConnNotFoundException:
316 ro_vim_item_update_ok["vim_message"] = "already deleted"
317 except vimconn.VimConnException as e:
318 self.logger.error(
319 "ro_task={} vim={} del-net={}: {}".format(
320 ro_task["_id"], ro_task["target_id"], net_vim_id, e
321 )
322 )
323 ro_vim_item_update = {
324 "vim_status": "VIM_ERROR",
325 "vim_message": "Error while deleting: {}".format(e),
326 }
327
328 return "FAILED", ro_vim_item_update
329
330 self.logger.debug(
331 "task={} {} del-net={} {}".format(
332 task_id,
333 ro_task["target_id"],
334 net_vim_id,
335 ro_vim_item_update_ok.get("vim_message", ""),
336 )
337 )
338
339 return "DONE", ro_vim_item_update_ok
340
341
342 class VimInteractionClassification(VimInteractionBase):
343 def new(self, ro_task, task_index, task_depends):
344 task = ro_task["tasks"][task_index]
345 task_id = task["task_id"]
346 created = False
347 target_vim = self.my_vims[ro_task["target_id"]]
348
349 try:
350 created = True
351 params = task["params"]
352 params_copy = deepcopy(params)
353
354 name = params_copy.pop("name")
355 logical_source_port_index = int(
356 params_copy.pop("logical_source_port_index")
357 )
358 logical_source_port = params_copy["logical_source_port"]
359
360 if logical_source_port.startswith("TASK-"):
361 vm_id = task_depends[logical_source_port]
362 params_copy["logical_source_port"] = target_vim.refresh_vms_status(
363 [vm_id]
364 )[vm_id]["interfaces"][logical_source_port_index]["vim_interface_id"]
365
366 vim_classification_id = target_vim.new_classification(
367 name, "legacy_flow_classifier", params_copy
368 )
369
370 ro_vim_item_update = {
371 "vim_id": vim_classification_id,
372 "vim_status": "DONE",
373 "created": created,
374 "vim_details": None,
375 "vim_message": None,
376 }
377 self.logger.debug(
378 "task={} {} created={}".format(task_id, ro_task["target_id"], created)
379 )
380
381 return "DONE", ro_vim_item_update
382 except (vimconn.VimConnException, NsWorkerException) as e:
383 self.logger.debug(traceback.format_exc())
384 self.logger.error(
385 "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
386 )
387 ro_vim_item_update = {
388 "vim_status": "VIM_ERROR",
389 "created": created,
390 "vim_message": str(e),
391 }
392
393 return "FAILED", ro_vim_item_update
394
395 def delete(self, ro_task, task_index):
396 task = ro_task["tasks"][task_index]
397 task_id = task["task_id"]
398 classification_vim_id = ro_task["vim_info"]["vim_id"]
399 ro_vim_item_update_ok = {
400 "vim_status": "DELETED",
401 "created": False,
402 "vim_message": "DELETED",
403 "vim_id": None,
404 }
405
406 try:
407 if classification_vim_id:
408 target_vim = self.my_vims[ro_task["target_id"]]
409 target_vim.delete_classification(classification_vim_id)
410 except vimconn.VimConnNotFoundException:
411 ro_vim_item_update_ok["vim_message"] = "already deleted"
412 except vimconn.VimConnException as e:
413 self.logger.error(
414 "ro_task={} vim={} del-classification={}: {}".format(
415 ro_task["_id"], ro_task["target_id"], classification_vim_id, e
416 )
417 )
418 ro_vim_item_update = {
419 "vim_status": "VIM_ERROR",
420 "vim_message": "Error while deleting: {}".format(e),
421 }
422
423 return "FAILED", ro_vim_item_update
424
425 self.logger.debug(
426 "task={} {} del-classification={} {}".format(
427 task_id,
428 ro_task["target_id"],
429 classification_vim_id,
430 ro_vim_item_update_ok.get("vim_message", ""),
431 )
432 )
433
434 return "DONE", ro_vim_item_update_ok
435
436
437 class VimInteractionSfi(VimInteractionBase):
438 def new(self, ro_task, task_index, task_depends):
439 task = ro_task["tasks"][task_index]
440 task_id = task["task_id"]
441 created = False
442 target_vim = self.my_vims[ro_task["target_id"]]
443
444 try:
445 created = True
446 params = task["params"]
447 params_copy = deepcopy(params)
448 name = params_copy["name"]
449 ingress_port = params_copy["ingress_port"]
450 egress_port = params_copy["egress_port"]
451 ingress_port_index = params_copy["ingress_port_index"]
452 egress_port_index = params_copy["egress_port_index"]
453
454 ingress_port_id = ingress_port
455 egress_port_id = egress_port
456
457 vm_id = task_depends[ingress_port]
458
459 if ingress_port.startswith("TASK-"):
460 ingress_port_id = target_vim.refresh_vms_status([vm_id])[vm_id][
461 "interfaces"
462 ][ingress_port_index]["vim_interface_id"]
463
464 if ingress_port == egress_port:
465 egress_port_id = ingress_port_id
466 else:
467 if egress_port.startswith("TASK-"):
468 egress_port_id = target_vim.refresh_vms_status([vm_id])[vm_id][
469 "interfaces"
470 ][egress_port_index]["vim_interface_id"]
471
472 ingress_port_id_list = [ingress_port_id]
473 egress_port_id_list = [egress_port_id]
474
475 vim_sfi_id = target_vim.new_sfi(
476 name, ingress_port_id_list, egress_port_id_list, sfc_encap=False
477 )
478
479 ro_vim_item_update = {
480 "vim_id": vim_sfi_id,
481 "vim_status": "DONE",
482 "created": created,
483 "vim_details": None,
484 "vim_message": None,
485 }
486 self.logger.debug(
487 "task={} {} created={}".format(task_id, ro_task["target_id"], created)
488 )
489
490 return "DONE", ro_vim_item_update
491 except (vimconn.VimConnException, NsWorkerException) as e:
492 self.logger.debug(traceback.format_exc())
493 self.logger.error(
494 "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
495 )
496 ro_vim_item_update = {
497 "vim_status": "VIM_ERROR",
498 "created": created,
499 "vim_message": str(e),
500 }
501
502 return "FAILED", ro_vim_item_update
503
504 def delete(self, ro_task, task_index):
505 task = ro_task["tasks"][task_index]
506 task_id = task["task_id"]
507 sfi_vim_id = ro_task["vim_info"]["vim_id"]
508 ro_vim_item_update_ok = {
509 "vim_status": "DELETED",
510 "created": False,
511 "vim_message": "DELETED",
512 "vim_id": None,
513 }
514
515 try:
516 if sfi_vim_id:
517 target_vim = self.my_vims[ro_task["target_id"]]
518 target_vim.delete_sfi(sfi_vim_id)
519 except vimconn.VimConnNotFoundException:
520 ro_vim_item_update_ok["vim_message"] = "already deleted"
521 except vimconn.VimConnException as e:
522 self.logger.error(
523 "ro_task={} vim={} del-sfi={}: {}".format(
524 ro_task["_id"], ro_task["target_id"], sfi_vim_id, e
525 )
526 )
527 ro_vim_item_update = {
528 "vim_status": "VIM_ERROR",
529 "vim_message": "Error while deleting: {}".format(e),
530 }
531
532 return "FAILED", ro_vim_item_update
533
534 self.logger.debug(
535 "task={} {} del-sfi={} {}".format(
536 task_id,
537 ro_task["target_id"],
538 sfi_vim_id,
539 ro_vim_item_update_ok.get("vim_message", ""),
540 )
541 )
542
543 return "DONE", ro_vim_item_update_ok
544
545
546 class VimInteractionSf(VimInteractionBase):
547 def new(self, ro_task, task_index, task_depends):
548 task = ro_task["tasks"][task_index]
549 task_id = task["task_id"]
550 created = False
551 target_vim = self.my_vims[ro_task["target_id"]]
552
553 try:
554 created = True
555 params = task["params"]
556 params_copy = deepcopy(params)
557 name = params_copy["name"]
558 sfi_list = params_copy["sfis"]
559 sfi_id_list = []
560
561 for sfi in sfi_list:
562 sfi_id = task_depends[sfi] if sfi.startswith("TASK-") else sfi
563 sfi_id_list.append(sfi_id)
564
565 vim_sf_id = target_vim.new_sf(name, sfi_id_list, sfc_encap=False)
566
567 ro_vim_item_update = {
568 "vim_id": vim_sf_id,
569 "vim_status": "DONE",
570 "created": created,
571 "vim_details": None,
572 "vim_message": None,
573 }
574 self.logger.debug(
575 "task={} {} created={}".format(task_id, ro_task["target_id"], created)
576 )
577
578 return "DONE", ro_vim_item_update
579 except (vimconn.VimConnException, NsWorkerException) as e:
580 self.logger.debug(traceback.format_exc())
581 self.logger.error(
582 "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
583 )
584 ro_vim_item_update = {
585 "vim_status": "VIM_ERROR",
586 "created": created,
587 "vim_message": str(e),
588 }
589
590 return "FAILED", ro_vim_item_update
591
592 def delete(self, ro_task, task_index):
593 task = ro_task["tasks"][task_index]
594 task_id = task["task_id"]
595 sf_vim_id = ro_task["vim_info"]["vim_id"]
596 ro_vim_item_update_ok = {
597 "vim_status": "DELETED",
598 "created": False,
599 "vim_message": "DELETED",
600 "vim_id": None,
601 }
602
603 try:
604 if sf_vim_id:
605 target_vim = self.my_vims[ro_task["target_id"]]
606 target_vim.delete_sf(sf_vim_id)
607 except vimconn.VimConnNotFoundException:
608 ro_vim_item_update_ok["vim_message"] = "already deleted"
609 except vimconn.VimConnException as e:
610 self.logger.error(
611 "ro_task={} vim={} del-sf={}: {}".format(
612 ro_task["_id"], ro_task["target_id"], sf_vim_id, e
613 )
614 )
615 ro_vim_item_update = {
616 "vim_status": "VIM_ERROR",
617 "vim_message": "Error while deleting: {}".format(e),
618 }
619
620 return "FAILED", ro_vim_item_update
621
622 self.logger.debug(
623 "task={} {} del-sf={} {}".format(
624 task_id,
625 ro_task["target_id"],
626 sf_vim_id,
627 ro_vim_item_update_ok.get("vim_message", ""),
628 )
629 )
630
631 return "DONE", ro_vim_item_update_ok
632
633
634 class VimInteractionSfp(VimInteractionBase):
635 def new(self, ro_task, task_index, task_depends):
636 task = ro_task["tasks"][task_index]
637 task_id = task["task_id"]
638 created = False
639 target_vim = self.my_vims[ro_task["target_id"]]
640
641 try:
642 created = True
643 params = task["params"]
644 params_copy = deepcopy(params)
645 name = params_copy["name"]
646 sf_list = params_copy["sfs"]
647 classification_list = params_copy["classifications"]
648
649 classification_id_list = []
650 sf_id_list = []
651
652 for classification in classification_list:
653 classi_id = (
654 task_depends[classification]
655 if classification.startswith("TASK-")
656 else classification
657 )
658 classification_id_list.append(classi_id)
659
660 for sf in sf_list:
661 sf_id = task_depends[sf] if sf.startswith("TASK-") else sf
662 sf_id_list.append(sf_id)
663
664 vim_sfp_id = target_vim.new_sfp(
665 name, classification_id_list, sf_id_list, sfc_encap=False
666 )
667
668 ro_vim_item_update = {
669 "vim_id": vim_sfp_id,
670 "vim_status": "DONE",
671 "created": created,
672 "vim_details": None,
673 "vim_message": None,
674 }
675 self.logger.debug(
676 "task={} {} created={}".format(task_id, ro_task["target_id"], created)
677 )
678
679 return "DONE", ro_vim_item_update
680 except (vimconn.VimConnException, NsWorkerException) as e:
681 self.logger.debug(traceback.format_exc())
682 self.logger.error(
683 "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
684 )
685 ro_vim_item_update = {
686 "vim_status": "VIM_ERROR",
687 "created": created,
688 "vim_message": str(e),
689 }
690
691 return "FAILED", ro_vim_item_update
692
693 def delete(self, ro_task, task_index):
694 task = ro_task["tasks"][task_index]
695 task_id = task["task_id"]
696 sfp_vim_id = ro_task["vim_info"]["vim_id"]
697 ro_vim_item_update_ok = {
698 "vim_status": "DELETED",
699 "created": False,
700 "vim_message": "DELETED",
701 "vim_id": None,
702 }
703
704 try:
705 if sfp_vim_id:
706 target_vim = self.my_vims[ro_task["target_id"]]
707 target_vim.delete_sfp(sfp_vim_id)
708 except vimconn.VimConnNotFoundException:
709 ro_vim_item_update_ok["vim_message"] = "already deleted"
710 except vimconn.VimConnException as e:
711 self.logger.error(
712 "ro_task={} vim={} del-sfp={}: {}".format(
713 ro_task["_id"], ro_task["target_id"], sfp_vim_id, e
714 )
715 )
716 ro_vim_item_update = {
717 "vim_status": "VIM_ERROR",
718 "vim_message": "Error while deleting: {}".format(e),
719 }
720
721 return "FAILED", ro_vim_item_update
722
723 self.logger.debug(
724 "task={} {} del-sfp={} {}".format(
725 task_id,
726 ro_task["target_id"],
727 sfp_vim_id,
728 ro_vim_item_update_ok.get("vim_message", ""),
729 )
730 )
731
732 return "DONE", ro_vim_item_update_ok
733
734
735 class VimInteractionVdu(VimInteractionBase):
736 max_retries_inject_ssh_key = 20 # 20 times
737 time_retries_inject_ssh_key = 30 # wevery 30 seconds
738
739 def new(self, ro_task, task_index, task_depends):
740 task = ro_task["tasks"][task_index]
741 task_id = task["task_id"]
742 created = False
743 target_vim = self.my_vims[ro_task["target_id"]]
744 try:
745 created = True
746 params = task["params"]
747 params_copy = deepcopy(params)
748 net_list = params_copy["net_list"]
749
750 for net in net_list:
751 # change task_id into network_id
752 if "net_id" in net and net["net_id"].startswith("TASK-"):
753 network_id = task_depends[net["net_id"]]
754
755 if not network_id:
756 raise NsWorkerException(
757 "Cannot create VM because depends on a network not created or found "
758 "for {}".format(net["net_id"])
759 )
760
761 net["net_id"] = network_id
762
763 if params_copy["image_id"].startswith("TASK-"):
764 params_copy["image_id"] = task_depends[params_copy["image_id"]]
765
766 if params_copy["flavor_id"].startswith("TASK-"):
767 params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]
768
769 affinity_group_list = params_copy["affinity_group_list"]
770 for affinity_group in affinity_group_list:
771 # change task_id into affinity_group_id
772 if "affinity_group_id" in affinity_group and affinity_group[
773 "affinity_group_id"
774 ].startswith("TASK-"):
775 affinity_group_id = task_depends[
776 affinity_group["affinity_group_id"]
777 ]
778
779 if not affinity_group_id:
780 raise NsWorkerException(
781 "found for {}".format(affinity_group["affinity_group_id"])
782 )
783
784 affinity_group["affinity_group_id"] = affinity_group_id
785 vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
786 interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]
787
788 # add to created items previous_created_volumes (healing)
789 if task.get("previous_created_volumes"):
790 for k, v in task["previous_created_volumes"].items():
791 created_items[k] = v
792
793 ro_vim_item_update = {
794 "vim_id": vim_vm_id,
795 "vim_status": "BUILD",
796 "created": created,
797 "created_items": created_items,
798 "vim_details": None,
799 "vim_message": None,
800 "interfaces_vim_ids": interfaces,
801 "interfaces": [],
802 "interfaces_backup": [],
803 }
804 self.logger.debug(
805 "task={} {} new-vm={} created={}".format(
806 task_id, ro_task["target_id"], vim_vm_id, created
807 )
808 )
809
810 return "BUILD", ro_vim_item_update
811 except (vimconn.VimConnException, NsWorkerException) as e:
812 self.logger.debug(traceback.format_exc())
813 self.logger.error(
814 "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
815 )
816 ro_vim_item_update = {
817 "vim_status": "VIM_ERROR",
818 "created": created,
819 "vim_message": str(e),
820 }
821
822 return "FAILED", ro_vim_item_update
823
824 def delete(self, ro_task, task_index):
825 task = ro_task["tasks"][task_index]
826 task_id = task["task_id"]
827 vm_vim_id = ro_task["vim_info"]["vim_id"]
828 ro_vim_item_update_ok = {
829 "vim_status": "DELETED",
830 "created": False,
831 "vim_message": "DELETED",
832 "vim_id": None,
833 }
834
835 try:
836 self.logger.debug(
837 "delete_vminstance: vm_vim_id={} created_items={}".format(
838 vm_vim_id, ro_task["vim_info"]["created_items"]
839 )
840 )
841 if vm_vim_id or ro_task["vim_info"]["created_items"]:
842 target_vim = self.my_vims[ro_task["target_id"]]
843 target_vim.delete_vminstance(
844 vm_vim_id,
845 ro_task["vim_info"]["created_items"],
846 ro_task["vim_info"].get("volumes_to_hold", []),
847 )
848 except vimconn.VimConnNotFoundException:
849 ro_vim_item_update_ok["vim_message"] = "already deleted"
850 except vimconn.VimConnException as e:
851 self.logger.error(
852 "ro_task={} vim={} del-vm={}: {}".format(
853 ro_task["_id"], ro_task["target_id"], vm_vim_id, e
854 )
855 )
856 ro_vim_item_update = {
857 "vim_status": "VIM_ERROR",
858 "vim_message": "Error while deleting: {}".format(e),
859 }
860
861 return "FAILED", ro_vim_item_update
862
863 self.logger.debug(
864 "task={} {} del-vm={} {}".format(
865 task_id,
866 ro_task["target_id"],
867 vm_vim_id,
868 ro_vim_item_update_ok.get("vim_message", ""),
869 )
870 )
871
872 return "DONE", ro_vim_item_update_ok
873
874 def refresh(self, ro_task):
875 """Call VIM to get vm status"""
876 ro_task_id = ro_task["_id"]
877 target_vim = self.my_vims[ro_task["target_id"]]
878 vim_id = ro_task["vim_info"]["vim_id"]
879
880 if not vim_id:
881 return None, None
882
883 vm_to_refresh_list = [vim_id]
884 try:
885 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
886 vim_info = vim_dict[vim_id]
887
888 if vim_info["status"] == "ACTIVE":
889 task_status = "DONE"
890 elif vim_info["status"] == "BUILD":
891 task_status = "BUILD"
892 else:
893 task_status = "FAILED"
894
895 # try to load and parse vim_information
896 try:
897 vim_info_info = yaml.safe_load(vim_info["vim_info"])
898 if vim_info_info.get("name"):
899 vim_info["name"] = vim_info_info["name"]
900 except Exception as vim_info_error:
901 self.logger.exception(
902 f"{vim_info_error} occured while getting the vim_info from yaml"
903 )
904 except vimconn.VimConnException as e:
905 # Mark all tasks at VIM_ERROR status
906 self.logger.error(
907 "ro_task={} vim={} get-vm={}: {}".format(
908 ro_task_id, ro_task["target_id"], vim_id, e
909 )
910 )
911 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
912 task_status = "FAILED"
913
914 ro_vim_item_update = {}
915
916 # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
917 vim_interfaces = []
918 if vim_info.get("interfaces"):
919 for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
920 iface = next(
921 (
922 iface
923 for iface in vim_info["interfaces"]
924 if vim_iface_id == iface["vim_interface_id"]
925 ),
926 None,
927 )
928 # if iface:
929 # iface.pop("vim_info", None)
930 vim_interfaces.append(iface)
931
932 task_create = next(
933 t
934 for t in ro_task["tasks"]
935 if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
936 )
937 if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
938 vim_interfaces[task_create["mgmt_vnf_interface"]][
939 "mgmt_vnf_interface"
940 ] = True
941
942 mgmt_vdu_iface = task_create.get(
943 "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
944 )
945 if vim_interfaces:
946 vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True
947
948 if ro_task["vim_info"]["interfaces"] != vim_interfaces:
949 ro_vim_item_update["interfaces"] = vim_interfaces
950
951 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
952 ro_vim_item_update["vim_status"] = vim_info["status"]
953
954 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
955 ro_vim_item_update["vim_name"] = vim_info.get("name")
956
957 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
958 if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
959 ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
960 elif vim_info["status"] == "DELETED":
961 ro_vim_item_update["vim_id"] = None
962 ro_vim_item_update["vim_message"] = "Deleted externally"
963 else:
964 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
965 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
966
967 if ro_vim_item_update:
968 self.logger.debug(
969 "ro_task={} {} get-vm={}: status={} {}".format(
970 ro_task_id,
971 ro_task["target_id"],
972 vim_id,
973 ro_vim_item_update.get("vim_status"),
974 (
975 ro_vim_item_update.get("vim_message")
976 if ro_vim_item_update.get("vim_status") != "ACTIVE"
977 else ""
978 ),
979 )
980 )
981
982 return task_status, ro_vim_item_update
983
984 def exec(self, ro_task, task_index, task_depends):
985 task = ro_task["tasks"][task_index]
986 task_id = task["task_id"]
987 target_vim = self.my_vims[ro_task["target_id"]]
988 db_task_update = {"retries": 0}
989 retries = task.get("retries", 0)
990
991 try:
992 params = task["params"]
993 params_copy = deepcopy(params)
994 params_copy["ro_key"] = self.db.decrypt(
995 params_copy.pop("private_key"),
996 params_copy.pop("schema_version"),
997 params_copy.pop("salt"),
998 )
999 params_copy["ip_addr"] = params_copy.pop("ip_address")
1000 target_vim.inject_user_key(**params_copy)
1001 self.logger.debug(
1002 "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
1003 )
1004
1005 return (
1006 "DONE",
1007 None,
1008 db_task_update,
1009 ) # params_copy["key"]
1010 except (vimconn.VimConnException, NsWorkerException) as e:
1011 retries += 1
1012
1013 self.logger.debug(traceback.format_exc())
1014 if retries < self.max_retries_inject_ssh_key:
1015 return (
1016 "BUILD",
1017 None,
1018 {
1019 "retries": retries,
1020 "next_retry": self.time_retries_inject_ssh_key,
1021 },
1022 )
1023
1024 self.logger.error(
1025 "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
1026 )
1027 ro_vim_item_update = {"vim_message": str(e)}
1028
1029 return "FAILED", ro_vim_item_update, db_task_update
1030
1031
1032 class VimInteractionImage(VimInteractionBase):
1033 def new(self, ro_task, task_index, task_depends):
1034 task = ro_task["tasks"][task_index]
1035 task_id = task["task_id"]
1036 created = False
1037 created_items = {}
1038 target_vim = self.my_vims[ro_task["target_id"]]
1039
1040 try:
1041 # FIND
1042 vim_image_id = ""
1043 if task.get("find_params"):
1044 vim_images = target_vim.get_image_list(
1045 task["find_params"].get("filter_dict", {})
1046 )
1047
1048 if not vim_images:
1049 raise NsWorkerExceptionNotFound(
1050 "Image not found with this criteria: '{}'".format(
1051 task["find_params"]
1052 )
1053 )
1054 elif len(vim_images) > 1:
1055 raise NsWorkerException(
1056 "More than one image found with this criteria: '{}'".format(
1057 task["find_params"]
1058 )
1059 )
1060 else:
1061 vim_image_id = vim_images[0]["id"]
1062
1063 ro_vim_item_update = {
1064 "vim_id": vim_image_id,
1065 "vim_status": "ACTIVE",
1066 "created": created,
1067 "created_items": created_items,
1068 "vim_details": None,
1069 "vim_message": None,
1070 }
1071 self.logger.debug(
1072 "task={} {} new-image={} created={}".format(
1073 task_id, ro_task["target_id"], vim_image_id, created
1074 )
1075 )
1076
1077 return "DONE", ro_vim_item_update
1078 except (NsWorkerException, vimconn.VimConnException) as e:
1079 self.logger.error(
1080 "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
1081 )
1082 ro_vim_item_update = {
1083 "vim_status": "VIM_ERROR",
1084 "created": created,
1085 "vim_message": str(e),
1086 }
1087
1088 return "FAILED", ro_vim_item_update
1089
1090
1091 class VimInteractionSharedVolume(VimInteractionBase):
1092 def delete(self, ro_task, task_index):
1093 task = ro_task["tasks"][task_index]
1094 task_id = task["task_id"]
1095 shared_volume_vim_id = ro_task["vim_info"]["vim_id"]
1096 created_items = ro_task["vim_info"]["created_items"]
1097 ro_vim_item_update_ok = {
1098 "vim_status": "DELETED",
1099 "created": False,
1100 "vim_message": "DELETED",
1101 "vim_id": None,
1102 }
1103 if created_items and created_items.get(shared_volume_vim_id).get("keep"):
1104 ro_vim_item_update_ok = {
1105 "vim_status": "ACTIVE",
1106 "created": False,
1107 "vim_message": None,
1108 }
1109 return "DONE", ro_vim_item_update_ok
1110 try:
1111 if shared_volume_vim_id:
1112 target_vim = self.my_vims[ro_task["target_id"]]
1113 target_vim.delete_shared_volumes(shared_volume_vim_id)
1114 except vimconn.VimConnNotFoundException:
1115 ro_vim_item_update_ok["vim_message"] = "already deleted"
1116 except vimconn.VimConnException as e:
1117 self.logger.error(
1118 "ro_task={} vim={} del-shared-volume={}: {}".format(
1119 ro_task["_id"], ro_task["target_id"], shared_volume_vim_id, e
1120 )
1121 )
1122 ro_vim_item_update = {
1123 "vim_status": "VIM_ERROR",
1124 "vim_message": "Error while deleting: {}".format(e),
1125 }
1126
1127 return "FAILED", ro_vim_item_update
1128
1129 self.logger.debug(
1130 "task={} {} del-shared-volume={} {}".format(
1131 task_id,
1132 ro_task["target_id"],
1133 shared_volume_vim_id,
1134 ro_vim_item_update_ok.get("vim_message", ""),
1135 )
1136 )
1137
1138 return "DONE", ro_vim_item_update_ok
1139
1140 def new(self, ro_task, task_index, task_depends):
1141 task = ro_task["tasks"][task_index]
1142 task_id = task["task_id"]
1143 created = False
1144 created_items = {}
1145 target_vim = self.my_vims[ro_task["target_id"]]
1146
1147 try:
1148 shared_volume_vim_id = None
1149 shared_volume_data = None
1150
1151 if task.get("params"):
1152 shared_volume_data = task["params"]
1153
1154 if shared_volume_data:
1155 self.logger.info(
1156 f"Creating the new shared_volume for {shared_volume_data}\n"
1157 )
1158 (
1159 shared_volume_name,
1160 shared_volume_vim_id,
1161 ) = target_vim.new_shared_volumes(shared_volume_data)
1162 created = True
1163 created_items[shared_volume_vim_id] = {
1164 "name": shared_volume_name,
1165 "keep": shared_volume_data.get("keep"),
1166 }
1167
1168 ro_vim_item_update = {
1169 "vim_id": shared_volume_vim_id,
1170 "vim_status": "ACTIVE",
1171 "created": created,
1172 "created_items": created_items,
1173 "vim_details": None,
1174 "vim_message": None,
1175 }
1176 self.logger.debug(
1177 "task={} {} new-shared-volume={} created={}".format(
1178 task_id, ro_task["target_id"], shared_volume_vim_id, created
1179 )
1180 )
1181
1182 return "DONE", ro_vim_item_update
1183 except (vimconn.VimConnException, NsWorkerException) as e:
1184 self.logger.error(
1185 "task={} vim={} new-shared-volume:"
1186 " {}".format(task_id, ro_task["target_id"], e)
1187 )
1188 ro_vim_item_update = {
1189 "vim_status": "VIM_ERROR",
1190 "created": created,
1191 "vim_message": str(e),
1192 }
1193
1194 return "FAILED", ro_vim_item_update
1195
1196
1197 class VimInteractionFlavor(VimInteractionBase):
1198 def delete(self, ro_task, task_index):
1199 task = ro_task["tasks"][task_index]
1200 task_id = task["task_id"]
1201 flavor_vim_id = ro_task["vim_info"]["vim_id"]
1202 ro_vim_item_update_ok = {
1203 "vim_status": "DELETED",
1204 "created": False,
1205 "vim_message": "DELETED",
1206 "vim_id": None,
1207 }
1208
1209 try:
1210 if flavor_vim_id:
1211 target_vim = self.my_vims[ro_task["target_id"]]
1212 target_vim.delete_flavor(flavor_vim_id)
1213 except vimconn.VimConnNotFoundException:
1214 ro_vim_item_update_ok["vim_message"] = "already deleted"
1215 except vimconn.VimConnException as e:
1216 self.logger.error(
1217 "ro_task={} vim={} del-flavor={}: {}".format(
1218 ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
1219 )
1220 )
1221 ro_vim_item_update = {
1222 "vim_status": "VIM_ERROR",
1223 "vim_message": "Error while deleting: {}".format(e),
1224 }
1225
1226 return "FAILED", ro_vim_item_update
1227
1228 self.logger.debug(
1229 "task={} {} del-flavor={} {}".format(
1230 task_id,
1231 ro_task["target_id"],
1232 flavor_vim_id,
1233 ro_vim_item_update_ok.get("vim_message", ""),
1234 )
1235 )
1236
1237 return "DONE", ro_vim_item_update_ok
1238
1239 def new(self, ro_task, task_index, task_depends):
1240 task = ro_task["tasks"][task_index]
1241 task_id = task["task_id"]
1242 created = False
1243 created_items = {}
1244 target_vim = self.my_vims[ro_task["target_id"]]
1245 try:
1246 # FIND
1247 vim_flavor_id = None
1248
1249 if task.get("find_params", {}).get("vim_flavor_id"):
1250 vim_flavor_id = task["find_params"]["vim_flavor_id"]
1251 elif task.get("find_params", {}).get("flavor_data"):
1252 try:
1253 flavor_data = task["find_params"]["flavor_data"]
1254 vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
1255 except vimconn.VimConnNotFoundException as flavor_not_found_msg:
1256 self.logger.warning(
1257 f"VimConnNotFoundException occured: {flavor_not_found_msg}"
1258 )
1259
1260 if not vim_flavor_id and task.get("params"):
1261 # CREATE
1262 flavor_data = task["params"]["flavor_data"]
1263 vim_flavor_id = target_vim.new_flavor(flavor_data)
1264 created = True
1265
1266 ro_vim_item_update = {
1267 "vim_id": vim_flavor_id,
1268 "vim_status": "ACTIVE",
1269 "created": created,
1270 "created_items": created_items,
1271 "vim_details": None,
1272 "vim_message": None,
1273 }
1274 self.logger.debug(
1275 "task={} {} new-flavor={} created={}".format(
1276 task_id, ro_task["target_id"], vim_flavor_id, created
1277 )
1278 )
1279
1280 return "DONE", ro_vim_item_update
1281 except (vimconn.VimConnException, NsWorkerException) as e:
1282 self.logger.error(
1283 "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
1284 )
1285 ro_vim_item_update = {
1286 "vim_status": "VIM_ERROR",
1287 "created": created,
1288 "vim_message": str(e),
1289 }
1290
1291 return "FAILED", ro_vim_item_update
1292
1293
1294 class VimInteractionAffinityGroup(VimInteractionBase):
1295 def delete(self, ro_task, task_index):
1296 task = ro_task["tasks"][task_index]
1297 task_id = task["task_id"]
1298 affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
1299 ro_vim_item_update_ok = {
1300 "vim_status": "DELETED",
1301 "created": False,
1302 "vim_message": "DELETED",
1303 "vim_id": None,
1304 }
1305
1306 try:
1307 if affinity_group_vim_id:
1308 target_vim = self.my_vims[ro_task["target_id"]]
1309 target_vim.delete_affinity_group(affinity_group_vim_id)
1310 except vimconn.VimConnNotFoundException:
1311 ro_vim_item_update_ok["vim_message"] = "already deleted"
1312 except vimconn.VimConnException as e:
1313 self.logger.error(
1314 "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
1315 ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
1316 )
1317 )
1318 ro_vim_item_update = {
1319 "vim_status": "VIM_ERROR",
1320 "vim_message": "Error while deleting: {}".format(e),
1321 }
1322
1323 return "FAILED", ro_vim_item_update
1324
1325 self.logger.debug(
1326 "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
1327 task_id,
1328 ro_task["target_id"],
1329 affinity_group_vim_id,
1330 ro_vim_item_update_ok.get("vim_message", ""),
1331 )
1332 )
1333
1334 return "DONE", ro_vim_item_update_ok
1335
1336 def new(self, ro_task, task_index, task_depends):
1337 task = ro_task["tasks"][task_index]
1338 task_id = task["task_id"]
1339 created = False
1340 created_items = {}
1341 target_vim = self.my_vims[ro_task["target_id"]]
1342
1343 try:
1344 affinity_group_vim_id = None
1345 affinity_group_data = None
1346 param_affinity_group_id = ""
1347
1348 if task.get("params"):
1349 affinity_group_data = task["params"].get("affinity_group_data")
1350
1351 if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
1352 try:
1353 param_affinity_group_id = task["params"]["affinity_group_data"].get(
1354 "vim-affinity-group-id"
1355 )
1356 affinity_group_vim_id = target_vim.get_affinity_group(
1357 param_affinity_group_id
1358 ).get("id")
1359 except vimconn.VimConnNotFoundException:
1360 self.logger.error(
1361 "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {}"
1362 "could not be found at VIM. Creating a new one.".format(
1363 task_id, ro_task["target_id"], param_affinity_group_id
1364 )
1365 )
1366
1367 if not affinity_group_vim_id and affinity_group_data:
1368 affinity_group_vim_id = target_vim.new_affinity_group(
1369 affinity_group_data
1370 )
1371 created = True
1372
1373 ro_vim_item_update = {
1374 "vim_id": affinity_group_vim_id,
1375 "vim_status": "ACTIVE",
1376 "created": created,
1377 "created_items": created_items,
1378 "vim_details": None,
1379 "vim_message": None,
1380 }
1381 self.logger.debug(
1382 "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
1383 task_id, ro_task["target_id"], affinity_group_vim_id, created
1384 )
1385 )
1386
1387 return "DONE", ro_vim_item_update
1388 except (vimconn.VimConnException, NsWorkerException) as e:
1389 self.logger.error(
1390 "task={} vim={} new-affinity-or-anti-affinity-group:"
1391 " {}".format(task_id, ro_task["target_id"], e)
1392 )
1393 ro_vim_item_update = {
1394 "vim_status": "VIM_ERROR",
1395 "created": created,
1396 "vim_message": str(e),
1397 }
1398
1399 return "FAILED", ro_vim_item_update
1400
1401
1402 class VimInteractionUpdateVdu(VimInteractionBase):
1403 def exec(self, ro_task, task_index, task_depends):
1404 task = ro_task["tasks"][task_index]
1405 task_id = task["task_id"]
1406 db_task_update = {"retries": 0}
1407 created = False
1408 created_items = {}
1409 target_vim = self.my_vims[ro_task["target_id"]]
1410
1411 try:
1412 vim_vm_id = ""
1413 if task.get("params"):
1414 vim_vm_id = task["params"].get("vim_vm_id")
1415 action = task["params"].get("action")
1416 context = {action: action}
1417 target_vim.action_vminstance(vim_vm_id, context)
1418 # created = True
1419 ro_vim_item_update = {
1420 "vim_id": vim_vm_id,
1421 "vim_status": "ACTIVE",
1422 "created": created,
1423 "created_items": created_items,
1424 "vim_details": None,
1425 "vim_message": None,
1426 }
1427 self.logger.debug(
1428 "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
1429 )
1430 return "DONE", ro_vim_item_update, db_task_update
1431 except (vimconn.VimConnException, NsWorkerException) as e:
1432 self.logger.error(
1433 "task={} vim={} VM Migration:"
1434 " {}".format(task_id, ro_task["target_id"], e)
1435 )
1436 ro_vim_item_update = {
1437 "vim_status": "VIM_ERROR",
1438 "created": created,
1439 "vim_message": str(e),
1440 }
1441
1442 return "FAILED", ro_vim_item_update, db_task_update
1443
1444
1445 class VimInteractionSdnNet(VimInteractionBase):
1446 @staticmethod
1447 def _match_pci(port_pci, mapping):
1448 """
1449 Check if port_pci matches with mapping.
1450 The mapping can have brackets to indicate that several chars are accepted. e.g
1451 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
1452 :param port_pci: text
1453 :param mapping: text, can contain brackets to indicate several chars are available
1454 :return: True if matches, False otherwise
1455 """
1456 if not port_pci or not mapping:
1457 return False
1458 if port_pci == mapping:
1459 return True
1460
1461 mapping_index = 0
1462 pci_index = 0
1463 while True:
1464 bracket_start = mapping.find("[", mapping_index)
1465
1466 if bracket_start == -1:
1467 break
1468
1469 bracket_end = mapping.find("]", bracket_start)
1470 if bracket_end == -1:
1471 break
1472
1473 length = bracket_start - mapping_index
1474 if (
1475 length
1476 and port_pci[pci_index : pci_index + length]
1477 != mapping[mapping_index:bracket_start]
1478 ):
1479 return False
1480
1481 if (
1482 port_pci[pci_index + length]
1483 not in mapping[bracket_start + 1 : bracket_end]
1484 ):
1485 return False
1486
1487 pci_index += length + 1
1488 mapping_index = bracket_end + 1
1489
1490 if port_pci[pci_index:] != mapping[mapping_index:]:
1491 return False
1492
1493 return True
1494
1495 def _get_interfaces(self, vlds_to_connect, vim_account_id):
1496 """
1497 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
1498 :param vim_account_id:
1499 :return:
1500 """
1501 interfaces = []
1502
1503 for vld in vlds_to_connect:
1504 table, _, db_id = vld.partition(":")
1505 db_id, _, vld = db_id.partition(":")
1506 _, _, vld_id = vld.partition(".")
1507
1508 if table == "vnfrs":
1509 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
1510 iface_key = "vnf-vld-id"
1511 else: # table == "nsrs"
1512 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
1513 iface_key = "ns-vld-id"
1514
1515 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
1516
1517 for db_vnfr in db_vnfrs:
1518 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
1519 for iface_index, interface in enumerate(vdur["interfaces"]):
1520 if interface.get(iface_key) == vld_id and interface.get(
1521 "type"
1522 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
1523 # only SR-IOV o PT
1524 interface_ = interface.copy()
1525 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
1526 db_vnfr["_id"], vdu_index, iface_index
1527 )
1528
1529 if vdur.get("status") == "ERROR":
1530 interface_["status"] = "ERROR"
1531
1532 interfaces.append(interface_)
1533
1534 return interfaces
1535
1536 def refresh(self, ro_task):
1537 # look for task create
1538 task_create_index, _ = next(
1539 i_t
1540 for i_t in enumerate(ro_task["tasks"])
1541 if i_t[1]
1542 and i_t[1]["action"] == "CREATE"
1543 and i_t[1]["status"] != "FINISHED"
1544 )
1545
1546 return self.new(ro_task, task_create_index, None)
1547
1548 def new(self, ro_task, task_index, task_depends):
1549 task = ro_task["tasks"][task_index]
1550 task_id = task["task_id"]
1551 target_vim = self.my_vims[ro_task["target_id"]]
1552
1553 sdn_net_id = ro_task["vim_info"]["vim_id"]
1554
1555 created_items = ro_task["vim_info"].get("created_items")
1556 connected_ports = ro_task["vim_info"].get("connected_ports", [])
1557 new_connected_ports = []
1558 last_update = ro_task["vim_info"].get("last_update", 0)
1559 sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
1560 error_list = []
1561 created = ro_task["vim_info"].get("created", False)
1562
1563 try:
1564 # CREATE
1565 db_vim = {}
1566 params = task["params"]
1567 vlds_to_connect = params.get("vlds", [])
1568 associated_vim = params.get("target_vim")
1569 # external additional ports
1570 additional_ports = params.get("sdn-ports") or ()
1571 _, _, vim_account_id = (
1572 (None, None, None)
1573 if associated_vim is None
1574 else associated_vim.partition(":")
1575 )
1576
1577 if associated_vim:
1578 # get associated VIM
1579 if associated_vim not in self.db_vims:
1580 self.db_vims[associated_vim] = self.db.get_one(
1581 "vim_accounts", {"_id": vim_account_id}
1582 )
1583
1584 db_vim = self.db_vims[associated_vim]
1585
1586 # look for ports to connect
1587 ports = self._get_interfaces(vlds_to_connect, vim_account_id)
1588 # print(ports)
1589
1590 sdn_ports = []
1591 pending_ports = error_ports = 0
1592 vlan_used = None
1593 sdn_need_update = False
1594
1595 for port in ports:
1596 vlan_used = port.get("vlan") or vlan_used
1597
1598 # TODO. Do not connect if already done
1599 if not port.get("compute_node") or not port.get("pci"):
1600 if port.get("status") == "ERROR":
1601 error_ports += 1
1602 else:
1603 pending_ports += 1
1604 continue
1605
1606 pmap = None
1607 compute_node_mappings = next(
1608 (
1609 c
1610 for c in db_vim["config"].get("sdn-port-mapping", ())
1611 if c and c["compute_node"] == port["compute_node"]
1612 ),
1613 None,
1614 )
1615
1616 if compute_node_mappings:
1617 # process port_mapping pci of type 0000:af:1[01].[1357]
1618 pmap = next(
1619 (
1620 p
1621 for p in compute_node_mappings["ports"]
1622 if self._match_pci(port["pci"], p.get("pci"))
1623 ),
1624 None,
1625 )
1626
1627 if not pmap:
1628 if not db_vim["config"].get("mapping_not_needed"):
1629 error_list.append(
1630 "Port mapping not found for compute_node={} pci={}".format(
1631 port["compute_node"], port["pci"]
1632 )
1633 )
1634 continue
1635
1636 pmap = {}
1637
1638 service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
1639 new_port = {
1640 "service_endpoint_id": pmap.get("service_endpoint_id")
1641 or service_endpoint_id,
1642 "service_endpoint_encapsulation_type": (
1643 "dot1q" if port["type"] == "SR-IOV" else None
1644 ),
1645 "service_endpoint_encapsulation_info": {
1646 "vlan": port.get("vlan"),
1647 "mac": port.get("mac-address"),
1648 "device_id": pmap.get("device_id") or port["compute_node"],
1649 "device_interface_id": pmap.get("device_interface_id")
1650 or port["pci"],
1651 "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
1652 "switch_port": pmap.get("switch_port"),
1653 "service_mapping_info": pmap.get("service_mapping_info"),
1654 },
1655 }
1656
1657 # TODO
1658 # if port["modified_at"] > last_update:
1659 # sdn_need_update = True
1660 new_connected_ports.append(port["id"]) # TODO
1661 sdn_ports.append(new_port)
1662
1663 if error_ports:
1664 error_list.append(
1665 "{} interfaces have not been created as VDU is on ERROR status".format(
1666 error_ports
1667 )
1668 )
1669
1670 # connect external ports
1671 for index, additional_port in enumerate(additional_ports):
1672 additional_port_id = additional_port.get(
1673 "service_endpoint_id"
1674 ) or "external-{}".format(index)
1675 sdn_ports.append(
1676 {
1677 "service_endpoint_id": additional_port_id,
1678 "service_endpoint_encapsulation_type": additional_port.get(
1679 "service_endpoint_encapsulation_type", "dot1q"
1680 ),
1681 "service_endpoint_encapsulation_info": {
1682 "vlan": additional_port.get("vlan") or vlan_used,
1683 "mac": additional_port.get("mac_address"),
1684 "device_id": additional_port.get("device_id"),
1685 "device_interface_id": additional_port.get(
1686 "device_interface_id"
1687 ),
1688 "switch_dpid": additional_port.get("switch_dpid")
1689 or additional_port.get("switch_id"),
1690 "switch_port": additional_port.get("switch_port"),
1691 "service_mapping_info": additional_port.get(
1692 "service_mapping_info"
1693 ),
1694 },
1695 }
1696 )
1697 new_connected_ports.append(additional_port_id)
1698 sdn_info = ""
1699
1700 # if there are more ports to connect or they have been modified, call create/update
1701 if error_list:
1702 sdn_status = "ERROR"
1703 sdn_info = "; ".join(error_list)
1704 elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
1705 last_update = time.time()
1706
1707 if not sdn_net_id:
1708 if len(sdn_ports) < 2:
1709 sdn_status = "ACTIVE"
1710
1711 if not pending_ports:
1712 self.logger.debug(
1713 "task={} {} new-sdn-net done, less than 2 ports".format(
1714 task_id, ro_task["target_id"]
1715 )
1716 )
1717 else:
1718 net_type = params.get("type") or "ELAN"
1719 (
1720 sdn_net_id,
1721 created_items,
1722 ) = target_vim.create_connectivity_service(net_type, sdn_ports)
1723 created = True
1724 self.logger.debug(
1725 "task={} {} new-sdn-net={} created={}".format(
1726 task_id, ro_task["target_id"], sdn_net_id, created
1727 )
1728 )
1729 else:
1730 created_items = target_vim.edit_connectivity_service(
1731 sdn_net_id, conn_info=created_items, connection_points=sdn_ports
1732 )
1733 created = True
1734 self.logger.debug(
1735 "task={} {} update-sdn-net={} created={}".format(
1736 task_id, ro_task["target_id"], sdn_net_id, created
1737 )
1738 )
1739
1740 connected_ports = new_connected_ports
1741 elif sdn_net_id:
1742 wim_status_dict = target_vim.get_connectivity_service_status(
1743 sdn_net_id, conn_info=created_items
1744 )
1745 sdn_status = wim_status_dict["sdn_status"]
1746
1747 if wim_status_dict.get("sdn_info"):
1748 sdn_info = str(wim_status_dict.get("sdn_info")) or ""
1749
1750 if wim_status_dict.get("error_msg"):
1751 sdn_info = wim_status_dict.get("error_msg") or ""
1752
1753 if pending_ports:
1754 if sdn_status != "ERROR":
1755 sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1756 len(ports) - pending_ports, len(ports)
1757 )
1758
1759 if sdn_status == "ACTIVE":
1760 sdn_status = "BUILD"
1761
1762 ro_vim_item_update = {
1763 "vim_id": sdn_net_id,
1764 "vim_status": sdn_status,
1765 "created": created,
1766 "created_items": created_items,
1767 "connected_ports": connected_ports,
1768 "vim_details": sdn_info,
1769 "vim_message": None,
1770 "last_update": last_update,
1771 }
1772
1773 return sdn_status, ro_vim_item_update
1774 except Exception as e:
1775 self.logger.error(
1776 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
1777 exc_info=not isinstance(
1778 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1779 ),
1780 )
1781 ro_vim_item_update = {
1782 "vim_status": "VIM_ERROR",
1783 "created": created,
1784 "vim_message": str(e),
1785 }
1786
1787 return "FAILED", ro_vim_item_update
1788
1789 def delete(self, ro_task, task_index):
1790 task = ro_task["tasks"][task_index]
1791 task_id = task["task_id"]
1792 sdn_vim_id = ro_task["vim_info"].get("vim_id")
1793 ro_vim_item_update_ok = {
1794 "vim_status": "DELETED",
1795 "created": False,
1796 "vim_message": "DELETED",
1797 "vim_id": None,
1798 }
1799
1800 try:
1801 if sdn_vim_id:
1802 target_vim = self.my_vims[ro_task["target_id"]]
1803 target_vim.delete_connectivity_service(
1804 sdn_vim_id, ro_task["vim_info"].get("created_items")
1805 )
1806
1807 except Exception as e:
1808 if (
1809 isinstance(e, sdnconn.SdnConnectorError)
1810 and e.http_code == HTTPStatus.NOT_FOUND.value
1811 ):
1812 ro_vim_item_update_ok["vim_message"] = "already deleted"
1813 else:
1814 self.logger.error(
1815 "ro_task={} vim={} del-sdn-net={}: {}".format(
1816 ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
1817 ),
1818 exc_info=not isinstance(
1819 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1820 ),
1821 )
1822 ro_vim_item_update = {
1823 "vim_status": "VIM_ERROR",
1824 "vim_message": "Error while deleting: {}".format(e),
1825 }
1826
1827 return "FAILED", ro_vim_item_update
1828
1829 self.logger.debug(
1830 "task={} {} del-sdn-net={} {}".format(
1831 task_id,
1832 ro_task["target_id"],
1833 sdn_vim_id,
1834 ro_vim_item_update_ok.get("vim_message", ""),
1835 )
1836 )
1837
1838 return "DONE", ro_vim_item_update_ok
1839
1840
1841 class VimInteractionMigration(VimInteractionBase):
1842 def exec(self, ro_task, task_index, task_depends):
1843 task = ro_task["tasks"][task_index]
1844 task_id = task["task_id"]
1845 db_task_update = {"retries": 0}
1846 target_vim = self.my_vims[ro_task["target_id"]]
1847 vim_interfaces = []
1848 created = False
1849 created_items = {}
1850 refreshed_vim_info = {}
1851
1852 try:
1853 vim_vm_id = ""
1854 if task.get("params"):
1855 vim_vm_id = task["params"].get("vim_vm_id")
1856 migrate_host = task["params"].get("migrate_host")
1857 _, migrated_compute_node = target_vim.migrate_instance(
1858 vim_vm_id, migrate_host
1859 )
1860
1861 if migrated_compute_node:
1862 # When VM is migrated, vdu["vim_info"] needs to be updated
1863 vdu_old_vim_info = task["params"]["vdu_vim_info"].get(
1864 ro_task["target_id"]
1865 )
1866
1867 # Refresh VM to get new vim_info
1868 vm_to_refresh_list = [vim_vm_id]
1869 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
1870 refreshed_vim_info = vim_dict[vim_vm_id]
1871
1872 if refreshed_vim_info.get("interfaces"):
1873 for old_iface in vdu_old_vim_info.get("interfaces"):
1874 iface = next(
1875 (
1876 iface
1877 for iface in refreshed_vim_info["interfaces"]
1878 if old_iface["vim_interface_id"]
1879 == iface["vim_interface_id"]
1880 ),
1881 None,
1882 )
1883 vim_interfaces.append(iface)
1884
1885 ro_vim_item_update = {
1886 "vim_id": vim_vm_id,
1887 "vim_status": "ACTIVE",
1888 "created": created,
1889 "created_items": created_items,
1890 "vim_details": None,
1891 "vim_message": None,
1892 }
1893
1894 if refreshed_vim_info and refreshed_vim_info.get("status") not in (
1895 "ERROR",
1896 "VIM_ERROR",
1897 ):
1898 ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]
1899
1900 if vim_interfaces:
1901 ro_vim_item_update["interfaces"] = vim_interfaces
1902
1903 self.logger.debug(
1904 "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
1905 )
1906
1907 return "DONE", ro_vim_item_update, db_task_update
1908
1909 except (vimconn.VimConnException, NsWorkerException) as e:
1910 self.logger.error(
1911 "task={} vim={} VM Migration:"
1912 " {}".format(task_id, ro_task["target_id"], e)
1913 )
1914 ro_vim_item_update = {
1915 "vim_status": "VIM_ERROR",
1916 "created": created,
1917 "vim_message": str(e),
1918 }
1919
1920 return "FAILED", ro_vim_item_update, db_task_update
1921
1922
1923 class VimInteractionResize(VimInteractionBase):
1924 def exec(self, ro_task, task_index, task_depends):
1925 task = ro_task["tasks"][task_index]
1926 task_id = task["task_id"]
1927 db_task_update = {"retries": 0}
1928 created = False
1929 target_flavor_uuid = None
1930 created_items = {}
1931 refreshed_vim_info = {}
1932 target_vim = self.my_vims[ro_task["target_id"]]
1933
1934 try:
1935 params = task["params"]
1936 params_copy = deepcopy(params)
1937 target_flavor_uuid = task_depends[params_copy["flavor_id"]]
1938 vim_vm_id = ""
1939 if task.get("params"):
1940 self.logger.info("vim_vm_id %s", vim_vm_id)
1941
1942 if target_flavor_uuid is not None:
1943 resized_status = target_vim.resize_instance(
1944 vim_vm_id, target_flavor_uuid
1945 )
1946
1947 if resized_status:
1948 # Refresh VM to get new vim_info
1949 vm_to_refresh_list = [vim_vm_id]
1950 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
1951 refreshed_vim_info = vim_dict[vim_vm_id]
1952
1953 ro_vim_item_update = {
1954 "vim_id": vim_vm_id,
1955 "vim_status": "ACTIVE",
1956 "created": created,
1957 "created_items": created_items,
1958 "vim_details": None,
1959 "vim_message": None,
1960 }
1961
1962 if refreshed_vim_info and refreshed_vim_info.get("status") not in (
1963 "ERROR",
1964 "VIM_ERROR",
1965 ):
1966 ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]
1967
1968 self.logger.debug(
1969 "task={} {} resize done".format(task_id, ro_task["target_id"])
1970 )
1971 return "DONE", ro_vim_item_update, db_task_update
1972 except (vimconn.VimConnException, NsWorkerException) as e:
1973 self.logger.error(
1974 "task={} vim={} Resize:" " {}".format(task_id, ro_task["target_id"], e)
1975 )
1976 ro_vim_item_update = {
1977 "vim_status": "VIM_ERROR",
1978 "created": created,
1979 "vim_message": str(e),
1980 }
1981
1982 return "FAILED", ro_vim_item_update, db_task_update
1983
1984
1985 class ConfigValidate:
1986 def __init__(self, config: Dict):
1987 self.conf = config
1988
1989 @property
1990 def active(self):
1991 # default 1 min, allowed >= 60 or -1, -1 disables periodic checks
1992 if (
1993 self.conf["period"]["refresh_active"] >= 60
1994 or self.conf["period"]["refresh_active"] == -1
1995 ):
1996 return self.conf["period"]["refresh_active"]
1997
1998 return 60
1999
2000 @property
2001 def build(self):
2002 return self.conf["period"]["refresh_build"]
2003
2004 @property
2005 def image(self):
2006 return self.conf["period"]["refresh_image"]
2007
2008 @property
2009 def error(self):
2010 return self.conf["period"]["refresh_error"]
2011
2012 @property
2013 def queue_size(self):
2014 return self.conf["period"]["queue_size"]
2015
2016
2017 class NsWorker(threading.Thread):
2018 def __init__(self, worker_index, config, plugins, db):
2019 """
2020 :param worker_index: thread index
2021 :param config: general configuration of RO, among others the process_id with the docker id where it runs
2022 :param plugins: global shared dict with the loaded plugins
2023 :param db: database class instance to use
2024 """
2025 threading.Thread.__init__(self)
2026 self.config = config
2027 self.plugins = plugins
2028 self.plugin_name = "unknown"
2029 self.logger = logging.getLogger("ro.worker{}".format(worker_index))
2030 self.worker_index = worker_index
2031 # refresh periods for created items
2032 self.refresh_config = ConfigValidate(config)
2033 self.task_queue = queue.Queue(self.refresh_config.queue_size)
2034 # targetvim: vimplugin class
2035 self.my_vims = {}
2036 # targetvim: vim information from database
2037 self.db_vims = {}
2038 # targetvim list
2039 self.vim_targets = []
2040 self.my_id = config["process_id"] + ":" + str(worker_index)
2041 self.db = db
2042 self.item2class = {
2043 "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger),
2044 "shared-volumes": VimInteractionSharedVolume(
2045 self.db, self.my_vims, self.db_vims, self.logger
2046 ),
2047 "classification": VimInteractionClassification(
2048 self.db, self.my_vims, self.db_vims, self.logger
2049 ),
2050 "sfi": VimInteractionSfi(self.db, self.my_vims, self.db_vims, self.logger),
2051 "sf": VimInteractionSf(self.db, self.my_vims, self.db_vims, self.logger),
2052 "sfp": VimInteractionSfp(self.db, self.my_vims, self.db_vims, self.logger),
2053 "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger),
2054 "image": VimInteractionImage(
2055 self.db, self.my_vims, self.db_vims, self.logger
2056 ),
2057 "flavor": VimInteractionFlavor(
2058 self.db, self.my_vims, self.db_vims, self.logger
2059 ),
2060 "sdn_net": VimInteractionSdnNet(
2061 self.db, self.my_vims, self.db_vims, self.logger
2062 ),
2063 "update": VimInteractionUpdateVdu(
2064 self.db, self.my_vims, self.db_vims, self.logger
2065 ),
2066 "affinity-or-anti-affinity-group": VimInteractionAffinityGroup(
2067 self.db, self.my_vims, self.db_vims, self.logger
2068 ),
2069 "migrate": VimInteractionMigration(
2070 self.db, self.my_vims, self.db_vims, self.logger
2071 ),
2072 "verticalscale": VimInteractionResize(
2073 self.db, self.my_vims, self.db_vims, self.logger
2074 ),
2075 }
2076 self.time_last_task_processed = None
2077 # lists of tasks to delete because nsrs or vnfrs has been deleted from db
2078 self.tasks_to_delete = []
2079 # it is idle when there are not vim_targets associated
2080 self.idle = True
2081 self.task_locked_time = config["global"]["task_locked_time"]
2082
2083 def insert_task(self, task):
2084 try:
2085 self.task_queue.put(task, False)
2086 return None
2087 except queue.Full:
2088 raise NsWorkerException("timeout inserting a task")
2089
2090 def terminate(self):
2091 self.insert_task("exit")
2092
2093 def del_task(self, task):
2094 with self.task_lock:
2095 if task["status"] == "SCHEDULED":
2096 task["status"] = "SUPERSEDED"
2097 return True
2098 else: # task["status"] == "processing"
2099 self.task_lock.release()
2100 return False
2101
2102 def _process_vim_config(self, target_id: str, db_vim: dict) -> None:
2103 """
2104 Process vim config, creating vim configuration files as ca_cert
2105 :param target_id: vim/sdn/wim + id
2106 :param db_vim: Vim dictionary obtained from database
2107 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
2108 """
2109 if not db_vim.get("config"):
2110 return
2111
2112 file_name = ""
2113 work_dir = "/app/osm_ro/certs"
2114
2115 try:
2116 if db_vim["config"].get("ca_cert_content"):
2117 file_name = f"{work_dir}/{target_id}:{self.worker_index}"
2118
2119 if not path.isdir(file_name):
2120 makedirs(file_name)
2121
2122 file_name = file_name + "/ca_cert"
2123
2124 with open(file_name, "w") as f:
2125 f.write(db_vim["config"]["ca_cert_content"])
2126 del db_vim["config"]["ca_cert_content"]
2127 db_vim["config"]["ca_cert"] = file_name
2128 except Exception as e:
2129 raise NsWorkerException(
2130 "Error writing to file '{}': {}".format(file_name, e)
2131 )
2132
2133 def _load_plugin(self, name, type="vim"):
2134 # type can be vim or sdn
2135 if "rovim_dummy" not in self.plugins:
2136 self.plugins["rovim_dummy"] = VimDummyConnector
2137
2138 if "rosdn_dummy" not in self.plugins:
2139 self.plugins["rosdn_dummy"] = SdnDummyConnector
2140
2141 if name in self.plugins:
2142 return self.plugins[name]
2143
2144 try:
2145 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
2146 self.plugins[name] = ep.load()
2147 except Exception as e:
2148 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
2149
2150 if name and name not in self.plugins:
2151 raise NsWorkerException(
2152 "Plugin 'osm_{n}' has not been installed".format(n=name)
2153 )
2154
2155 return self.plugins[name]
2156
2157 def _unload_vim(self, target_id):
2158 """
2159 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
2160 :param target_id: Contains type:_id; where type can be 'vim', ...
2161 :return: None.
2162 """
2163 try:
2164 self.db_vims.pop(target_id, None)
2165 self.my_vims.pop(target_id, None)
2166
2167 if target_id in self.vim_targets:
2168 self.vim_targets.remove(target_id)
2169
2170 self.logger.info("Unloaded {}".format(target_id))
2171 except Exception as e:
2172 self.logger.error("Cannot unload {}: {}".format(target_id, e))
2173
2174 def _check_vim(self, target_id):
2175 """
2176 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
2177 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
2178 :return: None.
2179 """
2180 target, _, _id = target_id.partition(":")
2181 now = time.time()
2182 update_dict = {}
2183 unset_dict = {}
2184 op_text = ""
2185 step = ""
2186 loaded = target_id in self.vim_targets
2187 target_database = (
2188 "vim_accounts"
2189 if target == "vim"
2190 else "wim_accounts" if target == "wim" else "sdns"
2191 )
2192 error_text = ""
2193
2194 try:
2195 step = "Getting {} from db".format(target_id)
2196 db_vim = self.db.get_one(target_database, {"_id": _id})
2197
2198 for op_index, operation in enumerate(
2199 db_vim["_admin"].get("operations", ())
2200 ):
2201 if operation["operationState"] != "PROCESSING":
2202 continue
2203
2204 locked_at = operation.get("locked_at")
2205
2206 if locked_at is not None and locked_at >= now - self.task_locked_time:
2207 # some other thread is doing this operation
2208 return
2209
2210 # lock
2211 op_text = "_admin.operations.{}.".format(op_index)
2212
2213 if not self.db.set_one(
2214 target_database,
2215 q_filter={
2216 "_id": _id,
2217 op_text + "operationState": "PROCESSING",
2218 op_text + "locked_at": locked_at,
2219 },
2220 update_dict={
2221 op_text + "locked_at": now,
2222 "admin.current_operation": op_index,
2223 },
2224 fail_on_empty=False,
2225 ):
2226 return
2227
2228 unset_dict[op_text + "locked_at"] = None
2229 unset_dict["current_operation"] = None
2230 step = "Loading " + target_id
2231 error_text = self._load_vim(target_id)
2232
2233 if not error_text:
2234 step = "Checking connectivity"
2235
2236 if target == "vim":
2237 self.my_vims[target_id].check_vim_connectivity()
2238 else:
2239 self.my_vims[target_id].check_credentials()
2240
2241 update_dict["_admin.operationalState"] = "ENABLED"
2242 update_dict["_admin.detailed-status"] = ""
2243 unset_dict[op_text + "detailed-status"] = None
2244 update_dict[op_text + "operationState"] = "COMPLETED"
2245
2246 return
2247
2248 except Exception as e:
2249 error_text = "{}: {}".format(step, e)
2250 self.logger.error("{} for {}: {}".format(step, target_id, e))
2251
2252 finally:
2253 if update_dict or unset_dict:
2254 if error_text:
2255 update_dict[op_text + "operationState"] = "FAILED"
2256 update_dict[op_text + "detailed-status"] = error_text
2257 unset_dict.pop(op_text + "detailed-status", None)
2258 update_dict["_admin.operationalState"] = "ERROR"
2259 update_dict["_admin.detailed-status"] = error_text
2260
2261 if op_text:
2262 update_dict[op_text + "statusEnteredTime"] = now
2263
2264 self.db.set_one(
2265 target_database,
2266 q_filter={"_id": _id},
2267 update_dict=update_dict,
2268 unset=unset_dict,
2269 fail_on_empty=False,
2270 )
2271
2272 if not loaded:
2273 self._unload_vim(target_id)
2274
2275 def _reload_vim(self, target_id):
2276 if target_id in self.vim_targets:
2277 self._load_vim(target_id)
2278 else:
2279 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
2280 # just remove it to force load again next time it is needed
2281 self.db_vims.pop(target_id, None)
2282
2283 def _load_vim(self, target_id):
2284 """
2285 Load or reload a vim_account, sdn_controller or wim_account.
2286 Read content from database, load the plugin if not loaded.
2287 In case of error loading the plugin, it loads a failing VIM_connector
2288 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
2289 :param target_id: Contains type:_id; where type can be 'vim', ...
2290 :return: None if ok, descriptive text if error
2291 """
2292 target, _, _id = target_id.partition(":")
2293 target_database = (
2294 "vim_accounts"
2295 if target == "vim"
2296 else "wim_accounts" if target == "wim" else "sdns"
2297 )
2298 plugin_name = ""
2299 vim = None
2300 step = "Getting {}={} from db".format(target, _id)
2301
2302 try:
2303 # TODO process for wim, sdnc, ...
2304 vim = self.db.get_one(target_database, {"_id": _id})
2305
2306 # if deep_get(vim, "config", "sdn-controller"):
2307 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
2308 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
2309
2310 step = "Decrypting password"
2311 schema_version = vim.get("schema_version")
2312 self.db.encrypt_decrypt_fields(
2313 vim,
2314 "decrypt",
2315 fields=("password", "secret"),
2316 schema_version=schema_version,
2317 salt=_id,
2318 )
2319 self._process_vim_config(target_id, vim)
2320
2321 if target == "vim":
2322 plugin_name = "rovim_" + vim["vim_type"]
2323 step = "Loading plugin '{}'".format(plugin_name)
2324 vim_module_conn = self._load_plugin(plugin_name)
2325 step = "Loading {}'".format(target_id)
2326 self.my_vims[target_id] = vim_module_conn(
2327 uuid=vim["_id"],
2328 name=vim["name"],
2329 tenant_id=vim.get("vim_tenant_id"),
2330 tenant_name=vim.get("vim_tenant_name"),
2331 url=vim["vim_url"],
2332 url_admin=None,
2333 user=vim["vim_user"],
2334 passwd=vim["vim_password"],
2335 config=vim.get("config") or {},
2336 persistent_info={},
2337 )
2338 else: # sdn
2339 plugin_name = "rosdn_" + (vim.get("type") or vim.get("wim_type"))
2340 step = "Loading plugin '{}'".format(plugin_name)
2341 vim_module_conn = self._load_plugin(plugin_name, "sdn")
2342 step = "Loading {}'".format(target_id)
2343 wim = deepcopy(vim)
2344 wim_config = wim.pop("config", {}) or {}
2345 wim["uuid"] = wim["_id"]
2346 if "url" in wim and "wim_url" not in wim:
2347 wim["wim_url"] = wim["url"]
2348 elif "url" not in wim and "wim_url" in wim:
2349 wim["url"] = wim["wim_url"]
2350
2351 if wim.get("dpid"):
2352 wim_config["dpid"] = wim.pop("dpid")
2353
2354 if wim.get("switch_id"):
2355 wim_config["switch_id"] = wim.pop("switch_id")
2356
2357 # wim, wim_account, config
2358 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
2359 self.db_vims[target_id] = vim
2360 self.error_status = None
2361
2362 self.logger.info(
2363 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
2364 )
2365 except Exception as e:
2366 self.logger.error(
2367 "Cannot load {} plugin={}: {} {}".format(
2368 target_id, plugin_name, step, e
2369 )
2370 )
2371
2372 self.db_vims[target_id] = vim or {}
2373 self.db_vims[target_id] = FailingConnector(str(e))
2374 error_status = "{} Error: {}".format(step, e)
2375
2376 return error_status
2377 finally:
2378 if target_id not in self.vim_targets:
2379 self.vim_targets.append(target_id)
2380
2381 def _get_db_task(self):
2382 """
2383 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
2384 :return: None
2385 """
2386 now = time.time()
2387
2388 if not self.time_last_task_processed:
2389 self.time_last_task_processed = now
2390
2391 try:
2392 while True:
2393 """
2394 # Log RO tasks only when loglevel is DEBUG
2395 if self.logger.getEffectiveLevel() == logging.DEBUG:
2396 self._log_ro_task(
2397 None,
2398 None,
2399 None,
2400 "TASK_WF",
2401 "task_locked_time="
2402 + str(self.task_locked_time)
2403 + " "
2404 + "time_last_task_processed="
2405 + str(self.time_last_task_processed)
2406 + " "
2407 + "now="
2408 + str(now),
2409 )
2410 """
2411 locked = self.db.set_one(
2412 "ro_tasks",
2413 q_filter={
2414 "target_id": self.vim_targets,
2415 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
2416 "locked_at.lt": now - self.task_locked_time,
2417 "to_check_at.lt": self.time_last_task_processed,
2418 "to_check_at.gt": -1,
2419 },
2420 update_dict={"locked_by": self.my_id, "locked_at": now},
2421 fail_on_empty=False,
2422 )
2423
2424 if locked:
2425 # read and return
2426 ro_task = self.db.get_one(
2427 "ro_tasks",
2428 q_filter={
2429 "target_id": self.vim_targets,
2430 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
2431 "locked_at": now,
2432 },
2433 )
2434 return ro_task
2435
2436 if self.time_last_task_processed == now:
2437 self.time_last_task_processed = None
2438 return None
2439 else:
2440 self.time_last_task_processed = now
2441 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
2442
2443 except DbException as e:
2444 self.logger.error("Database exception at _get_db_task: {}".format(e))
2445 except Exception as e:
2446 self.logger.critical(
2447 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
2448 )
2449
2450 return None
2451
2452 def _delete_task(self, ro_task, task_index, task_depends, db_update):
2453 """
2454 Determine if this task need to be done or superseded
2455 :return: None
2456 """
2457 my_task = ro_task["tasks"][task_index]
2458 task_id = my_task["task_id"]
2459 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
2460 "created_items", False
2461 )
2462
2463 self.logger.debug("Needed delete: {}".format(needed_delete))
2464 if my_task["status"] == "FAILED":
2465 return None, None # TODO need to be retry??
2466
2467 try:
2468 for index, task in enumerate(ro_task["tasks"]):
2469 if index == task_index or not task:
2470 continue # own task
2471
2472 if (
2473 my_task["target_record"] == task["target_record"]
2474 and task["action"] == "CREATE"
2475 ):
2476 # set to finished
2477 db_update["tasks.{}.status".format(index)] = task["status"] = (
2478 "FINISHED"
2479 )
2480 elif task["action"] == "CREATE" and task["status"] not in (
2481 "FINISHED",
2482 "SUPERSEDED",
2483 ):
2484 needed_delete = False
2485
2486 if needed_delete:
2487 self.logger.debug(
2488 "Deleting ro_task={} task_index={}".format(ro_task, task_index)
2489 )
2490 return self.item2class[my_task["item"]].delete(ro_task, task_index)
2491 else:
2492 return "SUPERSEDED", None
2493 except Exception as e:
2494 if not isinstance(e, NsWorkerException):
2495 self.logger.critical(
2496 "Unexpected exception at _delete_task task={}: {}".format(
2497 task_id, e
2498 ),
2499 exc_info=True,
2500 )
2501
2502 return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e)}
2503
2504 def _create_task(self, ro_task, task_index, task_depends, db_update):
2505 """
2506 Determine if this task need to create something at VIM
2507 :return: None
2508 """
2509 my_task = ro_task["tasks"][task_index]
2510 task_id = my_task["task_id"]
2511
2512 if my_task["status"] == "FAILED":
2513 return None, None # TODO need to be retry??
2514 elif my_task["status"] == "SCHEDULED":
2515 # check if already created by another task
2516 for index, task in enumerate(ro_task["tasks"]):
2517 if index == task_index or not task:
2518 continue # own task
2519
2520 if task["action"] == "CREATE" and task["status"] not in (
2521 "SCHEDULED",
2522 "FINISHED",
2523 "SUPERSEDED",
2524 ):
2525 return task["status"], "COPY_VIM_INFO"
2526
2527 try:
2528 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
2529 ro_task, task_index, task_depends
2530 )
2531 # TODO update other CREATE tasks
2532 except Exception as e:
2533 if not isinstance(e, NsWorkerException):
2534 self.logger.error(
2535 "Error executing task={}: {}".format(task_id, e), exc_info=True
2536 )
2537
2538 task_status = "FAILED"
2539 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_message": str(e)}
2540 # TODO update ro_vim_item_update
2541
2542 return task_status, ro_vim_item_update
2543 else:
2544 return None, None
2545
2546 def _get_dependency(self, task_id, ro_task=None, target_id=None):
2547 """
2548 Look for dependency task
2549 :param task_id: Can be one of
2550 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2551 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2552 3. task.task_id: "<action_id>:number"
2553 :param ro_task:
2554 :param target_id:
2555 :return: database ro_task plus index of task
2556 """
2557 if (
2558 task_id.startswith("vim:")
2559 or task_id.startswith("sdn:")
2560 or task_id.startswith("wim:")
2561 ):
2562 target_id, _, task_id = task_id.partition(" ")
2563
2564 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
2565 ro_task_dependency = self.db.get_one(
2566 "ro_tasks",
2567 q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
2568 fail_on_empty=False,
2569 )
2570
2571 if ro_task_dependency:
2572 for task_index, task in enumerate(ro_task_dependency["tasks"]):
2573 if task["target_record_id"] == task_id:
2574 return ro_task_dependency, task_index
2575
2576 else:
2577 if ro_task:
2578 for task_index, task in enumerate(ro_task["tasks"]):
2579 if task and task["task_id"] == task_id:
2580 return ro_task, task_index
2581
2582 ro_task_dependency = self.db.get_one(
2583 "ro_tasks",
2584 q_filter={
2585 "tasks.ANYINDEX.task_id": task_id,
2586 "tasks.ANYINDEX.target_record.ne": None,
2587 },
2588 fail_on_empty=False,
2589 )
2590
2591 self.logger.debug("ro_task_dependency={}".format(ro_task_dependency))
2592 if ro_task_dependency:
2593 for task_index, task in enumerate(ro_task_dependency["tasks"]):
2594 if task["task_id"] == task_id:
2595 return ro_task_dependency, task_index
2596 raise NsWorkerException("Cannot get depending task {}".format(task_id))
2597
2598 def update_vm_refresh(self, ro_task):
2599 """Enables the VM status updates if self.refresh_config.active parameter
2600 is not -1 and then updates the DB accordingly
2601
2602 """
2603 try:
2604 self.logger.debug("Checking if VM status update config")
2605 next_refresh = time.time()
2606 next_refresh = self._get_next_refresh(ro_task, next_refresh)
2607
2608 if next_refresh != -1:
2609 db_ro_task_update = {}
2610 now = time.time()
2611 next_check_at = now + (24 * 60 * 60)
2612 next_check_at = min(next_check_at, next_refresh)
2613 db_ro_task_update["vim_info.refresh_at"] = next_refresh
2614 db_ro_task_update["to_check_at"] = next_check_at
2615
2616 self.logger.debug(
2617 "Finding tasks which to be updated to enable VM status updates"
2618 )
2619 refresh_tasks = self.db.get_list(
2620 "ro_tasks",
2621 q_filter={
2622 "tasks.status": "DONE",
2623 "to_check_at.lt": 0,
2624 },
2625 )
2626 self.logger.debug("Updating tasks to change the to_check_at status")
2627 for task in refresh_tasks:
2628 q_filter = {
2629 "_id": task["_id"],
2630 }
2631 self.db.set_one(
2632 "ro_tasks",
2633 q_filter=q_filter,
2634 update_dict=db_ro_task_update,
2635 fail_on_empty=True,
2636 )
2637
2638 except Exception as e:
2639 self.logger.error(f"Error updating tasks to enable VM status updates: {e}")
2640
2641 def _get_next_refresh(self, ro_task: dict, next_refresh: float):
2642 """Decide the next_refresh according to vim type and refresh config period.
2643 Args:
2644 ro_task (dict): ro_task details
2645 next_refresh (float): next refresh time as epoch format
2646
2647 Returns:
2648 next_refresh (float) -1 if vm updates are disabled or vim type is openstack.
2649 """
2650 target_vim = ro_task["target_id"]
2651 vim_type = self.db_vims[target_vim]["vim_type"]
2652 if self.refresh_config.active == -1 or vim_type == "openstack":
2653 next_refresh = -1
2654 else:
2655 next_refresh += self.refresh_config.active
2656 return next_refresh
2657
2658 def _process_pending_tasks(self, ro_task):
2659 ro_task_id = ro_task["_id"]
2660 now = time.time()
2661 # one day
2662 next_check_at = now + (24 * 60 * 60)
2663 db_ro_task_update = {}
2664
2665 def _update_refresh(new_status):
2666 # compute next_refresh
2667 nonlocal task
2668 nonlocal next_check_at
2669 nonlocal db_ro_task_update
2670 nonlocal ro_task
2671
2672 next_refresh = time.time()
2673
2674 if task["item"] in ("image", "flavor"):
2675 next_refresh += self.refresh_config.image
2676 elif new_status == "BUILD":
2677 next_refresh += self.refresh_config.build
2678 elif new_status == "DONE":
2679 next_refresh = self._get_next_refresh(ro_task, next_refresh)
2680 else:
2681 next_refresh += self.refresh_config.error
2682
2683 next_check_at = min(next_check_at, next_refresh)
2684 db_ro_task_update["vim_info.refresh_at"] = next_refresh
2685 ro_task["vim_info"]["refresh_at"] = next_refresh
2686
2687 try:
2688 """
2689 # Log RO tasks only when loglevel is DEBUG
2690 if self.logger.getEffectiveLevel() == logging.DEBUG:
2691 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
2692 """
2693 # Check if vim status refresh is enabled again
2694 self.update_vm_refresh(ro_task)
2695 # 0: get task_status_create
2696 lock_object = None
2697 task_status_create = None
2698 task_create = next(
2699 (
2700 t
2701 for t in ro_task["tasks"]
2702 if t
2703 and t["action"] == "CREATE"
2704 and t["status"] in ("BUILD", "DONE")
2705 ),
2706 None,
2707 )
2708
2709 if task_create:
2710 task_status_create = task_create["status"]
2711
2712 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
2713 for task_action in ("DELETE", "CREATE", "EXEC"):
2714 db_vim_update = None
2715 new_status = None
2716
2717 for task_index, task in enumerate(ro_task["tasks"]):
2718 if not task:
2719 continue # task deleted
2720
2721 task_depends = {}
2722 target_update = None
2723
2724 if (
2725 (
2726 task_action in ("DELETE", "EXEC")
2727 and task["status"] not in ("SCHEDULED", "BUILD")
2728 )
2729 or task["action"] != task_action
2730 or (
2731 task_action == "CREATE"
2732 and task["status"] in ("FINISHED", "SUPERSEDED")
2733 )
2734 ):
2735 continue
2736
2737 task_path = "tasks.{}.status".format(task_index)
2738 try:
2739 db_vim_info_update = None
2740 dependency_ro_task = {}
2741
2742 if task["status"] == "SCHEDULED":
2743 # check if tasks that this depends on have been completed
2744 dependency_not_completed = False
2745
2746 for dependency_task_id in task.get("depends_on") or ():
2747 (
2748 dependency_ro_task,
2749 dependency_task_index,
2750 ) = self._get_dependency(
2751 dependency_task_id, target_id=ro_task["target_id"]
2752 )
2753 dependency_task = dependency_ro_task["tasks"][
2754 dependency_task_index
2755 ]
2756 self.logger.debug(
2757 "dependency_ro_task={} dependency_task_index={}".format(
2758 dependency_ro_task, dependency_task_index
2759 )
2760 )
2761
2762 if dependency_task["status"] == "SCHEDULED":
2763 dependency_not_completed = True
2764 next_check_at = min(
2765 next_check_at, dependency_ro_task["to_check_at"]
2766 )
2767 # must allow dependent task to be processed first
2768 # to do this set time after last_task_processed
2769 next_check_at = max(
2770 self.time_last_task_processed, next_check_at
2771 )
2772 break
2773 elif dependency_task["status"] == "FAILED":
2774 error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
2775 task["action"],
2776 task["item"],
2777 dependency_task["action"],
2778 dependency_task["item"],
2779 dependency_task_id,
2780 dependency_ro_task["vim_info"].get(
2781 "vim_message"
2782 ),
2783 )
2784 self.logger.error(
2785 "task={} {}".format(task["task_id"], error_text)
2786 )
2787 raise NsWorkerException(error_text)
2788
2789 task_depends[dependency_task_id] = dependency_ro_task[
2790 "vim_info"
2791 ]["vim_id"]
2792 task_depends["TASK-{}".format(dependency_task_id)] = (
2793 dependency_ro_task["vim_info"]["vim_id"]
2794 )
2795
2796 if dependency_not_completed:
2797 self.logger.warning(
2798 "DEPENDENCY NOT COMPLETED {}".format(
2799 dependency_ro_task["vim_info"]["vim_id"]
2800 )
2801 )
2802 # TODO set at vim_info.vim_details that it is waiting
2803 continue
2804
2805 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
2806 # the task of renew this locking. It will update database locket_at periodically
2807 if not lock_object:
2808 lock_object = LockRenew.add_lock_object(
2809 "ro_tasks", ro_task, self
2810 )
2811 if task["action"] == "DELETE":
2812 (
2813 new_status,
2814 db_vim_info_update,
2815 ) = self._delete_task(
2816 ro_task, task_index, task_depends, db_ro_task_update
2817 )
2818 new_status = (
2819 "FINISHED" if new_status == "DONE" else new_status
2820 )
2821 # ^with FINISHED instead of DONE it will not be refreshing
2822
2823 if new_status in ("FINISHED", "SUPERSEDED"):
2824 target_update = "DELETE"
2825 elif task["action"] == "EXEC":
2826 (
2827 new_status,
2828 db_vim_info_update,
2829 db_task_update,
2830 ) = self.item2class[task["item"]].exec(
2831 ro_task, task_index, task_depends
2832 )
2833 new_status = (
2834 "FINISHED" if new_status == "DONE" else new_status
2835 )
2836 # ^with FINISHED instead of DONE it will not be refreshing
2837
2838 if db_task_update:
2839 # load into database the modified db_task_update "retries" and "next_retry"
2840 if db_task_update.get("retries"):
2841 db_ro_task_update[
2842 "tasks.{}.retries".format(task_index)
2843 ] = db_task_update["retries"]
2844
2845 next_check_at = time.time() + db_task_update.get(
2846 "next_retry", 60
2847 )
2848 target_update = None
2849 elif task["action"] == "CREATE":
2850 if task["status"] == "SCHEDULED":
2851 if task_status_create:
2852 new_status = task_status_create
2853 target_update = "COPY_VIM_INFO"
2854 else:
2855 new_status, db_vim_info_update = self.item2class[
2856 task["item"]
2857 ].new(ro_task, task_index, task_depends)
2858 _update_refresh(new_status)
2859 else:
2860 refresh_at = ro_task["vim_info"]["refresh_at"]
2861 if refresh_at and refresh_at != -1 and now > refresh_at:
2862 (
2863 new_status,
2864 db_vim_info_update,
2865 ) = self.item2class[
2866 task["item"]
2867 ].refresh(ro_task)
2868 _update_refresh(new_status)
2869 else:
2870 # The refresh is updated to avoid set the value of "refresh_at" to
2871 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2872 # because it can happen that in this case the task is never processed
2873 _update_refresh(task["status"])
2874
2875 except Exception as e:
2876 new_status = "FAILED"
2877 db_vim_info_update = {
2878 "vim_status": "VIM_ERROR",
2879 "vim_message": str(e),
2880 }
2881
2882 if not isinstance(
2883 e, (NsWorkerException, vimconn.VimConnException)
2884 ):
2885 self.logger.error(
2886 "Unexpected exception at _delete_task task={}: {}".format(
2887 task["task_id"], e
2888 ),
2889 exc_info=True,
2890 )
2891
2892 try:
2893 if db_vim_info_update:
2894 db_vim_update = db_vim_info_update.copy()
2895 db_ro_task_update.update(
2896 {
2897 "vim_info." + k: v
2898 for k, v in db_vim_info_update.items()
2899 }
2900 )
2901 ro_task["vim_info"].update(db_vim_info_update)
2902
2903 if new_status:
2904 if task_action == "CREATE":
2905 task_status_create = new_status
2906 db_ro_task_update[task_path] = new_status
2907
2908 if target_update or db_vim_update:
2909 if target_update == "DELETE":
2910 self._update_target(task, None)
2911 elif target_update == "COPY_VIM_INFO":
2912 self._update_target(task, ro_task["vim_info"])
2913 else:
2914 self._update_target(task, db_vim_update)
2915
2916 except Exception as e:
2917 if (
2918 isinstance(e, DbException)
2919 and e.http_code == HTTPStatus.NOT_FOUND
2920 ):
2921 # if the vnfrs or nsrs has been removed from database, this task must be removed
2922 self.logger.debug(
2923 "marking to delete task={}".format(task["task_id"])
2924 )
2925 self.tasks_to_delete.append(task)
2926 else:
2927 self.logger.error(
2928 "Unexpected exception at _update_target task={}: {}".format(
2929 task["task_id"], e
2930 ),
2931 exc_info=True,
2932 )
2933
2934 locked_at = ro_task["locked_at"]
2935
2936 if lock_object:
2937 locked_at = [
2938 lock_object["locked_at"],
2939 lock_object["locked_at"] + self.task_locked_time,
2940 ]
2941 # locked_at contains two times to avoid race condition. In case the lock has been renewed, it will
2942 # contain exactly locked_at + self.task_locked_time
2943 LockRenew.remove_lock_object(lock_object)
2944
2945 q_filter = {
2946 "_id": ro_task["_id"],
2947 "to_check_at": ro_task["to_check_at"],
2948 "locked_at": locked_at,
2949 }
2950 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2951 # outside this task (by ro_nbi) do not update it
2952 db_ro_task_update["locked_by"] = None
2953 # locked_at converted to int only for debugging. When it is not decimals it means it has been unlocked
2954 db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
2955 db_ro_task_update["modified_at"] = now
2956 db_ro_task_update["to_check_at"] = next_check_at
2957
2958 """
2959 # Log RO tasks only when loglevel is DEBUG
2960 if self.logger.getEffectiveLevel() == logging.DEBUG:
2961 db_ro_task_update_log = db_ro_task_update.copy()
2962 db_ro_task_update_log["_id"] = q_filter["_id"]
2963 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2964 """
2965
2966 if not self.db.set_one(
2967 "ro_tasks",
2968 update_dict=db_ro_task_update,
2969 q_filter=q_filter,
2970 fail_on_empty=False,
2971 ):
2972 del db_ro_task_update["to_check_at"]
2973 del q_filter["to_check_at"]
2974 """
2975 # Log RO tasks only when loglevel is DEBUG
2976 if self.logger.getEffectiveLevel() == logging.DEBUG:
2977 self._log_ro_task(
2978 None,
2979 db_ro_task_update_log,
2980 None,
2981 "TASK_WF",
2982 "SET_TASK " + str(q_filter),
2983 )
2984 """
2985 self.db.set_one(
2986 "ro_tasks",
2987 q_filter=q_filter,
2988 update_dict=db_ro_task_update,
2989 fail_on_empty=True,
2990 )
2991 except DbException as e:
2992 self.logger.error(
2993 "ro_task={} Error updating database {}".format(ro_task_id, e)
2994 )
2995 except Exception as e:
2996 self.logger.error(
2997 "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
2998 )
2999
3000 def _update_target(self, task, ro_vim_item_update):
3001 table, _, temp = task["target_record"].partition(":")
3002 _id, _, path_vim_status = temp.partition(":")
3003 path_item = path_vim_status[: path_vim_status.rfind(".")]
3004 path_item = path_item[: path_item.rfind(".")]
3005 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
3006 # path_item: dot separated list targeting record information, e.g. "vdur.10"
3007
3008 if ro_vim_item_update:
3009 update_dict = {
3010 path_vim_status + "." + k: v
3011 for k, v in ro_vim_item_update.items()
3012 if k
3013 in (
3014 "vim_id",
3015 "vim_details",
3016 "vim_message",
3017 "vim_name",
3018 "vim_status",
3019 "interfaces",
3020 "interfaces_backup",
3021 )
3022 }
3023
3024 if path_vim_status.startswith("vdur."):
3025 # for backward compatibility, add vdur.name apart from vdur.vim_name
3026 if ro_vim_item_update.get("vim_name"):
3027 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
3028
3029 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
3030 if ro_vim_item_update.get("vim_id"):
3031 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
3032
3033 # update general status
3034 if ro_vim_item_update.get("vim_status"):
3035 update_dict[path_item + ".status"] = ro_vim_item_update[
3036 "vim_status"
3037 ]
3038
3039 if ro_vim_item_update.get("interfaces"):
3040 path_interfaces = path_item + ".interfaces"
3041
3042 for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
3043 if iface:
3044 update_dict.update(
3045 {
3046 path_interfaces + ".{}.".format(i) + k: v
3047 for k, v in iface.items()
3048 if k in ("vlan", "compute_node", "pci")
3049 }
3050 )
3051
3052 # put ip_address and mac_address with ip-address and mac-address
3053 if iface.get("ip_address"):
3054 update_dict[
3055 path_interfaces + ".{}.".format(i) + "ip-address"
3056 ] = iface["ip_address"]
3057
3058 if iface.get("mac_address"):
3059 update_dict[
3060 path_interfaces + ".{}.".format(i) + "mac-address"
3061 ] = iface["mac_address"]
3062
3063 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
3064 update_dict["ip-address"] = iface.get("ip_address").split(
3065 ";"
3066 )[0]
3067
3068 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
3069 update_dict[path_item + ".ip-address"] = iface.get(
3070 "ip_address"
3071 ).split(";")[0]
3072
3073 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
3074
3075 # If interfaces exists, it backups VDU interfaces in the DB for healing operations
3076 if ro_vim_item_update.get("interfaces"):
3077 search_key = path_vim_status + ".interfaces"
3078 if update_dict.get(search_key):
3079 interfaces_backup_update = {
3080 path_vim_status + ".interfaces_backup": update_dict[search_key]
3081 }
3082
3083 self.db.set_one(
3084 table,
3085 q_filter={"_id": _id},
3086 update_dict=interfaces_backup_update,
3087 )
3088
3089 else:
3090 update_dict = {path_item + ".status": "DELETED"}
3091 self.db.set_one(
3092 table,
3093 q_filter={"_id": _id},
3094 update_dict=update_dict,
3095 unset={path_vim_status: None},
3096 )
3097
3098 def _process_delete_db_tasks(self):
3099 """
3100 Delete task from database because vnfrs or nsrs or both have been deleted
3101 :return: None. Uses and modify self.tasks_to_delete
3102 """
3103 while self.tasks_to_delete:
3104 task = self.tasks_to_delete[0]
3105 vnfrs_deleted = None
3106 nsr_id = task["nsr_id"]
3107
3108 if task["target_record"].startswith("vnfrs:"):
3109 # check if nsrs is present
3110 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
3111 vnfrs_deleted = task["target_record"].split(":")[1]
3112
3113 try:
3114 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
3115 except Exception as e:
3116 self.logger.error(
3117 "Error deleting task={}: {}".format(task["task_id"], e)
3118 )
3119 self.tasks_to_delete.pop(0)
3120
3121 @staticmethod
3122 def delete_db_tasks(db, nsr_id, vnfrs_deleted):
3123 """
3124 Static method because it is called from osm_ng_ro.ns
3125 :param db: instance of database to use
3126 :param nsr_id: affected nsrs id
3127 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
3128 :return: None, exception is fails
3129 """
3130 retries = 5
3131 for retry in range(retries):
3132 ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
3133 now = time.time()
3134 conflict = False
3135
3136 for ro_task in ro_tasks:
3137 db_update = {}
3138 to_delete_ro_task = True
3139
3140 for index, task in enumerate(ro_task["tasks"]):
3141 if not task:
3142 pass
3143 elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or (
3144 vnfrs_deleted
3145 and task["target_record"].startswith("vnfrs:" + vnfrs_deleted)
3146 ):
3147 db_update["tasks.{}".format(index)] = None
3148 else:
3149 # used by other nsr, ro_task cannot be deleted
3150 to_delete_ro_task = False
3151
3152 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
3153 if to_delete_ro_task:
3154 if not db.del_one(
3155 "ro_tasks",
3156 q_filter={
3157 "_id": ro_task["_id"],
3158 "modified_at": ro_task["modified_at"],
3159 },
3160 fail_on_empty=False,
3161 ):
3162 conflict = True
3163 elif db_update:
3164 db_update["modified_at"] = now
3165 if not db.set_one(
3166 "ro_tasks",
3167 q_filter={
3168 "_id": ro_task["_id"],
3169 "modified_at": ro_task["modified_at"],
3170 },
3171 update_dict=db_update,
3172 fail_on_empty=False,
3173 ):
3174 conflict = True
3175 if not conflict:
3176 return
3177 else:
3178 raise NsWorkerException("Exceeded {} retries".format(retries))
3179
3180 def run(self):
3181 # load database
3182 self.logger.info("Starting")
3183 while True:
3184 # step 1: get commands from queue
3185 try:
3186 if self.vim_targets:
3187 task = self.task_queue.get(block=False)
3188 else:
3189 if not self.idle:
3190 self.logger.debug("enters in idle state")
3191 self.idle = True
3192 task = self.task_queue.get(block=True)
3193 self.idle = False
3194
3195 if task[0] == "terminate":
3196 break
3197 elif task[0] == "load_vim":
3198 self.logger.info("order to load vim {}".format(task[1]))
3199 self._load_vim(task[1])
3200 elif task[0] == "unload_vim":
3201 self.logger.info("order to unload vim {}".format(task[1]))
3202 self._unload_vim(task[1])
3203 elif task[0] == "reload_vim":
3204 self._reload_vim(task[1])
3205 elif task[0] == "check_vim":
3206 self.logger.info("order to check vim {}".format(task[1]))
3207 self._check_vim(task[1])
3208 continue
3209 except Exception as e:
3210 if isinstance(e, queue.Empty):
3211 pass
3212 else:
3213 self.logger.critical(
3214 "Error processing task: {}".format(e), exc_info=True
3215 )
3216
3217 # step 2: process pending_tasks, delete not needed tasks
3218 try:
3219 if self.tasks_to_delete:
3220 self._process_delete_db_tasks()
3221 busy = False
3222 """
3223 # Log RO tasks only when loglevel is DEBUG
3224 if self.logger.getEffectiveLevel() == logging.DEBUG:
3225 _ = self._get_db_all_tasks()
3226 """
3227 ro_task = self._get_db_task()
3228 if ro_task:
3229 self.logger.debug("Task to process: {}".format(ro_task))
3230 time.sleep(1)
3231 self._process_pending_tasks(ro_task)
3232 busy = True
3233 if not busy:
3234 time.sleep(5)
3235 except Exception as e:
3236 self.logger.critical(
3237 "Unexpected exception at run: " + str(e), exc_info=True
3238 )
3239
3240 self.logger.info("Finishing")