blob: 38bbe776abe4975c3fcd77e0cd488d7c0a8a35ef [file] [log] [blame]
tiernoc94c3df2018-02-09 15:38:54 +01001# -*- coding: utf-8 -*-
2
3import dbmongo
4import dbmemory
5import fslocal
6import msglocal
7import msgkafka
8import tarfile
9import yaml
10import json
11import logging
12from random import choice as random_choice
13from uuid import uuid4
14from hashlib import sha256, md5
15from dbbase import DbException
16from fsbase import FsException
17from msgbase import MsgException
18from http import HTTPStatus
19from time import time
20from copy import deepcopy
tierno0f98af52018-03-19 10:28:22 +010021from validation import validate_input, ValidationError
tiernoc94c3df2018-02-09 15:38:54 +010022
23__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
24
25
26class EngineException(Exception):
27
28 def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
29 self.http_code = http_code
30 Exception.__init__(self, message)
31
32
tiernof27c79b2018-03-12 17:08:42 +010033def _deep_update(dict_to_change, dict_reference):
34 """
35 Modifies one dictionary with the information of the other following https://tools.ietf.org/html/rfc7396
36 :param dict_to_change: Ends modified
37 :param dict_reference: reference
38 :return: none
39 """
40
41 for k in dict_reference:
42 if dict_reference[k] is None: # None->Anything
43 if k in dict_to_change:
44 del dict_to_change[k]
45 elif not isinstance(dict_reference[k], dict): # NotDict->Anything
46 dict_to_change[k] = dict_reference[k]
47 elif k not in dict_to_change: # Dict->Empty
48 dict_to_change[k] = deepcopy(dict_reference[k])
49 _deep_update(dict_to_change[k], dict_reference[k])
50 elif isinstance(dict_to_change[k], dict): # Dict->Dict
51 _deep_update(dict_to_change[k], dict_reference[k])
52 else: # Dict->NotDict
53 dict_to_change[k] = deepcopy(dict_reference[k])
54 _deep_update(dict_to_change[k], dict_reference[k])
55
tiernoc94c3df2018-02-09 15:38:54 +010056class Engine(object):
57
58 def __init__(self):
59 self.tokens = {}
60 self.db = None
61 self.fs = None
62 self.msg = None
63 self.config = None
64 self.logger = logging.getLogger("nbi.engine")
65
66 def start(self, config):
67 """
68 Connect to database, filesystem storage, and messaging
69 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
70 :return: None
71 """
72 self.config = config
73 try:
74 if not self.db:
75 if config["database"]["driver"] == "mongo":
76 self.db = dbmongo.DbMongo()
77 self.db.db_connect(config["database"])
78 elif config["database"]["driver"] == "memory":
79 self.db = dbmemory.DbMemory()
80 self.db.db_connect(config["database"])
81 else:
82 raise EngineException("Invalid configuration param '{}' at '[database]':'driver'".format(
83 config["database"]["driver"]))
84 if not self.fs:
85 if config["storage"]["driver"] == "local":
86 self.fs = fslocal.FsLocal()
87 self.fs.fs_connect(config["storage"])
88 else:
89 raise EngineException("Invalid configuration param '{}' at '[storage]':'driver'".format(
90 config["storage"]["driver"]))
91 if not self.msg:
92 if config["message"]["driver"] == "local":
93 self.msg = msglocal.MsgLocal()
94 self.msg.connect(config["message"])
95 elif config["message"]["driver"] == "kafka":
96 self.msg = msgkafka.MsgKafka()
97 self.msg.connect(config["message"])
98 else:
99 raise EngineException("Invalid configuration param '{}' at '[message]':'driver'".format(
100 config["storage"]["driver"]))
101 except (DbException, FsException, MsgException) as e:
102 raise EngineException(str(e), http_code=e.http_code)
103
104 def stop(self):
105 try:
106 if self.db:
107 self.db.db_disconnect()
108 if self.fs:
109 self.fs.fs_disconnect()
110 if self.fs:
111 self.fs.fs_disconnect()
112 except (DbException, FsException, MsgException) as e:
113 raise EngineException(str(e), http_code=e.http_code)
114
115 def authorize(self, token):
116 try:
117 if not token:
118 raise EngineException("Needed a token or Authorization http header",
119 http_code=HTTPStatus.UNAUTHORIZED)
120 if token not in self.tokens:
121 raise EngineException("Invalid token or Authorization http header",
122 http_code=HTTPStatus.UNAUTHORIZED)
123 session = self.tokens[token]
124 now = time()
125 if session["expires"] < now:
126 del self.tokens[token]
127 raise EngineException("Expired Token or Authorization http header",
128 http_code=HTTPStatus.UNAUTHORIZED)
129 return session
130 except EngineException:
131 if self.config["global"].get("test.user_not_authorized"):
132 return {"id": "fake-token-id-for-test",
133 "project_id": self.config["global"].get("test.project_not_authorized", "admin"),
134 "username": self.config["global"]["test.user_not_authorized"]}
135 else:
136 raise
137
138 def new_token(self, session, indata, remote):
139 now = time()
140 user_content = None
141
142 # Try using username/password
143 if indata.get("username"):
144 user_rows = self.db.get_list("users", {"username": indata.get("username")})
145 user_content = None
146 if user_rows:
147 user_content = user_rows[0]
148 salt = user_content["_admin"]["salt"]
149 shadow_password = sha256(indata.get("password", "").encode('utf-8') + salt.encode('utf-8')).hexdigest()
150 if shadow_password != user_content["password"]:
151 user_content = None
152 if not user_content:
153 raise EngineException("Invalid username/password", http_code=HTTPStatus.UNAUTHORIZED)
154 elif session:
155 user_rows = self.db.get_list("users", {"username": session["username"]})
156 if user_rows:
157 user_content = user_rows[0]
158 else:
159 raise EngineException("Invalid token", http_code=HTTPStatus.UNAUTHORIZED)
160 else:
161 raise EngineException("Provide credentials: username/password or Authorization Bearer token",
162 http_code=HTTPStatus.UNAUTHORIZED)
163
164 token_id = ''.join(random_choice('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789')
165 for _ in range(0, 32))
166 if indata.get("project_id"):
167 project_id = indata.get("project_id")
168 if project_id not in user_content["projects"]:
169 raise EngineException("project {} not allowed for this user".format(project_id),
170 http_code=HTTPStatus.UNAUTHORIZED)
171 else:
172 project_id = user_content["projects"][0]
173 if project_id == "admin":
174 session_admin = True
175 else:
176 project = self.db.get_one("projects", {"_id": project_id})
177 session_admin = project.get("admin", False)
178 new_session = {"issued_at": now, "expires": now+3600,
179 "_id": token_id, "id": token_id, "project_id": project_id, "username": user_content["username"],
180 "remote_port": remote.port, "admin": session_admin}
181 if remote.name:
182 new_session["remote_host"] = remote.name
183 elif remote.ip:
184 new_session["remote_host"] = remote.ip
185
186 self.tokens[token_id] = new_session
187 return deepcopy(new_session)
188
189 def get_token_list(self, session):
190 token_list = []
191 for token_id, token_value in self.tokens.items():
192 if token_value["username"] == session["username"]:
193 token_list.append(deepcopy(token_value))
194 return token_list
195
196 def get_token(self, session, token_id):
197 token_value = self.tokens.get(token_id)
198 if not token_value:
199 raise EngineException("token not found", http_code=HTTPStatus.NOT_FOUND)
200 if token_value["username"] != session["username"] and not session["admin"]:
201 raise EngineException("needed admin privileges", http_code=HTTPStatus.UNAUTHORIZED)
202 return token_value
203
204 def del_token(self, token_id):
205 try:
206 del self.tokens[token_id]
207 return "token '{}' deleted".format(token_id)
208 except KeyError:
209 raise EngineException("Token '{}' not found".format(token_id), http_code=HTTPStatus.NOT_FOUND)
210
211 @staticmethod
212 def _remove_envelop(item, indata=None):
213 """
214 Obtain the useful data removing the envelop. It goes throw the vnfd or nsd catalog and returns the
215 vnfd or nsd content
tiernof27c79b2018-03-12 17:08:42 +0100216 :param item: can be vnfds, nsds, users, projects, userDefinedData (initial content of a vnfds, nsds
tiernoc94c3df2018-02-09 15:38:54 +0100217 :param indata: Content to be inspected
218 :return: the useful part of indata
219 """
220 clean_indata = indata
221 if not indata:
222 return {}
223 if item == "vnfds":
224 if clean_indata.get('vnfd:vnfd-catalog'):
225 clean_indata = clean_indata['vnfd:vnfd-catalog']
226 elif clean_indata.get('vnfd-catalog'):
227 clean_indata = clean_indata['vnfd-catalog']
228 if clean_indata.get('vnfd'):
229 if not isinstance(clean_indata['vnfd'], list) or len(clean_indata['vnfd']) != 1:
230 raise EngineException("'vnfd' must be a list only one element")
231 clean_indata = clean_indata['vnfd'][0]
232 elif item == "nsds":
233 if clean_indata.get('nsd:nsd-catalog'):
234 clean_indata = clean_indata['nsd:nsd-catalog']
235 elif clean_indata.get('nsd-catalog'):
236 clean_indata = clean_indata['nsd-catalog']
237 if clean_indata.get('nsd'):
238 if not isinstance(clean_indata['nsd'], list) or len(clean_indata['nsd']) != 1:
239 raise EngineException("'nsd' must be a list only one element")
240 clean_indata = clean_indata['nsd'][0]
tiernof27c79b2018-03-12 17:08:42 +0100241 elif item == "userDefinedData":
242 if "userDefinedData" in indata:
243 clean_indata = clean_indata['userDefinedData']
tiernoc94c3df2018-02-09 15:38:54 +0100244 return clean_indata
245
tiernof27c79b2018-03-12 17:08:42 +0100246 def _validate_new_data(self, session, item, indata, id=None):
tiernoc94c3df2018-02-09 15:38:54 +0100247 if item == "users":
248 if not indata.get("username"):
249 raise EngineException("missing 'username'", HTTPStatus.UNPROCESSABLE_ENTITY)
250 if not indata.get("password"):
251 raise EngineException("missing 'password'", HTTPStatus.UNPROCESSABLE_ENTITY)
252 if not indata.get("projects"):
253 raise EngineException("missing 'projects'", HTTPStatus.UNPROCESSABLE_ENTITY)
254 # check username not exist
255 if self.db.get_one(item, {"username": indata.get("username")}, fail_on_empty=False, fail_on_more=False):
256 raise EngineException("username '{}' exist".format(indata["username"]), HTTPStatus.CONFLICT)
257 elif item == "projects":
258 if not indata.get("name"):
259 raise EngineException("missing 'name'")
260 # check name not exist
261 if self.db.get_one(item, {"name": indata.get("name")}, fail_on_empty=False, fail_on_more=False):
262 raise EngineException("name '{}' exist".format(indata["name"]), HTTPStatus.CONFLICT)
tiernof27c79b2018-03-12 17:08:42 +0100263 elif item in ("vnfds", "nsds"):
tiernoc94c3df2018-02-09 15:38:54 +0100264 filter = {"id": indata["id"]}
tiernof27c79b2018-03-12 17:08:42 +0100265 if id:
266 filter["_id.neq"] = id
tiernoc94c3df2018-02-09 15:38:54 +0100267 # TODO add admin to filter, validate rights
268 self._add_read_filter(session, item, filter)
269 if self.db.get_one(item, filter, fail_on_empty=False):
270 raise EngineException("{} with id '{}' already exist for this tenant".format(item[:-1], indata["id"]),
271 HTTPStatus.CONFLICT)
272
273 # TODO validate with pyangbind
tiernof27c79b2018-03-12 17:08:42 +0100274 elif item == "userDefinedData":
275 # TODO validate userDefinedData is a keypair values
276 pass
277
tiernoc94c3df2018-02-09 15:38:54 +0100278 elif item == "nsrs":
279 pass
tierno09c073e2018-04-26 13:36:48 +0200280 elif item == "vim_accounts" or item == "sdns":
tierno0f98af52018-03-19 10:28:22 +0100281 if self.db.get_one(item, {"name": indata.get("name")}, fail_on_empty=False, fail_on_more=False):
282 raise EngineException("name '{}' already exist for {}".format(indata["name"], item),
283 HTTPStatus.CONFLICT)
284
tierno65acb4d2018-04-06 16:42:40 +0200285 def _format_new_data(self, session, item, indata):
tiernoc94c3df2018-02-09 15:38:54 +0100286 now = time()
287 if not "_admin" in indata:
288 indata["_admin"] = {}
289 indata["_admin"]["created"] = now
290 indata["_admin"]["modified"] = now
291 if item == "users":
tierno65acb4d2018-04-06 16:42:40 +0200292 indata["_id"] = indata["username"]
tiernoc94c3df2018-02-09 15:38:54 +0100293 salt = uuid4().hex
tiernof27c79b2018-03-12 17:08:42 +0100294 indata["_admin"]["salt"] = salt
tiernoc94c3df2018-02-09 15:38:54 +0100295 indata["password"] = sha256(indata["password"].encode('utf-8') + salt.encode('utf-8')).hexdigest()
296 elif item == "projects":
tierno65acb4d2018-04-06 16:42:40 +0200297 indata["_id"] = indata["name"]
tiernoc94c3df2018-02-09 15:38:54 +0100298 else:
tierno65acb4d2018-04-06 16:42:40 +0200299 if not indata.get("_id"):
300 indata["_id"] = str(uuid4())
301 if item in ("vnfds", "nsds", "nsrs"):
tiernoc94c3df2018-02-09 15:38:54 +0100302 if not indata["_admin"].get("projects_read"):
303 indata["_admin"]["projects_read"] = [session["project_id"]]
304 if not indata["_admin"].get("projects_write"):
305 indata["_admin"]["projects_write"] = [session["project_id"]]
tierno65acb4d2018-04-06 16:42:40 +0200306 if item in ("vnfds", "nsds"):
tiernof27c79b2018-03-12 17:08:42 +0100307 indata["_admin"]["onboardingState"] = "CREATED"
308 indata["_admin"]["operationalState"] = "DISABLED"
309 indata["_admin"]["usageSate"] = "NOT_IN_USE"
tierno65acb4d2018-04-06 16:42:40 +0200310 if item == "nsrs":
311 indata["_admin"]["nsState"] = "NOT_INSTANTIATED"
tierno09c073e2018-04-26 13:36:48 +0200312 if item in ("vim_accounts", "sdns"):
tierno0f98af52018-03-19 10:28:22 +0100313 indata["_admin"]["operationalState"] = "PROCESSING"
314
tiernof27c79b2018-03-12 17:08:42 +0100315 def upload_content(self, session, item, _id, indata, kwargs, headers):
tiernoc94c3df2018-02-09 15:38:54 +0100316 """
tiernof27c79b2018-03-12 17:08:42 +0100317 Used for receiving content by chunks (with a transaction_id header and/or gzip file. It will store and extract)
tiernoc94c3df2018-02-09 15:38:54 +0100318 :param session: session
tiernof27c79b2018-03-12 17:08:42 +0100319 :param item: can be nsds or vnfds
320 :param _id : the nsd,vnfd is already created, this is the id
tiernoc94c3df2018-02-09 15:38:54 +0100321 :param indata: http body request
tiernof27c79b2018-03-12 17:08:42 +0100322 :param kwargs: user query string to override parameters. NOT USED
tiernoc94c3df2018-02-09 15:38:54 +0100323 :param headers: http request headers
tiernof27c79b2018-03-12 17:08:42 +0100324 :return: True package has is completely uploaded or False if partial content has been uplodaed.
325 Raise exception on error
tiernoc94c3df2018-02-09 15:38:54 +0100326 """
tiernof27c79b2018-03-12 17:08:42 +0100327 # Check that _id exist and it is valid
328 current_desc = self.get_item(session, item, _id)
329
tiernoc94c3df2018-02-09 15:38:54 +0100330 content_range_text = headers.get("Content-Range")
tiernoc94c3df2018-02-09 15:38:54 +0100331 expected_md5 = headers.get("Content-File-MD5")
332 compressed = None
tiernof27c79b2018-03-12 17:08:42 +0100333 content_type = headers.get("Content-Type")
334 if content_type and "application/gzip" in content_type or "application/x-gzip" in content_type or \
335 "application/zip" in content_type:
tiernoc94c3df2018-02-09 15:38:54 +0100336 compressed = "gzip"
tiernof27c79b2018-03-12 17:08:42 +0100337 filename = headers.get("Content-Filename")
338 if not filename:
339 filename = "package.tar.gz" if compressed else "package"
340 # TODO change to Content-Disposition filename https://tools.ietf.org/html/rfc6266
tiernoc94c3df2018-02-09 15:38:54 +0100341 file_pkg = None
342 error_text = ""
343 try:
344 if content_range_text:
345 content_range = content_range_text.replace("-", " ").replace("/", " ").split()
346 if content_range[0] != "bytes": # TODO check x<y not negative < total....
347 raise IndexError()
348 start = int(content_range[1])
349 end = int(content_range[2]) + 1
350 total = int(content_range[3])
tiernoc94c3df2018-02-09 15:38:54 +0100351 else:
352 start = 0
tiernoc94c3df2018-02-09 15:38:54 +0100353
tiernof27c79b2018-03-12 17:08:42 +0100354 if start:
355 if not self.fs.file_exists(_id, 'dir'):
356 raise EngineException("invalid Transaction-Id header", HTTPStatus.NOT_FOUND)
357 else:
358 self.fs.file_delete(_id, ignore_non_exist=True)
359 self.fs.mkdir(_id)
360
361 storage = self.fs.get_params()
362 storage["folder"] = _id
363
364 file_path = (_id, filename)
365 if self.fs.file_exists(file_path, 'file'):
tiernoc94c3df2018-02-09 15:38:54 +0100366 file_size = self.fs.file_size(file_path)
367 else:
368 file_size = 0
369 if file_size != start:
tiernof27c79b2018-03-12 17:08:42 +0100370 raise EngineException("invalid Content-Range start sequence, expected '{}' but received '{}'".format(
371 file_size, start), HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE)
tiernoc94c3df2018-02-09 15:38:54 +0100372 file_pkg = self.fs.file_open(file_path, 'a+b')
tiernof27c79b2018-03-12 17:08:42 +0100373 if isinstance(indata, dict):
374 indata_text = yaml.safe_dump(indata, indent=4, default_flow_style=False)
375 file_pkg.write(indata_text.encode(encoding="utf-8"))
376 else:
377 indata_len = 0
378 while True:
379 indata_text = indata.read(4096)
380 indata_len += len(indata_text)
381 if not indata_text:
382 break
383 file_pkg.write(indata_text)
384 if content_range_text:
385 if indata_len != end-start:
386 raise EngineException("Mismatch between Content-Range header {}-{} and body length of {}".format(
387 start, end-1, indata_len), HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE)
388 if end != total:
389 # TODO update to UPLOADING
390 return False
391
392 # PACKAGE UPLOADED
tiernoc94c3df2018-02-09 15:38:54 +0100393 if expected_md5:
394 file_pkg.seek(0, 0)
395 file_md5 = md5()
396 chunk_data = file_pkg.read(1024)
397 while chunk_data:
398 file_md5.update(chunk_data)
399 chunk_data = file_pkg.read(1024)
400 if expected_md5 != file_md5.hexdigest():
401 raise EngineException("Error, MD5 mismatch", HTTPStatus.CONFLICT)
402 file_pkg.seek(0, 0)
403 if compressed == "gzip":
tiernoc94c3df2018-02-09 15:38:54 +0100404 tar = tarfile.open(mode='r', fileobj=file_pkg)
405 descriptor_file_name = None
406 for tarinfo in tar:
407 tarname = tarinfo.name
408 tarname_path = tarname.split("/")
409 if not tarname_path[0] or ".." in tarname_path: # if start with "/" means absolute path
410 raise EngineException("Absolute path or '..' are not allowed for package descriptor tar.gz")
411 if len(tarname_path) == 1 and not tarinfo.isdir():
412 raise EngineException("All files must be inside a dir for package descriptor tar.gz")
413 if tarname.endswith(".yaml") or tarname.endswith(".json") or tarname.endswith(".yml"):
tiernof27c79b2018-03-12 17:08:42 +0100414 storage["pkg-dir"] = tarname_path[0]
tiernoc94c3df2018-02-09 15:38:54 +0100415 if len(tarname_path) == 2:
416 if descriptor_file_name:
417 raise EngineException("Found more than one descriptor file at package descriptor tar.gz")
418 descriptor_file_name = tarname
419 if not descriptor_file_name:
420 raise EngineException("Not found any descriptor file at package descriptor tar.gz")
tiernof27c79b2018-03-12 17:08:42 +0100421 storage["descriptor"] = descriptor_file_name
422 storage["zipfile"] = filename
423 self.fs.file_extract(tar, _id)
424 with self.fs.file_open((_id, descriptor_file_name), "r") as descriptor_file:
tiernoc94c3df2018-02-09 15:38:54 +0100425 content = descriptor_file.read()
426 else:
427 content = file_pkg.read()
tiernof27c79b2018-03-12 17:08:42 +0100428 storage["descriptor"] = descriptor_file_name = filename
tiernoc94c3df2018-02-09 15:38:54 +0100429
tiernof27c79b2018-03-12 17:08:42 +0100430 if descriptor_file_name.endswith(".json"):
tiernoc94c3df2018-02-09 15:38:54 +0100431 error_text = "Invalid json format "
432 indata = json.load(content)
433 else:
434 error_text = "Invalid yaml format "
435 indata = yaml.load(content)
tiernof27c79b2018-03-12 17:08:42 +0100436
437 current_desc["_admin"]["storage"] = storage
438 current_desc["_admin"]["onboardingState"] = "ONBOARDED"
439 current_desc["_admin"]["operationalState"] = "ENABLED"
440
441 self._edit_item(session, item, _id, current_desc, indata, kwargs)
442 # TODO if descriptor has changed because kwargs update content and remove cached zip
443 # TODO if zip is not present creates one
444 return True
445
tiernoc94c3df2018-02-09 15:38:54 +0100446 except EngineException:
447 raise
448 except IndexError:
449 raise EngineException("invalid Content-Range header format. Expected 'bytes start-end/total'",
tiernof27c79b2018-03-12 17:08:42 +0100450 HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE)
tiernoc94c3df2018-02-09 15:38:54 +0100451 except IOError as e:
452 raise EngineException("invalid upload transaction sequence: '{}'".format(e), HTTPStatus.BAD_REQUEST)
tierno65acb4d2018-04-06 16:42:40 +0200453 except tarfile.ReadError as e:
454 raise EngineException("invalid file content {}".format(e), HTTPStatus.BAD_REQUEST)
tiernoc94c3df2018-02-09 15:38:54 +0100455 except (ValueError, yaml.YAMLError) as e:
456 raise EngineException(error_text + str(e))
457 finally:
458 if file_pkg:
459 file_pkg.close()
460
461 def new_nsr(self, session, ns_request):
462 """
463 Creates a new nsr into database
464 :param session: contains the used login username and working project
465 :param ns_request: params to be used for the nsr
466 :return: nsr descriptor to be stored at database and the _id
467 """
468
469 # look for nsr
470 nsd = self.get_item(session, "nsds", ns_request["nsdId"])
471 _id = str(uuid4())
472 nsr_descriptor = {
473 "name": ns_request["nsName"],
474 "name-ref": ns_request["nsName"],
475 "short-name": ns_request["nsName"],
476 "admin-status": "ENABLED",
477 "nsd": nsd,
478 "datacenter": ns_request["vimAccountId"],
479 "resource-orchestrator": "osmopenmano",
480 "description": ns_request.get("nsDescription", ""),
481 "constituent-vnfr-ref": ["TODO datacenter-id, vnfr-id"],
482
483 "operational-status": "init", # typedef ns-operational-
484 "config-status": "init", # typedef config-states
485 "detailed-status": "scheduled",
486
487 "orchestration-progress": {}, # {"networks": {"active": 0, "total": 0}, "vms": {"active": 0, "total": 0}},
488
489 "crete-time": time(),
490 "nsd-name-ref": nsd["name"],
491 "operational-events": [], # "id", "timestamp", "description", "event",
492 "nsd-ref": nsd["id"],
tierno21025602018-04-27 14:36:23 +0200493 "instantiate_params": ns_request,
tiernoc94c3df2018-02-09 15:38:54 +0100494 "ns-instance-config-ref": _id,
495 "id": _id,
tierno65acb4d2018-04-06 16:42:40 +0200496 "_id": _id,
tiernoc94c3df2018-02-09 15:38:54 +0100497
498 # "input-parameter": xpath, value,
499 "ssh-authorized-key": ns_request.get("key-pair-ref"),
500 }
501 ns_request["nsr_id"] = _id
tierno65acb4d2018-04-06 16:42:40 +0200502 return nsr_descriptor
503
504 @staticmethod
505 def _update_descriptor(desc, kwargs):
506 """
507 Update descriptor with the kwargs
508 :param kwargs:
509 :return:
510 """
511 if not kwargs:
512 return
513 try:
514 for k, v in kwargs.items():
515 update_content = content
516 kitem_old = None
517 klist = k.split(".")
518 for kitem in klist:
519 if kitem_old is not None:
520 update_content = update_content[kitem_old]
521 if isinstance(update_content, dict):
522 kitem_old = kitem
523 elif isinstance(update_content, list):
524 kitem_old = int(kitem)
525 else:
526 raise EngineException(
527 "Invalid query string '{}'. Descriptor is not a list nor dict at '{}'".format(k, kitem))
528 update_content[kitem_old] = v
529 except KeyError:
530 raise EngineException(
531 "Invalid query string '{}'. Descriptor does not contain '{}'".format(k, kitem_old))
532 except ValueError:
533 raise EngineException("Invalid query string '{}'. Expected integer index list instead of '{}'".format(
534 k, kitem))
535 except IndexError:
536 raise EngineException(
537 "Invalid query string '{}'. Index '{}' out of range".format(k, kitem_old))
tiernoc94c3df2018-02-09 15:38:54 +0100538
539 def new_item(self, session, item, indata={}, kwargs=None, headers={}):
540 """
tiernof27c79b2018-03-12 17:08:42 +0100541 Creates a new entry into database. For nsds and vnfds it creates an almost empty DISABLED entry,
542 that must be completed with a call to method upload_content
tiernoc94c3df2018-02-09 15:38:54 +0100543 :param session: contains the used login username and working project
tierno09c073e2018-04-26 13:36:48 +0200544 :param item: it can be: users, projects, vim_accounts, sdns, nsrs, nsds, vnfds
tiernoc94c3df2018-02-09 15:38:54 +0100545 :param indata: data to be inserted
546 :param kwargs: used to override the indata descriptor
547 :param headers: http request headers
548 :return: _id, transaction_id: identity of the inserted data. or transaction_id if Content-Range is used
549 """
tiernoc94c3df2018-02-09 15:38:54 +0100550
tierno0f98af52018-03-19 10:28:22 +0100551 try:
tierno65acb4d2018-04-06 16:42:40 +0200552 item_envelop = item
553 if item in ("nsds", "vnfds"):
554 item_envelop = "userDefinedData"
555 content = self._remove_envelop(item_envelop, indata)
556
557 # Override descriptor with query string kwargs
558 self._update_descriptor(content, kwargs)
559 if not indata and item not in ("nsds", "vnfds"):
560 raise EngineException("Empty payload")
561
tierno0f98af52018-03-19 10:28:22 +0100562 validate_input(content, item, new=True)
tierno65acb4d2018-04-06 16:42:40 +0200563
564 if item == "nsrs":
565 # in this case the imput descriptor is not the data to be stored
566 ns_request = content
567 content = self.new_nsr(session, ns_request)
568
569 self._validate_new_data(session, item_envelop, content)
570 if item in ("nsds", "vnfds"):
571 content = {"_admin": {"userDefinedData": content}}
572 self._format_new_data(session, item, content)
573 _id = self.db.create(item, content)
574 if item == "nsrs":
575 pass
576 # self.msg.write("ns", "created", _id) # sending just for information.
tierno09c073e2018-04-26 13:36:48 +0200577 elif item == "vim_accounts":
tierno65acb4d2018-04-06 16:42:40 +0200578 msg_data = self.db.get_one(item, {"_id": _id})
579 msg_data.pop("_admin", None)
580 self.msg.write("vim_account", "create", msg_data)
581 elif item == "sdns":
582 msg_data = self.db.get_one(item, {"_id": _id})
583 msg_data.pop("_admin", None)
584 self.msg.write("sdn", "create", msg_data)
585 return _id
tierno0f98af52018-03-19 10:28:22 +0100586 except ValidationError as e:
587 raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
588
tierno65acb4d2018-04-06 16:42:40 +0200589 def new_nslcmop(self, session, nsInstanceId, action, params):
590 now = time()
591 _id = str(uuid4())
592 nslcmop = {
593 "id": _id,
594 "_id": _id,
595 "operationState": "PROCESSING", # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
596 "statusEnteredTime": now,
597 "nsInstanceId": nsInstanceId,
598 "lcmOperationType": action,
599 "startTime": now,
600 "isAutomaticInvocation": False,
601 "operationParams": params,
602 "isCancelPending": False,
603 "links": {
604 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
605 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsInstanceId,
606 }
607 }
608 return nslcmop
tiernoc94c3df2018-02-09 15:38:54 +0100609
tierno65acb4d2018-04-06 16:42:40 +0200610 def ns_action(self, session, nsInstanceId, action, indata, kwargs=None):
611 """
612 Performs a new action over a ns
613 :param session: contains the used login username and working project
614 :param nsInstanceId: _id of the nsr to perform the action
615 :param action: it can be: instantiate, terminate, action, TODO: update, heal
616 :param indata: descriptor with the parameters of the action
617 :param kwargs: used to override the indata descriptor
618 :return: id of the nslcmops
619 """
620 try:
621 # Override descriptor with query string kwargs
622 self._update_descriptor(indata, kwargs)
623 validate_input(indata, "ns_" + action, new=True)
624 # get ns from nsr_id
625 nsr = self.get_item(session, "nsrs", nsInstanceId)
tierno21025602018-04-27 14:36:23 +0200626 if not nsr["_admin"].get("nsState") or nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
tierno65acb4d2018-04-06 16:42:40 +0200627 if action == "terminate" and indata.get("autoremove"):
628 # NSR must be deleted
629 return self.del_item(session, "nsrs", nsInstanceId)
630 if action != "instantiate":
631 raise EngineException("ns_instance '{}' cannot be '{}' because it is not instantiated".format(
632 nsInstanceId, action), HTTPStatus.CONFLICT)
633 else:
634 if action == "instantiate" and not indata.get("force"):
635 raise EngineException("ns_instance '{}' cannot be '{}' because it is already instantiated".format(
636 nsInstanceId, action), HTTPStatus.CONFLICT)
637 indata["nsInstanceId"] = nsInstanceId
638 # TODO
639 nslcmop = self.new_nslcmop(session, nsInstanceId, action, indata)
640 self._format_new_data(session, "nslcmops", nslcmop)
641 _id = self.db.create("nslcmops", nslcmop)
642 indata["_id"] = _id
643 self.msg.write("ns", action, nslcmop)
644 return _id
645 except ValidationError as e:
646 raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
647 # except DbException as e:
648 # raise EngineException("Cannot get ns_instance '{}': {}".format(e), HTTPStatus.NOT_FOUND)
tiernoc94c3df2018-02-09 15:38:54 +0100649
650 def _add_read_filter(self, session, item, filter):
651 if session["project_id"] == "admin": # allows all
652 return filter
653 if item == "users":
654 filter["username"] = session["username"]
tierno65acb4d2018-04-06 16:42:40 +0200655 elif item in ("vnfds", "nsds", "nsrs"):
tiernoc94c3df2018-02-09 15:38:54 +0100656 filter["_admin.projects_read.cont"] = ["ANY", session["project_id"]]
657
658 def _add_delete_filter(self, session, item, filter):
659 if session["project_id"] != "admin" and item in ("users", "projects"):
660 raise EngineException("Only admin users can perform this task", http_code=HTTPStatus.FORBIDDEN)
661 if item == "users":
662 if filter.get("_id") == session["username"] or filter.get("username") == session["username"]:
663 raise EngineException("You cannot delete your own user", http_code=HTTPStatus.CONFLICT)
664 elif item == "project":
665 if filter.get("_id") == session["project_id"]:
666 raise EngineException("You cannot delete your own project", http_code=HTTPStatus.CONFLICT)
667 elif item in ("vnfds", "nsds") and session["project_id"] != "admin":
668 filter["_admin.projects_write.cont"] = ["ANY", session["project_id"]]
669
tiernof27c79b2018-03-12 17:08:42 +0100670 def get_file(self, session, item, _id, path=None, accept_header=None):
671 """
672 Return the file content of a vnfd or nsd
673 :param session: contains the used login username and working project
674 :param item: it can be vnfds or nsds
675 :param _id: Identity of the vnfd, ndsd
676 :param path: artifact path or "$DESCRIPTOR" or None
677 :param accept_header: Content of Accept header. Must contain applition/zip or/and text/plain
678 :return: opened file or raises an exception
679 """
680 accept_text = accept_zip = False
681 if accept_header:
682 if 'text/plain' in accept_header or '*/*' in accept_header:
683 accept_text = True
684 if 'application/zip' in accept_header or '*/*' in accept_header:
685 accept_zip = True
686 if not accept_text and not accept_zip:
687 raise EngineException("provide request header 'Accept' with 'application/zip' or 'text/plain'",
688 http_code=HTTPStatus.NOT_ACCEPTABLE)
689
690 content = self.get_item(session, item, _id)
691 if content["_admin"]["onboardingState"] != "ONBOARDED":
692 raise EngineException("Cannot get content because this resource is not at 'ONBOARDED' state. "
693 "onboardingState is {}".format(content["_admin"]["onboardingState"]),
694 http_code=HTTPStatus.CONFLICT)
695 storage = content["_admin"]["storage"]
696 if path is not None and path != "$DESCRIPTOR": # artifacts
697 if not storage.get('pkg-dir'):
698 raise EngineException("Packages does not contains artifacts", http_code=HTTPStatus.BAD_REQUEST)
699 if self.fs.file_exists((storage['folder'], storage['pkg-dir'], *path), 'dir'):
700 folder_content = self.fs.dir_ls((storage['folder'], storage['pkg-dir'], *path))
701 return folder_content, "text/plain"
702 # TODO manage folders in http
703 else:
704 return self.fs.file_open((storage['folder'], storage['pkg-dir'], *path), "rb"), \
705 "application/octet-stream"
706
707 # pkgtype accept ZIP TEXT -> result
708 # manyfiles yes X -> zip
709 # no yes -> error
710 # onefile yes no -> zip
711 # X yes -> text
712
713 if accept_text and (not storage.get('pkg-dir') or path == "$DESCRIPTOR"):
714 return self.fs.file_open((storage['folder'], storage['descriptor']), "r"), "text/plain"
715 elif storage.get('pkg-dir') and not accept_zip:
716 raise EngineException("Packages that contains several files need to be retrieved with 'application/zip'"
717 "Accept header", http_code=HTTPStatus.NOT_ACCEPTABLE)
718 else:
719 if not storage.get('zipfile'):
720 # TODO generate zipfile if not present
721 raise EngineException("Only allowed 'text/plain' Accept header for this descriptor. To be solved in future versions"
722 "", http_code=HTTPStatus.NOT_ACCEPTABLE)
723 return self.fs.file_open((storage['folder'], storage['zipfile']), "rb"), "application/zip"
724
tiernoc94c3df2018-02-09 15:38:54 +0100725 def get_item_list(self, session, item, filter={}):
726 """
727 Get a list of items
728 :param session: contains the used login username and working project
729 :param item: it can be: users, projects, vnfds, nsds, ...
730 :param filter: filter of data to be applied
731 :return: The list, it can be empty if no one match the filter.
732 """
733 # TODO add admin to filter, validate rights
tiernof27c79b2018-03-12 17:08:42 +0100734 # TODO transform data for SOL005 URL requests. Transform filtering
735 # TODO implement "field-type" query string SOL005
736
tiernoc94c3df2018-02-09 15:38:54 +0100737 self._add_read_filter(session, item, filter)
738 return self.db.get_list(item, filter)
739
740 def get_item(self, session, item, _id):
741 """
742 Get complete information on an items
743 :param session: contains the used login username and working project
tiernof27c79b2018-03-12 17:08:42 +0100744 :param item: it can be: users, projects, vnfds, nsds,
tiernoc94c3df2018-02-09 15:38:54 +0100745 :param _id: server id of the item
746 :return: dictionary, raise exception if not found.
747 """
tiernof27c79b2018-03-12 17:08:42 +0100748 database_item = item
tiernoc94c3df2018-02-09 15:38:54 +0100749 filter = {"_id": _id}
750 # TODO add admin to filter, validate rights
tiernof27c79b2018-03-12 17:08:42 +0100751 # TODO transform data for SOL005 URL requests
tiernoc94c3df2018-02-09 15:38:54 +0100752 self._add_read_filter(session, item, filter)
753 return self.db.get_one(item, filter)
754
755 def del_item_list(self, session, item, filter={}):
756 """
757 Delete a list of items
758 :param session: contains the used login username and working project
759 :param item: it can be: users, projects, vnfds, nsds, ...
760 :param filter: filter of data to be applied
761 :return: The deleted list, it can be empty if no one match the filter.
762 """
763 # TODO add admin to filter, validate rights
764 self._add_read_filter(session, item, filter)
765 return self.db.del_list(item, filter)
766
tierno65acb4d2018-04-06 16:42:40 +0200767 def del_item(self, session, item, _id, force=False):
tiernoc94c3df2018-02-09 15:38:54 +0100768 """
769 Get complete information on an items
770 :param session: contains the used login username and working project
771 :param item: it can be: users, projects, vnfds, nsds, ...
772 :param _id: server id of the item
tierno65acb4d2018-04-06 16:42:40 +0200773 :param force: indicates if deletion must be forced in case of conflict
tierno09c073e2018-04-26 13:36:48 +0200774 :return: dictionary with deleted item _id. It raises exception if not found.
tiernoc94c3df2018-02-09 15:38:54 +0100775 """
776 # TODO add admin to filter, validate rights
777 # data = self.get_item(item, _id)
778 filter = {"_id": _id}
779 self._add_delete_filter(session, item, filter)
780
tierno65acb4d2018-04-06 16:42:40 +0200781 if item == "nsrs":
782 nsr = self.db.get_one(item, filter)
783 if nsr["_admin"]["nsState"] == "INSTANTIATED" and not force:
784 raise EngineException("nsr '{}' cannot be deleted because it is in 'INSTANTIATED' state. "
785 "Launch 'terminate' action first; or force deletion".format(_id),
786 http_code=HTTPStatus.CONFLICT)
787 v = self.db.del_one(item, {"_id": _id})
788 self.db.del_list("nslcmops", {"nsInstanceId": _id})
789 self.msg.write("ns", "deleted", {"_id": _id})
790 return v
tierno09c073e2018-04-26 13:36:48 +0200791 if item in ("vim_accounts", "sdns"):
tiernoc94c3df2018-02-09 15:38:54 +0100792 desc = self.db.get_one(item, filter)
793 desc["_admin"]["to_delete"] = True
794 self.db.replace(item, _id, desc) # TODO change to set_one
tierno09c073e2018-04-26 13:36:48 +0200795 if item == "vim_accounts":
tierno0f98af52018-03-19 10:28:22 +0100796 self.msg.write("vim_account", "delete", {"_id": _id})
797 elif item == "sdns":
798 self.msg.write("sdn", "delete", {"_id": _id})
799 return {"deleted": 1} # TODO indicate an offline operation to return 202 ACCEPTED
tiernoc94c3df2018-02-09 15:38:54 +0100800
801 v = self.db.del_one(item, filter)
802 self.fs.file_delete(_id, ignore_non_exist=True)
tiernoc94c3df2018-02-09 15:38:54 +0100803 return v
804
805 def prune(self):
806 """
807 Prune database not needed content
808 :return: None
809 """
810 return self.db.del_list("nsrs", {"_admin.to_delete": True})
811
812 def create_admin(self):
813 """
tierno4a946e42018-04-12 17:48:49 +0200814 Creates a new user admin/admin into database if database is empty. Useful for initialization
815 :return: _id identity of the inserted data, or None
tiernoc94c3df2018-02-09 15:38:54 +0100816 """
817 users = self.db.get_one("users", fail_on_empty=False, fail_on_more=False)
818 if users:
tierno4a946e42018-04-12 17:48:49 +0200819 return None
820 # raise EngineException("Unauthorized. Database users is not empty", HTTPStatus.UNAUTHORIZED)
tiernoc94c3df2018-02-09 15:38:54 +0100821 indata = {"username": "admin", "password": "admin", "projects": ["admin"]}
822 fake_session = {"project_id": "admin", "username": "admin"}
823 self._format_new_data(fake_session, "users", indata)
824 _id = self.db.create("users", indata)
825 return _id
826
tierno4a946e42018-04-12 17:48:49 +0200827 def init_db(self, target_version='1.0'):
828 """
829 Init database if empty. If not empty it checks that database version is ok.
830 If empty, it creates a new user admin/admin at 'users' and a new entry at 'version'
831 :return: None if ok, exception if error or if the version is different.
832 """
tierno56ac2452018-04-17 16:06:26 +0200833 version = self.db.get_one("version", fail_on_empty=False, fail_on_more=False)
tierno4a946e42018-04-12 17:48:49 +0200834 if not version:
835 # create user admin
836 self.create_admin()
837 # create database version
838 version_data = {
839 "_id": '1.0', # version text
840 "version": 1000, # version number
841 "date": "2018-04-12", # version date
842 "description": "initial design", # changes in this version
843 'status': 'ENABLED' # ENABLED, DISABLED (migration in process), ERROR,
844 }
845 self.db.create("version", version_data)
846 elif version["_id"] != target_version:
847 # TODO implement migration process
848 raise EngineException("Wrong database version '{}'. Expected '{}'".format(
849 version["_id"], target_version), HTTPStatus.INTERNAL_SERVER_ERROR)
850 elif version["status"] != 'ENABLED':
851 raise EngineException("Wrong database status '{}'".format(
852 version["status"]), HTTPStatus.INTERNAL_SERVER_ERROR)
853 return
854
tiernof27c79b2018-03-12 17:08:42 +0100855 def _edit_item(self, session, item, id, content, indata={}, kwargs=None):
tiernoc94c3df2018-02-09 15:38:54 +0100856 if indata:
857 indata = self._remove_envelop(item, indata)
tiernoc94c3df2018-02-09 15:38:54 +0100858
859 # Override descriptor with query string kwargs
860 if kwargs:
861 try:
862 for k, v in kwargs.items():
tiernof27c79b2018-03-12 17:08:42 +0100863 update_content = indata
tiernoc94c3df2018-02-09 15:38:54 +0100864 kitem_old = None
865 klist = k.split(".")
866 for kitem in klist:
867 if kitem_old is not None:
868 update_content = update_content[kitem_old]
869 if isinstance(update_content, dict):
870 kitem_old = kitem
871 elif isinstance(update_content, list):
872 kitem_old = int(kitem)
873 else:
874 raise EngineException(
875 "Invalid query string '{}'. Descriptor is not a list nor dict at '{}'".format(k, kitem))
876 update_content[kitem_old] = v
877 except KeyError:
878 raise EngineException(
879 "Invalid query string '{}'. Descriptor does not contain '{}'".format(k, kitem_old))
880 except ValueError:
881 raise EngineException("Invalid query string '{}'. Expected integer index list instead of '{}'".format(
882 k, kitem))
883 except IndexError:
884 raise EngineException(
885 "Invalid query string '{}'. Index '{}' out of range".format(k, kitem_old))
tierno0f98af52018-03-19 10:28:22 +0100886 try:
887 validate_input(content, item, new=False)
888 except ValidationError as e:
889 raise EngineException(e, HTTPStatus.UNPROCESSABLE_ENTITY)
tiernoc94c3df2018-02-09 15:38:54 +0100890
tiernof27c79b2018-03-12 17:08:42 +0100891 _deep_update(content, indata)
892 self._validate_new_data(session, item, content, id)
tiernoc94c3df2018-02-09 15:38:54 +0100893 # self._format_new_data(session, item, content)
894 self.db.replace(item, id, content)
tierno09c073e2018-04-26 13:36:48 +0200895 if item in ("vim_accounts", "sdns"):
tierno0f98af52018-03-19 10:28:22 +0100896 indata.pop("_admin", None)
897 indata["_id"] = id
tierno09c073e2018-04-26 13:36:48 +0200898 if item == "vim_accounts":
tierno0f98af52018-03-19 10:28:22 +0100899 self.msg.write("vim_account", "edit", indata)
900 elif item == "sdns":
901 self.msg.write("sdn", "edit", indata)
tiernoc94c3df2018-02-09 15:38:54 +0100902 return id
903
tiernof27c79b2018-03-12 17:08:42 +0100904 def edit_item(self, session, item, _id, indata={}, kwargs=None):
905 """
906 Update an existing entry at database
907 :param session: contains the used login username and working project
908 :param item: it can be: users, projects, vnfds, nsds, ...
909 :param _id: identifier to be updated
910 :param indata: data to be inserted
911 :param kwargs: used to override the indata descriptor
912 :return: dictionary, raise exception if not found.
913 """
914
915 content = self.get_item(session, item, _id)
916 return self._edit_item(session, item, _id, content, indata, kwargs)
917
918
tiernoc94c3df2018-02-09 15:38:54 +0100919