dd2513e81af9166db16f0275c5a2bec8c3ea2d2f
[osm/SO.git] / common / python / rift / mano / dts / subscriber / core.py
1 """
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 @file core.py
19 @author Varun Prasad (varun.prasad@riftio.com)
20 @date 09-Jul-2016
21
22 """
23
24 import abc
25 import collections
26 import asyncio
27
28 from gi.repository import (RwDts as rwdts, ProtobufC)
29 import rift.tasklets
30
31 from ..core import DtsHandler
32
33
class SubscriberDtsHandler(DtsHandler):
    """Base class shared by every DTS subscriber in this module.

    On top of what DtsHandler is given (log, dts, loop) it keeps an optional
    callback and supplies the default set of registration flags.
    """

    def __init__(self, log, dts, loop, callback=None):
        """Set up the handler.

        Args:
            log: Forwarded unchanged to DtsHandler.
            dts: Forwarded unchanged to DtsHandler.
            loop: Forwarded unchanged to DtsHandler.
            callback (None, optional): Callable stored as self.callback;
                subclasses invoke it as callback(msg, action).
        """
        super().__init__(log, dts, loop)
        self.callback = callback

    @classmethod
    def from_tasklet(cls, tasklet, callback=None):
        """Alternate constructor that pulls log/dts/loop off a tasklet.

        Args:
            tasklet (rift.tasklets.Tasklet): Tasklet providing the ``log``,
                ``dts`` and ``loop`` attributes.
            callback (None, optional): Callable, which will be invoked on
                subscriber changes.

                Signature of callback:
                    Args:
                        msg: The Gi Object msg from DTS
                        action(rwdts.QueryAction): Action type

        Returns:
            A new instance of ``cls``.
        """
        log, dts, loop = tasklet.log, tasklet.dts, tasklet.loop
        return cls(log, dts, loop, callback=callback)

    def get_reg_flags(self):
        """Return the default registration flags; sub classes may override.

        Returns:
            The OR-ed combination of the SUBSCRIBER, DELTA_READY and CACHE
            members of rwdts.Flag.
        """
        default_flags = (
            rwdts.Flag.SUBSCRIBER
            | rwdts.Flag.DELTA_READY
            | rwdts.Flag.CACHE
        )
        return default_flags
64
65
66
class AbstractOpdataSubscriber(SubscriberDtsHandler):
    """Base class that takes the boilerplate out of opdata subscribers.

    A concrete opdata subscriber only needs to subclass this and implement
    the MANDATORY get_xpath() method.

    NOTE(review): abc.abstractmethod is only enforced when a class in the
    hierarchy uses abc.ABCMeta; whether DtsHandler does is not visible
    here -- confirm.
    """

    @abc.abstractmethod
    def get_xpath(self):
        """Provide the xpath this subscriber registers for.

        Returns:
            str: xpath
        """
        pass

    @asyncio.coroutine
    def register(self):
        """Register the subscription with DTS.

        Messages seen in the prepare phase are parked, keyed by transaction
        id, and only handed to self.callback (when one is set) once the
        matching commit arrives. The resulting registration handle is
        stored in self.reg.
        """
        # xact id -> (msg, action), filled in on_prepare, drained in on_commit.
        # NOTE(review): entries are removed only in on_commit; a transaction
        # that never commits would presumably leave its entry here -- confirm.
        pending = {}

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            try:
                # Nothing is acted on yet; delivery is postponed to commit.
                pending[xact_info.xact.id] = (msg, action)
            except Exception as e:
                self.log.exception(e)
            finally:
                # The prepare is ACKed whether or not the bookkeeping worked.
                xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        def on_commit(xact_info):
            entry = pending.pop(xact_info.handle.get_xact().id, None)
            # Values stored in `pending` are always tuples, so None reliably
            # means "no prepare was recorded for this transaction".
            if entry is not None and self.callback:
                self.callback(*entry)

            return rwdts.MemberRspCode.ACTION_OK

        reg_event = asyncio.Event(loop=self.loop)

        @asyncio.coroutine
        def on_ready(_, status):
            reg_event.set()

        handler = rift.tasklets.DTS.RegistrationHandler(
            on_ready=on_ready,
            on_prepare=on_prepare,
            on_commit=on_commit,
        )

        registration = yield from self.dts.register(
            xpath=self.get_xpath(),
            flags=self.get_reg_flags(),
            handler=handler,
        )
        self.reg = registration

        # Waiting on reg_event is currently disabled, so this coroutine
        # finishes without waiting for on_ready to fire.

        assert self.reg is not None

    def deregister(self):
        """Deregister the handle stored in self.reg."""
        self.reg.deregister()
134
135
class AbstractConfigSubscriber(SubscriberDtsHandler):
    """Abstract class that simplifies the process of creating subscribers
    for config data.

    Config subscriber can be created in one step by subclassing and
    implementing the MANDATORY get_xpath() and key_name() methods.

    NOTE(review): abc.abstractmethod is only enforced when a class in the
    hierarchy uses abc.ABCMeta; whether DtsHandler does is not visible
    here -- confirm.
    """
    # Not referenced anywhere in this class; presumably kept for subclasses
    # or external users -- TODO confirm before removing.
    KEY = "msgs"

    @abc.abstractmethod
    def get_xpath(self):
        """Return the xpath passed to the appconf group registration."""
        pass

    @abc.abstractmethod
    def key_name(self):
        """Return the name of the attribute that identifies a config element.

        The value is used with getattr() on every element to match elements
        before and after a transaction.
        """
        pass

    def get_add_delete_update_cfgs(self, dts_member_reg, xact, key_name):
        """Work out which config elements a transaction adds/deletes/updates.

        Args:
            dts_member_reg: Registration exposing ``elements`` (current
                state) and ``get_xact_elements(xact)`` (state in the xact).
            xact: Transaction handed to ``get_xact_elements``.
            key_name (str): Attribute name used as each element's key.

        Returns:
            tuple: (added_cfgs, deleted_cfgs, updated_cfgs), three lists.
            Added and updated entries are the elements from the
            transaction; deleted entries are the current elements.
        """
        # Unfortunately, it is currently difficult to figure out what has
        # exactly changed in this xact without Pbdelta support (RIFT-4916).
        # As a workaround, we fetch the pre and post xact elements and
        # compare them to figure out adds/deletes/updates.
        xact_key_map = {
            getattr(cfg, key_name): cfg
            for cfg in dts_member_reg.get_xact_elements(xact)
        }
        curr_key_map = {
            getattr(cfg, key_name): cfg
            for cfg in dts_member_reg.elements
        }

        # Adds: keys present in the transaction but not in the current state.
        added_cfgs = [
            cfg for key, cfg in xact_key_map.items()
            if key not in curr_key_map
        ]

        # Deletes: keys in the current state that the transaction dropped.
        deleted_cfgs = [
            cfg for key, cfg in curr_key_map.items()
            if key not in xact_key_map
        ]

        # Updates: keys on both sides whose elements compare unequal.
        updated_cfgs = [
            cfg for key, cfg in xact_key_map.items()
            if key in curr_key_map and cfg != curr_key_map[key]
        ]

        return added_cfgs, deleted_cfgs, updated_cfgs

    @asyncio.coroutine
    def register(self):
        """Register for config changes on get_xpath() via an appconf group.

        On apply, the change set is computed with
        get_add_delete_update_cfgs() and self.callback (when set) is invoked
        once per element, deletes first, then creates, then updates. The
        registration handle is stored in self.reg.
        """

        def on_apply(dts, acg, xact, action, scratch):
            """Apply the configuration"""
            # NOTE(review): the recovery case (xact.xact is None together
            # with an INSTALL action) is not special-cased here; the same
            # diff logic runs for it -- confirm that is intended.
            add_cfgs, delete_cfgs, update_cfgs = \
                self.get_add_delete_update_cfgs(
                    dts_member_reg=self.reg,
                    xact=xact,
                    key_name=self.key_name())

            if not self.callback:
                return

            # Plain loops: the callback is invoked purely for its effects.
            for cfg in delete_cfgs:
                self.callback(cfg, rwdts.QueryAction.DELETE)

            for cfg in add_cfgs:
                self.callback(cfg, rwdts.QueryAction.CREATE)

            for cfg in update_cfgs:
                self.callback(cfg, rwdts.QueryAction.UPDATE)

        @asyncio.coroutine
        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
            """ on prepare callback """
            # Every prepare is ACKed; the actual work happens in on_apply.
            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
        with self.dts.appconf_group_create(handler=acg_hdl) as acg:
            self.reg = acg.register(
                xpath=self.get_xpath(),
                flags=self.get_reg_flags(),
                on_prepare=on_prepare)

    def deregister(self):
        """Deregister the handle stored in self.reg."""
        self.reg.deregister()