1 # -*- coding: utf-8 -*-
12 from random
import choice
as random_choice
13 from uuid
import uuid4
14 from hashlib
import sha256
, md5
15 from dbbase
import DbException
16 from fsbase
import FsException
17 from msgbase
import MsgException
18 from http
import HTTPStatus
20 from copy
import deepcopy
21 from validation
import validate_input
, ValidationError
23 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
26 class EngineException(Exception):
28 def __init__(self
, message
, http_code
=HTTPStatus
.BAD_REQUEST
):
29 self
.http_code
= http_code
30 Exception.__init
__(self
, message
)
33 def _deep_update(dict_to_change
, dict_reference
):
35 Modifies one dictionary with the information of the other following https://tools.ietf.org/html/rfc7396
36 :param dict_to_change: Ends modified
37 :param dict_reference: reference
40 for k
in dict_reference
:
41 if dict_reference
[k
] is None: # None->Anything
42 if k
in dict_to_change
:
44 elif not isinstance(dict_reference
[k
], dict): # NotDict->Anything
45 dict_to_change
[k
] = dict_reference
[k
]
46 elif k
not in dict_to_change
: # Dict->Empty
47 dict_to_change
[k
] = deepcopy(dict_reference
[k
])
48 _deep_update(dict_to_change
[k
], dict_reference
[k
])
49 elif isinstance(dict_to_change
[k
], dict): # Dict->Dict
50 _deep_update(dict_to_change
[k
], dict_reference
[k
])
52 dict_to_change
[k
] = deepcopy(dict_reference
[k
])
53 _deep_update(dict_to_change
[k
], dict_reference
[k
])
64 self
.logger
= logging
.getLogger("nbi.engine")
66 def start(self
, config
):
68 Connect to database, filesystem storage, and messaging
69 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
75 if config
["database"]["driver"] == "mongo":
76 self
.db
= dbmongo
.DbMongo()
77 self
.db
.db_connect(config
["database"])
78 elif config
["database"]["driver"] == "memory":
79 self
.db
= dbmemory
.DbMemory()
80 self
.db
.db_connect(config
["database"])
82 raise EngineException("Invalid configuration param '{}' at '[database]':'driver'".format(
83 config
["database"]["driver"]))
85 if config
["storage"]["driver"] == "local":
86 self
.fs
= fslocal
.FsLocal()
87 self
.fs
.fs_connect(config
["storage"])
89 raise EngineException("Invalid configuration param '{}' at '[storage]':'driver'".format(
90 config
["storage"]["driver"]))
92 if config
["message"]["driver"] == "local":
93 self
.msg
= msglocal
.MsgLocal()
94 self
.msg
.connect(config
["message"])
95 elif config
["message"]["driver"] == "kafka":
96 self
.msg
= msgkafka
.MsgKafka()
97 self
.msg
.connect(config
["message"])
99 raise EngineException("Invalid configuration param '{}' at '[message]':'driver'".format(
100 config
["storage"]["driver"]))
101 except (DbException
, FsException
, MsgException
) as e
:
102 raise EngineException(str(e
), http_code
=e
.http_code
)
107 self
.db
.db_disconnect()
109 self
.fs
.fs_disconnect()
111 self
.fs
.fs_disconnect()
112 except (DbException
, FsException
, MsgException
) as e
:
113 raise EngineException(str(e
), http_code
=e
.http_code
)
115 def authorize(self
, token
):
118 raise EngineException("Needed a token or Authorization http header",
119 http_code
=HTTPStatus
.UNAUTHORIZED
)
120 if token
not in self
.tokens
:
121 raise EngineException("Invalid token or Authorization http header",
122 http_code
=HTTPStatus
.UNAUTHORIZED
)
123 session
= self
.tokens
[token
]
125 if session
["expires"] < now
:
126 del self
.tokens
[token
]
127 raise EngineException("Expired Token or Authorization http header",
128 http_code
=HTTPStatus
.UNAUTHORIZED
)
130 except EngineException
:
131 if self
.config
["global"].get("test.user_not_authorized"):
132 return {"id": "fake-token-id-for-test",
133 "project_id": self
.config
["global"].get("test.project_not_authorized", "admin"),
134 "username": self
.config
["global"]["test.user_not_authorized"]}
138 def new_token(self
, session
, indata
, remote
):
142 # Try using username/password
143 if indata
.get("username"):
144 user_rows
= self
.db
.get_list("users", {"username": indata
.get("username")})
147 user_content
= user_rows
[0]
148 salt
= user_content
["_admin"]["salt"]
149 shadow_password
= sha256(indata
.get("password", "").encode('utf-8') + salt
.encode('utf-8')).hexdigest()
150 if shadow_password
!= user_content
["password"]:
153 raise EngineException("Invalid username/password", http_code
=HTTPStatus
.UNAUTHORIZED
)
155 user_rows
= self
.db
.get_list("users", {"username": session
["username"]})
157 user_content
= user_rows
[0]
159 raise EngineException("Invalid token", http_code
=HTTPStatus
.UNAUTHORIZED
)
161 raise EngineException("Provide credentials: username/password or Authorization Bearer token",
162 http_code
=HTTPStatus
.UNAUTHORIZED
)
164 token_id
= ''.join(random_choice('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789')
165 for _
in range(0, 32))
166 if indata
.get("project_id"):
167 project_id
= indata
.get("project_id")
168 if project_id
not in user_content
["projects"]:
169 raise EngineException("project {} not allowed for this user".format(project_id
),
170 http_code
=HTTPStatus
.UNAUTHORIZED
)
172 project_id
= user_content
["projects"][0]
173 if project_id
== "admin":
176 project
= self
.db
.get_one("projects", {"_id": project_id
})
177 session_admin
= project
.get("admin", False)
178 new_session
= {"issued_at": now
, "expires": now
+3600,
179 "_id": token_id
, "id": token_id
, "project_id": project_id
, "username": user_content
["username"],
180 "remote_port": remote
.port
, "admin": session_admin
}
182 new_session
["remote_host"] = remote
.name
184 new_session
["remote_host"] = remote
.ip
186 self
.tokens
[token_id
] = new_session
187 return deepcopy(new_session
)
189 def get_token_list(self
, session
):
191 for token_id
, token_value
in self
.tokens
.items():
192 if token_value
["username"] == session
["username"]:
193 token_list
.append(deepcopy(token_value
))
196 def get_token(self
, session
, token_id
):
197 token_value
= self
.tokens
.get(token_id
)
199 raise EngineException("token not found", http_code
=HTTPStatus
.NOT_FOUND
)
200 if token_value
["username"] != session
["username"] and not session
["admin"]:
201 raise EngineException("needed admin privileges", http_code
=HTTPStatus
.UNAUTHORIZED
)
204 def del_token(self
, token_id
):
206 del self
.tokens
[token_id
]
207 return "token '{}' deleted".format(token_id
)
209 raise EngineException("Token '{}' not found".format(token_id
), http_code
=HTTPStatus
.NOT_FOUND
)
212 def _remove_envelop(item
, indata
=None):
214 Obtain the useful data removing the envelop. It goes throw the vnfd or nsd catalog and returns the
216 :param item: can be vnfds, nsds, users, projects, userDefinedData (initial content of a vnfds, nsds
217 :param indata: Content to be inspected
218 :return: the useful part of indata
220 clean_indata
= indata
224 if clean_indata
.get('vnfd:vnfd-catalog'):
225 clean_indata
= clean_indata
['vnfd:vnfd-catalog']
226 elif clean_indata
.get('vnfd-catalog'):
227 clean_indata
= clean_indata
['vnfd-catalog']
228 if clean_indata
.get('vnfd'):
229 if not isinstance(clean_indata
['vnfd'], list) or len(clean_indata
['vnfd']) != 1:
230 raise EngineException("'vnfd' must be a list only one element")
231 clean_indata
= clean_indata
['vnfd'][0]
233 if clean_indata
.get('nsd:nsd-catalog'):
234 clean_indata
= clean_indata
['nsd:nsd-catalog']
235 elif clean_indata
.get('nsd-catalog'):
236 clean_indata
= clean_indata
['nsd-catalog']
237 if clean_indata
.get('nsd'):
238 if not isinstance(clean_indata
['nsd'], list) or len(clean_indata
['nsd']) != 1:
239 raise EngineException("'nsd' must be a list only one element")
240 clean_indata
= clean_indata
['nsd'][0]
241 elif item
== "userDefinedData":
242 if "userDefinedData" in indata
:
243 clean_indata
= clean_indata
['userDefinedData']
246 def _validate_new_data(self
, session
, item
, indata
, id=None):
248 if not indata
.get("username"):
249 raise EngineException("missing 'username'", HTTPStatus
.UNPROCESSABLE_ENTITY
)
250 if not indata
.get("password"):
251 raise EngineException("missing 'password'", HTTPStatus
.UNPROCESSABLE_ENTITY
)
252 if not indata
.get("projects"):
253 raise EngineException("missing 'projects'", HTTPStatus
.UNPROCESSABLE_ENTITY
)
254 # check username not exists
255 if self
.db
.get_one(item
, {"username": indata
.get("username")}, fail_on_empty
=False, fail_on_more
=False):
256 raise EngineException("username '{}' exists".format(indata
["username"]), HTTPStatus
.CONFLICT
)
257 elif item
== "projects":
258 if not indata
.get("name"):
259 raise EngineException("missing 'name'")
260 # check name not exists
261 if self
.db
.get_one(item
, {"name": indata
.get("name")}, fail_on_empty
=False, fail_on_more
=False):
262 raise EngineException("name '{}' exists".format(indata
["name"]), HTTPStatus
.CONFLICT
)
263 elif item
in ("vnfds", "nsds"):
264 filter = {"id": indata
["id"]}
266 filter["_id.neq"] = id
267 # TODO add admin to filter, validate rights
268 self
._add
_read
_filter
(session
, item
, filter)
269 if self
.db
.get_one(item
, filter, fail_on_empty
=False):
270 raise EngineException("{} with id '{}' already exists for this tenant".format(item
[:-1], indata
["id"]),
273 # TODO validate with pyangbind
274 elif item
== "userDefinedData":
275 # TODO validate userDefinedData is a keypair values
280 elif item
== "vim_accounts" or item
== "sdns":
281 if self
.db
.get_one(item
, {"name": indata
.get("name")}, fail_on_empty
=False, fail_on_more
=False):
282 raise EngineException("name '{}' already exists for {}".format(indata
["name"], item
),
285 def _format_new_data(self
, session
, item
, indata
):
287 if not "_admin" in indata
:
288 indata
["_admin"] = {}
289 indata
["_admin"]["created"] = now
290 indata
["_admin"]["modified"] = now
292 indata
["_id"] = indata
["username"]
294 indata
["_admin"]["salt"] = salt
295 indata
["password"] = sha256(indata
["password"].encode('utf-8') + salt
.encode('utf-8')).hexdigest()
296 elif item
== "projects":
297 indata
["_id"] = indata
["name"]
299 if not indata
.get("_id"):
300 indata
["_id"] = str(uuid4())
301 if item
in ("vnfds", "nsds", "nsrs", "vnfrs"):
302 if not indata
["_admin"].get("projects_read"):
303 indata
["_admin"]["projects_read"] = [session
["project_id"]]
304 if not indata
["_admin"].get("projects_write"):
305 indata
["_admin"]["projects_write"] = [session
["project_id"]]
306 if item
in ("vnfds", "nsds"):
307 indata
["_admin"]["onboardingState"] = "CREATED"
308 indata
["_admin"]["operationalState"] = "DISABLED"
309 indata
["_admin"]["usageSate"] = "NOT_IN_USE"
311 indata
["_admin"]["nsState"] = "NOT_INSTANTIATED"
312 if item
in ("vim_accounts", "sdns"):
313 indata
["_admin"]["operationalState"] = "PROCESSING"
315 def upload_content(self
, session
, item
, _id
, indata
, kwargs
, headers
):
317 Used for receiving content by chunks (with a transaction_id header and/or gzip file. It will store and extract)
318 :param session: session
319 :param item: can be nsds or vnfds
320 :param _id : the nsd,vnfd is already created, this is the id
321 :param indata: http body request
322 :param kwargs: user query string to override parameters. NOT USED
323 :param headers: http request headers
324 :return: True package has is completely uploaded or False if partial content has been uplodaed.
325 Raise exception on error
327 # Check that _id exists and it is valid
328 current_desc
= self
.get_item(session
, item
, _id
)
330 content_range_text
= headers
.get("Content-Range")
331 expected_md5
= headers
.get("Content-File-MD5")
333 content_type
= headers
.get("Content-Type")
334 if content_type
and "application/gzip" in content_type
or "application/x-gzip" in content_type
or \
335 "application/zip" in content_type
:
337 filename
= headers
.get("Content-Filename")
339 filename
= "package.tar.gz" if compressed
else "package"
340 # TODO change to Content-Disposition filename https://tools.ietf.org/html/rfc6266
344 if content_range_text
:
345 content_range
= content_range_text
.replace("-", " ").replace("/", " ").split()
346 if content_range
[0] != "bytes": # TODO check x<y not negative < total....
348 start
= int(content_range
[1])
349 end
= int(content_range
[2]) + 1
350 total
= int(content_range
[3])
355 if not self
.fs
.file_exists(_id
, 'dir'):
356 raise EngineException("invalid Transaction-Id header", HTTPStatus
.NOT_FOUND
)
358 self
.fs
.file_delete(_id
, ignore_non_exist
=True)
361 storage
= self
.fs
.get_params()
362 storage
["folder"] = _id
364 file_path
= (_id
, filename
)
365 if self
.fs
.file_exists(file_path
, 'file'):
366 file_size
= self
.fs
.file_size(file_path
)
369 if file_size
!= start
:
370 raise EngineException("invalid Content-Range start sequence, expected '{}' but received '{}'".format(
371 file_size
, start
), HTTPStatus
.REQUESTED_RANGE_NOT_SATISFIABLE
)
372 file_pkg
= self
.fs
.file_open(file_path
, 'a+b')
373 if isinstance(indata
, dict):
374 indata_text
= yaml
.safe_dump(indata
, indent
=4, default_flow_style
=False)
375 file_pkg
.write(indata_text
.encode(encoding
="utf-8"))
379 indata_text
= indata
.read(4096)
380 indata_len
+= len(indata_text
)
383 file_pkg
.write(indata_text
)
384 if content_range_text
:
385 if indata_len
!= end
-start
:
386 raise EngineException("Mismatch between Content-Range header {}-{} and body length of {}".format(
387 start
, end
-1, indata_len
), HTTPStatus
.REQUESTED_RANGE_NOT_SATISFIABLE
)
389 # TODO update to UPLOADING
396 chunk_data
= file_pkg
.read(1024)
398 file_md5
.update(chunk_data
)
399 chunk_data
= file_pkg
.read(1024)
400 if expected_md5
!= file_md5
.hexdigest():
401 raise EngineException("Error, MD5 mismatch", HTTPStatus
.CONFLICT
)
403 if compressed
== "gzip":
404 tar
= tarfile
.open(mode
='r', fileobj
=file_pkg
)
405 descriptor_file_name
= None
407 tarname
= tarinfo
.name
408 tarname_path
= tarname
.split("/")
409 if not tarname_path
[0] or ".." in tarname_path
: # if start with "/" means absolute path
410 raise EngineException("Absolute path or '..' are not allowed for package descriptor tar.gz")
411 if len(tarname_path
) == 1 and not tarinfo
.isdir():
412 raise EngineException("All files must be inside a dir for package descriptor tar.gz")
413 if tarname
.endswith(".yaml") or tarname
.endswith(".json") or tarname
.endswith(".yml"):
414 storage
["pkg-dir"] = tarname_path
[0]
415 if len(tarname_path
) == 2:
416 if descriptor_file_name
:
417 raise EngineException("Found more than one descriptor file at package descriptor tar.gz")
418 descriptor_file_name
= tarname
419 if not descriptor_file_name
:
420 raise EngineException("Not found any descriptor file at package descriptor tar.gz")
421 storage
["descriptor"] = descriptor_file_name
422 storage
["zipfile"] = filename
423 self
.fs
.file_extract(tar
, _id
)
424 with self
.fs
.file_open((_id
, descriptor_file_name
), "r") as descriptor_file
:
425 content
= descriptor_file
.read()
427 content
= file_pkg
.read()
428 storage
["descriptor"] = descriptor_file_name
= filename
430 if descriptor_file_name
.endswith(".json"):
431 error_text
= "Invalid json format "
432 indata
= json
.load(content
)
434 error_text
= "Invalid yaml format "
435 indata
= yaml
.load(content
)
437 current_desc
["_admin"]["storage"] = storage
438 current_desc
["_admin"]["onboardingState"] = "ONBOARDED"
439 current_desc
["_admin"]["operationalState"] = "ENABLED"
441 self
._edit
_item
(session
, item
, _id
, current_desc
, indata
, kwargs
)
442 # TODO if descriptor has changed because kwargs update content and remove cached zip
443 # TODO if zip is not present creates one
446 except EngineException
:
449 raise EngineException("invalid Content-Range header format. Expected 'bytes start-end/total'",
450 HTTPStatus
.REQUESTED_RANGE_NOT_SATISFIABLE
)
452 raise EngineException("invalid upload transaction sequence: '{}'".format(e
), HTTPStatus
.BAD_REQUEST
)
453 except tarfile
.ReadError
as e
:
454 raise EngineException("invalid file content {}".format(e
), HTTPStatus
.BAD_REQUEST
)
455 except (ValueError, yaml
.YAMLError
) as e
:
456 raise EngineException(error_text
+ str(e
))
461 def new_nsr(self
, session
, ns_request
):
463 Creates a new nsr into database. It also creates needed vnfrs
464 :param session: contains the used login username and working project
465 :param ns_request: params to be used for the nsr
466 :return: the _id of nsr descriptor stored at database
472 step
= "getting nsd id='{}' from database".format(ns_request
.get("nsdId"))
473 nsd
= self
.get_item(session
, "nsds", ns_request
["nsdId"])
474 nsr_id
= str(uuid4())
476 step
= "filling nsr from input data"
478 "name": ns_request
["nsName"],
479 "name-ref": ns_request
["nsName"],
480 "short-name": ns_request
["nsName"],
481 "admin-status": "ENABLED",
483 "datacenter": ns_request
["vimAccountId"],
484 "resource-orchestrator": "osmopenmano",
485 "description": ns_request
.get("nsDescription", ""),
486 "constituent-vnfr-ref": [],
488 "operational-status": "init", # typedef ns-operational-
489 "config-status": "init", # typedef config-states
490 "detailed-status": "scheduled",
492 "orchestration-progress": {}, # {"networks": {"active": 0, "total": 0}, "vms": {"active": 0, "total": 0}},
495 "nsd-name-ref": nsd
["name"],
496 "operational-events": [], # "id", "timestamp", "description", "event",
497 "nsd-ref": nsd
["id"],
498 "instantiate_params": ns_request
,
499 "ns-instance-config-ref": nsr_id
,
502 # "input-parameter": xpath, value,
503 "ssh-authorized-key": ns_request
.get("key-pair-ref"),
505 ns_request
["nsr_id"] = nsr_id
509 for member_vnf
in nsd
["constituent-vnfd"]:
510 vnfd_id
= member_vnf
["vnfd-id-ref"]
511 step
= "getting vnfd id='{}' constituent-vnfd='{}' from database".format(
512 member_vnf
["vnfd-id-ref"], member_vnf
["member-vnf-index"])
513 if vnfd_id
not in needed_vnfds
:
515 vnf_filter
= {"id": vnfd_id
}
516 self
._add
_read
_filter
(session
, "vnfds", vnf_filter
)
517 vnfd
= self
.db
.get_one("vnfds", vnf_filter
)
519 needed_vnfds
[vnfd_id
] = vnfd
521 vnfd
= needed_vnfds
[vnfd_id
]
522 step
= "filling vnfr vnfd-id='{}' constituent-vnfd='{}'".format(
523 member_vnf
["vnfd-id-ref"], member_vnf
["member-vnf-index"])
524 vnfr_id
= str(uuid4())
528 "nsr-id-ref": nsr_id
,
529 "member-vnf-index-ref": member_vnf
["member-vnf-index"],
531 # "vnfd": vnfd, # at OSM model. TODO can it be removed in the future to avoid data duplication?
533 "vnfd-id": vnfr_id
, # not at OSM model, but useful
534 "vim-account-id": None,
536 "connection-point": [],
537 "ip-address": None, # mgmt-interface filled by LCM
539 for cp
in vnfd
.get("connection-point", ()):
542 "connection-point-id": cp
.get("id"),
544 # "ip-address", "mac-address" # filled by LCM
545 # vim-id # TODO it would be nice having a vim port id
547 vnfr_descriptor
["connection-point"].append(vnf_cp
)
548 for vdu
in vnfd
["vdu"]:
549 vdur_id
= str(uuid4())
552 "vdu-id-ref": vdu
["id"],
553 "ip-address": None, # mgmt-interface filled by LCM
554 # "vim-id", "flavor-id", "image-id", "management-ip" # filled by LCM
555 "internal-connection-point": [],
557 # TODO volumes: name, volume-id
558 for icp
in vdu
.get("internal-connection-point", ()):
561 "connection-point-id": icp
["id"],
562 "name": icp
.get("name"),
563 # "ip-address", "mac-address" # filled by LCM
564 # vim-id # TODO it would be nice having a vim port id
566 vdur
["internal-connection-point"].append(vdu_icp
)
567 vnfr_descriptor
["vdur"].append(vdur
)
569 step
= "creating vnfr vnfd-id='{}' constituent-vnfd='{}' at database".format(
570 member_vnf
["vnfd-id-ref"], member_vnf
["member-vnf-index"])
571 self
._format
_new
_data
(session
, "vnfrs", vnfr_descriptor
)
572 self
.db
.create("vnfrs", vnfr_descriptor
)
573 rollback
.append({"session": session
, "item": "vnfrs", "_id": vnfr_id
, "force": True})
574 nsr_descriptor
["constituent-vnfr-ref"].append(vnfr_id
)
576 step
= "creating nsr at database"
577 self
._format
_new
_data
(session
, "nsrs", nsr_descriptor
)
578 self
.db
.create("nsrs", nsr_descriptor
)
580 except Exception as e
:
581 raise EngineException("Error {}: {}".format(step
, e
))
582 for rollback_item
in rollback
:
584 self
.engine
.del_item(**rollback
)
585 except Exception as e2
:
586 self
.logger
.error("Rollback Exception {}: {}".format(rollback
, e2
))
589 def _update_descriptor(desc
, kwargs
):
591 Update descriptor with the kwargs
598 for k
, v
in kwargs
.items():
599 update_content
= content
603 if kitem_old
is not None:
604 update_content
= update_content
[kitem_old
]
605 if isinstance(update_content
, dict):
607 elif isinstance(update_content
, list):
608 kitem_old
= int(kitem
)
610 raise EngineException(
611 "Invalid query string '{}'. Descriptor is not a list nor dict at '{}'".format(k
, kitem
))
612 update_content
[kitem_old
] = v
614 raise EngineException(
615 "Invalid query string '{}'. Descriptor does not contain '{}'".format(k
, kitem_old
))
617 raise EngineException("Invalid query string '{}'. Expected integer index list instead of '{}'".format(
620 raise EngineException(
621 "Invalid query string '{}'. Index '{}' out of range".format(k
, kitem_old
))
623 def new_item(self
, session
, item
, indata
={}, kwargs
=None, headers
={}):
625 Creates a new entry into database. For nsds and vnfds it creates an almost empty DISABLED entry,
626 that must be completed with a call to method upload_content
627 :param session: contains the used login username and working project
628 :param item: it can be: users, projects, vim_accounts, sdns, nsrs, nsds, vnfds
629 :param indata: data to be inserted
630 :param kwargs: used to override the indata descriptor
631 :param headers: http request headers
632 :return: _id: identity of the inserted data.
637 if item
in ("nsds", "vnfds"):
638 item_envelop
= "userDefinedData"
639 content
= self
._remove
_envelop
(item_envelop
, indata
)
641 # Override descriptor with query string kwargs
642 self
._update
_descriptor
(content
, kwargs
)
643 if not indata
and item
not in ("nsds", "vnfds"):
644 raise EngineException("Empty payload")
646 validate_input(content
, item
, new
=True)
649 # in this case the input descriptor is not the data to be stored
650 return self
.new_nsr(session
, ns_request
=content
)
652 self
._validate
_new
_data
(session
, item_envelop
, content
)
653 if item
in ("nsds", "vnfds"):
654 content
= {"_admin": {"userDefinedData": content
}}
655 self
._format
_new
_data
(session
, item
, content
)
656 _id
= self
.db
.create(item
, content
)
658 if item
== "vim_accounts":
659 msg_data
= self
.db
.get_one(item
, {"_id": _id
})
660 msg_data
.pop("_admin", None)
661 self
.msg
.write("vim_account", "create", msg_data
)
663 msg_data
= self
.db
.get_one(item
, {"_id": _id
})
664 msg_data
.pop("_admin", None)
665 self
.msg
.write("sdn", "create", msg_data
)
667 except ValidationError
as e
:
668 raise EngineException(e
, HTTPStatus
.UNPROCESSABLE_ENTITY
)
670 def new_nslcmop(self
, session
, nsInstanceId
, action
, params
):
676 "operationState": "PROCESSING", # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
677 "statusEnteredTime": now
,
678 "nsInstanceId": nsInstanceId
,
679 "lcmOperationType": action
,
681 "isAutomaticInvocation": False,
682 "operationParams": params
,
683 "isCancelPending": False,
685 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
686 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsInstanceId
,
691 def ns_action(self
, session
, nsInstanceId
, action
, indata
, kwargs
=None):
693 Performs a new action over a ns
694 :param session: contains the used login username and working project
695 :param nsInstanceId: _id of the nsr to perform the action
696 :param action: it can be: instantiate, terminate, action, TODO: update, heal
697 :param indata: descriptor with the parameters of the action
698 :param kwargs: used to override the indata descriptor
699 :return: id of the nslcmops
702 # Override descriptor with query string kwargs
703 self
._update
_descriptor
(indata
, kwargs
)
704 validate_input(indata
, "ns_" + action
, new
=True)
706 nsr
= self
.get_item(session
, "nsrs", nsInstanceId
)
707 if not nsr
["_admin"].get("nsState") or nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
708 if action
== "terminate" and indata
.get("autoremove"):
709 # NSR must be deleted
710 return self
.del_item(session
, "nsrs", nsInstanceId
)
711 if action
!= "instantiate":
712 raise EngineException("ns_instance '{}' cannot be '{}' because it is not instantiated".format(
713 nsInstanceId
, action
), HTTPStatus
.CONFLICT
)
715 if action
== "instantiate" and not indata
.get("force"):
716 raise EngineException("ns_instance '{}' cannot be '{}' because it is already instantiated".format(
717 nsInstanceId
, action
), HTTPStatus
.CONFLICT
)
718 indata
["nsInstanceId"] = nsInstanceId
720 nslcmop
= self
.new_nslcmop(session
, nsInstanceId
, action
, indata
)
721 self
._format
_new
_data
(session
, "nslcmops", nslcmop
)
722 _id
= self
.db
.create("nslcmops", nslcmop
)
724 self
.msg
.write("ns", action
, nslcmop
)
726 except ValidationError
as e
:
727 raise EngineException(e
, HTTPStatus
.UNPROCESSABLE_ENTITY
)
728 # except DbException as e:
729 # raise EngineException("Cannot get ns_instance '{}': {}".format(e), HTTPStatus.NOT_FOUND)
731 def _add_read_filter(self
, session
, item
, filter):
732 if session
["project_id"] == "admin": # allows all
735 filter["username"] = session
["username"]
736 elif item
in ("vnfds", "nsds", "nsrs"):
737 filter["_admin.projects_read.cont"] = ["ANY", session
["project_id"]]
739 def _add_delete_filter(self
, session
, item
, filter):
740 if session
["project_id"] != "admin" and item
in ("users", "projects"):
741 raise EngineException("Only admin users can perform this task", http_code
=HTTPStatus
.FORBIDDEN
)
743 if filter.get("_id") == session
["username"] or filter.get("username") == session
["username"]:
744 raise EngineException("You cannot delete your own user", http_code
=HTTPStatus
.CONFLICT
)
745 elif item
== "project":
746 if filter.get("_id") == session
["project_id"]:
747 raise EngineException("You cannot delete your own project", http_code
=HTTPStatus
.CONFLICT
)
748 elif item
in ("vnfds", "nsds") and session
["project_id"] != "admin":
749 filter["_admin.projects_write.cont"] = ["ANY", session
["project_id"]]
751 def get_file(self
, session
, item
, _id
, path
=None, accept_header
=None):
753 Return the file content of a vnfd or nsd
754 :param session: contains the used login username and working project
755 :param item: it can be vnfds or nsds
756 :param _id: Identity of the vnfd, ndsd
757 :param path: artifact path or "$DESCRIPTOR" or None
758 :param accept_header: Content of Accept header. Must contain applition/zip or/and text/plain
759 :return: opened file or raises an exception
761 accept_text
= accept_zip
= False
763 if 'text/plain' in accept_header
or '*/*' in accept_header
:
765 if 'application/zip' in accept_header
or '*/*' in accept_header
:
767 if not accept_text
and not accept_zip
:
768 raise EngineException("provide request header 'Accept' with 'application/zip' or 'text/plain'",
769 http_code
=HTTPStatus
.NOT_ACCEPTABLE
)
771 content
= self
.get_item(session
, item
, _id
)
772 if content
["_admin"]["onboardingState"] != "ONBOARDED":
773 raise EngineException("Cannot get content because this resource is not at 'ONBOARDED' state. "
774 "onboardingState is {}".format(content
["_admin"]["onboardingState"]),
775 http_code
=HTTPStatus
.CONFLICT
)
776 storage
= content
["_admin"]["storage"]
777 if path
is not None and path
!= "$DESCRIPTOR": # artifacts
778 if not storage
.get('pkg-dir'):
779 raise EngineException("Packages does not contains artifacts", http_code
=HTTPStatus
.BAD_REQUEST
)
780 if self
.fs
.file_exists((storage
['folder'], storage
['pkg-dir'], *path
), 'dir'):
781 folder_content
= self
.fs
.dir_ls((storage
['folder'], storage
['pkg-dir'], *path
))
782 return folder_content
, "text/plain"
783 # TODO manage folders in http
785 return self
.fs
.file_open((storage
['folder'], storage
['pkg-dir'], *path
), "rb"), \
786 "application/octet-stream"
788 # pkgtype accept ZIP TEXT -> result
789 # manyfiles yes X -> zip
791 # onefile yes no -> zip
794 if accept_text
and (not storage
.get('pkg-dir') or path
== "$DESCRIPTOR"):
795 return self
.fs
.file_open((storage
['folder'], storage
['descriptor']), "r"), "text/plain"
796 elif storage
.get('pkg-dir') and not accept_zip
:
797 raise EngineException("Packages that contains several files need to be retrieved with 'application/zip'"
798 "Accept header", http_code
=HTTPStatus
.NOT_ACCEPTABLE
)
800 if not storage
.get('zipfile'):
801 # TODO generate zipfile if not present
802 raise EngineException("Only allowed 'text/plain' Accept header for this descriptor. To be solved in future versions"
803 "", http_code
=HTTPStatus
.NOT_ACCEPTABLE
)
804 return self
.fs
.file_open((storage
['folder'], storage
['zipfile']), "rb"), "application/zip"
806 def get_item_list(self
, session
, item
, filter={}):
809 :param session: contains the used login username and working project
810 :param item: it can be: users, projects, vnfds, nsds, ...
811 :param filter: filter of data to be applied
812 :return: The list, it can be empty if no one match the filter.
814 # TODO add admin to filter, validate rights
815 # TODO transform data for SOL005 URL requests. Transform filtering
816 # TODO implement "field-type" query string SOL005
818 self
._add
_read
_filter
(session
, item
, filter)
819 return self
.db
.get_list(item
, filter)
821 def get_item(self
, session
, item
, _id
):
823 Get complete information on an items
824 :param session: contains the used login username and working project
825 :param item: it can be: users, projects, vnfds, nsds,
826 :param _id: server id of the item
827 :return: dictionary, raise exception if not found.
830 filter = {"_id": _id
}
831 # TODO add admin to filter, validate rights
832 # TODO transform data for SOL005 URL requests
833 self
._add
_read
_filter
(session
, item
, filter)
834 return self
.db
.get_one(item
, filter)
836 def del_item_list(self
, session
, item
, filter={}):
838 Delete a list of items
839 :param session: contains the used login username and working project
840 :param item: it can be: users, projects, vnfds, nsds, ...
841 :param filter: filter of data to be applied
842 :return: The deleted list, it can be empty if no one match the filter.
844 # TODO add admin to filter, validate rights
845 self
._add
_read
_filter
(session
, item
, filter)
846 return self
.db
.del_list(item
, filter)
848 def del_item(self
, session
, item
, _id
, force
=False):
850 Get complete information on an items
851 :param session: contains the used login username and working project
852 :param item: it can be: users, projects, vnfds, nsds, ...
853 :param _id: server id of the item
854 :param force: indicates if deletion must be forced in case of conflict
855 :return: dictionary with deleted item _id. It raises exception if not found.
857 # TODO add admin to filter, validate rights
858 # data = self.get_item(item, _id)
859 filter = {"_id": _id
}
860 self
._add
_delete
_filter
(session
, item
, filter)
863 nsr
= self
.db
.get_one(item
, filter)
864 if nsr
["_admin"]["nsState"] == "INSTANTIATED" and not force
:
865 raise EngineException("nsr '{}' cannot be deleted because it is in 'INSTANTIATED' state. "
866 "Launch 'terminate' action first; or force deletion".format(_id
),
867 http_code
=HTTPStatus
.CONFLICT
)
868 v
= self
.db
.del_one(item
, {"_id": _id
})
869 self
.db
.del_list("nslcmops", {"nsInstanceId": _id
})
870 self
.db
.del_list("vnfrs", {"nsr-id-ref": _id
})
871 self
.msg
.write("ns", "deleted", {"_id": _id
})
873 if item
in ("vim_accounts", "sdns"):
874 desc
= self
.db
.get_one(item
, filter)
875 desc
["_admin"]["to_delete"] = True
876 self
.db
.replace(item
, _id
, desc
) # TODO change to set_one
877 if item
== "vim_accounts":
878 self
.msg
.write("vim_account", "delete", {"_id": _id
})
880 self
.msg
.write("sdn", "delete", {"_id": _id
})
881 return {"deleted": 1} # TODO indicate an offline operation to return 202 ACCEPTED
883 v
= self
.db
.del_one(item
, filter)
884 self
.fs
.file_delete(_id
, ignore_non_exist
=True)
889 Prune database not needed content
892 return self
.db
.del_list("nsrs", {"_admin.to_delete": True})
894 def create_admin(self
):
896 Creates a new user admin/admin into database if database is empty. Useful for initialization
897 :return: _id identity of the inserted data, or None
899 users
= self
.db
.get_one("users", fail_on_empty
=False, fail_on_more
=False)
902 # raise EngineException("Unauthorized. Database users is not empty", HTTPStatus.UNAUTHORIZED)
903 indata
= {"username": "admin", "password": "admin", "projects": ["admin"]}
904 fake_session
= {"project_id": "admin", "username": "admin"}
905 self
._format
_new
_data
(fake_session
, "users", indata
)
906 _id
= self
.db
.create("users", indata
)
909 def init_db(self
, target_version
='1.0'):
911 Init database if empty. If not empty it checks that database version is ok.
912 If empty, it creates a new user admin/admin at 'users' and a new entry at 'version'
913 :return: None if ok, exception if error or if the version is different.
915 version
= self
.db
.get_one("version", fail_on_empty
=False, fail_on_more
=False)
919 # create database version
921 "_id": '1.0', # version text
922 "version": 1000, # version number
923 "date": "2018-04-12", # version date
924 "description": "initial design", # changes in this version
925 'status': 'ENABLED' # ENABLED, DISABLED (migration in process), ERROR,
927 self
.db
.create("version", version_data
)
928 elif version
["_id"] != target_version
:
929 # TODO implement migration process
930 raise EngineException("Wrong database version '{}'. Expected '{}'".format(
931 version
["_id"], target_version
), HTTPStatus
.INTERNAL_SERVER_ERROR
)
932 elif version
["status"] != 'ENABLED':
933 raise EngineException("Wrong database status '{}'".format(
934 version
["status"]), HTTPStatus
.INTERNAL_SERVER_ERROR
)
937 def _edit_item(self
, session
, item
, id, content
, indata
={}, kwargs
=None):
939 indata
= self
._remove
_envelop
(item
, indata
)
941 # Override descriptor with query string kwargs
944 for k
, v
in kwargs
.items():
945 update_content
= indata
949 if kitem_old
is not None:
950 update_content
= update_content
[kitem_old
]
951 if isinstance(update_content
, dict):
953 elif isinstance(update_content
, list):
954 kitem_old
= int(kitem
)
956 raise EngineException(
957 "Invalid query string '{}'. Descriptor is not a list nor dict at '{}'".format(k
, kitem
))
958 update_content
[kitem_old
] = v
960 raise EngineException(
961 "Invalid query string '{}'. Descriptor does not contain '{}'".format(k
, kitem_old
))
963 raise EngineException("Invalid query string '{}'. Expected integer index list instead of '{}'".format(
966 raise EngineException(
967 "Invalid query string '{}'. Index '{}' out of range".format(k
, kitem_old
))
969 validate_input(content
, item
, new
=False)
970 except ValidationError
as e
:
971 raise EngineException(e
, HTTPStatus
.UNPROCESSABLE_ENTITY
)
973 _deep_update(content
, indata
)
974 self
._validate
_new
_data
(session
, item
, content
, id)
975 # self._format_new_data(session, item, content)
976 self
.db
.replace(item
, id, content
)
977 if item
in ("vim_accounts", "sdns"):
978 indata
.pop("_admin", None)
980 if item
== "vim_accounts":
981 self
.msg
.write("vim_account", "edit", indata
)
983 self
.msg
.write("sdn", "edit", indata
)
986 def edit_item(self
, session
, item
, _id
, indata
={}, kwargs
=None):
988 Update an existing entry at database
989 :param session: contains the used login username and working project
990 :param item: it can be: users, projects, vnfds, nsds, ...
991 :param _id: identifier to be updated
992 :param indata: data to be inserted
993 :param kwargs: used to override the indata descriptor
994 :return: dictionary, raise exception if not found.
997 content
= self
.get_item(session
, item
, _id
)
998 return self
._edit
_item
(session
, item
, _id
, content
, indata
, kwargs
)