Code Coverage

Cobertura Coverage Report > osm_nbi >

base_topic.py

Trend

Classes100%
 
Lines74%
   
Conditionals100%
 

File Coverage summary

NameClassesLinesConditionals
base_topic.py
100%
1/1
74%
217/294
100%
0/0

Coverage Breakdown by Class

NameLinesConditionals
base_topic.py
74%
217/294
N/A

Source

osm_nbi/base_topic.py
1 # -*- coding: utf-8 -*-
2
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #    http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 1 import logging
17 1 from uuid import uuid4
18 1 from http import HTTPStatus
19 1 from time import time
20 1 from osm_common.dbbase import deep_update_rfc7396
21 1 from osm_nbi.validation import validate_input, ValidationError, is_valid_uuid
22 1 from yaml import safe_load, YAMLError
23
24 1 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
25
26
27 1 class EngineException(Exception):
28
29 1     def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
30 1         self.http_code = http_code
31 1         super(Exception, self).__init__(message)
32
33
34 1 def deep_get(target_dict, key_list):
35     """
36     Get a value from target_dict entering in the nested keys. If keys does not exist, it returns None
37     Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
38     :param target_dict: dictionary to be read
39     :param key_list: list of keys to read from  target_dict
40     :return: The wanted value if exist, None otherwise
41     """
42 1     for key in key_list:
43 1         if not isinstance(target_dict, dict) or key not in target_dict:
44 1             return None
45 0         target_dict = target_dict[key]
46 0     return target_dict
47
48
49 1 def get_iterable(input_var):
50     """
51     Returns an iterable, in case input_var is None it just returns an empty tuple
52     :param input_var: can be a list, tuple or None
53     :return: input_var or () if it is None
54     """
55 1     if input_var is None:
56 1         return ()
57 1     return input_var
58
59
60 1 def versiontuple(v):
61     """utility for compare dot separate versions. Fills with zeros to proper number comparison"""
62 0     filled = []
63 0     for point in v.split("."):
64 0         filled.append(point.zfill(8))
65 0     return tuple(filled)
66
67
68 1 def increment_ip_mac(ip_mac, vm_index=1):
69 1     if not isinstance(ip_mac, str):
70 0         return ip_mac
71 1     try:
72         # try with ipv4 look for last dot
73 1         i = ip_mac.rfind(".")
74 1         if i > 0:
75 1             i += 1
76 1             return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
77         # try with ipv6 or mac look for last colon. Operate in hex
78 0         i = ip_mac.rfind(":")
79 0         if i > 0:
80 0             i += 1
81             # format in hex, len can be 2 for mac or 4 for ipv6
82 0             return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(ip_mac[:i], int(ip_mac[i:], 16) + vm_index)
83 0     except Exception:
84 0         pass
85 0     return None
86
87
88 1 class BaseTopic:
89     # static variables for all instance classes
90 1     topic = None        # to_override
91 1     topic_msg = None    # to_override
92 1     quota_name = None   # to_override. If not provided topic will be used for quota_name
93 1     schema_new = None   # to_override
94 1     schema_edit = None  # to_override
95 1     multiproject = True  # True if this Topic can be shared by several projects. Then it contains _admin.projects_read
96
97 1     default_quota = 500
98
99     # Alternative ID Fields for some Topics
100 1     alt_id_field = {
101         "projects": "name",
102         "users": "username",
103         "roles": "name"
104     }
105
106 1     def __init__(self, db, fs, msg, auth):
107 1         self.db = db
108 1         self.fs = fs
109 1         self.msg = msg
110 1         self.logger = logging.getLogger("nbi.engine")
111 1         self.auth = auth
112
113 1     @staticmethod
114     def id_field(topic, value):
115         """Returns ID Field for given topic and field value"""
116 1         if topic in BaseTopic.alt_id_field.keys() and not is_valid_uuid(value):
117 0             return BaseTopic.alt_id_field[topic]
118         else:
119 1             return "_id"
120
121 1     @staticmethod
122 1     def _remove_envelop(indata=None):
123 1         if not indata:
124 0             return {}
125 1         return indata
126
127 1     def check_quota(self, session):
128         """
129         Check whether topic quota is exceeded by the given project
130         Used by relevant topics' 'new' function to decide whether or not creation of the new item should be allowed
131         :param session[project_id]: projects (tuple) for which quota should be checked
132         :param session[force]: boolean. If true, skip quota checking
133         :return: None
134         :raise:
135             DbException if project not found
136             ValidationError if quota exceeded in one of the projects
137         """
138 0         if session["force"]:
139 0             return
140 0         projects = session["project_id"]
141 0         for project in projects:
142 0             proj = self.auth.get_project(project)
143 0             pid = proj["_id"]
144 0             quota_name = self.quota_name or self.topic
145 0             quota = proj.get("quotas", {}).get(quota_name, self.default_quota)
146 0             count = self.db.count(self.topic, {"_admin.projects_read": pid})
147 0             if count >= quota:
148 0                 name = proj["name"]
149 0                 raise ValidationError("quota ({}={}) exceeded for project {} ({})".format(quota_name, quota, name, pid),
150                                       http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
151
152 1     def _validate_input_new(self, input, force=False):
153         """
154         Validates input user content for a new entry. It uses jsonschema. Some overrides will use pyangbind
155         :param input: user input content for the new topic
156         :param force: may be used for being more tolerant
157         :return: The same input content, or a changed version of it.
158         """
159 1         if self.schema_new:
160 1             validate_input(input, self.schema_new)
161 1         return input
162
163 1     def _validate_input_edit(self, input, content, force=False):
164         """
165         Validates input user content for an edition. It uses jsonschema. Some overrides will use pyangbind
166         :param input: user input content for the new topic
167         :param force: may be used for being more tolerant
168         :return: The same input content, or a changed version of it.
169         """
170 1         if self.schema_edit:
171 1             validate_input(input, self.schema_edit)
172 1         return input
173
174 1     @staticmethod
175     def _get_project_filter(session):
176         """
177         Generates a filter dictionary for querying database, so that only allowed items for this project can be
178         addressed. Only proprietary or public can be used. Allowed projects are at _admin.project_read/write. If it is
179         not present or contains ANY mean public.
180         :param session: contains:
181             project_id: project list this session has rights to access. Can be empty, one or several
182             set_project: items created will contain this project list  
183             force: True or False
184             public: True, False or None
185             method: "list", "show", "write", "delete"
186             admin: True or False
187         :return: dictionary with project filter
188         """
189 1         p_filter = {}
190 1         project_filter_n = []
191 1         project_filter = list(session["project_id"])
192
193 1         if session["method"] not in ("list", "delete"):
194 1             if project_filter:
195 1                 project_filter.append("ANY")
196 1         elif session["public"] is not None:
197 0             if session["public"]:
198 0                 project_filter.append("ANY")
199             else:
200 0                 project_filter_n.append("ANY")
201
202 1         if session.get("PROJECT.ne"):
203 0             project_filter_n.append(session["PROJECT.ne"])
204
205 1         if project_filter:
206 1             if session["method"] in ("list", "show", "delete") or session.get("set_project"):
207 1                 p_filter["_admin.projects_read.cont"] = project_filter
208             else:
209 1                 p_filter["_admin.projects_write.cont"] = project_filter
210 1         if project_filter_n:
211 0             if session["method"] in ("list", "show", "delete") or session.get("set_project"):
212 0                 p_filter["_admin.projects_read.ncont"] = project_filter_n
213             else:
214 0                 p_filter["_admin.projects_write.ncont"] = project_filter_n
215
216 1         return p_filter
217
218 1     def check_conflict_on_new(self, session, indata):
219         """
220         Check that the data to be inserted is valid
221         :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
222         :param indata: data to be inserted
223         :return: None or raises EngineException
224         """
225 0         pass
226
227 1     def check_conflict_on_edit(self, session, final_content, edit_content, _id):
228         """
229         Check that the data to be edited/uploaded is valid
230         :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
231         :param final_content: data once modified. This method may change it.
232         :param edit_content: incremental data that contains the modifications to apply
233         :param _id: internal _id
234         :return: final_content or raises EngineException
235         """
236 1         if not self.multiproject:
237 0             return final_content
238         # Change public status
239 1         if session["public"] is not None:
240 1             if session["public"] and "ANY" not in final_content["_admin"]["projects_read"]:
241 0                 final_content["_admin"]["projects_read"].append("ANY")
242 0                 final_content["_admin"]["projects_write"].clear()
243 1             if not session["public"] and "ANY" in final_content["_admin"]["projects_read"]:
244 0                 final_content["_admin"]["projects_read"].remove("ANY")
245
246         # Change project status
247 1         if session.get("set_project"):
248 0             for p in session["set_project"]:
249 0                 if p not in final_content["_admin"]["projects_read"]:
250 0                     final_content["_admin"]["projects_read"].append(p)
251
252 1         return final_content
253
254 1     def check_unique_name(self, session, name, _id=None):
255         """
256         Check that the name is unique for this project
257         :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
258         :param name: name to be checked
259         :param _id: If not None, ignore this entry that are going to change
260         :return: None or raises EngineException
261         """
262 1         if not self.multiproject:
263 0             _filter = {}
264         else:
265 1             _filter = self._get_project_filter(session)
266 1         _filter["name"] = name
267 1         if _id:
268 1             _filter["_id.neq"] = _id
269 1         if self.db.get_one(self.topic, _filter, fail_on_empty=False, fail_on_more=False):
270 1             raise EngineException("name '{}' already exists for {}".format(name, self.topic), HTTPStatus.CONFLICT)
271
272 1     @staticmethod
273 1     def format_on_new(content, project_id=None, make_public=False):
274         """
275         Modifies content descriptor to include _admin
276         :param content: descriptor to be modified
277         :param project_id: if included, it add project read/write permissions. Can be None or a list
278         :param make_public: if included it is generated as public for reading.
279         :return: op_id: operation id on asynchronous operation, None otherwise. In addition content is modified
280         """
281 1         now = time()
282 1         if "_admin" not in content:
283 1             content["_admin"] = {}
284 1         if not content["_admin"].get("created"):
285 1             content["_admin"]["created"] = now
286 1         content["_admin"]["modified"] = now
287 1         if not content.get("_id"):
288 1             content["_id"] = str(uuid4())
289 1         if project_id is not None:
290 1             if not content["_admin"].get("projects_read"):
291 1                 content["_admin"]["projects_read"] = list(project_id)
292 1                 if make_public:
293 0                     content["_admin"]["projects_read"].append("ANY")
294 1             if not content["_admin"].get("projects_write"):
295 1                 content["_admin"]["projects_write"] = list(project_id)
296 1         return None
297
298 1     @staticmethod
299     def format_on_edit(final_content, edit_content):
300         """
301         Modifies final_content to admin information upon edition
302         :param final_content: final content to be stored at database
303         :param edit_content: user requested update content
304         :return: operation id, if this edit implies an asynchronous operation; None otherwise
305         """
306 1         if final_content.get("_admin"):
307 1             now = time()
308 1             final_content["_admin"]["modified"] = now
309 1         return None
310
311 1     def _send_msg(self, action, content, not_send_msg=None):
312 1         if self.topic_msg and not_send_msg is not False:
313 1             content = content.copy()
314 1             content.pop("_admin", None)
315 1             if isinstance(not_send_msg, list):
316 0                 not_send_msg.append((self.topic_msg, action, content))
317             else:
318 1                 self.msg.write(self.topic_msg, action, content)
319
320 1     def check_conflict_on_del(self, session, _id, db_content):
321         """
322         Check if deletion can be done because of dependencies if it is not force. To override
323         :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
324         :param _id: internal _id
325         :param db_content: The database content of this item _id
326         :return: None if ok or raises EngineException with the conflict
327         """
328 1         pass
329
330 1     @staticmethod
331 1     def _update_input_with_kwargs(desc, kwargs, yaml_format=False):
332         """
333         Update descriptor with the kwargs. It contains dot separated keys
334         :param desc: dictionary to be updated
335         :param kwargs: plain dictionary to be used for updating.
336         :param yaml_format: get kwargs values as yaml format.
337         :return: None, 'desc' is modified. It raises EngineException.
338         """
339 1         if not kwargs:
340 1             return
341 1         try:
342 1             for k, v in kwargs.items():
343 1                 update_content = desc
344 1                 kitem_old = None
345 1                 klist = k.split(".")
346 1                 for kitem in klist:
347 1                     if kitem_old is not None:
348 1                         update_content = update_content[kitem_old]
349 1                     if isinstance(update_content, dict):
350 1                         kitem_old = kitem
351 1                         if not isinstance(update_content.get(kitem_old), (dict, list)):
352 1                             update_content[kitem_old] = {}
353 1                     elif isinstance(update_content, list):
354                         # key must be an index of the list, must be integer
355 1                         kitem_old = int(kitem)
356                         # if index greater than list, extend the list
357 1                         if kitem_old >= len(update_content):
358 1                             update_content += [None] * (kitem_old - len(update_content) + 1)
359 1                         if not isinstance(update_content[kitem_old], (dict, list)):
360 1                             update_content[kitem_old] = {}
361                     else:
362 0                         raise EngineException(
363                             "Invalid query string '{}'. Descriptor is not a list nor dict at '{}'".format(k, kitem))
364 1                 if v is None:
365 1                     del update_content[kitem_old]
366                 else:
367 1                     update_content[kitem_old] = v if not yaml_format else safe_load(v)
368 1         except KeyError:
369 0             raise EngineException(
370                 "Invalid query string '{}'. Descriptor does not contain '{}'".format(k, kitem_old))
371 1         except ValueError:
372 1             raise EngineException("Invalid query string '{}'. Expected integer index list instead of '{}'".format(
373                 k, kitem))
374 0         except IndexError:
375 0             raise EngineException(
376                 "Invalid query string '{}'. Index '{}' out of  range".format(k, kitem_old))
377 0         except YAMLError:
378 0             raise EngineException("Invalid query string '{}' yaml format".format(k))
379
380 1     def sol005_projection(self, data):
381         # Projection was moved to child classes
382 0         return data
383
384 1     def show(self, session, _id, api_req=False):
385         """
386         Get complete information on an topic
387         :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
388         :param _id: server internal id
389         :param api_req: True if this call is serving an external API request. False if serving internal request.
390         :return: dictionary, raise exception if not found.
391         """
392 1         if not self.multiproject:
393 0             filter_db = {}
394         else:
395 1             filter_db = self._get_project_filter(session)
396         # To allow project&user addressing by name AS WELL AS _id
397 1         filter_db[BaseTopic.id_field(self.topic, _id)] = _id
398 1         data = self.db.get_one(self.topic, filter_db)
399
400         # Only perform SOL005 projection if we are serving an external request
401 1         if api_req:
402 0             self.sol005_projection(data)
403
404 1         return data
405         
406         # TODO transform data for SOL005 URL requests
407         # TODO remove _admin if not admin
408
409 1     def get_file(self, session, _id, path=None, accept_header=None):
410         """
411         Only implemented for descriptor topics. Return the file content of a descriptor
412         :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
413         :param _id: Identity of the item to get content
414         :param path: artifact path or "$DESCRIPTOR" or None
415         :param accept_header: Content of Accept header. Must contain applition/zip or/and text/plain
416         :return: opened file or raises an exception
417         """
418 0         raise EngineException("Method get_file not valid for this topic", HTTPStatus.INTERNAL_SERVER_ERROR)
419
420 1     def list(self, session, filter_q=None, api_req=False):
421         """
422         Get a list of the topic that matches a filter
423         :param session: contains the used login username and working project
424         :param filter_q: filter of data to be applied
425         :param api_req: True if this call is serving an external API request. False if serving internal request.
426         :return: The list, it can be empty if no one match the filter.
427         """
428 0         if not filter_q:
429 0             filter_q = {}
430 0         if self.multiproject:
431 0             filter_q.update(self._get_project_filter(session))
432
433         # TODO transform data for SOL005 URL requests. Transform filtering
434         # TODO implement "field-type" query string SOL005
435 0         data = self.db.get_list(self.topic, filter_q)
436
437         # Only perform SOL005 projection if we are serving an external request
438 0         if api_req:
439 0             data = [self.sol005_projection(inst) for inst in data]
440                 
441 0         return data
442
443 1     def new(self, rollback, session, indata=None, kwargs=None, headers=None):
444         """
445         Creates a new entry into database.
446         :param rollback: list to append created items at database in case a rollback may to be done
447         :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
448         :param indata: data to be inserted
449         :param kwargs: used to override the indata descriptor
450         :param headers: http request headers
451         :return: _id, op_id:
452             _id: identity of the inserted data.
453              op_id: operation id if this is asynchronous, None otherwise
454         """
455 1         try:
456 1             if self.multiproject:
457 1                 self.check_quota(session)
458
459 1             content = self._remove_envelop(indata)
460
461             # Override descriptor with query string kwargs
462 1             self._update_input_with_kwargs(content, kwargs)
463 1             content = self._validate_input_new(content, force=session["force"])
464 1             self.check_conflict_on_new(session, content)
465 1             op_id = self.format_on_new(content, project_id=session["project_id"], make_public=session["public"])
466 1             _id = self.db.create(self.topic, content)
467 1             rollback.append({"topic": self.topic, "_id": _id})
468 1             if op_id:
469 1                 content["op_id"] = op_id
470 1             self._send_msg("created", content)
471 1             return _id, op_id
472 1         except ValidationError as e:
473 0             raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
474
475 1     def upload_content(self, session, _id, indata, kwargs, headers):
476         """
477         Only implemented for descriptor topics.  Used for receiving content by chunks (with a transaction_id header
478         and/or gzip file. It will store and extract)
479         :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
480         :param _id : the database id of entry to be updated
481         :param indata: http body request
482         :param kwargs: user query string to override parameters. NOT USED
483         :param headers:  http request headers
484         :return: True package has is completely uploaded or False if partial content has been uplodaed.
485             Raise exception on error
486         """
487 0         raise EngineException("Method upload_content not valid for this topic", HTTPStatus.INTERNAL_SERVER_ERROR)
488
489 1     def delete_list(self, session, filter_q=None):
490         """
491         Delete a several entries of a topic. This is for internal usage and test only, not exposed to NBI API
492         :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
493         :param filter_q: filter of data to be applied
494         :return: The deleted list, it can be empty if no one match the filter.
495         """
496         # TODO add admin to filter, validate rights
497 0         if not filter_q:
498 0             filter_q = {}
499 0         if self.multiproject:
500 0             filter_q.update(self._get_project_filter(session))
501 0         return self.db.del_list(self.topic, filter_q)
502
503 1     def delete_extra(self, session, _id, db_content, not_send_msg=None):
504         """
505         Delete other things apart from database entry of a item _id.
506         e.g.: other associated elements at database and other file system storage
507         :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
508         :param _id: server internal id
509         :param db_content: The database content of the _id. It is already deleted when reached this method, but the
510             content is needed in same cases
511         :param not_send_msg: To not send message (False) or store content (list) instead
512         :return: None if ok or raises EngineException with the problem
513         """
514 0         pass
515
516 1     def delete(self, session, _id, dry_run=False, not_send_msg=None):
517         """
518         Delete item by its internal _id
519         :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
520         :param _id: server internal id
521         :param dry_run: make checking but do not delete
522         :param not_send_msg: To not send message (False) or store content (list) instead
523         :return: operation id (None if there is not operation), raise exception if error or not found, conflict, ...
524         """
525
526         # To allow addressing projects and users by name AS WELL AS by _id
527 1         if not self.multiproject:
528 0             filter_q = {}
529         else:
530 1             filter_q = self._get_project_filter(session)
531 1         filter_q[self.id_field(self.topic, _id)] = _id
532 1         item_content = self.db.get_one(self.topic, filter_q)
533
534 1         self.check_conflict_on_del(session, _id, item_content)
535 1         if dry_run:
536 0             return None
537         
538 1         if self.multiproject and session["project_id"]:
539             # remove reference from project_read if there are more projects referencing it. If it last one,
540             # do not remove reference, but delete
541 1             other_projects_referencing = next((p for p in item_content["_admin"]["projects_read"]
542                                                if p not in session["project_id"] and p != "ANY"), None)
543
544             # check if there are projects referencing it (apart from ANY, that means, public)....
545 1             if other_projects_referencing:
546                 # remove references but not delete
547 1                 update_dict_pull = {"_admin.projects_read": session["project_id"],
548                                     "_admin.projects_write": session["project_id"]}
549 1                 self.db.set_one(self.topic, filter_q, update_dict=None, pull_list=update_dict_pull)
550 1                 return None
551             else:
552 1                 can_write = next((p for p in item_content["_admin"]["projects_write"] if p == "ANY" or
553                                   p in session["project_id"]), None)
554 1                 if not can_write:
555 0                     raise EngineException("You have not write permission to delete it",
556                                           http_code=HTTPStatus.UNAUTHORIZED)
557
558         # delete
559 1         self.db.del_one(self.topic, filter_q)
560 1         self.delete_extra(session, _id, item_content, not_send_msg=not_send_msg)
561 1         self._send_msg("deleted", {"_id": _id}, not_send_msg=not_send_msg)
562 1         return None
563
564 1     def edit(self, session, _id, indata=None, kwargs=None, content=None):
565         """
566         Change the content of an item
567         :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
568         :param _id: server internal id
569         :param indata: contains the changes to apply
570         :param kwargs: modifies indata
571         :param content: original content of the item
572         :return: op_id: operation id if this is processed asynchronously, None otherwise
573         """
574 1         indata = self._remove_envelop(indata)
575
576         # Override descriptor with query string kwargs
577 1         if kwargs:
578 0             self._update_input_with_kwargs(indata, kwargs)
579 1         try:
580 1             if indata and session.get("set_project"):
581 0                 raise EngineException("Cannot edit content and set to project (query string SET_PROJECT) at same time",
582                                       HTTPStatus.UNPROCESSABLE_ENTITY)
583             # TODO self._check_edition(session, indata, _id, force)
584 1             if not content:
585 1                 content = self.show(session, _id)
586 1             indata = self._validate_input_edit(indata, content, force=session["force"])
587 1             deep_update_rfc7396(content, indata)
588
589             # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
590 1             _id = content.get("_id") or _id
591
592 1             content = self.check_conflict_on_edit(session, content, indata, _id=_id)
593 1             op_id = self.format_on_edit(content, indata)
594
595 1             self.db.replace(self.topic, _id, content)
596
597 1             indata.pop("_admin", None)
598 1             if op_id:
599 1                 indata["op_id"] = op_id
600 1             indata["_id"] = _id
601 1             self._send_msg("edited", indata)
602 1             return op_id
603 1         except ValidationError as e:
604 1             raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)