# under the License.
##
-version = '7.0.1.post23'
-version_date = '2020-04-17'
+version = "7.0.1.post23"
+version_date = "2020-04-17"
# Obtain installed package version. Ignore if error, e.g. pkg_resources not installed
try:
from pkg_resources import get_distribution
+
version = get_distribution("osm_nbi").version
except Exception:
pass
from hashlib import sha256
from http import HTTPStatus
from time import time
-from osm_nbi.validation import user_new_schema, user_edit_schema, project_new_schema, project_edit_schema, \
- vim_account_new_schema, vim_account_edit_schema, sdn_new_schema, sdn_edit_schema, \
- wim_account_new_schema, wim_account_edit_schema, roles_new_schema, roles_edit_schema, \
- k8scluster_new_schema, k8scluster_edit_schema, k8srepo_new_schema, k8srepo_edit_schema, \
- vca_new_schema, vca_edit_schema, \
- osmrepo_new_schema, osmrepo_edit_schema, \
- validate_input, ValidationError, is_valid_uuid # To check that User/Project Names don't look like UUIDs
+from osm_nbi.validation import (
+ user_new_schema,
+ user_edit_schema,
+ project_new_schema,
+ project_edit_schema,
+ vim_account_new_schema,
+ vim_account_edit_schema,
+ sdn_new_schema,
+ sdn_edit_schema,
+ wim_account_new_schema,
+ wim_account_edit_schema,
+ roles_new_schema,
+ roles_edit_schema,
+ k8scluster_new_schema,
+ k8scluster_edit_schema,
+ k8srepo_new_schema,
+ k8srepo_edit_schema,
+ vca_new_schema,
+ vca_edit_schema,
+ osmrepo_new_schema,
+ osmrepo_edit_schema,
+ validate_input,
+ ValidationError,
+ is_valid_uuid,
+) # To check that User/Project Names don't look like UUIDs
from osm_nbi.base_topic import BaseTopic, EngineException
from osm_nbi.authconn import AuthconnNotFoundException, AuthconnConflictException
from osm_common.dbbase import deep_update_rfc7396
def check_conflict_on_new(self, session, indata):
# check username not exists
- if self.db.get_one(self.topic, {"username": indata.get("username")}, fail_on_empty=False, fail_on_more=False):
- raise EngineException("username '{}' exists".format(indata["username"]), HTTPStatus.CONFLICT)
+ if self.db.get_one(
+ self.topic,
+ {"username": indata.get("username")},
+ fail_on_empty=False,
+ fail_on_more=False,
+ ):
+ raise EngineException(
+ "username '{}' exists".format(indata["username"]), HTTPStatus.CONFLICT
+ )
# check projects
if not session["force"]:
for p in indata.get("projects") or []:
# To allow project addressing by Name as well as ID
- if not self.db.get_one("projects", {BaseTopic.id_field("projects", p): p}, fail_on_empty=False,
- fail_on_more=False):
- raise EngineException("project '{}' does not exist".format(p), HTTPStatus.CONFLICT)
+ if not self.db.get_one(
+ "projects",
+ {BaseTopic.id_field("projects", p): p},
+ fail_on_empty=False,
+ fail_on_more=False,
+ ):
+ raise EngineException(
+ "project '{}' does not exist".format(p), HTTPStatus.CONFLICT
+ )
def check_conflict_on_del(self, session, _id, db_content):
"""
:return: None if ok or raises EngineException with the conflict
"""
if _id == session["username"]:
- raise EngineException("You cannot delete your own user", http_code=HTTPStatus.CONFLICT)
+ raise EngineException(
+ "You cannot delete your own user", http_code=HTTPStatus.CONFLICT
+ )
@staticmethod
def format_on_new(content, project_id=None, make_public=False):
salt = uuid4().hex
content["_admin"]["salt"] = salt
if content.get("password"):
- content["password"] = sha256(content["password"].encode('utf-8') + salt.encode('utf-8')).hexdigest()
+ content["password"] = sha256(
+ content["password"].encode("utf-8") + salt.encode("utf-8")
+ ).hexdigest()
if content.get("project_role_mappings"):
- projects = [mapping["project"] for mapping in content["project_role_mappings"]]
+ projects = [
+ mapping["project"] for mapping in content["project_role_mappings"]
+ ]
if content.get("projects"):
content["projects"] += projects
if edit_content.get("password"):
salt = uuid4().hex
final_content["_admin"]["salt"] = salt
- final_content["password"] = sha256(edit_content["password"].encode('utf-8') +
- salt.encode('utf-8')).hexdigest()
+ final_content["password"] = sha256(
+ edit_content["password"].encode("utf-8") + salt.encode("utf-8")
+ ).hexdigest()
return None
def edit(self, session, _id, indata=None, kwargs=None, content=None):
if not session["admin"]:
- raise EngineException("needed admin privileges", http_code=HTTPStatus.UNAUTHORIZED)
+ raise EngineException(
+ "needed admin privileges", http_code=HTTPStatus.UNAUTHORIZED
+ )
# Names that look like UUIDs are not allowed
name = (indata if indata else kwargs).get("username")
if is_valid_uuid(name):
- raise EngineException("Usernames that look like UUIDs are not allowed",
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
- return BaseTopic.edit(self, session, _id, indata=indata, kwargs=kwargs, content=content)
+ raise EngineException(
+ "Usernames that look like UUIDs are not allowed",
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
+ return BaseTopic.edit(
+ self, session, _id, indata=indata, kwargs=kwargs, content=content
+ )
def new(self, rollback, session, indata=None, kwargs=None, headers=None):
if not session["admin"]:
- raise EngineException("needed admin privileges", http_code=HTTPStatus.UNAUTHORIZED)
+ raise EngineException(
+ "needed admin privileges", http_code=HTTPStatus.UNAUTHORIZED
+ )
# Names that look like UUIDs are not allowed
name = indata["username"] if indata else kwargs["username"]
if is_valid_uuid(name):
- raise EngineException("Usernames that look like UUIDs are not allowed",
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
- return BaseTopic.new(self, rollback, session, indata=indata, kwargs=kwargs, headers=headers)
+ raise EngineException(
+ "Usernames that look like UUIDs are not allowed",
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
+ return BaseTopic.new(
+ self, rollback, session, indata=indata, kwargs=kwargs, headers=headers
+ )
class ProjectTopic(BaseTopic):
if not indata.get("name"):
raise EngineException("missing 'name'")
# check name not exists
- if self.db.get_one(self.topic, {"name": indata.get("name")}, fail_on_empty=False, fail_on_more=False):
- raise EngineException("name '{}' exists".format(indata["name"]), HTTPStatus.CONFLICT)
+ if self.db.get_one(
+ self.topic,
+ {"name": indata.get("name")},
+ fail_on_empty=False,
+ fail_on_more=False,
+ ):
+ raise EngineException(
+ "name '{}' exists".format(indata["name"]), HTTPStatus.CONFLICT
+ )
@staticmethod
def format_on_new(content, project_id=None, make_public=False):
:return: None if ok or raises EngineException with the conflict
"""
if _id in session["project_id"]:
- raise EngineException("You cannot delete your own project", http_code=HTTPStatus.CONFLICT)
+ raise EngineException(
+ "You cannot delete your own project", http_code=HTTPStatus.CONFLICT
+ )
if session["force"]:
return
_filter = {"projects": _id}
if self.db.get_list("users", _filter):
- raise EngineException("There is some USER that contains this project", http_code=HTTPStatus.CONFLICT)
+ raise EngineException(
+ "There is some USER that contains this project",
+ http_code=HTTPStatus.CONFLICT,
+ )
def edit(self, session, _id, indata=None, kwargs=None, content=None):
if not session["admin"]:
- raise EngineException("needed admin privileges", http_code=HTTPStatus.UNAUTHORIZED)
+ raise EngineException(
+ "needed admin privileges", http_code=HTTPStatus.UNAUTHORIZED
+ )
# Names that look like UUIDs are not allowed
name = (indata if indata else kwargs).get("name")
if is_valid_uuid(name):
- raise EngineException("Project names that look like UUIDs are not allowed",
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
- return BaseTopic.edit(self, session, _id, indata=indata, kwargs=kwargs, content=content)
+ raise EngineException(
+ "Project names that look like UUIDs are not allowed",
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
+ return BaseTopic.edit(
+ self, session, _id, indata=indata, kwargs=kwargs, content=content
+ )
def new(self, rollback, session, indata=None, kwargs=None, headers=None):
if not session["admin"]:
- raise EngineException("needed admin privileges", http_code=HTTPStatus.UNAUTHORIZED)
+ raise EngineException(
+ "needed admin privileges", http_code=HTTPStatus.UNAUTHORIZED
+ )
# Names that look like UUIDs are not allowed
name = indata["name"] if indata else kwargs["name"]
if is_valid_uuid(name):
- raise EngineException("Project names that look like UUIDs are not allowed",
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
- return BaseTopic.new(self, rollback, session, indata=indata, kwargs=kwargs, headers=headers)
+ raise EngineException(
+ "Project names that look like UUIDs are not allowed",
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
+ return BaseTopic.new(
+ self, rollback, session, indata=indata, kwargs=kwargs, headers=headers
+ )
class CommonVimWimSdn(BaseTopic):
"""Common class for VIM, WIM SDN just to unify methods that are equal to all of them"""
- config_to_encrypt = {} # what keys at config must be encrypted because contains passwords
- password_to_encrypt = "" # key that contains a password
+
+ config_to_encrypt = (
+ {}
+    )  # what keys at config must be encrypted because they contain passwords
+ password_to_encrypt = "" # key that contains a password
@staticmethod
def _create_operation(op_type, params=None):
schema_version = final_content.get("schema_version")
if schema_version:
if edit_content.get(self.password_to_encrypt):
- final_content[self.password_to_encrypt] = self.db.encrypt(edit_content[self.password_to_encrypt],
- schema_version=schema_version,
- salt=final_content["_id"])
- config_to_encrypt_keys = self.config_to_encrypt.get(schema_version) or self.config_to_encrypt.get("default")
+ final_content[self.password_to_encrypt] = self.db.encrypt(
+ edit_content[self.password_to_encrypt],
+ schema_version=schema_version,
+ salt=final_content["_id"],
+ )
+ config_to_encrypt_keys = self.config_to_encrypt.get(
+ schema_version
+ ) or self.config_to_encrypt.get("default")
if edit_content.get("config") and config_to_encrypt_keys:
for p in config_to_encrypt_keys:
if edit_content["config"].get(p):
- final_content["config"][p] = self.db.encrypt(edit_content["config"][p],
- schema_version=schema_version,
- salt=final_content["_id"])
+ final_content["config"][p] = self.db.encrypt(
+ edit_content["config"][p],
+ schema_version=schema_version,
+ salt=final_content["_id"],
+ )
# create edit operation
final_content["_admin"]["operations"].append(self._create_operation("edit"))
- return "{}:{}".format(final_content["_id"], len(final_content["_admin"]["operations"]) - 1)
+ return "{}:{}".format(
+ final_content["_id"], len(final_content["_admin"]["operations"]) - 1
+ )
def format_on_new(self, content, project_id=None, make_public=False):
"""
# encrypt passwords
if content.get(self.password_to_encrypt):
- content[self.password_to_encrypt] = self.db.encrypt(content[self.password_to_encrypt],
- schema_version=schema_version,
- salt=content["_id"])
- config_to_encrypt_keys = self.config_to_encrypt.get(schema_version) or self.config_to_encrypt.get("default")
+ content[self.password_to_encrypt] = self.db.encrypt(
+ content[self.password_to_encrypt],
+ schema_version=schema_version,
+ salt=content["_id"],
+ )
+ config_to_encrypt_keys = self.config_to_encrypt.get(
+ schema_version
+ ) or self.config_to_encrypt.get("default")
if content.get("config") and config_to_encrypt_keys:
for p in config_to_encrypt_keys:
if content["config"].get(p):
- content["config"][p] = self.db.encrypt(content["config"][p],
- schema_version=schema_version,
- salt=content["_id"])
+ content["config"][p] = self.db.encrypt(
+ content["config"][p],
+ schema_version=schema_version,
+ salt=content["_id"],
+ )
content["_admin"]["operationalState"] = "PROCESSING"
# remove reference from project_read if there are more projects referencing it. If it last one,
# do not remove reference, but order via kafka to delete it
if session["project_id"] and session["project_id"]:
- other_projects_referencing = next((p for p in db_content["_admin"]["projects_read"]
- if p not in session["project_id"] and p != "ANY"), None)
+ other_projects_referencing = next(
+ (
+ p
+ for p in db_content["_admin"]["projects_read"]
+ if p not in session["project_id"] and p != "ANY"
+ ),
+ None,
+ )
# check if there are projects referencing it (apart from ANY, that means, public)....
if other_projects_referencing:
# remove references but not delete
- update_dict_pull = {"_admin.projects_read": session["project_id"],
- "_admin.projects_write": session["project_id"]}
- self.db.set_one(self.topic, filter_q, update_dict=None, pull_list=update_dict_pull)
+ update_dict_pull = {
+ "_admin.projects_read": session["project_id"],
+ "_admin.projects_write": session["project_id"],
+ }
+ self.db.set_one(
+ self.topic, filter_q, update_dict=None, pull_list=update_dict_pull
+ )
return None
else:
- can_write = next((p for p in db_content["_admin"]["projects_write"] if p == "ANY" or
- p in session["project_id"]), None)
+ can_write = next(
+ (
+ p
+ for p in db_content["_admin"]["projects_write"]
+ if p == "ANY" or p in session["project_id"]
+ ),
+ None,
+ )
if not can_write:
- raise EngineException("You have not write permission to delete it",
- http_code=HTTPStatus.UNAUTHORIZED)
+ raise EngineException(
+ "You have not write permission to delete it",
+ http_code=HTTPStatus.UNAUTHORIZED,
+ )
# It must be deleted
if session["force"]:
self.db.del_one(self.topic, {"_id": _id})
op_id = None
- self._send_msg("deleted", {"_id": _id, "op_id": op_id}, not_send_msg=not_send_msg)
+ self._send_msg(
+ "deleted", {"_id": _id, "op_id": op_id}, not_send_msg=not_send_msg
+ )
else:
update_dict = {"_admin.to_delete": True}
- self.db.set_one(self.topic, {"_id": _id},
- update_dict=update_dict,
- push={"_admin.operations": self._create_operation("delete")}
- )
+ self.db.set_one(
+ self.topic,
+ {"_id": _id},
+ update_dict=update_dict,
+ push={"_admin.operations": self._create_operation("delete")},
+ )
# the number of operations is the operation_id. db_content does not contains the new operation inserted,
# so the -1 is not needed
- op_id = "{}:{}".format(db_content["_id"], len(db_content["_admin"]["operations"]))
- self._send_msg("delete", {"_id": _id, "op_id": op_id}, not_send_msg=not_send_msg)
+ op_id = "{}:{}".format(
+ db_content["_id"], len(db_content["_admin"]["operations"])
+ )
+ self._send_msg(
+ "delete", {"_id": _id, "op_id": op_id}, not_send_msg=not_send_msg
+ )
return op_id
schema_edit = vim_account_edit_schema
multiproject = True
password_to_encrypt = "vim_password"
- config_to_encrypt = {"1.1": ("admin_password", "nsx_password", "vcenter_password"),
- "default": ("admin_password", "nsx_password", "vcenter_password", "vrops_password")}
+ config_to_encrypt = {
+ "1.1": ("admin_password", "nsx_password", "vcenter_password"),
+ "default": (
+ "admin_password",
+ "nsx_password",
+ "vcenter_password",
+ "vrops_password",
+ ),
+ }
def check_conflict_on_del(self, session, _id, db_content):
"""
return
# check if used by VNF
if self.db.get_list("vnfrs", {"vim-account-id": _id}):
- raise EngineException("There is at least one VNF using this VIM account", http_code=HTTPStatus.CONFLICT)
+ raise EngineException(
+ "There is at least one VNF using this VIM account",
+ http_code=HTTPStatus.CONFLICT,
+ )
super().check_conflict_on_del(session, _id, db_content)
def _obtain_url(self, input, create):
if input.get("ip") or input.get("port"):
- if not input.get("ip") or not input.get("port") or input.get('url'):
- raise ValidationError("You must provide both 'ip' and 'port' (deprecated); or just 'url' (prefered)")
- input['url'] = "http://{}:{}/".format(input["ip"], input["port"])
+ if not input.get("ip") or not input.get("port") or input.get("url"):
+            raise ValidationError(
+                "You must provide both 'ip' and 'port' (deprecated); or just 'url' (preferred)"
+            )
+ input["url"] = "http://{}:{}/".format(input["ip"], input["port"])
del input["ip"]
del input["port"]
- elif create and not input.get('url'):
+ elif create and not input.get("url"):
raise ValidationError("You must provide 'url'")
return input
def format_on_new(self, content, project_id=None, make_public=False):
oid = super().format_on_new(content, project_id, make_public)
- self.db.encrypt_decrypt_fields(content["credentials"], 'encrypt', ['password', 'secret'],
- schema_version=content["schema_version"], salt=content["_id"])
+ self.db.encrypt_decrypt_fields(
+ content["credentials"],
+ "encrypt",
+ ["password", "secret"],
+ schema_version=content["schema_version"],
+ salt=content["_id"],
+ )
# Add Helm/Juju Repo lists
repos = {"helm-chart": [], "juju-bundle": []}
for proj in content["_admin"]["projects_read"]:
- if proj != 'ANY':
- for repo in self.db.get_list("k8srepos", {"_admin.projects_read": proj}):
+ if proj != "ANY":
+ for repo in self.db.get_list(
+ "k8srepos", {"_admin.projects_read": proj}
+ ):
if repo["_id"] not in repos[repo["type"]]:
repos[repo["type"]].append(repo["_id"])
for k in repos:
- content["_admin"][k.replace('-', '_')+"_repos"] = repos[k]
+ content["_admin"][k.replace("-", "_") + "_repos"] = repos[k]
return oid
def format_on_edit(self, final_content, edit_content):
if final_content.get("schema_version") and edit_content.get("credentials"):
- self.db.encrypt_decrypt_fields(edit_content["credentials"], 'encrypt', ['password', 'secret'],
- schema_version=final_content["schema_version"], salt=final_content["_id"])
- deep_update_rfc7396(final_content["credentials"], edit_content["credentials"])
+ self.db.encrypt_decrypt_fields(
+ edit_content["credentials"],
+ "encrypt",
+ ["password", "secret"],
+ schema_version=final_content["schema_version"],
+ salt=final_content["_id"],
+ )
+ deep_update_rfc7396(
+ final_content["credentials"], edit_content["credentials"]
+ )
oid = super().format_on_edit(final_content, edit_content)
return oid
def check_conflict_on_edit(self, session, final_content, edit_content, _id):
- final_content = super(CommonVimWimSdn, self).check_conflict_on_edit(session, final_content, edit_content, _id)
- final_content = super().check_conflict_on_edit(session, final_content, edit_content, _id)
+ final_content = super(CommonVimWimSdn, self).check_conflict_on_edit(
+ session, final_content, edit_content, _id
+ )
+ final_content = super().check_conflict_on_edit(
+ session, final_content, edit_content, _id
+ )
# Update Helm/Juju Repo lists
repos = {"helm-chart": [], "juju-bundle": []}
for proj in session.get("set_project", []):
- if proj != 'ANY':
- for repo in self.db.get_list("k8srepos", {"_admin.projects_read": proj}):
+ if proj != "ANY":
+ for repo in self.db.get_list(
+ "k8srepos", {"_admin.projects_read": proj}
+ ):
if repo["_id"] not in repos[repo["type"]]:
repos[repo["type"]].append(repo["_id"])
for k in repos:
- rlist = k.replace('-', '_') + "_repos"
+ rlist = k.replace("-", "_") + "_repos"
if rlist not in final_content["_admin"]:
final_content["_admin"][rlist] = []
final_content["_admin"][rlist] += repos[k]
if session["project_id"]:
filter_q["_admin.projects_read.cont"] = session["project_id"]
if self.db.get_list("vnfrs", filter_q):
- raise EngineException("There is at least one VNF using this k8scluster", http_code=HTTPStatus.CONFLICT)
+ raise EngineException(
+ "There is at least one VNF using this k8scluster",
+ http_code=HTTPStatus.CONFLICT,
+ )
super().check_conflict_on_del(session, _id, db_content)
content["schema_version"] = schema_version = "1.11"
for key in ["secret", "cacert"]:
content[key] = self.db.encrypt(
- content[key],
- schema_version=schema_version,
- salt=content["_id"]
+ content[key], schema_version=schema_version, salt=content["_id"]
)
return oid
final_content[key] = self.db.encrypt(
edit_content[key],
schema_version=schema_version,
- salt=final_content["_id"]
+ salt=final_content["_id"],
)
return oid
if session["project_id"]:
filter_q["_admin.projects_read.cont"] = session["project_id"]
if self.db.get_list("vim_accounts", filter_q):
- raise EngineException("There is at least one VIM account using this vca", http_code=HTTPStatus.CONFLICT)
+ raise EngineException(
+ "There is at least one VIM account using this vca",
+ http_code=HTTPStatus.CONFLICT,
+ )
super().check_conflict_on_del(session, _id, db_content)
def format_on_new(self, content, project_id=None, make_public=False):
oid = super().format_on_new(content, project_id, make_public)
# Update Helm/Juju Repo lists
- repo_list = content["type"].replace('-', '_')+"_repos"
+ repo_list = content["type"].replace("-", "_") + "_repos"
for proj in content["_admin"]["projects_read"]:
- if proj != 'ANY':
- self.db.set_list("k8sclusters",
- {"_admin.projects_read": proj, "_admin."+repo_list+".ne": content["_id"]}, {},
- push={"_admin."+repo_list: content["_id"]})
+ if proj != "ANY":
+ self.db.set_list(
+ "k8sclusters",
+ {
+ "_admin.projects_read": proj,
+ "_admin." + repo_list + ".ne": content["_id"],
+ },
+ {},
+ push={"_admin." + repo_list: content["_id"]},
+ )
return oid
def delete(self, session, _id, dry_run=False, not_send_msg=None):
oid = super().delete(session, _id, dry_run, not_send_msg)
if oid:
# Remove from Helm/Juju Repo lists
- repo_list = type.replace('-', '_') + "_repos"
- self.db.set_list("k8sclusters", {"_admin."+repo_list: _id}, {}, pull={"_admin."+repo_list: _id})
+ repo_list = type.replace("-", "_") + "_repos"
+ self.db.set_list(
+ "k8sclusters",
+ {"_admin." + repo_list: _id},
+ {},
+ pull={"_admin." + repo_list: _id},
+ )
return oid
"""
username = indata.get("username")
if is_valid_uuid(username):
- raise EngineException("username '{}' cannot have a uuid format".format(username),
- HTTPStatus.UNPROCESSABLE_ENTITY)
+ raise EngineException(
+ "username '{}' cannot have a uuid format".format(username),
+ HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
# Check that username is not used, regardless keystone already checks this
if self.auth.get_user_list(filter_q={"name": username}):
- raise EngineException("username '{}' is already used".format(username), HTTPStatus.CONFLICT)
+ raise EngineException(
+ "username '{}' is already used".format(username), HTTPStatus.CONFLICT
+ )
if "projects" in indata.keys():
# convert to new format project_role_mappings
if not role:
role = self.auth.get_role_list()
if not role:
- raise AuthconnNotFoundException("Can't find default role for user '{}'".format(username))
+ raise AuthconnNotFoundException(
+ "Can't find default role for user '{}'".format(username)
+ )
rid = role[0]["_id"]
if not indata.get("project_role_mappings"):
indata["project_role_mappings"] = []
if "username" in edit_content:
username = edit_content.get("username")
if is_valid_uuid(username):
- raise EngineException("username '{}' cannot have an uuid format".format(username),
- HTTPStatus.UNPROCESSABLE_ENTITY)
+ raise EngineException(
+ "username '{}' cannot have an uuid format".format(username),
+ HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
# Check that username is not used, regardless keystone already checks this
if self.auth.get_user_list(filter_q={"name": username}):
- raise EngineException("username '{}' is already used".format(username), HTTPStatus.CONFLICT)
+ raise EngineException(
+ "username '{}' is already used".format(username),
+ HTTPStatus.CONFLICT,
+ )
if final_content["username"] == "admin":
for mapping in edit_content.get("remove_project_role_mappings", ()):
- if mapping["project"] == "admin" and mapping.get("role") in (None, "system_admin"):
+ if mapping["project"] == "admin" and mapping.get("role") in (
+ None,
+ "system_admin",
+ ):
# TODO make this also available for project id and role id
- raise EngineException("You cannot remove system_admin role from admin user",
- http_code=HTTPStatus.FORBIDDEN)
+ raise EngineException(
+ "You cannot remove system_admin role from admin user",
+ http_code=HTTPStatus.FORBIDDEN,
+ )
return final_content
:return: None if ok or raises EngineException with the conflict
"""
if db_content["username"] == session["username"]:
- raise EngineException("You cannot delete your own login user ", http_code=HTTPStatus.CONFLICT)
+ raise EngineException(
+ "You cannot delete your own login user ", http_code=HTTPStatus.CONFLICT
+ )
# TODO: Check that user is not logged in ? How? (Would require listing current tokens)
@staticmethod
if "projects" in content:
for project in content["projects"]:
for role in project["roles"]:
- project_role_mappings.append({"project": project["_id"],
- "project_name": project["name"],
- "role": role["_id"],
- "role_name": role["name"]})
+ project_role_mappings.append(
+ {
+ "project": project["_id"],
+ "project_name": project["name"],
+ "role": role["_id"],
+ "role_name": role["name"],
+ }
+ )
del content["projects"]
content["project_role_mappings"] = project_role_mappings
# Allow _id to be a name or uuid
filter_q = {"username": _id}
# users = self.auth.get_user_list(filter_q)
- users = self.list(session, filter_q) # To allow default filtering (Bug 853)
+ users = self.list(session, filter_q) # To allow default filtering (Bug 853)
if len(users) == 1:
return users[0]
elif len(users) > 1:
- raise EngineException("Too many users found for '{}'".format(_id), HTTPStatus.CONFLICT)
+ raise EngineException(
+ "Too many users found for '{}'".format(_id), HTTPStatus.CONFLICT
+ )
else:
- raise EngineException("User '{}' not found".format(_id), HTTPStatus.NOT_FOUND)
+ raise EngineException(
+ "User '{}' not found".format(_id), HTTPStatus.NOT_FOUND
+ )
def edit(self, session, _id, indata=None, kwargs=None, content=None):
"""
content = self.check_conflict_on_edit(session, content, indata, _id=_id)
# self.format_on_edit(content, indata)
- if not ("password" in indata or "username" in indata or indata.get("remove_project_role_mappings") or
- indata.get("add_project_role_mappings") or indata.get("project_role_mappings") or
- indata.get("projects") or indata.get("add_projects")):
+ if not (
+ "password" in indata
+ or "username" in indata
+ or indata.get("remove_project_role_mappings")
+ or indata.get("add_project_role_mappings")
+ or indata.get("project_role_mappings")
+ or indata.get("projects")
+ or indata.get("add_projects")
+ ):
return _id
- if indata.get("project_role_mappings") \
- and (indata.get("remove_project_role_mappings") or indata.get("add_project_role_mappings")):
- raise EngineException("Option 'project_role_mappings' is incompatible with 'add_project_role_mappings"
- "' or 'remove_project_role_mappings'", http_code=HTTPStatus.BAD_REQUEST)
+ if indata.get("project_role_mappings") and (
+ indata.get("remove_project_role_mappings")
+ or indata.get("add_project_role_mappings")
+ ):
+ raise EngineException(
+ "Option 'project_role_mappings' is incompatible with 'add_project_role_mappings"
+ "' or 'remove_project_role_mappings'",
+ http_code=HTTPStatus.BAD_REQUEST,
+ )
if indata.get("projects") or indata.get("add_projects"):
role = self.auth.get_role_list({"name": "project_admin"})
if not role:
role = self.auth.get_role_list()
if not role:
- raise AuthconnNotFoundException("Can't find a default role for user '{}'"
- .format(content["username"]))
+ raise AuthconnNotFoundException(
+ "Can't find a default role for user '{}'".format(
+ content["username"]
+ )
+ )
rid = role[0]["_id"]
if "add_project_role_mappings" not in indata:
indata["add_project_role_mappings"] = []
# backward compatible
for k, v in indata["projects"].items():
if k.startswith("$") and v is None:
- indata["remove_project_role_mappings"].append({"project": k[1:]})
+ indata["remove_project_role_mappings"].append(
+ {"project": k[1:]}
+ )
elif k.startswith("$+"):
- indata["add_project_role_mappings"].append({"project": v, "role": rid})
+ indata["add_project_role_mappings"].append(
+ {"project": v, "role": rid}
+ )
del indata["projects"]
for proj in indata.get("projects", []) + indata.get("add_projects", []):
- indata["add_project_role_mappings"].append({"project": proj, "role": rid})
+ indata["add_project_role_mappings"].append(
+ {"project": proj, "role": rid}
+ )
# user = self.show(session, _id) # Already in 'content'
original_mapping = content["project_role_mappings"]
# remove
for to_remove in indata.get("remove_project_role_mappings", ()):
for mapping in original_mapping:
- if to_remove["project"] in (mapping["project"], mapping["project_name"]):
- if not to_remove.get("role") or to_remove["role"] in (mapping["role"], mapping["role_name"]):
+ if to_remove["project"] in (
+ mapping["project"],
+ mapping["project_name"],
+ ):
+ if not to_remove.get("role") or to_remove["role"] in (
+ mapping["role"],
+ mapping["role_name"],
+ ):
mappings_to_remove.append(mapping)
# add
for to_add in indata.get("add_project_role_mappings", ()):
for mapping in original_mapping:
- if to_add["project"] in (mapping["project"], mapping["project_name"]) and \
- to_add["role"] in (mapping["role"], mapping["role_name"]):
-
- if mapping in mappings_to_remove: # do not remove
+ if to_add["project"] in (
+ mapping["project"],
+ mapping["project_name"],
+ ) and to_add["role"] in (
+ mapping["role"],
+ mapping["role_name"],
+ ):
+
+ if mapping in mappings_to_remove: # do not remove
mappings_to_remove.remove(mapping)
break # do not add, it is already at user
else:
if indata.get("project_role_mappings"):
for to_set in indata["project_role_mappings"]:
for mapping in original_mapping:
- if to_set["project"] in (mapping["project"], mapping["project_name"]) and \
- to_set["role"] in (mapping["role"], mapping["role_name"]):
- if mapping in mappings_to_remove: # do not remove
+ if to_set["project"] in (
+ mapping["project"],
+ mapping["project_name"],
+ ) and to_set["role"] in (
+ mapping["role"],
+ mapping["role_name"],
+ ):
+ if mapping in mappings_to_remove: # do not remove
mappings_to_remove.remove(mapping)
break # do not add, it is already at user
else:
mappings_to_add.append({"project": pid, "role": rid})
for mapping in original_mapping:
for to_set in indata["project_role_mappings"]:
- if to_set["project"] in (mapping["project"], mapping["project_name"]) and \
- to_set["role"] in (mapping["role"], mapping["role_name"]):
+ if to_set["project"] in (
+ mapping["project"],
+ mapping["project_name"],
+ ) and to_set["role"] in (
+ mapping["role"],
+ mapping["role_name"],
+ ):
break
else:
# delete
- if mapping not in mappings_to_remove: # do not remove
+ if mapping not in mappings_to_remove: # do not remove
mappings_to_remove.append(mapping)
- self.auth.update_user({"_id": _id, "username": indata.get("username"), "password": indata.get("password"),
- "add_project_role_mappings": mappings_to_add,
- "remove_project_role_mappings": mappings_to_remove
- })
- data_to_send = {'_id': _id, "changes": indata}
+ self.auth.update_user(
+ {
+ "_id": _id,
+ "username": indata.get("username"),
+ "password": indata.get("password"),
+ "add_project_role_mappings": mappings_to_add,
+ "remove_project_role_mappings": mappings_to_remove,
+ }
+ )
+ data_to_send = {"_id": _id, "changes": indata}
self._send_msg("edited", data_to_send, not_send_msg=None)
# return _id
user_list = self.auth.get_user_list(filter_q)
if not session["allow_show_user_project_role"]:
# Bug 853 - Default filtering
- user_list = [usr for usr in user_list if usr["username"] == session["username"]]
+ user_list = [
+ usr for usr in user_list if usr["username"] == session["username"]
+ ]
return user_list
def delete(self, session, _id, dry_run=False, not_send_msg=None):
"""
project_name = indata.get("name")
if is_valid_uuid(project_name):
- raise EngineException("project name '{}' cannot have an uuid format".format(project_name),
- HTTPStatus.UNPROCESSABLE_ENTITY)
+ raise EngineException(
+ "project name '{}' cannot have an uuid format".format(project_name),
+ HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
project_list = self.auth.get_project_list(filter_q={"name": project_name})
if project_list:
- raise EngineException("project '{}' exists".format(project_name), HTTPStatus.CONFLICT)
+ raise EngineException(
+ "project '{}' exists".format(project_name), HTTPStatus.CONFLICT
+ )
def check_conflict_on_edit(self, session, final_content, edit_content, _id):
"""
project_name = edit_content.get("name")
if project_name != final_content["name"]: # It is a true renaming
if is_valid_uuid(project_name):
- raise EngineException("project name '{}' cannot have an uuid format".format(project_name),
- HTTPStatus.UNPROCESSABLE_ENTITY)
+ raise EngineException(
+ "project name '{}' cannot have an uuid format".format(project_name),
+ HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
if final_content["name"] == "admin":
- raise EngineException("You cannot rename project 'admin'", http_code=HTTPStatus.CONFLICT)
+ raise EngineException(
+ "You cannot rename project 'admin'", http_code=HTTPStatus.CONFLICT
+ )
# Check that project name is not used, regardless keystone already checks this
- if project_name and self.auth.get_project_list(filter_q={"name": project_name}):
- raise EngineException("project '{}' is already used".format(project_name), HTTPStatus.CONFLICT)
+ if project_name and self.auth.get_project_list(
+ filter_q={"name": project_name}
+ ):
+ raise EngineException(
+ "project '{}' is already used".format(project_name),
+ HTTPStatus.CONFLICT,
+ )
return final_content
def check_conflict_on_del(self, session, _id, db_content):
def check_rw_projects(topic, title, id_field):
for desc in self.db.get_list(topic):
- if _id in desc["_admin"]["projects_read"] + desc["_admin"]["projects_write"]:
- raise EngineException("Project '{}' ({}) is being used by {} '{}'"
- .format(db_content["name"], _id, title, desc[id_field]), HTTPStatus.CONFLICT)
+ if (
+ _id
+ in desc["_admin"]["projects_read"]
+ + desc["_admin"]["projects_write"]
+ ):
+ raise EngineException(
+ "Project '{}' ({}) is being used by {} '{}'".format(
+ db_content["name"], _id, title, desc[id_field]
+ ),
+ HTTPStatus.CONFLICT,
+ )
if _id in session["project_id"]:
- raise EngineException("You cannot delete your own project", http_code=HTTPStatus.CONFLICT)
+ raise EngineException(
+ "You cannot delete your own project", http_code=HTTPStatus.CONFLICT
+ )
if db_content["name"] == "admin":
- raise EngineException("You cannot delete project 'admin'", http_code=HTTPStatus.CONFLICT)
+ raise EngineException(
+ "You cannot delete project 'admin'", http_code=HTTPStatus.CONFLICT
+ )
# If any user is using this project, raise CONFLICT exception
if not session["force"]:
for user in self.auth.get_user_list():
for prm in user.get("project_role_mappings"):
if prm["project"] == _id:
- raise EngineException("Project '{}' ({}) is being used by user '{}'"
- .format(db_content["name"], _id, user["username"]), HTTPStatus.CONFLICT)
+ raise EngineException(
+ "Project '{}' ({}) is being used by user '{}'".format(
+ db_content["name"], _id, user["username"]
+ ),
+ HTTPStatus.CONFLICT,
+ )
# If any VNFD, NSD, NST, PDU, etc. is using this project, raise CONFLICT exception
if not session["force"]:
BaseTopic._update_input_with_kwargs(content, kwargs)
content = self._validate_input_new(content, session["force"])
self.check_conflict_on_new(session, content)
- self.format_on_new(content, project_id=session["project_id"], make_public=session["public"])
+ self.format_on_new(
+ content, project_id=session["project_id"], make_public=session["public"]
+ )
_id = self.auth.create_project(content)
rollback.append({"topic": self.topic, "_id": _id})
self._send_msg("created", content, not_send_msg=None)
# Allow _id to be a name or uuid
filter_q = {self.id_field(self.topic, _id): _id}
# projects = self.auth.get_project_list(filter_q=filter_q)
- projects = self.list(session, filter_q) # To allow default filtering (Bug 853)
+ projects = self.list(session, filter_q) # To allow default filtering (Bug 853)
if len(projects) == 1:
return projects[0]
elif len(projects) > 1:
class RoleTopicAuth(BaseTopic):
topic = "roles"
- topic_msg = None # "roles"
+ topic_msg = None # "roles"
schema_new = roles_new_schema
schema_edit = roles_edit_schema
multiproject = False
if role_def[-1] == ":":
raise ValidationError("Operation cannot end with ':'")
- match = next((op for op in operations if op == role_def or op.startswith(role_def + ":")), None)
+ match = next(
+ (
+ op
+ for op in operations
+ if op == role_def or op.startswith(role_def + ":")
+ ),
+ None,
+ )
if not match:
raise ValidationError("Invalid permission '{}'".format(role_def))
# check name is not uuid
role_name = indata.get("name")
if is_valid_uuid(role_name):
- raise EngineException("role name '{}' cannot have an uuid format".format(role_name),
- HTTPStatus.UNPROCESSABLE_ENTITY)
+ raise EngineException(
+ "role name '{}' cannot have an uuid format".format(role_name),
+ HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
# check name not exists
name = indata["name"]
# if self.db.get_one(self.topic, {"name": indata.get("name")}, fail_on_empty=False, fail_on_more=False):
if self.auth.get_role_list({"name": name}):
- raise EngineException("role name '{}' exists".format(name), HTTPStatus.CONFLICT)
+ raise EngineException(
+ "role name '{}' exists".format(name), HTTPStatus.CONFLICT
+ )
def check_conflict_on_edit(self, session, final_content, edit_content, _id):
"""
# check name is not uuid
role_name = edit_content.get("name")
if is_valid_uuid(role_name):
- raise EngineException("role name '{}' cannot have an uuid format".format(role_name),
- HTTPStatus.UNPROCESSABLE_ENTITY)
+ raise EngineException(
+ "role name '{}' cannot have an uuid format".format(role_name),
+ HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
# Check renaming of admin roles
role = self.auth.get_role(_id)
if role["name"] in ["system_admin", "project_admin"]:
- raise EngineException("You cannot rename role '{}'".format(role["name"]), http_code=HTTPStatus.FORBIDDEN)
+ raise EngineException(
+ "You cannot rename role '{}'".format(role["name"]),
+ http_code=HTTPStatus.FORBIDDEN,
+ )
# check name not exists
if "name" in edit_content:
# if self.db.get_one(self.topic, {"name":role_name,"_id.ne":_id}, fail_on_empty=False, fail_on_more=False):
roles = self.auth.get_role_list({"name": role_name})
if roles and roles[0][BaseTopic.id_field("roles", _id)] != _id:
- raise EngineException("role name '{}' exists".format(role_name), HTTPStatus.CONFLICT)
+ raise EngineException(
+ "role name '{}' exists".format(role_name), HTTPStatus.CONFLICT
+ )
return final_content
"""
role = self.auth.get_role(_id)
if role["name"] in ["system_admin", "project_admin"]:
- raise EngineException("You cannot delete role '{}'".format(role["name"]), http_code=HTTPStatus.FORBIDDEN)
+ raise EngineException(
+ "You cannot delete role '{}'".format(role["name"]),
+ http_code=HTTPStatus.FORBIDDEN,
+ )
# If any user is using this role, raise CONFLICT exception
if not session["force"]:
for user in self.auth.get_user_list():
for prm in user.get("project_role_mappings"):
if prm["role"] == _id:
- raise EngineException("Role '{}' ({}) is being used by user '{}'"
- .format(role["name"], _id, user["username"]), HTTPStatus.CONFLICT)
+ raise EngineException(
+ "Role '{}' ({}) is being used by user '{}'".format(
+ role["name"], _id, user["username"]
+ ),
+ HTTPStatus.CONFLICT,
+ )
@staticmethod
- def format_on_new(content, project_id=None, make_public=False): # TO BE REMOVED ?
+ def format_on_new(content, project_id=None, make_public=False): # TO BE REMOVED ?
"""
Modifies content descriptor to include _admin
"""
filter_q = {BaseTopic.id_field(self.topic, _id): _id}
# roles = self.auth.get_role_list(filter_q)
- roles = self.list(session, filter_q) # To allow default filtering (Bug 853)
+ roles = self.list(session, filter_q) # To allow default filtering (Bug 853)
if not roles:
- raise AuthconnNotFoundException("Not found any role with filter {}".format(filter_q))
+ raise AuthconnNotFoundException(
+ "Not found any role with filter {}".format(filter_q)
+ )
elif len(roles) > 1:
- raise AuthconnConflictException("Found more than one role with filter {}".format(filter_q))
+ raise AuthconnConflictException(
+ "Found more than one role with filter {}".format(filter_q)
+ )
return roles[0]
def list(self, session, filter_q=None, api_req=False):
self._update_input_with_kwargs(content, kwargs)
content = self._validate_input_new(content, session["force"])
self.check_conflict_on_new(session, content)
- self.format_on_new(content, project_id=session["project_id"], make_public=session["public"])
+ self.format_on_new(
+ content, project_id=session["project_id"], make_public=session["public"]
+ )
# role_name = content["name"]
rid = self.auth.create_role(content)
content["_id"] = rid
filter_q = {BaseTopic.id_field(self.topic, _id): _id}
roles = self.auth.get_role_list(filter_q)
if not roles:
- raise AuthconnNotFoundException("Not found any role with filter {}".format(filter_q))
+ raise AuthconnNotFoundException(
+ "Not found any role with filter {}".format(filter_q)
+ )
elif len(roles) > 1:
- raise AuthconnConflictException("Found more than one role with filter {}".format(filter_q))
+ raise AuthconnConflictException(
+ "Found more than one role with filter {}".format(filter_q)
+ )
rid = roles[0]["_id"]
self.check_conflict_on_del(session, rid, None)
# filter_q = {"_id": _id}
import yaml
from base64 import standard_b64decode
from copy import deepcopy
+
# from functools import reduce
from http import HTTPStatus
from time import time
This class must be threading safe
"""
- periodin_db_pruning = 60 * 30 # for the internal backend only. every 30 minutes expired tokens will be pruned
- token_limit = 500 # when reached, the token cache will be cleared
+ periodin_db_pruning = (
+ 60 * 30
+ ) # for the internal backend only. every 30 minutes expired tokens will be pruned
+ token_limit = 500 # when reached, the token cache will be cleared
def __init__(self, valid_methods, valid_query_string):
"""
self.db = None
self.msg = None
self.tokens_cache = dict()
- self.next_db_prune_time = 0 # time when next cleaning of expired tokens must be done
+ self.next_db_prune_time = (
+ 0 # time when next cleaning of expired tokens must be done
+ )
self.roles_to_operations_file = None
# self.roles_to_operations_table = None
self.resources_to_operations_mapping = {}
self.role_permissions = []
self.valid_methods = valid_methods
self.valid_query_string = valid_query_string
- self.system_admin_role_id = None # system_role id
+ self.system_admin_role_id = None # system_role id
self.test_project_id = None # test_project_id
def start(self, config):
self.db = dbmemory.DbMemory()
self.db.db_connect(config["database"])
else:
- raise AuthException("Invalid configuration param '{}' at '[database]':'driver'"
- .format(config["database"]["driver"]))
+ raise AuthException(
+ "Invalid configuration param '{}' at '[database]':'driver'".format(
+ config["database"]["driver"]
+ )
+ )
if not self.msg:
if config["message"]["driver"] == "local":
self.msg = msglocal.MsgLocal()
self.msg = msgkafka.MsgKafka()
self.msg.connect(config["message"])
else:
- raise AuthException("Invalid configuration param '{}' at '[message]':'driver'"
- .format(config["message"]["driver"]))
+ raise AuthException(
+ "Invalid configuration param '{}' at '[message]':'driver'".format(
+ config["message"]["driver"]
+ )
+ )
if not self.backend:
if config["authentication"]["backend"] == "keystone":
- self.backend = AuthconnKeystone(self.config["authentication"], self.db, self.role_permissions)
+ self.backend = AuthconnKeystone(
+ self.config["authentication"], self.db, self.role_permissions
+ )
elif config["authentication"]["backend"] == "internal":
- self.backend = AuthconnInternal(self.config["authentication"], self.db, self.role_permissions)
+ self.backend = AuthconnInternal(
+ self.config["authentication"], self.db, self.role_permissions
+ )
self._internal_tokens_prune("tokens")
elif config["authentication"]["backend"] == "tacacs":
- self.backend = AuthconnTacacs(self.config["authentication"], self.db, self.role_permissions)
+ self.backend = AuthconnTacacs(
+ self.config["authentication"], self.db, self.role_permissions
+ )
self._internal_tokens_prune("tokens_tacacs")
else:
- raise AuthException("Unknown authentication backend: {}"
- .format(config["authentication"]["backend"]))
+ raise AuthException(
+ "Unknown authentication backend: {}".format(
+ config["authentication"]["backend"]
+ )
+ )
if not self.roles_to_operations_file:
if "roles_to_operations" in config["rbac"]:
- self.roles_to_operations_file = config["rbac"]["roles_to_operations"]
+ self.roles_to_operations_file = config["rbac"][
+ "roles_to_operations"
+ ]
else:
possible_paths = (
- __file__[:__file__.rfind("auth.py")] + "roles_to_operations.yml",
- "./roles_to_operations.yml"
+ __file__[: __file__.rfind("auth.py")]
+ + "roles_to_operations.yml",
+ "./roles_to_operations.yml",
)
for config_file in possible_paths:
if path.isfile(config_file):
self.roles_to_operations_file = config_file
break
if not self.roles_to_operations_file:
- raise AuthException("Invalid permission configuration: roles_to_operations file missing")
+ raise AuthException(
+ "Invalid permission configuration: roles_to_operations file missing"
+ )
# load role_permissions
def load_role_permissions(method_dict):
for k in method_dict:
if k == "ROLE_PERMISSION":
- for method in chain(method_dict.get("METHODS", ()), method_dict.get("TODO", ())):
+ for method in chain(
+ method_dict.get("METHODS", ()), method_dict.get("TODO", ())
+ ):
permission = method_dict["ROLE_PERMISSION"] + method.lower()
if permission not in self.role_permissions:
self.role_permissions.append(permission)
self.role_permissions.append(permission)
# get ids of role system_admin and test project
- role_system_admin = self.db.get_one("roles", {"name": "system_admin"}, fail_on_empty=False)
+ role_system_admin = self.db.get_one(
+ "roles", {"name": "system_admin"}, fail_on_empty=False
+ )
if role_system_admin:
self.system_admin_role_id = role_system_admin["_id"]
- test_project_name = self.config["authentication"].get("project_not_authorized", "admin")
- test_project = self.db.get_one("projects", {"name": test_project_name}, fail_on_empty=False)
+ test_project_name = self.config["authentication"].get(
+ "project_not_authorized", "admin"
+ )
+ test_project = self.db.get_one(
+ "projects", {"name": test_project_name}, fail_on_empty=False
+ )
if test_project:
self.test_project_id = test_project["_id"]
project_desc["_id"] = str(uuid4())
project_desc["_admin"] = {"created": now, "modified": now}
pid = self.backend.create_project(project_desc)
- self.logger.info("Project '{}' created at database".format(project_desc["name"]))
+ self.logger.info(
+ "Project '{}' created at database".format(project_desc["name"])
+ )
return pid
def create_admin_user(self, project_id):
return None
# user_desc = {"username": "admin", "password": "admin", "projects": [project_id]}
now = time()
- user_desc = {"username": "admin", "password": "admin", "_admin": {"created": now, "modified": now}}
+ user_desc = {
+ "username": "admin",
+ "password": "admin",
+ "_admin": {"created": now, "modified": now},
+ }
if project_id:
pid = project_id
else:
# role = self.db.get_one("roles", {"name": "system_admin"}, fail_on_empty=False, fail_on_more=False)
roles = self.backend.get_role_list({"name": "system_admin"})
if pid and roles:
- user_desc["project_role_mappings"] = [{"project": pid, "role": roles[0]["_id"]}]
+ user_desc["project_role_mappings"] = [
+ {"project": pid, "role": roles[0]["_id"]}
+ ]
uid = self.backend.create_user(user_desc)
self.logger.info("User '{}' created at database".format(user_desc["username"]))
return uid
- def init_db(self, target_version='1.0'):
+ def init_db(self, target_version="1.0"):
"""
Check if the database has been initialized, with at least one user. If not, create the required tables
and insert the predefined mappings between roles and permissions.
records = self.backend.get_role_list()
# Loading permissions to AUTH. At lease system_admin must be present.
- if not records or not next((r for r in records if r["name"] == "system_admin"), None):
+ if not records or not next(
+ (r for r in records if r["name"] == "system_admin"), None
+ ):
with open(self.roles_to_operations_file, "r") as stream:
roles_to_operations_yaml = yaml.load(stream, Loader=yaml.Loader)
if role_with_operations["name"] not in role_names:
role_names.append(role_with_operations["name"])
else:
- raise AuthException("Duplicated role name '{}' at file '{}''"
- .format(role_with_operations["name"], self.roles_to_operations_file))
+ raise AuthException(
+ "Duplicated role name '{}' at file '{}''".format(
+ role_with_operations["name"], self.roles_to_operations_file
+ )
+ )
if not role_with_operations["permissions"]:
continue
- for permission, is_allowed in role_with_operations["permissions"].items():
+ for permission, is_allowed in role_with_operations[
+ "permissions"
+ ].items():
if not isinstance(is_allowed, bool):
- raise AuthException("Invalid value for permission '{}' at role '{}'; at file '{}'"
- .format(permission, role_with_operations["name"],
- self.roles_to_operations_file))
+ raise AuthException(
+ "Invalid value for permission '{}' at role '{}'; at file '{}'".format(
+ permission,
+ role_with_operations["name"],
+ self.roles_to_operations_file,
+ )
+ )
# TODO check permission is ok
if permission[-1] == ":":
- raise AuthException("Invalid permission '{}' terminated in ':' for role '{}'; at file {}"
- .format(permission, role_with_operations["name"],
- self.roles_to_operations_file))
+ raise AuthException(
+ "Invalid permission '{}' terminated in ':' for role '{}'; at file {}".format(
+ permission,
+ role_with_operations["name"],
+ self.roles_to_operations_file,
+ )
+ )
if "default" not in role_with_operations["permissions"]:
role_with_operations["permissions"]["default"] = False
# self.db.create(self.roles_to_operations_table, role_with_operations)
try:
self.backend.create_role(role_with_operations)
- self.logger.info("Role '{}' created".format(role_with_operations["name"]))
+ self.logger.info(
+ "Role '{}' created".format(role_with_operations["name"])
+ )
except (AuthException, AuthconnException) as e:
if role_with_operations["name"] == "system_admin":
raise
- self.logger.error("Role '{}' cannot be created: {}".format(role_with_operations["name"], e))
+ self.logger.error(
+ "Role '{}' cannot be created: {}".format(
+ role_with_operations["name"], e
+ )
+ )
# Create admin project&user if required
pid = self.create_admin_project()
if user_with_system_admin:
break
if not user_with_system_admin:
- self.backend.update_user({"_id": user_admin_id,
- "add_project_role_mappings": [{"project": pid, "role": role_id}]})
- self.logger.info("Added role system admin to user='{}' project=admin".format(user_admin_id))
+ self.backend.update_user(
+ {
+ "_id": user_admin_id,
+ "add_project_role_mappings": [
+ {"project": pid, "role": role_id}
+ ],
+ }
+ )
+ self.logger.info(
+ "Added role system admin to user='{}' project=admin".format(
+ user_admin_id
+ )
+ )
except Exception as e:
- self.logger.error("Error in Authorization DataBase initialization: {}: {}".format(type(e).__name__, e))
+ self.logger.error(
+ "Error in Authorization DataBase initialization: {}: {}".format(
+ type(e).__name__, e
+ )
+ )
self.load_operation_to_allowed_roles()
for record in records:
if not record.get("permissions"):
continue
- record_permissions = {oper: record["permissions"].get("default", False) for oper in self.role_permissions}
- operations_joined = [(oper, value) for oper, value in record["permissions"].items()
- if oper not in ignore_fields]
+ record_permissions = {
+ oper: record["permissions"].get("default", False)
+ for oper in self.role_permissions
+ }
+ operations_joined = [
+ (oper, value)
+ for oper, value in record["permissions"].items()
+ if oper not in ignore_fields
+ ]
operations_joined.sort(key=lambda x: x[0].count(":"))
for oper in operations_joined:
- match = list(filter(lambda x: x.find(oper[0]) == 0, record_permissions.keys()))
+ match = list(
+ filter(lambda x: x.find(oper[0]) == 0, record_permissions.keys())
+ )
for m in match:
record_permissions[m] = oper[1]
self.operation_to_allowed_roles = permissions
- def authorize(self, role_permission=None, query_string_operations=None, item_id=None):
+ def authorize(
+ self, role_permission=None, query_string_operations=None, item_id=None
+ ):
token = None
user_passwd64 = None
try:
token = cherrypy.session.get("Authorization")
if token == "logout":
token = None # force Unauthorized response to insert user password again
- elif user_passwd64 and cherrypy.request.config.get("auth.allow_basic_authentication"):
+ elif user_passwd64 and cherrypy.request.config.get(
+ "auth.allow_basic_authentication"
+ ):
# 3. Get new token from user password
user = None
passwd = None
user, _, passwd = user_passwd.partition(":")
except Exception:
pass
- outdata = self.new_token(None, {"username": user, "password": passwd})
+ outdata = self.new_token(
+ None, {"username": user, "password": passwd}
+ )
token = outdata["_id"]
- cherrypy.session['Authorization'] = token
+ cherrypy.session["Authorization"] = token
if not token:
- raise AuthException("Needed a token or Authorization http header",
- http_code=HTTPStatus.UNAUTHORIZED)
+ raise AuthException(
+ "Needed a token or Authorization http header",
+ http_code=HTTPStatus.UNAUTHORIZED,
+ )
# try to get from cache first
now = time()
# TODO add to token info remote host, port
if role_permission:
- RBAC_auth = self.check_permissions(token_info, cherrypy.request.method, role_permission,
- query_string_operations, item_id)
+ RBAC_auth = self.check_permissions(
+ token_info,
+ cherrypy.request.method,
+ role_permission,
+ query_string_operations,
+ item_id,
+ )
token_info["allow_show_user_project_role"] = RBAC_auth
return token_info
except AuthException as e:
if not isinstance(e, AuthExceptionUnauthorized):
- if cherrypy.session.get('Authorization'):
- del cherrypy.session['Authorization']
- cherrypy.response.headers["WWW-Authenticate"] = 'Bearer realm="{}"'.format(e)
+ if cherrypy.session.get("Authorization"):
+ del cherrypy.session["Authorization"]
+ cherrypy.response.headers[
+ "WWW-Authenticate"
+ ] = 'Bearer realm="{}"'.format(e)
if self.config["authentication"].get("user_not_authorized"):
- return {"id": "testing-token", "_id": "testing-token",
- "project_id": self.test_project_id,
- "username": self.config["authentication"]["user_not_authorized"],
- "roles": [self.system_admin_role_id],
- "admin": True, "allow_show_user_project_role": True}
+ return {
+ "id": "testing-token",
+ "_id": "testing-token",
+ "project_id": self.test_project_id,
+ "username": self.config["authentication"]["user_not_authorized"],
+ "roles": [self.system_admin_role_id],
+ "admin": True,
+ "allow_show_user_project_role": True,
+ }
raise
def new_token(self, token_info, indata, remote):
if not new_token_info.get("expires"):
new_token_info["expires"] = time() + 3600
if not new_token_info.get("admin"):
- new_token_info["admin"] = True if new_token_info.get("project_name") == "admin" else False
+ new_token_info["admin"] = (
+ True if new_token_info.get("project_name") == "admin" else False
+ )
# TODO put admin in RBAC
if remote.name:
return self._internal_get_token_list(token_info)
else:
# TODO: check if this can be avoided. Backend may provide enough information
- return [deepcopy(token) for token in self.tokens_cache.values()
- if token["username"] == token_info["username"]]
+ return [
+ deepcopy(token)
+ for token in self.tokens_cache.values()
+ if token["username"] == token_info["username"]
+ ]
def get_token(self, token_info, token):
if self.config["authentication"]["backend"] == "internal":
token_value = self.tokens_cache.get(token)
if not token_value:
raise AuthException("token not found", http_code=HTTPStatus.NOT_FOUND)
- if token_value["username"] != token_info["username"] and not token_info["admin"]:
- raise AuthException("needed admin privileges", http_code=HTTPStatus.UNAUTHORIZED)
+ if (
+ token_value["username"] != token_info["username"]
+ and not token_info["admin"]
+ ):
+ raise AuthException(
+ "needed admin privileges", http_code=HTTPStatus.UNAUTHORIZED
+ )
return token_value
def del_token(self, token):
self.remove_token_from_cache(token)
return "token '{}' deleted".format(token)
except KeyError:
- raise AuthException("Token '{}' not found".format(token), http_code=HTTPStatus.NOT_FOUND)
-
- def check_permissions(self, token_info, method, role_permission=None, query_string_operations=None, item_id=None):
+ raise AuthException(
+ "Token '{}' not found".format(token), http_code=HTTPStatus.NOT_FOUND
+ )
+
+ def check_permissions(
+ self,
+ token_info,
+ method,
+ role_permission=None,
+ query_string_operations=None,
+ item_id=None,
+ ):
"""
Checks that operation has permissions to be done, base on the assigned roles to this user project
:param token_info: Dictionary that contains "roles" with a list of assigned roles.
if not query_string_operations:
return True
for query_string_operation in query_string_operations:
- if role not in self.operation_to_allowed_roles[query_string_operation]:
+ if (
+ role
+ not in self.operation_to_allowed_roles[query_string_operation]
+ ):
break
else:
return True
# User/Project/Role whole listings are filtered elsewhere
# uid, pid, rid = ("user_id", "project_id", "id") if is_valid_uuid(id) else ("username", "project_name", "name")
uid = "user_id" if is_valid_uuid(item_id) else "username"
- if (role_permission in ["projects:get", "projects:id:get", "roles:get", "roles:id:get", "users:get"]) \
- or (role_permission == "users:id:get" and item_id == token_info[uid]):
+ if (
+ role_permission
+ in [
+ "projects:get",
+ "projects:id:get",
+ "roles:get",
+ "roles:id:get",
+ "users:get",
+ ]
+ ) or (role_permission == "users:id:get" and item_id == token_info[uid]):
# or (role_permission == "projects:id:get" and item_id == token_info[pid]) \
# or (role_permission == "roles:id:get" and item_id in [role[rid] for role in token_info["roles"]]):
return False
if not operation_allowed:
raise AuthExceptionUnauthorized("Access denied: lack of permissions.")
else:
- raise AuthExceptionUnauthorized("Access denied: You have not permissions to use these admin query string")
+ raise AuthExceptionUnauthorized(
+ "Access denied: You have not permissions to use these admin query string"
+ )
def get_user_list(self):
return self.backend.get_user_list()
def _normalize_url(self, url, method):
# DEPRECATED !!!
# Removing query strings
- normalized_url = url if '?' not in url else url[:url.find("?")]
+ normalized_url = url if "?" not in url else url[: url.find("?")]
normalized_url_splitted = normalized_url.split("/")
parameters = {}
- filtered_keys = [key for key in self.resources_to_operations_mapping.keys()
- if method in key.split()[0]]
+ filtered_keys = [
+ key
+ for key in self.resources_to_operations_mapping.keys()
+ if method in key.split()[0]
+ ]
for idx, path_part in enumerate(normalized_url_splitted):
tmp_keys = []
if splitted[idx] == "<artifactPath>":
tmp_keys.append(tmp_key)
continue
- elif idx == len(normalized_url_splitted) - 1 and \
- len(normalized_url_splitted) != len(splitted):
+ elif idx == len(normalized_url_splitted) - 1 and len(
+ normalized_url_splitted
+ ) != len(splitted):
continue
else:
tmp_keys.append(tmp_key)
elif splitted[idx] == path_part:
- if idx == len(normalized_url_splitted) - 1 and \
- len(normalized_url_splitted) != len(splitted):
+ if idx == len(normalized_url_splitted) - 1 and len(
+ normalized_url_splitted
+ ) != len(splitted):
continue
else:
tmp_keys.append(tmp_key)
filtered_keys = tmp_keys
- if len(filtered_keys) == 1 and \
- filtered_keys[0].split("/")[-1] == "<artifactPath>":
+ if (
+ len(filtered_keys) == 1
+ and filtered_keys[0].split("/")[-1] == "<artifactPath>"
+ ):
break
if len(filtered_keys) == 0:
- raise AuthException("Cannot make an authorization decision. URL not found. URL: {0}".format(url))
+ raise AuthException(
+ "Cannot make an authorization decision. URL not found. URL: {0}".format(
+ url
+ )
+ )
elif len(filtered_keys) > 1:
- raise AuthException("Cannot make an authorization decision. Multiple URLs found. URL: {0}".format(url))
+ raise AuthException(
+ "Cannot make an authorization decision. Multiple URLs found. URL: {0}".format(
+ url
+ )
+ )
filtered_key = filtered_keys[0]
for idx, path_part in enumerate(filtered_key.split()[1].split("/")):
if "<" in path_part and ">" in path_part:
if path_part == "<artifactPath>":
- parameters[path_part[1:-1]] = "/".join(normalized_url_splitted[idx:])
+ parameters[path_part[1:-1]] = "/".join(
+ normalized_url_splitted[idx:]
+ )
else:
parameters[path_part[1:-1]] = normalized_url_splitted[idx]
def _internal_get_token_list(self, token_info):
now = time()
- token_list = self.db.get_list("tokens", {"username": token_info["username"], "expires.gt": now})
+ token_list = self.db.get_list(
+ "tokens", {"username": token_info["username"], "expires.gt": now}
+ )
return token_list
def _internal_get_token(self, token_info, token_id):
token_value = self.db.get_one("tokens", {"_id": token_id}, fail_on_empty=False)
if not token_value:
raise AuthException("token not found", http_code=HTTPStatus.NOT_FOUND)
- if token_value["username"] != token_info["username"] and not token_info["admin"]:
- raise AuthException("needed admin privileges", http_code=HTTPStatus.UNAUTHORIZED)
+ if (
+ token_value["username"] != token_info["username"]
+ and not token_info["admin"]
+ ):
+ raise AuthException(
+ "needed admin privileges", http_code=HTTPStatus.UNAUTHORIZED
+ )
return token_value
def _internal_tokens_prune(self, token_collection, now=None):
plugins with the definition of the methods to be implemented.
"""
-__author__ = "Eduardo Sousa <esousa@whitestack.com>, " \
- "Pedro de la Cruz Ramos <pdelacruzramos@altran.com>"
+__author__ = (
+ "Eduardo Sousa <esousa@whitestack.com>, "
+ "Pedro de la Cruz Ramos <pdelacruzramos@altran.com>"
+)
__date__ = "$27-jul-2018 23:59:59$"
from http import HTTPStatus
"""
Authentication error, because token, user password not recognized
"""
+
def __init__(self, message, http_code=HTTPStatus.UNAUTHORIZED):
super(AuthException, self).__init__(message)
self.http_code = http_code
"""
Authentication error, because not having rights to make this operation
"""
+
pass
"""
Common and base class Exception for all authconn exceptions.
"""
+
def __init__(self, message, http_code=HTTPStatus.UNAUTHORIZED):
super(AuthconnException, self).__init__(message)
self.http_code = http_code
"""
Connectivity error with Auth backend.
"""
+
def __init__(self, message, http_code=HTTPStatus.BAD_GATEWAY):
super(AuthconnConnectionException, self).__init__(message, http_code)
"""
The request is not supported by the Auth backend.
"""
+
def __init__(self, message, http_code=HTTPStatus.NOT_IMPLEMENTED):
super(AuthconnNotSupportedException, self).__init__(message, http_code)
"""
The method is not implemented by the Auth backend.
"""
+
def __init__(self, message, http_code=HTTPStatus.NOT_IMPLEMENTED):
super(AuthconnNotImplementedException, self).__init__(message, http_code)
"""
The operation executed failed.
"""
+
def __init__(self, message, http_code=HTTPStatus.INTERNAL_SERVER_ERROR):
super(AuthconnOperationException, self).__init__(message, http_code)
"""
The operation executed failed because element not found.
"""
+
def __init__(self, message, http_code=HTTPStatus.NOT_FOUND):
super().__init__(message, http_code)
"""
The operation has conflicts.
"""
+
def __init__(self, message, http_code=HTTPStatus.CONFLICT):
super().__init__(message, http_code)
Each Auth backend connector plugin must be a subclass of
Authconn class.
"""
+
def __init__(self, config, db, role_permissions):
"""
Constructor of the Authconn class.
users = self.get_user_list(filt)
if not users:
if fail:
- raise AuthconnNotFoundException("User with {} not found".format(filt), http_code=HTTPStatus.NOT_FOUND)
+ raise AuthconnNotFoundException(
+ "User with {} not found".format(filt),
+ http_code=HTTPStatus.NOT_FOUND,
+ )
else:
return None
return users[0]
projs = self.get_project_list(filt)
if not projs:
if fail:
- raise AuthconnNotFoundException("project with {} not found".format(filt))
+ raise AuthconnNotFoundException(
+ "project with {} not found".format(filt)
+ )
else:
return None
return projs[0]
OSM Internal Authentication Backend and leverages the RBAC model
"""
-__author__ = "Pedro de la Cruz Ramos <pdelacruzramos@altran.com>, " \
- "Alfonso Tierno <alfonso.tiernosepulveda@telefoncia.com"
+__author__ = (
+ "Pedro de la Cruz Ramos <pdelacruzramos@altran.com>, "
+ "Alfonso Tierno <alfonso.tiernosepulveda@telefoncia.com"
+)
__date__ = "$06-jun-2019 11:16:08$"
import logging
import re
-from osm_nbi.authconn import Authconn, AuthException # , AuthconnOperationException
+from osm_nbi.authconn import Authconn, AuthException # , AuthconnOperationException
from osm_common.dbbase import DbException
from osm_nbi.base_topic import BaseTopic
from osm_nbi.validation import is_valid_uuid
class AuthconnInternal(Authconn):
- token_time_window = 2 # seconds
- token_delay = 1 # seconds to wait upon second request within time window
+ token_time_window = 2 # seconds
+ token_delay = 1 # seconds to wait upon second request within time window
users_collection = "users"
roles_collection = "roles"
try:
if not token:
- raise AuthException("Needed a token or Authorization HTTP header", http_code=HTTPStatus.UNAUTHORIZED)
+ raise AuthException(
+ "Needed a token or Authorization HTTP header",
+ http_code=HTTPStatus.UNAUTHORIZED,
+ )
now = time()
# if not token_info:
token_info = self.db.get_one(self.tokens_collection, {"_id": token})
if token_info["expires"] < now:
- raise AuthException("Expired Token or Authorization HTTP header", http_code=HTTPStatus.UNAUTHORIZED)
+ raise AuthException(
+ "Expired Token or Authorization HTTP header",
+ http_code=HTTPStatus.UNAUTHORIZED,
+ )
return token_info
except DbException as e:
if e.http_code == HTTPStatus.NOT_FOUND:
- raise AuthException("Invalid Token or Authorization HTTP header", http_code=HTTPStatus.UNAUTHORIZED)
+ raise AuthException(
+ "Invalid Token or Authorization HTTP header",
+ http_code=HTTPStatus.UNAUTHORIZED,
+ )
else:
raise
except AuthException:
raise
except Exception:
- self.logger.exception("Error during token validation using internal backend")
- raise AuthException("Error during token validation using internal backend",
- http_code=HTTPStatus.UNAUTHORIZED)
+ self.logger.exception(
+ "Error during token validation using internal backend"
+ )
+ raise AuthException(
+ "Error during token validation using internal backend",
+ http_code=HTTPStatus.UNAUTHORIZED,
+ )
def revoke_token(self, token):
"""
return True
except DbException as e:
if e.http_code == HTTPStatus.NOT_FOUND:
- raise AuthException("Token '{}' not found".format(token), http_code=HTTPStatus.NOT_FOUND)
+ raise AuthException(
+ "Token '{}' not found".format(token), http_code=HTTPStatus.NOT_FOUND
+ )
else:
# raise
exmsg = "Error during token revocation using internal backend"
:param user: username of the user.
:param password: password to be validated.
"""
- user_rows = self.db.get_list(self.users_collection, {BaseTopic.id_field("users", user): user})
+ user_rows = self.db.get_list(
+ self.users_collection, {BaseTopic.id_field("users", user): user}
+ )
user_content = None
if user_rows:
user_content = user_rows[0]
salt = user_content["_admin"]["salt"]
- shadow_password = sha256(password.encode('utf-8') + salt.encode('utf-8')).hexdigest()
+ shadow_password = sha256(
+ password.encode("utf-8") + salt.encode("utf-8")
+ ).hexdigest()
if shadow_password != user_content["password"]:
user_content = None
return user_content
if user:
user_content = self.validate_user(user, password)
if not user_content:
- raise AuthException("Invalid username/password", http_code=HTTPStatus.UNAUTHORIZED)
+ raise AuthException(
+ "Invalid username/password", http_code=HTTPStatus.UNAUTHORIZED
+ )
if not user_content.get("_admin", None):
- raise AuthException("No default project for this user.", http_code=HTTPStatus.UNAUTHORIZED)
+ raise AuthException(
+ "No default project for this user.",
+ http_code=HTTPStatus.UNAUTHORIZED,
+ )
elif token_info:
- user_rows = self.db.get_list(self.users_collection, {"username": token_info["username"]})
+ user_rows = self.db.get_list(
+ self.users_collection, {"username": token_info["username"]}
+ )
if user_rows:
user_content = user_rows[0]
else:
raise AuthException("Invalid token", http_code=HTTPStatus.UNAUTHORIZED)
else:
- raise AuthException("Provide credentials: username/password or Authorization Bearer token",
- http_code=HTTPStatus.UNAUTHORIZED)
+ raise AuthException(
+ "Provide credentials: username/password or Authorization Bearer token",
+ http_code=HTTPStatus.UNAUTHORIZED,
+ )
# Delay upon second request within time window
- if now - user_content["_admin"].get("last_token_time", 0) < self.token_time_window:
+ if (
+ now - user_content["_admin"].get("last_token_time", 0)
+ < self.token_time_window
+ ):
sleep(self.token_delay)
# user_content["_admin"]["last_token_time"] = now
# self.db.replace("users", user_content["_id"], user_content) # might cause race conditions
- self.db.set_one(self.users_collection,
- {"_id": user_content["_id"]}, {"_admin.last_token_time": now})
-
- token_id = ''.join(random_choice('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789')
- for _ in range(0, 32))
+ self.db.set_one(
+ self.users_collection,
+ {"_id": user_content["_id"]},
+ {"_admin.last_token_time": now},
+ )
+
+ token_id = "".join(
+ random_choice(
+ "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
+ )
+ for _ in range(0, 32)
+ )
# projects = user_content.get("projects", [])
prm_list = user_content.get("project_role_mappings", [])
if not project:
project = prm_list[0]["project"] if prm_list else None
if not project:
- raise AuthException("can't find a default project for this user", http_code=HTTPStatus.UNAUTHORIZED)
+ raise AuthException(
+ "can't find a default project for this user",
+ http_code=HTTPStatus.UNAUTHORIZED,
+ )
projects = [prm["project"] for prm in prm_list]
- proj = self.db.get_one(self.projects_collection,
- {BaseTopic.id_field("projects", project): project})
+ proj = self.db.get_one(
+ self.projects_collection, {BaseTopic.id_field("projects", project): project}
+ )
project_name = proj["name"]
project_id = proj["_id"]
if project_name not in projects and project_id not in projects:
- raise AuthException("project {} not allowed for this user".format(project),
- http_code=HTTPStatus.UNAUTHORIZED)
+ raise AuthException(
+ "project {} not allowed for this user".format(project),
+ http_code=HTTPStatus.UNAUTHORIZED,
+ )
# TODO remove admin, this vill be used by roles RBAC
if project_name == "admin":
roles_list = []
for prm in prm_list:
if prm["project"] in [project_id, project_name]:
- role = self.db.get_one(self.roles_collection,
- {BaseTopic.id_field("roles", prm["role"]): prm["role"]})
+ role = self.db.get_one(
+ self.roles_collection,
+ {BaseTopic.id_field("roles", prm["role"]): prm["role"]},
+ )
rid = role["_id"]
if rid not in roles:
rnm = role["name"]
roles.append(rid)
roles_list.append({"name": rnm, "id": rid})
if not roles_list:
- rid = self.db.get_one(self.roles_collection, {"name": "project_admin"})["_id"]
+ rid = self.db.get_one(self.roles_collection, {"name": "project_admin"})[
+ "_id"
+ ]
roles_list = [{"name": "project_admin", "id": rid}]
- new_token = {"issued_at": now,
- "expires": now + 3600,
- "_id": token_id,
- "id": token_id,
- "project_id": proj["_id"],
- "project_name": proj["name"],
- "username": user_content["username"],
- "user_id": user_content["_id"],
- "admin": token_admin,
- "roles": roles_list,
- }
+ new_token = {
+ "issued_at": now,
+ "expires": now + 3600,
+ "_id": token_id,
+ "id": token_id,
+ "project_id": proj["_id"],
+ "project_name": proj["name"],
+ "username": user_content["username"],
+ "user_id": user_content["_id"],
+ "admin": token_admin,
+ "roles": roles_list,
+ }
self.db.create(self.tokens_collection, new_token)
return deepcopy(new_token)
salt = uuid4().hex
user_info["_admin"]["salt"] = salt
if "password" in user_info:
- user_info["password"] = sha256(user_info["password"].encode('utf-8') + salt.encode('utf-8')).hexdigest()
+ user_info["password"] = sha256(
+ user_info["password"].encode("utf-8") + salt.encode("utf-8")
+ ).hexdigest()
# "projects" are not stored any more
if "projects" in user_info:
del user_info["projects"]
:param user_info: user info modifications
"""
uid = user_info["_id"]
- user_data = self.db.get_one(self.users_collection, {BaseTopic.id_field("users", uid): uid})
+ user_data = self.db.get_one(
+ self.users_collection, {BaseTopic.id_field("users", uid): uid}
+ )
BaseTopic.format_on_edit(user_data, user_info)
# User Name
usnm = user_info.get("username")
user_data["username"] = usnm
# If password is given and is not already encripted
pswd = user_info.get("password")
- if pswd and (len(pswd) != 64 or not re.match('[a-fA-F0-9]*', pswd)): # TODO: Improve check?
+ if pswd and (
+ len(pswd) != 64 or not re.match("[a-fA-F0-9]*", pswd)
+ ): # TODO: Improve check?
salt = uuid4().hex
if "_admin" not in user_data:
user_data["_admin"] = {}
user_data["_admin"]["salt"] = salt
- user_data["password"] = sha256(pswd.encode('utf-8') + salt.encode('utf-8')).hexdigest()
+ user_data["password"] = sha256(
+ pswd.encode("utf-8") + salt.encode("utf-8")
+ ).hexdigest()
# Project-Role Mappings
# TODO: Check that user_info NEVER includes "project_role_mappings"
if "project_role_mappings" not in user_data:
for pidf in ["project", "project_name"]:
for ridf in ["role", "role_name"]:
try:
- user_data["project_role_mappings"].remove({"role": prm[ridf], "project": prm[pidf]})
+ user_data["project_role_mappings"].remove(
+ {"role": prm[ridf], "project": prm[pidf]}
+ )
except KeyError:
pass
except ValueError:
for prm in prms:
project_id = prm["project"]
if project_id not in project_id_name:
- pr = self.db.get_one(self.projects_collection,
- {BaseTopic.id_field("projects", project_id): project_id},
- fail_on_empty=False)
+ pr = self.db.get_one(
+ self.projects_collection,
+ {BaseTopic.id_field("projects", project_id): project_id},
+ fail_on_empty=False,
+ )
project_id_name[project_id] = pr["name"] if pr else None
prm["project_name"] = project_id_name[project_id]
if prm["project_name"] not in projects:
role_id = prm["role"]
if role_id not in role_id_name:
- role = self.db.get_one(self.roles_collection,
- {BaseTopic.id_field("roles", role_id): role_id},
- fail_on_empty=False)
+ role = self.db.get_one(
+ self.roles_collection,
+ {BaseTopic.id_field("roles", role_id): role_id},
+ fail_on_empty=False,
+ )
role_id_name[role_id] = role["name"] if role else None
prm["role_name"] = role_id_name[role_id]
user["projects"] = projects # for backward compatibility
elif projects:
# user created with an old version. Create a project_role mapping with role project_admin
user["project_role_mappings"] = []
- role = self.db.get_one(self.roles_collection,
- {BaseTopic.id_field("roles", "project_admin"): "project_admin"})
+ role = self.db.get_one(
+ self.roles_collection,
+ {BaseTopic.id_field("roles", "project_admin"): "project_admin"},
+ )
for p_id_name in projects:
- pr = self.db.get_one(self.projects_collection,
- {BaseTopic.id_field("projects", p_id_name): p_id_name})
- prm = {"project": pr["_id"],
- "project_name": pr["name"],
- "role_name": "project_admin",
- "role": role["_id"]
- }
+ pr = self.db.get_one(
+ self.projects_collection,
+ {BaseTopic.id_field("projects", p_id_name): p_id_name},
+ )
+ prm = {
+ "project": pr["_id"],
+ "project_name": pr["name"],
+ "role_name": "project_admin",
+ "role": role["_id"],
+ }
user["project_role_mappings"].append(prm)
else:
user["projects"] = []
:return: None
:raises AuthconnOperationException: if project update failed.
"""
- self.db.set_one(self.projects_collection, {BaseTopic.id_field("projects", project_id): project_id},
- project_info)
+ self.db.set_one(
+ self.projects_collection,
+ {BaseTopic.id_field("projects", project_id): project_id},
+ project_info,
+ )
"""
-__author__ = "Eduardo Sousa <esousa@whitestack.com>, " \
- "Pedro de la Cruz Ramos <pdelacruzramos@altran.com>"
+__author__ = (
+ "Eduardo Sousa <esousa@whitestack.com>, "
+ "Pedro de la Cruz Ramos <pdelacruzramos@altran.com>"
+)
__date__ = "$27-jul-2018 23:59:59$"
-from osm_nbi.authconn import Authconn, AuthException, AuthconnOperationException, AuthconnNotFoundException, \
- AuthconnConflictException
+from osm_nbi.authconn import (
+ Authconn,
+ AuthException,
+ AuthconnOperationException,
+ AuthconnNotFoundException,
+ AuthconnConflictException,
+)
import logging
import requests
if config.get("auth_url"):
validate_input(self.auth_url, http_schema)
else:
- self.auth_url = "http://{0}:{1}/v3".format(config.get("auth_host", "keystone"),
- config.get("auth_port", "5000"))
+ self.auth_url = "http://{0}:{1}/v3".format(
+ config.get("auth_host", "keystone"), config.get("auth_port", "5000")
+ )
self.user_domain_name_list = config.get("user_domain_name", "default")
self.user_domain_name_list = self.user_domain_name_list.split(",")
# read only domain list
- self.user_domain_ro_list = [x[:-3] for x in self.user_domain_name_list if x.endswith(":ro")]
+ self.user_domain_ro_list = [
+ x[:-3] for x in self.user_domain_name_list if x.endswith(":ro")
+ ]
# remove the ":ro"
- self.user_domain_name_list = [x if not x.endswith(":ro") else x[:-3] for x in self.user_domain_name_list]
+ self.user_domain_name_list = [
+ x if not x.endswith(":ro") else x[:-3] for x in self.user_domain_name_list
+ ]
self.admin_project = config.get("service_project", "service")
self.admin_username = config.get("service_username", "nbi")
self.project_domain_name_list = config.get("project_domain_name", "default")
self.project_domain_name_list = self.project_domain_name_list.split(",")
if len(self.user_domain_name_list) != len(self.project_domain_name_list):
- raise ValueError("Invalid configuration parameter fo authenticate. 'project_domain_name' and "
- "'user_domain_name' must be a comma-separated list with the same size. Revise "
- "configuration or/and 'OSMNBI_AUTHENTICATION_PROJECT_DOMAIN_NAME', "
- "'OSMNBI_AUTHENTICATION_USER_DOMAIN_NAME' Variables")
+ raise ValueError(
+ "Invalid configuration parameter fo authenticate. 'project_domain_name' and "
+ "'user_domain_name' must be a comma-separated list with the same size. Revise "
+ "configuration or/and 'OSMNBI_AUTHENTICATION_PROJECT_DOMAIN_NAME', "
+ "'OSMNBI_AUTHENTICATION_USER_DOMAIN_NAME' Variables"
+ )
# Waiting for Keystone to be up
available = None
if counter == 0:
raise AuthException("Keystone not available after 300s timeout")
- self.auth = v3.Password(user_domain_name=self.user_domain_name_list[0],
- username=self.admin_username,
- password=self.admin_password,
- project_domain_name=self.project_domain_name_list[0],
- project_name=self.admin_project,
- auth_url=self.auth_url)
+ self.auth = v3.Password(
+ user_domain_name=self.user_domain_name_list[0],
+ username=self.admin_username,
+ password=self.admin_password,
+ project_domain_name=self.project_domain_name_list[0],
+ project_name=self.admin_project,
+ auth_url=self.auth_url,
+ )
self.sess = session.Session(auth=self.auth)
- self.keystone = client.Client(session=self.sess, endpoint_override=self.auth_url)
+ self.keystone = client.Client(
+ session=self.sess, endpoint_override=self.auth_url
+ )
def authenticate(self, credentials, token_info=None):
"""
project_id = None
project_name = None
if credentials.get("project_domain_name"):
- project_domain_name_list = (credentials["project_domain_name"], )
+ project_domain_name_list = (credentials["project_domain_name"],)
else:
project_domain_name_list = self.project_domain_name_list
if credentials.get("user_domain_name"):
- user_domain_name_list = (credentials["user_domain_name"], )
+ user_domain_name_list = (credentials["user_domain_name"],)
else:
user_domain_name_list = self.user_domain_name_list
username=username,
password=credentials.get("password"),
user_domain_name=user_domain_name,
- project_domain_name=project_domain_name)
+ project_domain_name=project_domain_name,
+ )
elif token_info:
- unscoped_token = self.keystone.tokens.validate(token=token_info.get("_id"))
+ unscoped_token = self.keystone.tokens.validate(
+ token=token_info.get("_id")
+ )
else:
- raise AuthException("Provide credentials: username/password or Authorization Bearer token",
- http_code=HTTPStatus.UNAUTHORIZED)
+ raise AuthException(
+ "Provide credentials: username/password or Authorization Bearer token",
+ http_code=HTTPStatus.UNAUTHORIZED,
+ )
if not credentials.get("project_id"):
# get first project for the user
- project_list = self.keystone.projects.list(user=unscoped_token["user"]["id"])
+ project_list = self.keystone.projects.list(
+ user=unscoped_token["user"]["id"]
+ )
if not project_list:
- raise AuthException("The user {} has not any project and cannot be used for authentication".
- format(credentials.get("username")), http_code=HTTPStatus.UNAUTHORIZED)
+ raise AuthException(
+ "The user {} has not any project and cannot be used for authentication".format(
+ credentials.get("username")
+ ),
+ http_code=HTTPStatus.UNAUTHORIZED,
+ )
project_id = project_list[0].id
else:
if is_valid_uuid(credentials["project_id"]):
project_id=project_id,
user_domain_name=user_domain_name,
project_domain_name=project_domain_name,
- token=unscoped_token["auth_token"])
+ token=unscoped_token["auth_token"],
+ )
auth_token = {
"_id": scoped_token.auth_token,
"project_domain_name": scoped_token.project_domain_name,
"user_domain_name": scoped_token.user_domain_name,
"expires": scoped_token.expires.timestamp(),
- "issued_at": scoped_token.issued.timestamp()
+ "issued_at": scoped_token.issued.timestamp(),
}
return auth_token
except ClientException as e:
- if index >= len(user_domain_name_list)-1 or index >= len(project_domain_name_list)-1:
+ if (
+ index >= len(user_domain_name_list) - 1
+ or index >= len(project_domain_name_list) - 1
+ ):
# if last try, launch exception
# self.logger.exception("Error during user authentication using keystone: {}".format(e))
- raise AuthException("Error during user authentication using Keystone: {}".format(e),
- http_code=HTTPStatus.UNAUTHORIZED)
+ raise AuthException(
+ "Error during user authentication using Keystone: {}".format(e),
+ http_code=HTTPStatus.UNAUTHORIZED,
+ )
def validate_token(self, token):
"""
"username": token_info["user"]["name"],
"roles": token_info["roles"],
"expires": token_info.expires.timestamp(),
- "issued_at": token_info.issued.timestamp()
+ "issued_at": token_info.issued.timestamp(),
}
return ses
except ClientException as e:
# self.logger.exception("Error during token validation using keystone: {}".format(e))
- raise AuthException("Error during token validation using Keystone: {}".format(e),
- http_code=HTTPStatus.UNAUTHORIZED)
+ raise AuthException(
+ "Error during token validation using Keystone: {}".format(e),
+ http_code=HTTPStatus.UNAUTHORIZED,
+ )
def revoke_token(self, token):
"""
return True
except ClientException as e:
# self.logger.exception("Error during token revocation using keystone: {}".format(e))
- raise AuthException("Error during token revocation using Keystone: {}".format(e),
- http_code=HTTPStatus.UNAUTHORIZED)
+ raise AuthException(
+ "Error during token revocation using Keystone: {}".format(e),
+ http_code=HTTPStatus.UNAUTHORIZED,
+ )
def _get_domain_id(self, domain_name, fail_if_not_found=True):
"""
# domain_name is already an id
return domain_name
if not domain_id and fail_if_not_found:
- raise AuthconnNotFoundException("Domain {} cannot be found".format(domain_name))
+ raise AuthconnNotFoundException(
+ "Domain {} cannot be found".format(domain_name)
+ )
return domain_id
def _get_domains(self):
"""
try:
- if user_info.get("domain_name") and user_info["domain_name"] in self.user_domain_ro_list:
- raise AuthconnConflictException("Cannot create a user in the read only domain {}".
- format(user_info["domain_name"]))
+ if (
+ user_info.get("domain_name")
+ and user_info["domain_name"] in self.user_domain_ro_list
+ ):
+ raise AuthconnConflictException(
+ "Cannot create a user in the read only domain {}".format(
+ user_info["domain_name"]
+ )
+ )
new_user = self.keystone.users.create(
- user_info["username"], password=user_info["password"],
- domain=self._get_domain_id(user_info.get("domain_name", self.user_domain_name_list[0])),
- _admin=user_info["_admin"])
+ user_info["username"],
+ password=user_info["password"],
+ domain=self._get_domain_id(
+ user_info.get("domain_name", self.user_domain_name_list[0])
+ ),
+ _admin=user_info["_admin"],
+ )
if "project_role_mappings" in user_info.keys():
for mapping in user_info["project_role_mappings"]:
- self.assign_role_to_user(new_user, mapping["project"], mapping["role"])
+ self.assign_role_to_user(
+ new_user, mapping["project"], mapping["role"]
+ )
return {"username": new_user.name, "_id": new_user.id}
except Conflict as e:
# self.logger.exception("Error during user creation using keystone: {}".format(e))
raise AuthconnOperationException(e, http_code=HTTPStatus.CONFLICT)
except ClientException as e:
# self.logger.exception("Error during user creation using keystone: {}".format(e))
- raise AuthconnOperationException("Error during user creation using Keystone: {}".format(e))
+ raise AuthconnOperationException(
+ "Error during user creation using Keystone: {}".format(e)
+ )
def update_user(self, user_info):
"""
user_obj = None
if not user_obj:
for user_domain in self.user_domain_name_list:
- domain_id = self._get_domain_id(user_domain, fail_if_not_found=False)
+ domain_id = self._get_domain_id(
+ user_domain, fail_if_not_found=False
+ )
if not domain_id:
continue
- user_obj_list = self.keystone.users.list(name=user, domain=domain_id)
+ user_obj_list = self.keystone.users.list(
+ name=user, domain=domain_id
+ )
if user_obj_list:
user_obj = user_obj_list[0]
break
- else: # user not found
+ else: # user not found
raise AuthconnNotFoundException("User '{}' not found".format(user))
user_id = user_obj.id
if domain_name in self.user_domain_ro_list:
if user_info.get("password") or user_info.get("username"):
- raise AuthconnConflictException("Cannot update the user {} belonging to a read only domain {}".
- format(user, domain_name))
-
- elif user_info.get("password") or user_info.get("username") \
- or user_info.get("add_project_role_mappings") or user_info.get("remove_project_role_mappings"):
+ raise AuthconnConflictException(
+ "Cannot update the user {} belonging to a read only domain {}".format(
+ user, domain_name
+ )
+ )
+
+ elif (
+ user_info.get("password")
+ or user_info.get("username")
+ or user_info.get("add_project_role_mappings")
+ or user_info.get("remove_project_role_mappings")
+ ):
# if user_index>0, it is an external domain, that should not be updated
- ctime = user_obj._admin.get("created", 0) if hasattr(user_obj, "_admin") else 0
+ ctime = (
+ user_obj._admin.get("created", 0)
+ if hasattr(user_obj, "_admin")
+ else 0
+ )
try:
- self.keystone.users.update(user_id, password=user_info.get("password"),
- name=user_info.get("username"),
- _admin={"created": ctime, "modified": time.time()})
+ self.keystone.users.update(
+ user_id,
+ password=user_info.get("password"),
+ name=user_info.get("username"),
+ _admin={"created": ctime, "modified": time.time()},
+ )
except Exception as e:
if user_info.get("username") or user_info.get("password"):
- raise AuthconnOperationException("Error during username/password change: {}".format(str(e)))
- self.logger.error("Error during updating user profile: {}".format(str(e)))
+ raise AuthconnOperationException(
+ "Error during username/password change: {}".format(str(e))
+ )
+ self.logger.error(
+ "Error during updating user profile: {}".format(str(e))
+ )
for mapping in user_info.get("remove_project_role_mappings", []):
- self.remove_role_from_user(user_obj, mapping["project"], mapping["role"])
+ self.remove_role_from_user(
+ user_obj, mapping["project"], mapping["role"]
+ )
for mapping in user_info.get("add_project_role_mappings", []):
self.assign_role_to_user(user_obj, mapping["project"], mapping["role"])
except ClientException as e:
# self.logger.exception("Error during user password/name update using keystone: {}".format(e))
- raise AuthconnOperationException("Error during user update using Keystone: {}".format(e))
+ raise AuthconnOperationException(
+ "Error during user update using Keystone: {}".format(e)
+ )
def delete_user(self, user_id):
"""
domain_id = user_obj.domain_id
domain_name = self.domains_id2name.get(domain_id)
if domain_name in self.user_domain_ro_list:
- raise AuthconnConflictException("Cannot delete user {} belonging to a read only domain {}".
- format(user_id, domain_name))
+ raise AuthconnConflictException(
+ "Cannot delete user {} belonging to a read only domain {}".format(
+ user_id, domain_name
+ )
+ )
result, detail = self.keystone.users.delete(user_id)
if result.status_code != 204:
return True
except ClientException as e:
# self.logger.exception("Error during user deletion using keystone: {}".format(e))
- raise AuthconnOperationException("Error during user deletion using Keystone: {}".format(e))
+ raise AuthconnOperationException(
+ "Error during user deletion using Keystone: {}".format(e)
+ )
def get_user_list(self, filter_q=None):
"""
if filter_q:
filter_name = filter_q.get("name") or filter_q.get("username")
if filter_q.get("domain_name"):
- filter_domain = self._get_domain_id(filter_q["domain_name"], fail_if_not_found=False)
+ filter_domain = self._get_domain_id(
+ filter_q["domain_name"], fail_if_not_found=False
+ )
# If domain is not found, use the same name to obtain an empty list
filter_domain = filter_domain or filter_q["domain_name"]
if filter_q.get("domain_id"):
# get users from user_domain_name_list[1:], because it will not be provided in case of LDAP
if filter_domain is None and len(self.user_domain_name_list) > 1:
for user_domain in self.user_domain_name_list[1:]:
- domain_id = self._get_domain_id(user_domain, fail_if_not_found=False)
+ domain_id = self._get_domain_id(
+ user_domain, fail_if_not_found=False
+ )
if not domain_id:
continue
# find if users of this domain are already provided. In this case ignore
if u.domain_id == domain_id:
break
else:
- users += self.keystone.users.list(name=filter_name, domain=domain_id)
+ users += self.keystone.users.list(
+ name=filter_name, domain=domain_id
+ )
# if filter name matches a user id, provide it also
if filter_name:
except Exception:
pass
- users = [{
- "username": user.name,
- "_id": user.id,
- "id": user.id,
- "_admin": user.to_dict().get("_admin", {}), # TODO: REVISE
- "domain_name": self.domains_id2name.get(user.domain_id)
- } for user in users if user.name != self.admin_username]
+ users = [
+ {
+ "username": user.name,
+ "_id": user.id,
+ "id": user.id,
+ "_admin": user.to_dict().get("_admin", {}), # TODO: REVISE
+ "domain_name": self.domains_id2name.get(user.domain_id),
+ }
+ for user in users
+ if user.name != self.admin_username
+ ]
if filter_q and filter_q.get("_id"):
users = [user for user in users if filter_q["_id"] == user["_id"]]
for project in projects:
user["projects"].append(project.name)
- roles = self.keystone.roles.list(user=user["_id"], project=project.id)
+ roles = self.keystone.roles.list(
+ user=user["_id"], project=project.id
+ )
for role in roles:
prm = {
"project": project.id,
return users
except ClientException as e:
# self.logger.exception("Error during user listing using keystone: {}".format(e))
- raise AuthconnOperationException("Error during user listing using Keystone: {}".format(e))
+ raise AuthconnOperationException(
+ "Error during user listing using Keystone: {}".format(e)
+ )
def get_role_list(self, filter_q=None):
"""
filter_name = filter_q.get("name")
roles_list = self.keystone.roles.list(name=filter_name)
- roles = [{
- "name": role.name,
- "_id": role.id,
- "_admin": role.to_dict().get("_admin", {}),
- "permissions": role.to_dict().get("permissions", {})
- } for role in roles_list if role.name != "service"]
+ roles = [
+ {
+ "name": role.name,
+ "_id": role.id,
+ "_admin": role.to_dict().get("_admin", {}),
+ "permissions": role.to_dict().get("permissions", {}),
+ }
+ for role in roles_list
+ if role.name != "service"
+ ]
if filter_q and filter_q.get("_id"):
roles = [role for role in roles if filter_q["_id"] == role["_id"]]
return roles
except ClientException as e:
# self.logger.exception("Error during user role listing using keystone: {}".format(e))
- raise AuthException("Error during user role listing using Keystone: {}".format(e),
- http_code=HTTPStatus.UNAUTHORIZED)
+ raise AuthException(
+ "Error during user role listing using Keystone: {}".format(e),
+ http_code=HTTPStatus.UNAUTHORIZED,
+ )
def create_role(self, role_info):
"""
:raises AuthconnOperationException: if role creation failed.
"""
try:
- result = self.keystone.roles.create(role_info["name"], permissions=role_info.get("permissions"),
- _admin=role_info.get("_admin"))
+ result = self.keystone.roles.create(
+ role_info["name"],
+ permissions=role_info.get("permissions"),
+ _admin=role_info.get("_admin"),
+ )
return result.id
except Conflict as ex:
raise AuthconnConflictException(str(ex))
except ClientException as e:
# self.logger.exception("Error during role creation using keystone: {}".format(e))
- raise AuthconnOperationException("Error during role creation using Keystone: {}".format(e))
+ raise AuthconnOperationException(
+ "Error during role creation using Keystone: {}".format(e)
+ )
def delete_role(self, role_id):
"""
return True
except ClientException as e:
# self.logger.exception("Error during role deletion using keystone: {}".format(e))
- raise AuthconnOperationException("Error during role deletion using Keystone: {}".format(e))
+ raise AuthconnOperationException(
+ "Error during role deletion using Keystone: {}".format(e)
+ )
def update_role(self, role_info):
"""
"""
try:
rid = role_info["_id"]
- if not is_valid_uuid(rid): # Is this required?
+ if not is_valid_uuid(rid): # Is this required?
role_obj_list = self.keystone.roles.list(name=rid)
if not role_obj_list:
raise AuthconnNotFoundException("Role '{}' not found".format(rid))
rid = role_obj_list[0].id
- self.keystone.roles.update(rid, name=role_info["name"], permissions=role_info.get("permissions"),
- _admin=role_info.get("_admin"))
+ self.keystone.roles.update(
+ rid,
+ name=role_info["name"],
+ permissions=role_info.get("permissions"),
+ _admin=role_info.get("_admin"),
+ )
except ClientException as e:
# self.logger.exception("Error during role update using keystone: {}".format(e))
- raise AuthconnOperationException("Error during role updating using Keystone: {}".format(e))
+ raise AuthconnOperationException(
+ "Error during role updating using Keystone: {}".format(e)
+ )
def get_project_list(self, filter_q=None):
"""
if filter_q.get("domain_id"):
filter_domain = filter_q["domain_id"]
- projects = self.keystone.projects.list(name=filter_name, domain=filter_domain)
+ projects = self.keystone.projects.list(
+ name=filter_name, domain=filter_domain
+ )
- projects = [{
- "name": project.name,
- "_id": project.id,
- "_admin": project.to_dict().get("_admin", {}), # TODO: REVISE
- "quotas": project.to_dict().get("quotas", {}), # TODO: REVISE
- "domain_name": self.domains_id2name.get(project.domain_id)
- } for project in projects]
+ projects = [
+ {
+ "name": project.name,
+ "_id": project.id,
+ "_admin": project.to_dict().get("_admin", {}), # TODO: REVISE
+ "quotas": project.to_dict().get("quotas", {}), # TODO: REVISE
+ "domain_name": self.domains_id2name.get(project.domain_id),
+ }
+ for project in projects
+ ]
if filter_q and filter_q.get("_id"):
- projects = [project for project in projects
- if filter_q["_id"] == project["_id"]]
+ projects = [
+ project for project in projects if filter_q["_id"] == project["_id"]
+ ]
return projects
except ClientException as e:
# self.logger.exception("Error during user project listing using keystone: {}".format(e))
- raise AuthException("Error during user project listing using Keystone: {}".format(e),
- http_code=HTTPStatus.UNAUTHORIZED)
+ raise AuthException(
+ "Error during user project listing using Keystone: {}".format(e),
+ http_code=HTTPStatus.UNAUTHORIZED,
+ )
def create_project(self, project_info):
"""
try:
result = self.keystone.projects.create(
project_info["name"],
- domain=self._get_domain_id(project_info.get("domain_name", self.project_domain_name_list[0])),
+ domain=self._get_domain_id(
+ project_info.get("domain_name", self.project_domain_name_list[0])
+ ),
_admin=project_info["_admin"],
- quotas=project_info.get("quotas", {})
+ quotas=project_info.get("quotas", {}),
)
return result.id
except ClientException as e:
# self.logger.exception("Error during project creation using keystone: {}".format(e))
- raise AuthconnOperationException("Error during project creation using Keystone: {}".format(e))
+ raise AuthconnOperationException(
+ "Error during project creation using Keystone: {}".format(e)
+ )
def delete_project(self, project_id):
"""
return True
except ClientException as e:
# self.logger.exception("Error during project deletion using keystone: {}".format(e))
- raise AuthconnOperationException("Error during project deletion using Keystone: {}".format(e))
+ raise AuthconnOperationException(
+ "Error during project deletion using Keystone: {}".format(e)
+ )
def update_project(self, project_id, project_info):
"""
:return: None
"""
try:
- self.keystone.projects.update(project_id, name=project_info["name"],
- _admin=project_info["_admin"],
- quotas=project_info.get("quotas", {})
- )
+ self.keystone.projects.update(
+ project_id,
+ name=project_info["name"],
+ _admin=project_info["_admin"],
+ quotas=project_info.get("quotas", {}),
+ )
except ClientException as e:
# self.logger.exception("Error during project update using keystone: {}".format(e))
- raise AuthconnOperationException("Error during project update using Keystone: {}".format(e))
+ raise AuthconnOperationException(
+ "Error during project update using Keystone: {}".format(e)
+ )
def assign_role_to_user(self, user_obj, project, role):
"""
except Exception:
project_obj_list = self.keystone.projects.list(name=project)
if not project_obj_list:
- raise AuthconnNotFoundException("Project '{}' not found".format(project))
+ raise AuthconnNotFoundException(
+ "Project '{}' not found".format(project)
+ )
project_obj = project_obj_list[0]
try:
self.keystone.roles.grant(role_obj, user=user_obj, project=project_obj)
except ClientException as e:
# self.logger.exception("Error during user role assignment using keystone: {}".format(e))
- raise AuthconnOperationException("Error during role '{}' assignment to user '{}' and project '{}' using "
- "Keystone: {}".format(role, user_obj.name, project, e))
+ raise AuthconnOperationException(
+ "Error during role '{}' assignment to user '{}' and project '{}' using "
+ "Keystone: {}".format(role, user_obj.name, project, e)
+ )
def remove_role_from_user(self, user_obj, project, role):
"""
except Exception:
project_obj_list = self.keystone.projects.list(name=project)
if not project_obj_list:
- raise AuthconnNotFoundException("Project '{}' not found".format(project))
+ raise AuthconnNotFoundException(
+ "Project '{}' not found".format(project)
+ )
project_obj = project_obj_list[0]
try:
self.keystone.roles.revoke(role_obj, user=user_obj, project=project_obj)
except ClientException as e:
# self.logger.exception("Error during user role revocation using keystone: {}".format(e))
- raise AuthconnOperationException("Error during role '{}' revocation to user '{}' and project '{}' using "
- "Keystone: {}".format(role, user_obj.name, project, e))
+ raise AuthconnOperationException(
+ "Error during role '{}' revocation to user '{}' and project '{}' using "
+ "Keystone: {}".format(role, user_obj.name, project, e)
+ )
##
-"""
+"""
AuthconnTacacs implements implements the connector for TACACS.
Leverages AuthconnInternal for token lifecycle management and the RBAC model.
When NBI bootstraps, it tries to create admin user with admin role associated to admin project.
Hence, the TACACS server should contain admin user.
-"""
+"""
__author__ = "K Sai Kiran <saikiran.k@tataelxsi.co.in>"
__date__ = "$11-Nov-2020 11:04:00$"
-from osm_nbi.authconn import Authconn, AuthException
+from osm_nbi.authconn import Authconn, AuthException
from osm_nbi.authconn_internal import AuthconnInternal
from osm_nbi.base_topic import BaseTopic
self.db = db
self.tacacs_host = config["tacacs_host"]
self.tacacs_secret = config["tacacs_secret"]
- self.tacacs_port = config["tacacs_port"] if config.get("tacacs_port") else self.tacacs_def_port
- self.tacacs_timeout = config["tacacs_timeout"] if config.get("tacacs_timeout") else self.tacacs_def_timeout
- self.tacacs_cli = TACACSClient(self.tacacs_host, self.tacacs_port, self.tacacs_secret,
- self.tacacs_timeout)
+ self.tacacs_port = (
+ config["tacacs_port"] if config.get("tacacs_port") else self.tacacs_def_port
+ )
+ self.tacacs_timeout = (
+ config["tacacs_timeout"]
+ if config.get("tacacs_timeout")
+ else self.tacacs_def_timeout
+ )
+ self.tacacs_cli = TACACSClient(
+ self.tacacs_host, self.tacacs_port, self.tacacs_secret, self.tacacs_timeout
+ )
def validate_user(self, user, password):
- """
- """
+ """"""
now = time()
try:
tacacs_authen = self.tacacs_cli.authenticate(user, password)
except Exception as e:
- raise AuthException("TACACS server error: {}".format(e), http_code=HTTPStatus.UNAUTHORIZED)
+ raise AuthException(
+ "TACACS server error: {}".format(e), http_code=HTTPStatus.UNAUTHORIZED
+ )
user_content = None
- user_rows = self.db.get_list(self.users_collection, {BaseTopic.id_field("users", user): user})
+ user_rows = self.db.get_list(
+ self.users_collection, {BaseTopic.id_field("users", user): user}
+ )
if not tacacs_authen.valid:
if user_rows:
# To remove TACACS stale user from system.
if user_rows:
user_content = user_rows[0]
else:
- new_user = {'username': user,
- 'password': password,
- '_admin': {
- 'created': now,
- 'modified': now
- },
- 'project_role_mappings': []
- }
+ new_user = {
+ "username": user,
+ "password": password,
+ "_admin": {"created": now, "modified": now},
+ "project_role_mappings": [],
+ }
user_content = self.create_user(new_user)
return user_content
"""
BaseTopic.format_on_new(user_info, make_public=False)
try:
- authen = self.tacacs_cli.authenticate(user_info["username"], user_info["password"])
+ authen = self.tacacs_cli.authenticate(
+ user_info["username"], user_info["password"]
+ )
if authen.valid:
user_info.pop("password")
self.db.create(self.users_collection, user_info)
else:
- raise AuthException("TACACS server error: Invalid credentials", http_code=HTTPStatus.FORBIDDEN)
+ raise AuthException(
+ "TACACS server error: Invalid credentials",
+ http_code=HTTPStatus.FORBIDDEN,
+ )
except Exception as e:
- raise AuthException("TACACS server error: {}".format(e), http_code=HTTPStatus.BAD_REQUEST)
+ raise AuthException(
+ "TACACS server error: {}".format(e), http_code=HTTPStatus.BAD_REQUEST
+ )
return {"username": user_info["username"], "_id": user_info["_id"]}
def update_user(self, user_info):
:param user_info: Full user information in dict.
:return: returns None for successful add/remove of project and role map.
"""
- if(user_info.get("username")):
- raise AuthException("Can not update username of this user", http_code=HTTPStatus.FORBIDDEN)
- if(user_info.get("password")):
- raise AuthException("Can not update password of this user", http_code=HTTPStatus.FORBIDDEN)
+ if user_info.get("username"):
+ raise AuthException(
+ "Can not update username of this user", http_code=HTTPStatus.FORBIDDEN
+ )
+ if user_info.get("password"):
+ raise AuthException(
+ "Can not update password of this user", http_code=HTTPStatus.FORBIDDEN
+ )
super(AuthconnTacacs, self).update_user(user_info)
class EngineException(Exception):
-
def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
self.http_code = http_code
super(Exception, self).__init__(message)
if i > 0:
i += 1
# format in hex, len can be 2 for mac or 4 for ipv6
- return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(ip_mac[:i], int(ip_mac[i:], 16) + vm_index)
+ return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(
+ ip_mac[:i], int(ip_mac[i:], 16) + vm_index
+ )
except Exception:
pass
return None
class BaseTopic:
# static variables for all instance classes
- topic = None # to_override
- topic_msg = None # to_override
- quota_name = None # to_override. If not provided topic will be used for quota_name
- schema_new = None # to_override
+ topic = None # to_override
+ topic_msg = None # to_override
+ quota_name = None # to_override. If not provided topic will be used for quota_name
+ schema_new = None # to_override
schema_edit = None # to_override
multiproject = True # True if this Topic can be shared by several projects. Then it contains _admin.projects_read
default_quota = 500
# Alternative ID Fields for some Topics
- alt_id_field = {
- "projects": "name",
- "users": "username",
- "roles": "name"
- }
+ alt_id_field = {"projects": "name", "users": "username", "roles": "name"}
def __init__(self, db, fs, msg, auth):
self.db = db
count = self.db.count(self.topic, {"_admin.projects_read": pid})
if count >= quota:
name = proj["name"]
- raise ValidationError("quota ({}={}) exceeded for project {} ({})".format(quota_name, quota, name, pid),
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
+ raise ValidationError(
+ "quota ({}={}) exceeded for project {} ({})".format(
+ quota_name, quota, name, pid
+ ),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
def _validate_input_new(self, input, force=False):
"""
not present or contains ANY mean public.
:param session: contains:
project_id: project list this session has rights to access. Can be empty, one or several
- set_project: items created will contain this project list
+ set_project: items created will contain this project list
force: True or False
public: True, False or None
method: "list", "show", "write", "delete"
project_filter_n.append(session["PROJECT.ne"])
if project_filter:
- if session["method"] in ("list", "show", "delete") or session.get("set_project"):
+ if session["method"] in ("list", "show", "delete") or session.get(
+ "set_project"
+ ):
p_filter["_admin.projects_read.cont"] = project_filter
else:
p_filter["_admin.projects_write.cont"] = project_filter
if project_filter_n:
- if session["method"] in ("list", "show", "delete") or session.get("set_project"):
+ if session["method"] in ("list", "show", "delete") or session.get(
+ "set_project"
+ ):
p_filter["_admin.projects_read.ncont"] = project_filter_n
else:
p_filter["_admin.projects_write.ncont"] = project_filter_n
return final_content
# Change public status
if session["public"] is not None:
- if session["public"] and "ANY" not in final_content["_admin"]["projects_read"]:
+ if (
+ session["public"]
+ and "ANY" not in final_content["_admin"]["projects_read"]
+ ):
final_content["_admin"]["projects_read"].append("ANY")
final_content["_admin"]["projects_write"].clear()
- if not session["public"] and "ANY" in final_content["_admin"]["projects_read"]:
+ if (
+ not session["public"]
+ and "ANY" in final_content["_admin"]["projects_read"]
+ ):
final_content["_admin"]["projects_read"].remove("ANY")
# Change project status
_filter["name"] = name
if _id:
_filter["_id.neq"] = _id
- if self.db.get_one(self.topic, _filter, fail_on_empty=False, fail_on_more=False):
- raise EngineException("name '{}' already exists for {}".format(name, self.topic), HTTPStatus.CONFLICT)
+ if self.db.get_one(
+ self.topic, _filter, fail_on_empty=False, fail_on_more=False
+ ):
+ raise EngineException(
+ "name '{}' already exists for {}".format(name, self.topic),
+ HTTPStatus.CONFLICT,
+ )
@staticmethod
def format_on_new(content, project_id=None, make_public=False):
kitem_old = int(kitem)
# if index greater than list, extend the list
if kitem_old >= len(update_content):
- update_content += [None] * (kitem_old - len(update_content) + 1)
+ update_content += [None] * (
+ kitem_old - len(update_content) + 1
+ )
if not isinstance(update_content[kitem_old], (dict, list)):
update_content[kitem_old] = {}
else:
raise EngineException(
- "Invalid query string '{}'. Descriptor is not a list nor dict at '{}'".format(k, kitem))
+ "Invalid query string '{}'. Descriptor is not a list nor dict at '{}'".format(
+ k, kitem
+ )
+ )
if v is None:
del update_content[kitem_old]
else:
update_content[kitem_old] = v if not yaml_format else safe_load(v)
except KeyError:
raise EngineException(
- "Invalid query string '{}'. Descriptor does not contain '{}'".format(k, kitem_old))
+ "Invalid query string '{}'. Descriptor does not contain '{}'".format(
+ k, kitem_old
+ )
+ )
except ValueError:
- raise EngineException("Invalid query string '{}'. Expected integer index list instead of '{}'".format(
- k, kitem))
+ raise EngineException(
+ "Invalid query string '{}'. Expected integer index list instead of '{}'".format(
+ k, kitem
+ )
+ )
except IndexError:
raise EngineException(
- "Invalid query string '{}'. Index '{}' out of range".format(k, kitem_old))
+ "Invalid query string '{}'. Index '{}' out of range".format(
+ k, kitem_old
+ )
+ )
except YAMLError:
raise EngineException("Invalid query string '{}' yaml format".format(k))
self.sol005_projection(data)
return data
-
+
# TODO transform data for SOL005 URL requests
# TODO remove _admin if not admin
:param accept_header: Content of Accept header. Must contain applition/zip or/and text/plain
:return: opened file or raises an exception
"""
- raise EngineException("Method get_file not valid for this topic", HTTPStatus.INTERNAL_SERVER_ERROR)
+ raise EngineException(
+ "Method get_file not valid for this topic", HTTPStatus.INTERNAL_SERVER_ERROR
+ )
def list(self, session, filter_q=None, api_req=False):
"""
# Only perform SOL005 projection if we are serving an external request
if api_req:
data = [self.sol005_projection(inst) for inst in data]
-
+
return data
def new(self, rollback, session, indata=None, kwargs=None, headers=None):
self._update_input_with_kwargs(content, kwargs)
content = self._validate_input_new(content, force=session["force"])
self.check_conflict_on_new(session, content)
- op_id = self.format_on_new(content, project_id=session["project_id"], make_public=session["public"])
+ op_id = self.format_on_new(
+ content, project_id=session["project_id"], make_public=session["public"]
+ )
_id = self.db.create(self.topic, content)
rollback.append({"topic": self.topic, "_id": _id})
if op_id:
:return: True package has is completely uploaded or False if partial content has been uplodaed.
Raise exception on error
"""
- raise EngineException("Method upload_content not valid for this topic", HTTPStatus.INTERNAL_SERVER_ERROR)
+ raise EngineException(
+ "Method upload_content not valid for this topic",
+ HTTPStatus.INTERNAL_SERVER_ERROR,
+ )
def delete_list(self, session, filter_q=None):
"""
self.check_conflict_on_del(session, _id, item_content)
if dry_run:
return None
-
+
if self.multiproject and session["project_id"]:
# remove reference from project_read if there are more projects referencing it. If it last one,
# do not remove reference, but delete
- other_projects_referencing = next((p for p in item_content["_admin"]["projects_read"]
- if p not in session["project_id"] and p != "ANY"), None)
+ other_projects_referencing = next(
+ (
+ p
+ for p in item_content["_admin"]["projects_read"]
+ if p not in session["project_id"] and p != "ANY"
+ ),
+ None,
+ )
# check if there are projects referencing it (apart from ANY, that means, public)....
if other_projects_referencing:
# remove references but not delete
- update_dict_pull = {"_admin.projects_read": session["project_id"],
- "_admin.projects_write": session["project_id"]}
- self.db.set_one(self.topic, filter_q, update_dict=None, pull_list=update_dict_pull)
+ update_dict_pull = {
+ "_admin.projects_read": session["project_id"],
+ "_admin.projects_write": session["project_id"],
+ }
+ self.db.set_one(
+ self.topic, filter_q, update_dict=None, pull_list=update_dict_pull
+ )
return None
else:
- can_write = next((p for p in item_content["_admin"]["projects_write"] if p == "ANY" or
- p in session["project_id"]), None)
+ can_write = next(
+ (
+ p
+ for p in item_content["_admin"]["projects_write"]
+ if p == "ANY" or p in session["project_id"]
+ ),
+ None,
+ )
if not can_write:
- raise EngineException("You have not write permission to delete it",
- http_code=HTTPStatus.UNAUTHORIZED)
+ raise EngineException(
+ "You have not write permission to delete it",
+ http_code=HTTPStatus.UNAUTHORIZED,
+ )
# delete
self.db.del_one(self.topic, filter_q)
self._update_input_with_kwargs(indata, kwargs)
try:
if indata and session.get("set_project"):
- raise EngineException("Cannot edit content and set to project (query string SET_PROJECT) at same time",
- HTTPStatus.UNPROCESSABLE_ENTITY)
+ raise EngineException(
+ "Cannot edit content and set to project (query string SET_PROJECT) at same time",
+ HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
# TODO self._check_edition(session, indata, _id, force)
if not content:
content = self.show(session, _id)
import json
import importlib
import copy
+
# import logging
from hashlib import md5
from osm_common.dbbase import DbException, deep_update_rfc7396
from time import time
from uuid import uuid4
from re import fullmatch
-from osm_nbi.validation import ValidationError, pdu_new_schema, pdu_edit_schema, \
- validate_input, vnfpkgop_new_schema
+from osm_nbi.validation import (
+ ValidationError,
+ pdu_new_schema,
+ pdu_edit_schema,
+ validate_input,
+ vnfpkgop_new_schema,
+)
from osm_nbi.base_topic import BaseTopic, EngineException, get_iterable
+
etsi_nfv_vnfd = importlib.import_module("osm_im.etsi-nfv-vnfd")
etsi_nfv_nsd = importlib.import_module("osm_im.etsi-nfv-nsd")
from osm_im.nst import nst as nst_im
class DescriptorTopic(BaseTopic):
-
def __init__(self, db, fs, msg, auth):
BaseTopic.__init__(self, db, fs, msg, auth)
def check_conflict_on_edit(self, session, final_content, edit_content, _id):
- final_content = super().check_conflict_on_edit(session, final_content, edit_content, _id)
+ final_content = super().check_conflict_on_edit(
+ session, final_content, edit_content, _id
+ )
def _check_unique_id_name(descriptor, position=""):
for desc_key, desc_item in descriptor.items():
desc_item_id = None
for index, list_item in enumerate(desc_item):
if isinstance(list_item, dict):
- _check_unique_id_name(list_item, "{}.{}[{}]"
- .format(position, desc_key, index))
+ _check_unique_id_name(
+ list_item, "{}.{}[{}]".format(position, desc_key, index)
+ )
# Base case
- if index == 0 and (list_item.get("id") or list_item.get("name")):
+ if index == 0 and (
+ list_item.get("id") or list_item.get("name")
+ ):
desc_item_id = "id" if list_item.get("id") else "name"
if desc_item_id and list_item.get(desc_item_id):
if list_item[desc_item_id] in used_ids:
- position = "{}.{}[{}]".format(position, desc_key, index)
- raise EngineException("Error: identifier {} '{}' is not unique and repeats at '{}'"
- .format(desc_item_id, list_item[desc_item_id],
- position), HTTPStatus.UNPROCESSABLE_ENTITY)
+ position = "{}.{}[{}]".format(
+ position, desc_key, index
+ )
+ raise EngineException(
+ "Error: identifier {} '{}' is not unique and repeats at '{}'".format(
+ desc_item_id,
+ list_item[desc_item_id],
+ position,
+ ),
+ HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
used_ids.append(list_item[desc_item_id])
_check_unique_id_name(final_content)
if k in final_content:
internal_keys[k] = final_content.pop(k)
storage_params = internal_keys["_admin"].get("storage")
- serialized = self._validate_input_new(final_content, storage_params, session["force"])
+ serialized = self._validate_input_new(
+ final_content, storage_params, session["force"]
+ )
# 1.2. modify final_content with a serialized version
final_content = copy.deepcopy(serialized)
_filter["_id.neq"] = _id
if self.db.get_one(self.topic, _filter, fail_on_empty=False):
- raise EngineException("{} with id '{}' already exists for this project".format(self.topic[:-1],
- final_content["id"]),
- HTTPStatus.CONFLICT)
+ raise EngineException(
+ "{} with id '{}' already exists for this project".format(
+ self.topic[:-1], final_content["id"]
+ ),
+ HTTPStatus.CONFLICT,
+ )
return final_content
if len(desc_list) == 1:
return desc_list[0]
elif len(desc_list) > 1:
- raise DbException("Found more than one {} with id='{}' belonging to this project".format(topic[:-1], id),
- HTTPStatus.CONFLICT)
+ raise DbException(
+ "Found more than one {} with id='{}' belonging to this project".format(
+ topic[:-1], id
+ ),
+ HTTPStatus.CONFLICT,
+ )
# not found any: try to find public
_filter = BaseTopic._get_project_filter(session)
_filter["id"] = id
desc_list = db.get_list(topic, _filter)
if not desc_list:
- raise DbException("Not found any {} with id='{}'".format(topic[:-1], id), HTTPStatus.NOT_FOUND)
+ raise DbException(
+ "Not found any {} with id='{}'".format(topic[:-1], id),
+ HTTPStatus.NOT_FOUND,
+ )
elif len(desc_list) == 1:
return desc_list[0]
else:
- raise DbException("Found more than one public {} with id='{}'; and no one belonging to this project".format(
- topic[:-1], id), HTTPStatus.CONFLICT)
+ raise DbException(
+ "Found more than one public {} with id='{}'; and no one belonging to this project".format(
+ topic[:-1], id
+ ),
+ HTTPStatus.CONFLICT,
+ )
def new(self, rollback, session, indata=None, kwargs=None, headers=None):
"""
# _remove_envelop
if indata:
if "userDefinedData" in indata:
- indata = indata['userDefinedData']
+ indata = indata["userDefinedData"]
# Override descriptor with query string kwargs
self._update_input_with_kwargs(indata, kwargs)
# indata = DescriptorTopic._validate_input_new(self, indata, project_id=session["force"])
content = {"_admin": {"userDefinedData": indata}}
- self.format_on_new(content, session["project_id"], make_public=session["public"])
+ self.format_on_new(
+ content, session["project_id"], make_public=session["public"]
+ )
_id = self.db.create(self.topic, content)
rollback.append({"topic": self.topic, "_id": _id})
self._send_msg("created", {"_id": _id})
expected_md5 = headers.get("Content-File-MD5")
compressed = None
content_type = headers.get("Content-Type")
- if content_type and "application/gzip" in content_type or "application/x-gzip" in content_type or \
- "application/zip" in content_type:
+ if (
+ content_type
+ and "application/gzip" in content_type
+ or "application/x-gzip" in content_type
+ or "application/zip" in content_type
+ ):
compressed = "gzip"
filename = headers.get("Content-Filename")
if not filename:
error_text = ""
try:
if content_range_text:
- content_range = content_range_text.replace("-", " ").replace("/", " ").split()
- if content_range[0] != "bytes": # TODO check x<y not negative < total....
+ content_range = (
+ content_range_text.replace("-", " ").replace("/", " ").split()
+ )
+ if (
+ content_range[0] != "bytes"
+ ): # TODO check x<y not negative < total....
raise IndexError()
start = int(content_range[1])
end = int(content_range[2]) + 1
total = int(content_range[3])
else:
start = 0
- temp_folder = _id + "_" # all the content is upload here and if ok, it is rename from id_ to is folder
+ temp_folder = (
+ _id + "_"
+ ) # all the content is upload here and if ok, it is rename from id_ to is folder
if start:
- if not self.fs.file_exists(temp_folder, 'dir'):
- raise EngineException("invalid Transaction-Id header", HTTPStatus.NOT_FOUND)
+ if not self.fs.file_exists(temp_folder, "dir"):
+ raise EngineException(
+ "invalid Transaction-Id header", HTTPStatus.NOT_FOUND
+ )
else:
self.fs.file_delete(temp_folder, ignore_non_exist=True)
self.fs.mkdir(temp_folder)
storage["folder"] = _id
file_path = (temp_folder, filename)
- if self.fs.file_exists(file_path, 'file'):
+ if self.fs.file_exists(file_path, "file"):
file_size = self.fs.file_size(file_path)
else:
file_size = 0
if file_size != start:
- raise EngineException("invalid Content-Range start sequence, expected '{}' but received '{}'".format(
- file_size, start), HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE)
- file_pkg = self.fs.file_open(file_path, 'a+b')
+ raise EngineException(
+ "invalid Content-Range start sequence, expected '{}' but received '{}'".format(
+ file_size, start
+ ),
+ HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE,
+ )
+ file_pkg = self.fs.file_open(file_path, "a+b")
if isinstance(indata, dict):
indata_text = yaml.safe_dump(indata, indent=4, default_flow_style=False)
file_pkg.write(indata_text.encode(encoding="utf-8"))
file_pkg.write(indata_text)
if content_range_text:
if indata_len != end - start:
- raise EngineException("Mismatch between Content-Range header {}-{} and body length of {}".format(
- start, end - 1, indata_len), HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE)
+ raise EngineException(
+ "Mismatch between Content-Range header {}-{} and body length of {}".format(
+ start, end - 1, indata_len
+ ),
+ HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE,
+ )
if end != total:
# TODO update to UPLOADING
return False
raise EngineException("Error, MD5 mismatch", HTTPStatus.CONFLICT)
file_pkg.seek(0, 0)
if compressed == "gzip":
- tar = tarfile.open(mode='r', fileobj=file_pkg)
+ tar = tarfile.open(mode="r", fileobj=file_pkg)
descriptor_file_name = None
for tarinfo in tar:
tarname = tarinfo.name
tarname_path = tarname.split("/")
- if not tarname_path[0] or ".." in tarname_path: # if start with "/" means absolute path
- raise EngineException("Absolute path or '..' are not allowed for package descriptor tar.gz")
+ if (
+ not tarname_path[0] or ".." in tarname_path
+ ): # if start with "/" means absolute path
+ raise EngineException(
+ "Absolute path or '..' are not allowed for package descriptor tar.gz"
+ )
if len(tarname_path) == 1 and not tarinfo.isdir():
- raise EngineException("All files must be inside a dir for package descriptor tar.gz")
- if tarname.endswith(".yaml") or tarname.endswith(".json") or tarname.endswith(".yml"):
+ raise EngineException(
+ "All files must be inside a dir for package descriptor tar.gz"
+ )
+ if (
+ tarname.endswith(".yaml")
+ or tarname.endswith(".json")
+ or tarname.endswith(".yml")
+ ):
storage["pkg-dir"] = tarname_path[0]
if len(tarname_path) == 2:
if descriptor_file_name:
raise EngineException(
- "Found more than one descriptor file at package descriptor tar.gz")
+ "Found more than one descriptor file at package descriptor tar.gz"
+ )
descriptor_file_name = tarname
if not descriptor_file_name:
- raise EngineException("Not found any descriptor file at package descriptor tar.gz")
+ raise EngineException(
+ "Not found any descriptor file at package descriptor tar.gz"
+ )
storage["descriptor"] = descriptor_file_name
storage["zipfile"] = filename
self.fs.file_extract(tar, temp_folder)
- with self.fs.file_open((temp_folder, descriptor_file_name), "r") as descriptor_file:
+ with self.fs.file_open(
+ (temp_folder, descriptor_file_name), "r"
+ ) as descriptor_file:
content = descriptor_file.read()
else:
content = file_pkg.read()
self._update_input_with_kwargs(indata, kwargs)
deep_update_rfc7396(current_desc, indata)
- current_desc = self.check_conflict_on_edit(session, current_desc, indata, _id=_id)
+ current_desc = self.check_conflict_on_edit(
+ session, current_desc, indata, _id=_id
+ )
current_desc["_admin"]["modified"] = time()
self.db.replace(self.topic, _id, current_desc)
self.fs.dir_rename(temp_folder, _id)
except EngineException:
raise
except IndexError:
- raise EngineException("invalid Content-Range header format. Expected 'bytes start-end/total'",
- HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE)
+ raise EngineException(
+ "invalid Content-Range header format. Expected 'bytes start-end/total'",
+ HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE,
+ )
except IOError as e:
- raise EngineException("invalid upload transaction sequence: '{}'".format(e), HTTPStatus.BAD_REQUEST)
+ raise EngineException(
+ "invalid upload transaction sequence: '{}'".format(e),
+ HTTPStatus.BAD_REQUEST,
+ )
except tarfile.ReadError as e:
- raise EngineException("invalid file content {}".format(e), HTTPStatus.BAD_REQUEST)
+ raise EngineException(
+ "invalid file content {}".format(e), HTTPStatus.BAD_REQUEST
+ )
except (ValueError, yaml.YAMLError) as e:
raise EngineException(error_text + str(e))
except ValidationError as e:
"""
accept_text = accept_zip = False
if accept_header:
- if 'text/plain' in accept_header or '*/*' in accept_header:
+ if "text/plain" in accept_header or "*/*" in accept_header:
accept_text = True
- if 'application/zip' in accept_header or '*/*' in accept_header:
- accept_zip = 'application/zip'
- elif 'application/gzip' in accept_header:
- accept_zip = 'application/gzip'
+ if "application/zip" in accept_header or "*/*" in accept_header:
+ accept_zip = "application/zip"
+ elif "application/gzip" in accept_header:
+ accept_zip = "application/gzip"
if not accept_text and not accept_zip:
- raise EngineException("provide request header 'Accept' with 'application/zip' or 'text/plain'",
- http_code=HTTPStatus.NOT_ACCEPTABLE)
+ raise EngineException(
+ "provide request header 'Accept' with 'application/zip' or 'text/plain'",
+ http_code=HTTPStatus.NOT_ACCEPTABLE,
+ )
content = self.show(session, _id)
if content["_admin"]["onboardingState"] != "ONBOARDED":
- raise EngineException("Cannot get content because this resource is not at 'ONBOARDED' state. "
- "onboardingState is {}".format(content["_admin"]["onboardingState"]),
- http_code=HTTPStatus.CONFLICT)
+ raise EngineException(
+ "Cannot get content because this resource is not at 'ONBOARDED' state. "
+ "onboardingState is {}".format(content["_admin"]["onboardingState"]),
+ http_code=HTTPStatus.CONFLICT,
+ )
storage = content["_admin"]["storage"]
if path is not None and path != "$DESCRIPTOR": # artifacts
- if not storage.get('pkg-dir'):
- raise EngineException("Packages does not contains artifacts", http_code=HTTPStatus.BAD_REQUEST)
- if self.fs.file_exists((storage['folder'], storage['pkg-dir'], *path), 'dir'):
- folder_content = self.fs.dir_ls((storage['folder'], storage['pkg-dir'], *path))
+ if not storage.get("pkg-dir"):
+ raise EngineException(
+ "Packages does not contains artifacts",
+ http_code=HTTPStatus.BAD_REQUEST,
+ )
+ if self.fs.file_exists(
+ (storage["folder"], storage["pkg-dir"], *path), "dir"
+ ):
+ folder_content = self.fs.dir_ls(
+ (storage["folder"], storage["pkg-dir"], *path)
+ )
return folder_content, "text/plain"
# TODO manage folders in http
else:
- return self.fs.file_open((storage['folder'], storage['pkg-dir'], *path), "rb"), \
- "application/octet-stream"
+ return (
+ self.fs.file_open(
+ (storage["folder"], storage["pkg-dir"], *path), "rb"
+ ),
+ "application/octet-stream",
+ )
# pkgtype accept ZIP TEXT -> result
# manyfiles yes X -> zip
# onefile yes no -> zip
# X yes -> text
contain_many_files = False
- if storage.get('pkg-dir'):
+ if storage.get("pkg-dir"):
# check if there are more than one file in the package, ignoring checksums.txt.
- pkg_files = self.fs.dir_ls((storage['folder'], storage['pkg-dir']))
- if len(pkg_files) >= 3 or (len(pkg_files) == 2 and 'checksums.txt' not in pkg_files):
+ pkg_files = self.fs.dir_ls((storage["folder"], storage["pkg-dir"]))
+ if len(pkg_files) >= 3 or (
+ len(pkg_files) == 2 and "checksums.txt" not in pkg_files
+ ):
contain_many_files = True
if accept_text and (not contain_many_files or path == "$DESCRIPTOR"):
- return self.fs.file_open((storage['folder'], storage['descriptor']), "r"), "text/plain"
+ return (
+ self.fs.file_open((storage["folder"], storage["descriptor"]), "r"),
+ "text/plain",
+ )
elif contain_many_files and not accept_zip:
- raise EngineException("Packages that contains several files need to be retrieved with 'application/zip'"
- "Accept header", http_code=HTTPStatus.NOT_ACCEPTABLE)
+ raise EngineException(
+ "Packages that contains several files need to be retrieved with 'application/zip'"
+ "Accept header",
+ http_code=HTTPStatus.NOT_ACCEPTABLE,
+ )
else:
- if not storage.get('zipfile'):
+ if not storage.get("zipfile"):
# TODO generate zipfile if not present
- raise EngineException("Only allowed 'text/plain' Accept header for this descriptor. To be solved in "
- "future versions", http_code=HTTPStatus.NOT_ACCEPTABLE)
- return self.fs.file_open((storage['folder'], storage['zipfile']), "rb"), accept_zip
+ raise EngineException(
+ "Only allowed 'text/plain' Accept header for this descriptor. To be solved in "
+ "future versions",
+ http_code=HTTPStatus.NOT_ACCEPTABLE,
+ )
+ return (
+ self.fs.file_open((storage["folder"], storage["zipfile"]), "rb"),
+ accept_zip,
+ )
def _remove_yang_prefixes_from_descriptor(self, descriptor):
new_descriptor = {}
new_v.append(self._remove_yang_prefixes_from_descriptor(x))
else:
new_v.append(x)
- new_descriptor[k.split(':')[-1]] = new_v
+ new_descriptor[k.split(":")[-1]] = new_v
return new_descriptor
def pyangbind_validation(self, item, data, force=False):
- raise EngineException("Not possible to validate '{}' item".format(item),
- http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+ raise EngineException(
+ "Not possible to validate '{}' item".format(item),
+ http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
+ )
def _validate_input_edit(self, indata, content, force=False):
# not needed to validate with pyangbind becuase it will be validated at check_conflict_on_edit
if indata["operationalState"] in ("ENABLED", "DISABLED"):
indata["_admin"]["operationalState"] = indata.pop("operationalState")
else:
- raise EngineException("State '{}' is not a valid operational state"
- .format(indata["operationalState"]),
- http_code=HTTPStatus.BAD_REQUEST)
-
- # In the case of user defined data, we need to put the data in the root of the object
+ raise EngineException(
+ "State '{}' is not a valid operational state".format(
+ indata["operationalState"]
+ ),
+ http_code=HTTPStatus.BAD_REQUEST,
+ )
+
+ # In the case of user defined data, we need to put the data in the root of the object
# to preserve current expected behaviour
if "userDefinedData" in indata:
data = indata.pop("userDefinedData")
if type(data) == dict:
indata["_admin"]["userDefinedData"] = data
else:
- raise EngineException("userDefinedData should be an object, but is '{}' instead"
- .format(type(data)),
- http_code=HTTPStatus.BAD_REQUEST)
-
- if ("operationalState" in indata["_admin"] and
- content["_admin"]["operationalState"] == indata["_admin"]["operationalState"]):
- raise EngineException("operationalState already {}".format(content["_admin"]["operationalState"]),
- http_code=HTTPStatus.CONFLICT)
+ raise EngineException(
+ "userDefinedData should be an object, but is '{}' instead".format(
+ type(data)
+ ),
+ http_code=HTTPStatus.BAD_REQUEST,
+ )
+
+ if (
+ "operationalState" in indata["_admin"]
+ and content["_admin"]["operationalState"]
+ == indata["_admin"]["operationalState"]
+ ):
+ raise EngineException(
+ "operationalState already {}".format(
+ content["_admin"]["operationalState"]
+ ),
+ http_code=HTTPStatus.CONFLICT,
+ )
return indata
def pyangbind_validation(self, item, data, force=False):
if self._descriptor_data_is_in_old_format(data):
- raise EngineException("ERROR: Unsupported descriptor format. Please, use an ETSI SOL006 descriptor.",
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
+ raise EngineException(
+ "ERROR: Unsupported descriptor format. Please, use an ETSI SOL006 descriptor.",
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
try:
myvnfd = etsi_nfv_vnfd.etsi_nfv_vnfd()
- pybindJSONDecoder.load_ietf_json({'etsi-nfv-vnfd:vnfd': data}, None, None, obj=myvnfd,
- path_helper=True, skip_unknown=force)
+ pybindJSONDecoder.load_ietf_json(
+ {"etsi-nfv-vnfd:vnfd": data},
+ None,
+ None,
+ obj=myvnfd,
+ path_helper=True,
+ skip_unknown=force,
+ )
out = pybindJSON.dumps(myvnfd, mode="ietf")
desc_out = self._remove_envelop(yaml.safe_load(out))
desc_out = self._remove_yang_prefixes_from_descriptor(desc_out)
return utils.deep_update_dict(data, desc_out)
except Exception as e:
- raise EngineException("Error in pyangbind validation: {}".format(str(e)),
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
+ raise EngineException(
+ "Error in pyangbind validation: {}".format(str(e)),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
@staticmethod
def _descriptor_data_is_in_old_format(data):
- return ('vnfd-catalog' in data) or ('vnfd:vnfd-catalog' in data)
+ return ("vnfd-catalog" in data) or ("vnfd:vnfd-catalog" in data)
@staticmethod
def _remove_envelop(indata=None):
return {}
clean_indata = indata
- if clean_indata.get('etsi-nfv-vnfd:vnfd'):
- if not isinstance(clean_indata['etsi-nfv-vnfd:vnfd'], dict):
+ if clean_indata.get("etsi-nfv-vnfd:vnfd"):
+ if not isinstance(clean_indata["etsi-nfv-vnfd:vnfd"], dict):
raise EngineException("'etsi-nfv-vnfd:vnfd' must be a dict")
- clean_indata = clean_indata['etsi-nfv-vnfd:vnfd']
- elif clean_indata.get('vnfd'):
- if not isinstance(clean_indata['vnfd'], dict):
+ clean_indata = clean_indata["etsi-nfv-vnfd:vnfd"]
+ elif clean_indata.get("vnfd"):
+ if not isinstance(clean_indata["vnfd"], dict):
raise EngineException("'vnfd' must be dict")
- clean_indata = clean_indata['vnfd']
+ clean_indata = clean_indata["vnfd"]
return clean_indata
def check_conflict_on_edit(self, session, final_content, edit_content, _id):
- final_content = super().check_conflict_on_edit(session, final_content, edit_content, _id)
+ final_content = super().check_conflict_on_edit(
+ session, final_content, edit_content, _id
+ )
# set type of vnfd
contains_pdu = False
# check vnfrs using this vnfd
_filter["vnfd-id"] = _id
if self.db.get_list("vnfrs", _filter):
- raise EngineException("There is at least one VNF instance using this descriptor",
- http_code=HTTPStatus.CONFLICT)
+ raise EngineException(
+ "There is at least one VNF instance using this descriptor",
+ http_code=HTTPStatus.CONFLICT,
+ )
# check NSD referencing this VNFD
del _filter["vnfd-id"]
_filter["vnfd-id"] = descriptor_id
if self.db.get_list("nsds", _filter):
- raise EngineException("There is at least one NS package referencing this descriptor",
- http_code=HTTPStatus.CONFLICT)
+ raise EngineException(
+ "There is at least one NS package referencing this descriptor",
+ http_code=HTTPStatus.CONFLICT,
+ )
def _validate_input_new(self, indata, storage_params, force=False):
indata.pop("onboardingState", None)
if not indata.get("vdu"):
return
if not indata.get("mgmt-cp"):
- raise EngineException("'mgmt-cp' is a mandatory field and it is not defined",
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
+ raise EngineException(
+ "'mgmt-cp' is a mandatory field and it is not defined",
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
for cp in get_iterable(indata.get("ext-cpd")):
if cp["id"] == indata["mgmt-cp"]:
break
else:
- raise EngineException("mgmt-cp='{}' must match an existing ext-cpd".format(indata["mgmt-cp"]),
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
+ raise EngineException(
+ "mgmt-cp='{}' must match an existing ext-cpd".format(indata["mgmt-cp"]),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
@staticmethod
def validate_vdu_internal_connection_points(vdu):
for cpd in get_iterable(vdu.get("int-cpd")):
cpd_id = cpd.get("id")
if cpd_id and cpd_id in int_cpds:
- raise EngineException("vdu[id='{}']:int-cpd[id='{}'] is already used by other int-cpd"
- .format(vdu["id"], cpd_id),
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
+ raise EngineException(
+ "vdu[id='{}']:int-cpd[id='{}'] is already used by other int-cpd".format(
+ vdu["id"], cpd_id
+ ),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
int_cpds.add(cpd_id)
@staticmethod
for cpd in get_iterable(indata.get("ext-cpd")):
cpd_id = cpd.get("id")
if cpd_id and cpd_id in ext_cpds:
- raise EngineException("ext-cpd[id='{}'] is already used by other ext-cpd".format(cpd_id),
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
+ raise EngineException(
+ "ext-cpd[id='{}'] is already used by other ext-cpd".format(cpd_id),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
ext_cpds.add(cpd_id)
int_cpd = cpd.get("int-cpd")
if int_cpd:
if (int_cpd.get("vdu-id"), int_cpd.get("cpd")) not in all_vdus_int_cpds:
- raise EngineException("ext-cpd[id='{}']:int-cpd must match an existing vdu int-cpd".format(cpd_id),
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
+ raise EngineException(
+ "ext-cpd[id='{}']:int-cpd must match an existing vdu int-cpd".format(
+ cpd_id
+ ),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
# TODO: Validate k8s-cluster-net points to a valid k8s-cluster:nets ?
def _validate_vdu_charms_in_package(self, storage_params, indata):
for df in indata["df"]:
- if "lcm-operations-configuration" in df and "operate-vnf-op-config" in df["lcm-operations-configuration"]:
- configs = df["lcm-operations-configuration"]["operate-vnf-op-config"].get("day1-2", [])
+ if (
+ "lcm-operations-configuration" in df
+ and "operate-vnf-op-config" in df["lcm-operations-configuration"]
+ ):
+ configs = df["lcm-operations-configuration"][
+ "operate-vnf-op-config"
+ ].get("day1-2", [])
vdus = df.get("vdu-profile", [])
for vdu in vdus:
for config in configs:
if config["id"] == vdu["id"] and utils.find_in_list(
config.get("execution-environment-list", []),
- lambda ee: "juju" in ee
+ lambda ee: "juju" in ee,
):
- if not self._validate_package_folders(storage_params, 'charms'):
- raise EngineException("Charm defined in vnf[id={}] but not present in "
- "package".format(indata["id"]))
+ if not self._validate_package_folders(
+ storage_params, "charms"
+ ):
+ raise EngineException(
+ "Charm defined in vnf[id={}] but not present in "
+ "package".format(indata["id"])
+ )
def _validate_vdu_cloud_init_in_package(self, storage_params, vdu, indata):
if not vdu.get("cloud-init-file"):
return
- if not self._validate_package_folders(storage_params, 'cloud_init', vdu["cloud-init-file"]):
- raise EngineException("Cloud-init defined in vnf[id={}]:vdu[id={}] but not present in "
- "package".format(indata["id"], vdu["id"]))
+ if not self._validate_package_folders(
+ storage_params, "cloud_init", vdu["cloud-init-file"]
+ ):
+ raise EngineException(
+ "Cloud-init defined in vnf[id={}]:vdu[id={}] but not present in "
+ "package".format(indata["id"], vdu["id"])
+ )
def _validate_vnf_charms_in_package(self, storage_params, indata):
# Get VNF configuration through new container
- for deployment_flavor in indata.get('df', []):
+ for deployment_flavor in indata.get("df", []):
if "lcm-operations-configuration" not in deployment_flavor:
return
- if "operate-vnf-op-config" not in deployment_flavor["lcm-operations-configuration"]:
+ if (
+ "operate-vnf-op-config"
+ not in deployment_flavor["lcm-operations-configuration"]
+ ):
return
- for day_1_2_config in deployment_flavor["lcm-operations-configuration"]["operate-vnf-op-config"]["day1-2"]:
+ for day_1_2_config in deployment_flavor["lcm-operations-configuration"][
+ "operate-vnf-op-config"
+ ]["day1-2"]:
if day_1_2_config["id"] == indata["id"]:
if utils.find_in_list(
day_1_2_config.get("execution-environment-list", []),
- lambda ee: "juju" in ee
+ lambda ee: "juju" in ee,
):
- if not self._validate_package_folders(storage_params, 'charms'):
- raise EngineException("Charm defined in vnf[id={}] but not present in "
- "package".format(indata["id"]))
+ if not self._validate_package_folders(storage_params, "charms"):
+ raise EngineException(
+ "Charm defined in vnf[id={}] but not present in "
+ "package".format(indata["id"])
+ )
def _validate_package_folders(self, storage_params, folder, file=None):
if not storage_params or not storage_params.get("pkg-dir"):
return False
else:
- if self.fs.file_exists("{}_".format(storage_params["folder"]), 'dir'):
- f = "{}_/{}/{}".format(storage_params["folder"], storage_params["pkg-dir"], folder)
+ if self.fs.file_exists("{}_".format(storage_params["folder"]), "dir"):
+ f = "{}_/{}/{}".format(
+ storage_params["folder"], storage_params["pkg-dir"], folder
+ )
else:
- f = "{}/{}/{}".format(storage_params["folder"], storage_params["pkg-dir"], folder)
+ f = "{}/{}/{}".format(
+ storage_params["folder"], storage_params["pkg-dir"], folder
+ )
if file:
- return self.fs.file_exists("{}/{}".format(f, file), 'file')
+ return self.fs.file_exists("{}/{}".format(f, file), "file")
else:
- if self.fs.file_exists(f, 'dir'):
+ if self.fs.file_exists(f, "dir"):
if self.fs.dir_ls(f):
return True
return False
for ivld in get_iterable(indata.get("int-virtual-link-desc")):
ivld_id = ivld.get("id")
if ivld_id and ivld_id in all_ivld_ids:
- raise EngineException("Duplicated VLD id in int-virtual-link-desc[id={}]".format(ivld_id),
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
+ raise EngineException(
+ "Duplicated VLD id in int-virtual-link-desc[id={}]".format(ivld_id),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
else:
all_ivld_ids.add(ivld_id)
if int_cpd_ivld_id and int_cpd_ivld_id not in all_ivld_ids:
raise EngineException(
"vdu[id='{}']:int-cpd[id='{}']:int-virtual-link-desc='{}' must match an existing "
- "int-virtual-link-desc".format(vdu["id"], int_cpd["id"], int_cpd_ivld_id),
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
+ "int-virtual-link-desc".format(
+ vdu["id"], int_cpd["id"], int_cpd_ivld_id
+ ),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
for df in get_iterable(indata.get("df")):
for vlp in get_iterable(df.get("virtual-link-profile")):
vlp_ivld_id = vlp.get("id")
if vlp_ivld_id and vlp_ivld_id not in all_ivld_ids:
- raise EngineException("df[id='{}']:virtual-link-profile='{}' must match an existing "
- "int-virtual-link-desc".format(df["id"], vlp_ivld_id),
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
+ raise EngineException(
+ "df[id='{}']:virtual-link-profile='{}' must match an existing "
+ "int-virtual-link-desc".format(df["id"], vlp_ivld_id),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
@staticmethod
def validate_monitoring_params(indata):
for mp in get_iterable(ivld.get("monitoring-parameters")):
mp_id = mp.get("id")
if mp_id and mp_id in all_monitoring_params:
- raise EngineException("Duplicated monitoring-parameter id in "
- "int-virtual-link-desc[id='{}']:monitoring-parameters[id='{}']"
- .format(ivld["id"], mp_id),
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
+ raise EngineException(
+ "Duplicated monitoring-parameter id in "
+ "int-virtual-link-desc[id='{}']:monitoring-parameters[id='{}']".format(
+ ivld["id"], mp_id
+ ),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
else:
all_monitoring_params.add(mp_id)
for mp in get_iterable(vdu.get("monitoring-parameter")):
mp_id = mp.get("id")
if mp_id and mp_id in all_monitoring_params:
- raise EngineException("Duplicated monitoring-parameter id in "
- "vdu[id='{}']:monitoring-parameter[id='{}']"
- .format(vdu["id"], mp_id),
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
+ raise EngineException(
+ "Duplicated monitoring-parameter id in "
+ "vdu[id='{}']:monitoring-parameter[id='{}']".format(
+ vdu["id"], mp_id
+ ),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
else:
all_monitoring_params.add(mp_id)
for mp in get_iterable(df.get("monitoring-parameter")):
mp_id = mp.get("id")
if mp_id and mp_id in all_monitoring_params:
- raise EngineException("Duplicated monitoring-parameter id in "
- "df[id='{}']:monitoring-parameter[id='{}']"
- .format(df["id"], mp_id),
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
+ raise EngineException(
+ "Duplicated monitoring-parameter id in "
+ "df[id='{}']:monitoring-parameter[id='{}']".format(
+ df["id"], mp_id
+ ),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
else:
all_monitoring_params.add(mp_id)
for sp in get_iterable(sa.get("scaling-policy")):
for sc in get_iterable(sp.get("scaling-criteria")):
sc_monitoring_param = sc.get("vnf-monitoring-param-ref")
- if sc_monitoring_param and sc_monitoring_param not in all_monitoring_params:
- raise EngineException("df[id='{}']:scaling-aspect[id='{}']:scaling-policy"
- "[name='{}']:scaling-criteria[name='{}']: "
- "vnf-monitoring-param-ref='{}' not defined in any monitoring-param"
- .format(df["id"], sa["id"], sp["name"], sc["name"],
- sc_monitoring_param),
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
+ if (
+ sc_monitoring_param
+ and sc_monitoring_param not in all_monitoring_params
+ ):
+ raise EngineException(
+ "df[id='{}']:scaling-aspect[id='{}']:scaling-policy"
+ "[name='{}']:scaling-criteria[name='{}']: "
+ "vnf-monitoring-param-ref='{}' not defined in any monitoring-param".format(
+ df["id"],
+ sa["id"],
+ sp["name"],
+ sc["name"],
+ sc_monitoring_param,
+ ),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
for sca in get_iterable(sa.get("scaling-config-action")):
- if "lcm-operations-configuration" not in df \
- or "operate-vnf-op-config" not in df["lcm-operations-configuration"] \
+ if (
+ "lcm-operations-configuration" not in df
+ or "operate-vnf-op-config"
+ not in df["lcm-operations-configuration"]
or not utils.find_in_list(
- df["lcm-operations-configuration"]["operate-vnf-op-config"].get("day1-2", []),
- lambda config: config["id"] == indata["id"]):
- raise EngineException("'day1-2 configuration' not defined in the descriptor but it is "
- "referenced by df[id='{}']:scaling-aspect[id='{}']:scaling-config-action"
- .format(df["id"], sa["id"]),
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
+ df["lcm-operations-configuration"][
+ "operate-vnf-op-config"
+ ].get("day1-2", []),
+ lambda config: config["id"] == indata["id"],
+ )
+ ):
+ raise EngineException(
+ "'day1-2 configuration' not defined in the descriptor but it is "
+ "referenced by df[id='{}']:scaling-aspect[id='{}']:scaling-config-action".format(
+ df["id"], sa["id"]
+ ),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
for configuration in get_iterable(
- df["lcm-operations-configuration"]["operate-vnf-op-config"].get("day1-2", [])
+ df["lcm-operations-configuration"]["operate-vnf-op-config"].get(
+ "day1-2", []
+ )
):
- for primitive in get_iterable(configuration.get("config-primitive")):
- if primitive["name"] == sca["vnf-config-primitive-name-ref"]:
+ for primitive in get_iterable(
+ configuration.get("config-primitive")
+ ):
+ if (
+ primitive["name"]
+ == sca["vnf-config-primitive-name-ref"]
+ ):
break
else:
- raise EngineException("df[id='{}']:scaling-aspect[id='{}']:scaling-config-action:vnf-"
- "config-primitive-name-ref='{}' does not match any "
- "day1-2 configuration:config-primitive:name"
- .format(df["id"], sa["id"], sca["vnf-config-primitive-name-ref"]),
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
+ raise EngineException(
+ "df[id='{}']:scaling-aspect[id='{}']:scaling-config-action:vnf-"
+ "config-primitive-name-ref='{}' does not match any "
+ "day1-2 configuration:config-primitive:name".format(
+ df["id"],
+ sa["id"],
+ sca["vnf-config-primitive-name-ref"],
+ ),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
def delete_extra(self, session, _id, db_content, not_send_msg=None):
"""
links = {}
links["self"] = {"href": "/vnfpkgm/v1/vnf_packages/{}".format(data["_id"])}
links["vnfd"] = {"href": "/vnfpkgm/v1/vnf_packages/{}/vnfd".format(data["_id"])}
- links["packageContent"] = {"href": "/vnfpkgm/v1/vnf_packages/{}/package_content".format(data["_id"])}
+ links["packageContent"] = {
+ "href": "/vnfpkgm/v1/vnf_packages/{}/package_content".format(data["_id"])
+ }
data["_links"] = links
return super().sol005_projection(data)
def pyangbind_validation(self, item, data, force=False):
if self._descriptor_data_is_in_old_format(data):
- raise EngineException("ERROR: Unsupported descriptor format. Please, use an ETSI SOL006 descriptor.",
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
+ raise EngineException(
+ "ERROR: Unsupported descriptor format. Please, use an ETSI SOL006 descriptor.",
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
try:
- nsd_vnf_profiles = data.get('df', [{}])[0].get('vnf-profile', [])
+ nsd_vnf_profiles = data.get("df", [{}])[0].get("vnf-profile", [])
mynsd = etsi_nfv_nsd.etsi_nfv_nsd()
- pybindJSONDecoder.load_ietf_json({'nsd': {'nsd': [data]}}, None, None, obj=mynsd,
- path_helper=True, skip_unknown=force)
+ pybindJSONDecoder.load_ietf_json(
+ {"nsd": {"nsd": [data]}},
+ None,
+ None,
+ obj=mynsd,
+ path_helper=True,
+ skip_unknown=force,
+ )
out = pybindJSON.dumps(mynsd, mode="ietf")
desc_out = self._remove_envelop(yaml.safe_load(out))
desc_out = self._remove_yang_prefixes_from_descriptor(desc_out)
if nsd_vnf_profiles:
- desc_out['df'][0]['vnf-profile'] = nsd_vnf_profiles
+ desc_out["df"][0]["vnf-profile"] = nsd_vnf_profiles
return desc_out
except Exception as e:
- raise EngineException("Error in pyangbind validation: {}".format(str(e)),
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
+ raise EngineException(
+ "Error in pyangbind validation: {}".format(str(e)),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
@staticmethod
def _descriptor_data_is_in_old_format(data):
- return ('nsd-catalog' in data) or ('nsd:nsd-catalog' in data)
+ return ("nsd-catalog" in data) or ("nsd:nsd-catalog" in data)
@staticmethod
def _remove_envelop(indata=None):
return {}
clean_indata = indata
- if clean_indata.get('nsd'):
- clean_indata = clean_indata['nsd']
- elif clean_indata.get('etsi-nfv-nsd:nsd'):
- clean_indata = clean_indata['etsi-nfv-nsd:nsd']
- if clean_indata.get('nsd'):
- if not isinstance(clean_indata['nsd'], list) or len(clean_indata['nsd']) != 1:
+ if clean_indata.get("nsd"):
+ clean_indata = clean_indata["nsd"]
+ elif clean_indata.get("etsi-nfv-nsd:nsd"):
+ clean_indata = clean_indata["etsi-nfv-nsd:nsd"]
+ if clean_indata.get("nsd"):
+ if (
+ not isinstance(clean_indata["nsd"], list)
+ or len(clean_indata["nsd"]) != 1
+ ):
raise EngineException("'nsd' must be a list of only one element")
- clean_indata = clean_indata['nsd'][0]
+ clean_indata = clean_indata["nsd"][0]
return clean_indata
def _validate_input_new(self, indata, storage_params, force=False):
for vlp in get_iterable(df.get("virtual-link-profile")):
if vld_id and vld_id == vlp.get("virtual-link-desc-id"):
if vlp.get("virtual-link-protocol-data"):
- raise EngineException("Error at df[id='{}']:virtual-link-profile[id='{}']:virtual-link-"
- "protocol-data You cannot set a virtual-link-protocol-data "
- "when mgmt-network is True"
- .format(df["id"], vlp["id"]), http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
+ raise EngineException(
+ "Error at df[id='{}']:virtual-link-profile[id='{}']:virtual-link-"
+ "protocol-data You cannot set a virtual-link-protocol-data "
+ "when mgmt-network is True".format(df["id"], vlp["id"]),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
@staticmethod
def validate_vnf_profiles_vnfd_id(indata):
for vnf_profile in get_iterable(df.get("vnf-profile")):
vnfd_id = vnf_profile.get("vnfd-id")
if vnfd_id and vnfd_id not in all_vnfd_ids:
- raise EngineException("Error at df[id='{}']:vnf_profile[id='{}']:vnfd-id='{}' "
- "does not match any vnfd-id".format(df["id"], vnf_profile["id"], vnfd_id),
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
+ raise EngineException(
+ "Error at df[id='{}']:vnf_profile[id='{}']:vnfd-id='{}' "
+ "does not match any vnfd-id".format(
+ df["id"], vnf_profile["id"], vnfd_id
+ ),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
def _validate_input_edit(self, indata, content, force=False):
# not needed to validate with pyangbind becuase it will be validated at check_conflict_on_edit
"""
indata looks as follows:
- - In the new case (conformant)
- {'nsdOperationalState': 'DISABLED', 'userDefinedData': {'id': 'string23',
+ - In the new case (conformant)
+ {'nsdOperationalState': 'DISABLED', 'userDefinedData': {'id': 'string23',
'_id': 'c6ddc544-cede-4b94-9ebe-be07b298a3c1', 'name': 'simon46'}}
- In the old case (backwards-compatible)
{'id': 'string23', '_id': 'c6ddc544-cede-4b94-9ebe-be07b298a3c1', 'name': 'simon46'}
if indata["nsdOperationalState"] in ("ENABLED", "DISABLED"):
indata["_admin"]["operationalState"] = indata.pop("nsdOperationalState")
else:
- raise EngineException("State '{}' is not a valid operational state"
- .format(indata["nsdOperationalState"]),
- http_code=HTTPStatus.BAD_REQUEST)
-
- # In the case of user defined data, we need to put the data in the root of the object
+ raise EngineException(
+ "State '{}' is not a valid operational state".format(
+ indata["nsdOperationalState"]
+ ),
+ http_code=HTTPStatus.BAD_REQUEST,
+ )
+
+ # In the case of user defined data, we need to put the data in the root of the object
# to preserve current expected behaviour
if "userDefinedData" in indata:
data = indata.pop("userDefinedData")
if type(data) == dict:
indata["_admin"]["userDefinedData"] = data
else:
- raise EngineException("userDefinedData should be an object, but is '{}' instead"
- .format(type(data)),
- http_code=HTTPStatus.BAD_REQUEST)
- if ("operationalState" in indata["_admin"] and
- content["_admin"]["operationalState"] == indata["_admin"]["operationalState"]):
- raise EngineException("nsdOperationalState already {}".format(content["_admin"]["operationalState"]),
- http_code=HTTPStatus.CONFLICT)
+ raise EngineException(
+ "userDefinedData should be an object, but is '{}' instead".format(
+ type(data)
+ ),
+ http_code=HTTPStatus.BAD_REQUEST,
+ )
+ if (
+ "operationalState" in indata["_admin"]
+ and content["_admin"]["operationalState"]
+ == indata["_admin"]["operationalState"]
+ ):
+ raise EngineException(
+ "nsdOperationalState already {}".format(
+ content["_admin"]["operationalState"]
+ ),
+ http_code=HTTPStatus.CONFLICT,
+ )
return indata
def _check_descriptor_dependencies(self, session, descriptor):
query_filter["id"] = vnfd_id
vnf_list = self.db.get_list("vnfds", query_filter)
if not vnf_list:
- raise EngineException("Descriptor error at 'vnfd-id'='{}' references a non "
- "existing vnfd".format(vnfd_id), http_code=HTTPStatus.CONFLICT)
+ raise EngineException(
+ "Descriptor error at 'vnfd-id'='{}' references a non "
+ "existing vnfd".format(vnfd_id),
+ http_code=HTTPStatus.CONFLICT,
+ )
vnfds_index[vnfd_id] = vnf_list[0]
return vnfds_index
vnfd = vnfds_index.get(vnf_profile["vnfd-id"])
all_vnfd_ext_cpds = set()
for ext_cpd in get_iterable(vnfd.get("ext-cpd")):
- if ext_cpd.get('id'):
- all_vnfd_ext_cpds.add(ext_cpd.get('id'))
+ if ext_cpd.get("id"):
+ all_vnfd_ext_cpds.add(ext_cpd.get("id"))
- for virtual_link in get_iterable(vnf_profile.get("virtual-link-connectivity")):
+ for virtual_link in get_iterable(
+ vnf_profile.get("virtual-link-connectivity")
+ ):
for vl_cpd in get_iterable(virtual_link.get("constituent-cpd-id")):
- vl_cpd_id = vl_cpd.get('constituent-cpd-id')
+ vl_cpd_id = vl_cpd.get("constituent-cpd-id")
if vl_cpd_id and vl_cpd_id not in all_vnfd_ext_cpds:
- raise EngineException("Error at df[id='{}']:vnf-profile[id='{}']:virtual-link-connectivity"
- "[virtual-link-profile-id='{}']:constituent-cpd-id='{}' references a "
- "non existing ext-cpd:id inside vnfd '{}'"
- .format(df["id"], vnf_profile["id"],
- virtual_link["virtual-link-profile-id"], vl_cpd_id, vnfd["id"]),
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
+ raise EngineException(
+ "Error at df[id='{}']:vnf-profile[id='{}']:virtual-link-connectivity"
+ "[virtual-link-profile-id='{}']:constituent-cpd-id='{}' references a "
+ "non existing ext-cpd:id inside vnfd '{}'".format(
+ df["id"],
+ vnf_profile["id"],
+ virtual_link["virtual-link-profile-id"],
+ vl_cpd_id,
+ vnfd["id"],
+ ),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
def check_conflict_on_edit(self, session, final_content, edit_content, _id):
- final_content = super().check_conflict_on_edit(session, final_content, edit_content, _id)
+ final_content = super().check_conflict_on_edit(
+ session, final_content, edit_content, _id
+ )
self._check_descriptor_dependencies(session, final_content)
_filter = self._get_project_filter(session)
_filter["nsd-id"] = _id
if self.db.get_list("nsrs", _filter):
- raise EngineException("There is at least one NS instance using this descriptor",
- http_code=HTTPStatus.CONFLICT)
+ raise EngineException(
+ "There is at least one NS instance using this descriptor",
+ http_code=HTTPStatus.CONFLICT,
+ )
# check NSD referenced by NST
del _filter["nsd-id"]
_filter["netslice-subnet.ANYINDEX.nsd-ref"] = descriptor_id
if self.db.get_list("nsts", _filter):
- raise EngineException("There is at least one NetSlice Template referencing this descriptor",
- http_code=HTTPStatus.CONFLICT)
+ raise EngineException(
+ "There is at least one NetSlice Template referencing this descriptor",
+ http_code=HTTPStatus.CONFLICT,
+ )
def sol005_projection(self, data):
data["nsdOnboardingState"] = data["_admin"]["onboardingState"]
links = {}
links["self"] = {"href": "/nsd/v1/ns_descriptors/{}".format(data["_id"])}
- links["nsd_content"] = {"href": "/nsd/v1/ns_descriptors/{}/nsd_content".format(data["_id"])}
+ links["nsd_content"] = {
+ "href": "/nsd/v1/ns_descriptors/{}/nsd_content".format(data["_id"])
+ }
data["_links"] = links
return super().sol005_projection(data)
def pyangbind_validation(self, item, data, force=False):
try:
mynst = nst_im()
- pybindJSONDecoder.load_ietf_json({'nst': [data]}, None, None, obj=mynst,
- path_helper=True, skip_unknown=force)
+ pybindJSONDecoder.load_ietf_json(
+ {"nst": [data]},
+ None,
+ None,
+ obj=mynst,
+ path_helper=True,
+ skip_unknown=force,
+ )
out = pybindJSON.dumps(mynst, mode="ietf")
desc_out = self._remove_envelop(yaml.safe_load(out))
return desc_out
except Exception as e:
- raise EngineException("Error in pyangbind validation: {}".format(str(e)),
- http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
+ raise EngineException(
+ "Error in pyangbind validation: {}".format(str(e)),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+ )
@staticmethod
def _remove_envelop(indata=None):
return {}
clean_indata = indata
- if clean_indata.get('nst'):
- if not isinstance(clean_indata['nst'], list) or len(clean_indata['nst']) != 1:
+ if clean_indata.get("nst"):
+ if (
+ not isinstance(clean_indata["nst"], list)
+ or len(clean_indata["nst"]) != 1
+ ):
raise EngineException("'nst' must be a list only one element")
- clean_indata = clean_indata['nst'][0]
- elif clean_indata.get('nst:nst'):
- if not isinstance(clean_indata['nst:nst'], list) or len(clean_indata['nst:nst']) != 1:
+ clean_indata = clean_indata["nst"][0]
+ elif clean_indata.get("nst:nst"):
+ if (
+ not isinstance(clean_indata["nst:nst"], list)
+ or len(clean_indata["nst:nst"]) != 1
+ ):
raise EngineException("'nst:nst' must be a list only one element")
- clean_indata = clean_indata['nst:nst'][0]
+ clean_indata = clean_indata["nst:nst"][0]
return clean_indata
def _validate_input_new(self, indata, storage_params, force=False):
filter_q = self._get_project_filter(session)
filter_q["id"] = nsd_id
if not self.db.get_list("nsds", filter_q):
- raise EngineException("Descriptor error at 'netslice-subnet':'nsd-ref'='{}' references a non "
- "existing nsd".format(nsd_id), http_code=HTTPStatus.CONFLICT)
+ raise EngineException(
+ "Descriptor error at 'netslice-subnet':'nsd-ref'='{}' references a non "
+ "existing nsd".format(nsd_id),
+ http_code=HTTPStatus.CONFLICT,
+ )
def check_conflict_on_edit(self, session, final_content, edit_content, _id):
- final_content = super().check_conflict_on_edit(session, final_content, edit_content, _id)
+ final_content = super().check_conflict_on_edit(
+ session, final_content, edit_content, _id
+ )
self._check_descriptor_dependencies(session, final_content)
return final_content
_filter = self._get_project_filter(session)
_filter["_admin.nst-id"] = _id
if self.db.get_list("nsis", _filter):
- raise EngineException("there is at least one Netslice Instance using this descriptor",
- http_code=HTTPStatus.CONFLICT)
+ raise EngineException(
+ "there is at least one Netslice Instance using this descriptor",
+ http_code=HTTPStatus.CONFLICT,
+ )
def sol005_projection(self, data):
data["onboardingState"] = data["_admin"]["onboardingState"]
_filter = self._get_project_filter(session)
_filter["vdur.pdu-id"] = _id
if self.db.get_list("vnfrs", _filter):
- raise EngineException("There is at least one VNF instance using this PDU", http_code=HTTPStatus.CONFLICT)
+ raise EngineException(
+ "There is at least one VNF instance using this PDU",
+ http_code=HTTPStatus.CONFLICT,
+ )
class VnfPkgOpTopic(BaseTopic):
BaseTopic.__init__(self, db, fs, msg, auth)
def edit(self, session, _id, indata=None, kwargs=None, content=None):
- raise EngineException("Method 'edit' not allowed for topic '{}'".format(self.topic),
- HTTPStatus.METHOD_NOT_ALLOWED)
+ raise EngineException(
+ "Method 'edit' not allowed for topic '{}'".format(self.topic),
+ HTTPStatus.METHOD_NOT_ALLOWED,
+ )
def delete(self, session, _id, dry_run=False):
- raise EngineException("Method 'delete' not allowed for topic '{}'".format(self.topic),
- HTTPStatus.METHOD_NOT_ALLOWED)
+ raise EngineException(
+ "Method 'delete' not allowed for topic '{}'".format(self.topic),
+ HTTPStatus.METHOD_NOT_ALLOWED,
+ )
def delete_list(self, session, filter_q=None):
- raise EngineException("Method 'delete_list' not allowed for topic '{}'".format(self.topic),
- HTTPStatus.METHOD_NOT_ALLOWED)
+ raise EngineException(
+ "Method 'delete_list' not allowed for topic '{}'".format(self.topic),
+ HTTPStatus.METHOD_NOT_ALLOWED,
+ )
def new(self, rollback, session, indata=None, kwargs=None, headers=None):
"""
juju_bundle = kdu.get("juju-bundle")
break
else:
- raise EngineException("Not found vnfd[id='{}']:kdu[name='{}']".format(vnfpkg_id, kdu_name))
+ raise EngineException(
+ "Not found vnfd[id='{}']:kdu[name='{}']".format(vnfpkg_id, kdu_name)
+ )
if helm_chart:
indata["helm-chart"] = helm_chart
match = fullmatch(r"([^/]*)/([^/]*)", helm_chart)
match = fullmatch(r"([^/]*)/([^/]*)", juju_bundle)
repo_name = match.group(1) if match else None
else:
- raise EngineException("Found neither 'helm-chart' nor 'juju-bundle' in vnfd[id='{}']:kdu[name='{}']"
- .format(vnfpkg_id, kdu_name))
+ raise EngineException(
+ "Found neither 'helm-chart' nor 'juju-bundle' in vnfd[id='{}']:kdu[name='{}']".format(
+ vnfpkg_id, kdu_name
+ )
+ )
if repo_name:
del filter_q["_id"]
filter_q["name"] = repo_name
"links": {
"self": "/osm/vnfpkgm/v1/vnfpkg_op_occs/" + vnfpkgop_id,
"vnfpkg": "/osm/vnfpkgm/v1/vnf_packages/" + vnfpkg_id,
- }
+ },
}
- self.format_on_new(vnfpkgop_desc, session["project_id"], make_public=session["public"])
+ self.format_on_new(
+ vnfpkgop_desc, session["project_id"], make_public=session["public"]
+ )
ctime = vnfpkgop_desc["_admin"]["created"]
vnfpkgop_desc["statusEnteredTime"] = ctime
vnfpkgop_desc["startTime"] = ctime
# limitations under the License.
import logging
+
# import yaml
-from osm_common import dbmongo, dbmemory, fslocal, fsmongo, msglocal, msgkafka, version as common_version
+from osm_common import (
+ dbmongo,
+ dbmemory,
+ fslocal,
+ fsmongo,
+ msglocal,
+ msgkafka,
+ version as common_version,
+)
from osm_common.dbbase import DbException
from osm_common.fsbase import FsException
from osm_common.msgbase import MsgException
from osm_nbi.admin_topics import K8sClusterTopic, K8sRepoTopic, OsmRepoTopic
from osm_nbi.admin_topics import VcaTopic
from osm_nbi.admin_topics import UserTopicAuth, ProjectTopicAuth, RoleTopicAuth
-from osm_nbi.descriptor_topics import VnfdTopic, NsdTopic, PduTopic, NstTopic, VnfPkgOpTopic
-from osm_nbi.instance_topics import NsrTopic, VnfrTopic, NsLcmOpTopic, NsiTopic, NsiLcmOpTopic
+from osm_nbi.descriptor_topics import (
+ VnfdTopic,
+ NsdTopic,
+ PduTopic,
+ NstTopic,
+ VnfPkgOpTopic,
+)
+from osm_nbi.instance_topics import (
+ NsrTopic,
+ VnfrTopic,
+ NsLcmOpTopic,
+ NsiTopic,
+ NsiLcmOpTopic,
+)
from osm_nbi.pmjobs_topics import PmJobsTopic
from osm_nbi.subscription_topics import NslcmSubscriptionsTopic
from base64 import b64encode
-from os import urandom # , path
+from os import urandom # , path
from threading import Lock
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
"vca": VcaTopic,
"k8srepos": K8sRepoTopic,
"osmrepos": OsmRepoTopic,
- "users": UserTopicAuth, # Valid for both internal and keystone authentication backends
- "projects": ProjectTopicAuth, # Valid for both internal and keystone authentication backends
- "roles": RoleTopicAuth, # Valid for both internal and keystone authentication backends
+ "users": UserTopicAuth, # Valid for both internal and keystone authentication backends
+ "projects": ProjectTopicAuth, # Valid for both internal and keystone authentication backends
+ "roles": RoleTopicAuth, # Valid for both internal and keystone authentication backends
"nsis": NsiTopic,
"nsilcmops": NsiLcmOpTopic,
"vnfpkgops": VnfPkgOpTopic,
self.config = config
# check right version of common
if versiontuple(common_version) < versiontuple(min_common_version):
- raise EngineException("Not compatible osm/common version '{}'. Needed '{}' or higher".format(
- common_version, min_common_version))
+ raise EngineException(
+ "Not compatible osm/common version '{}'. Needed '{}' or higher".format(
+ common_version, min_common_version
+ )
+ )
try:
if not self.db:
self.db = dbmemory.DbMemory()
self.db.db_connect(config["database"])
else:
- raise EngineException("Invalid configuration param '{}' at '[database]':'driver'".format(
- config["database"]["driver"]))
+ raise EngineException(
+ "Invalid configuration param '{}' at '[database]':'driver'".format(
+ config["database"]["driver"]
+ )
+ )
if not self.fs:
if config["storage"]["driver"] == "local":
self.fs = fslocal.FsLocal()
self.fs = fsmongo.FsMongo()
self.fs.fs_connect(config["storage"])
else:
- raise EngineException("Invalid configuration param '{}' at '[storage]':'driver'".format(
- config["storage"]["driver"]))
+ raise EngineException(
+ "Invalid configuration param '{}' at '[storage]':'driver'".format(
+ config["storage"]["driver"]
+ )
+ )
if not self.msg:
if config["message"]["driver"] == "local":
self.msg = msglocal.MsgLocal()
self.msg = msgkafka.MsgKafka()
self.msg.connect(config["message"])
else:
- raise EngineException("Invalid configuration param '{}' at '[message]':'driver'".format(
- config["message"]["driver"]))
+ raise EngineException(
+ "Invalid configuration param '{}' at '[message]':'driver'".format(
+ config["message"]["driver"]
+ )
+ )
if not self.authconn:
if config["authentication"]["backend"] == "keystone":
- self.authconn = AuthconnKeystone(config["authentication"], self.db,
- self.authenticator.role_permissions)
+ self.authconn = AuthconnKeystone(
+ config["authentication"],
+ self.db,
+ self.authenticator.role_permissions,
+ )
elif config["authentication"]["backend"] == "tacacs":
- self.authconn = AuthconnTacacs(config["authentication"], self.db,
- self.authenticator.role_permissions)
+ self.authconn = AuthconnTacacs(
+ config["authentication"],
+ self.db,
+ self.authenticator.role_permissions,
+ )
else:
- self.authconn = AuthconnInternal(config["authentication"], self.db,
- self.authenticator.role_permissions)
+ self.authconn = AuthconnInternal(
+ config["authentication"],
+ self.db,
+ self.authenticator.role_permissions,
+ )
# if not self.operations:
# if "resources_to_operations" in config["rbac"]:
# resources_to_operations_file = config["rbac"]["resources_to_operations"]
for topic, topic_class in self.map_from_topic_to_class.items():
# if self.auth and topic_class in (UserTopicAuth, ProjectTopicAuth):
# self.map_topic[topic] = topic_class(self.db, self.fs, self.msg, self.auth)
- self.map_topic[topic] = topic_class(self.db, self.fs, self.msg, self.authconn)
-
- self.map_topic["pm_jobs"] = PmJobsTopic(self.db, config["prometheus"].get("host"),
- config["prometheus"].get("port"))
+ self.map_topic[topic] = topic_class(
+ self.db, self.fs, self.msg, self.authconn
+ )
+
+ self.map_topic["pm_jobs"] = PmJobsTopic(
+ self.db,
+ config["prometheus"].get("host"),
+ config["prometheus"].get("port"),
+ )
except (DbException, FsException, MsgException) as e:
raise EngineException(str(e), http_code=e.http_code)
except (DbException, FsException, MsgException) as e:
raise EngineException(str(e), http_code=e.http_code)
- def new_item(self, rollback, session, topic, indata=None, kwargs=None, headers=None):
+ def new_item(
+ self, rollback, session, topic, indata=None, kwargs=None, headers=None
+ ):
"""
Creates a new entry into database. For nsds and vnfds it creates an almost empty DISABLED entry,
that must be completed with a call to method upload_content
:return: _id: identity of the inserted data.
"""
if topic not in self.map_topic:
- raise EngineException("Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR)
+ raise EngineException(
+ "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR
+ )
with self.write_lock:
return self.map_topic[topic].new(rollback, session, indata, kwargs, headers)
:return: _id: identity of the inserted data.
"""
if topic not in self.map_topic:
- raise EngineException("Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR)
+ raise EngineException(
+ "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR
+ )
with self.write_lock:
- return self.map_topic[topic].upload_content(session, _id, indata, kwargs, headers)
+ return self.map_topic[topic].upload_content(
+ session, _id, indata, kwargs, headers
+ )
def get_item_list(self, session, topic, filter_q=None, api_req=False):
"""
:return: The list, it can be empty if no one match the filter_q.
"""
if topic not in self.map_topic:
- raise EngineException("Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR)
+ raise EngineException(
+ "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR
+ )
return self.map_topic[topic].list(session, filter_q, api_req)
def get_item(self, session, topic, _id, api_req=False):
:return: dictionary, raise exception if not found.
"""
if topic not in self.map_topic:
- raise EngineException("Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR)
+ raise EngineException(
+ "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR
+ )
return self.map_topic[topic].show(session, _id, api_req)
def get_file(self, session, topic, _id, path=None, accept_header=None):
:return: opened file plus Accept format or raises an exception
"""
if topic not in self.map_topic:
- raise EngineException("Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR)
+ raise EngineException(
+ "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR
+ )
return self.map_topic[topic].get_file(session, _id, path, accept_header)
def del_item_list(self, session, topic, _filter=None):
:return: The deleted list, it can be empty if no one match the _filter.
"""
if topic not in self.map_topic:
- raise EngineException("Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR)
+ raise EngineException(
+ "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR
+ )
with self.write_lock:
return self.map_topic[topic].delete_list(session, _filter)
:return: dictionary with deleted item _id. It raises exception if not found.
"""
if topic not in self.map_topic:
- raise EngineException("Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR)
+ raise EngineException(
+ "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR
+ )
with self.write_lock:
return self.map_topic[topic].delete(session, _id, not_send_msg=not_send_msg)
:return: dictionary with edited item _id, raise exception if not found.
"""
if topic not in self.map_topic:
- raise EngineException("Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR)
+ raise EngineException(
+ "Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR
+ )
with self.write_lock:
return self.map_topic[topic].edit(session, _id, indata, kwargs)
def upgrade_db(self, current_version, target_version):
if target_version not in self.map_target_version_to_int.keys():
- raise EngineException("Cannot upgrade to version '{}' with this version of code".format(target_version),
- http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+ raise EngineException(
+ "Cannot upgrade to version '{}' with this version of code".format(
+ target_version
+ ),
+ http_code=HTTPStatus.INTERNAL_SERVER_ERROR,
+ )
if current_version == target_version:
return
-
+
target_version_int = self.map_target_version_to_int[target_version]
if not current_version:
# create database version
serial = urandom(32)
version_data = {
- "_id": "version", # Always "version"
- "version_int": 1000, # version number
- "version": "1.0", # version text
- "date": "2018-10-25", # version date
+ "_id": "version", # Always "version"
+ "version_int": 1000, # version number
+ "version": "1.0", # version text
+ "date": "2018-10-25", # version date
"description": "added serial", # changes in this version
- 'status': "ENABLED", # ENABLED, DISABLED (migration in process), ERROR,
- 'serial': b64encode(serial)
+ "status": "ENABLED", # ENABLED, DISABLED (migration in process), ERROR,
+ "serial": b64encode(serial),
}
self.db.create("admin", version_data)
self.db.set_secret_key(serial)
current_version = "1.0"
-
- if current_version in ("1.0", "1.1") and target_version_int >= self.map_target_version_to_int["1.2"]:
- if self.config['authentication']['backend'] == "internal":
+
+ if (
+ current_version in ("1.0", "1.1")
+ and target_version_int >= self.map_target_version_to_int["1.2"]
+ ):
+ if self.config["authentication"]["backend"] == "internal":
self.db.del_list("roles")
version_data = {
"version_int": 1002,
"version": "1.2",
"date": "2019-06-11",
- "description": "set new format for roles_operations"
+ "description": "set new format for roles_operations",
}
self.db.set_one("admin", {"_id": "version"}, version_data)
current_version = "1.2"
# TODO add future migrations here
- def init_db(self, target_version='1.0'):
+ def init_db(self, target_version="1.0"):
"""
Init database if empty. If not empty it checks that database version and migrates if needed
If empty, it creates a new user admin/admin at 'users' and a new entry at 'version'
:return: None if ok, exception if error or if the version is different.
"""
- version_data = self.db.get_one("admin", {"_id": "version"}, fail_on_empty=False, fail_on_more=True)
+ version_data = self.db.get_one(
+ "admin", {"_id": "version"}, fail_on_empty=False, fail_on_more=True
+ )
# check database status is ok
- if version_data and version_data.get("status") != 'ENABLED':
- raise EngineException("Wrong database status '{}'".format(
- version_data["status"]), HTTPStatus.INTERNAL_SERVER_ERROR)
+ if version_data and version_data.get("status") != "ENABLED":
+ raise EngineException(
+ "Wrong database status '{}'".format(version_data["status"]),
+ HTTPStatus.INTERNAL_SERVER_ERROR,
+ )
# check version
db_version = None if not version_data else version_data.get("version")
</form>
"""
-html_vnfpackage_body = """<a href="/osm/vnfpkgm/v1/vnf_packages/{id}/artifacts">Artifacts </a>"""
-html_nspackage_body = """<a href="/osm/nsd/v1/ns_descriptors/{id}/artifacts">Artifacts </a>"""
+html_vnfpackage_body = (
+ """<a href="/osm/vnfpkgm/v1/vnf_packages/{id}/artifacts">Artifacts </a>"""
+)
+html_nspackage_body = (
+ """<a href="/osm/nsd/v1/ns_descriptors/{id}/artifacts">Artifacts </a>"""
+)
def format(data, request, response, toke_info):
:param response: cherrypy response
:return: string with teh html response
"""
- response.headers["Content-Type"] = 'text/html'
+ response.headers["Content-Type"] = "text/html"
if response.status == HTTPStatus.UNAUTHORIZED.value:
- if response.headers.get("WWW-Authenticate") and request.config.get("auth.allow_basic_authentication"):
- response.headers["WWW-Authenticate"] = "Basic" + response.headers["WWW-Authenticate"][6:]
+ if response.headers.get("WWW-Authenticate") and request.config.get(
+ "auth.allow_basic_authentication"
+ ):
+ response.headers["WWW-Authenticate"] = (
+ "Basic" + response.headers["WWW-Authenticate"][6:]
+ )
return
else:
return html_auth2.format(error=data)
if request.path_info in ("/version", "/system"):
- return "<pre>" + yaml.safe_dump(data, explicit_start=False, indent=4, default_flow_style=False) + "</pre>"
+ return (
+ "<pre>"
+ + yaml.safe_dump(
+ data, explicit_start=False, indent=4, default_flow_style=False
+ )
+ + "</pre>"
+ )
body = html_body.format(item=html_escape(request.path_info))
if response.status and response.status > 202:
# input request.path_info (URL) can contain XSS that are translated into output error detail
- body += html_body_error.format(html_escape(
- yaml.safe_dump(data, explicit_start=True, indent=4, default_flow_style=False)))
+ body += html_body_error.format(
+ html_escape(
+ yaml.safe_dump(
+ data, explicit_start=True, indent=4, default_flow_style=False
+ )
+ )
+ )
elif isinstance(data, (list, tuple)):
if request.path_info == "/vnfpkgm/v1/vnf_packages":
body += html_upload_body.format(request.path_info + "_content", "VNFD")
data_id = k.pop("_id", None)
elif isinstance(k, str):
data_id = k
- body += '<p> <a href="/osm/{url}/{id}">{id}</a>: {t} </p>'.format(url=request.path_info, id=data_id,
- t=html_escape(str(k)))
+ body += '<p> <a href="/osm/{url}/{id}">{id}</a>: {t} </p>'.format(
+ url=request.path_info, id=data_id, t=html_escape(str(k))
+ )
elif isinstance(data, dict):
if "Location" in response.headers:
body += '<a href="{}"> show </a>'.format(response.headers["Location"])
else:
- _id = request.path_info[request.path_info.rfind("/")+1:]
- body += '<a href="/osm/{}?METHOD=DELETE"> <img src="/osm/static/delete.png" height="25" width="25"> </a>'\
- .format(request.path_info)
- if request.path_info.startswith("/nslcm/v1/ns_instances_content/") or \
- request.path_info.startswith("/nslcm/v1/ns_instances/"):
+ _id = request.path_info[request.path_info.rfind("/") + 1 :]
+ body += '<a href="/osm/{}?METHOD=DELETE"> <img src="/osm/static/delete.png" height="25" width="25"> </a>'.format(
+ request.path_info
+ )
+ if request.path_info.startswith(
+ "/nslcm/v1/ns_instances_content/"
+ ) or request.path_info.startswith("/nslcm/v1/ns_instances/"):
body += html_nslcmop_body.format(id=_id)
- elif request.path_info.startswith("/nsilcm/v1/netslice_instances_content/") or \
- request.path_info.startswith("/nsilcm/v1/netslice_instances/"):
+ elif request.path_info.startswith(
+ "/nsilcm/v1/netslice_instances_content/"
+ ) or request.path_info.startswith("/nsilcm/v1/netslice_instances/"):
body += html_nsilcmop_body.format(id=_id)
- elif request.path_info.startswith("/vnfpkgm/v1/vnf_packages/") or \
- request.path_info.startswith("/vnfpkgm/v1/vnf_packages_content/"):
+ elif request.path_info.startswith(
+ "/vnfpkgm/v1/vnf_packages/"
+ ) or request.path_info.startswith("/vnfpkgm/v1/vnf_packages_content/"):
body += html_vnfpackage_body.format(id=_id)
- elif request.path_info.startswith("/nsd/v1/ns_descriptors/") or \
- request.path_info.startswith("/nsd/v1/ns_descriptors_content/"):
+ elif request.path_info.startswith(
+ "/nsd/v1/ns_descriptors/"
+ ) or request.path_info.startswith("/nsd/v1/ns_descriptors_content/"):
body += html_nspackage_body.format(id=_id)
- body += "<pre>" + html_escape(yaml.safe_dump(data, explicit_start=True, indent=4, default_flow_style=False)) + \
- "</pre>"
+ body += (
+ "<pre>"
+ + html_escape(
+ yaml.safe_dump(
+ data, explicit_start=True, indent=4, default_flow_style=False
+ )
+ )
+ + "</pre>"
+ )
elif data is None:
if request.method == "DELETE" or "METHOD=DELETE" in request.query_string:
body += "<pre> deleted </pre>"
from http import HTTPStatus
from time import time
from copy import copy, deepcopy
-from osm_nbi.validation import validate_input, ValidationError, ns_instantiate, ns_terminate, ns_action, ns_scale,\
- nsi_instantiate
-from osm_nbi.base_topic import BaseTopic, EngineException, get_iterable, deep_get, increment_ip_mac
+from osm_nbi.validation import (
+ validate_input,
+ ValidationError,
+ ns_instantiate,
+ ns_terminate,
+ ns_action,
+ ns_scale,
+ nsi_instantiate,
+)
+from osm_nbi.base_topic import (
+ BaseTopic,
+ EngineException,
+ get_iterable,
+ deep_get,
+ increment_ip_mac,
+)
from yaml import safe_dump
from osm_common.dbbase import DbException
from osm_common.msgbase import MsgException
from osm_common.fsbase import FsException
from osm_nbi import utils
-from re import match # For checking that additional parameter names are valid Jinja2 identifiers
+from re import (
+ match,
+) # For checking that additional parameter names are valid Jinja2 identifiers
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
return
nsd_id = descriptor["nsdId"]
if not self.get_item_list(session, "nsds", {"id": nsd_id}):
- raise EngineException("Descriptor error at nsdId='{}' references a non exist nsd".format(nsd_id),
- http_code=HTTPStatus.CONFLICT)
+ raise EngineException(
+ "Descriptor error at nsdId='{}' references a non exist nsd".format(
+ nsd_id
+ ),
+ http_code=HTTPStatus.CONFLICT,
+ )
@staticmethod
def format_on_new(content, project_id=None, make_public=False):
return
nsr = db_content
if nsr["_admin"].get("nsState") == "INSTANTIATED":
- raise EngineException("nsr '{}' cannot be deleted because it is in 'INSTANTIATED' state. "
- "Launch 'terminate' operation first; or force deletion".format(_id),
- http_code=HTTPStatus.CONFLICT)
+ raise EngineException(
+ "nsr '{}' cannot be deleted because it is in 'INSTANTIATED' state. "
+ "Launch 'terminate' operation first; or force deletion".format(_id),
+ http_code=HTTPStatus.CONFLICT,
+ )
def delete_extra(self, session, _id, db_content, not_send_msg=None):
"""
self.db.del_list("vnfrs", {"nsr-id-ref": _id})
# set all used pdus as free
- self.db.set_list("pdus", {"_admin.usage.nsr_id": _id},
- {"_admin.usageState": "NOT_IN_USE", "_admin.usage": None})
+ self.db.set_list(
+ "pdus",
+ {"_admin.usage.nsr_id": _id},
+ {"_admin.usageState": "NOT_IN_USE", "_admin.usage": None},
+ )
# Set NSD usageState
nsr = db_content
used_nsd_id = nsr.get("nsd-id")
if used_nsd_id:
# check if used by another NSR
- nsrs_list = self.db.get_one("nsrs", {"nsd-id": used_nsd_id},
- fail_on_empty=False, fail_on_more=False)
+ nsrs_list = self.db.get_one(
+ "nsrs", {"nsd-id": used_nsd_id}, fail_on_empty=False, fail_on_more=False
+ )
if not nsrs_list:
- self.db.set_one("nsds", {"_id": used_nsd_id}, {"_admin.usageState": "NOT_IN_USE"})
+ self.db.set_one(
+ "nsds", {"_id": used_nsd_id}, {"_admin.usageState": "NOT_IN_USE"}
+ )
# Set VNFD usageState
used_vnfd_id_list = nsr.get("vnfd-id")
if used_vnfd_id_list:
for used_vnfd_id in used_vnfd_id_list:
# check if used by another NSR
- nsrs_list = self.db.get_one("nsrs", {"vnfd-id": used_vnfd_id},
- fail_on_empty=False, fail_on_more=False)
+ nsrs_list = self.db.get_one(
+ "nsrs",
+ {"vnfd-id": used_vnfd_id},
+ fail_on_empty=False,
+ fail_on_more=False,
+ )
if not nsrs_list:
- self.db.set_one("vnfds", {"_id": used_vnfd_id}, {"_admin.usageState": "NOT_IN_USE"})
+ self.db.set_one(
+ "vnfds",
+ {"_id": used_vnfd_id},
+ {"_admin.usageState": "NOT_IN_USE"},
+ )
# delete extra ro_nsrs used for internal RO module
self.db.del_one("ro_nsrs", q_filter={"_id": _id}, fail_on_empty=False)
return formated_request
@staticmethod
- def _format_additional_params(ns_request, member_vnf_index=None, vdu_id=None, kdu_name=None, descriptor=None):
+ def _format_additional_params(
+ ns_request, member_vnf_index=None, vdu_id=None, kdu_name=None, descriptor=None
+ ):
"""
Get and format user additional params for NS or VNF
:param ns_request: User instantiation additional parameters
additional_params = copy(ns_request.get("additionalParamsForNs"))
where_ = "additionalParamsForNs"
elif ns_request.get("additionalParamsForVnf"):
- where_ = "additionalParamsForVnf[member-vnf-index={}]".format(member_vnf_index)
- item = next((x for x in ns_request["additionalParamsForVnf"] if x["member-vnf-index"] == member_vnf_index),
- None)
+ where_ = "additionalParamsForVnf[member-vnf-index={}]".format(
+ member_vnf_index
+ )
+ item = next(
+ (
+ x
+ for x in ns_request["additionalParamsForVnf"]
+ if x["member-vnf-index"] == member_vnf_index
+ ),
+ None,
+ )
if item:
if not vdu_id and not kdu_name:
other_params = item
additional_params = copy(item.get("additionalParams")) or {}
if vdu_id and item.get("additionalParamsForVdu"):
- item_vdu = next((x for x in item["additionalParamsForVdu"] if x["vdu_id"] == vdu_id), None)
+ item_vdu = next(
+ (
+ x
+ for x in item["additionalParamsForVdu"]
+ if x["vdu_id"] == vdu_id
+ ),
+ None,
+ )
other_params = item_vdu
if item_vdu and item_vdu.get("additionalParams"):
where_ += ".additionalParamsForVdu[vdu_id={}]".format(vdu_id)
if kdu_name:
additional_params = {}
if item.get("additionalParamsForKdu"):
- item_kdu = next((x for x in item["additionalParamsForKdu"] if x["kdu_name"] == kdu_name), None)
+ item_kdu = next(
+ (
+ x
+ for x in item["additionalParamsForKdu"]
+ if x["kdu_name"] == kdu_name
+ ),
+ None,
+ )
other_params = item_kdu
if item_kdu and item_kdu.get("additionalParams"):
- where_ += ".additionalParamsForKdu[kdu_name={}]".format(kdu_name)
+ where_ += ".additionalParamsForKdu[kdu_name={}]".format(
+ kdu_name
+ )
additional_params = item_kdu["additionalParams"]
if additional_params:
for k, v in additional_params.items():
# BEGIN Check that additional parameter names are valid Jinja2 identifiers if target is not Kdu
- if not kdu_name and not match('^[a-zA-Z_][a-zA-Z0-9_]*$', k):
- raise EngineException("Invalid param name at {}:{}. Must contain only alphanumeric characters "
- "and underscores, and cannot start with a digit"
- .format(where_, k))
+ if not kdu_name and not match("^[a-zA-Z_][a-zA-Z0-9_]*$", k):
+ raise EngineException(
+ "Invalid param name at {}:{}. Must contain only alphanumeric characters "
+ "and underscores, and cannot start with a digit".format(
+ where_, k
+ )
+ )
# END Check that additional parameter names are valid Jinja2 identifiers
if not isinstance(k, str):
- raise EngineException("Invalid param at {}:{}. Only string keys are allowed".format(where_, k))
+ raise EngineException(
+ "Invalid param at {}:{}. Only string keys are allowed".format(
+ where_, k
+ )
+ )
if "." in k or "$" in k:
- raise EngineException("Invalid param at {}:{}. Keys must not contain dots or $".format(where_, k))
+ raise EngineException(
+ "Invalid param at {}:{}. Keys must not contain dots or $".format(
+ where_, k
+ )
+ )
if isinstance(v, (dict, tuple, list)):
additional_params[k] = "!!yaml " + safe_dump(v)
# TODO: check for cloud-init
if member_vnf_index:
initial_primitives = []
- if "lcm-operations-configuration" in df \
- and "operate-vnf-op-config" in df["lcm-operations-configuration"]:
- for config in df["lcm-operations-configuration"]["operate-vnf-op-config"].get("day1-2", []):
- for primitive in get_iterable(config.get("initial-config-primitive")):
+ if (
+ "lcm-operations-configuration" in df
+ and "operate-vnf-op-config"
+ in df["lcm-operations-configuration"]
+ ):
+ for config in df["lcm-operations-configuration"][
+ "operate-vnf-op-config"
+ ].get("day1-2", []):
+ for primitive in get_iterable(
+ config.get("initial-config-primitive")
+ ):
initial_primitives.append(primitive)
else:
- initial_primitives = deep_get(descriptor, ("ns-configuration", "initial-config-primitive"))
+ initial_primitives = deep_get(
+ descriptor, ("ns-configuration", "initial-config-primitive")
+ )
for initial_primitive in get_iterable(initial_primitives):
for param in get_iterable(initial_primitive.get("parameter")):
- if param["value"].startswith("<") and param["value"].endswith(">"):
- &n