class LcmBase:
-
def __init__(self, msg, logger):
"""
"""
# NS/NSI: "services" VIM/WIM/SDN: "accounts"
- topic_service_list = ['ns', 'nsi']
- topic_account_list = ['vim', 'wim', 'sdn', 'k8scluster', 'k8srepo']
+ topic_service_list = ["ns", "nsi"]
+ topic_account_list = ["vim", "wim", "sdn", "k8scluster", "vca", "k8srepo"]
# Map topic to InstanceID
- topic2instid_dict = {
- 'ns': 'nsInstanceId',
- 'nsi': 'netsliceInstanceId'}
+ topic2instid_dict = {"ns": "nsInstanceId", "nsi": "netsliceInstanceId"}
# Map topic to DB table name
topic2dbtable_dict = {
- 'ns': 'nslcmops',
- 'nsi': 'nsilcmops',
- 'vim': 'vim_accounts',
- 'wim': 'wim_accounts',
- 'sdn': 'sdns',
- 'k8scluster': 'k8sclusters',
- 'k8srepo': 'k8srepos'}
+ "ns": "nslcmops",
+ "nsi": "nsilcmops",
+ "vim": "vim_accounts",
+ "wim": "wim_accounts",
+ "sdn": "sdns",
+ "k8scluster": "k8sclusters",
+ "vca": "vca",
+ "k8srepo": "k8srepos",
+ }
def __init__(self, worker_id=None, logger=None):
self.task_registry = {
"wim_account": {},
"sdn": {},
"k8scluster": {},
+ "vca": {},
"k8srepo": {},
}
self.worker_id = worker_id
def _get_account_and_op_HA(self, op_id):
if not op_id:
return None, None
- account_id, _, op_index = op_id.rpartition(':')
+ account_id, _, op_index = op_id.rpartition(":")
if not account_id or not op_index.isdigit():
return None, None
return account_id, op_index
def _get_instance_id_HA(self, topic, op_type, op_id):
_id = None
# Special operation 'ANY', for SDN account associated to a VIM account: op_id as '_id'
- if op_type == 'ANY':
+ if op_type == "ANY":
_id = op_id
# NS/NSI: Use op_id as '_id'
elif self._is_service_type_HA(topic):
_filter = {}
# Special operation 'ANY', for SDN account associated to a VIM account: op_id as '_id'
# In this special case, the timestamp is ignored
- if op_type == 'ANY':
- _filter = {'operationState': 'PROCESSING'}
+ if op_type == "ANY":
+ _filter = {"operationState": "PROCESSING"}
# Otherwise, get 'startTime' timestamp for this operation
else:
# NS/NSI
starttime_this_op = db_lcmop.get("startTime")
instance_id_label = self.topic2instid_dict.get(topic)
instance_id = db_lcmop.get(instance_id_label)
- _filter = {instance_id_label: instance_id,
- 'operationState': 'PROCESSING',
- 'startTime.lt': starttime_this_op,
- "_admin.modified.gt": now - 2*3600, # ignore if tow hours of inactivity
- }
+ _filter = {
+ instance_id_label: instance_id,
+ "operationState": "PROCESSING",
+ "startTime.lt": starttime_this_op,
+ "_admin.modified.gt": now
+ - 2 * 3600, # ignore if tow hours of inactivity
+ }
# VIM/WIM/SDN/K8scluster
elif self._is_account_type_HA(topic):
_, op_index = self._get_account_and_op_HA(op_id)
- _ops = db_lcmop['_admin']['operations']
+ _ops = db_lcmop["_admin"]["operations"]
_this_op = _ops[int(op_index)]
- starttime_this_op = _this_op.get('startTime', None)
- _filter = {'operationState': 'PROCESSING',
- 'startTime.lt': starttime_this_op}
+ starttime_this_op = _this_op.get("startTime", None)
+ _filter = {
+ "operationState": "PROCESSING",
+ "startTime.lt": starttime_this_op,
+ }
return _filter
# Get DB params for any topic and operation
update_dict = {}
# NS/NSI
if self._is_service_type_HA(topic):
- q_filter = {'_id': op_id, '_admin.worker': None}
- update_dict = {'_admin.worker': self.worker_id}
+ q_filter = {"_id": op_id, "_admin.worker": None}
+ update_dict = {"_admin.worker": self.worker_id}
# VIM/WIM/SDN
elif self._is_account_type_HA(topic):
account_id, op_index = self._get_account_and_op_HA(op_id)
if not account_id:
return None, None
- if op_type == 'create':
+ if op_type == "create":
# Creating a VIM/WIM/SDN account implies setting '_admin.current_operation' = 0
op_index = 0
- q_filter = {'_id': account_id, "_admin.operations.{}.worker".format(op_index): None}
- update_dict = {'_admin.operations.{}.worker'.format(op_index): self.worker_id,
- '_admin.current_operation': op_index}
+ q_filter = {
+ "_id": account_id,
+ "_admin.operations.{}.worker".format(op_index): None,
+ }
+ update_dict = {
+ "_admin.operations.{}.worker".format(op_index): self.worker_id,
+ "_admin.current_operation": op_index,
+ }
return q_filter, update_dict
def lock_HA(self, topic, op_type, op_id):
# Try to lock this task
db_table_name = self.topic2dbtable_dict[topic]
q_filter, update_dict = self._get_dbparams_for_lock_HA(topic, op_type, op_id)
- db_lock_task = self.db.set_one(db_table_name,
- q_filter=q_filter,
- update_dict=update_dict,
- fail_on_empty=False)
+ db_lock_task = self.db.set_one(
+ db_table_name,
+ q_filter=q_filter,
+ update_dict=update_dict,
+ fail_on_empty=False,
+ )
if db_lock_task is None:
- self.logger.debug("Task {} operation={} already locked by another worker".format(topic, op_id))
+ self.logger.debug(
+ "Task {} operation={} already locked by another worker".format(
+ topic, op_id
+ )
+ )
return False
else:
# Set 'detailed-status' to 'In progress' for VIM/WIM/SDN operations
if self._is_account_type_HA(topic):
- detailed_status = 'In progress'
+ detailed_status = "In progress"
account_id, op_index = self._get_account_and_op_HA(op_id)
- q_filter = {'_id': account_id}
- update_dict = {'_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
- self.db.set_one(db_table_name,
- q_filter=q_filter,
- update_dict=update_dict,
- fail_on_empty=False)
+ q_filter = {"_id": account_id}
+ update_dict = {
+ "_admin.operations.{}.detailed-status".format(
+ op_index
+ ): detailed_status
+ }
+ self.db.set_one(
+ db_table_name,
+ q_filter=q_filter,
+ update_dict=update_dict,
+ fail_on_empty=False,
+ )
return True
def unlock_HA(self, topic, op_type, op_id, operationState, detailed_status):
# If this is a 'delete' operation, the account may have been deleted (SUCCESS) or may still exist (FAILED)
# If the account exist, register the HA task.
# Update DB for HA tasks
- q_filter = {'_id': account_id}
- update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
- '_admin.operations.{}.detailed-status'.format(op_index): detailed_status,
- '_admin.operations.{}.worker'.format(op_index): None,
- '_admin.current_operation': None}
- self.db.set_one(db_table_name,
- q_filter=q_filter,
- update_dict=update_dict,
- fail_on_empty=False)
+ q_filter = {"_id": account_id}
+ update_dict = {
+ "_admin.operations.{}.operationState".format(op_index): operationState,
+ "_admin.operations.{}.detailed-status".format(op_index): detailed_status,
+ "_admin.operations.{}.worker".format(op_index): None,
+ "_admin.current_operation": None,
+ }
+ self.db.set_one(
+ db_table_name,
+ q_filter=q_filter,
+ update_dict=update_dict,
+ fail_on_empty=False,
+ )
return
async def waitfor_related_HA(self, topic, op_type, op_id=None):
"""
# Backward compatibility
- if not (self._is_service_type_HA(topic) or self._is_account_type_HA(topic)) and (op_id is None):
+ if not (
+ self._is_service_type_HA(topic) or self._is_account_type_HA(topic)
+ ) and (op_id is None):
return
# Get DB table name
# Get instance ID
_id = self._get_instance_id_HA(topic, op_type, op_id)
_filter = {"_id": _id}
- db_lcmop = self.db.get_one(db_table_name,
- _filter,
- fail_on_empty=False)
+ db_lcmop = self.db.get_one(db_table_name, _filter, fail_on_empty=False)
if not db_lcmop:
return
_filter = self._get_waitfor_filter_HA(db_lcmop, topic, op_type, op_id)
# For HA, get list of tasks from DB instead of from dictionary (in-memory) variable.
- timeout_wait_for_task = 3600 # Max time (seconds) to wait for a related task to finish
+ timeout_wait_for_task = (
+ 3600 # Max time (seconds) to wait for a related task to finish
+ )
# interval_wait_for_task = 30 # A too long polling interval slows things down considerably
- interval_wait_for_task = 10 # Interval in seconds for polling related tasks
+ interval_wait_for_task = 10 # Interval in seconds for polling related tasks
time_left = timeout_wait_for_task
old_num_related_tasks = 0
while True:
# Get related tasks (operations within the same instance as this) which are
# still running (operationState='PROCESSING') and which were started before this task.
# In the case of op_type='ANY', get any related tasks with operationState='PROCESSING', ignore timestamps.
- db_waitfor_related_task = self.db.get_list(db_table_name,
- q_filter=_filter)
+ db_waitfor_related_task = self.db.get_list(db_table_name, q_filter=_filter)
new_num_related_tasks = len(db_waitfor_related_task)
# If there are no related tasks, there is nothing to wait for, so return.
if not new_num_related_tasks:
# If number of pending related tasks have changed,
# update the 'detailed-status' field and log the change.
# Do NOT update the 'detailed-status' for SDNC-associated-to-VIM operations ('ANY').
- if (op_type != 'ANY') and (new_num_related_tasks != old_num_related_tasks):
- step = "Waiting for {} related tasks to be completed.".format(new_num_related_tasks)
+ if (op_type != "ANY") and (new_num_related_tasks != old_num_related_tasks):
+ step = "Waiting for {} related tasks to be completed.".format(
+ new_num_related_tasks
+ )
update_dict = {}
- q_filter = {'_id': _id}
+ q_filter = {"_id": _id}
# NS/NSI
if self._is_service_type_HA(topic):
- update_dict = {'detailed-status': step, 'queuePosition': new_num_related_tasks}
+ update_dict = {
+ "detailed-status": step,
+ "queuePosition": new_num_related_tasks,
+ }
# VIM/WIM/SDN
elif self._is_account_type_HA(topic):
_, op_index = self._get_account_and_op_HA(op_id)
- update_dict = {'_admin.operations.{}.detailed-status'.format(op_index): step}
+ update_dict = {
+ "_admin.operations.{}.detailed-status".format(op_index): step
+ }
self.logger.debug("Task {} operation={} {}".format(topic, _id, step))
- self.db.set_one(db_table_name,
- q_filter=q_filter,
- update_dict=update_dict,
- fail_on_empty=False)
+ self.db.set_one(
+ db_table_name,
+ q_filter=q_filter,
+ update_dict=update_dict,
+ fail_on_empty=False,
+ )
old_num_related_tasks = new_num_related_tasks
time_left -= interval_wait_for_task
if time_left < 0:
raise LcmException(
"Timeout ({}) when waiting for related tasks to be completed".format(
- timeout_wait_for_task))
+ timeout_wait_for_task
+ )
+ )
await asyncio.sleep(interval_wait_for_task)
return