2 # Copyright 2016 RIFT.IO Inc
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
23 gi
.require_version('RwVlrYang', '1.0')
24 gi
.require_version('RwDts', '1.0')
25 gi
.require_version('RwResourceMgrYang', '1.0')
26 from gi
.repository
import (
32 gi
.require_version('RwKeyspec', '1.0')
33 from gi
.repository
.RwKeyspec
import quoted_key
class NetworkResourceError(Exception):
    """Signals that a network resource request failed or returned no usable response."""
class VlrRecordExistsError(Exception):
    """Signals that an action was refused because VLR record(s) already exist."""
class VlRecordError(Exception):
    """Signals an error while operating on a virtual link record."""
52 class VirtualLinkRecordState(enum
.Enum
):
53 """ Virtual Link record state """
56 RESOURCE_ALLOC_PENDING
= 3
63 class VirtualLinkRecord(object):
65 Virtual Link Record object
67 def __init__(self
, dts
, log
, loop
, vnsm
, vlr_msg
):
72 self
._vlr
_msg
= vlr_msg
73 self
._vlr
_id
= self
._vlr
_msg
.id
75 self
._project
= vnsm
._project
76 self
._network
_id
= None
77 self
._network
_pool
= None
78 self
._assigned
_subnet
= None
79 self
._virtual
_cps
= list()
80 self
._create
_time
= int(time
.time())
82 self
._state
= VirtualLinkRecordState
.INIT
83 self
._state
_failed
_reason
= None
84 self
._name
= self
._vlr
_msg
.name
88 """ VLD xpath associated with this VLR record """
89 return self
._project
.add_project("C,/vld:vld-catalog/vld:vld[id={}]".
90 format(quoted_key(self
.vld_id
)))
94 """ VLD id associated with this VLR record """
95 return self
._vlr
_msg
.vld_ref
99 """ VLR id associated with this VLR record """
104 """ path for this VLR """
105 return self
._project
.add_project("D,/vlr:vlr-catalog"
106 "/vlr:vlr[vlr:id={}]".format(quoted_key(self
.vlr_id
)))
110 """ Name of this VLR """
def datacenter(self):
    """Return the RO account (datacenter) on which this virtual link is instantiated.

    The value is read straight from the ``datacenter`` field of the VLR
    message held by this record.
    """
    vlr_msg = self._vlr_msg
    return vlr_msg.datacenter
120 """ Event Identifier for this virtual link """
def resmgr_path(self):
    """Return the project-scoped resource-manager path for this link's event.

    The path addresses the ``vlink-event-data`` entry keyed by this
    record's ``event_id`` (quoted through ``quoted_key``), and is prefixed
    via the owning project's ``add_project``.
    """
    event_key = quoted_key(self.event_id)
    event_entry = "/vlink-event/vlink-event-data[event-id={}]".format(event_key)
    return self._project.add_project("D,/rw-resource-mgr:resource-mgmt" + event_entry)
130 def operational_status(self
):
131 """ Operational status of this VLR"""
132 op_stats_dict
= {"INIT": "init",
133 "INSTANTIATING": "vl_alloc_pending",
134 "RESOURCE_ALLOC_PENDING": "vl_alloc_pending",
137 "TERMINATING": "vl_terminate_pending",
138 "TERMINATED": "terminated"}
140 return op_stats_dict
[self
._state
.name
]
144 """ VLR message for this VLR """
145 msg
= RwVlrYang
.YangData_RwProject_Project_VlrCatalog_Vlr()
146 msg
.copy_from(self
._vlr
_msg
)
148 if self
._network
_id
is not None:
149 msg
.network_id
= self
._network
_id
151 if self
._network
_pool
is not None:
152 msg
.network_pool
= self
._network
_pool
154 if self
._assigned
_subnet
is not None:
155 msg
.assigned_subnet
= self
._assigned
_subnet
157 if self
._virtual
_cps
:
158 for cp
in msg
.virtual_connection_points
:
159 for vcp
in self
._virtual
_cps
:
160 if cp
.name
== vcp
['name']:
161 cp
.ip_address
= vcp
['ip_address']
162 cp
.mac_address
= vcp
['mac_address']
163 cp
.connection_point_id
= vcp
['connection_point_id']
165 msg
.operational_status
= self
.operational_status
166 msg
.operational_status_details
= self
._state
_failed
_reason
167 msg
.res_id
= self
.event_id
171 def resmgr_msg(self
):
172 """ VLR message for this VLR """
173 msg
= RwResourceMgrYang
.YangData_RwProject_Project_ResourceMgmt_VlinkEvent_VlinkEventData()
174 msg
.event_id
= self
.event_id
175 msg
.cloud_account
= self
.datacenter
176 msg
.request_info
.name
= self
.name
177 msg
.request_info
.vim_network_name
= self
._vlr
_msg
.vim_network_name
178 msg
.request_info
.provider_network
.from_dict(
179 self
._vlr
_msg
.provider_network
.as_dict()
181 if self
._vlr
_msg
.has_field('ip_profile_params'):
182 msg
.request_info
.ip_profile_params
.from_dict(self
._vlr
_msg
.ip_profile_params
.as_dict())
184 for cp
in self
._vlr
_msg
.virtual_connection_points
:
185 vcp
= msg
.request_info
.virtual_cps
.add()
186 vcp
.from_dict({k
:v
for k
,v
in cp
.as_dict().items()
187 if k
in ['name','port_security_enabled','type_yang']})
188 if (self
._vlr
_msg
.has_field('ip_profile_params')) and (self
._vlr
_msg
.ip_profile_params
.has_field('security_group')):
189 vcp
.security_group
= self
._vlr
_msg
.ip_profile_params
.security_group
194 def create_network(self
, xact
):
195 """ Create network for this VL """
196 self
._log
.debug("Creating network event-id: %s:%s", self
.event_id
, self
._vlr
_msg
)
197 network_rsp
= yield from self
.request_network(xact
, "create")
def delete_network(self, xact):
    """Request deletion of the network backing this VL.

    Delegates to ``request_network`` with the ``"delete"`` action and
    hands back whatever that sub-generator returns.
    """
    self._log.debug("Deleting network - event-id: %s", self.event_id)
    response = yield from self.request_network(xact, "delete")
    return response
def read_network(self, xact):
    """Request a read of the network backing this VL.

    Delegates to ``request_network`` with the ``"read"`` action and hands
    back whatever that sub-generator returns.
    """
    self._log.debug("Reading network - event-id: %s", self.event_id)
    response = yield from self.request_network(xact, "read")
    return response
213 def request_network(self
, xact
, action
):
214 """Request creation/deletion network for this VL """
216 block
= xact
.block_create()
218 if action
== "create":
219 self
._log
.debug("Creating network path:%s, msg:%s",
220 self
.resmgr_path
, self
.resmgr_msg
)
221 block
.add_query_create(self
.resmgr_path
, self
.resmgr_msg
)
222 elif action
== "delete":
223 self
._log
.debug("Deleting network path:%s", self
.resmgr_path
)
224 block
.add_query_delete(self
.resmgr_path
)
225 elif action
== "read":
226 self
._log
.debug("Reading network path:%s", self
.resmgr_path
)
227 block
.add_query_read(self
.resmgr_path
)
229 raise VlRecordError("Invalid action %s received" % action
)
231 res_iter
= yield from block
.execute(now
=True)
235 if action
== "create" or action
== "read":
241 raise NetworkResourceError("Did not get a network resource response (resp: %s)", resp
)
243 if resp
.has_field('resource_info') and resp
.resource_info
.resource_state
== "failed":
244 raise NetworkResourceError(resp
.resource_info
.resource_errors
)
246 if not resp
.has_field('resource_info') :
247 raise NetworkResourceError("Did not get a valid network resource response (resp: %s)", resp
)
249 self
._log
.debug("Got network request response: %s", resp
)
254 def instantiate(self
, xact
, restart
=0):
255 """ Instantiate this VL """
256 self
._state
= VirtualLinkRecordState
.INSTANTIATING
258 self
._log
.debug("Instantiating VLR path = [%s]", self
.xpath
)
261 self
._state
= VirtualLinkRecordState
.RESOURCE_ALLOC_PENDING
265 network_resp
= yield from self
.create_network(xact
)
267 network_resp
= yield from self
.read_network(xact
)
268 if network_resp
== None:
269 network_resp
= yield from self
.create_network(xact
)
272 self
._state
= self
.vl_state_from_network_resp(network_resp
)
274 if self
._state
== VirtualLinkRecordState
.READY
:
275 # Move this VL into ready state
276 yield from self
.ready(network_resp
, xact
)
278 yield from self
.publish(xact
)
279 except Exception as e
:
280 self
._log
.error("Instantiatiation of VLR record failed: %s", str(e
))
281 self
._state
= VirtualLinkRecordState
.FAILED
282 self
._state
_failed
_reason
= str(e
)
283 yield from self
.publish(xact
)
def vl_state_from_network_resp(self, network_resp):
    """Map the resource state in a network response to a VL record state.

    ``'pending'`` maps to RESOURCE_ALLOC_PENDING, ``'active'`` to READY and
    ``'failed'`` to FAILED. Any other resource state is treated as still
    pending allocation.
    """
    state_for_resource = {
        'pending': VirtualLinkRecordState.RESOURCE_ALLOC_PENDING,
        'active': VirtualLinkRecordState.READY,
        'failed': VirtualLinkRecordState.FAILED,
    }
    resource_state = network_resp.resource_info.resource_state
    # Unrecognised states fall back to "allocation pending", as before.
    return state_for_resource.get(
        resource_state, VirtualLinkRecordState.RESOURCE_ALLOC_PENDING)
296 def ready(self
, event_resp
, xact
):
297 """ This virtual link is ready """
298 # Note network_resp.virtual_link_id is CAL assigned network_id.
299 self
._log
.debug("Virtual Link id %s name %s in ready state, event_rsp:%s",
303 self
._network
_id
= event_resp
.resource_info
.virtual_link_id
304 self
._network
_pool
= event_resp
.resource_info
.pool_name
305 self
._assigned
_subnet
= event_resp
.resource_info
.subnet
306 self
._virtual
_cps
= [ vcp
.as_dict()
307 for vcp
in event_resp
.resource_info
.virtual_connection_points
]
309 yield from self
.publish(xact
)
311 self
._state
= VirtualLinkRecordState
.READY
313 yield from self
.publish(xact
)
316 def failed(self
, event_resp
, xact
):
317 """ This virtual link Failed """
318 self
._log
.debug("Virtual Link id %s name %s failed to instantiate, event_rsp:%s",
323 self
._state
= VirtualLinkRecordState
.FAILED
325 yield from self
.publish(xact
)
328 def publish(self
, xact
):
329 """ publish this VLR """
331 self
._log
.debug("Publishing VLR path = [%s], record = [%s]",
332 self
.xpath
, self
.msg
)
333 vlr
.create_time
= self
._create
_time
334 yield from self
._vnsm
.publish_vlr(xact
, self
.xpath
, self
.msg
)
335 self
._log
.debug("Published VLR path = [%s], record = [%s]",
336 self
.xpath
, self
.msg
)
339 def terminate(self
, xact
):
340 """ Terminate this VL """
341 if self
._state
not in [VirtualLinkRecordState
.READY
, VirtualLinkRecordState
.FAILED
]:
342 self
._log
.error("Ignoring terminate for VL %s is in %s state",
343 self
.vlr_id
, self
._state
)
346 if self
._state
== VirtualLinkRecordState
.READY
:
347 self
._log
.debug("Terminating VL with id %s", self
.vlr_id
)
348 self
._state
= VirtualLinkRecordState
.TERMINATING
350 yield from self
.delete_network(xact
)
352 self
._log
.exception("Caught exception while deleting VL %s", self
.vlr_id
)
353 self
._log
.debug("Terminated VL with id %s", self
.vlr_id
)
355 yield from self
.unpublish(xact
)
356 self
._state
= VirtualLinkRecordState
.TERMINATED
def unpublish(self, xact):
    """Withdraw this VLR through the owning manager.

    Logs before and after delegating to ``self._vnsm.unpublish_vlr`` with
    the transaction and this record's xpath.
    """
    log = self._log
    log.debug("UnPublishing VLR id %s", self.vlr_id)
    yield from self._vnsm.unpublish_vlr(xact, self.xpath)
    log.debug("UnPublished VLR id %s", self.vlr_id)
366 class VlrDtsHandler(object):
367 """ Handles DTS interactions for the VLR registration """
368 XPATH
= "D,/vlr:vlr-catalog/vlr:vlr"
370 def __init__(self
, dts
, log
, loop
, vnsm
):
377 self
._project
= vnsm
._project
381 """ The registration handle assocaited with this Handler"""
386 """ Register for the VLR path """
389 def on_event(dts
, g_reg
, xact
, xact_event
, scratch_data
):
391 def instantiate_realloc_vlr(vlr
):
392 """Re-populate the virtual link information after restart
399 with self
._dts
.transaction(flags
=0) as xact
:
400 yield from vlr
.instantiate(xact
, 1)
402 if (xact_event
== rwdts
.MemberEvent
.INSTALL
):
403 curr_cfg
= self
.regh
.elements
405 vlr
= self
._vnsm
.create_vlr(cfg
)
406 self
._loop
.create_task(instantiate_realloc_vlr(vlr
))
408 self
._log
.debug("Got on_event")
409 return rwdts
.MemberRspCode
.ACTION_OK
412 def on_prepare(xact_info
, action
, ks_path
, msg
):
413 """ prepare for VLR registration"""
415 "Got vlr on_prepare callback (xact_info: %s, action: %s): %s",
416 xact_info
, action
, msg
419 if action
== rwdts
.QueryAction
.CREATE
:
420 vlr
= self
._vnsm
.create_vlr(msg
)
421 with self
._dts
.transaction(flags
=0) as xact
:
422 yield from vlr
.instantiate(xact
)
423 self
._log
.debug("Responding to VL create request path:%s, msg:%s",
425 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
, xpath
=vlr
.xpath
, msg
=vlr
.msg
)
427 elif action
== rwdts
.QueryAction
.DELETE
:
428 # Delete an VLR record
429 schema
= RwVlrYang
.YangData_RwProject_Project_VlrCatalog_Vlr
.schema()
430 path_entry
= schema
.keyspec_to_entry(ks_path
)
431 self
._log
.debug("Terminating VLR id %s", path_entry
.key00
.id)
432 yield from self
._vnsm
.delete_vlr(path_entry
.key00
.id, xact_info
.xact
)
434 err
= "%s action on VirtualLinkRecord not supported" % action
435 raise NotImplementedError(err
)
436 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
439 xpath
= self
._project
.add_project(VlrDtsHandler
.XPATH
)
440 self
._log
.debug("Registering for VLR using xpath: {}".
443 reg_handle
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
,)
444 handlers
= rift
.tasklets
.Group
.Handler(on_event
=on_event
,)
445 with self
._dts
.group_create(handler
=handlers
) as group
:
446 self
._regh
= group
.register(
449 flags
=rwdts
.Flag
.PUBLISHER | rwdts
.Flag
.NO_PREP_READ| rwdts
.Flag
.DATASTORE
,
452 def deregister(self
):
453 self
._log
.debug("De-register VLR handler for project {}".
454 format(self
._project
.name
))
456 self
._regh
.deregister()
460 def create(self
, xact
, xpath
, msg
):
462 Create a VLR record in DTS with path and message
464 path
= self
._project
.add_project(xpath
)
465 self
._log
.debug("Creating VLR xact = %s, %s:%s",
467 self
.regh
.create_element(path
, msg
)
468 self
._log
.debug("Created VLR xact = %s, %s:%s",
472 def update(self
, xact
, xpath
, msg
):
474 Update a VLR record in DTS with path and message
476 path
= self
._project
.add_project(xpath
)
477 self
._log
.debug("Updating VLR xact = %s, %s:%s",
479 self
.regh
.update_element(path
, msg
)
480 self
._log
.debug("Updated VLR xact = %s, %s:%s",
def delete(self, xact, xpath):
    """
    Delete the VLR record at the given path from DTS.

    The xpath is first scoped to the project via ``add_project``; the
    transaction is only used for logging here.
    """
    scoped_path = self._project.add_project(xpath)
    log = self._log
    log.debug("Deleting VLR xact = %s, %s", xact, scoped_path)
    self.regh.delete_element(scoped_path)
    log.debug("Deleted VLR xact = %s, %s", xact, scoped_path)
494 class VldDtsHandler(object):
495 """ DTS handler for the VLD registration """
496 XPATH
= "C,/vld:vld-catalog/vld:vld"
498 def __init__(self
, dts
, log
, loop
, vnsm
):
508 """ The registration handle assocaited with this Handler"""
513 """ Register the VLD path """
515 def on_prepare(xact_info
, query_action
, ks_path
, msg
):
516 """ prepare callback on vld path """
518 "Got on prepare for VLD update (ks_path: %s) (action: %s)",
519 ks_path
.to_xpath(VldYang
.get_schema()), msg
)
521 schema
= VldYang
.YangData_RwProject_Project_VldCatalog_Vld
.schema()
522 path_entry
= schema
.keyspec_to_entry(ks_path
)
523 # TODO: Check why on project delete this gets called
525 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
528 vld_id
= path_entry
.key00
.id
530 disabled_actions
= [rwdts
.QueryAction
.DELETE
, rwdts
.QueryAction
.UPDATE
]
531 if query_action
not in disabled_actions
:
532 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
535 vlr
= self
._vnsm
.find_vlr_by_vld_id(vld_id
)
538 "Did not find an existing VLR record for vld %s. "
539 "Permitting %s vld action", vld_id
, query_action
)
540 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
543 raise VlrRecordExistsError(
544 "Vlr record(s) exists."
545 "Cannot perform %s action on VLD." % query_action
)
547 handler
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
)
549 self
._regh
= yield from self
._dts
.register(
550 self
._vnsm
._project
.add_project(VldDtsHandler
.XPATH
),
551 flags
=rwdts
.Flag
.SUBSCRIBER
,
555 def deregister(self
):
556 self
._log
.debug("De-register VLD handler for project {}".
557 format(self
._vnsm
._project
.name
))
559 self
._regh
.deregister()
562 class VirtualLinkEventListener(object):
563 """ DTS Listener to listen on Virtual Link related events """
564 XPATH
= "D,/rw-resource-mgr:resource-mgmt/vlink-event/vlink-event-data"
565 def __init__(self
, dts
, log
, loop
, vnsm
):
574 """ The registration handle assocaited with this Handler"""
577 def event_id_from_keyspec(self
, ks
):
578 """ Get the event id from the keyspec """
579 event_pe
= RwResourceMgrYang
.YangData_RwProject_Project_ResourceMgmt_VlinkEvent_VlinkEventData
.schema().keyspec_to_entry(ks
)
581 # Can get just path without event id when
583 event_id
= event_pe
.key00
.event_id
584 except AttributeError:
590 """ Register the Virtual Link Event path """
592 def on_prepare(xact_info
, query_action
, ks_path
, msg
):
593 """ prepare callback on Virtual Link Events """
596 "Got on prepare for Virtual Link Event id (ks_path: %s) (msg: %s)",
597 ks_path
.to_xpath(RwResourceMgrYang
.get_schema()), msg
)
598 event_id
= self
.event_id_from_keyspec(ks_path
)
600 if query_action
== rwdts
.QueryAction
.CREATE
or query_action
== rwdts
.QueryAction
.UPDATE
:
601 yield from self
._vnsm
.update_virual_link_event(event_id
, msg
)
602 elif query_action
== rwdts
.QueryAction
.DELETE
:
603 self
._vnsm
.delete_virual_link_event(event_id
)
604 except Exception as e
:
605 self
._log
.exception("Caught execption in Virtual Link Event handler", e
)
607 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
609 handler
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
)
611 self
._regh
= yield from self
._dts
.register(
612 self
._vnsm
._project
.add_project(VirtualLinkEventListener
.XPATH
),
613 flags
=rwdts
.Flag
.SUBSCRIBER
,
617 def deregister(self
):
619 self
._regh
.deregister()