self.consecutive_errors = 0
self.first_start = False
- # contains created tasks/futures to be able to cancel
- self.lcm_tasks = TaskRegistry()
# logging
self.logger = logging.getLogger('lcm')
# get id
self.logger.critical(str(e), exc_info=True)
raise LcmException(str(e))
+ # registry of the created tasks/futures, kept so that they can be cancelled later
+ self.lcm_tasks = TaskRegistry(self.worker_id, self.db, self.logger)
+
self.ns = ns.NsLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.vca_config, self.loop)
self.netslice = netslice.NetsliceLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config,
self.vca_config, self.loop)
# under the License.
##
+import asyncio
from collections import OrderedDict
# from osm_common.dbbase import DbException
return tuple(filled)
-class TaskRegistry:
+# LcmBase must be defined before TaskRegistry, because TaskRegistry subclasses it.
+class LcmBase:
+
+ def __init__(self, db, msg, fs, logger):
+ """
+
+ :param db: database connection
+ """
+ self.db = db
+ self.msg = msg
+ self.fs = fs
+ self.logger = logger
+
+ def update_db_2(self, item, _id, _desc):
+ """
+ Updates database with _desc information. If success _desc is cleared
+ :param item:
+ :param _id:
+ :param _desc: dictionary with the content to update. Keys are dot separated keys for
+ nested fields (e.g. '_admin.deployed.RO')
+ :return: None. Exception is raised on error
+ """
+ if not _desc:
+ return
+ self.db.set_one(item, {"_id": _id}, _desc)
+ _desc.clear()
+ # except DbException as e:
+ # self.logger.error("Updating {} _id={} with '{}'. Error: {}".format(item, _id, _desc, e))
+
+
+class TaskRegistry(LcmBase):
"""
Implements a registry of task needed for later cancelation, look for related tasks that must be completed before
etc. It stores a four level dict
Second level is the _id
Third level is the operation id
Fourth level is a descriptive name, the value is the task class
+
+ The HA (High-Availability) methods are used when more than one LCM instance is running.
+ TaskRegistry uses LcmBase as base class so that it can reuse LcmBase.update_db_2()
+ to update the record of the current task in the external DB.
+ The DB registry uses the following fields to distinguish a task:
+ - op_type: operation type ("nslcmops" or "nsilcmops")
+ - op_id: operation ID
+ - worker: the worker ID for this process (stored in the '_admin.worker' field)
"""
- def __init__(self):
+ instance_id_label_dict = {'ns': 'nsInstanceId', 'nsi': 'netsliceInstanceId'}
+
+ def __init__(self, worker_id=None, db=None, logger=None):
self.task_registry = {
"ns": {},
"nsi": {},
"wim_account": {},
"sdn": {},
}
+ self.worker_id = worker_id
+ self.db = db
+ self.logger = logger
def register(self, topic, _id, op_id, task_name, task):
"""
def cancel(self, topic, _id, target_op_id=None, target_task_name=None):
"""
- Cancel all active tasks of a concrete ns, nsi, vim_account, sdn identified for _id. If op_id is supplied only
+ Cancel all active tasks of a concrete ns, nsi, vim_account, sdn identified for _id. If op_id is supplied only
this is cancelled, and the same with task_name
"""
if not self.task_registry[topic].get(_id):
# if result:
# self.logger.debug("{} _id={} order_id={} task={} cancelled".format(topic, _id, op_id, task_name))
-
-class LcmBase:
-
- def __init__(self, db, msg, fs, logger):
+ def lock_HA(self, topic, op_type, op_id):
"""
-
- :param db: database connection
+ Lock a task, if possible, to indicate to the HA system that
+ the task will be executed in this LCM instance.
+ :param topic: Can be "ns", "nsi"
+ :param op_type: Operation type, can be "nslcmops", "nsilcmops"
+ :param op_id: id of the operation of the related item
+ :return:
+ True=lock successful => execute the task (not registered by any other LCM instance)
+ False=lock failed => do NOT execute the task (already registered by another LCM instance)
"""
- self.db = db
- self.msg = msg
- self.fs = fs
- self.logger = logger
- def update_db_2(self, item, _id, _desc):
+ db_lock_task = self.db.set_one(op_type,
+ q_filter={'_id': op_id, '_admin.worker': None},
+ update_dict={'_admin.worker': self.worker_id},
+ fail_on_empty=False)
+
+ if db_lock_task is None:
+ self.logger.debug("Task {} operation={} already locked by another worker".format(topic, op_id))
+ return False
+ else:
+ return True
+
+ async def waitfor_related_HA(self, topic, op_type, op_id=None):
"""
- Updates database with _desc information. If success _desc is cleared
- :param item:
- :param _id:
- :param _desc: dictionary with the content to update. Keys are dot separated keys for
- :return: None. Exception is raised on error
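+ Wait for any pending related HA tasks, i.e. operations on the same NS/NSI instance
+ that were started before this one and are still in 'PROCESSING' state.
+ The DB is polled periodically; LcmException is raised if they do not finish in time.
+ Returns immediately if op_id is not found in the DB.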
"""
- if not _desc:
+
+ # InstanceId label
+ instance_id_label = self.instance_id_label_dict.get(topic)
+
+ # Get 'startTime' timestamp for this operation
+ step = "Getting timestamp for op_id={} from db".format(op_id)
+ db_lcmop = self.db.get_one(op_type,
+ {"_id": op_id},
+ fail_on_empty=False)
+ if not db_lcmop:
return
- self.db.set_one(item, {"_id": _id}, _desc)
- _desc.clear()
- # except DbException as e:
- # self.logger.error("Updating {} _id={} with '{}'. Error: {}".format(item, _id, _desc, e))
+ starttime_this_op = db_lcmop.get("startTime")
+ instance_id = db_lcmop.get(instance_id_label)
+
+ # For HA, get list of tasks from DB instead of from dictionary (in-memory) variable.
+ timeout_wait_for_task = 3600 # Max time (seconds) to wait for a related task to finish
+ # interval_wait_for_task = 30 # A too long polling interval slows things down considerably
+ interval_wait_for_task = 10 # Interval in seconds for polling related tasks
+ time_left = timeout_wait_for_task
+ old_num_related_tasks = 0
+ while True:
+ # Get related tasks (operations within the same NS or NSI instance) which are
+ # still running (operationState='PROCESSING') and which were started before this task.
+ _filter = {instance_id_label: instance_id,
+ 'operationState': 'PROCESSING',
+ 'startTime.lt': starttime_this_op}
+ db_waitfor_related_task = self.db.get_list(op_type,
+ q_filter=_filter)
+ new_num_related_tasks = len(db_waitfor_related_task)
+ if not new_num_related_tasks:
+ # There are no related tasks, no need to wait, so return.
+ return
+ # If the number of pending related tasks has changed,
+ # update the 'detailed-status' field and log the change.
+ if new_num_related_tasks != old_num_related_tasks:
+ db_lcmops_update = {}
+ step = db_lcmops_update["detailed-status"] = \
+ "Waiting for {} related tasks to be completed.".format(
+ new_num_related_tasks)
+ self.logger.debug("Task {} operation={} {}".format(topic, op_id, step))
+ self.update_db_2(op_type, op_id, db_lcmops_update)
+ old_num_related_tasks = new_num_related_tasks
+ time_left -= interval_wait_for_task
+ if time_left < 0:
+ raise LcmException(
+ "Timeout ({}) when waiting for related tasks to be completed".format(
+ timeout_wait_for_task))
+ await asyncio.sleep(interval_wait_for_task)
+
+ return
raise LcmException("ns_update_nsir: Not found vld={} at RO info".format(vld["id"]))
async def instantiate(self, nsir_id, nsilcmop_id):
+
+ # Try to lock HA task here
+ task_is_locked_by_me = self.lcm_tasks.lock_HA('nsi', 'nsilcmops', nsilcmop_id)
+ if not task_is_locked_by_me:
+ return
+
logging_text = "Task netslice={} instantiate={} ".format(nsir_id, nsilcmop_id)
self.logger.debug(logging_text + "Enter")
# get all needed from database
db_nsir_update_RO["netslice_scenario_id"] = desc["uuid"]
db_nsir_update_RO["vld_id"] = RO_ns_params["name"]
db_nsir_update["_admin.deployed.RO"].append(db_nsir_update_RO)
-
+
def overwrite_nsd_params(self, db_nsir, nslcmop):
RO_list = []
vld_op_list = []
return nsr_id, nslcmop
try:
+ # wait for any previous tasks in process
+ await self.lcm_tasks.waitfor_related_HA('nsi', 'nsilcmops', nsilcmop_id)
+
step = "Getting nsir={} from db".format(nsir_id)
db_nsir = self.db.get_one("nsis", {"_id": nsir_id})
step = "Getting nsilcmop={} from db".format(nsilcmop_id)
db_nsilcmop = self.db.get_one("nsilcmops", {"_id": nsilcmop_id})
- # look if previous tasks is in process
- task_name, task_dependency = self.lcm_tasks.lookfor_related("nsi", nsir_id, nsilcmop_id)
- if task_dependency:
- step = db_nsilcmop_update["detailed-status"] = \
- "Waiting for related tasks to be completed: {}".format(task_name)
- self.logger.debug(logging_text + step)
- self.update_db_2("nsilcmops", nsilcmop_id, db_nsilcmop_update)
- _, pending = await asyncio.wait(task_dependency, timeout=3600)
- if pending:
- raise LcmException("Timeout waiting related tasks to be completed")
-
# Empty list to keep track of network service records status in the netslice
nsir_admin = db_nsir_admin = db_nsir.get("_admin")
# Slice status Creating
db_nsir_update["detailed-status"] = "creating"
db_nsir_update["operational-status"] = "init"
- self.update_db_2("nsis", nsir_id, db_nsir_update)
-
+ self.update_db_2("nsis", nsir_id, db_nsir_update)
+
# Creating netslice VLDs networking before NS instantiation
db_nsir_update["_admin.deployed.RO"] = db_nsir_admin["deployed"]["RO"]
for vld_item in get_iterable(nsir_admin, "netslice-vld"):
await netslice_scenario_create(self, vld_item, nsir_id, db_nsir, db_nsir_admin, db_nsir_update)
self.update_db_2("nsis", nsir_id, db_nsir_update)
-
+
db_nsir_update["detailed-status"] = "Creating netslice subnets at RO"
- self.update_db_2("nsis", nsir_id, db_nsir_update)
+ self.update_db_2("nsis", nsir_id, db_nsir_update)
db_nsir = self.db.get_one("nsis", {"_id": nsir_id})
# netslice_scenarios = db_nsir["_admin"]["deployed"]["RO"]
# db_nsir_update_RO = deepcopy(netslice_scenarios)
# for netslice_scenario in netslice_scenarios:
- # await netslice_scenario_check(self, netslice_scenario["netslice_scenario_id"],
+ # await netslice_scenario_check(self, netslice_scenario["netslice_scenario_id"],
# nsir_id, db_nsir_update_RO)
-
+
# db_nsir_update["_admin.deployed.RO"] = db_nsir_update_RO
# self.update_db_2("nsis", nsir_id, db_nsir_update)
step = "Launching ns={} instantiate={} task".format(nsr_id, nslcmop_id)
task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_instantiate", task)
-
+
# Wait until Network Slice is ready
step = nsir_status_detailed = " Waiting nsi ready. nsi_id={}".format(nsir_id)
nsrs_detailed_list_old = None
self.logger.debug(logging_text + step)
-
+
# TODO: substitute while for await (all task to be done or not)
deployment_timeout = 2 * 3600 # Two hours
while deployment_timeout > 0:
self.lcm_tasks.remove("nsi", nsir_id, nsilcmop_id, "nsi_instantiate")
async def terminate(self, nsir_id, nsilcmop_id):
+
+ # Try to lock HA task here
+ task_is_locked_by_me = self.lcm_tasks.lock_HA('nsi', 'nsilcmops', nsilcmop_id)
+ if not task_is_locked_by_me:
+ return
+
logging_text = "Task nsi={} terminate={} ".format(nsir_id, nsilcmop_id)
self.logger.debug(logging_text + "Enter")
- exc = None
+ exc = None
db_nsir = None
db_nsilcmop = None
db_nsir_update = {"_admin.nsilcmop": nsilcmop_id}
nsilcmop_operation_state = None
autoremove = False # autoremove after terminated
try:
+ # wait for any previous tasks in process
+ await self.lcm_tasks.waitfor_related_HA('nsi', 'nsilcmops', nsilcmop_id)
+
step = "Getting nsir={} from db".format(nsir_id)
db_nsir = self.db.get_one("nsis", {"_id": nsir_id})
nsir_deployed = deepcopy(db_nsir["_admin"].get("deployed"))
step = "Getting nsilcmop={} from db".format(nsilcmop_id)
db_nsilcmop = self.db.get_one("nsilcmops", {"_id": nsilcmop_id})
-
+
# TODO: Check if makes sense check the nsiState=NOT_INSTANTIATED when terminate
# CASE: Instance was terminated but there is a second request to terminate the instance
if db_nsir["_admin"]["nsiState"] == "NOT_INSTANTIATED":
db_nsir_update["detailed-status"] = "Terminating Netslice subnets"
self.update_db_2("nsis", nsir_id, db_nsir_update)
- # look if previous tasks is in process
- task_name, task_dependency = self.lcm_tasks.lookfor_related("nsi", nsir_id, nsilcmop_id)
- if task_dependency:
- step = db_nsilcmop_update["detailed-status"] = \
- "Waiting for related tasks to be completed: {}".format(task_name)
- self.logger.debug(logging_text + step)
- self.update_db_2("nsilcmops", nsilcmop_id, db_nsilcmop_update)
- _, pending = await asyncio.wait(task_dependency, timeout=3600)
- if pending:
- raise LcmException("Timeout waiting related tasks to be completed")
-
# Gets the list to keep track of network service records status in the netslice
nsir_admin = db_nsir["_admin"]
- nsrs_detailed_list = []
+ nsrs_detailed_list = []
# Iterate over the network services operation ids to terminate NSs
- # TODO: (future improvement) look another way check the tasks instead of keep asking
+ # TODO: (future improvement) look another way check the tasks instead of keep asking
# -> https://docs.python.org/3/library/asyncio-task.html#waiting-primitives
# steps: declare ns_tasks, add task when terminate is called, await asyncio.wait(vca_task_list, timeout=300)
nslcmop_ids = db_nsilcmop["operationParams"].get("nslcmops_ids")
step = nsir_status_detailed = " Waiting nsi terminated. nsi_id={}".format(nsir_id)
nsrs_detailed_list_old = None
self.logger.debug(logging_text + step)
-
+
termination_timeout = 2 * 3600 # Two hours
while termination_timeout > 0:
# Check ns termination status
raise LcmException("ns_update_vnfr: Not found member_vnf_index={} at RO info".format(vnf_index))
async def instantiate(self, nsr_id, nslcmop_id):
+
+ # Try to lock HA task here
+ task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
+ if not task_is_locked_by_me:
+ return
+
logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
self.logger.debug(logging_text + "Enter")
# get all needed from database
n2vc_key_list = [] # list of public keys to be injected as authorized to VMs
exc = None
try:
+ # wait for any previous tasks in process
+ await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
+
step = "Getting nslcmop={} from db".format(nslcmop_id)
db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
step = "Getting nsr={} from db".format(nsr_id)
nsd = db_nsr["nsd"]
nsr_name = db_nsr["name"] # TODO short-name??
- # look if previous tasks in process
- task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
- if task_dependency:
- step = db_nslcmop_update["detailed-status"] = \
- "Waiting for related tasks to be completed: {}".format(task_name)
- self.logger.debug(logging_text + step)
- self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
- _, pending = await asyncio.wait(task_dependency, timeout=3600)
- if pending:
- raise LcmException("Timeout waiting related tasks to be completed")
-
step = "Getting vnfrs from db"
db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
db_vnfds_ref = {}
vnf_index, seq.get("name"), nslcmop_operation_state_detail))
async def terminate(self, nsr_id, nslcmop_id):
+
+ # Try to lock HA task here
+ task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
+ if not task_is_locked_by_me:
+ return
+
logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
self.logger.debug(logging_text + "Enter")
db_nsr = None
nslcmop_operation_state = None
autoremove = False # autoremove after terminated
try:
+ # wait for any previous tasks in process
+ await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)
+
step = "Getting nslcmop={} from db".format(nslcmop_id)
db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
step = "Getting nsr={} from db".format(nsr_id)
return "FAILED", str(e)
async def action(self, nsr_id, nslcmop_id):
+
+ # Try to lock HA task here
+ task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
+ if not task_is_locked_by_me:
+ return
+
logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
self.logger.debug(logging_text + "Enter")
# get all needed from database
nslcmop_operation_state_detail = None
exc = None
try:
+ # wait for any previous tasks in process
+ await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
+
step = "Getting information from database"
db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
step = "Getting nsd from database"
db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
- # look if previous tasks in process
- task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
- if task_dependency:
- step = db_nslcmop_update["detailed-status"] = \
- "Waiting for related tasks to be completed: {}".format(task_name)
- self.logger.debug(logging_text + step)
- self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
- _, pending = await asyncio.wait(task_dependency, timeout=3600)
- if pending:
- raise LcmException("Timeout waiting related tasks to be completed")
-
# for backward compatibility
if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
return nslcmop_operation_state, nslcmop_operation_state_detail
async def scale(self, nsr_id, nslcmop_id):
+
+ # Try to lock HA task here
+ task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
+ if not task_is_locked_by_me:
+ return
+
logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
self.logger.debug(logging_text + "Enter")
# get all needed from database
old_config_status = ""
vnfr_scaled = False
try:
- # look if previous tasks in process
- task_name, task_dependency = self.lcm_tasks.lookfor_related("ns", nsr_id, nslcmop_id)
- if task_dependency:
- step = db_nslcmop_update["detailed-status"] = \
- "Waiting for related tasks to be completed: {}".format(task_name)
- self.logger.debug(logging_text + step)
- self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
- _, pending = await asyncio.wait(task_dependency, timeout=3600)
- if pending:
- raise LcmException("Timeout waiting related tasks to be completed")
+ # wait for any previous tasks in process
+ await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
step = "Getting nslcmop from database"
self.logger.debug(step + " after having waited for previous tasks to be completed")