X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=rwlaunchpad%2Fplugins%2Frwstagingmgr%2Frift%2Ftasklets%2Frwstagingmgr%2Frwstagingmgr.py;fp=rwlaunchpad%2Fplugins%2Frwstagingmgr%2Frift%2Ftasklets%2Frwstagingmgr%2Frwstagingmgr.py;h=16ac7cecc2a4523b087a9e17fed6b9c7bc8c2464;hb=4870d0ee29789b859931e4e2c73e13dcb29537d5;hp=04a7cae311b4dfbf71ed7184437979d6f28d712f;hpb=6f1a3fe149e4a6b9803382cb299c902f4cf58ec9;p=osm%2FSO.git diff --git a/rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/rwstagingmgr.py b/rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/rwstagingmgr.py index 04a7cae3..16ac7cec 100644 --- a/rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/rwstagingmgr.py +++ b/rwlaunchpad/plugins/rwstagingmgr/rift/tasklets/rwstagingmgr/rwstagingmgr.py @@ -32,10 +32,17 @@ import tornadostreamform.multipart_streamer as multipart_streamer import gi gi.require_version('RwDts', '1.0') gi.require_version('RwStagingMgmtYang', '1.0') +gi.require_version('rwlib', '1.0') + from gi.repository import ( RwDts as rwdts, RwStagingMgmtYang) import rift.tasklets +from rift.mano.utils.project import ( + ManoProject, + ProjectHandler, +) +import gi.repository.rwlib as rwlib from . import rpc from . import store @@ -43,14 +50,36 @@ from .server import StagingApplication from .publisher import StagingStorePublisher +class StagingManagerProject(ManoProject): + + def __init__(self, name, tasklet, **kw): + super(StagingManagerProject, self).__init__(tasklet.log, name) + self.update(tasklet) + + self.publisher = StagingStorePublisher(self) + # For recovery + self.publisher.delegate = tasklet.store + + @asyncio.coroutine + def register (self): + yield from self.publisher.register() + + def deregister(self): + self.publisher.deregister() + + class StagingManagerTasklet(rift.tasklets.Tasklet): """Tasklet to handle all staging related operations """ def __init__(self, *args, **kwargs): try: super().__init__(*args, **kwargs) + self._project_handler = None + self.projects = {} + except Exception as e: - self.log.exception(e) + self.log.exception("Staging Manager tasklet init: {}". + format(e)) def start(self): super().start() @@ -72,17 +101,10 @@ class StagingManagerTasklet(rift.tasklets.Tasklet): @asyncio.coroutine def init(self): - self.store = store.StagingFileStore(log=self.log) - self.publisher = StagingStorePublisher(self.log, self.dts, self.loop) - # Fore recovery - self.publisher.delegate = self.store - # For create and delete events - self.store.delegate = self.publisher - yield from self.publisher.register() - + self.store = store.StagingFileStore(self) io_loop = rift.tasklets.tornado.TaskletAsyncIOLoop(asyncio_loop=self.loop) - self.app = StagingApplication(self.store) + self.app = StagingApplication(self.store, self.loop) manifest = self.tasklet_info.get_pb_manifest() ssl_cert = manifest.bootstrap_phase.rwsecurity.cert @@ -107,12 +129,19 @@ class StagingManagerTasklet(rift.tasklets.Tasklet): self.dts, self.loop, self.store) - yield from self.create_stg_rpc.register() + self.log.debug("creating project handler") + self.project_handler = ProjectHandler(self, StagingManagerProject) + self.project_handler.register() + @asyncio.coroutine def run(self): - self.server.listen(self.app.PORT) + address = rwlib.getenv("RWVM_INTERNAL_IPADDR") + if (address is None): + address="" + self.server.listen(self.app.PORT, address=address) + self.server.listen(self.app.PORT, address="127.0.0.1") @asyncio.coroutine def on_dts_state_change(self, state):