HEALTH_CHECK_ACTION = "health-check"
def __init__(self):
- self.logger = logging.getLogger('osm_ee.base')
+ self.logger = logging.getLogger("osm_ee.base")
+ self.config_params = {}
# Check if configuration is stored and load it
if os.path.exists(self.CONFIG_FILE):
- with open(self.CONFIG_FILE, 'r') as file:
+ with open(self.CONFIG_FILE, "r") as file:
self.config_params = yaml.load(file, Loader=yaml.FullLoader)
- self.logger.debug("Load existing config from file: {}".format(self.config_params))
- else:
- self.config_params = {}
+ self.logger.debug(
+ "Load existing config from file: {}".format(self.config_params)
+ )
self.vnf_ee = VnfEE(self.config_params)
return ssh_key
async def run_action(self, id, name, params):
- self.logger.debug("Execute action id: {}, name: {}, params: {}".format(id, name, params))
+ self.logger.debug(
+ "Execute action id: {}, name: {}, params: {}".format(id, name, params)
+ )
try:
# Health-check
action_params = yaml.safe_load(params)
if name == "config":
- self.logger.debug("Store config info in file: {}".format(self.CONFIG_FILE))
+ self.logger.debug(
+ "Store config info in file: {}".format(self.CONFIG_FILE)
+ )
self.config_params.update(action_params)
- with open(self.CONFIG_FILE, 'w') as file:
+ with open(self.CONFIG_FILE, "w") as file:
config = yaml.dump(self.config_params, file)
+ self.logger.debug("Config info: {}".format(config))
async for return_status, detailed_message in method(id, action_params):
if return_status not in self.RETURN_STATUS_LIST:
else:
yield return_status, str(detailed_message)
except AttributeError as e:
- error_msg = "Action name: {} not implemented".format(name)
+ error_msg = "Action name: {} not implemented. Exception: {}".format(
+ name, str(e)
+ )
self.logger.error(error_msg)
yield "ERROR", error_msg
except Exception as e:
- self.logger.error("Error executing action id, name: {},{}: {}".format(id, name, str(e)), exc_info=True)
+ self.logger.error(
+ "Error executing action id, name: {},{}: {}".format(id, name, str(e)),
+ exc_info=True,
+ )
yield "ERROR", str(e)
-if __name__ == '__main__':
+if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
loop = asyncio.get_event_loop()
# contact with: nfvlabs@tid.es
##
+
class ExecEnvException(Exception):
"""Exception thrown by the EE if the actions can't be invoked or there is any generic error"""
+
class VnfException(Exception):
- """Exception thrown by the Vnf EE code in case of error"""
\ No newline at end of file
+ """Exception thrown by the Vnf EE code in case of error"""
import yaml
import asyncio
import uuid
-import traceback
-import os
import socket
from grpclib.client import Channel
-from osm_ee.frontend_pb2 import PrimitiveRequest, PrimitiveReply
+from osm_ee.frontend_pb2 import PrimitiveRequest
from osm_ee.frontend_pb2 import SshKeyRequest, SshKeyReply
from osm_ee.frontend_grpc import FrontendExecutorStub
try:
stub = FrontendExecutorStub(channel)
- if (primitive_name == "get_ssh_key"):
+ if primitive_name == "get_ssh_key":
print("Get ssh key")
reply: SshKeyReply = await stub.GetSshKey(SshKeyRequest())
print(reply.message)
primitive_id = str(uuid.uuid1())
print("Execute primitive {}, params: {}".format(primitive_name, params))
await stream.send_message(
- PrimitiveRequest(id=primitive_id, name=primitive_name, params=yaml.dump(params)), end=True)
+ PrimitiveRequest(
+ id=primitive_id, name=primitive_name, params=yaml.dump(params)
+ ),
+ end=True,
+ )
async for reply in stream:
print(reply)
- #replies = [reply async for reply in stream]
- #print(replies)
+ # replies = [reply async for reply in stream]
+ # print(replies)
except Exception as e:
print("Error executing primitive {}: {}".format(primitive_name, str(e)))
- #print(traceback.format_exc())
finally:
channel.close()
-if __name__ == '__main__':
+if __name__ == "__main__":
args = sys.argv[1:]
- if (len(args) < 1):
+ if len(args) < 1:
print("Usage: host port primitive_name params")
else:
host_name = args[0]
loop = asyncio.get_event_loop()
try:
- task = asyncio.ensure_future(frontend_client(host_name, port, primitive_name, params))
+ task = asyncio.ensure_future(
+ frontend_client(host_name, port, primitive_name, params)
+ )
loop.run_until_complete(task)
finally:
loop.close()
import grpclib.const
import grpclib.client
+
if typing.TYPE_CHECKING:
import grpclib.server
class FrontendExecutorBase(abc.ABC):
@abc.abstractmethod
- async def RunPrimitive(self, stream: 'grpclib.server.Stream[osm_ee.frontend_pb2.PrimitiveRequest, osm_ee.frontend_pb2.PrimitiveReply]') -> None:
+ async def RunPrimitive(
+ self,
+ stream: "grpclib.server.Stream[osm_ee.frontend_pb2.PrimitiveRequest, osm_ee.frontend_pb2.PrimitiveReply]",
+ ) -> None:
pass
@abc.abstractmethod
- async def GetSshKey(self, stream: 'grpclib.server.Stream[osm_ee.frontend_pb2.SshKeyRequest, osm_ee.frontend_pb2.SshKeyReply]') -> None:
+ async def GetSshKey(
+ self,
+ stream: "grpclib.server.Stream[osm_ee.frontend_pb2.SshKeyRequest, osm_ee.frontend_pb2.SshKeyReply]",
+ ) -> None:
pass
def __mapping__(self) -> typing.Dict[str, grpclib.const.Handler]:
return {
- '/osm_ee.FrontendExecutor/RunPrimitive': grpclib.const.Handler(
+ "/osm_ee.FrontendExecutor/RunPrimitive": grpclib.const.Handler(
self.RunPrimitive,
grpclib.const.Cardinality.UNARY_STREAM,
osm_ee.frontend_pb2.PrimitiveRequest,
osm_ee.frontend_pb2.PrimitiveReply,
),
- '/osm_ee.FrontendExecutor/GetSshKey': grpclib.const.Handler(
+ "/osm_ee.FrontendExecutor/GetSshKey": grpclib.const.Handler(
self.GetSshKey,
grpclib.const.Cardinality.UNARY_UNARY,
osm_ee.frontend_pb2.SshKeyRequest,
def __init__(self, channel: grpclib.client.Channel) -> None:
self.RunPrimitive = grpclib.client.UnaryStreamMethod(
channel,
- '/osm_ee.FrontendExecutor/RunPrimitive',
+ "/osm_ee.FrontendExecutor/RunPrimitive",
osm_ee.frontend_pb2.PrimitiveRequest,
osm_ee.frontend_pb2.PrimitiveReply,
)
self.GetSshKey = grpclib.client.UnaryUnaryMethod(
channel,
- '/osm_ee.FrontendExecutor/GetSshKey',
+ "/osm_ee.FrontendExecutor/GetSshKey",
osm_ee.frontend_pb2.SshKeyRequest,
osm_ee.frontend_pb2.SshKeyReply,
)
class FrontendExecutor(FrontendExecutorBase):
def __init__(self):
- self.logger = logging.getLogger('osm_ee.frontend_server')
+ self.logger = logging.getLogger("osm_ee.frontend_server")
self.base_ee = BaseEE()
- async def RunPrimitive(self, stream: Stream[PrimitiveRequest, PrimitiveReply]) -> None:
+ async def RunPrimitive(
+ self, stream: Stream[PrimitiveRequest, PrimitiveReply]
+ ) -> None:
request = await stream.recv_message()
try:
- self.logger.debug(f'Run primitive: id {request.id}, name: {request.name}, params: {request.params}')
- async for status, detailed_message in self.base_ee.run_action(request.id, request.name, request.params):
- self.logger.debug(f'Send response {status}, {detailed_message}')
+ self.logger.debug(
+ f"Run primitive: id {request.id}, name: {request.name}, params: {request.params}"
+ )
+ async for status, detailed_message in self.base_ee.run_action(
+ request.id, request.name, request.params
+ ):
+ self.logger.debug(f"Send response {status}, {detailed_message}")
await stream.send_message(
- PrimitiveReply(status=status, detailed_message=detailed_message))
+ PrimitiveReply(status=status, detailed_message=detailed_message)
+ )
except Exception as e:
- self.logger.debug(f'Error executing primitive: id {request.id}, name: {request.name}, error_msg: {str(e)}')
+ self.logger.debug(
+ f"Error executing primitive: id {request.id}, name: {request.name}, error_msg: {str(e)}"
+ )
await stream.send_message(
- PrimitiveReply(status="ERROR", detailed_message=str(e)))
+ PrimitiveReply(status="ERROR", detailed_message=str(e))
+ )
async def GetSshKey(self, stream: Stream[SshKeyRequest, SshKeyReply]) -> None:
request = await stream.recv_message()
await stream.send_message(SshKeyReply(message=message))
-async def main(*, host: str = '0.0.0.0', port: int = 50051) -> None:
+async def main(*, host: str = "0.0.0.0", port: int = 50051) -> None:
logging.basicConfig()
- logger = logging.getLogger('osm_ee')
+ logger = logging.getLogger("osm_ee")
logger.setLevel(logging.DEBUG)
# Generate ssh key
server = Server([FrontendExecutor()])
with graceful_exit([server]):
await server.start(host, port, ssl=util_grpc.create_secure_context())
- logging.getLogger('osm_ee.frontend_server').debug(f'Serving on {host}:{port}')
+ logging.getLogger("osm_ee.frontend_server").debug(f"Serving on {host}:{port}")
await server.wait_closed()
-if __name__ == '__main__':
+if __name__ == "__main__":
loop = asyncio.get_event_loop()
try:
main_task = asyncio.ensure_future(main())
logger = logging.getLogger("osm_ee.util_ansible")
-async def execute_playbook(playbook_name: str, inventory: str, extra_vars: dict,
- ) -> (int, str):
-
- command = 'ansible-playbook --inventory={} --extra-vars {} {}'.format(quote(inventory),
- quote(json.dumps(extra_vars)),
- quote(playbook_name))
+async def execute_playbook(
+ playbook_name: str,
+ inventory: str,
+ extra_vars: dict,
+) -> (int, str):
+
+ command = "ansible-playbook --inventory={} --extra-vars {} {}".format(
+ quote(inventory), quote(json.dumps(extra_vars)), quote(playbook_name)
+ )
logger.debug("Command to be executed: {}".format(command))
logger = logging.getLogger("osm_ee.util")
-async def local_async_exec(command: str
- ) -> (int, str, str):
+async def local_async_exec(command: str) -> (int, str, str):
"""
- Executed a local command using asyncio.
- TODO - do not know yet if return error code, and stdout and strerr or just one of them
+ Executed a local command using asyncio.
+ TODO - do not know yet if return error code, and stdout and strerr or just one of them
"""
scommand = split(command)
logger.debug("Execute local command: {}".format(command))
process = await asyncio.create_subprocess_exec(
- *scommand,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE
+ *scommand, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
# wait for command terminate
def create_secure_context() -> ssl.SSLContext:
# retrieve certificates from secrets
if not _retrieve_certs():
- logger.warning("TLS Certificates not found, starting gRPC server in unsecure mode")
+ logger.warning(
+ "TLS Certificates not found, starting gRPC server in unsecure mode"
+ )
return None
# create SSL context
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.load_cert_chain(SERVER_CERT_FILE, SERVER_KEY_FILE)
ctx.load_verify_locations(CLIENT_CA_FILE)
- ctx.set_ciphers('ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20')
- ctx.set_alpn_protocols(['h2'])
+ ctx.set_ciphers("ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20")
+ ctx.set_alpn_protocols(["h2"])
try:
- ctx.set_npn_protocols(['h2'])
+ ctx.set_npn_protocols(["h2"])
except NotImplementedError:
pass
return ctx
# we are not running in kubernetes
return {}
# Read the namespace from the service account
- current_namespace = open("/var/run/secrets/kubernetes.io/serviceaccount/namespace").read()
+ current_namespace = open(
+ "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
+ ).read()
v1 = client.CoreV1Api()
try:
class VnfEE:
def __init__(self, config_params):
- self.logger = logging.getLogger('osm_ee.vnf')
+ self.logger = logging.getLogger("osm_ee.vnf")
self.config_params = config_params
async def config(self, id, params):