Adding support for FsMongo
[osm/NBI.git] / osm_nbi / base_topic.py
1 # -*- coding: utf-8 -*-
2
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 import logging
17 from uuid import uuid4
18 from http import HTTPStatus
19 from time import time
20 from osm_common.dbbase import deep_update_rfc7396
21 from osm_nbi.validation import validate_input, ValidationError, is_valid_uuid
22
23 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
24
25
class EngineException(Exception):
    """
    Error raised by the engine topics. Besides the message it carries the HTTP status code that should be
    reported for the failure.
    """

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        """
        :param message: error description; becomes the exception text
        :param http_code: HTTP status associated to this error. BAD_REQUEST by default
        """
        self.http_code = http_code
        # zero-argument super() resolves the parent from this class; the former super(Exception, self)
        # started the lookup after Exception, skipping it in the MRO
        super().__init__(message)
31
32
def get_iterable(input_var):
    """
    Guarantees something that can be iterated: None is replaced by an empty tuple, anything else is handed
    back untouched
    :param input_var: can be a list, tuple or None
    :return: () when input_var is None, otherwise input_var itself
    """
    return () if input_var is None else input_var
42
43
def versiontuple(v):
    """
    Helper to compare dot separated versions: every component is left padded with zeros up to 8 characters,
    so that comparing the resulting tuples of strings orders them as numbers
    :param v: version string, e.g. "1.2.10"
    :return: tuple with the zero padded components
    """
    return tuple(component.zfill(8) for component in v.split("."))
50
51
class BaseTopic:
    """
    Base class for the topics handled by the engine. It implements the generic create/show/list/edit/delete
    flow over the database (self.db) and offers hooks (check_conflict_on_*, format_on_*, delete_extra, ...)
    that concrete topics override.
    """
    # static variables for all instance classes
    topic = None        # to_override
    topic_msg = None    # to_override
    schema_new = None   # to_override
    schema_edit = None  # to_override
    multiproject = True  # True if this Topic can be shared by several projects. Then it contains _admin.projects_read

    # quota applied by check_quota when the project does not define one for this topic
    default_quota = 500

    # Alternative ID Fields for some Topics
    alt_id_field = {
        "projects": "name",
        "users": "username",
        "roles": "name"
    }

    def __init__(self, db, fs, msg, auth):
        """
        :param db: database connector, used for every read/write of this topic
        :param fs: file storage connector. Not used by this base class
        :param msg: message connector, used by _send_msg
        :param auth: authentication backend, used by check_quota to obtain the project
        """
        self.db = db
        self.fs = fs
        self.msg = msg
        self.logger = logging.getLogger("nbi.engine")
        self.auth = auth

    @staticmethod
    def id_field(topic, value):
        """Returns ID Field for given topic and field value"""
        # the alternative field is only used when the value does not look like an uuid
        if topic in BaseTopic.alt_id_field and not is_valid_uuid(value):
            return BaseTopic.alt_id_field[topic]
        else:
            return "_id"

    @staticmethod
    def _remove_envelop(indata=None):
        """
        Hook to strip an enclosing envelop from the user content. This base version removes nothing
        :param indata: user content, can be None
        :return: indata, or an empty dictionary if it is None or empty
        """
        if not indata:
            return {}
        return indata

    def check_quota(self, session):
        """
        Check whether topic quota is exceeded by the given project
        Used by relevant topics' 'new' function to decide whether or not creation of the new item should be allowed
        :param session: contains "force", "admin" and "project_id". Nothing is checked if "force" or "admin" is
            set; otherwise the quota is checked for every project at "project_id"
        :return: None
        :raise:
            DbException if project not found
            ValidationError if quota exceeded and not overridden
        """
        if session["force"] or session["admin"]:
            return
        projects = session["project_id"]
        for project in projects:
            proj = self.auth.get_project(project)
            pid = proj["_id"]
            quota = proj.get("quotas", {}).get(self.topic, self.default_quota)
            count = self.db.count(self.topic, {"_admin.projects_read": pid})
            if count >= quota:
                name = proj["name"]
                raise ValidationError("{} quota ({}) exceeded for project {} ({})".format(self.topic, quota, name, pid))

    def _validate_input_new(self, input, force=False):
        """
        Validates input user content for a new entry. It uses jsonschema. Some overrides will use pyangbind
        :param input: user input content for the new topic
        :param force: may be used for being more tolerant
        :return: The same input content, or a changed version of it.
        """
        if self.schema_new:
            validate_input(input, self.schema_new)
        return input

    def _validate_input_edit(self, input, force=False):
        """
        Validates input user content for an edition. It uses jsonschema. Some overrides will use pyangbind
        :param input: user input content with the changes to apply
        :param force: may be used for being more tolerant
        :return: The same input content, or a changed version of it.
        """
        if self.schema_edit:
            validate_input(input, self.schema_edit)
        return input

    @staticmethod
    def _get_project_filter(session):
        """
        Generates a filter dictionary for querying database, so that only allowed items for this project can be
        addressed. Only proprietary or public can be used. Allowed projects are at _admin.project_read/write. If it is
        not present or contains ANY mean public.
        :param session: contains:
            project_id: project list this session has rights to access. Can be empty, one or several
            set_project: items created will contain this project list
            force: True or False
            public: True, False or None
            method: "list", "show", "write", "delete"
            admin: True or False
        :return: dictionary with project filter
        """
        p_filter = {}
        project_filter_n = []
        project_filter = list(session["project_id"])

        if session["method"] not in ("list", "delete"):
            # show/write: public items ("ANY") are reachable too, but only when filtering by some project
            if project_filter:
                project_filter.append("ANY")
        elif session["public"] is not None:
            # list/delete: the public flag is used to select (True) or exclude (False) public items
            if session["public"]:
                project_filter.append("ANY")
            else:
                project_filter_n.append("ANY")

        if session.get("PROJECT.ne"):
            project_filter_n.append(session["PROJECT.ne"])

        # read permission is enough for list/show/delete and for adding a project (set_project); any other
        # method is matched against the write permission
        # NOTE(review): ".cont"/".ncont" are presumably the "contains"/"not contains" operators of the database
        # layer filter syntax - confirm against osm_common
        if project_filter:
            if session["method"] in ("list", "show", "delete") or session.get("set_project"):
                p_filter["_admin.projects_read.cont"] = project_filter
            else:
                p_filter["_admin.projects_write.cont"] = project_filter
        if project_filter_n:
            if session["method"] in ("list", "show", "delete") or session.get("set_project"):
                p_filter["_admin.projects_read.ncont"] = project_filter_n
            else:
                p_filter["_admin.projects_write.ncont"] = project_filter_n

        return p_filter

    def check_conflict_on_new(self, session, indata):
        """
        Check that the data to be inserted is valid
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param indata: data to be inserted
        :return: None or raises EngineException
        """
        pass

    def check_conflict_on_edit(self, session, final_content, edit_content, _id):
        """
        Check that the data to be edited/uploaded is valid
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param final_content: data once modified. This method may change it.
        :param edit_content: incremental data that contains the modifications to apply
        :param _id: internal _id
        :return: None or raises EngineException
        """
        if not self.multiproject:
            return
        # Change public status
        if session["public"] is not None:
            if session["public"] and "ANY" not in final_content["_admin"]["projects_read"]:
                # becoming public: readable by anyone, and the write permissions are emptied
                final_content["_admin"]["projects_read"].append("ANY")
                final_content["_admin"]["projects_write"].clear()
            if not session["public"] and "ANY" in final_content["_admin"]["projects_read"]:
                final_content["_admin"]["projects_read"].remove("ANY")

        # Change project status
        if session.get("set_project"):
            for p in session["set_project"]:
                if p not in final_content["_admin"]["projects_read"]:
                    final_content["_admin"]["projects_read"].append(p)

    def check_unique_name(self, session, name, _id=None):
        """
        Check that the name is unique for this project
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param name: name to be checked
        :param _id: If not None, ignore this entry that are going to change
        :return: None or raises EngineException
        """
        if not self.multiproject:
            _filter = {}
        else:
            _filter = self._get_project_filter(session)
        _filter["name"] = name
        if _id:
            _filter["_id.neq"] = _id
        if self.db.get_one(self.topic, _filter, fail_on_empty=False, fail_on_more=False):
            raise EngineException("name '{}' already exists for {}".format(name, self.topic), HTTPStatus.CONFLICT)

    @staticmethod
    def format_on_new(content, project_id=None, make_public=False):
        """
        Modifies content descriptor to include _admin
        :param content: descriptor to be modified
        :param project_id: if included, it add project read/write permissions. Can be None or a list
        :param make_public: if included it is generated as public for reading.
        :return: op_id: operation id on asynchronous operation, None otherwise. In addition content is modified
        """
        now = time()
        if "_admin" not in content:
            content["_admin"] = {}
        if not content["_admin"].get("created"):
            content["_admin"]["created"] = now
        content["_admin"]["modified"] = now
        if not content.get("_id"):
            content["_id"] = str(uuid4())
        if project_id is not None:
            # permissions already present at content are respected; "ANY" is only added to a fresh read list
            if not content["_admin"].get("projects_read"):
                content["_admin"]["projects_read"] = list(project_id)
                if make_public:
                    content["_admin"]["projects_read"].append("ANY")
            if not content["_admin"].get("projects_write"):
                content["_admin"]["projects_write"] = list(project_id)
        return None

    @staticmethod
    def format_on_edit(final_content, edit_content):
        """
        Modifies final_content to admin information upon edition
        :param final_content: final content to be stored at database
        :param edit_content: user requested update content
        :return: operation id, if this edit implies an asynchronous operation; None otherwise
        """
        if final_content.get("_admin"):
            now = time()
            final_content["_admin"]["modified"] = now
        return None

    def _send_msg(self, action, content):
        """
        Writes a message about this topic through self.msg. Nothing is done if the class has no topic_msg
        :param action: action to notify, e.g. "created", "edited", "deleted"
        :param content: dictionary to send. Its "_admin" key is removed, modifying the provided dictionary
        :return: None
        """
        if self.topic_msg:
            content.pop("_admin", None)
            self.msg.write(self.topic_msg, action, content)

    def check_conflict_on_del(self, session, _id, db_content):
        """
        Check if deletion can be done because of dependencies if it is not force. To override
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param _id: internal _id
        :param db_content: The database content of this item _id
        :return: None if ok or raises EngineException with the conflict
        """
        pass

    @staticmethod
    def _update_input_with_kwargs(desc, kwargs):
        """
        Update descriptor with the kwargs. It contains dot separated keys
        :param desc: dictionary to be updated
        :param kwargs: plain dictionary to be used for updating.
        :return: None, 'desc' is modified. It raises EngineException.
        """
        if not kwargs:
            return
        try:
            for k, v in kwargs.items():
                update_content = desc
                kitem_old = None
                klist = k.split(".")
                # walk down to the container of the last key component; the step into a component is
                # delayed one iteration, so that the last one is assigned instead of read
                for kitem in klist:
                    if kitem_old is not None:
                        update_content = update_content[kitem_old]
                    if isinstance(update_content, dict):
                        kitem_old = kitem
                    elif isinstance(update_content, list):
                        kitem_old = int(kitem)
                    else:
                        raise EngineException(
                            "Invalid query string '{}'. Descriptor is not a list nor dict at '{}'".format(k, kitem))
                update_content[kitem_old] = v
        except KeyError:
            raise EngineException(
                "Invalid query string '{}'. Descriptor does not contain '{}'".format(k, kitem_old))
        except ValueError:
            raise EngineException("Invalid query string '{}'. Expected integer index list instead of '{}'".format(
                k, kitem))
        except IndexError:
            raise EngineException(
                "Invalid query string '{}'. Index '{}' out of range".format(k, kitem_old))

    def show(self, session, _id):
        """
        Get complete information on an topic
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param _id: server internal id
        :return: dictionary, raise exception if not found.
        """
        if not self.multiproject:
            filter_db = {}
        else:
            filter_db = self._get_project_filter(session)
        # To allow project&user addressing by name AS WELL AS _id
        filter_db[BaseTopic.id_field(self.topic, _id)] = _id
        return self.db.get_one(self.topic, filter_db)
        # TODO transform data for SOL005 URL requests
        # TODO remove _admin if not admin

    def get_file(self, session, _id, path=None, accept_header=None):
        """
        Only implemented for descriptor topics. Return the file content of a descriptor
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param _id: Identity of the item to get content
        :param path: artifact path or "$DESCRIPTOR" or None
        :param accept_header: Content of Accept header. Must contain application/zip or/and text/plain
        :return: opened file or raises an exception
        """
        raise EngineException("Method get_file not valid for this topic", HTTPStatus.INTERNAL_SERVER_ERROR)

    def list(self, session, filter_q=None):
        """
        Get a list of the topic that matches a filter
        :param session: contains the used login username and working project
        :param filter_q: filter of data to be applied. It is not modified
        :return: The list, it can be empty if no one match the filter.
        """
        # work on a copy, so that the project filter is not injected into the caller dictionary
        filter_q = dict(filter_q) if filter_q else {}
        if self.multiproject:
            filter_q.update(self._get_project_filter(session))

        # TODO transform data for SOL005 URL requests. Transform filtering
        # TODO implement "field-type" query string SOL005
        return self.db.get_list(self.topic, filter_q)

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """
        Creates a new entry into database.
        :param rollback: list to append created items at database in case a rollback may to be done
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param indata: data to be inserted
        :param kwargs: used to override the indata descriptor
        :param headers: http request headers
        :return: _id, op_id:
            _id: identity of the inserted data.
             op_id: operation id if this is asynchronous, None otherwise
        """
        try:
            if self.multiproject:
                self.check_quota(session)

            content = self._remove_envelop(indata)

            # Override descriptor with query string kwargs
            self._update_input_with_kwargs(content, kwargs)
            content = self._validate_input_new(content, force=session["force"])
            self.check_conflict_on_new(session, content)
            op_id = self.format_on_new(content, project_id=session["project_id"], make_public=session["public"])
            _id = self.db.create(self.topic, content)
            rollback.append({"topic": self.topic, "_id": _id})
            if op_id:
                content["op_id"] = op_id
            self._send_msg("created", content)
            return _id, op_id
        except ValidationError as e:
            raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)

    def upload_content(self, session, _id, indata, kwargs, headers):
        """
        Only implemented for descriptor topics. Used for receiving content by chunks (with a transaction_id header
        and/or gzip file. It will store and extract)
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param _id : the database id of entry to be updated
        :param indata: http body request
        :param kwargs: user query string to override parameters. NOT USED
        :param headers:  http request headers
        :return: True if the package is completely uploaded or False if partial content has been uploaded.
            Raise exception on error
        """
        raise EngineException("Method upload_content not valid for this topic", HTTPStatus.INTERNAL_SERVER_ERROR)

    def delete_list(self, session, filter_q=None):
        """
        Delete a several entries of a topic. This is for internal usage and test only, not exposed to NBI API
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param filter_q: filter of data to be applied. It is not modified
        :return: The deleted list, it can be empty if no one match the filter.
        """
        # TODO add admin to filter, validate rights
        # work on a copy, so that the project filter is not injected into the caller dictionary
        filter_q = dict(filter_q) if filter_q else {}
        if self.multiproject:
            filter_q.update(self._get_project_filter(session))
        return self.db.del_list(self.topic, filter_q)

    def delete_extra(self, session, _id, db_content):
        """
        Delete other things apart from database entry of a item _id.
        e.g.: other associated elements at database and other file system storage
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param _id: server internal id
        :param db_content: The database content of the _id. It is already deleted when reached this method, but the
            content is needed in same cases
        :return: None if ok or raises EngineException with the problem
        """
        pass

    def delete(self, session, _id, dry_run=False):
        """
        Delete item by its internal _id
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param _id: server internal id
        :param dry_run: make checking but do not delete
        :return: operation id (None if there is not operation), raise exception if error or not found, conflict, ...
        """

        # To allow addressing projects and users by name AS WELL AS by _id
        filter_q = {BaseTopic.id_field(self.topic, _id): _id}
        item_content = self.db.get_one(self.topic, filter_q)

        # TODO add admin to filter, validate rights
        # data = self.get_item(topic, _id)
        self.check_conflict_on_del(session, _id, item_content)
        if dry_run:
            return None

        if self.multiproject:
            filter_q.update(self._get_project_filter(session))
        if self.multiproject and session["project_id"]:
            # remove reference from project_read. If not last delete
            # if this topic is not part of session["project_id"] no modification at database is done and an exception
            # is raised
            self.db.set_one(self.topic, filter_q, update_dict=None,
                            pull={"_admin.projects_read": {"$in": session["project_id"]}})
            # try to delete if there is not any more reference from projects. Ignore if it is not deleted
            filter_q = {'_id': _id, '_admin.projects_read': [[], ["ANY"]]}
            v = self.db.del_one(self.topic, filter_q, fail_on_empty=False)
            if not v or not v["deleted"]:
                # still referenced by other projects: the entry is kept, so nothing else to clean or notify
                return None
        else:
            self.db.del_one(self.topic, filter_q)
        self.delete_extra(session, _id, item_content)
        self._send_msg("deleted", {"_id": _id})
        return None

    def edit(self, session, _id, indata=None, kwargs=None, content=None):
        """
        Change the content of an item
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param _id: server internal id
        :param indata: contains the changes to apply
        :param kwargs: modifies indata
        :param content: original content of the item. If not provided it is obtained with 'show'
        :return: op_id: operation id if this is processed asynchronously, None otherwise
        """
        indata = self._remove_envelop(indata)

        # Override descriptor with query string kwargs
        if kwargs:
            self._update_input_with_kwargs(indata, kwargs)
        try:
            if indata and session.get("set_project"):
                raise EngineException("Cannot edit content and set to project (query string SET_PROJECT) at same time",
                                      HTTPStatus.UNPROCESSABLE_ENTITY)
            indata = self._validate_input_edit(indata, force=session["force"])

            # TODO self._check_edition(session, indata, _id, force)
            if not content:
                content = self.show(session, _id)
            deep_update_rfc7396(content, indata)

            # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
            _id = content.get("_id") or _id

            self.check_conflict_on_edit(session, content, indata, _id=_id)
            op_id = self.format_on_edit(content, indata)

            self.db.replace(self.topic, _id, content)

            # the message only carries the requested changes plus the identifiers
            indata.pop("_admin", None)
            if op_id:
                indata["op_id"] = op_id
            indata["_id"] = _id
            self._send_msg("edited", indata)
            return op_id
        except ValidationError as e:
            raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)