-import asyncio
import io
import json
import logging
import yaml
+from juju.errors import JujuAPIError
+
log = logging.getLogger("websocket")
self.ws = None
self.facades = {}
+ @property
+ def is_open(self):
+ if self.ws:
+ return self.ws.open
+ return False
+
def _get_ssl(self, cert):
return ssl.create_default_context(
purpose=ssl.Purpose.CLIENT_AUTH, cadata=cert)
#log.debug("Send: %s", outgoing)
#log.debug("Recv: %s", result)
if result and 'error' in result:
- raise RuntimeError(result)
+ raise JujuAPIError(result)
return result
async def clone(self):