X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Ffrontend_grpc.py;h=2c3a7d9b2fcb45fba9f7c74e0aed1f045c4f6677;hb=a0c6bafeb1998041e171aa87b2134f69d650a9c0;hp=88308e18e7bf3e6df0166fbbe3de6e290cf9dc57;hpb=302266e50bdeec9f1bf3a5d865993b1aab4ee61f;p=osm%2FLCM.git diff --git a/osm_lcm/frontend_grpc.py b/osm_lcm/frontend_grpc.py index 88308e1..2c3a7d9 100644 --- a/osm_lcm/frontend_grpc.py +++ b/osm_lcm/frontend_grpc.py @@ -23,6 +23,7 @@ import typing import grpclib.const import grpclib.client + if typing.TYPE_CHECKING: import grpclib.server @@ -30,24 +31,29 @@ import osm_lcm.frontend_pb2 class FrontendExecutorBase(abc.ABC): - @abc.abstractmethod - async def RunPrimitive(self, stream: 'grpclib.server.Stream[osm_lcm.frontend_pb2.PrimitiveRequest, osm_lcm.frontend_pb2.PrimitiveReply]') -> None: + async def RunPrimitive( + self, + stream: "grpclib.server.Stream[osm_lcm.frontend_pb2.PrimitiveRequest, osm_lcm.frontend_pb2.PrimitiveReply]", + ) -> None: pass @abc.abstractmethod - async def GetSshKey(self, stream: 'grpclib.server.Stream[osm_lcm.frontend_pb2.SshKeyRequest, osm_lcm.frontend_pb2.SshKeyReply]') -> None: + async def GetSshKey( + self, + stream: "grpclib.server.Stream[osm_lcm.frontend_pb2.SshKeyRequest, osm_lcm.frontend_pb2.SshKeyReply]", + ) -> None: pass def __mapping__(self) -> typing.Dict[str, grpclib.const.Handler]: return { - '/osm_ee.FrontendExecutor/RunPrimitive': grpclib.const.Handler( + "/osm_ee.FrontendExecutor/RunPrimitive": grpclib.const.Handler( self.RunPrimitive, grpclib.const.Cardinality.UNARY_STREAM, osm_lcm.frontend_pb2.PrimitiveRequest, osm_lcm.frontend_pb2.PrimitiveReply, ), - '/osm_ee.FrontendExecutor/GetSshKey': grpclib.const.Handler( + "/osm_ee.FrontendExecutor/GetSshKey": grpclib.const.Handler( self.GetSshKey, grpclib.const.Cardinality.UNARY_UNARY, osm_lcm.frontend_pb2.SshKeyRequest, @@ -57,17 +63,16 @@ class FrontendExecutorBase(abc.ABC): class FrontendExecutorStub: - def __init__(self, channel: grpclib.client.Channel) -> None: self.RunPrimitive = grpclib.client.UnaryStreamMethod( channel, - '/osm_ee.FrontendExecutor/RunPrimitive', + "/osm_ee.FrontendExecutor/RunPrimitive", osm_lcm.frontend_pb2.PrimitiveRequest, osm_lcm.frontend_pb2.PrimitiveReply, ) self.GetSshKey = grpclib.client.UnaryUnaryMethod( channel, - '/osm_ee.FrontendExecutor/GetSshKey', + "/osm_ee.FrontendExecutor/GetSshKey", osm_lcm.frontend_pb2.SshKeyRequest, osm_lcm.frontend_pb2.SshKeyReply, )