asyncio.ensure_future(
self._local_async_exec(command=command, raise_exception_on_error=False)
)
- # loop = asyncio.get_event_loop()
- # loop.run_until_complete(self._local_async_exec(command=command,
- # raise_exception_on_error=False))
except Exception as e:
self.warning(
msg="helm init failed (it was already initialized): {}".format(e)
kubectl_command: str = "/usr/bin/kubectl",
juju_command: str = "/usr/bin/juju",
log: object = None,
- loop: object = None,
on_update_db=None,
):
"""
:param kubectl_command: path to kubectl executable
:param helm_command: path to helm executable
:param log: logger
- :param: loop: Asyncio loop
"""
# parent class
K8sConnector.__init__(self, db, log=log, on_update_db=on_update_db)
self.fs = fs
- self.loop = loop or asyncio.get_event_loop()
self.log.debug("Initializing K8S Juju connector")
db_uri = EnvironConfig(prefixes=["OSMLCM_", "OSMMON_"]).get("database_uri")
if not self.libjuju:
async with self.loading_libjuju:
vca_connection = await get_connection(self._store)
- self.libjuju = Libjuju(vca_connection, loop=self.loop, log=self.log)
+ self.libjuju = Libjuju(vca_connection, log=self.log)
return self.libjuju
else:
vca_connection = await get_connection(self._store, vca_id)
- return Libjuju(vca_connection, loop=self.loop, log=self.log, n2vc=self)
+ return Libjuju(vca_connection, log=self.log, n2vc=self)
def _get_kubectl(self, credentials: str) -> Kubectl:
"""
def __init__(
self,
vca_connection: Connection,
- loop: asyncio.AbstractEventLoop = None,
log: logging.Logger = None,
n2vc: N2VCConnector = None,
):
Constructor
:param: vca_connection: n2vc.vca.connection object
- :param: loop: Asyncio loop
:param: log: Logger
:param: n2vc: N2VC object
"""
self.n2vc = n2vc
self.vca_connection = vca_connection
- self.loop = loop or asyncio.get_event_loop()
- self.loop.set_exception_handler(self.handle_exception)
self.creating_model = asyncio.Lock()
if self.vca_connection.is_default:
self.health_check_task = self._create_health_check_task()
def _create_health_check_task(self):
- return self.loop.create_task(self.health_check())
+ return asyncio.get_event_loop().create_task(self.health_check())
async def get_controller(self, timeout: float = 60.0) -> Controller:
"""
await self.disconnect_model(model)
await self.disconnect_controller(controller)
- def handle_exception(self, loop, context):
- # All unhandled exceptions by libjuju are handled here.
- pass
-
async def health_check(self, interval: float = 300.0):
"""
Health check to make sure controller and controller_model connections are OK
db: object,
fs: object,
log: object,
- loop: object,
on_update_db=None,
**kwargs,
):
:param object fs: FileSystem object managing the package artifacts (repo common
FsBase)
:param object log: the logging object to log to
- :param object loop: the loop to use for asyncio (default current thread loop)
:param on_update_db: callback called when n2vc connector updates database.
Received arguments:
table: e.g. "nsrs"
# store arguments into self
self.db = db
self.fs = fs
- self.loop = loop or asyncio.get_event_loop()
self.on_update_db = on_update_db
# generate private/public key-pair
db: object,
fs: object,
log: object = None,
- loop: object = None,
on_update_db=None,
):
"""
:param: db: Database object from osm_common
:param: fs: Filesystem object from osm_common
:param: log: Logger
- :param: loop: Asyncio loop
:param: on_update_db: Callback function to be called for updating the database.
"""
# parent class constructor
- N2VCConnector.__init__(
- self, db=db, fs=fs, log=log, loop=loop, on_update_db=on_update_db
- )
+ N2VCConnector.__init__(self, db=db, fs=fs, log=log, on_update_db=on_update_db)
# silence websocket traffic log
logging.getLogger("websockets.protocol").setLevel(logging.INFO)
if not self.libjuju:
async with self.loading_libjuju:
vca_connection = await get_connection(self._store)
- self.libjuju = Libjuju(vca_connection, loop=self.loop, log=self.log)
+ self.libjuju = Libjuju(vca_connection, log=self.log)
return self.libjuju
else:
vca_connection = await get_connection(self._store, vca_id)
- return Libjuju(vca_connection, loop=self.loop, log=self.log, n2vc=self)
+ return Libjuju(vca_connection, log=self.log, n2vc=self)
def _write_ee_id_db(self, db_dict: dict, ee_id: str):
# write ee_id to database: _admin.deployed.VCA.x
:param: vca_id: VCA ID
"""
vca_connection = await get_connection(self._store, vca_id=vca_id)
- libjuju = Libjuju(vca_connection, loop=self.loop, log=self.log, n2vc=self)
+ libjuju = Libjuju(vca_connection, log=self.log, n2vc=self)
controller = await libjuju.get_controller()
await libjuju.disconnect_controller(controller)
fs=fslocal.FsLocal(),
db=self.db,
log=None,
- loop=self.loop,
on_update_db=None,
)
self.k8s_juju_conn._store.get_vca_id.return_value = None
}
)
logging.disable(logging.CRITICAL)
- self.libjuju = Libjuju(vca_connection, self.loop)
+ self.libjuju = Libjuju(vca_connection)
self.loop.run_until_complete(self.libjuju.disconnect())
db=self.db,
fs=fslocal.FsLocal(),
log=None,
- loop=self.loop,
on_update_db=None,
)
N2VCJujuConnector.get_public_key.assert_not_called()