kubectl_command: str = "/usr/bin/kubectl",
juju_command: str = "/usr/bin/juju",
log: object = None,
- loop: object = None,
on_update_db=None,
):
"""
:param kubectl_command: path to kubectl executable
:param helm_command: path to helm executable
:param log: logger
- :param: loop: Asyncio loop
"""
# parent class
K8sConnector.__init__(self, db, log=log, on_update_db=on_update_db)
self.fs = fs
- self.loop = loop or asyncio.get_event_loop()
self.log.debug("Initializing K8S Juju connector")
db_uri = EnvironConfig(prefixes=["OSMLCM_", "OSMMON_"]).get("database_uri")
self._store = MotorStore(db_uri)
- self.loading_libjuju = asyncio.Lock(loop=self.loop)
+ self.loading_libjuju = asyncio.Lock()
self.uninstall_locks = {}
self.log.debug("K8S Juju connector initialized")
will_not_delete = False
if model_name not in self.uninstall_locks:
- self.uninstall_locks[model_name] = asyncio.Lock(loop=self.loop)
+ self.uninstall_locks[model_name] = asyncio.Lock()
delete_lock = self.uninstall_locks[model_name]
while delete_lock.locked():
if not self.libjuju:
async with self.loading_libjuju:
vca_connection = await get_connection(self._store)
- self.libjuju = Libjuju(vca_connection, loop=self.loop, log=self.log)
+ self.libjuju = Libjuju(vca_connection, log=self.log)
return self.libjuju
else:
vca_connection = await get_connection(self._store, vca_id)
- return Libjuju(vca_connection, loop=self.loop, log=self.log, n2vc=self)
+ return Libjuju(vca_connection, log=self.log, n2vc=self)
def _get_kubectl(self, credentials: str) -> Kubectl:
"""