3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
22 gi
.require_version('RwDts', '1.0')
23 gi
.require_version('RwYang', '1.0')
24 gi
.require_version('RwResourceMgrYang', '1.0')
25 gi
.require_version('RwLaunchpadYang', '1.0')
26 gi
.require_version('RwcalYang', '1.0')
27 from gi
.repository
import (
35 from gi
.repository
.RwTypes
import RwStatus
# asyncio.ensure_future() first appeared in Python 3.4.4.  On older
# interpreters, alias it to its predecessor so the rest of this module can
# call asyncio.ensure_future() unconditionally.
#
# The predecessor is looked up with getattr() rather than written as a plain
# attribute access: ``async`` became a reserved keyword in Python 3.7, so the
# literal spelling ``asyncio.async`` is a SyntaxError there and would stop
# this module from being imported at all, even though this branch never runs
# on such interpreters.
if sys.version_info < (3, 4, 4):
    asyncio.ensure_future = getattr(asyncio, "async")
42 class ResourceMgrEvent(object):
43 VDU_REQUEST_XPATH
= "D,/rw-resource-mgr:resource-mgmt/vdu-event/vdu-event-data"
44 VLINK_REQUEST_XPATH
= "D,/rw-resource-mgr:resource-mgmt/vlink-event/vlink-event-data"
46 def __init__(self
, dts
, log
, loop
, parent
):
54 self
._vdu
_reg
_event
= asyncio
.Event(loop
=self
._loop
)
55 self
._link
_reg
_event
= asyncio
.Event(loop
=self
._loop
)
def wait_ready(self, timeout=5):
    """Wait for the link and VDU registration-ready events.

    Generator-based coroutine body: it delegates to ``asyncio.wait`` with
    ``yield from`` on this object's event loop.

    Arguments:
        timeout - value forwarded to ``asyncio.wait`` as its ``timeout``
                  (default 5)

    The result of ``asyncio.wait`` is discarded, so the caller gets no
    indication of whether both events were set or the timeout expired.
    """
    self._log.debug("Waiting for all request registrations to become ready.")

    # One waiter per registration event; link first, then VDU.
    ready_waiters = [
        self._link_reg_event.wait(),
        self._vdu_reg_event.wait(),
    ]
    yield from asyncio.wait(ready_waiters, timeout=timeout, loop=self._loop)
63 def create_record_dts(self
, regh
, xact
, path
, msg
):
65 Create a record in DTS with path and message
67 self
._log
.debug("Creating Resource Record xact = %s, %s:%s",
69 regh
.create_element(path
, msg
)
71 def delete_record_dts(self
, regh
, xact
, path
):
73 Delete a VNFR record in DTS with path and message
75 self
._log
.debug("Deleting Resource Record xact = %s, %s",
77 regh
.delete_element(path
)
82 def onlink_event(dts
, g_reg
, xact
, xact_event
, scratch_data
):
84 def instantiate_realloc_vn(link
):
85 """Re-populate the virtual link information after restart
92 yield from asyncio
.sleep(3, loop
=self
._loop
)
94 response_info
= yield from self
._parent
.reallocate_virtual_network(link
.event_id
,
96 link
.request_info
, link
.resource_info
,
98 if (xact_event
== rwdts
.MemberEvent
.INSTALL
):
99 link_cfg
= self
._link
_reg
.elements
100 for link
in link_cfg
:
101 self
._loop
.create_task(instantiate_realloc_vn(link
))
102 return rwdts
.MemberRspCode
.ACTION_OK
105 def onvdu_event(dts
, g_reg
, xact
, xact_event
, scratch_data
):
107 def instantiate_realloc_vdu(vdu
):
108 """Re-populate the VDU information after restart
115 yield from asyncio
.sleep(3, loop
=self
._loop
)
117 response_info
= yield from self
._parent
.allocate_virtual_compute(vdu
.event_id
,
121 if (xact_event
== rwdts
.MemberEvent
.INSTALL
):
122 vdu_cfg
= self
._vdu
_reg
.elements
124 self
._loop
.create_task(instantiate_realloc_vdu(vdu
))
125 return rwdts
.MemberRspCode
.ACTION_OK
def on_link_request_commit(xact_info):
    """Commit callback for the virtual-link request registration.

    Logs the transaction info at debug level and acknowledges the commit.

    Arguments:
        xact_info - transaction info passed in by the caller; only logged

    Returns:
        rwdts.MemberRspCode.ACTION_OK
    """
    # ``self`` is not a parameter here; it is a free name resolved from the
    # enclosing scope.
    self._log.debug("Received link request commit (xact_info: %s)", xact_info)
    commit_response = rwdts.MemberRspCode.ACTION_OK
    return commit_response
133 def on_link_request_prepare(xact_info
, action
, ks_path
, request_msg
):
134 self
._log
.debug("Received virtual-link on_prepare callback (xact_info: %s, action: %s): %s",
135 xact_info
, action
, request_msg
)
138 response_xpath
= ks_path
.to_xpath(RwResourceMgrYang
.get_schema()) + "/resource-info"
140 schema
= RwResourceMgrYang
.VirtualLinkEventData().schema()
141 pathentry
= schema
.keyspec_to_entry(ks_path
)
143 if action
== rwdts
.QueryAction
.CREATE
:
145 response_info
= yield from self
._parent
.allocate_virtual_network(pathentry
.key00
.event_id
,
146 request_msg
.cloud_account
,
147 request_msg
.request_info
)
148 except Exception as e
:
149 self
._log
.error("Encountered exception: %s while creating virtual network", str(e
))
150 self
._log
.exception(e
)
151 response_info
= RwResourceMgrYang
.VirtualLinkEventData_ResourceInfo()
152 response_info
.resource_state
= 'failed'
153 response_info
.resource_errors
= str(e
)
154 yield from self
._dts
.query_update(response_xpath
,
155 rwdts
.XactFlag
.ADVISE
,
158 request_msg
.resource_info
= response_info
159 self
.create_record_dts(self
._link
_reg
, None, ks_path
.to_xpath(RwResourceMgrYang
.get_schema()), request_msg
)
160 elif action
== rwdts
.QueryAction
.DELETE
:
161 yield from self
._parent
.release_virtual_network(pathentry
.key00
.event_id
)
162 self
.delete_record_dts(self
._link
_reg
, None, ks_path
.to_xpath(RwResourceMgrYang
.get_schema()))
163 elif action
== rwdts
.QueryAction
.READ
:
164 response_info
= yield from self
._parent
.read_virtual_network_info(pathentry
.key00
.event_id
)
166 raise ValueError("Only read/create/delete actions available. Received action: %s" %(action))
168 self
._log
.debug("Responding with VirtualLinkInfo at xpath %s: %s.",
169 response_xpath
, response_info
)
171 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
, response_xpath
, response_info
)
def on_vdu_request_commit(xact_info):
    """Commit callback for the VDU request registration.

    Logs the transaction info at debug level and acknowledges the commit.

    Arguments:
        xact_info - transaction info passed in by the caller; only logged

    Returns:
        rwdts.MemberRspCode.ACTION_OK
    """
    # ``self`` is not a parameter here; it is a free name resolved from the
    # enclosing scope.
    self._log.debug("Received vdu request commit (xact_info: %s)", xact_info)
    commit_response = rwdts.MemberRspCode.ACTION_OK
    return commit_response
179 def monitor_vdu_state(response_xpath
, pathentry
):
180 self
._log
.info("Initiating VDU state monitoring for xpath: %s ", response_xpath
)
183 loop_cnt
= int(time_to_wait
/sleep_time
)
184 for i
in range(loop_cnt
):
185 self
._log
.debug("VDU state monitoring for xpath: %s. Sleeping for 2 second", response_xpath
)
186 yield from asyncio
.sleep(2, loop
= self
._loop
)
188 response_info
= yield from self
._parent
.read_virtual_compute_info(pathentry
.key00
.event_id
)
189 except Exception as e
:
190 self
._log
.info("VDU state monitoring: Received exception %s in VDU state monitoring for %s. Aborting monitoring",
191 str(e
),response_xpath
)
192 response_info
= RwResourceMgrYang
.VDUEventData_ResourceInfo()
193 response_info
.resource_state
= 'failed'
194 response_info
.resource_errors
= str(e
)
195 yield from self
._dts
.query_update(response_xpath
,
196 rwdts
.XactFlag
.ADVISE
,
199 if response_info
.resource_state
== 'active' or response_info
.resource_state
== 'failed':
200 self
._log
.info("VDU state monitoring: VDU reached terminal state. Publishing VDU info: %s at path: %s",
201 response_info
, response_xpath
)
202 yield from self
._dts
.query_update(response_xpath
,
203 rwdts
.XactFlag
.ADVISE
,
207 ### End of loop. This is only possible if VDU did not reach active state
208 err_msg
= "VDU state monitoring: VDU at xpath :{} did not reached active state in {} seconds. Aborting monitoring".format(response_xpath
, time_to_wait
)
209 self
._log
.info(err_msg
)
210 response_info
= RwResourceMgrYang
.VDUEventData_ResourceInfo()
211 response_info
.resource_state
= 'failed'
212 response_info
.resource_errors
= err_msg
213 yield from self
._dts
.query_update(response_xpath
,
214 rwdts
.XactFlag
.ADVISE
,
218 def allocate_vdu_task(ks_path
, event_id
, cloud_account
, request_msg
):
219 response_xpath
= ks_path
.to_xpath(RwResourceMgrYang
.get_schema()) + "/resource-info"
220 schema
= RwResourceMgrYang
.VDUEventData().schema()
221 pathentry
= schema
.keyspec_to_entry(ks_path
)
223 response_info
= yield from self
._parent
.allocate_virtual_compute(event_id
,
226 except Exception as e
:
227 self
._log
.error("Encountered exception : %s while creating virtual compute", str(e
))
228 response_info
= RwResourceMgrYang
.VDUEventData_ResourceInfo()
229 response_info
.resource_state
= 'failed'
230 response_info
.resource_errors
= str(e
)
231 yield from self
._dts
.query_update(response_xpath
,
232 rwdts
.XactFlag
.ADVISE
,
235 if response_info
.resource_state
== 'failed' or response_info
.resource_state
== 'active' :
236 self
._log
.info("Virtual compute create task completed. Publishing VDU info: %s at path: %s",
237 response_info
, response_xpath
)
238 yield from self
._dts
.query_update(response_xpath
,
239 rwdts
.XactFlag
.ADVISE
,
242 asyncio
.ensure_future(monitor_vdu_state(response_xpath
, pathentry
),
247 def on_vdu_request_prepare(xact_info
, action
, ks_path
, request_msg
):
248 self
._log
.debug("Received vdu on_prepare callback (xact_info: %s, action: %s): %s",
249 xact_info
, action
, request_msg
)
250 response_xpath
= ks_path
.to_xpath(RwResourceMgrYang
.get_schema()) + "/resource-info"
251 schema
= RwResourceMgrYang
.VDUEventData().schema()
252 pathentry
= schema
.keyspec_to_entry(ks_path
)
254 if action
== rwdts
.QueryAction
.CREATE
:
255 response_info
= RwResourceMgrYang
.VDUEventData_ResourceInfo()
256 response_info
.resource_state
= 'pending'
257 request_msg
.resource_info
= response_info
258 self
.create_record_dts(self
._vdu
_reg
,
260 ks_path
.to_xpath(RwResourceMgrYang
.get_schema()),
262 asyncio
.ensure_future(allocate_vdu_task(ks_path
,
263 pathentry
.key00
.event_id
,
264 request_msg
.cloud_account
,
265 request_msg
.request_info
),
267 elif action
== rwdts
.QueryAction
.DELETE
:
269 yield from self
._parent
.release_virtual_compute(pathentry
.key00
.event_id
)
270 self
.delete_record_dts(self
._vdu
_reg
, None, ks_path
.to_xpath(RwResourceMgrYang
.get_schema()))
271 elif action
== rwdts
.QueryAction
.READ
:
272 response_info
= yield from self
._parent
.read_virtual_compute_info(pathentry
.key00
.event_id
)
274 raise ValueError("Only create/delete actions available. Received action: %s" %(action))
276 self
._log
.debug("Responding with VDUInfo at xpath %s: %s",
277 response_xpath
, response_info
)
279 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
, response_xpath
, response_info
)
def on_request_ready(registration, status):
    """Ready callback shared by the link and VDU registrations.

    Sets the ready event that belongs to the registration which became
    ready; a registration matching neither is logged as an error.

    Arguments:
        registration - the registration reporting ready; compared against
                       self._link_reg and self._vdu_reg
        status       - status passed in by the caller; only logged
    """
    # ``self`` is a free name resolved from the enclosing scope.
    self._log.debug("Got request ready event (registration: %s) (status: %s)",
                    registration, status)

    # The link registration is checked first; the VDU comparison is only
    # evaluated when the link one does not match.
    if registration == self._link_reg:
        self._link_reg_event.set()
        return

    if registration == self._vdu_reg:
        self._vdu_reg_event.set()
        return

    # NOTE(review): the fallback branch keyword was not visible in the
    # reviewed text; this assumes the error log is the final "else" case.
    self._log.error("Unknown registration ready event: %s", registration)
294 link_handlers
= rift
.tasklets
.Group
.Handler(on_event
=onlink_event
,)
295 with self
._dts
.group_create(handler
=link_handlers
) as link_group
:
296 self
._log
.debug("Registering for Link Resource Request using xpath: %s",
297 ResourceMgrEvent
.VLINK_REQUEST_XPATH
)
299 self
._link
_reg
= link_group
.register(xpath
=ResourceMgrEvent
.VLINK_REQUEST_XPATH
,
300 handler
=rift
.tasklets
.DTS
.RegistrationHandler(on_ready
=on_request_ready
,
301 on_commit
=on_link_request_commit
,
302 on_prepare
=on_link_request_prepare
),
303 flags
=rwdts
.Flag
.PUBLISHER | rwdts
.Flag
.DATASTORE
,)
305 vdu_handlers
= rift
.tasklets
.Group
.Handler(on_event
=onvdu_event
, )
306 with self
._dts
.group_create(handler
=vdu_handlers
) as vdu_group
:
308 self
._log
.debug("Registering for VDU Resource Request using xpath: %s",
309 ResourceMgrEvent
.VDU_REQUEST_XPATH
)
311 self
._vdu
_reg
= vdu_group
.register(xpath
=ResourceMgrEvent
.VDU_REQUEST_XPATH
,
312 handler
=rift
.tasklets
.DTS
.RegistrationHandler(on_ready
=on_request_ready
,
313 on_commit
=on_vdu_request_commit
,
314 on_prepare
=on_vdu_request_prepare
),
315 flags
=rwdts
.Flag
.PUBLISHER | rwdts
.Flag
.DATASTORE
,)