update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / common / python / rift / mano / dts / subscriber / core.py
1 """
2 #
3 # Copyright 2016-2017 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 @file core.py
19 @author Varun Prasad (varun.prasad@riftio.com)
20 @date 09-Jul-2016
21
22 """
23
24 import abc
25 import collections
26 import asyncio
27
28 from gi.repository import (RwDts as rwdts, ProtobufC)
29 import rift.tasklets
30 from rift.mano.utils.project import (
31 get_add_delete_update_cfgs,
32 )
33
34 from ..core import DtsHandler
35
36
37 class SubscriberDtsHandler(DtsHandler):
38 """A common class for all subscribers.
39 """
40 @classmethod
41 def from_project(cls, proj, callback=None):
42 """Convenience method to build the object from tasklet
43
44 Args:
45 proj (rift.mano.utils.project.ManoProject): Project
46 callback (None, optional): Callable, which will be invoked on
47 subscriber changes.
48
49 Signature of callback:
50 Args:
51 msg: The Gi Object msg from DTS
52 action(rwdts.QueryAction): Action type
53 """
54 return cls(proj.log, proj.dts, proj.loop, proj, callback=callback)
55
56 def __init__(self, log, dts, loop, project, callback=None):
57 super().__init__(log, dts, loop, project)
58 self.callback = callback
59
60 @abc.abstractmethod
61 def get_xpath(self):
62 """
63 Returns:
64 str: xpath
65 """
66 pass
67
68 def get_reg_flags(self):
69 """Default set of REG flags, can be over-ridden by sub classes.
70
71 Returns:
72 Set of rwdts.Flag types.
73 """
74 return rwdts.Flag.SUBSCRIBER|rwdts.Flag.DELTA_READY|rwdts.Flag.CACHE
75
76 @asyncio.coroutine
77 def data(self):
78 itr = yield from self.dts.query_read(
79 self.get_xpath())
80
81 values = []
82 for res in itr:
83 result = yield from res
84 result = result.result
85 values.append(result)
86
87 return values
88
89
90
91 class AbstractOpdataSubscriber(SubscriberDtsHandler):
92 """Abstract class that simplifies the process of creating subscribers
93 for opdata.
94
95 Opdata subscriber can be created in one step by subclassing and implementing
96 the MANDATORY get_xpath() method
97
98 """
99
100 @asyncio.coroutine
101 def register(self):
102 """Triggers the registration
103 """
104
105 if self._reg:
106 self._log.warning("RPC already registered for project {}".
107 format(self._project.name))
108 return
109
110 xacts = {}
111
112 def on_commit(xact_info):
113 try:
114 xact_id = xact_info.handle.get_xact().id
115 if xact_id in xacts:
116 msg, action = xacts.pop(xact_id)
117
118 if self.callback:
119 self.callback(msg, action)
120 except Exception as e:
121 self.log.error("Exception when committing data for registration:{} exception:{}".format(self.get_xpath(), e))
122 self.log.exception(e)
123
124 return rwdts.MemberRspCode.ACTION_OK
125
126 @asyncio.coroutine
127 def on_prepare(xact_info, action, ks_path, msg):
128 try:
129 # Defer all actions till the commit state.
130 xacts[xact_info.xact.id] = (msg, action)
131
132 except Exception as e:
133 self.log.exception(e)
134
135 try:
136 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
137 except rift.tasklets.dts.ResponseError as e:
138 self._log.warning("Reg handle is None during action {} for {}: {}".
139 format(action, self.__class__, e))
140
141 reg_event = asyncio.Event(loop=self.loop)
142
143 @asyncio.coroutine
144 def on_ready(_, status):
145 reg_event.set()
146
147 handler = rift.tasklets.DTS.RegistrationHandler(
148 on_ready=on_ready,
149 on_prepare=on_prepare,
150 on_commit=on_commit
151 )
152
153 self._reg = yield from self.dts.register(
154 xpath=self.project.add_project(self.get_xpath()),
155 flags=self.get_reg_flags(),
156 handler=handler)
157
158 # yield from reg_event.wait()
159
160 assert self._reg is not None
161
162
163 class AbstractConfigSubscriber(SubscriberDtsHandler):
164 """Abstract class that simplifies the process of creating subscribers
165 for config data.
166
167 Config subscriber can be created in one step by subclassing and implementing
168 the MANDATORY get_xpath() method
169
170 """
171 KEY = "msgs"
172
173 @abc.abstractmethod
174 def get_xpath(self):
175 pass
176
177 @abc.abstractmethod
178 def key_name(self):
179 pass
180
181 @asyncio.coroutine
182 def register(self):
183 """ Register for VNFD configuration"""
184
185 def on_apply(dts, acg, xact, action, scratch):
186 """Apply the configuration"""
187 if xact.xact is None:
188 if action == rwdts.AppconfAction.INSTALL:
189 try:
190 if self._reg:
191 for cfg in self._reg.elements:
192 if self.callback:
193 self.callback(cfg, rwdts.QueryAction.CREATE)
194
195 else:
196 self._log.error("Reg handle is None during action {} for {}".
197 format(action, self.__class__))
198
199 except Exception as e:
200 self._log.exception("Adding config {} during restart failed: {}".
201 format(cfg, e))
202 return
203
204 add_cfgs, delete_cfgs, update_cfgs = get_add_delete_update_cfgs(
205 dts_member_reg=self._reg,
206 xact=xact,
207 key_name=self.key_name())
208
209 [self.callback(cfg, rwdts.QueryAction.DELETE)
210 for cfg in delete_cfgs if self.callback]
211
212 [self.callback(cfg, rwdts.QueryAction.CREATE)
213 for cfg in add_cfgs if self.callback]
214
215 [self.callback(cfg, rwdts.QueryAction.UPDATE)
216 for cfg in update_cfgs if self.callback]
217
218 @asyncio.coroutine
219 def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
220 """ on prepare callback """
221 self._log.debug("Subscriber DTS prepare for project %s: %s",
222 self.project, xact_info.query_action)
223 try:
224 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
225 except rift.tasklets.dts.ResponseError as e:
226 self._log.warning(
227 "Subscriber DTS prepare for project {}, action {} in class {} failed: {}".
228 format(self.project, xact_info.query_action, self.__class__, e))
229
230 acg_hdl = rift.tasklets.AppConfGroup.Handler(on_apply=on_apply)
231 with self.dts.appconf_group_create(handler=acg_hdl) as acg:
232 self._reg = acg.register(
233 xpath=self.project.add_project(self.get_xpath()),
234 flags=self.get_reg_flags(),
235 on_prepare=on_prepare)