print("kwargs > {}".format(kwargs))
if args:
if "update" in args:
- ro_ns_desc = yaml.safe_load(
- descriptors.ro_update_action_text
- )
+ ro_ns_desc = yaml.safe_load(descriptors.ro_update_action_text)
while True:
yield ro_ns_desc
if kwargs.get("delete"):
- ro_ns_desc = yaml.safe_load(
- descriptors.ro_delete_action_text
- )
+ ro_ns_desc = yaml.safe_load(descriptors.ro_delete_action_text)
while True:
yield ro_ns_desc
Database.instance = None
self.db = Database({"database": {"driver": "memory"}}).instance.db
- self.db.create_list(
- "vnfds", yaml.safe_load(descriptors.db_vnfds_text)
- )
+ self.db.create_list("vnfds", yaml.safe_load(descriptors.db_vnfds_text))
self.db.create_list(
"vnfds_revisions",
yaml.safe_load(descriptors.db_vnfds_revisions_text),
)
- self.db.create_list(
- "nsds", yaml.safe_load(descriptors.db_nsds_text)
- )
- self.db.create_list(
- "nsrs", yaml.safe_load(descriptors.db_nsrs_text)
- )
+ self.db.create_list("nsds", yaml.safe_load(descriptors.db_nsds_text))
+ self.db.create_list("nsrs", yaml.safe_load(descriptors.db_nsrs_text))
self.db.create_list(
"vim_accounts",
yaml.safe_load(descriptors.db_vim_accounts_text),
self.db.create_list(
"nslcmops", yaml.safe_load(descriptors.db_nslcmops_text)
)
- self.db.create_list(
- "vnfrs", yaml.safe_load(descriptors.db_vnfrs_text)
- )
- self.db_vim_accounts = yaml.safe_load(
- descriptors.db_vim_accounts_text
- )
+ self.db.create_list("vnfrs", yaml.safe_load(descriptors.db_vnfrs_text))
+ self.db_vim_accounts = yaml.safe_load(descriptors.db_vim_accounts_text)
# Mock kafka
self.msg = asynctest.Mock(MsgKafka())
self.assertEqual(return_value, expected_value)
with self.assertRaises(Exception) as context:
self.db.get_one("vnfrs", {"_id": vnf_instance_id})
- self.assertTrue("database exception Not found entry with filter" in str(context.exception))
+ self.assertTrue(
+ "database exception Not found entry with filter"
+ in str(context.exception)
+ )
# test vertical scale executes sucessfully
# @patch("osm_lcm.ng_ro.status.response")