3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
22 gi
.require_version('RwDts', '1.0')
23 gi
.require_version('RwYang', '1.0')
24 gi
.require_version('RwResourceMgrYang', '1.0')
25 gi
.require_version('RwLaunchpadYang', '1.0')
26 gi
.require_version('RwcalYang', '1.0')
27 from gi
.repository
import (
35 from gi
.repository
.RwTypes
import RwStatus
# Compatibility shim: asyncio.ensure_future() only exists from Python 3.4.4
# onwards; older interpreters expose the same helper as asyncio.async().
# The legacy name is fetched with getattr() because ``async`` has been a
# reserved keyword since Python 3.7, and writing it as a plain attribute
# access would turn this whole module into a SyntaxError there.
if sys.version_info < (3, 4, 4):
    asyncio.ensure_future = getattr(asyncio, "async")
42 class ResourceMgrEvent(object):
43 VDU_REQUEST_XPATH
= "D,/rw-resource-mgr:resource-mgmt/vdu-event/vdu-event-data"
44 VLINK_REQUEST_XPATH
= "D,/rw-resource-mgr:resource-mgmt/vlink-event/vlink-event-data"
46 def __init__(self
, dts
, log
, loop
, parent
):
51 self
._project
= parent
._project
55 self
._vdu
_reg
_event
= asyncio
.Event(loop
=self
._loop
)
56 self
._link
_reg
_event
= asyncio
.Event(loop
=self
._loop
)
def wait_ready(self, timeout=5):
    """Wait until the link and VDU request registrations report ready.

    Blocks (via ``yield from``) on both registration-ready events for at
    most ``timeout`` seconds.  The result of asyncio.wait() is discarded
    and asyncio.wait() does not raise on timeout, so the caller is not
    told whether both events were actually set.

    Arguments:
        timeout - maximum number of seconds to wait (default: 5)
    """
    self._log.debug("Waiting for all request registrations to become ready.")

    # One waiter per registration: virtual-link first, then VDU.
    ready_waiters = [
        self._link_reg_event.wait(),
        self._vdu_reg_event.wait(),
    ]
    yield from asyncio.wait(ready_waiters, timeout=timeout, loop=self._loop)
64 def _add_config_flag(self
, xpath
, config
=False):
73 def create_record_dts(self
, regh
, xact
, xpath
, msg
):
75 Create a record in DTS with path and message
77 path
= self
._add
_config
_flag
(self
._project
.add_project(xpath
))
78 self
._log
.debug("Creating Resource Record xact = %s, %s:%s",
80 regh
.create_element(path
, msg
)
82 def delete_record_dts(self
, regh
, xact
, xpath
):
84 Delete a VNFR record in DTS with path and message
86 path
= self
._add
_config
_flag
(self
._project
.add_project(xpath
))
87 self
._log
.debug("Deleting Resource Record xact = %s, %s",
89 regh
.delete_element(path
)
95 def onlink_event(dts
, g_reg
, xact
, xact_event
, scratch_data
):
97 def instantiate_realloc_vn(link
):
98 """Re-populate the virtual link information after restart
105 yield from asyncio
.sleep(3, loop
=self
._loop
)
107 response_info
= yield from self
._parent
.reallocate_virtual_network(link
.event_id
,
109 link
.request_info
, link
.resource_info
,
111 if (xact_event
== rwdts
.MemberEvent
.INSTALL
):
112 link_cfg
= self
._link
_reg
.elements
113 for link
in link_cfg
:
114 self
._loop
.create_task(instantiate_realloc_vn(link
))
115 return rwdts
.MemberRspCode
.ACTION_OK
118 def onvdu_event(dts
, g_reg
, xact
, xact_event
, scratch_data
):
120 def instantiate_realloc_vdu(vdu
):
121 """Re-populate the VDU information after restart
128 yield from asyncio
.sleep(3, loop
=self
._loop
)
130 response_info
= yield from self
._parent
.allocate_virtual_compute(vdu
.event_id
,
134 if (xact_event
== rwdts
.MemberEvent
.INSTALL
):
135 vdu_cfg
= self
._vdu
_reg
.elements
137 self
._loop
.create_task(instantiate_realloc_vdu(vdu
))
138 return rwdts
.MemberRspCode
.ACTION_OK
def on_link_request_commit(xact_info):
    """Commit callback for the virtual-link request registration.

    No work is done at commit time: the event is logged at debug level
    and the transaction is acknowledged with ACTION_OK.
    """
    self._log.debug("Received link request commit (xact_info: %s)", xact_info)
    commit_rsp = rwdts.MemberRspCode.ACTION_OK
    return commit_rsp
146 def on_link_request_prepare(xact_info
, action
, ks_path
, request_msg
):
147 self
._log
.debug("Received virtual-link on_prepare callback (xact_info: %s, action: %s): %s",
148 xact_info
, action
, request_msg
)
151 response_xpath
= ks_path
.to_xpath(RwResourceMgrYang
.get_schema()) + "/resource-info"
153 schema
= RwResourceMgrYang
.VirtualLinkEventData().schema()
154 pathentry
= schema
.keyspec_to_entry(ks_path
)
156 if action
== rwdts
.QueryAction
.CREATE
:
158 response_info
= yield from self
._parent
.allocate_virtual_network(pathentry
.key00
.event_id
,
159 request_msg
.cloud_account
,
160 request_msg
.request_info
)
161 except Exception as e
:
162 self
._log
.error("Encountered exception: %s while creating virtual network", str(e
))
163 self
._log
.exception(e
)
164 response_info
= RwResourceMgrYang
.VirtualLinkEventData_ResourceInfo()
165 response_info
.resource_state
= 'failed'
166 response_info
.resource_errors
= str(e
)
167 yield from self
._dts
.query_update(response_xpath
,
168 rwdts
.XactFlag
.ADVISE
,
171 request_msg
.resource_info
= response_info
172 self
.create_record_dts(self
._link
_reg
, None, ks_path
.to_xpath(RwResourceMgrYang
.get_schema()), request_msg
)
173 elif action
== rwdts
.QueryAction
.DELETE
:
174 yield from self
._parent
.release_virtual_network(pathentry
.key00
.event_id
)
175 self
.delete_record_dts(self
._link
_reg
, None, ks_path
.to_xpath(RwResourceMgrYang
.get_schema()))
176 elif action
== rwdts
.QueryAction
.READ
:
177 # TODO: Check why we are getting null event id request
178 if pathentry
.key00
.event_id
:
179 response_info
= yield from self
._parent
.read_virtual_network_info(pathentry
.key00
.event_id
)
181 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
)
184 raise ValueError("Only read/create/delete actions available. Received action: %s" %(action))
186 self
._log
.info("Responding with VirtualLinkInfo at xpath %s: %s.",
187 response_xpath
, response_info
)
189 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
, response_xpath
, response_info
)
def on_vdu_request_commit(xact_info):
    """Commit callback for the VDU request registration.

    No work is done at commit time: the event is logged at debug level
    and the transaction is acknowledged with ACTION_OK.
    """
    self._log.debug("Received vdu request commit (xact_info: %s)", xact_info)
    commit_rsp = rwdts.MemberRspCode.ACTION_OK
    return commit_rsp
197 def monitor_vdu_state(response_xpath
, pathentry
):
198 self
._log
.debug("Initiating VDU state monitoring for xpath: %s ", response_xpath
)
201 loop_cnt
= int(time_to_wait
/sleep_time
)
202 for i
in range(loop_cnt
):
203 self
._log
.debug("VDU state monitoring for xpath: %s. Sleeping for 2 second", response_xpath
)
204 yield from asyncio
.sleep(2, loop
= self
._loop
)
206 response_info
= yield from self
._parent
.read_virtual_compute_info(pathentry
.key00
.event_id
)
207 except Exception as e
:
208 self
._log
.info("VDU state monitoring: Received exception %s in VDU state monitoring for %s. Aborting monitoring",
209 str(e
),response_xpath
)
210 response_info
= RwResourceMgrYang
.VDUEventData_ResourceInfo()
211 response_info
.resource_state
= 'failed'
212 response_info
.resource_errors
= str(e
)
213 yield from self
._dts
.query_update(response_xpath
,
214 rwdts
.XactFlag
.ADVISE
,
217 if response_info
.resource_state
== 'active' or response_info
.resource_state
== 'failed':
218 self
._log
.info("VDU state monitoring: VDU reached terminal state. " +
219 "Publishing VDU info: %s at path: %s",
220 response_info
, response_xpath
)
221 yield from self
._dts
.query_update(response_xpath
,
222 rwdts
.XactFlag
.ADVISE
,
226 ### End of loop. This is only possible if VDU did not reach active state
227 err_msg
= ("VDU state monitoring: VDU at xpath :{} did not reached active " +
228 "state in {} seconds. Aborting monitoring".
229 format(response_xpath
, time_to_wait
))
230 self
._log
.info(err_msg
)
231 response_info
= RwResourceMgrYang
.VDUEventData_ResourceInfo()
232 response_info
.resource_state
= 'failed'
233 response_info
.resource_errors
= err_msg
234 yield from self
._dts
.query_update(response_xpath
,
235 rwdts
.XactFlag
.ADVISE
,
239 def allocate_vdu_task(ks_path
, event_id
, cloud_account
, request_msg
):
240 response_xpath
= ks_path
.to_xpath(RwResourceMgrYang
.get_schema()) + "/resource-info"
241 response_xpath
= self
._add
_config
_flag
(response_xpath
)
242 schema
= RwResourceMgrYang
.VDUEventData().schema()
243 pathentry
= schema
.keyspec_to_entry(ks_path
)
245 response_info
= yield from self
._parent
.allocate_virtual_compute(event_id
,
248 except Exception as e
:
249 self
._log
.error("Encountered exception : %s while creating virtual compute", str(e
))
250 response_info
= RwResourceMgrYang
.VDUEventData_ResourceInfo()
251 response_info
.resource_state
= 'failed'
252 response_info
.resource_errors
= str(e
)
253 yield from self
._dts
.query_update(response_xpath
,
254 rwdts
.XactFlag
.ADVISE
,
257 if response_info
.resource_state
== 'failed' or response_info
.resource_state
== 'active' :
258 self
._log
.debug("Virtual compute create task completed. Publishing VDU info: %s at path: %s",
259 response_info
, response_xpath
)
260 yield from self
._dts
.query_update(response_xpath
,
261 rwdts
.XactFlag
.ADVISE
,
264 self
._log
.debug("VDU create monitor at {}".format(response_xpath
))
265 asyncio
.ensure_future(monitor_vdu_state(response_xpath
, pathentry
),
269 def on_vdu_request_prepare(xact_info
, action
, ks_path
, request_msg
):
270 self
._log
.debug("Received vdu on_prepare callback (xact_info: %s, action: %s): %s",
271 xact_info
, action
, request_msg
)
272 response_xpath
= ks_path
.to_xpath(RwResourceMgrYang
.get_schema()) + "/resource-info"
273 response_xpath
= self
._add
_config
_flag
(response_xpath
)
274 schema
= RwResourceMgrYang
.VDUEventData().schema()
275 pathentry
= schema
.keyspec_to_entry(ks_path
)
277 if action
== rwdts
.QueryAction
.CREATE
:
278 response_info
= RwResourceMgrYang
.VDUEventData_ResourceInfo()
279 response_info
.resource_state
= 'pending'
280 request_msg
.resource_info
= response_info
281 self
.create_record_dts(self
._vdu
_reg
,
283 ks_path
.to_xpath(RwResourceMgrYang
.get_schema()),
285 asyncio
.ensure_future(allocate_vdu_task(ks_path
,
286 pathentry
.key00
.event_id
,
287 request_msg
.cloud_account
,
288 request_msg
.request_info
),
290 elif action
== rwdts
.QueryAction
.DELETE
:
292 yield from self
._parent
.release_virtual_compute(pathentry
.key00
.event_id
)
293 self
.delete_record_dts(self
._vdu
_reg
, None, ks_path
.to_xpath(RwResourceMgrYang
.get_schema()))
294 elif action
== rwdts
.QueryAction
.READ
:
295 # TODO: Check why we are getting null event id request
296 if pathentry
.key00
.event_id
:
297 response_info
= yield from self
._parent
.read_virtual_compute_info(pathentry
.key00
.event_id
)
299 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
)
302 raise ValueError("Only create/delete actions available. Received action: %s" %(action))
304 self
._log
.debug("Responding with VDUInfo at xpath %s: %s",
305 response_xpath
, response_info
)
307 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
, response_xpath
, response_info
)
311 def on_request_ready(registration
, status
):
312 self
._log
.debug("Got request ready event (registration: %s) (status: %s)",
313 registration
, status
)
315 if registration
== self
._link
_reg
:
316 self
._link
_reg
_event
.set()
317 elif registration
== self
._vdu
_reg
:
318 self
._vdu
_reg
_event
.set()
320 self
._log
.error("Unknown registration ready event: %s", registration
)
322 link_handlers
= rift
.tasklets
.Group
.Handler(on_event
=onlink_event
,)
323 with self
._dts
.group_create(handler
=link_handlers
) as link_group
:
324 xpath
= self
._project
.add_project(ResourceMgrEvent
.VLINK_REQUEST_XPATH
)
325 self
._log
.debug("Registering for Link Resource Request using xpath: {}".
328 self
._link
_reg
= link_group
.register(xpath
=xpath
,
329 handler
=rift
.tasklets
.DTS
.RegistrationHandler(on_ready
=on_request_ready
,
330 on_commit
=on_link_request_commit
,
331 on_prepare
=on_link_request_prepare
),
332 flags
=rwdts
.Flag
.PUBLISHER | rwdts
.Flag
.DATASTORE
,)
334 vdu_handlers
= rift
.tasklets
.Group
.Handler(on_event
=onvdu_event
, )
335 with self
._dts
.group_create(handler
=vdu_handlers
) as vdu_group
:
337 xpath
= self
._project
.add_project(ResourceMgrEvent
.VDU_REQUEST_XPATH
)
338 self
._log
.debug("Registering for VDU Resource Request using xpath: {}".
341 self
._vdu
_reg
= vdu_group
.register(xpath
=xpath
,
342 handler
=rift
.tasklets
.DTS
.RegistrationHandler(on_ready
=on_request_ready
,
343 on_commit
=on_vdu_request_commit
,
344 on_prepare
=on_vdu_request_prepare
),
345 flags
=rwdts
.Flag
.PUBLISHER | rwdts
.Flag
.DATASTORE
,)
348 def deregister(self
):
349 self
._log
.debug("De-register for project {}".format(self
._project
.name
))
352 self
._vdu
_reg
.deregister()
356 self
._link
_reg
.deregister()
357 self
._link
_reg
= None