1 # -*- coding: utf-8 -*-
12 from random
import choice
as random_choice
13 from uuid
import uuid4
14 from hashlib
import sha256
, md5
15 from dbbase
import DbException
16 from fsbase
import FsException
17 from msgbase
import MsgException
18 from http
import HTTPStatus
20 from copy
import deepcopy
22 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
25 class EngineException(Exception):
27 def __init__(self
, message
, http_code
=HTTPStatus
.BAD_REQUEST
):
28 self
.http_code
= http_code
29 Exception.__init
__(self
, message
)
40 self
.logger
= logging
.getLogger("nbi.engine")
42 def start(self
, config
):
44 Connect to database, filesystem storage, and messaging
45 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
51 if config
["database"]["driver"] == "mongo":
52 self
.db
= dbmongo
.DbMongo()
53 self
.db
.db_connect(config
["database"])
54 elif config
["database"]["driver"] == "memory":
55 self
.db
= dbmemory
.DbMemory()
56 self
.db
.db_connect(config
["database"])
58 raise EngineException("Invalid configuration param '{}' at '[database]':'driver'".format(
59 config
["database"]["driver"]))
61 if config
["storage"]["driver"] == "local":
62 self
.fs
= fslocal
.FsLocal()
63 self
.fs
.fs_connect(config
["storage"])
65 raise EngineException("Invalid configuration param '{}' at '[storage]':'driver'".format(
66 config
["storage"]["driver"]))
68 if config
["message"]["driver"] == "local":
69 self
.msg
= msglocal
.MsgLocal()
70 self
.msg
.connect(config
["message"])
71 elif config
["message"]["driver"] == "kafka":
72 self
.msg
= msgkafka
.MsgKafka()
73 self
.msg
.connect(config
["message"])
75 raise EngineException("Invalid configuration param '{}' at '[message]':'driver'".format(
76 config
["storage"]["driver"]))
77 except (DbException
, FsException
, MsgException
) as e
:
78 raise EngineException(str(e
), http_code
=e
.http_code
)
83 self
.db
.db_disconnect()
85 self
.fs
.fs_disconnect()
87 self
.fs
.fs_disconnect()
88 except (DbException
, FsException
, MsgException
) as e
:
89 raise EngineException(str(e
), http_code
=e
.http_code
)
91 def authorize(self
, token
):
94 raise EngineException("Needed a token or Authorization http header",
95 http_code
=HTTPStatus
.UNAUTHORIZED
)
96 if token
not in self
.tokens
:
97 raise EngineException("Invalid token or Authorization http header",
98 http_code
=HTTPStatus
.UNAUTHORIZED
)
99 session
= self
.tokens
[token
]
101 if session
["expires"] < now
:
102 del self
.tokens
[token
]
103 raise EngineException("Expired Token or Authorization http header",
104 http_code
=HTTPStatus
.UNAUTHORIZED
)
106 except EngineException
:
107 if self
.config
["global"].get("test.user_not_authorized"):
108 return {"id": "fake-token-id-for-test",
109 "project_id": self
.config
["global"].get("test.project_not_authorized", "admin"),
110 "username": self
.config
["global"]["test.user_not_authorized"]}
114 def new_token(self
, session
, indata
, remote
):
118 # Try using username/password
119 if indata
.get("username"):
120 user_rows
= self
.db
.get_list("users", {"username": indata
.get("username")})
123 user_content
= user_rows
[0]
124 salt
= user_content
["_admin"]["salt"]
125 shadow_password
= sha256(indata
.get("password", "").encode('utf-8') + salt
.encode('utf-8')).hexdigest()
126 if shadow_password
!= user_content
["password"]:
129 raise EngineException("Invalid username/password", http_code
=HTTPStatus
.UNAUTHORIZED
)
131 user_rows
= self
.db
.get_list("users", {"username": session
["username"]})
133 user_content
= user_rows
[0]
135 raise EngineException("Invalid token", http_code
=HTTPStatus
.UNAUTHORIZED
)
137 raise EngineException("Provide credentials: username/password or Authorization Bearer token",
138 http_code
=HTTPStatus
.UNAUTHORIZED
)
140 token_id
= ''.join(random_choice('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789')
141 for _
in range(0, 32))
142 if indata
.get("project_id"):
143 project_id
= indata
.get("project_id")
144 if project_id
not in user_content
["projects"]:
145 raise EngineException("project {} not allowed for this user".format(project_id
),
146 http_code
=HTTPStatus
.UNAUTHORIZED
)
148 project_id
= user_content
["projects"][0]
149 if project_id
== "admin":
152 project
= self
.db
.get_one("projects", {"_id": project_id
})
153 session_admin
= project
.get("admin", False)
154 new_session
= {"issued_at": now
, "expires": now
+3600,
155 "_id": token_id
, "id": token_id
, "project_id": project_id
, "username": user_content
["username"],
156 "remote_port": remote
.port
, "admin": session_admin
}
158 new_session
["remote_host"] = remote
.name
160 new_session
["remote_host"] = remote
.ip
162 self
.tokens
[token_id
] = new_session
163 return deepcopy(new_session
)
165 def get_token_list(self
, session
):
167 for token_id
, token_value
in self
.tokens
.items():
168 if token_value
["username"] == session
["username"]:
169 token_list
.append(deepcopy(token_value
))
172 def get_token(self
, session
, token_id
):
173 token_value
= self
.tokens
.get(token_id
)
175 raise EngineException("token not found", http_code
=HTTPStatus
.NOT_FOUND
)
176 if token_value
["username"] != session
["username"] and not session
["admin"]:
177 raise EngineException("needed admin privileges", http_code
=HTTPStatus
.UNAUTHORIZED
)
180 def del_token(self
, token_id
):
182 del self
.tokens
[token_id
]
183 return "token '{}' deleted".format(token_id
)
185 raise EngineException("Token '{}' not found".format(token_id
), http_code
=HTTPStatus
.NOT_FOUND
)
188 def _remove_envelop(item
, indata
=None):
190 Obtain the useful data removing the envelop. It goes throw the vnfd or nsd catalog and returns the
192 :param item: can be vnfds, nsds, users, projects,
193 :param indata: Content to be inspected
194 :return: the useful part of indata
196 clean_indata
= indata
200 if clean_indata
.get('vnfd:vnfd-catalog'):
201 clean_indata
= clean_indata
['vnfd:vnfd-catalog']
202 elif clean_indata
.get('vnfd-catalog'):
203 clean_indata
= clean_indata
['vnfd-catalog']
204 if clean_indata
.get('vnfd'):
205 if not isinstance(clean_indata
['vnfd'], list) or len(clean_indata
['vnfd']) != 1:
206 raise EngineException("'vnfd' must be a list only one element")
207 clean_indata
= clean_indata
['vnfd'][0]
209 if clean_indata
.get('nsd:nsd-catalog'):
210 clean_indata
= clean_indata
['nsd:nsd-catalog']
211 elif clean_indata
.get('nsd-catalog'):
212 clean_indata
= clean_indata
['nsd-catalog']
213 if clean_indata
.get('nsd'):
214 if not isinstance(clean_indata
['nsd'], list) or len(clean_indata
['nsd']) != 1:
215 raise EngineException("'nsd' must be a list only one element")
216 clean_indata
= clean_indata
['nsd'][0]
219 def _validate_new_data(self
, session
, item
, indata
):
221 if not indata
.get("username"):
222 raise EngineException("missing 'username'", HTTPStatus
.UNPROCESSABLE_ENTITY
)
223 if not indata
.get("password"):
224 raise EngineException("missing 'password'", HTTPStatus
.UNPROCESSABLE_ENTITY
)
225 if not indata
.get("projects"):
226 raise EngineException("missing 'projects'", HTTPStatus
.UNPROCESSABLE_ENTITY
)
227 # check username not exist
228 if self
.db
.get_one(item
, {"username": indata
.get("username")}, fail_on_empty
=False, fail_on_more
=False):
229 raise EngineException("username '{}' exist".format(indata
["username"]), HTTPStatus
.CONFLICT
)
230 elif item
== "projects":
231 if not indata
.get("name"):
232 raise EngineException("missing 'name'")
233 # check name not exist
234 if self
.db
.get_one(item
, {"name": indata
.get("name")}, fail_on_empty
=False, fail_on_more
=False):
235 raise EngineException("name '{}' exist".format(indata
["name"]), HTTPStatus
.CONFLICT
)
236 elif item
== "vnfds" or item
== "nsds":
237 filter = {"id": indata
["id"]}
238 # TODO add admin to filter, validate rights
239 self
._add
_read
_filter
(session
, item
, filter)
240 if self
.db
.get_one(item
, filter, fail_on_empty
=False):
241 raise EngineException("{} with id '{}' already exist for this tenant".format(item
[:-1], indata
["id"]),
244 # TODO validate with pyangbind
248 def _format_new_data(self
, session
, item
, indata
, admin
=None):
250 if not "_admin" in indata
:
251 indata
["_admin"] = {}
252 indata
["_admin"]["created"] = now
253 indata
["_admin"]["modified"] = now
255 _id
= indata
["username"]
257 indata
["_admin"]["salt"] = salt
258 indata
["password"] = sha256(indata
["password"].encode('utf-8') + salt
.encode('utf-8')).hexdigest()
259 elif item
== "projects":
265 _id
= admin
.get("_id")
266 storage
= admin
.get("storage")
269 if item
== "vnfds" or item
== "nsds":
270 if not indata
["_admin"].get("projects_read"):
271 indata
["_admin"]["projects_read"] = [session
["project_id"]]
272 if not indata
["_admin"].get("projects_write"):
273 indata
["_admin"]["projects_write"] = [session
["project_id"]]
275 indata
["_admin"]["storage"] = storage
278 def _new_item_partial(self
, session
, item
, indata
, headers
):
280 Used for recieve content by chunks (with a transaction_id header and/or gzip file. It will store and extract
281 :param session: session
283 :param indata: http body request
284 :param headers: http request headers
285 :return: a dict with::
286 _id: <transaction_id>
287 storage: <path>: where it is saving
288 desc: <dict>: descriptor: Only present when all the content is received, extracted and read the descriptor
290 content_range_text
= headers
.get("Content-Range")
291 transaction_id
= headers
.get("Transaction-Id")
292 filename
= headers
.get("Content-Filename", "pkg")
293 # TODO change to Content-Disposition filename https://tools.ietf.org/html/rfc6266
294 expected_md5
= headers
.get("Content-File-MD5")
296 if "application/gzip" in headers
.get("Content-Type") or "application/x-gzip" in headers
.get("Content-Type") or \
297 "application/zip" in headers
.get("Content-Type"):
302 if content_range_text
:
303 content_range
= content_range_text
.replace("-", " ").replace("/", " ").split()
304 if content_range
[0] != "bytes": # TODO check x<y not negative < total....
306 start
= int(content_range
[1])
307 end
= int(content_range
[2]) + 1
308 total
= int(content_range
[3])
309 if len(indata
) != end
-start
:
310 raise EngineException("Mismatch between Content-Range header {}-{} and body length of {}".format(
311 start
, end
-1, len(indata
)), HTTPStatus
.BAD_REQUEST
)
314 total
= end
= len(indata
)
315 if not transaction_id
:
316 # generate transaction
317 transaction_id
= str(uuid4())
318 self
.fs
.mkdir(transaction_id
)
319 # control_file = open(self.storage["path"] + transaction_id + "/.osm.yaml", 'wb')
320 # control = {"received": 0}
321 elif not self
.fs
.file_exists(transaction_id
):
322 raise EngineException("invalid Transaction-Id header", HTTPStatus
.NOT_FOUND
)
325 # control_file = open(self.storage["path"] + transaction_id + "/.osm.yaml", 'rw')
326 # control = yaml.load(control_file)
327 # control_file.seek(0, 0)
328 storage
= self
.fs
.get_params()
329 storage
["folder"] = transaction_id
330 storage
["file"] = filename
332 file_path
= (transaction_id
, filename
)
333 if self
.fs
.file_exists(file_path
):
334 file_size
= self
.fs
.file_size(file_path
)
337 if file_size
!= start
:
338 raise EngineException("invalid upload transaction sequence, expected '{}' but received '{}'".format(
339 file_size
, start
), HTTPStatus
.BAD_REQUEST
)
340 file_pkg
= self
.fs
.file_open(file_path
, 'a+b')
341 file_pkg
.write(indata
)
343 return {"_id": transaction_id
, "storage": storage
}
347 chunk_data
= file_pkg
.read(1024)
349 file_md5
.update(chunk_data
)
350 chunk_data
= file_pkg
.read(1024)
351 if expected_md5
!= file_md5
.hexdigest():
352 raise EngineException("Error, MD5 mismatch", HTTPStatus
.CONFLICT
)
354 if compressed
== "gzip":
356 storage
["tarfile"] = filename
357 tar
= tarfile
.open(mode
='r', fileobj
=file_pkg
)
358 descriptor_file_name
= None
360 tarname
= tarinfo
.name
361 tarname_path
= tarname
.split("/")
362 if not tarname_path
[0] or ".." in tarname_path
: # if start with "/" means absolute path
363 raise EngineException("Absolute path or '..' are not allowed for package descriptor tar.gz")
364 if len(tarname_path
) == 1 and not tarinfo
.isdir():
365 raise EngineException("All files must be inside a dir for package descriptor tar.gz")
366 if tarname
.endswith(".yaml") or tarname
.endswith(".json") or tarname
.endswith(".yml"):
367 storage
["file"] = tarname_path
[0]
368 if len(tarname_path
) == 2:
369 if descriptor_file_name
:
370 raise EngineException("Found more than one descriptor file at package descriptor tar.gz")
371 descriptor_file_name
= tarname
372 if not descriptor_file_name
:
373 raise EngineException("Not found any descriptor file at package descriptor tar.gz")
374 self
.fs
.file_extract(tar
, transaction_id
)
375 with self
.fs
.file_open((transaction_id
, descriptor_file_name
), "r") as descriptor_file
:
376 content
= descriptor_file
.read()
378 content
= file_pkg
.read()
381 if tarname
.endswith(".json"):
382 error_text
= "Invalid json format "
383 indata
= json
.load(content
)
385 error_text
= "Invalid yaml format "
386 indata
= yaml
.load(content
)
387 return {"_id": transaction_id
, "storage": storage
, "desc": indata
}
388 except EngineException
:
391 raise EngineException("invalid Content-Range header format. Expected 'bytes start-end/total'",
392 HTTPStatus
.BAD_REQUEST
)
394 raise EngineException("invalid upload transaction sequence: '{}'".format(e
), HTTPStatus
.BAD_REQUEST
)
395 except (ValueError, yaml
.YAMLError
) as e
:
396 raise EngineException(error_text
+ str(e
))
401 def new_nsr(self
, session
, ns_request
):
403 Creates a new nsr into database
404 :param session: contains the used login username and working project
405 :param ns_request: params to be used for the nsr
406 :return: nsr descriptor to be stored at database and the _id
410 nsd
= self
.get_item(session
, "nsds", ns_request
["nsdId"])
413 "name": ns_request
["nsName"],
414 "name-ref": ns_request
["nsName"],
415 "short-name": ns_request
["nsName"],
416 "admin-status": "ENABLED",
418 "datacenter": ns_request
["vimAccountId"],
419 "resource-orchestrator": "osmopenmano",
420 "description": ns_request
.get("nsDescription", ""),
421 "constituent-vnfr-ref": ["TODO datacenter-id, vnfr-id"],
423 "operational-status": "init", # typedef ns-operational-
424 "config-status": "init", # typedef config-states
425 "detailed-status": "scheduled",
427 "orchestration-progress": {}, # {"networks": {"active": 0, "total": 0}, "vms": {"active": 0, "total": 0}},
429 "crete-time": time(),
430 "nsd-name-ref": nsd
["name"],
431 "operational-events": [], # "id", "timestamp", "description", "event",
432 "nsd-ref": nsd
["id"],
433 "ns-instance-config-ref": _id
,
436 # "input-parameter": xpath, value,
437 "ssh-authorized-key": ns_request
.get("key-pair-ref"),
439 ns_request
["nsr_id"] = _id
440 return nsr_descriptor
, _id
442 def new_item(self
, session
, item
, indata
={}, kwargs
=None, headers
={}):
444 Creates a new entry into database
445 :param session: contains the used login username and working project
446 :param item: it can be: users, projects, vnfds, nsds, ...
447 :param indata: data to be inserted
448 :param kwargs: used to override the indata descriptor
449 :param headers: http request headers
450 :return: _id, transaction_id: identity of the inserted data. or transaction_id if Content-Range is used
452 # TODO validate input. Check not exist at database
453 # TODO add admin and status
456 if headers
.get("Content-Range") or "application/gzip" in headers
.get("Content-Type") or \
457 "application/x-gzip" in headers
.get("Content-Type") or "application/zip" in headers
.get("Content-Type"):
459 raise EngineException("Empty payload")
460 transaction
= self
._new
_item
_partial
(session
, item
, indata
, headers
)
461 if "desc" not in transaction
:
462 return transaction
["_id"], False
463 indata
= transaction
["desc"]
465 content
= self
._remove
_envelop
(item
, indata
)
467 # Override descriptor with query string kwargs
470 for k
, v
in kwargs
.items():
471 update_content
= content
475 if kitem_old
is not None:
476 update_content
= update_content
[kitem_old
]
477 if isinstance(update_content
, dict):
479 elif isinstance(update_content
, list):
480 kitem_old
= int(kitem
)
482 raise EngineException(
483 "Invalid query string '{}'. Descriptor is not a list nor dict at '{}'".format(k
, kitem
))
484 update_content
[kitem_old
] = v
486 raise EngineException(
487 "Invalid query string '{}'. Descriptor does not contain '{}'".format(k
, kitem_old
))
489 raise EngineException("Invalid query string '{}'. Expected integer index list instead of '{}'".format(
492 raise EngineException(
493 "Invalid query string '{}'. Index '{}' out of range".format(k
, kitem_old
))
495 raise EngineException("Empty payload")
498 # in this case the imput descriptor is not the data to be stored
500 content
, _id
= self
.new_nsr(session
, ns_request
)
501 transaction
= {"_id": _id
}
503 self
._validate
_new
_data
(session
, item
, content
)
504 self
._format
_new
_data
(session
, item
, content
, transaction
)
505 _id
= self
.db
.create(item
, content
)
507 self
.msg
.write("ns", "create", _id
)
510 def _add_read_filter(self
, session
, item
, filter):
511 if session
["project_id"] == "admin": # allows all
514 filter["username"] = session
["username"]
515 elif item
== "vnfds" or item
== "nsds":
516 filter["_admin.projects_read.cont"] = ["ANY", session
["project_id"]]
518 def _add_delete_filter(self
, session
, item
, filter):
519 if session
["project_id"] != "admin" and item
in ("users", "projects"):
520 raise EngineException("Only admin users can perform this task", http_code
=HTTPStatus
.FORBIDDEN
)
522 if filter.get("_id") == session
["username"] or filter.get("username") == session
["username"]:
523 raise EngineException("You cannot delete your own user", http_code
=HTTPStatus
.CONFLICT
)
524 elif item
== "project":
525 if filter.get("_id") == session
["project_id"]:
526 raise EngineException("You cannot delete your own project", http_code
=HTTPStatus
.CONFLICT
)
527 elif item
in ("vnfds", "nsds") and session
["project_id"] != "admin":
528 filter["_admin.projects_write.cont"] = ["ANY", session
["project_id"]]
530 def get_item_list(self
, session
, item
, filter={}):
533 :param session: contains the used login username and working project
534 :param item: it can be: users, projects, vnfds, nsds, ...
535 :param filter: filter of data to be applied
536 :return: The list, it can be empty if no one match the filter.
538 # TODO add admin to filter, validate rights
539 self
._add
_read
_filter
(session
, item
, filter)
540 return self
.db
.get_list(item
, filter)
542 def get_item(self
, session
, item
, _id
):
544 Get complete information on an items
545 :param session: contains the used login username and working project
546 :param item: it can be: users, projects, vnfds, nsds, ...
547 :param _id: server id of the item
548 :return: dictionary, raise exception if not found.
550 filter = {"_id": _id
}
551 # TODO add admin to filter, validate rights
552 self
._add
_read
_filter
(session
, item
, filter)
553 return self
.db
.get_one(item
, filter)
555 def del_item_list(self
, session
, item
, filter={}):
557 Delete a list of items
558 :param session: contains the used login username and working project
559 :param item: it can be: users, projects, vnfds, nsds, ...
560 :param filter: filter of data to be applied
561 :return: The deleted list, it can be empty if no one match the filter.
563 # TODO add admin to filter, validate rights
564 self
._add
_read
_filter
(session
, item
, filter)
565 return self
.db
.del_list(item
, filter)
567 def del_item(self
, session
, item
, _id
):
569 Get complete information on an items
570 :param session: contains the used login username and working project
571 :param item: it can be: users, projects, vnfds, nsds, ...
572 :param _id: server id of the item
573 :return: dictionary, raise exception if not found.
575 # TODO add admin to filter, validate rights
576 # data = self.get_item(item, _id)
577 filter = {"_id": _id
}
578 self
._add
_delete
_filter
(session
, item
, filter)
581 desc
= self
.db
.get_one(item
, filter)
582 desc
["_admin"]["to_delete"] = True
583 self
.db
.replace(item
, _id
, desc
) # TODO change to set_one
584 self
.msg
.write("ns", "delete", _id
)
585 return {"deleted": 1}
587 v
= self
.db
.del_one(item
, filter)
588 self
.fs
.file_delete(_id
, ignore_non_exist
=True)
590 self
.msg
.write("ns", "delete", _id
)
595 Prune database not needed content
598 return self
.db
.del_list("nsrs", {"_admin.to_delete": True})
600 def create_admin(self
):
602 Creates a new user admin/admin into database. Only allowed if database is empty. Useful for initialization
603 :return: _id identity of the inserted data.
605 users
= self
.db
.get_one("users", fail_on_empty
=False, fail_on_more
=False)
607 raise EngineException("Unauthorized. Database users is not empty", HTTPStatus
.UNAUTHORIZED
)
608 indata
= {"username": "admin", "password": "admin", "projects": ["admin"]}
609 fake_session
= {"project_id": "admin", "username": "admin"}
610 self
._format
_new
_data
(fake_session
, "users", indata
)
611 _id
= self
.db
.create("users", indata
)
614 def edit_item(self
, session
, item
, id, indata
={}, kwargs
=None):
616 Update an existing entry at database
617 :param session: contains the used login username and working project
618 :param item: it can be: users, projects, vnfds, nsds, ...
619 :param id: identity of entry to be updated
620 :param indata: data to be inserted
621 :param kwargs: used to override the indata descriptor
622 :return: dictionary, raise exception if not found.
625 content
= self
.get_item(session
, item
, id)
627 indata
= self
._remove
_envelop
(item
, indata
)
628 # TODO update content with with a deep-update
630 # Override descriptor with query string kwargs
633 for k
, v
in kwargs
.items():
634 update_content
= content
638 if kitem_old
is not None:
639 update_content
= update_content
[kitem_old
]
640 if isinstance(update_content
, dict):
642 elif isinstance(update_content
, list):
643 kitem_old
= int(kitem
)
645 raise EngineException(
646 "Invalid query string '{}'. Descriptor is not a list nor dict at '{}'".format(k
, kitem
))
647 update_content
[kitem_old
] = v
649 raise EngineException(
650 "Invalid query string '{}'. Descriptor does not contain '{}'".format(k
, kitem_old
))
652 raise EngineException("Invalid query string '{}'. Expected integer index list instead of '{}'".format(
655 raise EngineException(
656 "Invalid query string '{}'. Index '{}' out of range".format(k
, kitem_old
))
658 self
._validate
_new
_data
(session
, item
, content
)
659 # self._format_new_data(session, item, content)
660 self
.db
.replace(item
, id, content
)