e65ae3fc4091949dd387d81605c47c22e344dbca
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
20 """"
21 This is the thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored in the database, in table ro_tasks.
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target where the results are stored.
25 """
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import mkdir
31 import queue
32 from shutil import rmtree
33 import threading
34 import time
35 import traceback
36 from unittest.mock import Mock
37
38 from importlib_metadata import entry_points
39 from osm_common.dbbase import DbException
40 from osm_ng_ro.vim_admin import LockRenew
41 from osm_ro_plugin import sdnconn, vimconn
42 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
43 from osm_ro_plugin.vim_dummy import VimDummyConnector
44 import yaml
45
46
47 __author__ = "Alfonso Tierno"
48 __date__ = "$28-Sep-2017 12:07:15$"
49
50
def deep_get(target_dict, *args, **kwargs):
    """
    Walk a nested dictionary following a sequence of keys.
    Example target_dict={a: {b: 5}}; keys (a, b) return 5; keys (a, b, c) and (f, h) return the default
    :param target_dict: dictionary to be read
    :param args: keys to follow, outermost first
    :param kwargs: only "default" is read; value returned when a key is missing (None if not given)
    :return: the value found at the end of the path, or the default otherwise
    """
    current = target_dict
    for step in args:
        if isinstance(current, dict) and step in current:
            current = current[step]
        else:
            # path is broken: either not a dict at this level or key absent
            return kwargs.get("default")
    return current
65
66
class NsWorkerException(Exception):
    """Generic error raised by the VIM interaction classes of this module."""
69
70
class FailingConnector:
    """Stand-in connector: every public VIM/SDN connector method raises with error_msg."""

    def __init__(self, error_msg):
        """
        :param error_msg: text carried by the exception raised on any method call
        """
        self.error_msg = error_msg

        # (connector class whose public names are replaced, exception class to raise).
        # SDN names are processed last, so they win on any name shared with the VIM connector.
        stubs = (
            (vimconn.VimConnector, vimconn.VimConnException),
            (sdnconn.SdnConnectorBase, sdnconn.SdnConnectorError),
        )

        for connector_class, error_class in stubs:
            for name in dir(connector_class):
                if name.startswith("_"):
                    continue

                setattr(self, name, Mock(side_effect=error_class(error_msg)))
86
87
class NsWorkerExceptionNotFound(NsWorkerException):
    """NsWorkerException specialisation raised when a searched element is not found."""
90
91
class VimInteractionBase:
    """Base class to call VIM/SDN for creating, deleting and refreshing networks, VMs, flavors, ...
    The methods implemented here do not contact the VIM; they just report a fixed result."""

    def __init__(self, db, my_vims, db_vims, logger):
        """
        :param db: database handle, stored as self.db
        :param my_vims: stored as self.my_vims (indexed by target_id in the subclasses)
        :param db_vims: stored as self.db_vims (indexed by target_id in the subclasses)
        :param logger: logger, stored as self.logger
        """
        self.db, self.logger = db, logger
        self.my_vims, self.db_vims = my_vims, db_vims

    def new(self, ro_task, task_index, task_depends):
        """Nothing is created; reports status "BUILD" with an empty update."""
        status, update = "BUILD", {}

        return status, update

    def refresh(self, ro_task):
        """skip calling VIM to get image, flavor status. Assumes ok unless already at VIM_ERROR"""
        in_error = ro_task["vim_info"]["vim_status"] == "VIM_ERROR"

        return ("FAILED" if in_error else "DONE"), {}

    def delete(self, ro_task, task_index):
        """skip calling VIM to delete image. Assumes ok"""
        status, update = "DONE", {}

        return status, update

    def exec(self, ro_task, task_index, task_depends):
        """Nothing is executed; reports ("DONE", None, None)."""
        return "DONE", None, None
118
119
class VimInteractionNet(VimInteractionBase):
    """VIM interaction for networks: find or create, refresh status and delete."""

    def new(self, ro_task, task_index, task_depends):
        """Find an existing network at the VIM, or create it.

        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("BUILD", update) on success; ("FAILED", update) when a
            VimConnException or NsWorkerException is raised
        """
        this_task = ro_task["tasks"][task_index]
        task_id = this_task["task_id"]
        target_id = ro_task["target_id"]
        target_vim = self.my_vims[target_id]
        vim_net_id = None
        created = False
        created_items = {}
        is_mgmt = False
        mgmt_set_in_vim = False

        try:
            find_params = this_task.get("find_params")

            if not find_params:
                # CREATE: no search criteria, the network is created from "params"
                net_params = this_task["params"]
                vim_net_id, created_items = target_vim.new_network(**net_params)
                created = True
            else:
                # FIND: first decide the filter to use
                if find_params.get("filter_dict"):
                    net_filter = find_params["filter_dict"]
                elif find_params.get("mgmt"):
                    # management network: the VIM configuration has preference
                    is_mgmt = True
                    db_vim = self.db_vims[target_id]

                    if deep_get(db_vim, "config", "management_network_id"):
                        mgmt_set_in_vim = True
                        net_filter = {"id": db_vim["config"]["management_network_id"]}
                    elif deep_get(db_vim, "config", "management_network_name"):
                        mgmt_set_in_vim = True
                        net_filter = {
                            "name": db_vim["config"]["management_network_name"]
                        }
                    else:
                        net_filter = {"name": find_params["name"]}
                else:
                    raise NsWorkerExceptionNotFound(
                        "Invalid find_params for new_net {}".format(find_params)
                    )

                found_nets = target_vim.get_network_list(net_filter)

                if not found_nets and not this_task.get("params"):
                    if not is_mgmt or mgmt_set_in_vim:
                        raise NsWorkerExceptionNotFound(
                            "Network not found with this criteria: '{}'".format(
                                find_params
                            )
                        )

                    # If there is mgmt-network in the descriptor,
                    # there is no mapping of that network to a VIM network in the descriptor,
                    # also there is no mapping in the "--config" parameter or at VIM creation;
                    # that mgmt-network will be created.
                    net_name = net_filter.get("name") or net_filter.get("id")[:16]
                    vim_net_id, created_items = target_vim.new_network(net_name, None)
                    self.logger.debug(
                        "Created mgmt network vim_net_id: {}".format(vim_net_id)
                    )
                    created = True
                elif len(found_nets) > 1:
                    raise NsWorkerException(
                        "More than one network found with this criteria: '{}'".format(
                            find_params
                        )
                    )

                if found_nets:
                    vim_net_id = found_nets[0]["id"]

            self.logger.debug(
                "task={} {} new-net={} created={}".format(
                    task_id, target_id, vim_net_id, created
                )
            )

            return "BUILD", {
                "vim_id": vim_net_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
            }
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, target_id, e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_details": str(e),
            }

    def refresh(self, ro_task):
        """Call VIM to get network status

        :param ro_task: ro_task content; "_id", "target_id" and "vim_info" are read
        :return: (task_status, update) where update holds only the vim_info
            fields that differ from what ro_task currently stores
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        stored = ro_task["vim_info"]
        vim_id = stored["vim_id"]

        try:
            vim_info = target_vim.refresh_nets_status([vim_id])[vim_id]
            reported = vim_info["status"]

            if reported == "ACTIVE":
                task_status = "DONE"
            elif reported == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-net={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        changes = {}

        if stored["vim_status"] != vim_info["status"]:
            changes["vim_status"] = vim_info["status"]

        reported_name = vim_info.get("name")
        if stored["vim_name"] != reported_name:
            changes["vim_name"] = reported_name

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            error_msg = vim_info.get("error_msg")
            if stored["vim_details"] != error_msg:
                changes["vim_details"] = error_msg
        elif vim_info["status"] == "DELETED":
            changes["vim_id"] = None
            changes["vim_details"] = "Deleted externally"
        elif stored["vim_details"] != vim_info["vim_info"]:
            changes["vim_details"] = vim_info["vim_info"]

        if changes:
            # details are logged only when the new status is not ACTIVE
            if changes.get("vim_status") != "ACTIVE":
                logged_details = changes.get("vim_details")
            else:
                logged_details = ""

            self.logger.debug(
                "ro_task={} {} get-net={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    changes.get("vim_status"),
                    logged_details,
                )
            )

        return task_status, changes

    def delete(self, ro_task, task_index):
        """Delete the network at the VIM.

        :param ro_task: ro_task content; "tasks", "target_id" and "vim_info" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :return: ("DONE", update) when deleted or already absent;
            ("FAILED", update) on any other VimConnException
        """
        task_id = ro_task["tasks"][task_index]["task_id"]
        stored = ro_task["vim_info"]
        net_vim_id = stored["vim_id"]
        update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_details": "DELETED",
            "vim_id": None,
        }

        try:
            # the VIM is contacted only if there is something to remove
            if net_vim_id or stored["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_network(net_vim_id, stored["created_items"])
        except vimconn.VimConnNotFoundException:
            update_ok["vim_details"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], net_vim_id, e
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_details": "Error while deleting: {}".format(e),
            }

        self.logger.debug(
            "task={} {} del-net={} {}".format(
                task_id,
                ro_task["target_id"],
                net_vim_id,
                update_ok.get("vim_details", ""),
            )
        )

        return "DONE", update_ok
336
337
class VimInteractionVdu(VimInteractionBase):
    """VIM interaction for VDUs (VMs): create, refresh status, delete and inject ssh key."""

    max_retries_inject_ssh_key = 20  # 20 times
    time_retries_inject_ssh_key = 30  # every 30 seconds

    def new(self, ro_task, task_index, task_depends):
        """Create a VM at the VIM.

        References of the form "TASK-..." found in the network, image, flavor
        and affinity group identifiers are replaced by the values supplied in
        task_depends before calling the VIM.

        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: dictionary mapping "TASK-..." references to VIM identifiers
        :return: ("BUILD", update) on success; ("FAILED", update) when a
            VimConnException or NsWorkerException is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # set before calling the VIM, so the error path below also reports created=True
            created = True
            params = task["params"]
            # work on a copy: the stored task params must keep the TASK- references
            params_copy = deepcopy(params)
            net_list = params_copy["net_list"]

            for net in net_list:
                # change task_id into network_id
                if "net_id" in net and net["net_id"].startswith("TASK-"):
                    network_id = task_depends[net["net_id"]]

                    if not network_id:
                        raise NsWorkerException(
                            "Cannot create VM because depends on a network not created or found "
                            "for {}".format(net["net_id"])
                        )

                    net["net_id"] = network_id

            if params_copy["image_id"].startswith("TASK-"):
                params_copy["image_id"] = task_depends[params_copy["image_id"]]

            if params_copy["flavor_id"].startswith("TASK-"):
                params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]

            affinity_group_list = params_copy["affinity_group_list"]
            for affinity_group in affinity_group_list:
                # change task_id into affinity_group_id
                if "affinity_group_id" in affinity_group and affinity_group[
                    "affinity_group_id"
                ].startswith("TASK-"):
                    affinity_group_id = task_depends[
                        affinity_group["affinity_group_id"]
                    ]

                    if not affinity_group_id:
                        # message was truncated to "found for {}"; now mirrors the network one
                        raise NsWorkerException(
                            "Cannot create VM because depends on an affinity group not created "
                            "or found for {}".format(affinity_group["affinity_group_id"])
                        )

                    affinity_group["affinity_group_id"] = affinity_group_id

            vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
            # "vim_id" of each net_list entry is read after the VIM call
            interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "interfaces_vim_ids": interfaces,
                "interfaces": [],
            }
            self.logger.debug(
                "task={} {} new-vm={} created={}".format(
                    task_id, ro_task["target_id"], vim_vm_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_details": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the VM at the VIM.

        :param ro_task: ro_task content; "tasks", "target_id" and "vim_info" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :return: ("DONE", update) when deleted or already absent;
            ("FAILED", update) on any other VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        vm_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_details": "DELETED",
            "vim_id": None,
        }

        try:
            # the VIM is contacted only if there is something to remove
            if vm_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_vminstance(
                    vm_vim_id, ro_task["vim_info"]["created_items"]
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_details"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-vm={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], vm_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_details": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-vm={} {}".format(
                task_id,
                ro_task["target_id"],
                vm_vim_id,
                ro_vim_item_update_ok.get("vim_details", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def refresh(self, ro_task):
        """Call VIM to get vm status

        :param ro_task: ro_task content; "_id", "target_id", "tasks" and "vim_info" are read
        :return: (None, None) when there is no vim_id; otherwise (task_status, update)
            where update holds only the vim_info fields that changed
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]

        if not vim_id:
            return None, None

        vm_to_refresh_list = [vim_id]
        try:
            vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"

            # try to load and parse vim_information
            try:
                vim_info_info = yaml.safe_load(vim_info["vim_info"])
                if vim_info_info.get("name"):
                    vim_info["name"] = vim_info_info["name"]
            except Exception:
                # best effort: the name is optional, any parsing problem is ignored
                pass
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-vm={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        ro_vim_item_update = {}

        # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
        vim_interfaces = []
        if vim_info.get("interfaces"):
            # keep the order of interfaces_vim_ids stored at creation time
            for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
                iface = next(
                    (
                        iface
                        for iface in vim_info["interfaces"]
                        if vim_iface_id == iface["vim_interface_id"]
                    ),
                    None,
                )
                # NOTE(review): an id not reported by the VIM yields a None entry here;
                # the flag assignments below would fail if they hit such an entry
                vim_interfaces.append(iface)

        task_create = next(
            t
            for t in ro_task["tasks"]
            if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
        )
        if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
            vim_interfaces[task_create["mgmt_vnf_interface"]][
                "mgmt_vnf_interface"
            ] = True

        # falls back to the vnf management interface index, and then to index 0
        mgmt_vdu_iface = task_create.get(
            "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
        )
        if vim_interfaces:
            vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True

        if ro_task["vim_info"]["interfaces"] != vim_interfaces:
            ro_vim_item_update["interfaces"] = vim_interfaces

        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_details"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            self.logger.debug(
                "ro_task={} {} get-vm={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    ro_vim_item_update.get("vim_details")
                    if ro_vim_item_update.get("vim_status") != "ACTIVE"
                    else "",
                )
            )

        return task_status, ro_vim_item_update

    def exec(self, ro_task, task_index, task_depends):
        """Inject an ssh key into the VM, asking for a retry while it fails.

        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: tuple (status, ro_vim_item_update, db_task_update):
            ("DONE", None, {"retries": 0}) on success;
            ("BUILD", None, {"retries": n, "next_retry": seconds}) while the
            number of failures is below max_retries_inject_ssh_key;
            ("FAILED", {"vim_details": error}, {"retries": 0}) afterwards
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        db_task_update = {"retries": 0}
        retries = task.get("retries", 0)

        try:
            params = task["params"]
            params_copy = deepcopy(params)
            # the three encryption-related entries are consumed here and replaced by "ro_key"
            params_copy["ro_key"] = self.db.decrypt(
                params_copy.pop("private_key"),
                params_copy.pop("schema_version"),
                params_copy.pop("salt"),
            )
            params_copy["ip_addr"] = params_copy.pop("ip_address")
            target_vim.inject_user_key(**params_copy)
            self.logger.debug(
                "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
            )

            return (
                "DONE",
                None,
                db_task_update,
            )  # params_copy["key"]
        except (vimconn.VimConnException, NsWorkerException) as e:
            retries += 1

            self.logger.debug(traceback.format_exc())
            if retries < self.max_retries_inject_ssh_key:
                return (
                    "BUILD",
                    None,
                    {
                        "retries": retries,
                        "next_retry": self.time_retries_inject_ssh_key,
                    },
                )

            self.logger.error(
                "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {"vim_details": str(e)}

            return "FAILED", ro_vim_item_update, db_task_update
617
618
class VimInteractionImage(VimInteractionBase):
    """VIM interaction for images: only looks up an existing image."""

    def new(self, ro_task, task_index, task_depends):
        """Find the image at the VIM that matches the task "find_params".

        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("DONE", update) on success, with "vim_id" None when the task
            has no "find_params"; ("FAILED", update) when zero or several
            images match or the VIM raises VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # FIND
            # initialised so that a task without find_params does not hit an unbound name
            vim_image_id = None

            if task.get("find_params"):
                vim_images = target_vim.get_image_list(**task["find_params"])

                if not vim_images:
                    raise NsWorkerExceptionNotFound(
                        "Image not found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )
                elif len(vim_images) > 1:
                    raise NsWorkerException(
                        "More than one image found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )
                else:
                    vim_image_id = vim_images[0]["id"]

            ro_vim_item_update = {
                "vim_id": vim_image_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
            }
            self.logger.debug(
                "task={} {} new-image={} created={}".format(
                    task_id, ro_task["target_id"], vim_image_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (NsWorkerException, vimconn.VimConnException) as e:
            self.logger.error(
                "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_details": str(e),
            }

            return "FAILED", ro_vim_item_update
672
673
class VimInteractionFlavor(VimInteractionBase):
    """VIM interaction for flavors: find or create, and delete."""

    def delete(self, ro_task, task_index):
        """Delete the flavor at the VIM.

        :param ro_task: ro_task content; "tasks", "target_id" and "vim_info" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :return: ("DONE", update) when deleted or already absent;
            ("FAILED", update) on any other VimConnException
        """
        task_id = ro_task["tasks"][task_index]["task_id"]
        flavor_vim_id = ro_task["vim_info"]["vim_id"]
        update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_details": "DELETED",
            "vim_id": None,
        }

        try:
            # nothing to do at the VIM when there is no identifier
            if flavor_vim_id:
                self.my_vims[ro_task["target_id"]].delete_flavor(flavor_vim_id)
        except vimconn.VimConnNotFoundException:
            update_ok["vim_details"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-flavor={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_details": "Error while deleting: {}".format(e),
            }

        self.logger.debug(
            "task={} {} del-flavor={} {}".format(
                task_id,
                ro_task["target_id"],
                flavor_vim_id,
                update_ok.get("vim_details", ""),
            )
        )

        return "DONE", update_ok

    def new(self, ro_task, task_index, task_depends):
        """Find a flavor at the VIM matching the wanted data, or create it.

        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("DONE", update) on success; ("FAILED", update) when a
            VimConnException or NsWorkerException is raised
        """
        this_task = ro_task["tasks"][task_index]
        task_id = this_task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            vim_flavor_id = None

            # FIND: a "not found" answer is not an error, creation is tried next
            if this_task.get("find_params"):
                try:
                    wanted = this_task["find_params"]["flavor_data"]
                    vim_flavor_id = target_vim.get_flavor_id_from_data(wanted)
                except vimconn.VimConnNotFoundException:
                    pass

            # CREATE: only when nothing was found and creation data is supplied
            if not vim_flavor_id and this_task.get("params"):
                vim_flavor_id = target_vim.new_flavor(this_task["params"]["flavor_data"])
                created = True

            self.logger.debug(
                "task={} {} new-flavor={} created={}".format(
                    task_id, ro_task["target_id"], vim_flavor_id, created
                )
            )

            return "DONE", {
                "vim_id": vim_flavor_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
            }
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_details": str(e),
            }
765
766
class VimInteractionAffinityGroup(VimInteractionBase):
    """VIM interaction for affinity or anti-affinity groups: find or create, and delete."""

    def delete(self, ro_task, task_index):
        """Delete the affinity group at the VIM.

        :param ro_task: ro_task content; "tasks", "target_id" and "vim_info" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :return: ("DONE", update) when deleted or already absent;
            ("FAILED", update) on any other VimConnException
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_details": "DELETED",
            "vim_id": None,
        }

        try:
            # nothing to do at the VIM when there is no identifier
            if affinity_group_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_affinity_group(affinity_group_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_details"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_details": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
                task_id,
                ro_task["target_id"],
                affinity_group_vim_id,
                ro_vim_item_update_ok.get("vim_details", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """Reuse the affinity group given by "vim-affinity-group-id", or create a new one.

        :param ro_task: ro_task content; "tasks" and "target_id" are read
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used by this method
        :return: ("DONE", update) on success; ("FAILED", update) when a
            VimConnException or NsWorkerException is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            affinity_group_vim_id = None
            affinity_group_data = None

            if task.get("params"):
                affinity_group_data = task["params"].get("affinity_group_data")

            if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
                try:
                    param_affinity_group_id = task["params"]["affinity_group_data"].get(
                        "vim-affinity-group-id"
                    )
                    affinity_group_vim_id = target_vim.get_affinity_group(
                        param_affinity_group_id
                    ).get("id")
                except vimconn.VimConnNotFoundException:
                    # not fatal: a new group is created below.
                    # A space was missing between the ID and "could not"
                    self.logger.error(
                        "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {} "
                        "could not be found at VIM. Creating a new one.".format(
                            task_id, ro_task["target_id"], param_affinity_group_id
                        )
                    )

            if not affinity_group_vim_id and affinity_group_data:
                affinity_group_vim_id = target_vim.new_affinity_group(
                    affinity_group_data
                )
                created = True

            ro_vim_item_update = {
                "vim_id": affinity_group_vim_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
            }
            self.logger.debug(
                "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
                    task_id, ro_task["target_id"], affinity_group_vim_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-affinity-or-anti-affinity-group:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_details": str(e),
            }

            return "FAILED", ro_vim_item_update
871
872
873 class VimInteractionSdnNet(VimInteractionBase):
@staticmethod
def _match_pci(port_pci, mapping):
    """
    Check if port_pci matches with mapping
    mapping can have brackets to indicate that several chars are accepted. e.g
    pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
    :param port_pci: text
    :param mapping: text, can contain brackets to indicate several chars are available
    :return: True if matches, False otherwise (also when either argument is empty)
    """
    if not port_pci or not mapping:
        return False
    if port_pci == mapping:
        return True

    mapping_index = 0
    pci_index = 0
    while True:
        bracket_start = mapping.find("[", mapping_index)

        if bracket_start == -1:
            break

        bracket_end = mapping.find("]", bracket_start)
        if bracket_end == -1:
            # unbalanced bracket: the rest is compared literally after the loop
            break

        # literal text placed before the bracket must match exactly
        length = bracket_start - mapping_index
        if (
            length
            and port_pci[pci_index : pci_index + length]
            != mapping[mapping_index:bracket_start]
        ):
            return False

        # the bracket group consumes exactly one char of port_pci; if port_pci is
        # already exhausted there is nothing to match (previously an IndexError)
        char_index = pci_index + length
        if char_index >= len(port_pci):
            return False

        if port_pci[char_index] not in mapping[bracket_start + 1 : bracket_end]:
            return False

        pci_index = char_index + 1
        mapping_index = bracket_end + 1

    # whatever remains after the last bracket group must match literally
    return port_pci[pci_index:] == mapping[mapping_index:]
922
def _get_interfaces(self, vlds_to_connect, vim_account_id):
    """
    Collect the SR-IOV and PCI-PASSTHROUGH interfaces attached to the given vlds
    :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
    :param vim_account_id: only vnfrs with this "vim-account-id" are read from database
    :return: list of interface copies, each one with an added "id" of the form
        vnfrs:<vnfr_id>:vdu.<vdu_index>.interfaces.<iface_index>, and with
        "status" forced to "ERROR" when its vdur is at ERROR status
    """
    wanted_types = ("SR-IOV", "PCI-PASSTHROUGH")
    found = []

    for vld_ref in vlds_to_connect:
        # split "<table>:<db_id>:vld.<vld_id>"
        table, _, remainder = vld_ref.partition(":")
        record_id, _, vld_part = remainder.partition(":")
        vld_id = vld_part.partition(".")[2]

        if table == "vnfrs":
            q_filter = {"vim-account-id": vim_account_id, "_id": record_id}
            iface_key = "vnf-vld-id"
        else:  # table == "nsrs"
            q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": record_id}
            iface_key = "ns-vld-id"

        for db_vnfr in self.db.get_list("vnfrs", q_filter=q_filter):
            for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
                for iface_index, interface in enumerate(vdur["interfaces"]):
                    if interface.get(iface_key) != vld_id:
                        continue

                    # only SR-IOV or PT
                    if interface.get("type") not in wanted_types:
                        continue

                    entry = interface.copy()
                    entry["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
                        db_vnfr["_id"], vdu_index, iface_index
                    )

                    if vdur.get("status") == "ERROR":
                        entry["status"] = "ERROR"

                    found.append(entry)

    return found
963
def refresh(self, ro_task):
    """Refresh by running new() again over the pending CREATE task.

    :param ro_task: ro_task content; "tasks" is scanned for the first non-empty
        task with action "CREATE" and a status other than "FINISHED"
    :return: whatever self.new returns for that task, called with task_depends=None
    """
    # look for task create
    create_index = next(
        index
        for index, candidate in enumerate(ro_task["tasks"])
        if candidate
        and candidate["action"] == "CREATE"
        and candidate["status"] != "FINISHED"
    )

    return self.new(ro_task, create_index, None)
975
976 def new(self, ro_task, task_index, task_depends):
977
978 task = ro_task["tasks"][task_index]
979 task_id = task["task_id"]
980 target_vim = self.my_vims[ro_task["target_id"]]
981
982 sdn_net_id = ro_task["vim_info"]["vim_id"]
983
984 created_items = ro_task["vim_info"].get("created_items")
985 connected_ports = ro_task["vim_info"].get("connected_ports", [])
986 new_connected_ports = []
987 last_update = ro_task["vim_info"].get("last_update", 0)
988 sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
989 error_list = []
990 created = ro_task["vim_info"].get("created", False)
991
992 try:
993 # CREATE
994 params = task["params"]
995 vlds_to_connect = params["vlds"]
996 associated_vim = params["target_vim"]
997 # external additional ports
998 additional_ports = params.get("sdn-ports") or ()
999 _, _, vim_account_id = associated_vim.partition(":")
1000
1001 if associated_vim:
1002 # get associated VIM
1003 if associated_vim not in self.db_vims:
1004 self.db_vims[associated_vim] = self.db.get_one(
1005 "vim_accounts", {"_id": vim_account_id}
1006 )
1007
1008 db_vim = self.db_vims[associated_vim]
1009
1010 # look for ports to connect
1011 ports = self._get_interfaces(vlds_to_connect, vim_account_id)
1012 # print(ports)
1013
1014 sdn_ports = []
1015 pending_ports = error_ports = 0
1016 vlan_used = None
1017 sdn_need_update = False
1018
1019 for port in ports:
1020 vlan_used = port.get("vlan") or vlan_used
1021
1022 # TODO. Do not connect if already done
1023 if not port.get("compute_node") or not port.get("pci"):
1024 if port.get("status") == "ERROR":
1025 error_ports += 1
1026 else:
1027 pending_ports += 1
1028 continue
1029
1030 pmap = None
1031 compute_node_mappings = next(
1032 (
1033 c
1034 for c in db_vim["config"].get("sdn-port-mapping", ())
1035 if c and c["compute_node"] == port["compute_node"]
1036 ),
1037 None,
1038 )
1039
1040 if compute_node_mappings:
1041 # process port_mapping pci of type 0000:af:1[01].[1357]
1042 pmap = next(
1043 (
1044 p
1045 for p in compute_node_mappings["ports"]
1046 if self._match_pci(port["pci"], p.get("pci"))
1047 ),
1048 None,
1049 )
1050
1051 if not pmap:
1052 if not db_vim["config"].get("mapping_not_needed"):
1053 error_list.append(
1054 "Port mapping not found for compute_node={} pci={}".format(
1055 port["compute_node"], port["pci"]
1056 )
1057 )
1058 continue
1059
1060 pmap = {}
1061
1062 service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
1063 new_port = {
1064 "service_endpoint_id": pmap.get("service_endpoint_id")
1065 or service_endpoint_id,
1066 "service_endpoint_encapsulation_type": "dot1q"
1067 if port["type"] == "SR-IOV"
1068 else None,
1069 "service_endpoint_encapsulation_info": {
1070 "vlan": port.get("vlan"),
1071 "mac": port.get("mac-address"),
1072 "device_id": pmap.get("device_id") or port["compute_node"],
1073 "device_interface_id": pmap.get("device_interface_id")
1074 or port["pci"],
1075 "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
1076 "switch_port": pmap.get("switch_port"),
1077 "service_mapping_info": pmap.get("service_mapping_info"),
1078 },
1079 }
1080
1081 # TODO
1082 # if port["modified_at"] > last_update:
1083 # sdn_need_update = True
1084 new_connected_ports.append(port["id"]) # TODO
1085 sdn_ports.append(new_port)
1086
1087 if error_ports:
1088 error_list.append(
1089 "{} interfaces have not been created as VDU is on ERROR status".format(
1090 error_ports
1091 )
1092 )
1093
1094 # connect external ports
1095 for index, additional_port in enumerate(additional_ports):
1096 additional_port_id = additional_port.get(
1097 "service_endpoint_id"
1098 ) or "external-{}".format(index)
1099 sdn_ports.append(
1100 {
1101 "service_endpoint_id": additional_port_id,
1102 "service_endpoint_encapsulation_type": additional_port.get(
1103 "service_endpoint_encapsulation_type", "dot1q"
1104 ),
1105 "service_endpoint_encapsulation_info": {
1106 "vlan": additional_port.get("vlan") or vlan_used,
1107 "mac": additional_port.get("mac_address"),
1108 "device_id": additional_port.get("device_id"),
1109 "device_interface_id": additional_port.get(
1110 "device_interface_id"
1111 ),
1112 "switch_dpid": additional_port.get("switch_dpid")
1113 or additional_port.get("switch_id"),
1114 "switch_port": additional_port.get("switch_port"),
1115 "service_mapping_info": additional_port.get(
1116 "service_mapping_info"
1117 ),
1118 },
1119 }
1120 )
1121 new_connected_ports.append(additional_port_id)
1122 sdn_info = ""
1123
1124 # if there are more ports to connect or they have been modified, call create/update
1125 if error_list:
1126 sdn_status = "ERROR"
1127 sdn_info = "; ".join(error_list)
1128 elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
1129 last_update = time.time()
1130
1131 if not sdn_net_id:
1132 if len(sdn_ports) < 2:
1133 sdn_status = "ACTIVE"
1134
1135 if not pending_ports:
1136 self.logger.debug(
1137 "task={} {} new-sdn-net done, less than 2 ports".format(
1138 task_id, ro_task["target_id"]
1139 )
1140 )
1141 else:
1142 net_type = params.get("type") or "ELAN"
1143 (
1144 sdn_net_id,
1145 created_items,
1146 ) = target_vim.create_connectivity_service(net_type, sdn_ports)
1147 created = True
1148 self.logger.debug(
1149 "task={} {} new-sdn-net={} created={}".format(
1150 task_id, ro_task["target_id"], sdn_net_id, created
1151 )
1152 )
1153 else:
1154 created_items = target_vim.edit_connectivity_service(
1155 sdn_net_id, conn_info=created_items, connection_points=sdn_ports
1156 )
1157 created = True
1158 self.logger.debug(
1159 "task={} {} update-sdn-net={} created={}".format(
1160 task_id, ro_task["target_id"], sdn_net_id, created
1161 )
1162 )
1163
1164 connected_ports = new_connected_ports
1165 elif sdn_net_id:
1166 wim_status_dict = target_vim.get_connectivity_service_status(
1167 sdn_net_id, conn_info=created_items
1168 )
1169 sdn_status = wim_status_dict["sdn_status"]
1170
1171 if wim_status_dict.get("sdn_info"):
1172 sdn_info = str(wim_status_dict.get("sdn_info")) or ""
1173
1174 if wim_status_dict.get("error_msg"):
1175 sdn_info = wim_status_dict.get("error_msg") or ""
1176
1177 if pending_ports:
1178 if sdn_status != "ERROR":
1179 sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
1180 len(ports) - pending_ports, len(ports)
1181 )
1182
1183 if sdn_status == "ACTIVE":
1184 sdn_status = "BUILD"
1185
1186 ro_vim_item_update = {
1187 "vim_id": sdn_net_id,
1188 "vim_status": sdn_status,
1189 "created": created,
1190 "created_items": created_items,
1191 "connected_ports": connected_ports,
1192 "vim_details": sdn_info,
1193 "last_update": last_update,
1194 }
1195
1196 return sdn_status, ro_vim_item_update
1197 except Exception as e:
1198 self.logger.error(
1199 "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
1200 exc_info=not isinstance(
1201 e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
1202 ),
1203 )
1204 ro_vim_item_update = {
1205 "vim_status": "VIM_ERROR",
1206 "created": created,
1207 "vim_details": str(e),
1208 }
1209
1210 return "FAILED", ro_vim_item_update
1211
def delete(self, ro_task, task_index):
    """
    Delete the connectivity service of this ro_task from the SDN/WIM target.

    Nothing is requested to the connector when the ro_task has no "vim_id".
    A 'not found' answer from the connector is considered a successful deletion.

    :param ro_task: ro_task dictionary; "vim_info", "target_id", "_id" and "tasks" are read
    :param task_index: index inside ro_task["tasks"] of the task being executed
    :return: tuple (status, vim_info update): "DONE" with the deleted information,
        or "FAILED" with "vim_status" set to "VIM_ERROR"
    """
    vim_info = ro_task["vim_info"]
    target_id = ro_task["target_id"]
    task_id = ro_task["tasks"][task_index]["task_id"]
    net_id = vim_info.get("vim_id")
    details = "DELETED"

    try:
        if net_id:
            self.my_vims[target_id].delete_connectivity_service(
                net_id, vim_info.get("created_items")
            )
    except Exception as e:
        already_gone = (
            isinstance(e, sdnconn.SdnConnectorError)
            and e.http_code == HTTPStatus.NOT_FOUND.value
        )

        if not already_gone:
            # traceback is only logged for exceptions not coming from the connectors
            self.logger.error(
                "ro_task={} vim={} del-sdn-net={}: {}".format(
                    ro_task["_id"], target_id, net_id, e
                ),
                exc_info=not isinstance(
                    e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
                ),
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_details": "Error while deleting: {}".format(e),
            }

        details = "already deleted"

    self.logger.debug(
        "task={} {} del-sdn-net={} {}".format(task_id, target_id, net_id, details)
    )

    return "DONE", {
        "vim_status": "DELETED",
        "created": False,
        "vim_details": details,
        "vim_id": None,
    }
1262
1263
class NsWorker(threading.Thread):
    """
    Thread that keeps the connectors of its VIM/SDN/WIM targets loaded (my_vims, db_vims,
    vim_targets) and processes the ro_tasks of those targets read from the database.
    """

    # Seconds to wait until the next refresh of an item, selected at _process_pending_tasks
    # depending on the kind of item and on the resulting task status:
    REFRESH_BUILD = 5  # 5 seconds
    # used when the task status is "DONE"
    REFRESH_ACTIVE = 60  # 1 minute
    # used for any other status (600 seconds)
    REFRESH_ERROR = 600
    # used for "image" and "flavor" items, whatever the status is (10 hours)
    REFRESH_IMAGE = 3600 * 10
    # NOTE(review): not referenced in the visible part of the class - confirm where it is used
    REFRESH_DELETE = 3600 * 10
    # maximum number of items of the task_queue created at __init__
    QUEUE_SIZE = 100
    # NOTE(review): this name is bound again by the method 'terminate' defined below in this class,
    # so this value is not reachable once the class body has been executed - confirm it is needed
    terminate = False
1272
def __init__(self, worker_index, config, plugins, db):
    """
    Create the worker (the thread is not started here) together with one interaction object
    per kind of item, all of them sharing the my_vims and db_vims dictionaries of this worker.

    :param worker_index: thread index
    :param config: general configuration of RO, among others the process_id with the docker id where it runs.
        config["global"]["task_locked_time"] is also read
    :param plugins: global shared dict with the loaded plugins
    :param db: database class instance to use
    """
    super().__init__()
    self.config = config
    self.plugins = plugins
    self.plugin_name = "unknown"
    self.logger = logging.getLogger("ro.worker{}".format(worker_index))
    self.worker_index = worker_index
    self.task_queue = queue.Queue(self.QUEUE_SIZE)
    # lock needed by del_task; it was used there without being created anywhere
    self.task_lock = threading.Lock()
    # targetvim: vimplugin class
    self.my_vims = {}
    # targetvim: vim information from database
    self.db_vims = {}
    # targetvim list
    self.vim_targets = []
    self.my_id = config["process_id"] + ":" + str(worker_index)
    self.db = db
    # item type: object in charge of creating/refreshing/deleting this kind of item
    self.item2class = {
        "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger),
        "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger),
        "image": VimInteractionImage(
            self.db, self.my_vims, self.db_vims, self.logger
        ),
        "flavor": VimInteractionFlavor(
            self.db, self.my_vims, self.db_vims, self.logger
        ),
        "sdn_net": VimInteractionSdnNet(
            self.db, self.my_vims, self.db_vims, self.logger
        ),
        "affinity-or-anti-affinity-group": VimInteractionAffinityGroup(
            self.db, self.my_vims, self.db_vims, self.logger
        ),
    }
    self.time_last_task_processed = None
    # lists of tasks to delete because nsrs or vnfrs has been deleted from db
    self.tasks_to_delete = []
    # it is idle when there are not vim_targets associated
    self.idle = True
    self.task_locked_time = config["global"]["task_locked_time"]
1318
def insert_task(self, task):
    """
    Put a task into the queue of this worker without waiting.

    :param task: item to insert at self.task_queue
    :return: None
    :raises NsWorkerException: if the queue is full
    """
    try:
        self.task_queue.put_nowait(task)
    except queue.Full:
        raise NsWorkerException("timeout inserting a task")

    return None
1325
def terminate(self):
    """
    Insert the "exit" marker into the task queue by means of insert_task.
    Presumably the main loop of the thread stops when it reads it (not visible here).

    :return: None
    """
    self.insert_task(task="exit")
1328
def del_task(self, task):
    """
    Mark a task as superseded if it has not been started yet.

    :param task: task dictionary. Its "status" is changed to "SUPERSEDED" when it was "SCHEDULED"
    :return: True if the task has been superseded, False if it was in any other status
    """
    with self.task_lock:
        if task["status"] != "SCHEDULED":
            # e.g. task["status"] == "processing"
            # The lock must not be released by hand here: the 'with' statement releases it at exit,
            # and releasing a lock twice raises RuntimeError
            return False

        task["status"] = "SUPERSEDED"

        return True
1337
def _process_vim_config(self, target_id, db_vim):
    """
    Process vim config, storing the certificate content of the configuration into a file.

    When config contains "ca_cert_content", it is written into the file
    "<target_id>:<worker_index>/ca_cert", the key is removed from config and
    "ca_cert" is set with the path of the file.

    :param target_id: vim/sdn/wim + id
    :param db_vim: Vim dictionary obtained from database
    :return: None. Modifies db_vim["config"]
    :raises NsWorkerException: on any error while processing the certificate
    """
    vim_config = db_vim.get("config")

    if not vim_config:
        return

    path = ""

    try:
        if not vim_config.get("ca_cert_content"):
            return

        path = "{}:{}".format(target_id, self.worker_index)

        try:
            mkdir(path)
        except FileExistsError:
            pass  # the folder is already there

        path += "/ca_cert"

        with open(path, "w") as cert_file:
            cert_file.write(vim_config["ca_cert_content"])
            del vim_config["ca_cert_content"]
            vim_config["ca_cert"] = path
    except Exception as e:
        raise NsWorkerException(
            "Error writing to file '{}': {}".format(path, e)
        )
1369
def _load_plugin(self, name, type="vim"):
    """
    Provide the plugin stored with this name at the shared plugins dictionary, loading it first
    from the entry points group "osm_ro<type>.plugins" when it is not there yet.
    The dummy vim and sdn connectors are registered if missing.

    :param name: plugin name, key of self.plugins
    :param type: can be vim or sdn
    :return: the content of self.plugins[name]
    :raises NsWorkerException: if the plugin cannot be loaded or it is not installed
    """
    registry = self.plugins

    if "rovim_dummy" not in registry:
        registry["rovim_dummy"] = VimDummyConnector

    if "rosdn_dummy" not in registry:
        registry["rosdn_dummy"] = SdnDummyConnector

    if name in registry:
        return registry[name]

    group = "osm_ro{}.plugins".format(type)

    try:
        for ep in entry_points(group=group, name=name):
            registry[name] = ep.load()
    except Exception as e:
        raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))

    if not name or name in registry:
        return registry[name]

    raise NsWorkerException(
        "Plugin 'osm_{n}' has not been installed".format(n=name)
    )
1393
def _unload_vim(self, target_id):
    """
    Unload a target: it is removed from the db_vims and my_vims dictionaries and from the
    vim_targets list, and the folder "<target_id>:<worker_index>" is deleted.
    Errors are logged, not raised.

    :param target_id: Contains type:_id; where type can be 'vim', ...
    :return: None.
    """
    try:
        self.db_vims.pop(target_id, None)
        self.my_vims.pop(target_id, None)
        targets = self.vim_targets

        if target_id in targets:
            targets.remove(target_id)

        self.logger.info("Unloaded {}".format(target_id))
        folder = "{}:{}".format(target_id, self.worker_index)
        rmtree(folder)
    except FileNotFoundError:
        # rmtree raises it when the folder does not exist
        pass
    except Exception as e:
        self.logger.error("Cannot unload {}: {}".format(target_id, e))
1413
def _check_vim(self, target_id):
    """
    Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLED or ERROR

    The first operation of the account in "PROCESSING" state is looked for. If it is not locked by
    another worker, it is locked, the connector is loaded and the connectivity (vim) or the
    credentials (sdn, wim) are checked. The result is written at database in the 'finally' clause.
    A target that was not loaded before calling this method is unloaded at the end.

    :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
    :return: None.
    """
    target, _, _id = target_id.partition(":")
    now = time.time()
    # content to set/unset at database at exit. Both empty means that no operation has been locked
    update_dict = {}
    unset_dict = {}
    # database path of the locked operation: "_admin.operations.<index>."
    op_text = ""
    step = ""
    # to leave the target loaded or unloaded as it was before this call
    loaded = target_id in self.vim_targets
    target_database = (
        "vim_accounts"
        if target == "vim"
        else "wim_accounts"
        if target == "wim"
        else "sdns"
    )

    try:
        step = "Getting {} from db".format(target_id)
        db_vim = self.db.get_one(target_database, {"_id": _id})

        for op_index, operation in enumerate(
            db_vim["_admin"].get("operations", ())
        ):
            if operation["operationState"] != "PROCESSING":
                continue

            locked_at = operation.get("locked_at")

            if locked_at is not None and locked_at >= now - self.task_locked_time:
                # some other thread is doing this operation
                return

            # lock
            op_text = "_admin.operations.{}.".format(op_index)

            # the filter contains the locked_at just read, so that the update is not applied
            # if the value at database has changed in the meanwhile
            # NOTE(review): the key set here ("admin.current_operation") is different from the one
            # unset below ("current_operation") and none of them is under "_admin" - confirm
            if not self.db.set_one(
                target_database,
                q_filter={
                    "_id": _id,
                    op_text + "operationState": "PROCESSING",
                    op_text + "locked_at": locked_at,
                },
                update_dict={
                    op_text + "locked_at": now,
                    "admin.current_operation": op_index,
                },
                fail_on_empty=False,
            ):
                return

            unset_dict[op_text + "locked_at"] = None
            unset_dict["current_operation"] = None
            step = "Loading " + target_id
            # _load_vim returns None if ok, or a text with the error
            error_text = self._load_vim(target_id)

            if not error_text:
                step = "Checking connectivity"

                if target == "vim":
                    self.my_vims[target_id].check_vim_connectivity()
                else:
                    self.my_vims[target_id].check_credentials()

            # values for the success case; they are overwritten at 'finally' if error_text is set
            update_dict["_admin.operationalState"] = "ENABLED"
            update_dict["_admin.detailed-status"] = ""
            unset_dict[op_text + "detailed-status"] = None
            update_dict[op_text + "operationState"] = "COMPLETED"

            # only the first operation in PROCESSING state is attended
            return

    except Exception as e:
        error_text = "{}: {}".format(step, e)
        self.logger.error("{} for {}: {}".format(step, target_id, e))

    finally:
        # unset_dict is filled just before error_text is assigned from _load_vim, and the 'except'
        # clause also assigns it; so it is bound here unless a non-Exception error has been raised
        if update_dict or unset_dict:
            if error_text:
                update_dict[op_text + "operationState"] = "FAILED"
                update_dict[op_text + "detailed-status"] = error_text
                unset_dict.pop(op_text + "detailed-status", None)
                update_dict["_admin.operationalState"] = "ERROR"
                update_dict["_admin.detailed-status"] = error_text

            if op_text:
                update_dict[op_text + "statusEnteredTime"] = now

            self.db.set_one(
                target_database,
                q_filter={"_id": _id},
                update_dict=update_dict,
                unset=unset_dict,
                fail_on_empty=False,
            )

        if not loaded:
            self._unload_vim(target_id)
1515
def _reload_vim(self, target_id):
    """
    Load again a target that is loaded at this worker. For a target that is not loaded, the
    database information cached at self.db_vims (if any) is discarded.

    :param target_id: Contains type:_id; where type can be 'vim', ...
    :return: None.
    """
    if target_id not in self.vim_targets:
        # removing the cached content forces to load it again next time it is needed
        self.db_vims.pop(target_id, None)
        return

    self._load_vim(target_id)
1523
def _load_vim(self, target_id):
    """
    Load or reload a vim_account, sdn_controller or wim_account.
    Read content from database, load the plugin if not loaded.
    In case of error loading the plugin, it loads a failing connector (FailingConnector) instead.
    It fills self db_vims dictionary, my_vims dictionary and vim_targets list

    :param target_id: Contains type:_id; where type can be 'vim', ...
    :return: None if ok, descriptive text if error
    """
    target, _, _id = target_id.partition(":")
    target_database = (
        "vim_accounts"
        if target == "vim"
        else "wim_accounts"
        if target == "wim"
        else "sdns"
    )
    plugin_name = ""
    vim = None

    try:
        # 'step' is updated before every action, in order to be included in the error text
        step = "Getting {}={} from db".format(target, _id)
        # TODO process for wim, sdnc, ...
        vim = self.db.get_one(target_database, {"_id": _id})

        # if deep_get(vim, "config", "sdn-controller"):
        #     step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
        #     db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})

        step = "Decrypting password"
        schema_version = vim.get("schema_version")
        self.db.encrypt_decrypt_fields(
            vim,
            "decrypt",
            fields=("password", "secret"),
            schema_version=schema_version,
            salt=_id,
        )
        self._process_vim_config(target_id, vim)

        if target == "vim":
            plugin_name = "rovim_" + vim["vim_type"]
            step = "Loading plugin '{}'".format(plugin_name)
            vim_module_conn = self._load_plugin(plugin_name)
            step = "Loading {}'".format(target_id)
            self.my_vims[target_id] = vim_module_conn(
                uuid=vim["_id"],
                name=vim["name"],
                tenant_id=vim.get("vim_tenant_id"),
                tenant_name=vim.get("vim_tenant_name"),
                url=vim["vim_url"],
                url_admin=None,
                user=vim["vim_user"],
                passwd=vim["vim_password"],
                config=vim.get("config") or {},
                persistent_info={},
            )
        else:  # sdn
            plugin_name = "rosdn_" + vim["type"]
            step = "Loading plugin '{}'".format(plugin_name)
            vim_module_conn = self._load_plugin(plugin_name, "sdn")
            step = "Loading {}'".format(target_id)
            # the connector receives a copy, so that the database content is not modified
            wim = deepcopy(vim)
            wim_config = wim.pop("config", {}) or {}
            wim["uuid"] = wim["_id"]
            wim["wim_url"] = wim["url"]

            if wim.get("dpid"):
                wim_config["dpid"] = wim.pop("dpid")

            if wim.get("switch_id"):
                wim_config["switch_id"] = wim.pop("switch_id")

            # wim, wim_account, config
            self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)

        self.db_vims[target_id] = vim
        self.error_status = None

        self.logger.info(
            "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
        )

        return None
    except Exception as e:
        self.logger.error(
            "Cannot load {} plugin={}: {} {}".format(
                target_id, plugin_name, step, e
            )
        )

        self.db_vims[target_id] = vim or {}
        # The failing connector goes to my_vims (the connectors), not to db_vims: storing it
        # at db_vims overwrote the database content assigned in the line above and left
        # my_vims without an entry (or with a stale one) for this target
        self.my_vims[target_id] = FailingConnector(str(e))
        error_status = "{} Error: {}".format(step, e)

        return error_status
    finally:
        # the target is registered both when the load succeeds and when it fails
        if target_id not in self.vim_targets:
            self.vim_targets.append(target_id)
1620
def _get_db_task(self):
    """
    Lock at database one ro_task of the targets of this worker and provide it.

    The ro_task must contain some task in status SCHEDULED, BUILD, DONE or FAILED, its lock must be
    older than task_locked_time and its "to_check_at" must be lower than
    self.time_last_task_processed. When nothing is found, the search is repeated once with
    time_last_task_processed moved to the current time; if nothing is found either, it is reset to None.

    :return: the locked ro_task, or None if there is nothing to process or on error (that is logged)
    """
    now = time.time()

    if not self.time_last_task_processed:
        self.time_last_task_processed = now

    try:
        while True:
            # The logging of RO tasks at this point (mark "TASK_WF", with the values of
            # task_locked_time, time_last_task_processed and now) is disabled
            got_lock = self.db.set_one(
                "ro_tasks",
                q_filter={
                    "target_id": self.vim_targets,
                    "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
                    "locked_at.lt": now - self.task_locked_time,
                    "to_check_at.lt": self.time_last_task_processed,
                },
                update_dict={"locked_by": self.my_id, "locked_at": now},
                fail_on_empty=False,
            )

            if got_lock:
                # read the ro_task just locked: the one with locked_at equal to 'now'
                return self.db.get_one(
                    "ro_tasks",
                    q_filter={
                        "target_id": self.vim_targets,
                        "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
                        "locked_at": now,
                    },
                )

            if self.time_last_task_processed != now:
                # try again, now including everything to be checked before the current time
                self.time_last_task_processed = now
                # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
                continue

            self.time_last_task_processed = None

            return None

    except DbException as e:
        self.logger.error("Database exception at _get_db_task: {}".format(e))
    except Exception as e:
        self.logger.critical(
            "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
        )

    return None
1690
def _get_db_all_tasks(self):
    """
    Read the complete content of table ro_tasks and log every ro_task with _log_ro_task.

    :return: the list of ro_tasks read, or None on error (that is logged)
    """
    try:
        all_ro_tasks = self.db.get_list("ro_tasks")

        for entry in all_ro_tasks:
            self._log_ro_task(entry, None, None, "TASK_WF", "GET_ALL_TASKS")

        return all_ro_tasks
    except DbException as e:
        self.logger.error("Database exception at _get_db_all_tasks: {}".format(e))
    except Exception as e:
        self.logger.critical(
            "Unexpected exception at _get_db_all_tasks: {}".format(e), exc_info=True
        )

    return None
1713
def _log_ro_task(self, ro_task, db_ro_task_update, db_ro_task_delete, mark, event):
    """
    Generate a log with the following format:

    Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by;
    target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id;
    task_array_index;task_id;task_action;task_item;task_args

    Example:

    TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013;
    1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a
    ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False,
    'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None,
    'vim_details': None, 'refresh_at': None};1;SCHEDULED;
    888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;
    CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}}

    Only the first of ro_task, db_ro_task_update and db_ro_task_delete that is a dictionary is used;
    if none of them is, a line with just mark and event is generated.
    Note that, when ro_task is used, the column after vim_info contains the complete list of tasks, and
    both "find_params" and "params" of the task are added at the end.
    Every value is converted to string before joining, so that values as a numeric "modified_at" do
    not make the logging fail. Any error is logged, never raised.

    :param ro_task: ro_task dictionary; one line per item of ro_task["tasks"] is generated
    :param db_ro_task_update: dictionary with the content to update at database; one line per
        consecutive "tasks.<index>.status" key, starting by 0, is generated
    :param db_ro_task_delete: dictionary where "_id" and "modified_at" are read
    :param mark: first column of the line
    :param event: second column of the line
    :return: None
    """
    try:
        if isinstance(ro_task, dict):
            for index, task in enumerate(ro_task["tasks"]):
                line = [
                    mark,
                    event,
                    ro_task.get("_id", ""),
                    ro_task.get("locked_at", ""),
                    ro_task.get("modified_at", ""),
                    ro_task.get("created_at", ""),
                    ro_task.get("to_check_at", ""),
                    ro_task.get("locked_by", ""),
                    ro_task.get("target_id", ""),
                    ro_task.get("vim_info", {}).get("refresh_at", ""),
                    ro_task.get("vim_info", ""),
                    ro_task.get("tasks", ""),
                ]

                if isinstance(task, dict):
                    line += [
                        task.get("status", ""),
                        task.get("action_id", ""),
                        index,
                        task.get("task_id", ""),
                        task.get("action", ""),
                        task.get("item", ""),
                        task.get("find_params", ""),
                        task.get("params", ""),
                    ]
                else:
                    # e.g. a deleted task (None): only the index is filled
                    line += ["", "", index] + [""] * 5

                self.logger.debug(";".join(map(str, line)))
        elif isinstance(db_ro_task_update, dict):
            # number of status keys contained in the update
            status_count = str(db_ro_task_update).count(".status")
            index = 0

            while "tasks.{}.status".format(index) in db_ro_task_update:
                line = [
                    mark,
                    event,
                    db_ro_task_update.get("_id", ""),
                    db_ro_task_update.get("locked_at", ""),
                    db_ro_task_update.get("modified_at", ""),
                    "",
                    db_ro_task_update.get("to_check_at", ""),
                    db_ro_task_update.get("locked_by", ""),
                    "",
                    db_ro_task_update.get("vim_info.refresh_at", ""),
                    "",
                    db_ro_task_update.get("vim_info", ""),
                    status_count,
                    db_ro_task_update.get("tasks.{}.status".format(index), ""),
                    "",
                    index,
                ] + [""] * 3
                index += 1
                self.logger.debug(";".join(map(str, line)))
        elif isinstance(db_ro_task_delete, dict):
            line = [
                mark,
                event,
                db_ro_task_delete.get("_id", ""),
                "",
                db_ro_task_delete.get("modified_at", ""),
            ] + [""] * 13
            self.logger.debug(";".join(map(str, line)))
        else:
            line = [mark, event] + [""] * 16
            self.logger.debug(";".join(map(str, line)))

    except Exception as e:
        self.logger.error("Error logging ro_task: {}".format(e))
1812
def _delete_task(self, ro_task, task_index, task_depends, db_update):
    """
    Determine if this task need to be done or superseded.

    The CREATE tasks of the same target_record are set to "FINISHED". The item is deleted only when
    it was created (or it has created_items) and there is not any CREATE task of other
    target_record that is neither "FINISHED" nor "SUPERSEDED".

    :param ro_task: ro_task dictionary. The "status" of some of its tasks can be modified
    :param task_index: index of this task inside ro_task["tasks"]
    :param task_depends: not used
    :param db_update: dictionary where the changes of status "tasks.<index>.status" are added
    :return: tuple (status, vim_info update): (None, None) if this task is "FAILED",
        ("SUPERSEDED", None) if the deletion is not needed, the result of the deletion of the item,
        or "FAILED" with "vim_status" set to "VIM_ERROR" on exception
    """
    tasks = ro_task["tasks"]
    my_task = tasks[task_index]
    task_id = my_task["task_id"]
    vim_info = ro_task["vim_info"]
    needed_delete = vim_info["created"] or vim_info.get("created_items", False)

    if my_task["status"] == "FAILED":
        return None, None  # TODO need to be retry??

    try:
        for index, other in enumerate(tasks):
            if not other or index == task_index:
                continue  # deleted task or own task

            same_record = my_task["target_record"] == other["target_record"]

            if other["action"] != "CREATE":
                continue

            if same_record:
                # set to finished
                db_update["tasks.{}.status".format(index)] = "FINISHED"
                other["status"] = "FINISHED"
            elif other["status"] not in ("FINISHED", "SUPERSEDED"):
                # still needed by the creation of another record
                needed_delete = False

        if not needed_delete:
            return "SUPERSEDED", None

        return self.item2class[my_task["item"]].delete(ro_task, task_index)
    except Exception as e:
        if not isinstance(e, NsWorkerException):
            self.logger.critical(
                "Unexpected exception at _delete_task task={}: {}".format(
                    task_id, e
                ),
                exc_info=True,
            )

        return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1860
def _create_task(self, ro_task, task_index, task_depends, db_update):
    """
    Determine if this task need to create something at VIM.

    Only a task in status "SCHEDULED" is attended. If another CREATE task of the ro_task is in a
    status different from "SCHEDULED", "FINISHED" and "SUPERSEDED", the item is not created again.

    :param ro_task: ro_task dictionary
    :param task_index: index of this task inside ro_task["tasks"]
    :param task_depends: passed to the creation of the item
    :param db_update: not used
    :return: tuple: (None, None) if the task is not "SCHEDULED"; (status of the other task,
        "COPY_VIM_INFO") if another CREATE task is found as explained above; otherwise the result of
        the creation of the item, or "FAILED" with "vim_status" set to "VIM_ERROR" on exception
    """
    tasks = ro_task["tasks"]
    my_task = tasks[task_index]
    task_id = my_task["task_id"]

    if my_task["status"] != "SCHEDULED":
        # "FAILED" (TODO need to be retry??) or any other status: nothing to do
        return None, None

    # check if already created by another task
    for index, other in enumerate(tasks):
        if not other or index == task_index:
            continue  # deleted task or own task

        if other["action"] != "CREATE":
            continue

        if other["status"] not in ("SCHEDULED", "FINISHED", "SUPERSEDED"):
            return other["status"], "COPY_VIM_INFO"

    try:
        creator = self.item2class[my_task["item"]]
        task_status, ro_vim_item_update = creator.new(
            ro_task, task_index, task_depends
        )
        # TODO update other CREATE tasks
    except Exception as e:
        if not isinstance(e, NsWorkerException):
            self.logger.error(
                "Error executing task={}: {}".format(task_id, e), exc_info=True
            )

        task_status = "FAILED"
        ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
        # TODO update ro_vim_item_update

    return task_status, ro_vim_item_update
1903
def _get_dependency(self, task_id, ro_task=None, target_id=None):
    """
    Look for dependency task
    :param task_id: Can be one of
        1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
        2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
        3. task.task_id: "<action_id>:number"
    :param ro_task: ro_task where to look first for a task with this task_id (only for case 3)
    :param target_id: target of the ro_task to look for (cases 1 and 2). At case 1 it is replaced
        by the one contained in task_id
    :return: database ro_task plus index of task
    :raises NsWorkerException: if the task is not found
    """
    if task_id.startswith(("vim:", "sdn:", "wim:")):
        target_id, _, task_id = task_id.partition(" ")

    if task_id.startswith(("nsrs:", "vnfrs:")):
        ro_task_dependency = self.db.get_one(
            "ro_tasks",
            q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
            fail_on_empty=False,
        )

        if ro_task_dependency:
            for task_index, task in enumerate(ro_task_dependency["tasks"]):
                # deleted tasks are stored as None
                if task and task["target_record_id"] == task_id:
                    return ro_task_dependency, task_index

    else:
        if ro_task:
            for task_index, task in enumerate(ro_task["tasks"]):
                if task and task["task_id"] == task_id:
                    return ro_task, task_index

        ro_task_dependency = self.db.get_one(
            "ro_tasks",
            q_filter={
                "tasks.ANYINDEX.task_id": task_id,
                "tasks.ANYINDEX.target_record.ne": None,
            },
            fail_on_empty=False,
        )

        if ro_task_dependency:
            # 'enumerate' was missing here, so the task dictionary itself was unpacked
            # into (task_index, task) instead of obtaining its position in the list
            for task_index, task in enumerate(ro_task_dependency["tasks"]):
                if task and task["task_id"] == task_id:
                    return ro_task_dependency, task_index

    raise NsWorkerException("Cannot get depending task {}".format(task_id))
1954
1955 def _process_pending_tasks(self, ro_task):
1956 ro_task_id = ro_task["_id"]
1957 now = time.time()
1958 # one day
1959 next_check_at = now + (24 * 60 * 60)
1960 db_ro_task_update = {}
1961
1962 def _update_refresh(new_status):
1963 # compute next_refresh
1964 nonlocal task
1965 nonlocal next_check_at
1966 nonlocal db_ro_task_update
1967 nonlocal ro_task
1968
1969 next_refresh = time.time()
1970
1971 if task["item"] in ("image", "flavor"):
1972 next_refresh += self.REFRESH_IMAGE
1973 elif new_status == "BUILD":
1974 next_refresh += self.REFRESH_BUILD
1975 elif new_status == "DONE":
1976 next_refresh += self.REFRESH_ACTIVE
1977 else:
1978 next_refresh += self.REFRESH_ERROR
1979
1980 next_check_at = min(next_check_at, next_refresh)
1981 db_ro_task_update["vim_info.refresh_at"] = next_refresh
1982 ro_task["vim_info"]["refresh_at"] = next_refresh
1983
1984 try:
1985 """
1986 # Log RO tasks only when loglevel is DEBUG
1987 if self.logger.getEffectiveLevel() == logging.DEBUG:
1988 self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
1989 """
1990 # 0: get task_status_create
1991 lock_object = None
1992 task_status_create = None
1993 task_create = next(
1994 (
1995 t
1996 for t in ro_task["tasks"]
1997 if t
1998 and t["action"] == "CREATE"
1999 and t["status"] in ("BUILD", "DONE")
2000 ),
2001 None,
2002 )
2003
2004 if task_create:
2005 task_status_create = task_create["status"]
2006
2007 # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
2008 for task_action in ("DELETE", "CREATE", "EXEC"):
2009 db_vim_update = None
2010 new_status = None
2011
2012 for task_index, task in enumerate(ro_task["tasks"]):
2013 if not task:
2014 continue # task deleted
2015
2016 task_depends = {}
2017 target_update = None
2018
2019 if (
2020 (
2021 task_action in ("DELETE", "EXEC")
2022 and task["status"] not in ("SCHEDULED", "BUILD")
2023 )
2024 or task["action"] != task_action
2025 or (
2026 task_action == "CREATE"
2027 and task["status"] in ("FINISHED", "SUPERSEDED")
2028 )
2029 ):
2030 continue
2031
2032 task_path = "tasks.{}.status".format(task_index)
2033 try:
2034 db_vim_info_update = None
2035
2036 if task["status"] == "SCHEDULED":
2037 # check if tasks that this depends on have been completed
2038 dependency_not_completed = False
2039
2040 for dependency_task_id in task.get("depends_on") or ():
2041 (
2042 dependency_ro_task,
2043 dependency_task_index,
2044 ) = self._get_dependency(
2045 dependency_task_id, target_id=ro_task["target_id"]
2046 )
2047 dependency_task = dependency_ro_task["tasks"][
2048 dependency_task_index
2049 ]
2050
2051 if dependency_task["status"] == "SCHEDULED":
2052 dependency_not_completed = True
2053 next_check_at = min(
2054 next_check_at, dependency_ro_task["to_check_at"]
2055 )
2056 # must allow dependent task to be processed first
2057 # to do this set time after last_task_processed
2058 next_check_at = max(
2059 self.time_last_task_processed, next_check_at
2060 )
2061 break
2062 elif dependency_task["status"] == "FAILED":
2063 error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
2064 task["action"],
2065 task["item"],
2066 dependency_task["action"],
2067 dependency_task["item"],
2068 dependency_task_id,
2069 dependency_ro_task["vim_info"].get(
2070 "vim_details"
2071 ),
2072 )
2073 self.logger.error(
2074 "task={} {}".format(task["task_id"], error_text)
2075 )
2076 raise NsWorkerException(error_text)
2077
2078 task_depends[dependency_task_id] = dependency_ro_task[
2079 "vim_info"
2080 ]["vim_id"]
2081 task_depends[
2082 "TASK-{}".format(dependency_task_id)
2083 ] = dependency_ro_task["vim_info"]["vim_id"]
2084
2085 if dependency_not_completed:
2086 # TODO set at vim_info.vim_details that it is waiting
2087 continue
2088
2089 # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
2090 # the task of renew this locking. It will update database locket_at periodically
2091 if not lock_object:
2092 lock_object = LockRenew.add_lock_object(
2093 "ro_tasks", ro_task, self
2094 )
2095
2096 if task["action"] == "DELETE":
2097 (new_status, db_vim_info_update,) = self._delete_task(
2098 ro_task, task_index, task_depends, db_ro_task_update
2099 )
2100 new_status = (
2101 "FINISHED" if new_status == "DONE" else new_status
2102 )
2103 # ^with FINISHED instead of DONE it will not be refreshing
2104
2105 if new_status in ("FINISHED", "SUPERSEDED"):
2106 target_update = "DELETE"
2107 elif task["action"] == "EXEC":
2108 (
2109 new_status,
2110 db_vim_info_update,
2111 db_task_update,
2112 ) = self.item2class[task["item"]].exec(
2113 ro_task, task_index, task_depends
2114 )
2115 new_status = (
2116 "FINISHED" if new_status == "DONE" else new_status
2117 )
2118 # ^with FINISHED instead of DONE it will not be refreshing
2119
2120 if db_task_update:
2121 # load into database the modified db_task_update "retries" and "next_retry"
2122 if db_task_update.get("retries"):
2123 db_ro_task_update[
2124 "tasks.{}.retries".format(task_index)
2125 ] = db_task_update["retries"]
2126
2127 next_check_at = time.time() + db_task_update.get(
2128 "next_retry", 60
2129 )
2130 target_update = None
2131 elif task["action"] == "CREATE":
2132 if task["status"] == "SCHEDULED":
2133 if task_status_create:
2134 new_status = task_status_create
2135 target_update = "COPY_VIM_INFO"
2136 else:
2137 new_status, db_vim_info_update = self.item2class[
2138 task["item"]
2139 ].new(ro_task, task_index, task_depends)
2140 # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
2141 _update_refresh(new_status)
2142 else:
2143 if (
2144 ro_task["vim_info"]["refresh_at"]
2145 and now > ro_task["vim_info"]["refresh_at"]
2146 ):
2147 new_status, db_vim_info_update = self.item2class[
2148 task["item"]
2149 ].refresh(ro_task)
2150 _update_refresh(new_status)
2151 else:
2152 # The refresh is updated to avoid set the value of "refresh_at" to
2153 # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
2154 # because it can happen that in this case the task is never processed
2155 _update_refresh(task["status"])
2156
2157 except Exception as e:
2158 new_status = "FAILED"
2159 db_vim_info_update = {
2160 "vim_status": "VIM_ERROR",
2161 "vim_details": str(e),
2162 }
2163
2164 if not isinstance(
2165 e, (NsWorkerException, vimconn.VimConnException)
2166 ):
2167 self.logger.error(
2168 "Unexpected exception at _delete_task task={}: {}".format(
2169 task["task_id"], e
2170 ),
2171 exc_info=True,
2172 )
2173
2174 try:
2175 if db_vim_info_update:
2176 db_vim_update = db_vim_info_update.copy()
2177 db_ro_task_update.update(
2178 {
2179 "vim_info." + k: v
2180 for k, v in db_vim_info_update.items()
2181 }
2182 )
2183 ro_task["vim_info"].update(db_vim_info_update)
2184
2185 if new_status:
2186 if task_action == "CREATE":
2187 task_status_create = new_status
2188 db_ro_task_update[task_path] = new_status
2189
2190 if target_update or db_vim_update:
2191 if target_update == "DELETE":
2192 self._update_target(task, None)
2193 elif target_update == "COPY_VIM_INFO":
2194 self._update_target(task, ro_task["vim_info"])
2195 else:
2196 self._update_target(task, db_vim_update)
2197
2198 except Exception as e:
2199 if (
2200 isinstance(e, DbException)
2201 and e.http_code == HTTPStatus.NOT_FOUND
2202 ):
2203 # if the vnfrs or nsrs has been removed from database, this task must be removed
2204 self.logger.debug(
2205 "marking to delete task={}".format(task["task_id"])
2206 )
2207 self.tasks_to_delete.append(task)
2208 else:
2209 self.logger.error(
2210 "Unexpected exception at _update_target task={}: {}".format(
2211 task["task_id"], e
2212 ),
2213 exc_info=True,
2214 )
2215
2216 locked_at = ro_task["locked_at"]
2217
2218 if lock_object:
2219 locked_at = [
2220 lock_object["locked_at"],
2221 lock_object["locked_at"] + self.task_locked_time,
2222 ]
2223 # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
2224 # contain exactly locked_at + self.task_locked_time
2225 LockRenew.remove_lock_object(lock_object)
2226
2227 q_filter = {
2228 "_id": ro_task["_id"],
2229 "to_check_at": ro_task["to_check_at"],
2230 "locked_at": locked_at,
2231 }
2232 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
2233 # outside this task (by ro_nbi) do not update it
2234 db_ro_task_update["locked_by"] = None
2235 # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
2236 db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
2237 db_ro_task_update["modified_at"] = now
2238 db_ro_task_update["to_check_at"] = next_check_at
2239
2240 """
2241 # Log RO tasks only when loglevel is DEBUG
2242 if self.logger.getEffectiveLevel() == logging.DEBUG:
2243 db_ro_task_update_log = db_ro_task_update.copy()
2244 db_ro_task_update_log["_id"] = q_filter["_id"]
2245 self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
2246 """
2247
2248 if not self.db.set_one(
2249 "ro_tasks",
2250 update_dict=db_ro_task_update,
2251 q_filter=q_filter,
2252 fail_on_empty=False,
2253 ):
2254 del db_ro_task_update["to_check_at"]
2255 del q_filter["to_check_at"]
2256 """
2257 # Log RO tasks only when loglevel is DEBUG
2258 if self.logger.getEffectiveLevel() == logging.DEBUG:
2259 self._log_ro_task(
2260 None,
2261 db_ro_task_update_log,
2262 None,
2263 "TASK_WF",
2264 "SET_TASK " + str(q_filter),
2265 )
2266 """
2267 self.db.set_one(
2268 "ro_tasks",
2269 q_filter=q_filter,
2270 update_dict=db_ro_task_update,
2271 fail_on_empty=True,
2272 )
2273 except DbException as e:
2274 self.logger.error(
2275 "ro_task={} Error updating database {}".format(ro_task_id, e)
2276 )
2277 except Exception as e:
2278 self.logger.error(
2279 "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
2280 )
2281
2282 def _update_target(self, task, ro_vim_item_update):
2283 table, _, temp = task["target_record"].partition(":")
2284 _id, _, path_vim_status = temp.partition(":")
2285 path_item = path_vim_status[: path_vim_status.rfind(".")]
2286 path_item = path_item[: path_item.rfind(".")]
2287 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2288 # path_item: dot separated list targeting record information, e.g. "vdur.10"
2289
2290 if ro_vim_item_update:
2291 update_dict = {
2292 path_vim_status + "." + k: v
2293 for k, v in ro_vim_item_update.items()
2294 if k
2295 in ("vim_id", "vim_details", "vim_name", "vim_status", "interfaces")
2296 }
2297
2298 if path_vim_status.startswith("vdur."):
2299 # for backward compatibility, add vdur.name apart from vdur.vim_name
2300 if ro_vim_item_update.get("vim_name"):
2301 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
2302
2303 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2304 if ro_vim_item_update.get("vim_id"):
2305 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
2306
2307 # update general status
2308 if ro_vim_item_update.get("vim_status"):
2309 update_dict[path_item + ".status"] = ro_vim_item_update[
2310 "vim_status"
2311 ]
2312
2313 if ro_vim_item_update.get("interfaces"):
2314 path_interfaces = path_item + ".interfaces"
2315
2316 for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
2317 if iface:
2318 update_dict.update(
2319 {
2320 path_interfaces + ".{}.".format(i) + k: v
2321 for k, v in iface.items()
2322 if k in ("vlan", "compute_node", "pci")
2323 }
2324 )
2325
2326 # put ip_address and mac_address with ip-address and mac-address
2327 if iface.get("ip_address"):
2328 update_dict[
2329 path_interfaces + ".{}.".format(i) + "ip-address"
2330 ] = iface["ip_address"]
2331
2332 if iface.get("mac_address"):
2333 update_dict[
2334 path_interfaces + ".{}.".format(i) + "mac-address"
2335 ] = iface["mac_address"]
2336
2337 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
2338 update_dict["ip-address"] = iface.get("ip_address").split(
2339 ";"
2340 )[0]
2341
2342 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
2343 update_dict[path_item + ".ip-address"] = iface.get(
2344 "ip_address"
2345 ).split(";")[0]
2346
2347 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
2348 else:
2349 update_dict = {path_item + ".status": "DELETED"}
2350 self.db.set_one(
2351 table,
2352 q_filter={"_id": _id},
2353 update_dict=update_dict,
2354 unset={path_vim_status: None},
2355 )
2356
2357 def _process_delete_db_tasks(self):
2358 """
2359 Delete task from database because vnfrs or nsrs or both have been deleted
2360 :return: None. Uses and modify self.tasks_to_delete
2361 """
2362 while self.tasks_to_delete:
2363 task = self.tasks_to_delete[0]
2364 vnfrs_deleted = None
2365 nsr_id = task["nsr_id"]
2366
2367 if task["target_record"].startswith("vnfrs:"):
2368 # check if nsrs is present
2369 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
2370 vnfrs_deleted = task["target_record"].split(":")[1]
2371
2372 try:
2373 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
2374 except Exception as e:
2375 self.logger.error(
2376 "Error deleting task={}: {}".format(task["task_id"], e)
2377 )
2378 self.tasks_to_delete.pop(0)
2379
@staticmethod
def delete_db_tasks(db, nsr_id, vnfrs_deleted):
    """
    Remove from table "ro_tasks" the tasks that belong to a deleted nsr or vnfr.
    Static method because it is called from osm_ng_ro.ns
    :param db: instance of database to use
    :param nsr_id: affected nsrs id
    :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
    :return: None. Raises NsWorkerException when, after 5 attempts, some ro_task could still not be
        deleted or updated with the "modified_at" value that was read
    """
    retries = 5

    def _must_be_removed(task):
        # with a vnfr id only the tasks targeting that vnfr match; without it, all the ones of the nsr
        if vnfrs_deleted:
            return task["target_record"].startswith("vnfrs:" + vnfrs_deleted)
        return task["nsr_id"] == nsr_id

    for _attempt in range(retries):
        ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
        now = time.time()
        conflict = False

        for ro_task in ro_tasks:
            db_update = {}
            used_by_others = False

            for index, task in enumerate(ro_task["tasks"]):
                if not task:
                    continue

                if _must_be_removed(task):
                    db_update["tasks.{}".format(index)] = None
                else:
                    # used by other nsr, ro_task cannot be deleted
                    used_by_others = True

            if used_by_others and not db_update:
                # nothing of this ro_task is affected
                continue

            # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
            unchanged_filter = {
                "_id": ro_task["_id"],
                "modified_at": ro_task["modified_at"],
            }
            if used_by_others:
                db_update["modified_at"] = now
                applied = db.set_one(
                    "ro_tasks",
                    q_filter=unchanged_filter,
                    update_dict=db_update,
                    fail_on_empty=False,
                )
            else:
                applied = db.del_one(
                    "ro_tasks",
                    q_filter=unchanged_filter,
                    fail_on_empty=False,
                )

            if not applied:
                conflict = True

        if not conflict:
            return

    raise NsWorkerException("Exceeded {} retries".format(retries))
2438
def run(self):
    """
    Main loop of the worker. It repeats two steps until a "terminate" command is received:
     step 1: take one command from self.task_queue and execute it. The queue is read without blocking
         when self.vim_targets is not empty; otherwise the worker waits for a command in idle state
     step 2: reached only when there is no command or when its execution has failed. Tasks marked at
         self.tasks_to_delete are removed and one ro_task obtained from self._get_db_task() is
         processed; if there is none, it sleeps 5 seconds
    Exceptions of both steps are logged as critical and the loop continues
    :return: None
    """
    self.logger.info("Starting")

    while True:
        # step 1: get commands from queue
        try:
            if self.vim_targets:
                task = self.task_queue.get(block=False)
            else:
                if not self.idle:
                    self.logger.debug("enters in idle state")

                self.idle = True
                task = self.task_queue.get(block=True)
                self.idle = False

            order = task[0]
            if order == "terminate":
                break

            if order == "load_vim":
                self.logger.info("order to load vim {}".format(task[1]))
                self._load_vim(task[1])
            elif order == "unload_vim":
                self.logger.info("order to unload vim {}".format(task[1]))
                self._unload_vim(task[1])
            elif order == "reload_vim":
                self._reload_vim(task[1])
            elif order == "check_vim":
                self.logger.info("order to check vim {}".format(task[1]))
                self._check_vim(task[1])

            # a command has been attended (unknown ones are ignored): look for the next one
            continue
        except queue.Empty:
            # no command pending, go on with step 2
            pass
        except Exception as e:
            self.logger.critical("Error processing task: {}".format(e), exc_info=True)

        # step 2: process pending_tasks, delete not needed tasks
        try:
            if self.tasks_to_delete:
                self._process_delete_db_tasks()

            # Disabled snippet kept from the original code:
            # # Log RO tasks only when loglevel is DEBUG
            # if self.logger.getEffectiveLevel() == logging.DEBUG:
            #     _ = self._get_db_all_tasks()

            ro_task = self._get_db_task()
            if ro_task:
                self._process_pending_tasks(ro_task)
            else:
                # nothing to do at this moment
                time.sleep(5)
        except Exception as e:
            self.logger.critical(
                "Unexpected exception at run: " + str(e), exc_info=True
            )

    self.logger.info("Finishing")