3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
22 gi
.require_version('RwDts', '1.0')
23 gi
.require_version('RwYang', '1.0')
24 gi
.require_version('RwResourceMgrYang', '1.0')
25 gi
.require_version('RwLaunchpadYang', '1.0')
26 gi
.require_version('RwcalYang', '1.0')
27 from gi
.repository
import (
34 gi
.require_version('RwKeyspec', '1.0')
35 from gi
.repository
.RwKeyspec
import quoted_key
37 from gi
.repository
.RwTypes
import RwStatus
40 if sys
.version_info
< (3, 4, 4):
41 asyncio
.ensure_future
= asyncio
.async
44 class ResourceMgrEvent(object):
45 VDU_REQUEST_XPATH
= "D,/rw-resource-mgr:resource-mgmt/vdu-event/vdu-event-data"
46 VLINK_REQUEST_XPATH
= "D,/rw-resource-mgr:resource-mgmt/vlink-event/vlink-event-data"
48 def __init__(self
, dts
, log
, loop
, parent
):
53 self
._project
= parent
._project
57 self
._vdu
_reg
_event
= asyncio
.Event(loop
=self
._loop
)
58 self
._link
_reg
_event
= asyncio
.Event(loop
=self
._loop
)
61 def wait_ready(self
, timeout
=5):
62 self
._log
.debug("Waiting for all request registrations to become ready.")
63 yield from asyncio
.wait([self
._link
_reg
_event
.wait(), self
._vdu
_reg
_event
.wait()],
64 timeout
=timeout
, loop
=self
._loop
)
66 def _add_config_flag(self
, xpath
, config
=False):
75 def create_record_dts(self
, regh
, xact
, xpath
, msg
):
77 Create a record in DTS with path and message
79 path
= self
._add
_config
_flag
(self
._project
.add_project(xpath
))
80 self
._log
.debug("Creating Resource Record xact = %s, %s:%s",
82 regh
.create_element(path
, msg
)
84 def delete_record_dts(self
, regh
, xact
, xpath
):
86 Delete a VNFR record in DTS with path and message
88 path
= self
._add
_config
_flag
(self
._project
.add_project(xpath
))
89 self
._log
.debug("Deleting Resource Record xact = %s, %s",
91 regh
.delete_element(path
)
97 def onlink_event(dts
, g_reg
, xact
, xact_event
, scratch_data
):
99 def instantiate_realloc_vn(link
):
100 """Re-populate the virtual link information after restart
107 yield from asyncio
.sleep(3, loop
=self
._loop
)
110 response_info
= yield from self
._parent
.reallocate_virtual_network(
113 link
.request_info
, link
.resource_info
,
115 except Exception as e
:
116 self
._log
.error("Encoutered exception in reallocate_virtual_network")
117 self
._log
.exception(e
)
120 if (xact_event
== rwdts
.MemberEvent
.INSTALL
):
121 link_cfg
= self
._link
_reg
.elements
122 self
._log
.debug("onlink_event INSTALL event: {}".format(link_cfg
))
124 for link
in link_cfg
:
125 self
._loop
.create_task(instantiate_realloc_vn(link
))
127 self
._log
.debug("onlink_event INSTALL event complete")
129 return rwdts
.MemberRspCode
.ACTION_OK
132 def onvdu_event(dts
, g_reg
, xact
, xact_event
, scratch_data
):
134 def instantiate_realloc_vdu(vdu
):
135 """Re-populate the VDU information after restart
142 yield from asyncio
.sleep(3, loop
=self
._loop
)
145 response_info
= yield from self
._parent
.allocate_virtual_compute(
150 except Exception as e
:
151 self
._log
.error("Encoutered exception in allocate_virtual_network")
152 self
._log
.exception(e
)
155 response_xpath
= "/rw-resource-mgr:resource-mgmt/rw-resource-mgr:vdu-event/rw-resource-mgr:vdu-event-data[rw-resource-mgr:event-id={}]/resource-info".format(
156 quoted_key(vdu
.event_id
.strip()))
158 cloud_account
= self
._parent
.get_cloud_account_detail(cloud_account
)
159 asyncio
.ensure_future(monitor_vdu_state(response_xpath
, vdu
.event_id
, cloud_account
.vdu_instance_timeout
), loop
=self
._loop
)
161 if (xact_event
== rwdts
.MemberEvent
.INSTALL
):
162 vdu_cfg
= self
._vdu
_reg
.elements
163 self
._log
.debug("onvdu_event INSTALL event: {}".format(vdu_cfg
))
166 self
._loop
.create_task(instantiate_realloc_vdu(vdu
))
168 self
._log
.debug("onvdu_event INSTALL event complete")
170 return rwdts
.MemberRspCode
.ACTION_OK
173 def allocate_vlink_task(ks_path
, event_id
, cloud_account
, request_info
):
174 response_xpath
= ks_path
.to_xpath(RwResourceMgrYang
.get_schema()) + "/resource-info"
175 schema
= RwResourceMgrYang
.YangData_RwProject_Project_ResourceMgmt_VlinkEvent_VlinkEventData().schema()
176 pathentry
= schema
.keyspec_to_entry(ks_path
)
178 response_info
= yield from self
._parent
.allocate_virtual_network(pathentry
.key00
.event_id
,
181 except Exception as e
:
182 self
._log
.error("Encountered exception: %s while creating virtual network", str(e
))
183 self
._log
.exception(e
)
184 response_info
= RwResourceMgrYang
.YangData_RwProject_Project_ResourceMgmt_VlinkEvent_VlinkEventData_ResourceInfo()
185 response_info
.resource_state
= 'failed'
186 response_info
.resource_errors
= str(e
)
187 yield from self
._dts
.query_update(response_xpath
,
188 rwdts
.XactFlag
.ADVISE
,
191 yield from self
._dts
.query_update(response_xpath
,
192 rwdts
.XactFlag
.ADVISE
,
197 def on_link_request_prepare(xact_info
, action
, ks_path
, request_msg
):
199 "Received virtual-link on_prepare callback (xact_info: %s, action: %s): %s",
200 xact_info
, action
, request_msg
)
203 response_xpath
= ks_path
.to_xpath(RwResourceMgrYang
.get_schema()) + "/resource-info"
205 schema
= RwResourceMgrYang
.YangData_RwProject_Project_ResourceMgmt_VlinkEvent_VlinkEventData().schema()
206 pathentry
= schema
.keyspec_to_entry(ks_path
)
208 if action
== rwdts
.QueryAction
.CREATE
:
210 response_info
= RwResourceMgrYang
.YangData_RwProject_Project_ResourceMgmt_VlinkEvent_VlinkEventData_ResourceInfo()
211 response_info
.resource_state
= 'pending'
212 request_msg
.resource_info
= response_info
213 self
.create_record_dts(self
._link
_reg
,
215 ks_path
.to_xpath(RwResourceMgrYang
.get_schema()),
218 asyncio
.ensure_future(allocate_vlink_task(ks_path
,
219 pathentry
.key00
.event_id
,
220 request_msg
.cloud_account
,
221 request_msg
.request_info
),
223 except Exception as e
:
225 "Encountered exception: %s while creating virtual network", str(e
))
226 self
._log
.exception(e
)
227 response_info
= RwResourceMgrYang
.YangData_RwProject_Project_ResourceMgmt_VlinkEvent_VlinkEventData_ResourceInfo()
228 response_info
.resource_state
= 'failed'
229 response_info
.resource_errors
= str(e
)
230 yield from self
._dts
.query_update(response_xpath
,
231 rwdts
.XactFlag
.ADVISE
,
233 elif action
== rwdts
.QueryAction
.DELETE
:
234 yield from self
._parent
.release_virtual_network(pathentry
.key00
.event_id
)
235 self
.delete_record_dts(self
._link
_reg
, None,
236 ks_path
.to_xpath(RwResourceMgrYang
.get_schema()))
238 elif action
== rwdts
.QueryAction
.READ
:
239 # TODO: Check why we are getting null event id request
240 if pathentry
.key00
.event_id
:
241 response_info
= yield from self
._parent
.read_virtual_network_info(pathentry
.key00
.event_id
)
243 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
)
247 "Only read/create/delete actions available. Received action: %s" %(action))
249 self
._log
.info("Responding with VirtualLinkInfo at xpath %s: %s.",
250 response_xpath
, response_info
)
252 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
, response_xpath
, response_info
)
256 def monitor_vdu_state(response_xpath
, event_id
, vdu_timeout
):
257 self
._log
.info("Initiating VDU state monitoring for xpath: %s ", response_xpath
)
259 loop_cnt
= int(vdu_timeout
/sleep_time
)
261 for i
in range(loop_cnt
):
263 "VDU state monitoring for xpath: %s. Sleeping for 2 second", response_xpath
)
264 yield from asyncio
.sleep(2, loop
= self
._loop
)
267 response_info
= yield from self
._parent
.read_virtual_compute_info(event_id
)
268 except Exception as e
:
270 "VDU state monitoring: Received exception %s in VDU state monitoring for %s. Aborting monitoring", str(e
),response_xpath
)
272 response_info
= RwResourceMgrYang
.YangData_RwProject_Project_ResourceMgmt_VduEvent_VduEventData_ResourceInfo()
273 response_info
.resource_state
= 'failed'
274 response_info
.resource_errors
= str(e
)
275 yield from self
._dts
.query_update(response_xpath
,
276 rwdts
.XactFlag
.ADVISE
,
279 if response_info
.resource_state
== 'active' or response_info
.resource_state
== 'failed':
280 self
._log
.info("VDU state monitoring: VDU reached terminal state. " +
281 "Publishing VDU info: %s at path: %s",
282 response_info
, response_xpath
)
283 yield from self
._dts
.query_update(response_xpath
,
284 rwdts
.XactFlag
.ADVISE
,
288 ### End of loop. This is only possible if VDU did not reach active state
289 err_msg
= ("VDU state monitoring: VDU at xpath :{} did not reached active " +
290 "state in {} seconds. Aborting monitoring".
291 format(response_xpath
, time_to_wait
))
292 self
._log
.info(err_msg
)
293 response_info
= RwResourceMgrYang
.YangData_RwProject_Project_ResourceMgmt_VduEvent_VduEventData_ResourceInfo()
294 response_info
.resource_state
= 'failed'
295 response_info
.resource_errors
= err_msg
296 yield from self
._dts
.query_update(response_xpath
,
297 rwdts
.XactFlag
.ADVISE
,
301 def allocate_vdu_task(ks_path
, event_id
, cloud_account
, request_msg
):
302 response_xpath
= ks_path
.to_xpath(RwResourceMgrYang
.get_schema()) + "/resource-info"
303 response_xpath
= self
._add
_config
_flag
(response_xpath
)
304 schema
= RwResourceMgrYang
.YangData_RwProject_Project_ResourceMgmt_VduEvent_VduEventData().schema()
305 pathentry
= schema
.keyspec_to_entry(ks_path
)
307 response_info
= yield from self
._parent
.allocate_virtual_compute(event_id
,
310 except Exception as e
:
311 self
._log
.error("Encountered exception : %s while creating virtual compute", str(e
))
312 response_info
= RwResourceMgrYang
.YangData_RwProject_Project_ResourceMgmt_VduEvent_VduEventData_ResourceInfo()
313 response_info
.resource_state
= 'failed'
314 response_info
.resource_errors
= str(e
)
315 yield from self
._dts
.query_update(response_xpath
,
316 rwdts
.XactFlag
.ADVISE
,
319 cloud_account
= self
._parent
.get_cloud_account_detail(cloud_account
)
320 #RIFT-17719 - Set the resource state to active if no floating ip pool specified and is waiting for public ip.
321 if response_info
.resource_state
== 'pending' and cloud_account
.has_field('openstack') \
322 and not (cloud_account
.openstack
.has_field('floating_ip_pool')) :
323 if (request_msg
.has_field('allocate_public_address')) and (request_msg
.allocate_public_address
== True):
324 if not response_info
.has_field('public_ip'):
325 response_info
.resource_state
= 'active'
327 if response_info
.resource_state
== 'failed' or response_info
.resource_state
== 'active' :
328 self
._log
.debug("Virtual compute create task completed. Publishing VDU info: %s at path: %s",
329 response_info
, response_xpath
)
330 yield from self
._dts
.query_update(response_xpath
,
331 rwdts
.XactFlag
.ADVISE
,
334 asyncio
.ensure_future(monitor_vdu_state(response_xpath
, pathentry
.key00
.event_id
, cloud_account
.vdu_instance_timeout
),
338 def on_vdu_request_prepare(xact_info
, action
, ks_path
, request_msg
):
339 self
._log
.debug("Received vdu on_prepare callback (xact_info: %s, action: %s): %s",
340 xact_info
, action
, request_msg
)
341 response_xpath
= ks_path
.to_xpath(RwResourceMgrYang
.get_schema()) + "/resource-info"
342 response_xpath
= self
._add
_config
_flag
(response_xpath
)
343 schema
= RwResourceMgrYang
.YangData_RwProject_Project_ResourceMgmt_VduEvent_VduEventData().schema()
344 pathentry
= schema
.keyspec_to_entry(ks_path
)
346 if action
== rwdts
.QueryAction
.CREATE
:
347 response_info
= RwResourceMgrYang
.YangData_RwProject_Project_ResourceMgmt_VduEvent_VduEventData_ResourceInfo()
348 response_info
.resource_state
= 'pending'
349 request_msg
.resource_info
= response_info
350 self
.create_record_dts(self
._vdu
_reg
,
352 ks_path
.to_xpath(RwResourceMgrYang
.get_schema()),
354 asyncio
.ensure_future(allocate_vdu_task(ks_path
,
355 pathentry
.key00
.event_id
,
356 request_msg
.cloud_account
,
357 request_msg
.request_info
),
359 elif action
== rwdts
.QueryAction
.DELETE
:
361 yield from self
._parent
.release_virtual_compute(pathentry
.key00
.event_id
)
362 self
.delete_record_dts(self
._vdu
_reg
, None, ks_path
.to_xpath(RwResourceMgrYang
.get_schema()))
363 elif action
== rwdts
.QueryAction
.READ
:
364 # TODO: Check why we are getting null event id request
365 if pathentry
.key00
.event_id
:
366 response_info
= yield from self
._parent
.read_virtual_compute_info(pathentry
.key00
.event_id
)
368 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
)
371 raise ValueError("Only create/delete actions available. Received action: %s" %(action))
373 self
._log
.debug("Responding with VDUInfo at xpath %s: %s",
374 response_xpath
, response_info
)
376 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
, response_xpath
, response_info
)
380 def on_request_ready(registration
, status
):
381 self
._log
.debug("Got request ready event (registration: %s) (status: %s)",
382 registration
, status
)
384 if registration
== self
._link
_reg
:
385 self
._link
_reg
_event
.set()
386 elif registration
== self
._vdu
_reg
:
387 self
._vdu
_reg
_event
.set()
389 self
._log
.error("Unknown registration ready event: %s", registration
)
391 link_handlers
= rift
.tasklets
.Group
.Handler(on_event
=onlink_event
,)
392 with self
._dts
.group_create(handler
=link_handlers
) as link_group
:
393 xpath
= self
._project
.add_project(ResourceMgrEvent
.VLINK_REQUEST_XPATH
)
394 self
._log
.debug("Registering for Link Resource Request using xpath: {}".
397 self
._link
_reg
= link_group
.register(xpath
=xpath
,
398 handler
=rift
.tasklets
.DTS
.RegistrationHandler(on_ready
=on_request_ready
,
399 on_prepare
=on_link_request_prepare
),
400 flags
=rwdts
.Flag
.PUBLISHER | rwdts
.Flag
.DATASTORE
,)
402 vdu_handlers
= rift
.tasklets
.Group
.Handler(on_event
=onvdu_event
, )
403 with self
._dts
.group_create(handler
=vdu_handlers
) as vdu_group
:
405 xpath
= self
._project
.add_project(ResourceMgrEvent
.VDU_REQUEST_XPATH
)
406 self
._log
.debug("Registering for VDU Resource Request using xpath: {}".
409 self
._vdu
_reg
= vdu_group
.register(xpath
=xpath
,
410 handler
=rift
.tasklets
.DTS
.RegistrationHandler(on_ready
=on_request_ready
,
411 on_prepare
=on_vdu_request_prepare
),
412 flags
=rwdts
.Flag
.PUBLISHER | rwdts
.Flag
.DATASTORE
,)
415 def deregister(self
):
416 self
._log
.debug("De-register for project {}".format(self
._project
.name
))
419 self
._vdu
_reg
.deregister()
423 self
._link
_reg
.deregister()
424 self
._link
_reg
= None