Package generation fixing
[osm/NBI.git] / osm_nbi / engine.py
1 # -*- coding: utf-8 -*-
2
3 import dbmongo
4 import dbmemory
5 import fslocal
6 import msglocal
7 import msgkafka
8 import tarfile
9 import yaml
10 import json
11 import logging
12 from random import choice as random_choice
13 from uuid import uuid4
14 from hashlib import sha256, md5
15 from dbbase import DbException
16 from fsbase import FsException
17 from msgbase import MsgException
18 from http import HTTPStatus
19 from time import time
20 from copy import deepcopy
21 from validation import validate_input, ValidationError
22
23 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
24
25
26 class EngineException(Exception):
27
28 def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
29 self.http_code = http_code
30 Exception.__init__(self, message)
31
32
33 def _deep_update(dict_to_change, dict_reference):
34 """
35 Modifies one dictionary with the information of the other following https://tools.ietf.org/html/rfc7396
36 :param dict_to_change: Ends modified
37 :param dict_reference: reference
38 :return: none
39 """
40
41 for k in dict_reference:
42 if dict_reference[k] is None: # None->Anything
43 if k in dict_to_change:
44 del dict_to_change[k]
45 elif not isinstance(dict_reference[k], dict): # NotDict->Anything
46 dict_to_change[k] = dict_reference[k]
47 elif k not in dict_to_change: # Dict->Empty
48 dict_to_change[k] = deepcopy(dict_reference[k])
49 _deep_update(dict_to_change[k], dict_reference[k])
50 elif isinstance(dict_to_change[k], dict): # Dict->Dict
51 _deep_update(dict_to_change[k], dict_reference[k])
52 else: # Dict->NotDict
53 dict_to_change[k] = deepcopy(dict_reference[k])
54 _deep_update(dict_to_change[k], dict_reference[k])
55
56 class Engine(object):
57
58 def __init__(self):
59 self.tokens = {}
60 self.db = None
61 self.fs = None
62 self.msg = None
63 self.config = None
64 self.logger = logging.getLogger("nbi.engine")
65
66 def start(self, config):
67 """
68 Connect to database, filesystem storage, and messaging
69 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
70 :return: None
71 """
72 self.config = config
73 try:
74 if not self.db:
75 if config["database"]["driver"] == "mongo":
76 self.db = dbmongo.DbMongo()
77 self.db.db_connect(config["database"])
78 elif config["database"]["driver"] == "memory":
79 self.db = dbmemory.DbMemory()
80 self.db.db_connect(config["database"])
81 else:
82 raise EngineException("Invalid configuration param '{}' at '[database]':'driver'".format(
83 config["database"]["driver"]))
84 if not self.fs:
85 if config["storage"]["driver"] == "local":
86 self.fs = fslocal.FsLocal()
87 self.fs.fs_connect(config["storage"])
88 else:
89 raise EngineException("Invalid configuration param '{}' at '[storage]':'driver'".format(
90 config["storage"]["driver"]))
91 if not self.msg:
92 if config["message"]["driver"] == "local":
93 self.msg = msglocal.MsgLocal()
94 self.msg.connect(config["message"])
95 elif config["message"]["driver"] == "kafka":
96 self.msg = msgkafka.MsgKafka()
97 self.msg.connect(config["message"])
98 else:
99 raise EngineException("Invalid configuration param '{}' at '[message]':'driver'".format(
100 config["storage"]["driver"]))
101 except (DbException, FsException, MsgException) as e:
102 raise EngineException(str(e), http_code=e.http_code)
103
104 def stop(self):
105 try:
106 if self.db:
107 self.db.db_disconnect()
108 if self.fs:
109 self.fs.fs_disconnect()
110 if self.fs:
111 self.fs.fs_disconnect()
112 except (DbException, FsException, MsgException) as e:
113 raise EngineException(str(e), http_code=e.http_code)
114
115 def authorize(self, token):
116 try:
117 if not token:
118 raise EngineException("Needed a token or Authorization http header",
119 http_code=HTTPStatus.UNAUTHORIZED)
120 if token not in self.tokens:
121 raise EngineException("Invalid token or Authorization http header",
122 http_code=HTTPStatus.UNAUTHORIZED)
123 session = self.tokens[token]
124 now = time()
125 if session["expires"] < now:
126 del self.tokens[token]
127 raise EngineException("Expired Token or Authorization http header",
128 http_code=HTTPStatus.UNAUTHORIZED)
129 return session
130 except EngineException:
131 if self.config["global"].get("test.user_not_authorized"):
132 return {"id": "fake-token-id-for-test",
133 "project_id": self.config["global"].get("test.project_not_authorized", "admin"),
134 "username": self.config["global"]["test.user_not_authorized"]}
135 else:
136 raise
137
138 def new_token(self, session, indata, remote):
139 now = time()
140 user_content = None
141
142 # Try using username/password
143 if indata.get("username"):
144 user_rows = self.db.get_list("users", {"username": indata.get("username")})
145 user_content = None
146 if user_rows:
147 user_content = user_rows[0]
148 salt = user_content["_admin"]["salt"]
149 shadow_password = sha256(indata.get("password", "").encode('utf-8') + salt.encode('utf-8')).hexdigest()
150 if shadow_password != user_content["password"]:
151 user_content = None
152 if not user_content:
153 raise EngineException("Invalid username/password", http_code=HTTPStatus.UNAUTHORIZED)
154 elif session:
155 user_rows = self.db.get_list("users", {"username": session["username"]})
156 if user_rows:
157 user_content = user_rows[0]
158 else:
159 raise EngineException("Invalid token", http_code=HTTPStatus.UNAUTHORIZED)
160 else:
161 raise EngineException("Provide credentials: username/password or Authorization Bearer token",
162 http_code=HTTPStatus.UNAUTHORIZED)
163
164 token_id = ''.join(random_choice('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789')
165 for _ in range(0, 32))
166 if indata.get("project_id"):
167 project_id = indata.get("project_id")
168 if project_id not in user_content["projects"]:
169 raise EngineException("project {} not allowed for this user".format(project_id),
170 http_code=HTTPStatus.UNAUTHORIZED)
171 else:
172 project_id = user_content["projects"][0]
173 if project_id == "admin":
174 session_admin = True
175 else:
176 project = self.db.get_one("projects", {"_id": project_id})
177 session_admin = project.get("admin", False)
178 new_session = {"issued_at": now, "expires": now+3600,
179 "_id": token_id, "id": token_id, "project_id": project_id, "username": user_content["username"],
180 "remote_port": remote.port, "admin": session_admin}
181 if remote.name:
182 new_session["remote_host"] = remote.name
183 elif remote.ip:
184 new_session["remote_host"] = remote.ip
185
186 self.tokens[token_id] = new_session
187 return deepcopy(new_session)
188
189 def get_token_list(self, session):
190 token_list = []
191 for token_id, token_value in self.tokens.items():
192 if token_value["username"] == session["username"]:
193 token_list.append(deepcopy(token_value))
194 return token_list
195
196 def get_token(self, session, token_id):
197 token_value = self.tokens.get(token_id)
198 if not token_value:
199 raise EngineException("token not found", http_code=HTTPStatus.NOT_FOUND)
200 if token_value["username"] != session["username"] and not session["admin"]:
201 raise EngineException("needed admin privileges", http_code=HTTPStatus.UNAUTHORIZED)
202 return token_value
203
204 def del_token(self, token_id):
205 try:
206 del self.tokens[token_id]
207 return "token '{}' deleted".format(token_id)
208 except KeyError:
209 raise EngineException("Token '{}' not found".format(token_id), http_code=HTTPStatus.NOT_FOUND)
210
211 @staticmethod
212 def _remove_envelop(item, indata=None):
213 """
214 Obtain the useful data removing the envelop. It goes throw the vnfd or nsd catalog and returns the
215 vnfd or nsd content
216 :param item: can be vnfds, nsds, users, projects, userDefinedData (initial content of a vnfds, nsds
217 :param indata: Content to be inspected
218 :return: the useful part of indata
219 """
220 clean_indata = indata
221 if not indata:
222 return {}
223 if item == "vnfds":
224 if clean_indata.get('vnfd:vnfd-catalog'):
225 clean_indata = clean_indata['vnfd:vnfd-catalog']
226 elif clean_indata.get('vnfd-catalog'):
227 clean_indata = clean_indata['vnfd-catalog']
228 if clean_indata.get('vnfd'):
229 if not isinstance(clean_indata['vnfd'], list) or len(clean_indata['vnfd']) != 1:
230 raise EngineException("'vnfd' must be a list only one element")
231 clean_indata = clean_indata['vnfd'][0]
232 elif item == "nsds":
233 if clean_indata.get('nsd:nsd-catalog'):
234 clean_indata = clean_indata['nsd:nsd-catalog']
235 elif clean_indata.get('nsd-catalog'):
236 clean_indata = clean_indata['nsd-catalog']
237 if clean_indata.get('nsd'):
238 if not isinstance(clean_indata['nsd'], list) or len(clean_indata['nsd']) != 1:
239 raise EngineException("'nsd' must be a list only one element")
240 clean_indata = clean_indata['nsd'][0]
241 elif item == "userDefinedData":
242 if "userDefinedData" in indata:
243 clean_indata = clean_indata['userDefinedData']
244 return clean_indata
245
246 def _validate_new_data(self, session, item, indata, id=None):
247 if item == "users":
248 if not indata.get("username"):
249 raise EngineException("missing 'username'", HTTPStatus.UNPROCESSABLE_ENTITY)
250 if not indata.get("password"):
251 raise EngineException("missing 'password'", HTTPStatus.UNPROCESSABLE_ENTITY)
252 if not indata.get("projects"):
253 raise EngineException("missing 'projects'", HTTPStatus.UNPROCESSABLE_ENTITY)
254 # check username not exist
255 if self.db.get_one(item, {"username": indata.get("username")}, fail_on_empty=False, fail_on_more=False):
256 raise EngineException("username '{}' exist".format(indata["username"]), HTTPStatus.CONFLICT)
257 elif item == "projects":
258 if not indata.get("name"):
259 raise EngineException("missing 'name'")
260 # check name not exist
261 if self.db.get_one(item, {"name": indata.get("name")}, fail_on_empty=False, fail_on_more=False):
262 raise EngineException("name '{}' exist".format(indata["name"]), HTTPStatus.CONFLICT)
263 elif item in ("vnfds", "nsds"):
264 filter = {"id": indata["id"]}
265 if id:
266 filter["_id.neq"] = id
267 # TODO add admin to filter, validate rights
268 self._add_read_filter(session, item, filter)
269 if self.db.get_one(item, filter, fail_on_empty=False):
270 raise EngineException("{} with id '{}' already exist for this tenant".format(item[:-1], indata["id"]),
271 HTTPStatus.CONFLICT)
272
273 # TODO validate with pyangbind
274 elif item == "userDefinedData":
275 # TODO validate userDefinedData is a keypair values
276 pass
277
278 elif item == "nsrs":
279 pass
280 elif item == "vims" or item == "sdns":
281 if self.db.get_one(item, {"name": indata.get("name")}, fail_on_empty=False, fail_on_more=False):
282 raise EngineException("name '{}' already exist for {}".format(indata["name"], item),
283 HTTPStatus.CONFLICT)
284
285 def _format_new_data(self, session, item, indata):
286 now = time()
287 if not "_admin" in indata:
288 indata["_admin"] = {}
289 indata["_admin"]["created"] = now
290 indata["_admin"]["modified"] = now
291 if item == "users":
292 indata["_id"] = indata["username"]
293 salt = uuid4().hex
294 indata["_admin"]["salt"] = salt
295 indata["password"] = sha256(indata["password"].encode('utf-8') + salt.encode('utf-8')).hexdigest()
296 elif item == "projects":
297 indata["_id"] = indata["name"]
298 else:
299 if not indata.get("_id"):
300 indata["_id"] = str(uuid4())
301 if item in ("vnfds", "nsds", "nsrs"):
302 if not indata["_admin"].get("projects_read"):
303 indata["_admin"]["projects_read"] = [session["project_id"]]
304 if not indata["_admin"].get("projects_write"):
305 indata["_admin"]["projects_write"] = [session["project_id"]]
306 if item in ("vnfds", "nsds"):
307 indata["_admin"]["onboardingState"] = "CREATED"
308 indata["_admin"]["operationalState"] = "DISABLED"
309 indata["_admin"]["usageSate"] = "NOT_IN_USE"
310 if item == "nsrs":
311 indata["_admin"]["nsState"] = "NOT_INSTANTIATED"
312 if item in ("vims", "sdns"):
313 indata["_admin"]["operationalState"] = "PROCESSING"
314
315 def upload_content(self, session, item, _id, indata, kwargs, headers):
316 """
317 Used for receiving content by chunks (with a transaction_id header and/or gzip file. It will store and extract)
318 :param session: session
319 :param item: can be nsds or vnfds
320 :param _id : the nsd,vnfd is already created, this is the id
321 :param indata: http body request
322 :param kwargs: user query string to override parameters. NOT USED
323 :param headers: http request headers
324 :return: True package has is completely uploaded or False if partial content has been uplodaed.
325 Raise exception on error
326 """
327 # Check that _id exist and it is valid
328 current_desc = self.get_item(session, item, _id)
329
330 content_range_text = headers.get("Content-Range")
331 expected_md5 = headers.get("Content-File-MD5")
332 compressed = None
333 content_type = headers.get("Content-Type")
334 if content_type and "application/gzip" in content_type or "application/x-gzip" in content_type or \
335 "application/zip" in content_type:
336 compressed = "gzip"
337 filename = headers.get("Content-Filename")
338 if not filename:
339 filename = "package.tar.gz" if compressed else "package"
340 # TODO change to Content-Disposition filename https://tools.ietf.org/html/rfc6266
341 file_pkg = None
342 error_text = ""
343 try:
344 if content_range_text:
345 content_range = content_range_text.replace("-", " ").replace("/", " ").split()
346 if content_range[0] != "bytes": # TODO check x<y not negative < total....
347 raise IndexError()
348 start = int(content_range[1])
349 end = int(content_range[2]) + 1
350 total = int(content_range[3])
351 else:
352 start = 0
353
354 if start:
355 if not self.fs.file_exists(_id, 'dir'):
356 raise EngineException("invalid Transaction-Id header", HTTPStatus.NOT_FOUND)
357 else:
358 self.fs.file_delete(_id, ignore_non_exist=True)
359 self.fs.mkdir(_id)
360
361 storage = self.fs.get_params()
362 storage["folder"] = _id
363
364 file_path = (_id, filename)
365 if self.fs.file_exists(file_path, 'file'):
366 file_size = self.fs.file_size(file_path)
367 else:
368 file_size = 0
369 if file_size != start:
370 raise EngineException("invalid Content-Range start sequence, expected '{}' but received '{}'".format(
371 file_size, start), HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE)
372 file_pkg = self.fs.file_open(file_path, 'a+b')
373 if isinstance(indata, dict):
374 indata_text = yaml.safe_dump(indata, indent=4, default_flow_style=False)
375 file_pkg.write(indata_text.encode(encoding="utf-8"))
376 else:
377 indata_len = 0
378 while True:
379 indata_text = indata.read(4096)
380 indata_len += len(indata_text)
381 if not indata_text:
382 break
383 file_pkg.write(indata_text)
384 if content_range_text:
385 if indata_len != end-start:
386 raise EngineException("Mismatch between Content-Range header {}-{} and body length of {}".format(
387 start, end-1, indata_len), HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE)
388 if end != total:
389 # TODO update to UPLOADING
390 return False
391
392 # PACKAGE UPLOADED
393 if expected_md5:
394 file_pkg.seek(0, 0)
395 file_md5 = md5()
396 chunk_data = file_pkg.read(1024)
397 while chunk_data:
398 file_md5.update(chunk_data)
399 chunk_data = file_pkg.read(1024)
400 if expected_md5 != file_md5.hexdigest():
401 raise EngineException("Error, MD5 mismatch", HTTPStatus.CONFLICT)
402 file_pkg.seek(0, 0)
403 if compressed == "gzip":
404 tar = tarfile.open(mode='r', fileobj=file_pkg)
405 descriptor_file_name = None
406 for tarinfo in tar:
407 tarname = tarinfo.name
408 tarname_path = tarname.split("/")
409 if not tarname_path[0] or ".." in tarname_path: # if start with "/" means absolute path
410 raise EngineException("Absolute path or '..' are not allowed for package descriptor tar.gz")
411 if len(tarname_path) == 1 and not tarinfo.isdir():
412 raise EngineException("All files must be inside a dir for package descriptor tar.gz")
413 if tarname.endswith(".yaml") or tarname.endswith(".json") or tarname.endswith(".yml"):
414 storage["pkg-dir"] = tarname_path[0]
415 if len(tarname_path) == 2:
416 if descriptor_file_name:
417 raise EngineException("Found more than one descriptor file at package descriptor tar.gz")
418 descriptor_file_name = tarname
419 if not descriptor_file_name:
420 raise EngineException("Not found any descriptor file at package descriptor tar.gz")
421 storage["descriptor"] = descriptor_file_name
422 storage["zipfile"] = filename
423 self.fs.file_extract(tar, _id)
424 with self.fs.file_open((_id, descriptor_file_name), "r") as descriptor_file:
425 content = descriptor_file.read()
426 else:
427 content = file_pkg.read()
428 storage["descriptor"] = descriptor_file_name = filename
429
430 if descriptor_file_name.endswith(".json"):
431 error_text = "Invalid json format "
432 indata = json.load(content)
433 else:
434 error_text = "Invalid yaml format "
435 indata = yaml.load(content)
436
437 current_desc["_admin"]["storage"] = storage
438 current_desc["_admin"]["onboardingState"] = "ONBOARDED"
439 current_desc["_admin"]["operationalState"] = "ENABLED"
440
441 self._edit_item(session, item, _id, current_desc, indata, kwargs)
442 # TODO if descriptor has changed because kwargs update content and remove cached zip
443 # TODO if zip is not present creates one
444 return True
445
446 except EngineException:
447 raise
448 except IndexError:
449 raise EngineException("invalid Content-Range header format. Expected 'bytes start-end/total'",
450 HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE)
451 except IOError as e:
452 raise EngineException("invalid upload transaction sequence: '{}'".format(e), HTTPStatus.BAD_REQUEST)
453 except tarfile.ReadError as e:
454 raise EngineException("invalid file content {}".format(e), HTTPStatus.BAD_REQUEST)
455 except (ValueError, yaml.YAMLError) as e:
456 raise EngineException(error_text + str(e))
457 finally:
458 if file_pkg:
459 file_pkg.close()
460
461 def new_nsr(self, session, ns_request):
462 """
463 Creates a new nsr into database
464 :param session: contains the used login username and working project
465 :param ns_request: params to be used for the nsr
466 :return: nsr descriptor to be stored at database and the _id
467 """
468
469 # look for nsr
470 nsd = self.get_item(session, "nsds", ns_request["nsdId"])
471 _id = str(uuid4())
472 nsr_descriptor = {
473 "name": ns_request["nsName"],
474 "name-ref": ns_request["nsName"],
475 "short-name": ns_request["nsName"],
476 "admin-status": "ENABLED",
477 "nsd": nsd,
478 "datacenter": ns_request["vimAccountId"],
479 "resource-orchestrator": "osmopenmano",
480 "description": ns_request.get("nsDescription", ""),
481 "constituent-vnfr-ref": ["TODO datacenter-id, vnfr-id"],
482
483 "operational-status": "init", # typedef ns-operational-
484 "config-status": "init", # typedef config-states
485 "detailed-status": "scheduled",
486
487 "orchestration-progress": {}, # {"networks": {"active": 0, "total": 0}, "vms": {"active": 0, "total": 0}},
488
489 "crete-time": time(),
490 "nsd-name-ref": nsd["name"],
491 "operational-events": [], # "id", "timestamp", "description", "event",
492 "nsd-ref": nsd["id"],
493 "ns-instance-config-ref": _id,
494 "id": _id,
495 "_id": _id,
496
497 # "input-parameter": xpath, value,
498 "ssh-authorized-key": ns_request.get("key-pair-ref"),
499 }
500 ns_request["nsr_id"] = _id
501 return nsr_descriptor
502
503 @staticmethod
504 def _update_descriptor(desc, kwargs):
505 """
506 Update descriptor with the kwargs
507 :param kwargs:
508 :return:
509 """
510 if not kwargs:
511 return
512 try:
513 for k, v in kwargs.items():
514 update_content = content
515 kitem_old = None
516 klist = k.split(".")
517 for kitem in klist:
518 if kitem_old is not None:
519 update_content = update_content[kitem_old]
520 if isinstance(update_content, dict):
521 kitem_old = kitem
522 elif isinstance(update_content, list):
523 kitem_old = int(kitem)
524 else:
525 raise EngineException(
526 "Invalid query string '{}'. Descriptor is not a list nor dict at '{}'".format(k, kitem))
527 update_content[kitem_old] = v
528 except KeyError:
529 raise EngineException(
530 "Invalid query string '{}'. Descriptor does not contain '{}'".format(k, kitem_old))
531 except ValueError:
532 raise EngineException("Invalid query string '{}'. Expected integer index list instead of '{}'".format(
533 k, kitem))
534 except IndexError:
535 raise EngineException(
536 "Invalid query string '{}'. Index '{}' out of range".format(k, kitem_old))
537
538 def new_item(self, session, item, indata={}, kwargs=None, headers={}):
539 """
540 Creates a new entry into database. For nsds and vnfds it creates an almost empty DISABLED entry,
541 that must be completed with a call to method upload_content
542 :param session: contains the used login username and working project
543 :param item: it can be: users, projects, vims, sdns, nsrs, nsds, vnfds
544 :param indata: data to be inserted
545 :param kwargs: used to override the indata descriptor
546 :param headers: http request headers
547 :return: _id, transaction_id: identity of the inserted data. or transaction_id if Content-Range is used
548 """
549
550 try:
551 item_envelop = item
552 if item in ("nsds", "vnfds"):
553 item_envelop = "userDefinedData"
554 content = self._remove_envelop(item_envelop, indata)
555
556 # Override descriptor with query string kwargs
557 self._update_descriptor(content, kwargs)
558 if not indata and item not in ("nsds", "vnfds"):
559 raise EngineException("Empty payload")
560
561 validate_input(content, item, new=True)
562
563 if item == "nsrs":
564 # in this case the imput descriptor is not the data to be stored
565 ns_request = content
566 content = self.new_nsr(session, ns_request)
567
568 self._validate_new_data(session, item_envelop, content)
569 if item in ("nsds", "vnfds"):
570 content = {"_admin": {"userDefinedData": content}}
571 self._format_new_data(session, item, content)
572 _id = self.db.create(item, content)
573 if item == "nsrs":
574 pass
575 # self.msg.write("ns", "created", _id) # sending just for information.
576 elif item == "vims":
577 msg_data = self.db.get_one(item, {"_id": _id})
578 msg_data.pop("_admin", None)
579 self.msg.write("vim_account", "create", msg_data)
580 elif item == "sdns":
581 msg_data = self.db.get_one(item, {"_id": _id})
582 msg_data.pop("_admin", None)
583 self.msg.write("sdn", "create", msg_data)
584 return _id
585 except ValidationError as e:
586 raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
587
588 def new_nslcmop(self, session, nsInstanceId, action, params):
589 now = time()
590 _id = str(uuid4())
591 nslcmop = {
592 "id": _id,
593 "_id": _id,
594 "operationState": "PROCESSING", # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
595 "statusEnteredTime": now,
596 "nsInstanceId": nsInstanceId,
597 "lcmOperationType": action,
598 "startTime": now,
599 "isAutomaticInvocation": False,
600 "operationParams": params,
601 "isCancelPending": False,
602 "links": {
603 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
604 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsInstanceId,
605 }
606 }
607 return nslcmop
608
609 def ns_action(self, session, nsInstanceId, action, indata, kwargs=None):
610 """
611 Performs a new action over a ns
612 :param session: contains the used login username and working project
613 :param nsInstanceId: _id of the nsr to perform the action
614 :param action: it can be: instantiate, terminate, action, TODO: update, heal
615 :param indata: descriptor with the parameters of the action
616 :param kwargs: used to override the indata descriptor
617 :return: id of the nslcmops
618 """
619 try:
620 # Override descriptor with query string kwargs
621 self._update_descriptor(indata, kwargs)
622 validate_input(indata, "ns_" + action, new=True)
623 # get ns from nsr_id
624 nsr = self.get_item(session, "nsrs", nsInstanceId)
625 if nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
626 if action == "terminate" and indata.get("autoremove"):
627 # NSR must be deleted
628 return self.del_item(session, "nsrs", nsInstanceId)
629 if action != "instantiate":
630 raise EngineException("ns_instance '{}' cannot be '{}' because it is not instantiated".format(
631 nsInstanceId, action), HTTPStatus.CONFLICT)
632 else:
633 if action == "instantiate" and not indata.get("force"):
634 raise EngineException("ns_instance '{}' cannot be '{}' because it is already instantiated".format(
635 nsInstanceId, action), HTTPStatus.CONFLICT)
636 indata["nsInstanceId"] = nsInstanceId
637 # TODO
638 nslcmop = self.new_nslcmop(session, nsInstanceId, action, indata)
639 self._format_new_data(session, "nslcmops", nslcmop)
640 _id = self.db.create("nslcmops", nslcmop)
641 indata["_id"] = _id
642 self.msg.write("ns", action, nslcmop)
643 return _id
644 except ValidationError as e:
645 raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
646 # except DbException as e:
647 # raise EngineException("Cannot get ns_instance '{}': {}".format(e), HTTPStatus.NOT_FOUND)
648
649 def _add_read_filter(self, session, item, filter):
650 if session["project_id"] == "admin": # allows all
651 return filter
652 if item == "users":
653 filter["username"] = session["username"]
654 elif item in ("vnfds", "nsds", "nsrs"):
655 filter["_admin.projects_read.cont"] = ["ANY", session["project_id"]]
656
657 def _add_delete_filter(self, session, item, filter):
658 if session["project_id"] != "admin" and item in ("users", "projects"):
659 raise EngineException("Only admin users can perform this task", http_code=HTTPStatus.FORBIDDEN)
660 if item == "users":
661 if filter.get("_id") == session["username"] or filter.get("username") == session["username"]:
662 raise EngineException("You cannot delete your own user", http_code=HTTPStatus.CONFLICT)
663 elif item == "project":
664 if filter.get("_id") == session["project_id"]:
665 raise EngineException("You cannot delete your own project", http_code=HTTPStatus.CONFLICT)
666 elif item in ("vnfds", "nsds") and session["project_id"] != "admin":
667 filter["_admin.projects_write.cont"] = ["ANY", session["project_id"]]
668
669 def get_file(self, session, item, _id, path=None, accept_header=None):
670 """
671 Return the file content of a vnfd or nsd
672 :param session: contains the used login username and working project
673 :param item: it can be vnfds or nsds
674 :param _id: Identity of the vnfd, ndsd
675 :param path: artifact path or "$DESCRIPTOR" or None
676 :param accept_header: Content of Accept header. Must contain applition/zip or/and text/plain
677 :return: opened file or raises an exception
678 """
679 accept_text = accept_zip = False
680 if accept_header:
681 if 'text/plain' in accept_header or '*/*' in accept_header:
682 accept_text = True
683 if 'application/zip' in accept_header or '*/*' in accept_header:
684 accept_zip = True
685 if not accept_text and not accept_zip:
686 raise EngineException("provide request header 'Accept' with 'application/zip' or 'text/plain'",
687 http_code=HTTPStatus.NOT_ACCEPTABLE)
688
689 content = self.get_item(session, item, _id)
690 if content["_admin"]["onboardingState"] != "ONBOARDED":
691 raise EngineException("Cannot get content because this resource is not at 'ONBOARDED' state. "
692 "onboardingState is {}".format(content["_admin"]["onboardingState"]),
693 http_code=HTTPStatus.CONFLICT)
694 storage = content["_admin"]["storage"]
695 if path is not None and path != "$DESCRIPTOR": # artifacts
696 if not storage.get('pkg-dir'):
697 raise EngineException("Packages does not contains artifacts", http_code=HTTPStatus.BAD_REQUEST)
698 if self.fs.file_exists((storage['folder'], storage['pkg-dir'], *path), 'dir'):
699 folder_content = self.fs.dir_ls((storage['folder'], storage['pkg-dir'], *path))
700 return folder_content, "text/plain"
701 # TODO manage folders in http
702 else:
703 return self.fs.file_open((storage['folder'], storage['pkg-dir'], *path), "rb"), \
704 "application/octet-stream"
705
706 # pkgtype accept ZIP TEXT -> result
707 # manyfiles yes X -> zip
708 # no yes -> error
709 # onefile yes no -> zip
710 # X yes -> text
711
712 if accept_text and (not storage.get('pkg-dir') or path == "$DESCRIPTOR"):
713 return self.fs.file_open((storage['folder'], storage['descriptor']), "r"), "text/plain"
714 elif storage.get('pkg-dir') and not accept_zip:
715 raise EngineException("Packages that contains several files need to be retrieved with 'application/zip'"
716 "Accept header", http_code=HTTPStatus.NOT_ACCEPTABLE)
717 else:
718 if not storage.get('zipfile'):
719 # TODO generate zipfile if not present
720 raise EngineException("Only allowed 'text/plain' Accept header for this descriptor. To be solved in future versions"
721 "", http_code=HTTPStatus.NOT_ACCEPTABLE)
722 return self.fs.file_open((storage['folder'], storage['zipfile']), "rb"), "application/zip"
723
724 def get_item_list(self, session, item, filter={}):
725 """
726 Get a list of items
727 :param session: contains the used login username and working project
728 :param item: it can be: users, projects, vnfds, nsds, ...
729 :param filter: filter of data to be applied
730 :return: The list, it can be empty if no one match the filter.
731 """
732 # TODO add admin to filter, validate rights
733 # TODO transform data for SOL005 URL requests. Transform filtering
734 # TODO implement "field-type" query string SOL005
735
736 self._add_read_filter(session, item, filter)
737 return self.db.get_list(item, filter)
738
739 def get_item(self, session, item, _id):
740 """
741 Get complete information on an items
742 :param session: contains the used login username and working project
743 :param item: it can be: users, projects, vnfds, nsds,
744 :param _id: server id of the item
745 :return: dictionary, raise exception if not found.
746 """
747 database_item = item
748 filter = {"_id": _id}
749 # TODO add admin to filter, validate rights
750 # TODO transform data for SOL005 URL requests
751 self._add_read_filter(session, item, filter)
752 return self.db.get_one(item, filter)
753
754 def del_item_list(self, session, item, filter={}):
755 """
756 Delete a list of items
757 :param session: contains the used login username and working project
758 :param item: it can be: users, projects, vnfds, nsds, ...
759 :param filter: filter of data to be applied
760 :return: The deleted list, it can be empty if no one match the filter.
761 """
762 # TODO add admin to filter, validate rights
763 self._add_read_filter(session, item, filter)
764 return self.db.del_list(item, filter)
765
766 def del_item(self, session, item, _id, force=False):
767 """
768 Get complete information on an items
769 :param session: contains the used login username and working project
770 :param item: it can be: users, projects, vnfds, nsds, ...
771 :param _id: server id of the item
772 :param force: indicates if deletion must be forced in case of conflict
773 :return: dictionary, raise exception if not found.
774 """
775 # TODO add admin to filter, validate rights
776 # data = self.get_item(item, _id)
777 filter = {"_id": _id}
778 self._add_delete_filter(session, item, filter)
779
780 if item == "nsrs":
781 nsr = self.db.get_one(item, filter)
782 if nsr["_admin"]["nsState"] == "INSTANTIATED" and not force:
783 raise EngineException("nsr '{}' cannot be deleted because it is in 'INSTANTIATED' state. "
784 "Launch 'terminate' action first; or force deletion".format(_id),
785 http_code=HTTPStatus.CONFLICT)
786 v = self.db.del_one(item, {"_id": _id})
787 self.db.del_list("nslcmops", {"nsInstanceId": _id})
788 self.msg.write("ns", "deleted", {"_id": _id})
789 return v
790 if item in ("vims", "sdns"):
791 desc = self.db.get_one(item, filter)
792 desc["_admin"]["to_delete"] = True
793 self.db.replace(item, _id, desc) # TODO change to set_one
794 if item == "vims":
795 self.msg.write("vim_account", "delete", {"_id": _id})
796 elif item == "sdns":
797 self.msg.write("sdn", "delete", {"_id": _id})
798 return {"deleted": 1} # TODO indicate an offline operation to return 202 ACCEPTED
799
800 v = self.db.del_one(item, filter)
801 self.fs.file_delete(_id, ignore_non_exist=True)
802 return v
803
804 def prune(self):
805 """
806 Prune database not needed content
807 :return: None
808 """
809 return self.db.del_list("nsrs", {"_admin.to_delete": True})
810
811 def create_admin(self):
812 """
813 Creates a new user admin/admin into database if database is empty. Useful for initialization
814 :return: _id identity of the inserted data, or None
815 """
816 users = self.db.get_one("users", fail_on_empty=False, fail_on_more=False)
817 if users:
818 return None
819 # raise EngineException("Unauthorized. Database users is not empty", HTTPStatus.UNAUTHORIZED)
820 indata = {"username": "admin", "password": "admin", "projects": ["admin"]}
821 fake_session = {"project_id": "admin", "username": "admin"}
822 self._format_new_data(fake_session, "users", indata)
823 _id = self.db.create("users", indata)
824 return _id
825
826 def init_db(self, target_version='1.0'):
827 """
828 Init database if empty. If not empty it checks that database version is ok.
829 If empty, it creates a new user admin/admin at 'users' and a new entry at 'version'
830 :return: None if ok, exception if error or if the version is different.
831 """
832 version = self.db.get_one("version", fail_on_empty=False, fail_on_more=False)
833 if not version:
834 # create user admin
835 self.create_admin()
836 # create database version
837 version_data = {
838 "_id": '1.0', # version text
839 "version": 1000, # version number
840 "date": "2018-04-12", # version date
841 "description": "initial design", # changes in this version
842 'status': 'ENABLED' # ENABLED, DISABLED (migration in process), ERROR,
843 }
844 self.db.create("version", version_data)
845 elif version["_id"] != target_version:
846 # TODO implement migration process
847 raise EngineException("Wrong database version '{}'. Expected '{}'".format(
848 version["_id"], target_version), HTTPStatus.INTERNAL_SERVER_ERROR)
849 elif version["status"] != 'ENABLED':
850 raise EngineException("Wrong database status '{}'".format(
851 version["status"]), HTTPStatus.INTERNAL_SERVER_ERROR)
852 return
853
854 def _edit_item(self, session, item, id, content, indata={}, kwargs=None):
855 if indata:
856 indata = self._remove_envelop(item, indata)
857
858 # Override descriptor with query string kwargs
859 if kwargs:
860 try:
861 for k, v in kwargs.items():
862 update_content = indata
863 kitem_old = None
864 klist = k.split(".")
865 for kitem in klist:
866 if kitem_old is not None:
867 update_content = update_content[kitem_old]
868 if isinstance(update_content, dict):
869 kitem_old = kitem
870 elif isinstance(update_content, list):
871 kitem_old = int(kitem)
872 else:
873 raise EngineException(
874 "Invalid query string '{}'. Descriptor is not a list nor dict at '{}'".format(k, kitem))
875 update_content[kitem_old] = v
876 except KeyError:
877 raise EngineException(
878 "Invalid query string '{}'. Descriptor does not contain '{}'".format(k, kitem_old))
879 except ValueError:
880 raise EngineException("Invalid query string '{}'. Expected integer index list instead of '{}'".format(
881 k, kitem))
882 except IndexError:
883 raise EngineException(
884 "Invalid query string '{}'. Index '{}' out of range".format(k, kitem_old))
885 try:
886 validate_input(content, item, new=False)
887 except ValidationError as e:
888 raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
889
890 _deep_update(content, indata)
891 self._validate_new_data(session, item, content, id)
892 # self._format_new_data(session, item, content)
893 self.db.replace(item, id, content)
894 if item in ("vims", "sdns"):
895 indata.pop("_admin", None)
896 indata["_id"] = id
897 if item == "vims":
898 self.msg.write("vim_account", "edit", indata)
899 elif item == "sdns":
900 self.msg.write("sdn", "edit", indata)
901 return id
902
903 def edit_item(self, session, item, _id, indata={}, kwargs=None):
904 """
905 Update an existing entry at database
906 :param session: contains the used login username and working project
907 :param item: it can be: users, projects, vnfds, nsds, ...
908 :param _id: identifier to be updated
909 :param indata: data to be inserted
910 :param kwargs: used to override the indata descriptor
911 :return: dictionary, raise exception if not found.
912 """
913
914 content = self.get_item(session, item, _id)
915 return self._edit_item(session, item, _id, content, indata, kwargs)
916
917
918