1 # -*- coding: utf-8 -*-
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 from uuid
import uuid4
18 from http
import HTTPStatus
20 from osm_common
.dbbase
import deep_update_rfc7396
, DbException
21 from osm_nbi
.validation
import validate_input
, ValidationError
, is_valid_uuid
22 from yaml
import safe_load
, YAMLError
24 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
class EngineException(Exception):
    """Engine error that carries an HTTP status code next to its message.

    :param message: description of the failure; becomes the exception text
    :param http_code: HTTPStatus stored in ``self.http_code``. Defaults to
        HTTPStatus.BAD_REQUEST
    """

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        self.http_code = http_code
        # Zero-argument super(): the former super(Exception, self) started the
        # MRO lookup *after* Exception, skipping Exception itself.
        super().__init__(message)
class NBIBadArgumentsException(Exception):
    """
    Bad argument values exception

    :param message: description of the problem
    :param bad_args: the offending argument(s); stored exactly as given
    """

    def __init__(self, message: str = "", bad_args: list = None):
        super().__init__(message)
        self.message = message
        self.bad_args = bad_args

    # NOTE(review): the header of this method is missing from the mangled
    # source; it is assumed to be __str__ because the body only formats the
    # message for display - confirm against the original file.
    def __str__(self):
        return "{}, Bad arguments: {}".format(self.message, self.bad_args)
def deep_get(target_dict, key_list):
    """
    Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
    Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
    :param target_dict: dictionary to be read
    :param key_list: list of keys to read from target_dict
    :return: The wanted value if exist, None otherwise
    """
    for key in key_list:
        # Stop as soon as the current level is not a dict or lacks the key
        if not isinstance(target_dict, dict) or key not in target_dict:
            return None
        target_dict = target_dict[key]
    return target_dict
def detect_descriptor_usage(descriptor: dict, db_collection: str, db: object):
    """Detect the descriptor usage state.

    Args:
        descriptor (dict):   VNF or NS Descriptor as dictionary
        db_collection (str):   collection name which is looked for in DB
        db (object):   name of db object

    Returns:
        True if descriptor is in use else None

    Raises:
        EngineException: on a bad argument, a missing "_id" key or a DB error
    """
    # descriptor collection -> (record collection, field holding the descriptor id)
    search_dict = {
        "vnfds": ("vnfrs", "vnfd-id"),
        "nsds": ("nsrs", "nsd-id"),
    }
    try:
        if not descriptor:
            raise NBIBadArgumentsException(
                "Argument is mandatory and can not be empty", "descriptor"
            )

        if not db:
            raise NBIBadArgumentsException("A valid DB object should be provided", "db")

        if db_collection not in search_dict:
            raise NBIBadArgumentsException("db_collection should be equal to vnfds or nsds", "db_collection")

        record_collection, id_field = search_dict[db_collection]
        record_list = db.get_list(record_collection, {id_field: descriptor["_id"]})

        if record_list:
            return True
        return None

    except (DbException, KeyError, NBIBadArgumentsException) as error:
        # Keep the original failure attached as __cause__
        raise EngineException(f"Error occured while detecting the descriptor usage: {error}") from error
def update_descriptor_usage_state(descriptor: dict, db_collection: str, db: object):
    """Updates the descriptor usage state.

    Writes "_admin.usageState" of the descriptor as "IN_USE" when
    detect_descriptor_usage() reports it in use, otherwise as "NOT_IN_USE".

    Args:
        descriptor (dict):   VNF or NS Descriptor as dictionary
        db_collection (str):   collection name which is looked for in DB
        db (object):   name of db object

    Raises:
        EngineException: on a missing "_id" key or a DB error while updating
    """
    try:
        in_use = detect_descriptor_usage(descriptor, db_collection, db)
        descriptor_update = {
            "_admin.usageState": "IN_USE" if in_use else "NOT_IN_USE",
        }

        db.set_one(db_collection, {"_id": descriptor["_id"]}, update_dict=descriptor_update)

    except (DbException, KeyError, NBIBadArgumentsException) as error:
        # Keep the original failure attached as __cause__
        raise EngineException(f"Error occured while updating the descriptor usage state: {error}") from error
def get_iterable(input_var):
    """
    Returns an iterable, in case input_var is None it just returns an empty tuple
    :param input_var: can be a list, tuple or None
    :return: input_var or () if it is None
    """
    return () if input_var is None else input_var
147 """utility for compare dot separate versions. Fills with zeros to proper number comparison"""
149 for point
in v
.split("."):
150 filled
.append(point
.zfill(8))
def increment_ip_mac(ip_mac, vm_index=1):
    """
    Add vm_index to the last field of an IPv4, IPv6 or MAC address given as text.
    If the text contains a dot, the part after the last dot is read as decimal.
    Otherwise the part after the last colon is read as hexadecimal and written
    back zero-padded to its original width. No carry is propagated to other fields.
    :param ip_mac: address as string
    :param vm_index: amount to add to the last field
    :return: the new address string, or None if the last field cannot be parsed
    """
    # NOTE(review): the non-string return, the two "i > 0" guards and the
    # fallback to None are reconstructed; those lines are missing from the
    # mangled source - confirm against the original file.
    if not isinstance(ip_mac, str):
        return ip_mac
    try:
        # try with ipv4 look for last dot
        i = ip_mac.rfind(".")
        if i > 0:
            i += 1
            return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
        # try with ipv6 or mac look for last colon. Operate in hex
        i = ip_mac.rfind(":")
        if i > 0:
            i += 1
            # format in hex, len can be 2 for mac or 4 for ipv6
            return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(
                ip_mac[:i], int(ip_mac[i:], 16) + vm_index
            )
    except (ValueError, TypeError):
        # Unparseable last field or non-numeric vm_index: fall through to None
        pass
    return None
177 # static variables for all instance classes
178 topic
= None # to_override
179 topic_msg
= None # to_override
180 quota_name
= None # to_override. If not provided topic will be used for quota_name
181 schema_new
= None # to_override
182 schema_edit
= None # to_override
183 multiproject
= True # True if this Topic can be shared by several projects. Then it contains _admin.projects_read
187 # Alternative ID Fields for some Topics
188 alt_id_field
= {"projects": "name", "users": "username", "roles": "name"}
190 def __init__(self
, db
, fs
, msg
, auth
):
194 self
.logger
= logging
.getLogger("nbi.engine")
198 def id_field(topic
, value
):
199 """Returns ID Field for given topic and field value"""
200 if topic
in BaseTopic
.alt_id_field
.keys() and not is_valid_uuid(value
):
201 return BaseTopic
.alt_id_field
[topic
]
206 def _remove_envelop(indata
=None):
211 def check_quota(self
, session
):
213 Check whether topic quota is exceeded by the given project
214 Used by relevant topics' 'new' function to decide whether or not creation of the new item should be allowed
215 :param session[project_id]: projects (tuple) for which quota should be checked
216 :param session[force]: boolean. If true, skip quota checking
219 DbException if project not found
220 ValidationError if quota exceeded in one of the projects
224 projects
= session
["project_id"]
225 for project
in projects
:
226 proj
= self
.auth
.get_project(project
)
228 quota_name
= self
.quota_name
or self
.topic
229 quota
= proj
.get("quotas", {}).get(quota_name
, self
.default_quota
)
230 count
= self
.db
.count(self
.topic
, {"_admin.projects_read": pid
})
233 raise ValidationError(
234 "quota ({}={}) exceeded for project {} ({})".format(
235 quota_name
, quota
, name
, pid
237 http_code
=HTTPStatus
.UNPROCESSABLE_ENTITY
,
def _validate_input_new(self, input, force=False):
    """
    Validates input user content for a new entry. It uses jsonschema. Some overrides will use pyangbind
    :param input: user input content for the new topic
    :param force: may be used for being more tolerant
    :return: The same input content, or a changed version of it.
    """
    # NOTE(review): the guard line is missing from the mangled source; it is
    # assumed that validation is skipped while schema_new is left unset.
    if self.schema_new:
        validate_input(input, self.schema_new)
    return input
def _validate_input_edit(self, input, content, force=False):
    """
    Validates input user content for an edition. It uses jsonschema. Some overrides will use pyangbind
    :param input: user input content with the changes to apply
    :param content: current content of the item; not read by this base implementation
    :param force: may be used for being more tolerant
    :return: The same input content, or a changed version of it.
    """
    # NOTE(review): the guard line is missing from the mangled source; it is
    # assumed that validation is skipped while schema_edit is left unset.
    if self.schema_edit:
        validate_input(input, self.schema_edit)
    return input
263 def _get_project_filter(session
):
265 Generates a filter dictionary for querying database, so that only allowed items for this project can be
266 addressed. Only proprietary or public can be used. Allowed projects are at _admin.project_read/write. If it is
267 not present or contains ANY mean public.
268 :param session: contains:
269 project_id: project list this session has rights to access. Can be empty, one or several
270 set_project: items created will contain this project list
272 public: True, False or None
273 method: "list", "show", "write", "delete"
275 :return: dictionary with project filter
278 project_filter_n
= []
279 project_filter
= list(session
["project_id"])
281 if session
["method"] not in ("list", "delete"):
283 project_filter
.append("ANY")
284 elif session
["public"] is not None:
285 if session
["public"]:
286 project_filter
.append("ANY")
288 project_filter_n
.append("ANY")
290 if session
.get("PROJECT.ne"):
291 project_filter_n
.append(session
["PROJECT.ne"])
294 if session
["method"] in ("list", "show", "delete") or session
.get(
297 p_filter
["_admin.projects_read.cont"] = project_filter
299 p_filter
["_admin.projects_write.cont"] = project_filter
301 if session
["method"] in ("list", "show", "delete") or session
.get(
304 p_filter
["_admin.projects_read.ncont"] = project_filter_n
306 p_filter
["_admin.projects_write.ncont"] = project_filter_n
def check_conflict_on_new(self, session, indata):
    """
    Check that the data to be inserted is valid
    This base implementation performs no check.
    :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
    :param indata: data to be inserted
    :return: None or raises EngineException
    """
    return None
319 def check_conflict_on_edit(self
, session
, final_content
, edit_content
, _id
):
321 Check that the data to be edited/uploaded is valid
322 :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
323 :param final_content: data once modified. This method may change it.
324 :param edit_content: incremental data that contains the modifications to apply
325 :param _id: internal _id
326 :return: final_content or raises EngineException
328 if not self
.multiproject
:
330 # Change public status
331 if session
["public"] is not None:
334 and "ANY" not in final_content
["_admin"]["projects_read"]
336 final_content
["_admin"]["projects_read"].append("ANY")
337 final_content
["_admin"]["projects_write"].clear()
339 not session
["public"]
340 and "ANY" in final_content
["_admin"]["projects_read"]
342 final_content
["_admin"]["projects_read"].remove("ANY")
344 # Change project status
345 if session
.get("set_project"):
346 for p
in session
["set_project"]:
347 if p
not in final_content
["_admin"]["projects_read"]:
348 final_content
["_admin"]["projects_read"].append(p
)
352 def check_unique_name(self
, session
, name
, _id
=None):
354 Check that the name is unique for this project
355 :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
356 :param name: name to be checked
357 :param _id: If not None, ignore this entry that are going to change
358 :return: None or raises EngineException
360 if not self
.multiproject
:
363 _filter
= self
._get
_project
_filter
(session
)
364 _filter
["name"] = name
366 _filter
["_id.neq"] = _id
368 self
.topic
, _filter
, fail_on_empty
=False, fail_on_more
=False
370 raise EngineException(
371 "name '{}' already exists for {}".format(name
, self
.topic
),
376 def format_on_new(content
, project_id
=None, make_public
=False):
378 Modifies content descriptor to include _admin
379 :param content: descriptor to be modified
380 :param project_id: if included, it add project read/write permissions. Can be None or a list
381 :param make_public: if included it is generated as public for reading.
382 :return: op_id: operation id on asynchronous operation, None otherwise. In addition content is modified
385 if "_admin" not in content
:
386 content
["_admin"] = {}
387 if not content
["_admin"].get("created"):
388 content
["_admin"]["created"] = now
389 content
["_admin"]["modified"] = now
390 if not content
.get("_id"):
391 content
["_id"] = str(uuid4())
392 if project_id
is not None:
393 if not content
["_admin"].get("projects_read"):
394 content
["_admin"]["projects_read"] = list(project_id
)
396 content
["_admin"]["projects_read"].append("ANY")
397 if not content
["_admin"].get("projects_write"):
398 content
["_admin"]["projects_write"] = list(project_id
)
402 def format_on_edit(final_content
, edit_content
):
404 Modifies final_content to admin information upon edition
405 :param final_content: final content to be stored at database
406 :param edit_content: user requested update content
407 :return: operation id, if this edit implies an asynchronous operation; None otherwise
409 if final_content
.get("_admin"):
411 final_content
["_admin"]["modified"] = now
def _send_msg(self, action, content, not_send_msg=None):
    """
    Publish a notification about this topic, without the "_admin" section
    :param action: action name written together with the content
    :param content: dictionary to send; a shallow copy is sent, the argument is not modified
    :param not_send_msg: False to send nothing; a list to append the
        (topic_msg, action, content) tuple to it instead of writing it; any
        other value writes through self.msg
    :return: None
    """
    if not self.topic_msg or not_send_msg is False:
        return
    content = content.copy()
    content.pop("_admin", None)
    if isinstance(not_send_msg, list):
        not_send_msg.append((self.topic_msg, action, content))
    else:
        self.msg.write(self.topic_msg, action, content)
def check_conflict_on_del(self, session, _id, db_content):
    """
    Check if deletion can be done because of dependencies if it is not force. To override
    This base implementation performs no check.
    :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
    :param _id: internal _id
    :param db_content: The database content of this item _id
    :return: None if ok or raises EngineException with the conflict
    """
    return None
434 def _update_input_with_kwargs(desc
, kwargs
, yaml_format
=False):
436 Update descriptor with the kwargs. It contains dot separated keys
437 :param desc: dictionary to be updated
438 :param kwargs: plain dictionary to be used for updating.
439 :param yaml_format: get kwargs values as yaml format.
440 :return: None, 'desc' is modified. It raises EngineException.
445 for k
, v
in kwargs
.items():
446 update_content
= desc
450 if kitem_old
is not None:
451 update_content
= update_content
[kitem_old
]
452 if isinstance(update_content
, dict):
454 if not isinstance(update_content
.get(kitem_old
), (dict, list)):
455 update_content
[kitem_old
] = {}
456 elif isinstance(update_content
, list):
457 # key must be an index of the list, must be integer
458 kitem_old
= int(kitem
)
459 # if index greater than list, extend the list
460 if kitem_old
>= len(update_content
):
461 update_content
+= [None] * (
462 kitem_old
- len(update_content
) + 1
464 if not isinstance(update_content
[kitem_old
], (dict, list)):
465 update_content
[kitem_old
] = {}
467 raise EngineException(
468 "Invalid query string '{}'. Descriptor is not a list nor dict at '{}'".format(
473 del update_content
[kitem_old
]
475 update_content
[kitem_old
] = v
if not yaml_format
else safe_load(v
)
477 raise EngineException(
478 "Invalid query string '{}'. Descriptor does not contain '{}'".format(
483 raise EngineException(
484 "Invalid query string '{}'. Expected integer index list instead of '{}'".format(
489 raise EngineException(
490 "Invalid query string '{}'. Index '{}' out of range".format(
495 raise EngineException("Invalid query string '{}' yaml format".format(k
))
def sol005_projection(self, data):
    """
    Hook for the SOL005 projection of one item. The base implementation returns data untouched
    :param data: item content
    :return: data
    """
    # Projection was moved to child classes
    return data
def show(self, session, _id, filter_q=None, api_req=False):
    """
    Get complete information on a topic item
    :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
    :param _id: server internal id
    :param filter_q: dict: query parameter. Not read by the lines visible for this method
    :param api_req: True if this call is serving an external API request. False if serving internal request.
    :return: dictionary, raise exception if not found.
    """
    # NOTE(review): the empty-filter branch, the api_req guard and the return
    # are reconstructed; those lines are missing from the mangled source.
    filter_db = self._get_project_filter(session) if self.multiproject else {}
    # To allow project&user addressing by name AS WELL AS _id
    filter_db[BaseTopic.id_field(self.topic, _id)] = _id
    data = self.db.get_one(self.topic, filter_db)

    # Only perform SOL005 projection if we are serving an external request
    if api_req:
        self.sol005_projection(data)

    # TODO transform data for SOL005 URL requests
    # TODO remove _admin if not admin
    return data
def get_file(self, session, _id, path=None, accept_header=None):
    """
    Only implemented for descriptor topics. Return the file content of a descriptor
    This base implementation always raises.
    :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
    :param _id: Identity of the item to get content
    :param path: artifact path or "$DESCRIPTOR" or None
    :param accept_header: Content of Accept header. Must contain application/zip or/and text/plain
    :return: opened file or raises an exception
    """
    raise EngineException(
        "Method get_file not valid for this topic", HTTPStatus.INTERNAL_SERVER_ERROR
    )
540 def list(self
, session
, filter_q
=None, api_req
=False):
542 Get a list of the topic that matches a filter
543 :param session: contains the used login username and working project
544 :param filter_q: filter of data to be applied
545 :param api_req: True if this call is serving an external API request. False if serving internal request.
546 :return: The list, it can be empty if no one match the filter.
550 if self
.multiproject
:
551 filter_q
.update(self
._get
_project
_filter
(session
))
553 # TODO transform data for SOL005 URL requests. Transform filtering
554 # TODO implement "field-type" query string SOL005
555 data
= self
.db
.get_list(self
.topic
, filter_q
)
557 # Only perform SOL005 projection if we are serving an external request
559 data
= [self
.sol005_projection(inst
) for inst
in data
]
563 def new(self
, rollback
, session
, indata
=None, kwargs
=None, headers
=None):
565 Creates a new entry into database.
566 :param rollback: list to append created items at database in case a rollback may to be done
567 :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
568 :param indata: data to be inserted
569 :param kwargs: used to override the indata descriptor
570 :param headers: http request headers
572 _id: identity of the inserted data.
573 op_id: operation id if this is asynchronous, None otherwise
576 if self
.multiproject
:
577 self
.check_quota(session
)
579 content
= self
._remove
_envelop
(indata
)
581 # Override descriptor with query string kwargs
582 self
._update
_input
_with
_kwargs
(content
, kwargs
)
583 content
= self
._validate
_input
_new
(content
, force
=session
["force"])
584 self
.check_conflict_on_new(session
, content
)
585 op_id
= self
.format_on_new(
586 content
, project_id
=session
["project_id"], make_public
=session
["public"]
588 _id
= self
.db
.create(self
.topic
, content
)
589 rollback
.append({"topic": self
.topic
, "_id": _id
})
591 content
["op_id"] = op_id
592 self
._send
_msg
("created", content
)
594 except ValidationError
as e
:
595 raise EngineException(e
, HTTPStatus
.UNPROCESSABLE_ENTITY
)
def upload_content(self, session, _id, indata, kwargs, headers):
    """
    Only implemented for descriptor topics. Used for receiving content by chunks (with a transaction_id header
    and/or gzip file. It will store and extract)
    This base implementation always raises.
    :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
    :param _id : the database id of entry to be updated
    :param indata: http body request
    :param kwargs: user query string to override parameters. NOT USED
    :param headers: http request headers
    :return: True if the package has been completely uploaded or False if partial content has been uploaded.
        Raise exception on error
    """
    raise EngineException(
        "Method upload_content not valid for this topic",
        HTTPStatus.INTERNAL_SERVER_ERROR,
    )
def delete_list(self, session, filter_q=None):
    """
    Delete several entries of a topic. This is for internal usage and test only, not exposed to NBI API
    :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
    :param filter_q: filter of data to be applied. It is copied, the caller's dictionary is not modified
    :return: The deleted list, it can be empty if no one match the filter.
    """
    # TODO add admin to filter, validate rights
    # Work on a copy so the project filter is not leaked into the caller's dict
    query = dict(filter_q) if filter_q else {}
    if self.multiproject:
        query.update(self._get_project_filter(session))
    return self.db.del_list(self.topic, query)
def delete_extra(self, session, _id, db_content, not_send_msg=None):
    """
    Delete other things apart from database entry of a item _id.
    e.g.: other associated elements at database and other file system storage
    This base implementation does nothing.
    :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
    :param _id: server internal id
    :param db_content: The database content of the _id. It is already deleted when reached this method, but the
        content is needed in some cases
    :param not_send_msg: To not send message (False) or store content (list) instead
    :return: None if ok or raises EngineException with the problem
    """
    return None
641 def delete(self
, session
, _id
, dry_run
=False, not_send_msg
=None):
643 Delete item by its internal _id
644 :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
645 :param _id: server internal id
646 :param dry_run: make checking but do not delete
647 :param not_send_msg: To not send message (False) or store content (list) instead
648 :return: operation id (None if there is not operation), raise exception if error or not found, conflict, ...
651 # To allow addressing projects and users by name AS WELL AS by _id
652 if not self
.multiproject
:
655 filter_q
= self
._get
_project
_filter
(session
)
656 filter_q
[self
.id_field(self
.topic
, _id
)] = _id
657 item_content
= self
.db
.get_one(self
.topic
, filter_q
)
659 self
.check_conflict_on_del(session
, _id
, item_content
)
663 if self
.multiproject
and session
["project_id"]:
664 # remove reference from project_read if there are more projects referencing it. If it last one,
665 # do not remove reference, but delete
666 other_projects_referencing
= next(
669 for p
in item_content
["_admin"]["projects_read"]
670 if p
not in session
["project_id"] and p
!= "ANY"
675 # check if there are projects referencing it (apart from ANY, that means, public)....
676 if other_projects_referencing
:
677 # remove references but not delete
679 "_admin.projects_read": session
["project_id"],
680 "_admin.projects_write": session
["project_id"],
683 self
.topic
, filter_q
, update_dict
=None, pull_list
=update_dict_pull
690 for p
in item_content
["_admin"]["projects_write"]
691 if p
== "ANY" or p
in session
["project_id"]
696 raise EngineException(
697 "You have not write permission to delete it",
698 http_code
=HTTPStatus
.UNAUTHORIZED
,
702 self
.db
.del_one(self
.topic
, filter_q
)
703 self
.delete_extra(session
, _id
, item_content
, not_send_msg
=not_send_msg
)
704 self
._send
_msg
("deleted", {"_id": _id
}, not_send_msg
=not_send_msg
)
707 def edit(self
, session
, _id
, indata
=None, kwargs
=None, content
=None):
709 Change the content of an item
710 :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
711 :param _id: server internal id
712 :param indata: contains the changes to apply
713 :param kwargs: modifies indata
714 :param content: original content of the item
715 :return: op_id: operation id if this is processed asynchronously, None otherwise
717 indata
= self
._remove
_envelop
(indata
)
719 # Override descriptor with query string kwargs
721 self
._update
_input
_with
_kwargs
(indata
, kwargs
)
723 if indata
and session
.get("set_project"):
724 raise EngineException(
725 "Cannot edit content and set to project (query string SET_PROJECT) at same time",
726 HTTPStatus
.UNPROCESSABLE_ENTITY
,
728 # TODO self._check_edition(session, indata, _id, force)
730 content
= self
.show(session
, _id
)
731 indata
= self
._validate
_input
_edit
(indata
, content
, force
=session
["force"])
732 deep_update_rfc7396(content
, indata
)
734 # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
735 _id
= content
.get("_id") or _id
737 content
= self
.check_conflict_on_edit(session
, content
, indata
, _id
=_id
)
738 op_id
= self
.format_on_edit(content
, indata
)
740 self
.db
.replace(self
.topic
, _id
, content
)
742 indata
.pop("_admin", None)
744 indata
["op_id"] = op_id
746 self
._send
_msg
("edited", indata
)
748 except ValidationError
as e
:
749 raise EngineException(e
, HTTPStatus
.UNPROCESSABLE_ENTITY
)