),
}
- def __init__(self, msg, lcm_tasks, config, loop):
+ def __init__(self, msg, lcm_tasks, config):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
"""
self.logger = logging.getLogger("lcm.vim")
- self.loop = loop
self.lcm_tasks = lcm_tasks
self.ro_config = config["RO"]
db_vim_update["_admin.deployed.RO"] = None
db_vim_update["_admin.detailed-status"] = step
self.update_db_2("vim_accounts", vim_id, db_vim_update)
- RO = ROclient.ROClient(self.loop, **self.ro_config)
+ RO = ROclient.ROClient(**self.ro_config)
vim_RO = deepcopy(vim_content)
vim_RO.pop("_id", None)
vim_RO.pop("_admin", None)
RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
step = "Editing vim at RO"
- RO = ROclient.ROClient(self.loop, **self.ro_config)
+ RO = ROclient.ROClient(**self.ro_config)
vim_RO = deepcopy(vim_content)
vim_RO.pop("_id", None)
vim_RO.pop("_admin", None)
and db_vim["_admin"]["deployed"].get("RO")
):
RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
- RO = ROclient.ROClient(self.loop, **self.ro_config)
+ RO = ROclient.ROClient(**self.ro_config)
step = "Detaching vim from RO tenant"
try:
await RO.detach("vim_account", RO_vim_id)
# values that are encrypted at wim config because they are passwords
wim_config_encrypted = ()
- def __init__(self, msg, lcm_tasks, config, loop):
+ def __init__(self, msg, lcm_tasks, config):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
"""
self.logger = logging.getLogger("lcm.vim")
- self.loop = loop
self.lcm_tasks = lcm_tasks
self.ro_config = config["RO"]
step = "Creating wim at RO"
db_wim_update["_admin.detailed-status"] = step
self.update_db_2("wim_accounts", wim_id, db_wim_update)
- RO = ROclient.ROClient(self.loop, **self.ro_config)
+ RO = ROclient.ROClient(**self.ro_config)
wim_RO = deepcopy(wim_content)
wim_RO.pop("_id", None)
wim_RO.pop("_admin", None)
):
RO_wim_id = db_wim["_admin"]["deployed"]["RO"]
step = "Editing wim at RO"
- RO = ROclient.ROClient(self.loop, **self.ro_config)
+ RO = ROclient.ROClient(**self.ro_config)
wim_RO = deepcopy(wim_content)
wim_RO.pop("_id", None)
wim_RO.pop("_admin", None)
and db_wim["_admin"]["deployed"].get("RO")
):
RO_wim_id = db_wim["_admin"]["deployed"]["RO"]
- RO = ROclient.ROClient(self.loop, **self.ro_config)
+ RO = ROclient.ROClient(**self.ro_config)
step = "Detaching wim from RO tenant"
try:
await RO.detach("wim_account", RO_wim_id)
class SdnLcm(LcmBase):
- def __init__(self, msg, lcm_tasks, config, loop):
+ def __init__(self, msg, lcm_tasks, config):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
"""
self.logger = logging.getLogger("lcm.sdn")
- self.loop = loop
self.lcm_tasks = lcm_tasks
self.ro_config = config["RO"]
db_sdn_update["_admin.detailed-status"] = step
self.update_db_2("sdns", sdn_id, db_sdn_update)
- RO = ROclient.ROClient(self.loop, **self.ro_config)
+ RO = ROclient.ROClient(**self.ro_config)
sdn_RO = deepcopy(sdn_content)
sdn_RO.pop("_id", None)
sdn_RO.pop("_admin", None)
and db_sdn["_admin"]["deployed"].get("RO")
):
RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
- RO = ROclient.ROClient(self.loop, **self.ro_config)
+ RO = ROclient.ROClient(**self.ro_config)
step = "Editing sdn at RO"
sdn_RO = deepcopy(sdn_content)
sdn_RO.pop("_id", None)
and db_sdn["_admin"]["deployed"].get("RO")
):
RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
- RO = ROclient.ROClient(self.loop, **self.ro_config)
+ RO = ROclient.ROClient(**self.ro_config)
step = "Deleting sdn from RO"
try:
await RO.delete("sdn", RO_sdn_id)
class K8sClusterLcm(LcmBase):
timeout_create = 300
- def __init__(self, msg, lcm_tasks, config, loop):
+ def __init__(self, msg, lcm_tasks, config):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
"""
self.logger = logging.getLogger("lcm.k8scluster")
- self.loop = loop
self.lcm_tasks = lcm_tasks
self.vca_config = config["VCA"]
kubectl_command=self.vca_config.get("kubectlpath"),
juju_command=self.vca_config.get("jujupath"),
log=self.logger,
- loop=self.loop,
on_update_db=None,
db=self.db,
fs=self.fs,
class VcaLcm(LcmBase):
timeout_create = 30
- def __init__(self, msg, lcm_tasks, config, loop):
+ def __init__(self, msg, lcm_tasks, config):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
"""
self.logger = logging.getLogger("lcm.vca")
- self.loop = loop
self.lcm_tasks = lcm_tasks
super().__init__(msg, self.logger)
# create N2VC connector
- self.n2vc = N2VCJujuConnector(
- log=self.logger, loop=self.loop, fs=self.fs, db=self.db
- )
+ self.n2vc = N2VCJujuConnector(log=self.logger, fs=self.fs, db=self.db)
def _get_vca_by_id(self, vca_id: str) -> dict:
db_vca = self.db.get_one("vca", {"_id": vca_id})
class K8sRepoLcm(LcmBase):
- def __init__(self, msg, lcm_tasks, config, loop):
+ def __init__(self, msg, lcm_tasks, config):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
"""
self.logger = logging.getLogger("lcm.k8srepo")
- self.loop = loop
self.lcm_tasks = lcm_tasks
self.vca_config = config["VCA"]