fix error on task_depends not defined
[osm/RO.git] / NG-RO / osm_ng_ro / ns_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 ##
19
20 """"
21 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
22 The tasks are stored at database in table ro_tasks
23 A single ro_task refers to a VIM element (flavor, image, network, ...).
24 A ro_task can contain several 'tasks', each one with a target, where to store the results
25 """
26
27 import threading
28 import time
29 import queue
30 import logging
31 import yaml
32 from pkg_resources import iter_entry_points
33 # from osm_common import dbmongo, dbmemory, fslocal, fsmongo, msglocal, msgkafka, version as common_version
34 from osm_common.dbbase import DbException
35 # from osm_common.fsbase import FsException
36 # from osm_common.msgbase import MsgException
37 from osm_ro_plugin.vim_dummy import VimDummyConnector
38 from osm_ro_plugin.sdn_dummy import SdnDummyConnector
39 from osm_ro_plugin import vimconn, sdnconn
40 from copy import deepcopy
41 from unittest.mock import Mock
42 from http import HTTPStatus
43 from os import mkdir
44 from shutil import rmtree
45
46 __author__ = "Alfonso Tierno"
47 __date__ = "$28-Sep-2017 12:07:15$"
48
49
50 def deep_get(target_dict, *args, **kwargs):
51 """
52 Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
53 Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
54 :param target_dict: dictionary to be read
55 :param args: list of keys to read from target_dict
56 :param kwargs: only can contain default=value to return if key is not present in the nested dictionary
57 :return: The wanted value if exist, None or default otherwise
58 """
59 for key in args:
60 if not isinstance(target_dict, dict) or key not in target_dict:
61 return kwargs.get("default")
62 target_dict = target_dict[key]
63 return target_dict
64
65
66 class NsWorkerException(Exception):
67 pass
68
69
70 class FailingConnector:
71 def __init__(self, error_msg):
72 self.error_msg = error_msg
73 for method in dir(vimconn.VimConnector):
74 if method[0] != "_":
75 setattr(self, method, Mock(side_effect=vimconn.VimConnException(error_msg)))
76 for method in dir(sdnconn.SdnConnectorBase):
77 if method[0] != "_":
78 setattr(self, method, Mock(side_effect=sdnconn.SdnConnectorError(error_msg)))
79
80
81 class NsWorkerExceptionNotFound(NsWorkerException):
82 pass
83
84
85 class VimInteractionBase:
86 """ Base class to call VIM/SDN for creating, deleting and refresh networks, VMs, flavors, ...
87 It implements methods that does nothing and return ok"""
88 def __init__(self, db, my_vims, db_vims, logger):
89 self.db = db
90 self.logger = logger
91 self.my_vims = my_vims
92 self.db_vims = db_vims
93
94 def new(self, ro_task, task_index, task_depends):
95 return "BUILD", {}
96
97 def refresh(self, ro_task):
98 """skip calling VIM to get image, flavor status. Assumes ok"""
99 if ro_task["vim_info"]["vim_status"] == "VIM_ERROR":
100 return "FAILED", {}
101 return "DONE", {}
102
103 def delete(self, ro_task, task_index):
104 """skip calling VIM to delete image. Assumes ok"""
105 return "DONE", {}
106
107 def exec(self, ro_task, task_index, task_depends):
108 return "DONE", None, None
109
110
111 class VimInteractionNet(VimInteractionBase):
112
113 def new(self, ro_task, task_index, task_depends):
114 vim_net_id = None
115 task = ro_task["tasks"][task_index]
116 task_id = task["task_id"]
117 created = False
118 created_items = {}
119 target_vim = self.my_vims[ro_task["target_id"]]
120 try:
121 # FIND
122 if task.get("find_params"):
123 # if management, get configuration of VIM
124 if task["find_params"].get("filter_dict"):
125 vim_filter = task["find_params"]["filter_dict"]
126 elif task["find_params"].get("mgmt"): # mamagement network
127 if deep_get(self.db_vims[ro_task["target_id"]], "config", "management_network_id"):
128 vim_filter = {"id": self.db_vims[ro_task["target_id"]]["config"]["management_network_id"]}
129 elif deep_get(self.db_vims[ro_task["target_id"]], "config", "management_network_name"):
130 vim_filter = {"name": self.db_vims[ro_task["target_id"]]["config"]["management_network_name"]}
131 else:
132 vim_filter = {"name": task["find_params"]["name"]}
133 else:
134 raise NsWorkerExceptionNotFound("Invalid find_params for new_net {}".format(task["find_params"]))
135
136 vim_nets = target_vim.get_network_list(vim_filter)
137 if not vim_nets and not task.get("params"):
138 raise NsWorkerExceptionNotFound("Network not found with this criteria: '{}'".format(
139 task.get("find_params")))
140 elif len(vim_nets) > 1:
141 raise NsWorkerException(
142 "More than one network found with this criteria: '{}'".format(task["find_params"]))
143 if vim_nets:
144 vim_net_id = vim_nets[0]["id"]
145 else:
146 # CREATE
147 params = task["params"]
148 vim_net_id, created_items = target_vim.new_network(**params)
149 created = True
150
151 ro_vim_item_update = {"vim_id": vim_net_id,
152 "vim_status": "BUILD",
153 "created": created,
154 "created_items": created_items,
155 "vim_details": None}
156 self.logger.debug(
157 "task={} {} new-net={} created={}".format(task_id, ro_task["target_id"], vim_net_id, created))
158 return "BUILD", ro_vim_item_update
159 except (vimconn.VimConnException, NsWorkerException) as e:
160 self.logger.error("task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e))
161 ro_vim_item_update = {"vim_status": "VIM_ERROR",
162 "created": created,
163 "vim_details": str(e)}
164 return "FAILED", ro_vim_item_update
165
166 def refresh(self, ro_task):
167 """Call VIM to get network status"""
168 ro_task_id = ro_task["_id"]
169 target_vim = self.my_vims[ro_task["target_id"]]
170
171 vim_id = ro_task["vim_info"]["vim_id"]
172 net_to_refresh_list = [vim_id]
173 try:
174 vim_dict = target_vim.refresh_nets_status(net_to_refresh_list)
175 vim_info = vim_dict[vim_id]
176 if vim_info["status"] == "ACTIVE":
177 task_status = "DONE"
178 elif vim_info["status"] == "BUILD":
179 task_status = "BUILD"
180 else:
181 task_status = "FAILED"
182 except vimconn.VimConnException as e:
183 # Mark all tasks at VIM_ERROR status
184 self.logger.error("ro_task={} vim={} get-net={}: {}".format(ro_task_id, ro_task["target_id"], vim_id, e))
185 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
186 task_status = "FAILED"
187
188 ro_vim_item_update = {}
189 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
190 ro_vim_item_update["vim_status"] = vim_info["status"]
191 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
192 ro_vim_item_update["vim_name"] = vim_info.get("name")
193 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
194 if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
195 ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
196 elif vim_info["status"] == "DELETED":
197 ro_vim_item_update["vim_id"] = None
198 ro_vim_item_update["vim_details"] = "Deleted externally"
199 else:
200 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
201 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
202 if ro_vim_item_update:
203 self.logger.debug("ro_task={} {} get-net={}: status={} {}".format(
204 ro_task_id, ro_task["target_id"], vim_id, ro_vim_item_update.get("vim_status"),
205 ro_vim_item_update.get("vim_details") if ro_vim_item_update.get("vim_status") != "ACTIVE" else ''))
206 return task_status, ro_vim_item_update
207
208 def delete(self, ro_task, task_index):
209 task = ro_task["tasks"][task_index]
210 task_id = task["task_id"]
211 net_vim_id = ro_task["vim_info"]["vim_id"]
212 ro_vim_item_update_ok = {"vim_status": "DELETED",
213 "created": False,
214 "vim_details": "DELETED",
215 "vim_id": None}
216 try:
217 if net_vim_id or ro_task["vim_info"]["created_items"]:
218 target_vim = self.my_vims[ro_task["target_id"]]
219 target_vim.delete_network(net_vim_id, ro_task["vim_info"]["created_items"])
220
221 except vimconn.VimConnNotFoundException:
222 ro_vim_item_update_ok["vim_details"] = "already deleted"
223
224 except vimconn.VimConnException as e:
225 self.logger.error("ro_task={} vim={} del-net={}: {}".format(ro_task["_id"], ro_task["target_id"],
226 net_vim_id, e))
227 ro_vim_item_update = {"vim_status": "VIM_ERROR",
228 "vim_details": "Error while deleting: {}".format(e)}
229 return "FAILED", ro_vim_item_update
230
231 self.logger.debug("task={} {} del-net={} {}".format(task_id, ro_task["target_id"], net_vim_id,
232 ro_vim_item_update_ok.get("vim_details", "")))
233 return "DONE", ro_vim_item_update_ok
234
235
236 class VimInteractionVdu(VimInteractionBase):
237 max_retries_inject_ssh_key = 20 # 20 times
238 time_retries_inject_ssh_key = 30 # wevery 30 seconds
239
240 def new(self, ro_task, task_index, task_depends):
241 task = ro_task["tasks"][task_index]
242 task_id = task["task_id"]
243 created = False
244 created_items = {}
245 target_vim = self.my_vims[ro_task["target_id"]]
246 try:
247 created = True
248 params = task["params"]
249 params_copy = deepcopy(params)
250 net_list = params_copy["net_list"]
251 for net in net_list:
252 if "net_id" in net and net["net_id"].startswith("TASK-"): # change task_id into network_id
253 network_id = task_depends[net["net_id"]]
254 if not network_id:
255 raise NsWorkerException("Cannot create VM because depends on a network not created or found "
256 "for {}".format(net["net_id"]))
257 net["net_id"] = network_id
258 if params_copy["image_id"].startswith("TASK-"):
259 params_copy["image_id"] = task_depends[params_copy["image_id"]]
260 if params_copy["flavor_id"].startswith("TASK-"):
261 params_copy["flavor_id"] = task_depends[params_copy["flavor_id"]]
262
263 vim_vm_id, created_items = target_vim.new_vminstance(**params_copy)
264 interfaces = [iface["vim_id"] for iface in params_copy["net_list"]]
265
266 ro_vim_item_update = {"vim_id": vim_vm_id,
267 "vim_status": "BUILD",
268 "created": created,
269 "created_items": created_items,
270 "vim_details": None,
271 "interfaces_vim_ids": interfaces,
272 "interfaces": [],
273 }
274 self.logger.debug(
275 "task={} {} new-vm={} created={}".format(task_id, ro_task["target_id"], vim_vm_id, created))
276 return "BUILD", ro_vim_item_update
277 except (vimconn.VimConnException, NsWorkerException) as e:
278 self.logger.error("task={} {} new-vm: {}".format(task_id, ro_task["target_id"], e))
279 ro_vim_item_update = {"vim_status": "VIM_ERROR",
280 "created": created,
281 "vim_details": str(e)}
282 return "FAILED", ro_vim_item_update
283
284 def delete(self, ro_task, task_index):
285 task = ro_task["tasks"][task_index]
286 task_id = task["task_id"]
287 vm_vim_id = ro_task["vim_info"]["vim_id"]
288 ro_vim_item_update_ok = {"vim_status": "DELETED",
289 "created": False,
290 "vim_details": "DELETED",
291 "vim_id": None}
292 try:
293 if vm_vim_id or ro_task["vim_info"]["created_items"]:
294 target_vim = self.my_vims[ro_task["target_id"]]
295 target_vim.delete_vminstance(vm_vim_id, ro_task["vim_info"]["created_items"])
296
297 except vimconn.VimConnNotFoundException:
298 ro_vim_item_update_ok["vim_details"] = "already deleted"
299
300 except vimconn.VimConnException as e:
301 self.logger.error("ro_task={} vim={} del-vm={}: {}".format(ro_task["_id"], ro_task["target_id"],
302 vm_vim_id, e))
303 ro_vim_item_update = {"vim_status": "VIM_ERROR",
304 "vim_details": "Error while deleting: {}".format(e)}
305 return "FAILED", ro_vim_item_update
306
307 self.logger.debug("task={} {} del-vm={} {}".format(task_id, ro_task["target_id"], vm_vim_id,
308 ro_vim_item_update_ok.get("vim_details", "")))
309 return "DONE", ro_vim_item_update_ok
310
311 def refresh(self, ro_task):
312 """Call VIM to get vm status"""
313 ro_task_id = ro_task["_id"]
314 target_vim = self.my_vims[ro_task["target_id"]]
315
316 vim_id = ro_task["vim_info"]["vim_id"]
317 if not vim_id:
318 return None, None
319 vm_to_refresh_list = [vim_id]
320 try:
321 vim_dict = target_vim.refresh_vms_status(vm_to_refresh_list)
322 vim_info = vim_dict[vim_id]
323 if vim_info["status"] == "ACTIVE":
324 task_status = "DONE"
325 elif vim_info["status"] == "BUILD":
326 task_status = "BUILD"
327 else:
328 task_status = "FAILED"
329 # try to load and parse vim_information
330 try:
331 vim_info_info = yaml.safe_load(vim_info["vim_info"])
332 if vim_info_info.get("name"):
333 vim_info["name"] = vim_info_info["name"]
334 except Exception:
335 pass
336 except vimconn.VimConnException as e:
337 # Mark all tasks at VIM_ERROR status
338 self.logger.error("ro_task={} vim={} get-vm={}: {}".format(ro_task_id, ro_task["target_id"], vim_id, e))
339 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
340 task_status = "FAILED"
341
342 ro_vim_item_update = {}
343 # Interfaces cannot be present if e.g. VM is not present, that is status=DELETED
344 vim_interfaces = []
345 if vim_info.get("interfaces"):
346 for vim_iface_id in ro_task["vim_info"]["interfaces_vim_ids"]:
347 iface = next((iface for iface in vim_info["interfaces"] if vim_iface_id == iface["vim_interface_id"]),
348 None)
349 # if iface:
350 # iface.pop("vim_info", None)
351 vim_interfaces.append(iface)
352
353 task_create = next(t for t in ro_task["tasks"] if t and t["action"] == "CREATE" and t["status"] != "FINISHED")
354 if vim_interfaces and task_create.get("mgmt_vnf_interface") is not None:
355 vim_interfaces[task_create["mgmt_vnf_interface"]]["mgmt_vnf_interface"] = True
356 mgmt_vdu_iface = task_create.get("mgmt_vdu_interface", task_create.get("mgmt_vnf_interface", 0))
357 if vim_interfaces:
358 vim_interfaces[mgmt_vdu_iface]["mgmt_vdu_interface"] = True
359
360 if ro_task["vim_info"]["interfaces"] != vim_interfaces:
361 ro_vim_item_update["interfaces"] = vim_interfaces
362 if ro_task["vim_info"]["vim_status"] != vim_info["status"]:
363 ro_vim_item_update["vim_status"] = vim_info["status"]
364 if ro_task["vim_info"]["vim_name"] != vim_info.get("name"):
365 ro_vim_item_update["vim_name"] = vim_info.get("name")
366 if vim_info["status"] in ("ERROR", "VIM_ERROR"):
367 if ro_task["vim_info"]["vim_details"] != vim_info.get("error_msg"):
368 ro_vim_item_update["vim_details"] = vim_info.get("error_msg")
369 elif vim_info["status"] == "DELETED":
370 ro_vim_item_update["vim_id"] = None
371 ro_vim_item_update["vim_details"] = "Deleted externally"
372 else:
373 if ro_task["vim_info"]["vim_details"] != vim_info["vim_info"]:
374 ro_vim_item_update["vim_details"] = vim_info["vim_info"]
375 if ro_vim_item_update:
376 self.logger.debug("ro_task={} {} get-vm={}: status={} {}".format(
377 ro_task_id, ro_task["target_id"], vim_id, ro_vim_item_update.get("vim_status"),
378 ro_vim_item_update.get("vim_details") if ro_vim_item_update.get("vim_status") != "ACTIVE" else ''))
379 return task_status, ro_vim_item_update
380
381 def exec(self, ro_task, task_index, task_depends):
382 task = ro_task["tasks"][task_index]
383 task_id = task["task_id"]
384 target_vim = self.my_vims[ro_task["target_id"]]
385 db_task_update = {"retries": 0}
386 retries = task.get("retries", 0)
387 try:
388 params = task["params"]
389 params_copy = deepcopy(params)
390 params_copy["ro_key"] = self.db.decrypt(params_copy.pop("private_key"),
391 params_copy.pop("schema_version"), params_copy.pop("salt"))
392 params_copy["ip_addr"] = params_copy.pop("ip_address")
393 target_vim.inject_user_key(**params_copy)
394 self.logger.debug(
395 "task={} {} action-vm=inject_key".format(task_id, ro_task["target_id"]))
396 return "DONE", None, db_task_update, # params_copy["key"]
397 except (vimconn.VimConnException, NsWorkerException) as e:
398 retries += 1
399 if retries < self.max_retries_inject_ssh_key:
400 return "BUILD", None, {"retries": retries, "next_retry": self.time_retries_inject_ssh_key}
401 self.logger.error("task={} {} inject-ssh-key: {}".format(task_id, ro_task["target_id"], e))
402 ro_vim_item_update = {"vim_details": str(e)}
403 return "FAILED", ro_vim_item_update, db_task_update
404
405
406 class VimInteractionImage(VimInteractionBase):
407
408 def new(self, ro_task, task_index, task_depends):
409 task = ro_task["tasks"][task_index]
410 task_id = task["task_id"]
411 created = False
412 created_items = {}
413 target_vim = self.my_vims[ro_task["target_id"]]
414 try:
415 # FIND
416 if task.get("find_params"):
417 vim_images = target_vim.get_image_list(**task["find_params"])
418 if not vim_images:
419 raise NsWorkerExceptionNotFound("Image not found with this criteria: '{}'".format(
420 task["find_params"]))
421 elif len(vim_images) > 1:
422 raise NsWorkerException(
423 "More than one network found with this criteria: '{}'".format(task["find_params"]))
424 else:
425 vim_image_id = vim_images[0]["id"]
426
427 ro_vim_item_update = {"vim_id": vim_image_id,
428 "vim_status": "DONE",
429 "created": created,
430 "created_items": created_items,
431 "vim_details": None}
432 self.logger.debug(
433 "task={} {} new-image={} created={}".format(task_id, ro_task["target_id"], vim_image_id, created))
434 return "DONE", ro_vim_item_update
435 except (NsWorkerException, vimconn.VimConnException) as e:
436 self.logger.error("task={} {} new-image: {}".format(task_id, ro_task["target_id"], e))
437 ro_vim_item_update = {"vim_status": "VIM_ERROR",
438 "created": created,
439 "vim_details": str(e)}
440 return "FAILED", ro_vim_item_update
441
442
443 class VimInteractionFlavor(VimInteractionBase):
444
445 def delete(self, ro_task, task_index):
446 task = ro_task["tasks"][task_index]
447 task_id = task["task_id"]
448 flavor_vim_id = ro_task["vim_info"]["vim_id"]
449 ro_vim_item_update_ok = {"vim_status": "DELETED",
450 "created": False,
451 "vim_details": "DELETED",
452 "vim_id": None}
453 try:
454 if flavor_vim_id:
455 target_vim = self.my_vims[ro_task["target_id"]]
456 target_vim.delete_flavor(flavor_vim_id)
457
458 except vimconn.VimConnNotFoundException:
459 ro_vim_item_update_ok["vim_details"] = "already deleted"
460
461 except vimconn.VimConnException as e:
462 self.logger.error("ro_task={} vim={} del-flavor={}: {}".format(
463 ro_task["_id"], ro_task["target_id"], flavor_vim_id, e))
464 ro_vim_item_update = {"vim_status": "VIM_ERROR",
465 "vim_details": "Error while deleting: {}".format(e)}
466 return "FAILED", ro_vim_item_update
467
468 self.logger.debug("task={} {} del-flavor={} {}".format(
469 task_id, ro_task["target_id"], flavor_vim_id, ro_vim_item_update_ok.get("vim_details", "")))
470 return "DONE", ro_vim_item_update_ok
471
472 def new(self, ro_task, task_index, task_depends):
473 task = ro_task["tasks"][task_index]
474 task_id = task["task_id"]
475 created = False
476 created_items = {}
477 target_vim = self.my_vims[ro_task["target_id"]]
478 try:
479 # FIND
480 vim_flavor_id = None
481 if task.get("find_params"):
482 try:
483 flavor_data = task["find_params"]["flavor_data"]
484 vim_flavor_id = target_vim.get_flavor_id_from_data(flavor_data)
485 except vimconn.VimConnNotFoundException:
486 pass
487
488 if not vim_flavor_id and task.get("params"):
489 # CREATE
490 flavor_data = task["params"]["flavor_data"]
491 vim_flavor_id = target_vim.new_flavor(flavor_data)
492 created = True
493
494 ro_vim_item_update = {"vim_id": vim_flavor_id,
495 "vim_status": "DONE",
496 "created": created,
497 "created_items": created_items,
498 "vim_details": None}
499 self.logger.debug(
500 "task={} {} new-flavor={} created={}".format(task_id, ro_task["target_id"], vim_flavor_id, created))
501 return "DONE", ro_vim_item_update
502 except (vimconn.VimConnException, NsWorkerException) as e:
503 self.logger.error("task={} vim={} new-flavor: {}".format(task_id, ro_task["target_id"], e))
504 ro_vim_item_update = {"vim_status": "VIM_ERROR",
505 "created": created,
506 "vim_details": str(e)}
507 return "FAILED", ro_vim_item_update
508
509
510 class VimInteractionSdnNet(VimInteractionBase):
511
512 @staticmethod
513 def _match_pci(port_pci, mapping):
514 """
515 Check if port_pci matches with mapping
516 mapping can have brackets to indicate that several chars are accepted. e.g
517 pci '0000:af:10.1' matches with '0000:af:1[01].[1357]'
518 :param port_pci: text
519 :param mapping: text, can contain brackets to indicate several chars are available
520 :return: True if matches, False otherwise
521 """
522 if not port_pci or not mapping:
523 return False
524 if port_pci == mapping:
525 return True
526
527 mapping_index = 0
528 pci_index = 0
529 while True:
530 bracket_start = mapping.find("[", mapping_index)
531 if bracket_start == -1:
532 break
533 bracket_end = mapping.find("]", bracket_start)
534 if bracket_end == -1:
535 break
536 length = bracket_start - mapping_index
537 if length and port_pci[pci_index:pci_index + length] != mapping[mapping_index:bracket_start]:
538 return False
539 if port_pci[pci_index + length] not in mapping[bracket_start+1:bracket_end]:
540 return False
541 pci_index += length + 1
542 mapping_index = bracket_end + 1
543
544 if port_pci[pci_index:] != mapping[mapping_index:]:
545 return False
546 return True
547
548 def _get_interfaces(self, vlds_to_connect, vim_account_id):
549 """
550 :param vlds_to_connect: list with format vnfrs:<id>:vld.<vld_id> or nsrs:<id>:vld.<vld_id>
551 :param vim_account_id:
552 :return:
553 """
554 interfaces = []
555 for vld in vlds_to_connect:
556 table, _, db_id = vld.partition(":")
557 db_id, _, vld = db_id.partition(":")
558 _, _, vld_id = vld.partition(".")
559 if table == "vnfrs":
560 q_filter = {"vim-account-id": vim_account_id, "_id": db_id}
561 iface_key = "vnf-vld-id"
562 else: # table == "nsrs"
563 q_filter = {"vim-account-id": vim_account_id, "nsr-id-ref": db_id}
564 iface_key = "ns-vld-id"
565 db_vnfrs = self.db.get_list("vnfrs", q_filter=q_filter)
566 for db_vnfr in db_vnfrs:
567 for vdu_index, vdur in enumerate(db_vnfr.get("vdur", ())):
568 for iface_index, interface in enumerate(vdur["interfaces"]):
569 if interface.get(iface_key) == vld_id and \
570 interface.get("type") in ("SR-IOV", "PCI-PASSTHROUGH"):
571 # only SR-IOV o PT
572 interface_ = interface.copy()
573 interface_["id"] = "vnfrs:{}:vdu.{}.interfaces.{}".format(db_vnfr["_id"], vdu_index,
574 iface_index)
575 if vdur.get("status") == "ERROR":
576 interface_["status"] = "ERROR"
577 interfaces.append(interface_)
578 return interfaces
579
580 def refresh(self, ro_task):
581 # look for task create
582 task_create_index, _ = next(i_t for i_t in enumerate(ro_task["tasks"])
583 if i_t[1] and i_t[1]["action"] == "CREATE" and i_t[1]["status"] != "FINISHED")
584
585 return self.new(ro_task, task_create_index, None)
586
587 def new(self, ro_task, task_index, task_depends):
588
589 task = ro_task["tasks"][task_index]
590 task_id = task["task_id"]
591 target_vim = self.my_vims[ro_task["target_id"]]
592
593 sdn_net_id = ro_task["vim_info"]["vim_id"]
594
595 created_items = ro_task["vim_info"].get("created_items")
596 connected_ports = ro_task["vim_info"].get("connected_ports", [])
597 new_connected_ports = []
598 last_update = ro_task["vim_info"].get("last_update", 0)
599 sdn_status = ro_task["vim_info"].get("vim_status", "BUILD") or "BUILD"
600 error_list = []
601 created = ro_task["vim_info"].get("created", False)
602
603 try:
604
605 # CREATE
606 params = task["params"]
607 vlds_to_connect = params["vlds"]
608 associated_vim = params["target_vim"]
609 additional_ports = params.get("sdn-ports") or () # external additional ports
610 _, _, vim_account_id = associated_vim.partition(":")
611 if associated_vim:
612 # get associated VIM
613 if associated_vim not in self.db_vims:
614 self.db_vims[associated_vim] = self.db.get_one("vim_accounts", {"_id": vim_account_id})
615 db_vim = self.db_vims[associated_vim]
616
617 # look for ports to connect
618 ports = self._get_interfaces(vlds_to_connect, vim_account_id)
619 # print(ports)
620
621 sdn_ports = []
622 pending_ports = error_ports = 0
623 vlan_used = None
624 sdn_need_update = False
625 for port in ports:
626 vlan_used = port.get("vlan") or vlan_used
627 # TODO. Do not connect if already done
628 if not port.get("compute_node") or not port.get("pci"):
629 if port.get("status") == "ERROR":
630 error_ports += 1
631 else:
632 pending_ports += 1
633 continue
634 pmap = None
635 compute_node_mappings = next((c for c in db_vim["config"].get("sdn-port-mapping", ())
636 if c and c["compute_node"] == port["compute_node"]), None)
637 if compute_node_mappings:
638 # process port_mapping pci of type 0000:af:1[01].[1357]
639 pmap = next((p for p in compute_node_mappings["ports"]
640 if self._match_pci(port["pci"], p.get("pci"))), None)
641 if not pmap:
642 if not db_vim["config"].get("mapping_not_needed"):
643 error_list.append("Port mapping not found for compute_node={} pci={}".format(
644 port["compute_node"], port["pci"]))
645 continue
646 pmap = {}
647
648 service_endpoint_id = "{}:{}".format(port["compute_node"], port["pci"])
649 new_port = {
650 "service_endpoint_id": pmap.get("service_endpoint_id") or service_endpoint_id,
651 "service_endpoint_encapsulation_type": "dot1q" if port["type"] == "SR-IOV" else None,
652 "service_endpoint_encapsulation_info": {
653 "vlan": port.get("vlan"),
654 "mac": port.get("mac_address"),
655 "device_id": pmap.get("device_id") or port["compute_node"], # device_id
656 "device_interface_id": pmap.get("device_interface_id") or port["pci"],
657 "switch_dpid": pmap.get("switch_id") or pmap.get("switch_dpid"),
658 "switch_port": pmap.get("switch_port"),
659 "service_mapping_info": pmap.get("service_mapping_info"),
660 }
661 }
662
663 # TODO
664 # if port["modified_at"] > last_update:
665 # sdn_need_update = True
666 new_connected_ports.append(port["id"]) # TODO
667 sdn_ports.append(new_port)
668
669 if error_ports:
670 error_list.append("{} interfaces have not been created as VDU is on ERROR status".format(error_ports))
671
672 # connect external ports
673 for index, additional_port in enumerate(additional_ports):
674 additional_port_id = additional_port.get("service_endpoint_id") or "external-{}".format(index)
675 sdn_ports.append({
676 "service_endpoint_id": additional_port_id,
677 "service_endpoint_encapsulation_type": additional_port.get("service_endpoint_encapsulation_type",
678 "dot1q"),
679 "service_endpoint_encapsulation_info": {
680 "vlan": additional_port.get("vlan") or vlan_used,
681 "mac": additional_port.get("mac_address"),
682 "device_id": additional_port.get("device_id"),
683 "device_interface_id": additional_port.get("device_interface_id"),
684 "switch_dpid": additional_port.get("switch_dpid") or additional_port.get("switch_id"),
685 "switch_port": additional_port.get("switch_port"),
686 "service_mapping_info": additional_port.get("service_mapping_info"),
687 }})
688 new_connected_ports.append(additional_port_id)
689 sdn_info = ""
690 # if there are more ports to connect or they have been modified, call create/update
691 if error_list:
692 sdn_status = "ERROR"
693 sdn_info = "; ".join(error_list)
694 elif set(connected_ports) != set(new_connected_ports) or sdn_need_update:
695 last_update = time.time()
696 if not sdn_net_id:
697 if len(sdn_ports) < 2:
698 sdn_status = "ACTIVE"
699 if not pending_ports:
700 self.logger.debug("task={} {} new-sdn-net done, less than 2 ports".
701 format(task_id, ro_task["target_id"]))
702 else:
703 net_type = params.get("type") or "ELAN"
704 sdn_net_id, created_items = target_vim.create_connectivity_service(
705 net_type, sdn_ports)
706 created = True
707 self.logger.debug("task={} {} new-sdn-net={} created={}".
708 format(task_id, ro_task["target_id"], sdn_net_id, created))
709 else:
710 created_items = target_vim.edit_connectivity_service(
711 sdn_net_id, conn_info=created_items, connection_points=sdn_ports)
712 created = True
713 self.logger.debug("task={} {} update-sdn-net={} created={}".
714 format(task_id, ro_task["target_id"], sdn_net_id, created))
715 connected_ports = new_connected_ports
716 elif sdn_net_id:
717 wim_status_dict = target_vim.get_connectivity_service_status(sdn_net_id, conn_info=created_items)
718 sdn_status = wim_status_dict["sdn_status"]
719 if wim_status_dict.get("sdn_info"):
720 sdn_info = str(wim_status_dict.get("sdn_info")) or ""
721 if wim_status_dict.get("error_msg"):
722 sdn_info = wim_status_dict.get("error_msg") or ""
723
724 if pending_ports:
725 if sdn_status != "ERROR":
726 sdn_info = "Waiting for getting interfaces location from VIM. Obtained '{}' of {}".format(
727 len(ports)-pending_ports, len(ports))
728 if sdn_status == "ACTIVE":
729 sdn_status = "BUILD"
730
731 ro_vim_item_update = {"vim_id": sdn_net_id,
732 "vim_status": sdn_status,
733 "created": created,
734 "created_items": created_items,
735 "connected_ports": connected_ports,
736 "vim_details": sdn_info,
737 "last_update": last_update}
738 return sdn_status, ro_vim_item_update
739 except Exception as e:
740 self.logger.error("task={} vim={} new-net: {}".format(task_id, ro_task["target_id"], e),
741 exc_info=not isinstance(e, (sdnconn.SdnConnectorError, vimconn.VimConnException)))
742 ro_vim_item_update = {"vim_status": "VIM_ERROR",
743 "created": created,
744 "vim_details": str(e)}
745 return "FAILED", ro_vim_item_update
746
747 def delete(self, ro_task, task_index):
748 task = ro_task["tasks"][task_index]
749 task_id = task["task_id"]
750 sdn_vim_id = ro_task["vim_info"].get("vim_id")
751 ro_vim_item_update_ok = {"vim_status": "DELETED",
752 "created": False,
753 "vim_details": "DELETED",
754 "vim_id": None}
755 try:
756 if sdn_vim_id:
757 target_vim = self.my_vims[ro_task["target_id"]]
758 target_vim.delete_connectivity_service(sdn_vim_id, ro_task["vim_info"].get("created_items"))
759
760 except Exception as e:
761 if isinstance(e, sdnconn.SdnConnectorError) and e.http_code == HTTPStatus.NOT_FOUND.value:
762 ro_vim_item_update_ok["vim_details"] = "already deleted"
763 else:
764 self.logger.error("ro_task={} vim={} del-sdn-net={}: {}".format(ro_task["_id"], ro_task["target_id"],
765 sdn_vim_id, e),
766 exc_info=not isinstance(e, (sdnconn.SdnConnectorError, vimconn.VimConnException)))
767 ro_vim_item_update = {"vim_status": "VIM_ERROR",
768 "vim_details": "Error while deleting: {}".format(e)}
769 return "FAILED", ro_vim_item_update
770
771 self.logger.debug("task={} {} del-sdn-net={} {}".format(task_id, ro_task["target_id"], sdn_vim_id,
772 ro_vim_item_update_ok.get("vim_details", "")))
773 return "DONE", ro_vim_item_update_ok
774
775
776 class NsWorker(threading.Thread):
777 REFRESH_BUILD = 5 # 5 seconds
778 REFRESH_ACTIVE = 60 # 1 minute
779 REFRESH_ERROR = 600
780 REFRESH_IMAGE = 3600 * 10
781 REFRESH_DELETE = 3600 * 10
782 QUEUE_SIZE = 2000
783 # TODO delete assigment_lock = Lock()
784 terminate = False
785 # TODO delete assignment = {}
786 MAX_TIME_LOCKED = 3600
787 MAX_TIME_VIM_LOCKED = 120
788
789 def __init__(self, worker_index, config, plugins, db):
790 """
791
792 :param worker_index: thread index
793 :param config: general configuration of RO, among others the process_id with the docker id where it runs
794 :param plugins: global shared dict with the loaded plugins
795 :param db: database class instance to use
796 """
797 threading.Thread.__init__(self)
798 self.config = config
799 self.plugins = plugins
800 self.plugin_name = "unknown"
801 self.logger = logging.getLogger('ro.worker{}'.format(worker_index))
802 self.worker_index = worker_index
803 self.task_queue = queue.Queue(self.QUEUE_SIZE)
804 self.my_vims = {} # targetvim: vimplugin class
805 self.db_vims = {} # targetvim: vim information from database
806 self.vim_targets = [] # targetvim list
807 self.my_id = config["process_id"] + ":" + str(worker_index)
808 self.db = db
809 self.item2class = {
810 "net": VimInteractionNet(self.db, self.my_vims, self.db_vims, self.logger),
811 "vdu": VimInteractionVdu(self.db, self.my_vims, self.db_vims, self.logger),
812 "image": VimInteractionImage(self.db, self.my_vims, self.db_vims, self.logger),
813 "flavor": VimInteractionFlavor(self.db, self.my_vims, self.db_vims, self.logger),
814 "sdn_net": VimInteractionSdnNet(self.db, self.my_vims, self.db_vims, self.logger),
815 }
816 self.time_last_task_processed = None
817 self.tasks_to_delete = [] # lists of tasks to delete because nsrs or vnfrs has been deleted from db
818
819 def insert_task(self, task):
820 try:
821 self.task_queue.put(task, False)
822 return None
823 except queue.Full:
824 raise NsWorkerException("timeout inserting a task")
825
826 def terminate(self):
827 self.insert_task("exit")
828
829 def del_task(self, task):
830 with self.task_lock:
831 if task["status"] == "SCHEDULED":
832 task["status"] = "SUPERSEDED"
833 return True
834 else: # task["status"] == "processing"
835 self.task_lock.release()
836 return False
837
838 def _process_vim_config(self, target_id, db_vim):
839 """
840 Process vim config, creating vim configuration files as ca_cert
841 :param target_id: vim/sdn/wim + id
842 :param db_vim: Vim dictionary obtained from database
843 :return: None. Modifies vim. Creates a folder target_id:worker_index and several files
844 """
845 if not db_vim.get("config"):
846 return
847 file_name = ""
848 try:
849 if db_vim["config"].get("ca_cert_content"):
850 file_name = "{}:{}".format(target_id, self.worker_index)
851 try:
852 mkdir(file_name)
853 except FileExistsError:
854 pass
855 file_name = file_name + "/ca_cert"
856 with open(file_name, "w") as f:
857 f.write(db_vim["config"]["ca_cert_content"])
858 del db_vim["config"]["ca_cert_content"]
859 db_vim["config"]["ca_cert"] = file_name
860 except Exception as e:
861 raise NsWorkerException("Error writing to file '{}': {}".format(file_name, e))
862
863 def _load_plugin(self, name, type="vim"):
864 # type can be vim or sdn
865 if "rovim_dummy" not in self.plugins:
866 self.plugins["rovim_dummy"] = VimDummyConnector
867 if "rosdn_dummy" not in self.plugins:
868 self.plugins["rosdn_dummy"] = SdnDummyConnector
869 if name in self.plugins:
870 return self.plugins[name]
871 try:
872 for v in iter_entry_points('osm_ro{}.plugins'.format(type), name):
873 self.plugins[name] = v.load()
874 except Exception as e:
875 raise NsWorkerException("Cannot load plugin osm_{}: {}".format(name, e))
876 if name and name not in self.plugins:
877 raise NsWorkerException("Plugin 'osm_{n}' has not been installed".format(n=name))
878 return self.plugins[name]
879
880 def _unload_vim(self, target_id):
881 """
882 Unload a vim_account. Removes it from self db_vims dictionary, my_vims dictionary and vim_targets list
883 :param target_id: Contains type:_id; where type can be 'vim', ...
884 :return: None.
885 """
886 try:
887 target, _, _id = target_id.partition(":")
888 self.db_vims.pop(target_id, None)
889 self.my_vims.pop(target_id, None)
890 self.vim_targets.remove(target_id)
891 rmtree("{}:{}".format(target_id, self.worker_index))
892 except FileNotFoundError:
893 pass # this is raised by rmtree if folder does not exist
894 except Exception as e:
895 self.logger.error("Cannot unload {}: {}".format(target_id, e))
896
897 def _check_vim(self, target_id):
898 """
899 Load a VIM/SDN/WIM (if not loaded) and check connectivity, updating database with ENABLE or ERROR
900 :param target_id: Contains type:_id; type can be 'vim', 'sdn' or 'wim'
901 :return: None.
902 """
903 target, _, _id = target_id.partition(":")
904 now = time.time()
905 update_dict = {}
906 unset_dict = {}
907 op_text = ""
908 step = ""
909 loaded = target_id in self.my_vims
910 target_database = "vim_accounts" if target == "vim" else "wim_accounts" if target == "wim" else "sdns"
911 try:
912 step = "Getting {} from db".format(target_id)
913 db_vim = self.db.get_one(target_database, {"_id": _id})
914 for op_index, operation in enumerate(db_vim["_admin"].get("operations", ())):
915 if operation["operationState"] != "PROCESSING":
916 continue
917 locked_at = operation.get("locked_at")
918 if locked_at is not None and locked_at >= now - self.MAX_TIME_VIM_LOCKED:
919 # some other thread is doing this operation
920 return
921 # lock
922 op_text = "_admin.operations.{}.".format(op_index)
923 if not self.db.set_one(target_database,
924 q_filter={"_id": _id,
925 op_text + "operationState": "PROCESSING",
926 op_text + "locked_at": locked_at
927 },
928 update_dict={op_text + "locked_at": now,
929 "admin.current_operation": op_index},
930 fail_on_empty=False):
931 return
932 unset_dict[op_text + "locked_at"] = None
933 unset_dict["current_operation"] = None
934 step = "Loading " + target_id
935 error_text = self._load_vim(target_id)
936 if not error_text:
937 step = "Checking connectivity"
938 if target == 'vim':
939 self.my_vims[target_id].check_vim_connectivity()
940 else:
941 self.my_vims[target_id].check_credentials()
942 update_dict["_admin.operationalState"] = "ENABLED"
943 update_dict["_admin.detailed-status"] = ""
944 unset_dict[op_text + "detailed-status"] = None
945 update_dict[op_text + "operationState"] = "COMPLETED"
946 return
947
948 except Exception as e:
949 error_text = "{}: {}".format(step, e)
950 self.logger.error("{} for {}: {}".format(step, target_id, e))
951
952 finally:
953 if update_dict or unset_dict:
954 if error_text:
955 update_dict[op_text + "operationState"] = "FAILED"
956 update_dict[op_text + "detailed-status"] = error_text
957 unset_dict.pop(op_text + "detailed-status", None)
958 update_dict["_admin.operationalState"] = "ERROR"
959 update_dict["_admin.detailed-status"] = error_text
960 if op_text:
961 update_dict[op_text + "statusEnteredTime"] = now
962 self.db.set_one(target_database, q_filter={"_id": _id}, update_dict=update_dict, unset=unset_dict,
963 fail_on_empty=False)
964 if not loaded:
965 self._unload_vim(target_id)
966
967 def _reload_vim(self, target_id):
968 if target_id in self.vim_targets:
969 self._load_vim(target_id)
970 else:
971 # if the vim is not loaded, but database information of VIM is cached at self.db_vims,
972 # just remove it to force load again next time it is needed
973 self.db_vims.pop(target_id, None)
974
975 def _load_vim(self, target_id):
976 """
977 Load or reload a vim_account, sdn_controller or wim_account.
978 Read content from database, load the plugin if not loaded.
979 In case of error loading the plugin, it load a failing VIM_connector
980 It fills self db_vims dictionary, my_vims dictionary and vim_targets list
981 :param target_id: Contains type:_id; where type can be 'vim', ...
982 :return: None if ok, descriptive text if error
983 """
984 target, _, _id = target_id.partition(":")
985 target_database = "vim_accounts" if target == "vim" else "wim_accounts" if target == "wim" else "sdns"
986 plugin_name = ""
987 vim = None
988 try:
989 step = "Getting {}={} from db".format(target, _id)
990 # TODO process for wim, sdnc, ...
991 vim = self.db.get_one(target_database, {"_id": _id})
992
993 # if deep_get(vim, "config", "sdn-controller"):
994 # step = "Getting sdn-controller-id='{}' from db".format(vim["config"]["sdn-controller"])
995 # db_sdn = self.db.get_one("sdns", {"_id": vim["config"]["sdn-controller"]})
996
997 step = "Decrypting password"
998 schema_version = vim.get("schema_version")
999 self.db.encrypt_decrypt_fields(vim, "decrypt", fields=('password', 'secret'),
1000 schema_version=schema_version, salt=_id)
1001 self._process_vim_config(target_id, vim)
1002 if target == "vim":
1003 plugin_name = "rovim_" + vim["vim_type"]
1004 step = "Loading plugin '{}'".format(plugin_name)
1005 vim_module_conn = self._load_plugin(plugin_name)
1006 step = "Loading {}'".format(target_id)
1007 self.my_vims[target_id] = vim_module_conn(
1008 uuid=vim['_id'], name=vim['name'],
1009 tenant_id=vim.get('vim_tenant_id'), tenant_name=vim.get('vim_tenant_name'),
1010 url=vim['vim_url'], url_admin=None,
1011 user=vim['vim_user'], passwd=vim['vim_password'],
1012 config=vim.get('config') or {}, persistent_info={}
1013 )
1014 else: # sdn
1015 plugin_name = "rosdn_" + vim["type"]
1016 step = "Loading plugin '{}'".format(plugin_name)
1017 vim_module_conn = self._load_plugin(plugin_name, "sdn")
1018 step = "Loading {}'".format(target_id)
1019 wim = deepcopy(vim)
1020 wim_config = wim.pop("config", {}) or {}
1021 wim["uuid"] = wim["_id"]
1022 wim["wim_url"] = wim["url"]
1023 if wim.get("dpid"):
1024 wim_config["dpid"] = wim.pop("dpid")
1025 if wim.get("switch_id"):
1026 wim_config["switch_id"] = wim.pop("switch_id")
1027 self.my_vims[target_id] = vim_module_conn(wim, wim, wim_config) # wim, wim_account, config
1028 self.db_vims[target_id] = vim
1029 self.error_status = None
1030 self.logger.info("Connector loaded for {}, plugin={}".format(target_id, plugin_name))
1031 except Exception as e:
1032 self.logger.error("Cannot load {} plugin={}: {} {}".format(
1033 target_id, plugin_name, step, e))
1034 self.db_vims[target_id] = vim or {}
1035 self.db_vims[target_id] = FailingConnector(str(e))
1036 error_status = "{} Error: {}".format(step, e)
1037 return error_status
1038 finally:
1039 if target_id not in self.vim_targets:
1040 self.vim_targets.append(target_id)
1041
1042 def _get_db_task(self):
1043 """
1044 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
1045 :return: None
1046 """
1047 now = time.time()
1048 if not self.time_last_task_processed:
1049 self.time_last_task_processed = now
1050 try:
1051 while True:
1052 locked = self.db.set_one(
1053 "ro_tasks",
1054 q_filter={"target_id": self.vim_targets,
1055 "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
1056 "locked_at.lt": now - self.MAX_TIME_LOCKED,
1057 "to_check_at.lt": self.time_last_task_processed},
1058 update_dict={"locked_by": self.my_id, "locked_at": now},
1059 fail_on_empty=False)
1060 if locked:
1061 # read and return
1062 ro_task = self.db.get_one(
1063 "ro_tasks",
1064 q_filter={"target_id": self.vim_targets,
1065 "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
1066 "locked_at": now})
1067 return ro_task
1068 if self.time_last_task_processed == now:
1069 self.time_last_task_processed = None
1070 return None
1071 else:
1072 self.time_last_task_processed = now
1073 # self.time_last_task_processed = min(self.time_last_task_processed + 1000, now)
1074
1075 except DbException as e:
1076 self.logger.error("Database exception at _get_db_task: {}".format(e))
1077 except Exception as e:
1078 self.logger.critical("Unexpected exception at _get_db_task: {}".format(e), exc_info=True)
1079 return None
1080
1081 def _delete_task(self, ro_task, task_index, task_depends, db_update):
1082 """
1083 Determine if this task need to be done or superseded
1084 :return: None
1085 """
1086 my_task = ro_task["tasks"][task_index]
1087 task_id = my_task["task_id"]
1088 needed_delete = ro_task["vim_info"]["created"] or ro_task["vim_info"].get("created_items", False)
1089 if my_task["status"] == "FAILED":
1090 return None, None # TODO need to be retry??
1091 try:
1092 for index, task in enumerate(ro_task["tasks"]):
1093 if index == task_index or not task:
1094 continue # own task
1095 if my_task["target_record"] == task["target_record"] and task["action"] == "CREATE":
1096 # set to finished
1097 db_update["tasks.{}.status".format(index)] = task["status"] = "FINISHED"
1098 elif task["action"] == "CREATE" and task["status"] not in ("FINISHED", "SUPERSEDED"):
1099 needed_delete = False
1100 if needed_delete:
1101 return self.item2class[my_task["item"]].delete(ro_task, task_index)
1102 else:
1103 return "SUPERSEDED", None
1104 except Exception as e:
1105 if not isinstance(e, NsWorkerException):
1106 self.logger.critical("Unexpected exception at _delete_task task={}: {}".format(task_id, e),
1107 exc_info=True)
1108 return "FAILED", {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1109
1110 def _create_task(self, ro_task, task_index, task_depends, db_update):
1111 """
1112 Determine if this task need to create something at VIM
1113 :return: None
1114 """
1115 my_task = ro_task["tasks"][task_index]
1116 task_id = my_task["task_id"]
1117 task_status = None
1118 if my_task["status"] == "FAILED":
1119 return None, None # TODO need to be retry??
1120 elif my_task["status"] == "SCHEDULED":
1121 # check if already created by another task
1122 for index, task in enumerate(ro_task["tasks"]):
1123 if index == task_index or not task:
1124 continue # own task
1125 if task["action"] == "CREATE" and task["status"] not in ("SCHEDULED", "FINISHED", "SUPERSEDED"):
1126 return task["status"], "COPY_VIM_INFO"
1127
1128 try:
1129 task_status, ro_vim_item_update = self.item2class[my_task["item"]].new(
1130 ro_task, task_index, task_depends)
1131 # TODO update other CREATE tasks
1132 except Exception as e:
1133 if not isinstance(e, NsWorkerException):
1134 self.logger.error("Error executing task={}: {}".format(task_id, e), exc_info=True)
1135 task_status = "FAILED"
1136 ro_vim_item_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1137 # TODO update ro_vim_item_update
1138 return task_status, ro_vim_item_update
1139 else:
1140 return None, None
1141
1142 def _get_dependency(self, task_id, ro_task=None, target_id=None):
1143 """
1144 Look for dependency task
1145 :param task_id: Can be one of
1146 1. target_vim+blank+task.target_record_id: "(vim|sdn|wim):<id> (vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1147 2. task.target_record_id: "(vnfrs|nsrs):(vld|vdu|flavor|image).<id>"
1148 3. task.task_id: "<action_id>:number"
1149 :param ro_task:
1150 :param target_id:
1151 :return: database ro_task plus index of task
1152 """
1153 if task_id.startswith("vim:") or task_id.startswith("sdn:") or task_id.startswith("wim:"):
1154 target_id, _, task_id = task_id.partition(" ")
1155
1156 if task_id.startswith("nsrs:") or task_id.startswith("vnfrs:"):
1157 ro_task_dependency = self.db.get_one(
1158 "ro_tasks",
1159 q_filter={"target_id": target_id,
1160 "tasks.target_record_id": task_id
1161 },
1162 fail_on_empty=False)
1163 if ro_task_dependency:
1164 for task_index, task in enumerate(ro_task_dependency["tasks"]):
1165 if task["target_record_id"] == task_id:
1166 return ro_task_dependency, task_index
1167
1168 else:
1169 if ro_task:
1170 for task_index, task in enumerate(ro_task["tasks"]):
1171 if task and task["task_id"] == task_id:
1172 return ro_task, task_index
1173 ro_task_dependency = self.db.get_one(
1174 "ro_tasks",
1175 q_filter={"tasks.ANYINDEX.task_id": task_id,
1176 "tasks.ANYINDEX.target_record.ne": None
1177 },
1178 fail_on_empty=False)
1179 if ro_task_dependency:
1180 for task_index, task in ro_task_dependency["tasks"]:
1181 if task["task_id"] == task_id:
1182 return ro_task_dependency, task_index
1183 raise NsWorkerException("Cannot get depending task {}".format(task_id))
1184
1185 def _process_pending_tasks(self, ro_task):
1186 ro_task_id = ro_task["_id"]
1187 now = time.time()
1188 next_check_at = now + (24*60*60) # one day
1189 db_ro_task_update = {}
1190
1191 def _update_refresh(new_status):
1192 # compute next_refresh
1193 nonlocal task
1194 nonlocal next_check_at
1195 nonlocal db_ro_task_update
1196 nonlocal ro_task
1197
1198 next_refresh = time.time()
1199 if task["item"] in ("image", "flavor"):
1200 next_refresh += self.REFRESH_IMAGE
1201 elif new_status == "BUILD":
1202 next_refresh += self.REFRESH_BUILD
1203 elif new_status == "DONE":
1204 next_refresh += self.REFRESH_ACTIVE
1205 else:
1206 next_refresh += self.REFRESH_ERROR
1207 next_check_at = min(next_check_at, next_refresh)
1208 db_ro_task_update["vim_info.refresh_at"] = next_refresh
1209 ro_task["vim_info"]["refresh_at"] = next_refresh
1210
1211 try:
1212 # 0 get task_status_create
1213 task_status_create = None
1214 task_create = next((t for t in ro_task["tasks"] if t and t["action"] == "CREATE" and
1215 t["status"] in ("BUILD", "DONE")), None)
1216 if task_create:
1217 task_status_create = task_create["status"]
1218 # 1. look for tasks in status SCHEDULED, or in status CREATE if action is DONE or BUILD
1219 for task_action in ("DELETE", "CREATE", "EXEC"):
1220 db_vim_update = None
1221 new_status = None
1222 for task_index, task in enumerate(ro_task["tasks"]):
1223 if not task:
1224 continue # task deleted
1225 task_depends = {}
1226 target_update = None
1227 if (task_action in ("DELETE", "EXEC") and task["status"] not in ("SCHEDULED", "BUILD")) or \
1228 task["action"] != task_action or \
1229 (task_action == "CREATE" and task["status"] in ("FINISHED", "SUPERSEDED")):
1230 continue
1231 task_path = "tasks.{}.status".format(task_index)
1232 try:
1233 db_vim_info_update = None
1234 if task["status"] == "SCHEDULED":
1235 # check if tasks that this depends on have been completed
1236 dependency_not_completed = False
1237 for dependency_task_id in (task.get("depends_on") or ()):
1238 dependency_ro_task, dependency_task_index = \
1239 self._get_dependency(dependency_task_id, target_id=ro_task["target_id"])
1240 dependency_task = dependency_ro_task["tasks"][dependency_task_index]
1241 if dependency_task["status"] == "SCHEDULED":
1242 dependency_not_completed = True
1243 next_check_at = min(next_check_at, dependency_ro_task["to_check_at"])
1244 break
1245 elif dependency_task["status"] == "FAILED":
1246 error_text = "Cannot {} {} because depends on failed {} {} id={}): {}".format(
1247 task["action"], task["item"], dependency_task["action"],
1248 dependency_task["item"], dependency_task_id,
1249 dependency_ro_task["vim_info"].get("vim_details"))
1250 self.logger.error("task={} {}".format(task["task_id"], error_text))
1251 raise NsWorkerException(error_text)
1252
1253 task_depends[dependency_task_id] = dependency_ro_task["vim_info"]["vim_id"]
1254 task_depends["TASK-{}".format(dependency_task_id)] = \
1255 dependency_ro_task["vim_info"]["vim_id"]
1256 if dependency_not_completed:
1257 # TODO set at vim_info.vim_details that it is waiting
1258 continue
1259
1260 if task["action"] == "DELETE":
1261 new_status, db_vim_info_update = self._delete_task(ro_task, task_index,
1262 task_depends, db_ro_task_update)
1263 new_status = "FINISHED" if new_status == "DONE" else new_status
1264 # ^with FINISHED instead of DONE it will not be refreshing
1265 if new_status in ("FINISHED", "SUPERSEDED"):
1266 target_update = "DELETE"
1267 elif task["action"] == "EXEC":
1268 new_status, db_vim_info_update, db_task_update = self.item2class[task["item"]].exec(
1269 ro_task, task_index, task_depends)
1270 new_status = "FINISHED" if new_status == "DONE" else new_status
1271 # ^with FINISHED instead of DONE it will not be refreshing
1272 if db_task_update:
1273 # load into database the modified db_task_update "retries" and "next_retry"
1274 if db_task_update.get("retries"):
1275 db_ro_task_update["tasks.{}.retries".format(task_index)] = db_task_update["retries"]
1276 next_check_at = time.time() + db_task_update.get("next_retry", 60)
1277 target_update = None
1278 elif task["action"] == "CREATE":
1279 if task["status"] == "SCHEDULED":
1280 if task_status_create:
1281 new_status = task_status_create
1282 target_update = "COPY_VIM_INFO"
1283 else:
1284 new_status, db_vim_info_update = \
1285 self.item2class[task["item"]].new(ro_task, task_index, task_depends)
1286 # self._create_task(ro_task, task_index, task_depends, db_ro_task_update)
1287 _update_refresh(new_status)
1288 else:
1289 if ro_task["vim_info"]["refresh_at"] and now > ro_task["vim_info"]["refresh_at"]:
1290 new_status, db_vim_info_update = self.item2class[task["item"]].refresh(ro_task)
1291 _update_refresh(new_status)
1292 except Exception as e:
1293 new_status = "FAILED"
1294 db_vim_info_update = {"vim_status": "VIM_ERROR", "vim_details": str(e)}
1295 if not isinstance(e, (NsWorkerException, vimconn.VimConnException)):
1296 self.logger.error("Unexpected exception at _delete_task task={}: {}".
1297 format(task["task_id"], e), exc_info=True)
1298
1299 try:
1300 if db_vim_info_update:
1301 db_vim_update = db_vim_info_update.copy()
1302 db_ro_task_update.update({"vim_info." + k: v for k, v in db_vim_info_update.items()})
1303 ro_task["vim_info"].update(db_vim_info_update)
1304
1305 if new_status:
1306 if task_action == "CREATE":
1307 task_status_create = new_status
1308 db_ro_task_update[task_path] = new_status
1309 if target_update or db_vim_update:
1310
1311 if target_update == "DELETE":
1312 self._update_target(task, None)
1313 elif target_update == "COPY_VIM_INFO":
1314 self._update_target(task, ro_task["vim_info"])
1315 else:
1316 self._update_target(task, db_vim_update)
1317
1318 except Exception as e:
1319 if isinstance(e, DbException) and e.http_code == HTTPStatus.NOT_FOUND:
1320 # if the vnfrs or nsrs has been removed from database, this task must be removed
1321 self.logger.debug("marking to delete task={}".format(task["task_id"]))
1322 self.tasks_to_delete.append(task)
1323 else:
1324 self.logger.error("Unexpected exception at _update_target task={}: {}".
1325 format(task["task_id"], e), exc_info=True)
1326
1327 q_filter = {"_id": ro_task["_id"], "to_check_at": ro_task["to_check_at"], "locked_at": ro_task["locked_at"]}
1328 # modify own task. Try filtering by to_next_check. For race condition if to_check_at has been modified,
1329 # outside this task (by ro_nbi) do not update it
1330 db_ro_task_update["locked_by"] = None
1331 # locked_at converted to int only for debugging. When has not decimals it means it has been unlocked
1332 db_ro_task_update["locked_at"] = int(now - self.MAX_TIME_LOCKED)
1333 db_ro_task_update["to_check_at"] = next_check_at
1334 if not self.db.set_one("ro_tasks",
1335 update_dict=db_ro_task_update,
1336 q_filter=q_filter,
1337 fail_on_empty=False):
1338 del db_ro_task_update["to_check_at"]
1339 del q_filter["to_check_at"]
1340 self.db.set_one("ro_tasks",
1341 q_filter=q_filter,
1342 update_dict=db_ro_task_update,
1343 fail_on_empty=True)
1344 except DbException as e:
1345 self.logger.error("ro_task={} Error updating database {}".format(ro_task_id, e))
1346 except Exception as e:
1347 self.logger.error("Error executing ro_task={}: {}".format(ro_task_id, e), exc_info=True)
1348
1349 def _update_target(self, task, ro_vim_item_update):
1350 table, _, temp = task["target_record"].partition(":")
1351 _id, _, path_vim_status = temp.partition(":")
1352 path_item = path_vim_status[:path_vim_status.rfind(".")]
1353 path_item = path_item[:path_item.rfind(".")]
1354 # path_vim_status: dot separated list targeting vim information, e.g. "vdur.10.vim_info.vim:id"
1355 # path_item: dot separated list targeting record information, e.g. "vdur.10"
1356 if ro_vim_item_update:
1357 update_dict = {path_vim_status + "." + k: v for k, v in ro_vim_item_update.items() if k in
1358 ('vim_id', 'vim_details', 'vim_name', 'vim_status', 'interfaces')}
1359 if path_vim_status.startswith("vdur."):
1360 # for backward compatibility, add vdur.name apart from vdur.vim_name
1361 if ro_vim_item_update.get("vim_name"):
1362 update_dict[path_item + ".name"] = ro_vim_item_update["vim_name"]
1363 # for backward compatibility, add vdur.vim-id apart from vdur.vim_id
1364 if ro_vim_item_update.get("vim_id"):
1365 update_dict[path_item + ".vim-id"] = ro_vim_item_update["vim_id"]
1366 # update general status
1367 if ro_vim_item_update.get("vim_status"):
1368 update_dict[path_item + ".status"] = ro_vim_item_update["vim_status"]
1369 if ro_vim_item_update.get("interfaces"):
1370 path_interfaces = path_item + ".interfaces"
1371 for i, iface in enumerate(ro_vim_item_update.get("interfaces")):
1372 if iface:
1373 update_dict.update({path_interfaces + ".{}.".format(i) + k: v for k, v in iface.items() if
1374 k in ('vlan', 'compute_node', 'pci')})
1375 # put ip_address and mac_address with ip-address and mac-address
1376 if iface.get('ip_address'):
1377 update_dict[path_interfaces + ".{}.".format(i) + "ip-address"] = iface['ip_address']
1378 if iface.get('mac_address'):
1379 update_dict[path_interfaces + ".{}.".format(i) + "mac-address"] = iface['mac_address']
1380 if iface.get("mgmt_vnf_interface") and iface.get("ip_address"):
1381 update_dict["ip-address"] = iface.get("ip_address").split(";")[0]
1382 if iface.get("mgmt_vdu_interface") and iface.get("ip_address"):
1383 update_dict[path_item + ".ip-address"] = iface.get("ip_address").split(";")[0]
1384
1385 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict)
1386 else:
1387 update_dict = {path_item + ".status": "DELETED"}
1388 self.db.set_one(table, q_filter={"_id": _id}, update_dict=update_dict, unset={path_vim_status: None})
1389
1390 def _process_delete_db_tasks(self):
1391 """
1392 Delete task from database because vnfrs or nsrs or both have been deleted
1393 :return: None. Uses and modify self.tasks_to_delete
1394 """
1395 while self.tasks_to_delete:
1396 task = self.tasks_to_delete[0]
1397 vnfrs_deleted = None
1398 nsr_id = task["nsr_id"]
1399 if task["target_record"].startswith("vnfrs:"):
1400 # check if nsrs is present
1401 if self.db.get_one("nsrs", {"_id": nsr_id}, fail_on_empty=False):
1402 vnfrs_deleted = task["target_record"].split(":")[1]
1403 try:
1404 self.delete_db_tasks(self.db, nsr_id, vnfrs_deleted)
1405 except Exception as e:
1406 self.logger.error("Error deleting task={}: {}".format(task["task_id"], e))
1407 self.tasks_to_delete.pop(0)
1408
1409 @staticmethod
1410 def delete_db_tasks(db, nsr_id, vnfrs_deleted):
1411 """
1412 Static method because it is called from osm_ng_ro.ns
1413 :param db: instance of database to use
1414 :param nsr_id: affected nsrs id
1415 :param vnfrs_deleted: only tasks with this vnfr id. If None, all affected by nsr_id
1416 :return: None, exception is fails
1417 """
1418 retries = 5
1419 for retry in range(retries):
1420 ro_tasks = db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
1421 now = time.time()
1422 conflict = False
1423 for ro_task in ro_tasks:
1424 db_update = {}
1425 to_delete_ro_task = True
1426 for index, task in enumerate(ro_task["tasks"]):
1427 if not task:
1428 pass
1429 elif (not vnfrs_deleted and task["nsr_id"] == nsr_id) or \
1430 (vnfrs_deleted and task["target_record"].startswith("vnfrs:"+vnfrs_deleted)):
1431 db_update["tasks.{}".format(index)] = None
1432 else:
1433 to_delete_ro_task = False # used by other nsr, ro_task cannot be deleted
1434 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
1435 if to_delete_ro_task:
1436 if not db.del_one("ro_tasks",
1437 q_filter={"_id": ro_task["_id"], "modified_at": ro_task["modified_at"]},
1438 fail_on_empty=False):
1439 conflict = True
1440 elif db_update:
1441 db_update["modified_at"] = now
1442 if not db.set_one("ro_tasks",
1443 q_filter={"_id": ro_task["_id"], "modified_at": ro_task["modified_at"]},
1444 update_dict=db_update,
1445 fail_on_empty=False):
1446 conflict = True
1447 if not conflict:
1448 return
1449 else:
1450 raise NsWorkerException("Exceeded {} retries".format(retries))
1451
1452 def run(self):
1453 # load database
1454 self.logger.debug("Starting")
1455 while True:
1456 # step 1: get commands from queue
1457 try:
1458 task = self.task_queue.get(block=False if self.my_vims else True)
1459 if task[0] == "terminate":
1460 break
1461 elif task[0] == "load_vim":
1462 self._load_vim(task[1])
1463 elif task[0] == "unload_vim":
1464 self._unload_vim(task[1])
1465 elif task[0] == "reload_vim":
1466 self._reload_vim(task[1])
1467 elif task[0] == "check_vim":
1468 self._check_vim(task[1])
1469 continue
1470 except Exception as e:
1471 if isinstance(e, queue.Empty):
1472 pass
1473 else:
1474 self.logger.critical("Error processing task: {}".format(e), exc_info=True)
1475
1476 # step 2: process pending_tasks, delete not needed tasks
1477 try:
1478 if self.tasks_to_delete:
1479 self._process_delete_db_tasks()
1480 busy = False
1481 ro_task = self._get_db_task()
1482 if ro_task:
1483 self._process_pending_tasks(ro_task)
1484 busy = True
1485 if not busy:
1486 time.sleep(5)
1487 except Exception as e:
1488 self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)
1489
1490 self.logger.debug("Finishing")