a2ab8fcf41c0c64a97bcc69fecd2858d0cc266b2
[osm/SO.git] / rwlaunchpad / plugins / rwstagingmgr / rift / tasklets / rwstagingmgr / publisher / staging_status.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # Author(s): Varun Prasad
17 # Creation Date: 09/25/2016
18 #
19
20 import asyncio
21 import gi
22 import uuid
23
24 from gi.repository import (RwDts as rwdts)
25 import rift.mano.dts as mano_dts
26 import rift.tasklets
27 gi.require_version('RwKeyspec', '1.0')
28 from gi.repository.RwKeyspec import quoted_key
29
30 from ..protocol import StagingStoreProtocol
31
class StagingStorePublisher(mano_dts.DtsHandler, StagingStoreProtocol):
    """Publish a project's staging-area records over DTS.

    The handler registers as a publisher on the project's ``staging-area``
    list and pushes an element update whenever it is told that a staging
    area was created or deleted. An optional ``delegate`` is handed the
    registration's elements when the registration group reports an
    INSTALL event.
    """

    def __init__(self, project):
        """Initialise the handler from the project's handles.

        Args:
            project: project object; its ``log``, ``dts`` and ``loop``
                attributes, plus the project itself, are forwarded to the
                base handler.
        """
        super().__init__(project.log, project.dts, project.loop, project)
        # Object notified from the INSTALL callback in register(); it is
        # called as delegate.on_recovery(elements). Unset until assigned.
        self.delegate = None

    def xpath(self, area_id=None):
        """Return the project-scoped keyspec for the staging-area list.

        Args:
            area_id: optional staging area id. When truthy, the returned
                path is narrowed to that single list entry; otherwise the
                path addresses the whole list.

        Returns:
            The value produced by ``self.project.add_project`` for the
            assembled path.
        """
        return self.project.add_project("D,/rw-staging-mgmt:staging-areas/rw-staging-mgmt:staging-area" +
            ("[area-id={}]".format(quoted_key(area_id)) if area_id else ""))

    @asyncio.coroutine
    def register(self):
        """Create the DTS group and register as publisher on the list.

        The registration handle is stored in ``self.reg``.

        Raises:
            AssertionError: if the group returned no registration handle.
        """
        # we need a dummy callback for recovery to work
        @asyncio.coroutine
        def on_event(dts, g_reg, xact, xact_event, scratch_data):
            # Only INSTALL is acted upon: hand the registration's elements
            # to the delegate, if one has been attached.
            if xact_event == rwdts.MemberEvent.INSTALL:
                if self.delegate:
                    self.delegate.on_recovery(self.reg.elements)

            return rwdts.MemberRspCode.ACTION_OK

        hdl = rift.tasklets.DTS.RegistrationHandler()
        handlers = rift.tasklets.Group.Handler(on_event=on_event)
        with self.dts.group_create(handler=handlers) as group:
            self.reg = group.register(xpath=self.xpath(),
                                      handler=hdl,
                                      flags=(rwdts.Flag.PUBLISHER |
                                             rwdts.Flag.NO_PREP_READ |
                                             rwdts.Flag.CACHE |
                                             rwdts.Flag.DATASTORE),)

        assert self.reg is not None

    def on_staging_area_create(self, store):
        """Publish ``store`` under the path keyed by ``store.area_id``."""
        self.reg.update_element(self.xpath(store.area_id), store)

    def on_staging_area_delete(self, store):
        """Publish ``store`` under the path keyed by ``store.area_id``.

        NOTE(review): this issues an element update rather than removing
        the element; presumably ``store`` already carries its final state
        when this is called -- confirm against the caller.
        """
        self.reg.update_element(self.xpath(store.area_id), store)

    def stop(self):
        """Stop publishing by de-registering from DTS."""
        self.deregister()

    def deregister(self):
        """De-register with DTS and drop the registration handle.

        The class previously defined this method twice; the later
        definition replaced the earlier one, so the debug log line was
        never emitted. Both are merged here: log, de-register if a handle
        is present, then clear the handle so a repeated call does not
        de-register twice.
        """
        self._log.debug("Project {}: de-register staging store handler".
                        format(self._project.name))
        if self.reg is not None:
            self.reg.deregister()
            self.reg = None