c80925c6a35ab7f048e5afeffd6ded695c487957
[osm/SO.git] / rwlaunchpad / plugins / rwresmgr / rift / tasklets / rwresmgrtasklet / rwresmgr_events.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import sys
20
21 import gi
22 gi.require_version('RwDts', '1.0')
23 gi.require_version('RwYang', '1.0')
24 gi.require_version('RwResourceMgrYang', '1.0')
25 gi.require_version('RwLaunchpadYang', '1.0')
26 gi.require_version('RwcalYang', '1.0')
27 from gi.repository import (
28 RwDts as rwdts,
29 RwYang,
30 RwResourceMgrYang,
31 RwLaunchpadYang,
32 RwcalYang,
33 )
34
35 from gi.repository.RwTypes import RwStatus
36 import rift.tasklets
37
38 if sys.version_info < (3, 4, 4):
39 asyncio.ensure_future = asyncio.async
40
41
42 class ResourceMgrEvent(object):
43 VDU_REQUEST_XPATH = "D,/rw-resource-mgr:resource-mgmt/vdu-event/vdu-event-data"
44 VLINK_REQUEST_XPATH = "D,/rw-resource-mgr:resource-mgmt/vlink-event/vlink-event-data"
45
46 def __init__(self, dts, log, loop, parent):
47 self._log = log
48 self._dts = dts
49 self._loop = loop
50 self._parent = parent
51 self._vdu_reg = None
52 self._link_reg = None
53
54 self._vdu_reg_event = asyncio.Event(loop=self._loop)
55 self._link_reg_event = asyncio.Event(loop=self._loop)
56
57 @asyncio.coroutine
58 def wait_ready(self, timeout=5):
59 self._log.debug("Waiting for all request registrations to become ready.")
60 yield from asyncio.wait([self._link_reg_event.wait(), self._vdu_reg_event.wait()],
61 timeout=timeout, loop=self._loop)
62
63 def create_record_dts(self, regh, xact, path, msg):
64 """
65 Create a record in DTS with path and message
66 """
67 self._log.debug("Creating Resource Record xact = %s, %s:%s",
68 xact, path, msg)
69 regh.create_element(path, msg)
70
71 def delete_record_dts(self, regh, xact, path):
72 """
73 Delete a VNFR record in DTS with path and message
74 """
75 self._log.debug("Deleting Resource Record xact = %s, %s",
76 xact, path)
77 regh.delete_element(path)
78
79 @asyncio.coroutine
80 def register(self):
81 @asyncio.coroutine
82 def onlink_event(dts, g_reg, xact, xact_event, scratch_data):
83 @asyncio.coroutine
84 def instantiate_realloc_vn(link):
85 """Re-populate the virtual link information after restart
86
87 Arguments:
88 vlink
89
90 """
91 # wait for 3 seconds
92 yield from asyncio.sleep(3, loop=self._loop)
93
94 response_info = yield from self._parent.reallocate_virtual_network(link.event_id,
95 link.cloud_account,
96 link.request_info, link.resource_info,
97 )
98 if (xact_event == rwdts.MemberEvent.INSTALL):
99 link_cfg = self._link_reg.elements
100 for link in link_cfg:
101 self._loop.create_task(instantiate_realloc_vn(link))
102 return rwdts.MemberRspCode.ACTION_OK
103
104 @asyncio.coroutine
105 def onvdu_event(dts, g_reg, xact, xact_event, scratch_data):
106 @asyncio.coroutine
107 def instantiate_realloc_vdu(vdu):
108 """Re-populate the VDU information after restart
109
110 Arguments:
111 vdu
112
113 """
114 # wait for 3 seconds
115 yield from asyncio.sleep(3, loop=self._loop)
116
117 response_info = yield from self._parent.allocate_virtual_compute(vdu.event_id,
118 vdu.cloud_account,
119 vdu.request_info
120 )
121 if (xact_event == rwdts.MemberEvent.INSTALL):
122 vdu_cfg = self._vdu_reg.elements
123 for vdu in vdu_cfg:
124 self._loop.create_task(instantiate_realloc_vdu(vdu))
125 return rwdts.MemberRspCode.ACTION_OK
126
127 def on_link_request_commit(xact_info):
128 """ The transaction has been committed """
129 self._log.debug("Received link request commit (xact_info: %s)", xact_info)
130 return rwdts.MemberRspCode.ACTION_OK
131
132 @asyncio.coroutine
133 def on_link_request_prepare(xact_info, action, ks_path, request_msg):
134 self._log.debug("Received virtual-link on_prepare callback (xact_info: %s, action: %s): %s",
135 xact_info, action, request_msg)
136
137 response_info = None
138 response_xpath = ks_path.to_xpath(RwResourceMgrYang.get_schema()) + "/resource-info"
139
140 schema = RwResourceMgrYang.VirtualLinkEventData().schema()
141 pathentry = schema.keyspec_to_entry(ks_path)
142
143 if action == rwdts.QueryAction.CREATE:
144 try:
145 response_info = yield from self._parent.allocate_virtual_network(pathentry.key00.event_id,
146 request_msg.cloud_account,
147 request_msg.request_info)
148 except Exception as e:
149 self._log.error("Encountered exception: %s while creating virtual network", str(e))
150 self._log.exception(e)
151 response_info = RwResourceMgrYang.VirtualLinkEventData_ResourceInfo()
152 response_info.resource_state = 'failed'
153 response_info.resource_errors = str(e)
154 yield from self._dts.query_update(response_xpath,
155 rwdts.XactFlag.ADVISE,
156 response_info)
157 else:
158 request_msg.resource_info = response_info
159 self.create_record_dts(self._link_reg, None, ks_path.to_xpath(RwResourceMgrYang.get_schema()), request_msg)
160 elif action == rwdts.QueryAction.DELETE:
161 yield from self._parent.release_virtual_network(pathentry.key00.event_id)
162 self.delete_record_dts(self._link_reg, None, ks_path.to_xpath(RwResourceMgrYang.get_schema()))
163 elif action == rwdts.QueryAction.READ:
164 response_info = yield from self._parent.read_virtual_network_info(pathentry.key00.event_id)
165 else:
166 raise ValueError("Only read/create/delete actions available. Received action: %s" %(action))
167
168 self._log.debug("Responding with VirtualLinkInfo at xpath %s: %s.",
169 response_xpath, response_info)
170
171 xact_info.respond_xpath(rwdts.XactRspCode.ACK, response_xpath, response_info)
172
173
174 def on_vdu_request_commit(xact_info):
175 """ The transaction has been committed """
176 self._log.debug("Received vdu request commit (xact_info: %s)", xact_info)
177 return rwdts.MemberRspCode.ACTION_OK
178
179 def monitor_vdu_state(response_xpath, pathentry):
180 self._log.info("Initiating VDU state monitoring for xpath: %s ", response_xpath)
181 time_to_wait = 300
182 sleep_time = 2
183 loop_cnt = int(time_to_wait/sleep_time)
184 for i in range(loop_cnt):
185 self._log.debug("VDU state monitoring for xpath: %s. Sleeping for 2 second", response_xpath)
186 yield from asyncio.sleep(2, loop = self._loop)
187 try:
188 response_info = yield from self._parent.read_virtual_compute_info(pathentry.key00.event_id)
189 except Exception as e:
190 self._log.info("VDU state monitoring: Received exception %s in VDU state monitoring for %s. Aborting monitoring",
191 str(e),response_xpath)
192 response_info = RwResourceMgrYang.VDUEventData_ResourceInfo()
193 response_info.resource_state = 'failed'
194 response_info.resource_errors = str(e)
195 yield from self._dts.query_update(response_xpath,
196 rwdts.XactFlag.ADVISE,
197 response_info)
198 else:
199 if response_info.resource_state == 'active' or response_info.resource_state == 'failed':
200 self._log.info("VDU state monitoring: VDU reached terminal state. Publishing VDU info: %s at path: %s",
201 response_info, response_xpath)
202 yield from self._dts.query_update(response_xpath,
203 rwdts.XactFlag.ADVISE,
204 response_info)
205 return
206 else:
207 ### End of loop. This is only possible if VDU did not reach active state
208 err_msg = "VDU state monitoring: VDU at xpath :{} did not reached active state in {} seconds. Aborting monitoring".format(response_xpath, time_to_wait)
209 self._log.info(err_msg)
210 response_info = RwResourceMgrYang.VDUEventData_ResourceInfo()
211 response_info.resource_state = 'failed'
212 response_info.resource_errors = err_msg
213 yield from self._dts.query_update(response_xpath,
214 rwdts.XactFlag.ADVISE,
215 response_info)
216 return
217
218 def allocate_vdu_task(ks_path, event_id, cloud_account, request_msg):
219 response_xpath = ks_path.to_xpath(RwResourceMgrYang.get_schema()) + "/resource-info"
220 schema = RwResourceMgrYang.VDUEventData().schema()
221 pathentry = schema.keyspec_to_entry(ks_path)
222 try:
223 response_info = yield from self._parent.allocate_virtual_compute(event_id,
224 cloud_account,
225 request_msg,)
226 except Exception as e:
227 self._log.error("Encountered exception : %s while creating virtual compute", str(e))
228 response_info = RwResourceMgrYang.VDUEventData_ResourceInfo()
229 response_info.resource_state = 'failed'
230 response_info.resource_errors = str(e)
231 yield from self._dts.query_update(response_xpath,
232 rwdts.XactFlag.ADVISE,
233 response_info)
234 else:
235 if response_info.resource_state == 'failed' or response_info.resource_state == 'active' :
236 self._log.info("Virtual compute create task completed. Publishing VDU info: %s at path: %s",
237 response_info, response_xpath)
238 yield from self._dts.query_update(response_xpath,
239 rwdts.XactFlag.ADVISE,
240 response_info)
241 else:
242 asyncio.ensure_future(monitor_vdu_state(response_xpath, pathentry),
243 loop = self._loop)
244
245
246 @asyncio.coroutine
247 def on_vdu_request_prepare(xact_info, action, ks_path, request_msg):
248 self._log.debug("Received vdu on_prepare callback (xact_info: %s, action: %s): %s",
249 xact_info, action, request_msg)
250 response_xpath = ks_path.to_xpath(RwResourceMgrYang.get_schema()) + "/resource-info"
251 schema = RwResourceMgrYang.VDUEventData().schema()
252 pathentry = schema.keyspec_to_entry(ks_path)
253
254 if action == rwdts.QueryAction.CREATE:
255 response_info = RwResourceMgrYang.VDUEventData_ResourceInfo()
256 response_info.resource_state = 'pending'
257 request_msg.resource_info = response_info
258 self.create_record_dts(self._vdu_reg,
259 None,
260 ks_path.to_xpath(RwResourceMgrYang.get_schema()),
261 request_msg)
262 asyncio.ensure_future(allocate_vdu_task(ks_path,
263 pathentry.key00.event_id,
264 request_msg.cloud_account,
265 request_msg.request_info),
266 loop = self._loop)
267 elif action == rwdts.QueryAction.DELETE:
268 response_info = None
269 yield from self._parent.release_virtual_compute(pathentry.key00.event_id)
270 self.delete_record_dts(self._vdu_reg, None, ks_path.to_xpath(RwResourceMgrYang.get_schema()))
271 elif action == rwdts.QueryAction.READ:
272 response_info = yield from self._parent.read_virtual_compute_info(pathentry.key00.event_id)
273 else:
274 raise ValueError("Only create/delete actions available. Received action: %s" %(action))
275
276 self._log.debug("Responding with VDUInfo at xpath %s: %s",
277 response_xpath, response_info)
278
279 xact_info.respond_xpath(rwdts.XactRspCode.ACK, response_xpath, response_info)
280
281
282 @asyncio.coroutine
283 def on_request_ready(registration, status):
284 self._log.debug("Got request ready event (registration: %s) (status: %s)",
285 registration, status)
286
287 if registration == self._link_reg:
288 self._link_reg_event.set()
289 elif registration == self._vdu_reg:
290 self._vdu_reg_event.set()
291 else:
292 self._log.error("Unknown registration ready event: %s", registration)
293
294 link_handlers = rift.tasklets.Group.Handler(on_event=onlink_event,)
295 with self._dts.group_create(handler=link_handlers) as link_group:
296 self._log.debug("Registering for Link Resource Request using xpath: %s",
297 ResourceMgrEvent.VLINK_REQUEST_XPATH)
298
299 self._link_reg = link_group.register(xpath=ResourceMgrEvent.VLINK_REQUEST_XPATH,
300 handler=rift.tasklets.DTS.RegistrationHandler(on_ready=on_request_ready,
301 on_commit=on_link_request_commit,
302 on_prepare=on_link_request_prepare),
303 flags=rwdts.Flag.PUBLISHER | rwdts.Flag.DATASTORE,)
304
305 vdu_handlers = rift.tasklets.Group.Handler(on_event=onvdu_event, )
306 with self._dts.group_create(handler=vdu_handlers) as vdu_group:
307
308 self._log.debug("Registering for VDU Resource Request using xpath: %s",
309 ResourceMgrEvent.VDU_REQUEST_XPATH)
310
311 self._vdu_reg = vdu_group.register(xpath=ResourceMgrEvent.VDU_REQUEST_XPATH,
312 handler=rift.tasklets.DTS.RegistrationHandler(on_ready=on_request_ready,
313 on_commit=on_vdu_request_commit,
314 on_prepare=on_vdu_request_prepare),
315 flags=rwdts.Flag.PUBLISHER | rwdts.Flag.DATASTORE,)
316