db: object,
fs: object,
log: object,
- loop: object,
on_update_db=None,
**kwargs,
):
:param object fs: FileSystem object managing the package artifacts (repo common
FsBase)
:param object log: the logging object to log to
- :param object loop: the loop to use for asyncio (default current thread loop)
:param on_update_db: callback called when n2vc connector updates database.
Received arguments:
table: e.g. "nsrs"
# store arguments into self
self.db = db
self.fs = fs
- self.loop = loop or asyncio.get_event_loop()
self.on_update_db = on_update_db
# generate private/public key-pair
# TODO
@abc.abstractmethod
async def remove_relation(self):
- """
- """
+ """ """
# TODO
@abc.abstractmethod
async def deregister_execution_environments(self):
- """
- """
+ """ """
@abc.abstractmethod
async def delete_namespace(
:param float total_timeout:
"""
+ @abc.abstractmethod
+ async def upgrade_charm(
+ self,
+ ee_id: str = None,
+ path: str = None,
+ charm_id: str = None,
+ charm_type: str = None,
+ timeout: float = None,
+ ) -> str:
+ """This method upgrade charms in VNFs
+
+ Args:
+ ee_id: Execution environment id
+ path: Local path to the charm
+ charm_id: charm-id
+ charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
+ timeout: (Float) Timeout for the ns update operation
+
+ Returns:
+ The output of the update operation if status equals to "completed"
+ """
+
@abc.abstractmethod
async def exec_primitive(
self,
# .format(str(status.value), detailed_status, vca_status, entity_type))
try:
-
the_table = db_dict["collection"]
the_filter = db_dict["filter"]
the_path = db_dict["path"]
the_table, the_filter, the_path, update_dict, vca_id=vca_id
)
else:
- self.on_update_db(the_table, the_filter, the_path, update_dict, vca_id=vca_id)
+ self.on_update_db(
+ the_table, the_filter, the_path, update_dict, vca_id=vca_id
+ )
except DbException as e:
if e.http_code == HTTPStatus.NOT_FOUND:
# convert obj to yaml
yaml_text = obj_to_yaml(obj)
# parse to dict
- return yaml.load(yaml_text, Loader=yaml.Loader)
+ return yaml.load(yaml_text, Loader=yaml.SafeLoader)