fixed database init version issue
[osm/NBI.git] / osm_nbi / engine.py
1 # -*- coding: utf-8 -*-
2
3 import dbmongo
4 import dbmemory
5 import fslocal
6 import msglocal
7 import msgkafka
8 import tarfile
9 import yaml
10 import json
11 import logging
12 from random import choice as random_choice
13 from uuid import uuid4
14 from hashlib import sha256, md5
15 from dbbase import DbException
16 from fsbase import FsException
17 from msgbase import MsgException
18 from http import HTTPStatus
19 from time import time
20 from copy import deepcopy
21 from validation import validate_input, ValidationError
22
23 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
24
25
26 class EngineException(Exception):
27
28 def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
29 self.http_code = http_code
30 Exception.__init__(self, message)
31
32
33 def _deep_update(dict_to_change, dict_reference):
34 """
35 Modifies one dictionary with the information of the other following https://tools.ietf.org/html/rfc7396
36 :param dict_to_change: Ends modified
37 :param dict_reference: reference
38 :return: none
39 """
40
41 for k in dict_reference:
42 if dict_reference[k] is None: # None->Anything
43 if k in dict_to_change:
44 del dict_to_change[k]
45 elif not isinstance(dict_reference[k], dict): # NotDict->Anything
46 dict_to_change[k] = dict_reference[k]
47 elif k not in dict_to_change: # Dict->Empty
48 dict_to_change[k] = deepcopy(dict_reference[k])
49 _deep_update(dict_to_change[k], dict_reference[k])
50 elif isinstance(dict_to_change[k], dict): # Dict->Dict
51 _deep_update(dict_to_change[k], dict_reference[k])
52 else: # Dict->NotDict
53 dict_to_change[k] = deepcopy(dict_reference[k])
54 _deep_update(dict_to_change[k], dict_reference[k])
55
56 class Engine(object):
57
58 def __init__(self):
59 self.tokens = {}
60 self.db = None
61 self.fs = None
62 self.msg = None
63 self.config = None
64 self.logger = logging.getLogger("nbi.engine")
65
66 def start(self, config):
67 """
68 Connect to database, filesystem storage, and messaging
69 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
70 :return: None
71 """
72 self.config = config
73 try:
74 if not self.db:
75 if config["database"]["driver"] == "mongo":
76 self.db = dbmongo.DbMongo()
77 self.db.db_connect(config["database"])
78 elif config["database"]["driver"] == "memory":
79 self.db = dbmemory.DbMemory()
80 self.db.db_connect(config["database"])
81 else:
82 raise EngineException("Invalid configuration param '{}' at '[database]':'driver'".format(
83 config["database"]["driver"]))
84 if not self.fs:
85 if config["storage"]["driver"] == "local":
86 self.fs = fslocal.FsLocal()
87 self.fs.fs_connect(config["storage"])
88 else:
89 raise EngineException("Invalid configuration param '{}' at '[storage]':'driver'".format(
90 config["storage"]["driver"]))
91 if not self.msg:
92 if config["message"]["driver"] == "local":
93 self.msg = msglocal.MsgLocal()
94 self.msg.connect(config["message"])
95 elif config["message"]["driver"] == "kafka":
96 self.msg = msgkafka.MsgKafka()
97 self.msg.connect(config["message"])
98 else:
99 raise EngineException("Invalid configuration param '{}' at '[message]':'driver'".format(
100 config["storage"]["driver"]))
101 except (DbException, FsException, MsgException) as e:
102 raise EngineException(str(e), http_code=e.http_code)
103
104 def stop(self):
105 try:
106 if self.db:
107 self.db.db_disconnect()
108 if self.fs:
109 self.fs.fs_disconnect()
110 if self.fs:
111 self.fs.fs_disconnect()
112 except (DbException, FsException, MsgException) as e:
113 raise EngineException(str(e), http_code=e.http_code)
114
115 def authorize(self, token):
116 try:
117 if not token:
118 raise EngineException("Needed a token or Authorization http header",
119 http_code=HTTPStatus.UNAUTHORIZED)
120 if token not in self.tokens:
121 raise EngineException("Invalid token or Authorization http header",
122 http_code=HTTPStatus.UNAUTHORIZED)
123 session = self.tokens[token]
124 now = time()
125 if session["expires"] < now:
126 del self.tokens[token]
127 raise EngineException("Expired Token or Authorization http header",
128 http_code=HTTPStatus.UNAUTHORIZED)
129 return session
130 except EngineException:
131 if self.config["global"].get("test.user_not_authorized"):
132 return {"id": "fake-token-id-for-test",
133 "project_id": self.config["global"].get("test.project_not_authorized", "admin"),
134 "username": self.config["global"]["test.user_not_authorized"]}
135 else:
136 raise
137
138 def new_token(self, session, indata, remote):
139 now = time()
140 user_content = None
141
142 # Try using username/password
143 if indata.get("username"):
144 user_rows = self.db.get_list("users", {"username": indata.get("username")})
145 user_content = None
146 if user_rows:
147 user_content = user_rows[0]
148 salt = user_content["_admin"]["salt"]
149 shadow_password = sha256(indata.get("password", "").encode('utf-8') + salt.encode('utf-8')).hexdigest()
150 if shadow_password != user_content["password"]:
151 user_content = None
152 if not user_content:
153 raise EngineException("Invalid username/password", http_code=HTTPStatus.UNAUTHORIZED)
154 elif session:
155 user_rows = self.db.get_list("users", {"username": session["username"]})
156 if user_rows:
157 user_content = user_rows[0]
158 else:
159 raise EngineException("Invalid token", http_code=HTTPStatus.UNAUTHORIZED)
160 else:
161 raise EngineException("Provide credentials: username/password or Authorization Bearer token",
162 http_code=HTTPStatus.UNAUTHORIZED)
163
164 token_id = ''.join(random_choice('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789')
165 for _ in range(0, 32))
166 if indata.get("project_id"):
167 project_id = indata.get("project_id")
168 if project_id not in user_content["projects"]:
169 raise EngineException("project {} not allowed for this user".format(project_id),
170 http_code=HTTPStatus.UNAUTHORIZED)
171 else:
172 project_id = user_content["projects"][0]
173 if project_id == "admin":
174 session_admin = True
175 else:
176 project = self.db.get_one("projects", {"_id": project_id})
177 session_admin = project.get("admin", False)
178 new_session = {"issued_at": now, "expires": now+3600,
179 "_id": token_id, "id": token_id, "project_id": project_id, "username": user_content["username"],
180 "remote_port": remote.port, "admin": session_admin}
181 if remote.name:
182 new_session["remote_host"] = remote.name
183 elif remote.ip:
184 new_session["remote_host"] = remote.ip
185
186 self.tokens[token_id] = new_session
187 return deepcopy(new_session)
188
189 def get_token_list(self, session):
190 token_list = []
191 for token_id, token_value in self.tokens.items():
192 if token_value["username"] == session["username"]:
193 token_list.append(deepcopy(token_value))
194 return token_list
195
196 def get_token(self, session, token_id):
197 token_value = self.tokens.get(token_id)
198 if not token_value:
199 raise EngineException("token not found", http_code=HTTPStatus.NOT_FOUND)
200 if token_value["username"] != session["username"] and not session["admin"]:
201 raise EngineException("needed admin privileges", http_code=HTTPStatus.UNAUTHORIZED)
202 return token_value
203
204 def del_token(self, token_id):
205 try:
206 del self.tokens[token_id]
207 return "token '{}' deleted".format(token_id)
208 except KeyError:
209 raise EngineException("Token '{}' not found".format(token_id), http_code=HTTPStatus.NOT_FOUND)
210
211 @staticmethod
212 def _remove_envelop(item, indata=None):
213 """
214 Obtain the useful data removing the envelop. It goes throw the vnfd or nsd catalog and returns the
215 vnfd or nsd content
216 :param item: can be vnfds, nsds, users, projects, userDefinedData (initial content of a vnfds, nsds
217 :param indata: Content to be inspected
218 :return: the useful part of indata
219 """
220 clean_indata = indata
221 if not indata:
222 return {}
223 if item == "vnfds":
224 if clean_indata.get('vnfd:vnfd-catalog'):
225 clean_indata = clean_indata['vnfd:vnfd-catalog']
226 elif clean_indata.get('vnfd-catalog'):
227 clean_indata = clean_indata['vnfd-catalog']
228 if clean_indata.get('vnfd'):
229 if not isinstance(clean_indata['vnfd'], list) or len(clean_indata['vnfd']) != 1:
230 raise EngineException("'vnfd' must be a list only one element")
231 clean_indata = clean_indata['vnfd'][0]
232 elif item == "nsds":
233 if clean_indata.get('nsd:nsd-catalog'):
234 clean_indata = clean_indata['nsd:nsd-catalog']
235 elif clean_indata.get('nsd-catalog'):
236 clean_indata = clean_indata['nsd-catalog']
237 if clean_indata.get('nsd'):
238 if not isinstance(clean_indata['nsd'], list) or len(clean_indata['nsd']) != 1:
239 raise EngineException("'nsd' must be a list only one element")
240 clean_indata = clean_indata['nsd'][0]
241 elif item == "userDefinedData":
242 if "userDefinedData" in indata:
243 clean_indata = clean_indata['userDefinedData']
244 return clean_indata
245
246 def _validate_new_data(self, session, item, indata, id=None):
247 if item == "users":
248 if not indata.get("username"):
249 raise EngineException("missing 'username'", HTTPStatus.UNPROCESSABLE_ENTITY)
250 if not indata.get("password"):
251 raise EngineException("missing 'password'", HTTPStatus.UNPROCESSABLE_ENTITY)
252 if not indata.get("projects"):
253 raise EngineException("missing 'projects'", HTTPStatus.UNPROCESSABLE_ENTITY)
254 # check username not exist
255 if self.db.get_one(item, {"username": indata.get("username")}, fail_on_empty=False, fail_on_more=False):
256 raise EngineException("username '{}' exist".format(indata["username"]), HTTPStatus.CONFLICT)
257 elif item == "projects":
258 if not indata.get("name"):
259 raise EngineException("missing 'name'")
260 # check name not exist
261 if self.db.get_one(item, {"name": indata.get("name")}, fail_on_empty=False, fail_on_more=False):
262 raise EngineException("name '{}' exist".format(indata["name"]), HTTPStatus.CONFLICT)
263 elif item in ("vnfds", "nsds"):
264 filter = {"id": indata["id"]}
265 if id:
266 filter["_id.neq"] = id
267 # TODO add admin to filter, validate rights
268 self._add_read_filter(session, item, filter)
269 if self.db.get_one(item, filter, fail_on_empty=False):
270 raise EngineException("{} with id '{}' already exist for this tenant".format(item[:-1], indata["id"]),
271 HTTPStatus.CONFLICT)
272
273 # TODO validate with pyangbind
274 elif item == "userDefinedData":
275 # TODO validate userDefinedData is a keypair values
276 pass
277
278 elif item == "nsrs":
279 pass
280 elif item == "vims" or item == "sdns":
281 if self.db.get_one(item, {"name": indata.get("name")}, fail_on_empty=False, fail_on_more=False):
282 raise EngineException("name '{}' already exist for {}".format(indata["name"], item),
283 HTTPStatus.CONFLICT)
284
285
286 def _format_new_data(self, session, item, indata, admin=None):
287 now = time()
288 if not "_admin" in indata:
289 indata["_admin"] = {}
290 indata["_admin"]["created"] = now
291 indata["_admin"]["modified"] = now
292 if item == "users":
293 _id = indata["username"]
294 salt = uuid4().hex
295 indata["_admin"]["salt"] = salt
296 indata["password"] = sha256(indata["password"].encode('utf-8') + salt.encode('utf-8')).hexdigest()
297 elif item == "projects":
298 _id = indata["name"]
299 else:
300 _id = None
301 storage = None
302 if admin:
303 _id = admin.get("_id")
304 storage = admin.get("storage")
305 if not _id:
306 _id = str(uuid4())
307 if item in ("vnfds", "nsds"):
308 if not indata["_admin"].get("projects_read"):
309 indata["_admin"]["projects_read"] = [session["project_id"]]
310 if not indata["_admin"].get("projects_write"):
311 indata["_admin"]["projects_write"] = [session["project_id"]]
312 indata["_admin"]["onboardingState"] = "CREATED"
313 indata["_admin"]["operationalState"] = "DISABLED"
314 indata["_admin"]["usageSate"] = "NOT_IN_USE"
315 elif item in ("vims", "sdns"):
316 indata["_admin"]["operationalState"] = "PROCESSING"
317
318 if storage:
319 indata["_admin"]["storage"] = storage
320 indata["_id"] = _id
321
322 def upload_content(self, session, item, _id, indata, kwargs, headers):
323 """
324 Used for receiving content by chunks (with a transaction_id header and/or gzip file. It will store and extract)
325 :param session: session
326 :param item: can be nsds or vnfds
327 :param _id : the nsd,vnfd is already created, this is the id
328 :param indata: http body request
329 :param kwargs: user query string to override parameters. NOT USED
330 :param headers: http request headers
331 :return: True package has is completely uploaded or False if partial content has been uplodaed.
332 Raise exception on error
333 """
334 # Check that _id exist and it is valid
335 current_desc = self.get_item(session, item, _id)
336
337 content_range_text = headers.get("Content-Range")
338 expected_md5 = headers.get("Content-File-MD5")
339 compressed = None
340 content_type = headers.get("Content-Type")
341 if content_type and "application/gzip" in content_type or "application/x-gzip" in content_type or \
342 "application/zip" in content_type:
343 compressed = "gzip"
344 filename = headers.get("Content-Filename")
345 if not filename:
346 filename = "package.tar.gz" if compressed else "package"
347 # TODO change to Content-Disposition filename https://tools.ietf.org/html/rfc6266
348 file_pkg = None
349 error_text = ""
350 try:
351 if content_range_text:
352 content_range = content_range_text.replace("-", " ").replace("/", " ").split()
353 if content_range[0] != "bytes": # TODO check x<y not negative < total....
354 raise IndexError()
355 start = int(content_range[1])
356 end = int(content_range[2]) + 1
357 total = int(content_range[3])
358 else:
359 start = 0
360
361 if start:
362 if not self.fs.file_exists(_id, 'dir'):
363 raise EngineException("invalid Transaction-Id header", HTTPStatus.NOT_FOUND)
364 else:
365 self.fs.file_delete(_id, ignore_non_exist=True)
366 self.fs.mkdir(_id)
367
368 storage = self.fs.get_params()
369 storage["folder"] = _id
370
371 file_path = (_id, filename)
372 if self.fs.file_exists(file_path, 'file'):
373 file_size = self.fs.file_size(file_path)
374 else:
375 file_size = 0
376 if file_size != start:
377 raise EngineException("invalid Content-Range start sequence, expected '{}' but received '{}'".format(
378 file_size, start), HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE)
379 file_pkg = self.fs.file_open(file_path, 'a+b')
380 if isinstance(indata, dict):
381 indata_text = yaml.safe_dump(indata, indent=4, default_flow_style=False)
382 file_pkg.write(indata_text.encode(encoding="utf-8"))
383 else:
384 indata_len = 0
385 while True:
386 indata_text = indata.read(4096)
387 indata_len += len(indata_text)
388 if not indata_text:
389 break
390 file_pkg.write(indata_text)
391 if content_range_text:
392 if indata_len != end-start:
393 raise EngineException("Mismatch between Content-Range header {}-{} and body length of {}".format(
394 start, end-1, indata_len), HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE)
395 if end != total:
396 # TODO update to UPLOADING
397 return False
398
399 # PACKAGE UPLOADED
400 if expected_md5:
401 file_pkg.seek(0, 0)
402 file_md5 = md5()
403 chunk_data = file_pkg.read(1024)
404 while chunk_data:
405 file_md5.update(chunk_data)
406 chunk_data = file_pkg.read(1024)
407 if expected_md5 != file_md5.hexdigest():
408 raise EngineException("Error, MD5 mismatch", HTTPStatus.CONFLICT)
409 file_pkg.seek(0, 0)
410 if compressed == "gzip":
411 tar = tarfile.open(mode='r', fileobj=file_pkg)
412 descriptor_file_name = None
413 for tarinfo in tar:
414 tarname = tarinfo.name
415 tarname_path = tarname.split("/")
416 if not tarname_path[0] or ".." in tarname_path: # if start with "/" means absolute path
417 raise EngineException("Absolute path or '..' are not allowed for package descriptor tar.gz")
418 if len(tarname_path) == 1 and not tarinfo.isdir():
419 raise EngineException("All files must be inside a dir for package descriptor tar.gz")
420 if tarname.endswith(".yaml") or tarname.endswith(".json") or tarname.endswith(".yml"):
421 storage["pkg-dir"] = tarname_path[0]
422 if len(tarname_path) == 2:
423 if descriptor_file_name:
424 raise EngineException("Found more than one descriptor file at package descriptor tar.gz")
425 descriptor_file_name = tarname
426 if not descriptor_file_name:
427 raise EngineException("Not found any descriptor file at package descriptor tar.gz")
428 storage["descriptor"] = descriptor_file_name
429 storage["zipfile"] = filename
430 self.fs.file_extract(tar, _id)
431 with self.fs.file_open((_id, descriptor_file_name), "r") as descriptor_file:
432 content = descriptor_file.read()
433 else:
434 content = file_pkg.read()
435 storage["descriptor"] = descriptor_file_name = filename
436
437 if descriptor_file_name.endswith(".json"):
438 error_text = "Invalid json format "
439 indata = json.load(content)
440 else:
441 error_text = "Invalid yaml format "
442 indata = yaml.load(content)
443
444 current_desc["_admin"]["storage"] = storage
445 current_desc["_admin"]["onboardingState"] = "ONBOARDED"
446 current_desc["_admin"]["operationalState"] = "ENABLED"
447
448 self._edit_item(session, item, _id, current_desc, indata, kwargs)
449 # TODO if descriptor has changed because kwargs update content and remove cached zip
450 # TODO if zip is not present creates one
451 return True
452
453 except EngineException:
454 raise
455 except IndexError:
456 raise EngineException("invalid Content-Range header format. Expected 'bytes start-end/total'",
457 HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE)
458 except IOError as e:
459 raise EngineException("invalid upload transaction sequence: '{}'".format(e), HTTPStatus.BAD_REQUEST)
460 except (ValueError, yaml.YAMLError) as e:
461 raise EngineException(error_text + str(e))
462 finally:
463 if file_pkg:
464 file_pkg.close()
465
466 def new_nsr(self, session, ns_request):
467 """
468 Creates a new nsr into database
469 :param session: contains the used login username and working project
470 :param ns_request: params to be used for the nsr
471 :return: nsr descriptor to be stored at database and the _id
472 """
473
474 # look for nsr
475 nsd = self.get_item(session, "nsds", ns_request["nsdId"])
476 _id = str(uuid4())
477 nsr_descriptor = {
478 "name": ns_request["nsName"],
479 "name-ref": ns_request["nsName"],
480 "short-name": ns_request["nsName"],
481 "admin-status": "ENABLED",
482 "nsd": nsd,
483 "datacenter": ns_request["vimAccountId"],
484 "resource-orchestrator": "osmopenmano",
485 "description": ns_request.get("nsDescription", ""),
486 "constituent-vnfr-ref": ["TODO datacenter-id, vnfr-id"],
487
488 "operational-status": "init", # typedef ns-operational-
489 "config-status": "init", # typedef config-states
490 "detailed-status": "scheduled",
491
492 "orchestration-progress": {}, # {"networks": {"active": 0, "total": 0}, "vms": {"active": 0, "total": 0}},
493
494 "crete-time": time(),
495 "nsd-name-ref": nsd["name"],
496 "operational-events": [], # "id", "timestamp", "description", "event",
497 "nsd-ref": nsd["id"],
498 "ns-instance-config-ref": _id,
499 "id": _id,
500
501 # "input-parameter": xpath, value,
502 "ssh-authorized-key": ns_request.get("key-pair-ref"),
503 }
504 ns_request["nsr_id"] = _id
505 return nsr_descriptor, _id
506
507 def new_item(self, session, item, indata={}, kwargs=None, headers={}):
508 """
509 Creates a new entry into database. For nsds and vnfds it creates an almost empty DISABLED entry,
510 that must be completed with a call to method upload_content
511 :param session: contains the used login username and working project
512 :param item: it can be: users, projects, vims, sdns, nsrs, nsds, vnfds
513 :param indata: data to be inserted
514 :param kwargs: used to override the indata descriptor
515 :param headers: http request headers
516 :return: _id, transaction_id: identity of the inserted data. or transaction_id if Content-Range is used
517 """
518
519 transaction = None
520
521 item_envelop = item
522 if item in ("nsds", "vnfds"):
523 item_envelop = "userDefinedData"
524 content = self._remove_envelop(item_envelop, indata)
525
526 # Override descriptor with query string kwargs
527 if kwargs:
528 try:
529 for k, v in kwargs.items():
530 update_content = content
531 kitem_old = None
532 klist = k.split(".")
533 for kitem in klist:
534 if kitem_old is not None:
535 update_content = update_content[kitem_old]
536 if isinstance(update_content, dict):
537 kitem_old = kitem
538 elif isinstance(update_content, list):
539 kitem_old = int(kitem)
540 else:
541 raise EngineException(
542 "Invalid query string '{}'. Descriptor is not a list nor dict at '{}'".format(k, kitem))
543 update_content[kitem_old] = v
544 except KeyError:
545 raise EngineException(
546 "Invalid query string '{}'. Descriptor does not contain '{}'".format(k, kitem_old))
547 except ValueError:
548 raise EngineException("Invalid query string '{}'. Expected integer index list instead of '{}'".format(
549 k, kitem))
550 except IndexError:
551 raise EngineException(
552 "Invalid query string '{}'. Index '{}' out of range".format(k, kitem_old))
553 try:
554 validate_input(content, item, new=True)
555 except ValidationError as e:
556 raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
557
558 if not indata and item not in ("nsds", "vnfds"):
559 raise EngineException("Empty payload")
560
561 if item == "nsrs":
562 # in this case the imput descriptor is not the data to be stored
563 ns_request = content
564 content, _id = self.new_nsr(session, ns_request)
565 transaction = {"_id": _id}
566
567 self._validate_new_data(session, item_envelop, content)
568 if item in ("nsds", "vnfds"):
569 content = {"_admin": {"userDefinedData": content}}
570 self._format_new_data(session, item, content, transaction)
571 _id = self.db.create(item, content)
572 if item == "nsrs":
573 self.msg.write("ns", "create", _id)
574 elif item == "vims":
575 msg_data = self.db.get_one(item, {"_id": _id})
576 msg_data.pop("_admin", None)
577 self.msg.write("vim_account", "create", msg_data)
578 elif item == "sdns":
579 msg_data = self.db.get_one(item, {"_id": _id})
580 msg_data.pop("_admin", None)
581 self.msg.write("sdn", "create", msg_data)
582 return _id
583
584 def _add_read_filter(self, session, item, filter):
585 if session["project_id"] == "admin": # allows all
586 return filter
587 if item == "users":
588 filter["username"] = session["username"]
589 elif item == "vnfds" or item == "nsds":
590 filter["_admin.projects_read.cont"] = ["ANY", session["project_id"]]
591
592 def _add_delete_filter(self, session, item, filter):
593 if session["project_id"] != "admin" and item in ("users", "projects"):
594 raise EngineException("Only admin users can perform this task", http_code=HTTPStatus.FORBIDDEN)
595 if item == "users":
596 if filter.get("_id") == session["username"] or filter.get("username") == session["username"]:
597 raise EngineException("You cannot delete your own user", http_code=HTTPStatus.CONFLICT)
598 elif item == "project":
599 if filter.get("_id") == session["project_id"]:
600 raise EngineException("You cannot delete your own project", http_code=HTTPStatus.CONFLICT)
601 elif item in ("vnfds", "nsds") and session["project_id"] != "admin":
602 filter["_admin.projects_write.cont"] = ["ANY", session["project_id"]]
603
604 def get_file(self, session, item, _id, path=None, accept_header=None):
605 """
606 Return the file content of a vnfd or nsd
607 :param session: contains the used login username and working project
608 :param item: it can be vnfds or nsds
609 :param _id: Identity of the vnfd, ndsd
610 :param path: artifact path or "$DESCRIPTOR" or None
611 :param accept_header: Content of Accept header. Must contain applition/zip or/and text/plain
612 :return: opened file or raises an exception
613 """
614 accept_text = accept_zip = False
615 if accept_header:
616 if 'text/plain' in accept_header or '*/*' in accept_header:
617 accept_text = True
618 if 'application/zip' in accept_header or '*/*' in accept_header:
619 accept_zip = True
620 if not accept_text and not accept_zip:
621 raise EngineException("provide request header 'Accept' with 'application/zip' or 'text/plain'",
622 http_code=HTTPStatus.NOT_ACCEPTABLE)
623
624 content = self.get_item(session, item, _id)
625 if content["_admin"]["onboardingState"] != "ONBOARDED":
626 raise EngineException("Cannot get content because this resource is not at 'ONBOARDED' state. "
627 "onboardingState is {}".format(content["_admin"]["onboardingState"]),
628 http_code=HTTPStatus.CONFLICT)
629 storage = content["_admin"]["storage"]
630 if path is not None and path != "$DESCRIPTOR": # artifacts
631 if not storage.get('pkg-dir'):
632 raise EngineException("Packages does not contains artifacts", http_code=HTTPStatus.BAD_REQUEST)
633 if self.fs.file_exists((storage['folder'], storage['pkg-dir'], *path), 'dir'):
634 folder_content = self.fs.dir_ls((storage['folder'], storage['pkg-dir'], *path))
635 return folder_content, "text/plain"
636 # TODO manage folders in http
637 else:
638 return self.fs.file_open((storage['folder'], storage['pkg-dir'], *path), "rb"), \
639 "application/octet-stream"
640
641 # pkgtype accept ZIP TEXT -> result
642 # manyfiles yes X -> zip
643 # no yes -> error
644 # onefile yes no -> zip
645 # X yes -> text
646
647 if accept_text and (not storage.get('pkg-dir') or path == "$DESCRIPTOR"):
648 return self.fs.file_open((storage['folder'], storage['descriptor']), "r"), "text/plain"
649 elif storage.get('pkg-dir') and not accept_zip:
650 raise EngineException("Packages that contains several files need to be retrieved with 'application/zip'"
651 "Accept header", http_code=HTTPStatus.NOT_ACCEPTABLE)
652 else:
653 if not storage.get('zipfile'):
654 # TODO generate zipfile if not present
655 raise EngineException("Only allowed 'text/plain' Accept header for this descriptor. To be solved in future versions"
656 "", http_code=HTTPStatus.NOT_ACCEPTABLE)
657 return self.fs.file_open((storage['folder'], storage['zipfile']), "rb"), "application/zip"
658
659 def get_item_list(self, session, item, filter={}):
660 """
661 Get a list of items
662 :param session: contains the used login username and working project
663 :param item: it can be: users, projects, vnfds, nsds, ...
664 :param filter: filter of data to be applied
665 :return: The list, it can be empty if no one match the filter.
666 """
667 # TODO add admin to filter, validate rights
668 # TODO transform data for SOL005 URL requests. Transform filtering
669 # TODO implement "field-type" query string SOL005
670
671 self._add_read_filter(session, item, filter)
672 return self.db.get_list(item, filter)
673
674 def get_item(self, session, item, _id):
675 """
676 Get complete information on an items
677 :param session: contains the used login username and working project
678 :param item: it can be: users, projects, vnfds, nsds,
679 :param _id: server id of the item
680 :return: dictionary, raise exception if not found.
681 """
682 database_item = item
683 filter = {"_id": _id}
684 # TODO add admin to filter, validate rights
685 # TODO transform data for SOL005 URL requests
686 self._add_read_filter(session, item, filter)
687 return self.db.get_one(item, filter)
688
689 def del_item_list(self, session, item, filter={}):
690 """
691 Delete a list of items
692 :param session: contains the used login username and working project
693 :param item: it can be: users, projects, vnfds, nsds, ...
694 :param filter: filter of data to be applied
695 :return: The deleted list, it can be empty if no one match the filter.
696 """
697 # TODO add admin to filter, validate rights
698 self._add_read_filter(session, item, filter)
699 return self.db.del_list(item, filter)
700
701 def del_item(self, session, item, _id):
702 """
703 Get complete information on an items
704 :param session: contains the used login username and working project
705 :param item: it can be: users, projects, vnfds, nsds, ...
706 :param _id: server id of the item
707 :return: dictionary, raise exception if not found.
708 """
709 # TODO add admin to filter, validate rights
710 # data = self.get_item(item, _id)
711 filter = {"_id": _id}
712 self._add_delete_filter(session, item, filter)
713
714 if item in ("nsrs", "vims", "sdns"):
715 desc = self.db.get_one(item, filter)
716 desc["_admin"]["to_delete"] = True
717 self.db.replace(item, _id, desc) # TODO change to set_one
718 if item == "nsrs":
719 self.msg.write("ns", "delete", _id)
720 elif item == "vims":
721 self.msg.write("vim_account", "delete", {"_id": _id})
722 elif item == "sdns":
723 self.msg.write("sdn", "delete", {"_id": _id})
724 return {"deleted": 1} # TODO indicate an offline operation to return 202 ACCEPTED
725
726 v = self.db.del_one(item, filter)
727 self.fs.file_delete(_id, ignore_non_exist=True)
728 return v
729
730 def prune(self):
731 """
732 Prune database not needed content
733 :return: None
734 """
735 return self.db.del_list("nsrs", {"_admin.to_delete": True})
736
737 def create_admin(self):
738 """
739 Creates a new user admin/admin into database if database is empty. Useful for initialization
740 :return: _id identity of the inserted data, or None
741 """
742 users = self.db.get_one("users", fail_on_empty=False, fail_on_more=False)
743 if users:
744 return None
745 # raise EngineException("Unauthorized. Database users is not empty", HTTPStatus.UNAUTHORIZED)
746 indata = {"username": "admin", "password": "admin", "projects": ["admin"]}
747 fake_session = {"project_id": "admin", "username": "admin"}
748 self._format_new_data(fake_session, "users", indata)
749 _id = self.db.create("users", indata)
750 return _id
751
752 def init_db(self, target_version='1.0'):
753 """
754 Init database if empty. If not empty it checks that database version is ok.
755 If empty, it creates a new user admin/admin at 'users' and a new entry at 'version'
756 :return: None if ok, exception if error or if the version is different.
757 """
758 version = self.db.get_one("version", fail_on_empty=False, fail_on_more=False)
759 if not version:
760 # create user admin
761 self.create_admin()
762 # create database version
763 version_data = {
764 "_id": '1.0', # version text
765 "version": 1000, # version number
766 "date": "2018-04-12", # version date
767 "description": "initial design", # changes in this version
768 'status': 'ENABLED' # ENABLED, DISABLED (migration in process), ERROR,
769 }
770 self.db.create("version", version_data)
771 elif version["_id"] != target_version:
772 # TODO implement migration process
773 raise EngineException("Wrong database version '{}'. Expected '{}'".format(
774 version["_id"], target_version), HTTPStatus.INTERNAL_SERVER_ERROR)
775 elif version["status"] != 'ENABLED':
776 raise EngineException("Wrong database status '{}'".format(
777 version["status"]), HTTPStatus.INTERNAL_SERVER_ERROR)
778 return
779
780 def _edit_item(self, session, item, id, content, indata={}, kwargs=None):
781 if indata:
782 indata = self._remove_envelop(item, indata)
783
784 # Override descriptor with query string kwargs
785 if kwargs:
786 try:
787 for k, v in kwargs.items():
788 update_content = indata
789 kitem_old = None
790 klist = k.split(".")
791 for kitem in klist:
792 if kitem_old is not None:
793 update_content = update_content[kitem_old]
794 if isinstance(update_content, dict):
795 kitem_old = kitem
796 elif isinstance(update_content, list):
797 kitem_old = int(kitem)
798 else:
799 raise EngineException(
800 "Invalid query string '{}'. Descriptor is not a list nor dict at '{}'".format(k, kitem))
801 update_content[kitem_old] = v
802 except KeyError:
803 raise EngineException(
804 "Invalid query string '{}'. Descriptor does not contain '{}'".format(k, kitem_old))
805 except ValueError:
806 raise EngineException("Invalid query string '{}'. Expected integer index list instead of '{}'".format(
807 k, kitem))
808 except IndexError:
809 raise EngineException(
810 "Invalid query string '{}'. Index '{}' out of range".format(k, kitem_old))
811 try:
812 validate_input(content, item, new=False)
813 except ValidationError as e:
814 raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
815
816 _deep_update(content, indata)
817 self._validate_new_data(session, item, content, id)
818 # self._format_new_data(session, item, content)
819 self.db.replace(item, id, content)
820 if item in ("vims", "sdns"):
821 indata.pop("_admin", None)
822 indata["_id"] = id
823 if item == "vims":
824 self.msg.write("vim_account", "edit", indata)
825 elif item == "sdns":
826 self.msg.write("sdn", "edit", indata)
827 return id
828
829 def edit_item(self, session, item, _id, indata={}, kwargs=None):
830 """
831 Update an existing entry at database
832 :param session: contains the used login username and working project
833 :param item: it can be: users, projects, vnfds, nsds, ...
834 :param _id: identifier to be updated
835 :param indata: data to be inserted
836 :param kwargs: used to override the indata descriptor
837 :return: dictionary, raise exception if not found.
838 """
839
840 content = self.get_item(session, item, _id)
841 return self._edit_item(session, item, _id, content, indata, kwargs)
842
843
844