New feature: Code changes for project support
[osm/SO.git] / common / python / rift / mano / dts / subscriber / core.py
1 """
2 #
3 # Copyright 2016-2017 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 @file core.py
19 @author Varun Prasad (varun.prasad@riftio.com)
20 @date 09-Jul-2016
21
22 """
23
24 import abc
25 import collections
26 import asyncio
27
28 from gi.repository import (RwDts as rwdts, ProtobufC)
29 import rift.tasklets
30 from rift.mano.utils.project import (
31 get_add_delete_update_cfgs,
32 )
33
34 from ..core import DtsHandler
35
36
class SubscriberDtsHandler(DtsHandler):
    """Base class shared by all DTS subscribers in this module.

    On top of what ``DtsHandler`` keeps, it stores an optional callback that
    subclasses invoke when subscribed data changes.
    """

    def __init__(self, log, dts, loop, project, callback=None):
        """Initialise the base handler and remember the callback.

        Args:
            log: Forwarded unchanged to ``DtsHandler.__init__``.
            dts: Forwarded unchanged to ``DtsHandler.__init__``.
            loop: Forwarded unchanged to ``DtsHandler.__init__``.
            project: Forwarded unchanged to ``DtsHandler.__init__``.
            callback (None, optional): Callable invoked on subscriber
                changes as ``callback(msg, action)``.
        """
        super().__init__(log, dts, loop, project)
        self.callback = callback

    @classmethod
    def from_project(cls, proj, callback=None):
        """Alternate constructor that pulls the handles from a project.

        Args:
            proj (rift.mano.utils.project.ManoProject): Project supplying
                the ``log``, ``dts`` and ``loop`` attributes; the project
                itself is passed on as well.
            callback (None, optional): Callable, which will be invoked on
                subscriber changes.

        Signature of callback:
            Args:
                msg: The Gi Object msg from DTS
                action(rwdts.QueryAction): Action type

        Returns:
            A new instance of ``cls``.
        """
        log, dts, loop = proj.log, proj.dts, proj.loop
        return cls(log, dts, loop, proj, callback=callback)

    def get_reg_flags(self):
        """Return the default registration flags; sub classes may override.

        Returns:
            The combination SUBSCRIBER | DELTA_READY | CACHE of rwdts.Flag.
        """
        flag = rwdts.Flag
        return flag.SUBSCRIBER | flag.DELTA_READY | flag.CACHE
67
68
69
class AbstractOpdataSubscriber(SubscriberDtsHandler):
    """Abstract class that simplifies the process of creating subscribers
    for opdata.

    An opdata subscriber can be created in one step by subclassing and
    implementing the MANDATORY get_xpath() method.

    NOTE(review): @abc.abstractmethod is only enforced when the class uses
    an ABCMeta metaclass; whether the base classes provide one is not
    visible from this file -- confirm.
    """

    @abc.abstractmethod
    def get_xpath(self):
        """Return the xpath to subscribe to.

        The value is passed through ``self.project.add_project()`` before
        being handed to DTS in register().

        Returns:
            str: xpath
        """
        pass

    @asyncio.coroutine
    def register(self):
        """Triggers the registration.

        Registers a DTS handler on the project-qualified xpath using the
        flags from get_reg_flags(). Messages seen in the prepare phase are
        held back and only delivered to ``self.callback(msg, action)`` when
        the matching transaction is committed.

        Does nothing (apart from logging a warning) when ``self.reg`` is
        already set.
        """
        if self.reg:
            # This handler registers a subscriber, not an RPC, so the
            # warning says so.
            self._log.warning(
                "Subscriber already registered for project {}".format(
                    self._project.name))
            return

        # (msg, action) pairs received in on_prepare, keyed by transaction
        # id, waiting for the commit of that transaction.
        # NOTE(review): entries are removed only in on_commit; a transaction
        # that never reaches commit leaves its entry behind -- confirm
        # whether an abort hook is needed.
        xacts = {}

        def on_commit(xact_info):
            """Deliver the deferred message of the committed transaction."""
            xact_id = xact_info.handle.get_xact().id

            # Single lookup: stored values are always tuples, so None
            # unambiguously means "nothing deferred for this transaction".
            pending = xacts.pop(xact_id, None)
            if pending is not None and self.callback:
                msg, action = pending
                self.callback(msg, action)

            return rwdts.MemberRspCode.ACTION_OK

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """Remember the message and always acknowledge the prepare."""
            try:
                # Defer all actions till the commit state.
                xacts[xact_info.xact.id] = (msg, action)

            except Exception as e:
                self.log.exception(e)

            finally:
                # ACK is sent even when recording the message failed.
                xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        reg_event = asyncio.Event(loop=self.loop)

        @asyncio.coroutine
        def on_ready(_, status):
            """Flag that DTS reported the registration as ready."""
            reg_event.set()

        handler = rift.tasklets.DTS.RegistrationHandler(
            on_ready=on_ready,
            on_prepare=on_prepare,
            on_commit=on_commit
        )

        self.reg = yield from self.dts.register(
            xpath=self.project.add_project(self.get_xpath()),
            flags=self.get_reg_flags(),
            handler=handler)

        # NOTE(review): reg_event is set by on_ready but nothing waits on
        # it; register() returns without blocking until the registration is
        # ready -- confirm this is intended.

        assert self.reg is not None
140
141
class AbstractConfigSubscriber(SubscriberDtsHandler):
    """Abstract class that simplifies the process of creating subscribers
    for config data.

    A config subscriber can be created in one step by subclassing and
    implementing the MANDATORY get_xpath() and key_name() methods.

    NOTE(review): @abc.abstractmethod is only enforced when the class uses
    an ABCMeta metaclass; whether the base classes provide one is not
    visible from this file -- confirm.
    """
    # Not referenced anywhere inside this class; kept for external users.
    KEY = "msgs"

    @abc.abstractmethod
    def get_xpath(self):
        """Return the xpath of the config data to subscribe to.

        The value is passed through ``self.project.add_project()`` before
        being registered.
        """
        pass

    @abc.abstractmethod
    def key_name(self):
        """Return the key name passed to get_add_delete_update_cfgs()."""
        pass

    @asyncio.coroutine
    def register(self):
        """Register for configuration changes on the subscribed xpath.

        Creates an appconf group whose apply handler works out the added,
        deleted and updated configs of the transaction and reports each one
        through ``self.callback(cfg, action)``.

        Does nothing (apart from logging a warning) when ``self.reg`` is
        already set.
        """
        if self.reg:
            # This handler registers a subscriber, not an RPC, so the
            # warning says so.
            self._log.warning(
                "Subscriber already registered for project {}".format(
                    self._project.name))
            return

        def on_apply(dts, acg, xact, action, scratch):
            """Apply the configuration"""
            add_cfgs, delete_cfgs, update_cfgs = get_add_delete_update_cfgs(
                dts_member_reg=self.reg,
                xact=xact,
                key_name=self.key_name())

            if not self.callback:
                return

            # Notification order is preserved: deletes, then creates, then
            # updates.
            notifications = (
                (rwdts.QueryAction.DELETE, delete_cfgs),
                (rwdts.QueryAction.CREATE, add_cfgs),
                (rwdts.QueryAction.UPDATE, update_cfgs),
            )
            for query_action, cfgs in notifications:
                for cfg in cfgs:
                    self.callback(cfg, query_action)

        @asyncio.coroutine
        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
            """ on prepare callback """
            # Every prepare is acknowledged; the work happens in on_apply.
            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
        with self.dts.appconf_group_create(handler=acg_hdl) as acg:
            self.reg = acg.register(
                xpath=self.project.add_project(self.get_xpath()),
                flags=self.get_reg_flags(),
                on_prepare=on_prepare)