587c0600c899d8f497e66beaf8fcb1f78f7a1306
[osm/RO.git] / RO / osm_ro / vim_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefonica Investigacion y Desarrollo, S.A.U.
5 # This file is part of openvim
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
24 """"
25 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
26 The tasks are stored at database in table vim_wim_actions
27 Several vim_wim_actions can refer to the same element at VIM (flavor, network, ...). This is somethng to avoid if RO
28 is migrated to a non-relational database as mongo db. Each vim_wim_actions reference a different instance_Xxxxx
29 In this case "related" colunm contains the same value, to know they refer to the same vim. In case of deletion, it
30 there is related tasks using this element, it is not deleted, The vim_info needed to delete is transfered to other task
31
32 The task content is (M: stored at memory, D: stored at database):
33 MD instance_action_id: reference a global action over an instance-scenario: database instance_actions
34 MD task_index: index number of the task. This together with the previous forms a unique key identifier
35 MD datacenter_vim_id: should contain the uuid of the VIM managed by this thread
36 MD vim_id: id of the vm,net,etc at VIM
37 MD item: database table name, can be instance_vms, instance_nets, TODO: datacenter_flavors, datacenter_images
38 MD item_id: uuid of the referenced entry in the previous table
39 MD action: CREATE, DELETE, FIND
40 MD status: SCHEDULED: action need to be done
41 BUILD: not used
42 DONE: Done and it must be polled to VIM periodically to see status. ONLY for action=CREATE or FIND
43 FAILED: It cannot be created/found/deleted
44 FINISHED: similar to DONE, but no refresh is needed anymore. Task is maintained at database but
45 it is never processed by any thread
46 SUPERSEDED: similar to FINSISHED, but nothing has been done to completed the task.
47 MD extra: text with yaml format at database, dict at memory with:
48 params: list with the params to be sent to the VIM for CREATE or FIND. For DELETE the vim_id is taken
49 from other related tasks
50 find: (only for CREATE tasks) if present it should FIND before creating and use if existing. Contains
51 the FIND params
52 depends_on: list with the 'task_index'es of tasks that must be completed before. e.g. a vm creation depends
53 on a net creation
54 can contain an int (single index on the same instance-action) or str (compete action ID)
55 sdn_net_id: used for net.
56 interfaces: used for VMs. Each key is the uuid of the instance_interfaces entry at database
57 iface_id: uuid of intance_interfaces
58 sdn_port_id:
59 sdn_net_id:
60 vim_info
61 created_items: dictionary with extra elements created that need to be deleted. e.g. ports, volumes,...
62 created: False if the VIM element is not created by other actions, and it should not be deleted
63 vim_status: VIM status of the element. Stored also at database in the instance_XXX
64 vim_info: Detailed information of a vm/net from the VIM. Stored at database in the instance_XXX but not at
65 vim_wim_actions
66 M depends: dict with task_index(from depends_on) to dependency task
67 M params: same as extra[params]
68 MD error_msg: descriptive text upon an error.Stored also at database instance_XXX
69 MD created_at: task creation time. The task of creation must be the oldest
70 MD modified_at: next time task need to be processed. For example, for a refresh, it contain next time refresh must
71 be done
72 MD related: All the tasks over the same VIM element have same "related". Note that other VIMs can contain the
73 same value of related, but this thread only process those task of one VIM. Also related can be the
74 same among several NS os isntance-scenarios
75 MD worker: Used to lock in case of several thread workers.
76
77 """
78
79 import threading
80 import time
81 import queue
82 import logging
83 from osm_ro import vimconn
84 from osm_ro.wim.sdnconn import SdnConnectorError
85 import yaml
86 from osm_ro.db_base import db_base_Exception
87 from http import HTTPStatus
88 from copy import deepcopy
89
90 __author__ = "Alfonso Tierno, Pablo Montes"
91 __date__ = "$28-Sep-2017 12:07:15$"
92
93
94 def is_task_id(task_id):
95 return task_id.startswith("TASK-")
96
97
98 class VimThreadException(Exception):
99 pass
100
101
102 class VimThreadExceptionNotFound(VimThreadException):
103 pass
104
105
106 class vim_thread(threading.Thread):
107 REFRESH_BUILD = 5 # 5 seconds
108 REFRESH_ACTIVE = 60 # 1 minute
109 REFRESH_ERROR = 600
110 REFRESH_DELETE = 3600 * 10
111
112 def __init__(self, task_lock, plugins, name=None, wim_account_id=None, datacenter_tenant_id=None, db=None):
113 """Init a thread.
114 Arguments:
115 'id' number of thead
116 'name' name of thread
117 'host','user': host ip or name to manage and user
118 'db', 'db_lock': database class and lock to use it in exclusion
119 """
120 threading.Thread.__init__(self)
121 self.plugins = plugins
122 self.plugin_name = "unknown"
123 self.vim = None
124 self.sdnconnector = None
125 self.sdnconn_config = None
126 self.error_status = None
127 self.wim_account_id = wim_account_id
128 self.datacenter_tenant_id = datacenter_tenant_id
129 self.port_mapping = None
130 if self.wim_account_id:
131 self.target_k = "wim_account_id"
132 self.target_v = self.wim_account_id
133 else:
134 self.target_k = "datacenter_vim_id"
135 self.target_v = self.datacenter_tenant_id
136 if not name:
137 self.name = wim_account_id or str(datacenter_tenant_id)
138 else:
139 self.name = name
140 self.vim_persistent_info = {}
141 self.my_id = self.name[:64]
142
143 self.logger = logging.getLogger('openmano.{}.{}'.format("vim" if self.datacenter_tenant_id else "sdn",
144 self.name))
145 self.db = db
146
147 self.task_lock = task_lock
148 self.task_queue = queue.Queue(2000)
149
150 def _proccess_sdn_exception(self, exc):
151 if isinstance(exc, SdnConnectorError):
152 raise
153 else:
154 self.logger.error("plugin={} throws a non SdnConnectorError exception {}".format(self.plugin_name, exc),
155 exc_info=True)
156 raise SdnConnectorError(str(exc), http_code=HTTPStatus.INTERNAL_SERVER_ERROR.value) from exc
157
158 def _proccess_vim_exception(self, exc):
159 if isinstance(exc, vimconn.vimconnException):
160 raise
161 else:
162 self.logger.error("plugin={} throws a non vimconnException exception {}".format(self.plugin_name, exc),
163 exc_info=True)
164 raise vimconn.vimconnException(str(exc), http_code=HTTPStatus.INTERNAL_SERVER_ERROR.value) from exc
165
166 def get_vim_sdn_connector(self):
167 if self.datacenter_tenant_id:
168 try:
169 from_ = "datacenter_tenants as dt join datacenters as d on dt.datacenter_id=d.uuid"
170 select_ = ('type', 'd.config as config', 'd.uuid as datacenter_id', 'vim_url', 'vim_url_admin',
171 'd.name as datacenter_name', 'dt.uuid as datacenter_tenant_id',
172 'dt.vim_tenant_name as vim_tenant_name', 'dt.vim_tenant_id as vim_tenant_id',
173 'user', 'passwd', 'dt.config as dt_config')
174 where_ = {"dt.uuid": self.datacenter_tenant_id}
175 vims = self.db.get_rows(FROM=from_, SELECT=select_, WHERE=where_)
176 vim = vims[0]
177 vim_config = {}
178 if vim["config"]:
179 vim_config.update(yaml.load(vim["config"], Loader=yaml.Loader))
180 if vim["dt_config"]:
181 vim_config.update(yaml.load(vim["dt_config"], Loader=yaml.Loader))
182 vim_config['datacenter_tenant_id'] = vim.get('datacenter_tenant_id')
183 vim_config['datacenter_id'] = vim.get('datacenter_id')
184
185 # get port_mapping
186 # vim_port_mappings = self.ovim.get_of_port_mappings(
187 # db_filter={"datacenter_id": vim_config['datacenter_id']})
188 # vim_config["wim_external_ports"] = [x for x in vim_port_mappings
189 # if x["service_mapping_info"].get("wim")]
190 self.plugin_name = "rovim_" + vim["type"]
191 self.vim = self.plugins[self.plugin_name].vimconnector(
192 uuid=vim['datacenter_id'], name=vim['datacenter_name'],
193 tenant_id=vim['vim_tenant_id'], tenant_name=vim['vim_tenant_name'],
194 url=vim['vim_url'], url_admin=vim['vim_url_admin'],
195 user=vim['user'], passwd=vim['passwd'],
196 config=vim_config, persistent_info=self.vim_persistent_info
197 )
198 self.error_status = None
199 self.logger.info("Vim Connector loaded for vim_account={}, plugin={}".format(
200 self.datacenter_tenant_id, self.plugin_name))
201 except Exception as e:
202 self.logger.error("Cannot load vimconnector for vim_account={} plugin={}: {}".format(
203 self.datacenter_tenant_id, self.plugin_name, e))
204 self.vim = None
205 self.error_status = "Error loading vimconnector: {}".format(e)
206 else:
207 try:
208 wim_account = self.db.get_rows(FROM="wim_accounts", WHERE={"uuid": self.wim_account_id})[0]
209 wim = self.db.get_rows(FROM="wims", WHERE={"uuid": wim_account["wim_id"]})[0]
210 if wim["config"]:
211 self.sdnconn_config = yaml.load(wim["config"], Loader=yaml.Loader)
212 else:
213 self.sdnconn_config = {}
214 if wim_account["config"]:
215 self.sdnconn_config.update(yaml.load(wim_account["config"], Loader=yaml.Loader))
216 self.port_mappings = self.db.get_rows(FROM="wim_port_mappings", WHERE={"wim_id": wim_account["wim_id"]})
217 if self.port_mappings:
218 self.sdnconn_config["service_endpoint_mapping"] = self.port_mappings
219 self.plugin_name = "rosdn_" + wim["type"]
220 self.sdnconnector = self.plugins[self.plugin_name](
221 wim, wim_account, config=self.sdnconn_config)
222 self.error_status = None
223 self.logger.info("Sdn Connector loaded for wim_account={}, plugin={}".format(
224 self.wim_account_id, self.plugin_name))
225 except Exception as e:
226 self.logger.error("Cannot load sdn connector for wim_account={}, plugin={}: {}".format(
227 self.wim_account_id, self.plugin_name, e))
228 self.sdnconnector = None
229 self.error_status = "Error loading sdn connector: {}".format(e)
230
231 def _get_db_task(self):
232 """
233 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
234 :return: None
235 """
236 now = time.time()
237 try:
238 database_limit = 20
239 task_related = None
240 while True:
241 # get 20 (database_limit) entries each time
242 vim_actions = self.db.get_rows(FROM="vim_wim_actions",
243 WHERE={self.target_k: self.target_v,
244 "status": ['SCHEDULED', 'BUILD', 'DONE'],
245 "worker": [None, self.my_id], "modified_at<=": now
246 },
247 ORDER_BY=("modified_at", "created_at",),
248 LIMIT=database_limit)
249 if not vim_actions:
250 return None, None
251 # if vim_actions[0]["modified_at"] > now:
252 # return int(vim_actions[0] - now)
253 for task in vim_actions:
254 # block related task
255 if task_related == task["related"]:
256 continue # ignore if a locking has already tried for these task set
257 task_related = task["related"]
258 # lock ...
259 self.db.update_rows("vim_wim_actions", UPDATE={"worker": self.my_id}, modified_time=0,
260 WHERE={self.target_k: self.target_v,
261 "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
262 "worker": [None, self.my_id],
263 "related": task_related,
264 "item": task["item"],
265 })
266 # ... and read all related and check if locked
267 related_tasks = self.db.get_rows(FROM="vim_wim_actions",
268 WHERE={self.target_k: self.target_v,
269 "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
270 "related": task_related,
271 "item": task["item"],
272 },
273 ORDER_BY=("created_at",))
274 # check that all related tasks have been locked. If not release and try again. It can happen
275 # for race conditions if a new related task has been inserted by nfvo in the process
276 some_tasks_locked = False
277 some_tasks_not_locked = False
278 creation_task = None
279 for relate_task in related_tasks:
280 if relate_task["worker"] != self.my_id:
281 some_tasks_not_locked = True
282 else:
283 some_tasks_locked = True
284 if not creation_task and relate_task["action"] in ("CREATE", "FIND"):
285 creation_task = relate_task
286 if some_tasks_not_locked:
287 if some_tasks_locked: # unlock
288 self.db.update_rows("vim_wim_actions", UPDATE={"worker": None}, modified_time=0,
289 WHERE={self.target_k: self.target_v,
290 "worker": self.my_id,
291 "related": task_related,
292 "item": task["item"],
293 })
294 continue
295
296 # task of creation must be the first in the list of related_task
297 assert(related_tasks[0]["action"] in ("CREATE", "FIND"))
298
299 task["params"] = None
300 if task["extra"]:
301 extra = yaml.load(task["extra"], Loader=yaml.Loader)
302 else:
303 extra = {}
304 task["extra"] = extra
305 if extra.get("depends_on"):
306 task["depends"] = {}
307 if extra.get("params"):
308 task["params"] = deepcopy(extra["params"])
309 return task, related_tasks
310 except Exception as e:
311 self.logger.critical("Unexpected exception at _get_db_task: " + str(e), exc_info=True)
312 return None, None
313
314 def _delete_task(self, task):
315 """
316 Determine if this task need to be done or superseded
317 :return: None
318 """
319
320 def copy_extra_created(copy_to, copy_from):
321 copy_to["created"] = copy_from["created"]
322 if copy_from.get("sdn_net_id"):
323 copy_to["sdn_net_id"] = copy_from["sdn_net_id"]
324 if copy_from.get("interfaces"):
325 copy_to["interfaces"] = copy_from["interfaces"]
326 if copy_from.get("created_items"):
327 if not copy_to.get("created_items"):
328 copy_to["created_items"] = {}
329 copy_to["created_items"].update(copy_from["created_items"])
330
331 task_create = None
332 dependency_task = None
333 deletion_needed = False
334 if task["status"] == "FAILED":
335 return # TODO need to be retry??
336 try:
337 # get all related tasks
338 related_tasks = self.db.get_rows(FROM="vim_wim_actions",
339 WHERE={self.target_k: self.target_v,
340 "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
341 "action": ["FIND", "CREATE"],
342 "related": task["related"],
343 },
344 ORDER_BY=("created_at",),
345 )
346 for related_task in related_tasks:
347 if related_task["item"] == task["item"] and related_task["item_id"] == task["item_id"]:
348 task_create = related_task
349 # TASK_CREATE
350 if related_task["extra"]:
351 extra_created = yaml.load(related_task["extra"], Loader=yaml.Loader)
352 if extra_created.get("created"):
353 deletion_needed = True
354 related_task["extra"] = extra_created
355 elif not dependency_task:
356 dependency_task = related_task
357 if task_create and dependency_task:
358 break
359
360 # mark task_create as FINISHED
361 self.db.update_rows("vim_wim_actions", UPDATE={"status": "FINISHED"},
362 WHERE={self.target_k: self.target_v,
363 "instance_action_id": task_create["instance_action_id"],
364 "task_index": task_create["task_index"]
365 })
366 if not deletion_needed:
367 return
368 elif dependency_task:
369 # move create information from task_create to relate_task
370 extra_new_created = yaml.load(dependency_task["extra"], Loader=yaml.Loader) or {}
371 extra_new_created["created"] = extra_created["created"]
372 copy_extra_created(copy_to=extra_new_created, copy_from=extra_created)
373
374 self.db.update_rows("vim_wim_actions",
375 UPDATE={"extra": yaml.safe_dump(extra_new_created, default_flow_style=True,
376 width=256),
377 "vim_id": task_create.get("vim_id")},
378 WHERE={self.target_k: self.target_v,
379 "instance_action_id": dependency_task["instance_action_id"],
380 "task_index": dependency_task["task_index"]
381 })
382 return False
383 else:
384 task["vim_id"] = task_create["vim_id"]
385 copy_extra_created(copy_to=task["extra"], copy_from=task_create["extra"])
386 return True
387
388 except Exception as e:
389 self.logger.critical("Unexpected exception at _delete_task: " + str(e), exc_info=True)
390
391 def _refres_vm(self, task):
392 """Call VIM to get VMs status"""
393 database_update = None
394
395 vim_id = task["vim_id"]
396 vm_to_refresh_list = [vim_id]
397 try:
398 vim_dict = self.vim.refresh_vms_status(vm_to_refresh_list)
399 vim_info = vim_dict[vim_id]
400 except vimconn.vimconnException as e:
401 # Mark all tasks at VIM_ERROR status
402 self.logger.error("task=several get-VM: vimconnException when trying to refresh vms " + str(e))
403 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
404
405 task_id = task["instance_action_id"] + "." + str(task["task_index"])
406 self.logger.debug("task={} get-VM: vim_vm_id={} result={}".format(task_id, task["vim_id"], vim_info))
407
408 # check and update interfaces
409 task_warning_msg = ""
410 for interface in vim_info.get("interfaces", ()):
411 vim_interface_id = interface["vim_interface_id"]
412 if vim_interface_id not in task["extra"]["interfaces"]:
413 self.logger.critical("task={} get-VM: Interface not found {} on task info {}".format(
414 task_id, vim_interface_id, task["extra"]["interfaces"]), exc_info=True)
415 continue
416 task_interface = task["extra"]["interfaces"][vim_interface_id]
417 task_vim_interface = task_interface.get("vim_info")
418 if task_vim_interface != interface:
419 # delete old port
420 # if task_interface.get("sdn_port_id"):
421 # try:
422 # self.ovim.delete_port(task_interface["sdn_port_id"], idempotent=True)
423 # task_interface["sdn_port_id"] = None
424 # except ovimException as e:
425 # error_text = "ovimException deleting external_port={}: {}".format(
426 # task_interface["sdn_port_id"], e)
427 # self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
428 # task_warning_msg += error_text
429 # # TODO Set error_msg at instance_nets instead of instance VMs
430
431 # Create SDN port
432 # sdn_net_id = task_interface.get("sdn_net_id")
433 # if sdn_net_id and interface.get("compute_node") and interface.get("pci"):
434 # sdn_port_name = sdn_net_id + "." + task["vim_id"]
435 # sdn_port_name = sdn_port_name[:63]
436 # try:
437 # sdn_port_id = self.ovim.new_external_port(
438 # {"compute_node": interface["compute_node"],
439 # "pci": interface["pci"],
440 # "vlan": interface.get("vlan"),
441 # "net_id": sdn_net_id,
442 # "region": self.vim["config"]["datacenter_id"],
443 # "name": sdn_port_name,
444 # "mac": interface.get("mac_address")})
445 # task_interface["sdn_port_id"] = sdn_port_id
446 # except (ovimException, Exception) as e:
447 # error_text = "ovimException creating new_external_port compute_node={} pci={} vlan={} {}".\
448 # format(interface["compute_node"], interface["pci"], interface.get("vlan"), e)
449 # self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
450 # task_warning_msg += error_text
451 # # TODO Set error_msg at instance_nets instead of instance VMs
452
453 self.db.update_rows('instance_interfaces',
454 UPDATE={"mac_address": interface.get("mac_address"),
455 "ip_address": interface.get("ip_address"),
456 "vim_interface_id": interface.get("vim_interface_id"),
457 "vim_info": interface.get("vim_info"),
458 "sdn_port_id": task_interface.get("sdn_port_id"),
459 "compute_node": interface.get("compute_node"),
460 "pci": interface.get("pci"),
461 "vlan": interface.get("vlan")},
462 WHERE={'uuid': task_interface["iface_id"]})
463 task_interface["vim_info"] = interface
464 # if sdn_net_id and interface.get("compute_node") and interface.get("pci"):
465 # # TODO Send message to task SDN to update
466
467 # check and update task and instance_vms database
468 vim_info_error_msg = None
469 if vim_info.get("error_msg"):
470 vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"] + task_warning_msg)
471 elif task_warning_msg:
472 vim_info_error_msg = self._format_vim_error_msg(task_warning_msg)
473 task_vim_info = task["extra"].get("vim_info")
474 task_error_msg = task.get("error_msg")
475 task_vim_status = task["extra"].get("vim_status")
476 if task_vim_status != vim_info["status"] or task_error_msg != vim_info_error_msg or \
477 (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
478 database_update = {"status": vim_info["status"], "error_msg": vim_info_error_msg}
479 if vim_info.get("vim_info"):
480 database_update["vim_info"] = vim_info["vim_info"]
481
482 task["extra"]["vim_status"] = vim_info["status"]
483 task["error_msg"] = vim_info_error_msg
484 if vim_info.get("vim_info"):
485 task["extra"]["vim_info"] = vim_info["vim_info"]
486
487 return database_update
488
489 def _refres_net(self, task):
490 """Call VIM to get network status"""
491 database_update = None
492
493 vim_id = task["vim_id"]
494 net_to_refresh_list = [vim_id]
495 try:
496 vim_dict = self.vim.refresh_nets_status(net_to_refresh_list)
497 vim_info = vim_dict[vim_id]
498 except vimconn.vimconnException as e:
499 # Mark all tasks at VIM_ERROR status
500 self.logger.error("task=several get-net: vimconnException when trying to refresh nets " + str(e))
501 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
502
503 task_id = task["instance_action_id"] + "." + str(task["task_index"])
504 self.logger.debug("task={} get-net: vim_net_id={} result={}".format(task_id, task["vim_id"], vim_info))
505
506 task_vim_info = task["extra"].get("vim_info")
507 task_vim_status = task["extra"].get("vim_status")
508 task_error_msg = task.get("error_msg")
509 # task_sdn_net_id = task["extra"].get("sdn_net_id")
510
511 vim_info_status = vim_info["status"]
512 vim_info_error_msg = vim_info.get("error_msg")
513 # get ovim status
514 # if task_sdn_net_id:
515 # try:
516 # sdn_net = self.ovim.show_network(task_sdn_net_id)
517 # except (ovimException, Exception) as e:
518 # text_error = "ovimException getting network snd_net_id={}: {}".format(task_sdn_net_id, e)
519 # self.logger.error("task={} get-net: {}".format(task_id, text_error), exc_info=True)
520 # sdn_net = {"status": "ERROR", "last_error": text_error}
521 # if sdn_net["status"] == "ERROR":
522 # if not vim_info_error_msg:
523 # vim_info_error_msg = str(sdn_net.get("last_error"))
524 # else:
525 # vim_info_error_msg = "VIM_ERROR: {} && SDN_ERROR: {}".format(
526 # self._format_vim_error_msg(vim_info_error_msg, 1024 // 2 - 14),
527 # self._format_vim_error_msg(sdn_net["last_error"], 1024 // 2 - 14))
528 # vim_info_status = "ERROR"
529 # elif sdn_net["status"] == "BUILD":
530 # if vim_info_status == "ACTIVE":
531 # vim_info_status = "BUILD"
532
533 # update database
534 if vim_info_error_msg:
535 vim_info_error_msg = self._format_vim_error_msg(vim_info_error_msg)
536 if task_vim_status != vim_info_status or task_error_msg != vim_info_error_msg or \
537 (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
538 task["extra"]["vim_status"] = vim_info_status
539 task["error_msg"] = vim_info_error_msg
540 if vim_info.get("vim_info"):
541 task["extra"]["vim_info"] = vim_info["vim_info"]
542 database_update = {"status": vim_info_status, "error_msg": vim_info_error_msg}
543 if vim_info.get("vim_info"):
544 database_update["vim_info"] = vim_info["vim_info"]
545 return database_update
546
547 def _proccess_pending_tasks(self, task, related_tasks):
548 old_task_status = task["status"]
549 create_or_find = False # if as result of processing this task something is created or found
550 next_refresh = 0
551
552 try:
553 if task["status"] == "SCHEDULED":
554 # check if tasks that this depends on have been completed
555 dependency_not_completed = False
556 dependency_modified_at = 0
557 for task_index in task["extra"].get("depends_on", ()):
558 task_dependency = self._look_for_task(task["instance_action_id"], task_index)
559 if not task_dependency:
560 raise VimThreadException(
561 "Cannot get depending net task trying to get depending task {}.{}".format(
562 task["instance_action_id"], task_index))
563 # task["depends"]["TASK-" + str(task_index)] = task_dependency #it references another object,so
564 # database must be look again
565 if task_dependency["status"] == "SCHEDULED":
566 dependency_not_completed = True
567 dependency_modified_at = task_dependency["modified_at"]
568 break
569 elif task_dependency["status"] == "FAILED":
570 raise VimThreadException(
571 "Cannot {} {}, (task {}.{}) because depends on failed {}.{}, (task{}.{}): {}".format(
572 task["action"], task["item"],
573 task["instance_action_id"], task["task_index"],
574 task_dependency["instance_action_id"], task_dependency["task_index"],
575 task_dependency["action"], task_dependency["item"], task_dependency.get("error_msg")))
576
577 task["depends"]["TASK-"+str(task_index)] = task_dependency
578 task["depends"]["TASK-{}.{}".format(task["instance_action_id"], task_index)] = task_dependency
579 if dependency_not_completed:
580 # Move this task to the time dependency is going to be modified plus 10 seconds.
581 self.db.update_rows("vim_wim_actions", modified_time=dependency_modified_at + 10,
582 UPDATE={"worker": None},
583 WHERE={self.target_k: self.target_v, "worker": self.my_id,
584 "related": task["related"],
585 })
586 # task["extra"]["tries"] = task["extra"].get("tries", 0) + 1
587 # if task["extra"]["tries"] > 3:
588 # raise VimThreadException(
589 # "Cannot {} {}, (task {}.{}) because timeout waiting to complete {} {}, "
590 # "(task {}.{})".format(task["action"], task["item"],
591 # task["instance_action_id"], task["task_index"],
592 # task_dependency["instance_action_id"], task_dependency["task_index"]
593 # task_dependency["action"], task_dependency["item"]))
594 return
595
596 database_update = None
597 if task["action"] == "DELETE":
598 deleted_needed = self._delete_task(task)
599 if not deleted_needed:
600 task["status"] = "SUPERSEDED" # with FINISHED instead of DONE it will not be refreshing
601 task["error_msg"] = None
602
603 if task["status"] == "SUPERSEDED":
604 # not needed to do anything but update database with the new status
605 database_update = None
606 elif not self.vim and not self.sdnconnector:
607 task["status"] = "FAILED"
608 task["error_msg"] = self.error_status
609 database_update = {"status": "VIM_ERROR" if self.datacenter_tenant_id else "WIM_ERROR",
610 "error_msg": task["error_msg"]}
611 elif task["item_id"] != related_tasks[0]["item_id"] and task["action"] in ("FIND", "CREATE"):
612 # Do nothing, just copy values from one to another and update database
613 task["status"] = related_tasks[0]["status"]
614 task["error_msg"] = related_tasks[0]["error_msg"]
615 task["vim_id"] = related_tasks[0]["vim_id"]
616 extra = yaml.load(related_tasks[0]["extra"], Loader=yaml.Loader)
617 task["extra"]["vim_status"] = extra.get("vim_status")
618 next_refresh = related_tasks[0]["modified_at"] + 0.001
619 database_update = {"status": task["extra"].get("vim_status", "VIM_ERROR"),
620 "error_msg": task["error_msg"]}
621 if task["item"] == 'instance_vms':
622 database_update["vim_vm_id"] = task["vim_id"]
623 elif task["item"] == 'instance_nets':
624 database_update["vim_net_id"] = task["vim_id"]
625 elif task["item"] == 'instance_vms':
626 if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"):
627 database_update = self._refres_vm(task)
628 create_or_find = True
629 elif task["action"] == "CREATE":
630 create_or_find = True
631 database_update = self.new_vm(task)
632 elif task["action"] == "DELETE":
633 self.del_vm(task)
634 else:
635 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
636 elif task["item"] == 'instance_nets':
637 if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"):
638 database_update = self._refres_net(task)
639 create_or_find = True
640 elif task["action"] == "CREATE":
641 create_or_find = True
642 database_update = self.new_net(task)
643 elif task["action"] == "DELETE":
644 self.del_net(task)
645 elif task["action"] == "FIND":
646 database_update = self.get_net(task)
647 else:
648 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
649 elif task["item"] == 'instance_wim_nets':
650 if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"):
651 database_update = self.new_or_update_sdn_net(task)
652 create_or_find = True
653 elif task["action"] == "CREATE":
654 create_or_find = True
655 database_update = self.new_or_update_sdn_net(task)
656 elif task["action"] == "DELETE":
657 self.del_sdn_net(task)
658 elif task["action"] == "FIND":
659 database_update = self.get_sdn_net(task)
660 else:
661 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
662 elif task["item"] == 'instance_sfis':
663 if task["action"] == "CREATE":
664 create_or_find = True
665 database_update = self.new_sfi(task)
666 elif task["action"] == "DELETE":
667 self.del_sfi(task)
668 else:
669 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
670 elif task["item"] == 'instance_sfs':
671 if task["action"] == "CREATE":
672 create_or_find = True
673 database_update = self.new_sf(task)
674 elif task["action"] == "DELETE":
675 self.del_sf(task)
676 else:
677 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
678 elif task["item"] == 'instance_classifications':
679 if task["action"] == "CREATE":
680 create_or_find = True
681 database_update = self.new_classification(task)
682 elif task["action"] == "DELETE":
683 self.del_classification(task)
684 else:
685 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
686 elif task["item"] == 'instance_sfps':
687 if task["action"] == "CREATE":
688 create_or_find = True
689 database_update = self.new_sfp(task)
690 elif task["action"] == "DELETE":
691 self.del_sfp(task)
692 else:
693 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
694 else:
695 raise vimconn.vimconnException(self.name + "unknown task item {}".format(task["item"]))
696 # TODO
697 except VimThreadException as e:
698 task["error_msg"] = str(e)
699 task["status"] = "FAILED"
700 database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
701 if task["item"] == 'instance_vms':
702 database_update["vim_vm_id"] = None
703 elif task["item"] == 'instance_nets':
704 database_update["vim_net_id"] = None
705
706 task_id = task["instance_action_id"] + "." + str(task["task_index"])
707 self.logger.debug("task={} item={} action={} result={}:'{}' params={}".format(
708 task_id, task["item"], task["action"], task["status"],
709 task["vim_id"] if task["status"] == "DONE" else task.get("error_msg"), task["params"]))
710 try:
711 if not next_refresh:
712 if task["status"] == "DONE":
713 next_refresh = time.time()
714 if task["extra"].get("vim_status") == "BUILD":
715 next_refresh += self.REFRESH_BUILD
716 elif task["extra"].get("vim_status") in ("ERROR", "VIM_ERROR", "WIM_ERROR"):
717 next_refresh += self.REFRESH_ERROR
718 elif task["extra"].get("vim_status") == "DELETED":
719 next_refresh += self.REFRESH_DELETE
720 else:
721 next_refresh += self.REFRESH_ACTIVE
722 elif task["status"] == "FAILED":
723 next_refresh = time.time() + self.REFRESH_DELETE
724
725 if create_or_find:
726 # modify all related task with action FIND/CREATED non SCHEDULED
727 self.db.update_rows(
728 table="vim_wim_actions", modified_time=next_refresh + 0.001,
729 UPDATE={"status": task["status"], "vim_id": task.get("vim_id"),
730 "error_msg": task["error_msg"],
731 },
732
733 WHERE={self.target_k: self.target_v,
734 "worker": self.my_id,
735 "action": ["FIND", "CREATE"],
736 "related": task["related"],
737 "status<>": "SCHEDULED",
738 })
739 # modify own task
740 self.db.update_rows(
741 table="vim_wim_actions", modified_time=next_refresh,
742 UPDATE={"status": task["status"], "vim_id": task.get("vim_id"),
743 "error_msg": task["error_msg"],
744 "extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256)},
745 WHERE={"instance_action_id": task["instance_action_id"], "task_index": task["task_index"]})
746 # Unlock tasks
747 self.db.update_rows(
748 table="vim_wim_actions", modified_time=0,
749 UPDATE={"worker": None},
750 WHERE={self.target_k: self.target_v,
751 "worker": self.my_id,
752 "related": task["related"],
753 })
754
755 # Update table instance_actions
756 if old_task_status == "SCHEDULED" and task["status"] != old_task_status:
757 self.db.update_rows(
758 table="instance_actions",
759 UPDATE={("number_failed" if task["status"] == "FAILED" else "number_done"): {"INCREMENT": 1}},
760 WHERE={"uuid": task["instance_action_id"]})
761 if database_update:
762 where_filter = {"related": task["related"]}
763 if task["item"] == "instance_nets" and task["datacenter_vim_id"]:
764 where_filter["datacenter_tenant_id"] = task["datacenter_vim_id"]
765 self.db.update_rows(table=task["item"],
766 UPDATE=database_update,
767 WHERE=where_filter)
768 except db_base_Exception as e:
769 self.logger.error("task={} Error updating database {}".format(task_id, e), exc_info=True)
770
771 def insert_task(self, task):
772 try:
773 self.task_queue.put(task, False)
774 return None
775 except queue.Full:
776 raise vimconn.vimconnException(self.name + ": timeout inserting a task")
777
778 def del_task(self, task):
779 with self.task_lock:
780 if task["status"] == "SCHEDULED":
781 task["status"] = "SUPERSEDED"
782 return True
783 else: # task["status"] == "processing"
784 self.task_lock.release()
785 return False
786
787 def run(self):
788 self.logger.debug("Starting")
789 while True:
790 self.get_vim_sdn_connector()
791 self.logger.debug("Vimconnector loaded")
792 reload_thread = False
793
794 while True:
795 try:
796 while not self.task_queue.empty():
797 task = self.task_queue.get()
798 if isinstance(task, list):
799 pass
800 elif isinstance(task, str):
801 if task == 'exit':
802 return 0
803 elif task == 'reload':
804 reload_thread = True
805 break
806 self.task_queue.task_done()
807 if reload_thread:
808 break
809
810 task, related_tasks = self._get_db_task()
811 if task:
812 self._proccess_pending_tasks(task, related_tasks)
813 else:
814 time.sleep(5)
815
816 except Exception as e:
817 self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)
818
819 self.logger.debug("Finishing")
820
821 def _look_for_task(self, instance_action_id, task_id):
822 """
823 Look for a concrete task at vim_actions database table
824 :param instance_action_id: The instance_action_id
825 :param task_id: Can have several formats:
826 <task index>: integer
827 TASK-<task index> :backward compatibility,
828 [TASK-]<instance_action_id>.<task index>: this instance_action_id overrides the one in the parameter
829 :return: Task dictionary or None if not found
830 """
831 if isinstance(task_id, int):
832 task_index = task_id
833 else:
834 if task_id.startswith("TASK-"):
835 task_id = task_id[5:]
836 ins_action_id, _, task_index = task_id.rpartition(".")
837 if ins_action_id:
838 instance_action_id = ins_action_id
839
840 tasks = self.db.get_rows(FROM="vim_wim_actions", WHERE={"instance_action_id": instance_action_id,
841 "task_index": task_index})
842 if not tasks:
843 return None
844 task = tasks[0]
845 task["params"] = None
846 task["depends"] = {}
847 if task["extra"]:
848 extra = yaml.load(task["extra"], Loader=yaml.Loader)
849 task["extra"] = extra
850 task["params"] = extra.get("params")
851 else:
852 task["extra"] = {}
853 return task
854
855 @staticmethod
856 def _format_vim_error_msg(error_text, max_length=1024):
857 if error_text and len(error_text) >= max_length:
858 return error_text[:max_length // 2 - 3] + " ... " + error_text[-max_length // 2 + 3:]
859 return error_text
860
861 def new_vm(self, task):
862 task_id = task["instance_action_id"] + "." + str(task["task_index"])
863 try:
864 params = task["params"]
865 depends = task.get("depends")
866 net_list = params[5]
867 for net in net_list:
868 if "net_id" in net and is_task_id(net["net_id"]): # change task_id into network_id
869 network_id = task["depends"][net["net_id"]].get("vim_id")
870 if not network_id:
871 raise VimThreadException(
872 "Cannot create VM because depends on a network not created or found: " +
873 str(depends[net["net_id"]]["error_msg"]))
874 net["net_id"] = network_id
875 params_copy = deepcopy(params)
876 vim_vm_id, created_items = self.vim.new_vminstance(*params_copy)
877
878 # fill task_interfaces. Look for snd_net_id at database for each interface
879 task_interfaces = {}
880 for iface in params_copy[5]:
881 task_interfaces[iface["vim_id"]] = {"iface_id": iface["uuid"]}
882 result = self.db.get_rows(
883 SELECT=('sdn_net_id', 'interface_id'),
884 FROM='instance_nets as ine join instance_interfaces as ii on ii.instance_net_id=ine.uuid',
885 WHERE={'ii.uuid': iface["uuid"]})
886 if result:
887 task_interfaces[iface["vim_id"]]["sdn_net_id"] = result[0]['sdn_net_id']
888 task_interfaces[iface["vim_id"]]["interface_id"] = result[0]['interface_id']
889 else:
890 self.logger.critical("task={} new-VM: instance_nets uuid={} not found at DB".format(task_id,
891 iface["uuid"]),
892 exc_info=True)
893
894 task["vim_info"] = {}
895 task["extra"]["interfaces"] = task_interfaces
896 task["extra"]["created"] = True
897 task["extra"]["created_items"] = created_items
898 task["extra"]["vim_status"] = "BUILD"
899 task["error_msg"] = None
900 task["status"] = "DONE"
901 task["vim_id"] = vim_vm_id
902 instance_element_update = {"status": "BUILD", "vim_vm_id": vim_vm_id, "error_msg": None}
903 return instance_element_update
904
905 except (vimconn.vimconnException, VimThreadException) as e:
906 self.logger.error("task={} new-VM: {}".format(task_id, e))
907 error_text = self._format_vim_error_msg(str(e))
908 task["error_msg"] = error_text
909 task["status"] = "FAILED"
910 task["vim_id"] = None
911 instance_element_update = {"status": "VIM_ERROR", "vim_vm_id": None, "error_msg": error_text}
912 return instance_element_update
913
914 def del_vm(self, task):
915 # task_id = task["instance_action_id"] + "." + str(task["task_index"])
916 vm_vim_id = task["vim_id"]
917 # interfaces = task["extra"].get("interfaces", ())
918 try:
919 # for iface in interfaces.values():
920 # if iface.get("sdn_port_id"):
921 # try:
922 # self.ovim.delete_port(iface["sdn_port_id"], idempotent=True)
923 # except ovimException as e:
924 # self.logger.error("task={} del-VM: ovimException when deleting external_port={}: {} ".format(
925 # task_id, iface["sdn_port_id"], e), exc_info=True)
926 # # TODO Set error_msg at instance_nets
927
928 self.vim.delete_vminstance(vm_vim_id, task["extra"].get("created_items"))
929 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
930 task["error_msg"] = None
931 return None
932
933 except vimconn.vimconnException as e:
934 task["error_msg"] = self._format_vim_error_msg(str(e))
935 if isinstance(e, vimconn.vimconnNotFoundException):
936 # If not found mark as Done and fill error_msg
937 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
938 return None
939 task["status"] = "FAILED"
940 return None
941
942 def _get_net_internal(self, task, filter_param):
943 """
944 Common code for get_net and new_net. It looks for a network on VIM with the filter_params
945 :param task: task for this find or find-or-create action
946 :param filter_param: parameters to send to the vimconnector
947 :return: a dict with the content to update the instance_nets database table. Raises an exception on error, or
948 when network is not found or found more than one
949 """
950 vim_nets = self.vim.get_network_list(filter_param)
951 if not vim_nets:
952 raise VimThreadExceptionNotFound("Network not found with this criteria: '{}'".format(filter_param))
953 elif len(vim_nets) > 1:
954 raise VimThreadException("More than one network found with this criteria: '{}'".format(filter_param))
955 vim_net_id = vim_nets[0]["id"]
956
957 # Discover if this network is managed by a sdn controller
958 sdn_net_id = None
959 result = self.db.get_rows(SELECT=('sdn_net_id',), FROM='instance_nets',
960 WHERE={'vim_net_id': vim_net_id, 'datacenter_tenant_id': self.datacenter_tenant_id},
961 ORDER="instance_scenario_id")
962 if result:
963 sdn_net_id = result[0]['sdn_net_id']
964
965 task["status"] = "DONE"
966 task["extra"]["vim_info"] = {}
967 task["extra"]["created"] = False
968 task["extra"]["vim_status"] = "BUILD"
969 task["extra"]["sdn_net_id"] = sdn_net_id
970 task["error_msg"] = None
971 task["vim_id"] = vim_net_id
972 instance_element_update = {"vim_net_id": vim_net_id, "created": False, "status": "BUILD",
973 "error_msg": None, "sdn_net_id": sdn_net_id}
974 return instance_element_update
975
976 def get_net(self, task):
977 task_id = task["instance_action_id"] + "." + str(task["task_index"])
978 try:
979 params = task["params"]
980 filter_param = params[0]
981 instance_element_update = self._get_net_internal(task, filter_param)
982 return instance_element_update
983
984 except (vimconn.vimconnException, VimThreadException) as e:
985 self.logger.error("task={} get-net: {}".format(task_id, e))
986 task["status"] = "FAILED"
987 task["vim_id"] = None
988 task["error_msg"] = self._format_vim_error_msg(str(e))
989 instance_element_update = {"vim_net_id": None, "status": "VIM_ERROR",
990 "error_msg": task["error_msg"]}
991 return instance_element_update
992
993 def new_net(self, task):
994 vim_net_id = None
995 task_id = task["instance_action_id"] + "." + str(task["task_index"])
996 action_text = ""
997 try:
998 # FIND
999 if task["extra"].get("find"):
1000 action_text = "finding"
1001 filter_param = task["extra"]["find"][0]
1002 try:
1003 instance_element_update = self._get_net_internal(task, filter_param)
1004 return instance_element_update
1005 except VimThreadExceptionNotFound:
1006 pass
1007 # CREATE
1008 params = task["params"]
1009 action_text = "creating VIM"
1010
1011 vim_net_id, created_items = self.vim.new_network(*params[0:5])
1012
1013 # net_name = params[0]
1014 # net_type = params[1]
1015 # wim_account_name = None
1016 # if len(params) >= 6:
1017 # wim_account_name = params[5]
1018
1019 # TODO fix at nfvo adding external port
1020 # if wim_account_name and self.vim.config["wim_external_ports"]:
1021 # # add external port to connect WIM. Try with compute node __WIM:wim_name and __WIM
1022 # action_text = "attaching external port to ovim network"
1023 # sdn_port_name = "external_port"
1024 # sdn_port_data = {
1025 # "compute_node": "__WIM:" + wim_account_name[0:58],
1026 # "pci": None,
1027 # "vlan": network["vlan"],
1028 # "net_id": sdn_net_id,
1029 # "region": self.vim["config"]["datacenter_id"],
1030 # "name": sdn_port_name,
1031 # }
1032 # try:
1033 # sdn_external_port_id = self.ovim.new_external_port(sdn_port_data)
1034 # except ovimException:
1035 # sdn_port_data["compute_node"] = "__WIM"
1036 # sdn_external_port_id = self.ovim.new_external_port(sdn_port_data)
1037 # self.logger.debug("Added sdn_external_port {} to sdn_network {}".format(sdn_external_port_id,
1038 # sdn_net_id))
1039 task["status"] = "DONE"
1040 task["extra"]["vim_info"] = {}
1041 # task["extra"]["sdn_net_id"] = sdn_net_id
1042 task["extra"]["vim_status"] = "BUILD"
1043 task["extra"]["created"] = True
1044 task["extra"]["created_items"] = created_items
1045 task["error_msg"] = None
1046 task["vim_id"] = vim_net_id
1047 instance_element_update = {"vim_net_id": vim_net_id, "status": "BUILD",
1048 "created": True, "error_msg": None}
1049 return instance_element_update
1050 except vimconn.vimconnException as e:
1051 self.logger.error("task={} new-net: Error {}: {}".format(task_id, action_text, e))
1052 task["status"] = "FAILED"
1053 task["vim_id"] = vim_net_id
1054 task["error_msg"] = self._format_vim_error_msg(str(e))
1055 # task["extra"]["sdn_net_id"] = sdn_net_id
1056 instance_element_update = {"vim_net_id": vim_net_id, "status": "VIM_ERROR",
1057 "error_msg": task["error_msg"]}
1058 return instance_element_update
1059
1060 def del_net(self, task):
1061 net_vim_id = task["vim_id"]
1062 # sdn_net_id = task["extra"].get("sdn_net_id")
1063 try:
1064 if net_vim_id:
1065 self.vim.delete_network(net_vim_id, task["extra"].get("created_items"))
1066 # if sdn_net_id:
1067 # # Delete any attached port to this sdn network. There can be ports associated to this network in case
1068 # # it was manually done using 'openmano vim-net-sdn-attach'
1069 # port_list = self.ovim.get_ports(columns={'uuid'},
1070 # filter={'name': 'external_port', 'net_id': sdn_net_id})
1071 # for port in port_list:
1072 # self.ovim.delete_port(port['uuid'], idempotent=True)
1073 # self.ovim.delete_network(sdn_net_id, idempotent=True)
1074 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1075 task["error_msg"] = None
1076 return None
1077 except vimconn.vimconnException as e:
1078 task["error_msg"] = self._format_vim_error_msg(str(e))
1079 if isinstance(e, vimconn.vimconnNotFoundException):
1080 # If not found mark as Done and fill error_msg
1081 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1082 return None
1083 task["status"] = "FAILED"
1084 return None
1085
1086 def new_or_update_sdn_net(self, task):
1087 wimconn_net_id = task["vim_id"]
1088 created_items = task["extra"].get("created_items")
1089 connected_ports = task["extra"].get("connected_ports", [])
1090 new_connected_ports = []
1091 last_update = task["extra"].get("last_update", 0)
1092 sdn_status = "BUILD"
1093 sdn_info = None
1094
1095 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1096 error_list = []
1097 try:
1098 # FIND
1099 if task["extra"].get("find"):
1100 wimconn_id = task["extra"]["find"][0]
1101 try:
1102 instance_element_update = self.sdnconnector.get_connectivity_service_status(wimconn_id)
1103 wimconn_net_id = wimconn_id
1104 instance_element_update = {"wim_internal_id": wimconn_net_id, "created": False, "status": "BUILD",
1105 "error_msg": None, }
1106 return instance_element_update
1107 except Exception as e:
1108 if isinstance(e, SdnConnectorError) and e.http_error == HTTPStatus.NOT_FOUND.value:
1109 pass
1110 else:
1111 self._proccess_sdn_exception(e)
1112
1113 params = task["params"]
1114 # CREATE
1115 # look for ports
1116 sdn_ports = []
1117 pending_ports = 0
1118
1119 ports = self.db.get_rows(FROM='instance_interfaces', WHERE={'instance_wim_net_id': task["item_id"]})
1120 sdn_need_update = False
1121 for port in ports:
1122 # TODO. Do not connect if already done
1123 if port.get("compute_node") and port.get("pci"):
1124 for map in self.port_mappings:
1125 if map.get("device_id") == port["compute_node"] and \
1126 map.get("device_interface_id") == port["pci"]:
1127 break
1128 else:
1129 if self.sdnconn_config.get("mapping_not_needed"):
1130 map = {
1131 "service_endpoint_id": "{}:{}".format(port["compute_node"], port["pci"]),
1132 "service_endpoint_encapsulation_info": {
1133 "vlan": port["vlan"],
1134 "mac": port["mac_address"],
1135 "device_id": port["compute_node"],
1136 "device_interface_id": port["pci"]
1137 }
1138 }
1139 else:
1140 map = None
1141 error_list.append("Port mapping not found for compute_node={} pci={}".format(
1142 port["compute_node"], port["pci"]))
1143
1144 if map:
1145 if port["uuid"] not in connected_ports or port["modified_at"] > last_update:
1146 sdn_need_update = True
1147 new_connected_ports.append(port["uuid"])
1148 sdn_ports.append({
1149 "service_endpoint_id": map["service_endpoint_id"],
1150 "service_endpoint_encapsulation_type": "dot1q" if port["model"] == "SR-IOV" else None,
1151 "service_endpoint_encapsulation_info": {
1152 "vlan": port["vlan"],
1153 "mac": port["mac_address"],
1154 "device_id": map.get("device_id"),
1155 "device_interface_id": map.get("device_interface_id"),
1156 "switch_dpid": map.get("switch_dpid"),
1157 "switch_port": map.get("switch_port"),
1158 "service_mapping_info": map.get("service_mapping_info"),
1159 }
1160 })
1161
1162 else:
1163 pending_ports += 1
1164 if pending_ports:
1165 error_list.append("Waiting for getting interfaces location from VIM. Obtained '{}' of {}"
1166 .format(len(ports)-pending_ports, len(ports)))
1167 # if there are more ports to connect or they have been modified, call create/update
1168 if sdn_need_update and len(sdn_ports) >= 2:
1169 if not wimconn_net_id:
1170 if params[0] == "data":
1171 net_type = "ELAN"
1172 elif params[0] == "ptp":
1173 net_type = "ELINE"
1174 else:
1175 net_type = "L3"
1176
1177 wimconn_net_id, created_items = self.sdnconnector.create_connectivity_service(net_type, sdn_ports)
1178 else:
1179 created_items = self.sdnconnector.edit_connectivity_service(wimconn_net_id, conn_info=created_items,
1180 connection_points=sdn_ports)
1181 last_update = time.time()
1182 connected_ports = new_connected_ports
1183 elif wimconn_net_id:
1184 try:
1185 wim_status_dict = self.sdnconnector.get_connectivity_service_status(wimconn_net_id,
1186 conn_info=created_items)
1187 sdn_status = wim_status_dict["sdn_status"]
1188 if wim_status_dict.get("error_msg"):
1189 error_list.append(wim_status_dict.get("error_msg"))
1190 if wim_status_dict.get("sdn_info"):
1191 sdn_info = str(wim_status_dict.get("sdn_info"))
1192 except Exception as e:
1193 self._proccess_sdn_exception(e)
1194
1195 task["status"] = "DONE"
1196 task["extra"]["vim_info"] = {}
1197 # task["extra"]["sdn_net_id"] = sdn_net_id
1198 task["extra"]["vim_status"] = sdn_status
1199 task["extra"]["created"] = True
1200 task["extra"]["created_items"] = created_items
1201 task["extra"]["connected_ports"] = connected_ports
1202 task["extra"]["last_update"] = last_update
1203 task["error_msg"] = self._format_vim_error_msg(" ; ".join(error_list))
1204 task["vim_id"] = wimconn_net_id
1205 instance_element_update = {"wim_internal_id": wimconn_net_id, "status": sdn_status,
1206 "created": True, "error_msg": task["error_msg"] or None}
1207 except (vimconn.vimconnException, SdnConnectorError) as e:
1208 self.logger.error("task={} new-sdn-net: Error: {}".format(task_id, e))
1209 task["status"] = "FAILED"
1210 task["vim_id"] = wimconn_net_id
1211 task["error_msg"] = self._format_vim_error_msg(str(e))
1212 # task["extra"]["sdn_net_id"] = sdn_net_id
1213 instance_element_update = {"wim_internal_id": wimconn_net_id, "status": "WIM_ERROR",
1214 "error_msg": task["error_msg"]}
1215 if sdn_info:
1216 instance_element_update["wim_info"] = sdn_info
1217 return instance_element_update
1218
1219 def del_sdn_net(self, task):
1220 wimconn_net_id = task["vim_id"]
1221 try:
1222 try:
1223 if wimconn_net_id:
1224 self.sdnconnector.delete_connectivity_service(wimconn_net_id, task["extra"].get("created_items"))
1225 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1226 task["error_msg"] = None
1227 return None
1228 except Exception as e:
1229 self._proccess_sdn_exception(e)
1230 except SdnConnectorError as e:
1231 task["error_msg"] = self._format_vim_error_msg(str(e))
1232 if e.http_code == HTTPStatus.NOT_FOUND.value:
1233 # If not found mark as Done and fill error_msg
1234 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1235 task["error_msg"] = None
1236 return None
1237 task["status"] = "FAILED"
1238 return None
1239
1240 # Service Function Instances
1241 def new_sfi(self, task):
1242 vim_sfi_id = None
1243 try:
1244 # Waits for interfaces to be ready (avoids failure)
1245 time.sleep(1)
1246 dep_id = "TASK-" + str(task["extra"]["depends_on"][0])
1247 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1248 error_text = ""
1249 interfaces = task["depends"][dep_id]["extra"].get("interfaces")
1250
1251 ingress_interface_id = task.get("extra").get("params").get("ingress_interface_id")
1252 egress_interface_id = task.get("extra").get("params").get("egress_interface_id")
1253 ingress_vim_interface_id = None
1254 egress_vim_interface_id = None
1255 for vim_interface, interface_data in interfaces.items():
1256 if interface_data.get("interface_id") == ingress_interface_id:
1257 ingress_vim_interface_id = vim_interface
1258 break
1259 if ingress_interface_id != egress_interface_id:
1260 for vim_interface, interface_data in interfaces.items():
1261 if interface_data.get("interface_id") == egress_interface_id:
1262 egress_vim_interface_id = vim_interface
1263 break
1264 else:
1265 egress_vim_interface_id = ingress_vim_interface_id
1266 if not ingress_vim_interface_id or not egress_vim_interface_id:
1267 error_text = "Error creating Service Function Instance, Ingress: {}, Egress: {}".format(
1268 ingress_vim_interface_id, egress_vim_interface_id)
1269 self.logger.error(error_text)
1270 task["error_msg"] = error_text
1271 task["status"] = "FAILED"
1272 task["vim_id"] = None
1273 return None
1274 # At the moment, every port associated with the VM will be used both as ingress and egress ports.
1275 # Bear in mind that different VIM connectors might support SFI differently. In the case of OpenStack,
1276 # only the first ingress and first egress ports will be used to create the SFI (Port Pair).
1277 ingress_port_id_list = [ingress_vim_interface_id]
1278 egress_port_id_list = [egress_vim_interface_id]
1279 name = "sfi-{}".format(task["item_id"][:8])
1280 # By default no form of IETF SFC Encapsulation will be used
1281 vim_sfi_id = self.vim.new_sfi(name, ingress_port_id_list, egress_port_id_list, sfc_encap=False)
1282
1283 task["extra"]["created"] = True
1284 task["extra"]["vim_status"] = "ACTIVE"
1285 task["error_msg"] = None
1286 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1287 task["vim_id"] = vim_sfi_id
1288 instance_element_update = {"status": "ACTIVE", "vim_sfi_id": vim_sfi_id, "error_msg": None}
1289 return instance_element_update
1290
1291 except (vimconn.vimconnException, VimThreadException) as e:
1292 self.logger.error("Error creating Service Function Instance, task=%s: %s", task_id, str(e))
1293 error_text = self._format_vim_error_msg(str(e))
1294 task["error_msg"] = error_text
1295 task["status"] = "FAILED"
1296 task["vim_id"] = None
1297 instance_element_update = {"status": "VIM_ERROR", "vim_sfi_id": None, "error_msg": error_text}
1298 return instance_element_update
1299
1300 def del_sfi(self, task):
1301 sfi_vim_id = task["vim_id"]
1302 try:
1303 self.vim.delete_sfi(sfi_vim_id)
1304 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1305 task["error_msg"] = None
1306 return None
1307
1308 except vimconn.vimconnException as e:
1309 task["error_msg"] = self._format_vim_error_msg(str(e))
1310 if isinstance(e, vimconn.vimconnNotFoundException):
1311 # If not found mark as Done and fill error_msg
1312 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1313 return None
1314 task["status"] = "FAILED"
1315 return None
1316
1317 def new_sf(self, task):
1318 vim_sf_id = None
1319 try:
1320 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1321 error_text = ""
1322 depending_tasks = ["TASK-" + str(dep_id) for dep_id in task["extra"]["depends_on"]]
1323 # sfis = next(iter(task.get("depends").values())).get("extra").get("params")[5]
1324 sfis = [task.get("depends").get(dep_task) for dep_task in depending_tasks]
1325 sfi_id_list = []
1326 for sfi in sfis:
1327 sfi_id_list.append(sfi.get("vim_id"))
1328 name = "sf-{}".format(task["item_id"][:8])
1329 # By default no form of IETF SFC Encapsulation will be used
1330 vim_sf_id = self.vim.new_sf(name, sfi_id_list, sfc_encap=False)
1331
1332 task["extra"]["created"] = True
1333 task["extra"]["vim_status"] = "ACTIVE"
1334 task["error_msg"] = None
1335 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1336 task["vim_id"] = vim_sf_id
1337 instance_element_update = {"status": "ACTIVE", "vim_sf_id": vim_sf_id, "error_msg": None}
1338 return instance_element_update
1339
1340 except (vimconn.vimconnException, VimThreadException) as e:
1341 self.logger.error("Error creating Service Function, task=%s: %s", task_id, str(e))
1342 error_text = self._format_vim_error_msg(str(e))
1343 task["error_msg"] = error_text
1344 task["status"] = "FAILED"
1345 task["vim_id"] = None
1346 instance_element_update = {"status": "VIM_ERROR", "vim_sf_id": None, "error_msg": error_text}
1347 return instance_element_update
1348
1349 def del_sf(self, task):
1350 sf_vim_id = task["vim_id"]
1351 try:
1352 self.vim.delete_sf(sf_vim_id)
1353 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1354 task["error_msg"] = None
1355 return None
1356
1357 except vimconn.vimconnException as e:
1358 task["error_msg"] = self._format_vim_error_msg(str(e))
1359 if isinstance(e, vimconn.vimconnNotFoundException):
1360 # If not found mark as Done and fill error_msg
1361 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1362 return None
1363 task["status"] = "FAILED"
1364 return None
1365
1366 def new_classification(self, task):
1367 vim_classification_id = None
1368 try:
1369 params = task["params"]
1370 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1371 dep_id = "TASK-" + str(task["extra"]["depends_on"][0])
1372 error_text = ""
1373 interfaces = task.get("depends").get(dep_id).get("extra").get("interfaces")
1374 # Bear in mind that different VIM connectors might support Classifications differently.
1375 # In the case of OpenStack, only the first VNF attached to the classifier will be used
1376 # to create the Classification(s) (the "logical source port" of the "Flow Classifier").
1377 # Since the VNFFG classifier match lacks the ethertype, classification defaults to
1378 # using the IPv4 flow classifier.
1379 logical_source_port_vim_id = None
1380 logical_source_port_id = params.get("logical_source_port")
1381 for vim_interface, interface_data in interfaces.items():
1382 if interface_data.get("interface_id") == logical_source_port_id:
1383 logical_source_port_vim_id = vim_interface
1384 break
1385 if not logical_source_port_vim_id:
1386 error_text = "Error creating Flow Classifier, Logical Source Port id {}".format(
1387 logical_source_port_id)
1388 self.logger.error(error_text)
1389 task["error_msg"] = error_text
1390 task["status"] = "FAILED"
1391 task["vim_id"] = None
1392 return None
1393
1394 name = "c-{}".format(task["item_id"][:8])
1395 # if not CIDR is given for the IP addresses, add /32:
1396 ip_proto = int(params.get("ip_proto"))
1397 source_ip = params.get("source_ip")
1398 destination_ip = params.get("destination_ip")
1399 source_port = params.get("source_port")
1400 destination_port = params.get("destination_port")
1401 definition = {"logical_source_port": logical_source_port_vim_id}
1402 if ip_proto:
1403 if ip_proto == 1:
1404 ip_proto = 'icmp'
1405 elif ip_proto == 6:
1406 ip_proto = 'tcp'
1407 elif ip_proto == 17:
1408 ip_proto = 'udp'
1409 definition["protocol"] = ip_proto
1410 if source_ip:
1411 if '/' not in source_ip:
1412 source_ip += '/32'
1413 definition["source_ip_prefix"] = source_ip
1414 if source_port:
1415 definition["source_port_range_min"] = source_port
1416 definition["source_port_range_max"] = source_port
1417 if destination_port:
1418 definition["destination_port_range_min"] = destination_port
1419 definition["destination_port_range_max"] = destination_port
1420 if destination_ip:
1421 if '/' not in destination_ip:
1422 destination_ip += '/32'
1423 definition["destination_ip_prefix"] = destination_ip
1424
1425 vim_classification_id = self.vim.new_classification(
1426 name, 'legacy_flow_classifier', definition)
1427
1428 task["extra"]["created"] = True
1429 task["extra"]["vim_status"] = "ACTIVE"
1430 task["error_msg"] = None
1431 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1432 task["vim_id"] = vim_classification_id
1433 instance_element_update = {"status": "ACTIVE", "vim_classification_id": vim_classification_id,
1434 "error_msg": None}
1435 return instance_element_update
1436
1437 except (vimconn.vimconnException, VimThreadException) as e:
1438 self.logger.error("Error creating Classification, task=%s: %s", task_id, str(e))
1439 error_text = self._format_vim_error_msg(str(e))
1440 task["error_msg"] = error_text
1441 task["status"] = "FAILED"
1442 task["vim_id"] = None
1443 instance_element_update = {"status": "VIM_ERROR", "vim_classification_id": None, "error_msg": error_text}
1444 return instance_element_update
1445
1446 def del_classification(self, task):
1447 classification_vim_id = task["vim_id"]
1448 try:
1449 self.vim.delete_classification(classification_vim_id)
1450 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1451 task["error_msg"] = None
1452 return None
1453
1454 except vimconn.vimconnException as e:
1455 task["error_msg"] = self._format_vim_error_msg(str(e))
1456 if isinstance(e, vimconn.vimconnNotFoundException):
1457 # If not found mark as Done and fill error_msg
1458 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1459 return None
1460 task["status"] = "FAILED"
1461 return None
1462
1463 def new_sfp(self, task):
1464 vim_sfp_id = None
1465 try:
1466 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1467 depending_tasks = [task.get("depends").get("TASK-" + str(tsk_id)) for tsk_id in
1468 task.get("extra").get("depends_on")]
1469 error_text = ""
1470 sf_id_list = []
1471 classification_id_list = []
1472 for dep in depending_tasks:
1473 vim_id = dep.get("vim_id")
1474 resource = dep.get("item")
1475 if resource == "instance_sfs":
1476 sf_id_list.append(vim_id)
1477 elif resource == "instance_classifications":
1478 classification_id_list.append(vim_id)
1479
1480 name = "sfp-{}".format(task["item_id"][:8])
1481 # By default no form of IETF SFC Encapsulation will be used
1482 vim_sfp_id = self.vim.new_sfp(name, classification_id_list, sf_id_list, sfc_encap=False)
1483
1484 task["extra"]["created"] = True
1485 task["extra"]["vim_status"] = "ACTIVE"
1486 task["error_msg"] = None
1487 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1488 task["vim_id"] = vim_sfp_id
1489 instance_element_update = {"status": "ACTIVE", "vim_sfp_id": vim_sfp_id, "error_msg": None}
1490 return instance_element_update
1491
1492 except (vimconn.vimconnException, VimThreadException) as e:
1493 self.logger.error("Error creating Service Function, task=%s: %s", task_id, str(e))
1494 error_text = self._format_vim_error_msg(str(e))
1495 task["error_msg"] = error_text
1496 task["status"] = "FAILED"
1497 task["vim_id"] = None
1498 instance_element_update = {"status": "VIM_ERROR", "vim_sfp_id": None, "error_msg": error_text}
1499 return instance_element_update
1500
1501 def del_sfp(self, task):
1502 sfp_vim_id = task["vim_id"]
1503 try:
1504 self.vim.delete_sfp(sfp_vim_id)
1505 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1506 task["error_msg"] = None
1507 return None
1508
1509 except vimconn.vimconnException as e:
1510 task["error_msg"] = self._format_vim_error_msg(str(e))
1511 if isinstance(e, vimconn.vimconnNotFoundException):
1512 # If not found mark as Done and fill error_msg
1513 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1514 return None
1515 task["status"] = "FAILED"
1516 return None