1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
21 from collections
import OrderedDict
24 from osm_lcm
.data_utils
.database
.database
import Database
25 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
27 # from osm_common.dbbase import DbException
29 __author__
= "Alfonso Tierno"
class LcmException(Exception):
    """Base class of the LCM exception hierarchy declared in this module."""

    # NOTE(review): the class body was not visible in the reviewed text; an empty body is assumed.


class LcmExceptionNoMgmtIP(LcmException):
    """LcmException subclass; judging by its name it signals a missing management IP (confirm at raise sites)."""

    # NOTE(review): the class body was not visible in the reviewed text; an empty body is assumed.


class LcmExceptionExit(LcmException):
    """LcmException subclass; judging by its name it signals that processing must exit (confirm at raise sites)."""

    # NOTE(review): the class body was not visible in the reviewed text; an empty body is assumed.
45 """utility for compare dot separate versions. Fills with zeros to proper number comparison
46 package version will be something like 4.0.1.post11+gb3f024d.dirty-1. Where 4.0.1 is the git tag, postXX is the
47 number of commits from this tag, and +XXXXXXX is the git commit short id. Total length is 16 with until 999 commits
50 for point
in v
.split("."):
51 point
, _
, _
= point
.partition("+")
52 point
, _
, _
= point
.partition("-")
53 filled
.append(point
.zfill(20))
def deep_get(target_dict, key_list, default_value=None):
    """
    Get a value from target_dict entering in the nested keys. If a key does not exist, default_value is returned.
    Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return
    default_value (None unless another value is supplied)
    :param target_dict: dictionary to be read
    :param key_list: list of keys to read from target_dict, outermost key first
    :param default_value: value to return if key is not present in the nested dictionary
    :return: The wanted value if it exists, default_value otherwise
    """
    for key in key_list:
        # Stop as soon as the current level is not a dict (e.g. a leaf value) or lacks the key
        if not isinstance(target_dict, dict) or key not in target_dict:
            return default_value
        target_dict = target_dict[key]
    return target_dict
def get_iterable(in_dict, in_key):
    """
    Similar to <dict>.get(), but if the value is None, False, or any other falsy value, an empty tuple is
    returned instead, so the result can always be iterated.
    :param in_dict: a dictionary
    :param in_key: the key to look for at in_dict
    :return: in_dict[in_key] or () if it is falsy or not present
    """
    if not in_dict.get(in_key):
        return ()
    return in_dict[in_key]
85 def check_juju_bundle_existence(vnfd
: dict) -> str:
86 """Checks the existence of juju-bundle in the descriptor
89 vnfd: Descriptor as a dictionary
92 Juju bundle if dictionary has juju-bundle else None
98 for kdu
in vnfd
.get("kdu", []):
99 return kdu
.get("juju-bundle", None)
102 def get_charm_artifact_path(base_folder
, charm_name
, charm_type
, revision
=str()) -> str:
103 """Finds the charm artifact paths
106 base_folder: Main folder which will be looked up for charm
107 charm_name: Charm name
108 charm_type: Type of charm native_charm, lxc_proxy_charm or k8s_proxy_charm
109 revision: vnf package revision number if there is
117 extension
= ":" + str(revision
)
119 if base_folder
.get("pkg-dir"):
120 artifact_path
= "{}/{}/{}/{}".format(
121 base_folder
["folder"] + extension
,
122 base_folder
["pkg-dir"],
124 if charm_type
in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
130 # For SOL004 packages
131 artifact_path
= "{}/Scripts/{}/{}".format(
132 base_folder
["folder"] + extension
,
134 if charm_type
in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
def populate_dict(target_dict, key_list, value):
    """
    Update target_dict creating nested dictionaries with the key_list. The last key_list item is assigned the value.
    Example target_dict={K: J}; key_list=[a,b,c];  target_dict will be {K: J, a: {b: {c: value}}}
    :param target_dict: dictionary to be changed; it is modified in place
    :param key_list: list of keys to insert at target_dict. It must contain at least one key, because the
        last item is read with key_list[-1]
    :param value: value stored under the last key of key_list
    :return: None
    """
    # Walk (and create when missing) every intermediate level; existing levels are reused, not overwritten
    for key in key_list[0:-1]:
        if key not in target_dict:
            target_dict[key] = {}
        target_dict = target_dict[key]
    target_dict[key_list[-1]] = value
159 def __init__(self
, msg
, logger
):
162 :param db: database connection
164 self
.db
= Database().instance
.db
166 self
.fs
= Filesystem().instance
.fs
169 def update_db_2(self
, item
, _id
, _desc
):
171 Updates database with _desc information. If success _desc is cleared
172 :param item: collection
173 :param _id: the _id to use in the query filter
174 :param _desc: dictionary with the content to update. Keys are dot separated keys for
175 :return: None. Exception is raised on error
180 _desc
["_admin.modified"] = now
181 self
.db
.set_one(item
, {"_id": _id
}, _desc
)
183 # except DbException as e:
184 # self.logger.error("Updating {} _id={} with '{}'. Error: {}".format(item, _id, _desc, e))
186 def check_charm_hash_changed(
187 self
, current_charm_path
: str, target_charm_path
: str
189 """Find the target charm has changed or not by checking the hash of
190 old and new charm packages
193 current_charm_path (str): Existing charm package artifact path
194 target_charm_path (str): Target charm package artifact path
197 True/False (bool): if charm has changed it returns True
200 # Check if the charm artifacts are available
201 if os
.path
.exists(self
.fs
.path
+ current_charm_path
) and os
.path
.exists(
202 self
.fs
.path
+ target_charm_path
204 # Compare the hash of charm folders
205 if checksumdir
.dirhash(
206 self
.fs
.path
+ current_charm_path
207 ) != checksumdir
.dirhash(self
.fs
.path
+ target_charm_path
):
215 "Charm artifact {} does not exist in the VNF Package".format(
216 self
.fs
.path
+ target_charm_path
221 class TaskRegistry(LcmBase
):
223 Implements a registry of task needed for later cancelation, look for related tasks that must be completed before
224 etc. It stores a four level dict
225 First level is the topic, ns, vim_account, sdn
226 Second level is the _id
227 Third level is the operation id
228 Fourth level is a descriptive name, the value is the task class
230 The HA (High-Availability) methods are used when more than one LCM instance is running.
231 To register the current task in the external DB, use LcmBase as base class, to be able
232 to reuse LcmBase.update_db_2()
233 The DB registry uses the following fields to distinguish a task:
234 - op_type: operation type ("nslcmops" or "nsilcmops")
235 - op_id: operation ID
236 - worker: the worker ID for this process
239 # NS/NSI: "services" VIM/WIM/SDN: "accounts"
240 topic_service_list
= ["ns", "nsi"]
241 topic_account_list
= ["vim", "wim", "sdn", "k8scluster", "vca", "k8srepo"]
243 # Map topic to InstanceID
244 topic2instid_dict
= {"ns": "nsInstanceId", "nsi": "netsliceInstanceId"}
246 # Map topic to DB table name
247 topic2dbtable_dict
= {
250 "vim": "vim_accounts",
251 "wim": "wim_accounts",
253 "k8scluster": "k8sclusters",
255 "k8srepo": "k8srepos",
258 def __init__(self
, worker_id
=None, logger
=None):
259 self
.task_registry
= {
269 self
.worker_id
= worker_id
270 self
.db
= Database().instance
.db
def register(self, topic, _id, op_id, task_name, task):
    """
    Register a task in the in-memory registry (self.task_registry)
    :param topic: Can be "ns", "nsi", "vim_account", "sdn". It must already be a first level key of
        self.task_registry, because it is read with self.task_registry[topic]
    :param _id: _id of the related item
    :param op_id: id of the operation of the related item
    :param task_name: Task descriptive name, as create, instantiate, terminate. Must be unique in this op_id;
        registering the same name again replaces the stored task
    :param task: Task class
    :return: None
    """
    # Operations of one _id are kept in an OrderedDict, so they stay in registration order
    if _id not in self.task_registry[topic]:
        self.task_registry[topic][_id] = OrderedDict()
    if op_id not in self.task_registry[topic][_id]:
        self.task_registry[topic][_id][op_id] = {task_name: task}
    else:
        self.task_registry[topic][_id][op_id][task_name] = task
291 def remove(self
, topic
, _id
, op_id
, task_name
=None):
293 When task is ended, it should be removed. It ignores missing tasks. It also removes tasks done with this _id
294 :param topic: Can be "ns", "nsi", "vim_account", "sdn"
295 :param _id: _id of the related item
296 :param op_id: id of the operation of the related item
297 :param task_name: Task descriptive name. If none it deletes all tasks with same _id and op_id
300 if not self
.task_registry
[topic
].get(_id
):
303 self
.task_registry
[topic
][_id
].pop(op_id
, None)
304 elif self
.task_registry
[topic
][_id
].get(op_id
):
305 self
.task_registry
[topic
][_id
][op_id
].pop(task_name
, None)
308 for op_id_
in list(self
.task_registry
[topic
][_id
]):
309 for name
, task
in self
.task_registry
[topic
][_id
][op_id_
].items():
313 del self
.task_registry
[topic
][_id
][op_id_
]
314 if not self
.task_registry
[topic
][_id
]:
315 del self
.task_registry
[topic
][_id
]
317 def lookfor_related(self
, topic
, _id
, my_op_id
=None):
320 if _id
not in self
.task_registry
[topic
]:
321 return "", task_name_list
322 for op_id
in reversed(self
.task_registry
[topic
][_id
]):
324 if my_op_id
== op_id
:
325 my_op_id
= None # so that the next task is taken
328 for task_name
, task
in self
.task_registry
[topic
][_id
][op_id
].items():
330 task_list
.append(task
)
331 task_name_list
.append(task_name
)
333 return ", ".join(task_name_list
), task_list
335 def cancel(self
, topic
, _id
, target_op_id
=None, target_task_name
=None):
337 Cancel all active tasks of a concrete ns, nsi, vim_account, sdn identified for _id. If op_id is supplied only
338 this is cancelled, and the same with task_name
340 if not self
.task_registry
[topic
].get(_id
):
342 for op_id
in reversed(self
.task_registry
[topic
][_id
]):
343 if target_op_id
and target_op_id
!= op_id
:
345 for task_name
, task
in self
.task_registry
[topic
][_id
][op_id
].items():
346 if target_task_name
and target_task_name
!= task_name
:
351 # self.logger.debug("{} _id={} order_id={} task={} cancelled".format(topic, _id, op_id, task_name))
354 def _is_service_type_HA(self
, topic
):
355 return topic
in self
.topic_service_list
357 # Is topic VIM/WIM/SDN?
358 def _is_account_type_HA(self
, topic
):
359 return topic
in self
.topic_account_list
361 # Input: op_id, example: 'abc123def:3' Output: account_id='abc123def', op_index=3
362 def _get_account_and_op_HA(self
, op_id
):
365 account_id
, _
, op_index
= op_id
.rpartition(":")
366 if not account_id
or not op_index
.isdigit():
368 return account_id
, op_index
370 # Get '_id' for any topic and operation
371 def _get_instance_id_HA(self
, topic
, op_type
, op_id
):
373 # Special operation 'ANY', for SDN account associated to a VIM account: op_id as '_id'
376 # NS/NSI: Use op_id as '_id'
377 elif self
._is
_service
_type
_HA
(topic
):
379 # VIM/SDN/WIM/K8SCLUSTER: Split op_id to get Account ID and Operation Index, use Account ID as '_id'
380 elif self
._is
_account
_type
_HA
(topic
):
381 _id
, _
= self
._get
_account
_and
_op
_HA
(op_id
)
384 # Set DB _filter for querying any related process state
385 def _get_waitfor_filter_HA(self
, db_lcmop
, topic
, op_type
, op_id
):
387 # Special operation 'ANY', for SDN account associated to a VIM account: op_id as '_id'
388 # In this special case, the timestamp is ignored
390 _filter
= {"operationState": "PROCESSING"}
391 # Otherwise, get 'startTime' timestamp for this operation
394 if self
._is
_service
_type
_HA
(topic
):
396 starttime_this_op
= db_lcmop
.get("startTime")
397 instance_id_label
= self
.topic2instid_dict
.get(topic
)
398 instance_id
= db_lcmop
.get(instance_id_label
)
400 instance_id_label
: instance_id
,
401 "operationState": "PROCESSING",
402 "startTime.lt": starttime_this_op
,
403 "_admin.modified.gt": now
404 - 2 * 3600, # ignore if tow hours of inactivity
406 # VIM/WIM/SDN/K8scluster
407 elif self
._is
_account
_type
_HA
(topic
):
408 _
, op_index
= self
._get
_account
_and
_op
_HA
(op_id
)
409 _ops
= db_lcmop
["_admin"]["operations"]
410 _this_op
= _ops
[int(op_index
)]
411 starttime_this_op
= _this_op
.get("startTime", None)
413 "operationState": "PROCESSING",
414 "startTime.lt": starttime_this_op
,
418 # Get DB params for any topic and operation
419 def _get_dbparams_for_lock_HA(self
, topic
, op_type
, op_id
):
423 if self
._is
_service
_type
_HA
(topic
):
424 q_filter
= {"_id": op_id
, "_admin.worker": None}
425 update_dict
= {"_admin.worker": self
.worker_id
}
427 elif self
._is
_account
_type
_HA
(topic
):
428 account_id
, op_index
= self
._get
_account
_and
_op
_HA
(op_id
)
431 if op_type
== "create":
432 # Creating a VIM/WIM/SDN account implies setting '_admin.current_operation' = 0
436 "_admin.operations.{}.worker".format(op_index
): None,
439 "_admin.operations.{}.worker".format(op_index
): self
.worker_id
,
440 "_admin.current_operation": op_index
,
442 return q_filter
, update_dict
444 def lock_HA(self
, topic
, op_type
, op_id
):
446 Lock a task, if possible, to indicate to the HA system that
447 the task will be executed in this LCM instance.
448 :param topic: Can be "ns", "nsi", "vim", "wim", or "sdn"
449 :param op_type: Operation type, can be "nslcmops", "nsilcmops", "create", "edit", "delete"
450 :param op_id: NS, NSI: Operation ID VIM,WIM,SDN: Account ID + ':' + Operation Index
452 True=lock was successful => execute the task (not registered by any other LCM instance)
453 False=lock failed => do NOT execute the task (already registered by another LCM instance)
455 HA tasks and backward compatibility:
456 If topic is "account type" (VIM/WIM/SDN) and op_id is None, 'op_id' was not provided by NBI.
457 This means that the running NBI instance does not support HA.
458 In such a case this method should always return True, to always execute
459 the task in this instance of LCM, without querying the DB.
462 # Backward compatibility for VIM/WIM/SDN/k8scluster without op_id
463 if self
._is
_account
_type
_HA
(topic
) and op_id
is None:
466 # Try to lock this task
467 db_table_name
= self
.topic2dbtable_dict
[topic
]
468 q_filter
, update_dict
= self
._get
_dbparams
_for
_lock
_HA
(topic
, op_type
, op_id
)
469 db_lock_task
= self
.db
.set_one(
472 update_dict
=update_dict
,
475 if db_lock_task
is None:
477 "Task {} operation={} already locked by another worker".format(
483 # Set 'detailed-status' to 'In progress' for VIM/WIM/SDN operations
484 if self
._is
_account
_type
_HA
(topic
):
485 detailed_status
= "In progress"
486 account_id
, op_index
= self
._get
_account
_and
_op
_HA
(op_id
)
487 q_filter
= {"_id": account_id
}
489 "_admin.operations.{}.detailed-status".format(
496 update_dict
=update_dict
,
501 def unlock_HA(self
, topic
, op_type
, op_id
, operationState
, detailed_status
):
503 Register a task, done when finished a VIM/WIM/SDN 'create' operation.
504 :param topic: Can be "vim", "wim", or "sdn"
505 :param op_type: Operation type, can be "create", "edit", "delete"
506 :param op_id: Account ID + ':' + Operation Index
510 # Backward compatibility
511 if not self
._is
_account
_type
_HA
(topic
) or not op_id
:
514 # Get Account ID and Operation Index
515 account_id
, op_index
= self
._get
_account
_and
_op
_HA
(op_id
)
516 db_table_name
= self
.topic2dbtable_dict
[topic
]
518 # If this is a 'delete' operation, the account may have been deleted (SUCCESS) or may still exist (FAILED)
519 # If the account exist, register the HA task.
520 # Update DB for HA tasks
521 q_filter
= {"_id": account_id
}
523 "_admin.operations.{}.operationState".format(op_index
): operationState
,
524 "_admin.operations.{}.detailed-status".format(op_index
): detailed_status
,
525 "_admin.operations.{}.worker".format(op_index
): None,
526 "_admin.current_operation": None,
531 update_dict
=update_dict
,
536 async def waitfor_related_HA(self
, topic
, op_type
, op_id
=None):
538 Wait for any pending related HA tasks
541 # Backward compatibility
543 self
._is
_service
_type
_HA
(topic
) or self
._is
_account
_type
_HA
(topic
)
544 ) and (op_id
is None):
548 db_table_name
= self
.topic2dbtable_dict
.get(topic
)
551 _id
= self
._get
_instance
_id
_HA
(topic
, op_type
, op_id
)
552 _filter
= {"_id": _id
}
553 db_lcmop
= self
.db
.get_one(db_table_name
, _filter
, fail_on_empty
=False)
557 # Set DB _filter for querying any related process state
558 _filter
= self
._get
_waitfor
_filter
_HA
(db_lcmop
, topic
, op_type
, op_id
)
560 # For HA, get list of tasks from DB instead of from dictionary (in-memory) variable.
561 timeout_wait_for_task
= (
562 3600 # Max time (seconds) to wait for a related task to finish
564 # interval_wait_for_task = 30 # A too long polling interval slows things down considerably
565 interval_wait_for_task
= 10 # Interval in seconds for polling related tasks
566 time_left
= timeout_wait_for_task
567 old_num_related_tasks
= 0
569 # Get related tasks (operations within the same instance as this) which are
570 # still running (operationState='PROCESSING') and which were started before this task.
571 # In the case of op_type='ANY', get any related tasks with operationState='PROCESSING', ignore timestamps.
572 db_waitfor_related_task
= self
.db
.get_list(db_table_name
, q_filter
=_filter
)
573 new_num_related_tasks
= len(db_waitfor_related_task
)
574 # If there are no related tasks, there is nothing to wait for, so return.
575 if not new_num_related_tasks
:
577 # If number of pending related tasks have changed,
578 # update the 'detailed-status' field and log the change.
579 # Do NOT update the 'detailed-status' for SDNC-associated-to-VIM operations ('ANY').
580 if (op_type
!= "ANY") and (new_num_related_tasks
!= old_num_related_tasks
):
581 step
= "Waiting for {} related tasks to be completed.".format(
582 new_num_related_tasks
585 q_filter
= {"_id": _id
}
587 if self
._is
_service
_type
_HA
(topic
):
589 "detailed-status": step
,
590 "queuePosition": new_num_related_tasks
,
593 elif self
._is
_account
_type
_HA
(topic
):
594 _
, op_index
= self
._get
_account
_and
_op
_HA
(op_id
)
596 "_admin.operations.{}.detailed-status".format(op_index
): step
598 self
.logger
.debug("Task {} operation={} {}".format(topic
, _id
, step
))
602 update_dict
=update_dict
,
605 old_num_related_tasks
= new_num_related_tasks
606 time_left
-= interval_wait_for_task
609 "Timeout ({}) when waiting for related tasks to be completed".format(
610 timeout_wait_for_task
613 await asyncio
.sleep(interval_wait_for_task
)