Merge from OSM SO master
[osm/SO.git] / rwlaunchpad / plugins / rwresmgr / rift / tasklets / rwresmgrtasklet / rwresmgr_events.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import sys
20
21 import gi
22 gi.require_version('RwDts', '1.0')
23 gi.require_version('RwYang', '1.0')
24 gi.require_version('RwResourceMgrYang', '1.0')
25 gi.require_version('RwLaunchpadYang', '1.0')
26 gi.require_version('RwcalYang', '1.0')
27 from gi.repository import (
28 RwDts as rwdts,
29 RwYang,
30 RwResourceMgrYang,
31 RwLaunchpadYang,
32 RwcalYang,
33 )
34
35 from gi.repository.RwTypes import RwStatus
36 import rift.tasklets
37
38 if sys.version_info < (3, 4, 4):
39 asyncio.ensure_future = asyncio.async
40
41
42 class ResourceMgrEvent(object):
43 VDU_REQUEST_XPATH = "D,/rw-resource-mgr:resource-mgmt/vdu-event/vdu-event-data"
44 VLINK_REQUEST_XPATH = "D,/rw-resource-mgr:resource-mgmt/vlink-event/vlink-event-data"
45
46 def __init__(self, dts, log, loop, parent):
47 self._log = log
48 self._dts = dts
49 self._loop = loop
50 self._parent = parent
51 self._project = parent._project
52 self._vdu_reg = None
53 self._link_reg = None
54
55 self._vdu_reg_event = asyncio.Event(loop=self._loop)
56 self._link_reg_event = asyncio.Event(loop=self._loop)
57
58 @asyncio.coroutine
59 def wait_ready(self, timeout=5):
60 self._log.debug("Waiting for all request registrations to become ready.")
61 yield from asyncio.wait([self._link_reg_event.wait(), self._vdu_reg_event.wait()],
62 timeout=timeout, loop=self._loop)
63
64 def _add_config_flag(self, xpath, config=False):
65 if xpath[0] == '/':
66 if config:
67 return 'C,' + xpath
68 else:
69 return 'D,' + xpath
70
71 return xpath
72
73 def create_record_dts(self, regh, xact, xpath, msg):
74 """
75 Create a record in DTS with path and message
76 """
77 path = self._add_config_flag(self._project.add_project(xpath))
78 self._log.debug("Creating Resource Record xact = %s, %s:%s",
79 xact, path, msg)
80 regh.create_element(path, msg)
81
82 def delete_record_dts(self, regh, xact, xpath):
83 """
84 Delete a VNFR record in DTS with path and message
85 """
86 path = self._add_config_flag(self._project.add_project(xpath))
87 self._log.debug("Deleting Resource Record xact = %s, %s",
88 xact, path)
89 regh.delete_element(path)
90
91
92 @asyncio.coroutine
93 def register(self):
94 @asyncio.coroutine
95 def onlink_event(dts, g_reg, xact, xact_event, scratch_data):
96 @asyncio.coroutine
97 def instantiate_realloc_vn(link):
98 """Re-populate the virtual link information after restart
99
100 Arguments:
101 vlink
102
103 """
104 # wait for 3 seconds
105 yield from asyncio.sleep(3, loop=self._loop)
106
107 response_info = yield from self._parent.reallocate_virtual_network(link.event_id,
108 link.cloud_account,
109 link.request_info, link.resource_info,
110 )
111 if (xact_event == rwdts.MemberEvent.INSTALL):
112 link_cfg = self._link_reg.elements
113 for link in link_cfg:
114 self._loop.create_task(instantiate_realloc_vn(link))
115 return rwdts.MemberRspCode.ACTION_OK
116
117 @asyncio.coroutine
118 def onvdu_event(dts, g_reg, xact, xact_event, scratch_data):
119 @asyncio.coroutine
120 def instantiate_realloc_vdu(vdu):
121 """Re-populate the VDU information after restart
122
123 Arguments:
124 vdu
125
126 """
127 # wait for 3 seconds
128 yield from asyncio.sleep(3, loop=self._loop)
129
130 response_info = yield from self._parent.allocate_virtual_compute(vdu.event_id,
131 vdu.cloud_account,
132 vdu.request_info
133 )
134 if (xact_event == rwdts.MemberEvent.INSTALL):
135 vdu_cfg = self._vdu_reg.elements
136 for vdu in vdu_cfg:
137 self._loop.create_task(instantiate_realloc_vdu(vdu))
138 return rwdts.MemberRspCode.ACTION_OK
139
140 def on_link_request_commit(xact_info):
141 """ The transaction has been committed """
142 self._log.debug("Received link request commit (xact_info: %s)", xact_info)
143 return rwdts.MemberRspCode.ACTION_OK
144
145 @asyncio.coroutine
146 def on_link_request_prepare(xact_info, action, ks_path, request_msg):
147 self._log.debug("Received virtual-link on_prepare callback (xact_info: %s, action: %s): %s",
148 xact_info, action, request_msg)
149
150 response_info = None
151 response_xpath = ks_path.to_xpath(RwResourceMgrYang.get_schema()) + "/resource-info"
152
153 schema = RwResourceMgrYang.VirtualLinkEventData().schema()
154 pathentry = schema.keyspec_to_entry(ks_path)
155
156 if action == rwdts.QueryAction.CREATE:
157 try:
158 response_info = yield from self._parent.allocate_virtual_network(pathentry.key00.event_id,
159 request_msg.cloud_account,
160 request_msg.request_info)
161 except Exception as e:
162 self._log.error("Encountered exception: %s while creating virtual network", str(e))
163 self._log.exception(e)
164 response_info = RwResourceMgrYang.VirtualLinkEventData_ResourceInfo()
165 response_info.resource_state = 'failed'
166 response_info.resource_errors = str(e)
167 yield from self._dts.query_update(response_xpath,
168 rwdts.XactFlag.ADVISE,
169 response_info)
170 else:
171 request_msg.resource_info = response_info
172 self.create_record_dts(self._link_reg, None, ks_path.to_xpath(RwResourceMgrYang.get_schema()), request_msg)
173 elif action == rwdts.QueryAction.DELETE:
174 yield from self._parent.release_virtual_network(pathentry.key00.event_id)
175 self.delete_record_dts(self._link_reg, None, ks_path.to_xpath(RwResourceMgrYang.get_schema()))
176 elif action == rwdts.QueryAction.READ:
177 # TODO: Check why we are getting null event id request
178 if pathentry.key00.event_id:
179 response_info = yield from self._parent.read_virtual_network_info(pathentry.key00.event_id)
180 else:
181 xact_info.respond_xpath(rwdts.XactRspCode.NA)
182 return
183 else:
184 raise ValueError("Only read/create/delete actions available. Received action: %s" %(action))
185
186 self._log.info("Responding with VirtualLinkInfo at xpath %s: %s.",
187 response_xpath, response_info)
188
189 xact_info.respond_xpath(rwdts.XactRspCode.ACK, response_xpath, response_info)
190
191
192 def on_vdu_request_commit(xact_info):
193 """ The transaction has been committed """
194 self._log.debug("Received vdu request commit (xact_info: %s)", xact_info)
195 return rwdts.MemberRspCode.ACTION_OK
196
197 def monitor_vdu_state(response_xpath, pathentry):
198 self._log.debug("Initiating VDU state monitoring for xpath: %s ", response_xpath)
199 time_to_wait = 300
200 sleep_time = 2
201 loop_cnt = int(time_to_wait/sleep_time)
202 for i in range(loop_cnt):
203 self._log.debug("VDU state monitoring for xpath: %s. Sleeping for 2 second", response_xpath)
204 yield from asyncio.sleep(2, loop = self._loop)
205 try:
206 response_info = yield from self._parent.read_virtual_compute_info(pathentry.key00.event_id)
207 except Exception as e:
208 self._log.info("VDU state monitoring: Received exception %s in VDU state monitoring for %s. Aborting monitoring",
209 str(e),response_xpath)
210 response_info = RwResourceMgrYang.VDUEventData_ResourceInfo()
211 response_info.resource_state = 'failed'
212 response_info.resource_errors = str(e)
213 yield from self._dts.query_update(response_xpath,
214 rwdts.XactFlag.ADVISE,
215 response_info)
216 else:
217 if response_info.resource_state == 'active' or response_info.resource_state == 'failed':
218 self._log.info("VDU state monitoring: VDU reached terminal state. " +
219 "Publishing VDU info: %s at path: %s",
220 response_info, response_xpath)
221 yield from self._dts.query_update(response_xpath,
222 rwdts.XactFlag.ADVISE,
223 response_info)
224 return
225 else:
226 ### End of loop. This is only possible if VDU did not reach active state
227 err_msg = ("VDU state monitoring: VDU at xpath :{} did not reached active " +
228 "state in {} seconds. Aborting monitoring".
229 format(response_xpath, time_to_wait))
230 self._log.info(err_msg)
231 response_info = RwResourceMgrYang.VDUEventData_ResourceInfo()
232 response_info.resource_state = 'failed'
233 response_info.resource_errors = err_msg
234 yield from self._dts.query_update(response_xpath,
235 rwdts.XactFlag.ADVISE,
236 response_info)
237 return
238
239 def allocate_vdu_task(ks_path, event_id, cloud_account, request_msg):
240 response_xpath = ks_path.to_xpath(RwResourceMgrYang.get_schema()) + "/resource-info"
241 response_xpath = self._add_config_flag(response_xpath)
242 schema = RwResourceMgrYang.VDUEventData().schema()
243 pathentry = schema.keyspec_to_entry(ks_path)
244 try:
245 response_info = yield from self._parent.allocate_virtual_compute(event_id,
246 cloud_account,
247 request_msg,)
248 except Exception as e:
249 self._log.error("Encountered exception : %s while creating virtual compute", str(e))
250 response_info = RwResourceMgrYang.VDUEventData_ResourceInfo()
251 response_info.resource_state = 'failed'
252 response_info.resource_errors = str(e)
253 yield from self._dts.query_update(response_xpath,
254 rwdts.XactFlag.ADVISE,
255 response_info)
256 else:
257 if response_info.resource_state == 'failed' or response_info.resource_state == 'active' :
258 self._log.debug("Virtual compute create task completed. Publishing VDU info: %s at path: %s",
259 response_info, response_xpath)
260 yield from self._dts.query_update(response_xpath,
261 rwdts.XactFlag.ADVISE,
262 response_info)
263 else:
264 self._log.debug("VDU create monitor at {}".format(response_xpath))
265 asyncio.ensure_future(monitor_vdu_state(response_xpath, pathentry),
266 loop = self._loop)
267
268 @asyncio.coroutine
269 def on_vdu_request_prepare(xact_info, action, ks_path, request_msg):
270 self._log.debug("Received vdu on_prepare callback (xact_info: %s, action: %s): %s",
271 xact_info, action, request_msg)
272 response_xpath = ks_path.to_xpath(RwResourceMgrYang.get_schema()) + "/resource-info"
273 response_xpath = self._add_config_flag(response_xpath)
274 schema = RwResourceMgrYang.VDUEventData().schema()
275 pathentry = schema.keyspec_to_entry(ks_path)
276
277 if action == rwdts.QueryAction.CREATE:
278 response_info = RwResourceMgrYang.VDUEventData_ResourceInfo()
279 response_info.resource_state = 'pending'
280 request_msg.resource_info = response_info
281 self.create_record_dts(self._vdu_reg,
282 None,
283 ks_path.to_xpath(RwResourceMgrYang.get_schema()),
284 request_msg)
285 asyncio.ensure_future(allocate_vdu_task(ks_path,
286 pathentry.key00.event_id,
287 request_msg.cloud_account,
288 request_msg.request_info),
289 loop = self._loop)
290 elif action == rwdts.QueryAction.DELETE:
291 response_info = None
292 yield from self._parent.release_virtual_compute(pathentry.key00.event_id)
293 self.delete_record_dts(self._vdu_reg, None, ks_path.to_xpath(RwResourceMgrYang.get_schema()))
294 elif action == rwdts.QueryAction.READ:
295 # TODO: Check why we are getting null event id request
296 if pathentry.key00.event_id:
297 response_info = yield from self._parent.read_virtual_compute_info(pathentry.key00.event_id)
298 else:
299 xact_info.respond_xpath(rwdts.XactRspCode.NA)
300 return
301 else:
302 raise ValueError("Only create/delete actions available. Received action: %s" %(action))
303
304 self._log.debug("Responding with VDUInfo at xpath %s: %s",
305 response_xpath, response_info)
306
307 xact_info.respond_xpath(rwdts.XactRspCode.ACK, response_xpath, response_info)
308
309
310 @asyncio.coroutine
311 def on_request_ready(registration, status):
312 self._log.debug("Got request ready event (registration: %s) (status: %s)",
313 registration, status)
314
315 if registration == self._link_reg:
316 self._link_reg_event.set()
317 elif registration == self._vdu_reg:
318 self._vdu_reg_event.set()
319 else:
320 self._log.error("Unknown registration ready event: %s", registration)
321
322 link_handlers = rift.tasklets.Group.Handler(on_event=onlink_event,)
323 with self._dts.group_create(handler=link_handlers) as link_group:
324 xpath = self._project.add_project(ResourceMgrEvent.VLINK_REQUEST_XPATH)
325 self._log.debug("Registering for Link Resource Request using xpath: {}".
326 format(xpath))
327
328 self._link_reg = link_group.register(xpath=xpath,
329 handler=rift.tasklets.DTS.RegistrationHandler(on_ready=on_request_ready,
330 on_commit=on_link_request_commit,
331 on_prepare=on_link_request_prepare),
332 flags=rwdts.Flag.PUBLISHER | rwdts.Flag.DATASTORE,)
333
334 vdu_handlers = rift.tasklets.Group.Handler(on_event=onvdu_event, )
335 with self._dts.group_create(handler=vdu_handlers) as vdu_group:
336
337 xpath = self._project.add_project(ResourceMgrEvent.VDU_REQUEST_XPATH)
338 self._log.debug("Registering for VDU Resource Request using xpath: {}".
339 format(xpath))
340
341 self._vdu_reg = vdu_group.register(xpath=xpath,
342 handler=rift.tasklets.DTS.RegistrationHandler(on_ready=on_request_ready,
343 on_commit=on_vdu_request_commit,
344 on_prepare=on_vdu_request_prepare),
345 flags=rwdts.Flag.PUBLISHER | rwdts.Flag.DATASTORE,)
346
347
348 def deregister(self):
349 self._log.debug("De-register for project {}".format(self._project.name))
350
351 if self._vdu_reg:
352 self._vdu_reg.deregister()
353 self._vdu_reg = None
354
355 if self._link_reg:
356 self._link_reg.deregister()
357 self._link_reg = None