3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
24 gi
.require_version('RwVlrYang', '1.0')
25 gi
.require_version('RwDts', '1.0')
26 gi
.require_version('RwResourceMgrYang', '1.0')
27 from gi
.repository
import (
36 class NetworkResourceError(Exception):
37 """ Network Resource Error """
41 class VlrRecordExistsError(Exception):
42 """ VLR record already exists"""
46 class VlRecordError(Exception):
47 """ VLR record error """
51 class VirtualLinkRecordState(enum
.Enum
):
52 """ Virtual Link record state """
55 RESOURCE_ALLOC_PENDING
= 3
62 class VirtualLinkRecord(object):
64 Virtual Link Record object
66 def __init__(self
, dts
, log
, loop
, vnsm
, vlr_msg
, req_id
=None):
71 self
._vlr
_msg
= vlr_msg
73 self
._project
= vnsm
._project
74 self
._network
_id
= None
75 self
._network
_pool
= None
76 self
._assigned
_subnet
= None
77 self
._create
_time
= int(time
.time())
79 self
._request
_id
= str(uuid
.uuid4())
81 self
._request
_id
= req_id
83 self
._state
= VirtualLinkRecordState
.INIT
84 self
._state
_failed
_reason
= None
88 """ VLD xpath associated with this VLR record """
89 return self
._project
.add_project("C,/vld:vld-catalog/vld:vld[id='{}']".
94 """ VLD id associated with this VLR record """
95 return self
._vlr
_msg
.vld_ref
99 """ VLR id associated with this VLR record """
100 return self
._vlr
_msg
.id
104 """ path for this VLR """
105 return self
._project
.add_project("D,/vlr:vlr-catalog"
106 "/vlr:vlr[vlr:id='{}']".format(self
.vlr_id
))
110 """ Name of this VLR """
111 return self
._vlr
_msg
.name
114 def cloud_account_name(self
):
115 """ Cloud Account to instantiate the virtual link on """
116 return self
._vlr
_msg
.cloud_account
119 def resmgr_path(self
):
120 """ path for resource-mgr"""
121 return self
._project
.add_project("D,/rw-resource-mgr:resource-mgmt" +
122 "/vlink-event/vlink-event-data[event-id='{}']".format(self
._request
_id
))
125 def operational_status(self
):
126 """ Operational status of this VLR"""
127 op_stats_dict
= {"INIT": "init",
128 "INSTANTIATING": "vl_alloc_pending",
129 "RESOURCE_ALLOC_PENDING": "vl_alloc_pending",
132 "TERMINATING": "vl_terminate_pending",
133 "TERMINATED": "terminated"}
135 return op_stats_dict
[self
._state
.name
]
139 """ VLR message for this VLR """
140 msg
= RwVlrYang
.YangData_RwProject_Project_VlrCatalog_Vlr()
141 msg
.copy_from(self
._vlr
_msg
)
143 if self
._network
_id
is not None:
144 msg
.network_id
= self
._network
_id
146 if self
._network
_pool
is not None:
147 msg
.network_pool
= self
._network
_pool
149 if self
._assigned
_subnet
is not None:
150 msg
.assigned_subnet
= self
._assigned
_subnet
152 msg
.operational_status
= self
.operational_status
153 msg
.operational_status_details
= self
._state
_failed
_reason
154 msg
.res_id
= self
._request
_id
159 def resmgr_msg(self
):
160 """ VLR message for this VLR """
161 msg
= RwResourceMgrYang
.VirtualLinkEventData()
162 msg
.event_id
= self
._request
_id
163 msg
.cloud_account
= self
.cloud_account_name
164 msg
.request_info
.name
= self
.name
165 msg
.request_info
.vim_network_name
= self
._vlr
_msg
.vim_network_name
166 msg
.request_info
.provider_network
.from_dict(
167 self
._vlr
_msg
.provider_network
.as_dict()
169 if self
._vlr
_msg
.has_field('ip_profile_params'):
170 msg
.request_info
.ip_profile_params
.from_dict(self
._vlr
_msg
.ip_profile_params
.as_dict())
175 def create_network(self
, xact
):
176 """ Create network for this VL """
177 self
._log
.debug("Creating network req-id: %s", self
._request
_id
)
178 return (yield from self
.request_network(xact
, "create"))
181 def delete_network(self
, xact
):
182 """ Delete network for this VL """
183 self
._log
.debug("Deleting network - req-id: %s", self
._request
_id
)
184 return (yield from self
.request_network(xact
, "delete"))
187 def read_network(self
, xact
):
188 """ Read network for this VL """
189 self
._log
.debug("Reading network - req-id: %s", self
._request
_id
)
190 return (yield from self
.request_network(xact
, "read"))
193 def request_network(self
, xact
, action
):
194 """Request creation/deletion network for this VL """
196 block
= xact
.block_create()
198 if action
== "create":
199 self
._log
.debug("Creating network path:%s, msg:%s",
200 self
.resmgr_path
, self
.resmgr_msg
)
201 block
.add_query_create(self
.resmgr_path
, self
.resmgr_msg
)
202 elif action
== "delete":
203 self
._log
.debug("Deleting network path:%s", self
.resmgr_path
)
204 if self
.resmgr_msg
.request_info
.name
!= "multisite":
205 block
.add_query_delete(self
.resmgr_path
)
206 elif action
== "read":
207 self
._log
.debug("Reading network path:%s", self
.resmgr_path
)
208 block
.add_query_read(self
.resmgr_path
)
210 raise VlRecordError("Invalid action %s received" % action
)
212 res_iter
= yield from block
.execute(now
=True)
216 if action
== "create" or action
== "read":
222 raise NetworkResourceError("Did not get a network resource response (resp: %s)", resp
)
224 if resp
.has_field('resource_info') and resp
.resource_info
.resource_state
== "failed":
225 raise NetworkResourceError(resp
.resource_info
.resource_errors
)
227 if not (resp
.has_field('resource_info') and
228 resp
.resource_info
.has_field('virtual_link_id')):
229 raise NetworkResourceError("Did not get a valid network resource response (resp: %s)", resp
)
231 self
._log
.debug("Got network request response: %s", resp
)
236 def instantiate(self
, xact
, restart
=0):
237 """ Instantiate this VL """
238 self
._state
= VirtualLinkRecordState
.INSTANTIATING
240 self
._log
.debug("Instantiating VLR path = [%s]", self
.xpath
)
243 self
._state
= VirtualLinkRecordState
.RESOURCE_ALLOC_PENDING
246 network_resp
= yield from self
.create_network(xact
)
248 network_resp
= yield from self
.read_network(xact
)
249 if network_resp
== None:
250 network_resp
= yield from self
.create_network(xact
)
252 # Note network_resp.virtual_link_id is CAL assigned network_id.
254 self
._network
_id
= network_resp
.resource_info
.virtual_link_id
255 self
._network
_pool
= network_resp
.resource_info
.pool_name
256 self
._assigned
_subnet
= network_resp
.resource_info
.subnet
258 self
._state
= VirtualLinkRecordState
.READY
260 yield from self
.publish(xact
)
262 except Exception as e
:
263 self
._log
.error("Instantiatiation of VLR record failed: %s", str(e
))
264 self
._state
= VirtualLinkRecordState
.FAILED
265 self
._state
_failed
_reason
= str(e
)
266 yield from self
.publish(xact
)
269 def publish(self
, xact
):
270 """ publish this VLR """
272 self
._log
.debug("Publishing VLR path = [%s], record = [%s]",
273 self
.xpath
, self
.msg
)
274 vlr
.create_time
= self
._create
_time
275 yield from self
._vnsm
.publish_vlr(xact
, self
.xpath
, self
.msg
)
276 self
._log
.debug("Published VLR path = [%s], record = [%s]",
277 self
.xpath
, self
.msg
)
280 def terminate(self
, xact
):
281 """ Terminate this VL """
282 if self
._state
not in [VirtualLinkRecordState
.READY
, VirtualLinkRecordState
.FAILED
]:
283 self
._log
.error("Ignoring terminate for VL %s is in %s state",
284 self
.vlr_id
, self
._state
)
287 if self
._state
== VirtualLinkRecordState
.READY
:
288 self
._log
.debug("Terminating VL with id %s", self
.vlr_id
)
289 self
._state
= VirtualLinkRecordState
.TERMINATING
291 yield from self
.delete_network(xact
)
293 self
._log
.exception("Caught exception while deleting VL %s", self
.vlr_id
)
294 self
._log
.debug("Terminated VL with id %s", self
.vlr_id
)
296 yield from self
.unpublish(xact
)
297 self
._state
= VirtualLinkRecordState
.TERMINATED
300 def unpublish(self
, xact
):
301 """ Unpublish this VLR """
302 self
._log
.debug("UnPublishing VLR id %s", self
.vlr_id
)
303 yield from self
._vnsm
.unpublish_vlr(xact
, self
.xpath
)
304 self
._log
.debug("UnPublished VLR id %s", self
.vlr_id
)
307 class VlrDtsHandler(object):
308 """ Handles DTS interactions for the VLR registration """
309 XPATH
= "D,/vlr:vlr-catalog/vlr:vlr"
311 def __init__(self
, dts
, log
, loop
, vnsm
):
318 self
._project
= vnsm
._project
322 """ The registration handle assocaited with this Handler"""
327 """ Register for the VLR path """
328 def on_commit(xact_info
):
329 """ The transaction has been committed """
330 self
._log
.debug("Got vlr commit (xact_info: %s)", xact_info
)
332 return rwdts
.MemberRspCode
.ACTION_OK
335 def on_event(dts
, g_reg
, xact
, xact_event
, scratch_data
):
337 def instantiate_realloc_vlr(vlr
):
338 """Re-populate the virtual link information after restart
345 with self
._dts
.transaction(flags
=0) as xact
:
346 yield from vlr
.instantiate(xact
, 1)
348 if (xact_event
== rwdts
.MemberEvent
.INSTALL
):
349 curr_cfg
= self
.regh
.elements
351 vlr
= self
._vnsm
.create_vlr(cfg
)
352 self
._loop
.create_task(instantiate_realloc_vlr(vlr
))
354 self
._log
.debug("Got on_event")
355 return rwdts
.MemberRspCode
.ACTION_OK
358 def on_prepare(xact_info
, action
, ks_path
, msg
):
359 """ prepare for VLR registration"""
361 "Got vlr on_prepare callback (xact_info: %s, action: %s): %s",
362 xact_info
, action
, msg
365 if action
== rwdts
.QueryAction
.CREATE
:
366 vlr
= self
._vnsm
.create_vlr(msg
)
367 with self
._dts
.transaction(flags
=0) as xact
:
368 yield from vlr
.instantiate(xact
)
369 self
._log
.debug("Responding to VL create request path:%s, msg:%s",
371 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
, xpath
=vlr
.xpath
, msg
=vlr
.msg
)
373 elif action
== rwdts
.QueryAction
.DELETE
:
374 # Delete an VLR record
375 schema
= RwVlrYang
.YangData_RwProject_Project_VlrCatalog_Vlr
.schema()
376 path_entry
= schema
.keyspec_to_entry(ks_path
)
377 self
._log
.debug("Terminating VLR id %s", path_entry
.key00
.id)
378 yield from self
._vnsm
.delete_vlr(path_entry
.key00
.id, xact_info
.xact
)
380 err
= "%s action on VirtualLinkRecord not supported" % action
381 raise NotImplementedError(err
)
382 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
385 xpath
= self
._project
.add_project(VlrDtsHandler
.XPATH
)
386 self
._log
.debug("Registering for VLR using xpath: {}".
389 reg_handle
= rift
.tasklets
.DTS
.RegistrationHandler(
391 on_prepare
=on_prepare
,
393 handlers
= rift
.tasklets
.Group
.Handler(on_event
=on_event
,)
394 with self
._dts
.group_create(handler
=handlers
) as group
:
395 self
._regh
= group
.register(
398 flags
=rwdts
.Flag
.PUBLISHER | rwdts
.Flag
.NO_PREP_READ| rwdts
.Flag
.DATASTORE
,
401 def deregister(self
):
402 self
._log
.debug("De-register VLR handler for project {}".
403 format(self
._project
.name
))
405 self
._regh
.deregister()
409 def create(self
, xact
, xpath
, msg
):
411 Create a VLR record in DTS with path and message
413 path
= self
._project
.add_project(xpath
)
414 self
._log
.debug("Creating VLR xact = %s, %s:%s",
416 self
.regh
.create_element(path
, msg
)
417 self
._log
.debug("Created VLR xact = %s, %s:%s",
421 def update(self
, xact
, xpath
, msg
):
423 Update a VLR record in DTS with path and message
425 path
= self
._project
.add_project(xpath
)
426 self
._log
.debug("Updating VLR xact = %s, %s:%s",
428 self
.regh
.update_element(path
, msg
)
429 self
._log
.debug("Updated VLR xact = %s, %s:%s",
433 def delete(self
, xact
, xpath
):
435 Delete a VLR record in DTS with path and message
437 path
= self
._project
.add_project(xpath
)
438 self
._log
.debug("Deleting VLR xact = %s, %s", xact
, path
)
439 self
.regh
.delete_element(path
)
440 self
._log
.debug("Deleted VLR xact = %s, %s", xact
, path
)
443 class VldDtsHandler(object):
444 """ DTS handler for the VLD registration """
445 XPATH
= "C,/vld:vld-catalog/vld:vld"
447 def __init__(self
, dts
, log
, loop
, vnsm
):
457 """ The registration handle assocaited with this Handler"""
462 """ Register the VLD path """
464 def on_prepare(xact_info
, query_action
, ks_path
, msg
):
465 """ prepare callback on vld path """
467 "Got on prepare for VLD update (ks_path: %s) (action: %s)",
468 ks_path
.to_xpath(VldYang
.get_schema()), msg
)
470 schema
= VldYang
.YangData_RwProject_Project_VldCatalog_Vld
.schema()
471 path_entry
= schema
.keyspec_to_entry(ks_path
)
472 # TODO: Check why on project delete this gets called
474 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
477 vld_id
= path_entry
.key00
.id
479 disabled_actions
= [rwdts
.QueryAction
.DELETE
, rwdts
.QueryAction
.UPDATE
]
480 if query_action
not in disabled_actions
:
481 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
484 vlr
= self
._vnsm
.find_vlr_by_vld_id(vld_id
)
487 "Did not find an existing VLR record for vld %s. "
488 "Permitting %s vld action", vld_id
, query_action
)
489 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
492 raise VlrRecordExistsError(
493 "Vlr record(s) exists."
494 "Cannot perform %s action on VLD." % query_action
)
496 handler
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
)
498 yield from self
._dts
.register(
499 self
._vnsm
._project
.add_project(VldDtsHandler
.XPATH
),
500 flags
=rwdts
.Flag
.SUBSCRIBER
,
504 def deregister(self
):
505 self
._log
.debug("De-register VLD handler for project {}".
506 format(self
._vnsm
._project
.name
))
508 self
._regh
.deregister()