bca21e0cbdd814217ce2276ea46c6cc8c33e8785
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
20 """
21 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored at database in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
25 """
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import makedirs
31 from os import path
32 import queue
33 import threading
34 import time
35 import traceback
36 from typing import Dict
37 from unittest.mock import Mock
38
39 from importlib_metadata import entry_points
40 from osm_common.dbbase import DbException
41 from osm_ng_ro.vim_admin import LockRenew
42 from osm_ro_plugin import sdnconn
43 from osm_ro_plugin import vimconn
44 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
45 from osm_ro_plugin.vim_dummy import VimDummyConnector
46 import yaml
47
48 __author__ = "Alfonso Tierno"
49 __date__ = "$28-Sep-2017 12:07:15$"
50
51
52 def deep_get(target_dict, *args, **kwargs):
53 """
54 Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
55 Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
56 :param target_dict: dictionary to be read
57 :param args: list of keys to read from target_dict
58 :param kwargs: only can contain default=value to return if key is not present in the nested dictionary
59 :return: The wanted value if exists, None or default otherwise
60 """
61 for key in args:
62 if not isinstance(target_dict, dict) or key not in target_dict:
63 return kwargs.get("default")
64 target_dict = target_dict[key]
65 return target_dict
66
67
68 class NsWorkerException(Exception):
69 pass
70
71
72 class FailingConnector:
73 def __init__(self, error_msg):
74 self.error_msg = error_msg
75
76 for method in dir(vimconn.VimConnector):
77 if method[0] != "_":
78 setattr(
79 self, method, Mock(side_effect=vimconn.VimConnException(error_msg))
80 )
81
82 for method in dir(sdnconn.SdnConnectorBase):
83 if method[0] != "_":
84 setattr(
85 self, method, Mock(side_effect=sdnconn.SdnConnectorError(error_msg))
86 )
87
88
89 class NsWorkerExceptionNotFound(NsWorkerException):
90 pass
91
92
93 class VimInteractionBase:
94 """Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ...
95 It implements methods that does nothing and return ok"""
96
97 def __init__(self, db, my_vims, db_vims, logger):
98 self.db = db
99 self.logger = logger
100 self.my_vims = my_vims
101 self.db_vims = db_vims
102
103 def new(self, ro_task, task_index, task_depends):
104 return "BUILD", {}
105
106 def refresh(self, ro_task):
107 """skip calling VIM to get image, flavor status. Assumes ok"""
108 if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
109 return "FAILED", {}
110
111 return "DONE", {}
112
113 def delete(self, ro_task, task_index):
114 """skip calling VIM to delete image. Assumes ok"""
115 return "DONE", {}
116
117 def exec(self, ro_task, task_index, task_depends):
118 return "DONE", None, None
119
120
121 class VimInteractionNet(VimInteractionBase):
122 def new(self, ro_task, task_index, task_depends):
123 vim_net_id = None
124 task = ro_task["tasks"][task_index]
125 task_id = task["task_id"]
126 created = False
127 created_items = {}
128 target_vim = self.my_vims[ro_task["target_id"]]
129 mgmtnet = False
130 mgmtnet_defined_in_vim = False
131
132 try:
133 # FIND
134 if task.get("find_params"):
135 # if management, get configuration of VIM
136 if task["find_params"].get("filter_dict"):
137 vim_filter = task["find_params"]["filter_dict"]
138 # management network
139 elif task["find_params"].get("mgmt"):
140 mgmtnet = True
141 if deep_get(
142 self.db_vims[ro_task["target_id"]],
143 "config",
144 "management_network_id",
145 ):
146 mgmtnet_defined_in_vim = True
147 vim_filter = {
148 "id": self.db_vims[ro_task["target_id"]]["config"][
149 "management_network_id"
150 ]
151 }
152 elif deep_get(
153 self.db_vims[ro_task["target_id"]],
154 "config",
155 "management_network_name",
156 ):
157 mgmtnet_defined_in_vim = True
158 vim_filter = {
159 "name": self.db_vims[ro_task["target_id"]]["config"][
160 "management_network_name"
161 ]
162 }
163 else:
164 vim_filter = {"name": task["find_params"]["name"]}
165 else:
166 raise NsWorkerExceptionNotFound(
167 "Invalid find_params for new_net {}".format(task["find_params"])
168 )
169
170 vim_nets = target_vim.get_network_list(vim_filter)
171 if not vim_nets and not task.get("params"):
172 # If there is mgmt-network in the descriptor,
173 # there is no mapping of that network to a VIM network in the descriptor,
174 # also there is no mapping in the "--config" parameter or at VIM creation;
175 # that mgmt-network will be created.
176 if mgmtnet and not mgmtnet_defined_in_vim:
177 net_name = (
178 vim_filter.get("name")
179 if vim_filter.get("name")
180 else vim_filter.get("id")[:16]
181 )
182 vim_net_id, created_items = target_vim.new_network(
183 net_name, None
184 )
185 self.logger.debug(
186 "Created mgmt network vim_net_id: {}".format(vim_net_id)
187 )
188 created = True
189 else:
190 raise NsWorkerExceptionNotFound(
191 "Network not found with this criteria: '{}'".format(
192 task.get("find_params")
193 )
194 )
195 elif len(vim_nets) > 1:
196 raise NsWorkerException(
197 "More than one network found with this criteria: '{}'".format(
198 task["find_params"]
199 )
200 )
201
202 if vim_nets:
203 vim_net_id = vim_nets[0]["id"]
204 else:
205 # CREATE
206 params = task["params"]
207 vim_net_id, created_items = target_vim.new_network(**params)
208 created = True
209
210 ro_vim_item_update = {
211 "vim_id": vim_net_id,
212 "vim_status": "BUILD",
213 "created": created,
214 "created_items": created_items,
215 "vim_details": None,
216 "vim_message": None,
217 }
218 self.logger.debug(
219 "task={} {} new-net={} created={}".format(
220 task_id, ro_task["target_id"], vim_net_id, created
221 )
222 )
223
224 return "BUILD", ro_vim_item_update
225 except (vimconn.VimConnException, NsWorkerException) as e:
226 self.logger.error(
227 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
228 )
229 ro_vim_item_update = {
230 "vim_status": "VIM_ERROR",
231 "created": created,
232 "vim_message": str(e),
233 }
234
235 return "FAILED", ro_vim_item_update
236
237 def refresh(self, ro_task):
238 """Call VIM to get network status"""
239 ro_task_id = ro_task["_id"]
240 target_vim = self.my_vims[ro_task["target_id"]]
241 vim_id = ro_task["vim_info"]["vim_id"]
242 net_to_refresh_list = [vim_id]
243
244 try:
245 vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
246 vim_info = vim_dict[vim_id]
247
248 if vim_info["status"] == "ACTIVE":
249 task_status = "DONE"
250 elif vim_info["status"] == "BUILD":
251 task_status = "BUILD"
252 else:
253 task_status = "FAILED"
254 except vimconn.VimConnException as e:
255 # Mark all tasks at VIM_ERROR status
256 self.logger.error(
257 "ro_task={} vim={} get-net={}: {}".format(
258 ro_task_id, ro_task["target_id"], vim_id, e
259 )
260 )
261 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
262 task_status = "FAILED"
263
264 ro_vim_item_update = {}
265 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
266 ro_vim_item_update["vim_status"] = vim_info["status"]
267
268 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
269 ro_vim_item_update["vim_name"] = vim_info.get("name")
270
271 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
272 if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
273 ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
274 elif vim_info["status"] == "DELETED":
275 ro_vim_item_update["vim_id"] = None
276 ro_vim_item_update["vim_message"] = "Deleted externally"
277 else:
278 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
279 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
280
281 if ro_vim_item_update:
282 self.logger.debug(
283 "ro_task={} {} get-net={}: status={} {}".format(
284 ro_task_id,
285 ro_task["target_id"],
286 vim_id,
287 ro_vim_item_update.get("vim_status"),
288 ro_vim_item_update.get("vim_message")
289 if ro_vim_item_update.get("vim_status") != "ACTIVE"
290 else "",
291 )
292 )
293
294 return task_status, ro_vim_item_update
295
296 def delete(self, ro_task, task_index):
297 task = ro_task["tasks"][task_index]
298 task_id = task["task_id"]
299 net_vim_id = ro_task["vim_info"]["vim_id"]
300 ro_vim_item_update_ok = {
301 "vim_status": "DELETED",
302 "created": False,
303 "vim_message": "DELETED",
304 "vim_id": None,
305 }
306
307 try:
308 if net_vim_id or ro_task["vim_info"]["created_items"]:
309 target_vim = self.my_vims[ro_task["target_id"]]
310 target_vim.delete_network(
311 net_vim_id, ro_task["vim_info"]["created_items"]
312 )
313 except vimconn.VimConnNotFoundException:
314 ro_vim_item_update_ok["vim_message"] = "already deleted"
315 except vimconn.VimConnException as e:
316 self.logger.error(
317 "ro_task={} vim={} del-net={}: {}".format(
318 ro_task["_id"], ro_task["target_id"], net_vim_id, e
319 )
320 )
321 ro_vim_item_update = {
322 "vim_status": "VIM_ERROR",
323 "vim_message": "Error while deleting: {}".format(e),
324 }
325
326 return "FAILED", ro_vim_item_update
327
328 self.logger.debug(
329 "task={} {} del-net={} {}".format(
330 task_id,
331 ro_task["target_id"],
332 net_vim_id,
333 ro_vim_item_update_ok.get("vim_message", ""),
334 )
335 )
336
337 return "DONE", ro_vim_item_update_ok
338
339
340 class VimInteractionClassification(VimInteractionBase):
341 def new(self, ro_task, task_index, task_depends):
342 task = ro_task["tasks"][task_index]
343 task_id = task["task_id"]
344 created = False
345 target_vim = self.my_vims[ro_task["target_id"]]
346
347 try:
348 created = True
349 params = task["params"]
350 params_copy = deepcopy(params)
351
352 name = params_copy.pop("name")
353 logical_source_port_index = int(
354 params_copy.pop("logical_source_port_index")
355 )
356 logical_source_port = params_copy["logical_source_port"]
357
358 if logical_source_port.startswith("TASK-"):
359 vm_id = task_depends[logical_source_port]
360 params_copy["logical_source_port"] = target_vim.refresh_vms_status(
361 [vm_id]
362 )[vm_id]["interfaces"][logical_source_port_index]["vim_interface_id"]
363
364 vim_classification_id = target_vim.new_classification(
365 name, "legacy_flow_classifier", params_copy
366 )
367
368 ro_vim_item_update = {
369 "vim_id": vim_classification_id,
370 "vim_status": "DONE",
371 "created": created,
372 "vim_details": None,
373 "vim_message": None,
374 }
375 self.logger.debug(
376 "task={} {} created={}".format(task_id, ro_task["target_id"], created)
377 )
378
379 return "DONE", ro_vim_item_update
380 except (vimconn.VimConnException, NsWorkerException) as e:
381 self.logger.debug(traceback.format_exc())
382 self.logger.error(
383 "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
384 )
385 ro_vim_item_update = {
386 "vim_status": "VIM_ERROR",
387 "created": created,
388 "vim_message": str(e),
389 }
390
391 return "FAILED", ro_vim_item_update
392
393 def delete(self, ro_task, task_index):
394 task = ro_task["tasks"][task_index]
395 task_id = task["task_id"]
396 classification_vim_id = ro_task["vim_info"]["vim_id"]
397 ro_vim_item_update_ok = {
398 "vim_status": "DELETED",
399 "created": False,
400 "vim_message": "DELETED",
401 "vim_id": None,
402 }
403
404 try:
405 if classification_vim_id:
406 target_vim = self.my_vims[ro_task["target_id"]]
407 target_vim.delete_classification(classification_vim_id)
408 except vimconn.VimConnNotFoundException:
409 ro_vim_item_update_ok["vim_message"] = "already deleted"
410 except vimconn.VimConnException as e:
411 self.logger.error(
412 "ro_task={} vim={} del-classification={}: {}".format(
413 ro_task["_id"], ro_task["target_id"], classification_vim_id, e
414 )
415 )
416 ro_vim_item_update = {
417 "vim_status": "VIM_ERROR",
418 "vim_message": "Error while deleting: {}".format(e),
419 }
420
421 return "FAILED", ro_vim_item_update
422
423 self.logger.debug(
424 "task={} {} del-classification={} {}".format(
425 task_id,
426 ro_task["target_id"],
427 classification_vim_id,
428 ro_vim_item_update_ok.get("vim_message", ""),
429 )
430 )
431
432 return "DONE", ro_vim_item_update_ok
433
434
435 class VimInteractionSfi(VimInteractionBase):
436 def new(self, ro_task, task_index, task_depends):
437 task = ro_task["tasks"][task_index]
438 task_id = task["task_id"]
439 created = False
440 target_vim = self.my_vims[ro_task["target_id"]]
441
442 try:
443 created = True
444 params = task["params"]
445 params_copy = deepcopy(params)
446 name = params_copy["name"]
447 ingress_port = params_copy["ingress_port"]
448 egress_port = params_copy["egress_port"]
449 ingress_port_index = params_copy["ingress_port_index"]
450 egress_port_index = params_copy["egress_port_index"]
451
452 ingress_port_id = ingress_port
453 egress_port_id = egress_port
454
455 vm_id = task_depends[ingress_port]
456
457 if ingress_port.startswith("TASK-"):
458 ingress_port_id = target_vim.refresh_vms_status([vm_id])[vm_id][
459 "interfaces"
460 ][ingress_port_index]["vim_interface_id"]
461
462 if ingress_port == egress_port:
463 egress_port_id = ingress_port_id
464 else:
465 if egress_port.startswith("TASK-"):
466 egress_port_id = target_vim.refresh_vms_status([vm_id])[vm_id][
467 "interfaces"
468 ][egress_port_index]["vim_interface_id"]
469
470 ingress_port_id_list = [ingress_port_id]
471 egress_port_id_list = [egress_port_id]
472
473 vim_sfi_id = target_vim.new_sfi(
474 name, ingress_port_id_list, egress_port_id_list, sfc_encap=False
475 )
476
477 ro_vim_item_update = {
478 "vim_id": vim_sfi_id,
479 "vim_status": "DONE",
480 "created": created,
481 "vim_details": None,
482 "vim_message": None,
483 }
484 self.logger.debug(
485 "task={} {} created={}".format(task_id, ro_task["target_id"], created)
486 )
487
488 return "DONE", ro_vim_item_update
489 except (vimconn.VimConnException, NsWorkerException) as e:
490 self.logger.debug(traceback.format_exc())
491 self.logger.error(
492 "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
493 )
494 ro_vim_item_update = {
495 "vim_status": "VIM_ERROR",
496 "created": created,
497 "vim_message": str(e),
498 }
499
500 return "FAILED", ro_vim_item_update
501
502 def delete(self, ro_task, task_index):
503 task = ro_task["tasks"][task_index]
504 task_id = task["task_id"]
505 sfi_vim_id = ro_task["vim_info"]["vim_id"]
506 ro_vim_item_update_ok = {
507 "vim_status": "DELETED",
508 "created": False,
509 "vim_message": "DELETED",
510 "vim_id": None,
511 }
512
513 try:
514 if sfi_vim_id:
515 target_vim = self.my_vims[ro_task["target_id"]]
516 target_vim.delete_sfi(sfi_vim_id)
517 except vimconn.VimConnNotFoundException:
518 ro_vim_item_update_ok["vim_message"] = "already deleted"
519 except vimconn.VimConnException as e:
520 self.logger.error(
521 "ro_task={} vim={} del-sfi={}: {}".format(
522 ro_task["_id"], ro_task["target_id"], sfi_vim_id, e
523 )
524 )
525 ro_vim_item_update = {
526 "vim_status": "VIM_ERROR",
527 "vim_message": "Error while deleting: {}".format(e),
528 }
529
530 return "FAILED", ro_vim_item_update
531
532 self.logger.debug(
533 "task={} {} del-sfi={} {}".format(
534 task_id,
535 ro_task["target_id"],
536 sfi_vim_id,
537 ro_vim_item_update_ok.get("vim_message", ""),
538 )
539 )
540
541 return "DONE", ro_vim_item_update_ok
542
543
544 class VimInteractionSf(VimInteractionBase):
545 def new(self, ro_task, task_index, task_depends):
546 task = ro_task["tasks"][task_index]
547 task_id = task["task_id"]
548 created = False
549 target_vim = self.my_vims[ro_task["target_id"]]
550
551 try:
552 created = True
553 params = task["params"]
554 params_copy = deepcopy(params)
555 name = params_copy["name"]
556 sfi_list = params_copy["sfis"]
557 sfi_id_list = []
558
559 for sfi in sfi_list:
560 sfi_id = task_depends[sfi] if sfi.startswith("TASK-") else sfi
561 sfi_id_list.append(sfi_id)
562
563 vim_sf_id = target_vim.new_sf(name, sfi_id_list, sfc_encap=False)
564
565 ro_vim_item_update = {
566 "vim_id": vim_sf_id,
567 "vim_status": "DONE",
568 "created": created,
569 "vim_details": None,
570 "vim_message": None,
571 }
572 self.logger.debug(
573 "task={} {} created={}".format(task_id, ro_task["target_id"], created)
574 )
575
576 return "DONE", ro_vim_item_update
577 except (vimconn.VimConnException, NsWorkerException) as e:
578 self.logger.debug(traceback.format_exc())
579 self.logger.error(
580 "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
581 )
582 ro_vim_item_update = {
583 "vim_status": "VIM_ERROR",
584 "created": created,
585 "vim_message": str(e),
586 }
587
588 return "FAILED", ro_vim_item_update
589
590 def delete(self, ro_task, task_index):
591 task = ro_task["tasks"][task_index]
592 task_id = task["task_id"]
593 sf_vim_id = ro_task["vim_info"]["vim_id"]
594 ro_vim_item_update_ok = {
595 "vim_status": "DELETED",
596 "created": False,
597 "vim_message": "DELETED",
598 "vim_id": None,
599 }
600
601 try:
602 if sf_vim_id:
603 target_vim = self.my_vims[ro_task["target_id"]]
604 target_vim.delete_sf(sf_vim_id)
605 except vimconn.VimConnNotFoundException:
606 ro_vim_item_update_ok["vim_message"] = "already deleted"
607 except vimconn.VimConnException as e:
608 self.logger.error(
609 "ro_task={} vim={} del-sf={}: {}".format(
610 ro_task["_id"], ro_task["target_id"], sf_vim_id, e
611 )
612 )
613 ro_vim_item_update = {
614 "vim_status": "VIM_ERROR",
615 "vim_message": "Error while deleting: {}".format(e),
616 }
617
618 return "FAILED", ro_vim_item_update
619
620 self.logger.debug(
621 "task={} {} del-sf={} {}".format(
622 task_id,
623 ro_task["target_id"],
624 sf_vim_id,
625 ro_vim_item_update_ok.get("vim_message", ""),
626 )
627 )
628
629 return "DONE", ro_vim_item_update_ok
630
631
632 class VimInteractionSfp(VimInteractionBase):
633 def new(self, ro_task, task_index, task_depends):
634 task = ro_task["tasks"][task_index]
635 task_id = task["task_id"]
636 created = False
637 target_vim = self.my_vims[ro_task["target_id"]]
638
639 try:
640 created = True
641 params = task["params"]
642 params_copy = deepcopy(params)
643 name = params_copy["name"]
644 sf_list = params_copy["sfs"]
645 classification_list = params_copy["classifications"]
646
647 classification_id_list = []
648 sf_id_list = []
649
650 for classification in classification_list:
651 classi_id = (
652 task_depends[classification]
653 if classification.startswith("TASK-")
654 else classification
655 )
656 classification_id_list.append(classi_id)
657
658 for sf in sf_list:
659 sf_id = task_depends[sf] if sf.startswith("TASK-") else sf
660 sf_id_list.append(sf_id)
661
662 vim_sfp_id = target_vim.new_sfp(
663 name, classification_id_list, sf_id_list, sfc_encap=False
664 )
665
666 ro_vim_item_update = {
667 "vim_id": vim_sfp_id,
668 "vim_status": "DONE",
669 "created": created,
670 "vim_details": None,
671 "vim_message": None,
672 }
673 self.logger.debug(
674 "task={} {} created={}".format(task_id, ro_task["target_id"], created)
675 )
676
677 return "DONE", ro_vim_item_update
678 except (vimconn.VimConnException, NsWorkerException) as e:
679 self.logger.debug(traceback.format_exc())
680 self.logger.error(
681 "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
682 )
683 ro_vim_item_update = {
684 "vim_status": "VIM_ERROR",
685 "created": created,
686 "vim_message": str(e),
687 }
688
689 return "FAILED", ro_vim_item_update
690
691 def delete(self, ro_task, task_index):
692 task = ro_task["tasks"][task_index]
693 task_id = task["task_id"]
694 sfp_vim_id = ro_task["vim_info"]["vim_id"]
695 ro_vim_item_update_ok = {
696 "vim_status": "DELETED",
697 "created": False,
698 "vim_message": "DELETED",
699 "vim_id": None,
700 }
701
702 try:
703 if sfp_vim_id:
704 target_vim = self.my_vims[ro_task["target_id"]]
705 target_vim.delete_sfp(sfp_vim_id)
706 except vimconn.VimConnNotFoundException:
707 ro_vim_item_update_ok["vim_message"] = "already deleted"
708 except vimconn.VimConnException as e:
709 self.logger.error(
710 "ro_task={} vim={} del-sfp={}: {}".format(
711 ro_task["_id"], ro_task["target_id"], sfp_vim_id, e
712 )
713 )
714 ro_vim_item_update = {
715 "vim_status": "VIM_ERROR",
716 "vim_message": "Error while deleting: {}".format(e),
717 }
718
719 return "FAILED", ro_vim_item_update
720
721 self.logger.debug(
722 "task={} {} del-sfp={} {}".format(
723 task_id,
724 ro_task["target_id"],
725 sfp_vim_id,
726 ro_vim_item_update_ok.get("vim_message", ""),
727 )
728 )
729
730 return "DONE", ro_vim_item_update_ok
731
732
733 class VimInteractionVdu(VimInteractionBase):
734 max_retries_inject_ssh_key = 20 # 20 times
735 time_retries_inject_ssh_key = 30 # wevery 30 seconds
736
737 def new(self, ro_task, task_index, task_depends):
738 task = ro_task["tasks"][task_index]
739 task_id = task["task_id"]
740 created = False
741 target_vim = self.my_vims[ro_task["target_id"]]
742 try:
743 created = True
744 params = task["params"]
745 params_copy = deepcopy(params)
746 net_list = params_copy["net_list"]
747
748 for net in net_list:
749 # change task_id into network_id
750 if "net_id" in net and net["net_id"].startswith("TASK-"):
751 network_id = task_depends[net["net_id"]]
752
753 if not network_id:
754 raise NsWorkerException(
755 "Cannot create VM because depends on a network not created or found "
756 "for {}".format(net["net_id"])
757 )
758
759 net["net_id"] = network_id
760
761 if params_copy["image_id"].startswith("TASK-"):
762 params_copy["image_id"] = task_depends[params_copy["image_id"]]
763
764 if params_copy["flavor_id"].startswith("TASK-"):
765 params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]
766
767 affinity_group_list = params_copy["affinity_group_list"]
768 for affinity_group in affinity_group_list:
769 # change task_id into affinity_group_id
770 if "affinity_group_id" in affinity_group and affinity_group[
771 "affinity_group_id"
772 ].startswith("TASK-"):
773 affinity_group_id = task_depends[
774 affinity_group["affinity_group_id"]
775 ]
776
777 if not affinity_group_id:
778 raise NsWorkerException(
779 "found for {}".format(affinity_group["affinity_group_id"])
780 )
781
782 affinity_group["affinity_group_id"] = affinity_group_id
783 vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
784 interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]
785
786 # add to created items previous_created_volumes (healing)
787 if task.get("previous_created_volumes"):
788 for k, v in task["previous_created_volumes"].items():
789 created_items[k] = v
790
791 ro_vim_item_update = {
792 "vim_id": vim_vm_id,
793 "vim_status": "BUILD",
794 "created": created,
795 "created_items": created_items,
796 "vim_details": None,
797 "vim_message": None,
798 "interfaces_vim_ids": interfaces,
799 "interfaces": [],
800 "interfaces_backup": [],
801 }
802 self.logger.debug(
803 "task={} {} new-vm={} created={}".format(
804 task_id, ro_task["target_id"], vim_vm_id, created
805 )
806 )
807
808 return "BUILD", ro_vim_item_update
809 except (vimconn.VimConnException, NsWorkerException) as e:
810 self.logger.debug(traceback.format_exc())
811 self.logger.error(
812 "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
813 )
814 ro_vim_item_update = {
815 "vim_status": "VIM_ERROR",
816 "created": created,
817 "vim_message": str(e),
818 }
819
820 return "FAILED", ro_vim_item_update
821
822 def delete(self, ro_task, task_index):
823 task = ro_task["tasks"][task_index]
824 task_id = task["task_id"]
825 vm_vim_id = ro_task["vim_info"]["vim_id"]
826 ro_vim_item_update_ok = {
827 "vim_status": "DELETED",
828 "created": False,
829 "vim_message": "DELETED",
830 "vim_id": None,
831 }
832
833 try:
834 self.logger.debug(
835 "delete_vminstance: vm_vim_id={} created_items={}".format(
836 vm_vim_id, ro_task["vim_info"]["created_items"]
837 )
838 )
839 if vm_vim_id or ro_task["vim_info"]["created_items"]:
840 target_vim = self.my_vims[ro_task["target_id"]]
841 target_vim.delete_vminstance(
842 vm_vim_id,
843 ro_task["vim_info"]["created_items"],
844 ro_task["vim_info"].get("volumes_to_hold", []),
845 )
846 except vimconn.VimConnNotFoundException:
847 ro_vim_item_update_ok["vim_message"] = "already deleted"
848 except vimconn.VimConnException as e:
849 self.logger.error(
850 "ro_task={} vim={} del-vm={}: {}".format(
851 ro_task["_id"], ro_task["target_id"], vm_vim_id, e
852 )
853 )
854 ro_vim_item_update = {
855 "vim_status": "VIM_ERROR",
856 "vim_message": "Error while deleting: {}".format(e),
857 }
858
859 return "FAILED", ro_vim_item_update
860
861 self.logger.debug(
862 "task={} {} del-vm={} {}".format(
863 task_id,
864 ro_task["target_id"],
865 vm_vim_id,
866 ro_vim_item_update_ok.get("vim_message", ""),
867 )
868 )
869
870 return "DONE", ro_vim_item_update_ok
871
872 def refresh(self, ro_task):
873 """Call VIM to get vm status"""
874 ro_task_id = ro_task["_id"]
875 target_vim = self.my_vims[ro_task["target_id"]]
876 vim_id = ro_task["vim_info"]["vim_id"]
877
878 if not vim_id:
879 return None, None
880
881 vm_to_refresh_list = [vim_id]
882 try:
883 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
884 vim_info = vim_dict[vim_id]
885
886 if vim_info["status"] == "ACTIVE":
887 task_status = "DONE"
888 elif vim_info["status"] == "BUILD":
889 task_status = "BUILD"
890 else:
891 task_status = "FAILED"
892
893 # try to load and parse vim_information
894 try:
895 vim_info_info = yaml.safe_load(vim_info["vim_info"])
896 if vim_info_info.get("name"):
897 vim_info["name"] = vim_info_info["name"]
898 except Exception as vim_info_error:
899 self.logger.exception(
900 f"{vim_info_error} occured while getting the vim_info from yaml"
901 )
902 except vimconn.VimConnException as e:
903 # Mark all tasks at VIM_ERROR status
904 self.logger.error(
905 "ro_task={} vim={} get-vm={}: {}".format(
906 ro_task_id, ro_task["target_id"], vim_id, e
907 )
908 )
909 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
910 task_status = "FAILED"
911
912 ro_vim_item_update = {}
913
914 # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
915 vim_interfaces = []
916 if vim_info.get("interfaces"):
917 for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
918 iface = next(
919 (
920 iface
921 for iface in vim_info["interfaces"]
922 if vim_iface_id == iface["vim_interface_id"]
923 ),
924 None,
925 )
926 # if iface:
927 # iface.pop("vim_info", None)
928 vim_interfaces.append(iface)
929
930 task_create = next(
931 t
932 for t in ro_task["tasks"]
933 if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
934 )
935 if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
936 vim_interfaces[task_create["mgmt_vnf_interface"]][
937 "mgmt_vnf_interface"
938 ] = True
939
940 mgmt_vdu_iface = task_create.get(
941 "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
942 )
943 if vim_interfaces:
944 vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True
945
946 if ro_task["vim_info"]["interfaces"] != vim_interfaces:
947 ro_vim_item_update["interfaces"] = vim_interfaces
948
949 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
950 ro_vim_item_update["vim_status"] = vim_info["status"]
951
952 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
953 ro_vim_item_update["vim_name"] = vim_info.get("name")
954
955 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
956 if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
957 ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
958 elif vim_info["status"] == "DELETED":
959 ro_vim_item_update["vim_id"] = None
960 ro_vim_item_update["vim_message"] = "Deleted externally"
961 else:
962 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
963 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
964
965 if ro_vim_item_update:
966 self.logger.debug(
967 "ro_task={} {} get-vm={}: status={} {}".format(
968 ro_task_id,
969 ro_task["target_id"],
970 vim_id,
971 ro_vim_item_update.get("vim_status"),
972 ro_vim_item_update.get("vim_message")
973 if ro_vim_item_update.get("vim_status") != "ACTIVE"
974 else "",
975 )
976 )
977
978 return task_status, ro_vim_item_update
979
980 def exec(self, ro_task, task_index, task_depends):
981 task = ro_task["tasks"][task_index]
982 task_id = task["task_id"]
983 target_vim = self.my_vims[ro_task["target_id"]]
984 db_task_update = {"retries": 0}
985 retries = task.get("retries", 0)
986
987 try:
988 params = task["params"]
989 params_copy = deepcopy(params)
990 params_copy["ro_key"] = self.db.decrypt(
991 params_copy.pop("private_key"),
992 params_copy.pop("schema_version"),
993 params_copy.pop("salt"),
994 )
995 params_copy["ip_addr"] = params_copy.pop("ip_address")
996 target_vim.inject_user_key(**params_copy)
997 self.logger.debug(
998 "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
999 )
1000
1001 return (
1002 "DONE",
1003 None,
1004 db_task_update,
1005 ) # params_copy["key"]
1006 except (vimconn.VimConnException, NsWorkerException) as e:
1007 retries += 1
1008
1009 self.logger.debug(traceback.format_exc())
1010 if retries < self.max_retries_inject_ssh_key:
1011 return (
1012 "BUILD",
1013 None,
1014 {
1015 "retries": retries,
1016 "next_retry": self.time_retries_inject_ssh_key,
1017 },
1018 )
1019
1020 self.logger.error(
1021 "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
1022 )
1023 ro_vim_item_update = {"vim_message": str(e)}
1024
1025 return "FAILED", ro_vim_item_update, db_task_update
1026
1027
1028 class VimInteractionImage(VimInteractionBase):
1029 def new(self, ro_task, task_index, task_depends):
1030 task = ro_task["tasks"][task_index]
1031 task_id = task["task_id"]
1032 created = False
1033 created_items = {}
1034 target_vim = self.my_vims[ro_task["target_id"]]
1035
1036 try:
1037 # FIND
1038 vim_image_id = ""
1039 if task.get("find_params"):
1040 vim_images = target_vim.get_image_list(
1041 task["find_params"].get("filter_dict", {})
1042 )
1043
1044 if not vim_images:
1045 raise NsWorkerExceptionNotFound(
1046 "Image not found with this criteria: '{}'".format(
1047 task["find_params"]
1048 )
1049 )
1050 elif len(vim_images) > 1:
1051 raise NsWorkerException(
1052 "More than one image found with this criteria: '{}'".format(
1053 task["find_params"]
1054 )
1055 )
1056 else:
1057 vim_image_id = vim_images[0]["id"]
1058
1059 ro_vim_item_update = {
1060 "vim_id": vim_image_id,
1061 "vim_status": "ACTIVE",
1062 "created": created,
1063 "created_items": created_items,
1064 "vim_details": None,
1065 "vim_message": None,
1066 }
1067 self.logger.debug(
1068 "task={} {} new-image={} created={}".format(
1069 task_id, ro_task["target_id"], vim_image_id, created
1070 )
1071 )
1072
1073 return "DONE", ro_vim_item_update
1074 except (NsWorkerException, vimconn.VimConnException) as e:
1075 self.logger.error(
1076 "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
1077 )
1078 ro_vim_item_update = {
1079 "vim_status": "VIM_ERROR",
1080 "created": created,
1081 "vim_message": str(e),
1082 }
1083
1084 return "FAILED", ro_vim_item_update
1085
1086
1087 class VimInteractionSharedVolume(VimInteractionBase):
1088 def delete(self, ro_task, task_index):
1089 task = ro_task["tasks"][task_index]
1090 task_id = task["task_id"]
1091 shared_volume_vim_id = ro_task["vim_info"]["vim_id"]
1092 created_items = ro_task["vim_info"]["created_items"]
1093 ro_vim_item_update_ok = {
1094 "vim_status": "DELETED",
1095 "created": False,
1096 "vim_message": "DELETED",
1097 "vim_id": None,
1098 }
1099 if created_items and created_items.get(shared_volume_vim_id).get("keep"):
1100 ro_vim_item_update_ok = {
1101 "vim_status": "ACTIVE",
1102 "created": False,
1103 "vim_message": None,
1104 }
1105 return "DONE", ro_vim_item_update_ok
1106 try:
1107 if shared_volume_vim_id:
1108 target_vim = self.my_vims[ro_task["target_id"]]
1109 target_vim.delete_shared_volumes(shared_volume_vim_id)
1110 except vimconn.VimConnNotFoundException:
1111 ro_vim_item_update_ok["vim_message"] = "already deleted"
1112 except vimconn.VimConnException as e:
1113 self.logger.error(
1114 "ro_task={} vim={} del-shared-volume={}: {}".format(
1115 ro_task["_id"], ro_task["target_id"], shared_volume_vim_id, e
1116 )
1117 )
1118 ro_vim_item_update = {
1119 "vim_status": "VIM_ERROR",
1120 "vim_message": "Error while deleting: {}".format(e),
1121 }
1122
1123 return "FAILED", ro_vim_item_update
1124
1125 self.logger.debug(
1126 "task={} {} del-shared-volume={} {}".format(
1127 task_id,
1128 ro_task["target_id"],
1129 shared_volume_vim_id,
1130 ro_vim_item_update_ok.get("vim_message", ""),
1131 )
1132 )
1133
1134 return "DONE", ro_vim_item_update_ok
1135
1136 def new(self, ro_task, task_index, task_depends):
1137 task = ro_task["tasks"][task_index]
1138 task_id = task["task_id"]
1139 created = False
1140 created_items = {}
1141 target_vim = self.my_vims[ro_task["target_id"]]
1142
1143 try:
1144 shared_volume_vim_id = None
1145 shared_volume_data = None
1146
1147 if task.get("params"):
1148 shared_volume_data = task["params"]
1149
1150 if shared_volume_data:
1151 self.logger.info(
1152 f"Creating the new shared_volume for {shared_volume_data}\n"
1153 )
1154 (
1155 shared_volume_name,
1156 shared_volume_vim_id,
1157 ) = target_vim.new_shared_volumes(shared_volume_data)
1158 created = True
1159 created_items[shared_volume_vim_id] = {
1160 "name": shared_volume_name,
1161 "keep": shared_volume_data.get("keep"),
1162 }
1163
1164 ro_vim_item_update = {
1165 "vim_id": shared_volume_vim_id,
1166 "vim_status": "ACTIVE",
1167 "created": created,
1168 "created_items": created_items,
1169 "vim_details": None,
1170 "vim_message": None,
1171 }
1172 self.logger.debug(
1173 "task={} {} new-shared-volume={} created={}".format(
1174 task_id, ro_task["target_id"], shared_volume_vim_id, created
1175 )
1176 )
1177
1178 return "DONE", ro_vim_item_update
1179 except (vimconn.VimConnException, NsWorkerException) as e:
1180 self.logger.error(
1181 "task={} vim={} new-shared-volume:"
1182 " {}".format(task_id, ro_task["target_id"], e)
1183 )
1184 ro_vim_item_update = {
1185 "vim_status": "VIM_ERROR",
1186 "created": created,
1187 "vim_message": str(e),
1188 }
1189
1190 return "FAILED", ro_vim_item_update
1191
1192
1193 class VimInteractionFlavor(VimInteractionBase):
1194 def delete(self, ro_task, task_index):
1195 task = ro_task["tasks"][task_index]
1196 task_id = task["task_id"]
1197 flavor_vim_id = ro_task["vim_info"]["vim_id"]
1198 ro_vim_item_update_ok = {
1199 "vim_status": "DELETED",
1200 "created": False,
1201 "vim_message": "DELETED",
1202 "vim_id": None,
1203 }
1204
1205 try:
1206 if flavor_vim_id:
1207 target_vim = self.my_vims[ro_task["target_id"]]
1208 target_vim.delete_flavor(flavor_vim_id)
1209 except vimconn.VimConnNotFoundException:
1210 ro_vim_item_update_ok["vim_message"] = "already deleted"
1211 except vimconn.VimConnException as e:
1212 self.logger.error(
1213 "ro_task={} vim={} del-flavor={}: {}".format(
1214 ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
1215 )
1216 )
1217 ro_vim_item_update = {
1218 "vim_status": "VIM_ERROR",
1219 "vim_message": "Error while deleting: {}".format(e),
1220 }
1221
1222 return "FAILED", ro_vim_item_update
1223
1224 self.logger.debug(
1225 "task={} {} del-flavor={} {}".format(
1226 task_id,
1227 ro_task["target_id"],
1228 flavor_vim_id,
1229 ro_vim_item_update_ok.get("vim_message", ""),
1230 )
1231 )
1232
1233 return "DONE", ro_vim_item_update_ok
1234
1235 def new(self, ro_task, task_index, task_depends):
1236 task = ro_task["tasks"][task_index]
1237 task_id = task["task_id"]
1238 created = False
1239 created_items = {}
1240 target_vim = self.my_vims[ro_task["target_id"]]
1241 try:
1242 # FIND
1243 vim_flavor_id = None
1244
1245 if task.get("find_params", {}).get("vim_flavor_id"):
1246 vim_flavor_id = task["find_params"]["vim_flavor_id"]
1247 elif task.get("find_params", {}).get("flavor_data"):
1248 try:
1249 flavor_data = task["find_params"]["flavor_data"]
1250 vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
1251 except vimconn.VimConnNotFoundException as flavor_not_found_msg:
1252 self.logger.warning(
1253 f"VimConnNotFoundException occured: {flavor_not_found_msg}"
1254 )
1255
1256 if not vim_flavor_id and task.get("params"):
1257 # CREATE
1258 flavor_data = task["params"]["flavor_data"]
1259 vim_flavor_id = target_vim.new_flavor(flavor_data)
1260 created = True
1261
1262 ro_vim_item_update = {
1263 "vim_id": vim_flavor_id,
1264 "vim_status": "ACTIVE",
1265 "created": created,
1266 "created_items": created_items,
1267 "vim_details": None,
1268 "vim_message": None,
1269 }
1270 self.logger.debug(
1271 "task={} {} new-flavor={} created={}".format(
1272 task_id, ro_task["target_id"], vim_flavor_id, created
1273 )
1274 )
1275
1276 return "DONE", ro_vim_item_update
1277 except (vimconn.VimConnException, NsWorkerException) as e:
1278 self.logger.error(
1279 "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
1280 )
1281 ro_vim_item_update = {
1282 "vim_status": "VIM_ERROR",
1283 "created": created,
1284 "vim_message": str(e),
1285 }
1286
1287 return "FAILED", ro_vim_item_update
1288
1289
1290 class VimInteractionAffinityGroup(VimInteractionBase):
1291 def delete(self, ro_task, task_index):
1292 task = ro_task["tasks"][task_index]
1293 task_id = task["task_id"]
1294 affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
1295 ro_vim_item_update_ok = {
1296 "vim_status": "DELETED",
1297 "created": False,
1298 "vim_message": "DELETED",
1299 "vim_id": None,
1300 }
1301
1302 try:
1303 if affinity_group_vim_id:
1304 target_vim = self.my_vims[ro_task["target_id"]]
1305 target_vim.delete_affinity_group(affinity_group_vim_id)
1306 except vimconn.VimConnNotFoundException:
1307 ro_vim_item_update_ok["vim_message"] = "already deleted"
1308 except vimconn.VimConnException as e:
1309 self.logger.error(
1310 "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
1311 ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
1312 )
1313 )
1314 ro_vim_item_update = {
1315 "vim_status": "VIM_ERROR",
1316 "vim_message": "Error while deleting: {}".format(e),
1317 }
1318
1319 return "FAILED", ro_vim_item_update
1320
1321 self.logger.debug(
1322 "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
1323 task_id,
1324 ro_task["target_id"],
1325 affinity_group_vim_id,
1326 ro_vim_item_update_ok.get("vim_message", ""),
1327 )
1328 )
1329
1330 return "DONE", ro_vim_item_update_ok
1331
1332 def new(self, ro_task, task_index, task_depends):
1333 task = ro_task["tasks"][task_index]
1334 task_id = task["task_id"]
1335 created = False
1336 created_items = {}
1337 target_vim = self.my_vims[ro_task["target_id"]]
1338
1339 try:
1340 affinity_group_vim_id = None
1341 affinity_group_data = None
1342 param_affinity_group_id = ""
1343
1344 if task.get("params"):
1345 affinity_group_data = task["params"].get("affinity_group_data")
1346
1347 if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
1348 try:
1349 param_affinity_group_id = task["params"]["affinity_group_data"].get(
1350 "vim-affinity-group-id"
1351 )
1352 affinity_group_vim_id = target_vim.get_affinity_group(
1353 param_affinity_group_id
1354 ).get("id")
1355 except vimconn.VimConnNotFoundException:
1356 self.logger.error(
1357 "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {}"
1358 "could not be found at VIM. Creating a new one.".format(
1359 task_id, ro_task["target_id"], param_affinity_group_id
1360 )
1361 )
1362
1363 if not affinity_group_vim_id and affinity_group_data:
1364 affinity_group_vim_id = target_vim.new_affinity_group(
1365 affinity_group_data
1366 )
1367 created = True
1368
1369 ro_vim_item_update = {
1370 "vim_id": affinity_group_vim_id,
1371 "vim_status": "ACTIVE",
1372 "created": created,
1373 "created_items": created_items,
1374 "vim_details": None,
1375 "vim_message": None,
1376 }
1377 self.logger.debug(
1378 "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
1379 task_id, ro_task["target_id"], affinity_group_vim_id, created
1380 )
1381 )
1382
1383 return "DONE", ro_vim_item_update
1384 except (vimconn.VimConnException, NsWorkerException) as e:
1385 self.logger.error(
1386 "task={} vim={} new-affinity-or-anti-affinity-group:"
1387 " {}".format(task_id, ro_task["target_id"], e)
1388 )
1389 ro_vim_item_update = {
1390 "vim_status": "VIM_ERROR",
1391 "created": created,
1392 "vim_message": str(e),
1393 }
1394
1395 return "FAILED", ro_vim_item_update
1396
1397
1398 class VimInteractionUpdateVdu(VimInteractionBase):
1399 def exec(self, ro_task, task_index, task_depends):
1400 task = ro_task["tasks"][task_index]
1401 task_id = task["task_id"]
1402 db_task_update = {"retries": 0}
1403 created = False
1404 created_items = {}
1405 target_vim = self.my_vims[ro_task["target_id"]]
1406
1407 try:
1408 vim_vm_id = ""
1409 if task.get("params"):
1410 vim_vm_id = task["params"].get("vim_vm_id")
1411 action = task["params"].get("action")
1412 context = {action: action}
1413 target_vim.action_vminstance(vim_vm_id, context)
1414 # created = True
1415 ro_vim_item_update = {
1416 "vim_id": vim_vm_id,
1417 "vim_status": "ACTIVE",
1418 "created": created,
1419 "created_items": created_items,
1420 "vim_details": None,
1421 "vim_message": None,
1422 }
1423 self.logger.debug(
1424 "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
1425 )
1426 return "DONE", ro_vim_item_update, db_task_update
1427 except (vimconn.VimConnException, NsWorkerException) as e:
1428 self.logger.error(
1429 "task={} vim={} VM Migration:"
1430 " {}".format(task_id, ro_task["target_id"], e)
1431 )
1432 ro_vim_item_update = {
1433 "vim_status": "VIM_ERROR",
1434 "created": created,
1435 "vim_message": str(e),
1436 }
1437
1438 return "FAILED", ro_vim_item_update, db_task_update
1439
1440
1441 class VimInteractionSdnNet(VimInteractionBase):
1442 @staticmethod
1443 def _match_pci(port_pci, mapping):
1444 """
1445 Check if port_pci matches with mapping.
1446 The mapping can have brackets to indicate that several chars are accepted. e.g
1447 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
1448 :param port_pci: text
1449 :param mapping: text, can contain brackets to indicate several chars are available
1450 :return: True if matches, False otherwise
1451 """
1452 if not port_pci or not mapping:
1453 return False
1454 if port_pci == mapping:
1455 return True
1456
1457 mapping_index = 0
1458 pci_index = 0
1459 while True:
1460 bracket_start = mapping.find("[", mapping_index)
1461
1462 if bracket_start == -1:
1463 break
1464
1465 bracket_end = mapping.find("]", bracket_start)
1466 if bracket_end == -1:
1467 break
1468
1469 length = bracket_start - mapping_index
1470 if (
1471 length
1472 and port_pci[pci_index : pci_index + length]
1473 != mapping[mapping_index:bracket_start]
1474 ):
1475 return False
1476
1477 if (
1478 port_pci[pci_index + length]
1479 not in mapping[bracket_start + 1 : bracket_end]
1480 ):
1481 return False
1482
1483 pci_index += length + 1
1484 mapping_index = bracket_end + 1
1485
1486 if port_pci[pci_index:] != mapping[mapping_index:]:
1487 return False
1488
1489 return True
1490
1491 def _get_interfaces(self, vlds_to_connect, vim_account_id):
1492 """
1493 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
1494 :param vim_account_id:
1495 :return:
1496 """
1497 interfaces = []
1498
1499 for vld in vlds_to_connect:
1500 table, _, db_id = vld.partition(":")
1501 db_id, _, vld = db_id.partition(":")
1502 _, _, vld_id = vld.partition(".")
1503
1504 if table == "vnfrs":
1505 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
1506 iface_key = "vnf-vld-id"
1507 else: # table == "nsrs"
1508 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
1509 iface_key = "ns-vld-id"
1510
1511 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
1512
1513 for db_vnfr in db_vnfrs:
1514 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
1515 for iface_index, interface in enumerate(vdur["interfaces"]):
1516 if interface.get(iface_key) == vld_id and interface.get(
1517 "type"
1518 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
1519 # only SR-IOV o PT
1520 interface_ = interface.copy()
1521 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
1522 db_vnfr["_id"], vdu_index, iface_index
1523 )
1524
1525 if vdur.get("status") == "ERROR":
1526 interface_["status"] = "ERROR"
1527
1528 interfaces.append(interface_)
1529
1530 return interfaces
1531
1532 def refresh(self, ro_task):
1533 # look for task create
1534 task_create_index, _ = next(
1535 i_t
1536 for i_t in enumerate(ro_task["tasks"])
1537 if i_t[1]
1538 and i_t[1]["action"] == "CREATE"
1539 and i_t[1]["status"] != "FINISHED"
1540 )
1541
1542 return self.new(ro_task, task_create_index, None)
1543
1544 def new(self, ro_task, task_index, task_depends):
1545 task = ro_task["tasks"][task_index]
1546 task_id = task["task_id"]
1547 target_vim = self.my_vims[ro_task["target_id"]]
1548
1549 sdn_net_id = ro_task["vim_info"]["vim_id"]
1550
1551 created_items = ro_task["vim_info"].get("created_items")
1552 connected_ports = ro_task["vim_info"].get("connected_ports", [])
1553 new_connected_ports = []
1554 last_update = ro_task["vim_info"].get("last_update", 0)
1555 sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
1556 error_list = []
1557 created = ro_task["vim_info"].get("created", False)
1558
1559 try:
1560 # CREATE
1561 db_vim = {}
1562 params = task["params"]
1563 vlds_to_connect = params.get("vlds", [])
1564 associated_vim = params.get("target_vim")
1565 # external additional ports
1566 additional_ports = params.get("sdn-ports") or ()
1567 _, _, vim_account_id = (
1568 (None, None, None)
1569 if associated_vim is None
1570 else associated_vim.partition(":")
1571 )
1572
1573 if associated_vim:
1574 # get associated VIM
1575 if associated_vim not in self.db_vims:
1576 self.db_vims[associated_vim] = self.db.get_one(
1577 "vim_accounts", {"_id": vim_account_id}
1578 )
1579
1580 db_vim = self.db_vims[associated_vim]
1581
1582 # look for ports to connect
1583 ports = self._get_interfaces(vlds_to_connect, vim_account_id)
1584 # print(ports)
1585
1586 sdn_ports = []
1587 pending_ports = error_ports = 0
1588 vlan_used = None
1589 sdn_need_update = False
1590
1591 for port in ports:
1592 vlan_used = port.get("vlan") or vlan_used
1593
1594 # TODO. Do not connect if already done
1595 if not port.get("compute_node") or not port.get("pci"):
1596 if port.get("status") == "ERROR":
1597 error_ports += 1
1598 else:
1599 pending_ports += 1
1600 continue
1601
1602 pmap = None
1603 compute_node_mappings = next(
1604 (
1605 c
1606 for c in db_vim["config"].get("sdn-port-mapping", ())
1607 if c and c["compute_node"] == port["compute_node"]
1608 ),
1609 None,
1610 )
1611
1612 if compute_node_mappings:
1613 # process port_mapping pci of type 0000:af:1[01].[1357]
1614 pmap = next(
1615 (
1616 p
1617 for p in compute_node_mappings["ports"]
1618 if self._match_pci(port["pci"], p.get("pci"))
1619 ),
1620 None,
1621 )
1622
1623 if not pmap:
1624 if not db_vim["config"].get("mapping_not_needed"):
1625 error_list.append(
1626 "Port mapping not found for compute_node={} pci={}".format(
1627 port["compute_node"], port["pci"]
1628 )
1629 )
1630 continue
1631
1632 pmap = {}
1633
1634 service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
1635 new_port = {
1636 "service_endpoint_id": pmap.get("service_endpoint_id")
1637 or service_endpoint_id,
1638 "service_endpoint_encapsulation_type": "dot1q"
1639 if port["type"] == "SR-IOV"
1640 else None,
1641 "service_endpoint_encapsulation_info": {
1642 "vlan": port.get("vlan"),
1643 "mac": port.get("mac-address"),
1644 "device_id": pmap.get("device_id") or port["compute_node"],
1645 "device_interface_id": pmap.get("device_interface_id")
1646 or port["pci"],
1647 "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
1648 "switch_port": pmap.get("switch_port"),
1649 "service_mapping_info": pmap.get("service_mapping_info"),
1650 },
1651 }
1652
1653 # TODO
1654 # if port["modified_at"] > last_update:
1655 # sdn_need_update = True
1656 new_connected_ports.append(port["id"]) # TODO
1657 sdn_ports.append(new_port)
1658
1659 if error_ports:
1660 error_list.append(
1661 "{} interfaces have not been created as VDU is on ERROR status".format(
1662 error_ports
1663 )
1664 )
1665
1666 # connect external ports
1667 for index, additional_port in enumerate(additional_ports):
1668 additional_port_id = additional_port.get(
1669 "service_endpoint_id"
1670 ) or "external-{}".format(index)
1671 sdn_ports.append(
1672 {
1673 "service_endpoint_id": additional_port_id,
1674 "service_endpoint_encapsulation_type": additional_port.get(
1675 "service_endpoint_encapsulation_type", "dot1q"
1676 ),
1677 "service_endpoint_encapsulation_info": {
1678 "vlan": additional_port.get("vlan") or vlan_used,
1679 "mac": additional_port.get("mac_address"),
1680 "device_id": additional_port.get("device_id"),
1681 "device_interface_id": additional_port.get(
1682 "device_interface_id"
1683 ),
1684 "switch_dpid": additional_port.get("switch_dpid")
1685 or additional_port.get("switch_id"),
1686 "switch_port": additional_port.get("switch_port"),
1687 "service_mapping_info": additional_port.get(
1688 "service_mapping_info"
1689 ),
1690 },
1691 }
1692 )
1693 new_connected_ports.append(additional_port_id)
1694 sdn_info = ""
1695
1696 # if there are more ports to connect or they have been modified, call create/update
1697 if error_list:
1698 sdn_status = "ERROR"
1699 sdn_info = "; ".join(error_list)
1700 elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
1701 last_update = time.time()
1702
1703 if not sdn_net_id:
1704 if len(sdn_ports) < 2:
1705 sdn_status = "ACTIVE"
1706
1707 if not pending_ports:
1708 self.logger.debug(
1709 "task={} {} new-sdn-net done, less than 2 ports".format(
1710 task_id, ro_task["target_id"]
1711 )
1712 )
1713 else:
1714 net_type = params.get("type") or "ELAN"
1715 (
1716 sdn_net_id,
1717 created_items,
1718 ) = target_vim.create_connectivity_service(net_type, sdn_ports)
1719 created = True
1720 self.logger.debug(
1721 "task={} {} new-sdn-net={} created={}".format(
1722 task_id, ro_task["target_id"], sdn_net_id, created
1723 )
1724 )
1725 else:
1726 created_items = target_vim.edit_connectivity_service(
1727 sdn_net_id, conn_info=created_items, connection_points=sdn_ports
1728 )
1729 created = True
1730 self.logger.debug(
1731 "task={} {} update-sdn-net={} created={}".format(
1732 task_id, ro_task["target_id"], sdn_net_id, created
1733 )
1734 )
1735
1736 connected_ports = new_connected_ports
1737 elif sdn_net_id:
1738 wim_status_dict = target_vim.get_connectivity_service_status(
1739 sdn_net_id, conn_info=created_items
1740 )
1741 sdn_status = wim_status_dict["sdn_status"]
1742
1743 if wim_status_dict.get("sdn_info"):
1744 sdn_info = str(wim_status_dict.get("sdn_info")) or ""
1745
1746 if wim_status_dict.get("error_msg"):
1747 sdn_info = wim_status_dict.get("error_msg") or ""
1748
1749 if pending_ports:
1750 if sdn_status != "ERROR":
1751 sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1752 len(ports) - pending_ports, len(ports)
1753 )
1754
1755 if sdn_status == "ACTIVE":
1756 sdn_status = "BUILD"
1757
1758 ro_vim_item_update = {
1759 "vim_id": sdn_net_id,
1760 "vim_status": sdn_status,
1761 "created": created,
1762 "created_items": created_items,
1763 "connected_ports": connected_ports,
1764 "vim_details": sdn_info,
1765 "vim_message": None,
1766 "last_update": last_update,
1767 }
1768
1769 return sdn_status, ro_vim_item_update
1770 except Exception as e:
1771 self.logger.error(
1772 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
1773 exc_info=not isinstance(
1774 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1775 ),
1776 )
1777 ro_vim_item_update = {
1778 "vim_status": "VIM_ERROR",
1779 "created": created,
1780 "vim_message": str(e),
1781 }
1782
1783 return "FAILED", ro_vim_item_update
1784
1785 def delete(self, ro_task, task_index):
1786 task = ro_task["tasks"][task_index]
1787 task_id = task["task_id"]
1788 sdn_vim_id = ro_task["vim_info"].get("vim_id")
1789 ro_vim_item_update_ok = {
1790 "vim_status": "DELETED",
1791 "created": False,
1792 "vim_message": "DELETED",
1793 "vim_id": None,
1794 }
1795
1796 try:
1797 if sdn_vim_id:
1798 target_vim = self.my_vims[ro_task["target_id"]]
1799 target_vim.delete_connectivity_service(
1800 sdn_vim_id, ro_task["vim_info"].get("created_items")
1801 )
1802
1803 except Exception as e:
1804 if (
1805 isinstance(e, sdnconn.SdnConnectorError)
1806 and e.http_code == HTTPStatus.NOT_FOUND.value
1807 ):
1808 ro_vim_item_update_ok["vim_message"] = "already deleted"
1809 else:
1810 self.logger.error(
1811 "ro_task={} vim={} del-sdn-net={}: {}".format(
1812 ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
1813 ),
1814 exc_info=not isinstance(
1815 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1816 ),
1817 )
1818 ro_vim_item_update = {
1819 "vim_status": "VIM_ERROR",
1820 "vim_message": "Error while deleting: {}".format(e),
1821 }
1822
1823 return "FAILED", ro_vim_item_update
1824
1825 self.logger.debug(
1826 "task={} {} del-sdn-net={} {}".format(
1827 task_id,
1828 ro_task["target_id"],
1829 sdn_vim_id,
1830 ro_vim_item_update_ok.get("vim_message", ""),
1831 )
1832 )
1833
1834 return "DONE", ro_vim_item_update_ok
1835
1836
1837 class VimInteractionMigration(VimInteractionBase):
1838 def exec(self, ro_task, task_index, task_depends):
1839 task = ro_task["tasks"][task_index]
1840 task_id = task["task_id"]
1841 db_task_update = {"retries": 0}
1842 target_vim = self.my_vims[ro_task["target_id"]]
1843 vim_interfaces = []
1844 created = False
1845 created_items = {}
1846 refreshed_vim_info = {}
1847
1848 try:
1849 vim_vm_id = ""
1850 if task.get("params"):
1851 vim_vm_id = task["params"].get("vim_vm_id")
1852 migrate_host = task["params"].get("migrate_host")
1853 _, migrated_compute_node = target_vim.migrate_instance(
1854 vim_vm_id, migrate_host
1855 )
1856
1857 if migrated_compute_node:
1858 # When VM is migrated, vdu["vim_info"] needs to be updated
1859 vdu_old_vim_info = task["params"]["vdu_vim_info"].get(
1860 ro_task["target_id"]
1861 )
1862
1863 # Refresh VM to get new vim_info
1864 vm_to_refresh_list = [vim_vm_id]
1865 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
1866 refreshed_vim_info = vim_dict[vim_vm_id]
1867
1868 if refreshed_vim_info.get("interfaces"):
1869 for old_iface in vdu_old_vim_info.get("interfaces"):
1870 iface = next(
1871 (
1872 iface
1873 for iface in refreshed_vim_info["interfaces"]
1874 if old_iface["vim_interface_id"]
1875 == iface["vim_interface_id"]
1876 ),
1877 None,
1878 )
1879 vim_interfaces.append(iface)
1880
1881 ro_vim_item_update = {
1882 "vim_id": vim_vm_id,
1883 "vim_status": "ACTIVE",
1884 "created": created,
1885 "created_items": created_items,
1886 "vim_details": None,
1887 "vim_message": None,
1888 }
1889
1890 if refreshed_vim_info and refreshed_vim_info.get("status") not in (
1891 "ERROR",
1892 "VIM_ERROR",
1893 ):
1894 ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]
1895
1896 if vim_interfaces:
1897 ro_vim_item_update["interfaces"] = vim_interfaces
1898
1899 self.logger.debug(
1900 "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
1901 )
1902
1903 return "DONE", ro_vim_item_update, db_task_update
1904
1905 except (vimconn.VimConnException, NsWorkerException) as e:
1906 self.logger.error(
1907 "task={} vim={} VM Migration:"
1908 " {}".format(task_id, ro_task["target_id"], e)
1909 )
1910 ro_vim_item_update = {
1911 "vim_status": "VIM_ERROR",
1912 "created": created,
1913 "vim_message": str(e),
1914 }
1915
1916 return "FAILED", ro_vim_item_update, db_task_update
1917
1918
1919 class VimInteractionResize(VimInteractionBase):
1920 def exec(self, ro_task, task_index, task_depends):
1921 task = ro_task["tasks"][task_index]
1922 task_id = task["task_id"]
1923 db_task_update = {"retries": 0}
1924 created = False
1925 target_flavor_uuid = None
1926 created_items = {}
1927 refreshed_vim_info = {}
1928 target_vim = self.my_vims[ro_task["target_id"]]
1929
1930 try:
1931 params = task["params"]
1932 params_copy = deepcopy(params)
1933 target_flavor_uuid = task_depends[params_copy["flavor_id"]]
1934 vim_vm_id = ""
1935 if task.get("params"):
1936 self.logger.info("vim_vm_id %s", vim_vm_id)
1937
1938 if target_flavor_uuid is not None:
1939 resized_status = target_vim.resize_instance(
1940 vim_vm_id, target_flavor_uuid
1941 )
1942
1943 if resized_status:
1944 # Refresh VM to get new vim_info
1945 vm_to_refresh_list = [vim_vm_id]
1946 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
1947 refreshed_vim_info = vim_dict[vim_vm_id]
1948
1949 ro_vim_item_update = {
1950 "vim_id": vim_vm_id,
1951 "vim_status": "ACTIVE",
1952 "created": created,
1953 "created_items": created_items,
1954 "vim_details": None,
1955 "vim_message": None,
1956 }
1957
1958 if refreshed_vim_info and refreshed_vim_info.get("status") not in (
1959 "ERROR",
1960 "VIM_ERROR",
1961 ):
1962 ro_vim_item_update["vim_details"] = refreshed_vim_info["vim_info"]
1963
1964 self.logger.debug(
1965 "task={} {} resize done".format(task_id, ro_task["target_id"])
1966 )
1967 return "DONE", ro_vim_item_update, db_task_update
1968 except (vimconn.VimConnException, NsWorkerException) as e:
1969 self.logger.error(
1970 "task={} vim={} Resize:" " {}".format(task_id, ro_task["target_id"], e)
1971 )
1972 ro_vim_item_update = {
1973 "vim_status": "VIM_ERROR",
1974 "created": created,
1975 "vim_message": str(e),
1976 }
1977
1978 return "FAILED", ro_vim_item_update, db_task_update
1979
1980
1981 class ConfigValidate:
1982 def __init__(self, config: Dict):
1983 self.conf = config
1984
1985 @property
1986 def active(self):
1987 # default 1 min, allowed >= 60 or -1, -1 disables periodic checks
1988 if (
1989 self.conf["period"]["refresh_active"] >= 60
1990 or self.conf["period"]["refresh_active"] == -1
1991 ):
1992 return self.conf["period"]["refresh_active"]
1993
1994 return 60
1995
1996 @property
1997 def build(self):
1998 return self.conf["period"]["refresh_build"]
1999
2000 @property
2001 def image(self):
2002 return self.conf["period"]["refresh_image"]
2003
2004 @property
2005 def error(self):
2006 return self.conf["period"]["refresh_error"]
2007
2008 @property
2009 def queue_size(self):
2010 return self.conf["period"]["queue_size"]
2011
2012
2013 class NsWorker(threading.Thread):
2014 def __init__(self, worker_index, config, plugins, db):
2015 """
2016 :param worker_index: thread index
2017 :param config: general configuration of RO, among others the process_id with the docker id where it runs
2018 :param plugins: global shared dict with the loaded plugins
2019 :param db: database class instance to use
2020 """
2021 threading.Thread.__init__(self)
2022 self.config = config
2023 self.plugins = plugins
2024 self.plugin_name = "unknown"
2025 self.logger = logging.getLogger("ro.worker{}".format(worker_index))
2026 self.worker_index = worker_index
2027 # refresh periods for created items
2028 self.refresh_config = ConfigValidate(config)
2029 self.task_queue = queue.Queue(self.refresh_config.queue_size)
2030 # targetvim: vimplugin class
2031 self.my_vims = {}
2032 # targetvim: vim information from database
2033 self.db_vims = {}
2034 # targetvim list
2035 self.vim_targets = []
2036 self.my_id = config["process_id"] + ":" + str(worker_index)
2037 self.db = db
2038 self.item2class = {
2039 "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger),
2040 "shared-volumes": VimInteractionSharedVolume(
2041 self.db, self.my_vims, self.db_vims, self.logger
2042 ),
2043 "classification": VimInteractionClassification(
2044 self.db, self.my_vims, self.db_vims, self.logger
2045 ),
2046 "sfi": VimInteractionSfi(self.db, self.my_vims, self.db_vims, self.logger),
2047 "sf": VimInteractionSf(self.db, self.my_vims, self.db_vims, self.logger),
2048 "sfp": VimInteractionSfp(self.db, self.my_vims, self.db_vims, self.logger),
2049 "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger),
2050 "image": VimInteractionImage(
2051 self.db, self.my_vims, self.db_vims, self.logger
2052 ),
2053 "flavor": VimInteractionFlavor(
2054 self.db, self.my_vims, self.db_vims, self.logger
2055 ),
2056 "sdn_net": VimInteractionSdnNet(
2057 self.db, self.my_vims, self.db_vims, self.logger
2058 ),
2059 "update": VimInteractionUpdateVdu(
2060 self.db, self.my_vims, self.db_vims, self.logger
2061 ),
2062 "affinity-or-anti-affinity-group": VimInteractionAffinityGroup(
2063 self.db, self.my_vims, self.db_vims, self.logger
2064 ),
2065 "migrate": VimInteractionMigration(
2066 self.db, self.my_vims, self.db_vims, self.logger
2067 ),
2068 "verticalscale": VimInteractionResize(
2069 self.db, self.my_vims, self.db_vims, self.logger
2070 ),
2071 }
2072 self.time_last_task_processed = None
2073 # lists of tasks to delete because nsrs or vnfrs has been deleted from db
2074 self.tasks_to_delete = []
2075 # it is idle when there are not vim_targets associated
2076 self.idle = True
2077 self.task_locked_time = config["global"]["task_locked_time"]
2078
2079 def insert_task(self, task):
2080 try:
2081 self.task_queue.put(task, False)
2082 return None
2083 except queue.Full:
2084 raise NsWorkerException("timeout inserting a task")
2085
2086 def terminate(self):
2087 self.insert_task("exit")
2088
2089 def del_task(self, task):
2090 with self.task_lock:
2091 if task["status"] == "SCHEDULED":
2092 task["status"] = "SUPERSEDED"
2093 return True
2094 else: # task["status"] == "processing"
2095 self.task_lock.release()
2096 return False
2097
2098 def _process_vim_config(self, target_id: str, db_vim: dict) -> None:
2099 """
2100 Process vim config, creating vim configuration files as ca_cert
2101 :param target_id: vim/sdn/wim + id
2102 :param db_vim: Vim dictionary obtained from database
2103 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
2104 """
2105 if not db_vim.get("config"):
2106 return
2107
2108 file_name = ""
2109 work_dir = "/app/osm_ro/certs"
2110
2111 try:
2112 if db_vim["config"].get("ca_cert_content"):
2113 file_name = f"{work_dir}/{target_id}:{self.worker_index}"
2114
2115 if not path.isdir(file_name):
2116 makedirs(file_name)
2117
2118 file_name = file_name + "/ca_cert"
2119
2120 with open(file_name, "w") as f:
2121 f.write(db_vim["config"]["ca_cert_content"])
2122 del db_vim["config"]["ca_cert_content"]
2123 db_vim["config"]["ca_cert"] = file_name
2124 except Exception as e:
2125 raise NsWorkerException(
2126 "Error writing to file '{}': {}".format(file_name, e)
2127 )
2128
2129 def _load_plugin(self, name, type="vim"):
2130 # type can be vim or sdn
2131 if "rovim_dummy" not in self.plugins:
2132 self.plugins["rovim_dummy"] = VimDummyConnector
2133
2134 if "rosdn_dummy" not in self.plugins:
2135 self.plugins["rosdn_dummy"] = SdnDummyConnector
2136
2137 if name in self.plugins:
2138 return self.plugins[name]
2139
2140 try:
2141 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
2142 self.plugins[name] = ep.load()
2143 except Exception as e:
2144 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
2145
2146 if name and name not in self.plugins:
2147 raise NsWorkerException(
2148 "Plugin 'osm_{n}' has not been installed".format(n=name)
2149 )
2150
2151 return self.plugins[name]
2152
2153 def _unload_vim(self, target_id):
2154 """
2155 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
2156 :param target_id: Contains type:_id; where type can be 'vim', ...
2157 :return: None.
2158 """
2159 try:
2160 self.db_vims.pop(target_id, None)
2161 self.my_vims.pop(target_id, None)
2162
2163 if target_id in self.vim_targets:
2164 self.vim_targets.remove(target_id)
2165
2166 self.logger.info("Unloaded {}".format(target_id))
2167 except Exception as e:
2168 self.logger.error("Cannot unload {}: {}".format(target_id, e))
2169
2170 def _check_vim(self, target_id):
2171 """
2172 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
2173 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
2174 :return: None.
2175 """
2176 target, _, _id = target_id.partition(":")
2177 now = time.time()
2178 update_dict = {}
2179 unset_dict = {}
2180 op_text = ""
2181 step = ""
2182 loaded = target_id in self.vim_targets
2183 target_database = (
2184 "vim_accounts"
2185 if target == "vim"
2186 else "wim_accounts"
2187 if target == "wim"
2188 else "sdns"
2189 )
2190 error_text = ""
2191
2192 try:
2193 step = "Getting {} from db".format(target_id)
2194 db_vim = self.db.get_one(target_database, {"_id": _id})
2195
2196 for op_index, operation in enumerate(
2197 db_vim["_admin"].get("operations", ())
2198 ):
2199 if operation["operationState"] != "PROCESSING":
2200 continue
2201
2202 locked_at = operation.get("locked_at")
2203
2204 if locked_at is not None and locked_at >= now - self.task_locked_time:
2205 # some other thread is doing this operation
2206 return
2207
2208 # lock
2209 op_text = "_admin.operations.{}.".format(op_index)
2210
2211 if not self.db.set_one(
2212 target_database,
2213 q_filter={
2214 "_id": _id,
2215 op_text + "operationState": "PROCESSING",
2216 op_text + "locked_at": locked_at,
2217 },
2218 update_dict={
2219 op_text + "locked_at": now,
2220 "admin.current_operation": op_index,
2221 },
2222 fail_on_empty=False,
2223 ):
2224 return
2225
2226 unset_dict[op_text + "locked_at"] = None
2227 unset_dict["current_operation"] = None
2228 step = "Loading " + target_id
2229 error_text = self._load_vim(target_id)
2230
2231 if not error_text:
2232 step = "Checking connectivity"
2233
2234 if target == "vim":
2235 self.my_vims[target_id].check_vim_connectivity()
2236 else:
2237 self.my_vims[target_id].check_credentials()
2238
2239 update_dict["_admin.operationalState"] = "ENABLED"
2240 update_dict["_admin.detailed-status"] = ""
2241 unset_dict[op_text + "detailed-status"] = None
2242 update_dict[op_text + "operationState"] = "COMPLETED"
2243
2244 return
2245
2246 except Exception as e:
2247 error_text = "{}: {}".format(step, e)
2248 self.logger.error("{} for {}: {}".format(step, target_id, e))
2249
2250 finally:
2251 if update_dict or unset_dict:
2252 if error_text:
2253 update_dict[op_text + "operationState"] = "FAILED"
2254 update_dict[op_text + "detailed-status"] = error_text
2255 unset_dict.pop(op_text + "detailed-status", None)
2256 update_dict["_admin.operationalState"] = "ERROR"
2257 update_dict["_admin.detailed-status"] = error_text
2258
2259 if op_text:
2260 update_dict[op_text + "statusEnteredTime"] = now
2261
2262 self.db.set_one(
2263 target_database,
2264 q_filter={"_id": _id},
2265 update_dict=update_dict,
2266 unset=unset_dict,
2267 fail_on_empty=False,
2268 )
2269
2270 if not loaded:
2271 self._unload_vim(target_id)
2272
2273 def _reload_vim(self, target_id):
2274 if target_id in self.vim_targets:
2275 self._load_vim(target_id)
2276 else:
2277 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
2278 # just remove it to force load again next time it is needed
2279 self.db_vims.pop(target_id, None)
2280
2281 def _load_vim(self, target_id):
2282 """
2283 Load or reload a vim_account, sdn_controller or wim_account.
2284 Read content from database, load the plugin if not loaded.
2285 In case of error loading the plugin, it loads a failing VIM_connector
2286 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
2287 :param target_id: Contains type:_id; where type can be 'vim', ...
2288 :return: None if ok, descriptive text if error
2289 """
2290 target, _, _id = target_id.partition(":")
2291 target_database = (
2292 "vim_accounts"
2293 if target == "vim"
2294 else "wim_accounts"
2295 if target == "wim"
2296 else "sdns"
2297 )
2298 plugin_name = ""
2299 vim = None
2300 step = "Getting {}={} from db".format(target, _id)
2301
2302 try:
2303 # TODO process for wim, sdnc, ...
2304 vim = self.db.get_one(target_database, {"_id": _id})
2305
2306 # if deep_get(vim, "config", "sdn-controller"):
2307 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
2308 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
2309
2310 step = "Decrypting password"
2311 schema_version = vim.get("schema_version")
2312 self.db.encrypt_decrypt_fields(
2313 vim,
2314 "decrypt",
2315 fields=("password", "secret"),
2316 schema_version=schema_version,
2317 salt=_id,
2318 )
2319 self._process_vim_config(target_id, vim)
2320
2321 if target == "vim":
2322 plugin_name = "rovim_" + vim["vim_type"]
2323 step = "Loading plugin '{}'".format(plugin_name)
2324 vim_module_conn = self._load_plugin(plugin_name)
2325 step = "Loading {}'".format(target_id)
2326 self.my_vims[target_id] = vim_module_conn(
2327 uuid=vim["_id"],
2328 name=vim["name"],
2329 tenant_id=vim.get("vim_tenant_id"),
2330 tenant_name=vim.get("vim_tenant_name"),
2331 url=vim["vim_url"],
2332 url_admin=None,
2333 user=vim["vim_user"],
2334 passwd=vim["vim_password"],
2335 config=vim.get("config") or {},
2336 persistent_info={},
2337 )
2338 else: # sdn
2339 plugin_name = "rosdn_" + (vim.get("type") or vim.get("wim_type"))
2340 step = "Loading plugin '{}'".format(plugin_name)
2341 vim_module_conn = self._load_plugin(plugin_name, "sdn")
2342 step = "Loading {}'".format(target_id)
2343 wim = deepcopy(vim)
2344 wim_config = wim.pop("config", {}) or {}
2345 wim["uuid"] = wim["_id"]
2346 if "url" in wim and "wim_url" not in wim:
2347 wim["wim_url"] = wim["url"]
2348 elif "url" not in wim and "wim_url" in wim:
2349 wim["url"] = wim["wim_url"]
2350
2351 if wim.get("dpid"):
2352 wim_config["dpid"] = wim.pop("dpid")
2353
2354 if wim.get("switch_id"):
2355 wim_config["switch_id"] = wim.pop("switch_id")
2356
2357 # wim, wim_account, config
2358 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
2359 self.db_vims[target_id] = vim
2360 self.error_status = None
2361
2362 self.logger.info(
2363 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
2364 )
2365 except Exception as e:
2366 self.logger.error(
2367 "Cannot load {} plugin={}: {} {}".format(
2368 target_id, plugin_name, step, e
2369 )
2370 )
2371
2372 self.db_vims[target_id] = vim or {}
2373 self.db_vims[target_id] = FailingConnector(str(e))
2374 error_status = "{} Error: {}".format(step, e)
2375
2376 return error_status
2377 finally:
2378 if target_id not in self.vim_targets:
2379 self.vim_targets.append(target_id)
2380
2381 def _get_db_task(self):
2382 """
2383 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
2384 :return: None
2385 """
2386 now = time.time()
2387
2388 if not self.time_last_task_processed:
2389 self.time_last_task_processed = now
2390
2391 try:
2392 while True:
2393 """
2394 # Log RO tasks only when loglevel is DEBUG
2395 if self.logger.getEffectiveLevel() == logging.DEBUG:
2396 self._log_ro_task(
2397 None,
2398 None,
2399 None,
2400 "TASK_WF",
2401 "task_locked_time="
2402 + str(self.task_locked_time)
2403 + " "
2404 + "time_last_task_processed="
2405 + str(self.time_last_task_processed)
2406 + " "
2407 + "now="
2408 + str(now),
2409 )
2410 """
2411 locked = self.db.set_one(
2412 "ro_tasks",
2413 q_filter={
2414 "target_id": self.vim_targets,
2415 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
2416 "locked_at.lt": now - self.task_locked_time,
2417 "to_check_at.lt": self.time_last_task_processed,
2418 "to_check_at.gt": -1,
2419 },
2420 update_dict={"locked_by": self.my_id, "locked_at": now},
2421 fail_on_empty=False,
2422 )
2423
2424 if locked:
2425 # read and return
2426 ro_task = self.db.get_one(
2427 "ro_tasks",
2428 q_filter={
2429 "target_id": self.vim_targets,
2430 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
2431 "locked_at": now,
2432 },
2433 )
2434 return ro_task
2435
2436 if self.time_last_task_processed == now:
2437 self.time_last_task_processed = None
2438 return None
2439 else:
2440 self.time_last_task_processed = now
2441 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
2442
2443 except DbException as e:
2444 self.logger.error("Database exception at _get_db_task: {}".format(e))
2445 except Exception as e:
2446 self.logger.critical(
2447 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
2448 )
2449
2450 return None
2451
2452 def _delete_task(self, ro_task, task_index, task_depends, db_update):
2453 """
2454 Determine if this task need to be done or superseded
2455 :return: None
2456 """
2457 my_task = ro_task["tasks"][task_index]
2458 task_id = my_task["task_id"]
2459 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
2460 "created_items", False
2461 )
2462
2463 self.logger.debug("Needed delete: {}".format(needed_delete))
2464 if my_task["status"] == "FAILED":
2465 return None, None # TODO need to be retry??
2466
2467 try:
2468 for index, task in enumerate(ro_task["tasks"]):
2469 if index == task_index or not task:
2470 continue # own task
2471
2472 if (
2473 my_task["target_record"] == task["target_record"]
2474 and task["action"] == "CREATE"
2475 ):
2476 # set to finished
2477 db_update["tasks.{}.status".format(index)] = task[
2478 "status"
2479 ] = "FINISHED"
2480 elif task["action"] == "CREATE" and task["status"] not in (
2481 "FINISHED",
2482 "SUPERSEDED",
2483 ):
2484 needed_delete = False
2485
2486 if needed_delete:
2487 self.logger.debug(
2488 "Deleting ro_task={} task_index={}".format(ro_task, task_index)
2489 )
2490 return self.item2class[my_task["item"]].delete(ro_task, task_index)
2491 else:
2492 return "SUPERSEDED", None
2493 except Exception as e:
2494 if not isinstance(e, NsWorkerException):
2495 self.logger.critical(
2496 "Unexpected exception at _delete_task task={}: {}".format(
2497 task_id, e
2498 ),
2499 exc_info=True,
2500 )
2501
2502 return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e)}
2503
2504 def _create_task(self, ro_task, task_index, task_depends, db_update):
2505 """
2506 Determine if this task need to create something at VIM
2507 :return: None
2508 """
2509 my_task = ro_task["tasks"][task_index]
2510 task_id = my_task["task_id"]
2511
2512 if my_task["status"] == "FAILED":
2513 return None, None # TODO need to be retry??
2514 elif my_task["status"] == "SCHEDULED":
2515 # check if already created by another task
2516 for index, task in enumerate(ro_task["tasks"]):
2517 if index == task_index or not task:
2518 continue # own task
2519
2520 if task["action"] == "CREATE" and task["status"] not in (
2521 "SCHEDULED",
2522 "FINISHED",
2523 "SUPERSEDED",
2524 ):
2525 return task["status"], "COPY_VIM_INFO"
2526
2527 try:
2528 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
2529 ro_task, task_index, task_depends
2530 )
2531 # TODO update other CREATE tasks
2532 except Exception as e:
2533 if not isinstance(e, NsWorkerException):
2534 self.logger.error(
2535 "Error executing task={}: {}".format(task_id, e), exc_info=True
2536 )
2537
2538 task_status = "FAILED"
2539 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_message": str(e)}
2540 # TODO update ro_vim_item_update
2541
2542 return task_status, ro_vim_item_update
2543 else:
2544 return None, None
2545
2546 def _get_dependency(self, task_id, ro_task=None, target_id=None):
2547 """
2548 Look for dependency task
2549 :param task_id: Can be one of
2550 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2551 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2552 3. task.task_id: "<action_id>:number"
2553 :param ro_task:
2554 :param target_id:
2555 :return: database ro_task plus index of task
2556 """
2557 if (
2558 task_id.startswith("vim:")
2559 or task_id.startswith("sdn:")
2560 or task_id.startswith("wim:")
2561 ):
2562 target_id, _, task_id = task_id.partition(" ")
2563
2564 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
2565 ro_task_dependency = self.db.get_one(
2566 "ro_tasks",
2567 q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
2568 fail_on_empty=False,
2569 )
2570
2571 if ro_task_dependency:
2572 for task_index, task in enumerate(ro_task_dependency["tasks"]):
2573 if task["target_record_id"] == task_id:
2574 return ro_task_dependency, task_index
2575
2576 else:
2577 if ro_task:
2578 for task_index, task in enumerate(ro_task["tasks"]):
2579 if task and task["task_id"] == task_id:
2580 return ro_task, task_index
2581
2582 ro_task_dependency = self.db.get_one(
2583 "ro_tasks",
2584 q_filter={
2585 "tasks.ANYINDEX.task_id": task_id,
2586 "tasks.ANYINDEX.target_record.ne": None,
2587 },
2588 fail_on_empty=False,
2589 )
2590
2591 self.logger.debug("ro_task_dependency={}".format(ro_task_dependency))
2592 if ro_task_dependency:
2593 for task_index, task in enumerate(ro_task_dependency["tasks"]):
2594 if task["task_id"] == task_id:
2595 return ro_task_dependency, task_index
2596 raise NsWorkerException("Cannot get depending task {}".format(task_id))
2597
2598 def update_vm_refresh(self, ro_task):
2599 """Enables the VM status updates if self.refresh_config.active parameter
2600 is not -1 and then updates the DB accordingly
2601
2602 """
2603 try:
2604 self.logger.debug("Checking if VM status update config")
2605 next_refresh = time.time()
2606 next_refresh = self._get_next_refresh(ro_task, next_refresh)
2607
2608 if next_refresh != -1:
2609 db_ro_task_update = {}
2610 now = time.time()
2611 next_check_at = now + (24 * 60 * 60)
2612 next_check_at = min(next_check_at, next_refresh)
2613 db_ro_task_update["vim_info.refresh_at"] = next_refresh
2614 db_ro_task_update["to_check_at"] = next_check_at
2615
2616 self.logger.debug(
2617 "Finding tasks which to be updated to enable VM status updates"
2618 )
2619 refresh_tasks = self.db.get_list(
2620 "ro_tasks",
2621 q_filter={
2622 "tasks.status": "DONE",
2623 "to_check_at.lt": 0,
2624 },
2625 )
2626 self.logger.debug("Updating tasks to change the to_check_at status")
2627 for task in refresh_tasks:
2628 q_filter = {
2629 "_id": task["_id"],
2630 }
2631 self.db.set_one(
2632 "ro_tasks",
2633 q_filter=q_filter,
2634 update_dict=db_ro_task_update,
2635 fail_on_empty=True,
2636 )
2637
2638 except Exception as e:
2639 self.logger.error(f"Error updating tasks to enable VM status updates: {e}")
2640
2641 def _get_next_refresh(self, ro_task: dict, next_refresh: float):
2642 """Decide the next_refresh according to vim type and refresh config period.
2643 Args:
2644 ro_task (dict): ro_task details
2645 next_refresh (float): next refresh time as epoch format
2646
2647 Returns:
2648 next_refresh (float) -1 if vm updates are disabled or vim type is openstack.
2649 """
2650 target_vim = ro_task["target_id"]
2651 vim_type = self.db_vims[target_vim]["vim_type"]
2652 if self.refresh_config.active == -1 or vim_type == "openstack":
2653 next_refresh = -1
2654 else:
2655 next_refresh += self.refresh_config.active
2656 return next_refresh
2657
2658 def _process_pending_tasks(self, ro_task):
2659 ro_task_id = ro_task["_id"]
2660 now = time.time()
2661 # one day
2662 next_check_at = now + (24 * 60 * 60)
2663 db_ro_task_update = {}
2664
2665 def _update_refresh(new_status):
2666 # compute next_refresh
2667 nonlocal task
2668 nonlocal next_check_at
2669 nonlocal db_ro_task_update
2670 nonlocal ro_task
2671
2672 next_refresh = time.time()
2673
2674 if task["item"] in ("image", "flavor"):
2675 next_refresh += self.refresh_config.image
2676 elif new_status == "BUILD":
2677 next_refresh += self.refresh_config.build
2678 elif new_status == "DONE":
2679 next_refresh = self._get_next_refresh(ro_task, next_refresh)
2680 else:
2681 next_refresh += self.refresh_config.error
2682
2683 next_check_at = min(next_check_at, next_refresh)
2684 db_ro_task_update["vim_info.refresh_at"] = next_refresh
2685 ro_task["vim_info"]["refresh_at"] = next_refresh
2686
2687 try:
2688 """
2689 # Log RO tasks only when loglevel is DEBUG
2690 if self.logger.getEffectiveLevel() == logging.DEBUG:
2691 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
2692 """
2693 # Check if vim status refresh is enabled again
2694 self.update_vm_refresh(ro_task)
2695 # 0: get task_status_create
2696 lock_object = None
2697 task_status_create = None
2698 task_create = next(
2699 (
2700 t
2701 for t in ro_task["tasks"]
2702 if t
2703 and t["action"] == "CREATE"
2704 and t["status"] in ("BUILD", "DONE")
2705 ),
2706 None,
2707 )
2708
2709 if task_create:
2710 task_status_create = task_create["status"]
2711
2712 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
2713 for task_action in ("DELETE", "CREATE", "EXEC"):
2714 db_vim_update = None
2715 new_status = None
2716
2717 for task_index, task in enumerate(ro_task["tasks"]):
2718 if not task:
2719 continue # task deleted
2720
2721 task_depends = {}
2722 target_update = None
2723
2724 if (
2725 (
2726 task_action in ("DELETE", "EXEC")
2727 and task["status"] not in ("SCHEDULED", "BUILD")
2728 )
2729 or task["action"] != task_action
2730 or (
2731 task_action == "CREATE"
2732 and task["status"] in ("FINISHED", "SUPERSEDED")
2733 )
2734 ):
2735 continue
2736
2737 task_path = "tasks.{}.status".format(task_index)
2738 try:
2739 db_vim_info_update = None
2740 dependency_ro_task = {}
2741
2742 if task["status"] == "SCHEDULED":
2743 # check if tasks that this depends on have been completed
2744 dependency_not_completed = False
2745
2746 for dependency_task_id in task.get("depends_on") or ():
2747 (
2748 dependency_ro_task,
2749 dependency_task_index,
2750 ) = self._get_dependency(
2751 dependency_task_id, target_id=ro_task["target_id"]
2752 )
2753 dependency_task = dependency_ro_task["tasks"][
2754 dependency_task_index
2755 ]
2756 self.logger.debug(
2757 "dependency_ro_task={} dependency_task_index={}".format(
2758 dependency_ro_task, dependency_task_index
2759 )
2760 )
2761
2762 if dependency_task["status"] == "SCHEDULED":
2763 dependency_not_completed = True
2764 next_check_at = min(
2765 next_check_at, dependency_ro_task["to_check_at"]
2766 )
2767 # must allow dependent task to be processed first
2768 # to do this set time after last_task_processed
2769 next_check_at = max(
2770 self.time_last_task_processed, next_check_at
2771 )
2772 break
2773 elif dependency_task["status"] == "FAILED":
2774 error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
2775 task["action"],
2776 task["item"],
2777 dependency_task["action"],
2778 dependency_task["item"],
2779 dependency_task_id,
2780 dependency_ro_task["vim_info"].get(
2781 "vim_message"
2782 ),
2783 )
2784 self.logger.error(
2785 "task={} {}".format(task["task_id"], error_text)
2786 )
2787 raise NsWorkerException(error_text)
2788
2789 task_depends[dependency_task_id] = dependency_ro_task[
2790 "vim_info"
2791 ]["vim_id"]
2792 task_depends[
2793 "TASK-{}".format(dependency_task_id)
2794 ] = dependency_ro_task["vim_info"]["vim_id"]
2795
2796 if dependency_not_completed:
2797 self.logger.warning(
2798 "DEPENDENCY NOT COMPLETED {}".format(
2799 dependency_ro_task["vim_info"]["vim_id"]
2800 )
2801 )
2802 # TODO set at vim_info.vim_details that it is waiting
2803 continue
2804
2805 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
2806 # the task of renew this locking. It will update database locket_at periodically
2807 if not lock_object:
2808 lock_object = LockRenew.add_lock_object(
2809 "ro_tasks", ro_task, self
2810 )
2811 if task["action"] == "DELETE":
2812 (
2813 new_status,
2814 db_vim_info_update,
2815 ) = self._delete_task(
2816 ro_task, task_index, task_depends, db_ro_task_update
2817 )
2818 new_status = (
2819 "FINISHED" if new_status == "DONE" else new_status
2820 )
2821 # ^with FINISHED instead of DONE it will not be refreshing
2822
2823 if new_status in ("FINISHED", "SUPERSEDED"):
2824 target_update = "DELETE"
2825 elif task["action"] == "EXEC":
2826 (
2827 new_status,
2828 db_vim_info_update,
2829 db_task_update,
2830 ) = self.item2class[task["item"]].exec(
2831 ro_task, task_index, task_depends
2832 )
2833 new_status = (
2834 "FINISHED" if new_status == "DONE" else new_status
2835 )
2836 # ^with FINISHED instead of DONE it will not be refreshing
2837
2838 if db_task_update:
2839 # load into database the modified db_task_update "retries" and "next_retry"
2840 if db_task_update.get("retries"):
2841 db_ro_task_update[
2842 "tasks.{}.retries".format(task_index)
2843 ] = db_task_update["retries"]
2844
2845 next_check_at = time.time() + db_task_update.get(
2846 "next_retry", 60
2847 )
2848 target_update = None
2849 elif task["action"] == "CREATE":
2850 if task["status"] == "SCHEDULED":
2851 if task_status_create:
2852 new_status = task_status_create
2853 target_update = "COPY_VIM_INFO"
2854 else:
2855 new_status, db_vim_info_update = self.item2class[
2856 task["item"]
2857 ].new(ro_task, task_index, task_depends)
2858 _update_refresh(new_status)
2859 else:
2860 refresh_at = ro_task["vim_info"]["refresh_at"]
2861 if refresh_at and refresh_at != -1 and now > refresh_at:
2862 (
2863 new_status,
2864 db_vim_info_update,
2865 ) = self.item2class[
2866 task["item"]
2867 ].refresh(ro_task)
2868 _update_refresh(new_status)
2869 else:
2870 # The refresh is updated to avoid set the value of "refresh_at" to
2871 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2872 # because it can happen that in this case the task is never processed
2873 _update_refresh(task["status"])
2874
2875 except Exception as e:
2876 new_status = "FAILED"
2877 db_vim_info_update = {
2878 "vim_status": "VIM_ERROR",
2879 "vim_message": str(e),
2880 }
2881
2882 if not isinstance(
2883 e, (NsWorkerException, vimconn.VimConnException)
2884 ):
2885 self.logger.error(
2886 "Unexpected exception at _delete_task task={}: {}".format(
2887 task["task_id"], e
2888 ),
2889 exc_info=True,
2890 )
2891
2892 try:
2893 if db_vim_info_update:
2894 db_vim_update = db_vim_info_update.copy()
2895 db_ro_task_update.update(
2896 {
2897 "vim_info." + k: v
2898 for k, v in db_vim_info_update.items()
2899 }
2900 )
2901 ro_task["vim_info"].update(db_vim_info_update)
2902
2903 if new_status:
2904 if task_action == "CREATE":
2905 task_status_create = new_status
2906 db_ro_task_update[task_path] = new_status
2907
2908 if target_update or db_vim_update:
2909 if target_update == "DELETE":
2910 self._update_target(task, None)
2911 elif target_update == "COPY_VIM_INFO":
2912 self._update_target(task, ro_task["vim_info"])
2913 else:
2914 self._update_target(task, db_vim_update)
2915
2916 except Exception as e:
2917 if (
2918 isinstance(e, DbException)
2919 and e.http_code == HTTPStatus.NOT_FOUND
2920 ):
2921 # if the vnfrs or nsrs has been removed from database, this task must be removed
2922 self.logger.debug(
2923 "marking to delete task={}".format(task["task_id"])
2924 )
2925 self.tasks_to_delete.append(task)
2926 else:
2927 self.logger.error(
2928 "Unexpected exception at _update_target task={}: {}".format(
2929 task["task_id"], e
2930 ),
2931 exc_info=True,
2932 )
2933
2934 locked_at = ro_task["locked_at"]
2935
2936 if lock_object:
2937 locked_at = [
2938 lock_object["locked_at"],
2939 lock_object["locked_at"] + self.task_locked_time,
2940 ]
2941 # locked_at contains two times to avoid race condition. In case the lock has been renewed, it will
2942 # contain exactly locked_at + self.task_locked_time
2943 LockRenew.remove_lock_object(lock_object)
2944
2945 q_filter = {
2946 "_id": ro_task["_id"],
2947 "to_check_at": ro_task["to_check_at"],
2948 "locked_at": locked_at,
2949 }
2950 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2951 # outside this task (by ro_nbi) do not update it
2952 db_ro_task_update["locked_by"] = None
2953 # locked_at converted to int only for debugging. When it is not decimals it means it has been unlocked
2954 db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
2955 db_ro_task_update["modified_at"] = now
2956 db_ro_task_update["to_check_at"] = next_check_at
2957
2958 """
2959 # Log RO tasks only when loglevel is DEBUG
2960 if self.logger.getEffectiveLevel() == logging.DEBUG:
2961 db_ro_task_update_log = db_ro_task_update.copy()
2962 db_ro_task_update_log["_id"] = q_filter["_id"]
2963 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2964 """
2965
2966 if not self.db.set_one(
2967 "ro_tasks",
2968 update_dict=db_ro_task_update,
2969 q_filter=q_filter,
2970 fail_on_empty=False,
2971 ):
2972 del db_ro_task_update["to_check_at"]
2973 del q_filter["to_check_at"]
2974 """
2975 # Log RO tasks only when loglevel is DEBUG
2976 if self.logger.getEffectiveLevel() == logging.DEBUG:
2977 self._log_ro_task(
2978 None,
2979 db_ro_task_update_log,
2980 None,
2981 "TASK_WF",
2982 "SET_TASK " + str(q_filter),
2983 )
2984 """
2985 self.db.set_one(
2986 "ro_tasks",
2987 q_filter=q_filter,
2988 update_dict=db_ro_task_update,
2989 fail_on_empty=True,
2990 )
2991 except DbException as e:
2992 self.logger.error(
2993 "ro_task={} Error updating database {}".format(ro_task_id, e)
2994 )
2995 except Exception as e:
2996 self.logger.error(
2997 "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
2998 )
2999
3000 def _update_target(self, task, ro_vim_item_update):
3001 table, _, temp = task["target_record"].partition(":")
3002 _id, _, path_vim_status = temp.partition(":")
3003 path_item = path_vim_status[: path_vim_status.rfind(".")]
3004 path_item = path_item[: path_item.rfind(".")]
3005 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
3006 # path_item: dot separated list targeting record information, e.g. "vdur.10"
3007
3008 if ro_vim_item_update:
3009 update_dict = {
3010 path_vim_status + "." + k: v
3011 for k, v in ro_vim_item_update.items()
3012 if k
3013 in (
3014 "vim_id",
3015 "vim_details",
3016 "vim_message",
3017 "vim_name",
3018 "vim_status",
3019 "interfaces",
3020 "interfaces_backup",
3021 )
3022 }
3023
3024 if path_vim_status.startswith("vdur."):
3025 # for backward compatibility, add vdur.name apart from vdur.vim_name
3026 if ro_vim_item_update.get("vim_name"):
3027 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
3028
3029 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
3030 if ro_vim_item_update.get("vim_id"):
3031 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
3032
3033 # update general status
3034 if ro_vim_item_update.get("vim_status"):
3035 update_dict[path_item + ".status"] = ro_vim_item_update[
3036 "vim_status"
3037 ]
3038
3039 if ro_vim_item_update.get("interfaces"):
3040 path_interfaces = path_item + ".interfaces"
3041
3042 for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
3043 if iface:
3044 update_dict.update(
3045 {
3046 path_interfaces + ".{}.".format(i) + k: v
3047 for k, v in iface.items()
3048 if k in ("vlan", "compute_node", "pci")
3049 }
3050 )
3051
3052 # put ip_address and mac_address with ip-address and mac-address
3053 if iface.get("ip_address"):
3054 update_dict[
3055 path_interfaces + ".{}.".format(i) + "ip-address"
3056 ] = iface["ip_address"]
3057
3058 if iface.get("mac_address"):
3059 update_dict[
3060 path_interfaces + ".{}.".format(i) + "mac-address"
3061 ] = iface["mac_address"]
3062
3063 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
3064 update_dict["ip-address"] = iface.get("ip_address").split(
3065 ";"
3066 )[0]
3067
3068 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
3069 update_dict[path_item + ".ip-address"] = iface.get(
3070 "ip_address"
3071 ).split(";")[0]
3072
3073 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
3074
3075 # If interfaces exists, it backups VDU interfaces in the DB for healing operations
3076 if ro_vim_item_update.get("interfaces"):
3077 search_key = path_vim_status + ".interfaces"
3078 if update_dict.get(search_key):
3079 interfaces_backup_update = {
3080 path_vim_status + ".interfaces_backup": update_dict[search_key]
3081 }
3082
3083 self.db.set_one(
3084 table,
3085 q_filter={"_id": _id},
3086 update_dict=interfaces_backup_update,
3087 )
3088
3089 else:
3090 update_dict = {path_item + ".status": "DELETED"}
3091 self.db.set_one(
3092 table,
3093 q_filter={"_id": _id},
3094 update_dict=update_dict,
3095 unset={path_vim_status: None},
3096 )
3097
3098 def _process_delete_db_tasks(self):
3099 """
3100 Delete task from database because vnfrs or nsrs or both have been deleted
3101 :return: None. Uses and modify self.tasks_to_delete
3102 """
3103 while self.tasks_to_delete:
3104 task = self.tasks_to_delete[0]
3105 vnfrs_deleted = None
3106 nsr_id = task["nsr_id"]
3107
3108 if task["target_record"].startswith("vnfrs:"):
3109 # check if nsrs is present
3110 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
3111 vnfrs_deleted = task["target_record"].split(":")[1]
3112
3113 try:
3114 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
3115 except Exception as e:
3116 self.logger.error(
3117 "Error deleting task={}: {}".format(task["task_id"], e)
3118 )
3119 self.tasks_to_delete.pop(0)
3120
3121 @staticmethod
3122 def delete_db_tasks(db, nsr_id, vnfrs_deleted):
3123 """
3124 Static method because it is called from osm_ng_ro.ns
3125 :param db: instance of database to use
3126 :param nsr_id: affected nsrs id
3127 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
3128 :return: None, exception is fails
3129 """
3130 retries = 5
3131 for retry in range(retries):
3132 ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
3133 now = time.time()
3134 conflict = False
3135
3136 for ro_task in ro_tasks:
3137 db_update = {}
3138 to_delete_ro_task = True
3139
3140 for index, task in enumerate(ro_task["tasks"]):
3141 if not task:
3142 pass
3143 elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or (
3144 vnfrs_deleted
3145 and task["target_record"].startswith("vnfrs:" + vnfrs_deleted)
3146 ):
3147 db_update["tasks.{}".format(index)] = None
3148 else:
3149 # used by other nsr, ro_task cannot be deleted
3150 to_delete_ro_task = False
3151
3152 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
3153 if to_delete_ro_task:
3154 if not db.del_one(
3155 "ro_tasks",
3156 q_filter={
3157 "_id": ro_task["_id"],
3158 "modified_at": ro_task["modified_at"],
3159 },
3160 fail_on_empty=False,
3161 ):
3162 conflict = True
3163 elif db_update:
3164 db_update["modified_at"] = now
3165 if not db.set_one(
3166 "ro_tasks",
3167 q_filter={
3168 "_id": ro_task["_id"],
3169 "modified_at": ro_task["modified_at"],
3170 },
3171 update_dict=db_update,
3172 fail_on_empty=False,
3173 ):
3174 conflict = True
3175 if not conflict:
3176 return
3177 else:
3178 raise NsWorkerException("Exceeded {} retries".format(retries))
3179
3180 def run(self):
3181 # load database
3182 self.logger.info("Starting")
3183 while True:
3184 # step 1: get commands from queue
3185 try:
3186 if self.vim_targets:
3187 task = self.task_queue.get(block=False)
3188 else:
3189 if not self.idle:
3190 self.logger.debug("enters in idle state")
3191 self.idle = True
3192 task = self.task_queue.get(block=True)
3193 self.idle = False
3194
3195 if task[0] == "terminate":
3196 break
3197 elif task[0] == "load_vim":
3198 self.logger.info("order to load vim {}".format(task[1]))
3199 self._load_vim(task[1])
3200 elif task[0] == "unload_vim":
3201 self.logger.info("order to unload vim {}".format(task[1]))
3202 self._unload_vim(task[1])
3203 elif task[0] == "reload_vim":
3204 self._reload_vim(task[1])
3205 elif task[0] == "check_vim":
3206 self.logger.info("order to check vim {}".format(task[1]))
3207 self._check_vim(task[1])
3208 continue
3209 except Exception as e:
3210 if isinstance(e, queue.Empty):
3211 pass
3212 else:
3213 self.logger.critical(
3214 "Error processing task: {}".format(e), exc_info=True
3215 )
3216
3217 # step 2: process pending_tasks, delete not needed tasks
3218 try:
3219 if self.tasks_to_delete:
3220 self._process_delete_db_tasks()
3221 busy = False
3222 """
3223 # Log RO tasks only when loglevel is DEBUG
3224 if self.logger.getEffectiveLevel() == logging.DEBUG:
3225 _ = self._get_db_all_tasks()
3226 """
3227 ro_task = self._get_db_task()
3228 if ro_task:
3229 self.logger.debug("Task to process: {}".format(ro_task))
3230 time.sleep(1)
3231 self._process_pending_tasks(ro_task)
3232 busy = True
3233 if not busy:
3234 time.sleep(5)
3235 except Exception as e:
3236 self.logger.critical(
3237 "Unexpected exception at run: " + str(e), exc_info=True
3238 )
3239
3240 self.logger.info("Finishing")