3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
22 gi
.require_version('RwDts', '1.0')
23 gi
.require_version('RwYang', '1.0')
24 gi
.require_version('RwResourceMgrYang', '1.0')
25 gi
.require_version('RwLaunchpadYang', '1.0')
26 gi
.require_version('RwcalYang', '1.0')
27 from gi
.repository
import (
35 from gi
.repository
.RwTypes
import RwStatus
38 if sys
.version_info
< (3, 4, 4):
39 asyncio
.ensure_future
= asyncio
.async
42 class ResourceMgrEvent(object):
43 VDU_REQUEST_XPATH
= "D,/rw-resource-mgr:resource-mgmt/vdu-event/vdu-event-data"
44 VLINK_REQUEST_XPATH
= "D,/rw-resource-mgr:resource-mgmt/vlink-event/vlink-event-data"
46 def __init__(self
, dts
, log
, loop
, parent
):
54 self
._vdu
_reg
_event
= asyncio
.Event(loop
=self
._loop
)
55 self
._link
_reg
_event
= asyncio
.Event(loop
=self
._loop
)
58 def wait_ready(self
, timeout
=5):
59 self
._log
.debug("Waiting for all request registrations to become ready.")
60 yield from asyncio
.wait([self
._link
_reg
_event
.wait(), self
._vdu
_reg
_event
.wait()],
61 timeout
=timeout
, loop
=self
._loop
)
63 def create_record_dts(self
, regh
, xact
, path
, msg
):
65 Create a record in DTS with path and message
67 self
._log
.debug("Creating Resource Record xact = %s, %s:%s",
69 regh
.create_element(path
, msg
)
71 def delete_record_dts(self
, regh
, xact
, path
):
73 Delete a VNFR record in DTS with path and message
75 self
._log
.debug("Deleting Resource Record xact = %s, %s",
77 regh
.delete_element(path
)
82 def onlink_event(dts
, g_reg
, xact
, xact_event
, scratch_data
):
84 def instantiate_realloc_vn(link
):
85 """Re-populate the virtual link information after restart
92 yield from asyncio
.sleep(3, loop
=self
._loop
)
94 response_info
= yield from self
._parent
.reallocate_virtual_network(link
.event_id
,
96 link
.request_info
, link
.resource_info
,
98 if (xact_event
== rwdts
.MemberEvent
.INSTALL
):
99 link_cfg
= self
._link
_reg
.elements
100 for link
in link_cfg
:
101 self
._loop
.create_task(instantiate_realloc_vn(link
))
102 return rwdts
.MemberRspCode
.ACTION_OK
105 def onvdu_event(dts
, g_reg
, xact
, xact_event
, scratch_data
):
107 def instantiate_realloc_vdu(vdu
):
108 """Re-populate the VDU information after restart
115 yield from asyncio
.sleep(3, loop
=self
._loop
)
117 response_info
= yield from self
._parent
.allocate_virtual_compute(vdu
.event_id
,
121 if (xact_event
== rwdts
.MemberEvent
.INSTALL
):
122 vdu_cfg
= self
._vdu
_reg
.elements
124 self
._loop
.create_task(instantiate_realloc_vdu(vdu
))
125 return rwdts
.MemberRspCode
.ACTION_OK
127 def on_link_request_commit(xact_info
):
128 """ The transaction has been committed """
129 self
._log
.debug("Received link request commit (xact_info: %s)", xact_info
)
130 return rwdts
.MemberRspCode
.ACTION_OK
133 def on_link_request_prepare(xact_info
, action
, ks_path
, request_msg
):
134 self
._log
.debug("Received virtual-link on_prepare callback (xact_info: %s, action: %s): %s",
135 xact_info
, action
, request_msg
)
138 response_xpath
= ks_path
.to_xpath(RwResourceMgrYang
.get_schema()) + "/resource-info"
140 schema
= RwResourceMgrYang
.VirtualLinkEventData().schema()
141 pathentry
= schema
.keyspec_to_entry(ks_path
)
143 if action
== rwdts
.QueryAction
.CREATE
:
145 response_info
= yield from self
._parent
.allocate_virtual_network(pathentry
.key00
.event_id
,
146 request_msg
.cloud_account
,
147 request_msg
.request_info
)
148 except Exception as e
:
149 self
._log
.error("Encountered exception: %s while creating virtual network", str(e
))
150 self
._log
.exception(e
)
151 response_info
= RwResourceMgrYang
.VirtualLinkEventData_ResourceInfo()
152 response_info
.resource_state
= 'failed'
153 response_info
.resource_errors
= str(e
)
154 yield from self
._dts
.query_update(response_xpath
,
155 rwdts
.XactFlag
.ADVISE
,
158 request_msg
.resource_info
= response_info
159 self
.create_record_dts(self
._link
_reg
, None, ks_path
.to_xpath(RwResourceMgrYang
.get_schema()), request_msg
)
160 elif action
== rwdts
.QueryAction
.DELETE
:
161 yield from self
._parent
.release_virtual_network(pathentry
.key00
.event_id
)
162 self
.delete_record_dts(self
._link
_reg
, None, ks_path
.to_xpath(RwResourceMgrYang
.get_schema()))
163 elif action
== rwdts
.QueryAction
.READ
:
164 response_info
= yield from self
._parent
.read_virtual_network_info(pathentry
.key00
.event_id
)
166 raise ValueError("Only read/create/delete actions available. Received action: %s" %(action))
168 self
._log
.debug("Responding with VirtualLinkInfo at xpath %s: %s.",
169 response_xpath
, response_info
)
171 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
, response_xpath
, response_info
)
174 def on_vdu_request_commit(xact_info
):
175 """ The transaction has been committed """
176 self
._log
.debug("Received vdu request commit (xact_info: %s)", xact_info
)
177 return rwdts
.MemberRspCode
.ACTION_OK
179 def monitor_vdu_state(response_xpath
, pathentry
):
180 self
._log
.info("Initiating VDU state monitoring for xpath: %s ", response_xpath
)
182 for i
in range(loop_cnt
):
183 self
._log
.debug("VDU state monitoring for xpath: %s. Sleeping for 1 second", response_xpath
)
184 yield from asyncio
.sleep(1, loop
= self
._loop
)
186 response_info
= yield from self
._parent
.read_virtual_compute_info(pathentry
.key00
.event_id
)
187 except Exception as e
:
188 self
._log
.info("VDU state monitoring: Received exception %s in VDU state monitoring for %s. Aborting monitoring",
189 str(e
),response_xpath
)
190 response_info
= RwResourceMgrYang
.VDUEventData_ResourceInfo()
191 response_info
.resource_state
= 'failed'
192 response_info
.resource_errors
= str(e
)
193 yield from self
._dts
.query_update(response_xpath
,
194 rwdts
.XactFlag
.ADVISE
,
197 if response_info
.resource_state
== 'active' or response_info
.resource_state
== 'failed':
198 self
._log
.info("VDU state monitoring: VDU reached terminal state. Publishing VDU info: %s at path: %s",
199 response_info
, response_xpath
)
200 yield from self
._dts
.query_update(response_xpath
,
201 rwdts
.XactFlag
.ADVISE
,
205 ### End of loop. This is only possible if VDU did not reach active state
206 err_msg
= "VDU state monitoring: VDU at xpath :{} did not reached active state in {} seconds. Aborting monitoring".format(response_xpath
, loop_cnt
)
207 self
._log
.info(err_msg
)
208 response_info
= RwResourceMgrYang
.VDUEventData_ResourceInfo()
209 response_info
.resource_state
= 'failed'
210 response_info
.resource_errors
= err_msg
211 yield from self
._dts
.query_update(response_xpath
,
212 rwdts
.XactFlag
.ADVISE
,
216 def allocate_vdu_task(ks_path
, event_id
, cloud_account
, request_msg
):
217 response_xpath
= ks_path
.to_xpath(RwResourceMgrYang
.get_schema()) + "/resource-info"
218 schema
= RwResourceMgrYang
.VDUEventData().schema()
219 pathentry
= schema
.keyspec_to_entry(ks_path
)
221 response_info
= yield from self
._parent
.allocate_virtual_compute(event_id
,
224 except Exception as e
:
225 self
._log
.error("Encountered exception : %s while creating virtual compute", str(e
))
226 response_info
= RwResourceMgrYang
.VDUEventData_ResourceInfo()
227 response_info
.resource_state
= 'failed'
228 response_info
.resource_errors
= str(e
)
229 yield from self
._dts
.query_update(response_xpath
,
230 rwdts
.XactFlag
.ADVISE
,
233 if response_info
.resource_state
== 'failed' or response_info
.resource_state
== 'active' :
234 self
._log
.info("Virtual compute create task completed. Publishing VDU info: %s at path: %s",
235 response_info
, response_xpath
)
236 yield from self
._dts
.query_update(response_xpath
,
237 rwdts
.XactFlag
.ADVISE
,
240 asyncio
.ensure_future(monitor_vdu_state(response_xpath
, pathentry
),
245 def on_vdu_request_prepare(xact_info
, action
, ks_path
, request_msg
):
246 self
._log
.debug("Received vdu on_prepare callback (xact_info: %s, action: %s): %s",
247 xact_info
, action
, request_msg
)
248 response_xpath
= ks_path
.to_xpath(RwResourceMgrYang
.get_schema()) + "/resource-info"
249 schema
= RwResourceMgrYang
.VDUEventData().schema()
250 pathentry
= schema
.keyspec_to_entry(ks_path
)
252 if action
== rwdts
.QueryAction
.CREATE
:
253 response_info
= RwResourceMgrYang
.VDUEventData_ResourceInfo()
254 response_info
.resource_state
= 'pending'
255 request_msg
.resource_info
= response_info
256 self
.create_record_dts(self
._vdu
_reg
,
258 ks_path
.to_xpath(RwResourceMgrYang
.get_schema()),
260 asyncio
.ensure_future(allocate_vdu_task(ks_path
,
261 pathentry
.key00
.event_id
,
262 request_msg
.cloud_account
,
263 request_msg
.request_info
),
265 elif action
== rwdts
.QueryAction
.DELETE
:
267 yield from self
._parent
.release_virtual_compute(pathentry
.key00
.event_id
)
268 self
.delete_record_dts(self
._vdu
_reg
, None, ks_path
.to_xpath(RwResourceMgrYang
.get_schema()))
269 elif action
== rwdts
.QueryAction
.READ
:
270 response_info
= yield from self
._parent
.read_virtual_compute_info(pathentry
.key00
.event_id
)
272 raise ValueError("Only create/delete actions available. Received action: %s" %(action))
274 self
._log
.debug("Responding with VDUInfo at xpath %s: %s",
275 response_xpath
, response_info
)
277 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
, response_xpath
, response_info
)
281 def on_request_ready(registration
, status
):
282 self
._log
.debug("Got request ready event (registration: %s) (status: %s)",
283 registration
, status
)
285 if registration
== self
._link
_reg
:
286 self
._link
_reg
_event
.set()
287 elif registration
== self
._vdu
_reg
:
288 self
._vdu
_reg
_event
.set()
290 self
._log
.error("Unknown registration ready event: %s", registration
)
292 link_handlers
= rift
.tasklets
.Group
.Handler(on_event
=onlink_event
,)
293 with self
._dts
.group_create(handler
=link_handlers
) as link_group
:
294 self
._log
.debug("Registering for Link Resource Request using xpath: %s",
295 ResourceMgrEvent
.VLINK_REQUEST_XPATH
)
297 self
._link
_reg
= link_group
.register(xpath
=ResourceMgrEvent
.VLINK_REQUEST_XPATH
,
298 handler
=rift
.tasklets
.DTS
.RegistrationHandler(on_ready
=on_request_ready
,
299 on_commit
=on_link_request_commit
,
300 on_prepare
=on_link_request_prepare
),
301 flags
=rwdts
.Flag
.PUBLISHER | rwdts
.Flag
.DATASTORE
,)
303 vdu_handlers
= rift
.tasklets
.Group
.Handler(on_event
=onvdu_event
, )
304 with self
._dts
.group_create(handler
=vdu_handlers
) as vdu_group
:
306 self
._log
.debug("Registering for VDU Resource Request using xpath: %s",
307 ResourceMgrEvent
.VDU_REQUEST_XPATH
)
309 self
._vdu
_reg
= vdu_group
.register(xpath
=ResourceMgrEvent
.VDU_REQUEST_XPATH
,
310 handler
=rift
.tasklets
.DTS
.RegistrationHandler(on_ready
=on_request_ready
,
311 on_commit
=on_vdu_request_commit
,
312 on_prepare
=on_vdu_request_prepare
),
313 flags
=rwdts
.Flag
.PUBLISHER | rwdts
.Flag
.DATASTORE
,)