1 # -*- coding: utf-8 -*-
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from uuid
import uuid4
18 from http
import HTTPStatus
20 from osm_common
.dbbase
import deep_update_rfc7396
, DbException
21 from osm_nbi
.validation
import validate_input
, ValidationError
, is_valid_uuid
22 from yaml
import safe_load
, YAMLError
24 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
class EngineException(Exception):
    """Engine error that carries the HTTP status code to be reported.

    :param message: text describing the failure; becomes the exception message
    :param http_code: HTTP status stored in ``self.http_code``. Defaults to
        HTTPStatus.BAD_REQUEST
    """

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        self.http_code = http_code
        # Zero-argument super() follows the whole MRO. The former
        # super(Exception, self) began the lookup *after* Exception, which
        # silently bypasses Exception itself and any cooperative base class.
        super().__init__(message)
class NBIBadArgumentsException(Exception):
    """
    Bad argument values exception

    :param message: description of what is wrong with the arguments
    :param bad_args: the offending argument name(s); stored untouched in
        ``self.bad_args`` and appended to the text produced by ``str()``
    """

    def __init__(self, message: str = "", bad_args: list = None):
        # super() instead of naming the base class explicitly
        super().__init__(message)
        self.message = message
        self.bad_args = bad_args

    def __str__(self):
        """Return the message followed by the offending arguments."""
        return "{}, Bad arguments: {}".format(self.message, self.bad_args)
def deep_get(target_dict, key_list):
    """
    Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
    Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
    :param target_dict: dictionary to be read
    :param key_list: list of keys to read from target_dict
    :return: The wanted value if exist, None otherwise
    """
    for key in key_list:
        # Stop as soon as the current level is not a dict (e.g. a leaf value
        # reached before the keys are exhausted) or lacks the requested key.
        if not isinstance(target_dict, dict) or key not in target_dict:
            return None
        target_dict = target_dict[key]
    return target_dict
def detect_descriptor_usage(descriptor: dict, db_collection: str, db: object) -> bool:
    """Detect the descriptor usage state.

    Looks for records that reference the descriptor: "vnfrs" filtered by
    "vnfd-id" when db_collection is "vnfds", "nsrs" filtered by "nsd-id" when
    it is "nsds".

    Args:
        descriptor (dict):   VNF or NS Descriptor as dictionary
        db_collection (str):   collection name which is looked for in DB
        db (object):   name of db object

    Returns:
        True if descriptor is in use else None

    Raises:
        EngineException: if an argument is empty or invalid, the descriptor has
            no "_id", or the database lookup fails
    """
    try:
        if not descriptor:
            raise NBIBadArgumentsException(
                "Argument is mandatory and can not be empty", "descriptor"
            )

        if not db:
            raise NBIBadArgumentsException("A valid DB object should be provided", "db")

        # descriptor collection -> (record collection, field holding the descriptor id)
        search_dict = {
            "vnfds": ("vnfrs", "vnfd-id"),
            "nsds": ("nsrs", "nsd-id"),
        }

        if db_collection not in search_dict:
            raise NBIBadArgumentsException(
                "db_collection should be equal to vnfds or nsds", "db_collection"
            )

        record_collection, id_field_name = search_dict[db_collection]
        record_list = db.get_list(
            record_collection,
            {id_field_name: descriptor["_id"]},
        )

        if record_list:
            return True
        # Kept as None (not False) to honour the documented contract.
        return None

    except (DbException, KeyError, NBIBadArgumentsException) as error:
        # Chain the cause so the original traceback is preserved.
        raise EngineException(
            f"Error occured while detecting the descriptor usage: {error}"
        ) from error
def update_descriptor_usage_state(
    descriptor: dict, db_collection: str, db: object
) -> None:
    """Updates the descriptor usage state.

    Writes "_admin.usageState" as "IN_USE" when detect_descriptor_usage()
    reports the descriptor as used, "NOT_IN_USE" otherwise.

    Args:
        descriptor (dict):   VNF or NS Descriptor as dictionary
        db_collection (str):   collection name which is looked for in DB
        db (object):   name of db object

    Returns:
        None

    Raises:
        EngineException: if the usage detection or the database update fails
    """
    try:
        in_use = detect_descriptor_usage(descriptor, db_collection, db)
        descriptor_update = {
            "_admin.usageState": "IN_USE" if in_use else "NOT_IN_USE",
        }

        # NOTE(review): the call target is not visible in this view; set_one is
        # assumed from the (table, filter, update_dict=...) argument shape —
        # confirm against the db API.
        db.set_one(
            db_collection, {"_id": descriptor["_id"]}, update_dict=descriptor_update
        )

    except (DbException, KeyError, NBIBadArgumentsException) as error:
        # Chain the cause so the original traceback is preserved.
        raise EngineException(
            f"Error occured while updating the descriptor usage state: {error}"
        ) from error
def get_iterable(input_var):
    """
    Returns an iterable, in case input_var is None it just returns an empty tuple
    :param input_var: can be a list, tuple or None
    :return: input_var or () if it is None
    """
    # Only None is replaced; an empty list/tuple is handed back as is.
    if input_var is None:
        return ()
    return input_var
153 """utility for compare dot separate versions. Fills with zeros to proper number comparison"""
155 for point
in v
.split("."):
156 filled
.append(point
.zfill(8))
def increment_ip_mac(ip_mac, vm_index=1):
    """
    Adds vm_index to the last group of an IPv4, IPv6 or MAC address given as text
    :param ip_mac: address as a string. The last group is decimal when the address contains a dot (IPv4) and
        hexadecimal when it only contains colons (IPv6 or MAC)
    :param vm_index: amount added to the last group. By default 1
    :return: the new address as a string; None if no separator is found or the last group cannot be converted.
        No carry is propagated to the previous groups.
    """
    if not isinstance(ip_mac, str):
        # NOTE(review): the statement under this guard is not visible in this
        # view; handing the value back unchanged is assumed — confirm.
        return ip_mac
    try:
        # try with ipv4 look for last dot
        i = ip_mac.rfind(".")
        if i > 0:
            i += 1  # first character after the separator
            return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
        # try with ipv6 or mac look for last colon. Operate in hex
        i = ip_mac.rfind(":")
        if i > 0:
            i += 1  # first character after the separator
            # format in hex, len can be 2 for mac or 4 for ipv6
            return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(
                ip_mac[:i], int(ip_mac[i:], 16) + vm_index
            )
    except (ValueError, TypeError):
        # last group is not a number (or vm_index is not addable): best effort
        return None
    return None
183 # static variables for all instance classes
184 topic
= None # to_override
185 topic_msg
= None # to_override
186 quota_name
= None # to_override. If not provided topic will be used for quota_name
187 schema_new
= None # to_override
188 schema_edit
= None # to_override
189 multiproject
= True # True if this Topic can be shared by several projects. Then it contains _admin.projects_read
193 # Alternative ID Fields for some Topics
194 alt_id_field
= {"projects": "name", "users": "username", "roles": "name"}
196 def __init__(self
, db
, fs
, msg
, auth
):
200 self
.logger
= logging
.getLogger("nbi.engine")
def id_field(topic, value):
    """Returns ID Field for given topic and field value"""
    # Topics listed in alt_id_field may be addressed by their alternative
    # field when the given value is not a uuid.
    if topic in BaseTopic.alt_id_field and not is_valid_uuid(value):
        return BaseTopic.alt_id_field[topic]
    # NOTE(review): the fallback line is not visible in this view; "_id" is
    # assumed from the callers that use the result as a database filter key
    # for an identifier — confirm.
    return "_id"
212 def _remove_envelop(indata
=None):
217 def check_quota(self
, session
):
219 Check whether topic quota is exceeded by the given project
220 Used by relevant topics' 'new' function to decide whether or not creation of the new item should be allowed
221 :param session[project_id]: projects (tuple) for which quota should be checked
222 :param session[force]: boolean. If true, skip quota checking
225 DbException if project not found
226 ValidationError if quota exceeded in one of the projects
230 projects
= session
["project_id"]
231 for project
in projects
:
232 proj
= self
.auth
.get_project(project
)
234 quota_name
= self
.quota_name
or self
.topic
235 quota
= proj
.get("quotas", {}).get(quota_name
, self
.default_quota
)
236 count
= self
.db
.count(self
.topic
, {"_admin.projects_read": pid
})
239 raise ValidationError(
240 "quota ({}={}) exceeded for project {} ({})".format(
241 quota_name
, quota
, name
, pid
243 http_code
=HTTPStatus
.UNPROCESSABLE_ENTITY
,
246 def _validate_input_new(self
, input, force
=False):
248 Validates input user content for a new entry. It uses jsonschema. Some overrides will use pyangbind
249 :param input: user input content for the new topic
250 :param force: may be used for being more tolerant
251 :return: The same input content, or a changed version of it.
254 validate_input(input, self
.schema_new
)
257 def _validate_input_edit(self
, input, content
, force
=False):
259 Validates input user content for an edition. It uses jsonschema. Some overrides will use pyangbind
260 :param input: user input content for the new topic
261 :param force: may be used for being more tolerant
262 :return: The same input content, or a changed version of it.
265 validate_input(input, self
.schema_edit
)
269 def _get_project_filter(session
):
271 Generates a filter dictionary for querying database, so that only allowed items for this project can be
272 addressed. Only proprietary or public can be used. Allowed projects are at _admin.project_read/write. If it is
273 not present or contains ANY mean public.
274 :param session: contains:
275 project_id: project list this session has rights to access. Can be empty, one or several
276 set_project: items created will contain this project list
278 public: True, False or None
279 method: "list", "show", "write", "delete"
281 :return: dictionary with project filter
284 project_filter_n
= []
285 project_filter
= list(session
["project_id"])
287 if session
["method"] not in ("list", "delete"):
289 project_filter
.append("ANY")
290 elif session
["public"] is not None:
291 if session
["public"]:
292 project_filter
.append("ANY")
294 project_filter_n
.append("ANY")
296 if session
.get("PROJECT.ne"):
297 project_filter_n
.append(session
["PROJECT.ne"])
300 if session
["method"] in ("list", "show", "delete") or session
.get(
303 p_filter
["_admin.projects_read.cont"] = project_filter
305 p_filter
["_admin.projects_write.cont"] = project_filter
307 if session
["method"] in ("list", "show", "delete") or session
.get(
310 p_filter
["_admin.projects_read.ncont"] = project_filter_n
312 p_filter
["_admin.projects_write.ncont"] = project_filter_n
316 def check_conflict_on_new(self
, session
, indata
):
318 Check that the data to be inserted is valid
319 :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
320 :param indata: data to be inserted
321 :return: None or raises EngineException
def check_conflict_on_edit(self, session, final_content, edit_content, _id):
    """
    Check that the data to be edited/uploaded is valid
    :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
    :param final_content: data once modified. This method may change it.
    :param edit_content: incremental data that contains the modifications to apply
    :param _id: internal _id
    :return: final_content or raises EngineException
    """
    if not self.multiproject:
        return final_content

    projects_read = final_content["_admin"]["projects_read"]

    # Change public status
    if session["public"] is not None:
        if session["public"] and "ANY" not in projects_read:
            # becoming public: readable by ANY, write list emptied
            projects_read.append("ANY")
            final_content["_admin"]["projects_write"].clear()
        if not session["public"] and "ANY" in projects_read:
            projects_read.remove("ANY")

    # Change project status
    if session.get("set_project"):
        for p in session["set_project"]:
            if p not in projects_read:
                projects_read.append(p)

    return final_content
358 def check_unique_name(self
, session
, name
, _id
=None):
360 Check that the name is unique for this project
361 :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
362 :param name: name to be checked
363 :param _id: If not None, ignore this entry that are going to change
364 :return: None or raises EngineException
366 if not self
.multiproject
:
369 _filter
= self
._get
_project
_filter
(session
)
370 _filter
["name"] = name
372 _filter
["_id.neq"] = _id
374 self
.topic
, _filter
, fail_on_empty
=False, fail_on_more
=False
376 raise EngineException(
377 "name '{}' already exists for {}".format(name
, self
.topic
),
def format_on_new(content, project_id=None, make_public=False):
    """
    Modifies content descriptor to include _admin
    :param content: descriptor to be modified
    :param project_id: if included, it add project read/write permissions. Can be None or a list
    :param make_public: if included it is generated as public for reading.
    :return: op_id: operation id on asynchronous operation, None otherwise. In addition content is modified
    """
    # NOTE(review): the assignment of `now` is not visible in this view; an
    # epoch timestamp from time.time() is assumed — confirm. Imported locally
    # so the block does not rely on a module-level import.
    from time import time

    now = time()
    if "_admin" not in content:
        content["_admin"] = {}
    admin = content["_admin"]
    if not admin.get("created"):
        admin["created"] = now
    admin["modified"] = now
    if not content.get("_id"):
        content["_id"] = str(uuid4())
    if project_id is not None:
        # existing permission lists are respected; only missing/empty ones are filled
        if not admin.get("projects_read"):
            admin["projects_read"] = list(project_id)
            # NOTE(review): nesting of this guard under the one above is assumed — confirm
            if make_public:
                admin["projects_read"].append("ANY")
        if not admin.get("projects_write"):
            admin["projects_write"] = list(project_id)
    return None
def format_on_edit(final_content, edit_content):
    """
    Modifies final_content to admin information upon edition
    :param final_content: final content to be stored at database
    :param edit_content: user requested update content
    :return: operation id, if this edit implies an asynchronous operation; None otherwise
    """
    # NOTE(review): the assignment of `now` is not visible in this view; an
    # epoch timestamp from time.time() is assumed — confirm. Imported locally
    # so the block does not rely on a module-level import.
    from time import time

    if final_content.get("_admin"):
        now = time()
        final_content["_admin"]["modified"] = now
    return None
def _send_msg(self, action, content, not_send_msg=None):
    """
    Publishes a notification about this topic, or queues it for later
    :param action: action name written together with the content
    :param content: dictionary to send. A shallow copy without "_admin" is sent; the caller's dict is not modified
    :param not_send_msg: False to send nothing; a list to append the (topic_msg, action, content) tuple to it
        instead of writing; anything else writes through self.msg
    :return: None
    """
    # Nothing to do for topics without topic_msg or when explicitly disabled
    if not self.topic_msg or not_send_msg is False:
        return
    content = content.copy()
    content.pop("_admin", None)
    if isinstance(not_send_msg, list):
        not_send_msg.append((self.topic_msg, action, content))
    else:
        self.msg.write(self.topic_msg, action, content)
429 def check_conflict_on_del(self
, session
, _id
, db_content
):
431 Check if deletion can be done because of dependencies if it is not force. To override
432 :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
433 :param _id: internal _id
434 :param db_content: The database content of this item _id
435 :return: None if ok or raises EngineException with the conflict
440 def _update_input_with_kwargs(desc
, kwargs
, yaml_format
=False):
442 Update descriptor with the kwargs. It contains dot separated keys
443 :param desc: dictionary to be updated
444 :param kwargs: plain dictionary to be used for updating.
445 :param yaml_format: get kwargs values as yaml format.
446 :return: None, 'desc' is modified. It raises EngineException.
451 for k
, v
in kwargs
.items():
452 update_content
= desc
456 if kitem_old
is not None:
457 update_content
= update_content
[kitem_old
]
458 if isinstance(update_content
, dict):
460 if not isinstance(update_content
.get(kitem_old
), (dict, list)):
461 update_content
[kitem_old
] = {}
462 elif isinstance(update_content
, list):
463 # key must be an index of the list, must be integer
464 kitem_old
= int(kitem
)
465 # if index greater than list, extend the list
466 if kitem_old
>= len(update_content
):
467 update_content
+= [None] * (
468 kitem_old
- len(update_content
) + 1
470 if not isinstance(update_content
[kitem_old
], (dict, list)):
471 update_content
[kitem_old
] = {}
473 raise EngineException(
474 "Invalid query string '{}'. Descriptor is not a list nor dict at '{}'".format(
479 del update_content
[kitem_old
]
481 update_content
[kitem_old
] = v
if not yaml_format
else safe_load(v
)
483 raise EngineException(
484 "Invalid query string '{}'. Descriptor does not contain '{}'".format(
489 raise EngineException(
490 "Invalid query string '{}'. Expected integer index list instead of '{}'".format(
495 raise EngineException(
496 "Invalid query string '{}'. Index '{}' out of range".format(
501 raise EngineException("Invalid query string '{}' yaml format".format(k
))
503 def sol005_projection(self
, data
):
504 # Projection was moved to child classes
507 def show(self
, session
, _id
, filter_q
=None, api_req
=False):
509 Get complete information on an topic
510 :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
511 :param _id: server internal id
512 :param filter_q: dict: query parameter
513 :param api_req: True if this call is serving an external API request. False if serving internal request.
514 :return: dictionary, raise exception if not found.
516 if not self
.multiproject
:
519 filter_db
= self
._get
_project
_filter
(session
)
520 # To allow project&user addressing by name AS WELL AS _id
521 filter_db
[BaseTopic
.id_field(self
.topic
, _id
)] = _id
522 data
= self
.db
.get_one(self
.topic
, filter_db
)
524 # Only perform SOL005 projection if we are serving an external request
526 self
.sol005_projection(data
)
530 # TODO transform data for SOL005 URL requests
531 # TODO remove _admin if not admin
533 def get_file(self
, session
, _id
, path
=None, accept_header
=None):
535 Only implemented for descriptor topics. Return the file content of a descriptor
536 :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
537 :param _id: Identity of the item to get content
538 :param path: artifact path or "$DESCRIPTOR" or None
539 :param accept_header: Content of Accept header. Must contain application/zip and/or text/plain
540 :return: opened file or raises an exception
542 raise EngineException(
543 "Method get_file not valid for this topic", HTTPStatus
.INTERNAL_SERVER_ERROR
546 def list(self
, session
, filter_q
=None, api_req
=False):
548 Get a list of the topic that matches a filter
549 :param session: contains the used login username and working project
550 :param filter_q: filter of data to be applied
551 :param api_req: True if this call is serving an external API request. False if serving internal request.
552 :return: The list, it can be empty if no one match the filter.
556 if self
.multiproject
:
557 filter_q
.update(self
._get
_project
_filter
(session
))
559 # TODO transform data for SOL005 URL requests. Transform filtering
560 # TODO implement "field-type" query string SOL005
561 data
= self
.db
.get_list(self
.topic
, filter_q
)
563 # Only perform SOL005 projection if we are serving an external request
565 data
= [self
.sol005_projection(inst
) for inst
in data
]
569 def new(self
, rollback
, session
, indata
=None, kwargs
=None, headers
=None):
571 Creates a new entry into database.
572 :param rollback: list to append created items at database in case a rollback may to be done
573 :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
574 :param indata: data to be inserted
575 :param kwargs: used to override the indata descriptor
576 :param headers: http request headers
578 _id: identity of the inserted data.
579 op_id: operation id if this is asynchronous, None otherwise
582 if self
.multiproject
:
583 self
.check_quota(session
)
585 content
= self
._remove
_envelop
(indata
)
587 # Override descriptor with query string kwargs
588 self
._update
_input
_with
_kwargs
(content
, kwargs
)
589 content
= self
._validate
_input
_new
(content
, force
=session
["force"])
590 self
.check_conflict_on_new(session
, content
)
591 op_id
= self
.format_on_new(
592 content
, project_id
=session
["project_id"], make_public
=session
["public"]
594 _id
= self
.db
.create(self
.topic
, content
)
595 rollback
.append({"topic": self
.topic
, "_id": _id
})
597 content
["op_id"] = op_id
598 self
._send
_msg
("created", content
)
600 except ValidationError
as e
:
601 raise EngineException(e
, HTTPStatus
.UNPROCESSABLE_ENTITY
)
603 def upload_content(self
, session
, _id
, indata
, kwargs
, headers
):
605 Only implemented for descriptor topics. Used for receiving content by chunks (with a transaction_id header
606 and/or gzip file. It will store and extract)
607 :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
608 :param _id : the database id of entry to be updated
609 :param indata: http body request
610 :param kwargs: user query string to override parameters. NOT USED
611 :param headers: http request headers
612 :return: True if the package has been completely uploaded or False if partial content has been uploaded.
613 Raise exception on error
615 raise EngineException(
616 "Method upload_content not valid for this topic",
617 HTTPStatus
.INTERNAL_SERVER_ERROR
,
620 def delete_list(self
, session
, filter_q
=None):
622 Delete a several entries of a topic. This is for internal usage and test only, not exposed to NBI API
623 :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
624 :param filter_q: filter of data to be applied
625 :return: The deleted list, it can be empty if no one match the filter.
627 # TODO add admin to filter, validate rights
630 if self
.multiproject
:
631 filter_q
.update(self
._get
_project
_filter
(session
))
632 return self
.db
.del_list(self
.topic
, filter_q
)
634 def delete_extra(self
, session
, _id
, db_content
, not_send_msg
=None):
636 Delete other things apart from database entry of a item _id.
637 e.g.: other associated elements at database and other file system storage
638 :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
639 :param _id: server internal id
640 :param db_content: The database content of the _id. It is already deleted when reached this method, but the
641 content is needed in same cases
642 :param not_send_msg: To not send message (False) or store content (list) instead
643 :return: None if ok or raises EngineException with the problem
647 def delete(self
, session
, _id
, dry_run
=False, not_send_msg
=None):
649 Delete item by its internal _id
650 :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
651 :param _id: server internal id
652 :param dry_run: make checking but do not delete
653 :param not_send_msg: To not send message (False) or store content (list) instead
654 :return: operation id (None if there is not operation), raise exception if error or not found, conflict, ...
657 # To allow addressing projects and users by name AS WELL AS by _id
658 if not self
.multiproject
:
661 filter_q
= self
._get
_project
_filter
(session
)
662 filter_q
[self
.id_field(self
.topic
, _id
)] = _id
663 item_content
= self
.db
.get_one(self
.topic
, filter_q
)
665 self
.check_conflict_on_del(session
, _id
, item_content
)
669 if self
.multiproject
and session
["project_id"]:
670 # remove reference from project_read if there are more projects referencing it. If it last one,
671 # do not remove reference, but delete
672 other_projects_referencing
= next(
675 for p
in item_content
["_admin"]["projects_read"]
676 if p
not in session
["project_id"] and p
!= "ANY"
681 # check if there are projects referencing it (apart from ANY, that means, public)....
682 if other_projects_referencing
:
683 # remove references but not delete
685 "_admin.projects_read": session
["project_id"],
686 "_admin.projects_write": session
["project_id"],
689 self
.topic
, filter_q
, update_dict
=None, pull_list
=update_dict_pull
696 for p
in item_content
["_admin"]["projects_write"]
697 if p
== "ANY" or p
in session
["project_id"]
702 raise EngineException(
703 "You have not write permission to delete it",
704 http_code
=HTTPStatus
.UNAUTHORIZED
,
708 self
.db
.del_one(self
.topic
, filter_q
)
709 self
.delete_extra(session
, _id
, item_content
, not_send_msg
=not_send_msg
)
710 self
._send
_msg
("deleted", {"_id": _id
}, not_send_msg
=not_send_msg
)
713 def edit(self
, session
, _id
, indata
=None, kwargs
=None, content
=None):
715 Change the content of an item
716 :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
717 :param _id: server internal id
718 :param indata: contains the changes to apply
719 :param kwargs: modifies indata
720 :param content: original content of the item
721 :return: op_id: operation id if this is processed asynchronously, None otherwise
723 indata
= self
._remove
_envelop
(indata
)
725 # Override descriptor with query string kwargs
727 self
._update
_input
_with
_kwargs
(indata
, kwargs
)
729 if indata
and session
.get("set_project"):
730 raise EngineException(
731 "Cannot edit content and set to project (query string SET_PROJECT) at same time",
732 HTTPStatus
.UNPROCESSABLE_ENTITY
,
734 # TODO self._check_edition(session, indata, _id, force)
736 content
= self
.show(session
, _id
)
737 indata
= self
._validate
_input
_edit
(indata
, content
, force
=session
["force"])
738 deep_update_rfc7396(content
, indata
)
740 # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
741 _id
= content
.get("_id") or _id
743 content
= self
.check_conflict_on_edit(session
, content
, indata
, _id
=_id
)
744 op_id
= self
.format_on_edit(content
, indata
)
746 self
.db
.replace(self
.topic
, _id
, content
)
748 indata
.pop("_admin", None)
750 indata
["op_id"] = op_id
752 self
._send
_msg
("edited", indata
)
754 except ValidationError
as e
:
755 raise EngineException(e
, HTTPStatus
.UNPROCESSABLE_ENTITY
)