Fixes bug 2013 : Passing Anti-affinity group as additionalParamsForVnf
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
20 """"
21 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored at database in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
25 """
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import mkdir
31 import queue
32 from shutil import rmtree
33 import threading
34 import time
35 from unittest.mock import Mock
36
37 from importlib_metadata import entry_points
38 from osm_common.dbbase import DbException
39 from osm_ng_ro.vim_admin import LockRenew
40 from osm_ro_plugin import sdnconn, vimconn
41 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
42 from osm_ro_plugin.vim_dummy import VimDummyConnector
43 import yaml
44
45 __author__ = "Alfonso Tierno"
46 __date__ = "$28-Sep-2017 12:07:15$"
47
48
49 def deep_get(target_dict, *args, **kwargs):
50 """
51 Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
52 Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
53 :param target_dict: dictionary to be read
54 :param args: list of keys to read from target_dict
55 :param kwargs: only can contain default=value to return if key is not present in the nested dictionary
56 :return: The wanted value if exist, None or default otherwise
57 """
58 for key in args:
59 if not isinstance(target_dict, dict) or key not in target_dict:
60 return kwargs.get("default")
61 target_dict = target_dict[key]
62 return target_dict
63
64
65 class NsWorkerException(Exception):
66 pass
67
68
69 class FailingConnector:
70 def __init__(self, error_msg):
71 self.error_msg = error_msg
72
73 for method in dir(vimconn.VimConnector):
74 if method[0] != "_":
75 setattr(
76 self, method, Mock(side_effect=vimconn.VimConnException(error_msg))
77 )
78
79 for method in dir(sdnconn.SdnConnectorBase):
80 if method[0] != "_":
81 setattr(
82 self, method, Mock(side_effect=sdnconn.SdnConnectorError(error_msg))
83 )
84
85
86 class NsWorkerExceptionNotFound(NsWorkerException):
87 pass
88
89
90 class VimInteractionBase:
91 """Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ...
92 It implements methods that does nothing and return ok"""
93
94 def __init__(self, db, my_vims, db_vims, logger):
95 self.db = db
96 self.logger = logger
97 self.my_vims = my_vims
98 self.db_vims = db_vims
99
100 def new(self, ro_task, task_index, task_depends):
101 return "BUILD", {}
102
103 def refresh(self, ro_task):
104 """skip calling VIM to get image, flavor status. Assumes ok"""
105 if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
106 return "FAILED", {}
107
108 return "DONE", {}
109
110 def delete(self, ro_task, task_index):
111 """skip calling VIM to delete image. Assumes ok"""
112 return "DONE", {}
113
114 def exec(self, ro_task, task_index, task_depends):
115 return "DONE", None, None
116
117
118 class VimInteractionNet(VimInteractionBase):
119 def new(self, ro_task, task_index, task_depends):
120 vim_net_id = None
121 task = ro_task["tasks"][task_index]
122 task_id = task["task_id"]
123 created = False
124 created_items = {}
125 target_vim = self.my_vims[ro_task["target_id"]]
126 mgmtnet = False
127 mgmtnet_defined_in_vim = False
128
129 try:
130 # FIND
131 if task.get("find_params"):
132 # if management, get configuration of VIM
133 if task["find_params"].get("filter_dict"):
134 vim_filter = task["find_params"]["filter_dict"]
135 # management network
136 elif task["find_params"].get("mgmt"):
137 mgmtnet = True
138 if deep_get(
139 self.db_vims[ro_task["target_id"]],
140 "config",
141 "management_network_id",
142 ):
143 mgmtnet_defined_in_vim = True
144 vim_filter = {
145 "id": self.db_vims[ro_task["target_id"]]["config"][
146 "management_network_id"
147 ]
148 }
149 elif deep_get(
150 self.db_vims[ro_task["target_id"]],
151 "config",
152 "management_network_name",
153 ):
154 mgmtnet_defined_in_vim = True
155 vim_filter = {
156 "name": self.db_vims[ro_task["target_id"]]["config"][
157 "management_network_name"
158 ]
159 }
160 else:
161 vim_filter = {"name": task["find_params"]["name"]}
162 else:
163 raise NsWorkerExceptionNotFound(
164 "Invalid find_params for new_net {}".format(task["find_params"])
165 )
166
167 vim_nets = target_vim.get_network_list(vim_filter)
168 if not vim_nets and not task.get("params"):
169 # If there is mgmt-network in the descriptor,
170 # there is no mapping of that network to a VIM network in the descriptor,
171 # also there is no mapping in the "--config" parameter or at VIM creation;
172 # that mgmt-network will be created.
173 if mgmtnet and not mgmtnet_defined_in_vim:
174 net_name = (
175 vim_filter.get("name")
176 if vim_filter.get("name")
177 else vim_filter.get("id")[:16]
178 )
179 vim_net_id, created_items = target_vim.new_network(
180 net_name, None
181 )
182 self.logger.debug(
183 "Created mgmt network vim_net_id: {}".format(vim_net_id)
184 )
185 created = True
186 else:
187 raise NsWorkerExceptionNotFound(
188 "Network not found with this criteria: '{}'".format(
189 task.get("find_params")
190 )
191 )
192 elif len(vim_nets) > 1:
193 raise NsWorkerException(
194 "More than one network found with this criteria: '{}'".format(
195 task["find_params"]
196 )
197 )
198
199 if vim_nets:
200 vim_net_id = vim_nets[0]["id"]
201 else:
202 # CREATE
203 params = task["params"]
204 vim_net_id, created_items = target_vim.new_network(**params)
205 created = True
206
207 ro_vim_item_update = {
208 "vim_id": vim_net_id,
209 "vim_status": "BUILD",
210 "created": created,
211 "created_items": created_items,
212 "vim_details": None,
213 }
214 self.logger.debug(
215 "task={} {} new-net={} created={}".format(
216 task_id, ro_task["target_id"], vim_net_id, created
217 )
218 )
219
220 return "BUILD", ro_vim_item_update
221 except (vimconn.VimConnException, NsWorkerException) as e:
222 self.logger.error(
223 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
224 )
225 ro_vim_item_update = {
226 "vim_status": "VIM_ERROR",
227 "created": created,
228 "vim_details": str(e),
229 }
230
231 return "FAILED", ro_vim_item_update
232
233 def refresh(self, ro_task):
234 """Call VIM to get network status"""
235 ro_task_id = ro_task["_id"]
236 target_vim = self.my_vims[ro_task["target_id"]]
237 vim_id = ro_task["vim_info"]["vim_id"]
238 net_to_refresh_list = [vim_id]
239
240 try:
241 vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
242 vim_info = vim_dict[vim_id]
243
244 if vim_info["status"] == "ACTIVE":
245 task_status = "DONE"
246 elif vim_info["status"] == "BUILD":
247 task_status = "BUILD"
248 else:
249 task_status = "FAILED"
250 except vimconn.VimConnException as e:
251 # Mark all tasks at VIM_ERROR status
252 self.logger.error(
253 "ro_task={} vim={} get-net={}: {}".format(
254 ro_task_id, ro_task["target_id"], vim_id, e
255 )
256 )
257 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
258 task_status = "FAILED"
259
260 ro_vim_item_update = {}
261 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
262 ro_vim_item_update["vim_status"] = vim_info["status"]
263
264 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
265 ro_vim_item_update["vim_name"] = vim_info.get("name")
266
267 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
268 if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
269 ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
270 elif vim_info["status"] == "DELETED":
271 ro_vim_item_update["vim_id"] = None
272 ro_vim_item_update["vim_details"] = "Deleted externally"
273 else:
274 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
275 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
276
277 if ro_vim_item_update:
278 self.logger.debug(
279 "ro_task={} {} get-net={}: status={} {}".format(
280 ro_task_id,
281 ro_task["target_id"],
282 vim_id,
283 ro_vim_item_update.get("vim_status"),
284 ro_vim_item_update.get("vim_details")
285 if ro_vim_item_update.get("vim_status") != "ACTIVE"
286 else "",
287 )
288 )
289
290 return task_status, ro_vim_item_update
291
292 def delete(self, ro_task, task_index):
293 task = ro_task["tasks"][task_index]
294 task_id = task["task_id"]
295 net_vim_id = ro_task["vim_info"]["vim_id"]
296 ro_vim_item_update_ok = {
297 "vim_status": "DELETED",
298 "created": False,
299 "vim_details": "DELETED",
300 "vim_id": None,
301 }
302
303 try:
304 if net_vim_id or ro_task["vim_info"]["created_items"]:
305 target_vim = self.my_vims[ro_task["target_id"]]
306 target_vim.delete_network(
307 net_vim_id, ro_task["vim_info"]["created_items"]
308 )
309 except vimconn.VimConnNotFoundException:
310 ro_vim_item_update_ok["vim_details"] = "already deleted"
311 except vimconn.VimConnException as e:
312 self.logger.error(
313 "ro_task={} vim={} del-net={}: {}".format(
314 ro_task["_id"], ro_task["target_id"], net_vim_id, e
315 )
316 )
317 ro_vim_item_update = {
318 "vim_status": "VIM_ERROR",
319 "vim_details": "Error while deleting: {}".format(e),
320 }
321
322 return "FAILED", ro_vim_item_update
323
324 self.logger.debug(
325 "task={} {} del-net={} {}".format(
326 task_id,
327 ro_task["target_id"],
328 net_vim_id,
329 ro_vim_item_update_ok.get("vim_details", ""),
330 )
331 )
332
333 return "DONE", ro_vim_item_update_ok
334
335
336 class VimInteractionVdu(VimInteractionBase):
337 max_retries_inject_ssh_key = 20 # 20 times
338 time_retries_inject_ssh_key = 30 # wevery 30 seconds
339
340 def new(self, ro_task, task_index, task_depends):
341 task = ro_task["tasks"][task_index]
342 task_id = task["task_id"]
343 created = False
344 created_items = {}
345 target_vim = self.my_vims[ro_task["target_id"]]
346
347 try:
348 created = True
349 params = task["params"]
350 params_copy = deepcopy(params)
351 net_list = params_copy["net_list"]
352
353 for net in net_list:
354 # change task_id into network_id
355 if "net_id" in net and net["net_id"].startswith("TASK-"):
356 network_id = task_depends[net["net_id"]]
357
358 if not network_id:
359 raise NsWorkerException(
360 "Cannot create VM because depends on a network not created or found "
361 "for {}".format(net["net_id"])
362 )
363
364 net["net_id"] = network_id
365
366 if params_copy["image_id"].startswith("TASK-"):
367 params_copy["image_id"] = task_depends[params_copy["image_id"]]
368
369 if params_copy["flavor_id"].startswith("TASK-"):
370 params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]
371
372 affinity_group_list = params_copy["affinity_group_list"]
373 for affinity_group in affinity_group_list:
374 # change task_id into affinity_group_id
375 if "affinity_group_id" in affinity_group and affinity_group[
376 "affinity_group_id"
377 ].startswith("TASK-"):
378 affinity_group_id = task_depends[
379 affinity_group["affinity_group_id"]
380 ]
381
382 if not affinity_group_id:
383 raise NsWorkerException(
384 "found for {}".format(affinity_group["affinity_group_id"])
385 )
386
387 affinity_group["affinity_group_id"] = affinity_group_id
388
389 vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
390 interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]
391
392 ro_vim_item_update = {
393 "vim_id": vim_vm_id,
394 "vim_status": "BUILD",
395 "created": created,
396 "created_items": created_items,
397 "vim_details": None,
398 "interfaces_vim_ids": interfaces,
399 "interfaces": [],
400 }
401 self.logger.debug(
402 "task={} {} new-vm={} created={}".format(
403 task_id, ro_task["target_id"], vim_vm_id, created
404 )
405 )
406
407 return "BUILD", ro_vim_item_update
408 except (vimconn.VimConnException, NsWorkerException) as e:
409 self.logger.error(
410 "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
411 )
412 ro_vim_item_update = {
413 "vim_status": "VIM_ERROR",
414 "created": created,
415 "vim_details": str(e),
416 }
417
418 return "FAILED", ro_vim_item_update
419
420 def delete(self, ro_task, task_index):
421 task = ro_task["tasks"][task_index]
422 task_id = task["task_id"]
423 vm_vim_id = ro_task["vim_info"]["vim_id"]
424 ro_vim_item_update_ok = {
425 "vim_status": "DELETED",
426 "created": False,
427 "vim_details": "DELETED",
428 "vim_id": None,
429 }
430
431 try:
432 if vm_vim_id or ro_task["vim_info"]["created_items"]:
433 target_vim = self.my_vims[ro_task["target_id"]]
434 target_vim.delete_vminstance(
435 vm_vim_id, ro_task["vim_info"]["created_items"]
436 )
437 except vimconn.VimConnNotFoundException:
438 ro_vim_item_update_ok["vim_details"] = "already deleted"
439 except vimconn.VimConnException as e:
440 self.logger.error(
441 "ro_task={} vim={} del-vm={}: {}".format(
442 ro_task["_id"], ro_task["target_id"], vm_vim_id, e
443 )
444 )
445 ro_vim_item_update = {
446 "vim_status": "VIM_ERROR",
447 "vim_details": "Error while deleting: {}".format(e),
448 }
449
450 return "FAILED", ro_vim_item_update
451
452 self.logger.debug(
453 "task={} {} del-vm={} {}".format(
454 task_id,
455 ro_task["target_id"],
456 vm_vim_id,
457 ro_vim_item_update_ok.get("vim_details", ""),
458 )
459 )
460
461 return "DONE", ro_vim_item_update_ok
462
463 def refresh(self, ro_task):
464 """Call VIM to get vm status"""
465 ro_task_id = ro_task["_id"]
466 target_vim = self.my_vims[ro_task["target_id"]]
467 vim_id = ro_task["vim_info"]["vim_id"]
468
469 if not vim_id:
470 return None, None
471
472 vm_to_refresh_list = [vim_id]
473 try:
474 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
475 vim_info = vim_dict[vim_id]
476
477 if vim_info["status"] == "ACTIVE":
478 task_status = "DONE"
479 elif vim_info["status"] == "BUILD":
480 task_status = "BUILD"
481 else:
482 task_status = "FAILED"
483
484 # try to load and parse vim_information
485 try:
486 vim_info_info = yaml.safe_load(vim_info["vim_info"])
487 if vim_info_info.get("name"):
488 vim_info["name"] = vim_info_info["name"]
489 except Exception:
490 pass
491 except vimconn.VimConnException as e:
492 # Mark all tasks at VIM_ERROR status
493 self.logger.error(
494 "ro_task={} vim={} get-vm={}: {}".format(
495 ro_task_id, ro_task["target_id"], vim_id, e
496 )
497 )
498 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
499 task_status = "FAILED"
500
501 ro_vim_item_update = {}
502
503 # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
504 vim_interfaces = []
505 if vim_info.get("interfaces"):
506 for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
507 iface = next(
508 (
509 iface
510 for iface in vim_info["interfaces"]
511 if vim_iface_id == iface["vim_interface_id"]
512 ),
513 None,
514 )
515 # if iface:
516 # iface.pop("vim_info", None)
517 vim_interfaces.append(iface)
518
519 task_create = next(
520 t
521 for t in ro_task["tasks"]
522 if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
523 )
524 if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
525 vim_interfaces[task_create["mgmt_vnf_interface"]][
526 "mgmt_vnf_interface"
527 ] = True
528
529 mgmt_vdu_iface = task_create.get(
530 "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
531 )
532 if vim_interfaces:
533 vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True
534
535 if ro_task["vim_info"]["interfaces"] != vim_interfaces:
536 ro_vim_item_update["interfaces"] = vim_interfaces
537
538 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
539 ro_vim_item_update["vim_status"] = vim_info["status"]
540
541 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
542 ro_vim_item_update["vim_name"] = vim_info.get("name")
543
544 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
545 if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
546 ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
547 elif vim_info["status"] == "DELETED":
548 ro_vim_item_update["vim_id"] = None
549 ro_vim_item_update["vim_details"] = "Deleted externally"
550 else:
551 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
552 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
553
554 if ro_vim_item_update:
555 self.logger.debug(
556 "ro_task={} {} get-vm={}: status={} {}".format(
557 ro_task_id,
558 ro_task["target_id"],
559 vim_id,
560 ro_vim_item_update.get("vim_status"),
561 ro_vim_item_update.get("vim_details")
562 if ro_vim_item_update.get("vim_status") != "ACTIVE"
563 else "",
564 )
565 )
566
567 return task_status, ro_vim_item_update
568
569 def exec(self, ro_task, task_index, task_depends):
570 task = ro_task["tasks"][task_index]
571 task_id = task["task_id"]
572 target_vim = self.my_vims[ro_task["target_id"]]
573 db_task_update = {"retries": 0}
574 retries = task.get("retries", 0)
575
576 try:
577 params = task["params"]
578 params_copy = deepcopy(params)
579 params_copy["ro_key"] = self.db.decrypt(
580 params_copy.pop("private_key"),
581 params_copy.pop("schema_version"),
582 params_copy.pop("salt"),
583 )
584 params_copy["ip_addr"] = params_copy.pop("ip_address")
585 target_vim.inject_user_key(**params_copy)
586 self.logger.debug(
587 "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
588 )
589
590 return (
591 "DONE",
592 None,
593 db_task_update,
594 ) # params_copy["key"]
595 except (vimconn.VimConnException, NsWorkerException) as e:
596 retries += 1
597
598 if retries < self.max_retries_inject_ssh_key:
599 return (
600 "BUILD",
601 None,
602 {
603 "retries": retries,
604 "next_retry": self.time_retries_inject_ssh_key,
605 },
606 )
607
608 self.logger.error(
609 "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
610 )
611 ro_vim_item_update = {"vim_details": str(e)}
612
613 return "FAILED", ro_vim_item_update, db_task_update
614
615
616 class VimInteractionImage(VimInteractionBase):
617 def new(self, ro_task, task_index, task_depends):
618 task = ro_task["tasks"][task_index]
619 task_id = task["task_id"]
620 created = False
621 created_items = {}
622 target_vim = self.my_vims[ro_task["target_id"]]
623
624 try:
625 # FIND
626 if task.get("find_params"):
627 vim_images = target_vim.get_image_list(**task["find_params"])
628
629 if not vim_images:
630 raise NsWorkerExceptionNotFound(
631 "Image not found with this criteria: '{}'".format(
632 task["find_params"]
633 )
634 )
635 elif len(vim_images) > 1:
636 raise NsWorkerException(
637 "More than one image found with this criteria: '{}'".format(
638 task["find_params"]
639 )
640 )
641 else:
642 vim_image_id = vim_images[0]["id"]
643
644 ro_vim_item_update = {
645 "vim_id": vim_image_id,
646 "vim_status": "DONE",
647 "created": created,
648 "created_items": created_items,
649 "vim_details": None,
650 }
651 self.logger.debug(
652 "task={} {} new-image={} created={}".format(
653 task_id, ro_task["target_id"], vim_image_id, created
654 )
655 )
656
657 return "DONE", ro_vim_item_update
658 except (NsWorkerException, vimconn.VimConnException) as e:
659 self.logger.error(
660 "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
661 )
662 ro_vim_item_update = {
663 "vim_status": "VIM_ERROR",
664 "created": created,
665 "vim_details": str(e),
666 }
667
668 return "FAILED", ro_vim_item_update
669
670
671 class VimInteractionFlavor(VimInteractionBase):
672 def delete(self, ro_task, task_index):
673 task = ro_task["tasks"][task_index]
674 task_id = task["task_id"]
675 flavor_vim_id = ro_task["vim_info"]["vim_id"]
676 ro_vim_item_update_ok = {
677 "vim_status": "DELETED",
678 "created": False,
679 "vim_details": "DELETED",
680 "vim_id": None,
681 }
682
683 try:
684 if flavor_vim_id:
685 target_vim = self.my_vims[ro_task["target_id"]]
686 target_vim.delete_flavor(flavor_vim_id)
687 except vimconn.VimConnNotFoundException:
688 ro_vim_item_update_ok["vim_details"] = "already deleted"
689 except vimconn.VimConnException as e:
690 self.logger.error(
691 "ro_task={} vim={} del-flavor={}: {}".format(
692 ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
693 )
694 )
695 ro_vim_item_update = {
696 "vim_status": "VIM_ERROR",
697 "vim_details": "Error while deleting: {}".format(e),
698 }
699
700 return "FAILED", ro_vim_item_update
701
702 self.logger.debug(
703 "task={} {} del-flavor={} {}".format(
704 task_id,
705 ro_task["target_id"],
706 flavor_vim_id,
707 ro_vim_item_update_ok.get("vim_details", ""),
708 )
709 )
710
711 return "DONE", ro_vim_item_update_ok
712
713 def new(self, ro_task, task_index, task_depends):
714 task = ro_task["tasks"][task_index]
715 task_id = task["task_id"]
716 created = False
717 created_items = {}
718 target_vim = self.my_vims[ro_task["target_id"]]
719
720 try:
721 # FIND
722 vim_flavor_id = None
723
724 if task.get("find_params"):
725 try:
726 flavor_data = task["find_params"]["flavor_data"]
727 vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
728 except vimconn.VimConnNotFoundException:
729 pass
730
731 if not vim_flavor_id and task.get("params"):
732 # CREATE
733 flavor_data = task["params"]["flavor_data"]
734 vim_flavor_id = target_vim.new_flavor(flavor_data)
735 created = True
736
737 ro_vim_item_update = {
738 "vim_id": vim_flavor_id,
739 "vim_status": "DONE",
740 "created": created,
741 "created_items": created_items,
742 "vim_details": None,
743 }
744 self.logger.debug(
745 "task={} {} new-flavor={} created={}".format(
746 task_id, ro_task["target_id"], vim_flavor_id, created
747 )
748 )
749
750 return "DONE", ro_vim_item_update
751 except (vimconn.VimConnException, NsWorkerException) as e:
752 self.logger.error(
753 "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
754 )
755 ro_vim_item_update = {
756 "vim_status": "VIM_ERROR",
757 "created": created,
758 "vim_details": str(e),
759 }
760
761 return "FAILED", ro_vim_item_update
762
763
764 class VimInteractionAffinityGroup(VimInteractionBase):
765 def delete(self, ro_task, task_index):
766 task = ro_task["tasks"][task_index]
767 task_id = task["task_id"]
768 affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
769 ro_vim_item_update_ok = {
770 "vim_status": "DELETED",
771 "created": False,
772 "vim_details": "DELETED",
773 "vim_id": None,
774 }
775
776 try:
777 if affinity_group_vim_id:
778 target_vim = self.my_vims[ro_task["target_id"]]
779 target_vim.delete_affinity_group(affinity_group_vim_id)
780 except vimconn.VimConnNotFoundException:
781 ro_vim_item_update_ok["vim_details"] = "already deleted"
782 except vimconn.VimConnException as e:
783 self.logger.error(
784 "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
785 ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
786 )
787 )
788 ro_vim_item_update = {
789 "vim_status": "VIM_ERROR",
790 "vim_details": "Error while deleting: {}".format(e),
791 }
792
793 return "FAILED", ro_vim_item_update
794
795 self.logger.debug(
796 "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
797 task_id,
798 ro_task["target_id"],
799 affinity_group_vim_id,
800 ro_vim_item_update_ok.get("vim_details", ""),
801 )
802 )
803
804 return "DONE", ro_vim_item_update_ok
805
806 def new(self, ro_task, task_index, task_depends):
807 task = ro_task["tasks"][task_index]
808 task_id = task["task_id"]
809 created = False
810 created_items = {}
811 target_vim = self.my_vims[ro_task["target_id"]]
812
813 try:
814 affinity_group_vim_id = None
815 affinity_group_data = None
816
817 if task.get("params"):
818 affinity_group_data = task["params"].get("affinity_group_data")
819
820 if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
821 try:
822 param_affinity_group_id = task["params"]["affinity_group_data"].get(
823 "vim-affinity-group-id"
824 )
825 affinity_group_vim_id = target_vim.get_affinity_group(
826 param_affinity_group_id
827 ).get("id")
828 except vimconn.VimConnNotFoundException:
829 self.logger.error(
830 "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {}"
831 "could not be found at VIM. Creating a new one.".format(
832 task_id, ro_task["target_id"], param_affinity_group_id
833 )
834 )
835
836 if not affinity_group_vim_id and affinity_group_data:
837 affinity_group_vim_id = target_vim.new_affinity_group(
838 affinity_group_data
839 )
840 created = True
841
842 ro_vim_item_update = {
843 "vim_id": affinity_group_vim_id,
844 "vim_status": "DONE",
845 "created": created,
846 "created_items": created_items,
847 "vim_details": None,
848 }
849 self.logger.debug(
850 "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
851 task_id, ro_task["target_id"], affinity_group_vim_id, created
852 )
853 )
854
855 return "DONE", ro_vim_item_update
856 except (vimconn.VimConnException, NsWorkerException) as e:
857 self.logger.error(
858 "task={} vim={} new-affinity-or-anti-affinity-group:"
859 " {}".format(task_id, ro_task["target_id"], e)
860 )
861 ro_vim_item_update = {
862 "vim_status": "VIM_ERROR",
863 "created": created,
864 "vim_details": str(e),
865 }
866
867 return "FAILED", ro_vim_item_update
868
869
870 class VimInteractionSdnNet(VimInteractionBase):
871 @staticmethod
872 def _match_pci(port_pci, mapping):
873 """
874 Check if port_pci matches with mapping
875 mapping can have brackets to indicate that several chars are accepted. e.g
876 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
877 :param port_pci: text
878 :param mapping: text, can contain brackets to indicate several chars are available
879 :return: True if matches, False otherwise
880 """
881 if not port_pci or not mapping:
882 return False
883 if port_pci == mapping:
884 return True
885
886 mapping_index = 0
887 pci_index = 0
888 while True:
889 bracket_start = mapping.find("[", mapping_index)
890
891 if bracket_start == -1:
892 break
893
894 bracket_end = mapping.find("]", bracket_start)
895 if bracket_end == -1:
896 break
897
898 length = bracket_start - mapping_index
899 if (
900 length
901 and port_pci[pci_index : pci_index + length]
902 != mapping[mapping_index:bracket_start]
903 ):
904 return False
905
906 if (
907 port_pci[pci_index + length]
908 not in mapping[bracket_start + 1 : bracket_end]
909 ):
910 return False
911
912 pci_index += length + 1
913 mapping_index = bracket_end + 1
914
915 if port_pci[pci_index:] != mapping[mapping_index:]:
916 return False
917
918 return True
919
920 def _get_interfaces(self, vlds_to_connect, vim_account_id):
921 """
922 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
923 :param vim_account_id:
924 :return:
925 """
926 interfaces = []
927
928 for vld in vlds_to_connect:
929 table, _, db_id = vld.partition(":")
930 db_id, _, vld = db_id.partition(":")
931 _, _, vld_id = vld.partition(".")
932
933 if table == "vnfrs":
934 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
935 iface_key = "vnf-vld-id"
936 else: # table == "nsrs"
937 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
938 iface_key = "ns-vld-id"
939
940 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
941
942 for db_vnfr in db_vnfrs:
943 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
944 for iface_index, interface in enumerate(vdur["interfaces"]):
945 if interface.get(iface_key) == vld_id and interface.get(
946 "type"
947 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
948 # only SR-IOV o PT
949 interface_ = interface.copy()
950 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
951 db_vnfr["_id"], vdu_index, iface_index
952 )
953
954 if vdur.get("status") == "ERROR":
955 interface_["status"] = "ERROR"
956
957 interfaces.append(interface_)
958
959 return interfaces
960
961 def refresh(self, ro_task):
962 # look for task create
963 task_create_index, _ = next(
964 i_t
965 for i_t in enumerate(ro_task["tasks"])
966 if i_t[1]
967 and i_t[1]["action"] == "CREATE"
968 and i_t[1]["status"] != "FINISHED"
969 )
970
971 return self.new(ro_task, task_create_index, None)
972
973 def new(self, ro_task, task_index, task_depends):
974
975 task = ro_task["tasks"][task_index]
976 task_id = task["task_id"]
977 target_vim = self.my_vims[ro_task["target_id"]]
978
979 sdn_net_id = ro_task["vim_info"]["vim_id"]
980
981 created_items = ro_task["vim_info"].get("created_items")
982 connected_ports = ro_task["vim_info"].get("connected_ports", [])
983 new_connected_ports = []
984 last_update = ro_task["vim_info"].get("last_update", 0)
985 sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
986 error_list = []
987 created = ro_task["vim_info"].get("created", False)
988
989 try:
990 # CREATE
991 params = task["params"]
992 vlds_to_connect = params["vlds"]
993 associated_vim = params["target_vim"]
994 # external additional ports
995 additional_ports = params.get("sdn-ports") or ()
996 _, _, vim_account_id = associated_vim.partition(":")
997
998 if associated_vim:
999 # get associated VIM
1000 if associated_vim not in self.db_vims:
1001 self.db_vims[associated_vim] = self.db.get_one(
1002 "vim_accounts", {"_id": vim_account_id}
1003 )
1004
1005 db_vim = self.db_vims[associated_vim]
1006
1007 # look for ports to connect
1008 ports = self._get_interfaces(vlds_to_connect, vim_account_id)
1009 # print(ports)
1010
1011 sdn_ports = []
1012 pending_ports = error_ports = 0
1013 vlan_used = None
1014 sdn_need_update = False
1015
1016 for port in ports:
1017 vlan_used = port.get("vlan") or vlan_used
1018
1019 # TODO. Do not connect if already done
1020 if not port.get("compute_node") or not port.get("pci"):
1021 if port.get("status") == "ERROR":
1022 error_ports += 1
1023 else:
1024 pending_ports += 1
1025 continue
1026
1027 pmap = None
1028 compute_node_mappings = next(
1029 (
1030 c
1031 for c in db_vim["config"].get("sdn-port-mapping", ())
1032 if c and c["compute_node"] == port["compute_node"]
1033 ),
1034 None,
1035 )
1036
1037 if compute_node_mappings:
1038 # process port_mapping pci of type 0000:af:1[01].[1357]
1039 pmap = next(
1040 (
1041 p
1042 for p in compute_node_mappings["ports"]
1043 if self._match_pci(port["pci"], p.get("pci"))
1044 ),
1045 None,
1046 )
1047
1048 if not pmap:
1049 if not db_vim["config"].get("mapping_not_needed"):
1050 error_list.append(
1051 "Port mapping not found for compute_node={} pci={}".format(
1052 port["compute_node"], port["pci"]
1053 )
1054 )
1055 continue
1056
1057 pmap = {}
1058
1059 service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
1060 new_port = {
1061 "service_endpoint_id": pmap.get("service_endpoint_id")
1062 or service_endpoint_id,
1063 "service_endpoint_encapsulation_type": "dot1q"
1064 if port["type"] == "SR-IOV"
1065 else None,
1066 "service_endpoint_encapsulation_info": {
1067 "vlan": port.get("vlan"),
1068 "mac": port.get("mac-address"),
1069 "device_id": pmap.get("device_id") or port["compute_node"],
1070 "device_interface_id": pmap.get("device_interface_id")
1071 or port["pci"],
1072 "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
1073 "switch_port": pmap.get("switch_port"),
1074 "service_mapping_info": pmap.get("service_mapping_info"),
1075 },
1076 }
1077
1078 # TODO
1079 # if port["modified_at"] > last_update:
1080 # sdn_need_update = True
1081 new_connected_ports.append(port["id"]) # TODO
1082 sdn_ports.append(new_port)
1083
1084 if error_ports:
1085 error_list.append(
1086 "{} interfaces have not been created as VDU is on ERROR status".format(
1087 error_ports
1088 )
1089 )
1090
1091 # connect external ports
1092 for index, additional_port in enumerate(additional_ports):
1093 additional_port_id = additional_port.get(
1094 "service_endpoint_id"
1095 ) or "external-{}".format(index)
1096 sdn_ports.append(
1097 {
1098 "service_endpoint_id": additional_port_id,
1099 "service_endpoint_encapsulation_type": additional_port.get(
1100 "service_endpoint_encapsulation_type", "dot1q"
1101 ),
1102 "service_endpoint_encapsulation_info": {
1103 "vlan": additional_port.get("vlan") or vlan_used,
1104 "mac": additional_port.get("mac_address"),
1105 "device_id": additional_port.get("device_id"),
1106 "device_interface_id": additional_port.get(
1107 "device_interface_id"
1108 ),
1109 "switch_dpid": additional_port.get("switch_dpid")
1110 or additional_port.get("switch_id"),
1111 "switch_port": additional_port.get("switch_port"),
1112 "service_mapping_info": additional_port.get(
1113 "service_mapping_info"
1114 ),
1115 },
1116 }
1117 )
1118 new_connected_ports.append(additional_port_id)
1119 sdn_info = ""
1120
1121 # if there are more ports to connect or they have been modified, call create/update
1122 if error_list:
1123 sdn_status = "ERROR"
1124 sdn_info = "; ".join(error_list)
1125 elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
1126 last_update = time.time()
1127
1128 if not sdn_net_id:
1129 if len(sdn_ports) < 2:
1130 sdn_status = "ACTIVE"
1131
1132 if not pending_ports:
1133 self.logger.debug(
1134 "task={} {} new-sdn-net done, less than 2 ports".format(
1135 task_id, ro_task["target_id"]
1136 )
1137 )
1138 else:
1139 net_type = params.get("type") or "ELAN"
1140 (
1141 sdn_net_id,
1142 created_items,
1143 ) = target_vim.create_connectivity_service(net_type, sdn_ports)
1144 created = True
1145 self.logger.debug(
1146 "task={} {} new-sdn-net={} created={}".format(
1147 task_id, ro_task["target_id"], sdn_net_id, created
1148 )
1149 )
1150 else:
1151 created_items = target_vim.edit_connectivity_service(
1152 sdn_net_id, conn_info=created_items, connection_points=sdn_ports
1153 )
1154 created = True
1155 self.logger.debug(
1156 "task={} {} update-sdn-net={} created={}".format(
1157 task_id, ro_task["target_id"], sdn_net_id, created
1158 )
1159 )
1160
1161 connected_ports = new_connected_ports
1162 elif sdn_net_id:
1163 wim_status_dict = target_vim.get_connectivity_service_status(
1164 sdn_net_id, conn_info=created_items
1165 )
1166 sdn_status = wim_status_dict["sdn_status"]
1167
1168 if wim_status_dict.get("sdn_info"):
1169 sdn_info = str(wim_status_dict.get("sdn_info")) or ""
1170
1171 if wim_status_dict.get("error_msg"):
1172 sdn_info = wim_status_dict.get("error_msg") or ""
1173
1174 if pending_ports:
1175 if sdn_status != "ERROR":
1176 sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1177 len(ports) - pending_ports, len(ports)
1178 )
1179
1180 if sdn_status == "ACTIVE":
1181 sdn_status = "BUILD"
1182
1183 ro_vim_item_update = {
1184 "vim_id": sdn_net_id,
1185 "vim_status": sdn_status,
1186 "created": created,
1187 "created_items": created_items,
1188 "connected_ports": connected_ports,
1189 "vim_details": sdn_info,
1190 "last_update": last_update,
1191 }
1192
1193 return sdn_status, ro_vim_item_update
1194 except Exception as e:
1195 self.logger.error(
1196 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
1197 exc_info=not isinstance(
1198 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1199 ),
1200 )
1201 ro_vim_item_update = {
1202 "vim_status": "VIM_ERROR",
1203 "created": created,
1204 "vim_details": str(e),
1205 }
1206
1207 return "FAILED", ro_vim_item_update
1208
1209 def delete(self, ro_task, task_index):
1210 task = ro_task["tasks"][task_index]
1211 task_id = task["task_id"]
1212 sdn_vim_id = ro_task["vim_info"].get("vim_id")
1213 ro_vim_item_update_ok = {
1214 "vim_status": "DELETED",
1215 "created": False,
1216 "vim_details": "DELETED",
1217 "vim_id": None,
1218 }
1219
1220 try:
1221 if sdn_vim_id:
1222 target_vim = self.my_vims[ro_task["target_id"]]
1223 target_vim.delete_connectivity_service(
1224 sdn_vim_id, ro_task["vim_info"].get("created_items")
1225 )
1226
1227 except Exception as e:
1228 if (
1229 isinstance(e, sdnconn.SdnConnectorError)
1230 and e.http_code == HTTPStatus.NOT_FOUND.value
1231 ):
1232 ro_vim_item_update_ok["vim_details"] = "already deleted"
1233 else:
1234 self.logger.error(
1235 "ro_task={} vim={} del-sdn-net={}: {}".format(
1236 ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
1237 ),
1238 exc_info=not isinstance(
1239 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1240 ),
1241 )
1242 ro_vim_item_update = {
1243 "vim_status": "VIM_ERROR",
1244 "vim_details": "Error while deleting: {}".format(e),
1245 }
1246
1247 return "FAILED", ro_vim_item_update
1248
1249 self.logger.debug(
1250 "task={} {} del-sdn-net={} {}".format(
1251 task_id,
1252 ro_task["target_id"],
1253 sdn_vim_id,
1254 ro_vim_item_update_ok.get("vim_details", ""),
1255 )
1256 )
1257
1258 return "DONE", ro_vim_item_update_ok
1259
1260
1261 class NsWorker(threading.Thread):
1262 REFRESH_BUILD = 5 # 5 seconds
1263 REFRESH_ACTIVE = 60 # 1 minute
1264 REFRESH_ERROR = 600
1265 REFRESH_IMAGE = 3600 * 10
1266 REFRESH_DELETE = 3600 * 10
1267 QUEUE_SIZE = 100
1268 terminate = False
1269
1270 def __init__(self, worker_index, config, plugins, db):
1271 """
1272
1273 :param worker_index: thread index
1274 :param config: general configuration of RO, among others the process_id with the docker id where it runs
1275 :param plugins: global shared dict with the loaded plugins
1276 :param db: database class instance to use
1277 """
1278 threading.Thread.__init__(self)
1279 self.config = config
1280 self.plugins = plugins
1281 self.plugin_name = "unknown"
1282 self.logger = logging.getLogger("ro.worker{}".format(worker_index))
1283 self.worker_index = worker_index
1284 self.task_queue = queue.Queue(self.QUEUE_SIZE)
1285 # targetvim: vimplugin class
1286 self.my_vims = {}
1287 # targetvim: vim information from database
1288 self.db_vims = {}
1289 # targetvim list
1290 self.vim_targets = []
1291 self.my_id = config["process_id"] + ":" + str(worker_index)
1292 self.db = db
1293 self.item2class = {
1294 "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger),
1295 "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger),
1296 "image": VimInteractionImage(
1297 self.db, self.my_vims, self.db_vims, self.logger
1298 ),
1299 "flavor": VimInteractionFlavor(
1300 self.db, self.my_vims, self.db_vims, self.logger
1301 ),
1302 "sdn_net": VimInteractionSdnNet(
1303 self.db, self.my_vims, self.db_vims, self.logger
1304 ),
1305 "affinity-or-anti-affinity-group": VimInteractionAffinityGroup(
1306 self.db, self.my_vims, self.db_vims, self.logger
1307 ),
1308 }
1309 self.time_last_task_processed = None
1310 # lists of tasks to delete because nsrs or vnfrs has been deleted from db
1311 self.tasks_to_delete = []
1312 # it is idle when there are not vim_targets associated
1313 self.idle = True
1314 self.task_locked_time = config["global"]["task_locked_time"]
1315
1316 def insert_task(self, task):
1317 try:
1318 self.task_queue.put(task, False)
1319 return None
1320 except queue.Full:
1321 raise NsWorkerException("timeout inserting a task")
1322
1323 def terminate(self):
1324 self.insert_task("exit")
1325
1326 def del_task(self, task):
1327 with self.task_lock:
1328 if task["status"] == "SCHEDULED":
1329 task["status"] = "SUPERSEDED"
1330 return True
1331 else: # task["status"] == "processing"
1332 self.task_lock.release()
1333 return False
1334
1335 def _process_vim_config(self, target_id, db_vim):
1336 """
1337 Process vim config, creating vim configuration files as ca_cert
1338 :param target_id: vim/sdn/wim + id
1339 :param db_vim: Vim dictionary obtained from database
1340 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1341 """
1342 if not db_vim.get("config"):
1343 return
1344
1345 file_name = ""
1346
1347 try:
1348 if db_vim["config"].get("ca_cert_content"):
1349 file_name = "{}:{}".format(target_id, self.worker_index)
1350
1351 try:
1352 mkdir(file_name)
1353 except FileExistsError:
1354 pass
1355
1356 file_name = file_name + "/ca_cert"
1357
1358 with open(file_name, "w") as f:
1359 f.write(db_vim["config"]["ca_cert_content"])
1360 del db_vim["config"]["ca_cert_content"]
1361 db_vim["config"]["ca_cert"] = file_name
1362 except Exception as e:
1363 raise NsWorkerException(
1364 "Error writing to file '{}': {}".format(file_name, e)
1365 )
1366
1367 def _load_plugin(self, name, type="vim"):
1368 # type can be vim or sdn
1369 if "rovim_dummy" not in self.plugins:
1370 self.plugins["rovim_dummy"] = VimDummyConnector
1371
1372 if "rosdn_dummy" not in self.plugins:
1373 self.plugins["rosdn_dummy"] = SdnDummyConnector
1374
1375 if name in self.plugins:
1376 return self.plugins[name]
1377
1378 try:
1379 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
1380 self.plugins[name] = ep.load()
1381 except Exception as e:
1382 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
1383
1384 if name and name not in self.plugins:
1385 raise NsWorkerException(
1386 "Plugin 'osm_{n}' has not been installed".format(n=name)
1387 )
1388
1389 return self.plugins[name]
1390
1391 def _unload_vim(self, target_id):
1392 """
1393 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1394 :param target_id: Contains type:_id; where type can be 'vim', ...
1395 :return: None.
1396 """
1397 try:
1398 self.db_vims.pop(target_id, None)
1399 self.my_vims.pop(target_id, None)
1400
1401 if target_id in self.vim_targets:
1402 self.vim_targets.remove(target_id)
1403
1404 self.logger.info("Unloaded {}".format(target_id))
1405 rmtree("{}:{}".format(target_id, self.worker_index))
1406 except FileNotFoundError:
1407 pass # this is raised by rmtree if folder does not exist
1408 except Exception as e:
1409 self.logger.error("Cannot unload {}: {}".format(target_id, e))
1410
1411 def _check_vim(self, target_id):
1412 """
1413 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1414 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1415 :return: None.
1416 """
1417 target, _, _id = target_id.partition(":")
1418 now = time.time()
1419 update_dict = {}
1420 unset_dict = {}
1421 op_text = ""
1422 step = ""
1423 loaded = target_id in self.vim_targets
1424 target_database = (
1425 "vim_accounts"
1426 if target == "vim"
1427 else "wim_accounts"
1428 if target == "wim"
1429 else "sdns"
1430 )
1431
1432 try:
1433 step = "Getting {} from db".format(target_id)
1434 db_vim = self.db.get_one(target_database, {"_id": _id})
1435
1436 for op_index, operation in enumerate(
1437 db_vim["_admin"].get("operations", ())
1438 ):
1439 if operation["operationState"] != "PROCESSING":
1440 continue
1441
1442 locked_at = operation.get("locked_at")
1443
1444 if locked_at is not None and locked_at >= now - self.task_locked_time:
1445 # some other thread is doing this operation
1446 return
1447
1448 # lock
1449 op_text = "_admin.operations.{}.".format(op_index)
1450
1451 if not self.db.set_one(
1452 target_database,
1453 q_filter={
1454 "_id": _id,
1455 op_text + "operationState": "PROCESSING",
1456 op_text + "locked_at": locked_at,
1457 },
1458 update_dict={
1459 op_text + "locked_at": now,
1460 "admin.current_operation": op_index,
1461 },
1462 fail_on_empty=False,
1463 ):
1464 return
1465
1466 unset_dict[op_text + "locked_at"] = None
1467 unset_dict["current_operation"] = None
1468 step = "Loading " + target_id
1469 error_text = self._load_vim(target_id)
1470
1471 if not error_text:
1472 step = "Checking connectivity"
1473
1474 if target == "vim":
1475 self.my_vims[target_id].check_vim_connectivity()
1476 else:
1477 self.my_vims[target_id].check_credentials()
1478
1479 update_dict["_admin.operationalState"] = "ENABLED"
1480 update_dict["_admin.detailed-status"] = ""
1481 unset_dict[op_text + "detailed-status"] = None
1482 update_dict[op_text + "operationState"] = "COMPLETED"
1483
1484 return
1485
1486 except Exception as e:
1487 error_text = "{}: {}".format(step, e)
1488 self.logger.error("{} for {}: {}".format(step, target_id, e))
1489
1490 finally:
1491 if update_dict or unset_dict:
1492 if error_text:
1493 update_dict[op_text + "operationState"] = "FAILED"
1494 update_dict[op_text + "detailed-status"] = error_text
1495 unset_dict.pop(op_text + "detailed-status", None)
1496 update_dict["_admin.operationalState"] = "ERROR"
1497 update_dict["_admin.detailed-status"] = error_text
1498
1499 if op_text:
1500 update_dict[op_text + "statusEnteredTime"] = now
1501
1502 self.db.set_one(
1503 target_database,
1504 q_filter={"_id": _id},
1505 update_dict=update_dict,
1506 unset=unset_dict,
1507 fail_on_empty=False,
1508 )
1509
1510 if not loaded:
1511 self._unload_vim(target_id)
1512
1513 def _reload_vim(self, target_id):
1514 if target_id in self.vim_targets:
1515 self._load_vim(target_id)
1516 else:
1517 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1518 # just remove it to force load again next time it is needed
1519 self.db_vims.pop(target_id, None)
1520
1521 def _load_vim(self, target_id):
1522 """
1523 Load or reload a vim_account, sdn_controller or wim_account.
1524 Read content from database, load the plugin if not loaded.
1525 In case of error loading the plugin, it load a failing VIM_connector
1526 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1527 :param target_id: Contains type:_id; where type can be 'vim', ...
1528 :return: None if ok, descriptive text if error
1529 """
1530 target, _, _id = target_id.partition(":")
1531 target_database = (
1532 "vim_accounts"
1533 if target == "vim"
1534 else "wim_accounts"
1535 if target == "wim"
1536 else "sdns"
1537 )
1538 plugin_name = ""
1539 vim = None
1540
1541 try:
1542 step = "Getting {}={} from db".format(target, _id)
1543 # TODO process for wim, sdnc, ...
1544 vim = self.db.get_one(target_database, {"_id": _id})
1545
1546 # if deep_get(vim, "config", "sdn-controller"):
1547 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1548 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1549
1550 step = "Decrypting password"
1551 schema_version = vim.get("schema_version")
1552 self.db.encrypt_decrypt_fields(
1553 vim,
1554 "decrypt",
1555 fields=("password", "secret"),
1556 schema_version=schema_version,
1557 salt=_id,
1558 )
1559 self._process_vim_config(target_id, vim)
1560
1561 if target == "vim":
1562 plugin_name = "rovim_" + vim["vim_type"]
1563 step = "Loading plugin '{}'".format(plugin_name)
1564 vim_module_conn = self._load_plugin(plugin_name)
1565 step = "Loading {}'".format(target_id)
1566 self.my_vims[target_id] = vim_module_conn(
1567 uuid=vim["_id"],
1568 name=vim["name"],
1569 tenant_id=vim.get("vim_tenant_id"),
1570 tenant_name=vim.get("vim_tenant_name"),
1571 url=vim["vim_url"],
1572 url_admin=None,
1573 user=vim["vim_user"],
1574 passwd=vim["vim_password"],
1575 config=vim.get("config") or {},
1576 persistent_info={},
1577 )
1578 else: # sdn
1579 plugin_name = "rosdn_" + vim["type"]
1580 step = "Loading plugin '{}'".format(plugin_name)
1581 vim_module_conn = self._load_plugin(plugin_name, "sdn")
1582 step = "Loading {}'".format(target_id)
1583 wim = deepcopy(vim)
1584 wim_config = wim.pop("config", {}) or {}
1585 wim["uuid"] = wim["_id"]
1586 wim["wim_url"] = wim["url"]
1587
1588 if wim.get("dpid"):
1589 wim_config["dpid"] = wim.pop("dpid")
1590
1591 if wim.get("switch_id"):
1592 wim_config["switch_id"] = wim.pop("switch_id")
1593
1594 # wim, wim_account, config
1595 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
1596 self.db_vims[target_id] = vim
1597 self.error_status = None
1598
1599 self.logger.info(
1600 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
1601 )
1602 except Exception as e:
1603 self.logger.error(
1604 "Cannot load {} plugin={}: {} {}".format(
1605 target_id, plugin_name, step, e
1606 )
1607 )
1608
1609 self.db_vims[target_id] = vim or {}
1610 self.db_vims[target_id] = FailingConnector(str(e))
1611 error_status = "{} Error: {}".format(step, e)
1612
1613 return error_status
1614 finally:
1615 if target_id not in self.vim_targets:
1616 self.vim_targets.append(target_id)
1617
1618 def _get_db_task(self):
1619 """
1620 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1621 :return: None
1622 """
1623 now = time.time()
1624
1625 if not self.time_last_task_processed:
1626 self.time_last_task_processed = now
1627
1628 try:
1629 while True:
1630 """
1631 # Log RO tasks only when loglevel is DEBUG
1632 if self.logger.getEffectiveLevel() == logging.DEBUG:
1633 self._log_ro_task(
1634 None,
1635 None,
1636 None,
1637 "TASK_WF",
1638 "task_locked_time="
1639 + str(self.task_locked_time)
1640 + " "
1641 + "time_last_task_processed="
1642 + str(self.time_last_task_processed)
1643 + " "
1644 + "now="
1645 + str(now),
1646 )
1647 """
1648 locked = self.db.set_one(
1649 "ro_tasks",
1650 q_filter={
1651 "target_id": self.vim_targets,
1652 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1653 "locked_at.lt": now - self.task_locked_time,
1654 "to_check_at.lt": self.time_last_task_processed,
1655 },
1656 update_dict={"locked_by": self.my_id, "locked_at": now},
1657 fail_on_empty=False,
1658 )
1659
1660 if locked:
1661 # read and return
1662 ro_task = self.db.get_one(
1663 "ro_tasks",
1664 q_filter={
1665 "target_id": self.vim_targets,
1666 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1667 "locked_at": now,
1668 },
1669 )
1670 return ro_task
1671
1672 if self.time_last_task_processed == now:
1673 self.time_last_task_processed = None
1674 return None
1675 else:
1676 self.time_last_task_processed = now
1677 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1678
1679 except DbException as e:
1680 self.logger.error("Database exception at _get_db_task: {}".format(e))
1681 except Exception as e:
1682 self.logger.critical(
1683 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
1684 )
1685
1686 return None
1687
1688 def _get_db_all_tasks(self):
1689 """
1690 Read all content of table ro_tasks to log it
1691 :return: None
1692 """
1693 try:
1694 # Checking the content of the BD:
1695
1696 # read and return
1697 ro_task = self.db.get_list("ro_tasks")
1698 for rt in ro_task:
1699 self._log_ro_task(rt, None, None, "TASK_WF", "GET_ALL_TASKS")
1700 return ro_task
1701
1702 except DbException as e:
1703 self.logger.error("Database exception at _get_db_all_tasks: {}".format(e))
1704 except Exception as e:
1705 self.logger.critical(
1706 "Unexpected exception at _get_db_all_tasks: {}".format(e), exc_info=True
1707 )
1708
1709 return None
1710
1711 def _log_ro_task(self, ro_task, db_ro_task_update, db_ro_task_delete, mark, event):
1712 """
1713 Generate a log with the following format:
1714
1715 Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by;
1716 target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id;
1717 task_array_index;task_id;task_action;task_item;task_args
1718
1719 Example:
1720
1721 TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013;
1722 1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a
1723 ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False,
1724 'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None,
1725 'vim_details': None, 'refresh_at': None};1;SCHEDULED;
1726 888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;
1727 CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}}
1728 """
1729 try:
1730 line = []
1731 i = 0
1732 if ro_task is not None and isinstance(ro_task, dict):
1733 for t in ro_task["tasks"]:
1734 line.clear()
1735 line.append(mark)
1736 line.append(event)
1737 line.append(ro_task.get("_id", ""))
1738 line.append(str(ro_task.get("locked_at", "")))
1739 line.append(str(ro_task.get("modified_at", "")))
1740 line.append(str(ro_task.get("created_at", "")))
1741 line.append(str(ro_task.get("to_check_at", "")))
1742 line.append(str(ro_task.get("locked_by", "")))
1743 line.append(str(ro_task.get("target_id", "")))
1744 line.append(str(ro_task.get("vim_info", {}).get("refresh_at", "")))
1745 line.append(str(ro_task.get("vim_info", "")))
1746 line.append(str(ro_task.get("tasks", "")))
1747 if isinstance(t, dict):
1748 line.append(str(t.get("status", "")))
1749 line.append(str(t.get("action_id", "")))
1750 line.append(str(i))
1751 line.append(str(t.get("task_id", "")))
1752 line.append(str(t.get("action", "")))
1753 line.append(str(t.get("item", "")))
1754 line.append(str(t.get("find_params", "")))
1755 line.append(str(t.get("params", "")))
1756 else:
1757 line.extend([""] * 2)
1758 line.append(str(i))
1759 line.extend([""] * 5)
1760
1761 i += 1
1762 self.logger.debug(";".join(line))
1763 elif db_ro_task_update is not None and isinstance(db_ro_task_update, dict):
1764 i = 0
1765 while True:
1766 st = "tasks.{}.status".format(i)
1767 if st not in db_ro_task_update:
1768 break
1769 line.clear()
1770 line.append(mark)
1771 line.append(event)
1772 line.append(db_ro_task_update.get("_id", ""))
1773 line.append(str(db_ro_task_update.get("locked_at", "")))
1774 line.append(str(db_ro_task_update.get("modified_at", "")))
1775 line.append("")
1776 line.append(str(db_ro_task_update.get("to_check_at", "")))
1777 line.append(str(db_ro_task_update.get("locked_by", "")))
1778 line.append("")
1779 line.append(str(db_ro_task_update.get("vim_info.refresh_at", "")))
1780 line.append("")
1781 line.append(str(db_ro_task_update.get("vim_info", "")))
1782 line.append(str(str(db_ro_task_update).count(".status")))
1783 line.append(db_ro_task_update.get(st, ""))
1784 line.append("")
1785 line.append(str(i))
1786 line.extend([""] * 3)
1787 i += 1
1788 self.logger.debug(";".join(line))
1789
1790 elif db_ro_task_delete is not None and isinstance(db_ro_task_delete, dict):
1791 line.clear()
1792 line.append(mark)
1793 line.append(event)
1794 line.append(db_ro_task_delete.get("_id", ""))
1795 line.append("")
1796 line.append(db_ro_task_delete.get("modified_at", ""))
1797 line.extend([""] * 13)
1798 self.logger.debug(";".join(line))
1799
1800 else:
1801 line.clear()
1802 line.append(mark)
1803 line.append(event)
1804 line.extend([""] * 16)
1805 self.logger.debug(";".join(line))
1806
1807 except Exception as e:
1808 self.logger.error("Error logging ro_task: {}".format(e))
1809
1810 def _delete_task(self, ro_task, task_index, task_depends, db_update):
1811 """
1812 Determine if this task need to be done or superseded
1813 :return: None
1814 """
1815 my_task = ro_task["tasks"][task_index]
1816 task_id = my_task["task_id"]
1817 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
1818 "created_items", False
1819 )
1820
1821 if my_task["status"] == "FAILED":
1822 return None, None # TODO need to be retry??
1823
1824 try:
1825 for index, task in enumerate(ro_task["tasks"]):
1826 if index == task_index or not task:
1827 continue # own task
1828
1829 if (
1830 my_task["target_record"] == task["target_record"]
1831 and task["action"] == "CREATE"
1832 ):
1833 # set to finished
1834 db_update["tasks.{}.status".format(index)] = task[
1835 "status"
1836 ] = "FINISHED"
1837 elif task["action"] == "CREATE" and task["status"] not in (
1838 "FINISHED",
1839 "SUPERSEDED",
1840 ):
1841 needed_delete = False
1842
1843 if needed_delete:
1844 return self.item2class[my_task["item"]].delete(ro_task, task_index)
1845 else:
1846 return "SUPERSEDED", None
1847 except Exception as e:
1848 if not isinstance(e, NsWorkerException):
1849 self.logger.critical(
1850 "Unexpected exception at _delete_task task={}: {}".format(
1851 task_id, e
1852 ),
1853 exc_info=True,
1854 )
1855
1856 return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1857
1858 def _create_task(self, ro_task, task_index, task_depends, db_update):
1859 """
1860 Determine if this task need to create something at VIM
1861 :return: None
1862 """
1863 my_task = ro_task["tasks"][task_index]
1864 task_id = my_task["task_id"]
1865 task_status = None
1866
1867 if my_task["status"] == "FAILED":
1868 return None, None # TODO need to be retry??
1869 elif my_task["status"] == "SCHEDULED":
1870 # check if already created by another task
1871 for index, task in enumerate(ro_task["tasks"]):
1872 if index == task_index or not task:
1873 continue # own task
1874
1875 if task["action"] == "CREATE" and task["status"] not in (
1876 "SCHEDULED",
1877 "FINISHED",
1878 "SUPERSEDED",
1879 ):
1880 return task["status"], "COPY_VIM_INFO"
1881
1882 try:
1883 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
1884 ro_task, task_index, task_depends
1885 )
1886 # TODO update other CREATE tasks
1887 except Exception as e:
1888 if not isinstance(e, NsWorkerException):
1889 self.logger.error(
1890 "Error executing task={}: {}".format(task_id, e), exc_info=True
1891 )
1892
1893 task_status = "FAILED"
1894 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1895 # TODO update ro_vim_item_update
1896
1897 return task_status, ro_vim_item_update
1898 else:
1899 return None, None
1900
1901 def _get_dependency(self, task_id, ro_task=None, target_id=None):
1902 """
1903 Look for dependency task
1904 :param task_id: Can be one of
1905 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1906 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1907 3. task.task_id: "<action_id>:number"
1908 :param ro_task:
1909 :param target_id:
1910 :return: database ro_task plus index of task
1911 """
1912 if (
1913 task_id.startswith("vim:")
1914 or task_id.startswith("sdn:")
1915 or task_id.startswith("wim:")
1916 ):
1917 target_id, _, task_id = task_id.partition(" ")
1918
1919 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
1920 ro_task_dependency = self.db.get_one(
1921 "ro_tasks",
1922 q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
1923 fail_on_empty=False,
1924 )
1925
1926 if ro_task_dependency:
1927 for task_index, task in enumerate(ro_task_dependency["tasks"]):
1928 if task["target_record_id"] == task_id:
1929 return ro_task_dependency, task_index
1930
1931 else:
1932 if ro_task:
1933 for task_index, task in enumerate(ro_task["tasks"]):
1934 if task and task["task_id"] == task_id:
1935 return ro_task, task_index
1936
1937 ro_task_dependency = self.db.get_one(
1938 "ro_tasks",
1939 q_filter={
1940 "tasks.ANYINDEX.task_id": task_id,
1941 "tasks.ANYINDEX.target_record.ne": None,
1942 },
1943 fail_on_empty=False,
1944 )
1945
1946 if ro_task_dependency:
1947 for task_index, task in ro_task_dependency["tasks"]:
1948 if task["task_id"] == task_id:
1949 return ro_task_dependency, task_index
1950 raise NsWorkerException("Cannot get depending task {}".format(task_id))
1951
1952 def _process_pending_tasks(self, ro_task):
1953 ro_task_id = ro_task["_id"]
1954 now = time.time()
1955 # one day
1956 next_check_at = now + (24 * 60 * 60)
1957 db_ro_task_update = {}
1958
1959 def _update_refresh(new_status):
1960 # compute next_refresh
1961 nonlocal task
1962 nonlocal next_check_at
1963 nonlocal db_ro_task_update
1964 nonlocal ro_task
1965
1966 next_refresh = time.time()
1967
1968 if task["item"] in ("image", "flavor"):
1969 next_refresh += self.REFRESH_IMAGE
1970 elif new_status == "BUILD":
1971 next_refresh += self.REFRESH_BUILD
1972 elif new_status == "DONE":
1973 next_refresh += self.REFRESH_ACTIVE
1974 else:
1975 next_refresh += self.REFRESH_ERROR
1976
1977 next_check_at = min(next_check_at, next_refresh)
1978 db_ro_task_update["vim_info.refresh_at"] = next_refresh
1979 ro_task["vim_info"]["refresh_at"] = next_refresh
1980
1981 try:
1982 """
1983 # Log RO tasks only when loglevel is DEBUG
1984 if self.logger.getEffectiveLevel() == logging.DEBUG:
1985 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
1986 """
1987 # 0: get task_status_create
1988 lock_object = None
1989 task_status_create = None
1990 task_create = next(
1991 (
1992 t
1993 for t in ro_task["tasks"]
1994 if t
1995 and t["action"] == "CREATE"
1996 and t["status"] in ("BUILD", "DONE")
1997 ),
1998 None,
1999 )
2000
2001 if task_create:
2002 task_status_create = task_create["status"]
2003
2004 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
2005 for task_action in ("DELETE", "CREATE", "EXEC"):
2006 db_vim_update = None
2007 new_status = None
2008
2009 for task_index, task in enumerate(ro_task["tasks"]):
2010 if not task:
2011 continue # task deleted
2012
2013 task_depends = {}
2014 target_update = None
2015
2016 if (
2017 (
2018 task_action in ("DELETE", "EXEC")
2019 and task["status"] not in ("SCHEDULED", "BUILD")
2020 )
2021 or task["action"] != task_action
2022 or (
2023 task_action == "CREATE"
2024 and task["status"] in ("FINISHED", "SUPERSEDED")
2025 )
2026 ):
2027 continue
2028
2029 task_path = "tasks.{}.status".format(task_index)
2030 try:
2031 db_vim_info_update = None
2032
2033 if task["status"] == "SCHEDULED":
2034 # check if tasks that this depends on have been completed
2035 dependency_not_completed = False
2036
2037 for dependency_task_id in task.get("depends_on") or ():
2038 (
2039 dependency_ro_task,
2040 dependency_task_index,
2041 ) = self._get_dependency(
2042 dependency_task_id, target_id=ro_task["target_id"]
2043 )
2044 dependency_task = dependency_ro_task["tasks"][
2045 dependency_task_index
2046 ]
2047
2048 if dependency_task["status"] == "SCHEDULED":
2049 dependency_not_completed = True
2050 next_check_at = min(
2051 next_check_at, dependency_ro_task["to_check_at"]
2052 )
2053 # must allow dependent task to be processed first
2054 # to do this set time after last_task_processed
2055 next_check_at = max(
2056 self.time_last_task_processed, next_check_at
2057 )
2058 break
2059 elif dependency_task["status"] == "FAILED":
2060 error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
2061 task["action"],
2062 task["item"],
2063 dependency_task["action"],
2064 dependency_task["item"],
2065 dependency_task_id,
2066 dependency_ro_task["vim_info"].get(
2067 "vim_details"
2068 ),
2069 )
2070 self.logger.error(
2071 "task={} {}".format(task["task_id"], error_text)
2072 )
2073 raise NsWorkerException(error_text)
2074
2075 task_depends[dependency_task_id] = dependency_ro_task[
2076 "vim_info"
2077 ]["vim_id"]
2078 task_depends[
2079 "TASK-{}".format(dependency_task_id)
2080 ] = dependency_ro_task["vim_info"]["vim_id"]
2081
2082 if dependency_not_completed:
2083 # TODO set at vim_info.vim_details that it is waiting
2084 continue
2085
2086 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
2087 # the task of renew this locking. It will update database locket_at periodically
2088 if not lock_object:
2089 lock_object = LockRenew.add_lock_object(
2090 "ro_tasks", ro_task, self
2091 )
2092
2093 if task["action"] == "DELETE":
2094 (new_status, db_vim_info_update,) = self._delete_task(
2095 ro_task, task_index, task_depends, db_ro_task_update
2096 )
2097 new_status = (
2098 "FINISHED" if new_status == "DONE" else new_status
2099 )
2100 # ^with FINISHED instead of DONE it will not be refreshing
2101
2102 if new_status in ("FINISHED", "SUPERSEDED"):
2103 target_update = "DELETE"
2104 elif task["action"] == "EXEC":
2105 (
2106 new_status,
2107 db_vim_info_update,
2108 db_task_update,
2109 ) = self.item2class[task["item"]].exec(
2110 ro_task, task_index, task_depends
2111 )
2112 new_status = (
2113 "FINISHED" if new_status == "DONE" else new_status
2114 )
2115 # ^with FINISHED instead of DONE it will not be refreshing
2116
2117 if db_task_update:
2118 # load into database the modified db_task_update "retries" and "next_retry"
2119 if db_task_update.get("retries"):
2120 db_ro_task_update[
2121 "tasks.{}.retries".format(task_index)
2122 ] = db_task_update["retries"]
2123
2124 next_check_at = time.time() + db_task_update.get(
2125 "next_retry", 60
2126 )
2127 target_update = None
2128 elif task["action"] == "CREATE":
2129 if task["status"] == "SCHEDULED":
2130 if task_status_create:
2131 new_status = task_status_create
2132 target_update = "COPY_VIM_INFO"
2133 else:
2134 new_status, db_vim_info_update = self.item2class[
2135 task["item"]
2136 ].new(ro_task, task_index, task_depends)
2137 # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
2138 _update_refresh(new_status)
2139 else:
2140 if (
2141 ro_task["vim_info"]["refresh_at"]
2142 and now > ro_task["vim_info"]["refresh_at"]
2143 ):
2144 new_status, db_vim_info_update = self.item2class[
2145 task["item"]
2146 ].refresh(ro_task)
2147 _update_refresh(new_status)
2148 else:
2149 # The refresh is updated to avoid set the value of "refresh_at" to
2150 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2151 # because it can happen that in this case the task is never processed
2152 _update_refresh(task["status"])
2153
2154 except Exception as e:
2155 new_status = "FAILED"
2156 db_vim_info_update = {
2157 "vim_status": "VIM_ERROR",
2158 "vim_details": str(e),
2159 }
2160
2161 if not isinstance(
2162 e, (NsWorkerException, vimconn.VimConnException)
2163 ):
2164 self.logger.error(
2165 "Unexpected exception at _delete_task task={}: {}".format(
2166 task["task_id"], e
2167 ),
2168 exc_info=True,
2169 )
2170
2171 try:
2172 if db_vim_info_update:
2173 db_vim_update = db_vim_info_update.copy()
2174 db_ro_task_update.update(
2175 {
2176 "vim_info." + k: v
2177 for k, v in db_vim_info_update.items()
2178 }
2179 )
2180 ro_task["vim_info"].update(db_vim_info_update)
2181
2182 if new_status:
2183 if task_action == "CREATE":
2184 task_status_create = new_status
2185 db_ro_task_update[task_path] = new_status
2186
2187 if target_update or db_vim_update:
2188 if target_update == "DELETE":
2189 self._update_target(task, None)
2190 elif target_update == "COPY_VIM_INFO":
2191 self._update_target(task, ro_task["vim_info"])
2192 else:
2193 self._update_target(task, db_vim_update)
2194
2195 except Exception as e:
2196 if (
2197 isinstance(e, DbException)
2198 and e.http_code == HTTPStatus.NOT_FOUND
2199 ):
2200 # if the vnfrs or nsrs has been removed from database, this task must be removed
2201 self.logger.debug(
2202 "marking to delete task={}".format(task["task_id"])
2203 )
2204 self.tasks_to_delete.append(task)
2205 else:
2206 self.logger.error(
2207 "Unexpected exception at _update_target task={}: {}".format(
2208 task["task_id"], e
2209 ),
2210 exc_info=True,
2211 )
2212
2213 locked_at = ro_task["locked_at"]
2214
2215 if lock_object:
2216 locked_at = [
2217 lock_object["locked_at"],
2218 lock_object["locked_at"] + self.task_locked_time,
2219 ]
2220 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
2221 # contain exactly locked_at + self.task_locked_time
2222 LockRenew.remove_lock_object(lock_object)
2223
2224 q_filter = {
2225 "_id": ro_task["_id"],
2226 "to_check_at": ro_task["to_check_at"],
2227 "locked_at": locked_at,
2228 }
2229 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2230 # outside this task (by ro_nbi) do not update it
2231 db_ro_task_update["locked_by"] = None
2232 # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
2233 db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
2234 db_ro_task_update["modified_at"] = now
2235 db_ro_task_update["to_check_at"] = next_check_at
2236
2237 """
2238 # Log RO tasks only when loglevel is DEBUG
2239 if self.logger.getEffectiveLevel() == logging.DEBUG:
2240 db_ro_task_update_log = db_ro_task_update.copy()
2241 db_ro_task_update_log["_id"] = q_filter["_id"]
2242 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2243 """
2244
2245 if not self.db.set_one(
2246 "ro_tasks",
2247 update_dict=db_ro_task_update,
2248 q_filter=q_filter,
2249 fail_on_empty=False,
2250 ):
2251 del db_ro_task_update["to_check_at"]
2252 del q_filter["to_check_at"]
2253 """
2254 # Log RO tasks only when loglevel is DEBUG
2255 if self.logger.getEffectiveLevel() == logging.DEBUG:
2256 self._log_ro_task(
2257 None,
2258 db_ro_task_update_log,
2259 None,
2260 "TASK_WF",
2261 "SET_TASK " + str(q_filter),
2262 )
2263 """
2264 self.db.set_one(
2265 "ro_tasks",
2266 q_filter=q_filter,
2267 update_dict=db_ro_task_update,
2268 fail_on_empty=True,
2269 )
2270 except DbException as e:
2271 self.logger.error(
2272 "ro_task={} Error updating database {}".format(ro_task_id, e)
2273 )
2274 except Exception as e:
2275 self.logger.error(
2276 "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
2277 )
2278
2279 def _update_target(self, task, ro_vim_item_update):
2280 table, _, temp = task["target_record"].partition(":")
2281 _id, _, path_vim_status = temp.partition(":")
2282 path_item = path_vim_status[: path_vim_status.rfind(".")]
2283 path_item = path_item[: path_item.rfind(".")]
2284 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2285 # path_item: dot separated list targeting record information, e.g. "vdur.10"
2286
2287 if ro_vim_item_update:
2288 update_dict = {
2289 path_vim_status + "." + k: v
2290 for k, v in ro_vim_item_update.items()
2291 if k
2292 in ("vim_id", "vim_details", "vim_name", "vim_status", "interfaces")
2293 }
2294
2295 if path_vim_status.startswith("vdur."):
2296 # for backward compatibility, add vdur.name apart from vdur.vim_name
2297 if ro_vim_item_update.get("vim_name"):
2298 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
2299
2300 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2301 if ro_vim_item_update.get("vim_id"):
2302 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
2303
2304 # update general status
2305 if ro_vim_item_update.get("vim_status"):
2306 update_dict[path_item + ".status"] = ro_vim_item_update[
2307 "vim_status"
2308 ]
2309
2310 if ro_vim_item_update.get("interfaces"):
2311 path_interfaces = path_item + ".interfaces"
2312
2313 for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
2314 if iface:
2315 update_dict.update(
2316 {
2317 path_interfaces + ".{}.".format(i) + k: v
2318 for k, v in iface.items()
2319 if k in ("vlan", "compute_node", "pci")
2320 }
2321 )
2322
2323 # put ip_address and mac_address with ip-address and mac-address
2324 if iface.get("ip_address"):
2325 update_dict[
2326 path_interfaces + ".{}.".format(i) + "ip-address"
2327 ] = iface["ip_address"]
2328
2329 if iface.get("mac_address"):
2330 update_dict[
2331 path_interfaces + ".{}.".format(i) + "mac-address"
2332 ] = iface["mac_address"]
2333
2334 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
2335 update_dict["ip-address"] = iface.get("ip_address").split(
2336 ";"
2337 )[0]
2338
2339 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
2340 update_dict[path_item + ".ip-address"] = iface.get(
2341 "ip_address"
2342 ).split(";")[0]
2343
2344 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
2345 else:
2346 update_dict = {path_item + ".status": "DELETED"}
2347 self.db.set_one(
2348 table,
2349 q_filter={"_id": _id},
2350 update_dict=update_dict,
2351 unset={path_vim_status: None},
2352 )
2353
2354 def _process_delete_db_tasks(self):
2355 """
2356 Delete task from database because vnfrs or nsrs or both have been deleted
2357 :return: None. Uses and modify self.tasks_to_delete
2358 """
2359 while self.tasks_to_delete:
2360 task = self.tasks_to_delete[0]
2361 vnfrs_deleted = None
2362 nsr_id = task["nsr_id"]
2363
2364 if task["target_record"].startswith("vnfrs:"):
2365 # check if nsrs is present
2366 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
2367 vnfrs_deleted = task["target_record"].split(":")[1]
2368
2369 try:
2370 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
2371 except Exception as e:
2372 self.logger.error(
2373 "Error deleting task={}: {}".format(task["task_id"], e)
2374 )
2375 self.tasks_to_delete.pop(0)
2376
2377 @staticmethod
2378 def delete_db_tasks(db, nsr_id, vnfrs_deleted):
2379 """
2380 Static method because it is called from osm_ng_ro.ns
2381 :param db: instance of database to use
2382 :param nsr_id: affected nsrs id
2383 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
2384 :return: None, exception is fails
2385 """
2386 retries = 5
2387 for retry in range(retries):
2388 ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
2389 now = time.time()
2390 conflict = False
2391
2392 for ro_task in ro_tasks:
2393 db_update = {}
2394 to_delete_ro_task = True
2395
2396 for index, task in enumerate(ro_task["tasks"]):
2397 if not task:
2398 pass
2399 elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or (
2400 vnfrs_deleted
2401 and task["target_record"].startswith("vnfrs:" + vnfrs_deleted)
2402 ):
2403 db_update["tasks.{}".format(index)] = None
2404 else:
2405 # used by other nsr, ro_task cannot be deleted
2406 to_delete_ro_task = False
2407
2408 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
2409 if to_delete_ro_task:
2410 if not db.del_one(
2411 "ro_tasks",
2412 q_filter={
2413 "_id": ro_task["_id"],
2414 "modified_at": ro_task["modified_at"],
2415 },
2416 fail_on_empty=False,
2417 ):
2418 conflict = True
2419 elif db_update:
2420 db_update["modified_at"] = now
2421 if not db.set_one(
2422 "ro_tasks",
2423 q_filter={
2424 "_id": ro_task["_id"],
2425 "modified_at": ro_task["modified_at"],
2426 },
2427 update_dict=db_update,
2428 fail_on_empty=False,
2429 ):
2430 conflict = True
2431 if not conflict:
2432 return
2433 else:
2434 raise NsWorkerException("Exceeded {} retries".format(retries))
2435
2436 def run(self):
2437 # load database
2438 self.logger.info("Starting")
2439 while True:
2440 # step 1: get commands from queue
2441 try:
2442 if self.vim_targets:
2443 task = self.task_queue.get(block=False)
2444 else:
2445 if not self.idle:
2446 self.logger.debug("enters in idle state")
2447 self.idle = True
2448 task = self.task_queue.get(block=True)
2449 self.idle = False
2450
2451 if task[0] == "terminate":
2452 break
2453 elif task[0] == "load_vim":
2454 self.logger.info("order to load vim {}".format(task[1]))
2455 self._load_vim(task[1])
2456 elif task[0] == "unload_vim":
2457 self.logger.info("order to unload vim {}".format(task[1]))
2458 self._unload_vim(task[1])
2459 elif task[0] == "reload_vim":
2460 self._reload_vim(task[1])
2461 elif task[0] == "check_vim":
2462 self.logger.info("order to check vim {}".format(task[1]))
2463 self._check_vim(task[1])
2464 continue
2465 except Exception as e:
2466 if isinstance(e, queue.Empty):
2467 pass
2468 else:
2469 self.logger.critical(
2470 "Error processing task: {}".format(e), exc_info=True
2471 )
2472
2473 # step 2: process pending_tasks, delete not needed tasks
2474 try:
2475 if self.tasks_to_delete:
2476 self._process_delete_db_tasks()
2477 busy = False
2478 """
2479 # Log RO tasks only when loglevel is DEBUG
2480 if self.logger.getEffectiveLevel() == logging.DEBUG:
2481 _ = self._get_db_all_tasks()
2482 """
2483 ro_task = self._get_db_task()
2484 if ro_task:
2485 self._process_pending_tasks(ro_task)
2486 busy = True
2487 if not busy:
2488 time.sleep(5)
2489 except Exception as e:
2490 self.logger.critical(
2491 "Unexpected exception at run: " + str(e), exc_info=True
2492 )
2493
2494 self.logger.info("Finishing")