fix 1042 add timeout parameter to ns-action ns-scale
[osm/NBI.git] / osm_nbi / base_topic.py
1 # -*- coding: utf-8 -*-
2
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 import logging
17 from uuid import uuid4
18 from http import HTTPStatus
19 from time import time
20 from osm_common.dbbase import deep_update_rfc7396
21 from osm_nbi.validation import validate_input, ValidationError, is_valid_uuid
22 from yaml import safe_load, YAMLError
23
24 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
25
26
class EngineException(Exception):
    """
    Error raised by the engine topics. It carries the HTTP status code that should be reported to the client.
    """

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        """
        :param message: error description; it becomes the exception text
        :param http_code: HTTP status associated to this error. BAD_REQUEST by default
        """
        self.http_code = http_code
        # super() without arguments resolves to the parent of EngineException. The previous
        # super(Exception, self) skipped Exception itself in the MRO.
        super().__init__(message)
32
33
def deep_get(target_dict, key_list):
    """
    Walk down nested dictionaries following key_list and return what is found at the end of the path.
    Example: target_dict={a: {b: 5}}; key_list=[a,b] returns 5; key_list=[a,b,c] and key_list=[f,h] return None
    :param target_dict: dictionary to be read
    :param key_list: list of keys to follow, outermost first
    :return: The value found at the end of the path; None as soon as a step is missing or is not a dictionary
    """
    current = target_dict
    for step in key_list:
        if isinstance(current, dict) and step in current:
            current = current[step]
        else:
            # path broken: either a non-dict was reached or the key is absent
            return None
    return current
47
48
def get_iterable(input_var):
    """
    Make a possibly-None value safe to iterate over
    :param input_var: can be a list, tuple or None
    :return: an empty tuple when input_var is None; input_var itself, untouched, otherwise
    """
    return () if input_var is None else input_var
58
59
def versiontuple(v):
    """
    Utility to compare dot separated versions. Every component is left-padded with zeros up to 8 characters, so
    that the resulting tuples of strings compare in numeric order
    :param v: version string, e.g. "1.2.10"
    :return: tuple with one zero-padded string per dot separated component
    """
    return tuple(component.zfill(8) for component in v.split("."))
66
67
class BaseTopic:
    """
    Generic create/show/list/edit/delete logic for one database topic.
    Concrete topics are expected to override the class attributes marked "to_override" and, when needed, the
    hooks check_conflict_on_new/edit/del, format_on_new/edit, delete_extra, get_file and upload_content.
    """
    # static variables for all instance classes
    topic = None        # to_override
    topic_msg = None    # to_override
    schema_new = None   # to_override
    schema_edit = None  # to_override
    multiproject = True  # True if this Topic can be shared by several projects. Then it contains _admin.projects_read

    default_quota = 500

    # Alternative ID Fields for some Topics
    alt_id_field = {
        "projects": "name",
        "users": "username",
        "roles": "name"
    }

    def __init__(self, db, fs, msg, auth):
        """
        :param db: database driver used for every storage operation of this topic
        :param fs: file system storage driver (not used by this base class itself)
        :param msg: message bus driver, used by _send_msg
        :param auth: authentication backend, used by check_quota to read project data
        """
        self.db = db
        self.fs = fs
        self.msg = msg
        self.logger = logging.getLogger("nbi.engine")
        self.auth = auth

    @staticmethod
    def id_field(topic, value):
        """
        Returns ID Field for given topic and field value
        :param topic: topic name
        :param value: identifier provided by the user
        :return: the alternative field (see alt_id_field) if the topic has one and value is not a valid uuid;
            "_id" otherwise
        """
        if topic in BaseTopic.alt_id_field and not is_valid_uuid(value):
            return BaseTopic.alt_id_field[topic]
        else:
            return "_id"

    @staticmethod
    def _remove_envelop(indata=None):
        """
        Hook to strip an outer envelop from the user content. This base version only converts empty input into {}
        :param indata: user content, can be None
        :return: indata, or an empty dictionary if indata is empty or None
        """
        if not indata:
            return {}
        return indata

    def check_quota(self, session):
        """
        Check whether topic quota is exceeded by the given project
        Used by relevant topics' 'new' function to decide whether or not creation of the new item should be allowed
        :param session: contains "force", "admin" and "project_id". The quota is not checked when either "force" or
            "admin" is true; otherwise it is checked for every project at "project_id"
        :return: None
        :raise:
            DbException if project not found
            ValidationError if quota exceeded and not overridden
        """
        if session["force"] or session["admin"]:
            return
        projects = session["project_id"]
        for project in projects:
            proj = self.auth.get_project(project)
            pid = proj["_id"]
            # per-project quota for this topic, falling back to the class default
            quota = proj.get("quotas", {}).get(self.topic, self.default_quota)
            count = self.db.count(self.topic, {"_admin.projects_read": pid})
            if count >= quota:
                name = proj["name"]
                raise ValidationError("{} quota ({}) exceeded for project {} ({})".format(self.topic, quota, name, pid))

    def _validate_input_new(self, input, force=False):
        """
        Validates input user content for a new entry. It uses jsonschema. Some overrides will use pyangbind
        :param input: user input content for the new topic
        :param force: may be used for being more tolerant
        :return: The same input content, or a changed version of it.
        """
        if self.schema_new:
            validate_input(input, self.schema_new)
        return input

    def _validate_input_edit(self, input, force=False):
        """
        Validates input user content for an edition. It uses jsonschema. Some overrides will use pyangbind
        :param input: user input content with the changes to apply
        :param force: may be used for being more tolerant
        :return: The same input content, or a changed version of it.
        """
        if self.schema_edit:
            validate_input(input, self.schema_edit)
        return input

    @staticmethod
    def _get_project_filter(session):
        """
        Generates a filter dictionary for querying database, so that only allowed items for this project can be
        addressed. Only proprietary or public can be used. Allowed projects are at _admin.project_read/write. If it is
        not present or contains ANY mean public.
        :param session: contains:
            project_id: project list this session has rights to access. Can be empty, one or several
            set_project: items created will contain this project list
            force: True or False
            public: True, False or None
            method: "list", "show", "write", "delete"
            admin: True or False
        :return: dictionary with project filter
        """
        p_filter = {}
        project_filter_n = []
        project_filter = list(session["project_id"])

        if session["method"] not in ("list", "delete"):
            # for the rest of methods, public items ("ANY") are addressable too when filtering by project
            if project_filter:
                project_filter.append("ANY")
        elif session["public"] is not None:
            # list/delete: an explicit "public" value selects (True) or excludes (False) public items
            if session["public"]:
                project_filter.append("ANY")
            else:
                project_filter_n.append("ANY")

        if session.get("PROJECT.ne"):
            project_filter_n.append(session["PROJECT.ne"])

        # reading methods (or set_project) are filtered by read permission; the rest by write permission
        if project_filter:
            if session["method"] in ("list", "show", "delete") or session.get("set_project"):
                p_filter["_admin.projects_read.cont"] = project_filter
            else:
                p_filter["_admin.projects_write.cont"] = project_filter
        if project_filter_n:
            if session["method"] in ("list", "show", "delete") or session.get("set_project"):
                p_filter["_admin.projects_read.ncont"] = project_filter_n
            else:
                p_filter["_admin.projects_write.ncont"] = project_filter_n

        return p_filter

    def check_conflict_on_new(self, session, indata):
        """
        Check that the data to be inserted is valid
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param indata: data to be inserted
        :return: None or raises EngineException
        """
        pass

    def check_conflict_on_edit(self, session, final_content, edit_content, _id):
        """
        Check that the data to be edited/uploaded is valid
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param final_content: data once modified. This method may change it.
        :param edit_content: incremental data that contains the modifications to apply
        :param _id: internal _id
        :return: None or raises EngineException
        """
        if not self.multiproject:
            return
        # Change public status
        if session["public"] is not None:
            if session["public"] and "ANY" not in final_content["_admin"]["projects_read"]:
                final_content["_admin"]["projects_read"].append("ANY")
                final_content["_admin"]["projects_write"].clear()
            if not session["public"] and "ANY" in final_content["_admin"]["projects_read"]:
                final_content["_admin"]["projects_read"].remove("ANY")

        # Change project status
        if session.get("set_project"):
            for p in session["set_project"]:
                if p not in final_content["_admin"]["projects_read"]:
                    final_content["_admin"]["projects_read"].append(p)

    def check_unique_name(self, session, name, _id=None):
        """
        Check that the name is unique for this project
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param name: name to be checked
        :param _id: If not None, ignore this entry that are going to change
        :return: None or raises EngineException
        """
        if not self.multiproject:
            _filter = {}
        else:
            _filter = self._get_project_filter(session)
        _filter["name"] = name
        if _id:
            _filter["_id.neq"] = _id
        if self.db.get_one(self.topic, _filter, fail_on_empty=False, fail_on_more=False):
            raise EngineException("name '{}' already exists for {}".format(name, self.topic), HTTPStatus.CONFLICT)

    @staticmethod
    def format_on_new(content, project_id=None, make_public=False):
        """
        Modifies content descriptor to include _admin
        :param content: descriptor to be modified
        :param project_id: if included, it add project read/write permissions. Can be None or a list
        :param make_public: if included it is generated as public for reading.
        :return: op_id: operation id on asynchronous operation, None otherwise. In addition content is modified
        """
        now = time()
        if "_admin" not in content:
            content["_admin"] = {}
        if not content["_admin"].get("created"):
            content["_admin"]["created"] = now
        content["_admin"]["modified"] = now
        if not content.get("_id"):
            content["_id"] = str(uuid4())
        if project_id is not None:
            # permissions already present at content are respected; "ANY" is only added to a fresh read list
            if not content["_admin"].get("projects_read"):
                content["_admin"]["projects_read"] = list(project_id)
                if make_public:
                    content["_admin"]["projects_read"].append("ANY")
            if not content["_admin"].get("projects_write"):
                content["_admin"]["projects_write"] = list(project_id)
        return None

    @staticmethod
    def format_on_edit(final_content, edit_content):
        """
        Modifies final_content to admin information upon edition
        :param final_content: final content to be stored at database
        :param edit_content: user requested update content
        :return: operation id, if this edit implies an asynchronous operation; None otherwise
        """
        if final_content.get("_admin"):
            now = time()
            final_content["_admin"]["modified"] = now
        return None

    def _send_msg(self, action, content, not_send_msg=None):
        """
        Sends a message about this topic to the message bus, unless the topic has no topic_msg
        :param action: message key, e.g. "created", "edited", "deleted"
        :param content: message content. "_admin" is removed from it (content is modified) before sending
        :param not_send_msg: False for not sending anything; a list for appending the tuple
            (topic_msg, action, content) to it instead of sending; any other value sends the message
        :return: None
        """
        if self.topic_msg and not_send_msg is not False:
            content.pop("_admin", None)
            if isinstance(not_send_msg, list):
                not_send_msg.append((self.topic_msg, action, content))
            else:
                self.msg.write(self.topic_msg, action, content)

    def check_conflict_on_del(self, session, _id, db_content):
        """
        Check if deletion can be done because of dependencies if it is not force. To override
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param _id: internal _id
        :param db_content: The database content of this item _id
        :return: None if ok or raises EngineException with the conflict
        """
        pass

    @staticmethod
    def _update_input_with_kwargs(desc, kwargs, yaml_format=False):
        """
        Update descriptor with the kwargs. It contains dot separated keys
        :param desc: dictionary to be updated
        :param kwargs: plain dictionary to be used for updating.
        :param yaml_format: get kwargs values as yaml format.
        :return: None, 'desc' is modified. It raises EngineException.
        """
        if not kwargs:
            return
        try:
            for k, v in kwargs.items():
                update_content = desc
                kitem_old = None
                klist = k.split(".")
                # descend one level behind the current key, so that after the loop update_content is the
                # container and kitem_old the (already type-checked) last key/index to assign
                for kitem in klist:
                    if kitem_old is not None:
                        update_content = update_content[kitem_old]
                    if isinstance(update_content, dict):
                        kitem_old = kitem
                    elif isinstance(update_content, list):
                        kitem_old = int(kitem)
                    else:
                        raise EngineException(
                            "Invalid query string '{}'. Descriptor is not a list nor dict at '{}'".format(k, kitem))
                update_content[kitem_old] = v if not yaml_format else safe_load(v)
        except KeyError:
            raise EngineException(
                "Invalid query string '{}'. Descriptor does not contain '{}'".format(k, kitem_old))
        except ValueError:
            raise EngineException("Invalid query string '{}'. Expected integer index list instead of '{}'".format(
                k, kitem))
        except IndexError:
            raise EngineException(
                "Invalid query string '{}'. Index '{}' out of range".format(k, kitem_old))
        except YAMLError:
            raise EngineException("Invalid query string '{}' yaml format".format(k))

    def show(self, session, _id):
        """
        Get complete information on an topic
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param _id: server internal id
        :return: dictionary, raise exception if not found.
        """
        if not self.multiproject:
            filter_db = {}
        else:
            filter_db = self._get_project_filter(session)
        # To allow project&user addressing by name AS WELL AS _id
        filter_db[BaseTopic.id_field(self.topic, _id)] = _id
        # TODO transform data for SOL005 URL requests
        # TODO remove _admin if not admin
        return self.db.get_one(self.topic, filter_db)

    def get_file(self, session, _id, path=None, accept_header=None):
        """
        Only implemented for descriptor topics. Return the file content of a descriptor
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param _id: Identity of the item to get content
        :param path: artifact path or "$DESCRIPTOR" or None
        :param accept_header: Content of Accept header. Must contain application/zip or/and text/plain
        :return: opened file or raises an exception
        """
        raise EngineException("Method get_file not valid for this topic", HTTPStatus.INTERNAL_SERVER_ERROR)

    def list(self, session, filter_q=None):
        """
        Get a list of the topic that matches a filter
        :param session: contains the used login username and working project
        :param filter_q: filter of data to be applied. It is not modified
        :return: The list, it can be empty if no one match the filter.
        """
        # work on a copy, so that the project filter keys do not leak into the caller's dictionary
        filter_q = dict(filter_q) if filter_q else {}
        if self.multiproject:
            filter_q.update(self._get_project_filter(session))

        # TODO transform data for SOL005 URL requests. Transform filtering
        # TODO implement "field-type" query string SOL005
        return self.db.get_list(self.topic, filter_q)

    def new(self, rollback, session, indata=None, kwargs=None, headers=None):
        """
        Creates a new entry into database.
        :param rollback: list to append created items at database in case a rollback may to be done
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param indata: data to be inserted
        :param kwargs: used to override the indata descriptor
        :param headers: http request headers
        :return: _id, op_id:
            _id: identity of the inserted data.
            op_id: operation id if this is asynchronous, None otherwise
        :raise: EngineException (UNPROCESSABLE_ENTITY) if the content or the quota validation fails
        """
        try:
            if self.multiproject:
                self.check_quota(session)

            content = self._remove_envelop(indata)

            # Override descriptor with query string kwargs
            self._update_input_with_kwargs(content, kwargs)
            content = self._validate_input_new(content, force=session["force"])
            self.check_conflict_on_new(session, content)
            op_id = self.format_on_new(content, project_id=session["project_id"], make_public=session["public"])
            _id = self.db.create(self.topic, content)
            rollback.append({"topic": self.topic, "_id": _id})
            if op_id:
                content["op_id"] = op_id
            self._send_msg("created", content)
            return _id, op_id
        except ValidationError as e:
            raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)

    def upload_content(self, session, _id, indata, kwargs, headers):
        """
        Only implemented for descriptor topics. Used for receiving content by chunks (with a transaction_id header
        and/or gzip file. It will store and extract)
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param _id : the database id of entry to be updated
        :param indata: http body request
        :param kwargs: user query string to override parameters. NOT USED
        :param headers: http request headers
        :return: True if the package is completely uploaded or False if partial content has been uploaded.
            Raise exception on error
        """
        raise EngineException("Method upload_content not valid for this topic", HTTPStatus.INTERNAL_SERVER_ERROR)

    def delete_list(self, session, filter_q=None):
        """
        Delete a several entries of a topic. This is for internal usage and test only, not exposed to NBI API
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param filter_q: filter of data to be applied. It is not modified
        :return: The deleted list, it can be empty if no one match the filter.
        """
        # TODO add admin to filter, validate rights
        # work on a copy, so that the project filter keys do not leak into the caller's dictionary
        filter_q = dict(filter_q) if filter_q else {}
        if self.multiproject:
            filter_q.update(self._get_project_filter(session))
        return self.db.del_list(self.topic, filter_q)

    def delete_extra(self, session, _id, db_content, not_send_msg=None):
        """
        Delete other things apart from database entry of a item _id.
        e.g.: other associated elements at database and other file system storage
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param _id: server internal id
        :param db_content: The database content of the _id. It is already deleted when reached this method, but the
            content is needed in same cases
        :param not_send_msg: To not send message (False) or store content (list) instead
        :return: None if ok or raises EngineException with the problem
        """
        pass

    def delete(self, session, _id, dry_run=False, not_send_msg=None):
        """
        Delete item by its internal _id
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param _id: server internal id
        :param dry_run: make checking but do not delete
        :param not_send_msg: To not send message (False) or store content (list) instead
        :return: operation id (None if there is not operation), raise exception if error or not found, conflict, ...
        """

        # To allow addressing projects and users by name AS WELL AS by _id
        if not self.multiproject:
            filter_q = {}
        else:
            filter_q = self._get_project_filter(session)
        filter_q[self.id_field(self.topic, _id)] = _id
        item_content = self.db.get_one(self.topic, filter_q)

        self.check_conflict_on_del(session, _id, item_content)
        if dry_run:
            return None

        if self.multiproject and session["project_id"]:
            # remove reference from project_read if there are more projects referencing it. If it last one,
            # do not remove reference, but delete
            # "ANY" only marks the item as public; it is not a project, so it must not prevent the deletion
            other_projects_referencing = next((p for p in item_content["_admin"]["projects_read"]
                                               if p != "ANY" and p not in session["project_id"]), None)

            # check if there are projects referencing it (apart from ANY, that means, public)....
            if other_projects_referencing:
                # remove references but not delete
                # NOTE(review): the key format expected by db.set_one(pull=...) is not visible here - confirm
                update_dict_pull = {"_admin.projects_read.{}".format(p): None for p in session["project_id"]}
                update_dict_pull.update({"_admin.projects_write.{}".format(p): None for p in session["project_id"]})
                self.db.set_one(self.topic, filter_q, update_dict=None, pull=update_dict_pull)
                return None
            else:
                can_write = next((p for p in item_content["_admin"]["projects_write"] if p == "ANY" or
                                  p in session["project_id"]), None)
                if not can_write:
                    raise EngineException("You have not write permission to delete it",
                                          http_code=HTTPStatus.UNAUTHORIZED)

        # delete
        self.db.del_one(self.topic, filter_q)
        self.delete_extra(session, _id, item_content, not_send_msg=not_send_msg)
        self._send_msg("deleted", {"_id": _id}, not_send_msg=not_send_msg)
        return None

    def edit(self, session, _id, indata=None, kwargs=None, content=None):
        """
        Change the content of an item
        :param session: contains "username", "admin", "force", "public", "project_id", "set_project"
        :param _id: server internal id
        :param indata: contains the changes to apply
        :param kwargs: modifies indata
        :param content: original content of the item
        :return: op_id: operation id if this is processed asynchronously, None otherwise
        :raise: EngineException (UNPROCESSABLE_ENTITY) if validation fails or if content is edited and
            SET_PROJECT is used at the same time
        """
        indata = self._remove_envelop(indata)

        # Override descriptor with query string kwargs
        if kwargs:
            self._update_input_with_kwargs(indata, kwargs)
        try:
            if indata and session.get("set_project"):
                raise EngineException("Cannot edit content and set to project (query string SET_PROJECT) at same time",
                                      HTTPStatus.UNPROCESSABLE_ENTITY)
            indata = self._validate_input_edit(indata, force=session["force"])

            # TODO self._check_edition(session, indata, _id, force)
            if not content:
                content = self.show(session, _id)
            deep_update_rfc7396(content, indata)

            # To allow project addressing by name AS WELL AS _id. Get the _id, just in case the provided one is a name
            _id = content.get("_id") or _id

            self.check_conflict_on_edit(session, content, indata, _id=_id)
            op_id = self.format_on_edit(content, indata)

            self.db.replace(self.topic, _id, content)

            # the message only carries the requested changes, plus the identifiers
            indata.pop("_admin", None)
            if op_id:
                indata["op_id"] = op_id
            indata["_id"] = _id
            self._send_msg("edited", indata)
            return op_id
        except ValidationError as e:
            raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)