update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwlaunchpad / plugins / rwresmgr / rift / tasklets / rwresmgrtasklet / rwresmgr_events.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import gi
20 import sys
21
22 gi.require_version('RwDts', '1.0')
23 gi.require_version('RwYang', '1.0')
24 gi.require_version('RwResourceMgrYang', '1.0')
25 gi.require_version('RwLaunchpadYang', '1.0')
26 gi.require_version('RwcalYang', '1.0')
27 from gi.repository import (
28 RwDts as rwdts,
29 RwYang,
30 RwResourceMgrYang,
31 RwLaunchpadYang,
32 RwcalYang,
33 )
34 gi.require_version('RwKeyspec', '1.0')
35 from gi.repository.RwKeyspec import quoted_key
36
37 from gi.repository.RwTypes import RwStatus
38 import rift.tasklets
39
40 if sys.version_info < (3, 4, 4):
41 asyncio.ensure_future = asyncio.async
42
43
44 class ResourceMgrEvent(object):
45 VDU_REQUEST_XPATH = "D,/rw-resource-mgr:resource-mgmt/vdu-event/vdu-event-data"
46 VLINK_REQUEST_XPATH = "D,/rw-resource-mgr:resource-mgmt/vlink-event/vlink-event-data"
47
48 def __init__(self, dts, log, loop, parent):
49 self._log = log
50 self._dts = dts
51 self._loop = loop
52 self._parent = parent
53 self._project = parent._project
54 self._vdu_reg = None
55 self._link_reg = None
56
57 self._vdu_reg_event = asyncio.Event(loop=self._loop)
58 self._link_reg_event = asyncio.Event(loop=self._loop)
59
60 @asyncio.coroutine
61 def wait_ready(self, timeout=5):
62 self._log.debug("Waiting for all request registrations to become ready.")
63 yield from asyncio.wait([self._link_reg_event.wait(), self._vdu_reg_event.wait()],
64 timeout=timeout, loop=self._loop)
65
66 def _add_config_flag(self, xpath, config=False):
67 if xpath[0] == '/':
68 if config:
69 return 'C,' + xpath
70 else:
71 return 'D,' + xpath
72
73 return xpath
74
75 def create_record_dts(self, regh, xact, xpath, msg):
76 """
77 Create a record in DTS with path and message
78 """
79 path = self._add_config_flag(self._project.add_project(xpath))
80 self._log.debug("Creating Resource Record xact = %s, %s:%s",
81 xact, path, msg)
82 regh.create_element(path, msg)
83
84 def delete_record_dts(self, regh, xact, xpath):
85 """
86 Delete a VNFR record in DTS with path and message
87 """
88 path = self._add_config_flag(self._project.add_project(xpath))
89 self._log.debug("Deleting Resource Record xact = %s, %s",
90 xact, path)
91 regh.delete_element(path)
92
93
94 @asyncio.coroutine
95 def register(self):
96 @asyncio.coroutine
97 def onlink_event(dts, g_reg, xact, xact_event, scratch_data):
98 @asyncio.coroutine
99 def instantiate_realloc_vn(link):
100 """Re-populate the virtual link information after restart
101
102 Arguments:
103 vlink
104
105 """
106 # wait for 3 seconds
107 yield from asyncio.sleep(3, loop=self._loop)
108
109 try:
110 response_info = yield from self._parent.reallocate_virtual_network(
111 link.event_id,
112 link.cloud_account,
113 link.request_info, link.resource_info,
114 )
115 except Exception as e:
116 self._log.error("Encoutered exception in reallocate_virtual_network")
117 self._log.exception(e)
118
119
120 if (xact_event == rwdts.MemberEvent.INSTALL):
121 link_cfg = self._link_reg.elements
122 self._log.debug("onlink_event INSTALL event: {}".format(link_cfg))
123
124 for link in link_cfg:
125 self._loop.create_task(instantiate_realloc_vn(link))
126
127 self._log.debug("onlink_event INSTALL event complete")
128
129 return rwdts.MemberRspCode.ACTION_OK
130
131 @asyncio.coroutine
132 def onvdu_event(dts, g_reg, xact, xact_event, scratch_data):
133 @asyncio.coroutine
134 def instantiate_realloc_vdu(vdu):
135 """Re-populate the VDU information after restart
136
137 Arguments:
138 vdu
139
140 """
141 # wait for 3 seconds
142 yield from asyncio.sleep(3, loop=self._loop)
143
144 try:
145 response_info = yield from self._parent.allocate_virtual_compute(
146 vdu.event_id,
147 vdu.cloud_account,
148 vdu.request_info
149 )
150 except Exception as e:
151 self._log.error("Encoutered exception in allocate_virtual_network")
152 self._log.exception(e)
153 raise e
154
155 response_xpath = "/rw-resource-mgr:resource-mgmt/rw-resource-mgr:vdu-event/rw-resource-mgr:vdu-event-data[rw-resource-mgr:event-id={}]/resource-info".format(
156 quoted_key(vdu.event_id.strip()))
157
158 cloud_account = self._parent.get_cloud_account_detail(cloud_account)
159 asyncio.ensure_future(monitor_vdu_state(response_xpath, vdu.event_id, cloud_account.vdu_instance_timeout), loop=self._loop)
160
161 if (xact_event == rwdts.MemberEvent.INSTALL):
162 vdu_cfg = self._vdu_reg.elements
163 self._log.debug("onvdu_event INSTALL event: {}".format(vdu_cfg))
164
165 for vdu in vdu_cfg:
166 self._loop.create_task(instantiate_realloc_vdu(vdu))
167
168 self._log.debug("onvdu_event INSTALL event complete")
169
170 return rwdts.MemberRspCode.ACTION_OK
171
172 @asyncio.coroutine
173 def allocate_vlink_task(ks_path, event_id, cloud_account, request_info):
174 response_xpath = ks_path.to_xpath(RwResourceMgrYang.get_schema()) + "/resource-info"
175 schema = RwResourceMgrYang.YangData_RwProject_Project_ResourceMgmt_VlinkEvent_VlinkEventData().schema()
176 pathentry = schema.keyspec_to_entry(ks_path)
177 try:
178 response_info = yield from self._parent.allocate_virtual_network(pathentry.key00.event_id,
179 cloud_account,
180 request_info)
181 except Exception as e:
182 self._log.error("Encountered exception: %s while creating virtual network", str(e))
183 self._log.exception(e)
184 response_info = RwResourceMgrYang.YangData_RwProject_Project_ResourceMgmt_VlinkEvent_VlinkEventData_ResourceInfo()
185 response_info.resource_state = 'failed'
186 response_info.resource_errors = str(e)
187 yield from self._dts.query_update(response_xpath,
188 rwdts.XactFlag.ADVISE,
189 response_info)
190 else:
191 yield from self._dts.query_update(response_xpath,
192 rwdts.XactFlag.ADVISE,
193 response_info)
194
195
196 @asyncio.coroutine
197 def on_link_request_prepare(xact_info, action, ks_path, request_msg):
198 self._log.debug(
199 "Received virtual-link on_prepare callback (xact_info: %s, action: %s): %s",
200 xact_info, action, request_msg)
201
202 response_info = None
203 response_xpath = ks_path.to_xpath(RwResourceMgrYang.get_schema()) + "/resource-info"
204
205 schema = RwResourceMgrYang.YangData_RwProject_Project_ResourceMgmt_VlinkEvent_VlinkEventData().schema()
206 pathentry = schema.keyspec_to_entry(ks_path)
207
208 if action == rwdts.QueryAction.CREATE:
209 try:
210 response_info = RwResourceMgrYang.YangData_RwProject_Project_ResourceMgmt_VlinkEvent_VlinkEventData_ResourceInfo()
211 response_info.resource_state = 'pending'
212 request_msg.resource_info = response_info
213 self.create_record_dts(self._link_reg,
214 None,
215 ks_path.to_xpath(RwResourceMgrYang.get_schema()),
216 request_msg)
217
218 asyncio.ensure_future(allocate_vlink_task(ks_path,
219 pathentry.key00.event_id,
220 request_msg.cloud_account,
221 request_msg.request_info),
222 loop = self._loop)
223 except Exception as e:
224 self._log.error(
225 "Encountered exception: %s while creating virtual network", str(e))
226 self._log.exception(e)
227 response_info = RwResourceMgrYang.YangData_RwProject_Project_ResourceMgmt_VlinkEvent_VlinkEventData_ResourceInfo()
228 response_info.resource_state = 'failed'
229 response_info.resource_errors = str(e)
230 yield from self._dts.query_update(response_xpath,
231 rwdts.XactFlag.ADVISE,
232 response_info)
233 elif action == rwdts.QueryAction.DELETE:
234 yield from self._parent.release_virtual_network(pathentry.key00.event_id)
235 self.delete_record_dts(self._link_reg, None,
236 ks_path.to_xpath(RwResourceMgrYang.get_schema()))
237
238 elif action == rwdts.QueryAction.READ:
239 # TODO: Check why we are getting null event id request
240 if pathentry.key00.event_id:
241 response_info = yield from self._parent.read_virtual_network_info(pathentry.key00.event_id)
242 else:
243 xact_info.respond_xpath(rwdts.XactRspCode.NA)
244 return
245 else:
246 raise ValueError(
247 "Only read/create/delete actions available. Received action: %s" %(action))
248
249 self._log.info("Responding with VirtualLinkInfo at xpath %s: %s.",
250 response_xpath, response_info)
251
252 xact_info.respond_xpath(rwdts.XactRspCode.ACK, response_xpath, response_info)
253
254
255
256 def monitor_vdu_state(response_xpath, event_id, vdu_timeout):
257 self._log.info("Initiating VDU state monitoring for xpath: %s ", response_xpath)
258 sleep_time = 2
259 loop_cnt = int(vdu_timeout/sleep_time)
260
261 for i in range(loop_cnt):
262 self._log.debug(
263 "VDU state monitoring for xpath: %s. Sleeping for 2 second", response_xpath)
264 yield from asyncio.sleep(2, loop = self._loop)
265
266 try:
267 response_info = yield from self._parent.read_virtual_compute_info(event_id)
268 except Exception as e:
269 self._log.info(
270 "VDU state monitoring: Received exception %s in VDU state monitoring for %s. Aborting monitoring", str(e),response_xpath)
271
272 response_info = RwResourceMgrYang.YangData_RwProject_Project_ResourceMgmt_VduEvent_VduEventData_ResourceInfo()
273 response_info.resource_state = 'failed'
274 response_info.resource_errors = str(e)
275 yield from self._dts.query_update(response_xpath,
276 rwdts.XactFlag.ADVISE,
277 response_info)
278 else:
279 if response_info.resource_state == 'active' or response_info.resource_state == 'failed':
280 self._log.info("VDU state monitoring: VDU reached terminal state. " +
281 "Publishing VDU info: %s at path: %s",
282 response_info, response_xpath)
283 yield from self._dts.query_update(response_xpath,
284 rwdts.XactFlag.ADVISE,
285 response_info)
286 return
287 else:
288 ### End of loop. This is only possible if VDU did not reach active state
289 err_msg = ("VDU state monitoring: VDU at xpath :{} did not reached active " +
290 "state in {} seconds. Aborting monitoring".
291 format(response_xpath, time_to_wait))
292 self._log.info(err_msg)
293 response_info = RwResourceMgrYang.YangData_RwProject_Project_ResourceMgmt_VduEvent_VduEventData_ResourceInfo()
294 response_info.resource_state = 'failed'
295 response_info.resource_errors = err_msg
296 yield from self._dts.query_update(response_xpath,
297 rwdts.XactFlag.ADVISE,
298 response_info)
299 return
300
301 def allocate_vdu_task(ks_path, event_id, cloud_account, request_msg):
302 response_xpath = ks_path.to_xpath(RwResourceMgrYang.get_schema()) + "/resource-info"
303 response_xpath = self._add_config_flag(response_xpath)
304 schema = RwResourceMgrYang.YangData_RwProject_Project_ResourceMgmt_VduEvent_VduEventData().schema()
305 pathentry = schema.keyspec_to_entry(ks_path)
306 try:
307 response_info = yield from self._parent.allocate_virtual_compute(event_id,
308 cloud_account,
309 request_msg,)
310 except Exception as e:
311 self._log.error("Encountered exception : %s while creating virtual compute", str(e))
312 response_info = RwResourceMgrYang.YangData_RwProject_Project_ResourceMgmt_VduEvent_VduEventData_ResourceInfo()
313 response_info.resource_state = 'failed'
314 response_info.resource_errors = str(e)
315 yield from self._dts.query_update(response_xpath,
316 rwdts.XactFlag.ADVISE,
317 response_info)
318 else:
319 cloud_account = self._parent.get_cloud_account_detail(cloud_account)
320 #RIFT-17719 - Set the resource state to active if no floating ip pool specified and is waiting for public ip.
321 if response_info.resource_state == 'pending' and cloud_account.has_field('openstack') \
322 and not (cloud_account.openstack.has_field('floating_ip_pool')) :
323 if (request_msg.has_field('allocate_public_address')) and (request_msg.allocate_public_address == True):
324 if not response_info.has_field('public_ip'):
325 response_info.resource_state = 'active'
326
327 if response_info.resource_state == 'failed' or response_info.resource_state == 'active' :
328 self._log.debug("Virtual compute create task completed. Publishing VDU info: %s at path: %s",
329 response_info, response_xpath)
330 yield from self._dts.query_update(response_xpath,
331 rwdts.XactFlag.ADVISE,
332 response_info)
333 else:
334 asyncio.ensure_future(monitor_vdu_state(response_xpath, pathentry.key00.event_id, cloud_account.vdu_instance_timeout),
335 loop = self._loop)
336
337 @asyncio.coroutine
338 def on_vdu_request_prepare(xact_info, action, ks_path, request_msg):
339 self._log.debug("Received vdu on_prepare callback (xact_info: %s, action: %s): %s",
340 xact_info, action, request_msg)
341 response_xpath = ks_path.to_xpath(RwResourceMgrYang.get_schema()) + "/resource-info"
342 response_xpath = self._add_config_flag(response_xpath)
343 schema = RwResourceMgrYang.YangData_RwProject_Project_ResourceMgmt_VduEvent_VduEventData().schema()
344 pathentry = schema.keyspec_to_entry(ks_path)
345
346 if action == rwdts.QueryAction.CREATE:
347 response_info = RwResourceMgrYang.YangData_RwProject_Project_ResourceMgmt_VduEvent_VduEventData_ResourceInfo()
348 response_info.resource_state = 'pending'
349 request_msg.resource_info = response_info
350 self.create_record_dts(self._vdu_reg,
351 None,
352 ks_path.to_xpath(RwResourceMgrYang.get_schema()),
353 request_msg)
354 asyncio.ensure_future(allocate_vdu_task(ks_path,
355 pathentry.key00.event_id,
356 request_msg.cloud_account,
357 request_msg.request_info),
358 loop = self._loop)
359 elif action == rwdts.QueryAction.DELETE:
360 response_info = None
361 yield from self._parent.release_virtual_compute(pathentry.key00.event_id)
362 self.delete_record_dts(self._vdu_reg, None, ks_path.to_xpath(RwResourceMgrYang.get_schema()))
363 elif action == rwdts.QueryAction.READ:
364 # TODO: Check why we are getting null event id request
365 if pathentry.key00.event_id:
366 response_info = yield from self._parent.read_virtual_compute_info(pathentry.key00.event_id)
367 else:
368 xact_info.respond_xpath(rwdts.XactRspCode.NA)
369 return
370 else:
371 raise ValueError("Only create/delete actions available. Received action: %s" %(action))
372
373 self._log.debug("Responding with VDUInfo at xpath %s: %s",
374 response_xpath, response_info)
375
376 xact_info.respond_xpath(rwdts.XactRspCode.ACK, response_xpath, response_info)
377
378
379 @asyncio.coroutine
380 def on_request_ready(registration, status):
381 self._log.debug("Got request ready event (registration: %s) (status: %s)",
382 registration, status)
383
384 if registration == self._link_reg:
385 self._link_reg_event.set()
386 elif registration == self._vdu_reg:
387 self._vdu_reg_event.set()
388 else:
389 self._log.error("Unknown registration ready event: %s", registration)
390
391 link_handlers = rift.tasklets.Group.Handler(on_event=onlink_event,)
392 with self._dts.group_create(handler=link_handlers) as link_group:
393 xpath = self._project.add_project(ResourceMgrEvent.VLINK_REQUEST_XPATH)
394 self._log.debug("Registering for Link Resource Request using xpath: {}".
395 format(xpath))
396
397 self._link_reg = link_group.register(xpath=xpath,
398 handler=rift.tasklets.DTS.RegistrationHandler(on_ready=on_request_ready,
399 on_prepare=on_link_request_prepare),
400 flags=rwdts.Flag.PUBLISHER | rwdts.Flag.DATASTORE,)
401
402 vdu_handlers = rift.tasklets.Group.Handler(on_event=onvdu_event, )
403 with self._dts.group_create(handler=vdu_handlers) as vdu_group:
404
405 xpath = self._project.add_project(ResourceMgrEvent.VDU_REQUEST_XPATH)
406 self._log.debug("Registering for VDU Resource Request using xpath: {}".
407 format(xpath))
408
409 self._vdu_reg = vdu_group.register(xpath=xpath,
410 handler=rift.tasklets.DTS.RegistrationHandler(on_ready=on_request_ready,
411 on_prepare=on_vdu_request_prepare),
412 flags=rwdts.Flag.PUBLISHER | rwdts.Flag.DATASTORE,)
413
414
415 def deregister(self):
416 self._log.debug("De-register for project {}".format(self._project.name))
417
418 if self._vdu_reg:
419 self._vdu_reg.deregister()
420 self._vdu_reg = None
421
422 if self._link_reg:
423 self._link_reg.deregister()
424 self._link_reg = None