7b197dac54858ac90fa2a6acb3a0641f8c3c0b41
[osm/RO.git] / osm_ro / vim_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefonica Investigacion y Desarrollo, S.A.U.
5 # This file is part of openvim
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
24 """"
25 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
26 The tasks are stored at database in table vim_wim_actions
27 Several vim_wim_actions can refer to the same element at VIM (flavor, network, ...). This is somethng to avoid if RO
28 is migrated to a non-relational database as mongo db. Each vim_wim_actions reference a different instance_Xxxxx
29 In this case "related" colunm contains the same value, to know they refer to the same vim. In case of deletion, it
30 there is related tasks using this element, it is not deleted, The vim_info needed to delete is transfered to other task
31
32 The task content is (M: stored at memory, D: stored at database):
33 MD instance_action_id: reference a global action over an instance-scenario: database instance_actions
34 MD task_index: index number of the task. This together with the previous forms a unique key identifier
35 MD datacenter_vim_id: should contain the uuid of the VIM managed by this thread
36 MD vim_id: id of the vm,net,etc at VIM
37 MD item: database table name, can be instance_vms, instance_nets, TODO: datacenter_flavors, datacenter_images
38 MD item_id: uuid of the referenced entry in the previous table
39 MD action: CREATE, DELETE, FIND
40 MD status: SCHEDULED: action need to be done
41 BUILD: not used
42 DONE: Done and it must be polled to VIM periodically to see status. ONLY for action=CREATE or FIND
43 FAILED: It cannot be created/found/deleted
44 FINISHED: similar to DONE, but no refresh is needed anymore. Task is maintained at database but
45 it is never processed by any thread
46 SUPERSEDED: similar to FINSISHED, but nothing has been done to completed the task.
47 MD extra: text with yaml format at database, dict at memory with:
48 params: list with the params to be sent to the VIM for CREATE or FIND. For DELETE the vim_id is taken
49 from other related tasks
50 find: (only for CREATE tasks) if present it should FIND before creating and use if existing. Contains
51 the FIND params
52 depends_on: list with the 'task_index'es of tasks that must be completed before. e.g. a vm creation depends
53 on a net creation
54 can contain an int (single index on the same instance-action) or str (compete action ID)
55 sdn_net_id: used for net.
56 interfaces: used for VMs. Each key is the uuid of the instance_interfaces entry at database
57 iface_id: uuid of intance_interfaces
58 sdn_port_id:
59 sdn_net_id:
60 vim_info
61 created_items: dictionary with extra elements created that need to be deleted. e.g. ports, volumes,...
62 created: False if the VIM element is not created by other actions, and it should not be deleted
63 vim_status: VIM status of the element. Stored also at database in the instance_XXX
64 vim_info: Detailed information of a vm/net from the VIM. Stored at database in the instance_XXX but not at
65 vim_wim_actions
66 M depends: dict with task_index(from depends_on) to dependency task
67 M params: same as extra[params]
68 MD error_msg: descriptive text upon an error.Stored also at database instance_XXX
69 MD created_at: task creation time. The task of creation must be the oldest
70 MD modified_at: next time task need to be processed. For example, for a refresh, it contain next time refresh must
71 be done
72 MD related: All the tasks over the same VIM element have same "related". Note that other VIMs can contain the
73 same value of related, but this thread only process those task of one VIM. Also related can be the
74 same among several NS os isntance-scenarios
75 MD worker: Used to lock in case of several thread workers.
76
77 """
78
79 import threading
80 import time
81 import Queue
82 import logging
83 import vimconn
84 import vimconn_openvim
85 import vimconn_aws
86 import vimconn_opennebula
87 import vimconn_openstack
88 import vimconn_vmware
89 import vimconn_fos
90 import vimconn_azure
91 import yaml
92 from db_base import db_base_Exception
93 from lib_osm_openvim.ovim import ovimException
94 from copy import deepcopy
95
96 __author__ = "Alfonso Tierno, Pablo Montes"
97 __date__ = "$28-Sep-2017 12:07:15$"
98
99 vim_module = {
100 "openvim": vimconn_openvim,
101 "aws": vimconn_aws,
102 "opennebula": vimconn_opennebula,
103 "openstack": vimconn_openstack,
104 "vmware": vimconn_vmware,
105 "fos": vimconn_fos,
106 "azure": vimconn_azure,
107 }
108
109
110 def is_task_id(task_id):
111 return task_id.startswith("TASK-")
112
113
114 class VimThreadException(Exception):
115 pass
116
117
118 class VimThreadExceptionNotFound(VimThreadException):
119 pass
120
121
122 class vim_thread(threading.Thread):
123 REFRESH_BUILD = 5 # 5 seconds
124 REFRESH_ACTIVE = 60 # 1 minute
125 REFRESH_ERROR = 600
126 REFRESH_DELETE = 3600 * 10
127
128 def __init__(self, task_lock, name=None, datacenter_name=None, datacenter_tenant_id=None,
129 db=None, db_lock=None, ovim=None):
130 """Init a thread.
131 Arguments:
132 'id' number of thead
133 'name' name of thread
134 'host','user': host ip or name to manage and user
135 'db', 'db_lock': database class and lock to use it in exclusion
136 """
137 threading.Thread.__init__(self)
138 self.vim = None
139 self.error_status = None
140 self.datacenter_name = datacenter_name
141 self.datacenter_tenant_id = datacenter_tenant_id
142 self.ovim = ovim
143 if not name:
144 self.name = vimconn["id"] + "." + vimconn["config"]["datacenter_tenant_id"]
145 else:
146 self.name = name
147 self.vim_persistent_info = {}
148 self.my_id = self.name[:64]
149
150 self.logger = logging.getLogger('openmano.vim.' + self.name)
151 self.db = db
152 self.db_lock = db_lock
153
154 self.task_lock = task_lock
155 self.task_queue = Queue.Queue(2000)
156
157 def get_vimconnector(self):
158 try:
159 from_ = "datacenter_tenants as dt join datacenters as d on dt.datacenter_id=d.uuid"
160 select_ = ('type', 'd.config as config', 'd.uuid as datacenter_id', 'vim_url', 'vim_url_admin',
161 'd.name as datacenter_name', 'dt.uuid as datacenter_tenant_id',
162 'dt.vim_tenant_name as vim_tenant_name', 'dt.vim_tenant_id as vim_tenant_id',
163 'user', 'passwd', 'dt.config as dt_config')
164 where_ = {"dt.uuid": self.datacenter_tenant_id}
165 vims = self.db.get_rows(FROM=from_, SELECT=select_, WHERE=where_)
166 vim = vims[0]
167 vim_config = {}
168 if vim["config"]:
169 vim_config.update(yaml.load(vim["config"]))
170 if vim["dt_config"]:
171 vim_config.update(yaml.load(vim["dt_config"]))
172 vim_config['datacenter_tenant_id'] = vim.get('datacenter_tenant_id')
173 vim_config['datacenter_id'] = vim.get('datacenter_id')
174
175 # get port_mapping
176 with self.db_lock:
177 vim_config["wim_external_ports"] = self.ovim.get_of_port_mappings(
178 db_filter={"region": vim_config['datacenter_id'], "pci": None})
179
180 self.vim = vim_module[vim["type"]].vimconnector(
181 uuid=vim['datacenter_id'], name=vim['datacenter_name'],
182 tenant_id=vim['vim_tenant_id'], tenant_name=vim['vim_tenant_name'],
183 url=vim['vim_url'], url_admin=vim['vim_url_admin'],
184 user=vim['user'], passwd=vim['passwd'],
185 config=vim_config, persistent_info=self.vim_persistent_info
186 )
187 self.error_status = None
188 except Exception as e:
189 self.logger.error("Cannot load vimconnector for vim_account {}: {}".format(self.datacenter_tenant_id, e))
190 self.vim = None
191 self.error_status = "Error loading vimconnector: {}".format(e)
192
193 def _get_db_task(self):
194 """
195 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
196 :return: None
197 """
198 now = time.time()
199 try:
200 database_limit = 20
201 task_related = None
202 while True:
203 # get 20 (database_limit) entries each time
204 vim_actions = self.db.get_rows(FROM="vim_wim_actions",
205 WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
206 "status": ['SCHEDULED', 'BUILD', 'DONE'],
207 "worker": [None, self.my_id], "modified_at<=": now
208 },
209 ORDER_BY=("modified_at", "created_at",),
210 LIMIT=database_limit)
211 if not vim_actions:
212 return None, None
213 # if vim_actions[0]["modified_at"] > now:
214 # return int(vim_actions[0] - now)
215 for task in vim_actions:
216 # block related task
217 if task_related == task["related"]:
218 continue # ignore if a locking has already tried for these task set
219 task_related = task["related"]
220 # lock ...
221 self.db.update_rows("vim_wim_actions", UPDATE={"worker": self.my_id}, modified_time=0,
222 WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
223 "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
224 "worker": [None, self.my_id],
225 "related": task_related,
226 "item": task["item"],
227 })
228 # ... and read all related and check if locked
229 related_tasks = self.db.get_rows(FROM="vim_wim_actions",
230 WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
231 "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
232 "related": task_related,
233 "item": task["item"],
234 },
235 ORDER_BY=("created_at",))
236 # check that all related tasks have been locked. If not release and try again. It can happen
237 # for race conditions if a new related task has been inserted by nfvo in the process
238 some_tasks_locked = False
239 some_tasks_not_locked = False
240 creation_task = None
241 for relate_task in related_tasks:
242 if relate_task["worker"] != self.my_id:
243 some_tasks_not_locked = True
244 else:
245 some_tasks_locked = True
246 if not creation_task and relate_task["action"] in ("CREATE", "FIND"):
247 creation_task = relate_task
248 if some_tasks_not_locked:
249 if some_tasks_locked: # unlock
250 self.db.update_rows("vim_wim_actions", UPDATE={"worker": None}, modified_time=0,
251 WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
252 "worker": self.my_id,
253 "related": task_related,
254 "item": task["item"],
255 })
256 continue
257
258 # task of creation must be the first in the list of related_task
259 assert(related_tasks[0]["action"] in ("CREATE", "FIND"))
260
261 if task["extra"]:
262 extra = yaml.load(task["extra"])
263 else:
264 extra = {}
265 task["extra"] = extra
266 if extra.get("depends_on"):
267 task["depends"] = {}
268 if extra.get("params"):
269 task["params"] = deepcopy(extra["params"])
270 return task, related_tasks
271 except Exception as e:
272 self.logger.critical("Unexpected exception at _get_db_task: " + str(e), exc_info=True)
273 return None, None
274
275 def _delete_task(self, task):
276 """
277 Determine if this task need to be done or superseded
278 :return: None
279 """
280
281 def copy_extra_created(copy_to, copy_from):
282 copy_to["created"] = copy_from["created"]
283 if copy_from.get("sdn_net_id"):
284 copy_to["sdn_net_id"] = copy_from["sdn_net_id"]
285 if copy_from.get("interfaces"):
286 copy_to["interfaces"] = copy_from["interfaces"]
287 if copy_from.get("created_items"):
288 if not copy_to.get("created_items"):
289 copy_to["created_items"] = {}
290 copy_to["created_items"].update(copy_from["created_items"])
291
292 task_create = None
293 dependency_task = None
294 deletion_needed = False
295 if task["status"] == "FAILED":
296 return # TODO need to be retry??
297 try:
298 # get all related tasks
299 related_tasks = self.db.get_rows(FROM="vim_wim_actions",
300 WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
301 "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
302 "action": ["FIND", "CREATE"],
303 "related": task["related"],
304 },
305 ORDER_BY=("created_at",),
306 )
307 for related_task in related_tasks:
308 if related_task["item"] == task["item"] and related_task["item_id"] == task["item_id"]:
309 task_create = related_task
310 # TASK_CREATE
311 if related_task["extra"]:
312 extra_created = yaml.load(related_task["extra"])
313 if extra_created.get("created"):
314 deletion_needed = True
315 related_task["extra"] = extra_created
316 elif not dependency_task:
317 dependency_task = related_task
318 if task_create and dependency_task:
319 break
320
321 # mark task_create as FINISHED
322 self.db.update_rows("vim_wim_actions", UPDATE={"status": "FINISHED"},
323 WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
324 "instance_action_id": task_create["instance_action_id"],
325 "task_index": task_create["task_index"]
326 })
327 if not deletion_needed:
328 return
329 elif dependency_task:
330 # move create information from task_create to relate_task
331 extra_new_created = yaml.load(dependency_task["extra"]) or {}
332 extra_new_created["created"] = extra_created["created"]
333 copy_extra_created(copy_to=extra_new_created, copy_from=extra_created)
334
335 self.db.update_rows("vim_wim_actions",
336 UPDATE={"extra": yaml.safe_dump(extra_new_created, default_flow_style=True,
337 width=256),
338 "vim_id": task_create.get("vim_id")},
339 WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
340 "instance_action_id": dependency_task["instance_action_id"],
341 "task_index": dependency_task["task_index"]
342 })
343 return False
344 else:
345 task["vim_id"] = task_create["vim_id"]
346 copy_extra_created(copy_to=task["extra"], copy_from=task_create["extra"])
347 return True
348
349 except Exception as e:
350 self.logger.critical("Unexpected exception at _delete_task: " + str(e), exc_info=True)
351
352 def _refres_vm(self, task):
353 """Call VIM to get VMs status"""
354 database_update = None
355
356 vim_id = task["vim_id"]
357 vm_to_refresh_list = [vim_id]
358 try:
359 vim_dict = self.vim.refresh_vms_status(vm_to_refresh_list)
360 vim_info = vim_dict[vim_id]
361 except vimconn.vimconnException as e:
362 # Mark all tasks at VIM_ERROR status
363 self.logger.error("task=several get-VM: vimconnException when trying to refresh vms " + str(e))
364 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
365
366 task_id = task["instance_action_id"] + "." + str(task["task_index"])
367 self.logger.debug("task={} get-VM: vim_vm_id={} result={}".format(task_id, task["vim_id"], vim_info))
368
369 # check and update interfaces
370 task_warning_msg = ""
371 for interface in vim_info.get("interfaces", ()):
372 vim_interface_id = interface["vim_interface_id"]
373 if vim_interface_id not in task["extra"]["interfaces"]:
374 self.logger.critical("task={} get-VM: Interface not found {} on task info {}".format(
375 task_id, vim_interface_id, task["extra"]["interfaces"]), exc_info=True)
376 continue
377 task_interface = task["extra"]["interfaces"][vim_interface_id]
378 task_vim_interface = task_interface.get("vim_info")
379 if task_vim_interface != interface:
380 # delete old port
381 if task_interface.get("sdn_port_id"):
382 try:
383 with self.db_lock:
384 self.ovim.delete_port(task_interface["sdn_port_id"], idempotent=True)
385 task_interface["sdn_port_id"] = None
386 except ovimException as e:
387 error_text = "ovimException deleting external_port={}: {}".format(
388 task_interface["sdn_port_id"], e)
389 self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
390 task_warning_msg += error_text
391 # TODO Set error_msg at instance_nets instead of instance VMs
392
393 # Create SDN port
394 sdn_net_id = task_interface.get("sdn_net_id")
395 if sdn_net_id and interface.get("compute_node") and interface.get("pci"):
396 sdn_port_name = sdn_net_id + "." + task["vim_id"]
397 sdn_port_name = sdn_port_name[:63]
398 try:
399 with self.db_lock:
400 sdn_port_id = self.ovim.new_external_port(
401 {"compute_node": interface["compute_node"],
402 "pci": interface["pci"],
403 "vlan": interface.get("vlan"),
404 "net_id": sdn_net_id,
405 "region": self.vim["config"]["datacenter_id"],
406 "name": sdn_port_name,
407 "mac": interface.get("mac_address")})
408 task_interface["sdn_port_id"] = sdn_port_id
409 except (ovimException, Exception) as e:
410 error_text = "ovimException creating new_external_port compute_node={} pci={} vlan={} {}".\
411 format(interface["compute_node"], interface["pci"], interface.get("vlan"), e)
412 self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
413 task_warning_msg += error_text
414 # TODO Set error_msg at instance_nets instead of instance VMs
415
416 self.db.update_rows('instance_interfaces',
417 UPDATE={"mac_address": interface.get("mac_address"),
418 "ip_address": interface.get("ip_address"),
419 "vim_interface_id": interface.get("vim_interface_id"),
420 "vim_info": interface.get("vim_info"),
421 "sdn_port_id": task_interface.get("sdn_port_id"),
422 "compute_node": interface.get("compute_node"),
423 "pci": interface.get("pci"),
424 "vlan": interface.get("vlan")},
425 WHERE={'uuid': task_interface["iface_id"]})
426 task_interface["vim_info"] = interface
427
428 # check and update task and instance_vms database
429 vim_info_error_msg = None
430 if vim_info.get("error_msg"):
431 vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"] + task_warning_msg)
432 elif task_warning_msg:
433 vim_info_error_msg = self._format_vim_error_msg(task_warning_msg)
434 task_vim_info = task["extra"].get("vim_info")
435 task_error_msg = task.get("error_msg")
436 task_vim_status = task["extra"].get("vim_status")
437 if task_vim_status != vim_info["status"] or task_error_msg != vim_info_error_msg or \
438 (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
439 database_update = {"status": vim_info["status"], "error_msg": vim_info_error_msg}
440 if vim_info.get("vim_info"):
441 database_update["vim_info"] = vim_info["vim_info"]
442
443 task["extra"]["vim_status"] = vim_info["status"]
444 task["error_msg"] = vim_info_error_msg
445 if vim_info.get("vim_info"):
446 task["extra"]["vim_info"] = vim_info["vim_info"]
447
448 return database_update
449
450 def _refres_net(self, task):
451 """Call VIM to get network status"""
452 database_update = None
453
454 vim_id = task["vim_id"]
455 net_to_refresh_list = [vim_id]
456 try:
457 vim_dict = self.vim.refresh_nets_status(net_to_refresh_list)
458 vim_info = vim_dict[vim_id]
459 except vimconn.vimconnException as e:
460 # Mark all tasks at VIM_ERROR status
461 self.logger.error("task=several get-net: vimconnException when trying to refresh nets " + str(e))
462 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
463
464 task_id = task["instance_action_id"] + "." + str(task["task_index"])
465 self.logger.debug("task={} get-net: vim_net_id={} result={}".format(task_id, task["vim_id"], vim_info))
466
467 task_vim_info = task["extra"].get("vim_info")
468 task_vim_status = task["extra"].get("vim_status")
469 task_error_msg = task.get("error_msg")
470 task_sdn_net_id = task["extra"].get("sdn_net_id")
471
472 vim_info_status = vim_info["status"]
473 vim_info_error_msg = vim_info.get("error_msg")
474 # get ovim status
475 if task_sdn_net_id:
476 try:
477 with self.db_lock:
478 sdn_net = self.ovim.show_network(task_sdn_net_id)
479 except (ovimException, Exception) as e:
480 text_error = "ovimException getting network snd_net_id={}: {}".format(task_sdn_net_id, e)
481 self.logger.error("task={} get-net: {}".format(task_id, text_error), exc_info=True)
482 sdn_net = {"status": "ERROR", "last_error": text_error}
483 if sdn_net["status"] == "ERROR":
484 if not vim_info_error_msg:
485 vim_info_error_msg = str(sdn_net.get("last_error"))
486 else:
487 vim_info_error_msg = "VIM_ERROR: {} && SDN_ERROR: {}".format(
488 self._format_vim_error_msg(vim_info_error_msg, 1024 // 2 - 14),
489 self._format_vim_error_msg(sdn_net["last_error"], 1024 // 2 - 14))
490 vim_info_status = "ERROR"
491 elif sdn_net["status"] == "BUILD":
492 if vim_info_status == "ACTIVE":
493 vim_info_status = "BUILD"
494
495 # update database
496 if vim_info_error_msg:
497 vim_info_error_msg = self._format_vim_error_msg(vim_info_error_msg)
498 if task_vim_status != vim_info_status or task_error_msg != vim_info_error_msg or \
499 (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
500 task["extra"]["vim_status"] = vim_info_status
501 task["error_msg"] = vim_info_error_msg
502 if vim_info.get("vim_info"):
503 task["extra"]["vim_info"] = vim_info["vim_info"]
504 database_update = {"status": vim_info_status, "error_msg": vim_info_error_msg}
505 if vim_info.get("vim_info"):
506 database_update["vim_info"] = vim_info["vim_info"]
507 return database_update
508
509 def _proccess_pending_tasks(self, task, related_tasks):
510 old_task_status = task["status"]
511 create_or_find = False # if as result of processing this task something is created or found
512 next_refresh = 0
513
514 try:
515 if task["status"] == "SCHEDULED":
516 # check if tasks that this depends on have been completed
517 dependency_not_completed = False
518 dependency_modified_at = 0
519 for task_index in task["extra"].get("depends_on", ()):
520 task_dependency = self._look_for_task(task["instance_action_id"], task_index)
521 if not task_dependency:
522 raise VimThreadException(
523 "Cannot get depending net task trying to get depending task {}.{}".format(
524 task["instance_action_id"], task_index))
525 # task["depends"]["TASK-" + str(task_index)] = task_dependency #it references another object,so
526 # database must be look again
527 if task_dependency["status"] == "SCHEDULED":
528 dependency_not_completed = True
529 dependency_modified_at = task_dependency["modified_at"]
530 break
531 elif task_dependency["status"] == "FAILED":
532 raise VimThreadException(
533 "Cannot {} {}, (task {}.{}) because depends on failed {}.{}, (task{}.{}): {}".format(
534 task["action"], task["item"],
535 task["instance_action_id"], task["task_index"],
536 task_dependency["instance_action_id"], task_dependency["task_index"],
537 task_dependency["action"], task_dependency["item"], task_dependency.get("error_msg")))
538
539 task["depends"]["TASK-"+str(task_index)] = task_dependency
540 task["depends"]["TASK-{}.{}".format(task["instance_action_id"], task_index)] = task_dependency
541 if dependency_not_completed:
542 # Move this task to the time dependency is going to be modified plus 10 seconds.
543 self.db.update_rows("vim_wim_actions", modified_time=dependency_modified_at + 10,
544 UPDATE={"worker": None},
545 WHERE={"datacenter_vim_id": self.datacenter_tenant_id, "worker": self.my_id,
546 "related": task["related"],
547 })
548 # task["extra"]["tries"] = task["extra"].get("tries", 0) + 1
549 # if task["extra"]["tries"] > 3:
550 # raise VimThreadException(
551 # "Cannot {} {}, (task {}.{}) because timeout waiting to complete {} {}, "
552 # "(task {}.{})".format(task["action"], task["item"],
553 # task["instance_action_id"], task["task_index"],
554 # task_dependency["instance_action_id"], task_dependency["task_index"]
555 # task_dependency["action"], task_dependency["item"]))
556 return
557
558 database_update = None
559 if task["action"] == "DELETE":
560 deleted_needed = self._delete_task(task)
561 if not deleted_needed:
562 task["status"] = "SUPERSEDED" # with FINISHED instead of DONE it will not be refreshing
563 task["error_msg"] = None
564
565 if task["status"] == "SUPERSEDED":
566 # not needed to do anything but update database with the new status
567 database_update = None
568 elif not self.vim:
569 task["status"] = "FAILED"
570 task["error_msg"] = self.error_status
571 database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
572 elif task["item_id"] != related_tasks[0]["item_id"] and task["action"] in ("FIND", "CREATE"):
573 # Do nothing, just copy values from one to another and updata database
574 task["status"] = related_tasks[0]["status"]
575 task["error_msg"] = related_tasks[0]["error_msg"]
576 task["vim_id"] = related_tasks[0]["vim_id"]
577 extra = yaml.load(related_tasks[0]["extra"])
578 task["extra"]["vim_status"] = extra["vim_status"]
579 next_refresh = related_tasks[0]["modified_at"] + 0.001
580 database_update = {"status": task["extra"].get("vim_status", "VIM_ERROR"),
581 "error_msg": task["error_msg"]}
582 if task["item"] == 'instance_vms':
583 database_update["vim_vm_id"] = task["vim_id"]
584 elif task["item"] == 'instance_nets':
585 database_update["vim_net_id"] = task["vim_id"]
586 elif task["item"] == 'instance_vms':
587 if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"):
588 database_update = self._refres_vm(task)
589 create_or_find = True
590 elif task["action"] == "CREATE":
591 create_or_find = True
592 database_update = self.new_vm(task)
593 elif task["action"] == "DELETE":
594 self.del_vm(task)
595 else:
596 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
597 elif task["item"] == 'instance_nets':
598 if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"):
599 database_update = self._refres_net(task)
600 create_or_find = True
601 elif task["action"] == "CREATE":
602 create_or_find = True
603 database_update = self.new_net(task)
604 elif task["action"] == "DELETE":
605 self.del_net(task)
606 elif task["action"] == "FIND":
607 database_update = self.get_net(task)
608 else:
609 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
610 elif task["item"] == 'instance_sfis':
611 if task["action"] == "CREATE":
612 create_or_find = True
613 database_update = self.new_sfi(task)
614 elif task["action"] == "DELETE":
615 self.del_sfi(task)
616 else:
617 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
618 elif task["item"] == 'instance_sfs':
619 if task["action"] == "CREATE":
620 create_or_find = True
621 database_update = self.new_sf(task)
622 elif task["action"] == "DELETE":
623 self.del_sf(task)
624 else:
625 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
626 elif task["item"] == 'instance_classifications':
627 if task["action"] == "CREATE":
628 create_or_find = True
629 database_update = self.new_classification(task)
630 elif task["action"] == "DELETE":
631 self.del_classification(task)
632 else:
633 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
634 elif task["item"] == 'instance_sfps':
635 if task["action"] == "CREATE":
636 create_or_find = True
637 database_update = self.new_sfp(task)
638 elif task["action"] == "DELETE":
639 self.del_sfp(task)
640 else:
641 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
642 else:
643 raise vimconn.vimconnException(self.name + "unknown task item {}".format(task["item"]))
644 # TODO
645 except VimThreadException as e:
646 task["error_msg"] = str(e)
647 task["status"] = "FAILED"
648 database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
649 if task["item"] == 'instance_vms':
650 database_update["vim_vm_id"] = None
651 elif task["item"] == 'instance_nets':
652 database_update["vim_net_id"] = None
653
654 task_id = task["instance_action_id"] + "." + str(task["task_index"])
655 self.logger.debug("task={} item={} action={} result={}:'{}' params={}".format(
656 task_id, task["item"], task["action"], task["status"],
657 task["vim_id"] if task["status"] == "DONE" else task.get("error_msg"), task["params"]))
658 try:
659 if not next_refresh:
660 if task["status"] == "DONE":
661 next_refresh = time.time()
662 if task["extra"].get("vim_status") == "BUILD":
663 next_refresh += self.REFRESH_BUILD
664 elif task["extra"].get("vim_status") in ("ERROR", "VIM_ERROR"):
665 next_refresh += self.REFRESH_ERROR
666 elif task["extra"].get("vim_status") == "DELETED":
667 next_refresh += self.REFRESH_DELETE
668 else:
669 next_refresh += self.REFRESH_ACTIVE
670 elif task["status"] == "FAILED":
671 next_refresh = time.time() + self.REFRESH_DELETE
672
673 if create_or_find:
674 # modify all related task with action FIND/CREATED non SCHEDULED
675 self.db.update_rows(
676 table="vim_wim_actions", modified_time=next_refresh + 0.001,
677 UPDATE={"status": task["status"], "vim_id": task.get("vim_id"),
678 "error_msg": task["error_msg"],
679 },
680
681 WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
682 "worker": self.my_id,
683 "action": ["FIND", "CREATE"],
684 "related": task["related"],
685 "status<>": "SCHEDULED",
686 })
687 # modify own task
688 self.db.update_rows(
689 table="vim_wim_actions", modified_time=next_refresh,
690 UPDATE={"status": task["status"], "vim_id": task.get("vim_id"),
691 "error_msg": task["error_msg"],
692 "extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256)},
693 WHERE={"instance_action_id": task["instance_action_id"], "task_index": task["task_index"]})
694 # Unlock tasks
695 self.db.update_rows(
696 table="vim_wim_actions", modified_time=0,
697 UPDATE={"worker": None},
698 WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
699 "worker": self.my_id,
700 "related": task["related"],
701 })
702
703 # Update table instance_actions
704 if old_task_status == "SCHEDULED" and task["status"] != old_task_status:
705 self.db.update_rows(
706 table="instance_actions",
707 UPDATE={("number_failed" if task["status"] == "FAILED" else "number_done"): {"INCREMENT": 1}},
708 WHERE={"uuid": task["instance_action_id"]})
709 if database_update:
710 where_filter = {"related": task["related"]}
711 if task["item"] == "instance_nets" and task["datacenter_vim_id"]:
712 where_filter["datacenter_tenant_id"] = task["datacenter_vim_id"]
713 self.db.update_rows(table=task["item"],
714 UPDATE=database_update,
715 WHERE=where_filter)
716 except db_base_Exception as e:
717 self.logger.error("task={} Error updating database {}".format(task_id, e), exc_info=True)
718
719 def insert_task(self, task):
720 try:
721 self.task_queue.put(task, False)
722 return None
723 except Queue.Full:
724 raise vimconn.vimconnException(self.name + ": timeout inserting a task")
725
726 def del_task(self, task):
727 with self.task_lock:
728 if task["status"] == "SCHEDULED":
729 task["status"] = "SUPERSEDED"
730 return True
731 else: # task["status"] == "processing"
732 self.task_lock.release()
733 return False
734
735 def run(self):
736 self.logger.debug("Starting")
737 while True:
738 self.get_vimconnector()
739 self.logger.debug("Vimconnector loaded")
740 reload_thread = False
741
742 while True:
743 try:
744 while not self.task_queue.empty():
745 task = self.task_queue.get()
746 if isinstance(task, list):
747 pass
748 elif isinstance(task, str):
749 if task == 'exit':
750 return 0
751 elif task == 'reload':
752 reload_thread = True
753 break
754 self.task_queue.task_done()
755 if reload_thread:
756 break
757
758 task, related_tasks = self._get_db_task()
759 if task:
760 self._proccess_pending_tasks(task, related_tasks)
761 else:
762 time.sleep(5)
763
764 except Exception as e:
765 self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)
766
767 self.logger.debug("Finishing")
768
769 def _look_for_task(self, instance_action_id, task_id):
770 """
771 Look for a concrete task at vim_actions database table
772 :param instance_action_id: The instance_action_id
773 :param task_id: Can have several formats:
774 <task index>: integer
775 TASK-<task index> :backward compatibility,
776 [TASK-]<instance_action_id>.<task index>: this instance_action_id overrides the one in the parameter
777 :return: Task dictionary or None if not found
778 """
779 if isinstance(task_id, int):
780 task_index = task_id
781 else:
782 if task_id.startswith("TASK-"):
783 task_id = task_id[5:]
784 ins_action_id, _, task_index = task_id.rpartition(".")
785 if ins_action_id:
786 instance_action_id = ins_action_id
787
788 tasks = self.db.get_rows(FROM="vim_wim_actions", WHERE={"instance_action_id": instance_action_id,
789 "task_index": task_index})
790 if not tasks:
791 return None
792 task = tasks[0]
793 task["params"] = None
794 task["depends"] = {}
795 if task["extra"]:
796 extra = yaml.load(task["extra"])
797 task["extra"] = extra
798 task["params"] = extra.get("params")
799 else:
800 task["extra"] = {}
801 return task
802
803 @staticmethod
804 def _format_vim_error_msg(error_text, max_length=1024):
805 if error_text and len(error_text) >= max_length:
806 return error_text[:max_length // 2 - 3] + " ... " + error_text[-max_length // 2 + 3:]
807 return error_text
808
809 def new_vm(self, task):
810 task_id = task["instance_action_id"] + "." + str(task["task_index"])
811 try:
812 params = task["params"]
813 depends = task.get("depends")
814 net_list = params[5]
815 for net in net_list:
816 if "net_id" in net and is_task_id(net["net_id"]): # change task_id into network_id
817 network_id = task["depends"][net["net_id"]].get("vim_id")
818 if not network_id:
819 raise VimThreadException(
820 "Cannot create VM because depends on a network not created or found: " +
821 str(depends[net["net_id"]]["error_msg"]))
822 net["net_id"] = network_id
823 params_copy = deepcopy(params)
824 vim_vm_id, created_items = self.vim.new_vminstance(*params_copy)
825
826 # fill task_interfaces. Look for snd_net_id at database for each interface
827 task_interfaces = {}
828 for iface in params_copy[5]:
829 task_interfaces[iface["vim_id"]] = {"iface_id": iface["uuid"]}
830 result = self.db.get_rows(
831 SELECT=('sdn_net_id', 'interface_id'),
832 FROM='instance_nets as ine join instance_interfaces as ii on ii.instance_net_id=ine.uuid',
833 WHERE={'ii.uuid': iface["uuid"]})
834 if result:
835 task_interfaces[iface["vim_id"]]["sdn_net_id"] = result[0]['sdn_net_id']
836 task_interfaces[iface["vim_id"]]["interface_id"] = result[0]['interface_id']
837 else:
838 self.logger.critical("task={} new-VM: instance_nets uuid={} not found at DB".format(task_id,
839 iface["uuid"]),
840 exc_info=True)
841
842 task["vim_info"] = {}
843 task["extra"]["interfaces"] = task_interfaces
844 task["extra"]["created"] = True
845 task["extra"]["created_items"] = created_items
846 task["extra"]["vim_status"] = "BUILD"
847 task["error_msg"] = None
848 task["status"] = "DONE"
849 task["vim_id"] = vim_vm_id
850 instance_element_update = {"status": "BUILD", "vim_vm_id": vim_vm_id, "error_msg": None}
851 return instance_element_update
852
853 except (vimconn.vimconnException, VimThreadException) as e:
854 self.logger.error("task={} new-VM: {}".format(task_id, e))
855 error_text = self._format_vim_error_msg(str(e))
856 task["error_msg"] = error_text
857 task["status"] = "FAILED"
858 task["vim_id"] = None
859 instance_element_update = {"status": "VIM_ERROR", "vim_vm_id": None, "error_msg": error_text}
860 return instance_element_update
861
862 def del_vm(self, task):
863 task_id = task["instance_action_id"] + "." + str(task["task_index"])
864 vm_vim_id = task["vim_id"]
865 interfaces = task["extra"].get("interfaces", ())
866 try:
867 for iface in interfaces.values():
868 if iface.get("sdn_port_id"):
869 try:
870 with self.db_lock:
871 self.ovim.delete_port(iface["sdn_port_id"], idempotent=True)
872 except ovimException as e:
873 self.logger.error("task={} del-VM: ovimException when deleting external_port={}: {} ".format(
874 task_id, iface["sdn_port_id"], e), exc_info=True)
875 # TODO Set error_msg at instance_nets
876
877 self.vim.delete_vminstance(vm_vim_id, task["extra"].get("created_items"))
878 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
879 task["error_msg"] = None
880 return None
881
882 except vimconn.vimconnException as e:
883 task["error_msg"] = self._format_vim_error_msg(str(e))
884 if isinstance(e, vimconn.vimconnNotFoundException):
885 # If not found mark as Done and fill error_msg
886 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
887 return None
888 task["status"] = "FAILED"
889 return None
890
891 def _get_net_internal(self, task, filter_param):
892 """
893 Common code for get_net and new_net. It looks for a network on VIM with the filter_params
894 :param task: task for this find or find-or-create action
895 :param filter_param: parameters to send to the vimconnector
896 :return: a dict with the content to update the instance_nets database table. Raises an exception on error, or
897 when network is not found or found more than one
898 """
899 vim_nets = self.vim.get_network_list(filter_param)
900 if not vim_nets:
901 raise VimThreadExceptionNotFound("Network not found with this criteria: '{}'".format(filter_param))
902 elif len(vim_nets) > 1:
903 raise VimThreadException("More than one network found with this criteria: '{}'".format(filter_param))
904 vim_net_id = vim_nets[0]["id"]
905
906 # Discover if this network is managed by a sdn controller
907 sdn_net_id = None
908 result = self.db.get_rows(SELECT=('sdn_net_id',), FROM='instance_nets',
909 WHERE={'vim_net_id': vim_net_id, 'datacenter_tenant_id': self.datacenter_tenant_id},
910 ORDER="instance_scenario_id")
911 if result:
912 sdn_net_id = result[0]['sdn_net_id']
913
914 task["status"] = "DONE"
915 task["extra"]["vim_info"] = {}
916 task["extra"]["created"] = False
917 task["extra"]["vim_status"] = "BUILD"
918 task["extra"]["sdn_net_id"] = sdn_net_id
919 task["error_msg"] = None
920 task["vim_id"] = vim_net_id
921 instance_element_update = {"vim_net_id": vim_net_id, "created": False, "status": "BUILD",
922 "error_msg": None, "sdn_net_id": sdn_net_id}
923 return instance_element_update
924
925 def get_net(self, task):
926 task_id = task["instance_action_id"] + "." + str(task["task_index"])
927 try:
928 params = task["params"]
929 filter_param = params[0]
930 instance_element_update = self._get_net_internal(task, filter_param)
931 return instance_element_update
932
933 except (vimconn.vimconnException, VimThreadException) as e:
934 self.logger.error("task={} get-net: {}".format(task_id, e))
935 task["status"] = "FAILED"
936 task["vim_id"] = None
937 task["error_msg"] = self._format_vim_error_msg(str(e))
938 instance_element_update = {"vim_net_id": None, "status": "VIM_ERROR",
939 "error_msg": task["error_msg"]}
940 return instance_element_update
941
942 def new_net(self, task):
943 vim_net_id = None
944 sdn_net_id = None
945 task_id = task["instance_action_id"] + "." + str(task["task_index"])
946 action_text = ""
947 try:
948 # FIND
949 if task["extra"].get("find"):
950 action_text = "finding"
951 filter_param = task["extra"]["find"][0]
952 try:
953 instance_element_update = self._get_net_internal(task, filter_param)
954 return instance_element_update
955 except VimThreadExceptionNotFound:
956 pass
957 # CREATE
958 params = task["params"]
959 action_text = "creating VIM"
960 vim_net_id, created_items = self.vim.new_network(*params[0:3])
961
962 net_name = params[0]
963 net_type = params[1]
964 wim_account_name = None
965 if len(params) >= 4:
966 wim_account_name = params[3]
967
968 sdn_controller = self.vim.config.get('sdn-controller')
969 if sdn_controller and (net_type == "data" or net_type == "ptp"):
970 network = {"name": net_name, "type": net_type, "region": self.vim["config"]["datacenter_id"]}
971
972 vim_net = self.vim.get_network(vim_net_id)
973 if vim_net.get('encapsulation') != 'vlan':
974 raise vimconn.vimconnException(
975 "net '{}' defined as type '{}' has not vlan encapsulation '{}'".format(
976 net_name, net_type, vim_net['encapsulation']))
977 network["vlan"] = vim_net.get('segmentation_id')
978 action_text = "creating SDN"
979 with self.db_lock:
980 sdn_net_id = self.ovim.new_network(network)
981
982 if wim_account_name and self.vim.config["wim_external_ports"]:
983 # add external port to connect WIM. Try with compute node __WIM:wim_name and __WIM
984 action_text = "attaching external port to ovim network"
985 sdn_port_name = sdn_net_id + "." + task["vim_id"]
986 sdn_port_name = sdn_port_name[:63]
987 sdn_port_data = {
988 "compute_node": "__WIM:" + wim_account_name[0:58],
989 "pci": None,
990 "vlan": network["vlan"],
991 "net_id": sdn_net_id,
992 "region": self.vim["config"]["datacenter_id"],
993 "name": sdn_port_name,
994 }
995 try:
996 with self.db_lock:
997 sdn_external_port_id = self.ovim.new_external_port(sdn_port_data)
998 except ovimException:
999 sdn_port_data["compute_node"] = "__WIM"
1000 with self.db_lock:
1001 sdn_external_port_id = self.ovim.new_external_port(sdn_port_data)
1002 self.logger.debug("Added sdn_external_port {} to sdn_network {}".format(sdn_external_port_id,
1003 sdn_net_id))
1004 task["status"] = "DONE"
1005 task["extra"]["vim_info"] = {}
1006 task["extra"]["sdn_net_id"] = sdn_net_id
1007 task["extra"]["vim_status"] = "BUILD"
1008 task["extra"]["created"] = True
1009 task["extra"]["created_items"] = created_items
1010 task["error_msg"] = None
1011 task["vim_id"] = vim_net_id
1012 instance_element_update = {"vim_net_id": vim_net_id, "sdn_net_id": sdn_net_id, "status": "BUILD",
1013 "created": True, "error_msg": None}
1014 return instance_element_update
1015 except (vimconn.vimconnException, ovimException) as e:
1016 self.logger.error("task={} new-net: Error {}: {}".format(task_id, action_text, e))
1017 task["status"] = "FAILED"
1018 task["vim_id"] = vim_net_id
1019 task["error_msg"] = self._format_vim_error_msg(str(e))
1020 task["extra"]["sdn_net_id"] = sdn_net_id
1021 instance_element_update = {"vim_net_id": vim_net_id, "sdn_net_id": sdn_net_id, "status": "VIM_ERROR",
1022 "error_msg": task["error_msg"]}
1023 return instance_element_update
1024
1025 def del_net(self, task):
1026 net_vim_id = task["vim_id"]
1027 sdn_net_id = task["extra"].get("sdn_net_id")
1028 try:
1029 if sdn_net_id:
1030 # Delete any attached port to this sdn network. There can be ports associated to this network in case
1031 # it was manually done using 'openmano vim-net-sdn-attach'
1032 with self.db_lock:
1033 port_list = self.ovim.get_ports(columns={'uuid'},
1034 filter={'name': 'external_port', 'net_id': sdn_net_id})
1035 for port in port_list:
1036 self.ovim.delete_port(port['uuid'], idempotent=True)
1037 self.ovim.delete_network(sdn_net_id, idempotent=True)
1038 if net_vim_id:
1039 self.vim.delete_network(net_vim_id, task["extra"].get("created_items"))
1040 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1041 task["error_msg"] = None
1042 return None
1043 except ovimException as e:
1044 task["error_msg"] = self._format_vim_error_msg("ovimException obtaining and deleting external "
1045 "ports for net {}: {}".format(sdn_net_id, str(e)))
1046 except vimconn.vimconnException as e:
1047 task["error_msg"] = self._format_vim_error_msg(str(e))
1048 if isinstance(e, vimconn.vimconnNotFoundException):
1049 # If not found mark as Done and fill error_msg
1050 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1051 return None
1052 task["status"] = "FAILED"
1053 return None
1054
1055 # Service Function Instances
1056 def new_sfi(self, task):
1057 vim_sfi_id = None
1058 try:
1059 # Waits for interfaces to be ready (avoids failure)
1060 time.sleep(1)
1061 dep_id = "TASK-" + str(task["extra"]["depends_on"][0])
1062 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1063 error_text = ""
1064 interfaces = task["depends"][dep_id]["extra"].get("interfaces")
1065
1066 ingress_interface_id = task.get("extra").get("params").get("ingress_interface_id")
1067 egress_interface_id = task.get("extra").get("params").get("egress_interface_id")
1068 ingress_vim_interface_id = None
1069 egress_vim_interface_id = None
1070 for vim_interface, interface_data in interfaces.iteritems():
1071 if interface_data.get("interface_id") == ingress_interface_id:
1072 ingress_vim_interface_id = vim_interface
1073 break
1074 if ingress_interface_id != egress_interface_id:
1075 for vim_interface, interface_data in interfaces.iteritems():
1076 if interface_data.get("interface_id") == egress_interface_id:
1077 egress_vim_interface_id = vim_interface
1078 break
1079 else:
1080 egress_vim_interface_id = ingress_vim_interface_id
1081 if not ingress_vim_interface_id or not egress_vim_interface_id:
1082 error_text = "Error creating Service Function Instance, Ingress: {}, Egress: {}".format(
1083 ingress_vim_interface_id, egress_vim_interface_id)
1084 self.logger.error(error_text)
1085 task["error_msg"] = error_text
1086 task["status"] = "FAILED"
1087 task["vim_id"] = None
1088 return None
1089 # At the moment, every port associated with the VM will be used both as ingress and egress ports.
1090 # Bear in mind that different VIM connectors might support SFI differently. In the case of OpenStack,
1091 # only the first ingress and first egress ports will be used to create the SFI (Port Pair).
1092 ingress_port_id_list = [ingress_vim_interface_id]
1093 egress_port_id_list = [egress_vim_interface_id]
1094 name = "sfi-%s" % task["item_id"][:8]
1095 # By default no form of IETF SFC Encapsulation will be used
1096 vim_sfi_id = self.vim.new_sfi(name, ingress_port_id_list, egress_port_id_list, sfc_encap=False)
1097
1098 task["extra"]["created"] = True
1099 task["extra"]["vim_status"] = "ACTIVE"
1100 task["error_msg"] = None
1101 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1102 task["vim_id"] = vim_sfi_id
1103 instance_element_update = {"status": "ACTIVE", "vim_sfi_id": vim_sfi_id, "error_msg": None}
1104 return instance_element_update
1105
1106 except (vimconn.vimconnException, VimThreadException) as e:
1107 self.logger.error("Error creating Service Function Instance, task=%s: %s", task_id, str(e))
1108 error_text = self._format_vim_error_msg(str(e))
1109 task["error_msg"] = error_text
1110 task["status"] = "FAILED"
1111 task["vim_id"] = None
1112 instance_element_update = {"status": "VIM_ERROR", "vim_sfi_id": None, "error_msg": error_text}
1113 return instance_element_update
1114
1115 def del_sfi(self, task):
1116 sfi_vim_id = task["vim_id"]
1117 try:
1118 self.vim.delete_sfi(sfi_vim_id)
1119 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1120 task["error_msg"] = None
1121 return None
1122
1123 except vimconn.vimconnException as e:
1124 task["error_msg"] = self._format_vim_error_msg(str(e))
1125 if isinstance(e, vimconn.vimconnNotFoundException):
1126 # If not found mark as Done and fill error_msg
1127 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1128 return None
1129 task["status"] = "FAILED"
1130 return None
1131
1132 def new_sf(self, task):
1133 vim_sf_id = None
1134 try:
1135 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1136 error_text = ""
1137 depending_tasks = ["TASK-" + str(dep_id) for dep_id in task["extra"]["depends_on"]]
1138 # sfis = task.get("depends").values()[0].get("extra").get("params")[5]
1139 sfis = [task.get("depends").get(dep_task) for dep_task in depending_tasks]
1140 sfi_id_list = []
1141 for sfi in sfis:
1142 sfi_id_list.append(sfi.get("vim_id"))
1143 name = "sf-%s" % task["item_id"][:8]
1144 # By default no form of IETF SFC Encapsulation will be used
1145 vim_sf_id = self.vim.new_sf(name, sfi_id_list, sfc_encap=False)
1146
1147 task["extra"]["created"] = True
1148 task["extra"]["vim_status"] = "ACTIVE"
1149 task["error_msg"] = None
1150 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1151 task["vim_id"] = vim_sf_id
1152 instance_element_update = {"status": "ACTIVE", "vim_sf_id": vim_sf_id, "error_msg": None}
1153 return instance_element_update
1154
1155 except (vimconn.vimconnException, VimThreadException) as e:
1156 self.logger.error("Error creating Service Function, task=%s: %s", task_id, str(e))
1157 error_text = self._format_vim_error_msg(str(e))
1158 task["error_msg"] = error_text
1159 task["status"] = "FAILED"
1160 task["vim_id"] = None
1161 instance_element_update = {"status": "VIM_ERROR", "vim_sf_id": None, "error_msg": error_text}
1162 return instance_element_update
1163
1164 def del_sf(self, task):
1165 sf_vim_id = task["vim_id"]
1166 try:
1167 self.vim.delete_sf(sf_vim_id)
1168 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1169 task["error_msg"] = None
1170 return None
1171
1172 except vimconn.vimconnException as e:
1173 task["error_msg"] = self._format_vim_error_msg(str(e))
1174 if isinstance(e, vimconn.vimconnNotFoundException):
1175 # If not found mark as Done and fill error_msg
1176 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1177 return None
1178 task["status"] = "FAILED"
1179 return None
1180
1181 def new_classification(self, task):
1182 vim_classification_id = None
1183 try:
1184 params = task["params"]
1185 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1186 dep_id = "TASK-" + str(task["extra"]["depends_on"][0])
1187 error_text = ""
1188 interfaces = task.get("depends").get(dep_id).get("extra").get("interfaces").keys()
1189 # Bear in mind that different VIM connectors might support Classifications differently.
1190 # In the case of OpenStack, only the first VNF attached to the classifier will be used
1191 # to create the Classification(s) (the "logical source port" of the "Flow Classifier").
1192 # Since the VNFFG classifier match lacks the ethertype, classification defaults to
1193 # using the IPv4 flow classifier.
1194 name = "c-%s" % task["item_id"][:8]
1195 # if not CIDR is given for the IP addresses, add /32:
1196 ip_proto = int(params.get("ip_proto"))
1197 source_ip = params.get("source_ip")
1198 destination_ip = params.get("destination_ip")
1199 source_port = params.get("source_port")
1200 destination_port = params.get("destination_port")
1201 definition = {"logical_source_port": interfaces[0]}
1202 if ip_proto:
1203 if ip_proto == 1:
1204 ip_proto = 'icmp'
1205 elif ip_proto == 6:
1206 ip_proto = 'tcp'
1207 elif ip_proto == 17:
1208 ip_proto = 'udp'
1209 definition["protocol"] = ip_proto
1210 if source_ip:
1211 if '/' not in source_ip:
1212 source_ip += '/32'
1213 definition["source_ip_prefix"] = source_ip
1214 if source_port:
1215 definition["source_port_range_min"] = source_port
1216 definition["source_port_range_max"] = source_port
1217 if destination_port:
1218 definition["destination_port_range_min"] = destination_port
1219 definition["destination_port_range_max"] = destination_port
1220 if destination_ip:
1221 if '/' not in destination_ip:
1222 destination_ip += '/32'
1223 definition["destination_ip_prefix"] = destination_ip
1224
1225 vim_classification_id = self.vim.new_classification(
1226 name, 'legacy_flow_classifier', definition)
1227
1228 task["extra"]["created"] = True
1229 task["extra"]["vim_status"] = "ACTIVE"
1230 task["error_msg"] = None
1231 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1232 task["vim_id"] = vim_classification_id
1233 instance_element_update = {"status": "ACTIVE", "vim_classification_id": vim_classification_id,
1234 "error_msg": None}
1235 return instance_element_update
1236
1237 except (vimconn.vimconnException, VimThreadException) as e:
1238 self.logger.error("Error creating Classification, task=%s: %s", task_id, str(e))
1239 error_text = self._format_vim_error_msg(str(e))
1240 task["error_msg"] = error_text
1241 task["status"] = "FAILED"
1242 task["vim_id"] = None
1243 instance_element_update = {"status": "VIM_ERROR", "vim_classification_id": None, "error_msg": error_text}
1244 return instance_element_update
1245
1246 def del_classification(self, task):
1247 classification_vim_id = task["vim_id"]
1248 try:
1249 self.vim.delete_classification(classification_vim_id)
1250 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1251 task["error_msg"] = None
1252 return None
1253
1254 except vimconn.vimconnException as e:
1255 task["error_msg"] = self._format_vim_error_msg(str(e))
1256 if isinstance(e, vimconn.vimconnNotFoundException):
1257 # If not found mark as Done and fill error_msg
1258 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1259 return None
1260 task["status"] = "FAILED"
1261 return None
1262
1263 def new_sfp(self, task):
1264 vim_sfp_id = None
1265 try:
1266 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1267 depending_tasks = [task.get("depends").get("TASK-" + str(tsk_id)) for tsk_id in
1268 task.get("extra").get("depends_on")]
1269 error_text = ""
1270 sf_id_list = []
1271 classification_id_list = []
1272 for dep in depending_tasks:
1273 vim_id = dep.get("vim_id")
1274 resource = dep.get("item")
1275 if resource == "instance_sfs":
1276 sf_id_list.append(vim_id)
1277 elif resource == "instance_classifications":
1278 classification_id_list.append(vim_id)
1279
1280 name = "sfp-%s" % task["item_id"][:8]
1281 # By default no form of IETF SFC Encapsulation will be used
1282 vim_sfp_id = self.vim.new_sfp(name, classification_id_list, sf_id_list, sfc_encap=False)
1283
1284 task["extra"]["created"] = True
1285 task["extra"]["vim_status"] = "ACTIVE"
1286 task["error_msg"] = None
1287 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1288 task["vim_id"] = vim_sfp_id
1289 instance_element_update = {"status": "ACTIVE", "vim_sfp_id": vim_sfp_id, "error_msg": None}
1290 return instance_element_update
1291
1292 except (vimconn.vimconnException, VimThreadException) as e:
1293 self.logger.error("Error creating Service Function, task=%s: %s", task_id, str(e))
1294 error_text = self._format_vim_error_msg(str(e))
1295 task["error_msg"] = error_text
1296 task["status"] = "FAILED"
1297 task["vim_id"] = None
1298 instance_element_update = {"status": "VIM_ERROR", "vim_sfp_id": None, "error_msg": error_text}
1299 return instance_element_update
1300
1301 def del_sfp(self, task):
1302 sfp_vim_id = task["vim_id"]
1303 try:
1304 self.vim.delete_sfp(sfp_vim_id)
1305 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1306 task["error_msg"] = None
1307 return None
1308
1309 except vimconn.vimconnException as e:
1310 task["error_msg"] = self._format_vim_error_msg(str(e))
1311 if isinstance(e, vimconn.vimconnNotFoundException):
1312 # If not found mark as Done and fill error_msg
1313 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1314 return None
1315 task["status"] = "FAILED"
1316 return None