Feature 10911-Vertical scaling of VM instances from OSM
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
20 """"
21 This is the thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored in the database, in table ro_tasks.
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results.
25 """
26
27 from copy import deepcopy
28 from http import HTTPStatus
29 import logging
30 from os import mkdir
31 import queue
32 from shutil import rmtree
33 import threading
34 import time
35 import traceback
36 from unittest.mock import Mock
37
38 from importlib_metadata import entry_points
39 from osm_common.dbbase import DbException
40 from osm_ng_ro.vim_admin import LockRenew
41 from osm_ro_plugin import sdnconn, vimconn
42 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
43 from osm_ro_plugin.vim_dummy import VimDummyConnector
44 import yaml
45
46 __author__ = "Alfonso Tierno"
47 __date__ = "$28-Sep-2017 12:07:15$"
48
49
def deep_get(target_dict, *args, **kwargs):
    """
    Walk a nested dictionary following a sequence of keys.

    Example: with target_dict={a: {b: 5}}, keys (a, b) return 5, while keys
    (a, b, c) and (f, h) both return the default.

    :param target_dict: dictionary to be read
    :param args: keys to follow, outermost first
    :param kwargs: may contain default=value, returned when a key is missing
        or an intermediate value is not a dictionary
    :return: the value found, otherwise the default (None when not given)
    """
    current = target_dict

    for step in args:
        if isinstance(current, dict) and step in current:
            current = current[step]
        else:
            return kwargs.get("default")

    return current
64
65
class NsWorkerException(Exception):
    """Error raised by the VIM interaction classes when a task cannot be processed."""
68
69
class FailingConnector:
    """Connector stand-in whose public methods always raise the stored error.

    Every public attribute name found on vimconn.VimConnector and on
    sdnconn.SdnConnectorBase is set on the instance as a Mock that raises the
    matching connector exception carrying error_msg.
    """

    def __init__(self, error_msg):
        """
        :param error_msg: text carried by the exception raised on every call
        """
        self.error_msg = error_msg

        # VIM names are installed first, so an SDN name that collides with a
        # VIM one ends up raising the SDN error (same order as before).
        failing_apis = (
            (vimconn.VimConnector, vimconn.VimConnException),
            (sdnconn.SdnConnectorBase, sdnconn.SdnConnectorError),
        )

        for connector_class, error_class in failing_apis:
            for attr_name in dir(connector_class):
                if attr_name[0] == "_":
                    continue

                setattr(self, attr_name, Mock(side_effect=error_class(error_msg)))
85
86
class NsWorkerExceptionNotFound(NsWorkerException):
    """NsWorkerException variant raised when a requested VIM element is not found."""
89
90
class VimInteractionBase:
    """Base class to call VIM/SDN for creating, deleting and refreshing networks, VMs, flavors, ...

    The methods implemented here do not contact the VIM; they just report a
    fixed result. Subclasses override the ones they need.
    """

    def __init__(self, db, my_vims, db_vims, logger):
        """
        :param db: database handler, stored as self.db
        :param my_vims: mapping indexed by target_id, stored as self.my_vims
        :param db_vims: mapping indexed by target_id, stored as self.db_vims
        :param logger: logger used by the interaction methods
        """
        self.logger = logger
        self.db = db
        self.db_vims = db_vims
        self.my_vims = my_vims

    def new(self, ro_task, task_index, task_depends):
        """Do nothing and report the item as being built.

        :return: tuple ("BUILD", {})
        """
        return "BUILD", {}

    def refresh(self, ro_task):
        """Skip calling the VIM to get image/flavor status and assume it is ok.

        :param ro_task: ro_task content; only vim_info.vim_status is read
        :return: ("FAILED", {}) when the stored status is VIM_ERROR, otherwise ("DONE", {})
        """
        in_error = ro_task["vim_info"]["vim_status"] == "VIM_ERROR"

        return ("FAILED" if in_error else "DONE"), {}

    def delete(self, ro_task, task_index):
        """Skip calling the VIM to delete the item and assume it is ok.

        :return: tuple ("DONE", {})
        """
        return "DONE", {}

    def exec(self, ro_task, task_index, task_depends):
        """Do nothing and report the action as done.

        :return: tuple ("DONE", None, None)
        """
        return "DONE", None, None
117
118
class VimInteractionNet(VimInteractionBase):
    """VIM interaction for networks: find or create, refresh status and delete."""

    def _resolve_find_filter(self, target_id, find_params):
        """Translate the task find_params into a filter for get_network_list.

        :param target_id: key of the VIM at self.db_vims
        :param find_params: content of task["find_params"]
        :return: tuple (vim_filter, is_mgmt, mgmt_defined_in_vim)
        :raises NsWorkerExceptionNotFound: when find_params has neither
            'filter_dict' nor 'mgmt'
        """
        if find_params.get("filter_dict"):
            return find_params["filter_dict"], False, False

        if not find_params.get("mgmt"):
            raise NsWorkerExceptionNotFound(
                "Invalid find_params for new_net {}".format(find_params)
            )

        # management network: the VIM configuration takes precedence, first by
        # id, then by name; otherwise the name provided by the task is used
        db_vim = self.db_vims[target_id]

        if deep_get(db_vim, "config", "management_network_id"):
            return {"id": db_vim["config"]["management_network_id"]}, True, True

        if deep_get(db_vim, "config", "management_network_name"):
            return {"name": db_vim["config"]["management_network_name"]}, True, True

        return {"name": find_params["name"]}, True, False

    def new(self, ro_task, task_index, task_depends):
        """Find the network at the VIM or create it.

        With find_params the network is searched; without them it is created
        from task["params"]. A management network that is neither found nor
        mapped in the VIM configuration is created when the task has no params.

        :return: ("BUILD", update) on success, ("FAILED", update) when a
            VimConnException or NsWorkerException is raised
        """
        vim_net_id = None
        created = False
        created_items = {}
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_id = ro_task["target_id"]
        target_vim = self.my_vims[target_id]

        try:
            if task.get("find_params"):
                # FIND
                vim_filter, is_mgmt, mgmt_in_vim = self._resolve_find_filter(
                    target_id, task["find_params"]
                )
                vim_nets = target_vim.get_network_list(vim_filter)

                if not vim_nets and not task.get("params"):
                    # If there is mgmt-network in the descriptor,
                    # there is no mapping of that network to a VIM network in the descriptor,
                    # also there is no mapping in the "--config" parameter or at VIM creation;
                    # that mgmt-network will be created.
                    if not is_mgmt or mgmt_in_vim:
                        raise NsWorkerExceptionNotFound(
                            "Network not found with this criteria: '{}'".format(
                                task.get("find_params")
                            )
                        )

                    net_name = vim_filter.get("name") or vim_filter.get("id")[:16]
                    vim_net_id, created_items = target_vim.new_network(
                        net_name, None
                    )
                    self.logger.debug(
                        "Created mgmt network vim_net_id: {}".format(vim_net_id)
                    )
                    created = True
                elif len(vim_nets) > 1:
                    raise NsWorkerException(
                        "More than one network found with this criteria: '{}'".format(
                            task["find_params"]
                        )
                    )

                if vim_nets:
                    vim_net_id = vim_nets[0]["id"]
            else:
                # CREATE
                vim_net_id, created_items = target_vim.new_network(**task["params"])
                created = True

            self.logger.debug(
                "task={} {} new-net={} created={}".format(
                    task_id, target_id, vim_net_id, created
                )
            )

            return "BUILD", {
                "vim_id": vim_net_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-net: {}".format(task_id, target_id, e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

    def refresh(self, ro_task):
        """Call VIM to get network status

        :return: tuple (task_status, update) where update only holds the
            vim_info fields that differ from the ones stored at ro_task
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]

        try:
            vim_info = target_vim.refresh_nets_status([vim_id])[vim_id]
            reported = vim_info["status"]

            if reported == "ACTIVE":
                task_status = "DONE"
            elif reported == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-net={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        stored = ro_task["vim_info"]
        status = vim_info["status"]
        ro_vim_item_update = {}

        if stored["vim_status"] != status:
            ro_vim_item_update["vim_status"] = status

        if stored["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if status in ("ERROR", "VIM_ERROR"):
            if stored["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif status == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        elif stored["vim_details"] != vim_info["vim_info"]:
            ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            new_status = ro_vim_item_update.get("vim_status")
            # the message is only logged when the new status is not ACTIVE
            if new_status != "ACTIVE":
                logged_message = ro_vim_item_update.get("vim_message")
            else:
                logged_message = ""

            self.logger.debug(
                "ro_task={} {} get-net={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    new_status,
                    logged_message,
                )
            )

        return task_status, ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the network at the VIM.

        The VIM is only called when there is a vim_id or created_items. A
        network not found at the VIM is reported as already deleted.

        :return: ("DONE", update) on success, ("FAILED", update) on VIM error
        """
        task_id = ro_task["tasks"][task_index]["task_id"]
        net_vim_id = ro_task["vim_info"]["vim_id"]
        deleted_update = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if net_vim_id or ro_task["vim_info"]["created_items"]:
                self.my_vims[ro_task["target_id"]].delete_network(
                    net_vim_id, ro_task["vim_info"]["created_items"]
                )
        except vimconn.VimConnNotFoundException:
            deleted_update["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], net_vim_id, e
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

        self.logger.debug(
            "task={} {} del-net={} {}".format(
                task_id,
                ro_task["target_id"],
                net_vim_id,
                deleted_update.get("vim_message", ""),
            )
        )

        return "DONE", deleted_update
336
337
class VimInteractionVdu(VimInteractionBase):
    """VIM interaction for VM instances: create, delete, refresh and ssh key injection."""

    max_retries_inject_ssh_key = 20  # 20 times
    time_retries_inject_ssh_key = 30  # every 30 seconds

    def new(self, ro_task, task_index, task_depends):
        """Create the VM instance at the VIM.

        References of the form "TASK-..." found at the networks, image, flavor
        and affinity groups of task["params"] are replaced by the value stored
        at task_depends before calling new_vminstance.

        :param ro_task: ro_task content
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: mapping from "TASK-..." references to VIM ids
        :return: ("BUILD", update) on success, ("FAILED", update) when a
            VimConnException or NsWorkerException is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # set before calling the VIM, so a failure is reported with created=True
            created = True
            params = task["params"]
            params_copy = deepcopy(params)
            net_list = params_copy["net_list"]

            for net in net_list:
                # change task_id into network_id
                if "net_id" in net and net["net_id"].startswith("TASK-"):
                    network_id = task_depends[net["net_id"]]

                    if not network_id:
                        raise NsWorkerException(
                            "Cannot create VM because depends on a network not created or found "
                            "for {}".format(net["net_id"])
                        )

                    net["net_id"] = network_id

            if params_copy["image_id"].startswith("TASK-"):
                params_copy["image_id"] = task_depends[params_copy["image_id"]]

            if params_copy["flavor_id"].startswith("TASK-"):
                params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]

            affinity_group_list = params_copy["affinity_group_list"]
            for affinity_group in affinity_group_list:
                # change task_id into affinity_group_id
                if "affinity_group_id" in affinity_group and affinity_group[
                    "affinity_group_id"
                ].startswith("TASK-"):
                    affinity_group_id = task_depends[
                        affinity_group["affinity_group_id"]
                    ]

                    if not affinity_group_id:
                        # the message used to be truncated to "found for {}"
                        raise NsWorkerException(
                            "Cannot create VM because depends on an affinity group not created "
                            "or found for {}".format(
                                affinity_group["affinity_group_id"]
                            )
                        )

                    affinity_group["affinity_group_id"] = affinity_group_id

            vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
            # read after new_vminstance: each net_list entry is expected to
            # carry "vim_id" at this point
            interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]

            # add to created items previous_created_volumes (healing)
            if task.get("previous_created_volumes"):
                for k, v in task["previous_created_volumes"].items():
                    created_items[k] = v

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "BUILD",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
                "interfaces_vim_ids": interfaces,
                "interfaces": [],
                "interfaces_backup": [],
            }
            self.logger.debug(
                "task={} {} new-vm={} created={}".format(
                    task_id, ro_task["target_id"], vim_vm_id, created
                )
            )

            return "BUILD", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update

    def delete(self, ro_task, task_index):
        """Delete the VM instance at the VIM.

        The VIM is only called when there is a vim_id or created_items. A VM
        not found at the VIM is reported as already deleted.

        :return: ("DONE", update) on success, ("FAILED", update) on VIM error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        vm_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            self.logger.debug(
                "delete_vminstance: vm_vim_id={} created_items={}".format(
                    vm_vim_id, ro_task["vim_info"]["created_items"]
                )
            )
            if vm_vim_id or ro_task["vim_info"]["created_items"]:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_vminstance(
                    vm_vim_id,
                    ro_task["vim_info"]["created_items"],
                    ro_task["vim_info"].get("volumes_to_hold", []),
                )
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-vm={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], vm_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-vm={} {}".format(
                task_id,
                ro_task["target_id"],
                vm_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def refresh(self, ro_task):
        """Call VIM to get vm status

        :return: (None, None) when ro_task has no vim_id; otherwise a tuple
            (task_status, update) where update only holds the vim_info fields
            that differ from the ones stored at ro_task
        """
        ro_task_id = ro_task["_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_id = ro_task["vim_info"]["vim_id"]

        if not vim_id:
            return None, None

        vm_to_refresh_list = [vim_id]
        try:
            vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
            vim_info = vim_dict[vim_id]

            if vim_info["status"] == "ACTIVE":
                task_status = "DONE"
            elif vim_info["status"] == "BUILD":
                task_status = "BUILD"
            else:
                task_status = "FAILED"

            # try to load and parse vim_information
            try:
                vim_info_info = yaml.safe_load(vim_info["vim_info"])
                if vim_info_info.get("name"):
                    vim_info["name"] = vim_info_info["name"]
            except Exception:
                # best effort: the name is optional, so any parsing problem is ignored
                pass
        except vimconn.VimConnException as e:
            # Mark all tasks at VIM_ERROR status
            self.logger.error(
                "ro_task={} vim={} get-vm={}: {}".format(
                    ro_task_id, ro_task["target_id"], vim_id, e
                )
            )
            vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
            task_status = "FAILED"

        ro_vim_item_update = {}

        # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
        vim_interfaces = []
        if vim_info.get("interfaces"):
            for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
                # keeps the order of interfaces_vim_ids; an interface not
                # reported by the VIM is stored as None
                iface = next(
                    (
                        iface
                        for iface in vim_info["interfaces"]
                        if vim_iface_id == iface["vim_interface_id"]
                    ),
                    None,
                )
                vim_interfaces.append(iface)

        # first CREATE task that is not finished; StopIteration if there is none
        task_create = next(
            t
            for t in ro_task["tasks"]
            if t and t["action"] == "CREATE" and t["status"] != "FINISHED"
        )
        if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
            vim_interfaces[task_create["mgmt_vnf_interface"]][
                "mgmt_vnf_interface"
            ] = True

        mgmt_vdu_iface = task_create.get(
            "mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0)
        )
        if vim_interfaces:
            vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True

        if ro_task["vim_info"]["interfaces"] != vim_interfaces:
            ro_vim_item_update["interfaces"] = vim_interfaces

        if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
            ro_vim_item_update["vim_status"] = vim_info["status"]

        if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
            ro_vim_item_update["vim_name"] = vim_info.get("name")

        if vim_info["status"] in ("ERROR", "VIM_ERROR"):
            if ro_task["vim_info"]["vim_message"] != vim_info.get("error_msg"):
                ro_vim_item_update["vim_message"] = vim_info.get("error_msg")
        elif vim_info["status"] == "DELETED":
            ro_vim_item_update["vim_id"] = None
            ro_vim_item_update["vim_message"] = "Deleted externally"
        else:
            if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
                ro_vim_item_update["vim_details"] = vim_info["vim_info"]

        if ro_vim_item_update:
            self.logger.debug(
                "ro_task={} {} get-vm={}: status={} {}".format(
                    ro_task_id,
                    ro_task["target_id"],
                    vim_id,
                    ro_vim_item_update.get("vim_status"),
                    ro_vim_item_update.get("vim_message")
                    if ro_vim_item_update.get("vim_status") != "ACTIVE"
                    else "",
                )
            )

        return task_status, ro_vim_item_update

    def exec(self, ro_task, task_index, task_depends):
        """Inject a user key into the VM by calling inject_user_key at the VIM.

        The private key of task["params"] is decrypted with self.db.decrypt
        and passed as 'ro_key'; 'ip_address' is passed as 'ip_addr'.

        :return: ("DONE", None, {"retries": 0}) on success. On a
            VimConnException or NsWorkerException: ("BUILD", None, retry info)
            while the retry counter is below max_retries_inject_ssh_key,
            ("FAILED", update, {"retries": 0}) afterwards
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        db_task_update = {"retries": 0}
        retries = task.get("retries", 0)

        try:
            params = task["params"]
            params_copy = deepcopy(params)
            params_copy["ro_key"] = self.db.decrypt(
                params_copy.pop("private_key"),
                params_copy.pop("schema_version"),
                params_copy.pop("salt"),
            )
            params_copy["ip_addr"] = params_copy.pop("ip_address")
            target_vim.inject_user_key(**params_copy)
            self.logger.debug(
                "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"])
            )

            return (
                "DONE",
                None,
                db_task_update,
            )  # params_copy["key"]
        except (vimconn.VimConnException, NsWorkerException) as e:
            retries += 1

            self.logger.debug(traceback.format_exc())
            if retries < self.max_retries_inject_ssh_key:
                return (
                    "BUILD",
                    None,
                    {
                        "retries": retries,
                        "next_retry": self.time_retries_inject_ssh_key,
                    },
                )

            self.logger.error(
                "task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {"vim_message": str(e)}

            return "FAILED", ro_vim_item_update, db_task_update
631
632
class VimInteractionImage(VimInteractionBase):
    """VIM interaction for images: only looks for an existing image."""

    def new(self, ro_task, task_index, task_depends):
        """Find the image at the VIM using task["find_params"].

        Exactly one image must match. A task without find_params is reported
        as FAILED; it used to end in an UnboundLocalError because no image id
        was ever assigned.

        :return: ("DONE", update) when a single image is found, ("FAILED",
            update) when a VimConnException or NsWorkerException is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            # FIND
            find_params = task.get("find_params")
            if not find_params:
                raise NsWorkerExceptionNotFound(
                    "Image cannot be searched because the task has no find_params"
                )

            vim_images = target_vim.get_image_list(**find_params)

            if not vim_images:
                raise NsWorkerExceptionNotFound(
                    "Image not found with this criteria: '{}'".format(find_params)
                )

            if len(vim_images) > 1:
                raise NsWorkerException(
                    "More than one image found with this criteria: '{}'".format(
                        find_params
                    )
                )

            vim_image_id = vim_images[0]["id"]

            ro_vim_item_update = {
                "vim_id": vim_image_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-image={} created={}".format(
                    task_id, ro_task["target_id"], vim_image_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (NsWorkerException, vimconn.VimConnException) as e:
            self.logger.error(
                "task={} {} new-image: {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
687
688
class VimInteractionFlavor(VimInteractionBase):
    """VIM interaction for flavors: find or create, and delete."""

    def delete(self, ro_task, task_index):
        """Delete the flavor at the VIM.

        The VIM is only called when there is a vim_id. A flavor not found at
        the VIM is reported as already deleted.

        :return: ("DONE", update) on success, ("FAILED", update) on VIM error
        """
        task_id = ro_task["tasks"][task_index]["task_id"]
        flavor_vim_id = ro_task["vim_info"]["vim_id"]
        deleted_update = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if flavor_vim_id:
                self.my_vims[ro_task["target_id"]].delete_flavor(flavor_vim_id)
        except vimconn.VimConnNotFoundException:
            deleted_update["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-flavor={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], flavor_vim_id, e
                )
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

        self.logger.debug(
            "task={} {} del-flavor={} {}".format(
                task_id,
                ro_task["target_id"],
                flavor_vim_id,
                deleted_update.get("vim_message", ""),
            )
        )

        return "DONE", deleted_update

    def new(self, ro_task, task_index, task_depends):
        """Find a flavor matching find_params or create it from params.

        The flavor is first searched with get_flavor_id_from_data when the
        task has find_params; if nothing is found and the task has params, it
        is created with new_flavor.

        :return: ("DONE", update) on success, ("FAILED", update) when a
            VimConnException or NsWorkerException is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        target_vim = self.my_vims[ro_task["target_id"]]
        created = False
        created_items = {}

        try:
            vim_flavor_id = None

            # FIND
            if task.get("find_params"):
                try:
                    wanted_flavor = task["find_params"]["flavor_data"]
                    vim_flavor_id = target_vim.get_flavor_id_from_data(wanted_flavor)
                except vimconn.VimConnNotFoundException:
                    # not found is not an error here: creation is tried below
                    pass

            # CREATE
            if not vim_flavor_id and task.get("params"):
                vim_flavor_id = target_vim.new_flavor(task["params"]["flavor_data"])
                created = True

            self.logger.debug(
                "task={} {} new-flavor={} created={}".format(
                    task_id, ro_task["target_id"], vim_flavor_id, created
                )
            )

            return "DONE", {
                "vim_id": vim_flavor_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e)
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }
781
782
class VimInteractionAffinityGroup(VimInteractionBase):
    """VIM interaction for affinity or anti-affinity groups: find or create, and delete."""

    def delete(self, ro_task, task_index):
        """Delete the affinity group at the VIM.

        The VIM is only called when there is a vim_id. A group not found at
        the VIM is reported as already deleted.

        :return: ("DONE", update) on success, ("FAILED", update) on VIM error
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        affinity_group_vim_id = ro_task["vim_info"]["vim_id"]
        ro_vim_item_update_ok = {
            "vim_status": "DELETED",
            "created": False,
            "vim_message": "DELETED",
            "vim_id": None,
        }

        try:
            if affinity_group_vim_id:
                target_vim = self.my_vims[ro_task["target_id"]]
                target_vim.delete_affinity_group(affinity_group_vim_id)
        except vimconn.VimConnNotFoundException:
            ro_vim_item_update_ok["vim_message"] = "already deleted"
        except vimconn.VimConnException as e:
            self.logger.error(
                "ro_task={} vim={} del-affinity-or-anti-affinity-group={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], affinity_group_vim_id, e
                )
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

            return "FAILED", ro_vim_item_update

        self.logger.debug(
            "task={} {} del-affinity-or-anti-affinity-group={} {}".format(
                task_id,
                ro_task["target_id"],
                affinity_group_vim_id,
                ro_vim_item_update_ok.get("vim_message", ""),
            )
        )

        return "DONE", ro_vim_item_update_ok

    def new(self, ro_task, task_index, task_depends):
        """Reuse the affinity group provided by id or create a new one.

        When affinity_group_data carries "vim-affinity-group-id", the group is
        looked up at the VIM. If it is not found, or no id was provided, a new
        group is created from affinity_group_data.

        :return: ("DONE", update) on success, ("FAILED", update) when a
            VimConnException or NsWorkerException is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            affinity_group_vim_id = None
            affinity_group_data = None

            if task.get("params"):
                affinity_group_data = task["params"].get("affinity_group_data")

            if affinity_group_data and affinity_group_data.get("vim-affinity-group-id"):
                param_affinity_group_id = affinity_group_data.get(
                    "vim-affinity-group-id"
                )
                try:
                    affinity_group_vim_id = target_vim.get_affinity_group(
                        param_affinity_group_id
                    ).get("id")
                except vimconn.VimConnNotFoundException:
                    # a space was missing between the id and the rest of the text
                    self.logger.error(
                        "task={} {} new-affinity-or-anti-affinity-group. Provided VIM Affinity Group ID {} "
                        "could not be found at VIM. Creating a new one.".format(
                            task_id, ro_task["target_id"], param_affinity_group_id
                        )
                    )

            if not affinity_group_vim_id and affinity_group_data:
                affinity_group_vim_id = target_vim.new_affinity_group(
                    affinity_group_data
                )
                created = True

            ro_vim_item_update = {
                "vim_id": affinity_group_vim_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} new-affinity-or-anti-affinity-group={} created={}".format(
                    task_id, ro_task["target_id"], affinity_group_vim_id, created
                )
            )

            return "DONE", ro_vim_item_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} new-affinity-or-anti-affinity-group:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update
888
889
class VimInteractionUpdateVdu(VimInteractionBase):
    """VIM interaction that executes an action over an existing VM instance."""

    def exec(self, ro_task, task_index, task_depends):
        """Call action_vminstance at the VIM with the action found at task["params"].

        A task without params is reported as FAILED; it used to end in an
        UnboundLocalError because the VM id was never assigned.

        :param ro_task: ro_task content
        :param task_index: index of the task inside ro_task["tasks"]
        :param task_depends: not used
        :return: ("DONE", update, {"retries": 0}) on success, ("FAILED",
            update, {"retries": 0}) when a VimConnException or
            NsWorkerException is raised
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        created = False
        created_items = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            params = task.get("params")
            if not params:
                raise NsWorkerException(
                    "Cannot execute the VM action because the task has no params"
                )

            vim_vm_id = params.get("vim_vm_id")
            action = params.get("action")
            # the action name is used both as key and as value
            context = {action: action}
            target_vim.action_vminstance(vim_vm_id, context)

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }
            self.logger.debug(
                "task={} {} vm-action={} done".format(
                    task_id, ro_task["target_id"], action
                )
            )

            return "DONE", ro_vim_item_update, db_task_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} VM action:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
930
931
932 class VimInteractionSdnNet(VimInteractionBase):
933 @staticmethod
934 def _match_pci(port_pci, mapping):
935 """
936 Check if port_pci matches with mapping
937 mapping can have brackets to indicate that several chars are accepted. e.g
938 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
939 :param port_pci: text
940 :param mapping: text, can contain brackets to indicate several chars are available
941 :return: True if matches, False otherwise
942 """
943 if not port_pci or not mapping:
944 return False
945 if port_pci == mapping:
946 return True
947
948 mapping_index = 0
949 pci_index = 0
950 while True:
951 bracket_start = mapping.find("[", mapping_index)
952
953 if bracket_start == -1:
954 break
955
956 bracket_end = mapping.find("]", bracket_start)
957 if bracket_end == -1:
958 break
959
960 length = bracket_start - mapping_index
961 if (
962 length
963 and port_pci[pci_index : pci_index + length]
964 != mapping[mapping_index:bracket_start]
965 ):
966 return False
967
968 if (
969 port_pci[pci_index + length]
970 not in mapping[bracket_start + 1 : bracket_end]
971 ):
972 return False
973
974 pci_index += length + 1
975 mapping_index = bracket_end + 1
976
977 if port_pci[pci_index:] != mapping[mapping_index:]:
978 return False
979
980 return True
981
982 def _get_interfaces(self, vlds_to_connect, vim_account_id):
983 """
984 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
985 :param vim_account_id:
986 :return:
987 """
988 interfaces = []
989
990 for vld in vlds_to_connect:
991 table, _, db_id = vld.partition(":")
992 db_id, _, vld = db_id.partition(":")
993 _, _, vld_id = vld.partition(".")
994
995 if table == "vnfrs":
996 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
997 iface_key = "vnf-vld-id"
998 else: # table == "nsrs"
999 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
1000 iface_key = "ns-vld-id"
1001
1002 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
1003
1004 for db_vnfr in db_vnfrs:
1005 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
1006 for iface_index, interface in enumerate(vdur["interfaces"]):
1007 if interface.get(iface_key) == vld_id and interface.get(
1008 "type"
1009 ) in ("SR-IOV", "PCI-PASSTHROUGH"):
1010 # only SR-IOV o PT
1011 interface_ = interface.copy()
1012 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(
1013 db_vnfr["_id"], vdu_index, iface_index
1014 )
1015
1016 if vdur.get("status") == "ERROR":
1017 interface_["status"] = "ERROR"
1018
1019 interfaces.append(interface_)
1020
1021 return interfaces
1022
1023 def refresh(self, ro_task):
1024 # look for task create
1025 task_create_index, _ = next(
1026 i_t
1027 for i_t in enumerate(ro_task["tasks"])
1028 if i_t[1]
1029 and i_t[1]["action"] == "CREATE"
1030 and i_t[1]["status"] != "FINISHED"
1031 )
1032
1033 return self.new(ro_task, task_create_index, None)
1034
def new(self, ro_task, task_index, task_depends):
    """
    Create or update the SDN connectivity service that joins the ports of a set of VLDs.

    The interfaces returned by self._get_interfaces() are translated into SDN service endpoints using the
    "sdn-port-mapping" found at the config of the associated VIM, and the external ports provided at
    params["sdn-ports"] are added to them. If the resulting set of port ids differs from the one stored at
    ro_task["vim_info"]["connected_ports"], the connectivity service is created (no vim_id yet and two or
    more ports) or edited (vim_id already present). Otherwise, when a vim_id exists, only its status is
    obtained from the connector.

    :param ro_task: ro_task dictionary; "target_id", "tasks" and "vim_info" are read
    :param task_index: index inside ro_task["tasks"] of the task to process
    :param task_depends: not used by this method
    :return: tuple (status, ro_vim_item_update). If any exception is raised while processing, the status is
        "FAILED" and the update contains vim_status "VIM_ERROR" and the exception text
    """
    task = ro_task["tasks"][task_index]
    task_id = task["task_id"]
    target_vim = self.my_vims[ro_task["target_id"]]
    stored_info = ro_task["vim_info"]

    sdn_net_id = stored_info["vim_id"]

    created_items = stored_info.get("created_items")
    connected_ports = stored_info.get("connected_ports", [])
    new_connected_ports = []
    last_update = stored_info.get("last_update", 0)
    sdn_status = stored_info.get("vim_status", "BUILD") or "BUILD"
    error_list = []
    created = stored_info.get("created", False)

    try:
        # CREATE
        params = task["params"]
        vlds_to_connect = params["vlds"]
        associated_vim = params["target_vim"]
        # external additional ports
        additional_ports = params.get("sdn-ports") or ()
        vim_account_id = associated_vim.partition(":")[2]

        if associated_vim:
            # get associated VIM, reading it from database only the first time
            if associated_vim not in self.db_vims:
                self.db_vims[associated_vim] = self.db.get_one(
                    "vim_accounts", {"_id": vim_account_id}
                )

            db_vim = self.db_vims[associated_vim]

        # look for ports to connect
        ports = self._get_interfaces(vlds_to_connect, vim_account_id)

        sdn_ports = []
        pending_ports = 0
        error_ports = 0
        vlan_used = None
        sdn_need_update = False

        for port in ports:
            vlan_used = port.get("vlan") or vlan_used

            # TODO. Do not connect if already done
            if not (port.get("compute_node") and port.get("pci")):
                # location not known yet: count it as failed or as still pending
                if port.get("status") == "ERROR":
                    error_ports += 1
                else:
                    pending_ports += 1

                continue

            # first mapping entry declared for the compute node of this port
            compute_node_mappings = None
            for node_mapping in db_vim["config"].get("sdn-port-mapping", ()):
                if node_mapping and node_mapping["compute_node"] == port["compute_node"]:
                    compute_node_mappings = node_mapping
                    break

            pmap = None
            if compute_node_mappings:
                # process port_mapping pci of type 0000:af:1[01].[1357]
                for mapped_port in compute_node_mappings["ports"]:
                    if self._match_pci(port["pci"], mapped_port.get("pci")):
                        pmap = mapped_port
                        break

            if not pmap:
                if not db_vim["config"].get("mapping_not_needed"):
                    error_list.append(
                        "Port mapping not found for compute_node={} pci={}".format(
                            port["compute_node"], port["pci"]
                        )
                    )
                    continue

                pmap = {}

            service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
            if port["type"] == "SR-IOV":
                encapsulation_type = "dot1q"
            else:
                encapsulation_type = None

            encapsulation_info = {
                "vlan": port.get("vlan"),
                "mac": port.get("mac-address"),
                "device_id": pmap.get("device_id") or port["compute_node"],
                "device_interface_id": pmap.get("device_interface_id") or port["pci"],
                "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
                "switch_port": pmap.get("switch_port"),
                "service_mapping_info": pmap.get("service_mapping_info"),
            }
            new_port = {
                "service_endpoint_id": pmap.get("service_endpoint_id")
                or service_endpoint_id,
                "service_endpoint_encapsulation_type": encapsulation_type,
                "service_endpoint_encapsulation_info": encapsulation_info,
            }

            # TODO
            # if port["modified_at"] > last_update:
            #     sdn_need_update = True
            new_connected_ports.append(port["id"])  # TODO
            sdn_ports.append(new_port)

        if error_ports:
            error_list.append(
                "{} interfaces have not been created as VDU is on ERROR status".format(
                    error_ports
                )
            )

        # connect external ports
        for index, additional_port in enumerate(additional_ports):
            additional_port_id = additional_port.get(
                "service_endpoint_id"
            ) or "external-{}".format(index)
            external_info = {
                "vlan": additional_port.get("vlan") or vlan_used,
                "mac": additional_port.get("mac_address"),
                "device_id": additional_port.get("device_id"),
                "device_interface_id": additional_port.get("device_interface_id"),
                "switch_dpid": additional_port.get("switch_dpid")
                or additional_port.get("switch_id"),
                "switch_port": additional_port.get("switch_port"),
                "service_mapping_info": additional_port.get("service_mapping_info"),
            }
            sdn_ports.append(
                {
                    "service_endpoint_id": additional_port_id,
                    "service_endpoint_encapsulation_type": additional_port.get(
                        "service_endpoint_encapsulation_type", "dot1q"
                    ),
                    "service_endpoint_encapsulation_info": external_info,
                }
            )
            new_connected_ports.append(additional_port_id)

        sdn_info = ""

        # if there are more ports to connect or they have been modified, call create/update
        if error_list:
            sdn_status = "ERROR"
            sdn_info = "; ".join(error_list)
        elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
            last_update = time.time()

            if sdn_net_id:
                created_items = target_vim.edit_connectivity_service(
                    sdn_net_id, conn_info=created_items, connection_points=sdn_ports
                )
                created = True
                self.logger.debug(
                    "task={} {} update-sdn-net={} created={}".format(
                        task_id, ro_task["target_id"], sdn_net_id, created
                    )
                )
            elif len(sdn_ports) < 2:
                # nothing is created at the connector for less than two ports
                sdn_status = "ACTIVE"

                if not pending_ports:
                    self.logger.debug(
                        "task={} {} new-sdn-net done, less than 2 ports".format(
                            task_id, ro_task["target_id"]
                        )
                    )
            else:
                net_type = params.get("type") or "ELAN"
                sdn_net_id, created_items = target_vim.create_connectivity_service(
                    net_type, sdn_ports
                )
                created = True
                self.logger.debug(
                    "task={} {} new-sdn-net={} created={}".format(
                        task_id, ro_task["target_id"], sdn_net_id, created
                    )
                )

            connected_ports = new_connected_ports
        elif sdn_net_id:
            wim_status_dict = target_vim.get_connectivity_service_status(
                sdn_net_id, conn_info=created_items
            )
            sdn_status = wim_status_dict["sdn_status"]

            reported_info = wim_status_dict.get("sdn_info")
            if reported_info:
                sdn_info = str(reported_info) or ""

            # an error message, when present, replaces the informative text
            reported_error = wim_status_dict.get("error_msg")
            if reported_error:
                sdn_info = reported_error or ""

        if pending_ports:
            if sdn_status != "ERROR":
                sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
                    len(ports) - pending_ports, len(ports)
                )

            # the network is not considered ready while some interface location is unknown
            if sdn_status == "ACTIVE":
                sdn_status = "BUILD"

        ro_vim_item_update = {
            "vim_id": sdn_net_id,
            "vim_status": sdn_status,
            "created": created,
            "created_items": created_items,
            "connected_ports": connected_ports,
            "vim_details": sdn_info,
            "vim_message": None,
            "last_update": last_update,
        }

        return sdn_status, ro_vim_item_update
    except Exception as e:
        # connector exceptions are logged without traceback, unexpected ones with it
        known_error = isinstance(
            e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
        )
        self.logger.error(
            "task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
            exc_info=not known_error,
        )
        ro_vim_item_update = {
            "vim_status": "VIM_ERROR",
            "created": created,
            "vim_message": str(e),
        }

        return "FAILED", ro_vim_item_update
1271
def delete(self, ro_task, task_index):
    """
    Delete the SDN connectivity service stored at ro_task["vim_info"]["vim_id"], if any.

    :param ro_task: ro_task dictionary; "_id", "target_id", "tasks" and "vim_info" are read
    :param task_index: index inside ro_task["tasks"] of the task to process
    :return: tuple (status, ro_vim_item_update). ("DONE", ...) when deleted, when there was nothing to delete
        or when the connector reports NOT_FOUND; ("FAILED", ...) on any other exception
    """
    task_id = ro_task["tasks"][task_index]["task_id"]
    sdn_vim_id = ro_task["vim_info"].get("vim_id")
    ro_vim_item_update_ok = {
        "vim_status": "DELETED",
        "created": False,
        "vim_message": "DELETED",
        "vim_id": None,
    }

    try:
        if sdn_vim_id:
            connector = self.my_vims[ro_task["target_id"]]
            connector.delete_connectivity_service(
                sdn_vim_id, ro_task["vim_info"].get("created_items")
            )

    except Exception as e:
        already_gone = (
            isinstance(e, sdnconn.SdnConnectorError)
            and e.http_code == HTTPStatus.NOT_FOUND.value
        )

        if not already_gone:
            self.logger.error(
                "ro_task={} vim={} del-sdn-net={}: {}".format(
                    ro_task["_id"], ro_task["target_id"], sdn_vim_id, e
                ),
                exc_info=not isinstance(
                    e, (sdnconn.SdnConnectorError, vimconn.VimConnException)
                ),
            )

            return "FAILED", {
                "vim_status": "VIM_ERROR",
                "vim_message": "Error while deleting: {}".format(e),
            }

        # a NOT_FOUND answer from the connector is taken as a successful deletion
        ro_vim_item_update_ok["vim_message"] = "already deleted"

    self.logger.debug(
        "task={} {} del-sdn-net={} {}".format(
            task_id,
            ro_task["target_id"],
            sdn_vim_id,
            ro_vim_item_update_ok.get("vim_message", ""),
        )
    )

    return "DONE", ro_vim_item_update_ok
1322
1323
class VimInteractionMigration(VimInteractionBase):
    """Interaction class used for "migrate" items: it asks the VIM connector to migrate a VM instance."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Migrate a VM calling migrate_instance() of the target VIM connector and, when the connector
        reports a new compute node, refresh the VM information.

        :param ro_task: ro_task dictionary; "target_id" and "tasks" are read
        :param task_index: index inside ro_task["tasks"] of the task to process
        :param task_depends: not used by this method
        :return: tuple (status, ro_vim_item_update, db_task_update). Status is "DONE" on success and
            "FAILED" when the connector raises a VimConnException or the task is malformed
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        target_vim = self.my_vims[ro_task["target_id"]]
        vim_interfaces = []
        created = False
        created_items = {}
        refreshed_vim_info = {}

        try:
            params = task.get("params")

            if not params:
                # Without params there is no VM to migrate. Fail explicitly instead of hitting an
                # unbound 'vim_vm_id' when the result is built.
                raise NsWorkerException("migration task without params")

            vim_vm_id = params.get("vim_vm_id")
            migrate_host = params.get("migrate_host")
            _, migrated_compute_node = target_vim.migrate_instance(
                vim_vm_id, migrate_host
            )

            if migrated_compute_node:
                # When VM is migrated, vdu["vim_info"] needs to be updated
                vdu_old_vim_info = (params.get("vdu_vim_info") or {}).get(
                    ro_task["target_id"]
                ) or {}

                # Refresh VM to get new vim_info
                vm_to_refresh_list = [vim_vm_id]
                vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
                refreshed_vim_info = vim_dict[vim_vm_id]

                if refreshed_vim_info.get("interfaces"):
                    # Tolerate old vim_info without interfaces instead of iterating over None
                    for old_iface in vdu_old_vim_info.get("interfaces") or ():
                        iface = next(
                            (
                                iface
                                for iface in refreshed_vim_info["interfaces"]
                                if old_iface["vim_interface_id"]
                                == iface["vim_interface_id"]
                            ),
                            None,
                        )
                        # NOTE(review): an old interface without match is stored as None, as before;
                        # confirm whether consumers expect positions aligned with the old list
                        vim_interfaces.append(iface)

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "ACTIVE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }

            if refreshed_vim_info and refreshed_vim_info.get("status") not in (
                "ERROR",
                "VIM_ERROR",
            ):
                ro_vim_item_update["vim_details"] = refreshed_vim_info.get("vim_info")

            if vim_interfaces:
                ro_vim_item_update["interfaces"] = vim_interfaces

            self.logger.debug(
                "task={} {} vm-migration done".format(task_id, ro_task["target_id"])
            )

            return "DONE", ro_vim_item_update, db_task_update

        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} VM Migration:"
                " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
1403
1404
class VimInteractionResize(VimInteractionBase):
    """Interaction class used for "verticalscale" items: it resizes a VM instance to another flavor."""

    def exec(self, ro_task, task_index, task_depends):
        """
        Resize a VM to the flavor described at params["flavor_dict"]. The flavor is looked up at the VIM
        with get_flavor_id_from_data() and created with new_flavor() when the lookup fails.

        :param ro_task: ro_task dictionary; "target_id" and "tasks" are read
        :param task_index: index inside ro_task["tasks"] of the task to process
        :param task_depends: not used by this method
        :return: tuple (status, ro_vim_item_update, db_task_update). Status is "DONE" on success and
            "FAILED" when the connector raises a VimConnException, the task has no params or the target
            flavor can be neither found nor created
        """
        task = ro_task["tasks"][task_index]
        task_id = task["task_id"]
        db_task_update = {"retries": 0}
        created = False
        target_flavor_uuid = None
        created_items = {}
        refreshed_vim_info = {}
        target_vim = self.my_vims[ro_task["target_id"]]

        try:
            params = task.get("params")

            if not params:
                # Without params there is no VM to resize. Fail explicitly instead of hitting an
                # unbound 'vim_vm_id' when the result is built.
                raise NsWorkerException("resize task without params")

            vim_vm_id = params.get("vim_vm_id")
            flavor_dict = params.get("flavor_dict")
            self.logger.info("flavor_dict %s", flavor_dict)
            flavor_error = None

            try:
                target_flavor_uuid = target_vim.get_flavor_id_from_data(flavor_dict)
            except Exception as e:
                self.logger.info("Cannot find any flavor matching %s.", str(e))
                try:
                    target_flavor_uuid = target_vim.new_flavor(flavor_dict)
                except Exception as e:
                    self.logger.error("Error creating flavor at VIM %s.", str(e))
                    flavor_error = e

            if target_flavor_uuid is None:
                # The resize cannot be attempted: report the failure instead of a misleading "DONE"
                raise NsWorkerException(
                    "Cannot find or create the target flavor: {}".format(flavor_error)
                )

            resized_status = target_vim.resize_instance(vim_vm_id, target_flavor_uuid)

            if resized_status:
                # Refresh VM to get new vim_info
                vm_to_refresh_list = [vim_vm_id]
                vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
                refreshed_vim_info = vim_dict[vim_vm_id]

            ro_vim_item_update = {
                "vim_id": vim_vm_id,
                "vim_status": "DONE",
                "created": created,
                "created_items": created_items,
                "vim_details": None,
                "vim_message": None,
            }

            if refreshed_vim_info and refreshed_vim_info.get("status") not in (
                "ERROR",
                "VIM_ERROR",
            ):
                ro_vim_item_update["vim_details"] = refreshed_vim_info.get("vim_info")

            self.logger.debug(
                "task={} {} resize done".format(task_id, ro_task["target_id"])
            )
            return "DONE", ro_vim_item_update, db_task_update
        except (vimconn.VimConnException, NsWorkerException) as e:
            self.logger.error(
                "task={} vim={} Resize:" " {}".format(task_id, ro_task["target_id"], e)
            )
            ro_vim_item_update = {
                "vim_status": "VIM_ERROR",
                "created": created,
                "vim_message": str(e),
            }

            return "FAILED", ro_vim_item_update, db_task_update
1472
1473
class NsWorker(threading.Thread):
    """Worker thread of RO. It keeps a bounded queue of tasks and the VIM/SDN connectors it has loaded."""

    # Refresh periods; the first two are in seconds, the rest presumably too (TODO confirm)
    REFRESH_BUILD = 5  # 5 seconds
    REFRESH_ACTIVE = 60  # 1 minute
    REFRESH_ERROR = 600
    REFRESH_IMAGE = 10 * 3600
    REFRESH_DELETE = 10 * 3600
    # maximum number of entries of the task queue created at __init__
    QUEUE_SIZE = 100
    # NOTE(review): a method with this same name is defined further down in the class and rebinds it
    terminate = False
1482
def __init__(self, worker_index, config, plugins, db):
    """
    Initialize the state of the worker thread.

    :param worker_index: thread index
    :param config: general configuration of RO, among others the process_id with the docker id where it runs
    :param plugins: global shared dict with the loaded plugins
    :param db: database class instance to use
    """
    threading.Thread.__init__(self)
    self.worker_index = worker_index
    self.config = config
    self.plugins = plugins
    self.db = db
    self.plugin_name = "unknown"
    self.logger = logging.getLogger("ro.worker{}".format(worker_index))
    self.task_queue = queue.Queue(self.QUEUE_SIZE)
    # targetvim: vimplugin class
    self.my_vims = {}
    # targetvim: vim information from database
    self.db_vims = {}
    # targetvim list
    self.vim_targets = []
    self.my_id = config["process_id"] + ":" + str(worker_index)

    # task item name: class that processes it. All instances share the same db, dictionaries and logger
    interaction_classes = {
        "net": VimInteractionNet,
        "vdu": VimInteractionVdu,
        "image": VimInteractionImage,
        "flavor": VimInteractionFlavor,
        "sdn_net": VimInteractionSdnNet,
        "update": VimInteractionUpdateVdu,
        "affinity-or-anti-affinity-group": VimInteractionAffinityGroup,
        "migrate": VimInteractionMigration,
        "verticalscale": VimInteractionResize,
    }
    self.item2class = {
        item: interaction_class(self.db, self.my_vims, self.db_vims, self.logger)
        for item, interaction_class in interaction_classes.items()
    }

    self.time_last_task_processed = None
    # lists of tasks to delete because nsrs or vnfrs has been deleted from db
    self.tasks_to_delete = []
    # it is idle when there are not vim_targets associated
    self.idle = True
    self.task_locked_time = config["global"]["task_locked_time"]
1537
def insert_task(self, task):
    """
    Put a task at the worker queue without blocking.

    :param task: element to enqueue
    :return: None
    :raises NsWorkerException: if the queue is full
    """
    try:
        self.task_queue.put_nowait(task)
    except queue.Full:
        raise NsWorkerException("timeout inserting a task")

    return None
1544
def terminate(self):
    """Insert the "exit" marker at the task queue by means of insert_task()."""
    exit_marker = "exit"
    self.insert_task(exit_marker)
1547
def del_task(self, task):
    """
    Mark a task as superseded if it has not started to be processed.

    :param task: task dictionary; its "status" is read and, when it is "SCHEDULED", set to "SUPERSEDED"
    :return: True if the task has been superseded, False if it was in any other status
    """
    with self.task_lock:
        if task["status"] == "SCHEDULED":
            task["status"] = "SUPERSEDED"
            return True

        # task["status"] == "processing"
        # The lock is released by the 'with' statement. An explicit release() here made the context
        # manager release an unlocked lock, which raises RuntimeError.
        return False
1556
1557 def _process_vim_config(self, target_id, db_vim):
1558 """
1559 Process vim config, creating vim configuration files as ca_cert
1560 :param target_id: vim/sdn/wim + id
1561 :param db_vim: Vim dictionary obtained from database
1562 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
1563 """
1564 if not db_vim.get("config"):
1565 return
1566
1567 file_name = ""
1568
1569 try:
1570 if db_vim["config"].get("ca_cert_content"):
1571 file_name = "{}:{}".format(target_id, self.worker_index)
1572
1573 try:
1574 mkdir(file_name)
1575 except FileExistsError:
1576 pass
1577
1578 file_name = file_name + "/ca_cert"
1579
1580 with open(file_name, "w") as f:
1581 f.write(db_vim["config"]["ca_cert_content"])
1582 del db_vim["config"]["ca_cert_content"]
1583 db_vim["config"]["ca_cert"] = file_name
1584 except Exception as e:
1585 raise NsWorkerException(
1586 "Error writing to file '{}': {}".format(file_name, e)
1587 )
1588
1589 def _load_plugin(self, name, type="vim"):
1590 # type can be vim or sdn
1591 if "rovim_dummy" not in self.plugins:
1592 self.plugins["rovim_dummy"] = VimDummyConnector
1593
1594 if "rosdn_dummy" not in self.plugins:
1595 self.plugins["rosdn_dummy"] = SdnDummyConnector
1596
1597 if name in self.plugins:
1598 return self.plugins[name]
1599
1600 try:
1601 for ep in entry_points(group="osm_ro{}.plugins".format(type), name=name):
1602 self.plugins[name] = ep.load()
1603 except Exception as e:
1604 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
1605
1606 if name and name not in self.plugins:
1607 raise NsWorkerException(
1608 "Plugin 'osm_{n}' has not been installed".format(n=name)
1609 )
1610
1611 return self.plugins[name]
1612
1613 def _unload_vim(self, target_id):
1614 """
1615 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
1616 :param target_id: Contains type:_id; where type can be 'vim', ...
1617 :return: None.
1618 """
1619 try:
1620 self.db_vims.pop(target_id, None)
1621 self.my_vims.pop(target_id, None)
1622
1623 if target_id in self.vim_targets:
1624 self.vim_targets.remove(target_id)
1625
1626 self.logger.info("Unloaded {}".format(target_id))
1627 rmtree("{}:{}".format(target_id, self.worker_index))
1628 except FileNotFoundError:
1629 pass # this is raised by rmtree if folder does not exist
1630 except Exception as e:
1631 self.logger.error("Cannot unload {}: {}".format(target_id, e))
1632
1633 def _check_vim(self, target_id):
1634 """
1635 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
1636 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
1637 :return: None.
1638 """
1639 target, _, _id = target_id.partition(":")
1640 now = time.time()
1641 update_dict = {}
1642 unset_dict = {}
1643 op_text = ""
1644 step = ""
1645 loaded = target_id in self.vim_targets
1646 target_database = (
1647 "vim_accounts"
1648 if target == "vim"
1649 else "wim_accounts"
1650 if target == "wim"
1651 else "sdns"
1652 )
1653
1654 try:
1655 step = "Getting {} from db".format(target_id)
1656 db_vim = self.db.get_one(target_database, {"_id": _id})
1657
1658 for op_index, operation in enumerate(
1659 db_vim["_admin"].get("operations", ())
1660 ):
1661 if operation["operationState"] != "PROCESSING":
1662 continue
1663
1664 locked_at = operation.get("locked_at")
1665
1666 if locked_at is not None and locked_at >= now - self.task_locked_time:
1667 # some other thread is doing this operation
1668 return
1669
1670 # lock
1671 op_text = "_admin.operations.{}.".format(op_index)
1672
1673 if not self.db.set_one(
1674 target_database,
1675 q_filter={
1676 "_id": _id,
1677 op_text + "operationState": "PROCESSING",
1678 op_text + "locked_at": locked_at,
1679 },
1680 update_dict={
1681 op_text + "locked_at": now,
1682 "admin.current_operation": op_index,
1683 },
1684 fail_on_empty=False,
1685 ):
1686 return
1687
1688 unset_dict[op_text + "locked_at"] = None
1689 unset_dict["current_operation"] = None
1690 step = "Loading " + target_id
1691 error_text = self._load_vim(target_id)
1692
1693 if not error_text:
1694 step = "Checking connectivity"
1695
1696 if target == "vim":
1697 self.my_vims[target_id].check_vim_connectivity()
1698 else:
1699 self.my_vims[target_id].check_credentials()
1700
1701 update_dict["_admin.operationalState"] = "ENABLED"
1702 update_dict["_admin.detailed-status"] = ""
1703 unset_dict[op_text + "detailed-status"] = None
1704 update_dict[op_text + "operationState"] = "COMPLETED"
1705
1706 return
1707
1708 except Exception as e:
1709 error_text = "{}: {}".format(step, e)
1710 self.logger.error("{} for {}: {}".format(step, target_id, e))
1711
1712 finally:
1713 if update_dict or unset_dict:
1714 if error_text:
1715 update_dict[op_text + "operationState"] = "FAILED"
1716 update_dict[op_text + "detailed-status"] = error_text
1717 unset_dict.pop(op_text + "detailed-status", None)
1718 update_dict["_admin.operationalState"] = "ERROR"
1719 update_dict["_admin.detailed-status"] = error_text
1720
1721 if op_text:
1722 update_dict[op_text + "statusEnteredTime"] = now
1723
1724 self.db.set_one(
1725 target_database,
1726 q_filter={"_id": _id},
1727 update_dict=update_dict,
1728 unset=unset_dict,
1729 fail_on_empty=False,
1730 )
1731
1732 if not loaded:
1733 self._unload_vim(target_id)
1734
1735 def _reload_vim(self, target_id):
1736 if target_id in self.vim_targets:
1737 self._load_vim(target_id)
1738 else:
1739 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
1740 # just remove it to force load again next time it is needed
1741 self.db_vims.pop(target_id, None)
1742
1743 def _load_vim(self, target_id):
1744 """
1745 Load or reload a vim_account, sdn_controller or wim_account.
1746 Read content from database, load the plugin if not loaded.
1747 In case of error loading the plugin, it load a failing VIM_connector
1748 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
1749 :param target_id: Contains type:_id; where type can be 'vim', ...
1750 :return: None if ok, descriptive text if error
1751 """
1752 target, _, _id = target_id.partition(":")
1753 target_database = (
1754 "vim_accounts"
1755 if target == "vim"
1756 else "wim_accounts"
1757 if target == "wim"
1758 else "sdns"
1759 )
1760 plugin_name = ""
1761 vim = None
1762
1763 try:
1764 step = "Getting {}={} from db".format(target, _id)
1765 # TODO process for wim, sdnc, ...
1766 vim = self.db.get_one(target_database, {"_id": _id})
1767
1768 # if deep_get(vim, "config", "sdn-controller"):
1769 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
1770 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
1771
1772 step = "Decrypting password"
1773 schema_version = vim.get("schema_version")
1774 self.db.encrypt_decrypt_fields(
1775 vim,
1776 "decrypt",
1777 fields=("password", "secret"),
1778 schema_version=schema_version,
1779 salt=_id,
1780 )
1781 self._process_vim_config(target_id, vim)
1782
1783 if target == "vim":
1784 plugin_name = "rovim_" + vim["vim_type"]
1785 step = "Loading plugin '{}'".format(plugin_name)
1786 vim_module_conn = self._load_plugin(plugin_name)
1787 step = "Loading {}'".format(target_id)
1788 self.my_vims[target_id] = vim_module_conn(
1789 uuid=vim["_id"],
1790 name=vim["name"],
1791 tenant_id=vim.get("vim_tenant_id"),
1792 tenant_name=vim.get("vim_tenant_name"),
1793 url=vim["vim_url"],
1794 url_admin=None,
1795 user=vim["vim_user"],
1796 passwd=vim["vim_password"],
1797 config=vim.get("config") or {},
1798 persistent_info={},
1799 )
1800 else: # sdn
1801 plugin_name = "rosdn_" + vim["type"]
1802 step = "Loading plugin '{}'".format(plugin_name)
1803 vim_module_conn = self._load_plugin(plugin_name, "sdn")
1804 step = "Loading {}'".format(target_id)
1805 wim = deepcopy(vim)
1806 wim_config = wim.pop("config", {}) or {}
1807 wim["uuid"] = wim["_id"]
1808 wim["wim_url"] = wim["url"]
1809
1810 if wim.get("dpid"):
1811 wim_config["dpid"] = wim.pop("dpid")
1812
1813 if wim.get("switch_id"):
1814 wim_config["switch_id"] = wim.pop("switch_id")
1815
1816 # wim, wim_account, config
1817 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config)
1818 self.db_vims[target_id] = vim
1819 self.error_status = None
1820
1821 self.logger.info(
1822 "Connector loaded for {}, plugin={}".format(target_id, plugin_name)
1823 )
1824 except Exception as e:
1825 self.logger.error(
1826 "Cannot load {} plugin={}: {} {}".format(
1827 target_id, plugin_name, step, e
1828 )
1829 )
1830
1831 self.db_vims[target_id] = vim or {}
1832 self.db_vims[target_id] = FailingConnector(str(e))
1833 error_status = "{} Error: {}".format(step, e)
1834
1835 return error_status
1836 finally:
1837 if target_id not in self.vim_targets:
1838 self.vim_targets.append(target_id)
1839
1840 def _get_db_task(self):
1841 """
1842 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1843 :return: None
1844 """
1845 now = time.time()
1846
1847 if not self.time_last_task_processed:
1848 self.time_last_task_processed = now
1849
1850 try:
1851 while True:
1852 """
1853 # Log RO tasks only when loglevel is DEBUG
1854 if self.logger.getEffectiveLevel() == logging.DEBUG:
1855 self._log_ro_task(
1856 None,
1857 None,
1858 None,
1859 "TASK_WF",
1860 "task_locked_time="
1861 + str(self.task_locked_time)
1862 + " "
1863 + "time_last_task_processed="
1864 + str(self.time_last_task_processed)
1865 + " "
1866 + "now="
1867 + str(now),
1868 )
1869 """
1870 locked = self.db.set_one(
1871 "ro_tasks",
1872 q_filter={
1873 "target_id": self.vim_targets,
1874 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1875 "locked_at.lt": now - self.task_locked_time,
1876 "to_check_at.lt": self.time_last_task_processed,
1877 },
1878 update_dict={"locked_by": self.my_id, "locked_at": now},
1879 fail_on_empty=False,
1880 )
1881
1882 if locked:
1883 # read and return
1884 ro_task = self.db.get_one(
1885 "ro_tasks",
1886 q_filter={
1887 "target_id": self.vim_targets,
1888 "tasks.status": ["SCHEDULED", "BUILD", "DONE", "FAILED"],
1889 "locked_at": now,
1890 },
1891 )
1892 return ro_task
1893
1894 if self.time_last_task_processed == now:
1895 self.time_last_task_processed = None
1896 return None
1897 else:
1898 self.time_last_task_processed = now
1899 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1900
1901 except DbException as e:
1902 self.logger.error("Database exception at _get_db_task: {}".format(e))
1903 except Exception as e:
1904 self.logger.critical(
1905 "Unexpected exception at _get_db_task: {}".format(e), exc_info=True
1906 )
1907
1908 return None
1909
1910 def _get_db_all_tasks(self):
1911 """
1912 Read all content of table ro_tasks to log it
1913 :return: None
1914 """
1915 try:
1916 # Checking the content of the BD:
1917
1918 # read and return
1919 ro_task = self.db.get_list("ro_tasks")
1920 for rt in ro_task:
1921 self._log_ro_task(rt, None, None, "TASK_WF", "GET_ALL_TASKS")
1922 return ro_task
1923
1924 except DbException as e:
1925 self.logger.error("Database exception at _get_db_all_tasks: {}".format(e))
1926 except Exception as e:
1927 self.logger.critical(
1928 "Unexpected exception at _get_db_all_tasks: {}".format(e), exc_info=True
1929 )
1930
1931 return None
1932
1933 def _log_ro_task(self, ro_task, db_ro_task_update, db_ro_task_delete, mark, event):
1934 """
1935 Generate a log with the following format:
1936
1937 Mark;Event;ro_task_id;locked_at;modified_at;created_at;to_check_at;locked_by;
1938 target_id;vim_info.refresh_at;vim_info;no_of_tasks;task_status;action_id;
1939 task_array_index;task_id;task_action;task_item;task_args
1940
1941 Example:
1942
1943 TASK_WF;GET_TASK;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;1642158724.8210013;
1944 1642158640.7986135;1642158640.7986135;1642158640.7986135;b134c9494e75:0a
1945 ;vim:b7ff9e24-8868-4d68-8a57-a59dc11d0327;None;{'created': False,
1946 'created_items': None, 'vim_id': None, 'vim_name': None, 'vim_status': None,
1947 'vim_details': None, 'vim_message': None, 'refresh_at': None};1;SCHEDULED;
1948 888f1864-749a-4fc2-bc1a-97c0fffd6a6f;0;888f1864-749a-4fc2-bc1a-97c0fffd6a6f:2;
1949 CREATE;image;{'filter_dict': {'name': 'ubuntu-os-cloud:image-family:ubuntu-1804-lts'}}
1950 """
1951 try:
1952 line = []
1953 i = 0
1954 if ro_task is not None and isinstance(ro_task, dict):
1955 for t in ro_task["tasks"]:
1956 line.clear()
1957 line.append(mark)
1958 line.append(event)
1959 line.append(ro_task.get("_id", ""))
1960 line.append(str(ro_task.get("locked_at", "")))
1961 line.append(str(ro_task.get("modified_at", "")))
1962 line.append(str(ro_task.get("created_at", "")))
1963 line.append(str(ro_task.get("to_check_at", "")))
1964 line.append(str(ro_task.get("locked_by", "")))
1965 line.append(str(ro_task.get("target_id", "")))
1966 line.append(str(ro_task.get("vim_info", {}).get("refresh_at", "")))
1967 line.append(str(ro_task.get("vim_info", "")))
1968 line.append(str(ro_task.get("tasks", "")))
1969 if isinstance(t, dict):
1970 line.append(str(t.get("status", "")))
1971 line.append(str(t.get("action_id", "")))
1972 line.append(str(i))
1973 line.append(str(t.get("task_id", "")))
1974 line.append(str(t.get("action", "")))
1975 line.append(str(t.get("item", "")))
1976 line.append(str(t.get("find_params", "")))
1977 line.append(str(t.get("params", "")))
1978 else:
1979 line.extend([""] * 2)
1980 line.append(str(i))
1981 line.extend([""] * 5)
1982
1983 i += 1
1984 self.logger.debug(";".join(line))
1985 elif db_ro_task_update is not None and isinstance(db_ro_task_update, dict):
1986 i = 0
1987 while True:
1988 st = "tasks.{}.status".format(i)
1989 if st not in db_ro_task_update:
1990 break
1991 line.clear()
1992 line.append(mark)
1993 line.append(event)
1994 line.append(db_ro_task_update.get("_id", ""))
1995 line.append(str(db_ro_task_update.get("locked_at", "")))
1996 line.append(str(db_ro_task_update.get("modified_at", "")))
1997 line.append("")
1998 line.append(str(db_ro_task_update.get("to_check_at", "")))
1999 line.append(str(db_ro_task_update.get("locked_by", "")))
2000 line.append("")
2001 line.append(str(db_ro_task_update.get("vim_info.refresh_at", "")))
2002 line.append("")
2003 line.append(str(db_ro_task_update.get("vim_info", "")))
2004 line.append(str(str(db_ro_task_update).count(".status")))
2005 line.append(db_ro_task_update.get(st, ""))
2006 line.append("")
2007 line.append(str(i))
2008 line.extend([""] * 3)
2009 i += 1
2010 self.logger.debug(";".join(line))
2011
2012 elif db_ro_task_delete is not None and isinstance(db_ro_task_delete, dict):
2013 line.clear()
2014 line.append(mark)
2015 line.append(event)
2016 line.append(db_ro_task_delete.get("_id", ""))
2017 line.append("")
2018 line.append(db_ro_task_delete.get("modified_at", ""))
2019 line.extend([""] * 13)
2020 self.logger.debug(";".join(line))
2021
2022 else:
2023 line.clear()
2024 line.append(mark)
2025 line.append(event)
2026 line.extend([""] * 16)
2027 self.logger.debug(";".join(line))
2028
2029 except Exception as e:
2030 self.logger.error("Error logging ro_task: {}".format(e))
2031
2032 def _delete_task(self, ro_task, task_index, task_depends, db_update):
2033 """
2034 Determine if this task need to be done or superseded
2035 :return: None
2036 """
2037 my_task = ro_task["tasks"][task_index]
2038 task_id = my_task["task_id"]
2039 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get(
2040 "created_items", False
2041 )
2042
2043 self.logger.warning("Needed delete: {}".format(needed_delete))
2044 if my_task["status"] == "FAILED":
2045 return None, None # TODO need to be retry??
2046
2047 try:
2048 for index, task in enumerate(ro_task["tasks"]):
2049 if index == task_index or not task:
2050 continue # own task
2051
2052 if (
2053 my_task["target_record"] == task["target_record"]
2054 and task["action"] == "CREATE"
2055 ):
2056 # set to finished
2057 db_update["tasks.{}.status".format(index)] = task[
2058 "status"
2059 ] = "FINISHED"
2060 elif task["action"] == "CREATE" and task["status"] not in (
2061 "FINISHED",
2062 "SUPERSEDED",
2063 ):
2064 needed_delete = False
2065
2066 if needed_delete:
2067 self.logger.warning(
2068 "Deleting ro_task={} task_index={}".format(ro_task, task_index)
2069 )
2070 return self.item2class[my_task["item"]].delete(ro_task, task_index)
2071 else:
2072 return "SUPERSEDED", None
2073 except Exception as e:
2074 if not isinstance(e, NsWorkerException):
2075 self.logger.critical(
2076 "Unexpected exception at _delete_task task={}: {}".format(
2077 task_id, e
2078 ),
2079 exc_info=True,
2080 )
2081
2082 return "FAILED", {"vim_status": "VIM_ERROR", "vim_message": str(e)}
2083
2084 def _create_task(self, ro_task, task_index, task_depends, db_update):
2085 """
2086 Determine if this task need to create something at VIM
2087 :return: None
2088 """
2089 my_task = ro_task["tasks"][task_index]
2090 task_id = my_task["task_id"]
2091 task_status = None
2092
2093 if my_task["status"] == "FAILED":
2094 return None, None # TODO need to be retry??
2095 elif my_task["status"] == "SCHEDULED":
2096 # check if already created by another task
2097 for index, task in enumerate(ro_task["tasks"]):
2098 if index == task_index or not task:
2099 continue # own task
2100
2101 if task["action"] == "CREATE" and task["status"] not in (
2102 "SCHEDULED",
2103 "FINISHED",
2104 "SUPERSEDED",
2105 ):
2106 return task["status"], "COPY_VIM_INFO"
2107
2108 try:
2109 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
2110 ro_task, task_index, task_depends
2111 )
2112 # TODO update other CREATE tasks
2113 except Exception as e:
2114 if not isinstance(e, NsWorkerException):
2115 self.logger.error(
2116 "Error executing task={}: {}".format(task_id, e), exc_info=True
2117 )
2118
2119 task_status = "FAILED"
2120 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_message": str(e)}
2121 # TODO update ro_vim_item_update
2122
2123 return task_status, ro_vim_item_update
2124 else:
2125 return None, None
2126
2127 def _get_dependency(self, task_id, ro_task=None, target_id=None):
2128 """
2129 Look for dependency task
2130 :param task_id: Can be one of
2131 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2132 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
2133 3. task.task_id: "<action_id>:number"
2134 :param ro_task:
2135 :param target_id:
2136 :return: database ro_task plus index of task
2137 """
2138 if (
2139 task_id.startswith("vim:")
2140 or task_id.startswith("sdn:")
2141 or task_id.startswith("wim:")
2142 ):
2143 target_id, _, task_id = task_id.partition(" ")
2144
2145 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
2146 ro_task_dependency = self.db.get_one(
2147 "ro_tasks",
2148 q_filter={"target_id": target_id, "tasks.target_record_id": task_id},
2149 fail_on_empty=False,
2150 )
2151
2152 if ro_task_dependency:
2153 for task_index, task in enumerate(ro_task_dependency["tasks"]):
2154 if task["target_record_id"] == task_id:
2155 return ro_task_dependency, task_index
2156
2157 else:
2158 if ro_task:
2159 for task_index, task in enumerate(ro_task["tasks"]):
2160 if task and task["task_id"] == task_id:
2161 return ro_task, task_index
2162
2163 ro_task_dependency = self.db.get_one(
2164 "ro_tasks",
2165 q_filter={
2166 "tasks.ANYINDEX.task_id": task_id,
2167 "tasks.ANYINDEX.target_record.ne": None,
2168 },
2169 fail_on_empty=False,
2170 )
2171
2172 self.logger.warning("ro_task_dependency={}".format(ro_task_dependency))
2173 if ro_task_dependency:
2174 for task_index, task in enumerate(ro_task_dependency["tasks"]):
2175 if task["task_id"] == task_id:
2176 return ro_task_dependency, task_index
2177 raise NsWorkerException("Cannot get depending task {}".format(task_id))
2178
def _process_pending_tasks(self, ro_task):
    """
    Process the pending tasks of one ro_task and store the outcome at database table "ro_tasks"

    The tasks are visited in three passes, one per action, in this order: DELETE, CREATE, EXEC.
    For every processed task the VIM information (vim_info.*) and the task status are accumulated at a
    single update dictionary that is written at the end, together with the unlock of the ro_task and the
    next time it must be checked ("to_check_at").
    :param ro_task: ro_task record. Keys read here: "_id", "tasks", "vim_info", "target_id", "locked_at"
        and "to_check_at". Its "vim_info" content is modified in place
    :return: None. Exceptions reaching the outer level are logged and not propagated
    """
    ro_task_id = ro_task["_id"]
    now = time.time()
    # one day
    next_check_at = now + (24 * 60 * 60)
    # changes to apply to the ro_task record, using dot separated paths as keys
    db_ro_task_update = {}

    def _update_refresh(new_status):
        # compute next_refresh
        # It uses "task", the loop variable of the enclosing function, so it must be called inside that loop
        nonlocal task
        nonlocal next_check_at
        nonlocal db_ro_task_update
        nonlocal ro_task

        next_refresh = time.time()

        # the refresh period depends first on the kind of item and then on the status
        if task["item"] in ("image", "flavor"):
            next_refresh += self.REFRESH_IMAGE
        elif new_status == "BUILD":
            next_refresh += self.REFRESH_BUILD
        elif new_status == "DONE":
            next_refresh += self.REFRESH_ACTIVE
        else:
            next_refresh += self.REFRESH_ERROR

        # the ro_task must be checked again not later than its next refresh
        next_check_at = min(next_check_at, next_refresh)
        db_ro_task_update["vim_info.refresh_at"] = next_refresh
        ro_task["vim_info"]["refresh_at"] = next_refresh

    try:
        """
        # Log RO tasks only when loglevel is DEBUG
        if self.logger.getEffectiveLevel() == logging.DEBUG:
            self._log_ro_task(ro_task, None, None, "TASK_WF", "GET_TASK")
        """
        # 0: get task_status_create
        # status of the first CREATE task found at BUILD or DONE, or None if there is not any
        lock_object = None
        task_status_create = None
        task_create = next(
            (
                t
                for t in ro_task["tasks"]
                if t
                and t["action"] == "CREATE"
                and t["status"] in ("BUILD", "DONE")
            ),
            None,
        )

        if task_create:
            task_status_create = task_create["status"]

        # 1: look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
        for task_action in ("DELETE", "CREATE", "EXEC"):
            # NOTE(review): these two variables are reset once per action, not once per task, so a task
            # that does not assign them sees the values left by the previous task of the same pass.
            # Confirm that this is intended
            db_vim_update = None
            new_status = None

            for task_index, task in enumerate(ro_task["tasks"]):
                if not task:
                    continue  # task deleted

                task_depends = {}
                target_update = None

                # skip the task when: it is a DELETE/EXEC pass and the task is not SCHEDULED or BUILD;
                # or its action is not the one of the current pass;
                # or it is a CREATE pass and the task is already FINISHED or SUPERSEDED
                if (
                    (
                        task_action in ("DELETE", "EXEC")
                        and task["status"] not in ("SCHEDULED", "BUILD")
                    )
                    or task["action"] != task_action
                    or (
                        task_action == "CREATE"
                        and task["status"] in ("FINISHED", "SUPERSEDED")
                    )
                ):
                    continue

                task_path = "tasks.{}.status".format(task_index)
                try:
                    db_vim_info_update = None

                    if task["status"] == "SCHEDULED":
                        # check if tasks that this depends on have been completed
                        dependency_not_completed = False

                        for dependency_task_id in task.get("depends_on") or ():
                            (
                                dependency_ro_task,
                                dependency_task_index,
                            ) = self._get_dependency(
                                dependency_task_id, target_id=ro_task["target_id"]
                            )
                            dependency_task = dependency_ro_task["tasks"][
                                dependency_task_index
                            ]
                            # NOTE(review): complete record dumped at WARNING level; it looks like debug output
                            self.logger.warning(
                                "dependency_ro_task={} dependency_task_index={}".format(
                                    dependency_ro_task, dependency_task_index
                                )
                            )

                            if dependency_task["status"] == "SCHEDULED":
                                dependency_not_completed = True
                                next_check_at = min(
                                    next_check_at, dependency_ro_task["to_check_at"]
                                )
                                # must allow dependent task to be processed first
                                # to do this set time after last_task_processed
                                next_check_at = max(
                                    self.time_last_task_processed, next_check_at
                                )
                                break
                            elif dependency_task["status"] == "FAILED":
                                # a failed dependency makes this task fail too (handled at the except below)
                                error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
                                    task["action"],
                                    task["item"],
                                    dependency_task["action"],
                                    dependency_task["item"],
                                    dependency_task_id,
                                    dependency_ro_task["vim_info"].get(
                                        "vim_message"
                                    ),
                                )
                                self.logger.error(
                                    "task={} {}".format(task["task_id"], error_text)
                                )
                                raise NsWorkerException(error_text)

                            # the vim_id of the dependency is stored under two keys:
                            # the dependency id itself and the same id with the prefix "TASK-"
                            task_depends[dependency_task_id] = dependency_ro_task[
                                "vim_info"
                            ]["vim_id"]
                            task_depends[
                                "TASK-{}".format(dependency_task_id)
                            ] = dependency_ro_task["vim_info"]["vim_id"]

                        if dependency_not_completed:
                            self.logger.warning(
                                "DEPENDENCY NOT COMPLETED {}".format(
                                    dependency_ro_task["vim_info"]["vim_id"]
                                )
                            )
                            # TODO set at vim_info.vim_details that it is waiting
                            continue

                    # before calling VIM-plugin as it can take more than task_locked_time, insert to LockRenew
                    # the task of renew this locking. It will update database locket_at periodically
                    if not lock_object:
                        lock_object = LockRenew.add_lock_object(
                            "ro_tasks", ro_task, self
                        )

                    if task["action"] == "DELETE":
                        (new_status, db_vim_info_update,) = self._delete_task(
                            ro_task, task_index, task_depends, db_ro_task_update
                        )
                        new_status = (
                            "FINISHED" if new_status == "DONE" else new_status
                        )
                        # ^with FINISHED instead of DONE it will not be refreshing

                        if new_status in ("FINISHED", "SUPERSEDED"):
                            target_update = "DELETE"
                    elif task["action"] == "EXEC":
                        (
                            new_status,
                            db_vim_info_update,
                            db_task_update,
                        ) = self.item2class[task["item"]].exec(
                            ro_task, task_index, task_depends
                        )
                        new_status = (
                            "FINISHED" if new_status == "DONE" else new_status
                        )
                        # ^with FINISHED instead of DONE it will not be refreshing

                        if db_task_update:
                            # load into database the modified db_task_update "retries" and "next_retry"
                            if db_task_update.get("retries"):
                                db_ro_task_update[
                                    "tasks.{}.retries".format(task_index)
                                ] = db_task_update["retries"]

                            # this assignment replaces any value computed before, 60 seconds by default
                            next_check_at = time.time() + db_task_update.get(
                                "next_retry", 60
                            )
                        target_update = None
                    elif task["action"] == "CREATE":
                        if task["status"] == "SCHEDULED":
                            if task_status_create:
                                # another CREATE task already got the item: copy its status and vim_info
                                new_status = task_status_create
                                target_update = "COPY_VIM_INFO"
                            else:
                                new_status, db_vim_info_update = self.item2class[
                                    task["item"]
                                ].new(ro_task, task_index, task_depends)
                                # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
                                _update_refresh(new_status)
                        else:
                            # task not SCHEDULED: refresh the item only once "refresh_at" has passed
                            if (
                                ro_task["vim_info"]["refresh_at"]
                                and now > ro_task["vim_info"]["refresh_at"]
                            ):
                                new_status, db_vim_info_update = self.item2class[
                                    task["item"]
                                ].refresh(ro_task)
                                _update_refresh(new_status)
                            else:
                                # The refresh is updated to avoid set the value of "refresh_at" to
                                # default value (next_check_at = now + (24 * 60 * 60)) when status is BUILD,
                                # because it can happen that in this case the task is never processed
                                _update_refresh(task["status"])

                except Exception as e:
                    # any failure marks the task as FAILED and stores the error text as vim_message
                    new_status = "FAILED"
                    db_vim_info_update = {
                        "vim_status": "VIM_ERROR",
                        "vim_message": str(e),
                    }

                    if not isinstance(
                        e, (NsWorkerException, vimconn.VimConnException)
                    ):
                        # NOTE(review): the message names _delete_task, but this handler covers the
                        # DELETE, EXEC and CREATE branches above
                        self.logger.error(
                            "Unexpected exception at _delete_task task={}: {}".format(
                                task["task_id"], e
                            ),
                            exc_info=True,
                        )

                try:
                    if db_vim_info_update:
                        # keep the result at three places: the copy sent to the target record, the pending
                        # database update (prefixed with "vim_info.") and the ro_task in memory
                        db_vim_update = db_vim_info_update.copy()
                        db_ro_task_update.update(
                            {
                                "vim_info." + k: v
                                for k, v in db_vim_info_update.items()
                            }
                        )
                        ro_task["vim_info"].update(db_vim_info_update)

                    if new_status:
                        if task_action == "CREATE":
                            task_status_create = new_status
                        db_ro_task_update[task_path] = new_status

                    if target_update or db_vim_update:
                        if target_update == "DELETE":
                            self._update_target(task, None)
                        elif target_update == "COPY_VIM_INFO":
                            self._update_target(task, ro_task["vim_info"])
                        else:
                            self._update_target(task, db_vim_update)

                except Exception as e:
                    if (
                        isinstance(e, DbException)
                        and e.http_code == HTTPStatus.NOT_FOUND
                    ):
                        # if the vnfrs or nsrs has been removed from database, this task must be removed
                        self.logger.debug(
                            "marking to delete task={}".format(task["task_id"])
                        )
                        self.tasks_to_delete.append(task)
                    else:
                        self.logger.error(
                            "Unexpected exception at _update_target task={}: {}".format(
                                task["task_id"], e
                            ),
                            exc_info=True,
                        )

        locked_at = ro_task["locked_at"]

        if lock_object:
            locked_at = [
                lock_object["locked_at"],
                lock_object["locked_at"] + self.task_locked_time,
            ]
            # locked_at contains two times to avoid race condition. In case the lock has been renew, it will
            # contain exactly locked_at + self.task_locked_time
            LockRenew.remove_lock_object(lock_object)

        q_filter = {
            "_id": ro_task["_id"],
            "to_check_at": ro_task["to_check_at"],
            "locked_at": locked_at,
        }
        # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
        # outside this task (by ro_nbi) do not update it
        db_ro_task_update["locked_by"] = None
        # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
        db_ro_task_update["locked_at"] = int(now - self.task_locked_time)
        db_ro_task_update["modified_at"] = now
        db_ro_task_update["to_check_at"] = next_check_at

        """
        # Log RO tasks only when loglevel is DEBUG
        if self.logger.getEffectiveLevel() == logging.DEBUG:
            db_ro_task_update_log = db_ro_task_update.copy()
            db_ro_task_update_log["_id"] = q_filter["_id"]
            self._log_ro_task(None, db_ro_task_update_log, None, "TASK_WF", "SET_TASK")
        """

        # first attempt: update only if "to_check_at" has not been changed meanwhile
        if not self.db.set_one(
            "ro_tasks",
            update_dict=db_ro_task_update,
            q_filter=q_filter,
            fail_on_empty=False,
        ):
            # second attempt: "to_check_at" is neither filtered nor overwritten; now an empty match is an error
            del db_ro_task_update["to_check_at"]
            del q_filter["to_check_at"]
            """
            # Log RO tasks only when loglevel is DEBUG
            if self.logger.getEffectiveLevel() == logging.DEBUG:
                self._log_ro_task(
                    None,
                    db_ro_task_update_log,
                    None,
                    "TASK_WF",
                    "SET_TASK " + str(q_filter),
                )
            """
            self.db.set_one(
                "ro_tasks",
                q_filter=q_filter,
                update_dict=db_ro_task_update,
                fail_on_empty=True,
            )
    except DbException as e:
        self.logger.error(
            "ro_task={} Error updating database {}".format(ro_task_id, e)
        )
    except Exception as e:
        self.logger.error(
            "Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True
        )
2515
2516 def _update_target(self, task, ro_vim_item_update):
2517 table, _, temp = task["target_record"].partition(":")
2518 _id, _, path_vim_status = temp.partition(":")
2519 path_item = path_vim_status[: path_vim_status.rfind(".")]
2520 path_item = path_item[: path_item.rfind(".")]
2521 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
2522 # path_item: dot separated list targeting record information, e.g. "vdur.10"
2523
2524 if ro_vim_item_update:
2525 update_dict = {
2526 path_vim_status + "." + k: v
2527 for k, v in ro_vim_item_update.items()
2528 if k
2529 in (
2530 "vim_id",
2531 "vim_details",
2532 "vim_message",
2533 "vim_name",
2534 "vim_status",
2535 "interfaces",
2536 "interfaces_backup",
2537 )
2538 }
2539
2540 if path_vim_status.startswith("vdur."):
2541 # for backward compatibility, add vdur.name apart from vdur.vim_name
2542 if ro_vim_item_update.get("vim_name"):
2543 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
2544
2545 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
2546 if ro_vim_item_update.get("vim_id"):
2547 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
2548
2549 # update general status
2550 if ro_vim_item_update.get("vim_status"):
2551 update_dict[path_item + ".status"] = ro_vim_item_update[
2552 "vim_status"
2553 ]
2554
2555 if ro_vim_item_update.get("interfaces"):
2556 path_interfaces = path_item + ".interfaces"
2557
2558 for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
2559 if iface:
2560 update_dict.update(
2561 {
2562 path_interfaces + ".{}.".format(i) + k: v
2563 for k, v in iface.items()
2564 if k in ("vlan", "compute_node", "pci")
2565 }
2566 )
2567
2568 # put ip_address and mac_address with ip-address and mac-address
2569 if iface.get("ip_address"):
2570 update_dict[
2571 path_interfaces + ".{}.".format(i) + "ip-address"
2572 ] = iface["ip_address"]
2573
2574 if iface.get("mac_address"):
2575 update_dict[
2576 path_interfaces + ".{}.".format(i) + "mac-address"
2577 ] = iface["mac_address"]
2578
2579 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
2580 update_dict["ip-address"] = iface.get("ip_address").split(
2581 ";"
2582 )[0]
2583
2584 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
2585 update_dict[path_item + ".ip-address"] = iface.get(
2586 "ip_address"
2587 ).split(";")[0]
2588
2589 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
2590
2591 # If interfaces exists, it backups VDU interfaces in the DB for healing operations
2592 if ro_vim_item_update.get("interfaces"):
2593 search_key = path_vim_status + ".interfaces"
2594 if update_dict.get(search_key):
2595 interfaces_backup_update = {
2596 path_vim_status + ".interfaces_backup": update_dict[search_key]
2597 }
2598
2599 self.db.set_one(
2600 table,
2601 q_filter={"_id": _id},
2602 update_dict=interfaces_backup_update,
2603 )
2604
2605 else:
2606 update_dict = {path_item + ".status": "DELETED"}
2607 self.db.set_one(
2608 table,
2609 q_filter={"_id": _id},
2610 update_dict=update_dict,
2611 unset={path_vim_status: None},
2612 )
2613
2614 def _process_delete_db_tasks(self):
2615 """
2616 Delete task from database because vnfrs or nsrs or both have been deleted
2617 :return: None. Uses and modify self.tasks_to_delete
2618 """
2619 while self.tasks_to_delete:
2620 task = self.tasks_to_delete[0]
2621 vnfrs_deleted = None
2622 nsr_id = task["nsr_id"]
2623
2624 if task["target_record"].startswith("vnfrs:"):
2625 # check if nsrs is present
2626 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
2627 vnfrs_deleted = task["target_record"].split(":")[1]
2628
2629 try:
2630 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
2631 except Exception as e:
2632 self.logger.error(
2633 "Error deleting task={}: {}".format(task["task_id"], e)
2634 )
2635 self.tasks_to_delete.pop(0)
2636
@staticmethod
def delete_db_tasks(db, nsr_id, vnfrs_deleted):
    """
    Remove from table "ro_tasks" the tasks of a deleted nsr or vnfr. A ro_task whose tasks are all affected
    is deleted; otherwise only the affected tasks are set to None. Both operations are conditioned to an
    unchanged "modified_at"; if any of them does not apply, the whole procedure is retried
    Static method because it is called from osm_ng_ro.ns
    :param db: instance of database to use
    :param nsr_id: affected nsrs id
    :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
    :return: None, exception is fails
    """
    retries = 5

    for _ in range(retries):
        candidates = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
        now = time.time()
        conflict = False

        for ro_task in candidates:
            db_update = {}
            shared_with_others = False

            for index, task in enumerate(ro_task["tasks"]):
                if not task:
                    continue  # slot of an already deleted task

                if vnfrs_deleted:
                    affected = task["target_record"].startswith(
                        "vnfrs:" + vnfrs_deleted
                    )
                else:
                    affected = task["nsr_id"] == nsr_id

                if affected:
                    db_update["tasks.{}".format(index)] = None
                else:
                    # used by other nsr, ro_task cannot be deleted
                    shared_with_others = True

            # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
            if not shared_with_others:
                applied = db.del_one(
                    "ro_tasks",
                    q_filter={
                        "_id": ro_task["_id"],
                        "modified_at": ro_task["modified_at"],
                    },
                    fail_on_empty=False,
                )
            elif db_update:
                db_update["modified_at"] = now
                applied = db.set_one(
                    "ro_tasks",
                    q_filter={
                        "_id": ro_task["_id"],
                        "modified_at": ro_task["modified_at"],
                    },
                    update_dict=db_update,
                    fail_on_empty=False,
                )
            else:
                continue  # nothing to change at this ro_task

            if not applied:
                conflict = True

        if not conflict:
            return

    raise NsWorkerException("Exceeded {} retries".format(retries))
2695
2696 def run(self):
2697 # load database
2698 self.logger.info("Starting")
2699 while True:
2700 # step 1: get commands from queue
2701 try:
2702 if self.vim_targets:
2703 task = self.task_queue.get(block=False)
2704 else:
2705 if not self.idle:
2706 self.logger.debug("enters in idle state")
2707 self.idle = True
2708 task = self.task_queue.get(block=True)
2709 self.idle = False
2710
2711 if task[0] == "terminate":
2712 break
2713 elif task[0] == "load_vim":
2714 self.logger.info("order to load vim {}".format(task[1]))
2715 self._load_vim(task[1])
2716 elif task[0] == "unload_vim":
2717 self.logger.info("order to unload vim {}".format(task[1]))
2718 self._unload_vim(task[1])
2719 elif task[0] == "reload_vim":
2720 self._reload_vim(task[1])
2721 elif task[0] == "check_vim":
2722 self.logger.info("order to check vim {}".format(task[1]))
2723 self._check_vim(task[1])
2724 continue
2725 except Exception as e:
2726 if isinstance(e, queue.Empty):
2727 pass
2728 else:
2729 self.logger.critical(
2730 "Error processing task: {}".format(e), exc_info=True
2731 )
2732
2733 # step 2: process pending_tasks, delete not needed tasks
2734 try:
2735 if self.tasks_to_delete:
2736 self._process_delete_db_tasks()
2737 busy = False
2738 """
2739 # Log RO tasks only when loglevel is DEBUG
2740 if self.logger.getEffectiveLevel() == logging.DEBUG:
2741 _ = self._get_db_all_tasks()
2742 """
2743 ro_task = self._get_db_task()
2744 if ro_task:
2745 self.logger.warning("Task to process: {}".format(ro_task))
2746 time.sleep(1)
2747 self._process_pending_tasks(ro_task)
2748 busy = True
2749 if not busy:
2750 time.sleep(5)
2751 except Exception as e:
2752 self.logger.critical(
2753 "Unexpected exception at run: " + str(e), exc_info=True
2754 )
2755
2756 self.logger.info("Finishing")