update database '_admin.modified' when modified.
[osm/LCM.git] / osm_lcm / lcm_utils.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 from collections import OrderedDict
21 from time import time
22 # from osm_common.dbbase import DbException
23
24 __author__ = "Alfonso Tierno"
25
26
class LcmException(Exception):
    """Base class of the exceptions defined by this module."""
29
30
class LcmExceptionNoMgmtIP(LcmException):
    """
    LcmException subtype.

    NOTE(review): going by the name it signals a missing management IP; it is
    not raised anywhere in this module - confirm against the callers.
    """
33
34
class LcmExceptionExit(LcmException):
    """
    LcmException subtype.

    NOTE(review): going by the name it is used to request an exit; it is not
    raised anywhere in this module - confirm against the callers.
    """
37
38
def versiontuple(v):
    """
    Convert a dot separated version string into a tuple of fixed-width strings that can be compared.

    A package version looks like 4.0.1.post11+gb3f024d.dirty-1, where 4.0.1 is the git tag, postXX is the number
    of commits since that tag and +XXXXXXX is the short id of the git commit.
    For every dot separated component, whatever follows the first '+' or '-' is dropped and the remainder is
    left-padded with zeros up to 20 characters, so that purely numeric components sort in numeric order.

    :param v: version string
    :return: tuple of strings, one per dot separated component of v
    """
    return tuple(
        component.partition("+")[0].partition("-")[0].zfill(20)
        for component in v.split(".")
    )
50
51
52 # LcmBase must be listed before TaskRegistry, as it is a dependency.
class LcmBase:
    """Base class that keeps the db, msg, fs and logger handles and offers a database update helper."""

    def __init__(self, db, msg, fs, logger):
        """
        Keep the received handles as instance attributes.

        :param db: database connection
        :param msg: stored as self.msg (not used inside this class)
        :param fs: stored as self.fs (not used inside this class)
        :param logger: stored as self.logger (not used inside this class)
        """
        self.db, self.msg, self.fs, self.logger = db, msg, fs, logger

    def update_db_2(self, item, _id, _desc):
        """
        Write the content of _desc to the database entry with this _id. On success _desc is emptied.
        Nothing is done when _desc is empty or None.

        :param item: first argument handed to db.set_one
        :param _id: value used for the {"_id": _id} filter
        :param _desc: dictionary with the content to update. Keys are dot separated keys.
            The key "_admin.modified" is added to it, set to the current time, before the update
        :return: None. Exception is raised on error
        """
        if _desc:
            _desc["_admin.modified"] = time()
            self.db.set_one(item, {"_id": _id}, _desc)
            # Only reached when set_one did not raise: the pending changes are now stored.
            # Errors of set_one are deliberately not caught here, they reach the caller.
            _desc.clear()
81
82
class TaskRegistry(LcmBase):
    """
    Implements a registry of tasks needed for later cancellation, look for related tasks that must be completed
    before, etc. It stores a four level dict
    First level is the topic, ns, vim_account, sdn
    Second level is the _id
    Third level is the operation id
    Fourth level is a descriptive name, the value is the task class

    The HA (High-Availability) methods are used when more than one LCM instance is running.
    To register the current task in the external DB, use LcmBase as base class, to be able
    to reuse LcmBase.update_db_2()
    The DB registry uses the following fields to distinguish a task:
    - op_type: operation type ("nslcmops" or "nsilcmops")
    - op_id: operation ID
    - worker: the worker ID for this process
    """

    # NS/NSI: "services" VIM/WIM/SDN: "accounts"
    topic_service_list = ['ns', 'nsi']
    topic_account_list = ['vim', 'wim', 'sdn']

    # Map topic to InstanceID
    topic2instid_dict = {
        'ns': 'nsInstanceId',
        'nsi': 'netsliceInstanceId'}

    # Map topic to DB table name
    topic2dbtable_dict = {
        'ns': 'nslcmops',
        'nsi': 'nsilcmops',
        'vim': 'vim_accounts',
        'wim': 'wim_accounts',
        'sdn': 'sdns'}

    def __init__(self, worker_id=None, db=None, logger=None):
        """
        :param worker_id: identifier of this LCM instance, written to the DB by lock_HA
        :param db: database connection
        :param logger: logger used by the HA methods
        """
        # Initialise the LcmBase part as well, so that every attribute declared there exists.
        # The registry has no message bus nor file storage, so 'msg' and 'fs' are left as None.
        super().__init__(db, None, None, logger)
        self.task_registry = {
            "ns": {},
            "nsi": {},
            "vim_account": {},
            "wim_account": {},
            "sdn": {},
        }
        self.worker_id = worker_id

    def register(self, topic, _id, op_id, task_name, task):
        """
        Register a new task
        :param topic: Can be "ns", "nsi", "vim_account", "wim_account", "sdn"
        :param _id: _id of the related item
        :param op_id: id of the operation of the related item
        :param task_name: Task descriptive name, as create, instantiate, terminate. Must be unique in this op_id
        :param task: Task class
        :return: none
        """
        # OrderedDict keeps the registration order of the operations, which lookfor_related and cancel
        # walk in reverse
        operations = self.task_registry[topic].setdefault(_id, OrderedDict())
        operations.setdefault(op_id, {})[task_name] = task

    def remove(self, topic, _id, op_id, task_name=None):
        """
        When task is ended, it should be removed. It ignores missing tasks. It also removes tasks done with this _id
        :param topic: Can be "ns", "nsi", "vim_account", "wim_account", "sdn"
        :param _id: _id of the related item
        :param op_id: id of the operation of the related item
        :param task_name: Task descriptive name. If none it deletes all tasks with same _id and op_id
        :return: None
        """
        operations = self.task_registry[topic].get(_id)
        if not operations:
            return
        if not task_name:
            operations.pop(op_id, None)
        elif operations.get(op_id):
            operations[op_id].pop(task_name, None)

        # delete done tasks: an operation is dropped when none of its tasks is pending (also when it has no tasks)
        for op_id_ in list(operations):
            if all(task.done() for task in operations[op_id_].values()):
                del operations[op_id_]
        if not operations:
            del self.task_registry[topic][_id]

    def lookfor_related(self, topic, _id, my_op_id=None):
        """
        Look for the not finished tasks of the last operation registered for this _id. When my_op_id is supplied,
        the operation inspected is the one registered just before my_op_id; if my_op_id is not registered,
        nothing is found.
        :param topic: Can be "ns", "nsi", "vim_account", "wim_account", "sdn"
        :param _id: _id of the related item
        :param my_op_id: id of the operation of the caller, or None
        :return: tuple (comma separated names of the pending tasks, list with the pending tasks)
        """
        task_list = []
        task_name_list = []
        if _id not in self.task_registry[topic]:
            return "", task_list
        for op_id in reversed(self.task_registry[topic][_id]):
            if my_op_id:
                if my_op_id == op_id:
                    my_op_id = None  # so that the next task is taken
                continue

            for task_name, task in self.task_registry[topic][_id][op_id].items():
                if not task.done():
                    task_list.append(task)
                    task_name_list.append(task_name)
            # only one operation is inspected
            break
        return ", ".join(task_name_list), task_list

    def cancel(self, topic, _id, target_op_id=None, target_task_name=None):
        """
        Cancel all active tasks of a concrete ns, nsi, vim_account, sdn identified for _id. If op_id is supplied only
        this is cancelled, and the same with task_name
        """
        operations = self.task_registry[topic].get(_id)
        if not operations:
            return
        for op_id in reversed(operations):
            if target_op_id and target_op_id != op_id:
                continue
            for task_name, task in operations[op_id].items():
                if target_task_name and target_task_name != task_name:
                    continue
                task.cancel()

    # Is topic NS/NSI?
    def _is_service_type_HA(self, topic):
        return topic in self.topic_service_list

    # Is topic VIM/WIM/SDN?
    def _is_account_type_HA(self, topic):
        return topic in self.topic_account_list

    # Input: op_id, example: 'abc123def:3' Output: account_id='abc123def', op_index='3'
    def _get_account_and_op_HA(self, op_id):
        """
        Split op_id at its last ':'.
        :return: tuple (account_id, op_index), op_index being a string of digits. (None, None) if op_id is empty,
            has nothing before the last ':' or what follows it is not a number
        """
        if not op_id:
            return None, None
        account_id, _, op_index = op_id.rpartition(':')
        if not account_id or not op_index.isdigit():
            return None, None
        return account_id, op_index

    # Get '_id' for any topic and operation
    def _get_instance_id_HA(self, topic, op_type, op_id):
        """
        :return: the '_id' to look for in the DB table of this topic, None if the topic is neither
            service nor account type
        """
        # Special operation 'ANY', for SDN account associated to a VIM account: op_id as '_id'
        # NS/NSI: Use op_id as '_id'
        if op_type == 'ANY' or self._is_service_type_HA(topic):
            return op_id
        # VIM/SDN/WIM: Split op_id to get Account ID and Operation Index, use Account ID as '_id'
        if self._is_account_type_HA(topic):
            account_id, _ = self._get_account_and_op_HA(op_id)
            return account_id
        return None

    # Set DB _filter for querying any related process state
    def _get_waitfor_filter_HA(self, db_lcmop, topic, op_type, op_id):
        """
        :param db_lcmop: DB content obtained by waitfor_related_HA for this operation
        :return: filter (dict) that selects the related operations in 'PROCESSING' state. Empty dict if
            the topic is neither service nor account type
        """
        _filter = {}
        # Special operation 'ANY', for SDN account associated to a VIM account: op_id as '_id'
        # In this special case, the timestamp is ignored
        if op_type == 'ANY':
            _filter = {'operationState': 'PROCESSING'}
        # Otherwise, get 'startTime' timestamp for this operation
        # NS/NSI
        elif self._is_service_type_HA(topic):
            now = time()
            starttime_this_op = db_lcmop.get("startTime")
            instance_id_label = self.topic2instid_dict.get(topic)
            instance_id = db_lcmop.get(instance_id_label)
            _filter = {instance_id_label: instance_id,
                       'operationState': 'PROCESSING',
                       'startTime.lt': starttime_this_op,
                       "_admin.modified.gt": now - 2*3600,  # ignore if two hours of inactivity
                       }
        # VIM/WIM/SDN
        elif self._is_account_type_HA(topic):
            _, op_index = self._get_account_and_op_HA(op_id)
            _ops = db_lcmop['_admin']['operations']
            _this_op = _ops[int(op_index)]
            starttime_this_op = _this_op.get('startTime', None)
            _filter = {'operationState': 'PROCESSING',
                       'startTime.lt': starttime_this_op}
        return _filter

    # Get DB params for any topic and operation
    def _get_dbparams_for_lock_HA(self, topic, op_type, op_id):
        """
        :return: tuple (q_filter, update_dict) to be used by lock_HA with db.set_one. The filter only matches
            if no worker is annotated yet. (None, None) for an account type topic whose op_id cannot be split
        """
        q_filter = {}
        update_dict = {}
        # NS/NSI
        if self._is_service_type_HA(topic):
            q_filter = {'_id': op_id, '_admin.worker': None}
            update_dict = {'_admin.worker': self.worker_id}
        # VIM/WIM/SDN
        elif self._is_account_type_HA(topic):
            account_id, op_index = self._get_account_and_op_HA(op_id)
            if not account_id:
                return None, None
            if op_type == 'create':
                # Creating a VIM/WIM/SDN account implies setting '_admin.current_operation' = 0
                op_index = 0
            q_filter = {'_id': account_id, "_admin.operations.{}.worker".format(op_index): None}
            update_dict = {'_admin.operations.{}.worker'.format(op_index): self.worker_id,
                           '_admin.current_operation': op_index}
        return q_filter, update_dict

    def lock_HA(self, topic, op_type, op_id):
        """
        Lock a task, if possible, to indicate to the HA system that
        the task will be executed in this LCM instance.
        :param topic: Can be "ns", "nsi", "vim", "wim", or "sdn"
        :param op_type: Operation type, can be "nslcmops", "nsilcmops", "create", "edit", "delete"
        :param op_id: NS, NSI: Operation ID VIM,WIM,SDN: Account ID + ':' + Operation Index
        :return:
            True=lock was successful => execute the task (not registered by any other LCM instance)
            False=lock failed => do NOT execute the task (already registered by another LCM instance)

        HA tasks and backward compatibility:
            If topic is "account type" (VIM/WIM/SDN) and op_id is None, 'op_id' was not provided by NBI.
            This means that the running NBI instance does not support HA.
            In such a case this method should always return True, to always execute
            the task in this instance of LCM, without querying the DB.
        """

        # Backward compatibility for VIM/WIM/SDN without op_id
        if self._is_account_type_HA(topic) and op_id is None:
            return True

        # Try to lock this task
        db_table_name = self.topic2dbtable_dict.get(topic)
        q_filter, update_dict = self._get_dbparams_for_lock_HA(topic, op_type, op_id)
        db_lock_task = self.db.set_one(db_table_name,
                                       q_filter=q_filter,
                                       update_dict=update_dict,
                                       fail_on_empty=False)
        if db_lock_task is None:
            self.logger.debug("Task {} operation={} already locked by another worker".format(topic, op_id))
            return False

        # Set 'detailed-status' to 'In progress' for VIM/WIM/SDN operations
        if self._is_account_type_HA(topic):
            detailed_status = 'In progress'
            account_id, op_index = self._get_account_and_op_HA(op_id)
            q_filter = {'_id': account_id}
            update_dict = {'_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
            self.db.set_one(db_table_name,
                            q_filter=q_filter,
                            update_dict=update_dict,
                            fail_on_empty=False)
        return True

    def register_HA(self, topic, op_type, op_id, operationState, detailed_status):
        """
        Register a task, done when finished a VIM/WIM/SDN 'create' operation.
        It writes operationState and detailed_status at the entry of this operation in the account
        :param topic: Can be "vim", "wim", or "sdn"
        :param op_type: Operation type, can be "create", "edit", "delete"
        :param op_id: Account ID + ':' + Operation Index
        :param operationState: value for the 'operationState' of the operation
        :param detailed_status: value for the 'detailed-status' of the operation
        :return: nothing
        """

        # Backward compatibility
        if not self._is_account_type_HA(topic) or op_id is None:
            return

        # Get Account ID and Operation Index
        account_id, op_index = self._get_account_and_op_HA(op_id)
        db_table_name = self.topic2dbtable_dict.get(topic)

        # If this is a 'delete' operation, the account may have been deleted (SUCCESS) or may still exist (FAILED)
        # If the account exist, register the HA task.
        # Update DB for HA tasks
        q_filter = {'_id': account_id}
        update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
                       '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
        self.db.set_one(db_table_name,
                        q_filter=q_filter,
                        update_dict=update_dict,
                        fail_on_empty=False)

    async def waitfor_related_HA(self, topic, op_type, op_id=None):
        """
        Wait for any pending related HA tasks
        :param topic: Can be "ns", "nsi", "vim", "wim", or "sdn". Any other topic makes this method return at once
        :param op_type: Operation type. 'ANY' waits for every related operation in 'PROCESSING' state and does
            not update the 'detailed-status'
        :param op_id: NS, NSI: Operation ID VIM,WIM,SDN: Account ID + ':' + Operation Index.
            If None this method returns at once
        :return: None, when there are no related tasks pending. Raises LcmException on timeout
        """

        # Backward compatibility: without op_id, or with a topic not handled by HA, there is nothing to look
        # for in the DB. Both conditions are checked independently, as register_HA does.
        if op_id is None or not (self._is_service_type_HA(topic) or self._is_account_type_HA(topic)):
            return

        # Get DB table name
        db_table_name = self.topic2dbtable_dict.get(topic)

        # Get instance ID
        _id = self._get_instance_id_HA(topic, op_type, op_id)
        _filter = {"_id": _id}
        db_lcmop = self.db.get_one(db_table_name,
                                   _filter,
                                   fail_on_empty=False)
        if not db_lcmop:
            return

        # Set DB _filter for querying any related process state
        _filter = self._get_waitfor_filter_HA(db_lcmop, topic, op_type, op_id)

        # For HA, get list of tasks from DB instead of from dictionary (in-memory) variable.
        timeout_wait_for_task = 3600   # Max time (seconds) to wait for a related task to finish
        # A too long polling interval slows things down considerably
        interval_wait_for_task = 10    # Interval in seconds for polling related tasks
        time_left = timeout_wait_for_task
        old_num_related_tasks = 0
        while True:
            # Get related tasks (operations within the same instance as this) which are
            # still running (operationState='PROCESSING') and which were started before this task.
            # In the case of op_type='ANY', get any related tasks with operationState='PROCESSING', ignore timestamps.
            db_waitfor_related_task = self.db.get_list(db_table_name,
                                                       q_filter=_filter)
            new_num_related_tasks = len(db_waitfor_related_task)
            # If there are no related tasks, there is nothing to wait for, so return.
            if not new_num_related_tasks:
                return
            # If number of pending related tasks have changed,
            # update the 'detailed-status' field and log the change.
            # Do NOT update the 'detailed-status' for SDNC-associated-to-VIM operations ('ANY').
            if (op_type != 'ANY') and (new_num_related_tasks != old_num_related_tasks):
                step = "Waiting for {} related tasks to be completed.".format(new_num_related_tasks)
                update_dict = {}
                q_filter = {'_id': _id}
                # NS/NSI
                if self._is_service_type_HA(topic):
                    update_dict = {'detailed-status': step}
                # VIM/WIM/SDN
                elif self._is_account_type_HA(topic):
                    _, op_index = self._get_account_and_op_HA(op_id)
                    update_dict = {'_admin.operations.{}.detailed-status'.format(op_index): step}
                self.logger.debug("Task {} operation={} {}".format(topic, _id, step))
                self.db.set_one(db_table_name,
                                q_filter=q_filter,
                                update_dict=update_dict,
                                fail_on_empty=False)
                old_num_related_tasks = new_num_related_tasks
            time_left -= interval_wait_for_task
            if time_left < 0:
                raise LcmException(
                    "Timeout ({}) when waiting for related tasks to be completed".format(
                        timeout_wait_for_task))
            await asyncio.sleep(interval_wait_for_task)