Bug 2316: Fix for Unable to do vertical scaling when VM is in Shutdown state
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
20 """
21 This is the thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored in the database, in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
25 """
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import makedirs
31 from os import path
32 import queue
33 import threading
34 import time
35 import traceback
36 from typing import Dict
37 from unittest.mock import Mock
38
39 from importlib_metadata import entry_points
40 from osm_common.dbbase import DbException
41 from osm_ng_ro.vim_admin import LockRenew
42 from osm_ro_plugin import sdnconn
43 from osm_ro_plugin import vimconn
44 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
45 from osm_ro_plugin.vim_dummy import VimDummyConnector
46 import yaml
47
48 __author__ = "Alfonso Tierno"
49 __date__ = "$28-Sep-2017 12:07:15$"
50
51
def deep_get(target_dict, *args, **kwargs):
    """
    Walk a nested dictionary following a sequence of keys.
    Example: with target_dict={a: {b: 5}}, keys (a, b) give 5, while keys (a, b, c) and (f, h)
    cannot be followed and give the default.
    :param target_dict: dictionary to be read
    :param args: keys to follow, outermost first
    :param kwargs: may carry default=value, returned when the path cannot be followed
    :return: the value at the end of the path; otherwise the default (None when not supplied)
    """
    current = target_dict
    for step in args:
        if isinstance(current, dict) and step in current:
            current = current[step]
        else:
            # Path is broken: either a non-dict was reached or the key is absent
            return kwargs.get("default")
    return current
66
67
class NsWorkerException(Exception):
    """Error raised by this module when a task cannot be carried out."""
70
71
class FailingConnector:
    """Stand-in connector: every public VIM/SDN connector method raises the stored error."""

    def __init__(self, error_msg):
        """
        :param error_msg: text carried by the exception raised from any public method
        """
        self.error_msg = error_msg

        # VIM methods are installed first and SDN methods afterwards, so a method name present
        # in both base classes ends up raising the SDN error.
        for base_class, error_class in (
            (vimconn.VimConnector, vimconn.VimConnException),
            (sdnconn.SdnConnectorBase, sdnconn.SdnConnectorError),
        ):
            for attr_name in dir(base_class):
                if attr_name[0] == "_":
                    continue

                setattr(self, attr_name, Mock(side_effect=error_class(error_msg)))
87
88
class NsWorkerExceptionNotFound(NsWorkerException):
    """Raised in this file when find parameters are invalid or match no VIM element."""
91
92
class VimInteractionBase:
    """Base class to call VIM/SDN for creating, deleting and refreshing networks, VMs, flavors, ...
    The methods defined here do not contact the VIM; they only report a fixed result."""

    def __init__(self, db, my_vims, db_vims, logger):
        """
        :param db: database handle, stored as self.db
        :param my_vims: indexed by ro_task["target_id"] in subclasses to get the connector
        :param db_vims: indexed by ro_task["target_id"] in subclasses to get the VIM document
        :param logger: logger used by the subclasses
        """
        self.db, self.logger = db, logger
        self.my_vims, self.db_vims = my_vims, db_vims

    def new(self, ro_task, task_index, task_depends):
        """Report the item as being built, with nothing to update."""
        return "BUILD", {}

    def refresh(self, ro_task):
        """Skip calling VIM to get image, flavor status. Assumes ok unless already at VIM_ERROR"""
        already_failed = ro_task["vim_info"]["vim_status"] == "VIM_ERROR"

        return ("FAILED" if already_failed else "DONE"), {}

    def delete(self, ro_task, task_index):
        """Skip calling VIM to delete image. Assumes ok"""
        return "DONE", {}

    def exec(self, ro_task, task_index, task_depends):
        """Report the action as done, with no item update and no task update."""
        return "DONE", None, None
119
120
class VimInteractionNet(VimInteractionBase):
    """VIM interaction for networks: find-or-create, status refresh and deletion."""

    def new(self, ro_task, task_index, task_depends):
        """
        Locate an existing network at the VIM or create a new one.

        With "find_params" in the task, the network is searched using "filter_dict" or, for a
        management network, the VIM configuration ("management_network_id" or
        "management_network_name") or the name given in the find parameters. A management
        network that is not mapped in the VIM configuration and is not found is created.
        Without "find_params" the network is created from the task "params".
        :param ro_task: ro_task document; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("BUILD", item update) on success, ("FAILED", item update) on VIM/worker error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_id = ro_task["target_id"]
        target_vim = self.my_vims[target_id]
        vim_net_id = None
        created = False
        created_items = {}
        is_mgmt = False
        mgmt_mapped_in_vim = False

        try:
            find_params = task.get("find_params")

            if find_params:
                # FIND: work out the filter used to search the network at the VIM
                if find_params.get("filter_dict"):
                    vim_filter = find_params["filter_dict"]
                elif find_params.get("mgmt"):
                    # management network: the VIM configuration takes precedence
                    is_mgmt = True
                    db_vim = self.db_vims[target_id]

                    if deep_get(db_vim, "config", "management_network_id"):
                        mgmt_mapped_in_vim = True
                        vim_filter = {"id": db_vim["config"]["management_network_id"]}
                    elif deep_get(db_vim, "config", "management_network_name"):
                        mgmt_mapped_in_vim = True
                        vim_filter = {
                            "name": db_vim["config"]["management_network_name"]
                        }
                    else:
                        vim_filter = {"name": find_params["name"]}
                else:
                    raise NsWorkerExceptionNotFound(
                        "Invalid find_params for new_net {}".format(find_params)
                    )

                vim_nets = target_vim.get_network_list(vim_filter)

                if not vim_nets and not task.get("params"):
                    # If there is mgmt-network in the descriptor,
                    # there is no mapping of that network to a VIM network in the descriptor,
                    # also there is no mapping in the "--config" parameter or at VIM creation;
                    # that mgmt-network will be created.
                    if not is_mgmt or mgmt_mapped_in_vim:
                        raise NsWorkerExceptionNotFound(
                            "Network not found with this criteria: '{}'".format(
                                task.get("find_params")
                            )
                        )

                    net_name = vim_filter.get("name") or vim_filter.get("id")[:16]
                    vim_net_id, created_items = target_vim.new_network(net_name, None)
                    self.logger.debug(
                        "Created mgmt network vim_net_id: {}".format(vim_net_id)
                    )
                    created = True
                elif len(vim_nets) > 1:
                    raise NsWorkerException(
                        "More than one network found with this criteria: '{}'".format(
                            find_params
                        )
                    )

                if vim_nets:
                    vim_net_id = vim_nets[0]["id"]
            else:
                # CREATE
                vim_net_id, created_items = target_vim.new_network(**task["params"])
                created = True

            self.logger.debug(
                "task={} {} new-net={} created={}".format(
                    task_id, target_id, vim_net_id, created
                )
            )

            return "BUILD", {
                "vim_id": vim_net_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
        except (vimconn.VimConnException, NsWorkerException) as error:
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, target_id, error)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(error),
            }

    def refresh(self, ro_task):
        """
        Call VIM to get network status
        :param ro_task: ro_task document; "_id", "target_id" and "vim_info" are read
        :return: (task status, dict with the vim_info fields that changed)
        """
        ro_task_id = ro_task["_id"]
        target_id = ro_task["target_id"]
        target_vim = self.my_vims[target_id]
        stored = ro_task["vim_info"]
        vim_id = stored["vim_id"]

        try:
            vim_info = target_vim.refresh_nets_status([vim_id])[vim_id]
            # ACTIVE and BUILD map to a task status; any other VIM status is a failure
            task_status = {"ACTIVE": "DONE", "BUILD": "BUILD"}.get(
                vim_info["status"], "FAILED"
            )
        except vimconn.VimConnException as error:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-net={}: {}".format(
                    ro_task_id, target_id, vim_id, error
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(error)}
            task_status = "FAILED"

        vim_status = vim_info["status"]
        ro_vim_item_update = {}

        if stored["vim_status"] != vim_status:
            ro_vim_item_update["vim_status"] = vim_status

        if stored["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_status in ("ERROR", "VIM_ERROR"):
            if stored["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_status == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        elif stored["vim_details"] != vim_info["vim_info"]:
            ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            new_status = ro_vim_item_update.get("vim_status")
            message = (
                ro_vim_item_update.get("vim_message") if new_status != "ACTIVE" else ""
            )
            self.logger.debug(
                "ro_task={} {} get-net={}: status={} {}".format(
                    ro_task_id, target_id, vim_id, new_status, message
                )
            )

        return task_status, ro_vim_item_update

    def delete(self, ro_task, task_index):
        """
        Delete the network at the VIM when there is a VIM id or created items recorded.
        :param ro_task: ro_task document; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :return: ("DONE", item update) on success or when the VIM reports it as not found,
            ("FAILED", item update) on any other VIM error
        """
        task_id = ro_task["tasks"][task_index]["task_id"]
        net_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if net_vim_id or ro_task["vim_info"]["created_items"]:
                self.my_vims[ro_task["target_id"]].delete_network(
                    net_vim_id, ro_task["vim_info"]["created_items"]
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as error:
            self.logger.error(
                "ro_task={} vim={} del-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], net_vim_id, error
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(error),
            }

        self.logger.debug(
            "task={} {} del-net={} {}".format(
                task_id,
                ro_task["target_id"],
                net_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok
340
341
class VimInteractionClassification(VimInteractionBase):
    """VIM interaction for classifications: creation and deletion."""

    def new(self, ro_task, task_index, task_depends):
        """
        Create a classification at the target VIM.

        A "logical_source_port" given as a "TASK-..." reference is replaced by the
        "vim_interface_id" of the referenced VM interface at "logical_source_port_index".
        :param ro_task: ro_task document; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: indexed by the "TASK-..." reference to get the VM id at the VIM
        :return: ("DONE", item update) on success, ("FAILED", item update) on VIM/worker error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # Set before calling the VIM, so a failure is also reported with created=True
            created = True
            params_copy = deepcopy(task["params"])

            name = params_copy.pop("name")
            logical_source_port_index = int(
                params_copy.pop("logical_source_port_index")
            )
            logical_source_port = params_copy["logical_source_port"]

            if logical_source_port.startswith("TASK-"):
                vm_id = task_depends[logical_source_port]
                vm_status = target_vim.refresh_vms_status([vm_id])[vm_id]
                params_copy["logical_source_port"] = vm_status["interfaces"][
                    logical_source_port_index
                ]["vim_interface_id"]

            vim_classification_id = target_vim.new_classification(
                name, "legacy_flow_classifier", params_copy
            )

            ro_vim_item_update = {
                "vim_id": vim_classification_id,
                "vim_status": "DONE",
                "created": created,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} created={}".format(task_id, ro_task["target_id"], created)
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.debug(traceback.format_exc())
            # Label the failing operation correctly (it used to be logged as "new-vm")
            self.logger.error(
                "task={} {} new-classification: {}".format(
                    task_id, ro_task["target_id"], e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """
        Delete the classification at the VIM when a VIM id is recorded.
        :param ro_task: ro_task document; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :return: ("DONE", item update) on success or when the VIM reports it as not found,
            ("FAILED", item update) on any other VIM error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        classification_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if classification_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_classification(classification_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-classification={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], classification_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-classification={} {}".format(
                task_id,
                ro_task["target_id"],
                classification_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok
435
436
class VimInteractionSfi(VimInteractionBase):
    """VIM interaction for SFIs: creation and deletion."""

    def new(self, ro_task, task_index, task_depends):
        """
        Create an SFI at the target VIM from an ingress and an egress port.

        A port given as a "TASK-..." reference is replaced by the "vim_interface_id" of the
        referenced VM interface at the corresponding index; any other value is passed to the
        VIM as it is. When both ports hold the same value the ingress port id is used for both.
        :param ro_task: ro_task document; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: indexed by the "TASK-..." reference to get the VM id at the VIM
        :return: ("DONE", item update) on success, ("FAILED", item update) on VIM/worker error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # Set before calling the VIM, so a failure is also reported with created=True
            created = True
            params_copy = deepcopy(task["params"])
            name = params_copy["name"]
            ingress_port = params_copy["ingress_port"]
            egress_port = params_copy["egress_port"]
            ingress_port_index = params_copy["ingress_port_index"]
            egress_port_index = params_copy["egress_port_index"]

            ingress_port_id = ingress_port
            egress_port_id = egress_port
            ingress_vm_id = None

            if ingress_port.startswith("TASK-"):
                # Only task references are looked up; a literal port id is not a key of
                # task_depends and used to raise an uncaught KeyError here
                ingress_vm_id = task_depends[ingress_port]
                ingress_port_id = target_vim.refresh_vms_status([ingress_vm_id])[
                    ingress_vm_id
                ]["interfaces"][ingress_port_index]["vim_interface_id"]

            if ingress_port == egress_port:
                egress_port_id = ingress_port_id
            elif egress_port.startswith("TASK-"):
                # Resolve the egress port against its own VM; fall back to the ingress VM
                # (previous behaviour) when the egress reference is not among the dependencies
                egress_vm_id = (
                    task_depends[egress_port]
                    if egress_port in task_depends
                    else ingress_vm_id
                )

                if not egress_vm_id:
                    raise NsWorkerException(
                        "Cannot create SFI because depends on a VM not created or found "
                        "for {}".format(egress_port)
                    )

                egress_port_id = target_vim.refresh_vms_status([egress_vm_id])[
                    egress_vm_id
                ]["interfaces"][egress_port_index]["vim_interface_id"]

            ingress_port_id_list = [ingress_port_id]
            egress_port_id_list = [egress_port_id]

            vim_sfi_id = target_vim.new_sfi(
                name, ingress_port_id_list, egress_port_id_list, sfc_encap=False
            )

            ro_vim_item_update = {
                "vim_id": vim_sfi_id,
                "vim_status": "DONE",
                "created": created,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} created={}".format(task_id, ro_task["target_id"], created)
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.debug(traceback.format_exc())
            # Label the failing operation correctly (it used to be logged as "new-vm")
            self.logger.error(
                "task={} {} new-sfi: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """
        Delete the SFI at the VIM when a VIM id is recorded.
        :param ro_task: ro_task document; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :return: ("DONE", item update) on success or when the VIM reports it as not found,
            ("FAILED", item update) on any other VIM error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        sfi_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if sfi_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_sfi(sfi_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-sfi={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], sfi_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-sfi={} {}".format(
                task_id,
                ro_task["target_id"],
                sfi_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok
544
545
class VimInteractionSf(VimInteractionBase):
    """VIM interaction for SFs: creation and deletion."""

    def new(self, ro_task, task_index, task_depends):
        """
        Create an SF at the target VIM from the list of SFIs in the task parameters.

        Each entry of "sfis" given as a "TASK-..." reference is replaced by the value
        task_depends holds for it; any other entry is passed to the VIM as it is.
        :param ro_task: ro_task document; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: indexed by the "TASK-..." reference to get the SFI id at the VIM
        :return: ("DONE", item update) on success, ("FAILED", item update) on VIM/worker error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # Set before calling the VIM, so a failure is also reported with created=True
            created = True
            params_copy = deepcopy(task["params"])
            name = params_copy["name"]
            sfi_id_list = [
                task_depends[sfi] if sfi.startswith("TASK-") else sfi
                for sfi in params_copy["sfis"]
            ]

            vim_sf_id = target_vim.new_sf(name, sfi_id_list, sfc_encap=False)

            ro_vim_item_update = {
                "vim_id": vim_sf_id,
                "vim_status": "DONE",
                "created": created,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} created={}".format(task_id, ro_task["target_id"], created)
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.debug(traceback.format_exc())
            # Label the failing operation correctly (it used to be logged as "new-vm")
            self.logger.error(
                "task={} {} new-sf: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """
        Delete the SF at the VIM when a VIM id is recorded.
        :param ro_task: ro_task document; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :return: ("DONE", item update) on success or when the VIM reports it as not found,
            ("FAILED", item update) on any other VIM error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        sf_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if sf_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_sf(sf_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-sf={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], sf_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-sf={} {}".format(
                task_id,
                ro_task["target_id"],
                sf_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok
632
633
class VimInteractionSfp(VimInteractionBase):
    """VIM interaction for SFPs: creation and deletion."""

    def new(self, ro_task, task_index, task_depends):
        """
        Create an SFP at the target VIM from the classifications and SFs in the task parameters.

        Each entry of "classifications" and "sfs" given as a "TASK-..." reference is replaced
        by the value task_depends holds for it; any other entry is passed to the VIM as it is.
        :param ro_task: ro_task document; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: indexed by the "TASK-..." reference to get the id at the VIM
        :return: ("DONE", item update) on success, ("FAILED", item update) on VIM/worker error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # Set before calling the VIM, so a failure is also reported with created=True
            created = True
            params_copy = deepcopy(task["params"])
            name = params_copy["name"]
            sf_list = params_copy["sfs"]
            classification_list = params_copy["classifications"]

            classification_id_list = [
                (
                    task_depends[classification]
                    if classification.startswith("TASK-")
                    else classification
                )
                for classification in classification_list
            ]
            sf_id_list = [
                task_depends[sf] if sf.startswith("TASK-") else sf for sf in sf_list
            ]

            vim_sfp_id = target_vim.new_sfp(
                name, classification_id_list, sf_id_list, sfc_encap=False
            )

            ro_vim_item_update = {
                "vim_id": vim_sfp_id,
                "vim_status": "DONE",
                "created": created,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} created={}".format(task_id, ro_task["target_id"], created)
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.debug(traceback.format_exc())
            # Label the failing operation correctly (it used to be logged as "new-vm")
            self.logger.error(
                "task={} {} new-sfp: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """
        Delete the SFP at the VIM when a VIM id is recorded.
        :param ro_task: ro_task document; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :return: ("DONE", item update) on success or when the VIM reports it as not found,
            ("FAILED", item update) on any other VIM error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        sfp_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if sfp_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_sfp(sfp_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-sfp={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], sfp_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-sfp={} {}".format(
                task_id,
                ro_task["target_id"],
                sfp_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok
733
734
class VimInteractionVdu(VimInteractionBase):
    """VIM interaction for VMs: creation, deletion, status refresh and ssh key injection."""

    max_retries_inject_ssh_key = 20  # 20 times
    time_retries_inject_ssh_key = 30  # every 30 seconds

    def new(self, ro_task, task_index, task_depends):
        """
        Create a VM at the target VIM.

        "TASK-..." references found in the network ids, the image id, the flavor id and the
        affinity group ids of the task parameters are replaced by the values task_depends
        holds for them before calling the VIM.
        :param ro_task: ro_task document; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: indexed by the "TASK-..." reference to get the id at the VIM
        :return: ("BUILD", item update) on success, ("FAILED", item update) on VIM/worker error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        target_vim = self.my_vims[ro_task["target_id"]]
        try:
            # Set before calling the VIM, so a failure is also reported with created=True
            created = True
            params = task["params"]
            params_copy = deepcopy(params)
            net_list = params_copy["net_list"]

            for net in net_list:
                # change task_id into network_id
                if "net_id" in net and net["net_id"].startswith("TASK-"):
                    network_id = task_depends[net["net_id"]]

                    if not network_id:
                        raise NsWorkerException(
                            "Cannot create VM because depends on a network not created or found "
                            "for {}".format(net["net_id"])
                        )

                    net["net_id"] = network_id

            if params_copy["image_id"].startswith("TASK-"):
                params_copy["image_id"] = task_depends[params_copy["image_id"]]

            if params_copy["flavor_id"].startswith("TASK-"):
                params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]

            affinity_group_list = params_copy["affinity_group_list"]
            for affinity_group in affinity_group_list:
                # change task_id into affinity_group_id
                if "affinity_group_id" in affinity_group and affinity_group[
                    "affinity_group_id"
                ].startswith("TASK-"):
                    affinity_group_id = task_depends[
                        affinity_group["affinity_group_id"]
                    ]

                    if not affinity_group_id:
                        # Full message, aligned with the network case above (it used to be
                        # only "found for ...")
                        raise NsWorkerException(
                            "Cannot create VM because depends on an affinity group not created "
                            "or found for {}".format(affinity_group["affinity_group_id"])
                        )

                    affinity_group["affinity_group_id"] = affinity_group_id
            vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
            # "vim_id" is read from each net_list entry after the VM creation call
            interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]

            # add to created items previous_created_volumes (healing)
            if task.get("previous_created_volumes"):
                for k, v in task["previous_created_volumes"].items():
                    created_items[k] = v

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
                "interfaces_vim_ids": interfaces,
                "interfaces": [],
                "interfaces_backup": [],
            }
            self.logger.debug(
                "task={} {} new-vm={} created={}".format(
                    task_id, ro_task["target_id"], vim_vm_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.debug(traceback.format_exc())
            self.logger.error(
                "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """
        Delete the VM at the VIM when there is a VIM id or created items recorded.

        The "volumes_to_hold" list stored in vim_info (empty when absent) is passed to the VIM.
        :param ro_task: ro_task document; "tasks", "target_id", "_id" and "vim_info" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :return: ("DONE", item update) on success or when the VIM reports it as not found,
            ("FAILED", item update) on any other VIM error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        vm_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            self.logger.debug(
                "delete_vminstance: vm_vim_id={} created_items={}".format(
                    vm_vim_id, ro_task["vim_info"]["created_items"]
                )
            )
            if vm_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_vminstance(
                    vm_vim_id,
                    ro_task["vim_info"]["created_items"],
                    ro_task["vim_info"].get("volumes_to_hold", []),
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-vm={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], vm_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-vm={} {}".format(
                task_id,
                ro_task["target_id"],
                vm_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def refresh(self, ro_task):
        """
        Call VIM to get vm status
        :param ro_task: ro_task document; "_id", "target_id", "tasks" and "vim_info" are read
        :return: (None, None) when no VIM id is recorded; otherwise
            (task status, dict with the vim_info fields that changed)
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]

        if not vim_id:
            return None, None

        vm_to_refresh_list = [vim_id]
        try:
            vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"

            # try to load and parse vim_information
            try:
                vim_info_info = yaml.safe_load(vim_info["vim_info"])
                if vim_info_info.get("name"):
                    vim_info["name"] = vim_info_info["name"]
            except Exception as vim_info_error:
                # Best effort: a parsing problem is logged and the refresh goes on
                self.logger.exception(
                    f"{vim_info_error} occured while getting the vim_info from yaml"
                )
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-vm={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        ro_vim_item_update = {}

        # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
        vim_interfaces = []
        if vim_info.get("interfaces"):
            # Keep the order of the stored interface ids; an id the VIM does not report
            # yields a None entry
            for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
                iface = next(
                    (
                        iface
                        for iface in vim_info["interfaces"]
                        if vim_iface_id == iface["vim_interface_id"]
                    ),
                    None,
                )
                vim_interfaces.append(iface)

        # First CREATE task that is not FINISHED; it carries the management interface indexes
        task_create = next(
            t
            for t in ro_task["tasks"]
            if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
        )
        if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
            vim_interfaces[task_create["mgmt_vnf_interface"]][
                "mgmt_vnf_interface"
            ] = True

        mgmt_vdu_iface = task_create.get(
            "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
        )
        if vim_interfaces:
            vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True

        if ro_task["vim_info"]["interfaces"] != vim_interfaces:
            ro_vim_item_update["interfaces"] = vim_interfaces

        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            self.logger.debug(
                "ro_task={} {} get-vm={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    (
                        ro_vim_item_update.get("vim_message")
                        if ro_vim_item_update.get("vim_status") != "ACTIVE"
                        else ""
                    ),
                )
            )

        return task_status, ro_vim_item_update

    def exec(self, ro_task, task_index, task_depends):
        """
        Inject a user key into the VM through the target VIM.

        The "private_key" parameter is decrypted with self.db.decrypt (using "schema_version"
        and "salt") and passed as "ro_key"; "ip_address" is passed as "ip_addr".
        :param ro_task: ro_task document; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("DONE", None, task update) on success; ("BUILD", None, task update with
            "retries" and "next_retry") while the number of failures is below
            max_retries_inject_ssh_key; ("FAILED", item update, task update) afterwards
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        db_task_update = {"retries": 0}
        retries = task.get("retries", 0)

        try:
            params = task["params"]
            params_copy = deepcopy(params)
            params_copy["ro_key"] = self.db.decrypt(
                params_copy.pop("private_key"),
                params_copy.pop("schema_version"),
                params_copy.pop("salt"),
            )
            params_copy["ip_addr"] = params_copy.pop("ip_address")
            target_vim.inject_user_key(**params_copy)
            self.logger.debug(
                "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
            )

            return (
                "DONE",
                None,
                db_task_update,
            )  # params_copy["key"]
        except (vimconn.VimConnException, NsWorkerException) as e:
            retries += 1

            self.logger.debug(traceback.format_exc())
            if retries < self.max_retries_inject_ssh_key:
                return (
                    "BUILD",
                    None,
                    {
                        "retries": retries,
                        "next_retry": self.time_retries_inject_ssh_key,
                    },
                )

            self.logger.error(
                "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {"vim_message": str(e)}

            return "FAILED", ro_vim_item_update, db_task_update
1030
1031
class VimInteractionImage(VimInteractionBase):
    """VIM interaction for images: only looks up an existing image at the VIM."""

    def new(self, ro_task, task_index, task_depends):
        """
        Find the image at the VIM that matches the task "find_params".

        Without "find_params" no VIM call is made and an empty image id is reported.
        :param ro_task: ro_task document; "tasks" and "target_id" are read
        :param task_index: position of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("DONE", item update) on success; ("FAILED", item update) when zero or
            several images match, or on VIM error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_id = ro_task["target_id"]
        target_vim = self.my_vims[target_id]
        # Nothing is ever created here: images are only looked up
        created = False
        created_items = {}

        try:
            # FIND
            vim_image_id = ""
            find_params = task.get("find_params")

            if find_params:
                vim_images = target_vim.get_image_list(
                    find_params.get("filter_dict", {})
                )

                if not vim_images:
                    raise NsWorkerExceptionNotFound(
                        "Image not found with this criteria: '{}'".format(find_params)
                    )

                if len(vim_images) > 1:
                    raise NsWorkerException(
                        "More than one image found with this criteria: '{}'".format(
                            find_params
                        )
                    )

                vim_image_id = vim_images[0]["id"]

            self.logger.debug(
                "task={} {} new-image={} created={}".format(
                    task_id, target_id, vim_image_id, created
                )
            )

            return "DONE", {
                "vim_id": vim_image_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
        except (NsWorkerException, vimconn.VimConnException) as error:
            self.logger.error(
                "task={} {} new-image: {}".format(task_id, target_id, error)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(error),
            }
1089
1090
class VimInteractionSharedVolume(VimInteractionBase):
    """Handle "shared-volumes" tasks: creation and deletion of shared volumes at the VIM."""

    def delete(self, ro_task, task_index):
        """
        Delete the shared volume stored at ro_task["vim_info"], unless it is flagged to be kept.
        :param ro_task: ro_task content; "tasks", "vim_info", "target_id" and "_id" are read
        :param task_index: index inside ro_task["tasks"] of the task to process
        :return: tuple (task status, vim_info update). Status is "DONE" when the volume is deleted,
            was already deleted or must be kept; "FAILED" on any other VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        shared_volume_vim_id = ro_task["vim_info"]["vim_id"]
        # "created_items" may be missing or empty; it is then treated as "nothing to keep"
        created_items = ro_task["vim_info"].get("created_items")
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        # created_items may not contain an entry for this volume id: fall back to an empty dict
        # instead of calling .get() on None
        volume_info = (created_items or {}).get(shared_volume_vim_id) or {}

        if volume_info.get("keep"):
            # volume flagged with "keep": it is not removed from the VIM
            ro_vim_item_update_ok = {
                "vim_status": "ACTIVE",
                "created": False,
                "vim_message": None,
            }
            return "DONE", ro_vim_item_update_ok

        try:
            if shared_volume_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_shared_volumes(shared_volume_vim_id)
        except vimconn.VimConnNotFoundException:
            # not present at VIM: deletion is considered successful
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-shared-volume={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], shared_volume_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-shared-volume={} {}".format(
                task_id,
                ro_task["target_id"],
                shared_volume_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """
        Create a shared volume at the VIM using task["params"] as the volume description.
        :param ro_task: ro_task content; "tasks" and "target_id" are read from it
        :param task_index: index inside ro_task["tasks"] of the task to process
        :param task_depends: not used by this method
        :return: tuple (task status, vim_info update). Status is "DONE" on success (also when the
            task has no params, in which case nothing is created); "FAILED" when a
            VimConnException or NsWorkerException is caught
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            shared_volume_vim_id = None
            shared_volume_data = None

            if task.get("params"):
                shared_volume_data = task["params"]

            if shared_volume_data:
                self.logger.info(
                    f"Creating the new shared_volume for {shared_volume_data}\n"
                )
                (
                    shared_volume_name,
                    shared_volume_vim_id,
                ) = target_vim.new_shared_volumes(shared_volume_data)
                created = True
                # remember name and "keep" flag; delete() reads the "keep" flag back
                created_items[shared_volume_vim_id] = {
                    "name": shared_volume_name,
                    "keep": shared_volume_data.get("keep"),
                }

            ro_vim_item_update = {
                "vim_id": shared_volume_vim_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-shared-volume={} created={}".format(
                    task_id, ro_task["target_id"], shared_volume_vim_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-shared-volume:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
1195
1196
class VimInteractionFlavor(VimInteractionBase):
    """Handle "flavor" tasks: find, create and delete flavors at the target VIM."""

    def delete(self, ro_task, task_index):
        """
        Delete at the VIM the flavor stored at ro_task["vim_info"]["vim_id"], if any.
        :param ro_task: ro_task content; "tasks", "vim_info", "target_id" and "_id" are read
        :param task_index: index inside ro_task["tasks"] of the task to process
        :return: tuple (task status, vim_info update). Status is "DONE" when deleted or already
            deleted; "FAILED" on any other VimConnException
        """
        task_id = ro_task["tasks"][task_index]["task_id"]
        flavor_vim_id = ro_task["vim_info"]["vim_id"]
        deleted_update = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if flavor_vim_id:
                self.my_vims[ro_task["target_id"]].delete_flavor(flavor_vim_id)
        except vimconn.VimConnNotFoundException:
            # nothing to remove at VIM; still a successful deletion
            deleted_update["vim_message"] = "already deleted"
        except vimconn.VimConnException as error:
            self.logger.error(
                "ro_task={} vim={} del-flavor={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], flavor_vim_id, error
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(error),
            }

        self.logger.debug(
            "task={} {} del-flavor={} {}".format(
                task_id,
                ro_task["target_id"],
                flavor_vim_id,
                deleted_update.get("vim_message", ""),
            )
        )

        return "DONE", deleted_update

    def new(self, ro_task, task_index, task_depends):
        """
        Obtain a flavor id: use the one given, search by flavor data or, as last option, create it.
        :param ro_task: ro_task content; "tasks" and "target_id" are read from it
        :param task_index: index inside ro_task["tasks"] of the task to process
        :param task_depends: not used by this method
        :return: tuple (task status, vim_info update). Status is "DONE" on success; "FAILED" when
            a VimConnException or NsWorkerException is caught
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # FIND: an explicit VIM flavor id has priority over a search by flavor data
            vim_flavor_id = None
            find_params = task.get("find_params", {})

            if find_params.get("vim_flavor_id"):
                vim_flavor_id = find_params["vim_flavor_id"]
            elif find_params.get("flavor_data"):
                try:
                    vim_flavor_id = target_vim.get_flavor_id_from_data(
                        find_params["flavor_data"]
                    )
                except vimconn.VimConnNotFoundException as not_found:
                    # not fatal: the flavor is created below when the task provides params
                    self.logger.warning(
                        f"VimConnNotFoundException occured: {not_found}"
                    )

            # CREATE: only when nothing was found and creation params are provided
            if not vim_flavor_id and task.get("params"):
                vim_flavor_id = target_vim.new_flavor(task["params"]["flavor_data"])
                created = True

            ro_vim_item_update = {
                "vim_id": vim_flavor_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-flavor={} created={}".format(
                    task_id, ro_task["target_id"], vim_flavor_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as error:
            self.logger.error(
                "task={} vim={} new-flavor: {}".format(
                    task_id, ro_task["target_id"], error
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(error),
            }
1292
1293
class VimInteractionAffinityGroup(VimInteractionBase):
    """Handle "affinity-or-anti-affinity-group" tasks: find, create and delete groups at the VIM."""

    def delete(self, ro_task, task_index):
        """
        Delete at the VIM the affinity group stored at ro_task["vim_info"]["vim_id"], if any.
        :param ro_task: ro_task content; "tasks", "vim_info", "target_id" and "_id" are read
        :param task_index: index inside ro_task["tasks"] of the task to process
        :return: tuple (task status, vim_info update). Status is "DONE" when deleted or already
            deleted; "FAILED" on any other VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if affinity_group_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_affinity_group(affinity_group_vim_id)
        except vimconn.VimConnNotFoundException:
            # not present at VIM: deletion is considered successful
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
                task_id,
                ro_task["target_id"],
                affinity_group_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """
        Obtain an affinity group id: reuse the VIM group referenced by "vim-affinity-group-id"
        when it exists at the VIM, otherwise create a new group from "affinity_group_data".
        :param ro_task: ro_task content; "tasks" and "target_id" are read from it
        :param task_index: index inside ro_task["tasks"] of the task to process
        :param task_depends: not used by this method
        :return: tuple (task status, vim_info update). Status is "DONE" on success; "FAILED" when
            a VimConnException or NsWorkerException is caught
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            affinity_group_vim_id = None
            affinity_group_data = None
            param_affinity_group_id = ""

            if task.get("params"):
                affinity_group_data = task["params"].get("affinity_group_data")

                if affinity_group_data and affinity_group_data.get(
                    "vim-affinity-group-id"
                ):
                    try:
                        param_affinity_group_id = affinity_group_data.get(
                            "vim-affinity-group-id"
                        )
                        affinity_group_vim_id = target_vim.get_affinity_group(
                            param_affinity_group_id
                        ).get("id")
                    except vimconn.VimConnNotFoundException:
                        # not fatal: a new group is created below. A space separates the two
                        # literals so the id is not glued to the following word
                        self.logger.error(
                            "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {} "
                            "could not be found at VIM. Creating a new one.".format(
                                task_id, ro_task["target_id"], param_affinity_group_id
                            )
                        )

                if not affinity_group_vim_id and affinity_group_data:
                    affinity_group_vim_id = target_vim.new_affinity_group(
                        affinity_group_data
                    )
                    created = True

            ro_vim_item_update = {
                "vim_id": affinity_group_vim_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
                    task_id, ro_task["target_id"], affinity_group_vim_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-affinity-or-anti-affinity-group:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
1400
1401
class VimInteractionUpdateVdu(VimInteractionBase):
    """Handle "update" tasks: run an action over an existing VM instance of the VIM."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Call the VIM connector action_vminstance with the action requested at task["params"].
        :param ro_task: ro_task content; "tasks" and "target_id" are read from it
        :param task_index: index inside ro_task["tasks"] of the task to process
        :param task_depends: not used by this method
        :return: tuple (task status, vim_info update, db task update). Status is "DONE" on success
            (also when the task has no params, in which case nothing is done); "FAILED" when a
            VimConnException or NsWorkerException is caught
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            vim_vm_id = ""
            if task.get("params"):
                vim_vm_id = task["params"].get("vim_vm_id")
                action = task["params"].get("action")
                # the action name is used both as key and as value of the dict sent to the
                # connector. NOTE(review): presumably the connector only tests the key; confirm
                context = {action: action}
                target_vim.action_vminstance(vim_vm_id, context)
            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "ACTIVE",
            }
            # log texts name this operation; they formerly reported a "vm-migration"
            self.logger.debug(
                "task={} {} update-vdu done".format(task_id, ro_task["target_id"])
            )
            return "DONE", ro_vim_item_update, db_task_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} Update VDU:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
1436
1437
class VimInteractionSdnNet(VimInteractionBase):
    """Handle "sdn_net" tasks: create, update, refresh and delete connectivity services."""

    @staticmethod
    def _match_pci(port_pci, mapping):
        """
        Check if port_pci matches with mapping.
        The mapping can have brackets to indicate that several chars are accepted. e.g
        pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
        :param port_pci: text
        :param mapping: text, can contain brackets to indicate several chars are available
        :return: True if matches, False otherwise
        """
        if not port_pci or not mapping:
            return False
        if port_pci == mapping:
            return True

        mapping_index = 0
        pci_index = 0
        while True:
            bracket_start = mapping.find("[", mapping_index)

            if bracket_start == -1:
                break

            bracket_end = mapping.find("]", bracket_start)
            if bracket_end == -1:
                # unbalanced bracket: the remaining text is compared literally below
                break

            # literal text placed before the bracket must be equal at both sides
            length = bracket_start - mapping_index
            if (
                length
                and port_pci[pci_index : pci_index + length]
                != mapping[mapping_index:bracket_start]
            ):
                return False

            # a port_pci shorter than the mapping has no char left to test against the
            # bracket content: it is a mismatch, not an IndexError
            char_index = pci_index + length
            if char_index >= len(port_pci):
                return False

            # the char must be one of those listed between the brackets
            if port_pci[char_index] not in mapping[bracket_start + 1 : bracket_end]:
                return False

            pci_index += length + 1
            mapping_index = bracket_end + 1

        # text after the last bracket must be equal at both sides
        if port_pci[pci_index:] != mapping[mapping_index:]:
            return False

        return True

    def _get_interfaces(self, vlds_to_connect, vim_account_id):
        """
        Get from database the SR-IOV and PCI-PASSTHROUGH interfaces connected to the given vlds
        :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
        :param vim_account_id: only vnfrs of this vim account are considered
        :return: list of interfaces (copies of the database content). Each one contains an extra
            "id" key with format vnfrs:<vnfr_id>:vdu.<vdu_index>.interfaces.<iface_index>, and
            "status" is set to "ERROR" when the vdur is on ERROR status
        """
        interfaces = []

        for vld in vlds_to_connect:
            table, _, db_id = vld.partition(":")
            db_id, _, vld = db_id.partition(":")
            _, _, vld_id = vld.partition(".")

            if table == "vnfrs":
                q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
                iface_key = "vnf-vld-id"
            else:  # table == "nsrs"
                q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
                iface_key = "ns-vld-id"

            db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)

            for db_vnfr in db_vnfrs:
                for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
                    for iface_index, interface in enumerate(vdur["interfaces"]):
                        if interface.get(iface_key) == vld_id and interface.get(
                            "type"
                        ) in ("SR-IOV", "PCI-PASSTHROUGH"):
                            # only SR-IOV or PT
                            interface_ = interface.copy()
                            interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
                                db_vnfr["_id"], vdu_index, iface_index
                            )

                            if vdur.get("status") == "ERROR":
                                interface_["status"] = "ERROR"

                            interfaces.append(interface_)

        return interfaces

    def refresh(self, ro_task):
        """
        Refresh the sdn net by processing again its not finished CREATE task with new()
        :param ro_task: ro_task content
        :return: what new() returns. StopIteration is raised by next() if there is not any
            CREATE task pending to finish
        """
        # look for task create
        task_create_index, _ = next(
            i_t
            for i_t in enumerate(ro_task["tasks"])
            if i_t[1]
            and i_t[1]["action"] == "CREATE"
            and i_t[1]["status"] != "FINISHED"
        )

        return self.new(ro_task, task_create_index, None)

    def new(self, ro_task, task_index, task_depends):
        """
        Create or update the connectivity service with the ports known so far, or get its status
        when the set of ports has not changed.
        :param ro_task: ro_task content; "tasks", "target_id" and "vim_info" are read from it
        :param task_index: index inside ro_task["tasks"] of the task to process
        :param task_depends: not used by this method
        :return: tuple (status, vim_info update). On success status is the sdn status computed
            here; "FAILED" when any exception is caught
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]

        sdn_net_id = ro_task["vim_info"]["vim_id"]

        created_items = ro_task["vim_info"].get("created_items")
        connected_ports = ro_task["vim_info"].get("connected_ports", [])
        new_connected_ports = []
        last_update = ro_task["vim_info"].get("last_update", 0)
        sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
        error_list = []
        created = ro_task["vim_info"].get("created", False)

        try:
            # CREATE
            db_vim = {}
            params = task["params"]
            vlds_to_connect = params.get("vlds", [])
            associated_vim = params.get("target_vim")
            # external additional ports
            additional_ports = params.get("sdn-ports") or ()
            _, _, vim_account_id = (
                (None, None, None)
                if associated_vim is None
                else associated_vim.partition(":")
            )

            if associated_vim:
                # get associated VIM; it is read from database only the first time
                if associated_vim not in self.db_vims:
                    self.db_vims[associated_vim] = self.db.get_one(
                        "vim_accounts", {"_id": vim_account_id}
                    )

                db_vim = self.db_vims[associated_vim]

            # look for ports to connect
            ports = self._get_interfaces(vlds_to_connect, vim_account_id)

            sdn_ports = []
            pending_ports = error_ports = 0
            vlan_used = None
            sdn_need_update = False

            for port in ports:
                vlan_used = port.get("vlan") or vlan_used

                # TODO. Do not connect if already done
                if not port.get("compute_node") or not port.get("pci"):
                    # location still unknown: count it as error or as pending
                    if port.get("status") == "ERROR":
                        error_ports += 1
                    else:
                        pending_ports += 1
                    continue

                pmap = None
                compute_node_mappings = next(
                    (
                        c
                        for c in db_vim["config"].get("sdn-port-mapping", ())
                        if c and c["compute_node"] == port["compute_node"]
                    ),
                    None,
                )

                if compute_node_mappings:
                    # process port_mapping pci of type 0000:af:1[01].[1357]
                    pmap = next(
                        (
                            p
                            for p in compute_node_mappings["ports"]
                            if self._match_pci(port["pci"], p.get("pci"))
                        ),
                        None,
                    )

                if not pmap:
                    if not db_vim["config"].get("mapping_not_needed"):
                        error_list.append(
                            "Port mapping not found for compute_node={} pci={}".format(
                                port["compute_node"], port["pci"]
                            )
                        )
                        continue

                    # mapping is not needed: values are taken from the port itself
                    pmap = {}

                service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
                new_port = {
                    "service_endpoint_id": pmap.get("service_endpoint_id")
                    or service_endpoint_id,
                    "service_endpoint_encapsulation_type": (
                        "dot1q" if port["type"] == "SR-IOV" else None
                    ),
                    "service_endpoint_encapsulation_info": {
                        "vlan": port.get("vlan"),
                        "mac": port.get("mac-address"),
                        "device_id": pmap.get("device_id") or port["compute_node"],
                        "device_interface_id": pmap.get("device_interface_id")
                        or port["pci"],
                        "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
                        "switch_port": pmap.get("switch_port"),
                        "service_mapping_info": pmap.get("service_mapping_info"),
                    },
                }

                # TODO
                # if port["modified_at"] > last_update:
                #     sdn_need_update = True
                new_connected_ports.append(port["id"])  # TODO
                sdn_ports.append(new_port)

            if error_ports:
                error_list.append(
                    "{} interfaces have not been created as VDU is on ERROR status".format(
                        error_ports
                    )
                )

            # connect external ports
            for index, additional_port in enumerate(additional_ports):
                additional_port_id = additional_port.get(
                    "service_endpoint_id"
                ) or "external-{}".format(index)
                sdn_ports.append(
                    {
                        "service_endpoint_id": additional_port_id,
                        "service_endpoint_encapsulation_type": additional_port.get(
                            "service_endpoint_encapsulation_type", "dot1q"
                        ),
                        "service_endpoint_encapsulation_info": {
                            "vlan": additional_port.get("vlan") or vlan_used,
                            "mac": additional_port.get("mac_address"),
                            "device_id": additional_port.get("device_id"),
                            "device_interface_id": additional_port.get(
                                "device_interface_id"
                            ),
                            "switch_dpid": additional_port.get("switch_dpid")
                            or additional_port.get("switch_id"),
                            "switch_port": additional_port.get("switch_port"),
                            "service_mapping_info": additional_port.get(
                                "service_mapping_info"
                            ),
                        },
                    }
                )
                new_connected_ports.append(additional_port_id)
            sdn_info = ""

            # if there are more ports to connect or they have been modified, call create/update
            if error_list:
                sdn_status = "ERROR"
                sdn_info = "; ".join(error_list)
            elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
                last_update = time.time()

                if not sdn_net_id:
                    if len(sdn_ports) < 2:
                        # nothing is created with less than 2 ports
                        sdn_status = "ACTIVE"

                        if not pending_ports:
                            self.logger.debug(
                                "task={} {} new-sdn-net done, less than 2 ports".format(
                                    task_id, ro_task["target_id"]
                                )
                            )
                    else:
                        net_type = params.get("type") or "ELAN"
                        (
                            sdn_net_id,
                            created_items,
                        ) = target_vim.create_connectivity_service(net_type, sdn_ports)
                        created = True
                        self.logger.debug(
                            "task={} {} new-sdn-net={} created={}".format(
                                task_id, ro_task["target_id"], sdn_net_id, created
                            )
                        )
                else:
                    created_items = target_vim.edit_connectivity_service(
                        sdn_net_id, conn_info=created_items, connection_points=sdn_ports
                    )
                    created = True
                    self.logger.debug(
                        "task={} {} update-sdn-net={} created={}".format(
                            task_id, ro_task["target_id"], sdn_net_id, created
                        )
                    )

                connected_ports = new_connected_ports
            elif sdn_net_id:
                # same ports than before: only the status is obtained
                wim_status_dict = target_vim.get_connectivity_service_status(
                    sdn_net_id, conn_info=created_items
                )
                sdn_status = wim_status_dict["sdn_status"]

                if wim_status_dict.get("sdn_info"):
                    sdn_info = str(wim_status_dict.get("sdn_info")) or ""

                # an error message, when present, replaces the sdn_info text
                if wim_status_dict.get("error_msg"):
                    sdn_info = wim_status_dict.get("error_msg") or ""

            if pending_ports:
                if sdn_status != "ERROR":
                    sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
                        len(ports) - pending_ports, len(ports)
                    )

                # it is not completed while there are ports without location
                if sdn_status == "ACTIVE":
                    sdn_status = "BUILD"

            ro_vim_item_update = {
                "vim_id": sdn_net_id,
                "vim_status": sdn_status,
                "created": created,
                "created_items": created_items,
                "connected_ports": connected_ports,
                "vim_details": sdn_info,
                "vim_message": None,
                "last_update": last_update,
            }

            return sdn_status, ro_vim_item_update
        except Exception as e:
            # traceback is logged only for exceptions not raised by the connectors
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
                exc_info=not isinstance(
                    e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
                ),
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """
        Delete the connectivity service stored at ro_task["vim_info"]["vim_id"], if any.
        :param ro_task: ro_task content; "tasks", "vim_info", "target_id" and "_id" are read
        :param task_index: index inside ro_task["tasks"] of the task to process
        :return: tuple (task status, vim_info update). Status is "DONE" when deleted or when the
            connector reports an http NOT_FOUND error; "FAILED" on any other exception
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        sdn_vim_id = ro_task["vim_info"].get("vim_id")
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if sdn_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_connectivity_service(
                    sdn_vim_id, ro_task["vim_info"].get("created_items")
                )

        except Exception as e:
            if (
                isinstance(e, sdnconn.SdnConnectorError)
                and e.http_code == HTTPStatus.NOT_FOUND.value
            ):
                ro_vim_item_update_ok["vim_message"] = "already deleted"
            else:
                # traceback is logged only for exceptions not raised by the connectors
                self.logger.error(
                    "ro_task={} vim={} del-sdn-net={}: {}".format(
                        ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
                    ),
                    exc_info=not isinstance(
                        e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
                    ),
                )
                ro_vim_item_update = {
                    "vim_status": "VIM_ERROR",
                    "vim_message": "Error while deleting: {}".format(e),
                }

                return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-sdn-net={} {}".format(
                task_id,
                ro_task["target_id"],
                sdn_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok
1832
1833
class VimInteractionMigration(VimInteractionBase):
    """Handle "migrate" tasks: move a VM instance to another host of the VIM."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Migrate the VM given at task["params"] and, when the connector reports a compute node,
        refresh the VM to obtain its new vim information and interfaces.
        :param ro_task: ro_task content; "tasks" and "target_id" are read from it
        :param task_index: index inside ro_task["tasks"] of the task to process
        :param task_depends: not used by this method
        :return: tuple (task status, vim_info update, db task update). Status is "DONE" on
            success; "FAILED" when a VimConnException or NsWorkerException is caught
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_interfaces = []
        refreshed_vim_info = {}

        try:
            vim_vm_id = ""
            params = task.get("params")

            if params:
                vim_vm_id = params.get("vim_vm_id")
                _, migrated_compute_node = target_vim.migrate_instance(
                    vim_vm_id, params.get("migrate_host")
                )

                if migrated_compute_node:
                    # information stored before the migration, for this target
                    vdu_old_vim_info = params["vdu_vim_info"].get(ro_task["target_id"])

                    # ask the VIM for the current information of the VM
                    refreshed_vim_info = target_vim.refresh_vms_status([vim_vm_id])[
                        vim_vm_id
                    ]

                    if refreshed_vim_info.get("interfaces"):
                        # one entry per old interface, in the same order: the refreshed
                        # interface with the same vim_interface_id, or None if not found
                        for old_iface in vdu_old_vim_info.get("interfaces"):
                            matched_iface = None

                            for new_iface in refreshed_vim_info["interfaces"]:
                                if (
                                    old_iface["vim_interface_id"]
                                    == new_iface["vim_interface_id"]
                                ):
                                    matched_iface = new_iface
                                    break

                            vim_interfaces.append(matched_iface)

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "ACTIVE",
                "vim_details": None,
                "vim_message": None,
            }

            vm_refreshed_ok = refreshed_vim_info and refreshed_vim_info.get(
                "status"
            ) not in ("ERROR", "VIM_ERROR")

            if vm_refreshed_ok:
                ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]

            if vim_interfaces:
                ro_vim_item_update["interfaces"] = vim_interfaces

            self.logger.debug(
                "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
            )

            return "DONE", ro_vim_item_update, db_task_update

        except (vimconn.VimConnException, NsWorkerException) as error:
            self.logger.error(
                "task={} vim={} VM Migration:"
                " {}".format(task_id, ro_task["target_id"], error)
            )

            return (
                "FAILED",
                {"vim_status": "VIM_ERROR", "vim_message": str(error)},
                db_task_update,
            )
1909
1910
class VimInteractionResize(VimInteractionBase):
    """Handle "verticalscale" tasks: resize a VM instance to another flavor."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Resize the VM given at task["params"]["vim_vm_id"] to the flavor that task_depends
        associates to task["params"]["flavor_id"], and refresh the VM when the resize is reported
        as done.
        :param ro_task: ro_task content; "tasks" and "target_id" are read from it
        :param task_index: index inside ro_task["tasks"] of the task to process
        :param task_depends: dictionary indexed by task["params"]["flavor_id"] that provides the
            VIM identifier of the target flavor
        :return: tuple (task status, vim_info update, db task update). Status is "DONE" on
            success; "FAILED" when a VimConnException or NsWorkerException is caught
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        target_flavor_uuid = None
        refreshed_vim_info = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            params = task["params"]
            # params is only read here, so no copy of it is needed
            target_flavor_uuid = task_depends[params["flavor_id"]]
            vim_vm_id = ""
            if params:
                # the VM to resize comes in the task params; without reading it, resize and
                # refresh would be requested for an empty VM id
                vim_vm_id = params.get("vim_vm_id")
                self.logger.info("vim_vm_id %s", vim_vm_id)

                if target_flavor_uuid is not None:
                    resized_status = target_vim.resize_instance(
                        vim_vm_id, target_flavor_uuid
                    )

                    if resized_status:
                        # Refresh VM to get new vim_info
                        vm_to_refresh_list = [vim_vm_id]
                        vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
                        refreshed_vim_info = vim_dict[vim_vm_id]

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "ACTIVE",
                "vim_details": None,
                "vim_message": None,
            }

            if refreshed_vim_info and refreshed_vim_info.get("status") not in (
                "ERROR",
                "VIM_ERROR",
            ):
                ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]

            self.logger.debug(
                "task={} {} resize done".format(task_id, ro_task["target_id"])
            )
            return "DONE", ro_vim_item_update, db_task_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} Resize:" " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
1966
1967
class ConfigValidate:
    """Read-only view over the "period" section of the RO configuration."""

    def __init__(self, config: Dict):
        """
        :param config: configuration dictionary; its "period" entry is read by the properties
        """
        self.conf = config

    def _period(self, key):
        """Return the entry 'key' of the "period" section of the configuration."""
        return self.conf["period"][key]

    @property
    def active(self):
        """
        Refresh period for active items.
        :return: the configured "refresh_active" when it is >= 60 or -1; otherwise 60
        """
        # default 1 min, allowed >= 60 or -1, -1 disables periodic checks
        refresh_active = self._period("refresh_active")
        allowed = refresh_active >= 60 or refresh_active == -1

        return refresh_active if allowed else 60

    @property
    def build(self):
        """:return: the configured "refresh_build" value"""
        return self._period("refresh_build")

    @property
    def image(self):
        """:return: the configured "refresh_image" value"""
        return self._period("refresh_image")

    @property
    def error(self):
        """:return: the configured "refresh_error" value"""
        return self._period("refresh_error")

    @property
    def queue_size(self):
        """:return: the configured "queue_size" value, read from the "period" section"""
        return self._period("queue_size")
1998
1999
2000 class NsWorker(threading.Thread):
def __init__(self, worker_index, config, plugins, db):
    """
    :param worker_index: thread index
    :param config: general configuration of RO, among others the process_id with the docker id where it runs
    :param plugins: global shared dict with the loaded plugins
    :param db: database class instance to use
    """
    threading.Thread.__init__(self)
    self.config = config
    self.plugins = plugins
    self.plugin_name = "unknown"
    self.logger = logging.getLogger("ro.worker{}".format(worker_index))
    self.worker_index = worker_index
    # refresh periods for created items
    self.refresh_config = ConfigValidate(config)
    self.task_queue = queue.Queue(self.refresh_config.queue_size)
    # targetvim: vimplugin class
    self.my_vims = {}
    # targetvim: vim information from database
    self.db_vims = {}
    # targetvim list
    self.vim_targets = []
    self.my_id = config["process_id"] + ":" + str(worker_index)
    self.db = db

    # task item name -> class that knows how to process it. Every class is instantiated with
    # the same shared db, my_vims, db_vims and logger
    interaction_classes = (
        ("net", VimInteractionNet),
        ("shared-volumes", VimInteractionSharedVolume),
        ("classification", VimInteractionClassification),
        ("sfi", VimInteractionSfi),
        ("sf", VimInteractionSf),
        ("sfp", VimInteractionSfp),
        ("vdu", VimInteractionVdu),
        ("image", VimInteractionImage),
        ("flavor", VimInteractionFlavor),
        ("sdn_net", VimInteractionSdnNet),
        ("update", VimInteractionUpdateVdu),
        ("affinity-or-anti-affinity-group", VimInteractionAffinityGroup),
        ("migrate", VimInteractionMigration),
        ("verticalscale", VimInteractionResize),
    )
    self.item2class = {
        item: interaction_class(self.db, self.my_vims, self.db_vims, self.logger)
        for item, interaction_class in interaction_classes
    }

    self.time_last_task_processed = None
    # lists of tasks to delete because nsrs or vnfrs has been deleted from db
    self.tasks_to_delete = []
    # it is idle when there are not vim_targets associated
    self.idle = True
    self.task_locked_time = config["global"]["task_locked_time"]
2065
2066 def insert_task(self, task):
2067 try:
2068 self.task_queue.put(task, False)
2069 return None
2070 except queue.Full:
2071 raise NsWorkerException("timeout inserting a task")
2072
2073 def terminate(self):
2074 self.insert_task("exit")
2075
2076 def del_task(self, task):
2077 with self.task_lock:
2078 if task["status"] == "SCHEDULED":
2079 task["status"] = "SUPERSEDED"
2080 return True
2081 else: # task["status"] == "processing"
2082 self.task_lock.release()
2083 return False
2084
2085 def _process_vim_config(self, target_id: str, db_vim: dict) -> None:
2086 """
2087 Process vim config, creating vim configuration files as ca_cert
2088 :param target_id: vim/sdn/wim + id
2089 :param db_vim: Vim dictionary obtained from database
2090 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
2091 """
2092 if not db_vim.get("config"):
2093 return
2094
2095 file_name = ""
2096 work_dir = "/app/osm_ro/certs"
2097
2098 try:
2099 if db_vim["config"].get("ca_cert_content"):
2100 file_name = f"{work_dir}/{target_id}:{self.worker_index}"
2101
2102 if not path.isdir(file_name):
2103 makedirs(file_name)
2104
2105 file_name = file_name + "/ca_cert"
2106
2107 with open(file_name, "w") as f:
2108 f.write(db_vim["config"]["ca_cert_content"])
2109 del db_vim["config"]["ca_cert_content"]
2110 db_vim["config"]["ca_cert"] = file_name
2111 except Exception as e:
2112 raise NsWorkerException(
2113 "Error writing to file '{}': {}".format(file_name, e)
2114 )
2115
2116 def _load_plugin(self, name, type="vim"):
2117 # type can be vim or sdn
2118 if "rovim_dummy" not in self.plugins:
2119 self.plugins["rovim_dummy"] = VimDummyConnector
2120
2121 if "rosdn_dummy" not in self.plugins:
2122 self.plugins["rosdn_dummy"] = SdnDummyConnector
2123
2124 if name in self.plugins:
2125 return self.plugins[name]
2126
2127 try:
2128 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
2129 self.plugins[name] = ep.load()
2130 except Exception as e:
2131 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
2132
2133 if name and name not in self.plugins:
2134 raise NsWorkerException(
2135 "Plugin 'osm_{n}' has not been installed".format(n=name)
2136 )
2137
2138 return self.plugins[name]
2139
2140 def _unload_vim(self, target_id):
2141 """
2142 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
2143 :param target_id: Contains type:_id; where type can be 'vim', ...
2144 :return: None.
2145 """
2146 try:
2147 self.db_vims.pop(target_id, None)
2148 self.my_vims.pop(target_id, None)
2149
2150 if target_id in self.vim_targets:
2151 self.vim_targets.remove(target_id)
2152
2153 self.logger.info("Unloaded {}".format(target_id))
2154 except Exception as e:
2155 self.logger.error("Cannot unload {}: {}".format(target_id, e))
2156
def _check_vim(self, target_id):
    """
    Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR

    Only the first operation found in "PROCESSING" state at _admin.operations is handled. It is
    locked at database before loading the target, so that a single worker processes it.
    If the target was not loaded when this method was called, it is unloaded again at the end.
    :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
    :return: None.
    """
    target, _, _id = target_id.partition(":")
    now = time.time()
    update_dict = {}
    unset_dict = {}
    op_text = ""
    step = ""
    # remember the initial state, to leave the target unloaded if it was not loaded before
    loaded = target_id in self.vim_targets
    target_database = (
        "vim_accounts"
        if target == "vim"
        else "wim_accounts" if target == "wim" else "sdns"
    )
    error_text = ""

    try:
        step = "Getting {} from db".format(target_id)
        db_vim = self.db.get_one(target_database, {"_id": _id})

        for op_index, operation in enumerate(
            db_vim["_admin"].get("operations", ())
        ):
            if operation["operationState"] != "PROCESSING":
                continue

            locked_at = operation.get("locked_at")

            if locked_at is not None and locked_at >= now - self.task_locked_time:
                # some other thread is doing this operation
                return

            # lock
            op_text = "_admin.operations.{}.".format(op_index)

            # the filter includes the locked_at value just read: the update only matches if
            # nobody has modified the lock meanwhile
            # NOTE(review): "admin.current_operation" lacks the leading underscore used by the
            # other "_admin." keys of this method - confirm whether this is intended
            if not self.db.set_one(
                target_database,
                q_filter={
                    "_id": _id,
                    op_text + "operationState": "PROCESSING",
                    op_text + "locked_at": locked_at,
                },
                update_dict={
                    op_text + "locked_at": now,
                    "admin.current_operation": op_index,
                },
                fail_on_empty=False,
            ):
                return

            unset_dict[op_text + "locked_at"] = None
            # NOTE(review): this key does not match the "admin.current_operation" key set above
            unset_dict["current_operation"] = None
            step = "Loading " + target_id
            error_text = self._load_vim(target_id)

            if not error_text:
                step = "Checking connectivity"

                if target == "vim":
                    self.my_vims[target_id].check_vim_connectivity()
                else:
                    self.my_vims[target_id].check_credentials()

            # success values; they are overwritten at 'finally' when error_text is not empty
            update_dict["_admin.operationalState"] = "ENABLED"
            update_dict["_admin.detailed-status"] = ""
            unset_dict[op_text + "detailed-status"] = None
            update_dict[op_text + "operationState"] = "COMPLETED"

            # only one operation is processed per call
            return

    except Exception as e:
        error_text = "{}: {}".format(step, e)
        self.logger.error("{} for {}: {}".format(step, target_id, e))

    finally:
        # executed also for the 'return' statements above
        if update_dict or unset_dict:
            if error_text:
                update_dict[op_text + "operationState"] = "FAILED"
                update_dict[op_text + "detailed-status"] = error_text
                # the same key cannot be set and unset in the same database update
                unset_dict.pop(op_text + "detailed-status", None)
                update_dict["_admin.operationalState"] = "ERROR"
                update_dict["_admin.detailed-status"] = error_text

            if op_text:
                update_dict[op_text + "statusEnteredTime"] = now

            self.db.set_one(
                target_database,
                q_filter={"_id": _id},
                update_dict=update_dict,
                unset=unset_dict,
                fail_on_empty=False,
            )

        if not loaded:
            self._unload_vim(target_id)
2257
2258 def _reload_vim(self, target_id):
2259 if target_id in self.vim_targets:
2260 self._load_vim(target_id)
2261 else:
2262 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
2263 # just remove it to force load again next time it is needed
2264 self.db_vims.pop(target_id, None)
2265
2266 def _load_vim(self, target_id):
2267 """
2268 Load or reload a vim_account, sdn_controller or wim_account.
2269 Read content from database, load the plugin if not loaded.
2270 In case of error loading the plugin, it loads a failing VIM_connector
2271 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
2272 :param target_id: Contains type:_id; where type can be 'vim', ...
2273 :return: None if ok, descriptive text if error
2274 """
2275 target, _, _id = target_id.partition(":")
2276 target_database = (
2277 "vim_accounts"
2278 if target == "vim"
2279 else "wim_accounts" if target == "wim" else "sdns"
2280 )
2281 plugin_name = ""
2282 vim = None
2283 step = "Getting {}={} from db".format(target, _id)
2284
2285 try:
2286 # TODO process for wim, sdnc, ...
2287 vim = self.db.get_one(target_database, {"_id": _id})
2288
2289 # if deep_get(vim, "config", "sdn-controller"):
2290 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
2291 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
2292
2293 step = "Decrypting password"
2294 schema_version = vim.get("schema_version")
2295 self.db.encrypt_decrypt_fields(
2296 vim,
2297 "decrypt",
2298 fields=("password", "secret"),
2299 schema_version=schema_version,
2300 salt=_id,
2301 )
2302 self._process_vim_config(target_id, vim)
2303
2304 if target == "vim":
2305 plugin_name = "rovim_" + vim["vim_type"]
2306 step = "Loading plugin '{}'".format(plugin_name)
2307 vim_module_conn = self._load_plugin(plugin_name)
2308 step = "Loading {}'".format(target_id)
2309 self.my_vims[target_id] = vim_module_conn(
2310 uuid=vim["_id"],
2311 name=vim["name"],
2312 tenant_id=vim.get("vim_tenant_id"),
2313 tenant_name=vim.get("vim_tenant_name"),
2314 url=vim["vim_url"],
2315 url_admin=None,
2316 user=vim["vim_user"],
2317 passwd=vim["vim_password"],
2318 config=vim.get("config") or {},
2319 persistent_info={},
2320 )
2321 else: # sdn
2322 plugin_name = "rosdn_" + (vim.get("type") or vim.get("wim_type"))
2323 step = "Loading plugin '{}'".format(plugin_name)
2324 vim_module_conn = self._load_plugin(plugin_name, "sdn")
2325 step = "Loading {}'".format(target_id)
2326 wim = deepcopy(vim)
2327 wim_config = wim.pop("config", {}) or {}
2328 wim["uuid"] = wim["_id"]
2329 if "url" in wim and "wim_url" not in wim:
2330 wim["wim_url"] = wim["url"]
2331 elif "url" not in wim and "wim_url" in wim:
2332 wim["url"] = wim["wim_url"]
2333
2334 if wim.get("dpid"):
2335 wim_config["dpid"] = wim.pop("dpid")
2336
2337 if wim.get("switch_id"):
2338 wim_config["switch_id"] = wim.pop("switch_id")
2339
2340 # wim, wim_account, config
2341 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
2342 self.db_vims[target_id] = vim
2343 self.error_status = None
2344
2345 self.logger.info(
2346 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
2347 )
2348 except Exception as e:
2349 self.logger.error(
2350 "Cannot load {} plugin={}: {} {}".format(
2351 target_id, plugin_name, step, e
2352 )
2353 )
2354
2355 self.db_vims[target_id] = vim or {}
2356 self.db_vims[target_id] = FailingConnector(str(e))
2357 error_status = "{} Error: {}".format(step, e)
2358
2359 return error_status
2360 finally:
2361 if target_id not in self.vim_targets:
2362 self.vim_targets.append(target_id)
2363
2364 def _get_db_task(self):
2365 """
2366 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
2367 :return: None
2368 """
2369 now = time.time()
2370
2371 if not self.time_last_task_processed:
2372 self.time_last_task_processed = now
2373
2374 try:
2375 while True:
2376 """
2377 # Log RO tasks only when loglevel is DEBUG
2378 if self.logger.getEffectiveLevel() == logging.DEBUG:
2379 self._log_ro_task(
2380 None,
2381 None,
2382 None,
2383 "TASK_WF",
2384 "task_locked_time="
2385 + str(self.task_locked_time)
2386 + " "
2387 + "time_last_task_processed="
2388 + str(self.time_last_task_processed)
2389 + " "
2390 + "now="
2391 + str(now),
2392 )
2393 """
2394 locked = self.db.set_one(
2395 "ro_tasks",
2396 q_filter={
2397 "target_id": self.vim_targets,
2398 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
2399 "locked_at.lt": now - self.task_locked_time,
2400 "to_check_at.lt": self.time_last_task_processed,
2401 "to_check_at.gt": -1,
2402 },
2403 update_dict={"locked_by": self.my_id, "locked_at": now},
2404 fail_on_empty=False,
2405 )
2406
2407 if locked:
2408 # read and return
2409 ro_task = self.db.get_one(
2410 "ro_tasks",
2411 q_filter={
2412 "target_id": self.vim_targets,
2413 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
2414 "locked_at": now,
2415 },
2416 )
2417 return ro_task
2418
2419 if self.time_last_task_processed == now:
2420 self.time_last_task_processed = None
2421 return None
2422 else:
2423 self.time_last_task_processed = now
2424 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
2425
2426 except DbException as e:
2427 self.logger.error("Database exception at _get_db_task: {}".format(e))
2428 except Exception as e:
2429 self.logger.critical(
2430 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
2431 )
2432
2433 return None
2434
2435 def _delete_task(self, ro_task, task_index, task_depends, db_update):
2436 """
2437 Determine if this task need to be done or superseded
2438 :return: None
2439 """
2440 my_task = ro_task["tasks"][task_index]
2441 task_id = my_task["task_id"]
2442 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
2443 "created_items", False
2444 )
2445
2446 self.logger.debug("Needed delete: {}".format(needed_delete))
2447 if my_task["status"] == "FAILED":
2448 return None, None # TODO need to be retry??
2449
2450 try:
2451 for index, task in enumerate(ro_task["tasks"]):
2452 if index == task_index or not task:
2453 continue # own task
2454
2455 if (
2456 my_task["target_record"] == task["target_record"]
2457 and task["action"] == "CREATE"
2458 ):
2459 # set to finished
2460 db_update["tasks.{}.status".format(index)] = task["status"] = (
2461 "FINISHED"
2462 )
2463 elif task["action"] == "CREATE" and task["status"] not in (
2464 "FINISHED",
2465 "SUPERSEDED",
2466 ):
2467 needed_delete = False
2468
2469 if needed_delete:
2470 self.logger.debug(
2471 "Deleting ro_task={} task_index={}".format(ro_task, task_index)
2472 )
2473 return self.item2class[my_task["item"]].delete(ro_task, task_index)
2474 else:
2475 return "SUPERSEDED", None
2476 except Exception as e:
2477 if not isinstance(e, NsWorkerException):
2478 self.logger.critical(
2479 "Unexpected exception at _delete_task task={}: {}".format(
2480 task_id, e
2481 ),
2482 exc_info=True,
2483 )
2484
2485 return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e)}
2486
2487 def _create_task(self, ro_task, task_index, task_depends, db_update):
2488 """
2489 Determine if this task need to create something at VIM
2490 :return: None
2491 """
2492 my_task = ro_task["tasks"][task_index]
2493 task_id = my_task["task_id"]
2494
2495 if my_task["status"] == "FAILED":
2496 return None, None # TODO need to be retry??
2497 elif my_task["status"] == "SCHEDULED":
2498 # check if already created by another task
2499 for index, task in enumerate(ro_task["tasks"]):
2500 if index == task_index or not task:
2501 continue # own task
2502
2503 if task["action"] == "CREATE" and task["status"] not in (
2504 "SCHEDULED",
2505 "FINISHED",
2506 "SUPERSEDED",
2507 ):
2508 return task["status"], "COPY_VIM_INFO"
2509
2510 try:
2511 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
2512 ro_task, task_index, task_depends
2513 )
2514 # TODO update other CREATE tasks
2515 except Exception as e:
2516 if not isinstance(e, NsWorkerException):
2517 self.logger.error(
2518 "Error executing task={}: {}".format(task_id, e), exc_info=True
2519 )
2520
2521 task_status = "FAILED"
2522 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_message": str(e)}
2523 # TODO update ro_vim_item_update
2524
2525 return task_status, ro_vim_item_update
2526 else:
2527 return None, None
2528
2529 def _get_dependency(self, task_id, ro_task=None, target_id=None):
2530 """
2531 Look for dependency task
2532 :param task_id: Can be one of
2533 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2534 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2535 3. task.task_id: "<action_id>:number"
2536 :param ro_task:
2537 :param target_id:
2538 :return: database ro_task plus index of task
2539 """
2540 if (
2541 task_id.startswith("vim:")
2542 or task_id.startswith("sdn:")
2543 or task_id.startswith("wim:")
2544 ):
2545 target_id, _, task_id = task_id.partition(" ")
2546
2547 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
2548 ro_task_dependency = self.db.get_one(
2549 "ro_tasks",
2550 q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
2551 fail_on_empty=False,
2552 )
2553
2554 if ro_task_dependency:
2555 for task_index, task in enumerate(ro_task_dependency["tasks"]):
2556 if task["target_record_id"] == task_id:
2557 return ro_task_dependency, task_index
2558
2559 else:
2560 if ro_task:
2561 for task_index, task in enumerate(ro_task["tasks"]):
2562 if task and task["task_id"] == task_id:
2563 return ro_task, task_index
2564
2565 ro_task_dependency = self.db.get_one(
2566 "ro_tasks",
2567 q_filter={
2568 "tasks.ANYINDEX.task_id": task_id,
2569 "tasks.ANYINDEX.target_record.ne": None,
2570 },
2571 fail_on_empty=False,
2572 )
2573
2574 self.logger.debug("ro_task_dependency={}".format(ro_task_dependency))
2575 if ro_task_dependency:
2576 for task_index, task in enumerate(ro_task_dependency["tasks"]):
2577 if task["task_id"] == task_id:
2578 return ro_task_dependency, task_index
2579 raise NsWorkerException("Cannot get depending task {}".format(task_id))
2580
def update_vm_refresh(self, ro_task):
    """Re-enable the VM status refresh of the ro_tasks that have it disabled.

    When self._get_next_refresh does not return -1 for ro_task, every ro_task read from
    database with a task in DONE status and a negative to_check_at gets new
    "vim_info.refresh_at" and "to_check_at" values. Any error is logged, never raised.

    Args:
        ro_task (dict): ro_task details
    """
    try:
        self.logger.debug("Checking if VM status update config")
        next_refresh = self._get_next_refresh(ro_task, time.time())

        if next_refresh == -1:
            return

        # check again in one day at the latest
        one_day_later = time.time() + (24 * 60 * 60)
        db_ro_task_update = {
            "vim_info.refresh_at": next_refresh,
            "to_check_at": min(one_day_later, next_refresh),
        }

        self.logger.debug(
            "Finding tasks which to be updated to enable VM status updates"
        )
        disabled_filter = {
            "tasks.status": "DONE",
            "to_check_at.lt": 0,
        }
        refresh_tasks = self.db.get_list("ro_tasks", q_filter=disabled_filter)
        self.logger.debug("Updating tasks to change the to_check_at status")

        for task in refresh_tasks:
            self.db.set_one(
                "ro_tasks",
                q_filter={"_id": task["_id"]},
                update_dict=db_ro_task_update,
                fail_on_empty=True,
            )

    except Exception as e:
        self.logger.error(f"Error updating tasks to enable VM status updates: {e}")
2623
2624 def _get_next_refresh(self, ro_task: dict, next_refresh: float):
2625 """Decide the next_refresh according to vim type and refresh config period.
2626 Args:
2627 ro_task (dict): ro_task details
2628 next_refresh (float): next refresh time as epoch format
2629
2630 Returns:
2631 next_refresh (float) -1 if vm updates are disabled or vim type is openstack.
2632 """
2633 target_vim = ro_task["target_id"]
2634 vim_type = self.db_vims[target_vim]["vim_type"]
2635 if self.refresh_config.active == -1 or vim_type == "openstack":
2636 next_refresh = -1
2637 else:
2638 next_refresh += self.refresh_config.active
2639 return next_refresh
2640
def _process_pending_tasks(self, ro_task):
    """
    Process the pending tasks of a ro_task that has been locked by this worker, and store
    the result at database, unlocking the ro_task.

    Tasks are visited per action, in the order DELETE, CREATE, EXEC. The changes are
    accumulated at db_ro_task_update and written with a single update at the end, together
    with the next time this ro_task must be checked (to_check_at).
    Database and unexpected errors are logged, never raised.
    :param ro_task: ro_task dictionary as read from database. It is modified in place
    :return: None
    """
    ro_task_id = ro_task["_id"]
    now = time.time()
    # one day
    next_check_at = now + (24 * 60 * 60)
    db_ro_task_update = {}

    def _update_refresh(new_status):
        # compute next_refresh
        # It uses the 'task' being iterated at the enclosing loop. It updates next_check_at,
        # db_ro_task_update and ro_task of the enclosing method
        nonlocal task
        nonlocal next_check_at
        nonlocal db_ro_task_update
        nonlocal ro_task

        next_refresh = time.time()

        if task["item"] in ("image", "flavor"):
            next_refresh += self.refresh_config.image
        elif new_status == "BUILD":
            next_refresh += self.refresh_config.build
        elif new_status == "DONE":
            next_refresh = self._get_next_refresh(ro_task, next_refresh)
        else:
            next_refresh += self.refresh_config.error

        next_check_at = min(next_check_at, next_refresh)
        db_ro_task_update["vim_info.refresh_at"] = next_refresh
        ro_task["vim_info"]["refresh_at"] = next_refresh

    try:
        """
        # Log RO tasks only when loglevel is DEBUG
        if self.logger.getEffectiveLevel() == logging.DEBUG:
            self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
        """
        # Check if vim status refresh is enabled again
        self.update_vm_refresh(ro_task)
        # 0: get task_status_create
        lock_object = None
        task_status_create = None
        # first CREATE task that is already BUILD or DONE, if any
        task_create = next(
            (
                t
                for t in ro_task["tasks"]
                if t
                and t["action"] == "CREATE"
                and t["status"] in ("BUILD", "DONE")
            ),
            None,
        )

        if task_create:
            task_status_create = task_create["status"]

        # 1: look for tasks in status SCHEDULED, or in status CREATE if action is  DONE or BUILD
        for task_action in ("DELETE", "CREATE", "EXEC"):
            # note: these two are initialized per action, not per task
            db_vim_update = None
            new_status = None

            for task_index, task in enumerate(ro_task["tasks"]):
                if not task:
                    continue  # task deleted

                task_depends = {}
                target_update = None

                # skip the tasks of other actions, the DELETE/EXEC tasks that are neither
                # SCHEDULED nor BUILD, and the CREATE tasks already FINISHED or SUPERSEDED
                if (
                    (
                        task_action in ("DELETE", "EXEC")
                        and task["status"] not in ("SCHEDULED", "BUILD")
                    )
                    or task["action"] != task_action
                    or (
                        task_action == "CREATE"
                        and task["status"] in ("FINISHED", "SUPERSEDED")
                    )
                ):
                    continue

                task_path = "tasks.{}.status".format(task_index)
                try:
                    db_vim_info_update = None
                    dependency_ro_task = {}

                    if task["status"] == "SCHEDULED":
                        # check if tasks that this depends on have been completed
                        dependency_not_completed = False

                        for dependency_task_id in task.get("depends_on") or ():
                            (
                                dependency_ro_task,
                                dependency_task_index,
                            ) = self._get_dependency(
                                dependency_task_id, target_id=ro_task["target_id"]
                            )
                            dependency_task = dependency_ro_task["tasks"][
                                dependency_task_index
                            ]
                            self.logger.debug(
                                "dependency_ro_task={} dependency_task_index={}".format(
                                    dependency_ro_task, dependency_task_index
                                )
                            )

                            if dependency_task["status"] == "SCHEDULED":
                                dependency_not_completed = True
                                next_check_at = min(
                                    next_check_at, dependency_ro_task["to_check_at"]
                                )
                                # must allow dependent task to be processed first
                                # to do this set time after last_task_processed
                                next_check_at = max(
                                    self.time_last_task_processed, next_check_at
                                )
                                break
                            elif dependency_task["status"] == "FAILED":
                                error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
                                    task["action"],
                                    task["item"],
                                    dependency_task["action"],
                                    dependency_task["item"],
                                    dependency_task_id,
                                    dependency_ro_task["vim_info"].get(
                                        "vim_message"
                                    ),
                                )
                                self.logger.error(
                                    "task={} {}".format(task["task_id"], error_text)
                                )
                                raise NsWorkerException(error_text)

                            # the vim_id of the dependency is provided under two keys
                            task_depends[dependency_task_id] = dependency_ro_task[
                                "vim_info"
                            ]["vim_id"]
                            task_depends["TASK-{}".format(dependency_task_id)] = (
                                dependency_ro_task["vim_info"]["vim_id"]
                            )

                        if dependency_not_completed:
                            self.logger.warning(
                                "DEPENDENCY NOT COMPLETED {}".format(
                                    dependency_ro_task["vim_info"]["vim_id"]
                                )
                            )
                            # TODO set at vim_info.vim_details that it is waiting
                            continue

                    # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
                    # the task of renew this locking. It will update database locket_at periodically
                    if not lock_object:
                        lock_object = LockRenew.add_lock_object(
                            "ro_tasks", ro_task, self
                        )
                    if task["action"] == "DELETE":
                        (
                            new_status,
                            db_vim_info_update,
                        ) = self._delete_task(
                            ro_task, task_index, task_depends, db_ro_task_update
                        )
                        new_status = (
                            "FINISHED" if new_status == "DONE" else new_status
                        )
                        # ^with FINISHED instead of DONE it will not be refreshing

                        if new_status in ("FINISHED", "SUPERSEDED"):
                            target_update = "DELETE"
                    elif task["action"] == "EXEC":
                        (
                            new_status,
                            db_vim_info_update,
                            db_task_update,
                        ) = self.item2class[task["item"]].exec(
                            ro_task, task_index, task_depends
                        )
                        new_status = (
                            "FINISHED" if new_status == "DONE" else new_status
                        )
                        # ^with FINISHED instead of DONE it will not be refreshing

                        if db_task_update:
                            # load into database the modified db_task_update "retries" and "next_retry"
                            if db_task_update.get("retries"):
                                db_ro_task_update[
                                    "tasks.{}.retries".format(task_index)
                                ] = db_task_update["retries"]

                            # seconds to wait until next retry, 60 by default
                            next_check_at = time.time() + db_task_update.get(
                                "next_retry", 60
                            )
                        target_update = None
                    elif task["action"] == "CREATE":
                        if task["status"] == "SCHEDULED":
                            if task_status_create:
                                # already created by another task: reuse its status and vim_info
                                new_status = task_status_create
                                target_update = "COPY_VIM_INFO"
                            else:
                                new_status, db_vim_info_update = self.item2class[
                                    task["item"]
                                ].new(ro_task, task_index, task_depends)
                                _update_refresh(new_status)
                        else:
                            refresh_at = ro_task["vim_info"]["refresh_at"]
                            if refresh_at and refresh_at != -1 and now > refresh_at:
                                (
                                    new_status,
                                    db_vim_info_update,
                                ) = self.item2class[
                                    task["item"]
                                ].refresh(ro_task)
                                _update_refresh(new_status)
                            else:
                                # The refresh is updated to avoid set the value of "refresh_at" to
                                # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
                                # because it can happen that in this case the task is never processed
                                _update_refresh(task["status"])

                except Exception as e:
                    new_status = "FAILED"
                    db_vim_info_update = {
                        "vim_status": "VIM_ERROR",
                        "vim_message": str(e),
                    }

                    # NOTE(review): the message mentions _delete_task, but this handler
                    # covers the DELETE, CREATE and EXEC actions
                    if not isinstance(
                        e, (NsWorkerException, vimconn.VimConnException)
                    ):
                        self.logger.error(
                            "Unexpected exception at _delete_task task={}: {}".format(
                                task["task_id"], e
                            ),
                            exc_info=True,
                        )

                # store the result of this task: at db_ro_task_update (written at the end)
                # and at the target record (written now by _update_target)
                try:
                    if db_vim_info_update:
                        db_vim_update = db_vim_info_update.copy()
                        db_ro_task_update.update(
                            {
                                "vim_info." + k: v
                                for k, v in db_vim_info_update.items()
                            }
                        )
                        ro_task["vim_info"].update(db_vim_info_update)

                    if new_status:
                        if task_action == "CREATE":
                            task_status_create = new_status
                        db_ro_task_update[task_path] = new_status

                    if target_update or db_vim_update:
                        if target_update == "DELETE":
                            self._update_target(task, None)
                        elif target_update == "COPY_VIM_INFO":
                            self._update_target(task, ro_task["vim_info"])
                        else:
                            self._update_target(task, db_vim_update)

                except Exception as e:
                    if (
                        isinstance(e, DbException)
                        and e.http_code == HTTPStatus.NOT_FOUND
                    ):
                        # if the vnfrs or nsrs has been removed from database, this task must be removed
                        self.logger.debug(
                            "marking to delete task={}".format(task["task_id"])
                        )
                        self.tasks_to_delete.append(task)
                    else:
                        self.logger.error(
                            "Unexpected exception at _update_target task={}: {}".format(
                                task["task_id"], e
                            ),
                            exc_info=True,
                        )

        locked_at = ro_task["locked_at"]

        if lock_object:
            locked_at = [
                lock_object["locked_at"],
                lock_object["locked_at"] + self.task_locked_time,
            ]
            # locked_at contains two times to avoid race condition. In case the lock has been renewed, it will
            # contain exactly locked_at + self.task_locked_time
            LockRenew.remove_lock_object(lock_object)

        q_filter = {
            "_id": ro_task["_id"],
            "to_check_at": ro_task["to_check_at"],
            "locked_at": locked_at,
        }
        # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
        # outside this task (by ro_nbi) do not update it
        db_ro_task_update["locked_by"] = None
        # locked_at converted to int only for debugging. When it is not decimals it means it has been unlocked
        db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
        db_ro_task_update["modified_at"] = now
        db_ro_task_update["to_check_at"] = next_check_at

        """
        # Log RO tasks only when loglevel is DEBUG
        if self.logger.getEffectiveLevel() == logging.DEBUG:
            db_ro_task_update_log = db_ro_task_update.copy()
            db_ro_task_update_log["_id"] = q_filter["_id"]
            self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
        """

        if not self.db.set_one(
            "ro_tasks",
            update_dict=db_ro_task_update,
            q_filter=q_filter,
            fail_on_empty=False,
        ):
            # nothing matched: retry without touching nor filtering by to_check_at
            del db_ro_task_update["to_check_at"]
            del q_filter["to_check_at"]
            """
            # Log RO tasks only when loglevel is DEBUG
            if self.logger.getEffectiveLevel() == logging.DEBUG:
                self._log_ro_task(
                    None,
                    db_ro_task_update_log,
                    None,
                    "TASK_WF",
                    "SET_TASK " + str(q_filter),
                )
            """
            self.db.set_one(
                "ro_tasks",
                q_filter=q_filter,
                update_dict=db_ro_task_update,
                fail_on_empty=True,
            )
    except DbException as e:
        self.logger.error(
            "ro_task={} Error updating database {}".format(ro_task_id, e)
        )
    except Exception as e:
        self.logger.error(
            "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
        )
2982
2983 def _update_target(self, task, ro_vim_item_update):
2984 table, _, temp = task["target_record"].partition(":")
2985 _id, _, path_vim_status = temp.partition(":")
2986 path_item = path_vim_status[: path_vim_status.rfind(".")]
2987 path_item = path_item[: path_item.rfind(".")]
2988 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2989 # path_item: dot separated list targeting record information, e.g. "vdur.10"
2990
2991 if ro_vim_item_update:
2992 update_dict = {
2993 path_vim_status + "." + k: v
2994 for k, v in ro_vim_item_update.items()
2995 if k
2996 in (
2997 "vim_id",
2998 "vim_details",
2999 "vim_message",
3000 "vim_name",
3001 "vim_status",
3002 "interfaces",
3003 "interfaces_backup",
3004 )
3005 }
3006
3007 if path_vim_status.startswith("vdur."):
3008 # for backward compatibility, add vdur.name apart from vdur.vim_name
3009 if ro_vim_item_update.get("vim_name"):
3010 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
3011
3012 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
3013 if ro_vim_item_update.get("vim_id"):
3014 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
3015
3016 # update general status
3017 if ro_vim_item_update.get("vim_status"):
3018 update_dict[path_item + ".status"] = ro_vim_item_update[
3019 "vim_status"
3020 ]
3021
3022 if ro_vim_item_update.get("interfaces"):
3023 path_interfaces = path_item + ".interfaces"
3024
3025 for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
3026 if iface:
3027 update_dict.update(
3028 {
3029 path_interfaces + ".{}.".format(i) + k: v
3030 for k, v in iface.items()
3031 if k in ("vlan", "compute_node", "pci")
3032 }
3033 )
3034
3035 # put ip_address and mac_address with ip-address and mac-address
3036 if iface.get("ip_address"):
3037 update_dict[
3038 path_interfaces + ".{}.".format(i) + "ip-address"
3039 ] = iface["ip_address"]
3040
3041 if iface.get("mac_address"):
3042 update_dict[
3043 path_interfaces + ".{}.".format(i) + "mac-address"
3044 ] = iface["mac_address"]
3045
3046 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
3047 update_dict["ip-address"] = iface.get("ip_address").split(
3048 ";"
3049 )[0]
3050
3051 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
3052 update_dict[path_item + ".ip-address"] = iface.get(
3053 "ip_address"
3054 ).split(";")[0]
3055
3056 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
3057
3058 # If interfaces exists, it backups VDU interfaces in the DB for healing operations
3059 if ro_vim_item_update.get("interfaces"):
3060 search_key = path_vim_status + ".interfaces"
3061 if update_dict.get(search_key):
3062 interfaces_backup_update = {
3063 path_vim_status + ".interfaces_backup": update_dict[search_key]
3064 }
3065
3066 self.db.set_one(
3067 table,
3068 q_filter={"_id": _id},
3069 update_dict=interfaces_backup_update,
3070 )
3071
3072 else:
3073 update_dict = {path_item + ".status": "DELETED"}
3074 self.db.set_one(
3075 table,
3076 q_filter={"_id": _id},
3077 update_dict=update_dict,
3078 unset={path_vim_status: None},
3079 )
3080
3081 def _process_delete_db_tasks(self):
3082 """
3083 Delete task from database because vnfrs or nsrs or both have been deleted
3084 :return: None. Uses and modify self.tasks_to_delete
3085 """
3086 while self.tasks_to_delete:
3087 task = self.tasks_to_delete[0]
3088 vnfrs_deleted = None
3089 nsr_id = task["nsr_id"]
3090
3091 if task["target_record"].startswith("vnfrs:"):
3092 # check if nsrs is present
3093 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
3094 vnfrs_deleted = task["target_record"].split(":")[1]
3095
3096 try:
3097 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
3098 except Exception as e:
3099 self.logger.error(
3100 "Error deleting task={}: {}".format(task["task_id"], e)
3101 )
3102 self.tasks_to_delete.pop(0)
3103
@staticmethod
def delete_db_tasks(db, nsr_id, vnfrs_deleted):
    """
    Remove from the ro_tasks table the tasks of a deleted nsrs or vnfrs.
    Static method because it is called from osm_ng_ro.ns

    A ro_task is deleted when all its tasks are affected; otherwise only the affected
    tasks are set to None. Both operations are conditioned to "modified_at", and the whole
    process is repeated if somebody has changed a ro_task meanwhile.
    :param db: instance of database to use
    :param nsr_id: affected nsrs id
    :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
    :return: None, exception is fails
    """
    retries = 5

    for _ in range(retries):
        ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
        now = time.time()
        conflict = False

        for ro_task in ro_tasks:
            db_update = {}
            to_delete_ro_task = True

            for index, task in enumerate(ro_task["tasks"]):
                if not task:
                    continue

                if vnfrs_deleted:
                    affected = task["target_record"].startswith(
                        "vnfrs:" + vnfrs_deleted
                    )
                else:
                    affected = task["nsr_id"] == nsr_id

                if affected:
                    db_update["tasks.{}".format(index)] = None
                else:
                    # used by other nsr, ro_task cannot be deleted
                    to_delete_ro_task = False

            # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
            if to_delete_ro_task:
                done = db.del_one(
                    "ro_tasks",
                    q_filter={
                        "_id": ro_task["_id"],
                        "modified_at": ro_task["modified_at"],
                    },
                    fail_on_empty=False,
                )
            elif db_update:
                db_update["modified_at"] = now
                done = db.set_one(
                    "ro_tasks",
                    q_filter={
                        "_id": ro_task["_id"],
                        "modified_at": ro_task["modified_at"],
                    },
                    update_dict=db_update,
                    fail_on_empty=False,
                )
            else:
                continue  # nothing to do with this ro_task

            if not done:
                conflict = True

        if not conflict:
            return

    raise NsWorkerException("Exceeded {} retries".format(retries))
3162
def run(self):
    """
    Main loop of the worker. It ends when a "terminate" command is read from the queue.

    Each iteration first attends one command of self.task_queue (load_vim, unload_vim,
    reload_vim, check_vim or terminate), blocking on the queue while there are no targets
    loaded. When the queue is empty it processes one pending ro_task from database.
    :return: None
    """
    # load database
    self.logger.info("Starting")

    while True:
        # step 1: get commands from queue
        try:
            if self.vim_targets:
                task = self.task_queue.get(block=False)
            else:
                # nothing to poll at database: wait until a command arrives
                if not self.idle:
                    self.logger.debug("enters in idle state")
                self.idle = True
                task = self.task_queue.get(block=True)
                self.idle = False

            command = task[0]

            if command == "terminate":
                break

            if command == "load_vim":
                self.logger.info("order to load vim {}".format(task[1]))
                self._load_vim(task[1])
            elif command == "unload_vim":
                self.logger.info("order to unload vim {}".format(task[1]))
                self._unload_vim(task[1])
            elif command == "reload_vim":
                self._reload_vim(task[1])
            elif command == "check_vim":
                self.logger.info("order to check vim {}".format(task[1]))
                self._check_vim(task[1])

            # look for more commands before processing database tasks
            continue
        except queue.Empty:
            pass
        except Exception as e:
            self.logger.critical(
                "Error processing task: {}".format(e), exc_info=True
            )

        # step 2: process pending_tasks, delete not needed tasks
        try:
            if self.tasks_to_delete:
                self._process_delete_db_tasks()

            # (a DEBUG level dump of all tasks by means of self._get_db_all_tasks
            # is disabled at this point)
            ro_task = self._get_db_task()

            if ro_task:
                self.logger.debug("Task to process: {}".format(ro_task))
                time.sleep(1)
                self._process_pending_tasks(ro_task)
            else:
                # not busy
                time.sleep(5)
        except Exception as e:
            self.logger.critical(
                "Unexpected exception at run: " + str(e), exc_info=True
            )

    self.logger.info("Finishing")