blob: 357b4d4ec92f4643b0267aaf7c0ae81cb41f4ca4 [file] [log] [blame]
# -*- coding: utf-8 -*-

##
# Copyright 2018 Telefonica S.A.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
##
tierno59d22d22018-09-25 18:10:19 +020018
kuused124bfe2019-06-18 12:09:24 +020019import asyncio
tierno59d22d22018-09-25 18:10:19 +020020from collections import OrderedDict
tierno79cd8ad2019-10-18 13:03:10 +000021from time import time
tiernobaa51102018-12-14 13:16:18 +000022# from osm_common.dbbase import DbException
tierno59d22d22018-09-25 18:10:19 +020023
# Module-level authorship metadata
__author__ = "Alfonso Tierno"
25
26
class LcmException(Exception):
    """Base exception of this module.

    LcmExceptionNoMgmtIP and LcmExceptionExit derive from it, and
    TaskRegistry.waitfor_related_HA raises it when waiting for related tasks times out.
    """
29
30
class LcmExceptionNoMgmtIP(LcmException):
    """Specialization of LcmException that adds no behavior of its own.

    NOTE(review): judging by its name, presumably raised when no management IP address
    is available; it is not raised in the visible code -- confirm with the callers.
    """
33
34
class LcmExceptionExit(LcmException):
    """Specialization of LcmException that adds no behavior of its own.

    NOTE(review): judging by its name, presumably used to request that LCM exits;
    it is not raised in the visible code -- confirm with the callers.
    """
37
38
def versiontuple(v):
    """Convert a dot separated version string into a tuple that compares properly.

    A package version looks like 4.0.1.post11+gb3f024d.dirty-1, where 4.0.1 is the git tag, postXX is the
    number of commits from this tag, and +XXXXXXX is the git commit short id.
    For each dot separated component, everything from the first "+" on and then everything from the first "-"
    on is discarded; the remainder is left-padded with zeros up to 20 characters, so that comparing the
    resulting strings orders numeric components numerically.

    :param v: version string
    :return: tuple of zero-padded strings, one per dot separated component
    """
    return tuple(
        component.partition("+")[0].partition("-")[0].zfill(20)
        for component in v.split(".")
    )
50
51
# LcmBase must be listed before TaskRegistry, as it is a dependency.
class LcmBase:
    """Holds the handles shared by LCM components and a helper to flush pending updates to the database."""

    def __init__(self, db, msg, fs, logger):
        """
        Store the received handles as attributes of the same name.

        :param db: database connection
        :param msg: stored as self.msg (NOTE(review): presumably the message bus connection, confirm)
        :param fs: stored as self.fs (NOTE(review): presumably the file storage connection, confirm)
        :param logger: stored as self.logger
        """
        self.db, self.msg, self.fs, self.logger = db, msg, fs, logger

    def update_db_2(self, item, _id, _desc):
        """
        Write the pending changes of _desc into the database entry identified by _id.
        The key "_admin.modified" is added to _desc with the current time before writing.
        On success _desc is emptied in place. Nothing is done if _desc is empty or None.

        :param item: first argument handed to db.set_one (NOTE(review): presumably the table name, confirm)
        :param _id: identifier of the entry; {"_id": _id} is used as the query filter
        :param _desc: dictionary with the content to update; the visible callers use dot separated keys
        :return: None. Exceptions raised by the database layer are not caught here
        """
        if _desc:
            _desc["_admin.modified"] = time()
            self.db.set_one(item, {"_id": _id}, _desc)
            # cleared only after a successful write, so that pending changes survive a failure
            _desc.clear()
81
82
class TaskRegistry(LcmBase):
    """
    Implements a registry of tasks, needed for later cancellation, to look for related tasks that must be
    completed before, etc. It stores a four level dict:
        First level is the topic: "ns", "nsi", "vim_account", "wim_account", "sdn"
        Second level is the _id
        Third level is the operation id
        Fourth level is a descriptive name, the value is the task class

    The HA (High-Availability) methods are used when more than one LCM instance is running.
    To register the current task in the external DB, use LcmBase as base class, to be able
    to reuse LcmBase.update_db_2()
    The DB registry uses the following fields to distinguish a task:
    - op_type: operation type ("nslcmops" or "nsilcmops")
    - op_id:   operation ID
    - worker:  the worker ID for this process
    """

    # NS/NSI: "services"   VIM/WIM/SDN: "accounts"
    topic_service_list = ['ns', 'nsi']
    topic_account_list = ['vim', 'wim', 'sdn']

    # Map topic to InstanceID
    topic2instid_dict = {
        'ns': 'nsInstanceId',
        'nsi': 'netsliceInstanceId'}

    # Map topic to DB table name
    topic2dbtable_dict = {
        'ns': 'nslcmops',
        'nsi': 'nsilcmops',
        'vim': 'vim_accounts',
        'wim': 'wim_accounts',
        'sdn': 'sdns'}

    def __init__(self, worker_id=None, db=None, logger=None):
        """
        :param worker_id: identifier of this LCM process, written to the DB when a task is locked
        :param db: database connection used by the HA methods
        :param logger: logger used by the HA methods
        """
        # No message bus or file storage is used here; they are set to None so that
        # every attribute defined by LcmBase exists
        super().__init__(db=db, msg=None, fs=None, logger=logger)
        self.task_registry = {
            "ns": {},
            "nsi": {},
            "vim_account": {},
            "wim_account": {},
            "sdn": {},
        }
        self.worker_id = worker_id

    def register(self, topic, _id, op_id, task_name, task):
        """
        Register a new task
        :param topic: Can be "ns", "nsi", "vim_account", "wim_account", "sdn"
        :param _id: _id of the related item
        :param op_id: id of the operation of the related item
        :param task_name: Task descriptive name, as create, instantiate, terminate. Must be unique in this op_id
        :param task: Task class
        :return: none
        """
        # Operations are kept in registration order (OrderedDict), lookfor_related and cancel rely on it
        item_ops = self.task_registry[topic].setdefault(_id, OrderedDict())
        item_ops.setdefault(op_id, {})[task_name] = task

    def remove(self, topic, _id, op_id, task_name=None):
        """
        When task is ended, it should be removed. It ignores missing tasks. It also removes tasks done with this _id
        :param topic: Can be "ns", "nsi", "vim_account", "wim_account", "sdn"
        :param _id: _id of the related item
        :param op_id: id of the operation of the related item
        :param task_name: Task descriptive name. If none it deletes all tasks with same _id and op_id
        :return: None
        """
        item_ops = self.task_registry[topic].get(_id)
        if not item_ops:
            return
        if not task_name:
            item_ops.pop(op_id, None)
        elif item_ops.get(op_id):
            item_ops[op_id].pop(task_name, None)

        # delete every operation whose tasks are all done (this includes operations left without tasks)
        for op_id_ in list(item_ops):
            if all(task.done() for task in item_ops[op_id_].values()):
                del item_ops[op_id_]
        if not item_ops:
            del self.task_registry[topic][_id]

    def lookfor_related(self, topic, _id, my_op_id=None):
        """
        Look for the not finished tasks of the operation registered just before 'my_op_id' for this _id.
        If my_op_id is not provided, the most recently registered operation is used. Only that single
        operation is inspected.
        :param topic: Can be "ns", "nsi", "vim_account", "wim_account", "sdn"
        :param _id: _id of the related item
        :param my_op_id: operation id of the caller. Operations are skipped until this one is found
        :return: tuple (names of the pending tasks joined by ", "; list with the pending tasks)
        """
        task_list = []
        task_name_list = []
        if _id not in self.task_registry[topic]:
            return "", task_list
        for op_id in reversed(self.task_registry[topic][_id]):
            if my_op_id:
                if my_op_id == op_id:
                    my_op_id = None  # so that the next task is taken
                continue

            for task_name, task in self.task_registry[topic][_id][op_id].items():
                if not task.done():
                    task_list.append(task)
                    task_name_list.append(task_name)
            break
        return ", ".join(task_name_list), task_list

    def cancel(self, topic, _id, target_op_id=None, target_task_name=None):
        """
        Cancel all active tasks of a concrete ns, nsi, vim_account, sdn identified for _id. If op_id is supplied only
        this is cancelled, and the same with task_name
        :param topic: Can be "ns", "nsi", "vim_account", "wim_account", "sdn"
        :param _id: _id of the related item
        :param target_op_id: if provided, only the tasks of this operation are cancelled
        :param target_task_name: if provided, only the tasks with this name are cancelled
        :return: None
        """
        item_ops = self.task_registry[topic].get(_id)
        if not item_ops:
            return
        for op_id in reversed(item_ops):
            if target_op_id and target_op_id != op_id:
                continue
            for task_name, task in item_ops[op_id].items():
                if target_task_name and target_task_name != task_name:
                    continue
                task.cancel()

    # Is topic NS/NSI?
    def _is_service_type_HA(self, topic):
        return topic in self.topic_service_list

    # Is topic VIM/WIM/SDN?
    def _is_account_type_HA(self, topic):
        return topic in self.topic_account_list

    # Input: op_id, example: 'abc123def:3'   Output: account_id='abc123def', op_index='3'
    def _get_account_and_op_HA(self, op_id):
        """
        Split an account operation id at its last ':'
        :param op_id: Account ID + ':' + Operation Index
        :return: (account_id, op_index), both strings. (None, None) if op_id is empty, it has no account part
            or the operation index is not composed of digits
        """
        if not op_id:
            return (None, None)
        account_id, _, op_index = op_id.rpartition(':')
        if not account_id:
            return (None, None)
        if not op_index.isdigit():
            return (None, None)
        return account_id, op_index

    # Get '_id' for any topic and operation
    def _get_instance_id_HA(self, topic, op_type, op_id):
        """
        :return: the '_id' of the DB entry to look for: op_id itself for op_type 'ANY' and for NS/NSI,
            the account id contained in op_id for VIM/WIM/SDN, None for any other topic
        """
        _id = None
        # Special operation 'ANY', for SDN account associated to a VIM account: op_id as '_id'
        if op_type == 'ANY':
            _id = op_id
        # NS/NSI: Use op_id as '_id'
        elif self._is_service_type_HA(topic):
            _id = op_id
        # VIM/SDN/WIM: Split op_id to get Account ID and Operation Index, use Account ID as '_id'
        elif self._is_account_type_HA(topic):
            _id, _ = self._get_account_and_op_HA(op_id)
        return _id

    # Set DB _filter for querying any related process state
    def _get_waitfor_filter_HA(self, db_lcmop, topic, op_type, op_id):
        """
        :param db_lcmop: DB entry obtained with the '_id' returned by _get_instance_id_HA
        :return: filter that selects the related operations still in 'PROCESSING' state.
            Empty dict if topic is not NS/NSI/VIM/WIM/SDN and op_type is not 'ANY'
        """
        _filter = {}
        # Special operation 'ANY', for SDN account associated to a VIM account: op_id as '_id'
        # In this special case, the timestamp is ignored
        if op_type == 'ANY':
            _filter = {'operationState': 'PROCESSING'}
        # Otherwise, get 'startTime' timestamp for this operation
        else:
            # NS/NSI
            if self._is_service_type_HA(topic):
                now = time()
                starttime_this_op = db_lcmop.get("startTime")
                instance_id_label = self.topic2instid_dict.get(topic)
                instance_id = db_lcmop.get(instance_id_label)
                _filter = {instance_id_label: instance_id,
                           'operationState': 'PROCESSING',
                           'startTime.lt': starttime_this_op,
                           "_admin.modified.gt": now - 2*3600,  # ignore if two hours of inactivity
                           }
            # VIM/WIM/SDN
            elif self._is_account_type_HA(topic):
                _, op_index = self._get_account_and_op_HA(op_id)
                _ops = db_lcmop['_admin']['operations']
                _this_op = _ops[int(op_index)]
                starttime_this_op = _this_op.get('startTime', None)
                _filter = {'operationState': 'PROCESSING',
                           'startTime.lt': starttime_this_op}
        return _filter

    # Get DB params for any topic and operation
    def _get_dbparams_for_lock_HA(self, topic, op_type, op_id):
        """
        :return: (q_filter, update_dict) to use for locking the operation in the DB. The filter only matches
            while no worker is assigned. (None, None) if topic is VIM/WIM/SDN and op_id is malformed.
            Both are empty dicts for any other topic
        """
        q_filter = {}
        update_dict = {}
        # NS/NSI
        if self._is_service_type_HA(topic):
            q_filter = {'_id': op_id, '_admin.worker': None}
            update_dict = {'_admin.worker': self.worker_id}
        # VIM/WIM/SDN
        elif self._is_account_type_HA(topic):
            account_id, op_index = self._get_account_and_op_HA(op_id)
            if not account_id:
                return None, None
            if op_type == 'create':
                # Creating a VIM/WIM/SDN account implies setting '_admin.current_operation' = 0
                op_index = 0
            q_filter = {'_id': account_id, "_admin.operations.{}.worker".format(op_index): None}
            update_dict = {'_admin.operations.{}.worker'.format(op_index): self.worker_id,
                           '_admin.current_operation': op_index}
        return q_filter, update_dict

    def lock_HA(self, topic, op_type, op_id):
        """
        Lock a task, if possible, to indicate to the HA system that
        the task will be executed in this LCM instance.
        :param topic: Can be "ns", "nsi", "vim", "wim", or "sdn"
        :param op_type: Operation type, can be "nslcmops", "nsilcmops", "create", "edit", "delete"
        :param op_id: NS, NSI: Operation ID  VIM,WIM,SDN: Account ID + ':' + Operation Index
        :return:
            True=lock was successful => execute the task (not registered by any other LCM instance)
            False=lock failed => do NOT execute the task (already registered by another LCM instance,
                or the op_id of a VIM/WIM/SDN operation is malformed)

        HA tasks and backward compatibility:
        If topic is "account type" (VIM/WIM/SDN) and op_id is None, 'op_id' was not provided by NBI.
        This means that the running NBI instance does not support HA.
        In such a case this method should always return True, to always execute
        the task in this instance of LCM, without querying the DB.
        """

        # Backward compatibility for VIM/WIM/SDN without op_id
        if self._is_account_type_HA(topic) and op_id is None:
            return True

        # Try to lock this task
        db_table_name = self.topic2dbtable_dict.get(topic)
        q_filter, update_dict = self._get_dbparams_for_lock_HA(topic, op_type, op_id)
        if q_filter is None:
            # Malformed op_id: there is no operation entry to lock, and a None filter must never reach the DB
            self.logger.error("Task {} operation={} cannot be locked: invalid operation id".format(topic, op_id))
            return False
        db_lock_task = self.db.set_one(db_table_name,
                                       q_filter=q_filter,
                                       update_dict=update_dict,
                                       fail_on_empty=False)
        if db_lock_task is None:
            self.logger.debug("Task {} operation={} already locked by another worker".format(topic, op_id))
            return False

        # Set 'detailed-status' to 'In progress' for VIM/WIM/SDN operations
        if self._is_account_type_HA(topic):
            detailed_status = 'In progress'
            account_id, op_index = self._get_account_and_op_HA(op_id)
            q_filter = {'_id': account_id}
            update_dict = {'_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
            self.db.set_one(db_table_name,
                            q_filter=q_filter,
                            update_dict=update_dict,
                            fail_on_empty=False)
        return True

    def register_HA(self, topic, op_type, op_id, operationState, detailed_status):
        """
        Register a task, done when finished a VIM/WIM/SDN 'create' operation.
        :param topic: Can be "vim", "wim", or "sdn"
        :param op_type: Operation type, can be "create", "edit", "delete"
        :param op_id: Account ID + ':' + Operation Index
        :param operationState: value stored at the 'operationState' of the operation
        :param detailed_status: value stored at the 'detailed-status' of the operation
        :return: nothing
        """

        # Backward compatibility: only VIM/WIM/SDN operations with an op_id provided by NBI are registered
        if not self._is_account_type_HA(topic) or op_id is None:
            return

        # Get Account ID and Operation Index
        account_id, op_index = self._get_account_and_op_HA(op_id)
        if account_id is None:
            # Malformed op_id: there is no operation entry that could be updated
            return
        db_table_name = self.topic2dbtable_dict.get(topic)

        # If this is a 'delete' operation, the account may have been deleted (SUCCESS) or may still exist (FAILED)
        # If the account exist, register the HA task.
        # Update DB for HA tasks
        q_filter = {'_id': account_id}
        update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
                       '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
        self.db.set_one(db_table_name,
                        q_filter=q_filter,
                        update_dict=update_dict,
                        fail_on_empty=False)

    async def waitfor_related_HA(self, topic, op_type, op_id=None):
        """
        Wait for any pending related HA tasks
        :param topic: Can be "ns", "nsi", "vim", "wim", or "sdn"
        :param op_type: Operation type. The special value 'ANY' waits for any operation in 'PROCESSING' state
        :param op_id: NS, NSI: Operation ID  VIM,WIM,SDN: Account ID + ':' + Operation Index
        :return: None, as soon as there are no related tasks pending.
            LcmException is raised if they are not completed on time
        """

        # Backward compatibility: nothing to wait for if this topic is not managed by HA
        # or if NBI did not provide an operation id
        if op_id is None or not (self._is_service_type_HA(topic) or self._is_account_type_HA(topic)):
            return

        # Get DB table name
        db_table_name = self.topic2dbtable_dict.get(topic)

        # Get instance ID
        _id = self._get_instance_id_HA(topic, op_type, op_id)
        _filter = {"_id": _id}
        db_lcmop = self.db.get_one(db_table_name,
                                   _filter,
                                   fail_on_empty=False)
        if not db_lcmop:
            return

        # Set DB _filter for querying any related process state
        _filter = self._get_waitfor_filter_HA(db_lcmop, topic, op_type, op_id)

        # For HA, get list of tasks from DB instead of from dictionary (in-memory) variable.
        timeout_wait_for_task = 3600   # Max time (seconds) to wait for a related task to finish
        # interval_wait_for_task = 30    #  A too long polling interval slows things down considerably
        interval_wait_for_task = 10     # Interval in seconds for polling related tasks
        time_left = timeout_wait_for_task
        old_num_related_tasks = 0
        while True:
            # Get related tasks (operations within the same instance as this) which are
            # still running (operationState='PROCESSING') and which were started before this task.
            # In the case of op_type='ANY', get any related tasks with operationState='PROCESSING', ignore timestamps.
            db_waitfor_related_task = self.db.get_list(db_table_name,
                                                       q_filter=_filter)
            new_num_related_tasks = len(db_waitfor_related_task)
            # If there are no related tasks, there is nothing to wait for, so return.
            if not new_num_related_tasks:
                return
            # If number of pending related tasks have changed,
            # update the 'detailed-status' field and log the change.
            # Do NOT update the 'detailed-status' for SDNC-associated-to-VIM operations ('ANY').
            if (op_type != 'ANY') and (new_num_related_tasks != old_num_related_tasks):
                step = "Waiting for {} related tasks to be completed.".format(new_num_related_tasks)
                update_dict = {}
                q_filter = {'_id': _id}
                # NS/NSI
                if self._is_service_type_HA(topic):
                    update_dict = {'detailed-status': step}
                # VIM/WIM/SDN
                elif self._is_account_type_HA(topic):
                    _, op_index = self._get_account_and_op_HA(op_id)
                    update_dict = {'_admin.operations.{}.detailed-status'.format(op_index): step}
                self.logger.debug("Task {} operation={} {}".format(topic, _id, step))
                self.db.set_one(db_table_name,
                                q_filter=q_filter,
                                update_dict=update_dict,
                                fail_on_empty=False)
                old_num_related_tasks = new_num_related_tasks
            time_left -= interval_wait_for_task
            if time_left < 0:
                raise LcmException(
                    "Timeout ({}) when waiting for related tasks to be completed".format(
                        timeout_wait_for_task))
            await asyncio.sleep(interval_wait_for_task)