Merge "Revert "Functional spec for cloud-init support""
[osm/SO.git] / rwlaunchpad / plugins / rwresmgr / rift / tasklets / rwresmgrtasklet / rwresmgr_events.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import sys
20
21 import gi
22 gi.require_version('RwDts', '1.0')
23 gi.require_version('RwYang', '1.0')
24 gi.require_version('RwResourceMgrYang', '1.0')
25 gi.require_version('RwLaunchpadYang', '1.0')
26 gi.require_version('RwcalYang', '1.0')
27 from gi.repository import (
28 RwDts as rwdts,
29 RwYang,
30 RwResourceMgrYang,
31 RwLaunchpadYang,
32 RwcalYang,
33 )
34
35 from gi.repository.RwTypes import RwStatus
36 import rift.tasklets
37
if sys.version_info < (3, 4, 4):
    # asyncio.ensure_future() is only available from Python 3.4.4 onwards;
    # on older interpreters alias the legacy asyncio.async() to that name.
    # "async" is a reserved keyword from Python 3.7, so the attribute is
    # looked up with getattr(): writing asyncio.async literally would make
    # this whole module fail to parse on newer interpreters even though this
    # branch is never executed there.
    asyncio.ensure_future = getattr(asyncio, "async")
40
41
class ResourceMgrEvent(object):
    """DTS registrations for VDU and virtual-link resource requests.

    The class registers as publisher/datastore on the VDU and virtual-link
    request xpaths and forwards CREATE / DELETE / READ queries to the
    ``parent`` object, which is expected to provide the coroutines
    ``allocate_virtual_network``, ``reallocate_virtual_network``,
    ``release_virtual_network``, ``read_virtual_network_info``,
    ``allocate_virtual_compute``, ``release_virtual_compute`` and
    ``read_virtual_compute_info``.
    """

    VDU_REQUEST_XPATH = "D,/rw-resource-mgr:resource-mgmt/vdu-event/vdu-event-data"
    VLINK_REQUEST_XPATH = "D,/rw-resource-mgr:resource-mgmt/vlink-event/vlink-event-data"

    def __init__(self, dts, log, loop, parent):
        """Store the collaborators and create the registration-ready events.

        Arguments:
            dts    - DTS API handle used for group creation and query_update
            log    - logger used for all messages emitted by this class
            loop   - asyncio event loop on which all tasks are scheduled
            parent - object implementing the allocate/release/read coroutines
        """
        self._log = log
        self._dts = dts
        self._loop = loop
        self._parent = parent

        # Registration handles; populated by register().
        self._vdu_reg = None
        self._link_reg = None

        # Set from on_request_ready() once the matching registration is ready.
        self._vdu_reg_event = asyncio.Event(loop=self._loop)
        self._link_reg_event = asyncio.Event(loop=self._loop)

    @asyncio.coroutine
    def wait_ready(self, timeout=5):
        """Wait until both registrations are ready or ``timeout`` expires.

        asyncio.wait() does not raise on timeout, so this coroutine returns
        normally in either case.

        Arguments:
            timeout - maximum number of seconds to wait
        """
        self._log.debug("Waiting for all request registrations to become ready.")
        yield from asyncio.wait([self._link_reg_event.wait(), self._vdu_reg_event.wait()],
                                timeout=timeout, loop=self._loop)

    def create_record_dts(self, regh, xact, path, msg):
        """
        Create a resource record in DTS with path and message

        Arguments:
            regh - registration handle on which the element is created
            xact - transaction, used for logging only
            path - xpath of the record
            msg  - message stored at ``path``
        """
        self._log.debug("Creating Resource Record xact = %s, %s:%s",
                        xact, path, msg)
        regh.create_element(path, msg)

    def delete_record_dts(self, regh, xact, path):
        """
        Delete a resource record in DTS at the given path

        Arguments:
            regh - registration handle from which the element is deleted
            xact - transaction, used for logging only
            path - xpath of the record
        """
        self._log.debug("Deleting Resource Record xact = %s, %s",
                        xact, path)
        regh.delete_element(path)

    @asyncio.coroutine
    def register(self):
        """Register the virtual-link and VDU request handlers with DTS.

        Two DTS groups are created, one per request xpath. Each registration
        is a publisher/datastore with on_ready, on_commit and on_prepare
        callbacks, and each group has an on_event handler that re-populates
        the stored records when a MemberEvent.INSTALL event is received.
        """

        @asyncio.coroutine
        def onlink_event(dts, g_reg, xact, xact_event, scratch_data):
            """Group event handler for the virtual-link registration."""

            @asyncio.coroutine
            def instantiate_realloc_vn(link):
                """Re-populate the virtual link information after restart

                Arguments:
                    vlink

                """
                # wait for 3 seconds
                yield from asyncio.sleep(3, loop=self._loop)

                yield from self._parent.reallocate_virtual_network(link.event_id,
                                                                   link.cloud_account,
                                                                   link.request_info,
                                                                   link.resource_info,
                                                                   )

            if xact_event == rwdts.MemberEvent.INSTALL:
                # Schedule one re-allocation task per record held by the
                # registration; the tasks run independently of this handler.
                for link in self._link_reg.elements:
                    self._loop.create_task(instantiate_realloc_vn(link))
            return rwdts.MemberRspCode.ACTION_OK

        @asyncio.coroutine
        def onvdu_event(dts, g_reg, xact, xact_event, scratch_data):
            """Group event handler for the VDU registration."""

            @asyncio.coroutine
            def instantiate_realloc_vdu(vdu):
                """Re-populate the VDU information after restart

                Arguments:
                    vdu

                """
                # wait for 3 seconds
                yield from asyncio.sleep(3, loop=self._loop)

                yield from self._parent.allocate_virtual_compute(vdu.event_id,
                                                                 vdu.cloud_account,
                                                                 vdu.request_info
                                                                 )

            if xact_event == rwdts.MemberEvent.INSTALL:
                # Schedule one allocation task per record held by the
                # registration; the tasks run independently of this handler.
                for vdu in self._vdu_reg.elements:
                    self._loop.create_task(instantiate_realloc_vdu(vdu))
            return rwdts.MemberRspCode.ACTION_OK

        def on_link_request_commit(xact_info):
            """ The transaction has been committed """
            self._log.debug("Received link request commit (xact_info: %s)", xact_info)
            return rwdts.MemberRspCode.ACTION_OK

        @asyncio.coroutine
        def on_link_request_prepare(xact_info, action, ks_path, request_msg):
            """Handle a CREATE / DELETE / READ query on a virtual-link record.

            Raises:
                ValueError - for any other query action
            """
            self._log.debug("Received virtual-link on_prepare callback (xact_info: %s, action: %s): %s",
                            xact_info, action, request_msg)

            response_info = None
            record_xpath = ks_path.to_xpath(RwResourceMgrYang.get_schema())
            response_xpath = record_xpath + "/resource-info"

            schema = RwResourceMgrYang.VirtualLinkEventData().schema()
            pathentry = schema.keyspec_to_entry(ks_path)

            if action == rwdts.QueryAction.CREATE:
                try:
                    response_info = yield from self._parent.allocate_virtual_network(pathentry.key00.event_id,
                                                                                     request_msg.cloud_account,
                                                                                     request_msg.request_info)
                except Exception as e:
                    # Allocation failed: publish a 'failed' resource-info so
                    # that subscribers learn about it; no record is stored.
                    self._log.error("Encountered exception: %s while creating virtual network", str(e))
                    self._log.exception(e)
                    response_info = RwResourceMgrYang.VirtualLinkEventData_ResourceInfo()
                    response_info.resource_state = 'failed'
                    response_info.resource_errors = str(e)
                    yield from self._dts.query_update(response_xpath,
                                                      rwdts.XactFlag.ADVISE,
                                                      response_info)
                else:
                    request_msg.resource_info = response_info
                    self.create_record_dts(self._link_reg, None, record_xpath, request_msg)
            elif action == rwdts.QueryAction.DELETE:
                yield from self._parent.release_virtual_network(pathentry.key00.event_id)
                self.delete_record_dts(self._link_reg, None, record_xpath)
            elif action == rwdts.QueryAction.READ:
                response_info = yield from self._parent.read_virtual_network_info(pathentry.key00.event_id)
            else:
                raise ValueError("Only read/create/delete actions available. Received action: %s" %(action))

            self._log.debug("Responding with VirtualLinkInfo at xpath %s: %s.",
                            response_xpath, response_info)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK, response_xpath, response_info)

        def on_vdu_request_commit(xact_info):
            """ The transaction has been committed """
            self._log.debug("Received vdu request commit (xact_info: %s)", xact_info)
            return rwdts.MemberRspCode.ACTION_OK

        @asyncio.coroutine
        def monitor_vdu_state(response_xpath, pathentry):
            """Poll the VDU once per second until it is 'active' or 'failed'.

            The resulting resource-info is published at ``response_xpath``.
            Monitoring stops as soon as a terminal state is seen, as soon as
            reading the VDU info raises, or after ``loop_cnt`` polls; in the
            latter two cases a 'failed' resource-info is published.
            """
            self._log.info("Initiating VDU state monitoring for xpath: %s ", response_xpath)
            loop_cnt = 180
            for i in range(loop_cnt):
                self._log.debug("VDU state monitoring for xpath: %s. Sleeping for 1 second", response_xpath)
                yield from asyncio.sleep(1, loop = self._loop)
                try:
                    response_info = yield from self._parent.read_virtual_compute_info(pathentry.key00.event_id)
                except Exception as e:
                    self._log.info("VDU state monitoring: Received exception %s in VDU state monitoring for %s. Aborting monitoring",
                                   str(e),response_xpath)
                    response_info = RwResourceMgrYang.VDUEventData_ResourceInfo()
                    response_info.resource_state = 'failed'
                    response_info.resource_errors = str(e)
                    yield from self._dts.query_update(response_xpath,
                                                      rwdts.XactFlag.ADVISE,
                                                      response_info)
                    # Actually abort, as the log message states; without this
                    # the loop kept polling and could publish further,
                    # conflicting states after 'failed'.
                    return
                else:
                    if response_info.resource_state == 'active' or response_info.resource_state == 'failed':
                        self._log.info("VDU state monitoring: VDU reached terminal state. Publishing VDU info: %s at path: %s",
                                       response_info, response_xpath)
                        yield from self._dts.query_update(response_xpath,
                                                          rwdts.XactFlag.ADVISE,
                                                          response_info)
                        return
            else:
                ### End of loop. This is only possible if VDU did not reach active state
                err_msg = "VDU state monitoring: VDU at xpath :{} did not reached active state in {} seconds. Aborting monitoring".format(response_xpath, loop_cnt)
                self._log.info(err_msg)
                response_info = RwResourceMgrYang.VDUEventData_ResourceInfo()
                response_info.resource_state = 'failed'
                response_info.resource_errors = err_msg
                yield from self._dts.query_update(response_xpath,
                                                  rwdts.XactFlag.ADVISE,
                                                  response_info)
                return

        @asyncio.coroutine
        def allocate_vdu_task(ks_path, event_id, cloud_account, request_msg):
            """Allocate a VDU in the background and publish its resource-info.

            If the allocation raises, a 'failed' resource-info is published.
            If it returns a terminal state ('active' or 'failed') that result
            is published directly; otherwise monitor_vdu_state() is scheduled
            to publish the eventual state.
            """
            response_xpath = ks_path.to_xpath(RwResourceMgrYang.get_schema()) + "/resource-info"
            schema = RwResourceMgrYang.VDUEventData().schema()
            pathentry = schema.keyspec_to_entry(ks_path)
            try:
                response_info = yield from self._parent.allocate_virtual_compute(event_id,
                                                                                 cloud_account,
                                                                                 request_msg,)
            except Exception as e:
                self._log.error("Encountered exception : %s while creating virtual compute", str(e))
                response_info = RwResourceMgrYang.VDUEventData_ResourceInfo()
                response_info.resource_state = 'failed'
                response_info.resource_errors = str(e)
                yield from self._dts.query_update(response_xpath,
                                                  rwdts.XactFlag.ADVISE,
                                                  response_info)
            else:
                if response_info.resource_state == 'failed' or response_info.resource_state == 'active' :
                    self._log.info("Virtual compute create task completed. Publishing VDU info: %s at path: %s",
                                   response_info, response_xpath)
                    yield from self._dts.query_update(response_xpath,
                                                      rwdts.XactFlag.ADVISE,
                                                      response_info)
                else:
                    asyncio.ensure_future(monitor_vdu_state(response_xpath, pathentry),
                                          loop = self._loop)

        @asyncio.coroutine
        def on_vdu_request_prepare(xact_info, action, ks_path, request_msg):
            """Handle a CREATE / DELETE / READ query on a VDU record.

            CREATE stores the record with a 'pending' resource-info, responds
            immediately and performs the allocation in a background task.

            Raises:
                ValueError - for any other query action
            """
            self._log.debug("Received vdu on_prepare callback (xact_info: %s, action: %s): %s",
                            xact_info, action, request_msg)
            record_xpath = ks_path.to_xpath(RwResourceMgrYang.get_schema())
            response_xpath = record_xpath + "/resource-info"
            schema = RwResourceMgrYang.VDUEventData().schema()
            pathentry = schema.keyspec_to_entry(ks_path)

            if action == rwdts.QueryAction.CREATE:
                response_info = RwResourceMgrYang.VDUEventData_ResourceInfo()
                response_info.resource_state = 'pending'
                request_msg.resource_info = response_info
                self.create_record_dts(self._vdu_reg,
                                       None,
                                       record_xpath,
                                       request_msg)
                asyncio.ensure_future(allocate_vdu_task(ks_path,
                                                        pathentry.key00.event_id,
                                                        request_msg.cloud_account,
                                                        request_msg.request_info),
                                      loop = self._loop)
            elif action == rwdts.QueryAction.DELETE:
                response_info = None
                yield from self._parent.release_virtual_compute(pathentry.key00.event_id)
                self.delete_record_dts(self._vdu_reg, None, record_xpath)
            elif action == rwdts.QueryAction.READ:
                response_info = yield from self._parent.read_virtual_compute_info(pathentry.key00.event_id)
            else:
                # READ is handled above, so name it in the message, matching
                # the virtual-link handler.
                raise ValueError("Only read/create/delete actions available. Received action: %s" %(action))

            self._log.debug("Responding with VDUInfo at xpath %s: %s",
                            response_xpath, response_info)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK, response_xpath, response_info)

        @asyncio.coroutine
        def on_request_ready(registration, status):
            """Flag the matching ready event once a registration is ready."""
            self._log.debug("Got request ready event (registration: %s) (status: %s)",
                            registration, status)

            if registration == self._link_reg:
                self._link_reg_event.set()
            elif registration == self._vdu_reg:
                self._vdu_reg_event.set()
            else:
                self._log.error("Unknown registration ready event: %s", registration)

        link_handlers = rift.tasklets.Group.Handler(on_event=onlink_event,)
        with self._dts.group_create(handler=link_handlers) as link_group:
            self._log.debug("Registering for Link Resource Request using xpath: %s",
                            ResourceMgrEvent.VLINK_REQUEST_XPATH)

            self._link_reg = link_group.register(xpath=ResourceMgrEvent.VLINK_REQUEST_XPATH,
                                                 handler=rift.tasklets.DTS.RegistrationHandler(on_ready=on_request_ready,
                                                                                               on_commit=on_link_request_commit,
                                                                                               on_prepare=on_link_request_prepare),
                                                 flags=rwdts.Flag.PUBLISHER | rwdts.Flag.DATASTORE,)

        vdu_handlers = rift.tasklets.Group.Handler(on_event=onvdu_event, )
        with self._dts.group_create(handler=vdu_handlers) as vdu_group:

            self._log.debug("Registering for VDU Resource Request using xpath: %s",
                            ResourceMgrEvent.VDU_REQUEST_XPATH)

            self._vdu_reg = vdu_group.register(xpath=ResourceMgrEvent.VDU_REQUEST_XPATH,
                                               handler=rift.tasklets.DTS.RegistrationHandler(on_ready=on_request_ready,
                                                                                             on_commit=on_vdu_request_commit,
                                                                                             on_prepare=on_vdu_request_prepare),
                                               flags=rwdts.Flag.PUBLISHER | rwdts.Flag.DATASTORE,)
314