228e29dd7a4a24b0c0fd421a086cd4b25084fd4d
[osm/NBI.git] / osm_nbi / base_topic.py
1 # -*- coding: utf-8 -*-
2
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 import logging
17 from uuid import uuid4
18 from http import HTTPStatus
19 from time import time
20 from osm_common.dbbase import deep_update_rfc7396
21 from osm_nbi.validation import validate_input, ValidationError, is_valid_uuid
22 from yaml import safe_load, YAMLError
23
24 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
25
26
class EngineException(Exception):
    """
    Error raised by the engine topics; carries the HTTP status code to be reported to the client.
    :param message: human readable description of the problem
    :param http_code: HTTPStatus to associate with the error. BAD_REQUEST by default
    """

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        # zero-argument super() follows the MRO properly; super(Exception, self) skipped Exception itself
        super().__init__(message)
        self.http_code = http_code
32
33
def deep_get(target_dict, key_list):
    """
    Walk down target_dict following key_list, one nesting level per key.
    Example target_dict={a: {b: 5}}; key_list=[a,b] returns 5; both key_list=[a,b,c] and key_list=[f,h] return None
    :param target_dict: dictionary to be read
    :param key_list: list of keys to follow, outermost first
    :return: the value found at the end of the path, or None as soon as a step cannot be followed
    """
    current = target_dict
    for step in key_list:
        if isinstance(current, dict) and step in current:
            current = current[step]
        else:
            # either we left dictionary territory or the key is missing
            return None
    return current
47
48
def get_iterable(input_var):
    """
    Make a possibly-None value safe to iterate over.
    :param input_var: can be a list, tuple or None
    :return: an empty tuple when input_var is None; otherwise input_var itself, untouched
    """
    return () if input_var is None else input_var
58
59
def versiontuple(v):
    """
    Utility to compare dot separated versions.
    Every component is left-padded with zeros to 8 characters so that comparing the resulting tuples of
    strings orders the versions numerically.
    :param v: version string, e.g. "1.2.10"
    :return: tuple of zero-filled strings, one per dot separated component
    """
    return tuple(component.zfill(8) for component in v.split("."))
66
67
class BaseTopic:
    """
    Base class for the engine topics. It implements the generic list/show/new/edit/delete operations on top of
    the database (self.db), storage (self.fs), message bus (self.msg) and authentication (self.auth) connectors
    received at construction. Concrete topics override the class variables and the check_*/format_* hooks.
    """
    # static variables for all instance classes
    topic = None        # to_override
    topic_msg = None    # to_override
    schema_new = None   # to_override
    schema_edit = None  # to_override
    multiproject = True  # True if this Topic can be shared by several projects. Then it contains _admin.projects_read

    default_quota = 500

    # Alternative ID Fields for some Topics
    alt_id_field = {
        "projects": "name",
        "users": "username",
        "roles": "name"
    }

    def __init__(self, db, fs, msg, auth):
        self.db = db
        self.fs = fs
        self.msg = msg
        self.logger = logging.getLogger("nbi.engine")
        self.auth = auth

    @staticmethod
    def id_field(topic, value):
        """
        Returns ID Field for given topic and field value
        :param topic: topic name. Only the ones listed at alt_id_field can be addressed by something else than _id
        :param value: identifier provided by the user
        :return: the alternative field name if the topic has one and value is not a valid uuid; "_id" otherwise
        """
        if topic in BaseTopic.alt_id_field and not is_valid_uuid(value):
            return BaseTopic.alt_id_field[topic]
        return "_id"

    @staticmethod
    def _remove_envelop(indata=None):
        """
        Base implementation does not unwrap anything; it only normalizes an empty input
        :param indata: user content, can be None
        :return: indata itself, or a new empty dictionary when indata is None or empty
        """
        if not indata:
            return {}
        return indata

    def check_quota(self, session):
        """
        Check whether topic quota is exceeded by any of the session projects
        Used by relevant topics' 'new' function to decide whether or not creation of the new item should be allowed
        :param session: contains:
            force, admin: if any of them is true the quota is not checked at all
            project_id: iterable of projects for which quota should be checked
        :return: None
        :raise:
            ValidationError if the number of items readable by a project has reached its quota
            Exceptions coming from self.auth.get_project or self.db.count are not captured
        """
        if session["force"] or session["admin"]:
            return
        for project in session["project_id"]:
            proj = self.auth.get_project(project)
            pid = proj["_id"]
            # 'quotas' can be missing or stored as None; in both cases the default quota applies
            quotas = proj.get("quotas") or {}
            quota = quotas.get(self.topic, self.default_quota)
            count = self.db.count(self.topic, {"_admin.projects_read": pid})
            if count >= quota:
                name = proj["name"]
                raise ValidationError("{} quota ({}) exceeded for project {} ({})".format(self.topic, quota, name, pid))

    def _validate_input_new(self, input, force=False):
        """
        Validates input user content for a new entry. It uses jsonschema. Some overrides will use pyangbind
        :param input: user input content for the new topic
        :param force: may be used for being more tolerant. Not used by this base implementation
        :return: The same input content, or a changed version of it.
        """
        if self.schema_new:
            validate_input(input, self.schema_new)
        return input

    def _validate_input_edit(self, input, force=False):
        """
        Validates input user content for an edition. It uses jsonschema. Some overrides will use pyangbind
        :param input: user input content with the changes to apply
        :param force: may be used for being more tolerant. Not used by this base implementation
        :return: The same input content, or a changed version of it.
        """
        if self.schema_edit:
            validate_input(input, self.schema_edit)
        return input

    @staticmethod
    def _get_project_filter(session):
        """
        Generates a filter dictionary for querying database, so that only allowed items for this project can be
        addressed. Only proprietary or public can be used. Allowed projects are at _admin.project_read/write. If it is
        not present or contains ANY mean public.
        :param session: contains:
            project_id: project list this session has rights to access. Can be empty, one or several
            set_project: items created will contain this project list
            force: True or False
            public: True, False or None
            method: "list", "show", "write", "delete"
            admin: True or False
        :return: dictionary with project filter
        """
        p_filter = {}
        project_filter_n = []
        project_filter = list(session["project_id"])

        if session["method"] not in ("list", "delete"):
            # single item access: public items ("ANY") are reachable too, as long as some project is given
            if project_filter:
                project_filter.append("ANY")
        elif session["public"] is not None:
            # list/delete: the 'public' flag explicitly includes or excludes public items
            if session["public"]:
                project_filter.append("ANY")
            else:
                project_filter_n.append("ANY")

        if session.get("PROJECT.ne"):
            project_filter_n.append(session["PROJECT.ne"])

        # read permission is enough for list/show/delete or when just attaching to a project; write otherwise
        read_access = session["method"] in ("list", "show", "delete") or session.get("set_project")
        admin_field = "_admin.projects_read" if read_access else "_admin.projects_write"
        if project_filter:
            p_filter[admin_field + ".cont"] = project_filter
        if project_filter_n:
            p_filter[admin_field + ".ncont"] = project_filter_n

        return p_filter

    def check_conflict_on_new(self, session, indata):
        """
        Check that the data to be inserted is valid
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param indata: data to be inserted
        :return: None or raises EngineException
        """
        pass

    def check_conflict_on_edit(self, session, final_content, edit_content, _id):
        """
        Check that the data to be edited/uploaded is valid
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param final_content: data once modified. This method may change it.
        :param edit_content: incremental data that contains the modifications to apply
        :param _id: internal _id
        :return: None or raises EngineException
        """
        if not self.multiproject:
            return
        admin = final_content["_admin"]
        # Change public status
        if session["public"] is not None:
            if session["public"] and "ANY" not in admin["projects_read"]:
                admin["projects_read"].append("ANY")
                admin["projects_write"].clear()
            if not session["public"] and "ANY" in admin["projects_read"]:
                admin["projects_read"].remove("ANY")

        # Change project status
        if session.get("set_project"):
            for p in session["set_project"]:
                if p not in admin["projects_read"]:
                    admin["projects_read"].append(p)

    def check_unique_name(self, session, name, _id=None):
        """
        Check that the name is unique for this project
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param name: name to be checked
        :param _id: If not None, ignore this entry that are going to change
        :return: None or raises EngineException
        """
        if not self.multiproject:
            _filter = {}
        else:
            _filter = self._get_project_filter(session)
        _filter["name"] = name
        if _id:
            _filter["_id.neq"] = _id
        if self.db.get_one(self.topic, _filter, fail_on_empty=False, fail_on_more=False):
            raise EngineException("name '{}' already exists for {}".format(name, self.topic), HTTPStatus.CONFLICT)

    @staticmethod
    def format_on_new(content, project_id=None, make_public=False):
        """
        Modifies content descriptor to include _admin
        :param content: descriptor to be modified
        :param project_id: if included, it add project read/write permissions. Can be None or a list
        :param make_public: if included it is generated as public for reading.
        :return: None in this base implementation (the slot is used as op_id, operation id of an asynchronous
            operation, by callers). In addition content is modified
        """
        now = time()
        admin = content.setdefault("_admin", {})
        if not admin.get("created"):
            admin["created"] = now
        admin["modified"] = now
        if not content.get("_id"):
            content["_id"] = str(uuid4())
        if project_id is not None:
            # permissions already present at content are respected
            if not admin.get("projects_read"):
                admin["projects_read"] = list(project_id)
                if make_public:
                    admin["projects_read"].append("ANY")
            if not admin.get("projects_write"):
                admin["projects_write"] = list(project_id)
        return None

    @staticmethod
    def format_on_edit(final_content, edit_content):
        """
        Modifies final_content to admin information upon edition
        :param final_content: final content to be stored at database
        :param edit_content: user requested update content
        :return: operation id, if this edit implies an asynchronous operation; None otherwise
        """
        if final_content.get("_admin"):
            final_content["_admin"]["modified"] = time()
        return None

    def _send_msg(self, action, content, not_send_msg=None):
        """
        Publish a message about this topic, unless the topic has no topic_msg or sending is disabled
        :param action: message key, e.g. "created", "edited", "deleted"
        :param content: message payload. Its "_admin" entry, if any, is removed (content is modified)
        :param not_send_msg: False to not send anything; a list to store the (topic_msg, action, content) tuple on
            it instead of sending; any other value (None) to write the message with self.msg
        :return: None
        """
        if self.topic_msg and not_send_msg is not False:
            content.pop("_admin", None)
            if isinstance(not_send_msg, list):
                not_send_msg.append((self.topic_msg, action, content))
            else:
                self.msg.write(self.topic_msg, action, content)

    def check_conflict_on_del(self, session, _id, db_content):
        """
        Check if deletion can be done because of dependencies if it is not force. To override
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param _id: internal _id
        :param db_content: The database content of this item _id
        :return: None if ok or raises EngineException with the conflict
        """
        pass

    @staticmethod
    def _update_input_with_kwargs(desc, kwargs, yaml_format=False):
        """
        Update descriptor with the kwargs. It contains dot separated keys
        :param desc: dictionary to be updated
        :param kwargs: plain dictionary to be used for updating.
        :param yaml_format: get kwargs values as yaml format.
        :return: None, 'desc' is modified. It raises EngineException.
        """
        if not kwargs:
            return
        try:
            for k, v in kwargs.items():
                update_content = desc
                kitem_old = None
                klist = k.split(".")
                # descend one level per dotted component; kitem_old always holds the key/index still to be applied
                for kitem in klist:
                    if kitem_old is not None:
                        update_content = update_content[kitem_old]
                    if isinstance(update_content, dict):
                        kitem_old = kitem
                    elif isinstance(update_content, list):
                        kitem_old = int(kitem)
                    else:
                        raise EngineException(
                            "Invalid query string '{}'. Descriptor is not a list nor dict at '{}'".format(k, kitem))
                update_content[kitem_old] = v if not yaml_format else safe_load(v)
        except KeyError:
            raise EngineException(
                "Invalid query string '{}'. Descriptor does not contain '{}'".format(k, kitem_old))
        except ValueError:
            raise EngineException("Invalid query string '{}'. Expected integer index list instead of '{}'".format(
                k, kitem))
        except IndexError:
            raise EngineException(
                "Invalid query string '{}'. Index '{}' out of range".format(k, kitem_old))
        except YAMLError:
            raise EngineException("Invalid query string '{}' yaml format".format(k))

    def show(self, session, _id):
        """
        Get complete information on an topic
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param _id: server internal id
        :return: dictionary, raise exception if not found.
        """
        if not self.multiproject:
            filter_db = {}
        else:
            filter_db = self._get_project_filter(session)
        # To allow project&user addressing by name AS WELL AS _id
        filter_db[BaseTopic.id_field(self.topic, _id)] = _id
        return self.db.get_one(self.topic, filter_db)
        # TODO transform data for SOL005 URL requests
        # TODO remove _admin if not admin

    def get_file(self, session, _id, path=None, accept_header=None):
        """
        Only implemented for descriptor topics. Return the file content of a descriptor
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param _id: Identity of the item to get content
        :param path: artifact path or "$DESCRIPTOR" or None
        :param accept_header: Content of Accept header. Must contain application/zip or/and text/plain
        :return: opened file or raises an exception
        """
        raise EngineException("Method get_file not valid for this topic", HTTPStatus.INTERNAL_SERVER_ERROR)

    def list(self, session, filter_q=None):
        """
        Get a list of the topic that matches a filter
        :param session: contains the used login username and working project
        :param filter_q: filter of data to be applied. It is not modified
        :return: The list, it can be empty if no one match the filter.
        """
        # work on a copy, so that the project filter is not leaked into the dictionary owned by the caller
        filter_q = dict(filter_q) if filter_q else {}
        if self.multiproject:
            filter_q.update(self._get_project_filter(session))

        # TODO transform data for SOL005 URL requests. Transform filtering
        # TODO implement "field-type" query string SOL005
        return self.db.get_list(self.topic, filter_q)

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """
        Creates a new entry into database.
        :param rollback: list to append created items at database in case a rollback may to be done
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param indata: data to be inserted
        :param kwargs: used to override the indata descriptor
        :param headers: http request headers
        :return: _id, op_id:
            _id: identity of the inserted data.
            op_id: operation id if this is asynchronous, None otherwise
        """
        try:
            if self.multiproject:
                self.check_quota(session)

            content = self._remove_envelop(indata)

            # Override descriptor with query string kwargs
            self._update_input_with_kwargs(content, kwargs)
            content = self._validate_input_new(content, force=session["force"])
            self.check_conflict_on_new(session, content)
            op_id = self.format_on_new(content, project_id=session["project_id"], make_public=session["public"])
            _id = self.db.create(self.topic, content)
            rollback.append({"topic": self.topic, "_id": _id})
            if op_id:
                content["op_id"] = op_id
            self._send_msg("created", content)
            return _id, op_id
        except ValidationError as e:
            raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)

    def upload_content(self, session, _id, indata, kwargs, headers):
        """
        Only implemented for descriptor topics. Used for receiving content by chunks (with a transaction_id header
        and/or gzip file. It will store and extract)
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param _id : the database id of entry to be updated
        :param indata: http body request
        :param kwargs: user query string to override parameters. NOT USED
        :param headers: http request headers
        :return: True if the package is completely uploaded or False if partial content has been uploaded.
            Raise exception on error
        """
        raise EngineException("Method upload_content not valid for this topic", HTTPStatus.INTERNAL_SERVER_ERROR)

    def delete_list(self, session, filter_q=None):
        """
        Delete a several entries of a topic. This is for internal usage and test only, not exposed to NBI API
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param filter_q: filter of data to be applied. It is not modified
        :return: The deleted list, it can be empty if no one match the filter.
        """
        # TODO add admin to filter, validate rights
        # work on a copy, so that the project filter is not leaked into the dictionary owned by the caller
        filter_q = dict(filter_q) if filter_q else {}
        if self.multiproject:
            filter_q.update(self._get_project_filter(session))
        return self.db.del_list(self.topic, filter_q)

    def delete_extra(self, session, _id, db_content, not_send_msg=None):
        """
        Delete other things apart from database entry of a item _id.
        e.g.: other associated elements at database and other file system storage
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param _id: server internal id
        :param db_content: The database content of the _id. It is already deleted when reached this method, but the
            content is needed in same cases
        :param not_send_msg: To not send message (False) or store content (list) instead
        :return: None if ok or raises EngineException with the problem
        """
        pass

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """
        Delete item by its internal _id
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param _id: server internal id
        :param dry_run: make checking but do not delete
        :param not_send_msg: To not send message (False) or store content (list) instead
        :return: operation id (None if there is not operation), raise exception if error or not found, conflict, ...
        """

        # To allow addressing projects and users by name AS WELL AS by _id
        filter_q = {BaseTopic.id_field(self.topic, _id): _id}
        item_content = self.db.get_one(self.topic, filter_q)

        # TODO add admin to filter, validate rights
        # data = self.get_item(topic, _id)
        self.check_conflict_on_del(session, _id, item_content)
        if dry_run:
            return None

        if self.multiproject:
            filter_q.update(self._get_project_filter(session))
        if self.multiproject and session["project_id"]:
            # remove reference from project_read. If not last delete
            # if this topic is not part of session["project_id"] no modification at database is done and an exception
            # is raised
            self.db.set_one(self.topic, filter_q, update_dict=None,
                            pull={"_admin.projects_read": {"$in": session["project_id"]}})
            # try to delete if there is not any more reference from projects. Ignore if it is not deleted
            filter_q = {'_id': _id, '_admin.projects_read': [[], ["ANY"]]}
            v = self.db.del_one(self.topic, filter_q, fail_on_empty=False)
            if not v or not v["deleted"]:
                # still referenced by other projects: nothing else to clean and no message to send
                return None
        else:
            self.db.del_one(self.topic, filter_q)
        self.delete_extra(session, _id, item_content, not_send_msg=not_send_msg)
        self._send_msg("deleted", {"_id": _id}, not_send_msg=not_send_msg)
        return None

    def edit(self, session, _id, indata=None, kwargs=None, content=None):
        """
        Change the content of an item
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param _id: server internal id
        :param indata: contains the changes to apply
        :param kwargs: modifies indata
        :param content: original content of the item
        :return: op_id: operation id if this is processed asynchronously, None otherwise
        """
        indata = self._remove_envelop(indata)

        # Override descriptor with query string kwargs
        if kwargs:
            self._update_input_with_kwargs(indata, kwargs)
        try:
            if indata and session.get("set_project"):
                raise EngineException("Cannot edit content and set to project (query string SET_PROJECT) at same time",
                                      HTTPStatus.UNPROCESSABLE_ENTITY)
            indata = self._validate_input_edit(indata, force=session["force"])

            # TODO self._check_edition(session, indata, _id, force)
            if not content:
                content = self.show(session, _id)
            deep_update_rfc7396(content, indata)

            # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
            _id = content.get("_id") or _id

            self.check_conflict_on_edit(session, content, indata, _id=_id)
            op_id = self.format_on_edit(content, indata)

            self.db.replace(self.topic, _id, content)

            # what is sent at the message is the incremental change, identified by _id, without admin information
            indata.pop("_admin", None)
            if op_id:
                indata["op_id"] = op_id
            indata["_id"] = _id
            self._send_msg("edited", indata)
            return op_id
        except ValidationError as e:
            raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)