fix 1150 limit error length of a failing SDN connector
[osm/RO.git] / RO / osm_ro / vim_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefonica Investigacion y Desarrollo, S.A.U.
5 # This file is part of openvim
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
24 """"
25 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
26 The tasks are stored at database in table vim_wim_actions
27 Several vim_wim_actions can refer to the same element at VIM (flavor, network, ...). This is somethng to avoid if RO
28 is migrated to a non-relational database as mongo db. Each vim_wim_actions reference a different instance_Xxxxx
29 In this case "related" colunm contains the same value, to know they refer to the same vim. In case of deletion, it
30 there is related tasks using this element, it is not deleted, The vim_info needed to delete is transfered to other task
31
32 The task content is (M: stored at memory, D: stored at database):
33 MD instance_action_id: reference a global action over an instance-scenario: database instance_actions
34 MD task_index: index number of the task. This together with the previous forms a unique key identifier
35 MD datacenter_vim_id: should contain the uuid of the VIM managed by this thread
36 MD vim_id: id of the vm,net,etc at VIM
37 MD item: database table name, can be instance_vms, instance_nets, TODO: datacenter_flavors, datacenter_images
38 MD item_id: uuid of the referenced entry in the previous table
39 MD action: CREATE, DELETE, FIND
40 MD status: SCHEDULED: action need to be done
41 BUILD: not used
42 DONE: Done and it must be polled to VIM periodically to see status. ONLY for action=CREATE or FIND
43 FAILED: It cannot be created/found/deleted
44 FINISHED: similar to DONE, but no refresh is needed anymore. Task is maintained at database but
45 it is never processed by any thread
46 SUPERSEDED: similar to FINSISHED, but nothing has been done to completed the task.
47 MD extra: text with yaml format at database, dict at memory with:
48 params: list with the params to be sent to the VIM for CREATE or FIND. For DELETE the vim_id is taken
49 from other related tasks
50 find: (only for CREATE tasks) if present it should FIND before creating and use if existing. Contains
51 the FIND params
52 depends_on: list with the 'task_index'es of tasks that must be completed before. e.g. a vm creation depends
53 on a net creation
54 can contain an int (single index on the same instance-action) or str (compete action ID)
55 sdn_net_id: used for net.
56 interfaces: used for VMs. Each key is the uuid of the instance_interfaces entry at database
57 iface_id: uuid of intance_interfaces
58 sdn_port_id:
59 sdn_net_id:
60 vim_info
61 created_items: dictionary with extra elements created that need to be deleted. e.g. ports, volumes,...
62 created: False if the VIM element is not created by other actions, and it should not be deleted
63 vim_status: VIM status of the element. Stored also at database in the instance_XXX
64 vim_info: Detailed information of a vm/net from the VIM. Stored at database in the instance_XXX but not at
65 vim_wim_actions
66 M depends: dict with task_index(from depends_on) to dependency task
67 M params: same as extra[params]
68 MD error_msg: descriptive text upon an error.Stored also at database instance_XXX
69 MD created_at: task creation time. The task of creation must be the oldest
70 MD modified_at: next time task need to be processed. For example, for a refresh, it contain next time refresh must
71 be done
72 MD related: All the tasks over the same VIM element have same "related". Note that other VIMs can contain the
73 same value of related, but this thread only process those task of one VIM. Also related can be the
74 same among several NS os isntance-scenarios
75 MD worker: Used to lock in case of several thread workers.
76
77 """
78
79 import threading
80 import time
81 import queue
82 import logging
83 from osm_ro_plugin import vimconn
84 from osm_ro_plugin.sdnconn import SdnConnectorError
85 import yaml
86 from osm_ro.db_base import db_base_Exception
87 from http import HTTPStatus
88 from copy import deepcopy
89
90 __author__ = "Alfonso Tierno, Pablo Montes"
91 __date__ = "$28-Sep-2017 12:07:15$"
92
93
94 def is_task_id(task_id):
95 return task_id.startswith("TASK-")
96
97
98 class VimThreadException(Exception):
99 pass
100
101
102 class VimThreadExceptionNotFound(VimThreadException):
103 pass
104
105
106 class vim_thread(threading.Thread):
107 REFRESH_BUILD = 5 # 5 seconds
108 REFRESH_ACTIVE = 60 # 1 minute
109 REFRESH_ERROR = 600
110 REFRESH_DELETE = 3600 * 10
111
112 def __init__(self, task_lock, plugins, name=None, wim_account_id=None, datacenter_tenant_id=None, db=None):
113 """Init a thread.
114 Arguments:
115 'id' number of thead
116 'name' name of thread
117 'host','user': host ip or name to manage and user
118 'db', 'db_lock': database class and lock to use it in exclusion
119 """
120 threading.Thread.__init__(self)
121 self.plugins = plugins
122 self.plugin_name = "unknown"
123 self.vim = None
124 self.sdnconnector = None
125 self.sdnconn_config = None
126 self.error_status = None
127 self.wim_account_id = wim_account_id
128 self.datacenter_tenant_id = datacenter_tenant_id
129 self.port_mappings = None
130 if self.wim_account_id:
131 self.target_k = "wim_account_id"
132 self.target_v = self.wim_account_id
133 else:
134 self.target_k = "datacenter_vim_id"
135 self.target_v = self.datacenter_tenant_id
136 if not name:
137 self.name = wim_account_id or str(datacenter_tenant_id)
138 else:
139 self.name = name
140 self.vim_persistent_info = {}
141 self.my_id = self.name[:64]
142
143 self.logger = logging.getLogger('openmano.{}.{}'.format("vim" if self.datacenter_tenant_id else "sdn",
144 self.name))
145 self.db = db
146
147 self.task_lock = task_lock
148 self.task_queue = queue.Queue(2000)
149
150 def _proccess_sdn_exception(self, exc):
151 if isinstance(exc, SdnConnectorError):
152 raise
153 else:
154 self.logger.error("plugin={} throws a non SdnConnectorError exception {}".format(self.plugin_name, exc),
155 exc_info=True)
156 raise SdnConnectorError(str(exc), http_code=HTTPStatus.INTERNAL_SERVER_ERROR.value) from exc
157
158 def _proccess_vim_exception(self, exc):
159 if isinstance(exc, vimconn.VimConnException):
160 raise
161 else:
162 self.logger.error("plugin={} throws a non vimconnException exception {}".format(self.plugin_name, exc),
163 exc_info=True)
164 raise vimconn.VimConnException(str(exc), http_code=HTTPStatus.INTERNAL_SERVER_ERROR.value) from exc
165
166 def get_vim_sdn_connector(self):
167 if self.datacenter_tenant_id:
168 try:
169 from_ = "datacenter_tenants as dt join datacenters as d on dt.datacenter_id=d.uuid"
170 select_ = ('type', 'd.config as config', 'd.uuid as datacenter_id', 'vim_url', 'vim_url_admin',
171 'd.name as datacenter_name', 'dt.uuid as datacenter_tenant_id',
172 'dt.vim_tenant_name as vim_tenant_name', 'dt.vim_tenant_id as vim_tenant_id',
173 'user', 'passwd', 'dt.config as dt_config')
174 where_ = {"dt.uuid": self.datacenter_tenant_id}
175 vims = self.db.get_rows(FROM=from_, SELECT=select_, WHERE=where_)
176 vim = vims[0]
177 vim_config = {}
178 if vim["config"]:
179 vim_config.update(yaml.load(vim["config"], Loader=yaml.Loader))
180 if vim["dt_config"]:
181 vim_config.update(yaml.load(vim["dt_config"], Loader=yaml.Loader))
182 vim_config['datacenter_tenant_id'] = vim.get('datacenter_tenant_id')
183 vim_config['datacenter_id'] = vim.get('datacenter_id')
184
185 # get port_mapping
186 # vim_port_mappings = self.ovim.get_of_port_mappings(
187 # db_filter={"datacenter_id": vim_config['datacenter_id']})
188 # vim_config["wim_external_ports"] = [x for x in vim_port_mappings
189 # if x["service_mapping_info"].get("wim")]
190 self.plugin_name = "rovim_" + vim["type"]
191 self.vim = self.plugins[self.plugin_name](
192 uuid=vim['datacenter_id'], name=vim['datacenter_name'],
193 tenant_id=vim['vim_tenant_id'], tenant_name=vim['vim_tenant_name'],
194 url=vim['vim_url'], url_admin=vim['vim_url_admin'],
195 user=vim['user'], passwd=vim['passwd'],
196 config=vim_config, persistent_info=self.vim_persistent_info
197 )
198 self.error_status = None
199 self.logger.info("Vim Connector loaded for vim_account={}, plugin={}".format(
200 self.datacenter_tenant_id, self.plugin_name))
201 except Exception as e:
202 self.logger.error("Cannot load vimconnector for vim_account={} plugin={}: {}".format(
203 self.datacenter_tenant_id, self.plugin_name, e))
204 self.vim = None
205 self.error_status = "Error loading vimconnector: {}".format(e)
206 else:
207 try:
208 wim_account = self.db.get_rows(FROM="wim_accounts", WHERE={"uuid": self.wim_account_id})[0]
209 wim = self.db.get_rows(FROM="wims", WHERE={"uuid": wim_account["wim_id"]})[0]
210 if wim["config"]:
211 self.sdnconn_config = yaml.load(wim["config"], Loader=yaml.Loader)
212 else:
213 self.sdnconn_config = {}
214 if wim_account["config"]:
215 self.sdnconn_config.update(yaml.load(wim_account["config"], Loader=yaml.Loader))
216 self.port_mappings = self.db.get_rows(FROM="wim_port_mappings", WHERE={"wim_id": wim_account["wim_id"]})
217 if self.port_mappings:
218 self.sdnconn_config["service_endpoint_mapping"] = self.port_mappings
219 self.plugin_name = "rosdn_" + wim["type"]
220 self.sdnconnector = self.plugins[self.plugin_name](
221 wim, wim_account, config=self.sdnconn_config)
222 self.error_status = None
223 self.logger.info("Sdn Connector loaded for wim_account={}, plugin={}".format(
224 self.wim_account_id, self.plugin_name))
225 except Exception as e:
226 self.logger.error("Cannot load sdn connector for wim_account={}, plugin={}: {}".format(
227 self.wim_account_id, self.plugin_name, e), exc_info=True)
228 self.sdnconnector = None
229 self.error_status = self._format_vim_error_msg("Error loading sdn connector: {}".format(e))
230
231 def _get_db_task(self):
232 """
233 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
234 :return: None
235 """
236 now = time.time()
237 try:
238 database_limit = 20
239 task_related = None
240 while True:
241 # get 20 (database_limit) entries each time
242 vim_actions = self.db.get_rows(FROM="vim_wim_actions",
243 WHERE={self.target_k: self.target_v,
244 "status": ['SCHEDULED', 'BUILD', 'DONE'],
245 "worker": [None, self.my_id], "modified_at<=": now
246 },
247 ORDER_BY=("modified_at", "created_at",),
248 LIMIT=database_limit)
249 if not vim_actions:
250 return None, None
251 # if vim_actions[0]["modified_at"] > now:
252 # return int(vim_actions[0] - now)
253 for task in vim_actions:
254 # block related task
255 if task_related == task["related"]:
256 continue # ignore if a locking has already tried for these task set
257 task_related = task["related"]
258 # lock ...
259 self.db.update_rows("vim_wim_actions", UPDATE={"worker": self.my_id}, modified_time=0,
260 WHERE={self.target_k: self.target_v,
261 "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
262 "worker": [None, self.my_id],
263 "related": task_related,
264 "item": task["item"],
265 })
266 # ... and read all related and check if locked
267 related_tasks = self.db.get_rows(FROM="vim_wim_actions",
268 WHERE={self.target_k: self.target_v,
269 "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
270 "related": task_related,
271 "item": task["item"],
272 },
273 ORDER_BY=("created_at",))
274 # check that all related tasks have been locked. If not release and try again. It can happen
275 # for race conditions if a new related task has been inserted by nfvo in the process
276 some_tasks_locked = False
277 some_tasks_not_locked = False
278 creation_task = None
279 for relate_task in related_tasks:
280 if relate_task["worker"] != self.my_id:
281 some_tasks_not_locked = True
282 else:
283 some_tasks_locked = True
284 if not creation_task and relate_task["action"] in ("CREATE", "FIND"):
285 creation_task = relate_task
286 if some_tasks_not_locked:
287 if some_tasks_locked: # unlock
288 self.db.update_rows("vim_wim_actions", UPDATE={"worker": None}, modified_time=0,
289 WHERE={self.target_k: self.target_v,
290 "worker": self.my_id,
291 "related": task_related,
292 "item": task["item"],
293 })
294 continue
295
296 task["params"] = None
297 if task["extra"]:
298 extra = yaml.load(task["extra"], Loader=yaml.Loader)
299 else:
300 extra = {}
301 task["extra"] = extra
302 if extra.get("depends_on"):
303 task["depends"] = {}
304 if extra.get("params"):
305 task["params"] = deepcopy(extra["params"])
306 return task, related_tasks
307 except Exception as e:
308 self.logger.critical("Unexpected exception at _get_db_task: " + str(e), exc_info=True)
309 return None, None
310
311 def _delete_task(self, task):
312 """
313 Determine if this task need to be done or superseded
314 :return: None
315 """
316
317 def copy_extra_created(copy_to, copy_from):
318 copy_to["created"] = copy_from["created"]
319 if copy_from.get("sdn_net_id"):
320 copy_to["sdn_net_id"] = copy_from["sdn_net_id"]
321 if copy_from.get("interfaces"):
322 copy_to["interfaces"] = copy_from["interfaces"]
323 if copy_from.get("sdn-ports"):
324 copy_to["sdn-ports"] = copy_from["sdn-ports"]
325 if copy_from.get("created_items"):
326 if not copy_to.get("created_items"):
327 copy_to["created_items"] = {}
328 copy_to["created_items"].update(copy_from["created_items"])
329
330 task_create = None
331 dependency_task = None
332 deletion_needed = task["extra"].get("created", False)
333 if task["status"] == "FAILED":
334 return # TODO need to be retry??
335 try:
336 # get all related tasks. task of creation must be the first in the list of related_task,
337 # unless the deletion fails and it is pendingit fails
338 # TODO this should be removed, passing related_tasks
339 related_tasks = self.db.get_rows(FROM="vim_wim_actions",
340 WHERE={self.target_k: self.target_v,
341 "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
342 "action": ["FIND", "CREATE"],
343 "related": task["related"],
344 },
345 ORDER_BY=("created_at",),
346 )
347 for related_task in related_tasks:
348 if related_task["item"] == task["item"] and related_task["item_id"] == task["item_id"]:
349 task_create = related_task
350 # TASK_CREATE
351 if related_task["extra"]:
352 extra_created = yaml.load(related_task["extra"], Loader=yaml.Loader)
353 if extra_created.get("created"):
354 deletion_needed = True
355 related_task["extra"] = extra_created
356 elif not dependency_task:
357 dependency_task = related_task
358 if task_create and dependency_task:
359 break
360
361 # mark task_create as FINISHED
362 if task_create:
363 self.db.update_rows("vim_wim_actions", UPDATE={"status": "FINISHED"},
364 WHERE={self.target_k: self.target_v,
365 "instance_action_id": task_create["instance_action_id"],
366 "task_index": task_create["task_index"]
367 })
368 if not deletion_needed:
369 return False
370 elif dependency_task:
371 # move create information from task_create to relate_task
372 extra_new_created = yaml.load(dependency_task["extra"], Loader=yaml.Loader) or {}
373 extra_new_created["created"] = extra_created["created"]
374 copy_extra_created(copy_to=extra_new_created, copy_from=extra_created)
375
376 self.db.update_rows("vim_wim_actions",
377 UPDATE={"extra": yaml.safe_dump(extra_new_created, default_flow_style=True,
378 width=256),
379 "vim_id": task_create.get("vim_id")},
380 WHERE={self.target_k: self.target_v,
381 "instance_action_id": dependency_task["instance_action_id"],
382 "task_index": dependency_task["task_index"]
383 })
384 return False
385 elif task_create:
386 task["vim_id"] = task_create["vim_id"]
387 copy_extra_created(copy_to=task["extra"], copy_from=task_create["extra"])
388 # Ensure this task extra information is stored at database
389 self.db.update_rows("vim_wim_actions",
390 UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True,
391 width=256)},
392 WHERE={self.target_k: self.target_v,
393 "instance_action_id": task["instance_action_id"],
394 "task_index": task["task_index"],
395 })
396 return True
397 return deletion_needed
398
399 except Exception as e:
400 self.logger.critical("Unexpected exception at _delete_task: " + str(e), exc_info=True)
401
402 def _refres_vm(self, task):
403 """Call VIM to get VMs status"""
404 database_update = None
405
406 vim_id = task["vim_id"]
407 vm_to_refresh_list = [vim_id]
408 try:
409 vim_dict = self.vim.refresh_vms_status(vm_to_refresh_list)
410 vim_info = vim_dict[vim_id]
411 except vimconn.VimConnException as e:
412 # Mark all tasks at VIM_ERROR status
413 self.logger.error("task=several get-VM: vimconnException when trying to refresh vms " + str(e))
414 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
415
416 task_id = task["instance_action_id"] + "." + str(task["task_index"])
417 self.logger.debug("task={} get-VM: vim_vm_id={} result={}".format(task_id, task["vim_id"], vim_info))
418
419 # check and update interfaces
420 task_warning_msg = ""
421 for interface in vim_info.get("interfaces", ()):
422 vim_interface_id = interface["vim_interface_id"]
423 if vim_interface_id not in task["extra"]["interfaces"]:
424 self.logger.critical("task={} get-VM: Interface not found {} on task info {}".format(
425 task_id, vim_interface_id, task["extra"]["interfaces"]), exc_info=True)
426 continue
427 task_interface = task["extra"]["interfaces"][vim_interface_id]
428 task_vim_interface = task_interface.get("vim_info")
429 if task_vim_interface != interface:
430 # delete old port
431 # if task_interface.get("sdn_port_id"):
432 # try:
433 # self.ovim.delete_port(task_interface["sdn_port_id"], idempotent=True)
434 # task_interface["sdn_port_id"] = None
435 # except ovimException as e:
436 # error_text = "ovimException deleting external_port={}: {}".format(
437 # task_interface["sdn_port_id"], e)
438 # self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
439 # task_warning_msg += error_text
440 # # TODO Set error_msg at instance_nets instead of instance VMs
441
442 # Create SDN port
443 # sdn_net_id = task_interface.get("sdn_net_id")
444 # if sdn_net_id and interface.get("compute_node") and interface.get("pci"):
445 # sdn_port_name = sdn_net_id + "." + task["vim_id"]
446 # sdn_port_name = sdn_port_name[:63]
447 # try:
448 # sdn_port_id = self.ovim.new_external_port(
449 # {"compute_node": interface["compute_node"],
450 # "pci": interface["pci"],
451 # "vlan": interface.get("vlan"),
452 # "net_id": sdn_net_id,
453 # "region": self.vim["config"]["datacenter_id"],
454 # "name": sdn_port_name,
455 # "mac": interface.get("mac_address")})
456 # task_interface["sdn_port_id"] = sdn_port_id
457 # except (ovimException, Exception) as e:
458 # error_text = "ovimException creating new_external_port compute_node={} pci={} vlan={} {}".\
459 # format(interface["compute_node"], interface["pci"], interface.get("vlan"), e)
460 # self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
461 # task_warning_msg += error_text
462 # # TODO Set error_msg at instance_nets instead of instance VMs
463
464 self.db.update_rows('instance_interfaces',
465 UPDATE={"mac_address": interface.get("mac_address"),
466 "ip_address": interface.get("ip_address"),
467 "vim_interface_id": interface.get("vim_interface_id"),
468 "vim_info": interface.get("vim_info"),
469 "sdn_port_id": task_interface.get("sdn_port_id"),
470 "compute_node": interface.get("compute_node"),
471 "pci": interface.get("pci"),
472 "vlan": interface.get("vlan")},
473 WHERE={'uuid': task_interface["iface_id"]})
474 task_interface["vim_info"] = interface
475 # if sdn_net_id and interface.get("compute_node") and interface.get("pci"):
476 # # TODO Send message to task SDN to update
477
478 # check and update task and instance_vms database
479 vim_info_error_msg = None
480 if vim_info.get("error_msg"):
481 vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"] + task_warning_msg)
482 elif task_warning_msg:
483 vim_info_error_msg = self._format_vim_error_msg(task_warning_msg)
484 task_vim_info = task["extra"].get("vim_info")
485 task_error_msg = task.get("error_msg")
486 task_vim_status = task["extra"].get("vim_status")
487 if task_vim_status != vim_info["status"] or task_error_msg != vim_info_error_msg or \
488 (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
489 database_update = {"status": vim_info["status"], "error_msg": vim_info_error_msg}
490 if vim_info.get("vim_info"):
491 database_update["vim_info"] = vim_info["vim_info"]
492
493 task["extra"]["vim_status"] = vim_info["status"]
494 task["error_msg"] = vim_info_error_msg
495 if vim_info.get("vim_info"):
496 task["extra"]["vim_info"] = vim_info["vim_info"]
497
498 return database_update
499
500 def _refres_net(self, task):
501 """Call VIM to get network status"""
502 database_update = None
503
504 vim_id = task["vim_id"]
505 net_to_refresh_list = [vim_id]
506 try:
507 vim_dict = self.vim.refresh_nets_status(net_to_refresh_list)
508 vim_info = vim_dict[vim_id]
509 except vimconn.VimConnException as e:
510 # Mark all tasks at VIM_ERROR status
511 self.logger.error("task=several get-net: vimconnException when trying to refresh nets " + str(e))
512 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
513
514 task_id = task["instance_action_id"] + "." + str(task["task_index"])
515 self.logger.debug("task={} get-net: vim_net_id={} result={}".format(task_id, task["vim_id"], vim_info))
516
517 task_vim_info = task["extra"].get("vim_info")
518 task_vim_status = task["extra"].get("vim_status")
519 task_error_msg = task.get("error_msg")
520 # task_sdn_net_id = task["extra"].get("sdn_net_id")
521
522 vim_info_status = vim_info["status"]
523 vim_info_error_msg = vim_info.get("error_msg")
524 # get ovim status
525 # if task_sdn_net_id:
526 # try:
527 # sdn_net = self.ovim.show_network(task_sdn_net_id)
528 # except (ovimException, Exception) as e:
529 # text_error = "ovimException getting network snd_net_id={}: {}".format(task_sdn_net_id, e)
530 # self.logger.error("task={} get-net: {}".format(task_id, text_error), exc_info=True)
531 # sdn_net = {"status": "ERROR", "last_error": text_error}
532 # if sdn_net["status"] == "ERROR":
533 # if not vim_info_error_msg:
534 # vim_info_error_msg = str(sdn_net.get("last_error"))
535 # else:
536 # vim_info_error_msg = "VIM_ERROR: {} && SDN_ERROR: {}".format(
537 # self._format_vim_error_msg(vim_info_error_msg, 1024 // 2 - 14),
538 # self._format_vim_error_msg(sdn_net["last_error"], 1024 // 2 - 14))
539 # vim_info_status = "ERROR"
540 # elif sdn_net["status"] == "BUILD":
541 # if vim_info_status == "ACTIVE":
542 # vim_info_status = "BUILD"
543
544 # update database
545 if vim_info_error_msg:
546 vim_info_error_msg = self._format_vim_error_msg(vim_info_error_msg)
547 if task_vim_status != vim_info_status or task_error_msg != vim_info_error_msg or \
548 (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
549 task["extra"]["vim_status"] = vim_info_status
550 task["error_msg"] = vim_info_error_msg
551 if vim_info.get("vim_info"):
552 task["extra"]["vim_info"] = vim_info["vim_info"]
553 database_update = {"status": vim_info_status, "error_msg": vim_info_error_msg}
554 if vim_info.get("vim_info"):
555 database_update["vim_info"] = vim_info["vim_info"]
556 return database_update
557
558 def _proccess_pending_tasks(self, task, related_tasks):
559 old_task_status = task["status"]
560 create_or_find = False # if as result of processing this task something is created or found
561 next_refresh = 0
562 task_id = task["instance_action_id"] + "." + str(task["task_index"])
563
564 try:
565 if task["status"] == "SCHEDULED":
566 # check if tasks that this depends on have been completed
567 dependency_not_completed = False
568 dependency_modified_at = 0
569 for task_index in task["extra"].get("depends_on", ()):
570 task_dependency = self._look_for_task(task["instance_action_id"], task_index)
571 if not task_dependency:
572 raise VimThreadException(
573 "Cannot get depending net task trying to get depending task {}.{}".format(
574 task["instance_action_id"], task_index))
575 # task["depends"]["TASK-" + str(task_index)] = task_dependency #it references another object,so
576 # database must be look again
577 if task_dependency["status"] == "SCHEDULED":
578 dependency_not_completed = True
579 dependency_modified_at = task_dependency["modified_at"]
580 break
581 elif task_dependency["status"] == "FAILED":
582 raise VimThreadException(
583 "Cannot {} {}, (task {}.{}) because depends on failed {}.{}, (task{}.{}): {}".format(
584 task["action"], task["item"],
585 task["instance_action_id"], task["task_index"],
586 task_dependency["instance_action_id"], task_dependency["task_index"],
587 task_dependency["action"], task_dependency["item"], task_dependency.get("error_msg")))
588
589 task["depends"]["TASK-"+str(task_index)] = task_dependency
590 task["depends"]["TASK-{}.{}".format(task["instance_action_id"], task_index)] = task_dependency
591 if dependency_not_completed:
592 # Move this task to the time dependency is going to be modified plus 10 seconds.
593 self.db.update_rows("vim_wim_actions", modified_time=dependency_modified_at + 10,
594 UPDATE={"worker": None},
595 WHERE={self.target_k: self.target_v, "worker": self.my_id,
596 "related": task["related"],
597 })
598 # task["extra"]["tries"] = task["extra"].get("tries", 0) + 1
599 # if task["extra"]["tries"] > 3:
600 # raise VimThreadException(
601 # "Cannot {} {}, (task {}.{}) because timeout waiting to complete {} {}, "
602 # "(task {}.{})".format(task["action"], task["item"],
603 # task["instance_action_id"], task["task_index"],
604 # task_dependency["instance_action_id"], task_dependency["task_index"]
605 # task_dependency["action"], task_dependency["item"]))
606 return
607
608 database_update = None
609 if task["action"] == "DELETE":
610 deleted_needed = self._delete_task(task)
611 if not deleted_needed:
612 task["status"] = "SUPERSEDED" # with FINISHED instead of DONE it will not be refreshing
613 task["error_msg"] = None
614
615 if task["status"] == "SUPERSEDED":
616 # not needed to do anything but update database with the new status
617 database_update = None
618 elif not self.vim and not self.sdnconnector:
619 task["status"] = "FAILED"
620 task["error_msg"] = self.error_status
621 database_update = {"status": "VIM_ERROR" if self.datacenter_tenant_id else "WIM_ERROR",
622 "error_msg": task["error_msg"]}
623 elif task["item_id"] != related_tasks[0]["item_id"] and task["action"] in ("FIND", "CREATE"):
624 # Do nothing, just copy values from one to another and update database
625 task["status"] = related_tasks[0]["status"]
626 task["error_msg"] = related_tasks[0]["error_msg"]
627 task["vim_id"] = related_tasks[0]["vim_id"]
628 extra = yaml.load(related_tasks[0]["extra"], Loader=yaml.Loader)
629 task["extra"]["vim_status"] = extra.get("vim_status")
630 next_refresh = related_tasks[0]["modified_at"] + 0.001
631 database_update = {"status": task["extra"].get("vim_status", "VIM_ERROR"),
632 "error_msg": task["error_msg"]}
633 if task["item"] == 'instance_vms':
634 database_update["vim_vm_id"] = task["vim_id"]
635 elif task["item"] == 'instance_nets':
636 database_update["vim_net_id"] = task["vim_id"]
637 elif task["item"] == 'instance_vms':
638 if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"):
639 database_update = self._refres_vm(task)
640 create_or_find = True
641 elif task["action"] == "CREATE":
642 create_or_find = True
643 database_update = self.new_vm(task)
644 elif task["action"] == "DELETE":
645 self.del_vm(task)
646 else:
647 raise vimconn.VimConnException(self.name + "unknown task action {}".format(task["action"]))
648 elif task["item"] == 'instance_nets':
649 if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"):
650 database_update = self._refres_net(task)
651 create_or_find = True
652 elif task["action"] == "CREATE":
653 create_or_find = True
654 database_update = self.new_net(task)
655 elif task["action"] == "DELETE":
656 self.del_net(task)
657 elif task["action"] == "FIND":
658 database_update = self.get_net(task)
659 else:
660 raise vimconn.VimConnException(self.name + "unknown task action {}".format(task["action"]))
661 elif task["item"] == 'instance_wim_nets':
662 if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"):
663 database_update = self.new_or_update_sdn_net(task)
664 create_or_find = True
665 elif task["action"] == "CREATE":
666 create_or_find = True
667 database_update = self.new_or_update_sdn_net(task)
668 elif task["action"] == "DELETE":
669 self.del_sdn_net(task)
670 elif task["action"] == "FIND":
671 database_update = self.get_sdn_net(task)
672 else:
673 raise vimconn.VimConnException(self.name + "unknown task action {}".format(task["action"]))
674 elif task["item"] == 'instance_sfis':
675 if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"):
676 database_update = self._refres_sfis(task)
677 create_or_find = True
678 elif task["action"] == "CREATE":
679 create_or_find = True
680 database_update = self.new_sfi(task)
681 elif task["action"] == "DELETE":
682 self.del_sfi(task)
683 else:
684 raise vimconn.VimConnException(self.name + "unknown task action {}".format(task["action"]))
685 elif task["item"] == 'instance_sfs':
686 if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"):
687 database_update = self._refres_sfs(task)
688 create_or_find = True
689 elif task["action"] == "CREATE":
690 create_or_find = True
691 database_update = self.new_sf(task)
692 elif task["action"] == "DELETE":
693 self.del_sf(task)
694 else:
695 raise vimconn.VimConnException(self.name + "unknown task action {}".format(task["action"]))
696 elif task["item"] == 'instance_classifications':
697 if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"):
698 database_update = self._refres_classifications(task)
699 create_or_find = True
700 elif task["action"] == "CREATE":
701 create_or_find = True
702 database_update = self.new_classification(task)
703 elif task["action"] == "DELETE":
704 self.del_classification(task)
705 else:
706 raise vimconn.VimConnException(self.name + "unknown task action {}".format(task["action"]))
707 elif task["item"] == 'instance_sfps':
708 if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"):
709 database_update = self._refres_sfps(task)
710 create_or_find = True
711 elif task["action"] == "CREATE":
712 create_or_find = True
713 database_update = self.new_sfp(task)
714 elif task["action"] == "DELETE":
715 self.del_sfp(task)
716 else:
717 raise vimconn.VimConnException(self.name + "unknown task action {}".format(task["action"]))
718 else:
719 raise vimconn.VimConnException(self.name + "unknown task item {}".format(task["item"]))
720 # TODO
721 except Exception as e:
722 if not isinstance(e, VimThreadException):
723 self.logger.error("Error executing task={}: {}".format(task_id, e), exc_info=True)
724 task["error_msg"] = str(e)
725 task["status"] = "FAILED"
726 database_update = {"status": "VIM_ERROR" if task["item"] != "instance_wim_nets" else "WIM_ERROR",
727 "error_msg": task["error_msg"]}
728 # if task["item"] == 'instance_vms':
729 # database_update["vim_vm_id"] = None
730 # elif task["item"] == 'instance_nets':
731 # database_update["vim_net_id"] = None
732
733 self.logger.debug("task={} item={} action={} result={}:'{}' params={}".format(
734 task_id, task["item"], task["action"], task["status"],
735 task["vim_id"] if task["status"] == "DONE" else task.get("error_msg"), task["params"]))
736 try:
737 if not next_refresh:
738 if task["status"] == "DONE":
739 next_refresh = time.time()
740 if task["extra"].get("vim_status") == "BUILD":
741 next_refresh += self.REFRESH_BUILD
742 elif task["extra"].get("vim_status") in ("ERROR", "VIM_ERROR", "WIM_ERROR"):
743 next_refresh += self.REFRESH_ERROR
744 elif task["extra"].get("vim_status") == "DELETED":
745 next_refresh += self.REFRESH_DELETE
746 else:
747 next_refresh += self.REFRESH_ACTIVE
748 elif task["status"] == "FAILED":
749 next_refresh = time.time() + self.REFRESH_DELETE
750
751 if create_or_find:
752 # modify all related task with action FIND/CREATED non SCHEDULED
753 self.db.update_rows(
754 table="vim_wim_actions", modified_time=next_refresh + 0.001,
755 UPDATE={"status": task["status"], "vim_id": task.get("vim_id"),
756 "error_msg": task["error_msg"],
757 },
758
759 WHERE={self.target_k: self.target_v,
760 "worker": self.my_id,
761 "action": ["FIND", "CREATE"],
762 "related": task["related"],
763 "status<>": "SCHEDULED",
764 })
765 # modify own task
766 self.db.update_rows(
767 table="vim_wim_actions", modified_time=next_refresh,
768 UPDATE={"status": task["status"], "vim_id": task.get("vim_id"),
769 "error_msg": task["error_msg"],
770 "extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256)},
771 WHERE={"instance_action_id": task["instance_action_id"], "task_index": task["task_index"]})
772 # Unlock tasks
773 self.db.update_rows(
774 table="vim_wim_actions", modified_time=0,
775 UPDATE={"worker": None},
776 WHERE={self.target_k: self.target_v,
777 "worker": self.my_id,
778 "related": task["related"],
779 })
780
781 # Update table instance_actions
782 if old_task_status == "SCHEDULED" and task["status"] != old_task_status:
783 self.db.update_rows(
784 table="instance_actions",
785 UPDATE={("number_failed" if task["status"] == "FAILED" else "number_done"): {"INCREMENT": 1}},
786 WHERE={"uuid": task["instance_action_id"]})
787 if database_update:
788 where_filter = {"related": task["related"]}
789 if task["item"] == "instance_nets" and task["datacenter_vim_id"]:
790 where_filter["datacenter_tenant_id"] = task["datacenter_vim_id"]
791 self.db.update_rows(table=task["item"],
792 UPDATE=database_update,
793 WHERE=where_filter)
794 except db_base_Exception as e:
795 self.logger.error("task={} Error updating database {}".format(task_id, e), exc_info=True)
796
797 def insert_task(self, task):
798 try:
799 self.task_queue.put(task, False)
800 return None
801 except queue.Full:
802 raise vimconn.VimConnException(self.name + ": timeout inserting a task")
803
804 def del_task(self, task):
805 with self.task_lock:
806 if task["status"] == "SCHEDULED":
807 task["status"] = "SUPERSEDED"
808 return True
809 else: # task["status"] == "processing"
810 self.task_lock.release()
811 return False
812
813 def run(self):
814 self.logger.debug("Starting")
815 while True:
816 self.get_vim_sdn_connector()
817 self.logger.debug("Vimconnector loaded")
818 reload_thread = False
819
820 while True:
821 try:
822 while not self.task_queue.empty():
823 task = self.task_queue.get()
824 if isinstance(task, list):
825 pass
826 elif isinstance(task, str):
827 if task == 'exit':
828 return 0
829 elif task == 'reload':
830 reload_thread = True
831 break
832 self.task_queue.task_done()
833 if reload_thread:
834 break
835
836 task, related_tasks = self._get_db_task()
837 if task:
838 self._proccess_pending_tasks(task, related_tasks)
839 else:
840 time.sleep(5)
841
842 except Exception as e:
843 self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)
844
845 self.logger.debug("Finishing")
846
847 def _look_for_task(self, instance_action_id, task_id):
848 """
849 Look for a concrete task at vim_actions database table
850 :param instance_action_id: The instance_action_id
851 :param task_id: Can have several formats:
852 <task index>: integer
853 TASK-<task index> :backward compatibility,
854 [TASK-]<instance_action_id>.<task index>: this instance_action_id overrides the one in the parameter
855 :return: Task dictionary or None if not found
856 """
857 if isinstance(task_id, int):
858 task_index = task_id
859 else:
860 if task_id.startswith("TASK-"):
861 task_id = task_id[5:]
862 ins_action_id, _, task_index = task_id.rpartition(".")
863 if ins_action_id:
864 instance_action_id = ins_action_id
865
866 tasks = self.db.get_rows(FROM="vim_wim_actions", WHERE={"instance_action_id": instance_action_id,
867 "task_index": task_index})
868 if not tasks:
869 return None
870 task = tasks[0]
871 task["params"] = None
872 task["depends"] = {}
873 if task["extra"]:
874 extra = yaml.load(task["extra"], Loader=yaml.Loader)
875 task["extra"] = extra
876 task["params"] = extra.get("params")
877 else:
878 task["extra"] = {}
879 return task
880
881 @staticmethod
882 def _format_vim_error_msg(error_text, max_length=1024):
883 if error_text and len(error_text) >= max_length:
884 return error_text[:max_length // 2 - 3] + " ... " + error_text[-max_length // 2 + 3:]
885 return error_text
886
887 def new_vm(self, task):
888 task_id = task["instance_action_id"] + "." + str(task["task_index"])
889 try:
890 params = task["params"]
891 depends = task.get("depends")
892 net_list = params[5]
893 for net in net_list:
894 if "net_id" in net and is_task_id(net["net_id"]): # change task_id into network_id
895 network_id = task["depends"][net["net_id"]].get("vim_id")
896 if not network_id:
897 raise VimThreadException(
898 "Cannot create VM because depends on a network not created or found: " +
899 str(depends[net["net_id"]]["error_msg"]))
900 net["net_id"] = network_id
901 params_copy = deepcopy(params)
902 vim_vm_id, created_items = self.vim.new_vminstance(*params_copy)
903
904 # fill task_interfaces. Look for snd_net_id at database for each interface
905 task_interfaces = {}
906 for iface in params_copy[5]:
907 task_interfaces[iface["vim_id"]] = {"iface_id": iface["uuid"]}
908 result = self.db.get_rows(
909 SELECT=('sdn_net_id', 'interface_id'),
910 FROM='instance_nets as ine join instance_interfaces as ii on ii.instance_net_id=ine.uuid',
911 WHERE={'ii.uuid': iface["uuid"]})
912 if result:
913 task_interfaces[iface["vim_id"]]["sdn_net_id"] = result[0]['sdn_net_id']
914 task_interfaces[iface["vim_id"]]["interface_id"] = result[0]['interface_id']
915 else:
916 self.logger.critical("task={} new-VM: instance_nets uuid={} not found at DB".format(task_id,
917 iface["uuid"]),
918 exc_info=True)
919
920 task["vim_info"] = {}
921 task["extra"]["interfaces"] = task_interfaces
922 task["extra"]["created"] = True
923 task["extra"]["created_items"] = created_items
924 task["extra"]["vim_status"] = "BUILD"
925 task["error_msg"] = None
926 task["status"] = "DONE"
927 task["vim_id"] = vim_vm_id
928 instance_element_update = {"status": "BUILD", "vim_vm_id": vim_vm_id, "error_msg": None}
929 return instance_element_update
930
931 except (vimconn.VimConnException, VimThreadException) as e:
932 self.logger.error("task={} new-VM: {}".format(task_id, e))
933 error_text = self._format_vim_error_msg(str(e))
934 task["error_msg"] = error_text
935 task["status"] = "FAILED"
936 task["vim_id"] = None
937 instance_element_update = {"status": "VIM_ERROR", "vim_vm_id": None, "error_msg": error_text}
938 return instance_element_update
939
940 def del_vm(self, task):
941 # task_id = task["instance_action_id"] + "." + str(task["task_index"])
942 vm_vim_id = task["vim_id"]
943 # interfaces = task["extra"].get("interfaces", ())
944 try:
945 # for iface in interfaces.values():
946 # if iface.get("sdn_port_id"):
947 # try:
948 # self.ovim.delete_port(iface["sdn_port_id"], idempotent=True)
949 # except ovimException as e:
950 # self.logger.error("task={} del-VM: ovimException when deleting external_port={}: {} ".format(
951 # task_id, iface["sdn_port_id"], e), exc_info=True)
952 # # TODO Set error_msg at instance_nets
953
954 self.vim.delete_vminstance(vm_vim_id, task["extra"].get("created_items"))
955 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
956 task["error_msg"] = None
957 return None
958
959 except vimconn.VimConnException as e:
960 task["error_msg"] = self._format_vim_error_msg(str(e))
961 if isinstance(e, vimconn.VimConnNotFoundException):
962 # If not found mark as Done and fill error_msg
963 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
964 return None
965 task["status"] = "FAILED"
966 return None
967
968 def _get_net_internal(self, task, filter_param):
969 """
970 Common code for get_net and new_net. It looks for a network on VIM with the filter_params
971 :param task: task for this find or find-or-create action
972 :param filter_param: parameters to send to the vimconnector
973 :return: a dict with the content to update the instance_nets database table. Raises an exception on error, or
974 when network is not found or found more than one
975 """
976 vim_nets = self.vim.get_network_list(filter_param)
977 if not vim_nets:
978 raise VimThreadExceptionNotFound("Network not found with this criteria: '{}'".format(filter_param))
979 elif len(vim_nets) > 1:
980 raise VimThreadException("More than one network found with this criteria: '{}'".format(filter_param))
981 vim_net_id = vim_nets[0]["id"]
982
983 # Discover if this network is managed by a sdn controller
984 sdn_net_id = None
985 result = self.db.get_rows(SELECT=('sdn_net_id',), FROM='instance_nets',
986 WHERE={'vim_net_id': vim_net_id, 'datacenter_tenant_id': self.datacenter_tenant_id},
987 ORDER="instance_scenario_id")
988 if result:
989 sdn_net_id = result[0]['sdn_net_id']
990
991 task["status"] = "DONE"
992 task["extra"]["vim_info"] = {}
993 task["extra"]["created"] = False
994 task["extra"]["vim_status"] = "BUILD"
995 task["extra"]["sdn_net_id"] = sdn_net_id
996 task["error_msg"] = None
997 task["vim_id"] = vim_net_id
998 instance_element_update = {"vim_net_id": vim_net_id, "created": False, "status": "BUILD",
999 "error_msg": None, "sdn_net_id": sdn_net_id}
1000 return instance_element_update
1001
1002 def get_net(self, task):
1003 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1004 try:
1005 params = task["params"]
1006 filter_param = params[0]
1007 instance_element_update = self._get_net_internal(task, filter_param)
1008 return instance_element_update
1009
1010 except (vimconn.VimConnException, VimThreadException) as e:
1011 self.logger.error("task={} get-net: {}".format(task_id, e))
1012 task["status"] = "FAILED"
1013 task["vim_id"] = None
1014 task["error_msg"] = self._format_vim_error_msg(str(e))
1015 instance_element_update = {"vim_net_id": None, "status": "VIM_ERROR",
1016 "error_msg": task["error_msg"]}
1017 return instance_element_update
1018
1019 def new_net(self, task):
1020 vim_net_id = None
1021 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1022 action_text = ""
1023 try:
1024 # FIND
1025 if task["extra"].get("find"):
1026 action_text = "finding"
1027 filter_param = task["extra"]["find"][0]
1028 try:
1029 instance_element_update = self._get_net_internal(task, filter_param)
1030 return instance_element_update
1031 except VimThreadExceptionNotFound:
1032 pass
1033 # CREATE
1034 params = task["params"]
1035 action_text = "creating VIM"
1036
1037 vim_net_id, created_items = self.vim.new_network(*params[0:5])
1038
1039 # net_name = params[0]
1040 # net_type = params[1]
1041 # wim_account_name = None
1042 # if len(params) >= 6:
1043 # wim_account_name = params[5]
1044
1045 # TODO fix at nfvo adding external port
1046 # if wim_account_name and self.vim.config["wim_external_ports"]:
1047 # # add external port to connect WIM. Try with compute node __WIM:wim_name and __WIM
1048 # action_text = "attaching external port to ovim network"
1049 # sdn_port_name = "external_port"
1050 # sdn_port_data = {
1051 # "compute_node": "__WIM:" + wim_account_name[0:58],
1052 # "pci": None,
1053 # "vlan": network["vlan"],
1054 # "net_id": sdn_net_id,
1055 # "region": self.vim["config"]["datacenter_id"],
1056 # "name": sdn_port_name,
1057 # }
1058 # try:
1059 # sdn_external_port_id = self.ovim.new_external_port(sdn_port_data)
1060 # except ovimException:
1061 # sdn_port_data["compute_node"] = "__WIM"
1062 # sdn_external_port_id = self.ovim.new_external_port(sdn_port_data)
1063 # self.logger.debug("Added sdn_external_port {} to sdn_network {}".format(sdn_external_port_id,
1064 # sdn_net_id))
1065 task["status"] = "DONE"
1066 task["extra"]["vim_info"] = {}
1067 # task["extra"]["sdn_net_id"] = sdn_net_id
1068 task["extra"]["vim_status"] = "BUILD"
1069 task["extra"]["created"] = True
1070 task["extra"]["created_items"] = created_items
1071 task["error_msg"] = None
1072 task["vim_id"] = vim_net_id
1073 instance_element_update = {"vim_net_id": vim_net_id, "status": "BUILD",
1074 "created": True, "error_msg": None}
1075 return instance_element_update
1076 except vimconn.VimConnException as e:
1077 self.logger.error("task={} new-net: Error {}: {}".format(task_id, action_text, e))
1078 task["status"] = "FAILED"
1079 task["vim_id"] = vim_net_id
1080 task["error_msg"] = self._format_vim_error_msg(str(e))
1081 # task["extra"]["sdn_net_id"] = sdn_net_id
1082 instance_element_update = {"vim_net_id": vim_net_id, "status": "VIM_ERROR",
1083 "error_msg": task["error_msg"]}
1084 return instance_element_update
1085
1086 def del_net(self, task):
1087 net_vim_id = task["vim_id"]
1088 # sdn_net_id = task["extra"].get("sdn_net_id")
1089 try:
1090 if net_vim_id:
1091 self.vim.delete_network(net_vim_id, task["extra"].get("created_items"))
1092 # if sdn_net_id:
1093 # # Delete any attached port to this sdn network. There can be ports associated to this network in case
1094 # # it was manually done using 'openmano vim-net-sdn-attach'
1095 # port_list = self.ovim.get_ports(columns={'uuid'},
1096 # filter={'name': 'external_port', 'net_id': sdn_net_id})
1097 # for port in port_list:
1098 # self.ovim.delete_port(port['uuid'], idempotent=True)
1099 # self.ovim.delete_network(sdn_net_id, idempotent=True)
1100 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1101 task["error_msg"] = None
1102 return None
1103 except vimconn.VimConnException as e:
1104 task["error_msg"] = self._format_vim_error_msg(str(e))
1105 if isinstance(e, vimconn.VimConnNotFoundException):
1106 # If not found mark as Done and fill error_msg
1107 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1108 return None
1109 task["status"] = "FAILED"
1110 return None
1111
1112 def new_or_update_sdn_net(self, task):
1113 wimconn_net_id = task["vim_id"]
1114 created_items = task["extra"].get("created_items")
1115 connected_ports = task["extra"].get("connected_ports", [])
1116 new_connected_ports = []
1117 last_update = task["extra"].get("last_update", 0)
1118 sdn_status = task["extra"].get("vim_status", "BUILD")
1119 sdn_info = None
1120
1121 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1122 error_list = []
1123 try:
1124 # FIND
1125 if task["extra"].get("find"):
1126 wimconn_id = task["extra"]["find"][0]
1127 try:
1128 instance_element_update = self.sdnconnector.get_connectivity_service_status(wimconn_id)
1129 wimconn_net_id = wimconn_id
1130 instance_element_update = {"wim_internal_id": wimconn_net_id, "created": False, "status": "BUILD",
1131 "error_msg": None, }
1132 return instance_element_update
1133 except Exception as e:
1134 if isinstance(e, SdnConnectorError) and e.http_error == HTTPStatus.NOT_FOUND.value:
1135 pass
1136 else:
1137 self._proccess_sdn_exception(e)
1138
1139 params = task["params"]
1140 # CREATE
1141 # look for ports
1142 sdn_ports = []
1143 pending_ports = 0
1144 vlan_used = None
1145
1146 ports = self.db.get_rows(FROM='instance_interfaces', WHERE={'instance_wim_net_id': task["item_id"]})
1147 sdn_need_update = False
1148 for port in ports:
1149 vlan_used = port.get("vlan") or vlan_used
1150 # TODO. Do not connect if already done
1151 if port.get("compute_node") and port.get("pci"):
1152 for pmap in self.port_mappings:
1153 if pmap.get("device_id") == port["compute_node"] and \
1154 pmap.get("device_interface_id") == port["pci"]:
1155 break
1156 else:
1157 if self.sdnconn_config.get("mapping_not_needed"):
1158 pmap = {
1159 "service_endpoint_id": "{}:{}".format(port["compute_node"], port["pci"]),
1160 "service_endpoint_encapsulation_info": {
1161 "vlan": port["vlan"],
1162 "mac": port["mac_address"],
1163 "device_id": port["compute_node"],
1164 "device_interface_id": port["pci"]
1165 }
1166 }
1167 else:
1168 pmap = None
1169 error_list.append("Port mapping not found for compute_node={} pci={}".format(
1170 port["compute_node"], port["pci"]))
1171
1172 if pmap:
1173 if port["modified_at"] > last_update:
1174 sdn_need_update = True
1175 new_connected_ports.append(port["uuid"])
1176 sdn_ports.append({
1177 "service_endpoint_id": pmap["service_endpoint_id"],
1178 "service_endpoint_encapsulation_type": "dot1q" if port["model"] == "SR-IOV" else None,
1179 "service_endpoint_encapsulation_info": {
1180 "vlan": port["vlan"],
1181 "mac": port["mac_address"],
1182 "device_id": pmap.get("device_id"),
1183 "device_interface_id": pmap.get("device_interface_id"),
1184 "switch_dpid": pmap.get("switch_dpid"),
1185 "switch_port": pmap.get("switch_port"),
1186 "service_mapping_info": pmap.get("service_mapping_info"),
1187 }
1188 })
1189
1190 else:
1191 pending_ports += 1
1192 if pending_ports:
1193 error_list.append("Waiting for getting interfaces location from VIM. Obtained '{}' of {}"
1194 .format(len(ports)-pending_ports, len(ports)))
1195
1196 # connect external ports
1197 for index, external_port in enumerate(task["extra"].get("sdn-ports") or ()):
1198 external_port_id = external_port.get("service_endpoint_id") or str(index)
1199 sdn_ports.append({
1200 "service_endpoint_id": external_port_id,
1201 "service_endpoint_encapsulation_type": external_port.get("service_endpoint_encapsulation_type",
1202 "dot1q"),
1203 "service_endpoint_encapsulation_info": {
1204 "vlan": external_port.get("vlan") or vlan_used,
1205 "mac": external_port.get("mac_address"),
1206 "device_id": external_port.get("device_id"),
1207 "device_interface_id": external_port.get("device_interface_id"),
1208 "switch_dpid": external_port.get("switch_dpid") or external_port.get("switch_id"),
1209 "switch_port": external_port.get("switch_port"),
1210 "service_mapping_info": external_port.get("service_mapping_info"),
1211 }})
1212 new_connected_ports.append(external_port_id)
1213
1214 # if there are more ports to connect or they have been modified, call create/update
1215 try:
1216 if set(connected_ports) != set(new_connected_ports) or sdn_need_update:
1217 last_update = time.time()
1218 if not wimconn_net_id:
1219 if len(sdn_ports) < 2:
1220 if not pending_ports:
1221 sdn_status = "ACTIVE"
1222 else:
1223 if params[0] == "data":
1224 net_type = "ELAN"
1225 elif params[0] == "ptp":
1226 net_type = "ELINE"
1227 else:
1228 net_type = "L3"
1229 wimconn_net_id, created_items = self.sdnconnector.create_connectivity_service(
1230 net_type, sdn_ports)
1231 else:
1232 created_items = self.sdnconnector.edit_connectivity_service(
1233 wimconn_net_id, conn_info=created_items, connection_points=sdn_ports)
1234 connected_ports = new_connected_ports
1235 elif wimconn_net_id:
1236 wim_status_dict = self.sdnconnector.get_connectivity_service_status(wimconn_net_id,
1237 conn_info=created_items)
1238 sdn_status = wim_status_dict["sdn_status"]
1239 if wim_status_dict.get("error_msg"):
1240 error_list.append(wim_status_dict.get("error_msg"))
1241 if wim_status_dict.get("sdn_info"):
1242 sdn_info = str(wim_status_dict.get("sdn_info"))
1243 except Exception as e:
1244 self._proccess_sdn_exception(e)
1245
1246 task["status"] = "DONE"
1247 task["extra"]["vim_info"] = {}
1248 # task["extra"]["sdn_net_id"] = sdn_net_id
1249 task["extra"]["vim_status"] = sdn_status
1250 task["extra"]["created"] = True
1251 task["extra"]["created_items"] = created_items
1252 task["extra"]["connected_ports"] = connected_ports
1253 task["extra"]["last_update"] = last_update
1254 task["error_msg"] = self._format_vim_error_msg(" ; ".join(error_list))
1255 task["vim_id"] = wimconn_net_id
1256 instance_element_update = {"wim_internal_id": wimconn_net_id, "status": sdn_status,
1257 "created": True, "error_msg": task["error_msg"] or None}
1258 except (vimconn.VimConnException, SdnConnectorError) as e:
1259 self.logger.error("task={} new-sdn-net: Error: {}".format(task_id, e))
1260 task["status"] = "FAILED"
1261 task["vim_id"] = wimconn_net_id
1262 task["error_msg"] = self._format_vim_error_msg(str(e))
1263 # task["extra"]["sdn_net_id"] = sdn_net_id
1264 instance_element_update = {"wim_internal_id": wimconn_net_id, "status": "WIM_ERROR",
1265 "error_msg": task["error_msg"]}
1266
1267 if sdn_info:
1268 instance_element_update["wim_info"] = sdn_info
1269 return instance_element_update
1270
1271 def del_sdn_net(self, task):
1272 wimconn_net_id = task["vim_id"]
1273 try:
1274 try:
1275 if wimconn_net_id:
1276 self.sdnconnector.delete_connectivity_service(wimconn_net_id, task["extra"].get("created_items"))
1277 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1278 task["error_msg"] = None
1279 return None
1280 except Exception as e:
1281 self._proccess_sdn_exception(e)
1282 except SdnConnectorError as e:
1283 task["error_msg"] = self._format_vim_error_msg(str(e))
1284 if e.http_code == HTTPStatus.NOT_FOUND.value:
1285 # If not found mark as Done and fill error_msg
1286 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1287 task["error_msg"] = None
1288 return None
1289 task["status"] = "FAILED"
1290 return None
1291
1292 # Service Function Instances
1293 def new_sfi(self, task):
1294 vim_sfi_id = None
1295 try:
1296 # Waits for interfaces to be ready (avoids failure)
1297 time.sleep(1)
1298 dep_id = "TASK-" + str(task["extra"]["depends_on"][0])
1299 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1300 error_text = ""
1301 interfaces = task["depends"][dep_id]["extra"].get("interfaces")
1302
1303 ingress_interface_id = task.get("extra").get("params").get("ingress_interface_id")
1304 egress_interface_id = task.get("extra").get("params").get("egress_interface_id")
1305 ingress_vim_interface_id = None
1306 egress_vim_interface_id = None
1307 for vim_interface, interface_data in interfaces.items():
1308 if interface_data.get("interface_id") == ingress_interface_id:
1309 ingress_vim_interface_id = vim_interface
1310 break
1311 if ingress_interface_id != egress_interface_id:
1312 for vim_interface, interface_data in interfaces.items():
1313 if interface_data.get("interface_id") == egress_interface_id:
1314 egress_vim_interface_id = vim_interface
1315 break
1316 else:
1317 egress_vim_interface_id = ingress_vim_interface_id
1318 if not ingress_vim_interface_id or not egress_vim_interface_id:
1319 error_text = "Error creating Service Function Instance, Ingress: {}, Egress: {}".format(
1320 ingress_vim_interface_id, egress_vim_interface_id)
1321 self.logger.error(error_text)
1322 task["error_msg"] = error_text
1323 task["status"] = "FAILED"
1324 task["vim_id"] = None
1325 return None
1326 # At the moment, every port associated with the VM will be used both as ingress and egress ports.
1327 # Bear in mind that different VIM connectors might support SFI differently. In the case of OpenStack,
1328 # only the first ingress and first egress ports will be used to create the SFI (Port Pair).
1329 ingress_port_id_list = [ingress_vim_interface_id]
1330 egress_port_id_list = [egress_vim_interface_id]
1331 name = "sfi-{}".format(task["item_id"][:8])
1332 # By default no form of IETF SFC Encapsulation will be used
1333 vim_sfi_id = self.vim.new_sfi(name, ingress_port_id_list, egress_port_id_list, sfc_encap=False)
1334
1335 task["extra"]["created"] = True
1336 task["extra"]["vim_status"] = "ACTIVE"
1337 task["error_msg"] = None
1338 task["status"] = "DONE"
1339 task["vim_id"] = vim_sfi_id
1340 instance_element_update = {"status": "ACTIVE", "vim_sfi_id": vim_sfi_id, "error_msg": None}
1341 return instance_element_update
1342
1343 except (vimconn.VimConnException, VimThreadException) as e:
1344 self.logger.error("Error creating Service Function Instance, task=%s: %s", task_id, str(e))
1345 error_text = self._format_vim_error_msg(str(e))
1346 task["error_msg"] = error_text
1347 task["status"] = "FAILED"
1348 task["vim_id"] = None
1349 instance_element_update = {"status": "VIM_ERROR", "vim_sfi_id": None, "error_msg": error_text}
1350 return instance_element_update
1351
1352 def del_sfi(self, task):
1353 sfi_vim_id = task["vim_id"]
1354 try:
1355 self.vim.delete_sfi(sfi_vim_id)
1356 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1357 task["error_msg"] = None
1358 return None
1359
1360 except vimconn.VimConnException as e:
1361 task["error_msg"] = self._format_vim_error_msg(str(e))
1362 if isinstance(e, vimconn.VimConnNotFoundException):
1363 # If not found mark as Done and fill error_msg
1364 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1365 return None
1366 task["status"] = "FAILED"
1367 return None
1368
1369 def new_sf(self, task):
1370 vim_sf_id = None
1371 try:
1372 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1373 error_text = ""
1374 depending_tasks = ["TASK-" + str(dep_id) for dep_id in task["extra"]["depends_on"]]
1375 # sfis = next(iter(task.get("depends").values())).get("extra").get("params")[5]
1376 sfis = [task.get("depends").get(dep_task) for dep_task in depending_tasks]
1377 sfi_id_list = []
1378 for sfi in sfis:
1379 sfi_id_list.append(sfi.get("vim_id"))
1380 name = "sf-{}".format(task["item_id"][:8])
1381 # By default no form of IETF SFC Encapsulation will be used
1382 vim_sf_id = self.vim.new_sf(name, sfi_id_list, sfc_encap=False)
1383
1384 task["extra"]["created"] = True
1385 task["extra"]["vim_status"] = "ACTIVE"
1386 task["error_msg"] = None
1387 task["status"] = "DONE"
1388 task["vim_id"] = vim_sf_id
1389 instance_element_update = {"status": "ACTIVE", "vim_sf_id": vim_sf_id, "error_msg": None}
1390 return instance_element_update
1391
1392 except (vimconn.VimConnException, VimThreadException) as e:
1393 self.logger.error("Error creating Service Function, task=%s: %s", task_id, str(e))
1394 error_text = self._format_vim_error_msg(str(e))
1395 task["error_msg"] = error_text
1396 task["status"] = "FAILED"
1397 task["vim_id"] = None
1398 instance_element_update = {"status": "VIM_ERROR", "vim_sf_id": None, "error_msg": error_text}
1399 return instance_element_update
1400
1401 def del_sf(self, task):
1402 sf_vim_id = task["vim_id"]
1403 try:
1404 self.vim.delete_sf(sf_vim_id)
1405 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1406 task["error_msg"] = None
1407 return None
1408
1409 except vimconn.VimConnException as e:
1410 task["error_msg"] = self._format_vim_error_msg(str(e))
1411 if isinstance(e, vimconn.VimConnNotFoundException):
1412 # If not found mark as Done and fill error_msg
1413 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1414 return None
1415 task["status"] = "FAILED"
1416 return None
1417
1418 def new_classification(self, task):
1419 vim_classification_id = None
1420 try:
1421 params = task["params"]
1422 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1423 dep_id = "TASK-" + str(task["extra"]["depends_on"][0])
1424 error_text = ""
1425 interfaces = task.get("depends").get(dep_id).get("extra").get("interfaces")
1426 # Bear in mind that different VIM connectors might support Classifications differently.
1427 # In the case of OpenStack, only the first VNF attached to the classifier will be used
1428 # to create the Classification(s) (the "logical source port" of the "Flow Classifier").
1429 # Since the VNFFG classifier match lacks the ethertype, classification defaults to
1430 # using the IPv4 flow classifier.
1431 logical_source_port_vim_id = None
1432 logical_source_port_id = params.get("logical_source_port")
1433 for vim_interface, interface_data in interfaces.items():
1434 if interface_data.get("interface_id") == logical_source_port_id:
1435 logical_source_port_vim_id = vim_interface
1436 break
1437 if not logical_source_port_vim_id:
1438 error_text = "Error creating Flow Classifier, Logical Source Port id {}".format(
1439 logical_source_port_id)
1440 self.logger.error(error_text)
1441 task["error_msg"] = error_text
1442 task["status"] = "FAILED"
1443 task["vim_id"] = None
1444 return None
1445
1446 name = "c-{}".format(task["item_id"][:8])
1447 # if not CIDR is given for the IP addresses, add /32:
1448 ip_proto = int(params.get("ip_proto"))
1449 source_ip = params.get("source_ip")
1450 destination_ip = params.get("destination_ip")
1451 source_port = params.get("source_port")
1452 destination_port = params.get("destination_port")
1453 definition = {"logical_source_port": logical_source_port_vim_id}
1454 if ip_proto:
1455 if ip_proto == 1:
1456 ip_proto = 'icmp'
1457 elif ip_proto == 6:
1458 ip_proto = 'tcp'
1459 elif ip_proto == 17:
1460 ip_proto = 'udp'
1461 definition["protocol"] = ip_proto
1462 if source_ip:
1463 if '/' not in source_ip:
1464 source_ip += '/32'
1465 definition["source_ip_prefix"] = source_ip
1466 if source_port:
1467 definition["source_port_range_min"] = source_port
1468 definition["source_port_range_max"] = source_port
1469 if destination_port:
1470 definition["destination_port_range_min"] = destination_port
1471 definition["destination_port_range_max"] = destination_port
1472 if destination_ip:
1473 if '/' not in destination_ip:
1474 destination_ip += '/32'
1475 definition["destination_ip_prefix"] = destination_ip
1476
1477 vim_classification_id = self.vim.new_classification(
1478 name, 'legacy_flow_classifier', definition)
1479
1480 task["extra"]["created"] = True
1481 task["extra"]["vim_status"] = "ACTIVE"
1482 task["error_msg"] = None
1483 task["status"] = "DONE"
1484 task["vim_id"] = vim_classification_id
1485 instance_element_update = {"status": "ACTIVE", "vim_classification_id": vim_classification_id,
1486 "error_msg": None}
1487 return instance_element_update
1488
1489 except (vimconn.VimConnException, VimThreadException) as e:
1490 self.logger.error("Error creating Classification, task=%s: %s", task_id, str(e))
1491 error_text = self._format_vim_error_msg(str(e))
1492 task["error_msg"] = error_text
1493 task["status"] = "FAILED"
1494 task["vim_id"] = None
1495 instance_element_update = {"status": "VIM_ERROR", "vim_classification_id": None, "error_msg": error_text}
1496 return instance_element_update
1497
1498 def del_classification(self, task):
1499 classification_vim_id = task["vim_id"]
1500 try:
1501 self.vim.delete_classification(classification_vim_id)
1502 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1503 task["error_msg"] = None
1504 return None
1505
1506 except vimconn.VimConnException as e:
1507 task["error_msg"] = self._format_vim_error_msg(str(e))
1508 if isinstance(e, vimconn.VimConnNotFoundException):
1509 # If not found mark as Done and fill error_msg
1510 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1511 return None
1512 task["status"] = "FAILED"
1513 return None
1514
1515 def new_sfp(self, task):
1516 vim_sfp_id = None
1517 try:
1518 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1519 depending_tasks = [task.get("depends").get("TASK-" + str(tsk_id)) for tsk_id in
1520 task.get("extra").get("depends_on")]
1521 error_text = ""
1522 sf_id_list = []
1523 classification_id_list = []
1524 for dep in depending_tasks:
1525 vim_id = dep.get("vim_id")
1526 resource = dep.get("item")
1527 if resource == "instance_sfs":
1528 sf_id_list.append(vim_id)
1529 elif resource == "instance_classifications":
1530 classification_id_list.append(vim_id)
1531
1532 name = "sfp-{}".format(task["item_id"][:8])
1533 # By default no form of IETF SFC Encapsulation will be used
1534 vim_sfp_id = self.vim.new_sfp(name, classification_id_list, sf_id_list, sfc_encap=False)
1535
1536 task["extra"]["created"] = True
1537 task["extra"]["vim_status"] = "ACTIVE"
1538 task["error_msg"] = None
1539 task["status"] = "DONE"
1540 task["vim_id"] = vim_sfp_id
1541 instance_element_update = {"status": "ACTIVE", "vim_sfp_id": vim_sfp_id, "error_msg": None}
1542 return instance_element_update
1543
1544 except (vimconn.VimConnException, VimThreadException) as e:
1545 self.logger.error("Error creating Service Function, task=%s: %s", task_id, str(e))
1546 error_text = self._format_vim_error_msg(str(e))
1547 task["error_msg"] = error_text
1548 task["status"] = "FAILED"
1549 task["vim_id"] = None
1550 instance_element_update = {"status": "VIM_ERROR", "vim_sfp_id": None, "error_msg": error_text}
1551 return instance_element_update
1552
1553 def del_sfp(self, task):
1554 sfp_vim_id = task["vim_id"]
1555 try:
1556 self.vim.delete_sfp(sfp_vim_id)
1557 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1558 task["error_msg"] = None
1559 return None
1560
1561 except vimconn.VimConnException as e:
1562 task["error_msg"] = self._format_vim_error_msg(str(e))
1563 if isinstance(e, vimconn.VimConnNotFoundException):
1564 # If not found mark as Done and fill error_msg
1565 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1566 return None
1567 task["status"] = "FAILED"
1568 return None
1569
1570 def _refres_sfps(self, task):
1571 """Call VIM to get SFPs status"""
1572 database_update = None
1573
1574 vim_id = task["vim_id"]
1575 sfp_to_refresh_list = [vim_id]
1576 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1577 try:
1578 vim_dict = self.vim.refresh_sfps_status(sfp_to_refresh_list)
1579 vim_info = vim_dict[vim_id]
1580 except vimconn.VimConnException as e:
1581 # Mark all tasks at VIM_ERROR status
1582 self.logger.error("task={} get-sfp: vimconnException when trying to refresh sfps {}".format(task_id, e))
1583 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
1584
1585 self.logger.debug("task={} get-sfp: vim_sfp_id={} result={}".format(task_id, task["vim_id"], vim_info))
1586 #TODO: Revise this part
1587 vim_info_error_msg = None
1588 if vim_info.get("error_msg"):
1589 vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"])
1590 task_vim_info = task["extra"].get("vim_info")
1591 task_error_msg = task.get("error_msg")
1592 task_vim_status = task["extra"].get("vim_status")
1593 if task_vim_status != vim_info["status"] or task_error_msg != vim_info_error_msg or \
1594 (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
1595 database_update = {"status": vim_info["status"], "error_msg": vim_info_error_msg}
1596 if vim_info.get("vim_info"):
1597 database_update["vim_info"] = vim_info["vim_info"]
1598
1599 task["extra"]["vim_status"] = vim_info["status"]
1600 task["error_msg"] = vim_info_error_msg
1601 if vim_info.get("vim_info"):
1602 task["extra"]["vim_info"] = vim_info["vim_info"]
1603
1604 return database_update
1605
1606 def _refres_sfis(self, task):
1607 """Call VIM to get sfis status"""
1608 database_update = None
1609
1610 vim_id = task["vim_id"]
1611 sfi_to_refresh_list = [vim_id]
1612 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1613 try:
1614 vim_dict = self.vim.refresh_sfis_status(sfi_to_refresh_list)
1615 vim_info = vim_dict[vim_id]
1616 except vimconn.VimConnException as e:
1617 # Mark all tasks at VIM_ERROR status
1618 self.logger.error("task={} get-sfi: vimconnException when trying to refresh sfis {}".format(task_id, e))
1619 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
1620
1621 self.logger.debug("task={} get-sfi: vim_sfi_id={} result={}".format(task_id, task["vim_id"], vim_info))
1622 #TODO: Revise this part
1623 vim_info_error_msg = None
1624 if vim_info.get("error_msg"):
1625 vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"])
1626 task_vim_info = task["extra"].get("vim_info")
1627 task_error_msg = task.get("error_msg")
1628 task_vim_status = task["extra"].get("vim_status")
1629 if task_vim_status != vim_info["status"] or task_error_msg != vim_info_error_msg or \
1630 (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
1631 database_update = {"status": vim_info["status"], "error_msg": vim_info_error_msg}
1632 if vim_info.get("vim_info"):
1633 database_update["vim_info"] = vim_info["vim_info"]
1634
1635 task["extra"]["vim_status"] = vim_info["status"]
1636 task["error_msg"] = vim_info_error_msg
1637 if vim_info.get("vim_info"):
1638 task["extra"]["vim_info"] = vim_info["vim_info"]
1639
1640 return database_update
1641
1642 def _refres_sfs(self, task):
1643 """Call VIM to get sfs status"""
1644 database_update = None
1645
1646 vim_id = task["vim_id"]
1647 sf_to_refresh_list = [vim_id]
1648 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1649 try:
1650 vim_dict = self.vim.refresh_sfs_status(sf_to_refresh_list)
1651 vim_info = vim_dict[vim_id]
1652 except vimconn.VimConnException as e:
1653 # Mark all tasks at VIM_ERROR status
1654 self.logger.error("task={} get-sf: vimconnException when trying to refresh sfs {}".format(task_id, e))
1655 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
1656
1657 self.logger.debug("task={} get-sf: vim_sf_id={} result={}".format(task_id, task["vim_id"], vim_info))
1658 #TODO: Revise this part
1659 vim_info_error_msg = None
1660 if vim_info.get("error_msg"):
1661 vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"])
1662 task_vim_info = task["extra"].get("vim_info")
1663 task_error_msg = task.get("error_msg")
1664 task_vim_status = task["extra"].get("vim_status")
1665 if task_vim_status != vim_info["status"] or task_error_msg != vim_info_error_msg or \
1666 (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
1667 database_update = {"status": vim_info["status"], "error_msg": vim_info_error_msg}
1668 if vim_info.get("vim_info"):
1669 database_update["vim_info"] = vim_info["vim_info"]
1670
1671 task["extra"]["vim_status"] = vim_info["status"]
1672 task["error_msg"] = vim_info_error_msg
1673 if vim_info.get("vim_info"):
1674 task["extra"]["vim_info"] = vim_info["vim_info"]
1675
1676 return database_update
1677
1678 def _refres_classifications(self, task):
1679 """Call VIM to get classifications status"""
1680 database_update = None
1681
1682 vim_id = task["vim_id"]
1683 classification_to_refresh_list = [vim_id]
1684 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1685 try:
1686 vim_dict = self.vim.refresh_classifications_status(classification_to_refresh_list)
1687 vim_info = vim_dict[vim_id]
1688 except vimconn.VimConnException as e:
1689 # Mark all tasks at VIM_ERROR status
1690 self.logger.error("task={} get-classification: vimconnException when trying to refresh classifications {}"
1691 .format(task_id, e))
1692 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
1693
1694 self.logger.debug("task={} get-classification: vim_classification_id={} result={}".format(task_id,
1695 task["vim_id"], vim_info))
1696 #TODO: Revise this part
1697 vim_info_error_msg = None
1698 if vim_info.get("error_msg"):
1699 vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"])
1700 task_vim_info = task["extra"].get("vim_info")
1701 task_error_msg = task.get("error_msg")
1702 task_vim_status = task["extra"].get("vim_status")
1703 if task_vim_status != vim_info["status"] or task_error_msg != vim_info_error_msg or \
1704 (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
1705 database_update = {"status": vim_info["status"], "error_msg": vim_info_error_msg}
1706 if vim_info.get("vim_info"):
1707 database_update["vim_info"] = vim_info["vim_info"]
1708
1709 task["extra"]["vim_status"] = vim_info["status"]
1710 task["error_msg"] = vim_info_error_msg
1711 if vim_info.get("vim_info"):
1712 task["extra"]["vim_info"] = vim_info["vim_info"]
1713
1714 return database_update