5492a8f2798177411e320a2e2163ef9c74a62e8b
[osm/NBI.git] / osm_nbi / base_topic.py
1 # -*- coding: utf-8 -*-
2
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 import logging
17 from uuid import uuid4
18 from http import HTTPStatus
19 from time import time
20 from osm_common.dbbase import deep_update_rfc7396
21 from osm_nbi.validation import validate_input, ValidationError, is_valid_uuid
22 from yaml import safe_load, YAMLError
23
24 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
25
26
class EngineException(Exception):
    """
    Error raised by the engine topics, carrying the HTTP status code to be reported.
    :param message: human readable description of the problem
    :param http_code: HTTPStatus associated to the error. BAD_REQUEST by default
    """

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        self.http_code = http_code
        # zero-argument super() starts the lookup right after this class, so Exception.__init__ is not skipped
        super().__init__(message)
32
33
def deep_get(target_dict, key_list):
    """
    Walk target_dict following the nested keys of key_list and return the value found at the end.
    Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
    :param target_dict: dictionary to be read
    :param key_list: list of keys to follow, outermost first
    :return: The wanted value if the whole path exists, None otherwise
    """
    current = target_dict
    for step in key_list:
        # keep descending only while the current level is a dict that holds the next key
        if isinstance(current, dict) and step in current:
            current = current[step]
        else:
            return None
    return current
47
48
def get_iterable(input_var):
    """
    Makes a possibly-None value safe to iterate over
    :param input_var: can be a list, tuple or None
    :return: an empty tuple when input_var is None; otherwise input_var itself, unchanged
    """
    return () if input_var is None else input_var
58
59
def versiontuple(v):
    """
    Utility to compare dot separated versions. Every component is left-padded with zeros up to 8 characters
    so that the resulting tuples of strings compare in proper numeric order
    """
    return tuple(component.zfill(8) for component in v.split("."))
66
67
class BaseTopic:
    """
    Base class for the engine topics. It implements the generic show/list/new/edit/delete flow on top of
    self.db and provides hooks (check_conflict_on_*, format_on_*, delete_extra, sol005_projection, ...)
    that are meant to be overridden.
    """
    # static variables for all instance classes
    topic = None        # to_override
    topic_msg = None    # to_override
    quota_name = None   # to_override. If not provided topic will be used for quota_name
    schema_new = None   # to_override
    schema_edit = None  # to_override
    multiproject = True  # True if this Topic can be shared by several projects. Then it contains _admin.projects_read

    default_quota = 500

    # Alternative ID Fields for some Topics
    alt_id_field = {
        "projects": "name",
        "users": "username",
        "roles": "name"
    }

    def __init__(self, db, fs, msg, auth):
        """
        Stores the handlers used by the topic methods
        :param db: database handler
        :param fs: file system storage handler
        :param msg: message bus handler, used by _send_msg
        :param auth: authentication handler, used by check_quota to obtain the projects
        """
        self.db = db
        self.fs = fs
        self.msg = msg
        self.logger = logging.getLogger("nbi.engine")
        self.auth = auth

    @staticmethod
    def id_field(topic, value):
        """
        Returns ID Field for given topic and field value
        :param topic: topic name
        :param value: identifier provided by the user
        :return: the alternative field (e.g. "name") when the topic has one and value is not a valid uuid;
            "_id" otherwise
        """
        if topic in BaseTopic.alt_id_field and not is_valid_uuid(value):
            return BaseTopic.alt_id_field[topic]
        return "_id"

    @staticmethod
    def _remove_envelop(indata=None):
        """
        Obtains the content to work with from the user input. To override if the input comes enveloped
        :param indata: user input
        :return: indata, or an empty dictionary if indata is empty or None
        """
        if not indata:
            return {}
        return indata

    def check_quota(self, session):
        """
        Check whether topic quota is exceeded by the given project
        Used by relevant topics' 'new' function to decide whether or not creation of the new item should be allowed
        :param session[project_id]: projects (tuple) for which quota should be checked
        :param session[force]: boolean. If true, skip quota checking
        :return: None
        :raise:
            DbException if project not found
            ValidationError if quota exceeded in one of the projects
        """
        if session["force"]:
            return
        # the quota name does not depend on the project, compute it once
        quota_name = self.quota_name or self.topic
        for project in session["project_id"]:
            proj = self.auth.get_project(project)
            pid = proj["_id"]
            quota = proj.get("quotas", {}).get(quota_name, self.default_quota)
            count = self.db.count(self.topic, {"_admin.projects_read": pid})
            if count >= quota:
                name = proj["name"]
                raise ValidationError("quota ({}={}) exceeded for project {} ({})".format(quota_name, quota, name, pid),
                                      http_code=HTTPStatus.UNAUTHORIZED)

    def _validate_input_new(self, input, force=False):
        """
        Validates input user content for a new entry. It uses jsonschema. Some overrides will use pyangbind
        :param input: user input content for the new topic
        :param force: may be used for being more tolerant
        :return: The same input content, or a changed version of it.
        """
        if self.schema_new:
            validate_input(input, self.schema_new)
        return input

    def _validate_input_edit(self, input, content, force=False):
        """
        Validates input user content for an edition. It uses jsonschema. Some overrides will use pyangbind
        :param input: user input content with the changes to apply
        :param content: current content of the item. Not used by this base implementation
        :param force: may be used for being more tolerant
        :return: The same input content, or a changed version of it.
        """
        if self.schema_edit:
            validate_input(input, self.schema_edit)
        return input

    @staticmethod
    def _get_project_filter(session):
        """
        Generates a filter dictionary for querying database, so that only allowed items for this project can be
        addressed. Only proprietary or public can be used. Allowed projects are at _admin.project_read/write. If it is
        not present or contains ANY mean public.
        :param session: contains:
            project_id: project list this session has rights to access. Can be empty, one or several
            set_project: items created will contain this project list
            force: True or False
            public: True, False or None
            method: "list", "show", "write", "delete"
            admin: True or False
        :return: dictionary with project filter
        """
        p_filter = {}
        project_filter_n = []
        project_filter = list(session["project_id"])

        if session["method"] not in ("list", "delete"):
            # show/write: public items ("ANY") are reachable too, but only if restricted to some project
            if project_filter:
                project_filter.append("ANY")
        elif session["public"] is not None:
            # list/delete: public items are included or excluded only on explicit request
            if session["public"]:
                project_filter.append("ANY")
            else:
                project_filter_n.append("ANY")

        if session.get("PROJECT.ne"):
            project_filter_n.append(session["PROJECT.ne"])

        # read permissions are enough for list/show/delete or when setting a project; otherwise write is needed
        if session["method"] in ("list", "show", "delete") or session.get("set_project"):
            admin_field = "_admin.projects_read"
        else:
            admin_field = "_admin.projects_write"

        if project_filter:
            p_filter[admin_field + ".cont"] = project_filter
        if project_filter_n:
            p_filter[admin_field + ".ncont"] = project_filter_n

        return p_filter

    def check_conflict_on_new(self, session, indata):
        """
        Check that the data to be inserted is valid
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param indata: data to be inserted
        :return: None or raises EngineException
        """
        pass

    def check_conflict_on_edit(self, session, final_content, edit_content, _id):
        """
        Check that the data to be edited/uploaded is valid
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param final_content: data once modified. This method may change it.
        :param edit_content: incremental data that contains the modifications to apply
        :param _id: internal _id
        :return: None or raises EngineException
        """
        if not self.multiproject:
            return
        admin = final_content["_admin"]
        # Change public status
        if session["public"] is not None:
            if session["public"] and "ANY" not in admin["projects_read"]:
                admin["projects_read"].append("ANY")
                admin["projects_write"].clear()
            if not session["public"] and "ANY" in admin["projects_read"]:
                admin["projects_read"].remove("ANY")

        # Change project status
        if session.get("set_project"):
            for p in session["set_project"]:
                if p not in admin["projects_read"]:
                    admin["projects_read"].append(p)

    def check_unique_name(self, session, name, _id=None):
        """
        Check that the name is unique for this project
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param name: name to be checked
        :param _id: If not None, ignore this entry that are going to change
        :return: None or raises EngineException
        """
        if not self.multiproject:
            _filter = {}
        else:
            _filter = self._get_project_filter(session)
        _filter["name"] = name
        if _id:
            _filter["_id.neq"] = _id
        if self.db.get_one(self.topic, _filter, fail_on_empty=False, fail_on_more=False):
            raise EngineException("name '{}' already exists for {}".format(name, self.topic), HTTPStatus.CONFLICT)

    @staticmethod
    def format_on_new(content, project_id=None, make_public=False):
        """
        Modifies content descriptor to include _admin
        :param content: descriptor to be modified
        :param project_id: if included, it add project read/write permissions. Can be None or a list
        :param make_public: if included it is generated as public for reading.
        :return: op_id: operation id on asynchronous operation, None otherwise. In addition content is modified
        """
        now = time()
        if "_admin" not in content:
            content["_admin"] = {}
        admin = content["_admin"]
        if not admin.get("created"):
            admin["created"] = now
        admin["modified"] = now
        if not content.get("_id"):
            content["_id"] = str(uuid4())
        if project_id is not None:
            # permissions already present at content are respected
            if not admin.get("projects_read"):
                admin["projects_read"] = list(project_id)
                if make_public:
                    admin["projects_read"].append("ANY")
            if not admin.get("projects_write"):
                admin["projects_write"] = list(project_id)
        return None

    @staticmethod
    def format_on_edit(final_content, edit_content):
        """
        Modifies final_content to admin information upon edition
        :param final_content: final content to be stored at database
        :param edit_content: user requested update content
        :return: operation id, if this edit implies an asynchronous operation; None otherwise
        """
        if final_content.get("_admin"):
            final_content["_admin"]["modified"] = time()
        return None

    def _send_msg(self, action, content, not_send_msg=None):
        """
        Sends a message to the bus about an action over this topic. Nothing is done if topic_msg is not set
        :param action: action to notify, e.g. "created", "edited", "deleted"
        :param content: message content. "_admin" is removed from it before sending
        :param not_send_msg: False for not sending; a list to store the (topic_msg, action, content) tuple in it
            instead of sending; any other value (e.g. None) writes the message with self.msg
        :return: None
        """
        if self.topic_msg and not_send_msg is not False:
            content.pop("_admin", None)
            if isinstance(not_send_msg, list):
                not_send_msg.append((self.topic_msg, action, content))
            else:
                self.msg.write(self.topic_msg, action, content)

    def check_conflict_on_del(self, session, _id, db_content):
        """
        Check if deletion can be done because of dependencies if it is not force. To override
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param _id: internal _id
        :param db_content: The database content of this item _id
        :return: None if ok or raises EngineException with the conflict
        """
        pass

    @staticmethod
    def _update_input_with_kwargs(desc, kwargs, yaml_format=False):
        """
        Update descriptor with the kwargs. It contains dot separated keys
        :param desc: dictionary to be updated
        :param kwargs: plain dictionary to be used for updating. A None value removes the addressed key
        :param yaml_format: get kwargs values as yaml format.
        :return: None, 'desc' is modified. It raises EngineException.
        """
        if not kwargs:
            return
        try:
            for k, v in kwargs.items():
                update_content = desc
                kitem_old = None
                klist = k.split(".")
                for kitem in klist:
                    # descend one level, using the key/index resolved at previous iteration
                    if kitem_old is not None:
                        update_content = update_content[kitem_old]
                    if isinstance(update_content, dict):
                        kitem_old = kitem
                        # create (or overwrite a scalar with) a container so that next level can be entered
                        if not isinstance(update_content.get(kitem_old), (dict, list)):
                            update_content[kitem_old] = {}
                    elif isinstance(update_content, list):
                        # key must be an index of the list, must be integer
                        kitem_old = int(kitem)
                        # if index greater than list, extend the list
                        if kitem_old >= len(update_content):
                            update_content += [None] * (kitem_old - len(update_content) + 1)
                        if not isinstance(update_content[kitem_old], (dict, list)):
                            update_content[kitem_old] = {}
                    else:
                        raise EngineException(
                            "Invalid query string '{}'. Descriptor is not a list nor dict at '{}'".format(k, kitem))
                if v is None:
                    del update_content[kitem_old]
                else:
                    update_content[kitem_old] = v if not yaml_format else safe_load(v)
        except KeyError:
            raise EngineException(
                "Invalid query string '{}'. Descriptor does not contain '{}'".format(k, kitem_old))
        except ValueError:
            raise EngineException("Invalid query string '{}'. Expected integer index list instead of '{}'".format(
                k, kitem))
        except IndexError:
            raise EngineException(
                "Invalid query string '{}'. Index '{}' out of range".format(k, kitem_old))
        except YAMLError:
            raise EngineException("Invalid query string '{}' yaml format".format(k))

    def sol005_projection(self, data):
        """
        Hook to adapt an item for external (SOL005) API requests. Projection was moved to child classes
        :param data: item content
        :return: the item to be returned to the requester. This base implementation returns data unchanged
        """
        return data

    def show(self, session, _id, api_req=False):
        """
        Get complete information on an topic
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param _id: server internal id
        :param api_req: True if this call is serving an external API request. False if serving internal request.
        :return: dictionary, raise exception if not found.
        """
        if not self.multiproject:
            filter_db = {}
        else:
            filter_db = self._get_project_filter(session)
        # To allow project&user addressing by name AS WELL AS _id
        filter_db[BaseTopic.id_field(self.topic, _id)] = _id
        data = self.db.get_one(self.topic, filter_db)

        # Only perform SOL005 projection if we are serving an external request.
        # The returned value is used, as list() does, so projections that build a new object are honored
        if api_req:
            data = self.sol005_projection(data)

        # TODO transform data for SOL005 URL requests
        # TODO remove _admin if not admin
        return data

    def get_file(self, session, _id, path=None, accept_header=None):
        """
        Only implemented for descriptor topics. Return the file content of a descriptor
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param _id: Identity of the item to get content
        :param path: artifact path or "$DESCRIPTOR" or None
        :param accept_header: Content of Accept header. Must contain application/zip or/and text/plain
        :return: opened file or raises an exception
        """
        raise EngineException("Method get_file not valid for this topic", HTTPStatus.INTERNAL_SERVER_ERROR)

    def list(self, session, filter_q=None, api_req=False):
        """
        Get a list of the topic that matches a filter
        :param session: contains the used login username and working project
        :param filter_q: filter of data to be applied. It is not modified
        :param api_req: True if this call is serving an external API request. False if serving internal request.
        :return: The list, it can be empty if no one match the filter.
        """
        # work over a copy, so that the project filter is not leaked into the caller dictionary
        filter_q = dict(filter_q) if filter_q else {}
        if self.multiproject:
            filter_q.update(self._get_project_filter(session))

        # TODO transform data for SOL005 URL requests. Transform filtering
        # TODO implement "field-type" query string SOL005
        data = self.db.get_list(self.topic, filter_q)

        # Only perform SOL005 projection if we are serving an external request
        if api_req:
            data = [self.sol005_projection(inst) for inst in data]

        return data

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """
        Creates a new entry into database.
        :param rollback: list to append created items at database in case a rollback may to be done
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param indata: data to be inserted
        :param kwargs: used to override the indata descriptor
        :param headers: http request headers
        :return: _id, op_id:
            _id: identity of the inserted data.
            op_id: operation id if this is asynchronous, None otherwise
        :raise: EngineException (UNPROCESSABLE_ENTITY) if a ValidationError is raised while processing
        """
        try:
            if self.multiproject:
                self.check_quota(session)

            content = self._remove_envelop(indata)

            # Override descriptor with query string kwargs
            self._update_input_with_kwargs(content, kwargs)
            content = self._validate_input_new(content, force=session["force"])
            self.check_conflict_on_new(session, content)
            op_id = self.format_on_new(content, project_id=session["project_id"], make_public=session["public"])
            _id = self.db.create(self.topic, content)
            rollback.append({"topic": self.topic, "_id": _id})
            if op_id:
                content["op_id"] = op_id
            self._send_msg("created", content)
            return _id, op_id
        except ValidationError as e:
            raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)

    def upload_content(self, session, _id, indata, kwargs, headers):
        """
        Only implemented for descriptor topics. Used for receiving content by chunks (with a transaction_id header
        and/or gzip file. It will store and extract)
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param _id : the database id of entry to be updated
        :param indata: http body request
        :param kwargs: user query string to override parameters. NOT USED
        :param headers: http request headers
        :return: True if the package has been completely uploaded or False if partial content has been uploaded.
            Raise exception on error
        """
        raise EngineException("Method upload_content not valid for this topic", HTTPStatus.INTERNAL_SERVER_ERROR)

    def delete_list(self, session, filter_q=None):
        """
        Delete a several entries of a topic. This is for internal usage and test only, not exposed to NBI API
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param filter_q: filter of data to be applied. It is not modified
        :return: The deleted list, it can be empty if no one match the filter.
        """
        # TODO add admin to filter, validate rights
        # work over a copy, so that the project filter is not leaked into the caller dictionary
        filter_q = dict(filter_q) if filter_q else {}
        if self.multiproject:
            filter_q.update(self._get_project_filter(session))
        return self.db.del_list(self.topic, filter_q)

    def delete_extra(self, session, _id, db_content, not_send_msg=None):
        """
        Delete other things apart from database entry of a item _id.
        e.g.: other associated elements at database and other file system storage
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param _id: server internal id
        :param db_content: The database content of the _id. It is already deleted when reached this method, but the
            content is needed in same cases
        :param not_send_msg: To not send message (False) or store content (list) instead
        :return: None if ok or raises EngineException with the problem
        """
        pass

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """
        Delete item by its internal _id
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param _id: server internal id
        :param dry_run: make checking but do not delete
        :param not_send_msg: To not send message (False) or store content (list) instead
        :return: operation id (None if there is not operation), raise exception if error or not found, conflict, ...
        """

        # To allow addressing projects and users by name AS WELL AS by _id
        if not self.multiproject:
            filter_q = {}
        else:
            filter_q = self._get_project_filter(session)
        filter_q[self.id_field(self.topic, _id)] = _id
        item_content = self.db.get_one(self.topic, filter_q)

        self.check_conflict_on_del(session, _id, item_content)
        if dry_run:
            return None

        if self.multiproject and session["project_id"]:
            # remove reference from project_read if there are more projects referencing it. If it last one,
            # do not remove reference, but delete
            other_projects_referencing = next((p for p in item_content["_admin"]["projects_read"]
                                               if p not in session["project_id"] and p != "ANY"), None)

            # check if there are projects referencing it (apart from ANY, that means, public)....
            if other_projects_referencing:
                # remove references but not delete
                update_dict_pull = {"_admin.projects_read": session["project_id"],
                                    "_admin.projects_write": session["project_id"]}
                self.db.set_one(self.topic, filter_q, update_dict=None, pull_list=update_dict_pull)
                return None
            else:
                can_write = next((p for p in item_content["_admin"]["projects_write"] if p == "ANY" or
                                  p in session["project_id"]), None)
                if not can_write:
                    raise EngineException("You have not write permission to delete it",
                                          http_code=HTTPStatus.UNAUTHORIZED)

        # delete
        self.db.del_one(self.topic, filter_q)
        self.delete_extra(session, _id, item_content, not_send_msg=not_send_msg)
        self._send_msg("deleted", {"_id": _id}, not_send_msg=not_send_msg)
        return None

    def edit(self, session, _id, indata=None, kwargs=None, content=None):
        """
        Change the content of an item
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param _id: server internal id
        :param indata: contains the changes to apply
        :param kwargs: modifies indata
        :param content: original content of the item. If not provided it is obtained with show()
        :return: op_id: operation id if this is processed asynchronously, None otherwise
        :raise: EngineException (UNPROCESSABLE_ENTITY) if content and SET_PROJECT are provided at same time or
            if a ValidationError is raised while processing
        """
        indata = self._remove_envelop(indata)

        # Override descriptor with query string kwargs
        if kwargs:
            self._update_input_with_kwargs(indata, kwargs)
        try:
            if indata and session.get("set_project"):
                raise EngineException("Cannot edit content and set to project (query string SET_PROJECT) at same time",
                                      HTTPStatus.UNPROCESSABLE_ENTITY)

            # TODO self._check_edition(session, indata, _id, force)
            if not content:
                content = self.show(session, _id)

            indata = self._validate_input_edit(indata, content, force=session["force"])

            deep_update_rfc7396(content, indata)

            # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
            _id = content.get("_id") or _id

            self.check_conflict_on_edit(session, content, indata, _id=_id)
            op_id = self.format_on_edit(content, indata)

            self.db.replace(self.topic, _id, content)

            # the message carries only the requested changes, plus the identifiers
            indata.pop("_admin", None)
            if op_id:
                indata["op_id"] = op_id
            indata["_id"] = _id
            self._send_msg("edited", indata)
            return op_id
        except ValidationError as e:
            raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)