from osm_common.dbbase import DbException
from osm_common.fsbase import FsException
from osm_common.msgbase import MsgException
+from osm_common.wftemporal import WFTemporal
from http import HTTPStatus
from codecs import getreader
from os import environ, path
cherrypy.request.headers.pop("Content-File-MD5", None)
elif "application/yaml" in cherrypy.request.headers["Content-Type"]:
error_text = "Invalid yaml format "
- indata = yaml.load(
- cherrypy.request.body, Loader=yaml.SafeLoader
- )
+ indata = yaml.safe_load(cherrypy.request.body)
cherrypy.request.headers.pop("Content-File-MD5", None)
elif (
"application/binary" in cherrypy.request.headers["Content-Type"]
# "Only 'Content-Type' of type 'application/json' or
# 'application/yaml' for input format are available")
error_text = "Invalid yaml format "
- indata = yaml.load(
- cherrypy.request.body, Loader=yaml.SafeLoader
- )
+ indata = yaml.safe_load(cherrypy.request.body)
cherrypy.request.headers.pop("Content-File-MD5", None)
else:
error_text = "Invalid yaml format "
- indata = yaml.load(cherrypy.request.body, Loader=yaml.SafeLoader)
+ indata = yaml.safe_load(cherrypy.request.body)
cherrypy.request.headers.pop("Content-File-MD5", None)
if not indata:
indata = {}
kwargs[k] = None
elif format_yaml:
try:
- kwargs[k] = yaml.load(v, Loader=yaml.SafeLoader)
+ kwargs[k] = yaml.safe_load(v)
except Exception:
pass
elif (
v[index] = None
elif format_yaml:
try:
- v[index] = yaml.load(v[index], Loader=yaml.SafeLoader)
+ v[index] = yaml.safe_load(v[index])
except Exception:
pass
project_name=None,
ns_id=None,
*args,
- **kwargs
+ **kwargs,
):
if topic == "alarms":
try:
return self._format_out(str(alarm_list))
# to handle patch request for alarm update
elif cherrypy.request.method == "PATCH":
- data = yaml.load(cherrypy.request.body, Loader=yaml.SafeLoader)
+ data = yaml.safe_load(cherrypy.request.body)
try:
# check if uuid is valid
self.engine.db.get_one("alarms", {"uuid": data.get("uuid")})
return_text = "<html><pre>{} ->\n".format(main_topic)
try:
if cherrypy.request.method == "POST":
- to_send = yaml.load(cherrypy.request.body, Loader=yaml.SafeLoader)
+ to_send = yaml.safe_load(cherrypy.request.body)
for k, v in to_send.items():
self.engine.msg.write(main_topic, k, v)
return_text += " {}: {}\n".format(k, v)
elif cherrypy.request.method == "GET":
for k, v in kwargs.items():
- v_dict = yaml.load(v, Loader=yaml.SafeLoader)
+ v_dict = yaml.safe_load(v)
self.engine.msg.write(main_topic, k, v_dict)
return_text += " {}: {}\n".format(k, v_dict)
except Exception as e:
_id=None,
item=None,
*args,
- **kwargs
+ **kwargs,
):
token_info = None
outdata = None
update_dict["server.socket_host"] = v
elif k1 in ("server", "test", "auth", "log"):
update_dict[k1 + "." + k2] = v
- elif k1 in ("message", "database", "storage", "authentication"):
+ elif k1 in ("message", "database", "storage", "authentication", "temporal"):
# k2 = k2.replace('_', '.')
if k2 in ("port", "db_port"):
engine_config[k1][k2] = int(v)
subscription_thread.start()
# Do not capture except SubscriptionException
+ WFTemporal.temporal_api = (
+ f'{engine_config["temporal"]["host"]}:{engine_config["temporal"]["port"]}'
+ )
+
backend = engine_config["authentication"]["backend"]
cherrypy.log.error(
"Starting OSM NBI Version '{} {}' with '{}' authentication backend".format(