__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
-__version__ = "0.1.3" # file version, not NBI version
+__version__ = "0.1.3" # file version, not NBI version
version_date = "Aug 2019"
-database_version = '1.2'
-auth_database_version = '1.0'
-nbi_server = None # instance of Server class
+database_version = "1.2"
+auth_database_version = "1.0"
+nbi_server = None # instance of Server class
subscription_thread = None # instance of SubscriptionThread class
"""
terminate O5
action O
scale O5
- heal 5
+ migrate O
+ update 05
+ heal O5
/ns_lcm_op_occs 5 5
/<nsLcmOpOccId> 5 5 5
TO BE COMPLETED 5 5
/<id> O O O
/k8srepos O O
/<id> O O
+ /osmrepos O O
+ /<id> O O
/nst/v1 O O
/netslice_templates_content O O
ADMIN: To act as an administrator or a different project
PUBLIC: To get public descriptors or set a descriptor as public
SET_PROJECT: To make a descriptor available for other project
-
+
Header field name Reference Example Descriptions
Accept IETF RFC 7231 [19] application/json Content-Types that are acceptable for the response.
This header field shall be present if the response is expected to have a non-empty message body.
# contains allowed URL and methods, and the role_permission name
"admin": {
"v1": {
- "tokens": {"METHODS": ("GET", "POST", "DELETE"),
- "ROLE_PERMISSION": "tokens:",
- "<ID>": {"METHODS": ("GET", "DELETE"),
- "ROLE_PERMISSION": "tokens:id:"
- }
- },
- "users": {"METHODS": ("GET", "POST"),
- "ROLE_PERMISSION": "users:",
- "<ID>": {"METHODS": ("GET", "DELETE", "PATCH"),
- "ROLE_PERMISSION": "users:id:"
- }
- },
- "projects": {"METHODS": ("GET", "POST"),
- "ROLE_PERMISSION": "projects:",
- "<ID>": {"METHODS": ("GET", "DELETE", "PATCH"),
- "ROLE_PERMISSION": "projects:id:"}
- },
- "roles": {"METHODS": ("GET", "POST"),
- "ROLE_PERMISSION": "roles:",
- "<ID>": {"METHODS": ("GET", "DELETE", "PATCH"),
- "ROLE_PERMISSION": "roles:id:"
- }
- },
- "vims": {"METHODS": ("GET", "POST"),
- "ROLE_PERMISSION": "vims:",
- "<ID>": {"METHODS": ("GET", "DELETE", "PATCH"),
- "ROLE_PERMISSION": "vims:id:"
- }
- },
- "vim_accounts": {"METHODS": ("GET", "POST"),
- "ROLE_PERMISSION": "vim_accounts:",
- "<ID>": {"METHODS": ("GET", "DELETE", "PATCH"),
- "ROLE_PERMISSION": "vim_accounts:id:"
- }
- },
- "wim_accounts": {"METHODS": ("GET", "POST"),
- "ROLE_PERMISSION": "wim_accounts:",
- "<ID>": {"METHODS": ("GET", "DELETE", "PATCH"),
- "ROLE_PERMISSION": "wim_accounts:id:"
- }
- },
- "sdns": {"METHODS": ("GET", "POST"),
- "ROLE_PERMISSION": "sdn_controllers:",
- "<ID>": {"METHODS": ("GET", "DELETE", "PATCH"),
- "ROLE_PERMISSION": "sdn_controllers:id:"
- }
- },
- "k8sclusters": {"METHODS": ("GET", "POST"),
- "ROLE_PERMISSION": "k8sclusters:",
- "<ID>": {"METHODS": ("GET", "DELETE", "PATCH"),
- "ROLE_PERMISSION": "k8sclusters:id:"
- }
- },
- "k8srepos": {"METHODS": ("GET", "POST"),
- "ROLE_PERMISSION": "k8srepos:",
- "<ID>": {"METHODS": ("GET", "DELETE"),
- "ROLE_PERMISSION": "k8srepos:id:"
- }
- },
-
+ "tokens": {
+ "METHODS": ("GET", "POST", "DELETE"),
+ "ROLE_PERMISSION": "tokens:",
+ "<ID>": {"METHODS": ("GET", "DELETE"), "ROLE_PERMISSION": "tokens:id:"},
+ },
+ "users": {
+ "METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "users:",
+ "<ID>": {
+ "METHODS": ("GET", "DELETE", "PATCH"),
+ "ROLE_PERMISSION": "users:id:",
+ },
+ },
+ "projects": {
+ "METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "projects:",
+ "<ID>": {
+ "METHODS": ("GET", "DELETE", "PATCH"),
+ "ROLE_PERMISSION": "projects:id:",
+ },
+ },
+ "roles": {
+ "METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "roles:",
+ "<ID>": {
+ "METHODS": ("GET", "DELETE", "PATCH"),
+ "ROLE_PERMISSION": "roles:id:",
+ },
+ },
+ "vims": {
+ "METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "vims:",
+ "<ID>": {
+ "METHODS": ("GET", "DELETE", "PATCH"),
+ "ROLE_PERMISSION": "vims:id:",
+ },
+ },
+ "vim_accounts": {
+ "METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "vim_accounts:",
+ "<ID>": {
+ "METHODS": ("GET", "DELETE", "PATCH"),
+ "ROLE_PERMISSION": "vim_accounts:id:",
+ },
+ },
+ "wim_accounts": {
+ "METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "wim_accounts:",
+ "<ID>": {
+ "METHODS": ("GET", "DELETE", "PATCH"),
+ "ROLE_PERMISSION": "wim_accounts:id:",
+ },
+ },
+ "sdns": {
+ "METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "sdn_controllers:",
+ "<ID>": {
+ "METHODS": ("GET", "DELETE", "PATCH"),
+ "ROLE_PERMISSION": "sdn_controllers:id:",
+ },
+ },
+ "k8sclusters": {
+ "METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "k8sclusters:",
+ "<ID>": {
+ "METHODS": ("GET", "DELETE", "PATCH"),
+ "ROLE_PERMISSION": "k8sclusters:id:",
+ },
+ },
+ "vca": {
+ "METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "vca:",
+ "<ID>": {
+ "METHODS": ("GET", "DELETE", "PATCH"),
+ "ROLE_PERMISSION": "vca:id:",
+ },
+ },
+ "k8srepos": {
+ "METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "k8srepos:",
+ "<ID>": {
+ "METHODS": ("GET", "DELETE"),
+ "ROLE_PERMISSION": "k8srepos:id:",
+ },
+ },
+ "osmrepos": {
+ "METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "osmrepos:",
+ "<ID>": {
+ "METHODS": ("GET", "DELETE", "PATCH"),
+ "ROLE_PERMISSION": "osmrepos:id:",
+ },
+ },
+ "domains": {
+ "METHODS": ("GET",),
+ "ROLE_PERMISSION": "domains:",
+ },
}
},
"pdu": {
"v1": {
- "pdu_descriptors": {"METHODS": ("GET", "POST"),
- "ROLE_PERMISSION": "pduds:",
- "<ID>": {"METHODS": ("GET", "POST", "DELETE", "PATCH", "PUT"),
- "ROLE_PERMISSION": "pduds:id:"
- }
- },
+ "pdu_descriptors": {
+ "METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "pduds:",
+ "<ID>": {
+ "METHODS": ("GET", "POST", "DELETE", "PATCH", "PUT"),
+ "ROLE_PERMISSION": "pduds:id:",
+ },
+ },
}
},
"nsd": {
"v1": {
- "ns_descriptors_content": {"METHODS": ("GET", "POST"),
- "ROLE_PERMISSION": "nsds:",
- "<ID>": {"METHODS": ("GET", "PUT", "DELETE"),
- "ROLE_PERMISSION": "nsds:id:"
- }
- },
- "ns_descriptors": {"METHODS": ("GET", "POST"),
- "ROLE_PERMISSION": "nsds:",
- "<ID>": {"METHODS": ("GET", "DELETE", "PATCH"),
- "ROLE_PERMISSION": "nsds:id:",
- "nsd_content": {"METHODS": ("GET", "PUT"),
- "ROLE_PERMISSION": "nsds:id:content:",
- },
- "nsd": {"METHODS": ("GET",), # descriptor inside package
- "ROLE_PERMISSION": "nsds:id:content:"
- },
- "artifacts": {"*": {"METHODS": ("GET",),
- "ROLE_PERMISSION": "nsds:id:nsd_artifact:"
- }
- }
- }
- },
- "pnf_descriptors": {"TODO": ("GET", "POST"),
- "<ID>": {"TODO": ("GET", "DELETE", "PATCH"),
- "pnfd_content": {"TODO": ("GET", "PUT")}
- }
- },
- "subscriptions": {"TODO": ("GET", "POST"),
- "<ID>": {"TODO": ("GET", "DELETE")}
- },
+ "ns_descriptors_content": {
+ "METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "nsds:",
+ "<ID>": {
+ "METHODS": ("GET", "PUT", "DELETE"),
+ "ROLE_PERMISSION": "nsds:id:",
+ },
+ },
+ "ns_descriptors": {
+ "METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "nsds:",
+ "<ID>": {
+ "METHODS": ("GET", "DELETE", "PATCH"),
+ "ROLE_PERMISSION": "nsds:id:",
+ "nsd_content": {
+ "METHODS": ("GET", "PUT"),
+ "ROLE_PERMISSION": "nsds:id:content:",
+ },
+ "nsd": {
+ "METHODS": ("GET",), # descriptor inside package
+ "ROLE_PERMISSION": "nsds:id:content:",
+ },
+ "artifacts": {
+ "METHODS": ("GET",),
+ "ROLE_PERMISSION": "nsds:id:nsd_artifact:",
+ "*": None,
+ },
+ },
+ },
+ "pnf_descriptors": {
+ "TODO": ("GET", "POST"),
+ "<ID>": {
+ "TODO": ("GET", "DELETE", "PATCH"),
+ "pnfd_content": {"TODO": ("GET", "PUT")},
+ },
+ },
+ "subscriptions": {
+ "TODO": ("GET", "POST"),
+ "<ID>": {"TODO": ("GET", "DELETE")},
+ },
}
},
"vnfpkgm": {
"v1": {
- "vnf_packages_content": {"METHODS": ("GET", "POST"),
- "ROLE_PERMISSION": "vnfds:",
- "<ID>": {"METHODS": ("GET", "PUT", "DELETE"),
- "ROLE_PERMISSION": "vnfds:id:"}
- },
- "vnf_packages": {"METHODS": ("GET", "POST"),
- "ROLE_PERMISSION": "vnfds:",
- "<ID>": {"METHODS": ("GET", "DELETE", "PATCH"), # GET: vnfPkgInfo
- "ROLE_PERMISSION": "vnfds:id:",
- "package_content": {"METHODS": ("GET", "PUT"), # package
- "ROLE_PERMISSION": "vnfds:id:",
- "upload_from_uri": {"METHODS": (),
- "TODO": ("POST", ),
- "ROLE_PERMISSION": "vnfds:id:upload:"
- }
- },
- "vnfd": {"METHODS": ("GET", ), # descriptor inside package
- "ROLE_PERMISSION": "vnfds:id:content:"
- },
- "artifacts": {"*": {"METHODS": ("GET", ),
- "ROLE_PERMISSION": "vnfds:id:vnfd_artifact:"
- }
- }
- }
- },
- "subscriptions": {"TODO": ("GET", "POST"),
- "<ID>": {"TODO": ("GET", "DELETE")}
- },
+ "vnf_packages_content": {
+ "METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "vnfds:",
+ "<ID>": {
+ "METHODS": ("GET", "PUT", "DELETE"),
+ "ROLE_PERMISSION": "vnfds:id:",
+ },
+ },
+ "vnf_packages": {
+ "METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "vnfds:",
+ "<ID>": {
+ "METHODS": ("GET", "DELETE", "PATCH"), # GET: vnfPkgInfo
+ "ROLE_PERMISSION": "vnfds:id:",
+ "package_content": {
+ "METHODS": ("GET", "PUT"), # package
+ "ROLE_PERMISSION": "vnfds:id:",
+ "upload_from_uri": {
+ "METHODS": (),
+ "TODO": ("POST",),
+ "ROLE_PERMISSION": "vnfds:id:upload:",
+ },
+ },
+ "vnfd": {
+ "METHODS": ("GET",), # descriptor inside package
+ "ROLE_PERMISSION": "vnfds:id:content:",
+ },
+ "artifacts": {
+ "METHODS": ("GET",),
+ "ROLE_PERMISSION": "vnfds:id:vnfd_artifact:",
+ "*": None,
+ },
+ "action": {
+ "METHODS": ("POST",),
+ "ROLE_PERMISSION": "vnfds:id:action:",
+ },
+ },
+ },
+ "subscriptions": {
+ "TODO": ("GET", "POST"),
+ "<ID>": {"TODO": ("GET", "DELETE")},
+ },
+ "vnfpkg_op_occs": {
+ "METHODS": ("GET",),
+ "ROLE_PERMISSION": "vnfds:vnfpkgops:",
+ "<ID>": {"METHODS": ("GET",), "ROLE_PERMISSION": "vnfds:vnfpkgops:id:"},
+ },
}
},
"nslcm": {
"v1": {
- "ns_instances_content": {"METHODS": ("GET", "POST"),
- "ROLE_PERMISSION": "ns_instances:",
- "<ID>": {"METHODS": ("GET", "DELETE"),
- "ROLE_PERMISSION": "ns_instances:id:"
- }
- },
- "ns_instances": {"METHODS": ("GET", "POST"),
- "ROLE_PERMISSION": "ns_instances:",
- "<ID>": {"METHODS": ("GET", "DELETE"),
- "ROLE_PERMISSION": "ns_instances:id:",
- "scale": {"METHODS": ("POST",),
- "ROLE_PERMISSION": "ns_instances:id:scale:"
+ "ns_instances_content": {
+ "METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "ns_instances:",
+ "<ID>": {
+ "METHODS": ("GET", "DELETE"),
+ "ROLE_PERMISSION": "ns_instances:id:",
+ },
+ },
+ "ns_instances": {
+ "METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "ns_instances:",
+ "<ID>": {
+ "METHODS": ("GET", "DELETE"),
+ "ROLE_PERMISSION": "ns_instances:id:",
+ "heal": {
+ "METHODS": ("POST",),
+ "ROLE_PERMISSION": "ns_instances:id:heal:",
+ },
+ "scale": {
+ "METHODS": ("POST",),
+ "ROLE_PERMISSION": "ns_instances:id:scale:",
+ },
+ "terminate": {
+ "METHODS": ("POST",),
+ "ROLE_PERMISSION": "ns_instances:id:terminate:",
+ },
+ "instantiate": {
+ "METHODS": ("POST",),
+ "ROLE_PERMISSION": "ns_instances:id:instantiate:",
+ },
+ "migrate": {
+ "METHODS": ("POST",),
+ "ROLE_PERMISSION": "ns_instances:id:migrate:",
+ },
+ "action": {
+ "METHODS": ("POST",),
+ "ROLE_PERMISSION": "ns_instances:id:action:",
+ },
+ "update": {
+ "METHODS": ("POST",),
+ "ROLE_PERMISSION": "ns_instances:id:update:",
+ },
+ "verticalscale": {
+ "METHODS": ("POST",),
+ "ROLE_PERMISSION": "ns_instances:id:verticalscale:"
+ },
+ },
+ },
+ "ns_lcm_op_occs": {
+ "METHODS": ("GET",),
+ "ROLE_PERMISSION": "ns_instances:opps:",
+ "<ID>": {
+ "METHODS": ("GET",),
+ "ROLE_PERMISSION": "ns_instances:opps:id:",
+ },
+ },
+ "vnfrs": {
+ "METHODS": ("GET",),
+ "ROLE_PERMISSION": "vnf_instances:",
+ "<ID>": {"METHODS": ("GET",), "ROLE_PERMISSION": "vnf_instances:id:"},
+ },
+ "vnf_instances": {
+ "METHODS": ("GET",),
+ "ROLE_PERMISSION": "vnf_instances:",
+ "<ID>": {"METHODS": ("GET",), "ROLE_PERMISSION": "vnf_instances:id:"},
+ },
+ "subscriptions": {
+ "METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "ns_subscriptions:",
+ "<ID>": {
+ "METHODS": ("GET", "DELETE"),
+ "ROLE_PERMISSION": "ns_subscriptions:id:",
+ },
+ },
+ }
+ },
+ "vnflcm": {
+ "v1": {
+ "vnf_instances": {"METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "vnflcm_instances:",
+ "<ID>": {"METHODS": ("GET", "DELETE"),
+ "ROLE_PERMISSION": "vnflcm_instances:id:",
+ "scale": {"METHODS": ("POST",),
+ "ROLE_PERMISSION": "vnflcm_instances:id:scale:"
},
- "terminate": {"METHODS": ("POST",),
- "ROLE_PERMISSION": "ns_instances:id:terminate:"
+ "terminate": {"METHODS": ("POST",),
+ "ROLE_PERMISSION": "vnflcm_instances:id:terminate:"
},
- "instantiate": {"METHODS": ("POST",),
- "ROLE_PERMISSION": "ns_instances:id:instantiate:"
+ "instantiate": {"METHODS": ("POST",),
+ "ROLE_PERMISSION": "vnflcm_instances:id:instantiate:"
},
- "action": {"METHODS": ("POST",),
- "ROLE_PERMISSION": "ns_instances:id:action:"
- },
- }
- },
- "ns_lcm_op_occs": {"METHODS": ("GET",),
- "ROLE_PERMISSION": "ns_instances:opps:",
+ }
+ },
+ "vnf_lcm_op_occs": {"METHODS": ("GET",),
+ "ROLE_PERMISSION": "vnf_instances:opps:",
"<ID>": {"METHODS": ("GET",),
- "ROLE_PERMISSION": "ns_instances:opps:id:"
+ "ROLE_PERMISSION": "vnf_instances:opps:id:"
},
},
- "vnfrs": {"METHODS": ("GET",),
- "ROLE_PERMISSION": "vnf_instances:",
- "<ID>": {"METHODS": ("GET",),
- "ROLE_PERMISSION": "vnf_instances:id:"
- }
- },
- "vnf_instances": {"METHODS": ("GET",),
- "ROLE_PERMISSION": "vnf_instances:",
- "<ID>": {"METHODS": ("GET",),
- "ROLE_PERMISSION": "vnf_instances:id:"
+ "subscriptions": {"METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "vnflcm_subscriptions:",
+ "<ID>": {"METHODS": ("GET", "DELETE"),
+ "ROLE_PERMISSION": "vnflcm_subscriptions:id:"
}
},
}
},
"nst": {
"v1": {
- "netslice_templates_content": {"METHODS": ("GET", "POST"),
- "ROLE_PERMISSION": "slice_templates:",
- "<ID>": {"METHODS": ("GET", "PUT", "DELETE"),
- "ROLE_PERMISSION": "slice_templates:id:", }
- },
- "netslice_templates": {"METHODS": ("GET", "POST"),
- "ROLE_PERMISSION": "slice_templates:",
- "<ID>": {"METHODS": ("GET", "DELETE"),
- "TODO": ("PATCH",),
- "ROLE_PERMISSION": "slice_templates:id:",
- "nst_content": {"METHODS": ("GET", "PUT"),
- "ROLE_PERMISSION": "slice_templates:id:content:"
- },
- "nst": {"METHODS": ("GET",), # descriptor inside package
- "ROLE_PERMISSION": "slice_templates:id:content:"
- },
- "artifacts": {"*": {"METHODS": ("GET",),
- "ROLE_PERMISSION": "slice_templates:id:content:"
- }
- }
- }
- },
- "subscriptions": {"TODO": ("GET", "POST"),
- "<ID>": {"TODO": ("GET", "DELETE")}
- },
+ "netslice_templates_content": {
+ "METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "slice_templates:",
+ "<ID>": {
+ "METHODS": ("GET", "PUT", "DELETE"),
+ "ROLE_PERMISSION": "slice_templates:id:",
+ },
+ },
+ "netslice_templates": {
+ "METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "slice_templates:",
+ "<ID>": {
+ "METHODS": ("GET", "DELETE"),
+ "TODO": ("PATCH",),
+ "ROLE_PERMISSION": "slice_templates:id:",
+ "nst_content": {
+ "METHODS": ("GET", "PUT"),
+ "ROLE_PERMISSION": "slice_templates:id:content:",
+ },
+ "nst": {
+ "METHODS": ("GET",), # descriptor inside package
+ "ROLE_PERMISSION": "slice_templates:id:content:",
+ },
+ "artifacts": {
+ "METHODS": ("GET",),
+ "ROLE_PERMISSION": "slice_templates:id:content:",
+ "*": None,
+ },
+ },
+ },
+ "subscriptions": {
+ "TODO": ("GET", "POST"),
+ "<ID>": {"TODO": ("GET", "DELETE")},
+ },
}
},
"nsilcm": {
"v1": {
- "netslice_instances_content": {"METHODS": ("GET", "POST"),
- "ROLE_PERMISSION": "slice_instances:",
- "<ID>": {"METHODS": ("GET", "DELETE"),
- "ROLE_PERMISSION": "slice_instances:id:"
- }
- },
- "netslice_instances": {"METHODS": ("GET", "POST"),
- "ROLE_PERMISSION": "slice_instances:",
- "<ID>": {"METHODS": ("GET", "DELETE"),
- "ROLE_PERMISSION": "slice_instances:id:",
- "terminate": {"METHODS": ("POST",),
- "ROLE_PERMISSION": "slice_instances:id:terminate:"
- },
- "instantiate": {"METHODS": ("POST",),
- "ROLE_PERMISSION": "slice_instances:id:instantiate:"
- },
- "action": {"METHODS": ("POST",),
- "ROLE_PERMISSION": "slice_instances:id:action:"
- },
- }
- },
- "nsi_lcm_op_occs": {"METHODS": ("GET",),
- "ROLE_PERMISSION": "slice_instances:opps:",
- "<ID>": {"METHODS": ("GET",),
- "ROLE_PERMISSION": "slice_instances:opps:id:",
- },
- },
+ "netslice_instances_content": {
+ "METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "slice_instances:",
+ "<ID>": {
+ "METHODS": ("GET", "DELETE"),
+ "ROLE_PERMISSION": "slice_instances:id:",
+ },
+ },
+ "netslice_instances": {
+ "METHODS": ("GET", "POST"),
+ "ROLE_PERMISSION": "slice_instances:",
+ "<ID>": {
+ "METHODS": ("GET", "DELETE"),
+ "ROLE_PERMISSION": "slice_instances:id:",
+ "terminate": {
+ "METHODS": ("POST",),
+ "ROLE_PERMISSION": "slice_instances:id:terminate:",
+ },
+ "instantiate": {
+ "METHODS": ("POST",),
+ "ROLE_PERMISSION": "slice_instances:id:instantiate:",
+ },
+ "action": {
+ "METHODS": ("POST",),
+ "ROLE_PERMISSION": "slice_instances:id:action:",
+ },
+ },
+ },
+ "nsi_lcm_op_occs": {
+ "METHODS": ("GET",),
+ "ROLE_PERMISSION": "slice_instances:opps:",
+ "<ID>": {
+ "METHODS": ("GET",),
+ "ROLE_PERMISSION": "slice_instances:opps:id:",
+ },
+ },
}
},
"nspm": {
"pm_jobs": {
"<ID>": {
"reports": {
- "<ID>": {"METHODS": ("GET",),
- "ROLE_PERMISSION": "reports:id:",
- }
+ "<ID>": {
+ "METHODS": ("GET",),
+ "ROLE_PERMISSION": "reports:id:",
+ }
}
},
},
},
},
+ "nsfm": {
+ "v1": {
+ "alarms": {"METHODS": ("GET", "PATCH"),
+ "ROLE_PERMISSION": "alarms:",
+ "<ID>": {"METHODS": ("GET", "PATCH"),
+ "ROLE_PERMISSION": "alarms:id:",
+ },
+ }
+ },
+ },
}
class NbiException(Exception):
-
def __init__(self, message, http_code=HTTPStatus.METHOD_NOT_ALLOWED):
Exception.__init__(self, message)
self.http_code = http_code
cherrypy.request.headers.pop("Content-File-MD5", None)
elif "application/yaml" in cherrypy.request.headers["Content-Type"]:
error_text = "Invalid yaml format "
- indata = yaml.load(cherrypy.request.body, Loader=yaml.SafeLoader)
+ indata = yaml.load(
+ cherrypy.request.body, Loader=yaml.SafeLoader
+ )
cherrypy.request.headers.pop("Content-File-MD5", None)
- elif "application/binary" in cherrypy.request.headers["Content-Type"] or \
- "application/gzip" in cherrypy.request.headers["Content-Type"] or \
- "application/zip" in cherrypy.request.headers["Content-Type"] or \
- "text/plain" in cherrypy.request.headers["Content-Type"]:
+ elif (
+ "application/binary" in cherrypy.request.headers["Content-Type"]
+ or "application/gzip"
+ in cherrypy.request.headers["Content-Type"]
+ or "application/zip" in cherrypy.request.headers["Content-Type"]
+ or "text/plain" in cherrypy.request.headers["Content-Type"]
+ ):
indata = cherrypy.request.body # .read()
- elif "multipart/form-data" in cherrypy.request.headers["Content-Type"]:
+ elif (
+ "multipart/form-data"
+ in cherrypy.request.headers["Content-Type"]
+ ):
if "descriptor_file" in kwargs:
filecontent = kwargs.pop("descriptor_file")
if not filecontent.file:
- raise NbiException("empty file or content", HTTPStatus.BAD_REQUEST)
+ raise NbiException(
+ "empty file or content", HTTPStatus.BAD_REQUEST
+ )
indata = filecontent.file # .read()
if filecontent.content_type.value:
- cherrypy.request.headers["Content-Type"] = filecontent.content_type.value
+ cherrypy.request.headers[
+ "Content-Type"
+ ] = filecontent.content_type.value
else:
# raise cherrypy.HTTPError(HTTPStatus.Not_Acceptable,
# "Only 'Content-Type' of type 'application/json' or
# 'application/yaml' for input format are available")
error_text = "Invalid yaml format "
- indata = yaml.load(cherrypy.request.body, Loader=yaml.SafeLoader)
+ indata = yaml.load(
+ cherrypy.request.body, Loader=yaml.SafeLoader
+ )
cherrypy.request.headers.pop("Content-File-MD5", None)
else:
error_text = "Invalid yaml format "
kwargs[k] = yaml.load(v, Loader=yaml.SafeLoader)
except Exception:
pass
- elif k.endswith(".gt") or k.endswith(".lt") or k.endswith(".gte") or k.endswith(".lte"):
+ elif (
+ k.endswith(".gt")
+ or k.endswith(".lt")
+ or k.endswith(".gte")
+ or k.endswith(".lte")
+ ):
try:
kwargs[k] = int(v)
except Exception:
except (ValueError, yaml.YAMLError) as exc:
raise NbiException(error_text + str(exc), HTTPStatus.BAD_REQUEST)
except KeyError as exc:
- raise NbiException("Query string error: " + str(exc), HTTPStatus.BAD_REQUEST)
+ raise NbiException(
+ "Query string error: " + str(exc), HTTPStatus.BAD_REQUEST
+ )
except Exception as exc:
raise NbiException(error_text + str(exc), HTTPStatus.BAD_REQUEST)
accept = cherrypy.request.headers.get("Accept")
if data is None:
if accept and "text/html" in accept:
- return html.format(data, cherrypy.request, cherrypy.response, token_info)
+ return html.format(
+ data, cherrypy.request, cherrypy.response, token_info
+ )
# cherrypy.response.status = HTTPStatus.NO_CONTENT.value
return
elif hasattr(data, "read"): # file object
if _format:
cherrypy.response.headers["Content-Type"] = _format
elif "b" in data.mode: # binariy asssumig zip
- cherrypy.response.headers["Content-Type"] = 'application/zip'
+ cherrypy.response.headers["Content-Type"] = "application/zip"
else:
- cherrypy.response.headers["Content-Type"] = 'text/plain'
+ cherrypy.response.headers["Content-Type"] = "text/plain"
# TODO check that cherrypy close file. If not implement pending things to close per thread next
return data
if accept:
- if "application/json" in accept:
- cherrypy.response.headers["Content-Type"] = 'application/json; charset=utf-8'
+ if "text/html" in accept:
+ return html.format(
+ data, cherrypy.request, cherrypy.response, token_info
+ )
+ elif "application/yaml" in accept or "*/*" in accept:
+ pass
+ elif "application/json" in accept or (
+ cherrypy.response.status and cherrypy.response.status >= 300
+ ):
+ cherrypy.response.headers[
+ "Content-Type"
+ ] = "application/json; charset=utf-8"
a = json.dumps(data, indent=4) + "\n"
return a.encode("utf8")
- elif "text/html" in accept:
- return html.format(data, cherrypy.request, cherrypy.response, token_info)
-
- elif "application/yaml" in accept or "*/*" in accept or "text/plain" in accept:
- pass
- # if there is not any valid accept, raise an error. But if response is already an error, format in yaml
- elif cherrypy.response.status >= 400:
- raise cherrypy.HTTPError(HTTPStatus.NOT_ACCEPTABLE.value,
- "Only 'Accept' of type 'application/json' or 'application/yaml' "
- "for output format are available")
- cherrypy.response.headers["Content-Type"] = 'application/yaml'
- return yaml.safe_dump(data, explicit_start=True, indent=4, default_flow_style=False, tags=False,
- encoding='utf-8', allow_unicode=True) # , canonical=True, default_style='"'
+ cherrypy.response.headers["Content-Type"] = "application/yaml"
+ return yaml.safe_dump(
+ data,
+ explicit_start=True,
+ indent=4,
+ default_flow_style=False,
+ tags=False,
+ encoding="utf-8",
+ allow_unicode=True,
+ ) # , canonical=True, default_style='"'
@cherrypy.expose
def index(self, *args, **kwargs):
try:
if cherrypy.request.method == "GET":
token_info = self.authenticator.authorize()
- outdata = token_info # Home page
+ outdata = token_info # Home page
else:
- raise cherrypy.HTTPError(HTTPStatus.METHOD_NOT_ALLOWED.value,
- "Method {} not allowed for tokens".format(cherrypy.request.method))
+ raise cherrypy.HTTPError(
+ HTTPStatus.METHOD_NOT_ALLOWED.value,
+ "Method {} not allowed for tokens".format(cherrypy.request.method),
+ )
return self._format_out(outdata, token_info)
# TODO consider to remove and provide version using the static version file
try:
if cherrypy.request.method != "GET":
- raise NbiException("Only method GET is allowed", HTTPStatus.METHOD_NOT_ALLOWED)
+ raise NbiException(
+ "Only method GET is allowed", HTTPStatus.METHOD_NOT_ALLOWED
+ )
elif args or kwargs:
- raise NbiException("Invalid URL or query string for version", HTTPStatus.METHOD_NOT_ALLOWED)
+ raise NbiException(
+ "Invalid URL or query string for version",
+ HTTPStatus.METHOD_NOT_ALLOWED,
+ )
# TODO include version of other modules, pick up from some kafka admin message
osm_nbi_version = {"version": nbi_version, "date": nbi_version_date}
return self._format_out(osm_nbi_version)
}
return self._format_out(problem_details, None)
+ def domain(self):
+ try:
+ domains = {
+ "user_domain_name": cherrypy.tree.apps["/osm"]
+ .config["authentication"]
+ .get("user_domain_name"),
+ "project_domain_name": cherrypy.tree.apps["/osm"]
+ .config["authentication"]
+ .get("project_domain_name"),
+ }
+ return self._format_out(domains)
+ except NbiException as e:
+ cherrypy.response.status = e.http_code.value
+ problem_details = {
+ "code": e.http_code.name,
+ "status": e.http_code.value,
+ "detail": str(e),
+ }
+ return self._format_out(problem_details, None)
+
@staticmethod
def _format_login(token_info):
"""
if token_info.get("id"):
cherrypy.request.login += ";session=" + token_info["id"][0:12]
+ # NS Fault Management
+ @cherrypy.expose
+ def nsfm(self, version=None, topic=None, uuid=None, project_name=None, ns_id=None, *args, **kwargs):
+ if topic == 'alarms':
+ try:
+ method = cherrypy.request.method
+ role_permission = self._check_valid_url_method(method, "nsfm", version, topic, None, None, *args)
+ query_string_operations = self._extract_query_string_operations(kwargs, method)
+
+ self.authenticator.authorize(role_permission, query_string_operations, None)
+
+ # to handle get request
+ if cherrypy.request.method == 'GET':
+ # if request is on basis of uuid
+ if uuid and uuid != 'None':
+ try:
+ alarm = self.engine.db.get_one("alarms", {"uuid": uuid})
+ alarm_action = self.engine.db.get_one("alarms_action", {"uuid": uuid})
+ alarm.update(alarm_action)
+ vnf = self.engine.db.get_one("vnfrs", {"nsr-id-ref": alarm["tags"]["ns_id"]})
+ alarm["vnf-id"] = vnf["_id"]
+ return self._format_out(str(alarm))
+ except Exception:
+ return self._format_out("Please provide valid alarm uuid")
+ elif ns_id and ns_id != 'None':
+ # if request is on basis of ns_id
+ try:
+ alarms = self.engine.db.get_list("alarms", {"tags.ns_id": ns_id})
+ for alarm in alarms:
+ alarm_action = self.engine.db.get_one("alarms_action", {"uuid": alarm['uuid']})
+ alarm.update(alarm_action)
+ return self._format_out(str(alarms))
+ except Exception:
+ return self._format_out("Please provide valid ns id")
+ else:
+ # to return only alarm which are related to given project
+ project = self.engine.db.get_one("projects", {"name": project_name})
+ project_id = project.get('_id')
+ ns_list = self.engine.db.get_list("nsrs", {"_admin.projects_read": project_id})
+ ns_ids = []
+ for ns in ns_list:
+ ns_ids.append(ns.get("_id"))
+ alarms = self.engine.db.get_list("alarms")
+ alarm_list = [alarm for alarm in alarms if alarm["tags"]["ns_id"] in ns_ids]
+ for alrm in alarm_list:
+ action = self.engine.db.get_one("alarms_action", {"uuid": alrm.get("uuid")})
+ alrm.update(action)
+ return self._format_out(str(alarm_list))
+ # to handle patch request for alarm update
+ elif cherrypy.request.method == 'PATCH':
+ data = yaml.load(cherrypy.request.body, Loader=yaml.SafeLoader)
+ try:
+ # check if uuid is valid
+ self.engine.db.get_one("alarms", {"uuid": data.get("uuid")})
+ except Exception:
+ return self._format_out("Please provide valid alarm uuid.")
+ if data.get("is_enable") is not None:
+ if data.get("is_enable"):
+ alarm_status = 'ok'
+ else:
+ alarm_status = 'disabled'
+ self.engine.db.set_one("alarms", {"uuid": data.get("uuid")},
+ {"alarm_status": alarm_status})
+ else:
+ self.engine.db.set_one("alarms", {"uuid": data.get("uuid")},
+ {"threshold": data.get("threshold")})
+ return self._format_out("Alarm updated")
+ except Exception as e:
+ cherrypy.response.status = e.http_code.value
+ if isinstance(e, (NbiException, EngineException, DbException, FsException, MsgException, AuthException,
+ ValidationError, AuthconnException)):
+ http_code_value = cherrypy.response.status = e.http_code.value
+ http_code_name = e.http_code.name
+ cherrypy.log("Exception {}".format(e))
+ else:
+ http_code_value = cherrypy.response.status = HTTPStatus.BAD_REQUEST.value # INTERNAL_SERVER_ERROR
+ cherrypy.log("CRITICAL: Exception {}".format(e), traceback=True)
+ http_code_name = HTTPStatus.BAD_REQUEST.name
+ problem_details = {
+ "code": http_code_name,
+ "status": http_code_value,
+ "detail": str(e),
+ }
+ return self._format_out(problem_details)
+
@cherrypy.expose
def token(self, method, token_id=None, kwargs=None):
token_info = None
# self.engine.load_dbase(cherrypy.request.app.config)
indata = self._format_in(kwargs)
if not isinstance(indata, dict):
- raise NbiException("Expected application/yaml or application/json Content-Type", HTTPStatus.BAD_REQUEST)
+ raise NbiException(
+ "Expected application/yaml or application/json Content-Type",
+ HTTPStatus.BAD_REQUEST,
+ )
if method == "GET":
token_info = self.authenticator.authorize()
indata.update(kwargs)
# This is needed to log the user when authentication fails
cherrypy.request.login = "{}".format(indata.get("username", "-"))
- outdata = token_info = self.authenticator.new_token(token_info, indata, cherrypy.request.remote)
- cherrypy.session['Authorization'] = outdata["_id"]
+ outdata = token_info = self.authenticator.new_token(
+ token_info, indata, cherrypy.request.remote
+ )
+ cherrypy.session["Authorization"] = outdata["_id"]
self._set_location_header("admin", "v1", "tokens", outdata["_id"])
# for logging
self._format_login(token_info)
-
+ # password expiry check
+ if self.authenticator.check_password_expiry(outdata):
+ outdata = {"id": outdata["id"],
+ "message": "change_password",
+ "user_id": outdata["user_id"]
+ }
# cherrypy.response.cookie["Authorization"] = outdata["id"]
# cherrypy.response.cookie["Authorization"]['expires'] = 3600
elif method == "DELETE":
token_id = token_info["_id"]
outdata = self.authenticator.del_token(token_id)
token_info = None
- cherrypy.session['Authorization'] = "logout"
+ cherrypy.session["Authorization"] = "logout"
# cherrypy.response.cookie["Authorization"] = token_id
# cherrypy.response.cookie["Authorization"]['expires'] = 0
else:
- raise NbiException("Method {} not allowed for token".format(method), HTTPStatus.METHOD_NOT_ALLOWED)
+ raise NbiException(
+ "Method {} not allowed for token".format(method),
+ HTTPStatus.METHOD_NOT_ALLOWED,
+ )
return self._format_out(outdata, token_info)
@cherrypy.expose
def test(self, *args, **kwargs):
- if not cherrypy.config.get("server.enable_test") or (isinstance(cherrypy.config["server.enable_test"], str) and
- cherrypy.config["server.enable_test"].lower() == "false"):
+ if not cherrypy.config.get("server.enable_test") or (
+ isinstance(cherrypy.config["server.enable_test"], str)
+ and cherrypy.config["server.enable_test"].lower() == "false"
+ ):
cherrypy.response.status = HTTPStatus.METHOD_NOT_ALLOWED.value
return "test URL is disabled"
thread_info = None
if args and args[0] == "help":
- return "<html><pre>\ninit\nfile/<name> download file\ndb-clear/table\nfs-clear[/folder]\nlogin\nlogin2\n"\
- "sleep/<time>\nmessage/topic\n</pre></html>"
+ return (
+ "<html><pre>\ninit\nfile/<name> download file\ndb-clear/table\nfs-clear[/folder]\nlogin\nlogin2\n"
+ "sleep/<time>\nmessage/topic\n</pre></html>"
+ )
elif args and args[0] == "init":
try:
cherrypy.response.status = HTTPStatus.FORBIDDEN.value
return self._format_out("Database already initialized")
elif args and args[0] == "file":
- return cherrypy.lib.static.serve_file(cherrypy.tree.apps['/osm'].config["storage"]["path"] + "/" + args[1],
- "text/plain", "attachment")
+ return cherrypy.lib.static.serve_file(
+ cherrypy.tree.apps["/osm"].config["storage"]["path"] + "/" + args[1],
+ "text/plain",
+ "attachment",
+ )
elif args and args[0] == "file2":
- f_path = cherrypy.tree.apps['/osm'].config["storage"]["path"] + "/" + args[1]
+ f_path = (
+ cherrypy.tree.apps["/osm"].config["storage"]["path"] + "/" + args[1]
+ )
f = open(f_path, "r")
cherrypy.response.headers["Content-type"] = "text/plain"
return f
return ",".join(folders) + " folders deleted\n"
elif args and args[0] == "login":
if not cherrypy.request.headers.get("Authorization"):
- cherrypy.response.headers["WWW-Authenticate"] = 'Basic realm="Access to OSM site", charset="UTF-8"'
+ cherrypy.response.headers[
+ "WWW-Authenticate"
+ ] = 'Basic realm="Access to OSM site", charset="UTF-8"'
cherrypy.response.status = HTTPStatus.UNAUTHORIZED.value
elif args and args[0] == "login2":
if not cherrypy.request.headers.get("Authorization"):
- cherrypy.response.headers["WWW-Authenticate"] = 'Bearer realm="Access to OSM site"'
+ cherrypy.response.headers[
+ "WWW-Authenticate"
+ ] = 'Bearer realm="Access to OSM site"'
cherrypy.response.status = HTTPStatus.UNAUTHORIZED.value
elif args and args[0] == "sleep":
sleep_time = 5
main_topic = args[1]
return_text = "<html><pre>{} ->\n".format(main_topic)
try:
- if cherrypy.request.method == 'POST':
+ if cherrypy.request.method == "POST":
to_send = yaml.load(cherrypy.request.body, Loader=yaml.SafeLoader)
for k, v in to_send.items():
self.engine.msg.write(main_topic, k, v)
return_text += " {}: {}\n".format(k, v)
- elif cherrypy.request.method == 'GET':
+ elif cherrypy.request.method == "GET":
for k, v in kwargs.items():
- self.engine.msg.write(main_topic, k, yaml.load(v), Loader=yaml.SafeLoader)
- return_text += " {}: {}\n".format(k, yaml.load(v), Loader=yaml.SafeLoader)
+ v_dict = yaml.load(v, Loader=yaml.SafeLoader)
+ self.engine.msg.write(main_topic, k, v_dict)
+ return_text += " {}: {}\n".format(k, v_dict)
except Exception as e:
return_text += "Error: " + str(e)
return_text += "</pre></html>\n"
return return_text
return_text = (
- "<html><pre>\nheaders:\n args: {}\n".format(args) +
- " kwargs: {}\n".format(kwargs) +
- " headers: {}\n".format(cherrypy.request.headers) +
- " path_info: {}\n".format(cherrypy.request.path_info) +
- " query_string: {}\n".format(cherrypy.request.query_string) +
- " session: {}\n".format(cherrypy.session) +
- " cookie: {}\n".format(cherrypy.request.cookie) +
- " method: {}\n".format(cherrypy.request.method) +
- " session: {}\n".format(cherrypy.session.get('fieldname')) +
- " body:\n")
+ "<html><pre>\nheaders:\n args: {}\n".format(args)
+ + " kwargs: {}\n".format(kwargs)
+ + " headers: {}\n".format(cherrypy.request.headers)
+ + " path_info: {}\n".format(cherrypy.request.path_info)
+ + " query_string: {}\n".format(cherrypy.request.query_string)
+ + " session: {}\n".format(cherrypy.session)
+ + " cookie: {}\n".format(cherrypy.request.cookie)
+ + " method: {}\n".format(cherrypy.request.method)
+ + " session: {}\n".format(cherrypy.session.get("fieldname"))
+ + " body:\n"
+ )
return_text += " length: {}\n".format(cherrypy.request.body.length)
if cherrypy.request.body.length:
return_text += " content: {}\n".format(
- str(cherrypy.request.body.read(int(cherrypy.request.headers.get('Content-Length', 0)))))
+ str(
+ cherrypy.request.body.read(
+ int(cherrypy.request.headers.get("Content-Length", 0))
+ )
+ )
+ )
if thread_info:
return_text += "thread: {}\n".format(thread_info)
return_text += "</pre></html>"
@staticmethod
def _check_valid_url_method(method, *args):
if len(args) < 3:
- raise NbiException("URL must contain at least 'main_topic/version/topic'", HTTPStatus.METHOD_NOT_ALLOWED)
+ raise NbiException(
+ "URL must contain at least 'main_topic/version/topic'",
+ HTTPStatus.METHOD_NOT_ALLOWED,
+ )
reference = valid_url_methods
for arg in args:
if arg is None:
break
if not isinstance(reference, dict):
- raise NbiException("URL contains unexpected extra items '{}'".format(arg),
- HTTPStatus.METHOD_NOT_ALLOWED)
+ raise NbiException(
+ "URL contains unexpected extra items '{}'".format(arg),
+ HTTPStatus.METHOD_NOT_ALLOWED,
+ )
if arg in reference:
reference = reference[arg]
elif "<ID>" in reference:
reference = reference["<ID>"]
elif "*" in reference:
- reference = reference["*"]
+ # if there is content
+ if reference["*"]:
+ reference = reference["*"]
break
else:
- raise NbiException("Unexpected URL item {}".format(arg), HTTPStatus.METHOD_NOT_ALLOWED)
+ raise NbiException(
+ "Unexpected URL item {}".format(arg), HTTPStatus.METHOD_NOT_ALLOWED
+ )
if "TODO" in reference and method in reference["TODO"]:
- raise NbiException("Method {} not supported yet for this URL".format(method), HTTPStatus.NOT_IMPLEMENTED)
+ raise NbiException(
+ "Method {} not supported yet for this URL".format(method),
+ HTTPStatus.NOT_IMPLEMENTED,
+ )
elif "METHODS" in reference and method not in reference["METHODS"]:
- raise NbiException("Method {} not supported for this URL".format(method), HTTPStatus.METHOD_NOT_ALLOWED)
+ raise NbiException(
+ "Method {} not supported for this URL".format(method),
+ HTTPStatus.METHOD_NOT_ALLOWED,
+ )
return reference["ROLE_PERMISSION"] + method.lower()
@staticmethod
:return: None
"""
# Use cherrypy.request.base for absoluted path and make use of request.header HOST just in case behind aNAT
- cherrypy.response.headers["Location"] = "/osm/{}/{}/{}/{}".format(main_topic, version, topic, id)
+ cherrypy.response.headers["Location"] = "/osm/{}/{}/{}/{}".format(
+ main_topic, version, topic, id
+ )
return
@staticmethod
set_project: tuple with projects that a created element will belong to
method: show, list, delete, write
"""
- admin_query = {"force": False, "project_id": (token_info["project_id"], ), "username": token_info["username"],
- "admin": token_info["admin"], "public": None,
- "allow_show_user_project_role": token_info["allow_show_user_project_role"]}
+ admin_query = {
+ "force": False,
+ "project_id": (token_info["project_id"],),
+ "username": token_info["username"],
+ "admin": token_info["admin"],
+ "public": None,
+ "allow_show_user_project_role": token_info["allow_show_user_project_role"],
+ }
if kwargs:
# FORCE
if "FORCE" in kwargs:
- if kwargs["FORCE"].lower() != "false": # if None or True set force to True
+ if (
+ kwargs["FORCE"].lower() != "false"
+ ): # if None or True set force to True
admin_query["force"] = True
del kwargs["FORCE"]
# PUBLIC
if "PUBLIC" in kwargs:
- if kwargs["PUBLIC"].lower() != "false": # if None or True set public to True
+ if (
+ kwargs["PUBLIC"].lower() != "false"
+ ): # if None or True set public to True
admin_query["public"] = True
else:
admin_query["public"] = False
behave_as = kwargs.pop("ADMIN")
if behave_as.lower() != "false":
if not token_info["admin"]:
- raise NbiException("Only admin projects can use 'ADMIN' query string", HTTPStatus.UNAUTHORIZED)
- if not behave_as or behave_as.lower() == "true": # convert True, None to empty list
+ raise NbiException(
+ "Only admin projects can use 'ADMIN' query string",
+ HTTPStatus.UNAUTHORIZED,
+ )
+ if (
+ not behave_as or behave_as.lower() == "true"
+ ): # convert True, None to empty list
admin_query["project_id"] = ()
elif isinstance(behave_as, (list, tuple)):
admin_query["project_id"] = behave_as
- else: # isinstance(behave_as, str)
- admin_query["project_id"] = (behave_as, )
+ else: # isinstance(behave_as, str)
+ admin_query["project_id"] = (behave_as,)
if "SET_PROJECT" in kwargs:
set_project = kwargs.pop("SET_PROJECT")
if not set_project:
admin_query["set_project"] = list(admin_query["project_id"])
else:
if isinstance(set_project, str):
- set_project = (set_project, )
+ set_project = (set_project,)
if admin_query["project_id"]:
for p in set_project:
if p not in admin_query["project_id"]:
- raise NbiException("Unauthorized for 'SET_PROJECT={p}'. Try with 'ADMIN=True' or "
- "'ADMIN='{p}'".format(p=p), HTTPStatus.UNAUTHORIZED)
+ raise NbiException(
+ "Unauthorized for 'SET_PROJECT={p}'. Try with 'ADMIN=True' or "
+ "'ADMIN='{p}'".format(p=p),
+ HTTPStatus.UNAUTHORIZED,
+ )
admin_query["set_project"] = set_project
# PROJECT_READ
return admin_query
@cherrypy.expose
- def default(self, main_topic=None, version=None, topic=None, _id=None, item=None, *args, **kwargs):
+ def default(
+ self,
+ main_topic=None,
+ version=None,
+ topic=None,
+ _id=None,
+ item=None,
+ *args,
+ **kwargs
+ ):
token_info = None
outdata = None
_format = None
engine_session = None
try:
if not main_topic or not version or not topic:
- raise NbiException("URL must contain at least 'main_topic/version/topic'",
- HTTPStatus.METHOD_NOT_ALLOWED)
- if main_topic not in ("admin", "vnfpkgm", "nsd", "nslcm", "pdu", "nst", "nsilcm", "nspm"):
- raise NbiException("URL main_topic '{}' not supported".format(main_topic),
- HTTPStatus.METHOD_NOT_ALLOWED)
- if version != 'v1':
- raise NbiException("URL version '{}' not supported".format(version), HTTPStatus.METHOD_NOT_ALLOWED)
-
- if kwargs and "METHOD" in kwargs and kwargs["METHOD"] in ("PUT", "POST", "DELETE", "GET", "PATCH"):
+ raise NbiException(
+ "URL must contain at least 'main_topic/version/topic'",
+ HTTPStatus.METHOD_NOT_ALLOWED,
+ )
+ if main_topic not in (
+ "admin",
+ "vnfpkgm",
+ "nsd",
+ "nslcm",
+ "pdu",
+ "nst",
+ "nsilcm",
+ "nspm",
+ "vnflcm",
+ ):
+ raise NbiException(
+ "URL main_topic '{}' not supported".format(main_topic),
+ HTTPStatus.METHOD_NOT_ALLOWED,
+ )
+ if version != "v1":
+ raise NbiException(
+ "URL version '{}' not supported".format(version),
+ HTTPStatus.METHOD_NOT_ALLOWED,
+ )
+
+ if (
+ kwargs
+ and "METHOD" in kwargs
+ and kwargs["METHOD"] in ("PUT", "POST", "DELETE", "GET", "PATCH")
+ ):
method = kwargs.pop("METHOD")
else:
method = cherrypy.request.method
- role_permission = self._check_valid_url_method(method, main_topic, version, topic, _id, item, *args)
- query_string_operations = self._extract_query_string_operations(kwargs, method)
+ role_permission = self._check_valid_url_method(
+ method, main_topic, version, topic, _id, item, *args
+ )
+ query_string_operations = self._extract_query_string_operations(
+ kwargs, method
+ )
if main_topic == "admin" and topic == "tokens":
return self.token(method, _id, kwargs)
- token_info = self.authenticator.authorize(role_permission, query_string_operations, _id)
+ token_info = self.authenticator.authorize(
+ role_permission, query_string_operations, _id
+ )
+ if main_topic == "admin" and topic == "domains":
+ return self.domain()
engine_session = self._manage_admin_query(token_info, kwargs, method, _id)
indata = self._format_in(kwargs)
engine_topic = topic
- if topic == "subscriptions":
- engine_topic = main_topic + "_" + topic
+
if item and topic != "pm_jobs":
engine_topic = item
engine_topic = "nsds"
elif main_topic == "vnfpkgm":
engine_topic = "vnfds"
+ if topic == "vnfpkg_op_occs":
+ engine_topic = "vnfpkgops"
+ if topic == "vnf_packages" and item == "action":
+ engine_topic = "vnfpkgops"
elif main_topic == "nslcm":
engine_topic = "nsrs"
if topic == "ns_lcm_op_occs":
engine_topic = "nslcmops"
if topic == "vnfrs" or topic == "vnf_instances":
engine_topic = "vnfrs"
+ elif main_topic == "vnflcm":
+ if topic == "vnf_lcm_op_occs":
+ engine_topic = "vnflcmops"
elif main_topic == "nst":
engine_topic = "nsts"
elif main_topic == "nsilcm":
engine_topic = "nsilcmops"
elif main_topic == "pdu":
engine_topic = "pdus"
- if engine_topic == "vims": # TODO this is for backward compatibility, it will be removed in the future
+ if (
+ engine_topic == "vims"
+ ): # TODO this is for backward compatibility, it will be removed in the future
engine_topic = "vim_accounts"
+ if topic == "subscriptions":
+ engine_topic = main_topic + "_" + topic
+
if method == "GET":
- if item in ("nsd_content", "package_content", "artifacts", "vnfd", "nsd", "nst", "nst_content"):
+ if item in (
+ "nsd_content",
+ "package_content",
+ "artifacts",
+ "vnfd",
+ "nsd",
+ "nst",
+ "nst_content",
+ ):
if item in ("vnfd", "nsd", "nst"):
path = "$DESCRIPTOR"
elif args:
path = ()
else:
path = None
- file, _format = self.engine.get_file(engine_session, engine_topic, _id, path,
- cherrypy.request.headers.get("Accept"))
+ file, _format = self.engine.get_file(
+ engine_session,
+ engine_topic,
+ _id,
+ path,
+ cherrypy.request.headers.get("Accept"),
+ )
outdata = file
elif not _id:
- outdata = self.engine.get_item_list(engine_session, engine_topic, kwargs)
+ outdata = self.engine.get_item_list(
+ engine_session, engine_topic, kwargs, api_req=True
+ )
else:
if item == "reports":
# TODO check that project_id (_id in this context) has permissions
_id = args[0]
- outdata = self.engine.get_item(engine_session, engine_topic, _id)
+ filter_q = None
+ if "vcaStatusRefresh" in kwargs:
+ filter_q = {"vcaStatusRefresh": kwargs["vcaStatusRefresh"]}
+ outdata = self.engine.get_item(engine_session, engine_topic, _id, filter_q, True)
+
elif method == "POST":
cherrypy.response.status = HTTPStatus.CREATED.value
- if topic in ("ns_descriptors_content", "vnf_packages_content", "netslice_templates_content"):
+ if topic in (
+ "ns_descriptors_content",
+ "vnf_packages_content",
+ "netslice_templates_content",
+ ):
_id = cherrypy.request.headers.get("Transaction-Id")
if not _id:
- _id, _ = self.engine.new_item(rollback, engine_session, engine_topic, {}, None,
- cherrypy.request.headers)
- completed = self.engine.upload_content(engine_session, engine_topic, _id, indata, kwargs,
- cherrypy.request.headers)
+ _id, _ = self.engine.new_item(
+ rollback,
+ engine_session,
+ engine_topic,
+ {},
+ None,
+ cherrypy.request.headers,
+ )
+ completed = self.engine.upload_content(
+ engine_session,
+ engine_topic,
+ _id,
+ indata,
+ kwargs,
+ cherrypy.request.headers,
+ )
if completed:
self._set_location_header(main_topic, version, topic, _id)
else:
outdata = {"id": _id}
elif topic == "ns_instances_content":
# creates NSR
- _id, _ = self.engine.new_item(rollback, engine_session, engine_topic, indata, kwargs)
+ _id, _ = self.engine.new_item(
+ rollback, engine_session, engine_topic, indata, kwargs
+ )
# creates nslcmop
indata["lcmOperationType"] = "instantiate"
indata["nsInstanceId"] = _id
- nslcmop_id, _ = self.engine.new_item(rollback, engine_session, "nslcmops", indata, None)
+ nslcmop_id, _ = self.engine.new_item(
+ rollback, engine_session, "nslcmops", indata, None
+ )
self._set_location_header(main_topic, version, topic, _id)
outdata = {"id": _id, "nslcmop_id": nslcmop_id}
elif topic == "ns_instances" and item:
indata["lcmOperationType"] = item
indata["nsInstanceId"] = _id
- _id, _ = self.engine.new_item(rollback, engine_session, "nslcmops", indata, kwargs)
- self._set_location_header(main_topic, version, "ns_lcm_op_occs", _id)
+ _id, _ = self.engine.new_item(
+ rollback, engine_session, "nslcmops", indata, kwargs
+ )
+ self._set_location_header(
+ main_topic, version, "ns_lcm_op_occs", _id
+ )
outdata = {"id": _id}
cherrypy.response.status = HTTPStatus.ACCEPTED.value
elif topic == "netslice_instances_content":
# creates NetSlice_Instance_record (NSIR)
- _id, _ = self.engine.new_item(rollback, engine_session, engine_topic, indata, kwargs)
+ _id, _ = self.engine.new_item(
+ rollback, engine_session, engine_topic, indata, kwargs
+ )
self._set_location_header(main_topic, version, topic, _id)
indata["lcmOperationType"] = "instantiate"
indata["netsliceInstanceId"] = _id
- nsilcmop_id, _ = self.engine.new_item(rollback, engine_session, "nsilcmops", indata, kwargs)
+ nsilcmop_id, _ = self.engine.new_item(
+ rollback, engine_session, "nsilcmops", indata, kwargs
+ )
outdata = {"id": _id, "nsilcmop_id": nsilcmop_id}
-
elif topic == "netslice_instances" and item:
indata["lcmOperationType"] = item
indata["netsliceInstanceId"] = _id
- _id, _ = self.engine.new_item(rollback, engine_session, "nsilcmops", indata, kwargs)
- self._set_location_header(main_topic, version, "nsi_lcm_op_occs", _id)
+ _id, _ = self.engine.new_item(
+ rollback, engine_session, "nsilcmops", indata, kwargs
+ )
+ self._set_location_header(
+ main_topic, version, "nsi_lcm_op_occs", _id
+ )
+ outdata = {"id": _id}
+ cherrypy.response.status = HTTPStatus.ACCEPTED.value
+ elif topic == "vnf_packages" and item == "action":
+ indata["lcmOperationType"] = item
+ indata["vnfPkgId"] = _id
+ _id, _ = self.engine.new_item(
+ rollback, engine_session, "vnfpkgops", indata, kwargs
+ )
+ self._set_location_header(
+ main_topic, version, "vnfpkg_op_occs", _id
+ )
+ outdata = {"id": _id}
+ cherrypy.response.status = HTTPStatus.ACCEPTED.value
+ elif topic == "subscriptions":
+ _id, _ = self.engine.new_item(
+ rollback, engine_session, engine_topic, indata, kwargs
+ )
+ self._set_location_header(main_topic, version, topic, _id)
+ link = {}
+ link["self"] = cherrypy.response.headers["Location"]
+ outdata = {
+ "id": _id,
+ "filter": indata["filter"],
+ "callbackUri": indata["CallbackUri"],
+ "_links": link,
+ }
+ cherrypy.response.status = HTTPStatus.CREATED.value
+ elif topic == "vnf_instances" and item:
+ indata["lcmOperationType"] = item
+ indata["vnfInstanceId"] = _id
+ _id, _ = self.engine.new_item(rollback, engine_session, "vnflcmops", indata, kwargs)
+ self._set_location_header(main_topic, version, "vnf_lcm_op_occs", _id)
outdata = {"id": _id}
cherrypy.response.status = HTTPStatus.ACCEPTED.value
else:
- _id, op_id = self.engine.new_item(rollback, engine_session, engine_topic, indata, kwargs,
- cherrypy.request.headers)
+ _id, op_id = self.engine.new_item(
+ rollback,
+ engine_session,
+ engine_topic,
+ indata,
+ kwargs,
+ cherrypy.request.headers,
+ )
self._set_location_header(main_topic, version, topic, _id)
outdata = {"id": _id}
if op_id:
elif method == "DELETE":
if not _id:
- outdata = self.engine.del_item_list(engine_session, engine_topic, kwargs)
+ outdata = self.engine.del_item_list(
+ engine_session, engine_topic, kwargs
+ )
cherrypy.response.status = HTTPStatus.OK.value
else: # len(args) > 1
- delete_in_process = False
+ # for NS NSI generate an operation
+ op_id = None
if topic == "ns_instances_content" and not engine_session["force"]:
nslcmop_desc = {
"lcmOperationType": "terminate",
"nsInstanceId": _id,
- "autoremove": True
+ "autoremove": True,
}
- opp_id, _ = self.engine.new_item(rollback, engine_session, "nslcmops", nslcmop_desc, None)
- if opp_id:
- delete_in_process = True
- outdata = {"_id": opp_id}
- cherrypy.response.status = HTTPStatus.ACCEPTED.value
- elif topic == "netslice_instances_content" and not engine_session["force"]:
+ op_id, _ = self.engine.new_item(
+ rollback, engine_session, "nslcmops", nslcmop_desc, kwargs
+ )
+ if op_id:
+ outdata = {"_id": op_id}
+ elif (
+ topic == "netslice_instances_content"
+ and not engine_session["force"]
+ ):
nsilcmop_desc = {
"lcmOperationType": "terminate",
"netsliceInstanceId": _id,
- "autoremove": True
+ "autoremove": True,
}
- opp_id, _ = self.engine.new_item(rollback, engine_session, "nsilcmops", nsilcmop_desc, None)
- if opp_id:
- delete_in_process = True
- outdata = {"_id": opp_id}
- cherrypy.response.status = HTTPStatus.ACCEPTED.value
- if not delete_in_process:
- self.engine.del_item(engine_session, engine_topic, _id)
- cherrypy.response.status = HTTPStatus.NO_CONTENT.value
- if engine_topic in ("vim_accounts", "wim_accounts", "sdns", "k8sclusters", "k8srepos"):
- cherrypy.response.status = HTTPStatus.ACCEPTED.value
+ op_id, _ = self.engine.new_item(
+ rollback, engine_session, "nsilcmops", nsilcmop_desc, None
+ )
+ if op_id:
+ outdata = {"_id": op_id}
+ # if there is not any deletion in process, delete
+ if not op_id:
+ op_id = self.engine.del_item(engine_session, engine_topic, _id)
+ if op_id:
+ outdata = {"op_id": op_id}
+ cherrypy.response.status = (
+ HTTPStatus.ACCEPTED.value
+ if op_id
+ else HTTPStatus.NO_CONTENT.value
+ )
elif method in ("PUT", "PATCH"):
op_id = None
if not indata and not kwargs and not engine_session.get("set_project"):
- raise NbiException("Nothing to update. Provide payload and/or query string",
- HTTPStatus.BAD_REQUEST)
- if item in ("nsd_content", "package_content", "nst_content") and method == "PUT":
- completed = self.engine.upload_content(engine_session, engine_topic, _id, indata, kwargs,
- cherrypy.request.headers)
+ raise NbiException(
+ "Nothing to update. Provide payload and/or query string",
+ HTTPStatus.BAD_REQUEST,
+ )
+ if (
+ item in ("nsd_content", "package_content", "nst_content")
+ and method == "PUT"
+ ):
+ completed = self.engine.upload_content(
+ engine_session,
+ engine_topic,
+ _id,
+ indata,
+ kwargs,
+ cherrypy.request.headers,
+ )
if not completed:
cherrypy.response.headers["Transaction-Id"] = id
else:
- op_id = self.engine.edit_item(engine_session, engine_topic, _id, indata, kwargs)
+ op_id = self.engine.edit_item(
+ engine_session, engine_topic, _id, indata, kwargs
+ )
if op_id:
cherrypy.response.status = HTTPStatus.ACCEPTED.value
cherrypy.response.status = HTTPStatus.NO_CONTENT.value
outdata = None
else:
- raise NbiException("Method {} not allowed".format(method), HTTPStatus.METHOD_NOT_ALLOWED)
+ raise NbiException(
+ "Method {} not allowed".format(method),
+ HTTPStatus.METHOD_NOT_ALLOWED,
+ )
# if Role information changes, it is needed to reload the information of roles
if topic == "roles" and method != "GET":
self.authenticator.load_operation_to_allowed_roles()
- if topic == "projects" and method == "DELETE" \
- or topic in ["users", "roles"] and method in ["PUT", "PATCH", "DELETE"]:
+ if (
+ topic == "projects"
+ and method == "DELETE"
+ or topic in ["users", "roles"]
+ and method in ["PUT", "PATCH", "DELETE"]
+ ):
self.authenticator.remove_token_from_cache()
return self._format_out(outdata, token_info, _format)
except Exception as e:
- if isinstance(e, (NbiException, EngineException, DbException, FsException, MsgException, AuthException,
- ValidationError, AuthconnException)):
+ if isinstance(
+ e,
+ (
+ NbiException,
+ EngineException,
+ DbException,
+ FsException,
+ MsgException,
+ AuthException,
+ ValidationError,
+ AuthconnException,
+ ),
+ ):
http_code_value = cherrypy.response.status = e.http_code.value
http_code_name = e.http_code.name
cherrypy.log("Exception {}".format(e))
else:
- http_code_value = cherrypy.response.status = HTTPStatus.BAD_REQUEST.value # INTERNAL_SERVER_ERROR
+ http_code_value = (
+ cherrypy.response.status
+ ) = HTTPStatus.BAD_REQUEST.value # INTERNAL_SERVER_ERROR
cherrypy.log("CRITICAL: Exception {}".format(e), traceback=True)
http_code_name = HTTPStatus.BAD_REQUEST.name
if hasattr(outdata, "close"): # is an open file
for rollback_item in rollback:
try:
if rollback_item.get("operation") == "set":
- self.engine.db.set_one(rollback_item["topic"], {"_id": rollback_item["_id"]},
- rollback_item["content"], fail_on_empty=False)
+ self.engine.db.set_one(
+ rollback_item["topic"],
+ {"_id": rollback_item["_id"]},
+ rollback_item["content"],
+ fail_on_empty=False,
+ )
+ elif rollback_item.get("operation") == "del_list":
+ self.engine.db.del_list(
+ rollback_item["topic"],
+ rollback_item["filter"],
+ fail_on_empty=False,
+ )
else:
- self.engine.db.del_one(rollback_item["topic"], {"_id": rollback_item["_id"]},
- fail_on_empty=False)
+ self.engine.db.del_one(
+ rollback_item["topic"],
+ {"_id": rollback_item["_id"]},
+ fail_on_empty=False,
+ )
except Exception as e2:
- rollback_error_text = "Rollback Exception {}: {}".format(rollback_item, e2)
+ rollback_error_text = "Rollback Exception {}: {}".format(
+ rollback_item, e2
+ )
cherrypy.log(rollback_error_text)
error_text += ". " + rollback_error_text
# if isinstance(e, MsgException):
if method in ("PUT", "PATCH", "POST") and isinstance(outdata, dict):
for logging_id in ("id", "op_id", "nsilcmop_id", "nslcmop_id"):
if outdata.get(logging_id):
- cherrypy.request.login += ";{}={}".format(logging_id, outdata[logging_id][:36])
+ cherrypy.request.login += ";{}={}".format(
+ logging_id, outdata[logging_id][:36]
+ )
def _start_service():
# update general cherrypy configuration
update_dict = {}
- engine_config = cherrypy.tree.apps['/osm'].config
+ engine_config = cherrypy.tree.apps["/osm"].config
for k, v in environ.items():
if not k.startswith("OSMNBI_"):
continue
continue
try:
# update static configuration
- if k == 'OSMNBI_STATIC_DIR':
- engine_config["/static"]['tools.staticdir.dir'] = v
- engine_config["/static"]['tools.staticdir.on'] = True
- elif k == 'OSMNBI_SOCKET_PORT' or k == 'OSMNBI_SERVER_PORT':
- update_dict['server.socket_port'] = int(v)
- elif k == 'OSMNBI_SOCKET_HOST' or k == 'OSMNBI_SERVER_HOST':
- update_dict['server.socket_host'] = v
+ if k == "OSMNBI_STATIC_DIR":
+ engine_config["/static"]["tools.staticdir.dir"] = v
+ engine_config["/static"]["tools.staticdir.on"] = True
+ elif k == "OSMNBI_SOCKET_PORT" or k == "OSMNBI_SERVER_PORT":
+ update_dict["server.socket_port"] = int(v)
+ elif k == "OSMNBI_SOCKET_HOST" or k == "OSMNBI_SERVER_HOST":
+ update_dict["server.socket_host"] = v
elif k1 in ("server", "test", "auth", "log"):
- update_dict[k1 + '.' + k2] = v
+ update_dict[k1 + "." + k2] = v
elif k1 in ("message", "database", "storage", "authentication"):
# k2 = k2.replace('_', '.')
if k2 in ("port", "db_port"):
engine_config["global"].update(update_dict)
# logging cherrypy
- log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
- log_formatter_simple = logging.Formatter(log_format_simple, datefmt='%Y-%m-%dT%H:%M:%S')
+ log_format_simple = (
+ "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
+ )
+ log_formatter_simple = logging.Formatter(
+ log_format_simple, datefmt="%Y-%m-%dT%H:%M:%S"
+ )
logger_server = logging.getLogger("cherrypy.error")
logger_access = logging.getLogger("cherrypy.access")
logger_cherry = logging.getLogger("cherrypy")
logger_nbi = logging.getLogger("nbi")
if "log.file" in engine_config["global"]:
- file_handler = logging.handlers.RotatingFileHandler(engine_config["global"]["log.file"],
- maxBytes=100e6, backupCount=9, delay=0)
+ file_handler = logging.handlers.RotatingFileHandler(
+ engine_config["global"]["log.file"], maxBytes=100e6, backupCount=9, delay=0
+ )
file_handler.setFormatter(log_formatter_simple)
logger_cherry.addHandler(file_handler)
logger_nbi.addHandler(file_handler)
# log always to standard output
- for format_, logger in {"nbi.server %(filename)s:%(lineno)s": logger_server,
- "nbi.access %(filename)s:%(lineno)s": logger_access,
- "%(name)s %(filename)s:%(lineno)s": logger_nbi
- }.items():
+ for format_, logger in {
+ "nbi.server %(filename)s:%(lineno)s": logger_server,
+ "nbi.access %(filename)s:%(lineno)s": logger_access,
+ "%(name)s %(filename)s:%(lineno)s": logger_nbi,
+ }.items():
log_format_cherry = "%(asctime)s %(levelname)s {} %(message)s".format(format_)
- log_formatter_cherry = logging.Formatter(log_format_cherry, datefmt='%Y-%m-%dT%H:%M:%S')
+ log_formatter_cherry = logging.Formatter(
+ log_format_cherry, datefmt="%Y-%m-%dT%H:%M:%S"
+ )
str_handler = logging.StreamHandler()
str_handler.setFormatter(log_formatter_cherry)
logger.addHandler(str_handler)
logger_nbi.setLevel(engine_config["global"]["log.level"])
# logging other modules
- for k1, logname in {"message": "nbi.msg", "database": "nbi.db", "storage": "nbi.fs"}.items():
+ for k1, logname in {
+ "message": "nbi.msg",
+ "database": "nbi.db",
+ "storage": "nbi.fs",
+ }.items():
engine_config[k1]["logger_name"] = logname
logger_module = logging.getLogger(logname)
if "logfile" in engine_config[k1]:
- file_handler = logging.handlers.RotatingFileHandler(engine_config[k1]["logfile"],
- maxBytes=100e6, backupCount=9, delay=0)
+ file_handler = logging.handlers.RotatingFileHandler(
+ engine_config[k1]["logfile"], maxBytes=100e6, backupCount=9, delay=0
+ )
file_handler.setFormatter(log_formatter_simple)
logger_module.addHandler(file_handler)
if "loglevel" in engine_config[k1]:
logger_module.setLevel(engine_config[k1]["loglevel"])
# TODO add more entries, e.g.: storage
- cherrypy.tree.apps['/osm'].root.engine.start(engine_config)
- cherrypy.tree.apps['/osm'].root.authenticator.start(engine_config)
- cherrypy.tree.apps['/osm'].root.engine.init_db(target_version=database_version)
- cherrypy.tree.apps['/osm'].root.authenticator.init_db(target_version=auth_database_version)
+ cherrypy.tree.apps["/osm"].root.engine.start(engine_config)
+ cherrypy.tree.apps["/osm"].root.authenticator.start(engine_config)
+ cherrypy.tree.apps["/osm"].root.engine.init_db(target_version=database_version)
+ cherrypy.tree.apps["/osm"].root.authenticator.init_db(
+ target_version=auth_database_version
+ )
# start subscriptions thread:
- subscription_thread = SubscriptionThread(config=engine_config, engine=nbi_server.engine)
+ subscription_thread = SubscriptionThread(
+ config=engine_config, engine=nbi_server.engine
+ )
subscription_thread.start()
# Do not capture except SubscriptionException
backend = engine_config["authentication"]["backend"]
- cherrypy.log.error("Starting OSM NBI Version '{} {}' with '{}' authentication backend"
- .format(nbi_version, nbi_version_date, backend))
+ cherrypy.log.error(
+ "Starting OSM NBI Version '{} {}' with '{}' authentication backend".format(
+ nbi_version, nbi_version_date, backend
+ )
+ )
def _stop_service():
if subscription_thread:
subscription_thread.terminate()
subscription_thread = None
- cherrypy.tree.apps['/osm'].root.engine.stop()
+ cherrypy.tree.apps["/osm"].root.engine.stop()
cherrypy.log.error("Stopping osm_nbi")
# 'tools.auth_basic.realm': 'localhost',
# 'tools.auth_basic.checkpassword': validate_password})
nbi_server = Server()
- cherrypy.engine.subscribe('start', _start_service)
- cherrypy.engine.subscribe('stop', _stop_service)
- cherrypy.quickstart(nbi_server, '/osm', config_file)
+ cherrypy.engine.subscribe("start", _start_service)
+ cherrypy.engine.subscribe("stop", _stop_service)
+ cherrypy.quickstart(nbi_server, "/osm", config_file)
def usage():
- print("""Usage: {} [options]
+ print(
+ """Usage: {} [options]
-c|--config [configuration_file]: loads the configuration file (default: ./nbi.cfg)
-h|--help: shows this help
- """.format(sys.argv[0]))
+ """.format(
+ sys.argv[0]
+ )
+ )
# --log-socket-host HOST: send logs to this host")
# --log-socket-port PORT: send logs using this port (default: 9022)")
-if __name__ == '__main__':
+if __name__ == "__main__":
try:
# load parameters and configuration
opts, args = getopt.getopt(sys.argv[1:], "hvc:", ["config=", "help"])
assert False, "Unhandled option"
if config_file:
if not path.isfile(config_file):
- print("configuration file '{}' that not exist".format(config_file), file=sys.stderr)
+ print(
+ "configuration file '{}' that not exist".format(config_file),
+ file=sys.stderr,
+ )
exit(1)
else:
- for config_file in (__file__[:__file__.rfind(".")] + ".cfg", "./nbi.cfg", "/etc/osm/nbi.cfg"):
+ for config_file in (
+ __file__[: __file__.rfind(".")] + ".cfg",
+ "./nbi.cfg",
+ "/etc/osm/nbi.cfg",
+ ):
if path.isfile(config_file):
break
else:
- print("No configuration file 'nbi.cfg' found neither at local folder nor at /etc/osm/", file=sys.stderr)
+ print(
+ "No configuration file 'nbi.cfg' found neither at local folder nor at /etc/osm/",
+ file=sys.stderr,
+ )
exit(1)
nbi(config_file)
except getopt.GetoptError as e: