3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
24 gi
.require_version('RwVlrYang', '1.0')
25 gi
.require_version('RwDts', '1.0')
26 gi
.require_version('RwResourceMgrYang', '1.0')
27 from gi
.repository
import (
class NetworkResourceError(Exception):
    """Network resource error.

    Raised in this file when a network resource request yields no response,
    reports a ``failed`` resource state, or lacks a ``virtual_link_id``.
    """
class VlrRecordExistsError(Exception):
    """VLR record already exists.

    Raised in this file when an action on a VLD is refused because VLR
    record(s) for it exist.
    """
class VlRecordError(Exception):
    """VLR record error.

    Raised in this file when a network request is made with an action that
    is not recognised.
    """
51 class VirtualLinkRecordState(enum
.Enum
):
52 """ Virtual Link record state """
55 RESOURCE_ALLOC_PENDING
= 3
62 class VirtualLinkRecord(object):
64 Virtual Link Record object
66 def __init__(self
, dts
, log
, loop
, vnsm
, vlr_msg
, req_id
=None):
71 self
._vlr
_msg
= vlr_msg
73 self
._network
_id
= None
74 self
._network
_pool
= None
75 self
._assigned
_subnet
= None
76 self
._create
_time
= int(time
.time())
78 self
._request
_id
= str(uuid
.uuid4())
80 self
._request
_id
= req_id
82 self
._state
= VirtualLinkRecordState
.INIT
83 self
._state
_failed
_reason
= None
87 """ VLD xpath associated with this VLR record """
88 return "C,/vld:vld-catalog/vld:vld[id='{}']".format(self
.vld_id
)
92 """ VLD id associated with this VLR record """
93 return self
._vlr
_msg
.vld_ref
97 """ VLR id associated with this VLR record """
98 return self
._vlr
_msg
.id
102 """ path for this VLR """
103 return("D,/vlr:vlr-catalog"
104 "/vlr:vlr[vlr:id='{}']".format(self
.vlr_id
))
108 """ Name of this VLR """
109 return self
._vlr
_msg
.name
def cloud_account_name(self):
    """Return the cloud account to instantiate the virtual link on.

    The value is read directly from the ``cloud_account`` field of the
    stored VLR message.

    NOTE(review): other code in this file reads this as a plain attribute
    (``self.cloud_account_name``), so a ``@property`` decorator presumably
    sits directly above this definition -- confirm.
    """
    return self._vlr_msg.cloud_account
def resmgr_path(self):
    """Return the resource-manager xpath for this record's request.

    The path addresses the ``vlink-event-data`` entry whose ``event-id`` key
    is this record's request id.

    NOTE(review): read as a plain attribute elsewhere in this file, so a
    ``@property`` decorator presumably precedes this definition -- confirm.
    """
    # Adjacent literals are joined before ``format`` runs; only the last
    # segment contains a placeholder, so the result is a single xpath string.
    return (
        "D,/rw-resource-mgr:resource-mgmt"
        "/vlink-event/vlink-event-data[event-id='{}']"
    ).format(self._request_id)
123 def operational_status(self
):
124 """ Operational status of this VLR"""
125 op_stats_dict
= {"INIT": "init",
126 "INSTANTIATING": "vl_alloc_pending",
127 "RESOURCE_ALLOC_PENDING": "vl_alloc_pending",
130 "TERMINATING": "vl_terminate_pending",
131 "TERMINATED": "terminated"}
133 return op_stats_dict
[self
._state
.name
]
137 """ VLR message for this VLR """
138 msg
= RwVlrYang
.YangData_Vlr_VlrCatalog_Vlr()
139 msg
.copy_from(self
._vlr
_msg
)
141 if self
._network
_id
is not None:
142 msg
.network_id
= self
._network
_id
144 if self
._network
_pool
is not None:
145 msg
.network_pool
= self
._network
_pool
147 if self
._assigned
_subnet
is not None:
148 msg
.assigned_subnet
= self
._assigned
_subnet
150 msg
.operational_status
= self
.operational_status
151 msg
.operational_status_details
= self
._state
_failed
_reason
152 msg
.res_id
= self
._request
_id
def resmgr_msg(self):
    """Build the resource-manager virtual link event message for this VLR.

    A fresh ``RwResourceMgrYang.VirtualLinkEventData`` is populated with the
    request id, cloud account, name, VIM network name and provider network
    taken from this record and its stored VLR message.  IP profile
    parameters are copied only when the VLR message carries them.

    Returns:
        The populated message.  Callers in this file read fields from the
        result (e.g. ``self.resmgr_msg.request_info.name``), so the message
        must be returned.

    NOTE(review): read as a plain attribute elsewhere in this file, so a
    ``@property`` decorator presumably precedes this definition -- confirm.
    """
    msg = RwResourceMgrYang.VirtualLinkEventData()
    msg.event_id = self._request_id
    msg.cloud_account = self.cloud_account_name

    request_info = msg.request_info
    request_info.name = self.name
    request_info.vim_network_name = self._vlr_msg.vim_network_name
    request_info.provider_network.from_dict(
        self._vlr_msg.provider_network.as_dict()
    )

    # ip_profile_params is optional on the VLR message; copy it only if set.
    if self._vlr_msg.has_field('ip_profile_params'):
        request_info.ip_profile_params.from_dict(
            self._vlr_msg.ip_profile_params.as_dict()
        )

    return msg
def create_network(self, xact):
    """Create the network for this VL.

    Generator-based coroutine body: delegates to ``request_network`` with
    the ``"create"`` action and returns whatever that call returns.

    Args:
        xact: transaction passed through to ``request_network``.
    """
    self._log.debug("Creating network req-id: %s", self._request_id)
    response = yield from self.request_network(xact, "create")
    return response
def delete_network(self, xact):
    """Delete the network for this VL.

    Generator-based coroutine body: delegates to ``request_network`` with
    the ``"delete"`` action and returns whatever that call returns.

    Args:
        xact: transaction passed through to ``request_network``.
    """
    self._log.debug("Deleting network - req-id: %s", self._request_id)
    response = yield from self.request_network(xact, "delete")
    return response
def read_network(self, xact):
    """Read the network for this VL.

    Generator-based coroutine body: delegates to ``request_network`` with
    the ``"read"`` action and returns whatever that call returns.

    Args:
        xact: transaction passed through to ``request_network``.
    """
    self._log.debug("Reading network - req-id: %s", self._request_id)
    response = yield from self.request_network(xact, "read")
    return response
191 def request_network(self
, xact
, action
):
192 """Request creation/deletion network for this VL """
194 block
= xact
.block_create()
196 if action
== "create":
197 self
._log
.debug("Creating network path:%s, msg:%s",
198 self
.resmgr_path
, self
.resmgr_msg
)
199 block
.add_query_create(self
.resmgr_path
, self
.resmgr_msg
)
200 elif action
== "delete":
201 self
._log
.debug("Deleting network path:%s", self
.resmgr_path
)
202 if self
.resmgr_msg
.request_info
.name
!= "multisite":
203 block
.add_query_delete(self
.resmgr_path
)
204 elif action
== "read":
205 self
._log
.debug("Reading network path:%s", self
.resmgr_path
)
206 block
.add_query_read(self
.resmgr_path
)
208 raise VlRecordError("Invalid action %s received" % action
)
210 res_iter
= yield from block
.execute(now
=True)
214 if action
== "create" or action
== "read":
220 raise NetworkResourceError("Did not get a network resource response (resp: %s)", resp
)
222 if resp
.has_field('resource_info') and resp
.resource_info
.resource_state
== "failed":
223 raise NetworkResourceError(resp
.resource_info
.resource_errors
)
225 if not (resp
.has_field('resource_info') and
226 resp
.resource_info
.has_field('virtual_link_id')):
227 raise NetworkResourceError("Did not get a valid network resource response (resp: %s)", resp
)
229 self
._log
.debug("Got network request response: %s", resp
)
def instantiate(self, xact, restart=0):
    """Instantiate this VL.

    Moves the record through INSTANTIATING and RESOURCE_ALLOC_PENDING,
    obtains the network resource, stores the allocated network id, pool and
    subnet, marks the record READY and publishes it.  Any exception marks
    the record FAILED, stores the reason and publishes the failed record.

    Args:
        xact: transaction passed to the network request and publish calls.
        restart: falsy for a fresh instantiation (the network is created);
            truthy when re-populating after a restart, in which case the
            existing network is read first and created only if the read
            returns nothing.
            NOTE(review): this branch structure is reconstructed from the
            visible create/read/create sequence and from the restart caller
            in this file passing ``1`` -- confirm against the full file.
    """
    self._state = VirtualLinkRecordState.INSTANTIATING

    self._log.debug("Instantiating VLR path = [%s]", self.xpath)

    try:
        self._state = VirtualLinkRecordState.RESOURCE_ALLOC_PENDING

        if not restart:
            network_resp = yield from self.create_network(xact)
        else:
            network_resp = yield from self.read_network(xact)
            if network_resp is None:
                network_resp = yield from self.create_network(xact)

        # Note network_resp.virtual_link_id is CAL assigned network_id.
        resource_info = network_resp.resource_info
        self._network_id = resource_info.virtual_link_id
        self._network_pool = resource_info.pool_name
        self._assigned_subnet = resource_info.subnet

        self._state = VirtualLinkRecordState.READY

        yield from self.publish(xact)

    except Exception as e:
        # Deliberately broad: every failure is recorded on the record and
        # published rather than propagated to the caller.
        self._log.error("Instantiatiation of VLR record failed: %s", str(e))
        self._state = VirtualLinkRecordState.FAILED
        self._state_failed_reason = str(e)
        yield from self.publish(xact)
def publish(self, xact):
    """Publish this VLR.

    Builds the VLR message once, stamps it with the record's creation time
    and hands that same message to the manager's ``publish_vlr``.

    ``self.msg`` constructs a new message object on every access, so the
    creation time must be set on the object that is actually published;
    setting it on one copy and publishing another would drop the field.

    Args:
        xact: transaction passed through to ``publish_vlr``.
    """
    vlr = self.msg
    vlr.create_time = self._create_time

    self._log.debug("Publishing VLR path = [%s], record = [%s]",
                    self.xpath, vlr)
    yield from self._vnsm.publish_vlr(xact, self.xpath, vlr)
    self._log.debug("Published VLR path = [%s], record = [%s]",
                    self.xpath, vlr)
def terminate(self, xact):
    """Terminate this VL.

    Only records in the READY or FAILED state are terminated; for any other
    state the request is logged and ignored.  A READY record has its network
    deleted first; a failure while deleting is logged and does not stop the
    record from being unpublished and marked TERMINATED.

    Args:
        xact: transaction passed to the delete and unpublish calls.

    NOTE(review): the early return and the try/except around the network
    delete are reconstructed from the "Ignoring terminate" and "Caught
    exception" log messages -- confirm against the full file.
    """
    terminable = (VirtualLinkRecordState.READY, VirtualLinkRecordState.FAILED)
    if self._state not in terminable:
        self._log.error("Ignoring terminate for VL %s is in %s state",
                        self.vlr_id, self._state)
        return

    if self._state == VirtualLinkRecordState.READY:
        self._log.debug("Terminating VL with id %s", self.vlr_id)
        self._state = VirtualLinkRecordState.TERMINATING
        try:
            yield from self.delete_network(xact)
        except Exception:
            # Best effort: keep going so the record is still unpublished.
            self._log.exception("Caught exception while deleting VL %s", self.vlr_id)
        self._log.debug("Terminated VL with id %s", self.vlr_id)

    yield from self.unpublish(xact)
    self._state = VirtualLinkRecordState.TERMINATED
def unpublish(self, xact):
    """Unpublish this VLR.

    Generator-based coroutine body: asks the manager to unpublish the record
    at this VLR's xpath, logging before and after.

    Args:
        xact: transaction passed through to ``unpublish_vlr``.
    """
    self._log.debug("UnPublishing VLR id %s", self.vlr_id)
    yield from self._vnsm.unpublish_vlr(xact, self.xpath)
    self._log.debug("UnPublished VLR id %s", self.vlr_id)
305 class VlrDtsHandler(object):
306 """ Handles DTS interactions for the VLR registration """
307 XPATH
= "D,/vlr:vlr-catalog/vlr:vlr"
309 def __init__(self
, dts
, log
, loop
, vnsm
):
319 """ The registration handle assocaited with this Handler"""
324 """ Register for the VLR path """
325 def on_commit(xact_info
):
326 """ The transaction has been committed """
327 self
._log
.debug("Got vlr commit (xact_info: %s)", xact_info
)
329 return rwdts
.MemberRspCode
.ACTION_OK
332 def on_event(dts
, g_reg
, xact
, xact_event
, scratch_data
):
334 def instantiate_realloc_vlr(vlr
):
335 """Re-populate the virtual link information after restart
342 with self
._dts
.transaction(flags
=0) as xact
:
343 yield from vlr
.instantiate(xact
, 1)
345 if (xact_event
== rwdts
.MemberEvent
.INSTALL
):
346 curr_cfg
= self
.regh
.elements
348 vlr
= self
._vnsm
.create_vlr(cfg
)
349 self
._loop
.create_task(instantiate_realloc_vlr(vlr
))
351 self
._log
.debug("Got on_event")
352 return rwdts
.MemberRspCode
.ACTION_OK
355 def on_prepare(xact_info
, action
, ks_path
, msg
):
356 """ prepare for VLR registration"""
358 "Got vlr on_prepare callback (xact_info: %s, action: %s): %s",
359 xact_info
, action
, msg
362 if action
== rwdts
.QueryAction
.CREATE
:
363 vlr
= self
._vnsm
.create_vlr(msg
)
364 with self
._dts
.transaction(flags
=0) as xact
:
365 yield from vlr
.instantiate(xact
)
366 self
._log
.debug("Responding to VL create request path:%s, msg:%s",
368 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
, xpath
=vlr
.xpath
, msg
=vlr
.msg
)
370 elif action
== rwdts
.QueryAction
.DELETE
:
371 # Delete an VLR record
372 schema
= RwVlrYang
.YangData_Vlr_VlrCatalog_Vlr
.schema()
373 path_entry
= schema
.keyspec_to_entry(ks_path
)
374 self
._log
.debug("Terminating VLR id %s", path_entry
.key00
.id)
375 yield from self
._vnsm
.delete_vlr(path_entry
.key00
.id, xact_info
.xact
)
377 err
= "%s action on VirtualLinkRecord not supported" % action
378 raise NotImplementedError(err
)
379 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
382 self
._log
.debug("Registering for VLR using xpath: %s",
385 reg_handle
= rift
.tasklets
.DTS
.RegistrationHandler(
387 on_prepare
=on_prepare
,
389 handlers
= rift
.tasklets
.Group
.Handler(on_event
=on_event
,)
390 with self
._dts
.group_create(handler
=handlers
) as group
:
391 self
._regh
= group
.register(
392 xpath
=VlrDtsHandler
.XPATH
,
394 flags
=rwdts
.Flag
.PUBLISHER | rwdts
.Flag
.NO_PREP_READ| rwdts
.Flag
.DATASTORE
,
def create(self, xact, path, msg):
    """Create a VLR record in DTS with path and message.

    Args:
        xact: transaction; used only for logging here.
        path: xpath of the element to create.
        msg: message to store at ``path``.
    """
    # The log format has three placeholders, matching the three arguments.
    self._log.debug("Creating VLR xact = %s, %s:%s", xact, path, msg)
    self.regh.create_element(path, msg)
    self._log.debug("Created VLR xact = %s, %s:%s", xact, path, msg)
def update(self, xact, path, msg):
    """Update a VLR record in DTS with path and message.

    Args:
        xact: transaction; used only for logging here.
        path: xpath of the element to update.
        msg: message to store at ``path``.
    """
    # The log format has three placeholders, matching the three arguments.
    self._log.debug("Updating VLR xact = %s, %s:%s", xact, path, msg)
    self.regh.update_element(path, msg)
    self._log.debug("Updated VLR xact = %s, %s:%s", xact, path, msg)
def delete(self, xact, path):
    """Delete a VLR record in DTS at the given path.

    Args:
        xact: transaction; used only for logging here.
        path: xpath of the element to delete.
    """
    self._log.debug("Deleting VLR xact = %s, %s", xact, path)
    self.regh.delete_element(path)
    self._log.debug("Deleted VLR xact = %s, %s", xact, path)
429 class VldDtsHandler(object):
430 """ DTS handler for the VLD registration """
431 XPATH
= "C,/vld:vld-catalog/vld:vld"
433 def __init__(self
, dts
, log
, loop
, vnsm
):
443 """ The registration handle assocaited with this Handler"""
448 """ Register the VLD path """
450 def on_prepare(xact_info
, query_action
, ks_path
, msg
):
451 """ prepare callback on vld path """
453 "Got on prepare for VLD update (ks_path: %s) (action: %s)",
454 ks_path
.to_xpath(VldYang
.get_schema()), msg
)
456 schema
= VldYang
.YangData_Vld_VldCatalog_Vld
.schema()
457 path_entry
= schema
.keyspec_to_entry(ks_path
)
458 vld_id
= path_entry
.key00
.id
460 disabled_actions
= [rwdts
.QueryAction
.DELETE
, rwdts
.QueryAction
.UPDATE
]
461 if query_action
not in disabled_actions
:
462 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
465 vlr
= self
._vnsm
.find_vlr_by_vld_id(vld_id
)
468 "Did not find an existing VLR record for vld %s. "
469 "Permitting %s vld action", vld_id
, query_action
)
470 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
473 raise VlrRecordExistsError(
474 "Vlr record(s) exists."
475 "Cannot perform %s action on VLD." % query_action
)
477 handler
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
)
479 yield from self
._dts
.register(
481 flags
=rwdts
.Flag
.SUBSCRIBER
,