d0b8dc91860095cb83267d9bfab1467d69323e0a
[osm/RO.git] / RO / osm_ro / vim_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefonica Investigacion y Desarrollo, S.A.U.
5 # This file is part of openvim
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
24 """"
25 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
26 The tasks are stored at database in table vim_wim_actions
27 Several vim_wim_actions can refer to the same element at VIM (flavor, network, ...). This is somethng to avoid if RO
28 is migrated to a non-relational database as mongo db. Each vim_wim_actions reference a different instance_Xxxxx
29 In this case "related" colunm contains the same value, to know they refer to the same vim. In case of deletion, it
30 there is related tasks using this element, it is not deleted, The vim_info needed to delete is transfered to other task
31
32 The task content is (M: stored at memory, D: stored at database):
33 MD instance_action_id: reference a global action over an instance-scenario: database instance_actions
34 MD task_index: index number of the task. This together with the previous forms a unique key identifier
35 MD datacenter_vim_id: should contain the uuid of the VIM managed by this thread
36 MD vim_id: id of the vm,net,etc at VIM
37 MD item: database table name, can be instance_vms, instance_nets, TODO: datacenter_flavors, datacenter_images
38 MD item_id: uuid of the referenced entry in the previous table
39 MD action: CREATE, DELETE, FIND
40 MD status: SCHEDULED: action need to be done
41 BUILD: not used
42 DONE: Done and it must be polled to VIM periodically to see status. ONLY for action=CREATE or FIND
43 FAILED: It cannot be created/found/deleted
44 FINISHED: similar to DONE, but no refresh is needed anymore. Task is maintained at database but
45 it is never processed by any thread
46 SUPERSEDED: similar to FINSISHED, but nothing has been done to completed the task.
47 MD extra: text with yaml format at database, dict at memory with:
48 params: list with the params to be sent to the VIM for CREATE or FIND. For DELETE the vim_id is taken
49 from other related tasks
50 find: (only for CREATE tasks) if present it should FIND before creating and use if existing. Contains
51 the FIND params
52 depends_on: list with the 'task_index'es of tasks that must be completed before. e.g. a vm creation depends
53 on a net creation
54 can contain an int (single index on the same instance-action) or str (compete action ID)
55 sdn_net_id: used for net.
56 interfaces: used for VMs. Each key is the uuid of the instance_interfaces entry at database
57 iface_id: uuid of intance_interfaces
58 sdn_port_id:
59 sdn_net_id:
60 vim_info
61 created_items: dictionary with extra elements created that need to be deleted. e.g. ports, volumes,...
62 created: False if the VIM element is not created by other actions, and it should not be deleted
63 vim_status: VIM status of the element. Stored also at database in the instance_XXX
64 vim_info: Detailed information of a vm/net from the VIM. Stored at database in the instance_XXX but not at
65 vim_wim_actions
66 M depends: dict with task_index(from depends_on) to dependency task
67 M params: same as extra[params]
68 MD error_msg: descriptive text upon an error.Stored also at database instance_XXX
69 MD created_at: task creation time. The task of creation must be the oldest
70 MD modified_at: next time task need to be processed. For example, for a refresh, it contain next time refresh must
71 be done
72 MD related: All the tasks over the same VIM element have same "related". Note that other VIMs can contain the
73 same value of related, but this thread only process those task of one VIM. Also related can be the
74 same among several NS os isntance-scenarios
75 MD worker: Used to lock in case of several thread workers.
76
77 """
78
79 import threading
80 import time
81 import queue
82 import logging
83 from osm_ro import vimconn
84 from osm_ro.wim.sdnconn import SdnConnectorError
85 import yaml
86 from osm_ro.db_base import db_base_Exception
87 from http import HTTPStatus
88 from copy import deepcopy
89
90 __author__ = "Alfonso Tierno, Pablo Montes"
91 __date__ = "$28-Sep-2017 12:07:15$"
92
93
94 def is_task_id(task_id):
95 return task_id.startswith("TASK-")
96
97
98 class VimThreadException(Exception):
99 pass
100
101
102 class VimThreadExceptionNotFound(VimThreadException):
103 pass
104
105
106 class vim_thread(threading.Thread):
107 REFRESH_BUILD = 5 # 5 seconds
108 REFRESH_ACTIVE = 60 # 1 minute
109 REFRESH_ERROR = 600
110 REFRESH_DELETE = 3600 * 10
111
112 def __init__(self, task_lock, plugins, name=None, wim_account_id=None, datacenter_tenant_id=None, db=None):
113 """Init a thread.
114 Arguments:
115 'id' number of thead
116 'name' name of thread
117 'host','user': host ip or name to manage and user
118 'db', 'db_lock': database class and lock to use it in exclusion
119 """
120 threading.Thread.__init__(self)
121 self.plugins = plugins
122 self.plugin_name = "unknown"
123 self.vim = None
124 self.sdnconnector = None
125 self.sdnconn_config = None
126 self.error_status = None
127 self.wim_account_id = wim_account_id
128 self.datacenter_tenant_id = datacenter_tenant_id
129 self.port_mapping = None
130 if self.wim_account_id:
131 self.target_k = "wim_account_id"
132 self.target_v = self.wim_account_id
133 else:
134 self.target_k = "datacenter_vim_id"
135 self.target_v = self.datacenter_tenant_id
136 if not name:
137 self.name = wim_account_id or str(datacenter_tenant_id)
138 else:
139 self.name = name
140 self.vim_persistent_info = {}
141 self.my_id = self.name[:64]
142
143 self.logger = logging.getLogger('openmano.{}.{}'.format("vim" if self.datacenter_tenant_id else "sdn",
144 self.name))
145 self.db = db
146
147 self.task_lock = task_lock
148 self.task_queue = queue.Queue(2000)
149
150 def _proccess_sdn_exception(self, exc):
151 if isinstance(exc, SdnConnectorError):
152 raise
153 else:
154 self.logger.error("plugin={} throws a non SdnConnectorError exception {}".format(self.plugin_name, exc),
155 exc_info=True)
156 raise SdnConnectorError(str(exc), http_code=HTTPStatus.INTERNAL_SERVER_ERROR.value) from exc
157
158 def _proccess_vim_exception(self, exc):
159 if isinstance(exc, vimconn.vimconnException):
160 raise
161 else:
162 self.logger.error("plugin={} throws a non vimconnException exception {}".format(self.plugin_name, exc),
163 exc_info=True)
164 raise vimconn.vimconnException(str(exc), http_code=HTTPStatus.INTERNAL_SERVER_ERROR.value) from exc
165
166 def get_vim_sdn_connector(self):
167 if self.datacenter_tenant_id:
168 try:
169 from_ = "datacenter_tenants as dt join datacenters as d on dt.datacenter_id=d.uuid"
170 select_ = ('type', 'd.config as config', 'd.uuid as datacenter_id', 'vim_url', 'vim_url_admin',
171 'd.name as datacenter_name', 'dt.uuid as datacenter_tenant_id',
172 'dt.vim_tenant_name as vim_tenant_name', 'dt.vim_tenant_id as vim_tenant_id',
173 'user', 'passwd', 'dt.config as dt_config')
174 where_ = {"dt.uuid": self.datacenter_tenant_id}
175 vims = self.db.get_rows(FROM=from_, SELECT=select_, WHERE=where_)
176 vim = vims[0]
177 vim_config = {}
178 if vim["config"]:
179 vim_config.update(yaml.load(vim["config"], Loader=yaml.Loader))
180 if vim["dt_config"]:
181 vim_config.update(yaml.load(vim["dt_config"], Loader=yaml.Loader))
182 vim_config['datacenter_tenant_id'] = vim.get('datacenter_tenant_id')
183 vim_config['datacenter_id'] = vim.get('datacenter_id')
184
185 # get port_mapping
186 # vim_port_mappings = self.ovim.get_of_port_mappings(
187 # db_filter={"datacenter_id": vim_config['datacenter_id']})
188 # vim_config["wim_external_ports"] = [x for x in vim_port_mappings
189 # if x["service_mapping_info"].get("wim")]
190 self.plugin_name = "rovim_" + vim["type"]
191 self.vim = self.plugins[self.plugin_name].vimconnector(
192 uuid=vim['datacenter_id'], name=vim['datacenter_name'],
193 tenant_id=vim['vim_tenant_id'], tenant_name=vim['vim_tenant_name'],
194 url=vim['vim_url'], url_admin=vim['vim_url_admin'],
195 user=vim['user'], passwd=vim['passwd'],
196 config=vim_config, persistent_info=self.vim_persistent_info
197 )
198 self.error_status = None
199 self.logger.info("Vim Connector loaded for vim_account={}, plugin={}".format(
200 self.datacenter_tenant_id, self.plugin_name))
201 except Exception as e:
202 self.logger.error("Cannot load vimconnector for vim_account={} plugin={}: {}".format(
203 self.datacenter_tenant_id, self.plugin_name, e))
204 self.vim = None
205 self.error_status = "Error loading vimconnector: {}".format(e)
206 else:
207 try:
208 wim_account = self.db.get_rows(FROM="wim_accounts", WHERE={"uuid": self.wim_account_id})[0]
209 wim = self.db.get_rows(FROM="wims", WHERE={"uuid": wim_account["wim_id"]})[0]
210 if wim["config"]:
211 self.sdnconn_config = yaml.load(wim["config"], Loader=yaml.Loader)
212 else:
213 self.sdnconn_config = {}
214 if wim_account["config"]:
215 self.sdnconn_config.update(yaml.load(wim_account["config"], Loader=yaml.Loader))
216 self.port_mappings = self.db.get_rows(FROM="wim_port_mappings", WHERE={"wim_id": wim_account["wim_id"]})
217 if self.port_mappings:
218 self.sdnconn_config["service_endpoint_mapping"] = self.port_mappings
219 self.plugin_name = "rosdn_" + wim["type"]
220 self.sdnconnector = self.plugins[self.plugin_name](
221 wim, wim_account, config=self.sdnconn_config)
222 self.error_status = None
223 self.logger.info("Sdn Connector loaded for wim_account={}, plugin={}".format(
224 self.wim_account_id, self.plugin_name))
225 except Exception as e:
226 self.logger.error("Cannot load sdn connector for wim_account={}, plugin={}: {}".format(
227 self.wim_account_id, self.plugin_name, e))
228 self.sdnconnector = None
229 self.error_status = "Error loading sdn connector: {}".format(e)
230
231 def _get_db_task(self):
232 """
233 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
234 :return: None
235 """
236 now = time.time()
237 try:
238 database_limit = 20
239 task_related = None
240 while True:
241 # get 20 (database_limit) entries each time
242 vim_actions = self.db.get_rows(FROM="vim_wim_actions",
243 WHERE={self.target_k: self.target_v,
244 "status": ['SCHEDULED', 'BUILD', 'DONE'],
245 "worker": [None, self.my_id], "modified_at<=": now
246 },
247 ORDER_BY=("modified_at", "created_at",),
248 LIMIT=database_limit)
249 if not vim_actions:
250 return None, None
251 # if vim_actions[0]["modified_at"] > now:
252 # return int(vim_actions[0] - now)
253 for task in vim_actions:
254 # block related task
255 if task_related == task["related"]:
256 continue # ignore if a locking has already tried for these task set
257 task_related = task["related"]
258 # lock ...
259 self.db.update_rows("vim_wim_actions", UPDATE={"worker": self.my_id}, modified_time=0,
260 WHERE={self.target_k: self.target_v,
261 "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
262 "worker": [None, self.my_id],
263 "related": task_related,
264 "item": task["item"],
265 })
266 # ... and read all related and check if locked
267 related_tasks = self.db.get_rows(FROM="vim_wim_actions",
268 WHERE={self.target_k: self.target_v,
269 "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
270 "related": task_related,
271 "item": task["item"],
272 },
273 ORDER_BY=("created_at",))
274 # check that all related tasks have been locked. If not release and try again. It can happen
275 # for race conditions if a new related task has been inserted by nfvo in the process
276 some_tasks_locked = False
277 some_tasks_not_locked = False
278 creation_task = None
279 for relate_task in related_tasks:
280 if relate_task["worker"] != self.my_id:
281 some_tasks_not_locked = True
282 else:
283 some_tasks_locked = True
284 if not creation_task and relate_task["action"] in ("CREATE", "FIND"):
285 creation_task = relate_task
286 if some_tasks_not_locked:
287 if some_tasks_locked: # unlock
288 self.db.update_rows("vim_wim_actions", UPDATE={"worker": None}, modified_time=0,
289 WHERE={self.target_k: self.target_v,
290 "worker": self.my_id,
291 "related": task_related,
292 "item": task["item"],
293 })
294 continue
295
296 task["params"] = None
297 if task["extra"]:
298 extra = yaml.load(task["extra"], Loader=yaml.Loader)
299 else:
300 extra = {}
301 task["extra"] = extra
302 if extra.get("depends_on"):
303 task["depends"] = {}
304 if extra.get("params"):
305 task["params"] = deepcopy(extra["params"])
306 return task, related_tasks
307 except Exception as e:
308 self.logger.critical("Unexpected exception at _get_db_task: " + str(e), exc_info=True)
309 return None, None
310
311 def _delete_task(self, task):
312 """
313 Determine if this task need to be done or superseded
314 :return: None
315 """
316
317 def copy_extra_created(copy_to, copy_from):
318 copy_to["created"] = copy_from["created"]
319 if copy_from.get("sdn_net_id"):
320 copy_to["sdn_net_id"] = copy_from["sdn_net_id"]
321 if copy_from.get("interfaces"):
322 copy_to["interfaces"] = copy_from["interfaces"]
323 if copy_from.get("created_items"):
324 if not copy_to.get("created_items"):
325 copy_to["created_items"] = {}
326 copy_to["created_items"].update(copy_from["created_items"])
327
328 task_create = None
329 dependency_task = None
330 deletion_needed = task["extra"].get("created", False)
331 if task["status"] == "FAILED":
332 return # TODO need to be retry??
333 try:
334 # get all related tasks. task of creation must be the first in the list of related_task,
335 # unless the deletion fails and it is pendingit fails
336 # TODO this should be removed, passing related_tasks
337 related_tasks = self.db.get_rows(FROM="vim_wim_actions",
338 WHERE={self.target_k: self.target_v,
339 "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
340 "action": ["FIND", "CREATE"],
341 "related": task["related"],
342 },
343 ORDER_BY=("created_at",),
344 )
345 for related_task in related_tasks:
346 if related_task["item"] == task["item"] and related_task["item_id"] == task["item_id"]:
347 task_create = related_task
348 # TASK_CREATE
349 if related_task["extra"]:
350 extra_created = yaml.load(related_task["extra"], Loader=yaml.Loader)
351 if extra_created.get("created"):
352 deletion_needed = True
353 related_task["extra"] = extra_created
354 elif not dependency_task:
355 dependency_task = related_task
356 if task_create and dependency_task:
357 break
358
359 # mark task_create as FINISHED
360 if task_create:
361 self.db.update_rows("vim_wim_actions", UPDATE={"status": "FINISHED"},
362 WHERE={self.target_k: self.target_v,
363 "instance_action_id": task_create["instance_action_id"],
364 "task_index": task_create["task_index"]
365 })
366 if not deletion_needed:
367 return False
368 elif dependency_task:
369 # move create information from task_create to relate_task
370 extra_new_created = yaml.load(dependency_task["extra"], Loader=yaml.Loader) or {}
371 extra_new_created["created"] = extra_created["created"]
372 copy_extra_created(copy_to=extra_new_created, copy_from=extra_created)
373
374 self.db.update_rows("vim_wim_actions",
375 UPDATE={"extra": yaml.safe_dump(extra_new_created, default_flow_style=True,
376 width=256),
377 "vim_id": task_create.get("vim_id")},
378 WHERE={self.target_k: self.target_v,
379 "instance_action_id": dependency_task["instance_action_id"],
380 "task_index": dependency_task["task_index"]
381 })
382 return False
383 elif task_create:
384 task["vim_id"] = task_create["vim_id"]
385 copy_extra_created(copy_to=task["extra"], copy_from=task_create["extra"])
386 # Ensure this task extra information is stored at database
387 self.db.update_rows("vim_wim_actions",
388 UPDATE={"extra": yaml.safe_dump(task["extra"], default_flow_style=True,
389 width=256)},
390 WHERE={self.target_k: self.target_v,
391 "instance_action_id": task["instance_action_id"],
392 "task_index": task["task_index"],
393 })
394 return True
395 return deletion_needed
396
397 except Exception as e:
398 self.logger.critical("Unexpected exception at _delete_task: " + str(e), exc_info=True)
399
400 def _refres_vm(self, task):
401 """Call VIM to get VMs status"""
402 database_update = None
403
404 vim_id = task["vim_id"]
405 vm_to_refresh_list = [vim_id]
406 try:
407 vim_dict = self.vim.refresh_vms_status(vm_to_refresh_list)
408 vim_info = vim_dict[vim_id]
409 except vimconn.vimconnException as e:
410 # Mark all tasks at VIM_ERROR status
411 self.logger.error("task=several get-VM: vimconnException when trying to refresh vms " + str(e))
412 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
413
414 task_id = task["instance_action_id"] + "." + str(task["task_index"])
415 self.logger.debug("task={} get-VM: vim_vm_id={} result={}".format(task_id, task["vim_id"], vim_info))
416
417 # check and update interfaces
418 task_warning_msg = ""
419 for interface in vim_info.get("interfaces", ()):
420 vim_interface_id = interface["vim_interface_id"]
421 if vim_interface_id not in task["extra"]["interfaces"]:
422 self.logger.critical("task={} get-VM: Interface not found {} on task info {}".format(
423 task_id, vim_interface_id, task["extra"]["interfaces"]), exc_info=True)
424 continue
425 task_interface = task["extra"]["interfaces"][vim_interface_id]
426 task_vim_interface = task_interface.get("vim_info")
427 if task_vim_interface != interface:
428 # delete old port
429 # if task_interface.get("sdn_port_id"):
430 # try:
431 # self.ovim.delete_port(task_interface["sdn_port_id"], idempotent=True)
432 # task_interface["sdn_port_id"] = None
433 # except ovimException as e:
434 # error_text = "ovimException deleting external_port={}: {}".format(
435 # task_interface["sdn_port_id"], e)
436 # self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
437 # task_warning_msg += error_text
438 # # TODO Set error_msg at instance_nets instead of instance VMs
439
440 # Create SDN port
441 # sdn_net_id = task_interface.get("sdn_net_id")
442 # if sdn_net_id and interface.get("compute_node") and interface.get("pci"):
443 # sdn_port_name = sdn_net_id + "." + task["vim_id"]
444 # sdn_port_name = sdn_port_name[:63]
445 # try:
446 # sdn_port_id = self.ovim.new_external_port(
447 # {"compute_node": interface["compute_node"],
448 # "pci": interface["pci"],
449 # "vlan": interface.get("vlan"),
450 # "net_id": sdn_net_id,
451 # "region": self.vim["config"]["datacenter_id"],
452 # "name": sdn_port_name,
453 # "mac": interface.get("mac_address")})
454 # task_interface["sdn_port_id"] = sdn_port_id
455 # except (ovimException, Exception) as e:
456 # error_text = "ovimException creating new_external_port compute_node={} pci={} vlan={} {}".\
457 # format(interface["compute_node"], interface["pci"], interface.get("vlan"), e)
458 # self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
459 # task_warning_msg += error_text
460 # # TODO Set error_msg at instance_nets instead of instance VMs
461
462 self.db.update_rows('instance_interfaces',
463 UPDATE={"mac_address": interface.get("mac_address"),
464 "ip_address": interface.get("ip_address"),
465 "vim_interface_id": interface.get("vim_interface_id"),
466 "vim_info": interface.get("vim_info"),
467 "sdn_port_id": task_interface.get("sdn_port_id"),
468 "compute_node": interface.get("compute_node"),
469 "pci": interface.get("pci"),
470 "vlan": interface.get("vlan")},
471 WHERE={'uuid': task_interface["iface_id"]})
472 task_interface["vim_info"] = interface
473 # if sdn_net_id and interface.get("compute_node") and interface.get("pci"):
474 # # TODO Send message to task SDN to update
475
476 # check and update task and instance_vms database
477 vim_info_error_msg = None
478 if vim_info.get("error_msg"):
479 vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"] + task_warning_msg)
480 elif task_warning_msg:
481 vim_info_error_msg = self._format_vim_error_msg(task_warning_msg)
482 task_vim_info = task["extra"].get("vim_info")
483 task_error_msg = task.get("error_msg")
484 task_vim_status = task["extra"].get("vim_status")
485 if task_vim_status != vim_info["status"] or task_error_msg != vim_info_error_msg or \
486 (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
487 database_update = {"status": vim_info["status"], "error_msg": vim_info_error_msg}
488 if vim_info.get("vim_info"):
489 database_update["vim_info"] = vim_info["vim_info"]
490
491 task["extra"]["vim_status"] = vim_info["status"]
492 task["error_msg"] = vim_info_error_msg
493 if vim_info.get("vim_info"):
494 task["extra"]["vim_info"] = vim_info["vim_info"]
495
496 return database_update
497
498 def _refres_net(self, task):
499 """Call VIM to get network status"""
500 database_update = None
501
502 vim_id = task["vim_id"]
503 net_to_refresh_list = [vim_id]
504 try:
505 vim_dict = self.vim.refresh_nets_status(net_to_refresh_list)
506 vim_info = vim_dict[vim_id]
507 except vimconn.vimconnException as e:
508 # Mark all tasks at VIM_ERROR status
509 self.logger.error("task=several get-net: vimconnException when trying to refresh nets " + str(e))
510 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
511
512 task_id = task["instance_action_id"] + "." + str(task["task_index"])
513 self.logger.debug("task={} get-net: vim_net_id={} result={}".format(task_id, task["vim_id"], vim_info))
514
515 task_vim_info = task["extra"].get("vim_info")
516 task_vim_status = task["extra"].get("vim_status")
517 task_error_msg = task.get("error_msg")
518 # task_sdn_net_id = task["extra"].get("sdn_net_id")
519
520 vim_info_status = vim_info["status"]
521 vim_info_error_msg = vim_info.get("error_msg")
522 # get ovim status
523 # if task_sdn_net_id:
524 # try:
525 # sdn_net = self.ovim.show_network(task_sdn_net_id)
526 # except (ovimException, Exception) as e:
527 # text_error = "ovimException getting network snd_net_id={}: {}".format(task_sdn_net_id, e)
528 # self.logger.error("task={} get-net: {}".format(task_id, text_error), exc_info=True)
529 # sdn_net = {"status": "ERROR", "last_error": text_error}
530 # if sdn_net["status"] == "ERROR":
531 # if not vim_info_error_msg:
532 # vim_info_error_msg = str(sdn_net.get("last_error"))
533 # else:
534 # vim_info_error_msg = "VIM_ERROR: {} && SDN_ERROR: {}".format(
535 # self._format_vim_error_msg(vim_info_error_msg, 1024 // 2 - 14),
536 # self._format_vim_error_msg(sdn_net["last_error"], 1024 // 2 - 14))
537 # vim_info_status = "ERROR"
538 # elif sdn_net["status"] == "BUILD":
539 # if vim_info_status == "ACTIVE":
540 # vim_info_status = "BUILD"
541
542 # update database
543 if vim_info_error_msg:
544 vim_info_error_msg = self._format_vim_error_msg(vim_info_error_msg)
545 if task_vim_status != vim_info_status or task_error_msg != vim_info_error_msg or \
546 (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
547 task["extra"]["vim_status"] = vim_info_status
548 task["error_msg"] = vim_info_error_msg
549 if vim_info.get("vim_info"):
550 task["extra"]["vim_info"] = vim_info["vim_info"]
551 database_update = {"status": vim_info_status, "error_msg": vim_info_error_msg}
552 if vim_info.get("vim_info"):
553 database_update["vim_info"] = vim_info["vim_info"]
554 return database_update
555
556 def _proccess_pending_tasks(self, task, related_tasks):
557 old_task_status = task["status"]
558 create_or_find = False # if as result of processing this task something is created or found
559 next_refresh = 0
560 task_id = task["instance_action_id"] + "." + str(task["task_index"])
561
562 try:
563 if task["status"] == "SCHEDULED":
564 # check if tasks that this depends on have been completed
565 dependency_not_completed = False
566 dependency_modified_at = 0
567 for task_index in task["extra"].get("depends_on", ()):
568 task_dependency = self._look_for_task(task["instance_action_id"], task_index)
569 if not task_dependency:
570 raise VimThreadException(
571 "Cannot get depending net task trying to get depending task {}.{}".format(
572 task["instance_action_id"], task_index))
573 # task["depends"]["TASK-" + str(task_index)] = task_dependency #it references another object,so
574 # database must be look again
575 if task_dependency["status"] == "SCHEDULED":
576 dependency_not_completed = True
577 dependency_modified_at = task_dependency["modified_at"]
578 break
579 elif task_dependency["status"] == "FAILED":
580 raise VimThreadException(
581 "Cannot {} {}, (task {}.{}) because depends on failed {}.{}, (task{}.{}): {}".format(
582 task["action"], task["item"],
583 task["instance_action_id"], task["task_index"],
584 task_dependency["instance_action_id"], task_dependency["task_index"],
585 task_dependency["action"], task_dependency["item"], task_dependency.get("error_msg")))
586
587 task["depends"]["TASK-"+str(task_index)] = task_dependency
588 task["depends"]["TASK-{}.{}".format(task["instance_action_id"], task_index)] = task_dependency
589 if dependency_not_completed:
590 # Move this task to the time dependency is going to be modified plus 10 seconds.
591 self.db.update_rows("vim_wim_actions", modified_time=dependency_modified_at + 10,
592 UPDATE={"worker": None},
593 WHERE={self.target_k: self.target_v, "worker": self.my_id,
594 "related": task["related"],
595 })
596 # task["extra"]["tries"] = task["extra"].get("tries", 0) + 1
597 # if task["extra"]["tries"] > 3:
598 # raise VimThreadException(
599 # "Cannot {} {}, (task {}.{}) because timeout waiting to complete {} {}, "
600 # "(task {}.{})".format(task["action"], task["item"],
601 # task["instance_action_id"], task["task_index"],
602 # task_dependency["instance_action_id"], task_dependency["task_index"]
603 # task_dependency["action"], task_dependency["item"]))
604 return
605
606 database_update = None
607 if task["action"] == "DELETE":
608 deleted_needed = self._delete_task(task)
609 if not deleted_needed:
610 task["status"] = "SUPERSEDED" # with FINISHED instead of DONE it will not be refreshing
611 task["error_msg"] = None
612
613 if task["status"] == "SUPERSEDED":
614 # not needed to do anything but update database with the new status
615 database_update = None
616 elif not self.vim and not self.sdnconnector:
617 task["status"] = "FAILED"
618 task["error_msg"] = self.error_status
619 database_update = {"status": "VIM_ERROR" if self.datacenter_tenant_id else "WIM_ERROR",
620 "error_msg": task["error_msg"]}
621 elif task["item_id"] != related_tasks[0]["item_id"] and task["action"] in ("FIND", "CREATE"):
622 # Do nothing, just copy values from one to another and update database
623 task["status"] = related_tasks[0]["status"]
624 task["error_msg"] = related_tasks[0]["error_msg"]
625 task["vim_id"] = related_tasks[0]["vim_id"]
626 extra = yaml.load(related_tasks[0]["extra"], Loader=yaml.Loader)
627 task["extra"]["vim_status"] = extra.get("vim_status")
628 next_refresh = related_tasks[0]["modified_at"] + 0.001
629 database_update = {"status": task["extra"].get("vim_status", "VIM_ERROR"),
630 "error_msg": task["error_msg"]}
631 if task["item"] == 'instance_vms':
632 database_update["vim_vm_id"] = task["vim_id"]
633 elif task["item"] == 'instance_nets':
634 database_update["vim_net_id"] = task["vim_id"]
635 elif task["item"] == 'instance_vms':
636 if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"):
637 database_update = self._refres_vm(task)
638 create_or_find = True
639 elif task["action"] == "CREATE":
640 create_or_find = True
641 database_update = self.new_vm(task)
642 elif task["action"] == "DELETE":
643 self.del_vm(task)
644 else:
645 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
646 elif task["item"] == 'instance_nets':
647 if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"):
648 database_update = self._refres_net(task)
649 create_or_find = True
650 elif task["action"] == "CREATE":
651 create_or_find = True
652 database_update = self.new_net(task)
653 elif task["action"] == "DELETE":
654 self.del_net(task)
655 elif task["action"] == "FIND":
656 database_update = self.get_net(task)
657 else:
658 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
659 elif task["item"] == 'instance_wim_nets':
660 if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"):
661 database_update = self.new_or_update_sdn_net(task)
662 create_or_find = True
663 elif task["action"] == "CREATE":
664 create_or_find = True
665 database_update = self.new_or_update_sdn_net(task)
666 elif task["action"] == "DELETE":
667 self.del_sdn_net(task)
668 elif task["action"] == "FIND":
669 database_update = self.get_sdn_net(task)
670 else:
671 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
672 elif task["item"] == 'instance_sfis':
673 if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"):
674 database_update = self._refres_sfis(task)
675 create_or_find = True
676 elif task["action"] == "CREATE":
677 create_or_find = True
678 database_update = self.new_sfi(task)
679 elif task["action"] == "DELETE":
680 self.del_sfi(task)
681 else:
682 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
683 elif task["item"] == 'instance_sfs':
684 if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"):
685 database_update = self._refres_sfs(task)
686 create_or_find = True
687 elif task["action"] == "CREATE":
688 create_or_find = True
689 database_update = self.new_sf(task)
690 elif task["action"] == "DELETE":
691 self.del_sf(task)
692 else:
693 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
694 elif task["item"] == 'instance_classifications':
695 if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"):
696 database_update = self._refres_classifications(task)
697 create_or_find = True
698 elif task["action"] == "CREATE":
699 create_or_find = True
700 database_update = self.new_classification(task)
701 elif task["action"] == "DELETE":
702 self.del_classification(task)
703 else:
704 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
705 elif task["item"] == 'instance_sfps':
706 if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"):
707 database_update = self._refres_sfps(task)
708 create_or_find = True
709 elif task["action"] == "CREATE":
710 create_or_find = True
711 database_update = self.new_sfp(task)
712 elif task["action"] == "DELETE":
713 self.del_sfp(task)
714 else:
715 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
716 else:
717 raise vimconn.vimconnException(self.name + "unknown task item {}".format(task["item"]))
718 # TODO
719 except Exception as e:
720 if not isinstance(e, VimThreadException):
721 self.logger.error("Error executing task={}: {}".format(task_id, e), exc_info=True)
722 task["error_msg"] = str(e)
723 task["status"] = "FAILED"
724 database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
725 # if task["item"] == 'instance_vms':
726 # database_update["vim_vm_id"] = None
727 # elif task["item"] == 'instance_nets':
728 # database_update["vim_net_id"] = None
729
730 self.logger.debug("task={} item={} action={} result={}:'{}' params={}".format(
731 task_id, task["item"], task["action"], task["status"],
732 task["vim_id"] if task["status"] == "DONE" else task.get("error_msg"), task["params"]))
733 try:
734 if not next_refresh:
735 if task["status"] == "DONE":
736 next_refresh = time.time()
737 if task["extra"].get("vim_status") == "BUILD":
738 next_refresh += self.REFRESH_BUILD
739 elif task["extra"].get("vim_status") in ("ERROR", "VIM_ERROR", "WIM_ERROR"):
740 next_refresh += self.REFRESH_ERROR
741 elif task["extra"].get("vim_status") == "DELETED":
742 next_refresh += self.REFRESH_DELETE
743 else:
744 next_refresh += self.REFRESH_ACTIVE
745 elif task["status"] == "FAILED":
746 next_refresh = time.time() + self.REFRESH_DELETE
747
748 if create_or_find:
749 # modify all related task with action FIND/CREATED non SCHEDULED
750 self.db.update_rows(
751 table="vim_wim_actions", modified_time=next_refresh + 0.001,
752 UPDATE={"status": task["status"], "vim_id": task.get("vim_id"),
753 "error_msg": task["error_msg"],
754 },
755
756 WHERE={self.target_k: self.target_v,
757 "worker": self.my_id,
758 "action": ["FIND", "CREATE"],
759 "related": task["related"],
760 "status<>": "SCHEDULED",
761 })
762 # modify own task
763 self.db.update_rows(
764 table="vim_wim_actions", modified_time=next_refresh,
765 UPDATE={"status": task["status"], "vim_id": task.get("vim_id"),
766 "error_msg": task["error_msg"],
767 "extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256)},
768 WHERE={"instance_action_id": task["instance_action_id"], "task_index": task["task_index"]})
769 # Unlock tasks
770 self.db.update_rows(
771 table="vim_wim_actions", modified_time=0,
772 UPDATE={"worker": None},
773 WHERE={self.target_k: self.target_v,
774 "worker": self.my_id,
775 "related": task["related"],
776 })
777
778 # Update table instance_actions
779 if old_task_status == "SCHEDULED" and task["status"] != old_task_status:
780 self.db.update_rows(
781 table="instance_actions",
782 UPDATE={("number_failed" if task["status"] == "FAILED" else "number_done"): {"INCREMENT": 1}},
783 WHERE={"uuid": task["instance_action_id"]})
784 if database_update:
785 where_filter = {"related": task["related"]}
786 if task["item"] == "instance_nets" and task["datacenter_vim_id"]:
787 where_filter["datacenter_tenant_id"] = task["datacenter_vim_id"]
788 self.db.update_rows(table=task["item"],
789 UPDATE=database_update,
790 WHERE=where_filter)
791 except db_base_Exception as e:
792 self.logger.error("task={} Error updating database {}".format(task_id, e), exc_info=True)
793
794 def insert_task(self, task):
795 try:
796 self.task_queue.put(task, False)
797 return None
798 except queue.Full:
799 raise vimconn.vimconnException(self.name + ": timeout inserting a task")
800
801 def del_task(self, task):
802 with self.task_lock:
803 if task["status"] == "SCHEDULED":
804 task["status"] = "SUPERSEDED"
805 return True
806 else: # task["status"] == "processing"
807 self.task_lock.release()
808 return False
809
810 def run(self):
811 self.logger.debug("Starting")
812 while True:
813 self.get_vim_sdn_connector()
814 self.logger.debug("Vimconnector loaded")
815 reload_thread = False
816
817 while True:
818 try:
819 while not self.task_queue.empty():
820 task = self.task_queue.get()
821 if isinstance(task, list):
822 pass
823 elif isinstance(task, str):
824 if task == 'exit':
825 return 0
826 elif task == 'reload':
827 reload_thread = True
828 break
829 self.task_queue.task_done()
830 if reload_thread:
831 break
832
833 task, related_tasks = self._get_db_task()
834 if task:
835 self._proccess_pending_tasks(task, related_tasks)
836 else:
837 time.sleep(5)
838
839 except Exception as e:
840 self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)
841
842 self.logger.debug("Finishing")
843
844 def _look_for_task(self, instance_action_id, task_id):
845 """
846 Look for a concrete task at vim_actions database table
847 :param instance_action_id: The instance_action_id
848 :param task_id: Can have several formats:
849 <task index>: integer
850 TASK-<task index> :backward compatibility,
851 [TASK-]<instance_action_id>.<task index>: this instance_action_id overrides the one in the parameter
852 :return: Task dictionary or None if not found
853 """
854 if isinstance(task_id, int):
855 task_index = task_id
856 else:
857 if task_id.startswith("TASK-"):
858 task_id = task_id[5:]
859 ins_action_id, _, task_index = task_id.rpartition(".")
860 if ins_action_id:
861 instance_action_id = ins_action_id
862
863 tasks = self.db.get_rows(FROM="vim_wim_actions", WHERE={"instance_action_id": instance_action_id,
864 "task_index": task_index})
865 if not tasks:
866 return None
867 task = tasks[0]
868 task["params"] = None
869 task["depends"] = {}
870 if task["extra"]:
871 extra = yaml.load(task["extra"], Loader=yaml.Loader)
872 task["extra"] = extra
873 task["params"] = extra.get("params")
874 else:
875 task["extra"] = {}
876 return task
877
878 @staticmethod
879 def _format_vim_error_msg(error_text, max_length=1024):
880 if error_text and len(error_text) >= max_length:
881 return error_text[:max_length // 2 - 3] + " ... " + error_text[-max_length // 2 + 3:]
882 return error_text
883
884 def new_vm(self, task):
885 task_id = task["instance_action_id"] + "." + str(task["task_index"])
886 try:
887 params = task["params"]
888 depends = task.get("depends")
889 net_list = params[5]
890 for net in net_list:
891 if "net_id" in net and is_task_id(net["net_id"]): # change task_id into network_id
892 network_id = task["depends"][net["net_id"]].get("vim_id")
893 if not network_id:
894 raise VimThreadException(
895 "Cannot create VM because depends on a network not created or found: " +
896 str(depends[net["net_id"]]["error_msg"]))
897 net["net_id"] = network_id
898 params_copy = deepcopy(params)
899 vim_vm_id, created_items = self.vim.new_vminstance(*params_copy)
900
901 # fill task_interfaces. Look for snd_net_id at database for each interface
902 task_interfaces = {}
903 for iface in params_copy[5]:
904 task_interfaces[iface["vim_id"]] = {"iface_id": iface["uuid"]}
905 result = self.db.get_rows(
906 SELECT=('sdn_net_id', 'interface_id'),
907 FROM='instance_nets as ine join instance_interfaces as ii on ii.instance_net_id=ine.uuid',
908 WHERE={'ii.uuid': iface["uuid"]})
909 if result:
910 task_interfaces[iface["vim_id"]]["sdn_net_id"] = result[0]['sdn_net_id']
911 task_interfaces[iface["vim_id"]]["interface_id"] = result[0]['interface_id']
912 else:
913 self.logger.critical("task={} new-VM: instance_nets uuid={} not found at DB".format(task_id,
914 iface["uuid"]),
915 exc_info=True)
916
917 task["vim_info"] = {}
918 task["extra"]["interfaces"] = task_interfaces
919 task["extra"]["created"] = True
920 task["extra"]["created_items"] = created_items
921 task["extra"]["vim_status"] = "BUILD"
922 task["error_msg"] = None
923 task["status"] = "DONE"
924 task["vim_id"] = vim_vm_id
925 instance_element_update = {"status": "BUILD", "vim_vm_id": vim_vm_id, "error_msg": None}
926 return instance_element_update
927
928 except (vimconn.vimconnException, VimThreadException) as e:
929 self.logger.error("task={} new-VM: {}".format(task_id, e))
930 error_text = self._format_vim_error_msg(str(e))
931 task["error_msg"] = error_text
932 task["status"] = "FAILED"
933 task["vim_id"] = None
934 instance_element_update = {"status": "VIM_ERROR", "vim_vm_id": None, "error_msg": error_text}
935 return instance_element_update
936
937 def del_vm(self, task):
938 # task_id = task["instance_action_id"] + "." + str(task["task_index"])
939 vm_vim_id = task["vim_id"]
940 # interfaces = task["extra"].get("interfaces", ())
941 try:
942 # for iface in interfaces.values():
943 # if iface.get("sdn_port_id"):
944 # try:
945 # self.ovim.delete_port(iface["sdn_port_id"], idempotent=True)
946 # except ovimException as e:
947 # self.logger.error("task={} del-VM: ovimException when deleting external_port={}: {} ".format(
948 # task_id, iface["sdn_port_id"], e), exc_info=True)
949 # # TODO Set error_msg at instance_nets
950
951 self.vim.delete_vminstance(vm_vim_id, task["extra"].get("created_items"))
952 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
953 task["error_msg"] = None
954 return None
955
956 except vimconn.vimconnException as e:
957 task["error_msg"] = self._format_vim_error_msg(str(e))
958 if isinstance(e, vimconn.vimconnNotFoundException):
959 # If not found mark as Done and fill error_msg
960 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
961 return None
962 task["status"] = "FAILED"
963 return None
964
965 def _get_net_internal(self, task, filter_param):
966 """
967 Common code for get_net and new_net. It looks for a network on VIM with the filter_params
968 :param task: task for this find or find-or-create action
969 :param filter_param: parameters to send to the vimconnector
970 :return: a dict with the content to update the instance_nets database table. Raises an exception on error, or
971 when network is not found or found more than one
972 """
973 vim_nets = self.vim.get_network_list(filter_param)
974 if not vim_nets:
975 raise VimThreadExceptionNotFound("Network not found with this criteria: '{}'".format(filter_param))
976 elif len(vim_nets) > 1:
977 raise VimThreadException("More than one network found with this criteria: '{}'".format(filter_param))
978 vim_net_id = vim_nets[0]["id"]
979
980 # Discover if this network is managed by a sdn controller
981 sdn_net_id = None
982 result = self.db.get_rows(SELECT=('sdn_net_id',), FROM='instance_nets',
983 WHERE={'vim_net_id': vim_net_id, 'datacenter_tenant_id': self.datacenter_tenant_id},
984 ORDER="instance_scenario_id")
985 if result:
986 sdn_net_id = result[0]['sdn_net_id']
987
988 task["status"] = "DONE"
989 task["extra"]["vim_info"] = {}
990 task["extra"]["created"] = False
991 task["extra"]["vim_status"] = "BUILD"
992 task["extra"]["sdn_net_id"] = sdn_net_id
993 task["error_msg"] = None
994 task["vim_id"] = vim_net_id
995 instance_element_update = {"vim_net_id": vim_net_id, "created": False, "status": "BUILD",
996 "error_msg": None, "sdn_net_id": sdn_net_id}
997 return instance_element_update
998
999 def get_net(self, task):
1000 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1001 try:
1002 params = task["params"]
1003 filter_param = params[0]
1004 instance_element_update = self._get_net_internal(task, filter_param)
1005 return instance_element_update
1006
1007 except (vimconn.vimconnException, VimThreadException) as e:
1008 self.logger.error("task={} get-net: {}".format(task_id, e))
1009 task["status"] = "FAILED"
1010 task["vim_id"] = None
1011 task["error_msg"] = self._format_vim_error_msg(str(e))
1012 instance_element_update = {"vim_net_id": None, "status": "VIM_ERROR",
1013 "error_msg": task["error_msg"]}
1014 return instance_element_update
1015
1016 def new_net(self, task):
1017 vim_net_id = None
1018 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1019 action_text = ""
1020 try:
1021 # FIND
1022 if task["extra"].get("find"):
1023 action_text = "finding"
1024 filter_param = task["extra"]["find"][0]
1025 try:
1026 instance_element_update = self._get_net_internal(task, filter_param)
1027 return instance_element_update
1028 except VimThreadExceptionNotFound:
1029 pass
1030 # CREATE
1031 params = task["params"]
1032 action_text = "creating VIM"
1033
1034 vim_net_id, created_items = self.vim.new_network(*params[0:5])
1035
1036 # net_name = params[0]
1037 # net_type = params[1]
1038 # wim_account_name = None
1039 # if len(params) >= 6:
1040 # wim_account_name = params[5]
1041
1042 # TODO fix at nfvo adding external port
1043 # if wim_account_name and self.vim.config["wim_external_ports"]:
1044 # # add external port to connect WIM. Try with compute node __WIM:wim_name and __WIM
1045 # action_text = "attaching external port to ovim network"
1046 # sdn_port_name = "external_port"
1047 # sdn_port_data = {
1048 # "compute_node": "__WIM:" + wim_account_name[0:58],
1049 # "pci": None,
1050 # "vlan": network["vlan"],
1051 # "net_id": sdn_net_id,
1052 # "region": self.vim["config"]["datacenter_id"],
1053 # "name": sdn_port_name,
1054 # }
1055 # try:
1056 # sdn_external_port_id = self.ovim.new_external_port(sdn_port_data)
1057 # except ovimException:
1058 # sdn_port_data["compute_node"] = "__WIM"
1059 # sdn_external_port_id = self.ovim.new_external_port(sdn_port_data)
1060 # self.logger.debug("Added sdn_external_port {} to sdn_network {}".format(sdn_external_port_id,
1061 # sdn_net_id))
1062 task["status"] = "DONE"
1063 task["extra"]["vim_info"] = {}
1064 # task["extra"]["sdn_net_id"] = sdn_net_id
1065 task["extra"]["vim_status"] = "BUILD"
1066 task["extra"]["created"] = True
1067 task["extra"]["created_items"] = created_items
1068 task["error_msg"] = None
1069 task["vim_id"] = vim_net_id
1070 instance_element_update = {"vim_net_id": vim_net_id, "status": "BUILD",
1071 "created": True, "error_msg": None}
1072 return instance_element_update
1073 except vimconn.vimconnException as e:
1074 self.logger.error("task={} new-net: Error {}: {}".format(task_id, action_text, e))
1075 task["status"] = "FAILED"
1076 task["vim_id"] = vim_net_id
1077 task["error_msg"] = self._format_vim_error_msg(str(e))
1078 # task["extra"]["sdn_net_id"] = sdn_net_id
1079 instance_element_update = {"vim_net_id": vim_net_id, "status": "VIM_ERROR",
1080 "error_msg": task["error_msg"]}
1081 return instance_element_update
1082
1083 def del_net(self, task):
1084 net_vim_id = task["vim_id"]
1085 # sdn_net_id = task["extra"].get("sdn_net_id")
1086 try:
1087 if net_vim_id:
1088 self.vim.delete_network(net_vim_id, task["extra"].get("created_items"))
1089 # if sdn_net_id:
1090 # # Delete any attached port to this sdn network. There can be ports associated to this network in case
1091 # # it was manually done using 'openmano vim-net-sdn-attach'
1092 # port_list = self.ovim.get_ports(columns={'uuid'},
1093 # filter={'name': 'external_port', 'net_id': sdn_net_id})
1094 # for port in port_list:
1095 # self.ovim.delete_port(port['uuid'], idempotent=True)
1096 # self.ovim.delete_network(sdn_net_id, idempotent=True)
1097 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1098 task["error_msg"] = None
1099 return None
1100 except vimconn.vimconnException as e:
1101 task["error_msg"] = self._format_vim_error_msg(str(e))
1102 if isinstance(e, vimconn.vimconnNotFoundException):
1103 # If not found mark as Done and fill error_msg
1104 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1105 return None
1106 task["status"] = "FAILED"
1107 return None
1108
1109 def new_or_update_sdn_net(self, task):
1110 wimconn_net_id = task["vim_id"]
1111 created_items = task["extra"].get("created_items")
1112 connected_ports = task["extra"].get("connected_ports", [])
1113 new_connected_ports = []
1114 last_update = task["extra"].get("last_update", 0)
1115 sdn_status = "BUILD"
1116 sdn_info = None
1117
1118 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1119 error_list = []
1120 try:
1121 # FIND
1122 if task["extra"].get("find"):
1123 wimconn_id = task["extra"]["find"][0]
1124 try:
1125 instance_element_update = self.sdnconnector.get_connectivity_service_status(wimconn_id)
1126 wimconn_net_id = wimconn_id
1127 instance_element_update = {"wim_internal_id": wimconn_net_id, "created": False, "status": "BUILD",
1128 "error_msg": None, }
1129 return instance_element_update
1130 except Exception as e:
1131 if isinstance(e, SdnConnectorError) and e.http_error == HTTPStatus.NOT_FOUND.value:
1132 pass
1133 else:
1134 self._proccess_sdn_exception(e)
1135
1136 params = task["params"]
1137 # CREATE
1138 # look for ports
1139 sdn_ports = []
1140 pending_ports = 0
1141
1142 ports = self.db.get_rows(FROM='instance_interfaces', WHERE={'instance_wim_net_id': task["item_id"]})
1143 sdn_need_update = False
1144 if len(ports) != len(connected_ports):
1145 sdn_need_update = True
1146 for port in ports:
1147 # TODO. Do not connect if already done
1148 if port.get("compute_node") and port.get("pci"):
1149 for map in self.port_mappings:
1150 if map.get("device_id") == port["compute_node"] and \
1151 map.get("device_interface_id") == port["pci"]:
1152 break
1153 else:
1154 if self.sdnconn_config.get("mapping_not_needed"):
1155 map = {
1156 "service_endpoint_id": "{}:{}".format(port["compute_node"], port["pci"]),
1157 "service_endpoint_encapsulation_info": {
1158 "vlan": port["vlan"],
1159 "mac": port["mac_address"],
1160 "device_id": port["compute_node"],
1161 "device_interface_id": port["pci"]
1162 }
1163 }
1164 else:
1165 map = None
1166 error_list.append("Port mapping not found for compute_node={} pci={}".format(
1167 port["compute_node"], port["pci"]))
1168
1169 if map:
1170 if port["uuid"] not in connected_ports or port["modified_at"] > last_update:
1171 sdn_need_update = True
1172 new_connected_ports.append(port["uuid"])
1173 sdn_ports.append({
1174 "service_endpoint_id": map["service_endpoint_id"],
1175 "service_endpoint_encapsulation_type": "dot1q" if port["model"] == "SR-IOV" else None,
1176 "service_endpoint_encapsulation_info": {
1177 "vlan": port["vlan"],
1178 "mac": port["mac_address"],
1179 "device_id": map.get("device_id"),
1180 "device_interface_id": map.get("device_interface_id"),
1181 "switch_dpid": map.get("switch_dpid"),
1182 "switch_port": map.get("switch_port"),
1183 "service_mapping_info": map.get("service_mapping_info"),
1184 }
1185 })
1186
1187 else:
1188 pending_ports += 1
1189 if pending_ports:
1190 error_list.append("Waiting for getting interfaces location from VIM. Obtained '{}' of {}"
1191 .format(len(ports)-pending_ports, len(ports)))
1192 # if there are more ports to connect or they have been modified, call create/update
1193 if sdn_need_update and len(sdn_ports) >= 2:
1194 if not wimconn_net_id:
1195 if params[0] == "data":
1196 net_type = "ELAN"
1197 elif params[0] == "ptp":
1198 net_type = "ELINE"
1199 else:
1200 net_type = "L3"
1201
1202 wimconn_net_id, created_items = self.sdnconnector.create_connectivity_service(net_type, sdn_ports)
1203 else:
1204 created_items = self.sdnconnector.edit_connectivity_service(wimconn_net_id, conn_info=created_items,
1205 connection_points=sdn_ports)
1206 last_update = time.time()
1207 connected_ports = new_connected_ports
1208 elif wimconn_net_id:
1209 try:
1210 wim_status_dict = self.sdnconnector.get_connectivity_service_status(wimconn_net_id,
1211 conn_info=created_items)
1212 sdn_status = wim_status_dict["sdn_status"]
1213 if wim_status_dict.get("error_msg"):
1214 error_list.append(wim_status_dict.get("error_msg"))
1215 if wim_status_dict.get("sdn_info"):
1216 sdn_info = str(wim_status_dict.get("sdn_info"))
1217 except Exception as e:
1218 self._proccess_sdn_exception(e)
1219
1220 task["status"] = "DONE"
1221 task["extra"]["vim_info"] = {}
1222 # task["extra"]["sdn_net_id"] = sdn_net_id
1223 task["extra"]["vim_status"] = sdn_status
1224 task["extra"]["created"] = True
1225 task["extra"]["created_items"] = created_items
1226 task["extra"]["connected_ports"] = connected_ports
1227 task["extra"]["last_update"] = last_update
1228 task["error_msg"] = self._format_vim_error_msg(" ; ".join(error_list))
1229 task["vim_id"] = wimconn_net_id
1230 instance_element_update = {"wim_internal_id": wimconn_net_id, "status": sdn_status,
1231 "created": True, "error_msg": task["error_msg"] or None}
1232 except (vimconn.vimconnException, SdnConnectorError) as e:
1233 self.logger.error("task={} new-sdn-net: Error: {}".format(task_id, e))
1234 task["status"] = "FAILED"
1235 task["vim_id"] = wimconn_net_id
1236 task["error_msg"] = self._format_vim_error_msg(str(e))
1237 # task["extra"]["sdn_net_id"] = sdn_net_id
1238 instance_element_update = {"wim_internal_id": wimconn_net_id, "status": "WIM_ERROR",
1239 "error_msg": task["error_msg"]}
1240 if sdn_info:
1241 instance_element_update["wim_info"] = sdn_info
1242 return instance_element_update
1243
1244 def del_sdn_net(self, task):
1245 wimconn_net_id = task["vim_id"]
1246 try:
1247 try:
1248 if wimconn_net_id:
1249 self.sdnconnector.delete_connectivity_service(wimconn_net_id, task["extra"].get("created_items"))
1250 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1251 task["error_msg"] = None
1252 return None
1253 except Exception as e:
1254 self._proccess_sdn_exception(e)
1255 except SdnConnectorError as e:
1256 task["error_msg"] = self._format_vim_error_msg(str(e))
1257 if e.http_code == HTTPStatus.NOT_FOUND.value:
1258 # If not found mark as Done and fill error_msg
1259 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1260 task["error_msg"] = None
1261 return None
1262 task["status"] = "FAILED"
1263 return None
1264
1265 # Service Function Instances
1266 def new_sfi(self, task):
1267 vim_sfi_id = None
1268 try:
1269 # Waits for interfaces to be ready (avoids failure)
1270 time.sleep(1)
1271 dep_id = "TASK-" + str(task["extra"]["depends_on"][0])
1272 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1273 error_text = ""
1274 interfaces = task["depends"][dep_id]["extra"].get("interfaces")
1275
1276 ingress_interface_id = task.get("extra").get("params").get("ingress_interface_id")
1277 egress_interface_id = task.get("extra").get("params").get("egress_interface_id")
1278 ingress_vim_interface_id = None
1279 egress_vim_interface_id = None
1280 for vim_interface, interface_data in interfaces.items():
1281 if interface_data.get("interface_id") == ingress_interface_id:
1282 ingress_vim_interface_id = vim_interface
1283 break
1284 if ingress_interface_id != egress_interface_id:
1285 for vim_interface, interface_data in interfaces.items():
1286 if interface_data.get("interface_id") == egress_interface_id:
1287 egress_vim_interface_id = vim_interface
1288 break
1289 else:
1290 egress_vim_interface_id = ingress_vim_interface_id
1291 if not ingress_vim_interface_id or not egress_vim_interface_id:
1292 error_text = "Error creating Service Function Instance, Ingress: {}, Egress: {}".format(
1293 ingress_vim_interface_id, egress_vim_interface_id)
1294 self.logger.error(error_text)
1295 task["error_msg"] = error_text
1296 task["status"] = "FAILED"
1297 task["vim_id"] = None
1298 return None
1299 # At the moment, every port associated with the VM will be used both as ingress and egress ports.
1300 # Bear in mind that different VIM connectors might support SFI differently. In the case of OpenStack,
1301 # only the first ingress and first egress ports will be used to create the SFI (Port Pair).
1302 ingress_port_id_list = [ingress_vim_interface_id]
1303 egress_port_id_list = [egress_vim_interface_id]
1304 name = "sfi-{}".format(task["item_id"][:8])
1305 # By default no form of IETF SFC Encapsulation will be used
1306 vim_sfi_id = self.vim.new_sfi(name, ingress_port_id_list, egress_port_id_list, sfc_encap=False)
1307
1308 task["extra"]["created"] = True
1309 task["extra"]["vim_status"] = "ACTIVE"
1310 task["error_msg"] = None
1311 task["status"] = "DONE"
1312 task["vim_id"] = vim_sfi_id
1313 instance_element_update = {"status": "ACTIVE", "vim_sfi_id": vim_sfi_id, "error_msg": None}
1314 return instance_element_update
1315
1316 except (vimconn.vimconnException, VimThreadException) as e:
1317 self.logger.error("Error creating Service Function Instance, task=%s: %s", task_id, str(e))
1318 error_text = self._format_vim_error_msg(str(e))
1319 task["error_msg"] = error_text
1320 task["status"] = "FAILED"
1321 task["vim_id"] = None
1322 instance_element_update = {"status": "VIM_ERROR", "vim_sfi_id": None, "error_msg": error_text}
1323 return instance_element_update
1324
1325 def del_sfi(self, task):
1326 sfi_vim_id = task["vim_id"]
1327 try:
1328 self.vim.delete_sfi(sfi_vim_id)
1329 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1330 task["error_msg"] = None
1331 return None
1332
1333 except vimconn.vimconnException as e:
1334 task["error_msg"] = self._format_vim_error_msg(str(e))
1335 if isinstance(e, vimconn.vimconnNotFoundException):
1336 # If not found mark as Done and fill error_msg
1337 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1338 return None
1339 task["status"] = "FAILED"
1340 return None
1341
1342 def new_sf(self, task):
1343 vim_sf_id = None
1344 try:
1345 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1346 error_text = ""
1347 depending_tasks = ["TASK-" + str(dep_id) for dep_id in task["extra"]["depends_on"]]
1348 # sfis = next(iter(task.get("depends").values())).get("extra").get("params")[5]
1349 sfis = [task.get("depends").get(dep_task) for dep_task in depending_tasks]
1350 sfi_id_list = []
1351 for sfi in sfis:
1352 sfi_id_list.append(sfi.get("vim_id"))
1353 name = "sf-{}".format(task["item_id"][:8])
1354 # By default no form of IETF SFC Encapsulation will be used
1355 vim_sf_id = self.vim.new_sf(name, sfi_id_list, sfc_encap=False)
1356
1357 task["extra"]["created"] = True
1358 task["extra"]["vim_status"] = "ACTIVE"
1359 task["error_msg"] = None
1360 task["status"] = "DONE"
1361 task["vim_id"] = vim_sf_id
1362 instance_element_update = {"status": "ACTIVE", "vim_sf_id": vim_sf_id, "error_msg": None}
1363 return instance_element_update
1364
1365 except (vimconn.vimconnException, VimThreadException) as e:
1366 self.logger.error("Error creating Service Function, task=%s: %s", task_id, str(e))
1367 error_text = self._format_vim_error_msg(str(e))
1368 task["error_msg"] = error_text
1369 task["status"] = "FAILED"
1370 task["vim_id"] = None
1371 instance_element_update = {"status": "VIM_ERROR", "vim_sf_id": None, "error_msg": error_text}
1372 return instance_element_update
1373
1374 def del_sf(self, task):
1375 sf_vim_id = task["vim_id"]
1376 try:
1377 self.vim.delete_sf(sf_vim_id)
1378 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1379 task["error_msg"] = None
1380 return None
1381
1382 except vimconn.vimconnException as e:
1383 task["error_msg"] = self._format_vim_error_msg(str(e))
1384 if isinstance(e, vimconn.vimconnNotFoundException):
1385 # If not found mark as Done and fill error_msg
1386 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1387 return None
1388 task["status"] = "FAILED"
1389 return None
1390
1391 def new_classification(self, task):
1392 vim_classification_id = None
1393 try:
1394 params = task["params"]
1395 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1396 dep_id = "TASK-" + str(task["extra"]["depends_on"][0])
1397 error_text = ""
1398 interfaces = task.get("depends").get(dep_id).get("extra").get("interfaces")
1399 # Bear in mind that different VIM connectors might support Classifications differently.
1400 # In the case of OpenStack, only the first VNF attached to the classifier will be used
1401 # to create the Classification(s) (the "logical source port" of the "Flow Classifier").
1402 # Since the VNFFG classifier match lacks the ethertype, classification defaults to
1403 # using the IPv4 flow classifier.
1404 logical_source_port_vim_id = None
1405 logical_source_port_id = params.get("logical_source_port")
1406 for vim_interface, interface_data in interfaces.items():
1407 if interface_data.get("interface_id") == logical_source_port_id:
1408 logical_source_port_vim_id = vim_interface
1409 break
1410 if not logical_source_port_vim_id:
1411 error_text = "Error creating Flow Classifier, Logical Source Port id {}".format(
1412 logical_source_port_id)
1413 self.logger.error(error_text)
1414 task["error_msg"] = error_text
1415 task["status"] = "FAILED"
1416 task["vim_id"] = None
1417 return None
1418
1419 name = "c-{}".format(task["item_id"][:8])
1420 # if not CIDR is given for the IP addresses, add /32:
1421 ip_proto = int(params.get("ip_proto"))
1422 source_ip = params.get("source_ip")
1423 destination_ip = params.get("destination_ip")
1424 source_port = params.get("source_port")
1425 destination_port = params.get("destination_port")
1426 definition = {"logical_source_port": logical_source_port_vim_id}
1427 if ip_proto:
1428 if ip_proto == 1:
1429 ip_proto = 'icmp'
1430 elif ip_proto == 6:
1431 ip_proto = 'tcp'
1432 elif ip_proto == 17:
1433 ip_proto = 'udp'
1434 definition["protocol"] = ip_proto
1435 if source_ip:
1436 if '/' not in source_ip:
1437 source_ip += '/32'
1438 definition["source_ip_prefix"] = source_ip
1439 if source_port:
1440 definition["source_port_range_min"] = source_port
1441 definition["source_port_range_max"] = source_port
1442 if destination_port:
1443 definition["destination_port_range_min"] = destination_port
1444 definition["destination_port_range_max"] = destination_port
1445 if destination_ip:
1446 if '/' not in destination_ip:
1447 destination_ip += '/32'
1448 definition["destination_ip_prefix"] = destination_ip
1449
1450 vim_classification_id = self.vim.new_classification(
1451 name, 'legacy_flow_classifier', definition)
1452
1453 task["extra"]["created"] = True
1454 task["extra"]["vim_status"] = "ACTIVE"
1455 task["error_msg"] = None
1456 task["status"] = "DONE"
1457 task["vim_id"] = vim_classification_id
1458 instance_element_update = {"status": "ACTIVE", "vim_classification_id": vim_classification_id,
1459 "error_msg": None}
1460 return instance_element_update
1461
1462 except (vimconn.vimconnException, VimThreadException) as e:
1463 self.logger.error("Error creating Classification, task=%s: %s", task_id, str(e))
1464 error_text = self._format_vim_error_msg(str(e))
1465 task["error_msg"] = error_text
1466 task["status"] = "FAILED"
1467 task["vim_id"] = None
1468 instance_element_update = {"status": "VIM_ERROR", "vim_classification_id": None, "error_msg": error_text}
1469 return instance_element_update
1470
1471 def del_classification(self, task):
1472 classification_vim_id = task["vim_id"]
1473 try:
1474 self.vim.delete_classification(classification_vim_id)
1475 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1476 task["error_msg"] = None
1477 return None
1478
1479 except vimconn.vimconnException as e:
1480 task["error_msg"] = self._format_vim_error_msg(str(e))
1481 if isinstance(e, vimconn.vimconnNotFoundException):
1482 # If not found mark as Done and fill error_msg
1483 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1484 return None
1485 task["status"] = "FAILED"
1486 return None
1487
1488 def new_sfp(self, task):
1489 vim_sfp_id = None
1490 try:
1491 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1492 depending_tasks = [task.get("depends").get("TASK-" + str(tsk_id)) for tsk_id in
1493 task.get("extra").get("depends_on")]
1494 error_text = ""
1495 sf_id_list = []
1496 classification_id_list = []
1497 for dep in depending_tasks:
1498 vim_id = dep.get("vim_id")
1499 resource = dep.get("item")
1500 if resource == "instance_sfs":
1501 sf_id_list.append(vim_id)
1502 elif resource == "instance_classifications":
1503 classification_id_list.append(vim_id)
1504
1505 name = "sfp-{}".format(task["item_id"][:8])
1506 # By default no form of IETF SFC Encapsulation will be used
1507 vim_sfp_id = self.vim.new_sfp(name, classification_id_list, sf_id_list, sfc_encap=False)
1508
1509 task["extra"]["created"] = True
1510 task["extra"]["vim_status"] = "ACTIVE"
1511 task["error_msg"] = None
1512 task["status"] = "DONE"
1513 task["vim_id"] = vim_sfp_id
1514 instance_element_update = {"status": "ACTIVE", "vim_sfp_id": vim_sfp_id, "error_msg": None}
1515 return instance_element_update
1516
1517 except (vimconn.vimconnException, VimThreadException) as e:
1518 self.logger.error("Error creating Service Function, task=%s: %s", task_id, str(e))
1519 error_text = self._format_vim_error_msg(str(e))
1520 task["error_msg"] = error_text
1521 task["status"] = "FAILED"
1522 task["vim_id"] = None
1523 instance_element_update = {"status": "VIM_ERROR", "vim_sfp_id": None, "error_msg": error_text}
1524 return instance_element_update
1525
1526 def del_sfp(self, task):
1527 sfp_vim_id = task["vim_id"]
1528 try:
1529 self.vim.delete_sfp(sfp_vim_id)
1530 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1531 task["error_msg"] = None
1532 return None
1533
1534 except vimconn.vimconnException as e:
1535 task["error_msg"] = self._format_vim_error_msg(str(e))
1536 if isinstance(e, vimconn.vimconnNotFoundException):
1537 # If not found mark as Done and fill error_msg
1538 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1539 return None
1540 task["status"] = "FAILED"
1541 return None
1542
1543 def _refres_sfps(self, task):
1544 """Call VIM to get SFPs status"""
1545 database_update = None
1546
1547 vim_id = task["vim_id"]
1548 sfp_to_refresh_list = [vim_id]
1549 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1550 try:
1551 vim_dict = self.vim.refresh_sfps_status(sfp_to_refresh_list)
1552 vim_info = vim_dict[vim_id]
1553 except vimconn.vimconnException as e:
1554 # Mark all tasks at VIM_ERROR status
1555 self.logger.error("task={} get-sfp: vimconnException when trying to refresh sfps {}".format(task_id, e))
1556 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
1557
1558 self.logger.debug("task={} get-sfp: vim_sfp_id={} result={}".format(task_id, task["vim_id"], vim_info))
1559 #TODO: Revise this part
1560 vim_info_error_msg = None
1561 if vim_info.get("error_msg"):
1562 vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"])
1563 task_vim_info = task["extra"].get("vim_info")
1564 task_error_msg = task.get("error_msg")
1565 task_vim_status = task["extra"].get("vim_status")
1566 if task_vim_status != vim_info["status"] or task_error_msg != vim_info_error_msg or \
1567 (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
1568 database_update = {"status": vim_info["status"], "error_msg": vim_info_error_msg}
1569 if vim_info.get("vim_info"):
1570 database_update["vim_info"] = vim_info["vim_info"]
1571
1572 task["extra"]["vim_status"] = vim_info["status"]
1573 task["error_msg"] = vim_info_error_msg
1574 if vim_info.get("vim_info"):
1575 task["extra"]["vim_info"] = vim_info["vim_info"]
1576
1577 return database_update
1578
1579 def _refres_sfis(self, task):
1580 """Call VIM to get sfis status"""
1581 database_update = None
1582
1583 vim_id = task["vim_id"]
1584 sfi_to_refresh_list = [vim_id]
1585 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1586 try:
1587 vim_dict = self.vim.refresh_sfis_status(sfi_to_refresh_list)
1588 vim_info = vim_dict[vim_id]
1589 except vimconn.vimconnException as e:
1590 # Mark all tasks at VIM_ERROR status
1591 self.logger.error("task={} get-sfi: vimconnException when trying to refresh sfis {}".format(task_id, e))
1592 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
1593
1594 self.logger.debug("task={} get-sfi: vim_sfi_id={} result={}".format(task_id, task["vim_id"], vim_info))
1595 #TODO: Revise this part
1596 vim_info_error_msg = None
1597 if vim_info.get("error_msg"):
1598 vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"])
1599 task_vim_info = task["extra"].get("vim_info")
1600 task_error_msg = task.get("error_msg")
1601 task_vim_status = task["extra"].get("vim_status")
1602 if task_vim_status != vim_info["status"] or task_error_msg != vim_info_error_msg or \
1603 (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
1604 database_update = {"status": vim_info["status"], "error_msg": vim_info_error_msg}
1605 if vim_info.get("vim_info"):
1606 database_update["vim_info"] = vim_info["vim_info"]
1607
1608 task["extra"]["vim_status"] = vim_info["status"]
1609 task["error_msg"] = vim_info_error_msg
1610 if vim_info.get("vim_info"):
1611 task["extra"]["vim_info"] = vim_info["vim_info"]
1612
1613 return database_update
1614
1615 def _refres_sfs(self, task):
1616 """Call VIM to get sfs status"""
1617 database_update = None
1618
1619 vim_id = task["vim_id"]
1620 sf_to_refresh_list = [vim_id]
1621 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1622 try:
1623 vim_dict = self.vim.refresh_sfs_status(sf_to_refresh_list)
1624 vim_info = vim_dict[vim_id]
1625 except vimconn.vimconnException as e:
1626 # Mark all tasks at VIM_ERROR status
1627 self.logger.error("task={} get-sf: vimconnException when trying to refresh sfs {}".format(task_id, e))
1628 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
1629
1630 self.logger.debug("task={} get-sf: vim_sf_id={} result={}".format(task_id, task["vim_id"], vim_info))
1631 #TODO: Revise this part
1632 vim_info_error_msg = None
1633 if vim_info.get("error_msg"):
1634 vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"])
1635 task_vim_info = task["extra"].get("vim_info")
1636 task_error_msg = task.get("error_msg")
1637 task_vim_status = task["extra"].get("vim_status")
1638 if task_vim_status != vim_info["status"] or task_error_msg != vim_info_error_msg or \
1639 (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
1640 database_update = {"status": vim_info["status"], "error_msg": vim_info_error_msg}
1641 if vim_info.get("vim_info"):
1642 database_update["vim_info"] = vim_info["vim_info"]
1643
1644 task["extra"]["vim_status"] = vim_info["status"]
1645 task["error_msg"] = vim_info_error_msg
1646 if vim_info.get("vim_info"):
1647 task["extra"]["vim_info"] = vim_info["vim_info"]
1648
1649 return database_update
1650
1651 def _refres_classifications(self, task):
1652 """Call VIM to get classifications status"""
1653 database_update = None
1654
1655 vim_id = task["vim_id"]
1656 classification_to_refresh_list = [vim_id]
1657 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1658 try:
1659 vim_dict = self.vim.refresh_classifications_status(classification_to_refresh_list)
1660 vim_info = vim_dict[vim_id]
1661 except vimconn.vimconnException as e:
1662 # Mark all tasks at VIM_ERROR status
1663 self.logger.error("task={} get-classification: vimconnException when trying to refresh classifications {}"
1664 .format(task_id, e))
1665 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
1666
1667 self.logger.debug("task={} get-classification: vim_classification_id={} result={}".format(task_id,
1668 task["vim_id"], vim_info))
1669 #TODO: Revise this part
1670 vim_info_error_msg = None
1671 if vim_info.get("error_msg"):
1672 vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"])
1673 task_vim_info = task["extra"].get("vim_info")
1674 task_error_msg = task.get("error_msg")
1675 task_vim_status = task["extra"].get("vim_status")
1676 if task_vim_status != vim_info["status"] or task_error_msg != vim_info_error_msg or \
1677 (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
1678 database_update = {"status": vim_info["status"], "error_msg": vim_info_error_msg}
1679 if vim_info.get("vim_info"):
1680 database_update["vim_info"] = vim_info["vim_info"]
1681
1682 task["extra"]["vim_status"] = vim_info["status"]
1683 task["error_msg"] = vim_info_error_msg
1684 if vim_info.get("vim_info"):
1685 task["extra"]["vim_info"] = vim_info["vim_info"]
1686
1687 return database_update