Merge branch 'feature7106'
[osm/RO.git] / osm_ro / vim_thread.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2015 Telefonica Investigacion y Desarrollo, S.A.U.
5 # This file is part of openvim
6 # All Rights Reserved.
7 #
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
11 #
12 # http://www.apache.org/licenses/LICENSE-2.0
13 #
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
18 # under the License.
19 #
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
22 ##
23
24 """"
25 This is thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
26 The tasks are stored at database in table vim_wim_actions
27 Several vim_wim_actions can refer to the same element at VIM (flavor, network, ...). This is somethng to avoid if RO
28 is migrated to a non-relational database as mongo db. Each vim_wim_actions reference a different instance_Xxxxx
29 In this case "related" colunm contains the same value, to know they refer to the same vim. In case of deletion, it
30 there is related tasks using this element, it is not deleted, The vim_info needed to delete is transfered to other task
31
32 The task content is (M: stored at memory, D: stored at database):
33 MD instance_action_id: reference a global action over an instance-scenario: database instance_actions
34 MD task_index: index number of the task. This together with the previous forms a unique key identifier
35 MD datacenter_vim_id: should contain the uuid of the VIM managed by this thread
36 MD vim_id: id of the vm,net,etc at VIM
37 MD item: database table name, can be instance_vms, instance_nets, TODO: datacenter_flavors, datacenter_images
38 MD item_id: uuid of the referenced entry in the previous table
39 MD action: CREATE, DELETE, FIND
40 MD status: SCHEDULED: action need to be done
41 BUILD: not used
42 DONE: Done and it must be polled to VIM periodically to see status. ONLY for action=CREATE or FIND
43 FAILED: It cannot be created/found/deleted
44 FINISHED: similar to DONE, but no refresh is needed anymore. Task is maintained at database but
45 it is never processed by any thread
46 SUPERSEDED: similar to FINSISHED, but nothing has been done to completed the task.
47 MD extra: text with yaml format at database, dict at memory with:
48 params: list with the params to be sent to the VIM for CREATE or FIND. For DELETE the vim_id is taken
49 from other related tasks
50 find: (only for CREATE tasks) if present it should FIND before creating and use if existing. Contains
51 the FIND params
52 depends_on: list with the 'task_index'es of tasks that must be completed before. e.g. a vm creation depends
53 on a net creation
54 can contain an int (single index on the same instance-action) or str (compete action ID)
55 sdn_net_id: used for net.
56 interfaces: used for VMs. Each key is the uuid of the instance_interfaces entry at database
57 iface_id: uuid of intance_interfaces
58 sdn_port_id:
59 sdn_net_id:
60 vim_info
61 created_items: dictionary with extra elements created that need to be deleted. e.g. ports, volumes,...
62 created: False if the VIM element is not created by other actions, and it should not be deleted
63 vim_status: VIM status of the element. Stored also at database in the instance_XXX
64 vim_info: Detailed information of a vm/net from the VIM. Stored at database in the instance_XXX but not at
65 vim_wim_actions
66 M depends: dict with task_index(from depends_on) to vim_id
67 M params: same as extra[params]
68 MD error_msg: descriptive text upon an error.Stored also at database instance_XXX
69 MD created_at: task creation time. The task of creation must be the oldest
70 MD modified_at: next time task need to be processed. For example, for a refresh, it contain next time refresh must
71 be done
72 MD related: All the tasks over the same VIM element have same "related". Note that other VIMs can contain the
73 same value of related, but this thread only process those task of one VIM. Also related can be the
74 same among several NS os isntance-scenarios
75 MD worker: Used to lock in case of several thread workers.
76
77 """
78
79 import threading
80 import time
81 import Queue
82 import logging
83 import vimconn
84 import vimconn_openvim
85 import vimconn_aws
86 import vimconn_opennebula
87 import vimconn_openstack
88 import vimconn_vmware
89 import yaml
90 from db_base import db_base_Exception
91 from lib_osm_openvim.ovim import ovimException
92 from copy import deepcopy
93
94 __author__ = "Alfonso Tierno, Pablo Montes"
95 __date__ = "$28-Sep-2017 12:07:15$"
96
97 vim_module = {
98 "openvim": vimconn_openvim,
99 "aws": vimconn_aws,
100 "opennebula": vimconn_opennebula,
101 "openstack": vimconn_openstack,
102 "vmware": vimconn_vmware,
103 }
104
105
106 def is_task_id(task_id):
107 return task_id.startswith("TASK-")
108
109
110 class VimThreadException(Exception):
111 pass
112
113
114 class VimThreadExceptionNotFound(VimThreadException):
115 pass
116
117
118 class vim_thread(threading.Thread):
119 REFRESH_BUILD = 5 # 5 seconds
120 REFRESH_ACTIVE = 60 # 1 minute
121 REFRESH_ERROR = 600
122 REFRESH_DELETE = 3600 * 10
123
124 def __init__(self, task_lock, name=None, datacenter_name=None, datacenter_tenant_id=None,
125 db=None, db_lock=None, ovim=None):
126 """Init a thread.
127 Arguments:
128 'id' number of thead
129 'name' name of thread
130 'host','user': host ip or name to manage and user
131 'db', 'db_lock': database class and lock to use it in exclusion
132 """
133 threading.Thread.__init__(self)
134 self.vim = None
135 self.error_status = None
136 self.datacenter_name = datacenter_name
137 self.datacenter_tenant_id = datacenter_tenant_id
138 self.ovim = ovim
139 if not name:
140 self.name = vimconn["id"] + "." + vimconn["config"]["datacenter_tenant_id"]
141 else:
142 self.name = name
143 self.vim_persistent_info = {}
144 self.my_id = self.name[:64]
145
146 self.logger = logging.getLogger('openmano.vim.' + self.name)
147 self.db = db
148 self.db_lock = db_lock
149
150 self.task_lock = task_lock
151 self.task_queue = Queue.Queue(2000)
152
153 def get_vimconnector(self):
154 try:
155 from_ = "datacenter_tenants as dt join datacenters as d on dt.datacenter_id=d.uuid"
156 select_ = ('type', 'd.config as config', 'd.uuid as datacenter_id', 'vim_url', 'vim_url_admin',
157 'd.name as datacenter_name', 'dt.uuid as datacenter_tenant_id',
158 'dt.vim_tenant_name as vim_tenant_name', 'dt.vim_tenant_id as vim_tenant_id',
159 'user', 'passwd', 'dt.config as dt_config')
160 where_ = {"dt.uuid": self.datacenter_tenant_id}
161 vims = self.db.get_rows(FROM=from_, SELECT=select_, WHERE=where_)
162 vim = vims[0]
163 vim_config = {}
164 if vim["config"]:
165 vim_config.update(yaml.load(vim["config"]))
166 if vim["dt_config"]:
167 vim_config.update(yaml.load(vim["dt_config"]))
168 vim_config['datacenter_tenant_id'] = vim.get('datacenter_tenant_id')
169 vim_config['datacenter_id'] = vim.get('datacenter_id')
170
171 # get port_mapping
172 with self.db_lock:
173 vim_config["wim_external_ports"] = self.ovim.get_of_port_mappings(
174 db_filter={"region": vim_config['datacenter_id'], "pci": None})
175
176 self.vim = vim_module[vim["type"]].vimconnector(
177 uuid=vim['datacenter_id'], name=vim['datacenter_name'],
178 tenant_id=vim['vim_tenant_id'], tenant_name=vim['vim_tenant_name'],
179 url=vim['vim_url'], url_admin=vim['vim_url_admin'],
180 user=vim['user'], passwd=vim['passwd'],
181 config=vim_config, persistent_info=self.vim_persistent_info
182 )
183 self.error_status = None
184 except Exception as e:
185 self.logger.error("Cannot load vimconnector for vim_account {}: {}".format(self.datacenter_tenant_id, e))
186 self.vim = None
187 self.error_status = "Error loading vimconnector: {}".format(e)
188
189 def _get_db_task(self):
190 """
191 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
192 :return: None
193 """
194 now = time.time()
195 try:
196 database_limit = 20
197 task_related = None
198 while True:
199 # get 20 (database_limit) entries each time
200 vim_actions = self.db.get_rows(FROM="vim_wim_actions",
201 WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
202 "status": ['SCHEDULED', 'BUILD', 'DONE'],
203 "worker": [None, self.my_id], "modified_at<=": now
204 },
205 ORDER_BY=("modified_at", "created_at",),
206 LIMIT=database_limit)
207 if not vim_actions:
208 return None, None
209 # if vim_actions[0]["modified_at"] > now:
210 # return int(vim_actions[0] - now)
211 for task in vim_actions:
212 # block related task
213 if task_related == task["related"]:
214 continue # ignore if a locking has already tried for these task set
215 task_related = task["related"]
216 # lock ...
217 self.db.update_rows("vim_wim_actions", UPDATE={"worker": self.my_id}, modified_time=0,
218 WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
219 "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
220 "worker": [None, self.my_id],
221 "related": task_related,
222 "item": task["item"],
223 })
224 # ... and read all related and check if locked
225 related_tasks = self.db.get_rows(FROM="vim_wim_actions",
226 WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
227 "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
228 "related": task_related,
229 "item": task["item"],
230 },
231 ORDER_BY=("created_at",))
232 # check that all related tasks have been locked. If not release and try again. It can happen
233 # for race conditions if a new related task has been inserted by nfvo in the process
234 some_tasks_locked = False
235 some_tasks_not_locked = False
236 creation_task = None
237 for relate_task in related_tasks:
238 if relate_task["worker"] != self.my_id:
239 some_tasks_not_locked = True
240 else:
241 some_tasks_locked = True
242 if not creation_task and relate_task["action"] in ("CREATE", "FIND"):
243 creation_task = relate_task
244 if some_tasks_not_locked:
245 if some_tasks_locked: # unlock
246 self.db.update_rows("vim_wim_actions", UPDATE={"worker": None}, modified_time=0,
247 WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
248 "worker": self.my_id,
249 "related": task_related,
250 "item": task["item"],
251 })
252 continue
253
254 # task of creation must be the first in the list of related_task
255 assert(related_tasks[0]["action"] in ("CREATE", "FIND"))
256
257 if task["extra"]:
258 extra = yaml.load(task["extra"])
259 else:
260 extra = {}
261 task["extra"] = extra
262 if extra.get("depends_on"):
263 task["depends"] = {}
264 if extra.get("params"):
265 task["params"] = deepcopy(extra["params"])
266 return task, related_tasks
267 except Exception as e:
268 self.logger.critical("Unexpected exception at _get_db_task: " + str(e), exc_info=True)
269 return None, None
270
271 def _delete_task(self, task):
272 """
273 Determine if this task need to be done or superseded
274 :return: None
275 """
276
277 def copy_extra_created(copy_to, copy_from):
278 copy_to["created"] = copy_from["created"]
279 if copy_from.get("sdn_net_id"):
280 copy_to["sdn_net_id"] = copy_from["sdn_net_id"]
281 if copy_from.get("interfaces"):
282 copy_to["interfaces"] = copy_from["interfaces"]
283 if copy_from.get("created_items"):
284 if not copy_to.get("created_items"):
285 copy_to["created_items"] = {}
286 copy_to["created_items"].update(copy_from["created_items"])
287
288 task_create = None
289 dependency_task = None
290 deletion_needed = False
291 if task["status"] == "FAILED":
292 return # TODO need to be retry??
293 try:
294 # get all related tasks
295 related_tasks = self.db.get_rows(FROM="vim_wim_actions",
296 WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
297 "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
298 "action": ["FIND", "CREATE"],
299 "related": task["related"],
300 },
301 ORDER_BY=("created_at",),
302 )
303 for related_task in related_tasks:
304 if related_task["item"] == task["item"] and related_task["item_id"] == task["item_id"]:
305 task_create = related_task
306 # TASK_CREATE
307 if related_task["extra"]:
308 extra_created = yaml.load(related_task["extra"])
309 if extra_created.get("created"):
310 deletion_needed = True
311 related_task["extra"] = extra_created
312 elif not dependency_task:
313 dependency_task = related_task
314 if task_create and dependency_task:
315 break
316
317 # mark task_create as FINISHED
318 self.db.update_rows("vim_wim_actions", UPDATE={"status": "FINISHED"},
319 WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
320 "instance_action_id": task_create["instance_action_id"],
321 "task_index": task_create["task_index"]
322 })
323 if not deletion_needed:
324 return
325 elif dependency_task:
326 # move create information from task_create to relate_task
327 extra_new_created = yaml.load(dependency_task["extra"]) or {}
328 extra_new_created["created"] = extra_created["created"]
329 copy_extra_created(copy_to=extra_new_created, copy_from=extra_created)
330
331 self.db.update_rows("vim_wim_actions",
332 UPDATE={"extra": yaml.safe_dump(extra_new_created, default_flow_style=True,
333 width=256),
334 "vim_id": task_create.get("vim_id")},
335 WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
336 "instance_action_id": dependency_task["instance_action_id"],
337 "task_index": dependency_task["task_index"]
338 })
339 return False
340 else:
341 task["vim_id"] = task_create["vim_id"]
342 copy_extra_created(copy_to=task["extra"], copy_from=task_create["extra"])
343 return True
344
345 except Exception as e:
346 self.logger.critical("Unexpected exception at _delete_task: " + str(e), exc_info=True)
347
348 def _refres_vm(self, task):
349 """Call VIM to get VMs status"""
350 database_update = None
351
352 vim_id = task["vim_id"]
353 vm_to_refresh_list = [vim_id]
354 try:
355 vim_dict = self.vim.refresh_vms_status(vm_to_refresh_list)
356 vim_info = vim_dict[vim_id]
357 except vimconn.vimconnException as e:
358 # Mark all tasks at VIM_ERROR status
359 self.logger.error("task=several get-VM: vimconnException when trying to refresh vms " + str(e))
360 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
361
362 task_id = task["instance_action_id"] + "." + str(task["task_index"])
363 self.logger.debug("task={} get-VM: vim_vm_id={} result={}".format(task_id, task["vim_id"], vim_info))
364
365 # check and update interfaces
366 task_warning_msg = ""
367 for interface in vim_info.get("interfaces", ()):
368 vim_interface_id = interface["vim_interface_id"]
369 if vim_interface_id not in task["extra"]["interfaces"]:
370 self.logger.critical("task={} get-VM: Interface not found {} on task info {}".format(
371 task_id, vim_interface_id, task["extra"]["interfaces"]), exc_info=True)
372 continue
373 task_interface = task["extra"]["interfaces"][vim_interface_id]
374 task_vim_interface = task_interface.get("vim_info")
375 if task_vim_interface != interface:
376 # delete old port
377 if task_interface.get("sdn_port_id"):
378 try:
379 with self.db_lock:
380 self.ovim.delete_port(task_interface["sdn_port_id"], idempotent=True)
381 task_interface["sdn_port_id"] = None
382 except ovimException as e:
383 error_text = "ovimException deleting external_port={}: {}".format(
384 task_interface["sdn_port_id"], e)
385 self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
386 task_warning_msg += error_text
387 # TODO Set error_msg at instance_nets instead of instance VMs
388
389 # Create SDN port
390 sdn_net_id = task_interface.get("sdn_net_id")
391 if sdn_net_id and interface.get("compute_node") and interface.get("pci"):
392 sdn_port_name = sdn_net_id + "." + task["vim_id"]
393 sdn_port_name = sdn_port_name[:63]
394 try:
395 with self.db_lock:
396 sdn_port_id = self.ovim.new_external_port(
397 {"compute_node": interface["compute_node"],
398 "pci": interface["pci"],
399 "vlan": interface.get("vlan"),
400 "net_id": sdn_net_id,
401 "region": self.vim["config"]["datacenter_id"],
402 "name": sdn_port_name,
403 "mac": interface.get("mac_address")})
404 task_interface["sdn_port_id"] = sdn_port_id
405 except (ovimException, Exception) as e:
406 error_text = "ovimException creating new_external_port compute_node={} pci={} vlan={} {}".\
407 format(interface["compute_node"], interface["pci"], interface.get("vlan"), e)
408 self.logger.error("task={} get-VM: {}".format(task_id, error_text), exc_info=True)
409 task_warning_msg += error_text
410 # TODO Set error_msg at instance_nets instead of instance VMs
411
412 self.db.update_rows('instance_interfaces',
413 UPDATE={"mac_address": interface.get("mac_address"),
414 "ip_address": interface.get("ip_address"),
415 "vim_interface_id": interface.get("vim_interface_id"),
416 "vim_info": interface.get("vim_info"),
417 "sdn_port_id": task_interface.get("sdn_port_id"),
418 "compute_node": interface.get("compute_node"),
419 "pci": interface.get("pci"),
420 "vlan": interface.get("vlan")},
421 WHERE={'uuid': task_interface["iface_id"]})
422 task_interface["vim_info"] = interface
423
424 # check and update task and instance_vms database
425 vim_info_error_msg = None
426 if vim_info.get("error_msg"):
427 vim_info_error_msg = self._format_vim_error_msg(vim_info["error_msg"] + task_warning_msg)
428 elif task_warning_msg:
429 vim_info_error_msg = self._format_vim_error_msg(task_warning_msg)
430 task_vim_info = task["extra"].get("vim_info")
431 task_error_msg = task.get("error_msg")
432 task_vim_status = task["extra"].get("vim_status")
433 if task_vim_status != vim_info["status"] or task_error_msg != vim_info_error_msg or \
434 (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
435 database_update = {"status": vim_info["status"], "error_msg": vim_info_error_msg}
436 if vim_info.get("vim_info"):
437 database_update["vim_info"] = vim_info["vim_info"]
438
439 task["extra"]["vim_status"] = vim_info["status"]
440 task["error_msg"] = vim_info_error_msg
441 if vim_info.get("vim_info"):
442 task["extra"]["vim_info"] = vim_info["vim_info"]
443
444 return database_update
445
446 def _refres_net(self, task):
447 """Call VIM to get network status"""
448 database_update = None
449
450 vim_id = task["vim_id"]
451 net_to_refresh_list = [vim_id]
452 try:
453 vim_dict = self.vim.refresh_nets_status(net_to_refresh_list)
454 vim_info = vim_dict[vim_id]
455 except vimconn.vimconnException as e:
456 # Mark all tasks at VIM_ERROR status
457 self.logger.error("task=several get-net: vimconnException when trying to refresh nets " + str(e))
458 vim_info = {"status": "VIM_ERROR", "error_msg": str(e)}
459
460 task_id = task["instance_action_id"] + "." + str(task["task_index"])
461 self.logger.debug("task={} get-net: vim_net_id={} result={}".format(task_id, task["vim_id"], vim_info))
462
463 task_vim_info = task["extra"].get("vim_info")
464 task_vim_status = task["extra"].get("vim_status")
465 task_error_msg = task.get("error_msg")
466 task_sdn_net_id = task["extra"].get("sdn_net_id")
467
468 vim_info_status = vim_info["status"]
469 vim_info_error_msg = vim_info.get("error_msg")
470 # get ovim status
471 if task_sdn_net_id:
472 try:
473 with self.db_lock:
474 sdn_net = self.ovim.show_network(task_sdn_net_id)
475 except (ovimException, Exception) as e:
476 text_error = "ovimException getting network snd_net_id={}: {}".format(task_sdn_net_id, e)
477 self.logger.error("task={} get-net: {}".format(task_id, text_error), exc_info=True)
478 sdn_net = {"status": "ERROR", "last_error": text_error}
479 if sdn_net["status"] == "ERROR":
480 if not vim_info_error_msg:
481 vim_info_error_msg = str(sdn_net.get("last_error"))
482 else:
483 vim_info_error_msg = "VIM_ERROR: {} && SDN_ERROR: {}".format(
484 self._format_vim_error_msg(vim_info_error_msg, 1024 // 2 - 14),
485 self._format_vim_error_msg(sdn_net["last_error"], 1024 // 2 - 14))
486 vim_info_status = "ERROR"
487 elif sdn_net["status"] == "BUILD":
488 if vim_info_status == "ACTIVE":
489 vim_info_status = "BUILD"
490
491 # update database
492 if vim_info_error_msg:
493 vim_info_error_msg = self._format_vim_error_msg(vim_info_error_msg)
494 if task_vim_status != vim_info_status or task_error_msg != vim_info_error_msg or \
495 (vim_info.get("vim_info") and task_vim_info != vim_info["vim_info"]):
496 task["extra"]["vim_status"] = vim_info_status
497 task["error_msg"] = vim_info_error_msg
498 if vim_info.get("vim_info"):
499 task["extra"]["vim_info"] = vim_info["vim_info"]
500 database_update = {"status": vim_info_status, "error_msg": vim_info_error_msg}
501 if vim_info.get("vim_info"):
502 database_update["vim_info"] = vim_info["vim_info"]
503 return database_update
504
505 def _proccess_pending_tasks(self, task, related_tasks):
506 old_task_status = task["status"]
507 create_or_find = False # if as result of processing this task something is created or found
508 next_refresh = 0
509
510 try:
511 if task["status"] == "SCHEDULED":
512 # check if tasks that this depends on have been completed
513 dependency_not_completed = False
514 dependency_modified_at = 0
515 for task_index in task["extra"].get("depends_on", ()):
516 task_dependency = self._look_for_task(task["instance_action_id"], task_index)
517 if not task_dependency:
518 raise VimThreadException(
519 "Cannot get depending net task trying to get depending task {}.{}".format(
520 task["instance_action_id"], task_index))
521 # task["depends"]["TASK-" + str(task_index)] = task_dependency #it references another object,so
522 # database must be look again
523 if task_dependency["status"] == "SCHEDULED":
524 dependency_not_completed = True
525 dependency_modified_at = task_dependency["modified_at"]
526 break
527 elif task_dependency["status"] == "FAILED":
528 raise VimThreadException(
529 "Cannot {} {}, (task {}.{}) because depends on failed {}.{}, (task{}.{}): {}".format(
530 task["action"], task["item"],
531 task["instance_action_id"], task["task_index"],
532 task_dependency["instance_action_id"], task_dependency["task_index"],
533 task_dependency["action"], task_dependency["item"], task_dependency.get("error_msg")))
534
535 task["depends"]["TASK-"+str(task_index)] = task_dependency["vim_id"]
536 task["depends"]["TASK-{}.{}".format(task["instance_action_id"], task_index)] =\
537 task_dependency["vim_id"]
538 if dependency_not_completed:
539 # Move this task to the time dependency is going to be modified plus 10 seconds.
540 self.db.update_rows("vim_wim_actions", modified_time=dependency_modified_at + 10,
541 UPDATE={"worker": None},
542 WHERE={"datacenter_vim_id": self.datacenter_tenant_id, "worker": self.my_id,
543 "related": task["related"],
544 })
545 # task["extra"]["tries"] = task["extra"].get("tries", 0) + 1
546 # if task["extra"]["tries"] > 3:
547 # raise VimThreadException(
548 # "Cannot {} {}, (task {}.{}) because timeout waiting to complete {} {}, "
549 # "(task {}.{})".format(task["action"], task["item"],
550 # task["instance_action_id"], task["task_index"],
551 # task_dependency["instance_action_id"], task_dependency["task_index"]
552 # task_dependency["action"], task_dependency["item"]))
553 return
554
555 database_update = None
556 if task["action"] == "DELETE":
557 deleted_needed = self._delete_task(task)
558 if not deleted_needed:
559 task["status"] = "SUPERSEDED" # with FINISHED instead of DONE it will not be refreshing
560 task["error_msg"] = None
561
562 if task["status"] == "SUPERSEDED":
563 # not needed to do anything but update database with the new status
564 database_update = None
565 elif not self.vim:
566 task["status"] = "FAILED"
567 task["error_msg"] = self.error_status
568 database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
569 elif task["item_id"] != related_tasks[0]["item_id"] and task["action"] in ("FIND", "CREATE"):
570 # Do nothing, just copy values from one to another and updata database
571 task["status"] = related_tasks[0]["status"]
572 task["error_msg"] = related_tasks[0]["error_msg"]
573 task["vim_id"] = related_tasks[0]["vim_id"]
574 extra = yaml.load(related_tasks[0]["extra"])
575 task["extra"]["vim_status"] = extra["vim_status"]
576 next_refresh = related_tasks[0]["modified_at"] + 0.001
577 database_update = {"status": task["extra"].get("vim_status", "VIM_ERROR"),
578 "error_msg": task["error_msg"]}
579 if task["item"] == 'instance_vms':
580 database_update["vim_vm_id"] = task["vim_id"]
581 elif task["item"] == 'instance_nets':
582 database_update["vim_net_id"] = task["vim_id"]
583 elif task["item"] == 'instance_vms':
584 if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"):
585 database_update = self._refres_vm(task)
586 create_or_find = True
587 elif task["action"] == "CREATE":
588 create_or_find = True
589 database_update = self.new_vm(task)
590 elif task["action"] == "DELETE":
591 self.del_vm(task)
592 else:
593 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
594 elif task["item"] == 'instance_nets':
595 if task["status"] in ('BUILD', 'DONE') and task["action"] in ("FIND", "CREATE"):
596 database_update = self._refres_net(task)
597 create_or_find = True
598 elif task["action"] == "CREATE":
599 create_or_find = True
600 database_update = self.new_net(task)
601 elif task["action"] == "DELETE":
602 self.del_net(task)
603 elif task["action"] == "FIND":
604 database_update = self.get_net(task)
605 else:
606 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
607 elif task["item"] == 'instance_sfis':
608 if task["action"] == "CREATE":
609 create_or_find = True
610 database_update = self.new_sfi(task)
611 elif task["action"] == "DELETE":
612 self.del_sfi(task)
613 else:
614 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
615 elif task["item"] == 'instance_sfs':
616 if task["action"] == "CREATE":
617 create_or_find = True
618 database_update = self.new_sf(task)
619 elif task["action"] == "DELETE":
620 self.del_sf(task)
621 else:
622 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
623 elif task["item"] == 'instance_classifications':
624 if task["action"] == "CREATE":
625 create_or_find = True
626 database_update = self.new_classification(task)
627 elif task["action"] == "DELETE":
628 self.del_classification(task)
629 else:
630 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
631 elif task["item"] == 'instance_sfps':
632 if task["action"] == "CREATE":
633 create_or_find = True
634 database_update = self.new_sfp(task)
635 elif task["action"] == "DELETE":
636 self.del_sfp(task)
637 else:
638 raise vimconn.vimconnException(self.name + "unknown task action {}".format(task["action"]))
639 else:
640 raise vimconn.vimconnException(self.name + "unknown task item {}".format(task["item"]))
641 # TODO
642 except VimThreadException as e:
643 task["error_msg"] = str(e)
644 task["status"] = "FAILED"
645 database_update = {"status": "VIM_ERROR", "error_msg": task["error_msg"]}
646 if task["item"] == 'instance_vms':
647 database_update["vim_vm_id"] = None
648 elif task["item"] == 'instance_nets':
649 database_update["vim_net_id"] = None
650
651 task_id = task["instance_action_id"] + "." + str(task["task_index"])
652 self.logger.debug("task={} item={} action={} result={}:'{}' params={}".format(
653 task_id, task["item"], task["action"], task["status"],
654 task["vim_id"] if task["status"] == "DONE" else task.get("error_msg"), task["params"]))
655 try:
656 if not next_refresh:
657 if task["status"] == "DONE":
658 next_refresh = time.time()
659 if task["extra"].get("vim_status") == "BUILD":
660 next_refresh += self.REFRESH_BUILD
661 elif task["extra"].get("vim_status") in ("ERROR", "VIM_ERROR"):
662 next_refresh += self.REFRESH_ERROR
663 elif task["extra"].get("vim_status") == "DELETED":
664 next_refresh += self.REFRESH_DELETE
665 else:
666 next_refresh += self.REFRESH_ACTIVE
667 elif task["status"] == "FAILED":
668 next_refresh = time.time() + self.REFRESH_DELETE
669
670 if create_or_find:
671 # modify all related task with action FIND/CREATED non SCHEDULED
672 self.db.update_rows(
673 table="vim_wim_actions", modified_time=next_refresh + 0.001,
674 UPDATE={"status": task["status"], "vim_id": task.get("vim_id"),
675 "error_msg": task["error_msg"],
676 },
677
678 WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
679 "worker": self.my_id,
680 "action": ["FIND", "CREATE"],
681 "related": task["related"],
682 "status<>": "SCHEDULED",
683 })
684 # modify own task
685 self.db.update_rows(
686 table="vim_wim_actions", modified_time=next_refresh,
687 UPDATE={"status": task["status"], "vim_id": task.get("vim_id"),
688 "error_msg": task["error_msg"],
689 "extra": yaml.safe_dump(task["extra"], default_flow_style=True, width=256)},
690 WHERE={"instance_action_id": task["instance_action_id"], "task_index": task["task_index"]})
691 # Unlock tasks
692 self.db.update_rows(
693 table="vim_wim_actions", modified_time=0,
694 UPDATE={"worker": None},
695 WHERE={"datacenter_vim_id": self.datacenter_tenant_id,
696 "worker": self.my_id,
697 "related": task["related"],
698 })
699
700 # Update table instance_actions
701 if old_task_status == "SCHEDULED" and task["status"] != old_task_status:
702 self.db.update_rows(
703 table="instance_actions",
704 UPDATE={("number_failed" if task["status"] == "FAILED" else "number_done"): {"INCREMENT": 1}},
705 WHERE={"uuid": task["instance_action_id"]})
706 if database_update:
707 self.db.update_rows(table=task["item"],
708 UPDATE=database_update,
709 WHERE={"related": task["related"]})
710 except db_base_Exception as e:
711 self.logger.error("task={} Error updating database {}".format(task_id, e), exc_info=True)
712
713 def insert_task(self, task):
714 try:
715 self.task_queue.put(task, False)
716 return None
717 except Queue.Full:
718 raise vimconn.vimconnException(self.name + ": timeout inserting a task")
719
720 def del_task(self, task):
721 with self.task_lock:
722 if task["status"] == "SCHEDULED":
723 task["status"] = "SUPERSEDED"
724 return True
725 else: # task["status"] == "processing"
726 self.task_lock.release()
727 return False
728
729 def run(self):
730 self.logger.debug("Starting")
731 while True:
732 self.get_vimconnector()
733 self.logger.debug("Vimconnector loaded")
734 reload_thread = False
735
736 while True:
737 try:
738 while not self.task_queue.empty():
739 task = self.task_queue.get()
740 if isinstance(task, list):
741 pass
742 elif isinstance(task, str):
743 if task == 'exit':
744 return 0
745 elif task == 'reload':
746 reload_thread = True
747 break
748 self.task_queue.task_done()
749 if reload_thread:
750 break
751
752 task, related_tasks = self._get_db_task()
753 if task:
754 self._proccess_pending_tasks(task, related_tasks)
755 else:
756 time.sleep(5)
757
758 except Exception as e:
759 self.logger.critical("Unexpected exception at run: " + str(e), exc_info=True)
760
761 self.logger.debug("Finishing")
762
763 def _look_for_task(self, instance_action_id, task_id):
764 """
765 Look for a concrete task at vim_actions database table
766 :param instance_action_id: The instance_action_id
767 :param task_id: Can have several formats:
768 <task index>: integer
769 TASK-<task index> :backward compatibility,
770 [TASK-]<instance_action_id>.<task index>: this instance_action_id overrides the one in the parameter
771 :return: Task dictionary or None if not found
772 """
773 if isinstance(task_id, int):
774 task_index = task_id
775 else:
776 if task_id.startswith("TASK-"):
777 task_id = task_id[5:]
778 ins_action_id, _, task_index = task_id.rpartition(".")
779 if ins_action_id:
780 instance_action_id = ins_action_id
781
782 tasks = self.db.get_rows(FROM="vim_wim_actions", WHERE={"instance_action_id": instance_action_id,
783 "task_index": task_index})
784 if not tasks:
785 return None
786 task = tasks[0]
787 task["params"] = None
788 task["depends"] = {}
789 if task["extra"]:
790 extra = yaml.load(task["extra"])
791 task["extra"] = extra
792 task["params"] = extra.get("params")
793 else:
794 task["extra"] = {}
795 return task
796
797 @staticmethod
798 def _format_vim_error_msg(error_text, max_length=1024):
799 if error_text and len(error_text) >= max_length:
800 return error_text[:max_length // 2 - 3] + " ... " + error_text[-max_length // 2 + 3:]
801 return error_text
802
803 def new_vm(self, task):
804 task_id = task["instance_action_id"] + "." + str(task["task_index"])
805 try:
806 params = task["params"]
807 depends = task.get("depends")
808 net_list = params[5]
809 for net in net_list:
810 if "net_id" in net and is_task_id(net["net_id"]): # change task_id into network_id
811 network_id = task["depends"][net["net_id"]]
812 if not network_id:
813 raise VimThreadException(
814 "Cannot create VM because depends on a network not created or found: " +
815 str(depends[net["net_id"]]["error_msg"]))
816 net["net_id"] = network_id
817 params_copy = deepcopy(params)
818 vim_vm_id, created_items = self.vim.new_vminstance(*params_copy)
819
820 # fill task_interfaces. Look for snd_net_id at database for each interface
821 task_interfaces = {}
822 for iface in params_copy[5]:
823 task_interfaces[iface["vim_id"]] = {"iface_id": iface["uuid"]}
824 result = self.db.get_rows(
825 SELECT=('sdn_net_id', 'interface_id'),
826 FROM='instance_nets as ine join instance_interfaces as ii on ii.instance_net_id=ine.uuid',
827 WHERE={'ii.uuid': iface["uuid"]})
828 if result:
829 task_interfaces[iface["vim_id"]]["sdn_net_id"] = result[0]['sdn_net_id']
830 task_interfaces[iface["vim_id"]]["interface_id"] = result[0]['interface_id']
831 else:
832 self.logger.critical("task={} new-VM: instance_nets uuid={} not found at DB".format(task_id,
833 iface["uuid"]),
834 exc_info=True)
835
836 task["vim_info"] = {}
837 task["extra"]["interfaces"] = task_interfaces
838 task["extra"]["created"] = True
839 task["extra"]["created_items"] = created_items
840 task["extra"]["vim_status"] = "BUILD"
841 task["error_msg"] = None
842 task["status"] = "DONE"
843 task["vim_id"] = vim_vm_id
844 instance_element_update = {"status": "BUILD", "vim_vm_id": vim_vm_id, "error_msg": None}
845 return instance_element_update
846
847 except (vimconn.vimconnException, VimThreadException) as e:
848 self.logger.error("task={} new-VM: {}".format(task_id, e))
849 error_text = self._format_vim_error_msg(str(e))
850 task["error_msg"] = error_text
851 task["status"] = "FAILED"
852 task["vim_id"] = None
853 instance_element_update = {"status": "VIM_ERROR", "vim_vm_id": None, "error_msg": error_text}
854 return instance_element_update
855
856 def del_vm(self, task):
857 task_id = task["instance_action_id"] + "." + str(task["task_index"])
858 vm_vim_id = task["vim_id"]
859 interfaces = task["extra"].get("interfaces", ())
860 try:
861 for iface in interfaces.values():
862 if iface.get("sdn_port_id"):
863 try:
864 with self.db_lock:
865 self.ovim.delete_port(iface["sdn_port_id"], idempotent=True)
866 except ovimException as e:
867 self.logger.error("task={} del-VM: ovimException when deleting external_port={}: {} ".format(
868 task_id, iface["sdn_port_id"], e), exc_info=True)
869 # TODO Set error_msg at instance_nets
870
871 self.vim.delete_vminstance(vm_vim_id, task["extra"].get("created_items"))
872 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
873 task["error_msg"] = None
874 return None
875
876 except vimconn.vimconnException as e:
877 task["error_msg"] = self._format_vim_error_msg(str(e))
878 if isinstance(e, vimconn.vimconnNotFoundException):
879 # If not found mark as Done and fill error_msg
880 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
881 return None
882 task["status"] = "FAILED"
883 return None
884
885 def _get_net_internal(self, task, filter_param):
886 """
887 Common code for get_net and new_net. It looks for a network on VIM with the filter_params
888 :param task: task for this find or find-or-create action
889 :param filter_param: parameters to send to the vimconnector
890 :return: a dict with the content to update the instance_nets database table. Raises an exception on error, or
891 when network is not found or found more than one
892 """
893 vim_nets = self.vim.get_network_list(filter_param)
894 if not vim_nets:
895 raise VimThreadExceptionNotFound("Network not found with this criteria: '{}'".format(filter_param))
896 elif len(vim_nets) > 1:
897 raise VimThreadException("More than one network found with this criteria: '{}'".format(filter_param))
898 vim_net_id = vim_nets[0]["id"]
899
900 # Discover if this network is managed by a sdn controller
901 sdn_net_id = None
902 result = self.db.get_rows(SELECT=('sdn_net_id',), FROM='instance_nets',
903 WHERE={'vim_net_id': vim_net_id, 'datacenter_tenant_id': self.datacenter_tenant_id},
904 ORDER="instance_scenario_id")
905 if result:
906 sdn_net_id = result[0]['sdn_net_id']
907
908 task["status"] = "DONE"
909 task["extra"]["vim_info"] = {}
910 task["extra"]["created"] = False
911 task["extra"]["vim_status"] = "BUILD"
912 task["extra"]["sdn_net_id"] = sdn_net_id
913 task["error_msg"] = None
914 task["vim_id"] = vim_net_id
915 instance_element_update = {"vim_net_id": vim_net_id, "created": False, "status": "BUILD",
916 "error_msg": None, "sdn_net_id": sdn_net_id}
917 return instance_element_update
918
919 def get_net(self, task):
920 task_id = task["instance_action_id"] + "." + str(task["task_index"])
921 try:
922 params = task["params"]
923 filter_param = params[0]
924 instance_element_update = self._get_net_internal(task, filter_param)
925 return instance_element_update
926
927 except (vimconn.vimconnException, VimThreadException) as e:
928 self.logger.error("task={} get-net: {}".format(task_id, e))
929 task["status"] = "FAILED"
930 task["vim_id"] = None
931 task["error_msg"] = self._format_vim_error_msg(str(e))
932 instance_element_update = {"vim_net_id": None, "status": "VIM_ERROR",
933 "error_msg": task["error_msg"]}
934 return instance_element_update
935
936 def new_net(self, task):
937 vim_net_id = None
938 sdn_net_id = None
939 task_id = task["instance_action_id"] + "." + str(task["task_index"])
940 action_text = ""
941 try:
942 # FIND
943 if task["extra"].get("find"):
944 action_text = "finding"
945 filter_param = task["extra"]["find"][0]
946 try:
947 instance_element_update = self._get_net_internal(task, filter_param)
948 return instance_element_update
949 except VimThreadExceptionNotFound:
950 pass
951 # CREATE
952 params = task["params"]
953 action_text = "creating VIM"
954 vim_net_id, created_items = self.vim.new_network(*params[0:3])
955
956 net_name = params[0]
957 net_type = params[1]
958 wim_account_name = None
959 if len(params) >= 4:
960 wim_account_name = params[3]
961
962 sdn_controller = self.vim.config.get('sdn-controller')
963 if sdn_controller and (net_type == "data" or net_type == "ptp"):
964 network = {"name": net_name, "type": net_type, "region": self.vim["config"]["datacenter_id"]}
965
966 vim_net = self.vim.get_network(vim_net_id)
967 if vim_net.get('encapsulation') != 'vlan':
968 raise vimconn.vimconnException(
969 "net '{}' defined as type '{}' has not vlan encapsulation '{}'".format(
970 net_name, net_type, vim_net['encapsulation']))
971 network["vlan"] = vim_net.get('segmentation_id')
972 action_text = "creating SDN"
973 with self.db_lock:
974 sdn_net_id = self.ovim.new_network(network)
975
976 if wim_account_name and self.vim.config["wim_external_ports"]:
977 # add external port to connect WIM. Try with compute node __WIM:wim_name and __WIM
978 action_text = "attaching external port to ovim network"
979 sdn_port_name = sdn_net_id + "." + task["vim_id"]
980 sdn_port_name = sdn_port_name[:63]
981 sdn_port_data = {
982 "compute_node": "__WIM:" + wim_account_name[0:58],
983 "pci": None,
984 "vlan": network["vlan"],
985 "net_id": sdn_net_id,
986 "region": self.vim["config"]["datacenter_id"],
987 "name": sdn_port_name,
988 }
989 try:
990 with self.db_lock:
991 sdn_external_port_id = self.ovim.new_external_port(sdn_port_data)
992 except ovimException:
993 sdn_port_data["compute_node"] = "__WIM"
994 with self.db_lock:
995 sdn_external_port_id = self.ovim.new_external_port(sdn_port_data)
996 self.logger.debug("Added sdn_external_port {} to sdn_network {}".format(sdn_external_port_id,
997 sdn_net_id))
998 task["status"] = "DONE"
999 task["extra"]["vim_info"] = {}
1000 task["extra"]["sdn_net_id"] = sdn_net_id
1001 task["extra"]["vim_status"] = "BUILD"
1002 task["extra"]["created"] = True
1003 task["extra"]["created_items"] = created_items
1004 task["error_msg"] = None
1005 task["vim_id"] = vim_net_id
1006 instance_element_update = {"vim_net_id": vim_net_id, "sdn_net_id": sdn_net_id, "status": "BUILD",
1007 "created": True, "error_msg": None}
1008 return instance_element_update
1009 except (vimconn.vimconnException, ovimException) as e:
1010 self.logger.error("task={} new-net: Error {}: {}".format(task_id, action_text, e))
1011 task["status"] = "FAILED"
1012 task["vim_id"] = vim_net_id
1013 task["error_msg"] = self._format_vim_error_msg(str(e))
1014 task["extra"]["sdn_net_id"] = sdn_net_id
1015 instance_element_update = {"vim_net_id": vim_net_id, "sdn_net_id": sdn_net_id, "status": "VIM_ERROR",
1016 "error_msg": task["error_msg"]}
1017 return instance_element_update
1018
1019 def del_net(self, task):
1020 net_vim_id = task["vim_id"]
1021 sdn_net_id = task["extra"].get("sdn_net_id")
1022 try:
1023 if sdn_net_id:
1024 # Delete any attached port to this sdn network. There can be ports associated to this network in case
1025 # it was manually done using 'openmano vim-net-sdn-attach'
1026 with self.db_lock:
1027 port_list = self.ovim.get_ports(columns={'uuid'},
1028 filter={'name': 'external_port', 'net_id': sdn_net_id})
1029 for port in port_list:
1030 self.ovim.delete_port(port['uuid'], idempotent=True)
1031 self.ovim.delete_network(sdn_net_id, idempotent=True)
1032 if net_vim_id:
1033 self.vim.delete_network(net_vim_id, task["extra"].get("created_items"))
1034 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1035 task["error_msg"] = None
1036 return None
1037 except ovimException as e:
1038 task["error_msg"] = self._format_vim_error_msg("ovimException obtaining and deleting external "
1039 "ports for net {}: {}".format(sdn_net_id, str(e)))
1040 except vimconn.vimconnException as e:
1041 task["error_msg"] = self._format_vim_error_msg(str(e))
1042 if isinstance(e, vimconn.vimconnNotFoundException):
1043 # If not found mark as Done and fill error_msg
1044 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1045 return None
1046 task["status"] = "FAILED"
1047 return None
1048
1049 # Service Function Instances
1050 def new_sfi(self, task):
1051 vim_sfi_id = None
1052 try:
1053 # Waits for interfaces to be ready (avoids failure)
1054 time.sleep(1)
1055 dep_id = "TASK-" + str(task["extra"]["depends_on"][0])
1056 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1057 error_text = ""
1058 interfaces = task.get("depends").get(dep_id).get("extra").get("interfaces")
1059 ingress_interface_id = task.get("extra").get("params").get("ingress_interface_id")
1060 egress_interface_id = task.get("extra").get("params").get("egress_interface_id")
1061 ingress_vim_interface_id = None
1062 egress_vim_interface_id = None
1063 for vim_interface, interface_data in interfaces.iteritems():
1064 if interface_data.get("interface_id") == ingress_interface_id:
1065 ingress_vim_interface_id = vim_interface
1066 break
1067 if ingress_interface_id != egress_interface_id:
1068 for vim_interface, interface_data in interfaces.iteritems():
1069 if interface_data.get("interface_id") == egress_interface_id:
1070 egress_vim_interface_id = vim_interface
1071 break
1072 else:
1073 egress_vim_interface_id = ingress_vim_interface_id
1074 if not ingress_vim_interface_id or not egress_vim_interface_id:
1075 error_text = "Error creating Service Function Instance, Ingress: {}, Egress: {}".format(
1076 ingress_vim_interface_id, egress_vim_interface_id)
1077 self.logger.error(error_text)
1078 task["error_msg"] = error_text
1079 task["status"] = "FAILED"
1080 task["vim_id"] = None
1081 return None
1082 # At the moment, every port associated with the VM will be used both as ingress and egress ports.
1083 # Bear in mind that different VIM connectors might support SFI differently. In the case of OpenStack,
1084 # only the first ingress and first egress ports will be used to create the SFI (Port Pair).
1085 ingress_port_id_list = [ingress_vim_interface_id]
1086 egress_port_id_list = [egress_vim_interface_id]
1087 name = "sfi-%s" % task["item_id"][:8]
1088 # By default no form of IETF SFC Encapsulation will be used
1089 vim_sfi_id = self.vim.new_sfi(name, ingress_port_id_list, egress_port_id_list, sfc_encap=False)
1090
1091 task["extra"]["created"] = True
1092 task["extra"]["vim_status"] = "ACTIVE"
1093 task["error_msg"] = None
1094 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1095 task["vim_id"] = vim_sfi_id
1096 instance_element_update = {"status": "ACTIVE", "vim_sfi_id": vim_sfi_id, "error_msg": None}
1097 return instance_element_update
1098
1099 except (vimconn.vimconnException, VimThreadException) as e:
1100 self.logger.error("Error creating Service Function Instance, task=%s: %s", task_id, str(e))
1101 error_text = self._format_vim_error_msg(str(e))
1102 task["error_msg"] = error_text
1103 task["status"] = "FAILED"
1104 task["vim_id"] = None
1105 instance_element_update = {"status": "VIM_ERROR", "vim_sfi_id": None, "error_msg": error_text}
1106 return instance_element_update
1107
1108 def del_sfi(self, task):
1109 sfi_vim_id = task["vim_id"]
1110 try:
1111 self.vim.delete_sfi(sfi_vim_id)
1112 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1113 task["error_msg"] = None
1114 return None
1115
1116 except vimconn.vimconnException as e:
1117 task["error_msg"] = self._format_vim_error_msg(str(e))
1118 if isinstance(e, vimconn.vimconnNotFoundException):
1119 # If not found mark as Done and fill error_msg
1120 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1121 return None
1122 task["status"] = "FAILED"
1123 return None
1124
1125 def new_sf(self, task):
1126 vim_sf_id = None
1127 try:
1128 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1129 error_text = ""
1130 depending_tasks = ["TASK-" + str(dep_id) for dep_id in task["extra"]["depends_on"]]
1131 # sfis = task.get("depends").values()[0].get("extra").get("params")[5]
1132 sfis = [task.get("depends").get(dep_task) for dep_task in depending_tasks]
1133 sfi_id_list = []
1134 for sfi in sfis:
1135 sfi_id_list.append(sfi.get("vim_id"))
1136 name = "sf-%s" % task["item_id"][:8]
1137 # By default no form of IETF SFC Encapsulation will be used
1138 vim_sf_id = self.vim.new_sf(name, sfi_id_list, sfc_encap=False)
1139
1140 task["extra"]["created"] = True
1141 task["extra"]["vim_status"] = "ACTIVE"
1142 task["error_msg"] = None
1143 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1144 task["vim_id"] = vim_sf_id
1145 instance_element_update = {"status": "ACTIVE", "vim_sf_id": vim_sf_id, "error_msg": None}
1146 return instance_element_update
1147
1148 except (vimconn.vimconnException, VimThreadException) as e:
1149 self.logger.error("Error creating Service Function, task=%s: %s", task_id, str(e))
1150 error_text = self._format_vim_error_msg(str(e))
1151 task["error_msg"] = error_text
1152 task["status"] = "FAILED"
1153 task["vim_id"] = None
1154 instance_element_update = {"status": "VIM_ERROR", "vim_sf_id": None, "error_msg": error_text}
1155 return instance_element_update
1156
1157 def del_sf(self, task):
1158 sf_vim_id = task["vim_id"]
1159 try:
1160 self.vim.delete_sf(sf_vim_id)
1161 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1162 task["error_msg"] = None
1163 return None
1164
1165 except vimconn.vimconnException as e:
1166 task["error_msg"] = self._format_vim_error_msg(str(e))
1167 if isinstance(e, vimconn.vimconnNotFoundException):
1168 # If not found mark as Done and fill error_msg
1169 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1170 return None
1171 task["status"] = "FAILED"
1172 return None
1173
1174 def new_classification(self, task):
1175 vim_classification_id = None
1176 try:
1177 params = task["params"]
1178 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1179 dep_id = "TASK-" + str(task["extra"]["depends_on"][0])
1180 error_text = ""
1181 interfaces = task.get("depends").get(dep_id).get("extra").get("interfaces").keys()
1182 # Bear in mind that different VIM connectors might support Classifications differently.
1183 # In the case of OpenStack, only the first VNF attached to the classifier will be used
1184 # to create the Classification(s) (the "logical source port" of the "Flow Classifier").
1185 # Since the VNFFG classifier match lacks the ethertype, classification defaults to
1186 # using the IPv4 flow classifier.
1187 name = "c-%s" % task["item_id"][:8]
1188 # if not CIDR is given for the IP addresses, add /32:
1189 ip_proto = int(params.get("ip_proto"))
1190 source_ip = params.get("source_ip")
1191 destination_ip = params.get("destination_ip")
1192 source_port = params.get("source_port")
1193 destination_port = params.get("destination_port")
1194 definition = {"logical_source_port": interfaces[0]}
1195 if ip_proto:
1196 if ip_proto == 1:
1197 ip_proto = 'icmp'
1198 elif ip_proto == 6:
1199 ip_proto = 'tcp'
1200 elif ip_proto == 17:
1201 ip_proto = 'udp'
1202 definition["protocol"] = ip_proto
1203 if source_ip:
1204 if '/' not in source_ip:
1205 source_ip += '/32'
1206 definition["source_ip_prefix"] = source_ip
1207 if source_port:
1208 definition["source_port_range_min"] = source_port
1209 definition["source_port_range_max"] = source_port
1210 if destination_port:
1211 definition["destination_port_range_min"] = destination_port
1212 definition["destination_port_range_max"] = destination_port
1213 if destination_ip:
1214 if '/' not in destination_ip:
1215 destination_ip += '/32'
1216 definition["destination_ip_prefix"] = destination_ip
1217
1218 vim_classification_id = self.vim.new_classification(
1219 name, 'legacy_flow_classifier', definition)
1220
1221 task["extra"]["created"] = True
1222 task["extra"]["vim_status"] = "ACTIVE"
1223 task["error_msg"] = None
1224 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1225 task["vim_id"] = vim_classification_id
1226 instance_element_update = {"status": "ACTIVE", "vim_classification_id": vim_classification_id,
1227 "error_msg": None}
1228 return instance_element_update
1229
1230 except (vimconn.vimconnException, VimThreadException) as e:
1231 self.logger.error("Error creating Classification, task=%s: %s", task_id, str(e))
1232 error_text = self._format_vim_error_msg(str(e))
1233 task["error_msg"] = error_text
1234 task["status"] = "FAILED"
1235 task["vim_id"] = None
1236 instance_element_update = {"status": "VIM_ERROR", "vim_classification_id": None, "error_msg": error_text}
1237 return instance_element_update
1238
1239 def del_classification(self, task):
1240 classification_vim_id = task["vim_id"]
1241 try:
1242 self.vim.delete_classification(classification_vim_id)
1243 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1244 task["error_msg"] = None
1245 return None
1246
1247 except vimconn.vimconnException as e:
1248 task["error_msg"] = self._format_vim_error_msg(str(e))
1249 if isinstance(e, vimconn.vimconnNotFoundException):
1250 # If not found mark as Done and fill error_msg
1251 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1252 return None
1253 task["status"] = "FAILED"
1254 return None
1255
1256 def new_sfp(self, task):
1257 vim_sfp_id = None
1258 try:
1259 task_id = task["instance_action_id"] + "." + str(task["task_index"])
1260 depending_tasks = [task.get("depends").get("TASK-" + str(tsk_id)) for tsk_id in
1261 task.get("extra").get("depends_on")]
1262 error_text = ""
1263 sf_id_list = []
1264 classification_id_list = []
1265 for dep in depending_tasks:
1266 vim_id = dep.get("vim_id")
1267 resource = dep.get("item")
1268 if resource == "instance_sfs":
1269 sf_id_list.append(vim_id)
1270 elif resource == "instance_classifications":
1271 classification_id_list.append(vim_id)
1272
1273 name = "sfp-%s" % task["item_id"][:8]
1274 # By default no form of IETF SFC Encapsulation will be used
1275 vim_sfp_id = self.vim.new_sfp(name, classification_id_list, sf_id_list, sfc_encap=False)
1276
1277 task["extra"]["created"] = True
1278 task["extra"]["vim_status"] = "ACTIVE"
1279 task["error_msg"] = None
1280 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1281 task["vim_id"] = vim_sfp_id
1282 instance_element_update = {"status": "ACTIVE", "vim_sfp_id": vim_sfp_id, "error_msg": None}
1283 return instance_element_update
1284
1285 except (vimconn.vimconnException, VimThreadException) as e:
1286 self.logger.error("Error creating Service Function, task=%s: %s", task_id, str(e))
1287 error_text = self._format_vim_error_msg(str(e))
1288 task["error_msg"] = error_text
1289 task["status"] = "FAILED"
1290 task["vim_id"] = None
1291 instance_element_update = {"status": "VIM_ERROR", "vim_sfp_id": None, "error_msg": error_text}
1292 return instance_element_update
1293
1294 def del_sfp(self, task):
1295 sfp_vim_id = task["vim_id"]
1296 try:
1297 self.vim.delete_sfp(sfp_vim_id)
1298 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1299 task["error_msg"] = None
1300 return None
1301
1302 except vimconn.vimconnException as e:
1303 task["error_msg"] = self._format_vim_error_msg(str(e))
1304 if isinstance(e, vimconn.vimconnNotFoundException):
1305 # If not found mark as Done and fill error_msg
1306 task["status"] = "FINISHED" # with FINISHED instead of DONE it will not be refreshing
1307 return None
1308 task["status"] = "FAILED"
1309 return None