dc1760084f7218fe73af06edc1241d189de0509b
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
20 """"
21 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored at database in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
25 """
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import mkdir
31 import queue
32 from shutil import rmtree
33 import threading
34 import time
35 from unittest.mock import Mock
36
37 from importlib_metadata import entry_points
38 from osm_common.dbbase import DbException
39 from osm_ng_ro.vim_admin import LockRenew
40 from osm_ro_plugin import sdnconn, vimconn
41 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
42 from osm_ro_plugin.vim_dummy import VimDummyConnector
43 import yaml
44
45
46 __author__ = "Alfonso Tierno"
47 __date__ = "$28-Sep-2017 12:07:15$"
48
49
50 def deep_get(target_dict, *args, **kwargs):
51 """
52 Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
53 Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
54 :param target_dict: dictionary to be read
55 :param args: list of keys to read from target_dict
56 :param kwargs: only can contain default=value to return if key is not present in the nested dictionary
57 :return: The wanted value if exist, None or default otherwise
58 """
59 for key in args:
60 if not isinstance(target_dict, dict) or key not in target_dict:
61 return kwargs.get("default")
62 target_dict = target_dict[key]
63 return target_dict
64
65
66 class NsWorkerException(Exception):
67 pass
68
69
70 class FailingConnector:
71 def __init__(self, error_msg):
72 self.error_msg = error_msg
73
74 for method in dir(vimconn.VimConnector):
75 if method[0] != "_":
76 setattr(
77 self, method, Mock(side_effect=vimconn.VimConnException(error_msg))
78 )
79
80 for method in dir(sdnconn.SdnConnectorBase):
81 if method[0] != "_":
82 setattr(
83 self, method, Mock(side_effect=sdnconn.SdnConnectorError(error_msg))
84 )
85
86
87 class NsWorkerExceptionNotFound(NsWorkerException):
88 pass
89
90
91 class VimInteractionBase:
92 """Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ...
93 It implements methods that does nothing and return ok"""
94
95 def __init__(self, db, my_vims, db_vims, logger):
96 self.db = db
97 self.logger = logger
98 self.my_vims = my_vims
99 self.db_vims = db_vims
100
101 def new(self, ro_task, task_index, task_depends):
102 return "BUILD", {}
103
104 def refresh(self, ro_task):
105 """skip calling VIM to get image, flavor status. Assumes ok"""
106 if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
107 return "FAILED", {}
108
109 return "DONE", {}
110
111 def delete(self, ro_task, task_index):
112 """skip calling VIM to delete image. Assumes ok"""
113 return "DONE", {}
114
115 def exec(self, ro_task, task_index, task_depends):
116 return "DONE", None, None
117
118
119 class VimInteractionNet(VimInteractionBase):
120 def new(self, ro_task, task_index, task_depends):
121 vim_net_id = None
122 task = ro_task["tasks"][task_index]
123 task_id = task["task_id"]
124 created = False
125 created_items = {}
126 target_vim = self.my_vims[ro_task["target_id"]]
127 mgmtnet = False
128 mgmtnet_defined_in_vim = False
129
130 try:
131 # FIND
132 if task.get("find_params"):
133 # if management, get configuration of VIM
134 if task["find_params"].get("filter_dict"):
135 vim_filter = task["find_params"]["filter_dict"]
136 # management network
137 elif task["find_params"].get("mgmt"):
138 mgmtnet = True
139 if deep_get(
140 self.db_vims[ro_task["target_id"]],
141 "config",
142 "management_network_id",
143 ):
144 mgmtnet_defined_in_vim = True
145 vim_filter = {
146 "id": self.db_vims[ro_task["target_id"]]["config"][
147 "management_network_id"
148 ]
149 }
150 elif deep_get(
151 self.db_vims[ro_task["target_id"]],
152 "config",
153 "management_network_name",
154 ):
155 mgmtnet_defined_in_vim = True
156 vim_filter = {
157 "name": self.db_vims[ro_task["target_id"]]["config"][
158 "management_network_name"
159 ]
160 }
161 else:
162 vim_filter = {"name": task["find_params"]["name"]}
163 else:
164 raise NsWorkerExceptionNotFound(
165 "Invalid find_params for new_net {}".format(task["find_params"])
166 )
167
168 vim_nets = target_vim.get_network_list(vim_filter)
169 if not vim_nets and not task.get("params"):
170 # If there is mgmt-network in the descriptor,
171 # there is no mapping of that network to a VIM network in the descriptor,
172 # also there is no mapping in the "--config" parameter or at VIM creation;
173 # that mgmt-network will be created.
174 if mgmtnet and not mgmtnet_defined_in_vim:
175 net_name = (
176 vim_filter.get("name")
177 if vim_filter.get("name")
178 else vim_filter.get("id")[:16]
179 )
180 vim_net_id, created_items = target_vim.new_network(
181 net_name, None
182 )
183 self.logger.debug(
184 "Created mgmt network vim_net_id: {}".format(vim_net_id)
185 )
186 created = True
187 else:
188 raise NsWorkerExceptionNotFound(
189 "Network not found with this criteria: '{}'".format(
190 task.get("find_params")
191 )
192 )
193 elif len(vim_nets) > 1:
194 raise NsWorkerException(
195 "More than one network found with this criteria: '{}'".format(
196 task["find_params"]
197 )
198 )
199
200 if vim_nets:
201 vim_net_id = vim_nets[0]["id"]
202 else:
203 # CREATE
204 params = task["params"]
205 vim_net_id, created_items = target_vim.new_network(**params)
206 created = True
207
208 ro_vim_item_update = {
209 "vim_id": vim_net_id,
210 "vim_status": "BUILD",
211 "created": created,
212 "created_items": created_items,
213 "vim_details": None,
214 }
215 self.logger.debug(
216 "task={} {} new-net={} created={}".format(
217 task_id, ro_task["target_id"], vim_net_id, created
218 )
219 )
220
221 return "BUILD", ro_vim_item_update
222 except (vimconn.VimConnException, NsWorkerException) as e:
223 self.logger.error(
224 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e)
225 )
226 ro_vim_item_update = {
227 "vim_status": "VIM_ERROR",
228 "created": created,
229 "vim_details": str(e),
230 }
231
232 return "FAILED", ro_vim_item_update
233
234 def refresh(self, ro_task):
235 """Call VIM to get network status"""
236 ro_task_id = ro_task["_id"]
237 target_vim = self.my_vims[ro_task["target_id"]]
238 vim_id = ro_task["vim_info"]["vim_id"]
239 net_to_refresh_list = [vim_id]
240
241 try:
242 vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
243 vim_info = vim_dict[vim_id]
244
245 if vim_info["status"] == "ACTIVE":
246 task_status = "DONE"
247 elif vim_info["status"] == "BUILD":
248 task_status = "BUILD"
249 else:
250 task_status = "FAILED"
251 except vimconn.VimConnException as e:
252 # Mark all tasks at VIM_ERROR status
253 self.logger.error(
254 "ro_task={} vim={} get-net={}: {}".format(
255 ro_task_id, ro_task["target_id"], vim_id, e
256 )
257 )
258 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
259 task_status = "FAILED"
260
261 ro_vim_item_update = {}
262 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
263 ro_vim_item_update["vim_status"] = vim_info["status"]
264
265 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
266 ro_vim_item_update["vim_name"] = vim_info.get("name")
267
268 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
269 if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
270 ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
271 elif vim_info["status"] == "DELETED":
272 ro_vim_item_update["vim_id"] = None
273 ro_vim_item_update["vim_details"] = "Deleted externally"
274 else:
275 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
276 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
277
278 if ro_vim_item_update:
279 self.logger.debug(
280 "ro_task={} {} get-net={}: status={} {}".format(
281 ro_task_id,
282 ro_task["target_id"],
283 vim_id,
284 ro_vim_item_update.get("vim_status"),
285 ro_vim_item_update.get("vim_details")
286 if ro_vim_item_update.get("vim_status") != "ACTIVE"
287 else "",
288 )
289 )
290
291 return task_status, ro_vim_item_update
292
293 def delete(self, ro_task, task_index):
294 task = ro_task["tasks"][task_index]
295 task_id = task["task_id"]
296 net_vim_id = ro_task["vim_info"]["vim_id"]
297 ro_vim_item_update_ok = {
298 "vim_status": "DELETED",
299 "created": False,
300 "vim_details": "DELETED",
301 "vim_id": None,
302 }
303
304 try:
305 if net_vim_id or ro_task["vim_info"]["created_items"]:
306 target_vim = self.my_vims[ro_task["target_id"]]
307 target_vim.delete_network(
308 net_vim_id, ro_task["vim_info"]["created_items"]
309 )
310 except vimconn.VimConnNotFoundException:
311 ro_vim_item_update_ok["vim_details"] = "already deleted"
312 except vimconn.VimConnException as e:
313 self.logger.error(
314 "ro_task={} vim={} del-net={}: {}".format(
315 ro_task["_id"], ro_task["target_id"], net_vim_id, e
316 )
317 )
318 ro_vim_item_update = {
319 "vim_status": "VIM_ERROR",
320 "vim_details": "Error while deleting: {}".format(e),
321 }
322
323 return "FAILED", ro_vim_item_update
324
325 self.logger.debug(
326 "task={} {} del-net={} {}".format(
327 task_id,
328 ro_task["target_id"],
329 net_vim_id,
330 ro_vim_item_update_ok.get("vim_details", ""),
331 )
332 )
333
334 return "DONE", ro_vim_item_update_ok
335
336
337 class VimInteractionVdu(VimInteractionBase):
338 max_retries_inject_ssh_key = 20 # 20 times
339 time_retries_inject_ssh_key = 30 # wevery 30 seconds
340
341 def new(self, ro_task, task_index, task_depends):
342 task = ro_task["tasks"][task_index]
343 task_id = task["task_id"]
344 created = False
345 created_items = {}
346 target_vim = self.my_vims[ro_task["target_id"]]
347
348 try:
349 created = True
350 params = task["params"]
351 params_copy = deepcopy(params)
352 net_list = params_copy["net_list"]
353
354 for net in net_list:
355 # change task_id into network_id
356 if "net_id" in net and net["net_id"].startswith("TASK-"):
357 network_id = task_depends[net["net_id"]]
358
359 if not network_id:
360 raise NsWorkerException(
361 "Cannot create VM because depends on a network not created or found "
362 "for {}".format(net["net_id"])
363 )
364
365 net["net_id"] = network_id
366
367 if params_copy["image_id"].startswith("TASK-"):
368 params_copy["image_id"] = task_depends[params_copy["image_id"]]
369
370 if params_copy["flavor_id"].startswith("TASK-"):
371 params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]
372
373 affinity_group_list = params_copy["affinity_group_list"]
374 for affinity_group in affinity_group_list:
375 # change task_id into affinity_group_id
376 if "affinity_group_id" in affinity_group and affinity_group[
377 "affinity_group_id"
378 ].startswith("TASK-"):
379 affinity_group_id = task_depends[
380 affinity_group["affinity_group_id"]
381 ]
382
383 if not affinity_group_id:
384 raise NsWorkerException(
385 "found for {}".format(affinity_group["affinity_group_id"])
386 )
387
388 affinity_group["affinity_group_id"] = affinity_group_id
389
390 vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
391 interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]
392
393 ro_vim_item_update = {
394 "vim_id": vim_vm_id,
395 "vim_status": "BUILD",
396 "created": created,
397 "created_items": created_items,
398 "vim_details": None,
399 "interfaces_vim_ids": interfaces,
400 "interfaces": [],
401 }
402 self.logger.debug(
403 "task={} {} new-vm={} created={}".format(
404 task_id, ro_task["target_id"], vim_vm_id, created
405 )
406 )
407
408 return "BUILD", ro_vim_item_update
409 except (vimconn.VimConnException, NsWorkerException) as e:
410 self.logger.error(
411 "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
412 )
413 ro_vim_item_update = {
414 "vim_status": "VIM_ERROR",
415 "created": created,
416 "vim_details": str(e),
417 }
418
419 return "FAILED", ro_vim_item_update
420
421 def delete(self, ro_task, task_index):
422 task = ro_task["tasks"][task_index]
423 task_id = task["task_id"]
424 vm_vim_id = ro_task["vim_info"]["vim_id"]
425 ro_vim_item_update_ok = {
426 "vim_status": "DELETED",
427 "created": False,
428 "vim_details": "DELETED",
429 "vim_id": None,
430 }
431
432 try:
433 if vm_vim_id or ro_task["vim_info"]["created_items"]:
434 target_vim = self.my_vims[ro_task["target_id"]]
435 target_vim.delete_vminstance(
436 vm_vim_id, ro_task["vim_info"]["created_items"]
437 )
438 except vimconn.VimConnNotFoundException:
439 ro_vim_item_update_ok["vim_details"] = "already deleted"
440 except vimconn.VimConnException as e:
441 self.logger.error(
442 "ro_task={} vim={} del-vm={}: {}".format(
443 ro_task["_id"], ro_task["target_id"], vm_vim_id, e
444 )
445 )
446 ro_vim_item_update = {
447 "vim_status": "VIM_ERROR",
448 "vim_details": "Error while deleting: {}".format(e),
449 }
450
451 return "FAILED", ro_vim_item_update
452
453 self.logger.debug(
454 "task={} {} del-vm={} {}".format(
455 task_id,
456 ro_task["target_id"],
457 vm_vim_id,
458 ro_vim_item_update_ok.get("vim_details", ""),
459 )
460 )
461
462 return "DONE", ro_vim_item_update_ok
463
464 def refresh(self, ro_task):
465 """Call VIM to get vm status"""
466 ro_task_id = ro_task["_id"]
467 target_vim = self.my_vims[ro_task["target_id"]]
468 vim_id = ro_task["vim_info"]["vim_id"]
469
470 if not vim_id:
471 return None, None
472
473 vm_to_refresh_list = [vim_id]
474 try:
475 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
476 vim_info = vim_dict[vim_id]
477
478 if vim_info["status"] == "ACTIVE":
479 task_status = "DONE"
480 elif vim_info["status"] == "BUILD":
481 task_status = "BUILD"
482 else:
483 task_status = "FAILED"
484
485 # try to load and parse vim_information
486 try:
487 vim_info_info = yaml.safe_load(vim_info["vim_info"])
488 if vim_info_info.get("name"):
489 vim_info["name"] = vim_info_info["name"]
490 except Exception:
491 pass
492 except vimconn.VimConnException as e:
493 # Mark all tasks at VIM_ERROR status
494 self.logger.error(
495 "ro_task={} vim={} get-vm={}: {}".format(
496 ro_task_id, ro_task["target_id"], vim_id, e
497 )
498 )
499 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
500 task_status = "FAILED"
501
502 ro_vim_item_update = {}
503
504 # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
505 vim_interfaces = []
506 if vim_info.get("interfaces"):
507 for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
508 iface = next(
509 (
510 iface
511 for iface in vim_info["interfaces"]
512 if vim_iface_id == iface["vim_interface_id"]
513 ),
514 None,
515 )
516 # if iface:
517 # iface.pop("vim_info", None)
518 vim_interfaces.append(iface)
519
520 task_create = next(
521 t
522 for t in ro_task["tasks"]
523 if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
524 )
525 if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
526 vim_interfaces[task_create["mgmt_vnf_interface"]][
527 "mgmt_vnf_interface"
528 ] = True
529
530 mgmt_vdu_iface = task_create.get(
531 "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
532 )
533 if vim_interfaces:
534 vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True
535
536 if ro_task["vim_info"]["interfaces"] != vim_interfaces:
537 ro_vim_item_update["interfaces"] = vim_interfaces
538
539 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
540 ro_vim_item_update["vim_status"] = vim_info["status"]
541
542 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
543 ro_vim_item_update["vim_name"] = vim_info.get("name")
544
545 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
546 if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
547 ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
548 elif vim_info["status"] == "DELETED":
549 ro_vim_item_update["vim_id"] = None
550 ro_vim_item_update["vim_details"] = "Deleted externally"
551 else:
552 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
553 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
554
555 if ro_vim_item_update:
556 self.logger.debug(
557 "ro_task={} {} get-vm={}: status={} {}".format(
558 ro_task_id,
559 ro_task["target_id"],
560 vim_id,
561 ro_vim_item_update.get("vim_status"),
562 ro_vim_item_update.get("vim_details")
563 if ro_vim_item_update.get("vim_status") != "ACTIVE"
564 else "",
565 )
566 )
567
568 return task_status, ro_vim_item_update
569
570 def exec(self, ro_task, task_index, task_depends):
571 task = ro_task["tasks"][task_index]
572 task_id = task["task_id"]
573 target_vim = self.my_vims[ro_task["target_id"]]
574 db_task_update = {"retries": 0}
575 retries = task.get("retries", 0)
576
577 try:
578 params = task["params"]
579 params_copy = deepcopy(params)
580 params_copy["ro_key"] = self.db.decrypt(
581 params_copy.pop("private_key"),
582 params_copy.pop("schema_version"),
583 params_copy.pop("salt"),
584 )
585 params_copy["ip_addr"] = params_copy.pop("ip_address")
586 target_vim.inject_user_key(**params_copy)
587 self.logger.debug(
588 "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
589 )
590
591 return (
592 "DONE",
593 None,
594 db_task_update,
595 ) # params_copy["key"]
596 except (vimconn.VimConnException, NsWorkerException) as e:
597 retries += 1
598
599 if retries < self.max_retries_inject_ssh_key:
600 return (
601 "BUILD",
602 None,
603 {
604 "retries": retries,
605 "next_retry": self.time_retries_inject_ssh_key,
606 },
607 )
608
609 self.logger.error(
610 "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
611 )
612 ro_vim_item_update = {"vim_details": str(e)}
613
614 return "FAILED", ro_vim_item_update, db_task_update
615
616
617 class VimInteractionImage(VimInteractionBase):
618 def new(self, ro_task, task_index, task_depends):
619 task = ro_task["tasks"][task_index]
620 task_id = task["task_id"]
621 created = False
622 created_items = {}
623 target_vim = self.my_vims[ro_task["target_id"]]
624
625 try:
626 # FIND
627 if task.get("find_params"):
628 vim_images = target_vim.get_image_list(**task["find_params"])
629
630 if not vim_images:
631 raise NsWorkerExceptionNotFound(
632 "Image not found with this criteria: '{}'".format(
633 task["find_params"]
634 )
635 )
636 elif len(vim_images) > 1:
637 raise NsWorkerException(
638 "More than one image found with this criteria: '{}'".format(
639 task["find_params"]
640 )
641 )
642 else:
643 vim_image_id = vim_images[0]["id"]
644
645 ro_vim_item_update = {
646 "vim_id": vim_image_id,
647 "vim_status": "DONE",
648 "created": created,
649 "created_items": created_items,
650 "vim_details": None,
651 }
652 self.logger.debug(
653 "task={} {} new-image={} created={}".format(
654 task_id, ro_task["target_id"], vim_image_id, created
655 )
656 )
657
658 return "DONE", ro_vim_item_update
659 except (NsWorkerException, vimconn.VimConnException) as e:
660 self.logger.error(
661 "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
662 )
663 ro_vim_item_update = {
664 "vim_status": "VIM_ERROR",
665 "created": created,
666 "vim_details": str(e),
667 }
668
669 return "FAILED", ro_vim_item_update
670
671
672 class VimInteractionFlavor(VimInteractionBase):
673 def delete(self, ro_task, task_index):
674 task = ro_task["tasks"][task_index]
675 task_id = task["task_id"]
676 flavor_vim_id = ro_task["vim_info"]["vim_id"]
677 ro_vim_item_update_ok = {
678 "vim_status": "DELETED",
679 "created": False,
680 "vim_details": "DELETED",
681 "vim_id": None,
682 }
683
684 try:
685 if flavor_vim_id:
686 target_vim = self.my_vims[ro_task["target_id"]]
687 target_vim.delete_flavor(flavor_vim_id)
688 except vimconn.VimConnNotFoundException:
689 ro_vim_item_update_ok["vim_details"] = "already deleted"
690 except vimconn.VimConnException as e:
691 self.logger.error(
692 "ro_task={} vim={} del-flavor={}: {}".format(
693 ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
694 )
695 )
696 ro_vim_item_update = {
697 "vim_status": "VIM_ERROR",
698 "vim_details": "Error while deleting: {}".format(e),
699 }
700
701 return "FAILED", ro_vim_item_update
702
703 self.logger.debug(
704 "task={} {} del-flavor={} {}".format(
705 task_id,
706 ro_task["target_id"],
707 flavor_vim_id,
708 ro_vim_item_update_ok.get("vim_details", ""),
709 )
710 )
711
712 return "DONE", ro_vim_item_update_ok
713
714 def new(self, ro_task, task_index, task_depends):
715 task = ro_task["tasks"][task_index]
716 task_id = task["task_id"]
717 created = False
718 created_items = {}
719 target_vim = self.my_vims[ro_task["target_id"]]
720
721 try:
722 # FIND
723 vim_flavor_id = None
724
725 if task.get("find_params"):
726 try:
727 flavor_data = task["find_params"]["flavor_data"]
728 vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
729 except vimconn.VimConnNotFoundException:
730 pass
731
732 if not vim_flavor_id and task.get("params"):
733 # CREATE
734 flavor_data = task["params"]["flavor_data"]
735 vim_flavor_id = target_vim.new_flavor(flavor_data)
736 created = True
737
738 ro_vim_item_update = {
739 "vim_id": vim_flavor_id,
740 "vim_status": "DONE",
741 "created": created,
742 "created_items": created_items,
743 "vim_details": None,
744 }
745 self.logger.debug(
746 "task={} {} new-flavor={} created={}".format(
747 task_id, ro_task["target_id"], vim_flavor_id, created
748 )
749 )
750
751 return "DONE", ro_vim_item_update
752 except (vimconn.VimConnException, NsWorkerException) as e:
753 self.logger.error(
754 "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
755 )
756 ro_vim_item_update = {
757 "vim_status": "VIM_ERROR",
758 "created": created,
759 "vim_details": str(e),
760 }
761
762 return "FAILED", ro_vim_item_update
763
764
765 class VimInteractionAffinityGroup(VimInteractionBase):
766 def delete(self, ro_task, task_index):
767 task = ro_task["tasks"][task_index]
768 task_id = task["task_id"]
769 affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
770 ro_vim_item_update_ok = {
771 "vim_status": "DELETED",
772 "created": False,
773 "vim_details": "DELETED",
774 "vim_id": None,
775 }
776
777 try:
778 if affinity_group_vim_id:
779 target_vim = self.my_vims[ro_task["target_id"]]
780 target_vim.delete_affinity_group(affinity_group_vim_id)
781 except vimconn.VimConnNotFoundException:
782 ro_vim_item_update_ok["vim_details"] = "already deleted"
783 except vimconn.VimConnException as e:
784 self.logger.error(
785 "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
786 ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
787 )
788 )
789 ro_vim_item_update = {
790 "vim_status": "VIM_ERROR",
791 "vim_details": "Error while deleting: {}".format(e),
792 }
793
794 return "FAILED", ro_vim_item_update
795
796 self.logger.debug(
797 "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
798 task_id,
799 ro_task["target_id"],
800 affinity_group_vim_id,
801 ro_vim_item_update_ok.get("vim_details", ""),
802 )
803 )
804
805 return "DONE", ro_vim_item_update_ok
806
807 def new(self, ro_task, task_index, task_depends):
808 task = ro_task["tasks"][task_index]
809 task_id = task["task_id"]
810 created = False
811 created_items = {}
812 target_vim = self.my_vims[ro_task["target_id"]]
813
814 try:
815 affinity_group_vim_id = None
816 affinity_group_data = None
817
818 if task.get("params"):
819 affinity_group_data = task["params"].get("affinity_group_data")
820
821 if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
822 try:
823 param_affinity_group_id = task["params"]["affinity_group_data"].get(
824 "vim-affinity-group-id"
825 )
826 affinity_group_vim_id = target_vim.get_affinity_group(
827 param_affinity_group_id
828 ).get("id")
829 except vimconn.VimConnNotFoundException:
830 self.logger.error(
831 "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {}"
832 "could not be found at VIM. Creating a new one.".format(
833 task_id, ro_task["target_id"], param_affinity_group_id
834 )
835 )
836
837 if not affinity_group_vim_id and affinity_group_data:
838 affinity_group_vim_id = target_vim.new_affinity_group(
839 affinity_group_data
840 )
841 created = True
842
843 ro_vim_item_update = {
844 "vim_id": affinity_group_vim_id,
845 "vim_status": "DONE",
846 "created": created,
847 "created_items": created_items,
848 "vim_details": None,
849 }
850 self.logger.debug(
851 "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
852 task_id, ro_task["target_id"], affinity_group_vim_id, created
853 )
854 )
855
856 return "DONE", ro_vim_item_update
857 except (vimconn.VimConnException, NsWorkerException) as e:
858 self.logger.error(
859 "task={} vim={} new-affinity-or-anti-affinity-group:"
860 " {}".format(task_id, ro_task["target_id"], e)
861 )
862 ro_vim_item_update = {
863 "vim_status": "VIM_ERROR",
864 "created": created,
865 "vim_details": str(e),
866 }
867
868 return "FAILED", ro_vim_item_update
869
870
871 class VimInteractionSdnNet(VimInteractionBase):
872 @staticmethod
873 def _match_pci(port_pci, mapping):
874 """
875 Check if port_pci matches with mapping
876 mapping can have brackets to indicate that several chars are accepted. e.g
877 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
878 :param port_pci: text
879 :param mapping: text, can contain brackets to indicate several chars are available
880 :return: True if matches, False otherwise
881 """
882 if not port_pci or not mapping:
883 return False
884 if port_pci == mapping:
885 return True
886
887 mapping_index = 0
888 pci_index = 0
889 while True:
890 bracket_start = mapping.find("[", mapping_index)
891
892 if bracket_start == -1:
893 break
894
895 bracket_end = mapping.find("]", bracket_start)
896 if bracket_end == -1:
897 break
898
899 length = bracket_start - mapping_index
900 if (
901 length
902 and port_pci[pci_index : pci_index + length]
903 != mapping[mapping_index:bracket_start]
904 ):
905 return False
906
907 if (
908 port_pci[pci_index + length]
909 not in mapping[bracket_start + 1 : bracket_end]
910 ):
911 return False
912
913 pci_index += length + 1
914 mapping_index = bracket_end + 1
915
916 if port_pci[pci_index:] != mapping[mapping_index:]:
917 return False
918
919 return True
920
921 def _get_interfaces(self, vlds_to_connect, vim_account_id):
922 """
923 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
924 :param vim_account_id:
925 :return:
926 """
927 interfaces = []
928
929 for vld in vlds_to_connect:
930 table, _, db_id = vld.partition(":")
931 db_id, _, vld = db_id.partition(":")
932 _, _, vld_id = vld.partition(".")
933
934 if table == "vnfrs":
935 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
936 iface_key = "vnf-vld-id"
937 else: # table == "nsrs"
938 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
939 iface_key = "ns-vld-id"
940
941 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
942
943 for db_vnfr in db_vnfrs:
944 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
945 for iface_index, interface in enumerate(vdur["interfaces"]):
946 if interface.get(iface_key) == vld_id and interface.get(
947 "type"
948 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
949 # only SR-IOV o PT
950 interface_ = interface.copy()
951 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
952 db_vnfr["_id"], vdu_index, iface_index
953 )
954
955 if vdur.get("status") == "ERROR":
956 interface_["status"] = "ERROR"
957
958 interfaces.append(interface_)
959
960 return interfaces
961
962 def refresh(self, ro_task):
963 # look for task create
964 task_create_index, _ = next(
965 i_t
966 for i_t in enumerate(ro_task["tasks"])
967 if i_t[1]
968 and i_t[1]["action"] == "CREATE"
969 and i_t[1]["status"] != "FINISHED"
970 )
971
972 return self.new(ro_task, task_create_index, None)
973
974 def new(self, ro_task, task_index, task_depends):
975
976 task = ro_task["tasks"][task_index]
977 task_id = task["task_id"]
978 target_vim = self.my_vims[ro_task["target_id"]]
979
980 sdn_net_id = ro_task["vim_info"]["vim_id"]
981
982 created_items = ro_task["vim_info"].get("created_items")
983 connected_ports = ro_task["vim_info"].get("connected_ports", [])
984 new_connected_ports = []
985 last_update = ro_task["vim_info"].get("last_update", 0)
986 sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
987 error_list = []
988 created = ro_task["vim_info"].get("created", False)
989
990 try:
991 # CREATE
992 params = task["params"]
993 vlds_to_connect = params["vlds"]
994 associated_vim = params["target_vim"]
995 # external additional ports
996 additional_ports = params.get("sdn-ports") or ()
997 _, _, vim_account_id = associated_vim.partition(":")
998
999 if associated_vim:
1000 # get associated VIM
1001 if associated_vim not in self.db_vims:
1002 self.db_vims[associated_vim] = self.db.get_one(
1003 "vim_accounts", {"_id": vim_account_id}
1004 )
1005
1006 db_vim = self.db_vims[associated_vim]
1007
1008 # look for ports to connect
1009 ports = self._get_interfaces(vlds_to_connect, vim_account_id)
1010 # print(ports)
1011
1012 sdn_ports = []
1013 pending_ports = error_ports = 0
1014 vlan_used = None
1015 sdn_need_update = False
1016
1017 for port in ports:
1018 vlan_used = port.get("vlan") or vlan_used
1019
1020 # TODO. Do not connect if already done
1021 if not port.get("compute_node") or not port.get("pci"):
1022 if port.get("status") == "ERROR":
1023 error_ports += 1
1024 else:
1025 pending_ports += 1
1026 continue
1027
1028 pmap = None
1029 compute_node_mappings = next(
1030 (
1031 c
1032 for c in db_vim["config"].get("sdn-port-mapping", ())
1033 if c and c["compute_node"] == port["compute_node"]
1034 ),
1035 None,
1036 )
1037
1038 if compute_node_mappings:
1039 # process port_mapping pci of type 0000:af:1[01].[1357]
1040 pmap = next(
1041 (
1042 p
1043 for p in compute_node_mappings["ports"]
1044 if self._match_pci(port["pci"], p.get("pci"))
1045 ),
1046 None,
1047 )
1048
1049 if not pmap:
1050 if not db_vim["config"].get("mapping_not_needed"):
1051 error_list.append(
1052 "Port mapping not found for compute_node={} pci={}".format(
1053 port["compute_node"], port["pci"]
1054 )
1055 )
1056 continue
1057
1058 pmap = {}
1059
1060 service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
1061 new_port = {
1062 "service_endpoint_id": pmap.get("service_endpoint_id")
1063 or service_endpoint_id,
1064 "service_endpoint_encapsulation_type": "dot1q"
1065 if port["type"] == "SR-IOV"
1066 else None,
1067 "service_endpoint_encapsulation_info": {
1068 "vlan": port.get("vlan"),
1069 "mac": port.get("mac-address"),
1070 "device_id": pmap.get("device_id") or port["compute_node"],
1071 "device_interface_id": pmap.get("device_interface_id")
1072 or port["pci"],
1073 "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
1074 "switch_port": pmap.get("switch_port"),
1075 "service_mapping_info": pmap.get("service_mapping_info"),
1076 },
1077 }
1078
1079 # TODO
1080 # if port["modified_at"] > last_update:
1081 # sdn_need_update = True
1082 new_connected_ports.append(port["id"]) # TODO
1083 sdn_ports.append(new_port)
1084
1085 if error_ports:
1086 error_list.append(
1087 "{} interfaces have not been created as VDU is on ERROR status".format(
1088 error_ports
1089 )
1090 )
1091
1092 # connect external ports
1093 for index, additional_port in enumerate(additional_ports):
1094 additional_port_id = additional_port.get(
1095 "service_endpoint_id"
1096 ) or "external-{}".format(index)
1097 sdn_ports.append(
1098 {
1099 "service_endpoint_id": additional_port_id,
1100 "service_endpoint_encapsulation_type": additional_port.get(
1101 "service_endpoint_encapsulation_type", "dot1q"
1102 ),
1103 "service_endpoint_encapsulation_info": {
1104 "vlan": additional_port.get("vlan") or vlan_used,
1105 "mac": additional_port.get("mac_address"),
1106 "device_id": additional_port.get("device_id"),
1107 "device_interface_id": additional_port.get(
1108 "device_interface_id"
1109 ),
1110 "switch_dpid": additional_port.get("switch_dpid")
1111 or additional_port.get("switch_id"),
1112 "switch_port": additional_port.get("switch_port"),
1113 "service_mapping_info": additional_port.get(
1114 "service_mapping_info"
1115 ),
1116 },
1117 }
1118 )
1119 new_connected_ports.append(additional_port_id)
1120 sdn_info = ""
1121
1122 # if there are more ports to connect or they have been modified, call create/update
1123 if error_list:
1124 sdn_status = "ERROR"
1125 sdn_info = "; ".join(error_list)
1126 elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
1127 last_update = time.time()
1128
1129 if not sdn_net_id:
1130 if len(sdn_ports) < 2:
1131 sdn_status = "ACTIVE"
1132
1133 if not pending_ports:
1134 self.logger.debug(
1135 "task={} {} new-sdn-net done, less than 2 ports".format(
1136 task_id, ro_task["target_id"]
1137 )
1138 )
1139 else:
1140 net_type = params.get("type") or "ELAN"
1141 (
1142 sdn_net_id,
1143 created_items,
1144 ) = target_vim.create_connectivity_service(net_type, sdn_ports)
1145 created = True
1146 self.logger.debug(
1147 "task={} {} new-sdn-net={} created={}".format(
1148 task_id, ro_task["target_id"], sdn_net_id, created
1149 )
1150 )
1151 else:
1152 created_items = target_vim.edit_connectivity_service(
1153 sdn_net_id, conn_info=created_items, connection_points=sdn_ports
1154 )
1155 created = True
1156 self.logger.debug(
1157 "task={} {} update-sdn-net={} created={}".format(
1158 task_id, ro_task["target_id"], sdn_net_id, created
1159 )
1160 )
1161
1162 connected_ports = new_connected_ports
1163 elif sdn_net_id:
1164 wim_status_dict = target_vim.get_connectivity_service_status(
1165 sdn_net_id, conn_info=created_items
1166 )
1167 sdn_status = wim_status_dict["sdn_status"]
1168
1169 if wim_status_dict.get("sdn_info"):
1170 sdn_info = str(wim_status_dict.get("sdn_info")) or ""
1171
1172 if wim_status_dict.get("error_msg"):
1173 sdn_info = wim_status_dict.get("error_msg") or ""
1174
1175 if pending_ports:
1176 if sdn_status != "ERROR":
1177 sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1178 len(ports) - pending_ports, len(ports)
1179 )
1180
1181 if sdn_status == "ACTIVE":
1182 sdn_status = "BUILD"
1183
1184 ro_vim_item_update = {
1185 "vim_id": sdn_net_id,
1186 "vim_status": sdn_status,
1187 "created": created,
1188 "created_items": created_items,
1189 "connected_ports": connected_ports,
1190 "vim_details": sdn_info,
1191 "last_update": last_update,
1192 }
1193
1194 return sdn_status, ro_vim_item_update
1195 except Exception as e:
1196 self.logger.error(
1197 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
1198 exc_info=not isinstance(
1199 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1200 ),
1201 )
1202 ro_vim_item_update = {
1203 "vim_status": "VIM_ERROR",
1204 "created": created,
1205 "vim_details": str(e),
1206 }
1207
1208 return "FAILED", ro_vim_item_update
1209
1210 def delete(self, ro_task, task_index):
1211 task = ro_task["tasks"][task_index]
1212 task_id = task["task_id"]
1213 sdn_vim_id = ro_task["vim_info"].get("vim_id")
1214 ro_vim_item_update_ok = {
1215 "vim_status": "DELETED",
1216 "created": False,
1217 "vim_details": "DELETED",
1218 "vim_id": None,
1219 }
1220
1221 try:
1222 if sdn_vim_id:
1223 target_vim = self.my_vims[ro_task["target_id"]]
1224 target_vim.delete_connectivity_service(
1225 sdn_vim_id, ro_task["vim_info"].get("created_items")
1226 )
1227
1228 except Exception as e:
1229 if (
1230 isinstance(e, sdnconn.SdnConnectorError)
1231 and e.http_code == HTTPStatus.NOT_FOUND.value
1232 ):
1233 ro_vim_item_update_ok["vim_details"] = "already deleted"
1234 else:
1235 self.logger.error(
1236 "ro_task={} vim={} del-sdn-net={}: {}".format(
1237 ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
1238 ),
1239 exc_info=not isinstance(
1240 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1241 ),
1242 )
1243 ro_vim_item_update = {
1244 "vim_status": "VIM_ERROR",
1245 "vim_details": "Error while deleting: {}".format(e),
1246 }
1247
1248 return "FAILED", ro_vim_item_update
1249
1250 self.logger.debug(
1251 "task={} {} del-sdn-net={} {}".format(
1252 task_id,
1253 ro_task["target_id"],
1254 sdn_vim_id,
1255 ro_vim_item_update_ok.get("vim_details", ""),
1256 )
1257 )
1258
1259 return "DONE", ro_vim_item_update_ok
1260
1261
1262 class NsWorker(threading.Thread):
1263 REFRESH_BUILD = 5 # 5 seconds
1264 REFRESH_ACTIVE = 60 # 1 minute
1265 REFRESH_ERROR = 600
1266 REFRESH_IMAGE = 3600 * 10
1267 REFRESH_DELETE = 3600 * 10
1268 QUEUE_SIZE = 100
1269 terminate = False
1270
1271 def __init__(self, worker_index, config, plugins, db):
1272 """
1273
1274 :param worker_index: thread index
1275 :param config: general configuration of RO, among others the process_id with the docker id where it runs
1276 :param plugins: global shared dict with the loaded plugins
1277 :param db: database class instance to use
1278 """
1279 threading.Thread.__init__(self)
1280 self.config = config
1281 self.plugins = plugins
1282 self.plugin_name = "unknown"
1283 self.logger = logging.getLogger("ro.worker{}".format(worker_index))
1284 self.worker_index = worker_index
1285 self.task_queue = queue.Queue(self.QUEUE_SIZE)
1286 # targetvim: vimplugin class
1287 self.my_vims = {}
1288 # targetvim: vim information from database
1289 self.db_vims = {}
1290 # targetvim list
1291 self.vim_targets = []
1292 self.my_id = config["process_id"] + ":" + str(worker_index)
1293 self.db = db
1294 self.item2class = {
1295 "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger),
1296 "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger),
1297 "image": VimInteractionImage(
1298 self.db, self.my_vims, self.db_vims, self.logger
1299 ),
1300 "flavor": VimInteractionFlavor(
1301 self.db, self.my_vims, self.db_vims, self.logger
1302 ),
1303 "sdn_net": VimInteractionSdnNet(
1304 self.db, self.my_vims, self.db_vims, self.logger
1305 ),
1306 "affinity-or-anti-affinity-group": VimInteractionAffinityGroup(
1307 self.db, self.my_vims, self.db_vims, self.logger
1308 ),
1309 }
1310 self.time_last_task_processed = None
1311 # lists of tasks to delete because nsrs or vnfrs has been deleted from db
1312 self.tasks_to_delete = []
1313 # it is idle when there are not vim_targets associated
1314 self.idle = True
1315 self.task_locked_time = config["global"]["task_locked_time"]
1316
1317 def insert_task(self, task):
1318 try:
1319 self.task_queue.put(task, False)
1320 return None
1321 except queue.Full:
1322 raise NsWorkerException("timeout inserting a task")
1323
1324 def terminate(self):
1325 self.insert_task("exit")
1326
1327 def del_task(self, task):
1328 with self.task_lock:
1329 if task["status"] == "SCHEDULED":
1330 task["status"] = "SUPERSEDED"
1331 return True
1332 else: # task["status"] == "processing"
1333 self.task_lock.release()
1334 return False
1335
1336 def _process_vim_config(self, target_id, db_vim):
1337 """
1338 Process vim config, creating vim configuration files as ca_cert
1339 :param target_id: vim/sdn/wim + id
1340 :param db_vim: Vim dictionary obtained from database
1341 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1342 """
1343 if not db_vim.get("config"):
1344 return
1345
1346 file_name = ""
1347
1348 try:
1349 if db_vim["config"].get("ca_cert_content"):
1350 file_name = "{}:{}".format(target_id, self.worker_index)
1351
1352 try:
1353 mkdir(file_name)
1354 except FileExistsError:
1355 pass
1356
1357 file_name = file_name + "/ca_cert"
1358
1359 with open(file_name, "w") as f:
1360 f.write(db_vim["config"]["ca_cert_content"])
1361 del db_vim["config"]["ca_cert_content"]
1362 db_vim["config"]["ca_cert"] = file_name
1363 except Exception as e:
1364 raise NsWorkerException(
1365 "Error writing to file '{}': {}".format(file_name, e)
1366 )
1367
1368 def _load_plugin(self, name, type="vim"):
1369 # type can be vim or sdn
1370 if "rovim_dummy" not in self.plugins:
1371 self.plugins["rovim_dummy"] = VimDummyConnector
1372
1373 if "rosdn_dummy" not in self.plugins:
1374 self.plugins["rosdn_dummy"] = SdnDummyConnector
1375
1376 if name in self.plugins:
1377 return self.plugins[name]
1378
1379 try:
1380 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
1381 self.plugins[name] = ep.load()
1382 except Exception as e:
1383 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
1384
1385 if name and name not in self.plugins:
1386 raise NsWorkerException(
1387 "Plugin 'osm_{n}' has not been installed".format(n=name)
1388 )
1389
1390 return self.plugins[name]
1391
1392 def _unload_vim(self, target_id):
1393 """
1394 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1395 :param target_id: Contains type:_id; where type can be 'vim', ...
1396 :return: None.
1397 """
1398 try:
1399 self.db_vims.pop(target_id, None)
1400 self.my_vims.pop(target_id, None)
1401
1402 if target_id in self.vim_targets:
1403 self.vim_targets.remove(target_id)
1404
1405 self.logger.info("Unloaded {}".format(target_id))
1406 rmtree("{}:{}".format(target_id, self.worker_index))
1407 except FileNotFoundError:
1408 pass # this is raised by rmtree if folder does not exist
1409 except Exception as e:
1410 self.logger.error("Cannot unload {}: {}".format(target_id, e))
1411
1412 def _check_vim(self, target_id):
1413 """
1414 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1415 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1416 :return: None.
1417 """
1418 target, _, _id = target_id.partition(":")
1419 now = time.time()
1420 update_dict = {}
1421 unset_dict = {}
1422 op_text = ""
1423 step = ""
1424 loaded = target_id in self.vim_targets
1425 target_database = (
1426 "vim_accounts"
1427 if target == "vim"
1428 else "wim_accounts"
1429 if target == "wim"
1430 else "sdns"
1431 )
1432
1433 try:
1434 step = "Getting {} from db".format(target_id)
1435 db_vim = self.db.get_one(target_database, {"_id": _id})
1436
1437 for op_index, operation in enumerate(
1438 db_vim["_admin"].get("operations", ())
1439 ):
1440 if operation["operationState"] != "PROCESSING":
1441 continue
1442
1443 locked_at = operation.get("locked_at")
1444
1445 if locked_at is not None and locked_at >= now - self.task_locked_time:
1446 # some other thread is doing this operation
1447 return
1448
1449 # lock
1450 op_text = "_admin.operations.{}.".format(op_index)
1451
1452 if not self.db.set_one(
1453 target_database,
1454 q_filter={
1455 "_id": _id,
1456 op_text + "operationState": "PROCESSING",
1457 op_text + "locked_at": locked_at,
1458 },
1459 update_dict={
1460 op_text + "locked_at": now,
1461 "admin.current_operation": op_index,
1462 },
1463 fail_on_empty=False,
1464 ):
1465 return
1466
1467 unset_dict[op_text + "locked_at"] = None
1468 unset_dict["current_operation"] = None
1469 step = "Loading " + target_id
1470 error_text = self._load_vim(target_id)
1471
1472 if not error_text:
1473 step = "Checking connectivity"
1474
1475 if target == "vim":
1476 self.my_vims[target_id].check_vim_connectivity()
1477 else:
1478 self.my_vims[target_id].check_credentials()
1479
1480 update_dict["_admin.operationalState"] = "ENABLED"
1481 update_dict["_admin.detailed-status"] = ""
1482 unset_dict[op_text + "detailed-status"] = None
1483 update_dict[op_text + "operationState"] = "COMPLETED"
1484
1485 return
1486
1487 except Exception as e:
1488 error_text = "{}: {}".format(step, e)
1489 self.logger.error("{} for {}: {}".format(step, target_id, e))
1490
1491 finally:
1492 if update_dict or unset_dict:
1493 if error_text:
1494 update_dict[op_text + "operationState"] = "FAILED"
1495 update_dict[op_text + "detailed-status"] = error_text
1496 unset_dict.pop(op_text + "detailed-status", None)
1497 update_dict["_admin.operationalState"] = "ERROR"
1498 update_dict["_admin.detailed-status"] = error_text
1499
1500 if op_text:
1501 update_dict[op_text + "statusEnteredTime"] = now
1502
1503 self.db.set_one(
1504 target_database,
1505 q_filter={"_id": _id},
1506 update_dict=update_dict,
1507 unset=unset_dict,
1508 fail_on_empty=False,
1509 )
1510
1511 if not loaded:
1512 self._unload_vim(target_id)
1513
1514 def _reload_vim(self, target_id):
1515 if target_id in self.vim_targets:
1516 self._load_vim(target_id)
1517 else:
1518 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1519 # just remove it to force load again next time it is needed
1520 self.db_vims.pop(target_id, None)
1521
1522 def _load_vim(self, target_id):
1523 """
1524 Load or reload a vim_account, sdn_controller or wim_account.
1525 Read content from database, load the plugin if not loaded.
1526 In case of error loading the plugin, it load a failing VIM_connector
1527 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1528 :param target_id: Contains type:_id; where type can be 'vim', ...
1529 :return: None if ok, descriptive text if error
1530 """
1531 target, _, _id = target_id.partition(":")
1532 target_database = (
1533 "vim_accounts"
1534 if target == "vim"
1535 else "wim_accounts"
1536 if target == "wim"
1537 else "sdns"
1538 )
1539 plugin_name = ""
1540 vim = None
1541
1542 try:
1543 step = "Getting {}={} from db".format(target, _id)
1544 # TODO process for wim, sdnc, ...
1545 vim = self.db.get_one(target_database, {"_id": _id})
1546
1547 # if deep_get(vim, "config", "sdn-controller"):
1548 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1549 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1550
1551 step = "Decrypting password"
1552 schema_version = vim.get("schema_version")
1553 self.db.encrypt_decrypt_fields(
1554 vim,
1555 "decrypt",
1556 fields=("password", "secret"),
1557 schema_version=schema_version,
1558 salt=_id,
1559 )
1560 self._process_vim_config(target_id, vim)
1561
1562 if target == "vim":
1563 plugin_name = "rovim_" + vim["vim_type"]
1564 step = "Loading plugin '{}'".format(plugin_name)
1565 vim_module_conn = self._load_plugin(plugin_name)
1566 step = "Loading {}'".format(target_id)
1567 self.my_vims[target_id] = vim_module_conn(
1568 uuid=vim["_id"],
1569 name=vim["name"],
1570 tenant_id=vim.get("vim_tenant_id"),
1571 tenant_name=vim.get("vim_tenant_name"),
1572 url=vim["vim_url"],
1573 url_admin=None,
1574 user=vim["vim_user"],
1575 passwd=vim["vim_password"],
1576 config=vim.get("config") or {},
1577 persistent_info={},
1578 )
1579 else: # sdn
1580 plugin_name = "rosdn_" + vim["type"]
1581 step = "Loading plugin '{}'".format(plugin_name)
1582 vim_module_conn = self._load_plugin(plugin_name, "sdn")
1583 step = "Loading {}'".format(target_id)
1584 wim = deepcopy(vim)
1585 wim_config = wim.pop("config", {}) or {}
1586 wim["uuid"] = wim["_id"]
1587 wim["wim_url"] = wim["url"]
1588
1589 if wim.get("dpid"):
1590 wim_config["dpid"] = wim.pop("dpid")
1591
1592 if wim.get("switch_id"):
1593 wim_config["switch_id"] = wim.pop("switch_id")
1594
1595 # wim, wim_account, config
1596 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
1597 self.db_vims[target_id] = vim
1598 self.error_status = None
1599
1600 self.logger.info(
1601 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
1602 )
1603 except Exception as e:
1604 self.logger.error(
1605 "Cannot load {} plugin={}: {} {}".format(
1606 target_id, plugin_name, step, e
1607 )
1608 )
1609
1610 self.db_vims[target_id] = vim or {}
1611 self.db_vims[target_id] = FailingConnector(str(e))
1612 error_status = "{} Error: {}".format(step, e)
1613
1614 return error_status
1615 finally:
1616 if target_id not in self.vim_targets:
1617 self.vim_targets.append(target_id)
1618
1619 def _get_db_task(self):
1620 """
1621 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1622 :return: None
1623 """
1624 now = time.time()
1625
1626 if not self.time_last_task_processed:
1627 self.time_last_task_processed = now
1628
1629 try:
1630 while True:
1631 """
1632 # Log RO tasks only when loglevel is DEBUG
1633 if self.logger.getEffectiveLevel() == logging.DEBUG:
1634 self._log_ro_task(
1635 None,
1636 None,
1637 None,
1638 "TASK_WF",
1639 "task_locked_time="
1640 + str(self.task_locked_time)
1641 + " "
1642 + "time_last_task_processed="
1643 + str(self.time_last_task_processed)
1644 + " "
1645 + "now="
1646 + str(now),
1647 )
1648 """
1649 locked = self.db.set_one(
1650 "ro_tasks",
1651 q_filter={
1652 "target_id": self.vim_targets,
1653 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1654 "locked_at.lt": now - self.task_locked_time,
1655 "to_check_at.lt": self.time_last_task_processed,
1656 },
1657 update_dict={"locked_by": self.my_id, "locked_at": now},
1658 fail_on_empty=False,
1659 )
1660
1661 if locked:
1662 # read and return
1663 ro_task = self.db.get_one(
1664 "ro_tasks",
1665 q_filter={
1666 "target_id": self.vim_targets,
1667 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1668 "locked_at": now,
1669 },
1670 )
1671 return ro_task
1672
1673 if self.time_last_task_processed == now:
1674 self.time_last_task_processed = None
1675 return None
1676 else:
1677 self.time_last_task_processed = now
1678 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1679
1680 except DbException as e:
1681 self.logger.error("Database exception at _get_db_task: {}".format(e))
1682 except Exception as e:
1683 self.logger.critical(
1684 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
1685 )
1686
1687 return None
1688
1689 def _get_db_all_tasks(self):
1690 """
1691 Read all content of table ro_tasks to log it
1692 :return: None
1693 """
1694 try:
1695 # Checking the content of the BD:
1696
1697 # read and return
1698 ro_task = self.db.get_list("ro_tasks")
1699 for rt in ro_task:
1700 self._log_ro_task(rt, None, None, "TASK_WF", "GET_ALL_TASKS")
1701 return ro_task
1702
1703 except DbException as e:
1704 self.logger.error("Database exception at _get_db_all_tasks: {}".format(e))
1705 except Exception as e:
1706 self.logger.critical(
1707 "Unexpected exception at _get_db_all_tasks: {}".format(e), exc_info=True
1708 )
1709
1710 return None
1711
1712 def _log_ro_task(self, ro_task, db_ro_task_update, db_ro_task_delete, mark, event):
1713 """
1714 Generate a log with the following format:
1715
1716 Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by;
1717 target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id;
1718 task_array_index;task_id;task_action;task_item;task_args
1719
1720 Example:
1721
1722 TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013;
1723 1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a
1724 ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False,
1725 'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None,
1726 'vim_details': None, 'refresh_at': None};1;SCHEDULED;
1727 888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;
1728 CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}}
1729 """
1730 try:
1731 line = []
1732 i = 0
1733 if ro_task is not None and isinstance(ro_task, dict):
1734 for t in ro_task["tasks"]:
1735 line.clear()
1736 line.append(mark)
1737 line.append(event)
1738 line.append(ro_task.get("_id", ""))
1739 line.append(str(ro_task.get("locked_at", "")))
1740 line.append(str(ro_task.get("modified_at", "")))
1741 line.append(str(ro_task.get("created_at", "")))
1742 line.append(str(ro_task.get("to_check_at", "")))
1743 line.append(str(ro_task.get("locked_by", "")))
1744 line.append(str(ro_task.get("target_id", "")))
1745 line.append(str(ro_task.get("vim_info", {}).get("refresh_at", "")))
1746 line.append(str(ro_task.get("vim_info", "")))
1747 line.append(str(ro_task.get("tasks", "")))
1748 if isinstance(t, dict):
1749 line.append(str(t.get("status", "")))
1750 line.append(str(t.get("action_id", "")))
1751 line.append(str(i))
1752 line.append(str(t.get("task_id", "")))
1753 line.append(str(t.get("action", "")))
1754 line.append(str(t.get("item", "")))
1755 line.append(str(t.get("find_params", "")))
1756 line.append(str(t.get("params", "")))
1757 else:
1758 line.extend([""] * 2)
1759 line.append(str(i))
1760 line.extend([""] * 5)
1761
1762 i += 1
1763 self.logger.debug(";".join(line))
1764 elif db_ro_task_update is not None and isinstance(db_ro_task_update, dict):
1765 i = 0
1766 while True:
1767 st = "tasks.{}.status".format(i)
1768 if st not in db_ro_task_update:
1769 break
1770 line.clear()
1771 line.append(mark)
1772 line.append(event)
1773 line.append(db_ro_task_update.get("_id", ""))
1774 line.append(str(db_ro_task_update.get("locked_at", "")))
1775 line.append(str(db_ro_task_update.get("modified_at", "")))
1776 line.append("")
1777 line.append(str(db_ro_task_update.get("to_check_at", "")))
1778 line.append(str(db_ro_task_update.get("locked_by", "")))
1779 line.append("")
1780 line.append(str(db_ro_task_update.get("vim_info.refresh_at", "")))
1781 line.append("")
1782 line.append(str(db_ro_task_update.get("vim_info", "")))
1783 line.append(str(str(db_ro_task_update).count(".status")))
1784 line.append(db_ro_task_update.get(st, ""))
1785 line.append("")
1786 line.append(str(i))
1787 line.extend([""] * 3)
1788 i += 1
1789 self.logger.debug(";".join(line))
1790
1791 elif db_ro_task_delete is not None and isinstance(db_ro_task_delete, dict):
1792 line.clear()
1793 line.append(mark)
1794 line.append(event)
1795 line.append(db_ro_task_delete.get("_id", ""))
1796 line.append("")
1797 line.append(db_ro_task_delete.get("modified_at", ""))
1798 line.extend([""] * 13)
1799 self.logger.debug(";".join(line))
1800
1801 else:
1802 line.clear()
1803 line.append(mark)
1804 line.append(event)
1805 line.extend([""] * 16)
1806 self.logger.debug(";".join(line))
1807
1808 except Exception as e:
1809 self.logger.error("Error logging ro_task: {}".format(e))
1810
1811 def _delete_task(self, ro_task, task_index, task_depends, db_update):
1812 """
1813 Determine if this task need to be done or superseded
1814 :return: None
1815 """
1816 my_task = ro_task["tasks"][task_index]
1817 task_id = my_task["task_id"]
1818 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
1819 "created_items", False
1820 )
1821
1822 if my_task["status"] == "FAILED":
1823 return None, None # TODO need to be retry??
1824
1825 try:
1826 for index, task in enumerate(ro_task["tasks"]):
1827 if index == task_index or not task:
1828 continue # own task
1829
1830 if (
1831 my_task["target_record"] == task["target_record"]
1832 and task["action"] == "CREATE"
1833 ):
1834 # set to finished
1835 db_update["tasks.{}.status".format(index)] = task[
1836 "status"
1837 ] = "FINISHED"
1838 elif task["action"] == "CREATE" and task["status"] not in (
1839 "FINISHED",
1840 "SUPERSEDED",
1841 ):
1842 needed_delete = False
1843
1844 if needed_delete:
1845 return self.item2class[my_task["item"]].delete(ro_task, task_index)
1846 else:
1847 return "SUPERSEDED", None
1848 except Exception as e:
1849 if not isinstance(e, NsWorkerException):
1850 self.logger.critical(
1851 "Unexpected exception at _delete_task task={}: {}".format(
1852 task_id, e
1853 ),
1854 exc_info=True,
1855 )
1856
1857 return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1858
1859 def _create_task(self, ro_task, task_index, task_depends, db_update):
1860 """
1861 Determine if this task need to create something at VIM
1862 :return: None
1863 """
1864 my_task = ro_task["tasks"][task_index]
1865 task_id = my_task["task_id"]
1866 task_status = None
1867
1868 if my_task["status"] == "FAILED":
1869 return None, None # TODO need to be retry??
1870 elif my_task["status"] == "SCHEDULED":
1871 # check if already created by another task
1872 for index, task in enumerate(ro_task["tasks"]):
1873 if index == task_index or not task:
1874 continue # own task
1875
1876 if task["action"] == "CREATE" and task["status"] not in (
1877 "SCHEDULED",
1878 "FINISHED",
1879 "SUPERSEDED",
1880 ):
1881 return task["status"], "COPY_VIM_INFO"
1882
1883 try:
1884 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
1885 ro_task, task_index, task_depends
1886 )
1887 # TODO update other CREATE tasks
1888 except Exception as e:
1889 if not isinstance(e, NsWorkerException):
1890 self.logger.error(
1891 "Error executing task={}: {}".format(task_id, e), exc_info=True
1892 )
1893
1894 task_status = "FAILED"
1895 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1896 # TODO update ro_vim_item_update
1897
1898 return task_status, ro_vim_item_update
1899 else:
1900 return None, None
1901
1902 def _get_dependency(self, task_id, ro_task=None, target_id=None):
1903 """
1904 Look for dependency task
1905 :param task_id: Can be one of
1906 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1907 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1908 3. task.task_id: "<action_id>:number"
1909 :param ro_task:
1910 :param target_id:
1911 :return: database ro_task plus index of task
1912 """
1913 if (
1914 task_id.startswith("vim:")
1915 or task_id.startswith("sdn:")
1916 or task_id.startswith("wim:")
1917 ):
1918 target_id, _, task_id = task_id.partition(" ")
1919
1920 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
1921 ro_task_dependency = self.db.get_one(
1922 "ro_tasks",
1923 q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
1924 fail_on_empty=False,
1925 )
1926
1927 if ro_task_dependency:
1928 for task_index, task in enumerate(ro_task_dependency["tasks"]):
1929 if task["target_record_id"] == task_id:
1930 return ro_task_dependency, task_index
1931
1932 else:
1933 if ro_task:
1934 for task_index, task in enumerate(ro_task["tasks"]):
1935 if task and task["task_id"] == task_id:
1936 return ro_task, task_index
1937
1938 ro_task_dependency = self.db.get_one(
1939 "ro_tasks",
1940 q_filter={
1941 "tasks.ANYINDEX.task_id": task_id,
1942 "tasks.ANYINDEX.target_record.ne": None,
1943 },
1944 fail_on_empty=False,
1945 )
1946
1947 if ro_task_dependency:
1948 for task_index, task in ro_task_dependency["tasks"]:
1949 if task["task_id"] == task_id:
1950 return ro_task_dependency, task_index
1951 raise NsWorkerException("Cannot get depending task {}".format(task_id))
1952
1953 def _process_pending_tasks(self, ro_task):
1954 ro_task_id = ro_task["_id"]
1955 now = time.time()
1956 # one day
1957 next_check_at = now + (24 * 60 * 60)
1958 db_ro_task_update = {}
1959
1960 def _update_refresh(new_status):
1961 # compute next_refresh
1962 nonlocal task
1963 nonlocal next_check_at
1964 nonlocal db_ro_task_update
1965 nonlocal ro_task
1966
1967 next_refresh = time.time()
1968
1969 if task["item"] in ("image", "flavor"):
1970 next_refresh += self.REFRESH_IMAGE
1971 elif new_status == "BUILD":
1972 next_refresh += self.REFRESH_BUILD
1973 elif new_status == "DONE":
1974 next_refresh += self.REFRESH_ACTIVE
1975 else:
1976 next_refresh += self.REFRESH_ERROR
1977
1978 next_check_at = min(next_check_at, next_refresh)
1979 db_ro_task_update["vim_info.refresh_at"] = next_refresh
1980 ro_task["vim_info"]["refresh_at"] = next_refresh
1981
1982 try:
1983 """
1984 # Log RO tasks only when loglevel is DEBUG
1985 if self.logger.getEffectiveLevel() == logging.DEBUG:
1986 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
1987 """
1988 # 0: get task_status_create
1989 lock_object = None
1990 task_status_create = None
1991 task_create = next(
1992 (
1993 t
1994 for t in ro_task["tasks"]
1995 if t
1996 and t["action"] == "CREATE"
1997 and t["status"] in ("BUILD", "DONE")
1998 ),
1999 None,
2000 )
2001
2002 if task_create:
2003 task_status_create = task_create["status"]
2004
2005 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
2006 for task_action in ("DELETE", "CREATE", "EXEC"):
2007 db_vim_update = None
2008 new_status = None
2009
2010 for task_index, task in enumerate(ro_task["tasks"]):
2011 if not task:
2012 continue # task deleted
2013
2014 task_depends = {}
2015 target_update = None
2016
2017 if (
2018 (
2019 task_action in ("DELETE", "EXEC")
2020 and task["status"] not in ("SCHEDULED", "BUILD")
2021 )
2022 or task["action"] != task_action
2023 or (
2024 task_action == "CREATE"
2025 and task["status"] in ("FINISHED", "SUPERSEDED")
2026 )
2027 ):
2028 continue
2029
2030 task_path = "tasks.{}.status".format(task_index)
2031 try:
2032 db_vim_info_update = None
2033
2034 if task["status"] == "SCHEDULED":
2035 # check if tasks that this depends on have been completed
2036 dependency_not_completed = False
2037
2038 for dependency_task_id in task.get("depends_on") or ():
2039 (
2040 dependency_ro_task,
2041 dependency_task_index,
2042 ) = self._get_dependency(
2043 dependency_task_id, target_id=ro_task["target_id"]
2044 )
2045 dependency_task = dependency_ro_task["tasks"][
2046 dependency_task_index
2047 ]
2048
2049 if dependency_task["status"] == "SCHEDULED":
2050 dependency_not_completed = True
2051 next_check_at = min(
2052 next_check_at, dependency_ro_task["to_check_at"]
2053 )
2054 # must allow dependent task to be processed first
2055 # to do this set time after last_task_processed
2056 next_check_at = max(
2057 self.time_last_task_processed, next_check_at
2058 )
2059 break
2060 elif dependency_task["status"] == "FAILED":
2061 error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
2062 task["action"],
2063 task["item"],
2064 dependency_task["action"],
2065 dependency_task["item"],
2066 dependency_task_id,
2067 dependency_ro_task["vim_info"].get(
2068 "vim_details"
2069 ),
2070 )
2071 self.logger.error(
2072 "task={} {}".format(task["task_id"], error_text)
2073 )
2074 raise NsWorkerException(error_text)
2075
2076 task_depends[dependency_task_id] = dependency_ro_task[
2077 "vim_info"
2078 ]["vim_id"]
2079 task_depends[
2080 "TASK-{}".format(dependency_task_id)
2081 ] = dependency_ro_task["vim_info"]["vim_id"]
2082
2083 if dependency_not_completed:
2084 # TODO set at vim_info.vim_details that it is waiting
2085 continue
2086
2087 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
2088 # the task of renew this locking. It will update database locket_at periodically
2089 if not lock_object:
2090 lock_object = LockRenew.add_lock_object(
2091 "ro_tasks", ro_task, self
2092 )
2093
2094 if task["action"] == "DELETE":
2095 (new_status, db_vim_info_update,) = self._delete_task(
2096 ro_task, task_index, task_depends, db_ro_task_update
2097 )
2098 new_status = (
2099 "FINISHED" if new_status == "DONE" else new_status
2100 )
2101 # ^with FINISHED instead of DONE it will not be refreshing
2102
2103 if new_status in ("FINISHED", "SUPERSEDED"):
2104 target_update = "DELETE"
2105 elif task["action"] == "EXEC":
2106 (
2107 new_status,
2108 db_vim_info_update,
2109 db_task_update,
2110 ) = self.item2class[task["item"]].exec(
2111 ro_task, task_index, task_depends
2112 )
2113 new_status = (
2114 "FINISHED" if new_status == "DONE" else new_status
2115 )
2116 # ^with FINISHED instead of DONE it will not be refreshing
2117
2118 if db_task_update:
2119 # load into database the modified db_task_update "retries" and "next_retry"
2120 if db_task_update.get("retries"):
2121 db_ro_task_update[
2122 "tasks.{}.retries".format(task_index)
2123 ] = db_task_update["retries"]
2124
2125 next_check_at = time.time() + db_task_update.get(
2126 "next_retry", 60
2127 )
2128 target_update = None
2129 elif task["action"] == "CREATE":
2130 if task["status"] == "SCHEDULED":
2131 if task_status_create:
2132 new_status = task_status_create
2133 target_update = "COPY_VIM_INFO"
2134 else:
2135 new_status, db_vim_info_update = self.item2class[
2136 task["item"]
2137 ].new(ro_task, task_index, task_depends)
2138 # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
2139 _update_refresh(new_status)
2140 else:
2141 if (
2142 ro_task["vim_info"]["refresh_at"]
2143 and now > ro_task["vim_info"]["refresh_at"]
2144 ):
2145 new_status, db_vim_info_update = self.item2class[
2146 task["item"]
2147 ].refresh(ro_task)
2148 _update_refresh(new_status)
2149 else:
2150 # The refresh is updated to avoid set the value of "refresh_at" to
2151 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2152 # because it can happen that in this case the task is never processed
2153 _update_refresh(task["status"])
2154
2155 except Exception as e:
2156 new_status = "FAILED"
2157 db_vim_info_update = {
2158 "vim_status": "VIM_ERROR",
2159 "vim_details": str(e),
2160 }
2161
2162 if not isinstance(
2163 e, (NsWorkerException, vimconn.VimConnException)
2164 ):
2165 self.logger.error(
2166 "Unexpected exception at _delete_task task={}: {}".format(
2167 task["task_id"], e
2168 ),
2169 exc_info=True,
2170 )
2171
2172 try:
2173 if db_vim_info_update:
2174 db_vim_update = db_vim_info_update.copy()
2175 db_ro_task_update.update(
2176 {
2177 "vim_info." + k: v
2178 for k, v in db_vim_info_update.items()
2179 }
2180 )
2181 ro_task["vim_info"].update(db_vim_info_update)
2182
2183 if new_status:
2184 if task_action == "CREATE":
2185 task_status_create = new_status
2186 db_ro_task_update[task_path] = new_status
2187
2188 if target_update or db_vim_update:
2189 if target_update == "DELETE":
2190 self._update_target(task, None)
2191 elif target_update == "COPY_VIM_INFO":
2192 self._update_target(task, ro_task["vim_info"])
2193 else:
2194 self._update_target(task, db_vim_update)
2195
2196 except Exception as e:
2197 if (
2198 isinstance(e, DbException)
2199 and e.http_code == HTTPStatus.NOT_FOUND
2200 ):
2201 # if the vnfrs or nsrs has been removed from database, this task must be removed
2202 self.logger.debug(
2203 "marking to delete task={}".format(task["task_id"])
2204 )
2205 self.tasks_to_delete.append(task)
2206 else:
2207 self.logger.error(
2208 "Unexpected exception at _update_target task={}: {}".format(
2209 task["task_id"], e
2210 ),
2211 exc_info=True,
2212 )
2213
2214 locked_at = ro_task["locked_at"]
2215
2216 if lock_object:
2217 locked_at = [
2218 lock_object["locked_at"],
2219 lock_object["locked_at"] + self.task_locked_time,
2220 ]
2221 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
2222 # contain exactly locked_at + self.task_locked_time
2223 LockRenew.remove_lock_object(lock_object)
2224
2225 q_filter = {
2226 "_id": ro_task["_id"],
2227 "to_check_at": ro_task["to_check_at"],
2228 "locked_at": locked_at,
2229 }
2230 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2231 # outside this task (by ro_nbi) do not update it
2232 db_ro_task_update["locked_by"] = None
2233 # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
2234 db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
2235 db_ro_task_update["modified_at"] = now
2236 db_ro_task_update["to_check_at"] = next_check_at
2237
2238 """
2239 # Log RO tasks only when loglevel is DEBUG
2240 if self.logger.getEffectiveLevel() == logging.DEBUG:
2241 db_ro_task_update_log = db_ro_task_update.copy()
2242 db_ro_task_update_log["_id"] = q_filter["_id"]
2243 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2244 """
2245
2246 if not self.db.set_one(
2247 "ro_tasks",
2248 update_dict=db_ro_task_update,
2249 q_filter=q_filter,
2250 fail_on_empty=False,
2251 ):
2252 del db_ro_task_update["to_check_at"]
2253 del q_filter["to_check_at"]
2254 """
2255 # Log RO tasks only when loglevel is DEBUG
2256 if self.logger.getEffectiveLevel() == logging.DEBUG:
2257 self._log_ro_task(
2258 None,
2259 db_ro_task_update_log,
2260 None,
2261 "TASK_WF",
2262 "SET_TASK " + str(q_filter),
2263 )
2264 """
2265 self.db.set_one(
2266 "ro_tasks",
2267 q_filter=q_filter,
2268 update_dict=db_ro_task_update,
2269 fail_on_empty=True,
2270 )
2271 except DbException as e:
2272 self.logger.error(
2273 "ro_task={} Error updating database {}".format(ro_task_id, e)
2274 )
2275 except Exception as e:
2276 self.logger.error(
2277 "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
2278 )
2279
2280 def _update_target(self, task, ro_vim_item_update):
2281 table, _, temp = task["target_record"].partition(":")
2282 _id, _, path_vim_status = temp.partition(":")
2283 path_item = path_vim_status[: path_vim_status.rfind(".")]
2284 path_item = path_item[: path_item.rfind(".")]
2285 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2286 # path_item: dot separated list targeting record information, e.g. "vdur.10"
2287
2288 if ro_vim_item_update:
2289 update_dict = {
2290 path_vim_status + "." + k: v
2291 for k, v in ro_vim_item_update.items()
2292 if k
2293 in ("vim_id", "vim_details", "vim_name", "vim_status", "interfaces")
2294 }
2295
2296 if path_vim_status.startswith("vdur."):
2297 # for backward compatibility, add vdur.name apart from vdur.vim_name
2298 if ro_vim_item_update.get("vim_name"):
2299 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
2300
2301 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2302 if ro_vim_item_update.get("vim_id"):
2303 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
2304
2305 # update general status
2306 if ro_vim_item_update.get("vim_status"):
2307 update_dict[path_item + ".status"] = ro_vim_item_update[
2308 "vim_status"
2309 ]
2310
2311 if ro_vim_item_update.get("interfaces"):
2312 path_interfaces = path_item + ".interfaces"
2313
2314 for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
2315 if iface:
2316 update_dict.update(
2317 {
2318 path_interfaces + ".{}.".format(i) + k: v
2319 for k, v in iface.items()
2320 if k in ("vlan", "compute_node", "pci")
2321 }
2322 )
2323
2324 # put ip_address and mac_address with ip-address and mac-address
2325 if iface.get("ip_address"):
2326 update_dict[
2327 path_interfaces + ".{}.".format(i) + "ip-address"
2328 ] = iface["ip_address"]
2329
2330 if iface.get("mac_address"):
2331 update_dict[
2332 path_interfaces + ".{}.".format(i) + "mac-address"
2333 ] = iface["mac_address"]
2334
2335 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
2336 update_dict["ip-address"] = iface.get("ip_address").split(
2337 ";"
2338 )[0]
2339
2340 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
2341 update_dict[path_item + ".ip-address"] = iface.get(
2342 "ip_address"
2343 ).split(";")[0]
2344
2345 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
2346 else:
2347 update_dict = {path_item + ".status": "DELETED"}
2348 self.db.set_one(
2349 table,
2350 q_filter={"_id": _id},
2351 update_dict=update_dict,
2352 unset={path_vim_status: None},
2353 )
2354
2355 def _process_delete_db_tasks(self):
2356 """
2357 Delete task from database because vnfrs or nsrs or both have been deleted
2358 :return: None. Uses and modify self.tasks_to_delete
2359 """
2360 while self.tasks_to_delete:
2361 task = self.tasks_to_delete[0]
2362 vnfrs_deleted = None
2363 nsr_id = task["nsr_id"]
2364
2365 if task["target_record"].startswith("vnfrs:"):
2366 # check if nsrs is present
2367 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
2368 vnfrs_deleted = task["target_record"].split(":")[1]
2369
2370 try:
2371 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
2372 except Exception as e:
2373 self.logger.error(
2374 "Error deleting task={}: {}".format(task["task_id"], e)
2375 )
2376 self.tasks_to_delete.pop(0)
2377
2378 @staticmethod
2379 def delete_db_tasks(db, nsr_id, vnfrs_deleted):
2380 """
2381 Static method because it is called from osm_ng_ro.ns
2382 :param db: instance of database to use
2383 :param nsr_id: affected nsrs id
2384 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
2385 :return: None, exception is fails
2386 """
2387 retries = 5
2388 for retry in range(retries):
2389 ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
2390 now = time.time()
2391 conflict = False
2392
2393 for ro_task in ro_tasks:
2394 db_update = {}
2395 to_delete_ro_task = True
2396
2397 for index, task in enumerate(ro_task["tasks"]):
2398 if not task:
2399 pass
2400 elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or (
2401 vnfrs_deleted
2402 and task["target_record"].startswith("vnfrs:" + vnfrs_deleted)
2403 ):
2404 db_update["tasks.{}".format(index)] = None
2405 else:
2406 # used by other nsr, ro_task cannot be deleted
2407 to_delete_ro_task = False
2408
2409 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
2410 if to_delete_ro_task:
2411 if not db.del_one(
2412 "ro_tasks",
2413 q_filter={
2414 "_id": ro_task["_id"],
2415 "modified_at": ro_task["modified_at"],
2416 },
2417 fail_on_empty=False,
2418 ):
2419 conflict = True
2420 elif db_update:
2421 db_update["modified_at"] = now
2422 if not db.set_one(
2423 "ro_tasks",
2424 q_filter={
2425 "_id": ro_task["_id"],
2426 "modified_at": ro_task["modified_at"],
2427 },
2428 update_dict=db_update,
2429 fail_on_empty=False,
2430 ):
2431 conflict = True
2432 if not conflict:
2433 return
2434 else:
2435 raise NsWorkerException("Exceeded {} retries".format(retries))
2436
2437 def run(self):
2438 # load database
2439 self.logger.info("Starting")
2440 while True:
2441 # step 1: get commands from queue
2442 try:
2443 if self.vim_targets:
2444 task = self.task_queue.get(block=False)
2445 else:
2446 if not self.idle:
2447 self.logger.debug("enters in idle state")
2448 self.idle = True
2449 task = self.task_queue.get(block=True)
2450 self.idle = False
2451
2452 if task[0] == "terminate":
2453 break
2454 elif task[0] == "load_vim":
2455 self.logger.info("order to load vim {}".format(task[1]))
2456 self._load_vim(task[1])
2457 elif task[0] == "unload_vim":
2458 self.logger.info("order to unload vim {}".format(task[1]))
2459 self._unload_vim(task[1])
2460 elif task[0] == "reload_vim":
2461 self._reload_vim(task[1])
2462 elif task[0] == "check_vim":
2463 self.logger.info("order to check vim {}".format(task[1]))
2464 self._check_vim(task[1])
2465 continue
2466 except Exception as e:
2467 if isinstance(e, queue.Empty):
2468 pass
2469 else:
2470 self.logger.critical(
2471 "Error processing task: {}".format(e), exc_info=True
2472 )
2473
2474 # step 2: process pending_tasks, delete not needed tasks
2475 try:
2476 if self.tasks_to_delete:
2477 self._process_delete_db_tasks()
2478 busy = False
2479 """
2480 # Log RO tasks only when loglevel is DEBUG
2481 if self.logger.getEffectiveLevel() == logging.DEBUG:
2482 _ = self._get_db_all_tasks()
2483 """
2484 ro_task = self._get_db_task()
2485 if ro_task:
2486 self._process_pending_tasks(ro_task)
2487 busy = True
2488 if not busy:
2489 time.sleep(5)
2490 except Exception as e:
2491 self.logger.critical(
2492 "Unexpected exception at run: " + str(e), exc_info=True
2493 )
2494
2495 self.logger.info("Finishing")