1 # -*- coding: utf-8 -*-
4 # Copyright 2015 Telefonica Investigacion y Desarrollo, S.A.U.
5 # This file is part of openvim
8 # Licensed under the Apache License, Version 2.0 (the "License"); you may
9 # not use this file except in compliance with the License. You may obtain
10 # a copy of the License at
12 # http://www.apache.org/licenses/LICENSE-2.0
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17 # License for the specific language governing permissions and limitations
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact with: nfvlabs@tid.es
This is a thread that interacts with a VIM. It processes TASKs sequentially against a single VIM.
The tasks are stored at database in table vim_wim_actions
Several vim_wim_actions can refer to the same element at VIM (flavor, network, ...). This is something to avoid if RO
is migrated to a non-relational database such as MongoDB. Each vim_wim_actions references a different instance_Xxxxx
In this case the "related" column contains the same value, to know they refer to the same vim. In case of deletion, if
there are related tasks using this element, it is not deleted. The vim_info needed to delete is transferred to another task
32 The task content is (M: stored at memory, D: stored at database):
33 MD instance_action_id: reference a global action over an instance-scenario: database instance_actions
34 MD task_index: index number of the task. This together with the previous forms a unique key identifier
35 MD datacenter_vim_id: should contain the uuid of the VIM managed by this thread
36 MD vim_id: id of the vm,net,etc at VIM
37 MD item: database table name, can be instance_vms, instance_nets, TODO: datacenter_flavors, datacenter_images
38 MD item_id: uuid of the referenced entry in the previous table
39 MD action: CREATE, DELETE, FIND
MD status: SCHEDULED: action needs to be done
42 DONE: Done and it must be polled to VIM periodically to see status. ONLY for action=CREATE or FIND
43 FAILED: It cannot be created/found/deleted
44 FINISHED: similar to DONE, but no refresh is needed anymore. Task is maintained at database but
45 it is never processed by any thread
SUPERSEDED: similar to FINISHED, but nothing has been done to complete the task.
47 MD extra: text with yaml format at database, dict at memory with:
48 params: list with the params to be sent to the VIM for CREATE or FIND. For DELETE the vim_id is taken
49 from other related tasks
50 find: (only for CREATE tasks) if present it should FIND before creating and use if existing. Contains
52 depends_on: list with the 'task_index'es of tasks that must be completed before. e.g. a vm creation depends
can contain an int (single index on the same instance-action) or str (complete action ID)
55 sdn_net_id: used for net.
56 interfaces: used for VMs. Each key is the uuid of the instance_interfaces entry at database
iface_id: uuid of instance_interfaces
61 created_items: dictionary with extra elements created that need to be deleted. e.g. ports, volumes,...
62 created: False if the VIM element is not created by other actions, and it should not be deleted
63 vim_status: VIM status of the element. Stored also at database in the instance_XXX
64 vim_info: Detailed information of a vm/net from the VIM. Stored at database in the instance_XXX but not at
66 M depends: dict with task_index(from depends_on) to dependency task
67 M params: same as extra[params]
68 MD error_msg: descriptive text upon an error.Stored also at database instance_XXX
69 MD created_at: task creation time. The task of creation must be the oldest
MD modified_at: next time the task needs to be processed. For example, for a refresh, it contains the next time the refresh must
MD related: All the tasks over the same VIM element have the same "related". Note that other VIMs can contain the
same value of related, but this thread only processes the tasks of one VIM. Also related can be the
same among several NS or instance-scenarios
75 MD worker: Used to lock in case of several thread workers.
83 from osm_ro
import vimconn
85 from osm_ro
.db_base
import db_base_Exception
class ovimException(Exception):
    """Exception type this module catches around calls made through ``self.ovim``.

    It adds nothing to :class:`Exception`; it only provides a distinct name
    for the ``except ovimException`` clauses in this file.
    """
90 from copy
import deepcopy
92 __author__
= "Alfonso Tierno, Pablo Montes"
93 __date__
= "$28-Sep-2017 12:07:15$"
def is_task_id(task_id):
    """Tell whether *task_id* is a task reference rather than a real VIM identifier.

    A task reference is a string that starts with ``"TASK-"``.

    :param task_id: value to inspect; normally a str, but ``None`` or an int
        task index are tolerated
    :return: True only for a str starting with ``"TASK-"``, False otherwise
    """
    # Non-string values (None, plain integer indexes) used to raise AttributeError
    # on .startswith(); they are simply not task references.
    return isinstance(task_id, str) and task_id.startswith("TASK-")
class VimThreadException(Exception):
    """Error raised by the VIM thread while processing a task.

    The task-processing code in this module catches it and records the
    message as the task's ``error_msg``, marking the task ``FAILED``.
    """
class VimThreadExceptionNotFound(VimThreadException):
    """Specialisation of :class:`VimThreadException`.

    NOTE(review): the name suggests it signals an element that could not be
    found at the VIM -- confirm against the raise sites.
    """
108 class vim_thread(threading
.Thread
):
109 REFRESH_BUILD
= 5 # 5 seconds
110 REFRESH_ACTIVE
= 60 # 1 minute
112 REFRESH_DELETE
= 3600 * 10
114 def __init__(self
, task_lock
, plugins
, name
=None, datacenter_name
=None, datacenter_tenant_id
=None,
115 db
=None, db_lock
=None, ovim
=None):
119 'name' name of thread
120 'host','user': host ip or name to manage and user
121 'db', 'db_lock': database class and lock to use it in exclusion
123 threading
.Thread
.__init
__(self
)
124 self
.plugins
= plugins
126 self
.error_status
= None
127 self
.datacenter_name
= datacenter_name
128 self
.datacenter_tenant_id
= datacenter_tenant_id
131 self
.name
= vimconn
["id"] + "." + vimconn
["config"]["datacenter_tenant_id"]
134 self
.vim_persistent_info
= {}
135 self
.my_id
= self
.name
[:64]
137 self
.logger
= logging
.getLogger('openmano.vim.' + self
.name
)
139 self
.db_lock
= db_lock
141 self
.task_lock
= task_lock
142 self
.task_queue
= queue
.Queue(2000)
144 def get_vimconnector(self
):
146 from_
= "datacenter_tenants as dt join datacenters as d on dt.datacenter_id=d.uuid"
147 select_
= ('type', 'd.config as config', 'd.uuid as datacenter_id', 'vim_url', 'vim_url_admin',
148 'd.name as datacenter_name', 'dt.uuid as datacenter_tenant_id',
149 'dt.vim_tenant_name as vim_tenant_name', 'dt.vim_tenant_id as vim_tenant_id',
150 'user', 'passwd', 'dt.config as dt_config')
151 where_
= {"dt.uuid": self
.datacenter_tenant_id
}
152 vims
= self
.db
.get_rows(FROM
=from_
, SELECT
=select_
, WHERE
=where_
)
156 vim_config
.update(yaml
.load(vim
["config"], Loader
=yaml
.Loader
))
158 vim_config
.update(yaml
.load(vim
["dt_config"], Loader
=yaml
.Loader
))
159 vim_config
['datacenter_tenant_id'] = vim
.get('datacenter_tenant_id')
160 vim_config
['datacenter_id'] = vim
.get('datacenter_id')
164 vim_config
["wim_external_ports"] = self
.ovim
.get_of_port_mappings(
165 db_filter
={"region": vim_config
['datacenter_id'], "pci": None})
167 self
.vim
= self
.plugins
["rovim_" + vim
["type"]].vimconnector(
168 uuid
=vim
['datacenter_id'], name
=vim
['datacenter_name'],
169 tenant_id
=vim
['vim_tenant_id'], tenant_name
=vim
['vim_tenant_name'],
170 url
=vim
['vim_url'], url_admin
=vim
['vim_url_admin'],
171 user
=vim
['user'], passwd
=vim
['passwd'],
172 config
=vim_config
, persistent_info
=self
.vim_persistent_info
174 self
.error_status
= None
175 except Exception as e
:
176 self
.logger
.error("Cannot load vimconnector for vim_account {}: {}".format(self
.datacenter_tenant_id
, e
))
178 self
.error_status
= "Error loading vimconnector: {}".format(e
)
180 def _get_db_task(self
):
182 Read actions from database and reload them at memory. Fill self.refresh_list, pending_list, vim_actions
190 # get 20 (database_limit) entries each time
191 vim_actions
= self
.db
.get_rows(FROM
="vim_wim_actions",
192 WHERE
={"datacenter_vim_id": self
.datacenter_tenant_id
,
193 "status": ['SCHEDULED', 'BUILD', 'DONE'],
194 "worker": [None, self
.my_id
], "modified_at<=": now
196 ORDER_BY
=("modified_at", "created_at",),
197 LIMIT
=database_limit
)
200 # if vim_actions[0]["modified_at"] > now:
201 # return int(vim_actions[0] - now)
202 for task
in vim_actions
:
204 if task_related
== task
["related"]:
205 continue # ignore if a locking has already tried for these task set
206 task_related
= task
["related"]
208 self
.db
.update_rows("vim_wim_actions", UPDATE
={"worker": self
.my_id
}, modified_time
=0,
209 WHERE
={"datacenter_vim_id": self
.datacenter_tenant_id
,
210 "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
211 "worker": [None, self
.my_id
],
212 "related": task_related
,
213 "item": task
["item"],
215 # ... and read all related and check if locked
216 related_tasks
= self
.db
.get_rows(FROM
="vim_wim_actions",
217 WHERE
={"datacenter_vim_id": self
.datacenter_tenant_id
,
218 "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
219 "related": task_related
,
220 "item": task
["item"],
222 ORDER_BY
=("created_at",))
223 # check that all related tasks have been locked. If not release and try again. It can happen
224 # for race conditions if a new related task has been inserted by nfvo in the process
225 some_tasks_locked
= False
226 some_tasks_not_locked
= False
228 for relate_task
in related_tasks
:
229 if relate_task
["worker"] != self
.my_id
:
230 some_tasks_not_locked
= True
232 some_tasks_locked
= True
233 if not creation_task
and relate_task
["action"] in ("CREATE", "FIND"):
234 creation_task
= relate_task
235 if some_tasks_not_locked
:
236 if some_tasks_locked
: # unlock
237 self
.db
.update_rows("vim_wim_actions", UPDATE
={"worker": None}, modified_time
=0,
238 WHERE
={"datacenter_vim_id": self
.datacenter_tenant_id
,
239 "worker": self
.my_id
,
240 "related": task_related
,
241 "item": task
["item"],
245 # task of creation must be the first in the list of related_task
246 assert(related_tasks
[0]["action"] in ("CREATE", "FIND"))
248 task
["params"] = None
250 extra
= yaml
.load(task
["extra"], Loader
=yaml
.Loader
)
253 task
["extra"] = extra
254 if extra
.get("depends_on"):
256 if extra
.get("params"):
257 task
["params"] = deepcopy(extra
["params"])
258 return task
, related_tasks
259 except Exception as e
:
260 self
.logger
.critical("Unexpected exception at _get_db_task: " + str(e
), exc_info
=True)
263 def _delete_task(self
, task
):
265 Determine if this task need to be done or superseded
269 def copy_extra_created(copy_to
, copy_from
):
270 copy_to
["created"] = copy_from
["created"]
271 if copy_from
.get("sdn_net_id"):
272 copy_to
["sdn_net_id"] = copy_from
["sdn_net_id"]
273 if copy_from
.get("interfaces"):
274 copy_to
["interfaces"] = copy_from
["interfaces"]
275 if copy_from
.get("created_items"):
276 if not copy_to
.get("created_items"):
277 copy_to
["created_items"] = {}
278 copy_to
["created_items"].update(copy_from
["created_items"])
281 dependency_task
= None
282 deletion_needed
= False
283 if task
["status"] == "FAILED":
284 return # TODO need to be retry??
286 # get all related tasks
287 related_tasks
= self
.db
.get_rows(FROM
="vim_wim_actions",
288 WHERE
={"datacenter_vim_id": self
.datacenter_tenant_id
,
289 "status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
290 "action": ["FIND", "CREATE"],
291 "related": task
["related"],
293 ORDER_BY
=("created_at",),
295 for related_task
in related_tasks
:
296 if related_task
["item"] == task
["item"] and related_task
["item_id"] == task
["item_id"]:
297 task_create
= related_task
299 if related_task
["extra"]:
300 extra_created
= yaml
.load(related_task
["extra"], Loader
=yaml
.Loader
)
301 if extra_created
.get("created"):
302 deletion_needed
= True
303 related_task
["extra"] = extra_created
304 elif not dependency_task
:
305 dependency_task
= related_task
306 if task_create
and dependency_task
:
309 # mark task_create as FINISHED
310 self
.db
.update_rows("vim_wim_actions", UPDATE
={"status": "FINISHED"},
311 WHERE
={"datacenter_vim_id": self
.datacenter_tenant_id
,
312 "instance_action_id": task_create
["instance_action_id"],
313 "task_index": task_create
["task_index"]
315 if not deletion_needed
:
317 elif dependency_task
:
318 # move create information from task_create to relate_task
319 extra_new_created
= yaml
.load(dependency_task
["extra"], Loader
=yaml
.Loader
) or {}
320 extra_new_created
["created"] = extra_created
["created"]
321 copy_extra_created(copy_to
=extra_new_created
, copy_from
=extra_created
)
323 self
.db
.update_rows("vim_wim_actions",
324 UPDATE
={"extra": yaml
.safe_dump(extra_new_created
, default_flow_style
=True,
326 "vim_id": task_create
.get("vim_id")},
327 WHERE
={"datacenter_vim_id": self
.datacenter_tenant_id
,
328 "instance_action_id": dependency_task
["instance_action_id"],
329 "task_index": dependency_task
["task_index"]
333 task
["vim_id"] = task_create
["vim_id"]
334 copy_extra_created(copy_to
=task
["extra"], copy_from
=task_create
["extra"])
337 except Exception as e
:
338 self
.logger
.critical("Unexpected exception at _delete_task: " + str(e
), exc_info
=True)
340 def _refres_vm(self
, task
):
341 """Call VIM to get VMs status"""
342 database_update
= None
344 vim_id
= task
["vim_id"]
345 vm_to_refresh_list
= [vim_id
]
347 vim_dict
= self
.vim
.refresh_vms_status(vm_to_refresh_list
)
348 vim_info
= vim_dict
[vim_id
]
349 except vimconn
.vimconnException
as e
:
350 # Mark all tasks at VIM_ERROR status
351 self
.logger
.error("task=several get-VM: vimconnException when trying to refresh vms " + str(e
))
352 vim_info
= {"status": "VIM_ERROR", "error_msg": str(e
)}
354 task_id
= task
["instance_action_id"] + "." + str(task
["task_index"])
355 self
.logger
.debug("task={} get-VM: vim_vm_id={} result={}".format(task_id
, task
["vim_id"], vim_info
))
357 # check and update interfaces
358 task_warning_msg
= ""
359 for interface
in vim_info
.get("interfaces", ()):
360 vim_interface_id
= interface
["vim_interface_id"]
361 if vim_interface_id
not in task
["extra"]["interfaces"]:
362 self
.logger
.critical("task={} get-VM: Interface not found {} on task info {}".format(
363 task_id
, vim_interface_id
, task
["extra"]["interfaces"]), exc_info
=True)
365 task_interface
= task
["extra"]["interfaces"][vim_interface_id
]
366 task_vim_interface
= task_interface
.get("vim_info")
367 if task_vim_interface
!= interface
:
369 if task_interface
.get("sdn_port_id"):
372 self
.ovim
.delete_port(task_interface
["sdn_port_id"], idempotent
=True)
373 task_interface
["sdn_port_id"] = None
374 except ovimException
as e
:
375 error_text
= "ovimException deleting external_port={}: {}".format(
376 task_interface
["sdn_port_id"], e
)
377 self
.logger
.error("task={} get-VM: {}".format(task_id
, error_text
), exc_info
=True)
378 task_warning_msg
+= error_text
379 # TODO Set error_msg at instance_nets instead of instance VMs
382 sdn_net_id
= task_interface
.get("sdn_net_id")
383 if sdn_net_id
and interface
.get("compute_node") and interface
.get("pci"):
384 sdn_port_name
= sdn_net_id
+ "." + task
["vim_id"]
385 sdn_port_name
= sdn_port_name
[:63]
388 sdn_port_id
= self
.ovim
.new_external_port(
389 {"compute_node": interface
["compute_node"],
390 "pci": interface
["pci"],
391 "vlan": interface
.get("vlan"),
392 "net_id": sdn_net_id
,
393 "region": self
.vim
["config"]["datacenter_id"],
394 "name": sdn_port_name
,
395 "mac": interface
.get("mac_address")})
396 task_interface
["sdn_port_id"] = sdn_port_id
397 except (ovimException
, Exception) as e
:
398 error_text
= "ovimException creating new_external_port compute_node={} pci={} vlan={} {}".\
399 format(interface
["compute_node"], interface
["pci"], interface
.get("vlan"), e
)
400 self
.logger
.error("task={} get-VM: {}".format(task_id
, error_text
), exc_info
=True)
401 task_warning_msg
+= error_text
402 # TODO Set error_msg at instance_nets instead of instance VMs
404 self
.db
.update_rows('instance_interfaces',
405 UPDATE
={"mac_address": interface
.get("mac_address"),
406 "ip_address": interface
.get("ip_address"),
407 "vim_interface_id": interface
.get("vim_interface_id"),
408 "vim_info": interface
.get("vim_info"),
409 "sdn_port_id": task_interface
.get("sdn_port_id"),
410 "compute_node": interface
.get("compute_node"),
411 "pci": interface
.get("pci"),
412 "vlan": interface
.get("vlan")},
413 WHERE
={'uuid': task_interface
["iface_id"]})
414 task_interface
["vim_info"] = interface
416 # check and update task and instance_vms database
417 vim_info_error_msg
= None
418 if vim_info
.get("error_msg"):
419 vim_info_error_msg
= self
._format
_vim
_error
_msg
(vim_info
["error_msg"] + task_warning_msg
)
420 elif task_warning_msg
:
421 vim_info_error_msg
= self
._format
_vim
_error
_msg
(task_warning_msg
)
422 task_vim_info
= task
["extra"].get("vim_info")
423 task_error_msg
= task
.get("error_msg")
424 task_vim_status
= task
["extra"].get("vim_status")
425 if task_vim_status
!= vim_info
["status"] or task_error_msg
!= vim_info_error_msg
or \
426 (vim_info
.get("vim_info") and task_vim_info
!= vim_info
["vim_info"]):
427 database_update
= {"status": vim_info
["status"], "error_msg": vim_info_error_msg
}
428 if vim_info
.get("vim_info"):
429 database_update
["vim_info"] = vim_info
["vim_info"]
431 task
["extra"]["vim_status"] = vim_info
["status"]
432 task
["error_msg"] = vim_info_error_msg
433 if vim_info
.get("vim_info"):
434 task
["extra"]["vim_info"] = vim_info
["vim_info"]
436 return database_update
438 def _refres_net(self
, task
):
439 """Call VIM to get network status"""
440 database_update
= None
442 vim_id
= task
["vim_id"]
443 net_to_refresh_list
= [vim_id
]
445 vim_dict
= self
.vim
.refresh_nets_status(net_to_refresh_list
)
446 vim_info
= vim_dict
[vim_id
]
447 except vimconn
.vimconnException
as e
:
448 # Mark all tasks at VIM_ERROR status
449 self
.logger
.error("task=several get-net: vimconnException when trying to refresh nets " + str(e
))
450 vim_info
= {"status": "VIM_ERROR", "error_msg": str(e
)}
452 task_id
= task
["instance_action_id"] + "." + str(task
["task_index"])
453 self
.logger
.debug("task={} get-net: vim_net_id={} result={}".format(task_id
, task
["vim_id"], vim_info
))
455 task_vim_info
= task
["extra"].get("vim_info")
456 task_vim_status
= task
["extra"].get("vim_status")
457 task_error_msg
= task
.get("error_msg")
458 task_sdn_net_id
= task
["extra"].get("sdn_net_id")
460 vim_info_status
= vim_info
["status"]
461 vim_info_error_msg
= vim_info
.get("error_msg")
466 sdn_net
= self
.ovim
.show_network(task_sdn_net_id
)
467 except (ovimException
, Exception) as e
:
468 text_error
= "ovimException getting network snd_net_id={}: {}".format(task_sdn_net_id
, e
)
469 self
.logger
.error("task={} get-net: {}".format(task_id
, text_error
), exc_info
=True)
470 sdn_net
= {"status": "ERROR", "last_error": text_error
}
471 if sdn_net
["status"] == "ERROR":
472 if not vim_info_error_msg
:
473 vim_info_error_msg
= str(sdn_net
.get("last_error"))
475 vim_info_error_msg
= "VIM_ERROR: {} && SDN_ERROR: {}".format(
476 self
._format
_vim
_error
_msg
(vim_info_error_msg
, 1024 // 2 - 14),
477 self
._format
_vim
_error
_msg
(sdn_net
["last_error"], 1024 // 2 - 14))
478 vim_info_status
= "ERROR"
479 elif sdn_net
["status"] == "BUILD":
480 if vim_info_status
== "ACTIVE":
481 vim_info_status
= "BUILD"
484 if vim_info_error_msg
:
485 vim_info_error_msg
= self
._format
_vim
_error
_msg
(vim_info_error_msg
)
486 if task_vim_status
!= vim_info_status
or task_error_msg
!= vim_info_error_msg
or \
487 (vim_info
.get("vim_info") and task_vim_info
!= vim_info
["vim_info"]):
488 task
["extra"]["vim_status"] = vim_info_status
489 task
["error_msg"] = vim_info_error_msg
490 if vim_info
.get("vim_info"):
491 task
["extra"]["vim_info"] = vim_info
["vim_info"]
492 database_update
= {"status": vim_info_status
, "error_msg": vim_info_error_msg
}
493 if vim_info
.get("vim_info"):
494 database_update
["vim_info"] = vim_info
["vim_info"]
495 return database_update
497 def _proccess_pending_tasks(self
, task
, related_tasks
):
498 old_task_status
= task
["status"]
499 create_or_find
= False # if as result of processing this task something is created or found
503 if task
["status"] == "SCHEDULED":
504 # check if tasks that this depends on have been completed
505 dependency_not_completed
= False
506 dependency_modified_at
= 0
507 for task_index
in task
["extra"].get("depends_on", ()):
508 task_dependency
= self
._look
_for
_task
(task
["instance_action_id"], task_index
)
509 if not task_dependency
:
510 raise VimThreadException(
511 "Cannot get depending net task trying to get depending task {}.{}".format(
512 task
["instance_action_id"], task_index
))
513 # task["depends"]["TASK-" + str(task_index)] = task_dependency #it references another object,so
514 # database must be look again
515 if task_dependency
["status"] == "SCHEDULED":
516 dependency_not_completed
= True
517 dependency_modified_at
= task_dependency
["modified_at"]
519 elif task_dependency
["status"] == "FAILED":
520 raise VimThreadException(
521 "Cannot {} {}, (task {}.{}) because depends on failed {}.{}, (task{}.{}): {}".format(
522 task
["action"], task
["item"],
523 task
["instance_action_id"], task
["task_index"],
524 task_dependency
["instance_action_id"], task_dependency
["task_index"],
525 task_dependency
["action"], task_dependency
["item"], task_dependency
.get("error_msg")))
527 task
["depends"]["TASK-"+str(task_index
)] = task_dependency
528 task
["depends"]["TASK-{}.{}".format(task
["instance_action_id"], task_index
)] = task_dependency
529 if dependency_not_completed
:
530 # Move this task to the time dependency is going to be modified plus 10 seconds.
531 self
.db
.update_rows("vim_wim_actions", modified_time
=dependency_modified_at
+ 10,
532 UPDATE
={"worker": None},
533 WHERE
={"datacenter_vim_id": self
.datacenter_tenant_id
, "worker": self
.my_id
,
534 "related": task
["related"],
536 # task["extra"]["tries"] = task["extra"].get("tries", 0) + 1
537 # if task["extra"]["tries"] > 3:
538 # raise VimThreadException(
539 # "Cannot {} {}, (task {}.{}) because timeout waiting to complete {} {}, "
540 # "(task {}.{})".format(task["action"], task["item"],
541 # task["instance_action_id"], task["task_index"],
542 # task_dependency["instance_action_id"], task_dependency["task_index"]
543 # task_dependency["action"], task_dependency["item"]))
546 database_update
= None
547 if task
["action"] == "DELETE":
548 deleted_needed
= self
._delete
_task
(task
)
549 if not deleted_needed
:
550 task
["status"] = "SUPERSEDED" # with FINISHED instead of DONE it will not be refreshing
551 task
["error_msg"] = None
553 if task
["status"] == "SUPERSEDED":
554 # not needed to do anything but update database with the new status
555 database_update
= None
557 task
["status"] = "FAILED"
558 task
["error_msg"] = self
.error_status
559 database_update
= {"status": "VIM_ERROR", "error_msg": task
["error_msg"]}
560 elif task
["item_id"] != related_tasks
[0]["item_id"] and task
["action"] in ("FIND", "CREATE"):
561 # Do nothing, just copy values from one to another and updata database
562 task
["status"] = related_tasks
[0]["status"]
563 task
["error_msg"] = related_tasks
[0]["error_msg"]
564 task
["vim_id"] = related_tasks
[0]["vim_id"]
565 extra
= yaml
.load(related_tasks
[0]["extra"], Loader
=yaml
.Loader
)
566 task
["extra"]["vim_status"] = extra
.get("vim_status")
567 next_refresh
= related_tasks
[0]["modified_at"] + 0.001
568 database_update
= {"status": task
["extra"].get("vim_status", "VIM_ERROR"),
569 "error_msg": task
["error_msg"]}
570 if task
["item"] == 'instance_vms':
571 database_update
["vim_vm_id"] = task
["vim_id"]
572 elif task
["item"] == 'instance_nets':
573 database_update
["vim_net_id"] = task
["vim_id"]
574 elif task
["item"] == 'instance_vms':
575 if task
["status"] in ('BUILD', 'DONE') and task
["action"] in ("FIND", "CREATE"):
576 database_update
= self
._refres
_vm
(task
)
577 create_or_find
= True
578 elif task
["action"] == "CREATE":
579 create_or_find
= True
580 database_update
= self
.new_vm(task
)
581 elif task
["action"] == "DELETE":
584 raise vimconn
.vimconnException(self
.name
+ "unknown task action {}".format(task
["action"]))
585 elif task
["item"] == 'instance_nets':
586 if task
["status"] in ('BUILD', 'DONE') and task
["action"] in ("FIND", "CREATE"):
587 database_update
= self
._refres
_net
(task
)
588 create_or_find
= True
589 elif task
["action"] == "CREATE":
590 create_or_find
= True
591 database_update
= self
.new_net(task
)
592 elif task
["action"] == "DELETE":
594 elif task
["action"] == "FIND":
595 database_update
= self
.get_net(task
)
597 raise vimconn
.vimconnException(self
.name
+ "unknown task action {}".format(task
["action"]))
598 elif task
["item"] == 'instance_sfis':
599 if task
["action"] == "CREATE":
600 create_or_find
= True
601 database_update
= self
.new_sfi(task
)
602 elif task
["action"] == "DELETE":
605 raise vimconn
.vimconnException(self
.name
+ "unknown task action {}".format(task
["action"]))
606 elif task
["item"] == 'instance_sfs':
607 if task
["action"] == "CREATE":
608 create_or_find
= True
609 database_update
= self
.new_sf(task
)
610 elif task
["action"] == "DELETE":
613 raise vimconn
.vimconnException(self
.name
+ "unknown task action {}".format(task
["action"]))
614 elif task
["item"] == 'instance_classifications':
615 if task
["action"] == "CREATE":
616 create_or_find
= True
617 database_update
= self
.new_classification(task
)
618 elif task
["action"] == "DELETE":
619 self
.del_classification(task
)
621 raise vimconn
.vimconnException(self
.name
+ "unknown task action {}".format(task
["action"]))
622 elif task
["item"] == 'instance_sfps':
623 if task
["action"] == "CREATE":
624 create_or_find
= True
625 database_update
= self
.new_sfp(task
)
626 elif task
["action"] == "DELETE":
629 raise vimconn
.vimconnException(self
.name
+ "unknown task action {}".format(task
["action"]))
631 raise vimconn
.vimconnException(self
.name
+ "unknown task item {}".format(task
["item"]))
633 except VimThreadException
as e
:
634 task
["error_msg"] = str(e
)
635 task
["status"] = "FAILED"
636 database_update
= {"status": "VIM_ERROR", "error_msg": task
["error_msg"]}
637 if task
["item"] == 'instance_vms':
638 database_update
["vim_vm_id"] = None
639 elif task
["item"] == 'instance_nets':
640 database_update
["vim_net_id"] = None
642 task_id
= task
["instance_action_id"] + "." + str(task
["task_index"])
643 self
.logger
.debug("task={} item={} action={} result={}:'{}' params={}".format(
644 task_id
, task
["item"], task
["action"], task
["status"],
645 task
["vim_id"] if task
["status"] == "DONE" else task
.get("error_msg"), task
["params"]))
648 if task
["status"] == "DONE":
649 next_refresh
= time
.time()
650 if task
["extra"].get("vim_status") == "BUILD":
651 next_refresh
+= self
.REFRESH_BUILD
652 elif task
["extra"].get("vim_status") in ("ERROR", "VIM_ERROR"):
653 next_refresh
+= self
.REFRESH_ERROR
654 elif task
["extra"].get("vim_status") == "DELETED":
655 next_refresh
+= self
.REFRESH_DELETE
657 next_refresh
+= self
.REFRESH_ACTIVE
658 elif task
["status"] == "FAILED":
659 next_refresh
= time
.time() + self
.REFRESH_DELETE
662 # modify all related task with action FIND/CREATED non SCHEDULED
664 table
="vim_wim_actions", modified_time
=next_refresh
+ 0.001,
665 UPDATE
={"status": task
["status"], "vim_id": task
.get("vim_id"),
666 "error_msg": task
["error_msg"],
669 WHERE
={"datacenter_vim_id": self
.datacenter_tenant_id
,
670 "worker": self
.my_id
,
671 "action": ["FIND", "CREATE"],
672 "related": task
["related"],
673 "status<>": "SCHEDULED",
677 table
="vim_wim_actions", modified_time
=next_refresh
,
678 UPDATE
={"status": task
["status"], "vim_id": task
.get("vim_id"),
679 "error_msg": task
["error_msg"],
680 "extra": yaml
.safe_dump(task
["extra"], default_flow_style
=True, width
=256)},
681 WHERE
={"instance_action_id": task
["instance_action_id"], "task_index": task
["task_index"]})
684 table
="vim_wim_actions", modified_time
=0,
685 UPDATE
={"worker": None},
686 WHERE
={"datacenter_vim_id": self
.datacenter_tenant_id
,
687 "worker": self
.my_id
,
688 "related": task
["related"],
691 # Update table instance_actions
692 if old_task_status
== "SCHEDULED" and task
["status"] != old_task_status
:
694 table
="instance_actions",
695 UPDATE
={("number_failed" if task
["status"] == "FAILED" else "number_done"): {"INCREMENT": 1}},
696 WHERE
={"uuid": task
["instance_action_id"]})
698 where_filter
= {"related": task
["related"]}
699 if task
["item"] == "instance_nets" and task
["datacenter_vim_id"]:
700 where_filter
["datacenter_tenant_id"] = task
["datacenter_vim_id"]
701 self
.db
.update_rows(table
=task
["item"],
702 UPDATE
=database_update
,
704 except db_base_Exception
as e
:
705 self
.logger
.error("task={} Error updating database {}".format(task_id
, e
), exc_info
=True)
def insert_task(self, task):
    """Append *task* to this thread's queue without waiting for free room.

    :param task: element to enqueue
    :return: None
    :raises vimconn.vimconnException: when the queue has no free slot
    """
    try:
        self.task_queue.put_nowait(task)
    except queue.Full:
        raise vimconn.vimconnException(self.name + ": timeout inserting a task")
    return None
def del_task(self, task):
    """Cancel *task* if it has not started yet.

    A task still in ``SCHEDULED`` status is marked ``SUPERSEDED``; a task in any
    other status is left untouched.

    :param task: task dictionary; its ``"status"`` entry is read and may be rewritten
    :return: True if the task was superseded, False otherwise
        (NOTE(review): return values to be confirmed against the callers)
    """
    # A single ``with`` keeps acquire/release balanced on every path. The former
    # code released the lock by hand in only one branch, so the lock was either
    # leaked or released twice.
    with self.task_lock:
        if task["status"] == "SCHEDULED":
            task["status"] = "SUPERSEDED"
            return True
        # task["status"] == "processing"
        return False
724 self
.logger
.debug("Starting")
726 self
.get_vimconnector()
727 self
.logger
.debug("Vimconnector loaded")
728 reload_thread
= False
732 while not self
.task_queue
.empty():
733 task
= self
.task_queue
.get()
734 if isinstance(task
, list):
736 elif isinstance(task
, str):
739 elif task
== 'reload':
742 self
.task_queue
.task_done()
746 task
, related_tasks
= self
._get
_db
_task
()
748 self
._proccess
_pending
_tasks
(task
, related_tasks
)
752 except Exception as e
:
753 self
.logger
.critical("Unexpected exception at run: " + str(e
), exc_info
=True)
755 self
.logger
.debug("Finishing")
757 def _look_for_task(self
, instance_action_id
, task_id
):
759 Look for a concrete task at vim_actions database table
760 :param instance_action_id: The instance_action_id
761 :param task_id: Can have several formats:
762 <task index>: integer
763 TASK-<task index> :backward compatibility,
764 [TASK-]<instance_action_id>.<task index>: this instance_action_id overrides the one in the parameter
765 :return: Task dictionary or None if not found
767 if isinstance(task_id
, int):
770 if task_id
.startswith("TASK-"):
771 task_id
= task_id
[5:]
772 ins_action_id
, _
, task_index
= task_id
.rpartition(".")
774 instance_action_id
= ins_action_id
776 tasks
= self
.db
.get_rows(FROM
="vim_wim_actions", WHERE
={"instance_action_id": instance_action_id
,
777 "task_index": task_index
})
781 task
["params"] = None
784 extra
= yaml
.load(task
["extra"], Loader
=yaml
.Loader
)
785 task
["extra"] = extra
786 task
["params"] = extra
.get("params")
@staticmethod
def _format_vim_error_msg(error_text, max_length=1024):
    """
    Shorten an error text by removing its middle part
    :param error_text: text to format. Can be None or empty
    :param max_length: texts with this length or longer are shortened
    :return: error_text itself if it is empty or shorter than max_length; otherwise its beginning
        and its end joined by " ... "
    """
    if not error_text or len(error_text) < max_length:
        return error_text
    head_end = max_length // 2 - 3
    tail_start = -max_length // 2 + 3
    return error_text[:head_end] + " ... " + error_text[tail_start:]
def new_vm(self, task):
    """
    Create a VM at VIM
    :param task: task dictionary. "params" contains the arguments for vimconnector new_vminstance, where
        index 5 is the list of interfaces. An interface "net_id" that is a task id is replaced by the vim_id
        of that task taken from task["depends"]. The task is updated with the result
    :return: dictionary with "status", "vim_vm_id" and "error_msg" to update the instance element
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        params = task["params"]
        depends = task.get("depends")
        for net in params[5]:
            if "net_id" not in net or not is_task_id(net["net_id"]):
                continue
            # change task_id into network_id
            network_id = task["depends"][net["net_id"]].get("vim_id")
            if not network_id:
                raise VimThreadException(
                    "Cannot create VM because depends on a network not created or found: " +
                    str(depends[net["net_id"]]["error_msg"]))
            net["net_id"] = network_id
        params_copy = deepcopy(params)
        vim_vm_id, created_items = self.vim.new_vminstance(*params_copy)

        # fill task_interfaces. Look for snd_net_id at database for each interface
        task_interfaces = {}
        for iface in params_copy[5]:
            iface_info = {"iface_id": iface["uuid"]}
            task_interfaces[iface["vim_id"]] = iface_info
            rows = self.db.get_rows(
                SELECT=('sdn_net_id', 'interface_id'),
                FROM='instance_nets as ine join instance_interfaces as ii on ii.instance_net_id=ine.uuid',
                WHERE={'ii.uuid': iface["uuid"]})
            if not rows:
                self.logger.critical("task={} new-VM: instance_nets uuid={} not found at DB".format(
                    task_id, iface["uuid"]), exc_info=True)
                continue
            iface_info["sdn_net_id"] = rows[0]['sdn_net_id']
            iface_info["interface_id"] = rows[0]['interface_id']

        task["vim_info"] = {}
        task["extra"].update({
            "interfaces": task_interfaces,
            "created": True,
            "created_items": created_items,
            "vim_status": "BUILD",
        })
        task["error_msg"] = None
        task["status"] = "DONE"
        task["vim_id"] = vim_vm_id
        return {"status": "BUILD", "vim_vm_id": vim_vm_id, "error_msg": None}

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("task={} new-VM: {}".format(task_id, e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        return {"status": "VIM_ERROR", "vim_vm_id": None, "error_msg": error_text}
def del_vm(self, task):
    """
    Delete a VM at VIM, and the SDN ports attached to its interfaces
    :param task: task dictionary. "vim_id" is the VM to delete; "extra" can contain "interfaces" (dictionary
        of interfaces, each one with an optional "sdn_port_id") and "created_items". The task "status" and
        "error_msg" are updated with the result
    :return: None
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    vm_vim_id = task["vim_id"]
    # A mapping is needed here: the former '()' default has no values() method, so a task without
    # interfaces failed with AttributeError instead of deleting the VM
    interfaces = task["extra"].get("interfaces") or {}
    try:
        for iface in interfaces.values():
            if not iface.get("sdn_port_id"):
                continue
            try:
                with self.db_lock:
                    self.ovim.delete_port(iface["sdn_port_id"], idempotent=True)
            except ovimException as e:
                # best effort: a failure deleting the SDN port does not prevent deleting the VM
                self.logger.error("task={} del-VM: ovimException when deleting external_port={}: {} ".format(
                    task_id, iface["sdn_port_id"], e), exc_info=True)
                # TODO Set error_msg at instance_nets

        self.vim.delete_vminstance(vm_vim_id, task["extra"].get("created_items"))
        task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
        task["error_msg"] = None
        return None

    except vimconn.vimconnException as e:
        task["error_msg"] = self._format_vim_error_msg(str(e))
        if isinstance(e, vimconn.vimconnNotFoundException):
            # If not found mark as Done and fill error_msg
            task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
            return None
        task["status"] = "FAILED"
        return None
def _get_net_internal(self, task, filter_param):
    """
    Common code for get_net and new_net. It looks for a network on VIM with the filter_params
    :param task: task for this find or find-or-create action. It is updated with the network found
    :param filter_param: parameters to send to the vimconnector
    :return: a dict with the content to update the instance_nets database table. Raises
        VimThreadExceptionNotFound when the network is not found, and VimThreadException when more than
        one is found
    """
    vim_nets = self.vim.get_network_list(filter_param)
    if not vim_nets:
        raise VimThreadExceptionNotFound("Network not found with this criteria: '{}'".format(filter_param))
    if len(vim_nets) > 1:
        raise VimThreadException("More than one network found with this criteria: '{}'".format(filter_param))
    vim_net_id = vim_nets[0]["id"]

    # Discover if this network is managed by a sdn controller
    rows = self.db.get_rows(SELECT=('sdn_net_id',), FROM='instance_nets',
                            WHERE={'vim_net_id': vim_net_id, 'datacenter_tenant_id': self.datacenter_tenant_id},
                            ORDER="instance_scenario_id")
    sdn_net_id = rows[0]['sdn_net_id'] if rows else None

    task["status"] = "DONE"
    task["extra"].update({"vim_info": {}, "created": False, "vim_status": "BUILD", "sdn_net_id": sdn_net_id})
    task["error_msg"] = None
    task["vim_id"] = vim_net_id
    return {"vim_net_id": vim_net_id, "created": False, "status": "BUILD",
            "error_msg": None, "sdn_net_id": sdn_net_id}
def get_net(self, task):
    """
    Look for an existing network at VIM
    :param task: task dictionary. The first item of "params" is the filter sent to the vimconnector.
        The task is updated with the result
    :return: dictionary with the content to update the instance element
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        return self._get_net_internal(task, task["params"][0])

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("task={} get-net: {}".format(task_id, e))
        task["status"] = "FAILED"
        task["vim_id"] = None
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        return {"vim_net_id": None, "status": "VIM_ERROR", "error_msg": error_text}
def new_net(self, task):
    """
    Find, or create, a network at VIM; and create it at the SDN controller when needed
    :param task: task dictionary. When task["extra"]["find"] is set, its first item is used as filter to
        look for an existing network, which is used if found. Otherwise the network is created with "params":
        name, type and a third argument for the vimconnector, plus an optional wim account name at index 3.
        The task is updated with the result
    :return: dictionary with the content to update the instance element
    """
    vim_net_id = None
    sdn_net_id = None
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    action_text = ""
    try:
        # FIND
        if task["extra"].get("find"):
            action_text = "finding"
            filter_param = task["extra"]["find"][0]
            try:
                return self._get_net_internal(task, filter_param)
            except VimThreadExceptionNotFound:
                pass  # not found, so it is created below
        # CREATE
        params = task["params"]
        action_text = "creating VIM"
        vim_net_id, created_items = self.vim.new_network(*params[0:3])

        net_name = params[0]
        net_type = params[1]
        wim_account_name = params[3] if len(params) >= 4 else None

        sdn_controller = self.vim.config.get('sdn-controller')
        if sdn_controller and (net_type == "data" or net_type == "ptp"):
            network = {"name": net_name, "type": net_type, "region": self.vim["config"]["datacenter_id"]}

            vim_net = self.vim.get_network(vim_net_id)
            if vim_net.get('encapsulation') != 'vlan':
                # get() is used in the message too: a missing 'encapsulation' formerly raised a KeyError
                # that is not caught below, hiding the real error
                raise vimconn.vimconnException(
                    "net '{}' defined as type '{}' has not vlan encapsulation '{}'".format(
                        net_name, net_type, vim_net.get('encapsulation')))
            network["vlan"] = vim_net.get('segmentation_id')
            action_text = "creating SDN"
            with self.db_lock:
                sdn_net_id = self.ovim.new_network(network)

            # get(): a VIM config without "wim_external_ports" means that no external port is wanted
            if wim_account_name and self.vim.config.get("wim_external_ports"):
                # add external port to connect WIM. Try with compute node __WIM:wim_name and __WIM
                action_text = "attaching external port to ovim network"
                sdn_port_name = "external_port"
                sdn_port_data = {
                    "compute_node": "__WIM:" + wim_account_name[0:58],
                    "pci": None,
                    "vlan": network["vlan"],
                    "net_id": sdn_net_id,
                    "region": self.vim["config"]["datacenter_id"],
                    "name": sdn_port_name,
                }
                try:
                    with self.db_lock:
                        sdn_external_port_id = self.ovim.new_external_port(sdn_port_data)
                except ovimException:
                    sdn_port_data["compute_node"] = "__WIM"
                    with self.db_lock:
                        sdn_external_port_id = self.ovim.new_external_port(sdn_port_data)
                self.logger.debug("Added sdn_external_port {} to sdn_network {}".format(sdn_external_port_id,
                                                                                        sdn_net_id))
        task["status"] = "DONE"
        task["extra"]["vim_info"] = {}
        task["extra"]["sdn_net_id"] = sdn_net_id
        task["extra"]["vim_status"] = "BUILD"
        task["extra"]["created"] = True
        task["extra"]["created_items"] = created_items
        task["error_msg"] = None
        task["vim_id"] = vim_net_id
        instance_element_update = {"vim_net_id": vim_net_id, "sdn_net_id": sdn_net_id, "status": "BUILD",
                                   "created": True, "error_msg": None}
        return instance_element_update
    except (vimconn.vimconnException, ovimException) as e:
        self.logger.error("task={} new-net: Error {}: {}".format(task_id, action_text, e))
        task["status"] = "FAILED"
        # ids of what was already created are kept, so that it can be deleted later
        task["vim_id"] = vim_net_id
        task["error_msg"] = self._format_vim_error_msg(str(e))
        task["extra"]["sdn_net_id"] = sdn_net_id
        instance_element_update = {"vim_net_id": vim_net_id, "sdn_net_id": sdn_net_id, "status": "VIM_ERROR",
                                   "error_msg": task["error_msg"]}
        return instance_element_update
def del_net(self, task):
    """
    Delete a network at VIM, and at the SDN controller together with its external ports
    :param task: task dictionary. "vim_id" is the VIM network and task["extra"]["sdn_net_id"] the SDN
        network; each one is deleted only if set. The task "status" and "error_msg" are updated
    :return: None
    """
    net_vim_id = task["vim_id"]
    sdn_net_id = task["extra"].get("sdn_net_id")
    final_status = "FAILED"
    try:
        if net_vim_id:
            self.vim.delete_network(net_vim_id, task["extra"].get("created_items"))
        if sdn_net_id:
            # Delete any attached port to this sdn network. There can be ports associated to this network in case
            # it was manually done using 'openmano vim-net-sdn-attach'
            with self.db_lock:
                port_list = self.ovim.get_ports(columns={'uuid'},
                                                filter={'name': 'external_port', 'net_id': sdn_net_id})
                for port in port_list:
                    self.ovim.delete_port(port['uuid'], idempotent=True)
                self.ovim.delete_network(sdn_net_id, idempotent=True)
        final_status = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
        task["error_msg"] = None
    except ovimException as e:
        task["error_msg"] = self._format_vim_error_msg("ovimException obtaining and deleting external "
                                                       "ports for net {}: {}".format(sdn_net_id, str(e)))
    except vimconn.vimconnException as e:
        task["error_msg"] = self._format_vim_error_msg(str(e))
        if isinstance(e, vimconn.vimconnNotFoundException):
            # If not found mark as Done and fill error_msg
            final_status = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
    task["status"] = final_status
    return None
1042 # Service Function Instances
def new_sfi(self, task):
    """
    Create a Service Function Instance at VIM
    :param task: task dictionary. task["extra"]["params"] contains "ingress_interface_id" and
        "egress_interface_id", that are looked for at the "interfaces" of the first task of
        task["extra"]["depends_on"] in order to get the VIM interface ids. The task is updated with the result
    :return: dictionary with "status", "vim_sfi_id" and "error_msg" to update the instance element; or None
        when the ingress or the egress interface is not found
    """
    try:
        # Waits for interfaces to be ready (avoids failure)
        time.sleep(1)
        dep_id = "TASK-" + str(task["extra"]["depends_on"][0])
        task_id = task["instance_action_id"] + "." + str(task["task_index"])
        interfaces = task["depends"][dep_id]["extra"].get("interfaces")

        sfi_params = task.get("extra").get("params")
        ingress_interface_id = sfi_params.get("ingress_interface_id")
        egress_interface_id = sfi_params.get("egress_interface_id")
        # the VIM id of the first interface that matches is taken
        ingress_vim_interface_id = next(
            (vim_iface for vim_iface, iface_data in interfaces.items()
             if iface_data.get("interface_id") == ingress_interface_id), None)
        if ingress_interface_id == egress_interface_id:
            egress_vim_interface_id = ingress_vim_interface_id
        else:
            egress_vim_interface_id = next(
                (vim_iface for vim_iface, iface_data in interfaces.items()
                 if iface_data.get("interface_id") == egress_interface_id), None)
        if not (ingress_vim_interface_id and egress_vim_interface_id):
            error_text = "Error creating Service Function Instance, Ingress: {}, Egress: {}".format(
                ingress_vim_interface_id, egress_vim_interface_id)
            self.logger.error(error_text)
            task["error_msg"] = error_text
            task["status"] = "FAILED"
            task["vim_id"] = None
            return None
        # At the moment, every port associated with the VM will be used both as ingress and egress ports.
        # Bear in mind that different VIM connectors might support SFI differently. In the case of OpenStack,
        # only the first ingress and first egress ports will be used to create the SFI (Port Pair).
        name = "sfi-{}".format(task["item_id"][:8])
        # By default no form of IETF SFC Encapsulation will be used
        vim_sfi_id = self.vim.new_sfi(name, [ingress_vim_interface_id], [egress_vim_interface_id],
                                      sfc_encap=False)

        task["extra"]["created"] = True
        task["extra"]["vim_status"] = "ACTIVE"
        task["error_msg"] = None
        task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
        task["vim_id"] = vim_sfi_id
        return {"status": "ACTIVE", "vim_sfi_id": vim_sfi_id, "error_msg": None}

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("Error creating Service Function Instance, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        return {"status": "VIM_ERROR", "vim_sfi_id": None, "error_msg": error_text}
def del_sfi(self, task):
    """
    Delete a Service Function Instance at VIM
    :param task: task dictionary. "vim_id" is the element to delete. "status" is set to "FINISHED" when it is
        deleted or it is not found at VIM, and to "FAILED" on any other VIM error; "error_msg" is updated
    :return: None
    """
    sfi_vim_id = task["vim_id"]
    try:
        self.vim.delete_sfi(sfi_vim_id)
    except vimconn.vimconnException as e:
        task["error_msg"] = self._format_vim_error_msg(str(e))
        # If not found mark as Done and fill error_msg
        not_found = isinstance(e, vimconn.vimconnNotFoundException)
        task["status"] = "FINISHED" if not_found else "FAILED"
    else:
        task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
        task["error_msg"] = None
    return None
def new_sf(self, task):
    """
    Create a Service Function at VIM
    :param task: task dictionary. The Service Function is composed by the "vim_id" of every task of
        task["extra"]["depends_on"], taken from task["depends"]. The task is updated with the result
    :return: dictionary with "status", "vim_sf_id" and "error_msg" to update the instance element
    """
    try:
        task_id = task["instance_action_id"] + "." + str(task["task_index"])
        depends = task.get("depends")
        sfi_id_list = [depends.get("TASK-" + str(dep_id)).get("vim_id")
                       for dep_id in task["extra"]["depends_on"]]
        name = "sf-{}".format(task["item_id"][:8])
        # By default no form of IETF SFC Encapsulation will be used
        vim_sf_id = self.vim.new_sf(name, sfi_id_list, sfc_encap=False)

        task["extra"]["created"] = True
        task["extra"]["vim_status"] = "ACTIVE"
        task["error_msg"] = None
        task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
        task["vim_id"] = vim_sf_id
        return {"status": "ACTIVE", "vim_sf_id": vim_sf_id, "error_msg": None}

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("Error creating Service Function, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        return {"status": "VIM_ERROR", "vim_sf_id": None, "error_msg": error_text}
def del_sf(self, task):
    """
    Delete a Service Function at VIM
    :param task: task dictionary. "vim_id" is the element to delete. "status" is set to "FINISHED" when it is
        deleted or it is not found at VIM, and to "FAILED" on any other VIM error; "error_msg" is updated
    :return: None
    """
    sf_vim_id = task["vim_id"]
    try:
        self.vim.delete_sf(sf_vim_id)
    except vimconn.vimconnException as e:
        task["error_msg"] = self._format_vim_error_msg(str(e))
        # If not found mark as Done and fill error_msg
        not_found = isinstance(e, vimconn.vimconnNotFoundException)
        task["status"] = "FINISHED" if not_found else "FAILED"
    else:
        task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
        task["error_msg"] = None
    return None
def new_classification(self, task):
    """
    Create a Classification (flow classifier) at VIM
    :param task: task dictionary. "params" is a dictionary with the optional match fields "ip_proto",
        "source_ip", "destination_ip", "source_port" and "destination_port". The logical source port is the
        first interface of the first task of task["extra"]["depends_on"]. The task is updated with the result
    :return: dictionary with "status", "vim_classification_id" and "error_msg" to update the instance element
    """
    task_id = task["instance_action_id"] + "." + str(task["task_index"])
    try:
        params = task["params"]
        dep_id = "TASK-" + str(task["extra"]["depends_on"][0])
        interfaces = task.get("depends").get(dep_id).get("extra").get("interfaces")
        if not interfaces:
            raise VimThreadException("Cannot create Classification because the task '{}' has no interfaces".format(
                dep_id))
        # Bear in mind that different VIM connectors might support Classifications differently.
        # In the case of OpenStack, only the first VNF attached to the classifier will be used
        # to create the Classification(s) (the "logical source port" of the "Flow Classifier").
        # Since the VNFFG classifier match lacks the ethertype, classification defaults to
        # using the IPv4 flow classifier.
        name = "c-{}".format(task["item_id"][:8])
        # ip_proto is optional: int(None) formerly raised a TypeError that is not caught below
        raw_ip_proto = params.get("ip_proto")
        ip_proto = int(raw_ip_proto) if raw_ip_proto not in (None, "") else None
        source_ip = params.get("source_ip")
        destination_ip = params.get("destination_ip")
        source_port = params.get("source_port")
        destination_port = params.get("destination_port")
        # next(iter(...)) instead of keys()[0], because the keys view is not subscriptable in python3
        definition = {"logical_source_port": next(iter(interfaces))}
        if ip_proto:
            # well known protocol numbers are sent by name, any other one as a number
            definition["protocol"] = {1: 'icmp', 6: 'tcp', 17: 'udp'}.get(ip_proto, ip_proto)
        # if not CIDR is given for the IP addresses, add /32:
        if source_ip:
            if '/' not in source_ip:
                source_ip += '/32'
            definition["source_ip_prefix"] = source_ip
        if source_port:
            definition["source_port_range_min"] = source_port
            definition["source_port_range_max"] = source_port
        if destination_port:
            definition["destination_port_range_min"] = destination_port
            definition["destination_port_range_max"] = destination_port
        if destination_ip:
            if '/' not in destination_ip:
                destination_ip += '/32'
            definition["destination_ip_prefix"] = destination_ip

        vim_classification_id = self.vim.new_classification(
            name, 'legacy_flow_classifier', definition)

        task["extra"]["created"] = True
        task["extra"]["vim_status"] = "ACTIVE"
        task["error_msg"] = None
        task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
        task["vim_id"] = vim_classification_id
        instance_element_update = {"status": "ACTIVE", "vim_classification_id": vim_classification_id,
                                   "error_msg": None}
        return instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        self.logger.error("Error creating Classification, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_classification_id": None, "error_msg": error_text}
        return instance_element_update
def del_classification(self, task):
    """
    Delete a Classification at VIM
    :param task: task dictionary. "vim_id" is the element to delete. "status" is set to "FINISHED" when it is
        deleted or it is not found at VIM, and to "FAILED" on any other VIM error; "error_msg" is updated
    :return: None
    """
    classification_vim_id = task["vim_id"]
    try:
        self.vim.delete_classification(classification_vim_id)
    except vimconn.vimconnException as e:
        task["error_msg"] = self._format_vim_error_msg(str(e))
        # If not found mark as Done and fill error_msg
        not_found = isinstance(e, vimconn.vimconnNotFoundException)
        task["status"] = "FINISHED" if not_found else "FAILED"
    else:
        task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
        task["error_msg"] = None
    return None
def new_sfp(self, task):
    """
    Create a Service Function Path at VIM
    :param task: task dictionary. The tasks of task["extra"]["depends_on"] are taken from task["depends"];
        those with item "instance_sfs" provide the service functions and those with item
        "instance_classifications" provide the classifications. The task is updated with the result
    :return: dictionary with "status", "vim_sfp_id" and "error_msg" to update the instance element
    """
    try:
        task_id = task["instance_action_id"] + "." + str(task["task_index"])
        depending_tasks = [task.get("depends").get("TASK-" + str(tsk_id)) for tsk_id in
                           task.get("extra").get("depends_on")]
        sf_id_list = []
        classification_id_list = []
        for dep in depending_tasks:
            vim_id = dep.get("vim_id")
            resource = dep.get("item")
            if resource == "instance_sfs":
                sf_id_list.append(vim_id)
            elif resource == "instance_classifications":
                classification_id_list.append(vim_id)

        name = "sfp-{}".format(task["item_id"][:8])
        # By default no form of IETF SFC Encapsulation will be used
        vim_sfp_id = self.vim.new_sfp(name, classification_id_list, sf_id_list, sfc_encap=False)

        task["extra"]["created"] = True
        task["extra"]["vim_status"] = "ACTIVE"
        task["error_msg"] = None
        task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
        task["vim_id"] = vim_sfp_id
        instance_element_update = {"status": "ACTIVE", "vim_sfp_id": vim_sfp_id, "error_msg": None}
        return instance_element_update

    except (vimconn.vimconnException, VimThreadException) as e:
        # the message formerly said "Service Function", so it could not be told apart from new_sf errors
        self.logger.error("Error creating Service Function Path, task=%s: %s", task_id, str(e))
        error_text = self._format_vim_error_msg(str(e))
        task["error_msg"] = error_text
        task["status"] = "FAILED"
        task["vim_id"] = None
        instance_element_update = {"status": "VIM_ERROR", "vim_sfp_id": None, "error_msg": error_text}
        return instance_element_update
def del_sfp(self, task):
    """
    Delete a Service Function Path at VIM
    :param task: task dictionary. "vim_id" is the element to delete. "status" is set to "FINISHED" when it is
        deleted or it is not found at VIM, and to "FAILED" on any other VIM error; "error_msg" is updated
    :return: None
    """
    sfp_vim_id = task["vim_id"]
    try:
        self.vim.delete_sfp(sfp_vim_id)
    except vimconn.vimconnException as e:
        task["error_msg"] = self._format_vim_error_msg(str(e))
        # If not found mark as Done and fill error_msg
        not_found = isinstance(e, vimconn.vimconnNotFoundException)
        task["status"] = "FINISHED" if not_found else "FAILED"
    else:
        task["status"] = "FINISHED"  # with FINISHED instead of DONE it will not be refreshing
        task["error_msg"] = None
    return None