Implemented edit method, some corrections to add and delete
[osm/RO.git] / RO / osm_ro / vim_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefonica Investigacion y Desarrollo, S.A.U.
5 # This file is part of openvim
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
24 """"
25 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
26 The tasks are stored at database in table vim_wim_actions
27 Several vim_wim_actions can refer to the same element at VIM (flavor, network, ...). This is somethng to avoid if RO
28 is migrated to a non-relational database as mongo db. Each vim_wim_actions reference a different instance_Xxxxx
29 In this case "related" colunm contains the same value, to know they refer to the same vim. In case of deletion, it
30 there is related tasks using this element, it is not deleted, The vim_info needed to delete is transfered to other task
31
32 The task content is (M: stored at memory, D: stored at database):
33 MD instance_action_id: reference a global action over an instance-scenario: database instance_actions
34 MD task_index: index number of the task. This together with the previous forms a unique key identifier
35 MD datacenter_vim_id: should contain the uuid of the VIM managed by this thread
36 MD vim_id: id of the vm,net,etc at VIM
37 MD item: database table name, can be instance_vms, instance_nets, TODO: datacenter_flavors, datacenter_images
38 MD item_id: uuid of the referenced entry in the previous table
39 MD action: CREATE, DELETE, FIND
40 MD status: SCHEDULED: action need to be done
41 BUILD: not used
42 DONE: Done and it must be polled to VIM periodically to see status. ONLY for action=CREATE or FIND
43 FAILED: It cannot be created/found/deleted
44 FINISHED: similar to DONE, but no refresh is needed anymore. Task is maintained at database but
45 it is never processed by any thread
46 SUPERSEDED: similar to FINSISHED, but nothing has been done to completed the task.
47 MD extra: text with yaml format at database, dict at memory with:
48 params: list with the params to be sent to the VIM for CREATE or FIND. For DELETE the vim_id is taken
49 from other related tasks
50 find: (only for CREATE tasks) if present it should FIND before creating and use if existing. Contains
51 the FIND params
52 depends_on: list with the 'task_index'es of tasks that must be completed before. e.g. a vm creation depends
53 on a net creation
54 can contain an int (single index on the same instance-action) or str (compete action ID)
55 sdn_net_id: used for net.
56 interfaces: used for VMs. Each key is the uuid of the instance_interfaces entry at database
57 iface_id: uuid of intance_interfaces
58 sdn_port_id:
59 sdn_net_id:
60 vim_info
61 created_items: dictionary with extra elements created that need to be deleted. e.g. ports, volumes,...
62 created: False if the VIM element is not created by other actions, and it should not be deleted
63 vim_status: VIM status of the element. Stored also at database in the instance_XXX
64 vim_info: Detailed information of a vm/net from the VIM. Stored at database in the instance_XXX but not at
65 vim_wim_actions
66 M depends: dict with task_index(from depends_on) to dependency task
67 M params: same as extra[params]
68 MD error_msg: descriptive text upon an error.Stored also at database instance_XXX
69 MD created_at: task creation time. The task of creation must be the oldest
70 MD modified_at: next time task need to be processed. For example, for a refresh, it contain next time refresh must
71 be done
72 MD related: All the tasks over the same VIM element have same "related". Note that other VIMs can contain the
73 same value of related, but this thread only process those task of one VIM. Also related can be the
74 same among several NS os isntance-scenarios
75 MD worker: Used to lock in case of several thread workers.
76
77 """
78
79 import threading
80 import time
81 import queue
82 import logging
83 from osm_ro import vimconn
84 from osm_ro.wim.sdnconn import SdnConnectorError
85 import yaml
86 from osm_ro.db_base import db_base_Exception
87 from http import HTTPStatus
88 from copy import deepcopy
89
90 __author__ = "Alfonso Tierno, Pablo Montes"
91 __date__ = "$28-Sep-2017 12:07:15$"
92
93
94 def is_task_id(task_id):
95 return task_id.startswith("TASK-")
96
97
98 class VimThreadException(Exception):
99 pass
100
101
102 class VimThreadExceptionNotFound(VimThreadException):
103 pass
104
105
106 class vim_thread(threading.Thread):
107 REFRESH_BUILD = 5 # 5 seconds
108 REFRESH_ACTIVE = 60 # 1 minute
109 REFRESH_ERROR = 600
110 REFRESH_DELETE = 3600 * 10
111
112 def __init__(self, task_lock, plugins, name=None, wim_account_id=None, datacenter_tenant_id=None, db=None):
113 """Init a thread.
114 Arguments:
115 'id' number of thead
116 'name' name of thread
117 'host','user': host ip or name to manage and user
118 'db', 'db_lock': database class and lock to use it in exclusion
119 """
120 threading.Thread.__init__(self)
121 self.plugins = plugins
122 self.plugin_name = "unknown"
123 self.vim = None
124 self.sdnconnector = None
125 self.sdnconn_config = None
126 self.error_status = None
127 self.wim_account_id = wim_account_id
128 self.datacenter_tenant_id = datacenter_tenant_id
129 self.port_mapping = None
130 if self.wim_account_id:
131 self.target_k = "wim_account_id"
132 self.target_v = self.wim_account_id
133 else:
134 self.target_k = "datacenter_vim_id"
135 self.target_v = self.datacenter_tenant_id
136 if not name:
137 self.name = wim_account_id or str(datacenter_tenant_id)
138 else:
139 self.name = name
140 self.vim_persistent_info = {}
141 self.my_id = self.name[:64]
142
143 self.logger = logging.getLogger('openmano.{}.{}'.format("vim" if self.datacenter_tenant_id else "sdn",
144 self.name))
145 self.db = db
146
147 self.task_lock = task_lock
148 self.task_queue = queue.Queue(2000)
149
150 def _proccess_sdn_exception(self, exc):
151 if isinstance(exc, SdnConnectorError):
152 raise
153 else:
154 self.logger.error("plugin={} throws a non SdnConnectorError exception {}".format(self.plugin_name, exc),
155 exc_info=True)
156 raise SdnConnectorError(str(exc), http_code=HTTPStatus.INTERNAL_SERVER_ERROR.value) from exc
157
158 def _proccess_vim_exception(self, exc):
159 if isinstance(exc, vimconn.vimconnException):
160 raise
161 else:
162 self.logger.error("plugin={} throws a non vimconnException exception {}".format(self.plugin_name, exc),
163 exc_info=True)
164 raise vimconn.vimconnException(str(exc), http_code=HTTPStatus.INTERNAL_SERVER_ERROR.value) from exc
165
166 def get_vim_sdn_connector(self):
167 if self.datacenter_tenant_id:
168 try:
169 from_ = "datacenter_tenants as dt join datacenters as d on dt.datacenter_id=d.uuid"
170 select_ = ('type', 'd.config as config', 'd.uuid as datacenter_id', 'vim_url', 'vim_url_admin',
171 'd.name as datacenter_name', 'dt.uuid as datacenter_tenant_id',
172 'dt.vim_tenant_name as vim_tenant_name', 'dt.vim_tenant_id as vim_tenant_id',
173 'user', 'passwd', 'dt.config as dt_config')
174 where_ = {"dt.uuid": self.datacenter_tenant_id}
175 vims = self.db.get_rows(FROM=from_, SELECT=select_, WHERE=where_)
176 vim = vims[0]
177 vim_config = {}
178 if vim["config"]:
179 vim_config.update(yaml.load(vim["config"], Loader=yaml.Loader))
180 if vim["dt_config"]:
181 vim_config.update(yaml.load(vim["dt_config"], Loader=yaml.Loader))
182 vim_config['datacenter_tenant_id'] = vim.get('datacenter_tenant_id')
183 vim_config['datacenter_id'] = vim.get('datacenter_id')
184
185 # get port_mapping
186 # vim_port_mappings = self.ovim.get_of_port_mappings(
187 # db_filter={"datacenter_id": vim_config['datacenter_id']})
188 # vim_config["wim_external_ports"] = [x for x in vim_port_mappings
189 # if x["service_mapping_info"].get("wim")]
190 self.plugin_name = "rovim_" + vim["type"]
191 self.vim = self.plugins[self.plugin_name].vimconnector(
192 uuid=vim['datacenter_id'], name=vim['datacenter_name'],
193 tenant_id=vim['vim_tenant_id'], tenant_name=vim['vim_tenant_name'],
194 url=vim['vim_url'], url_admin=vim['vim_url_admin'],
195 user=vim['user'], passwd=vim['passwd'],
196 config=vim_config, persistent_info=self.vim_persistent_info
197 )
198 self.error_status = None
199 self.logger.info("Vim Connector loaded for vim_account={}, plugin={}".format(
200 self.datacenter_tenant_id, self.plugin_name))
201 except Exception as e:
202 self.logger.error("Cannot load vimconnector for vim_account={} plugin={}: {}".format(
203 self.datacenter_tenant_id, self.plugin_name, e))
204 self.vim = None
205 self.error_status = "Error loading vimconnector: {}".format(e)
206 else:
207 try:
208 wim_account = self.db.get_rows(FROM="wim_accounts", WHERE={"uuid": self.wim_account_id})[0]
209 wim = self.db.get_rows(FROM="wims", WHERE={"uuid": wim_account["wim_id"]})[0]
210 if wim["config"]:
211 self.sdnconn_config = yaml.load(wim["config"], Loader=yaml.Loader)
212 else:
213 self.sdnconn_config = {}
214 if wim_account["config"]:
215 self.sdnconn_config.update(yaml.load(wim_account["config"], Loader=yaml.Loader))
216 self.port_mappings = self.db.get_rows(FROM="wim_port_mappings", WHERE={"wim_id": wim_account["wim_id"]})
217 if self.port_mappings:
218 self.sdnconn_config["service_endpoint_mapping"] = self.port_mappings
219 self.plugin_name = "rosdn_" + wim["type"]
220 self.sdnconnector = self.plugins[self.plugin_name](
221 wim, wim_account, config=self.sdnconn_config)
222 self.error_status = None
223 self.logger.info("Sdn Connector loaded for wim_account={}, plugin={}".format(
224 self.wim_account_id, self.plugin_name))
225 except Exception as e:
226 self.logger.error("Cannot load sdn connector for wim_account={}, plugin={}: {}".format(
227 self.wim_account_id, self.plugin_name, e))
228 self.sdnconnector = None
229 self.error_status = "Error loading sdn connector: {}".format(e)
230
231 def _get_db_task(self):
232 """
233 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
234 :return: None
235 """
236 now = time.time()
237 try:
238 database_limit = 20
239 task_related = None
240 while True:
241 # get 20 (database_limit) entries each time
242 vim_actions = self.db.get_rows(FROM="vim_wim_actions",
243 WHERE={self.target_k: self.target_v,
244 "status": ['SCHEDULED', 'BUILD', 'DONE'],
245 "worker": [None, self.my_id], "modified_at<=": now
246 },
247 ORDER_BY=("modified_at", "created_at",),
248 LIMIT=database_limit)
249 if not vim_actions:
250 return None, None
251 # if vim_actions[0]["modified_at"] > now:
252 # return int(vim_actions[0] - now)
253 for task in vim_actions:
254 # block related task
255 if task_related == task["related"]:
256 continue # ignore if a locking has already tried for these task set
257 task_related = task["related"]
258 # lock ...
259 self.db.update_rows("vim_wim_actions", UPDATE={"worker": self.my_id}, modified_time=0,
260 WHERE={self.target_k: self.target_v,
261 "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
262 "worker": [None, self.my_id],
263 "related": task_related,
264 "item": task["item"],
265 })
266 # ... and read all related and check if locked
267 related_tasks = self.db.get_rows(FROM="vim_wim_actions",
268 WHERE={self.target_k: self.target_v,
269 "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
270 "related": task_related,
271 "item": task["item"],
272 },
273 ORDER_BY=("created_at",))
274 # check that all related tasks have been locked. If not release and try again. It can happen
275 # for race conditions if a new related task has been inserted by nfvo in the process
276 some_tasks_locked = False
277 some_tasks_not_locked = False
278 creation_task = None
279 for relate_task in related_tasks:
280 if relate_task["worker"] != self.my_id:
281 some_tasks_not_locked = True
282 else:
283 some_tasks_locked = True
284 if not creation_task and relate_task["action"] in ("CREATE", "FIND"):
285 creation_task = relate_task
286 if some_tasks_not_locked:
287 if some_tasks_locked: # unlock
288 self.db.update_rows("vim_wim_actions", UPDATE={"worker": None}, modified_time=0,
289 WHERE={self.target_k: self.target_v,
290 "worker": self.my_id,
291 "related": task_related,
292 "item": task["item"],
293 })
294 continue
295
296 # task of creation must be the first in the list of related_task
297 assert(related_tasks[0]["action"] in ("CREATE", "FIND"))
298
299 task["params"] = None
300 if task["extra"]:
301 extra = yaml.load(task["extra"], Loader=yaml.Loader)
302 else:
303 extra = {}
304 task["extra"] = extra
305 if extra.get("depends_on"):
306 task["depends"] = {}
307 if extra.get("params"):
308 task["params"] = deepcopy(extra["params"])
309 return task, related_tasks
310 except Exception as e:
311 self.logger.critical("Unexpected exception at _get_db_task: " + str(e), exc_info=True)
312 return None, None
313
314 def _delete_task(self, task):
315 """
316 Determine if this task need to be done or superseded
317 :return: None
318 """
319
320 def copy_extra_created(copy_to, copy_from):
321 copy_to["created"] = copy_from["created"]
322 if copy_from.get("sdn_net_id"):
323 copy_to["sdn_net_id"] = copy_from["sdn_net_id"]
324 if copy_from.get("interfaces"):
325 copy_to["interfaces"] = copy_from["interfaces"]
326 if copy_from.get("created_items"):
327 if not copy_to.get("created_items"):
328 copy_to["created_items"] = {}
329 copy_to["created_items"].update(copy_from["created_items"])
330
331 task_create = None
332 dependency_task = None
333 deletion_needed = False
334 if task["status"] == "FAILED":
335 return # TODO need to be retry??
336 try:
337 # get all related tasks
338 related_tasks = self.db.get_rows(FROM="vim_wim_actions",
339 WHERE={self.target_k: self.target_v,
340 "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
341 "action": ["FIND", "CREATE"],
342 "related": task["related"],
343 },
344 ORDER_BY=("created_at",),
345 )
346 for related_task in related_tasks:
347 if related_task["item"] == task["item"] and related_task["item_id"] == task["item_id"]:
348 task_create = related_task
349 # TASK_CREATE
350 if related_task["extra"]:
351 extra_created = yaml.load(related_task["extra"], Loader=yaml.Loader)
352 if extra_created.get("created"):
353 deletion_needed = True
354 related_task["extra"] = extra_created
355 elif not dependency_task:
356 dependency_task = related_task
357 if task_create and dependency_task:
358 break
359
360 # mark task_create as FINISHED
361 self.db.update_rows("vim_wim_actions", UPDATE={"status": "FINISHED"},
362 WHERE={self.target_k: self.target_v,
363 "instance_action_id": task_create["instance_action_id"],
364 "task_index": task_create["task_index"]
365 })
366 if not deletion_needed:
367 return
368 elif dependency_task:
369 # move create information from task_create to relate_task
370 extra_new_created = yaml.load(dependency_task["extra"], Loader=yaml.Loader) or {}
371 extra_new_created["created"] = extra_created["created"]
372 copy_extra_created(copy_to=extra_new_created, copy_from=extra_created)
373
374 self.db.update_rows("vim_wim_actions",
375 UPDATE={"extra": yaml.safe_dump(extra_new_created, default_flow_style=True,
376 width=256),
377 "vim_id": task_create.get("vim_id")},
378 WHERE={self.target_k: self.target_v,
379 "instance_action_id": dependency_task["instance_action_id"],
380 "task_index": dependency_task["task_index"]
381 })
382 return False
383 else:
384 task["vim_id"] = task_create["vim_id"]
385 copy_extra_created(copy_to=task["extra"], copy_from=task_create["extra"])
386 return True
387
388 except Exception as e:
389 self.logger.critical("Unexpected exception at _delete_task: " + str(e), exc_info=True)
390
391 def _refres_vm(self, task):
392 """Call VIM to get VMs status"""
393 database_update = None
394
395 vim_id = task["vim_id"]
396 vm_to_refresh_list = [vim_id]
397 try:
398 vim_dict = self.vim.refresh_vms_status(vm_to_refresh_list)
399 vim_info = vim_dict[vim_id]
400 except vimconn.vimconnException as e:
401 # Mark all tasks at VIM_ERROR status
402 self.logger.error("task=several get-VM: vimconnException when trying to refresh vms " + str(e))
403 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
404
405 task_id = task["instance_action_id"] + "." + str(task["task_index"])
406 self.logger.debug("task={} get-VM: vim_vm_id={} result={}".format(task_id, task["vim_id"], vim_info))
407
408 # check and update interfaces
409 task_warning_msg = ""
410 for interface in vim_info.get("interfaces", ()):
411 vim_interface_id = interface["vim_interface_id"]
412 if vim_interface_id not in task["extra"]["interfaces"]:
413 self.logger.critical("task={} get-VM: Interface not found {} on task info {}".format(
414 task_id, vim_interface_id, task["extra"]["interfaces"]), exc_info=True)
415 continue
416 task_interface = task["extra"]["interfaces"][vim_interface_id]
417 task_vim_interface = task_interface.get("vim_info")
418 if task_vim_interface != interface:
419 # delete old port
420 # if task_interface.get("sdn_port_id"):
421 # try:
422 # self.ovim.delete_port(task_interface["sdn_port_id"], idempotent=True)
423 # task_interface["sdn_port_id"] = None
424 # except ovimException as e:
425 # error_text = "ovimException deleting external_port={}: {}".format(
426 # task_interface["sdn_port_id"], e)
427 # self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
428 # task_warning_msg += error_text
429 # # TODO Set error_msg at instance_nets instead of instance VMs
430
431 # Create SDN port
432 # sdn_net_id = task_interface.get("sdn_net_id")
433 # if sdn_net_id and interface.get("compute_node") and interface.get("pci"):
434 # sdn_port_name = sdn_net_id + "." + task["vim_id"]
435 # sdn_port_name = sdn_port_name[:63]
436 # try:
437 # sdn_port_id = self.ovim.new_external_port(
438 # {"compute_node": interface["compute_node"],
439 # "pci": interface["pci"],
440 # "vlan": interface.get("vlan"),
441 # "net_id": sdn_net_id,
442 # "region": self.vim["config"]["datacenter_id"],
443 # "name": sdn_port_name,
444 # "mac": interface.get("mac_address")})
445 # task_interface["sdn_port_id"] = sdn_port_id
446 # except (ovimException, Exception) as e:
447 # error_text = "ovimException creating new_external_port compute_node={} pci={} vlan={} {}".\
448 # format(interface["compute_node"], interface["pci"], interface.get("vlan"), e)
449 # self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
450 # task_warning_msg += error_text
451 # # TODO Set error_msg at instance_nets instead of instance VMs
452
453 self.db.update_rows('instance_interfaces',
454 UPDATE={"mac_address": interface.get("mac_address"),
455 "ip_address": interface.get("ip_address"),
456 "vim_interface_id": interface.get("vim_interface_id"),
457 "vim_info": interface.get("vim_info"),
458 "sdn_port_id": task_interface.get("sdn_port_id"),
459 "compute_node": interface.get("compute_node"),
460 "pci": interface.get("pci"),
461 "vlan": interface.get("vlan")},
462 WHERE={'uuid': task_interface["iface_id"]})
463 task_interface["vim_info"] = interface
464 # if sdn_net_id and interface.get("compute_node") and interface.get("pci"):
465 # # TODO Send message to task SDN to update
466
467 # check and update task and instance_vms database
468 vim_info_error_msg = None
469 if vim_info.get("error_msg"):
470 vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"] + task_warning_msg)
471 elif task_warning_msg:
472 vim_info_error_msg = self._format_vim_error_msg(task_warning_msg)
473 task_vim_info = task["extra"].get("vim_info")
474 task_error_msg = task.get("error_msg")
475 task_vim_status = task["extra"].get("vim_status")
476 if task_vim_status != vim_info["status"] or task_error_msg != vim_info_error_msg or \
477 (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
478 database_update = {"status": vim_info["status"], "error_msg": vim_info_error_msg}
479 if vim_info.get("vim_info"):
480 database_update["vim_info"] = vim_info["vim_info"]
481
482 task["extra"]["vim_status"] = vim_info["status"]
483 task["error_msg"] = vim_info_error_msg
484 if vim_info.get("vim_info"):
485 task["extra"]["vim_info"] = vim_info["vim_info"]
486
487 return database_update
488
489 def _refres_net(self, task):
490 """Call VIM to get network status"""
491 database_update = None
492
493 vim_id = task["vim_id"]
494 net_to_refresh_list = [vim_id]
495 try:
496 vim_dict = self.vim.refresh_nets_status(net_to_refresh_list)
497 vim_info = vim_dict[vim_id]
498 except vimconn.vimconnException as e:
499 # Mark all tasks at VIM_ERROR status
500 self.logger.error("task=several get-net: vimconnException when trying to refresh nets " + str(e))
501 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
502
503 task_id = task["instance_action_id"] + "." + str(task["task_index"])
504 self.logger.debug("task={} get-net: vim_net_id={} result={}".format(task_id, task["vim_id"], vim_info))
505
506 task_vim_info = task["extra"].get("vim_info")
507 task_vim_status = task["extra"].get("vim_status")
508 task_error_msg = task.get("error_msg")
509 # task_sdn_net_id = task["extra"].get("sdn_net_id")
510
511 vim_info_status = vim_info["status"]
512 vim_info_error_msg = vim_info.get("error_msg")
513 # get ovim status
514 # if task_sdn_net_id:
515 # try:
516 # sdn_net = self.ovim.show_network(task_sdn_net_id)
517 # except (ovimException, Exception) as e:
518 # text_error = "ovimException getting network snd_net_id={}: {}".format(task_sdn_net_id, e)
519 # self.logger.error("task={} get-net: {}".format(task_id, text_error), exc_info=True)
520 # sdn_net = {"status": "ERROR", "last_error": text_error}
521 # if sdn_net["status"] == "ERROR":
522 # if not vim_info_error_msg:
523 # vim_info_error_msg = str(sdn_net.get("last_error"))
524 # else:
525 # vim_info_error_msg = "VIM_ERROR: {} && SDN_ERROR: {}".format(
526 # self._format_vim_error_msg(vim_info_error_msg, 1024 // 2 - 14),
527 # self._format_vim_error_msg(sdn_net["last_error"], 1024 // 2 - 14))
528 # vim_info_status = "ERROR"
529 # elif sdn_net["status"] == "BUILD":
530 # if vim_info_status == "ACTIVE":
531 # vim_info_status = "BUILD"
532
533 # update database
534 if vim_info_error_msg:
535 vim_info_error_msg = self._format_vim_error_msg(vim_info_error_msg)
536 if task_vim_status != vim_info_status or task_error_msg != vim_info_error_msg or \
537 (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
538 task["extra"]["vim_status"] = vim_info_status
539 task["error_msg"] = vim_info_error_msg
540 if vim_info.get("vim_info"):
541 task["extra"]["vim_info"] = vim_info["vim_info"]
542 database_update = {"status": vim_info_status, "error_msg": vim_info_error_msg}
543 if vim_info.get("vim_info"):
544 database_update["vim_info"] = vim_info["vim_info"]
545 return database_update
546
547 def _proccess_pending_tasks(self, task, related_tasks):
548 old_task_status = task["status"]
549 create_or_find = False # if as result of processing this task something is created or found
550 next_refresh = 0
551
552 try:
553 if task["status"] == "SCHEDULED":
554 # check if tasks that this depends on have been completed
555 dependency_not_completed = False
556 dependency_modified_at = 0
557 for task_index in task["extra"].get("depends_on", ()):
558 task_dependency = self._look_for_task(task["instance_action_id"], task_index)
559 if not task_dependency:
560 raise VimThreadException(
561 "Cannot get depending net task trying to get depending task {}.{}".format(
562 task["instance_action_id"], task_index))
563 # task["depends"]["TASK-" + str(task_index)] = task_dependency #it references another object,so
564 # database must be look again
565 if task_dependency["status"] == "SCHEDULED":
566 dependency_not_completed = True
567 dependency_modified_at = task_dependency["modified_at"]
568 break
569 elif task_dependency["status"] == "FAILED":
570 raise VimThreadException(
571 "Cannot {} {}, (task {}.{}) because depends on failed {}.{}, (task{}.{}): {}".format(
572 task["action"], task["item"],
573 task["instance_action_id"], task["task_index"],
574 task_dependency["instance_action_id"], task_dependency["task_index"],
575 task_dependency["action"], task_dependency["item"], task_dependency.get("error_msg")))
576
577 task["depends"]["TASK-"+str(task_index)] = task_dependency
578 task["depends"]["TASK-{}.{}".format(task["instance_action_id"], task_index)] = task_dependency
579 if dependency_not_completed:
580 # Move this task to the time dependency is going to be modified plus 10 seconds.
581 self.db.update_rows("vim_wim_actions", modified_time=dependency_modified_at + 10,
582 UPDATE={"worker": None},
583 WHERE={self.target_k: self.target_v, "worker": self.my_id,
584 "related": task["related"],
585 })
586 # task["extra"]["tries"] = task["extra"].get("tries", 0) + 1
587 # if task["extra"]["tries"] > 3:
588 # raise VimThreadException(
589 # "Cannot {} {}, (task {}.{}) because timeout waiting to complete {} {}, "
590 # "(task {}.{})".format(task["action"], task["item"],
591 # task["instance_action_id"], task["task_index"],
592 # task_dependency["instance_action_id"], task_dependency["task_index"]
593 # task_dependency["action"], task_dependency["item"]))
594 return
595
596 database_update = None
597 if task["action"] == "DELETE":
598 deleted_needed = self._delete_task(task)
599 if not deleted_needed:
600 task["status"] = "SUPERSEDED" # with FINISHED instead of DONE it will not be refreshing
601 task["error_msg"] = None
602
603 if task["status"] == "SUPERSEDED":
604 # not needed to do anything but update database with the new status
605 database_update = None
606 elif not self.vim and not self.sdnconnector:
607 task["status"] = "FAILED"
608 task["error_msg"] = self.error_status
609 database_update = {"status": "VIM_ERROR" if self.datacenter_tenant_id else "WIM_ERROR",
610 "error_msg": task["error_msg"]}
611 elif task["item_id"] != related_tasks[0]["item_id"] and task["action"] in ("FIND", "CREATE"):
612 # Do nothing, just copy values from one to another and update database
613 task["status"] = related_tasks[0]["status"]
614 task["error_msg"] = related_tasks[0]["error_msg"]
615 task["vim_id"] = related_tasks[0]["vim_id"]
616 extra = yaml.load(related_tasks[0]["extra"], Loader=yaml.Loader)
617 task["extra"]["vim_status"] = extra.get("vim_status")
618 next_refresh = related_tasks[0]["modified_at"] + 0.001
619 database_update = {"status": task["extra"].get("vim_status", "VIM_ERROR"),
620 "error_msg": task["error_msg"]}
621 if task["item"] == 'instance_vms':
622 database_update["vim_vm_id"] = task["vim_id"]
623 elif task["item"] == 'instance_nets':
624 database_update["vim_net_id"] = task["vim_id"]
625 elif task["item"] == 'instance_vms':
626 if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"):
627 database_update = self._refres_vm(task)
628 create_or_find = True
629 elif task["action"] == "CREATE":
630 create_or_find = True
631 database_update = self.new_vm(task)
632 elif task["action"] == "DELETE":
633 self.del_vm(task)
634 else:
635 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
636 elif task["item"] == 'instance_nets':
637 if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"):
638 database_update = self._refres_net(task)
639 create_or_find = True
640 elif task["action"] == "CREATE":
641 create_or_find = True
642 database_update = self.new_net(task)
643 elif task["action"] == "DELETE":
644 self.del_net(task)
645 elif task["action"] == "FIND":
646 database_update = self.get_net(task)
647 else:
648 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
649 elif task["item"] == 'instance_wim_nets':
650 if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"):
651 database_update = self.new_or_update_sdn_net(task)
652 create_or_find = True
653 elif task["action"] == "CREATE":
654 create_or_find = True
655 database_update = self.new_or_update_sdn_net(task)
656 elif task["action"] == "DELETE":
657 self.del_sdn_net(task)
658 elif task["action"] == "FIND":
659 database_update = self.get_sdn_net(task)
660 else:
661 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
662 elif task["item"] == 'instance_sfis':
663 if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"):
664 database_update = self._refres_sfis(task)
665 create_or_find = True
666 elif task["action"] == "CREATE":
667 create_or_find = True
668 database_update = self.new_sfi(task)
669 elif task["action"] == "DELETE":
670 self.del_sfi(task)
671 else:
672 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
673 elif task["item"] == 'instance_sfs':
674 if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"):
675 database_update = self._refres_sfs(task)
676 create_or_find = True
677 elif task["action"] == "CREATE":
678 create_or_find = True
679 database_update = self.new_sf(task)
680 elif task["action"] == "DELETE":
681 self.del_sf(task)
682 else:
683 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
684 elif task["item"] == 'instance_classifications':
685 if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"):
686 database_update = self._refres_classifications(task)
687 create_or_find = True
688 elif task["action"] == "CREATE":
689 create_or_find = True
690 database_update = self.new_classification(task)
691 elif task["action"] == "DELETE":
692 self.del_classification(task)
693 else:
694 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
695 elif task["item"] == 'instance_sfps':
696 if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"):
697 database_update = self._refres_sfps(task)
698 create_or_find = True
699 elif task["action"] == "CREATE":
700 create_or_find = True
701 database_update = self.new_sfp(task)
702 elif task["action"] == "DELETE":
703 self.del_sfp(task)
704 else:
705 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
706 else:
707 raise vimconn.vimconnException(self.name + "unknown task item {}".format(task["item"]))
708 # TODO
709 except VimThreadException as e:
710 task["error_msg"] = str(e)
711 task["status"] = "FAILED"
712 database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
713 if task["item"] == 'instance_vms':
714 database_update["vim_vm_id"] = None
715 elif task["item"] == 'instance_nets':
716 database_update["vim_net_id"] = None
717
718 task_id = task["instance_action_id"] + "." + str(task["task_index"])
719 self.logger.debug("task={} item={} action={} result={}:'{}' params={}".format(
720 task_id, task["item"], task["action"], task["status"],
721 task["vim_id"] if task["status"] == "DONE" else task.get("error_msg"), task["params"]))
722 try:
723 if not next_refresh:
724 if task["status"] == "DONE":
725 next_refresh = time.time()
726 if task["extra"].get("vim_status") == "BUILD":
727 next_refresh += self.REFRESH_BUILD
728 elif task["extra"].get("vim_status") in ("ERROR", "VIM_ERROR", "WIM_ERROR"):
729 next_refresh += self.REFRESH_ERROR
730 elif task["extra"].get("vim_status") == "DELETED":
731 next_refresh += self.REFRESH_DELETE
732 else:
733 next_refresh += self.REFRESH_ACTIVE
734 elif task["status"] == "FAILED":
735 next_refresh = time.time() + self.REFRESH_DELETE
736
737 if create_or_find:
738 # modify all related task with action FIND/CREATED non SCHEDULED
739 self.db.update_rows(
740 table="vim_wim_actions", modified_time=next_refresh + 0.001,
741 UPDATE={"status": task["status"], "vim_id": task.get("vim_id"),
742 "error_msg": task["error_msg"],
743 },
744
745 WHERE={self.target_k: self.target_v,
746 "worker": self.my_id,
747 "action": ["FIND", "CREATE"],
748 "related": task["related"],
749 "status<>": "SCHEDULED",
750 })
751 # modify own task
752 self.db.update_rows(
753 table="vim_wim_actions", modified_time=next_refresh,
754 UPDATE={"status": task["status"], "vim_id": task.get("vim_id"),
755 "error_msg": task["error_msg"],
756 "extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256)},
757 WHERE={"instance_action_id": task["instance_action_id"], "task_index": task["task_index"]})
758 # Unlock tasks
759 self.db.update_rows(
760 table="vim_wim_actions", modified_time=0,
761 UPDATE={"worker": None},
762 WHERE={self.target_k: self.target_v,
763 "worker": self.my_id,
764 "related": task["related"],
765 })
766
767 # Update table instance_actions
768 if old_task_status == "SCHEDULED" and task["status"] != old_task_status:
769 self.db.update_rows(
770 table="instance_actions",
771 UPDATE={("number_failed" if task["status"] == "FAILED" else "number_done"): {"INCREMENT": 1}},
772 WHERE={"uuid": task["instance_action_id"]})
773 if database_update:
774 where_filter = {"related": task["related"]}
775 if task["item"] == "instance_nets" and task["datacenter_vim_id"]:
776 where_filter["datacenter_tenant_id"] = task["datacenter_vim_id"]
777 self.db.update_rows(table=task["item"],
778 UPDATE=database_update,
779 WHERE=where_filter)
780 except db_base_Exception as e:
781 self.logger.error("task={} Error updating database {}".format(task_id, e), exc_info=True)
782
783 def insert_task(self, task):
784 try:
785 self.task_queue.put(task, False)
786 return None
787 except queue.Full:
788 raise vimconn.vimconnException(self.name + ": timeout inserting a task")
789
790 def del_task(self, task):
791 with self.task_lock:
792 if task["status"] == "SCHEDULED":
793 task["status"] = "SUPERSEDED"
794 return True
795 else: # task["status"] == "processing"
796 self.task_lock.release()
797 return False
798
799 def run(self):
800 self.logger.debug("Starting")
801 while True:
802 self.get_vim_sdn_connector()
803 self.logger.debug("Vimconnector loaded")
804 reload_thread = False
805
806 while True:
807 try:
808 while not self.task_queue.empty():
809 task = self.task_queue.get()
810 if isinstance(task, list):
811 pass
812 elif isinstance(task, str):
813 if task == 'exit':
814 return 0
815 elif task == 'reload':
816 reload_thread = True
817 break
818 self.task_queue.task_done()
819 if reload_thread:
820 break
821
822 task, related_tasks = self._get_db_task()
823 if task:
824 self._proccess_pending_tasks(task, related_tasks)
825 else:
826 time.sleep(5)
827
828 except Exception as e:
829 self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)
830
831 self.logger.debug("Finishing")
832
833 def _look_for_task(self, instance_action_id, task_id):
834 """
835 Look for a concrete task at vim_actions database table
836 :param instance_action_id: The instance_action_id
837 :param task_id: Can have several formats:
838 <task index>: integer
839 TASK-<task index> :backward compatibility,
840 [TASK-]<instance_action_id>.<task index>: this instance_action_id overrides the one in the parameter
841 :return: Task dictionary or None if not found
842 """
843 if isinstance(task_id, int):
844 task_index = task_id
845 else:
846 if task_id.startswith("TASK-"):
847 task_id = task_id[5:]
848 ins_action_id, _, task_index = task_id.rpartition(".")
849 if ins_action_id:
850 instance_action_id = ins_action_id
851
852 tasks = self.db.get_rows(FROM="vim_wim_actions", WHERE={"instance_action_id": instance_action_id,
853 "task_index": task_index})
854 if not tasks:
855 return None
856 task = tasks[0]
857 task["params"] = None
858 task["depends"] = {}
859 if task["extra"]:
860 extra = yaml.load(task["extra"], Loader=yaml.Loader)
861 task["extra"] = extra
862 task["params"] = extra.get("params")
863 else:
864 task["extra"] = {}
865 return task
866
867 @staticmethod
868 def _format_vim_error_msg(error_text, max_length=1024):
869 if error_text and len(error_text) >= max_length:
870 return error_text[:max_length // 2 - 3] + " ... " + error_text[-max_length // 2 + 3:]
871 return error_text
872
873 def new_vm(self, task):
874 task_id = task["instance_action_id"] + "." + str(task["task_index"])
875 try:
876 params = task["params"]
877 depends = task.get("depends")
878 net_list = params[5]
879 for net in net_list:
880 if "net_id" in net and is_task_id(net["net_id"]): # change task_id into network_id
881 network_id = task["depends"][net["net_id"]].get("vim_id")
882 if not network_id:
883 raise VimThreadException(
884 "Cannot create VM because depends on a network not created or found: " +
885 str(depends[net["net_id"]]["error_msg"]))
886 net["net_id"] = network_id
887 params_copy = deepcopy(params)
888 vim_vm_id, created_items = self.vim.new_vminstance(*params_copy)
889
890 # fill task_interfaces. Look for snd_net_id at database for each interface
891 task_interfaces = {}
892 for iface in params_copy[5]:
893 task_interfaces[iface["vim_id"]] = {"iface_id": iface["uuid"]}
894 result = self.db.get_rows(
895 SELECT=('sdn_net_id', 'interface_id'),
896 FROM='instance_nets as ine join instance_interfaces as ii on ii.instance_net_id=ine.uuid',
897 WHERE={'ii.uuid': iface["uuid"]})
898 if result:
899 task_interfaces[iface["vim_id"]]["sdn_net_id"] = result[0]['sdn_net_id']
900 task_interfaces[iface["vim_id"]]["interface_id"] = result[0]['interface_id']
901 else:
902 self.logger.critical("task={} new-VM: instance_nets uuid={} not found at DB".format(task_id,
903 iface["uuid"]),
904 exc_info=True)
905
906 task["vim_info"] = {}
907 task["extra"]["interfaces"] = task_interfaces
908 task["extra"]["created"] = True
909 task["extra"]["created_items"] = created_items
910 task["extra"]["vim_status"] = "BUILD"
911 task["error_msg"] = None
912 task["status"] = "DONE"
913 task["vim_id"] = vim_vm_id
914 instance_element_update = {"status": "BUILD", "vim_vm_id": vim_vm_id, "error_msg": None}
915 return instance_element_update
916
917 except (vimconn.vimconnException, VimThreadException) as e:
918 self.logger.error("task={} new-VM: {}".format(task_id, e))
919 error_text = self._format_vim_error_msg(str(e))
920 task["error_msg"] = error_text
921 task["status"] = "FAILED"
922 task["vim_id"] = None
923 instance_element_update = {"status": "VIM_ERROR", "vim_vm_id": None, "error_msg": error_text}
924 return instance_element_update
925
926 def del_vm(self, task):
927 # task_id = task["instance_action_id"] + "." + str(task["task_index"])
928 vm_vim_id = task["vim_id"]
929 # interfaces = task["extra"].get("interfaces", ())
930 try:
931 # for iface in interfaces.values():
932 # if iface.get("sdn_port_id"):
933 # try:
934 # self.ovim.delete_port(iface["sdn_port_id"], idempotent=True)
935 # except ovimException as e:
936 # self.logger.error("task={} del-VM: ovimException when deleting external_port={}: {} ".format(
937 # task_id, iface["sdn_port_id"], e), exc_info=True)
938 # # TODO Set error_msg at instance_nets
939
940 self.vim.delete_vminstance(vm_vim_id, task["extra"].get("created_items"))
941 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
942 task["error_msg"] = None
943 return None
944
945 except vimconn.vimconnException as e:
946 task["error_msg"] = self._format_vim_error_msg(str(e))
947 if isinstance(e, vimconn.vimconnNotFoundException):
948 # If not found mark as Done and fill error_msg
949 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
950 return None
951 task["status"] = "FAILED"
952 return None
953
954 def _get_net_internal(self, task, filter_param):
955 """
956 Common code for get_net and new_net. It looks for a network on VIM with the filter_params
957 :param task: task for this find or find-or-create action
958 :param filter_param: parameters to send to the vimconnector
959 :return: a dict with the content to update the instance_nets database table. Raises an exception on error, or
960 when network is not found or found more than one
961 """
962 vim_nets = self.vim.get_network_list(filter_param)
963 if not vim_nets:
964 raise VimThreadExceptionNotFound("Network not found with this criteria: '{}'".format(filter_param))
965 elif len(vim_nets) > 1:
966 raise VimThreadException("More than one network found with this criteria: '{}'".format(filter_param))
967 vim_net_id = vim_nets[0]["id"]
968
969 # Discover if this network is managed by a sdn controller
970 sdn_net_id = None
971 result = self.db.get_rows(SELECT=('sdn_net_id',), FROM='instance_nets',
972 WHERE={'vim_net_id': vim_net_id, 'datacenter_tenant_id': self.datacenter_tenant_id},
973 ORDER="instance_scenario_id")
974 if result:
975 sdn_net_id = result[0]['sdn_net_id']
976
977 task["status"] = "DONE"
978 task["extra"]["vim_info"] = {}
979 task["extra"]["created"] = False
980 task["extra"]["vim_status"] = "BUILD"
981 task["extra"]["sdn_net_id"] = sdn_net_id
982 task["error_msg"] = None
983 task["vim_id"] = vim_net_id
984 instance_element_update = {"vim_net_id": vim_net_id, "created": False, "status": "BUILD",
985 "error_msg": None, "sdn_net_id": sdn_net_id}
986 return instance_element_update
987
988 def get_net(self, task):
989 task_id = task["instance_action_id"] + "." + str(task["task_index"])
990 try:
991 params = task["params"]
992 filter_param = params[0]
993 instance_element_update = self._get_net_internal(task, filter_param)
994 return instance_element_update
995
996 except (vimconn.vimconnException, VimThreadException) as e:
997 self.logger.error("task={} get-net: {}".format(task_id, e))
998 task["status"] = "FAILED"
999 task["vim_id"] = None
1000 task["error_msg"] = self._format_vim_error_msg(str(e))
1001 instance_element_update = {"vim_net_id": None, "status": "VIM_ERROR",
1002 "error_msg": task["error_msg"]}
1003 return instance_element_update
1004
1005 def new_net(self, task):
1006 vim_net_id = None
1007 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1008 action_text = ""
1009 try:
1010 # FIND
1011 if task["extra"].get("find"):
1012 action_text = "finding"
1013 filter_param = task["extra"]["find"][0]
1014 try:
1015 instance_element_update = self._get_net_internal(task, filter_param)
1016 return instance_element_update
1017 except VimThreadExceptionNotFound:
1018 pass
1019 # CREATE
1020 params = task["params"]
1021 action_text = "creating VIM"
1022
1023 vim_net_id, created_items = self.vim.new_network(*params[0:5])
1024
1025 # net_name = params[0]
1026 # net_type = params[1]
1027 # wim_account_name = None
1028 # if len(params) >= 6:
1029 # wim_account_name = params[5]
1030
1031 # TODO fix at nfvo adding external port
1032 # if wim_account_name and self.vim.config["wim_external_ports"]:
1033 # # add external port to connect WIM. Try with compute node __WIM:wim_name and __WIM
1034 # action_text = "attaching external port to ovim network"
1035 # sdn_port_name = "external_port"
1036 # sdn_port_data = {
1037 # "compute_node": "__WIM:" + wim_account_name[0:58],
1038 # "pci": None,
1039 # "vlan": network["vlan"],
1040 # "net_id": sdn_net_id,
1041 # "region": self.vim["config"]["datacenter_id"],
1042 # "name": sdn_port_name,
1043 # }
1044 # try:
1045 # sdn_external_port_id = self.ovim.new_external_port(sdn_port_data)
1046 # except ovimException:
1047 # sdn_port_data["compute_node"] = "__WIM"
1048 # sdn_external_port_id = self.ovim.new_external_port(sdn_port_data)
1049 # self.logger.debug("Added sdn_external_port {} to sdn_network {}".format(sdn_external_port_id,
1050 # sdn_net_id))
1051 task["status"] = "DONE"
1052 task["extra"]["vim_info"] = {}
1053 # task["extra"]["sdn_net_id"] = sdn_net_id
1054 task["extra"]["vim_status"] = "BUILD"
1055 task["extra"]["created"] = True
1056 task["extra"]["created_items"] = created_items
1057 task["error_msg"] = None
1058 task["vim_id"] = vim_net_id
1059 instance_element_update = {"vim_net_id": vim_net_id, "status": "BUILD",
1060 "created": True, "error_msg": None}
1061 return instance_element_update
1062 except vimconn.vimconnException as e:
1063 self.logger.error("task={} new-net: Error {}: {}".format(task_id, action_text, e))
1064 task["status"] = "FAILED"
1065 task["vim_id"] = vim_net_id
1066 task["error_msg"] = self._format_vim_error_msg(str(e))
1067 # task["extra"]["sdn_net_id"] = sdn_net_id
1068 instance_element_update = {"vim_net_id": vim_net_id, "status": "VIM_ERROR",
1069 "error_msg": task["error_msg"]}
1070 return instance_element_update
1071
1072 def del_net(self, task):
1073 net_vim_id = task["vim_id"]
1074 # sdn_net_id = task["extra"].get("sdn_net_id")
1075 try:
1076 if net_vim_id:
1077 self.vim.delete_network(net_vim_id, task["extra"].get("created_items"))
1078 # if sdn_net_id:
1079 # # Delete any attached port to this sdn network. There can be ports associated to this network in case
1080 # # it was manually done using 'openmano vim-net-sdn-attach'
1081 # port_list = self.ovim.get_ports(columns={'uuid'},
1082 # filter={'name': 'external_port', 'net_id': sdn_net_id})
1083 # for port in port_list:
1084 # self.ovim.delete_port(port['uuid'], idempotent=True)
1085 # self.ovim.delete_network(sdn_net_id, idempotent=True)
1086 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1087 task["error_msg"] = None
1088 return None
1089 except vimconn.vimconnException as e:
1090 task["error_msg"] = self._format_vim_error_msg(str(e))
1091 if isinstance(e, vimconn.vimconnNotFoundException):
1092 # If not found mark as Done and fill error_msg
1093 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1094 return None
1095 task["status"] = "FAILED"
1096 return None
1097
1098 def new_or_update_sdn_net(self, task):
1099 wimconn_net_id = task["vim_id"]
1100 created_items = task["extra"].get("created_items")
1101 connected_ports = task["extra"].get("connected_ports", [])
1102 new_connected_ports = []
1103 last_update = task["extra"].get("last_update", 0)
1104 sdn_status = "BUILD"
1105 sdn_info = None
1106
1107 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1108 error_list = []
1109 try:
1110 # FIND
1111 if task["extra"].get("find"):
1112 wimconn_id = task["extra"]["find"][0]
1113 try:
1114 instance_element_update = self.sdnconnector.get_connectivity_service_status(wimconn_id)
1115 wimconn_net_id = wimconn_id
1116 instance_element_update = {"wim_internal_id": wimconn_net_id, "created": False, "status": "BUILD",
1117 "error_msg": None, }
1118 return instance_element_update
1119 except Exception as e:
1120 if isinstance(e, SdnConnectorError) and e.http_error == HTTPStatus.NOT_FOUND.value:
1121 pass
1122 else:
1123 self._proccess_sdn_exception(e)
1124
1125 params = task["params"]
1126 # CREATE
1127 # look for ports
1128 sdn_ports = []
1129 pending_ports = 0
1130
1131 ports = self.db.get_rows(FROM='instance_interfaces', WHERE={'instance_wim_net_id': task["item_id"]})
1132 sdn_need_update = False
1133 for port in ports:
1134 # TODO. Do not connect if already done
1135 if port.get("compute_node") and port.get("pci"):
1136 for map in self.port_mappings:
1137 if map.get("device_id") == port["compute_node"] and \
1138 map.get("device_interface_id") == port["pci"]:
1139 break
1140 else:
1141 if self.sdnconn_config.get("mapping_not_needed"):
1142 map = {
1143 "service_endpoint_id": "{}:{}".format(port["compute_node"], port["pci"]),
1144 "service_endpoint_encapsulation_info": {
1145 "vlan": port["vlan"],
1146 "mac": port["mac_address"],
1147 "device_id": port["compute_node"],
1148 "device_interface_id": port["pci"]
1149 }
1150 }
1151 else:
1152 map = None
1153 error_list.append("Port mapping not found for compute_node={} pci={}".format(
1154 port["compute_node"], port["pci"]))
1155
1156 if map:
1157 if port["uuid"] not in connected_ports or port["modified_at"] > last_update:
1158 sdn_need_update = True
1159 new_connected_ports.append(port["uuid"])
1160 sdn_ports.append({
1161 "service_endpoint_id": map["service_endpoint_id"],
1162 "service_endpoint_encapsulation_type": "dot1q" if port["model"] == "SR-IOV" else None,
1163 "service_endpoint_encapsulation_info": {
1164 "vlan": port["vlan"],
1165 "mac": port["mac_address"],
1166 "device_id": map.get("device_id"),
1167 "device_interface_id": map.get("device_interface_id"),
1168 "switch_dpid": map.get("switch_dpid"),
1169 "switch_port": map.get("switch_port"),
1170 "service_mapping_info": map.get("service_mapping_info"),
1171 }
1172 })
1173
1174 else:
1175 pending_ports += 1
1176 if pending_ports:
1177 error_list.append("Waiting for getting interfaces location from VIM. Obtained '{}' of {}"
1178 .format(len(ports)-pending_ports, len(ports)))
1179 # if there are more ports to connect or they have been modified, call create/update
1180 if sdn_need_update and len(sdn_ports) >= 2:
1181 if not wimconn_net_id:
1182 if params[0] == "data":
1183 net_type = "ELAN"
1184 elif params[0] == "ptp":
1185 net_type = "ELINE"
1186 else:
1187 net_type = "L3"
1188
1189 wimconn_net_id, created_items = self.sdnconnector.create_connectivity_service(net_type, sdn_ports)
1190 else:
1191 created_items = self.sdnconnector.edit_connectivity_service(wimconn_net_id, conn_info=created_items,
1192 connection_points=sdn_ports)
1193 last_update = time.time()
1194 connected_ports = new_connected_ports
1195 elif wimconn_net_id:
1196 try:
1197 wim_status_dict = self.sdnconnector.get_connectivity_service_status(wimconn_net_id,
1198 conn_info=created_items)
1199 sdn_status = wim_status_dict["sdn_status"]
1200 if wim_status_dict.get("error_msg"):
1201 error_list.append(wim_status_dict.get("error_msg"))
1202 if wim_status_dict.get("sdn_info"):
1203 sdn_info = str(wim_status_dict.get("sdn_info"))
1204 except Exception as e:
1205 self._proccess_sdn_exception(e)
1206
1207 task["status"] = "DONE"
1208 task["extra"]["vim_info"] = {}
1209 # task["extra"]["sdn_net_id"] = sdn_net_id
1210 task["extra"]["vim_status"] = sdn_status
1211 task["extra"]["created"] = True
1212 task["extra"]["created_items"] = created_items
1213 task["extra"]["connected_ports"] = connected_ports
1214 task["extra"]["last_update"] = last_update
1215 task["error_msg"] = self._format_vim_error_msg(" ; ".join(error_list))
1216 task["vim_id"] = wimconn_net_id
1217 instance_element_update = {"wim_internal_id": wimconn_net_id, "status": sdn_status,
1218 "created": True, "error_msg": task["error_msg"] or None}
1219 except (vimconn.vimconnException, SdnConnectorError) as e:
1220 self.logger.error("task={} new-sdn-net: Error: {}".format(task_id, e))
1221 task["status"] = "FAILED"
1222 task["vim_id"] = wimconn_net_id
1223 task["error_msg"] = self._format_vim_error_msg(str(e))
1224 # task["extra"]["sdn_net_id"] = sdn_net_id
1225 instance_element_update = {"wim_internal_id": wimconn_net_id, "status": "WIM_ERROR",
1226 "error_msg": task["error_msg"]}
1227 if sdn_info:
1228 instance_element_update["wim_info"] = sdn_info
1229 return instance_element_update
1230
1231 def del_sdn_net(self, task):
1232 wimconn_net_id = task["vim_id"]
1233 try:
1234 try:
1235 if wimconn_net_id:
1236 self.sdnconnector.delete_connectivity_service(wimconn_net_id, task["extra"].get("created_items"))
1237 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1238 task["error_msg"] = None
1239 return None
1240 except Exception as e:
1241 self._proccess_sdn_exception(e)
1242 except SdnConnectorError as e:
1243 task["error_msg"] = self._format_vim_error_msg(str(e))
1244 if e.http_code == HTTPStatus.NOT_FOUND.value:
1245 # If not found mark as Done and fill error_msg
1246 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1247 task["error_msg"] = None
1248 return None
1249 task["status"] = "FAILED"
1250 return None
1251
1252 # Service Function Instances
1253 def new_sfi(self, task):
1254 vim_sfi_id = None
1255 try:
1256 # Waits for interfaces to be ready (avoids failure)
1257 time.sleep(1)
1258 dep_id = "TASK-" + str(task["extra"]["depends_on"][0])
1259 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1260 error_text = ""
1261 interfaces = task["depends"][dep_id]["extra"].get("interfaces")
1262
1263 ingress_interface_id = task.get("extra").get("params").get("ingress_interface_id")
1264 egress_interface_id = task.get("extra").get("params").get("egress_interface_id")
1265 ingress_vim_interface_id = None
1266 egress_vim_interface_id = None
1267 for vim_interface, interface_data in interfaces.items():
1268 if interface_data.get("interface_id") == ingress_interface_id:
1269 ingress_vim_interface_id = vim_interface
1270 break
1271 if ingress_interface_id != egress_interface_id:
1272 for vim_interface, interface_data in interfaces.items():
1273 if interface_data.get("interface_id") == egress_interface_id:
1274 egress_vim_interface_id = vim_interface
1275 break
1276 else:
1277 egress_vim_interface_id = ingress_vim_interface_id
1278 if not ingress_vim_interface_id or not egress_vim_interface_id:
1279 error_text = "Error creating Service Function Instance, Ingress: {}, Egress: {}".format(
1280 ingress_vim_interface_id, egress_vim_interface_id)
1281 self.logger.error(error_text)
1282 task["error_msg"] = error_text
1283 task["status"] = "FAILED"
1284 task["vim_id"] = None
1285 return None
1286 # At the moment, every port associated with the VM will be used both as ingress and egress ports.
1287 # Bear in mind that different VIM connectors might support SFI differently. In the case of OpenStack,
1288 # only the first ingress and first egress ports will be used to create the SFI (Port Pair).
1289 ingress_port_id_list = [ingress_vim_interface_id]
1290 egress_port_id_list = [egress_vim_interface_id]
1291 name = "sfi-{}".format(task["item_id"][:8])
1292 # By default no form of IETF SFC Encapsulation will be used
1293 vim_sfi_id = self.vim.new_sfi(name, ingress_port_id_list, egress_port_id_list, sfc_encap=False)
1294
1295 task["extra"]["created"] = True
1296 task["extra"]["vim_status"] = "ACTIVE"
1297 task["error_msg"] = None
1298 task["status"] = "DONE"
1299 task["vim_id"] = vim_sfi_id
1300 instance_element_update = {"status": "ACTIVE", "vim_sfi_id": vim_sfi_id, "error_msg": None}
1301 return instance_element_update
1302
1303 except (vimconn.vimconnException, VimThreadException) as e:
1304 self.logger.error("Error creating Service Function Instance, task=%s: %s", task_id, str(e))
1305 error_text = self._format_vim_error_msg(str(e))
1306 task["error_msg"] = error_text
1307 task["status"] = "FAILED"
1308 task["vim_id"] = None
1309 instance_element_update = {"status": "VIM_ERROR", "vim_sfi_id": None, "error_msg": error_text}
1310 return instance_element_update
1311
1312 def del_sfi(self, task):
1313 sfi_vim_id = task["vim_id"]
1314 try:
1315 self.vim.delete_sfi(sfi_vim_id)
1316 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1317 task["error_msg"] = None
1318 return None
1319
1320 except vimconn.vimconnException as e:
1321 task["error_msg"] = self._format_vim_error_msg(str(e))
1322 if isinstance(e, vimconn.vimconnNotFoundException):
1323 # If not found mark as Done and fill error_msg
1324 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1325 return None
1326 task["status"] = "FAILED"
1327 return None
1328
1329 def new_sf(self, task):
1330 vim_sf_id = None
1331 try:
1332 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1333 error_text = ""
1334 depending_tasks = ["TASK-" + str(dep_id) for dep_id in task["extra"]["depends_on"]]
1335 # sfis = next(iter(task.get("depends").values())).get("extra").get("params")[5]
1336 sfis = [task.get("depends").get(dep_task) for dep_task in depending_tasks]
1337 sfi_id_list = []
1338 for sfi in sfis:
1339 sfi_id_list.append(sfi.get("vim_id"))
1340 name = "sf-{}".format(task["item_id"][:8])
1341 # By default no form of IETF SFC Encapsulation will be used
1342 vim_sf_id = self.vim.new_sf(name, sfi_id_list, sfc_encap=False)
1343
1344 task["extra"]["created"] = True
1345 task["extra"]["vim_status"] = "ACTIVE"
1346 task["error_msg"] = None
1347 task["status"] = "DONE"
1348 task["vim_id"] = vim_sf_id
1349 instance_element_update = {"status": "ACTIVE", "vim_sf_id": vim_sf_id, "error_msg": None}
1350 return instance_element_update
1351
1352 except (vimconn.vimconnException, VimThreadException) as e:
1353 self.logger.error("Error creating Service Function, task=%s: %s", task_id, str(e))
1354 error_text = self._format_vim_error_msg(str(e))
1355 task["error_msg"] = error_text
1356 task["status"] = "FAILED"
1357 task["vim_id"] = None
1358 instance_element_update = {"status": "VIM_ERROR", "vim_sf_id": None, "error_msg": error_text}
1359 return instance_element_update
1360
1361 def del_sf(self, task):
1362 sf_vim_id = task["vim_id"]
1363 try:
1364 self.vim.delete_sf(sf_vim_id)
1365 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1366 task["error_msg"] = None
1367 return None
1368
1369 except vimconn.vimconnException as e:
1370 task["error_msg"] = self._format_vim_error_msg(str(e))
1371 if isinstance(e, vimconn.vimconnNotFoundException):
1372 # If not found mark as Done and fill error_msg
1373 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1374 return None
1375 task["status"] = "FAILED"
1376 return None
1377
1378 def new_classification(self, task):
1379 vim_classification_id = None
1380 try:
1381 params = task["params"]
1382 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1383 dep_id = "TASK-" + str(task["extra"]["depends_on"][0])
1384 error_text = ""
1385 interfaces = task.get("depends").get(dep_id).get("extra").get("interfaces")
1386 # Bear in mind that different VIM connectors might support Classifications differently.
1387 # In the case of OpenStack, only the first VNF attached to the classifier will be used
1388 # to create the Classification(s) (the "logical source port" of the "Flow Classifier").
1389 # Since the VNFFG classifier match lacks the ethertype, classification defaults to
1390 # using the IPv4 flow classifier.
1391 logical_source_port_vim_id = None
1392 logical_source_port_id = params.get("logical_source_port")
1393 for vim_interface, interface_data in interfaces.items():
1394 if interface_data.get("interface_id") == logical_source_port_id:
1395 logical_source_port_vim_id = vim_interface
1396 break
1397 if not logical_source_port_vim_id:
1398 error_text = "Error creating Flow Classifier, Logical Source Port id {}".format(
1399 logical_source_port_id)
1400 self.logger.error(error_text)
1401 task["error_msg"] = error_text
1402 task["status"] = "FAILED"
1403 task["vim_id"] = None
1404 return None
1405
1406 name = "c-{}".format(task["item_id"][:8])
1407 # if not CIDR is given for the IP addresses, add /32:
1408 ip_proto = int(params.get("ip_proto"))
1409 source_ip = params.get("source_ip")
1410 destination_ip = params.get("destination_ip")
1411 source_port = params.get("source_port")
1412 destination_port = params.get("destination_port")
1413 definition = {"logical_source_port": logical_source_port_vim_id}
1414 if ip_proto:
1415 if ip_proto == 1:
1416 ip_proto = 'icmp'
1417 elif ip_proto == 6:
1418 ip_proto = 'tcp'
1419 elif ip_proto == 17:
1420 ip_proto = 'udp'
1421 definition["protocol"] = ip_proto
1422 if source_ip:
1423 if '/' not in source_ip:
1424 source_ip += '/32'
1425 definition["source_ip_prefix"] = source_ip
1426 if source_port:
1427 definition["source_port_range_min"] = source_port
1428 definition["source_port_range_max"] = source_port
1429 if destination_port:
1430 definition["destination_port_range_min"] = destination_port
1431 definition["destination_port_range_max"] = destination_port
1432 if destination_ip:
1433 if '/' not in destination_ip:
1434 destination_ip += '/32'
1435 definition["destination_ip_prefix"] = destination_ip
1436
1437 vim_classification_id = self.vim.new_classification(
1438 name, 'legacy_flow_classifier', definition)
1439
1440 task["extra"]["created"] = True
1441 task["extra"]["vim_status"] = "ACTIVE"
1442 task["error_msg"] = None
1443 task["status"] = "DONE"
1444 task["vim_id"] = vim_classification_id
1445 instance_element_update = {"status": "ACTIVE", "vim_classification_id": vim_classification_id,
1446 "error_msg": None}
1447 return instance_element_update
1448
1449 except (vimconn.vimconnException, VimThreadException) as e:
1450 self.logger.error("Error creating Classification, task=%s: %s", task_id, str(e))
1451 error_text = self._format_vim_error_msg(str(e))
1452 task["error_msg"] = error_text
1453 task["status"] = "FAILED"
1454 task["vim_id"] = None
1455 instance_element_update = {"status": "VIM_ERROR", "vim_classification_id": None, "error_msg": error_text}
1456 return instance_element_update
1457
1458 def del_classification(self, task):
1459 classification_vim_id = task["vim_id"]
1460 try:
1461 self.vim.delete_classification(classification_vim_id)
1462 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1463 task["error_msg"] = None
1464 return None
1465
1466 except vimconn.vimconnException as e:
1467 task["error_msg"] = self._format_vim_error_msg(str(e))
1468 if isinstance(e, vimconn.vimconnNotFoundException):
1469 # If not found mark as Done and fill error_msg
1470 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1471 return None
1472 task["status"] = "FAILED"
1473 return None
1474
1475 def new_sfp(self, task):
1476 vim_sfp_id = None
1477 try:
1478 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1479 depending_tasks = [task.get("depends").get("TASK-" + str(tsk_id)) for tsk_id in
1480 task.get("extra").get("depends_on")]
1481 error_text = ""
1482 sf_id_list = []
1483 classification_id_list = []
1484 for dep in depending_tasks:
1485 vim_id = dep.get("vim_id")
1486 resource = dep.get("item")
1487 if resource == "instance_sfs":
1488 sf_id_list.append(vim_id)
1489 elif resource == "instance_classifications":
1490 classification_id_list.append(vim_id)
1491
1492 name = "sfp-{}".format(task["item_id"][:8])
1493 # By default no form of IETF SFC Encapsulation will be used
1494 vim_sfp_id = self.vim.new_sfp(name, classification_id_list, sf_id_list, sfc_encap=False)
1495
1496 task["extra"]["created"] = True
1497 task["extra"]["vim_status"] = "ACTIVE"
1498 task["error_msg"] = None
1499 task["status"] = "DONE"
1500 task["vim_id"] = vim_sfp_id
1501 instance_element_update = {"status": "ACTIVE", "vim_sfp_id": vim_sfp_id, "error_msg": None}
1502 return instance_element_update
1503
1504 except (vimconn.vimconnException, VimThreadException) as e:
1505 self.logger.error("Error creating Service Function, task=%s: %s", task_id, str(e))
1506 error_text = self._format_vim_error_msg(str(e))
1507 task["error_msg"] = error_text
1508 task["status"] = "FAILED"
1509 task["vim_id"] = None
1510 instance_element_update = {"status": "VIM_ERROR", "vim_sfp_id": None, "error_msg": error_text}
1511 return instance_element_update
1512
1513 def del_sfp(self, task):
1514 sfp_vim_id = task["vim_id"]
1515 try:
1516 self.vim.delete_sfp(sfp_vim_id)
1517 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1518 task["error_msg"] = None
1519 return None
1520
1521 except vimconn.vimconnException as e:
1522 task["error_msg"] = self._format_vim_error_msg(str(e))
1523 if isinstance(e, vimconn.vimconnNotFoundException):
1524 # If not found mark as Done and fill error_msg
1525 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1526 return None
1527 task["status"] = "FAILED"
1528 return None
1529
1530 def _refres_sfps(self, task):
1531 """Call VIM to get SFPs status"""
1532 database_update = None
1533
1534 vim_id = task["vim_id"]
1535 sfp_to_refresh_list = [vim_id]
1536 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1537 try:
1538 vim_dict = self.vim.refresh_sfps_status(sfp_to_refresh_list)
1539 vim_info = vim_dict[vim_id]
1540 except vimconn.vimconnException as e:
1541 # Mark all tasks at VIM_ERROR status
1542 self.logger.error("task={} get-sfp: vimconnException when trying to refresh sfps {}".format(task_id, e))
1543 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
1544
1545 self.logger.debug("task={} get-sfp: vim_sfp_id={} result={}".format(task_id, task["vim_id"], vim_info))
1546 #TODO: Revise this part
1547 vim_info_error_msg = None
1548 if vim_info.get("error_msg"):
1549 vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"])
1550 task_vim_info = task["extra"].get("vim_info")
1551 task_error_msg = task.get("error_msg")
1552 task_vim_status = task["extra"].get("vim_status")
1553 if task_vim_status != vim_info["status"] or task_error_msg != vim_info_error_msg or \
1554 (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
1555 database_update = {"status": vim_info["status"], "error_msg": vim_info_error_msg}
1556 if vim_info.get("vim_info"):
1557 database_update["vim_info"] = vim_info["vim_info"]
1558
1559 task["extra"]["vim_status"] = vim_info["status"]
1560 task["error_msg"] = vim_info_error_msg
1561 if vim_info.get("vim_info"):
1562 task["extra"]["vim_info"] = vim_info["vim_info"]
1563
1564 return database_update
1565
1566 def _refres_sfis(self, task):
1567 """Call VIM to get sfis status"""
1568 database_update = None
1569
1570 vim_id = task["vim_id"]
1571 sfi_to_refresh_list = [vim_id]
1572 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1573 try:
1574 vim_dict = self.vim.refresh_sfis_status(sfi_to_refresh_list)
1575 vim_info = vim_dict[vim_id]
1576 except vimconn.vimconnException as e:
1577 # Mark all tasks at VIM_ERROR status
1578 self.logger.error("task={} get-sfi: vimconnException when trying to refresh sfis {}".format(task_id, e))
1579 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
1580
1581 self.logger.debug("task={} get-sfi: vim_sfi_id={} result={}".format(task_id, task["vim_id"], vim_info))
1582 #TODO: Revise this part
1583 vim_info_error_msg = None
1584 if vim_info.get("error_msg"):
1585 vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"])
1586 task_vim_info = task["extra"].get("vim_info")
1587 task_error_msg = task.get("error_msg")
1588 task_vim_status = task["extra"].get("vim_status")
1589 if task_vim_status != vim_info["status"] or task_error_msg != vim_info_error_msg or \
1590 (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
1591 database_update = {"status": vim_info["status"], "error_msg": vim_info_error_msg}
1592 if vim_info.get("vim_info"):
1593 database_update["vim_info"] = vim_info["vim_info"]
1594
1595 task["extra"]["vim_status"] = vim_info["status"]
1596 task["error_msg"] = vim_info_error_msg
1597 if vim_info.get("vim_info"):
1598 task["extra"]["vim_info"] = vim_info["vim_info"]
1599
1600 return database_update
1601
1602 def _refres_sfs(self, task):
1603 """Call VIM to get sfs status"""
1604 database_update = None
1605
1606 vim_id = task["vim_id"]
1607 sf_to_refresh_list = [vim_id]
1608 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1609 try:
1610 vim_dict = self.vim.refresh_sfs_status(sf_to_refresh_list)
1611 vim_info = vim_dict[vim_id]
1612 except vimconn.vimconnException as e:
1613 # Mark all tasks at VIM_ERROR status
1614 self.logger.error("task={} get-sf: vimconnException when trying to refresh sfs {}".format(task_id, e))
1615 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
1616
1617 self.logger.debug("task={} get-sf: vim_sf_id={} result={}".format(task_id, task["vim_id"], vim_info))
1618 #TODO: Revise this part
1619 vim_info_error_msg = None
1620 if vim_info.get("error_msg"):
1621 vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"])
1622 task_vim_info = task["extra"].get("vim_info")
1623 task_error_msg = task.get("error_msg")
1624 task_vim_status = task["extra"].get("vim_status")
1625 if task_vim_status != vim_info["status"] or task_error_msg != vim_info_error_msg or \
1626 (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
1627 database_update = {"status": vim_info["status"], "error_msg": vim_info_error_msg}
1628 if vim_info.get("vim_info"):
1629 database_update["vim_info"] = vim_info["vim_info"]
1630
1631 task["extra"]["vim_status"] = vim_info["status"]
1632 task["error_msg"] = vim_info_error_msg
1633 if vim_info.get("vim_info"):
1634 task["extra"]["vim_info"] = vim_info["vim_info"]
1635
1636 return database_update
1637
1638 def _refres_classifications(self, task):
1639 """Call VIM to get classifications status"""
1640 database_update = None
1641
1642 vim_id = task["vim_id"]
1643 classification_to_refresh_list = [vim_id]
1644 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1645 try:
1646 vim_dict = self.vim.refresh_classifications_status(classification_to_refresh_list)
1647 vim_info = vim_dict[vim_id]
1648 except vimconn.vimconnException as e:
1649 # Mark all tasks at VIM_ERROR status
1650 self.logger.error("task={} get-classification: vimconnException when trying to refresh classifications {}"
1651 .format(task_id, e))
1652 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
1653
1654 self.logger.debug("task={} get-classification: vim_classification_id={} result={}".format(task_id,
1655 task["vim_id"], vim_info))
1656 #TODO: Revise this part
1657 vim_info_error_msg = None
1658 if vim_info.get("error_msg"):
1659 vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"])
1660 task_vim_info = task["extra"].get("vim_info")
1661 task_error_msg = task.get("error_msg")
1662 task_vim_status = task["extra"].get("vim_status")
1663 if task_vim_status != vim_info["status"] or task_error_msg != vim_info_error_msg or \
1664 (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
1665 database_update = {"status": vim_info["status"], "error_msg": vim_info_error_msg}
1666 if vim_info.get("vim_info"):
1667 database_update["vim_info"] = vim_info["vim_info"]
1668
1669 task["extra"]["vim_status"] = vim_info["status"]
1670 task["error_msg"] = vim_info_error_msg
1671 if vim_info.get("vim_info"):
1672 task["extra"]["vim_info"] = vim_info["vim_info"]
1673
1674 return database_update