271ed39e57f272c299e81aaa3f62175563ad05eb
[osm/SO.git] / rwlaunchpad / plugins / rwvns / rift / vlmgr / rwvlmgr.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 import asyncio
18 import enum
19 import gi
20 import time
21 import uuid
22
23 gi.require_version('RwVlrYang', '1.0')
24 gi.require_version('RwDts', '1.0')
25 gi.require_version('RwResourceMgrYang', '1.0')
26 from gi.repository import (
27 RwVlrYang,
28 VldYang,
29 RwDts as rwdts,
30 RwResourceMgrYang,
31 )
32 gi.require_version('RwKeyspec', '1.0')
33 from gi.repository.RwKeyspec import quoted_key
34 import rift.tasklets
35
36
class NetworkResourceError(Exception):
    """Raised when a network resource request yields no usable response.

    Used for a missing response, a response without ``resource_info`` and
    a response whose resource state is reported as failed.
    """
40
41
class VlrRecordExistsError(Exception):
    """Raised when an action is refused because a VLR record already exists."""
45
46
class VlRecordError(Exception):
    """Generic error raised while operating on a virtual link record."""
50
51
@enum.unique
class VirtualLinkRecordState(enum.Enum):
    """Life-cycle states a virtual link record can be in."""
    # Regular life-cycle states, numbered consecutively.
    INIT = 1
    INSTANTIATING = 2
    RESOURCE_ALLOC_PENDING = 3
    READY = 4
    TERMINATING = 5
    TERMINATED = 6
    # Error state; its value is deliberately kept apart from the sequence above.
    FAILED = 10
61
62
class VirtualLinkRecord(object):
    """
    Virtual Link Record (VLR) object.

    Wraps a single VLR message and drives its life cycle: it requests a
    network through the resource-manager path, tracks the resulting state
    and publishes/unpublishes the record through the owning manager
    (``vnsm``).
    """

    # Operational status string reported for each VirtualLinkRecordState,
    # keyed by the state's name.
    _OP_STATUS_BY_STATE = {
        "INIT": "init",
        "INSTANTIATING": "vl_alloc_pending",
        "RESOURCE_ALLOC_PENDING": "vl_alloc_pending",
        "READY": "running",
        "FAILED": "failed",
        "TERMINATING": "vl_terminate_pending",
        "TERMINATED": "terminated",
    }

    def __init__(self, dts, log, loop, vnsm, vlr_msg):
        """
        Arguments:
            dts     - DTS handle
            log     - logger
            loop    - event loop
            vnsm    - owning manager; must expose ``_project``,
                      ``publish_vlr`` and ``unpublish_vlr``
            vlr_msg - VLR message this record is created from
        """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._vnsm = vnsm
        self._vlr_msg = vlr_msg
        self._vlr_id = self._vlr_msg.id

        self._project = vnsm._project
        # Filled in by ready() from the resource manager response
        self._network_id = None
        self._network_pool = None
        self._assigned_subnet = None
        self._virtual_cps = list()
        self._create_time = int(time.time())

        self._state = VirtualLinkRecordState.INIT
        self._state_failed_reason = None
        self._name = self._vlr_msg.name

    @property
    def vld_xpath(self):
        """ VLD xpath associated with this VLR record """
        return self._project.add_project(
            "C,/vld:vld-catalog/vld:vld[id={}]".format(quoted_key(self.vld_id)))

    @property
    def vld_id(self):
        """ VLD id associated with this VLR record """
        return self._vlr_msg.vld_ref

    @property
    def vlr_id(self):
        """ VLR id associated with this VLR record """
        return self._vlr_id

    @property
    def xpath(self):
        """ path for this VLR """
        return self._project.add_project(
            "D,/vlr:vlr-catalog"
            "/vlr:vlr[vlr:id={}]".format(quoted_key(self.vlr_id)))

    @property
    def name(self):
        """ Name of this VLR """
        return self._name

    @property
    def datacenter(self):
        """ RO Account to instantiate the virtual link on """
        return self._vlr_msg.datacenter

    @property
    def event_id(self):
        """ Event Identifier for this virtual link (same value as the VLR id) """
        return self._vlr_id

    @property
    def resmgr_path(self):
        """ path for resource-mgr"""
        return self._project.add_project(
            "D,/rw-resource-mgr:resource-mgmt" +
            "/vlink-event/vlink-event-data[event-id={}]".format(quoted_key(self.event_id)))

    @property
    def operational_status(self):
        """ Operational status of this VLR, derived from the current state """
        return self._OP_STATUS_BY_STATE[self._state.name]

    @property
    def msg(self):
        """
        VLR message for this VLR.

        A new message is built on every access: a copy of the original VLR
        message overlaid with the allocated network details (when known),
        the connection point details and the current operational status.
        """
        msg = RwVlrYang.YangData_RwProject_Project_VlrCatalog_Vlr()
        msg.copy_from(self._vlr_msg)

        if self._network_id is not None:
            msg.network_id = self._network_id

        if self._network_pool is not None:
            msg.network_pool = self._network_pool

        if self._assigned_subnet is not None:
            msg.assigned_subnet = self._assigned_subnet

        if self._virtual_cps:
            for cp in msg.virtual_connection_points:
                # Use the first allocated connection point with the same name
                for vcp in self._virtual_cps:
                    if cp.name == vcp['name']:
                        cp.ip_address = vcp['ip_address']
                        cp.mac_address = vcp['mac_address']
                        cp.connection_point_id = vcp['connection_point_id']
                        break

        msg.operational_status = self.operational_status
        msg.operational_status_details = self._state_failed_reason
        msg.res_id = self.event_id
        return msg

    @property
    def resmgr_msg(self):
        """ Resource manager vlink-event request message for this VLR """
        msg = RwResourceMgrYang.YangData_RwProject_Project_ResourceMgmt_VlinkEvent_VlinkEventData()
        msg.event_id = self.event_id
        msg.cloud_account = self.datacenter
        msg.request_info.name = self.name
        msg.request_info.vim_network_name = self._vlr_msg.vim_network_name
        msg.request_info.provider_network.from_dict(
            self._vlr_msg.provider_network.as_dict()
        )

        has_ip_profile = self._vlr_msg.has_field('ip_profile_params')
        if has_ip_profile:
            msg.request_info.ip_profile_params.from_dict(
                self._vlr_msg.ip_profile_params.as_dict())

        # Evaluated once: the answer does not change per connection point
        has_security_group = (
            has_ip_profile and
            self._vlr_msg.ip_profile_params.has_field('security_group'))

        # Only these connection point fields are forwarded in the request
        forwarded_fields = ('name', 'port_security_enabled', 'type_yang')
        for cp in self._vlr_msg.virtual_connection_points:
            vcp = msg.request_info.virtual_cps.add()
            vcp.from_dict({k: v for k, v in cp.as_dict().items()
                           if k in forwarded_fields})
            if has_security_group:
                vcp.security_group = self._vlr_msg.ip_profile_params.security_group

        return msg

    @asyncio.coroutine
    def create_network(self, xact):
        """ Create network for this VL """
        self._log.debug("Creating network event-id: %s:%s", self.event_id, self._vlr_msg)
        network_rsp = yield from self.request_network(xact, "create")
        return network_rsp

    @asyncio.coroutine
    def delete_network(self, xact):
        """ Delete network for this VL """
        self._log.debug("Deleting network - event-id: %s", self.event_id)
        return (yield from self.request_network(xact, "delete"))

    @asyncio.coroutine
    def read_network(self, xact):
        """ Read network for this VL """
        self._log.debug("Reading network - event-id: %s", self.event_id)
        return (yield from self.request_network(xact, "read"))

    @asyncio.coroutine
    def request_network(self, xact, action):
        """
        Request creation/deletion/read of the network for this VL.

        Arguments:
            xact   - transaction used to build and execute the query block
            action - one of "create", "delete" or "read"

        Returns:
            The last response received for "create"/"read", or None when
            no result was returned. Always None for "delete".

        Raises:
            VlRecordError        - action is not one of the supported values
            NetworkResourceError - a result carried no response, no
                                   resource_info, or a failed resource state
        """

        block = xact.block_create()

        if action == "create":
            self._log.debug("Creating network path:%s, msg:%s",
                            self.resmgr_path, self.resmgr_msg)
            block.add_query_create(self.resmgr_path, self.resmgr_msg)
        elif action == "delete":
            self._log.debug("Deleting network path:%s", self.resmgr_path)
            block.add_query_delete(self.resmgr_path)
        elif action == "read":
            self._log.debug("Reading network path:%s", self.resmgr_path)
            block.add_query_read(self.resmgr_path)
        else:
            raise VlRecordError("Invalid action %s received" % action)

        res_iter = yield from block.execute(now=True)

        resp = None

        # Delete responses are not inspected; only create/read carry
        # resource information the caller needs.
        if action in ("create", "read"):
            for i in res_iter:
                r = yield from i
                resp = r.result

                if resp is None:
                    # Interpolate explicitly: exceptions do not apply
                    # %-formatting to extra constructor arguments.
                    raise NetworkResourceError(
                        "Did not get a network resource response (resp: %s)" % resp)

                if resp.has_field('resource_info') and resp.resource_info.resource_state == "failed":
                    raise NetworkResourceError(resp.resource_info.resource_errors)

                if not resp.has_field('resource_info'):
                    raise NetworkResourceError(
                        "Did not get a valid network resource response (resp: %s)" % resp)

        self._log.debug("Got network request response: %s", resp)

        return resp

    @asyncio.coroutine
    def instantiate(self, xact, restart=0):
        """
        Instantiate this VL.

        Arguments:
            xact    - transaction to use for the network request and publish
            restart - 0 to create the network; any other value to first
                      read an existing network and create one only when
                      the read returns nothing

        Any exception is caught, recorded as the failure reason and the
        record is published in FAILED state.
        """
        self._state = VirtualLinkRecordState.INSTANTIATING

        self._log.debug("Instantiating VLR path = [%s]", self.xpath)

        try:
            self._state = VirtualLinkRecordState.RESOURCE_ALLOC_PENDING

            if restart == 0:
                network_resp = yield from self.create_network(xact)
            else:
                network_resp = yield from self.read_network(xact)
                if network_resp is None:
                    network_resp = yield from self.create_network(xact)

            if network_resp:
                self._state = self.vl_state_from_network_resp(network_resp)

            if self._state == VirtualLinkRecordState.READY:
                # Move this VL into ready state
                yield from self.ready(network_resp, xact)
            else:
                yield from self.publish(xact)
        except Exception as e:
            self._log.error("Instantiatiation of VLR record failed: %s", str(e))
            self._state = VirtualLinkRecordState.FAILED
            self._state_failed_reason = str(e)
            yield from self.publish(xact)

    def vl_state_from_network_resp(self, network_resp):
        """
        Determine VL state from network response.

        'active' maps to READY and 'failed' to FAILED; 'pending' and any
        other resource state map to RESOURCE_ALLOC_PENDING.
        """
        resource_state = network_resp.resource_info.resource_state
        if resource_state == 'active':
            return VirtualLinkRecordState.READY
        if resource_state == 'failed':
            return VirtualLinkRecordState.FAILED
        return VirtualLinkRecordState.RESOURCE_ALLOC_PENDING

    @asyncio.coroutine
    def ready(self, event_resp, xact):
        """ This virtual link is ready """
        # Note network_resp.virtual_link_id is CAL assigned network_id.
        self._log.debug("Virtual Link id %s name %s in ready state, event_rsp:%s",
                        self.vlr_id,
                        self.name,
                        event_resp)
        self._network_id = event_resp.resource_info.virtual_link_id
        self._network_pool = event_resp.resource_info.pool_name
        self._assigned_subnet = event_resp.resource_info.subnet
        self._virtual_cps = [vcp.as_dict()
                             for vcp in event_resp.resource_info.virtual_connection_points]

        # Published once with the network details and again after the
        # state has been switched to READY.
        yield from self.publish(xact)

        self._state = VirtualLinkRecordState.READY

        yield from self.publish(xact)

    @asyncio.coroutine
    def failed(self, event_resp, xact):
        """ This virtual link Failed """
        self._log.debug("Virtual Link id %s name %s failed to instantiate, event_rsp:%s",
                        self.vlr_id,
                        self.name,
                        event_resp)

        self._state = VirtualLinkRecordState.FAILED

        yield from self.publish(xact)

    @asyncio.coroutine
    def publish(self, xact):
        """ publish this VLR (including its creation time) """
        # self.msg builds a new message on every access, so build it once
        # and publish that same instance; otherwise create_time is lost.
        vlr = self.msg
        vlr.create_time = self._create_time
        self._log.debug("Publishing VLR path = [%s], record = [%s]",
                        self.xpath, vlr)
        yield from self._vnsm.publish_vlr(xact, self.xpath, vlr)
        self._log.debug("Published VLR path = [%s], record = [%s]",
                        self.xpath, vlr)

    @asyncio.coroutine
    def terminate(self, xact):
        """
        Terminate this VL.

        Only acted upon in READY or FAILED state. The network is deleted
        only when the VL was READY; errors during deletion are logged and
        do not stop the record from being unpublished.
        """
        if self._state not in [VirtualLinkRecordState.READY, VirtualLinkRecordState.FAILED]:
            self._log.error("Ignoring terminate for VL %s is in %s state",
                            self.vlr_id, self._state)
            return

        if self._state == VirtualLinkRecordState.READY:
            self._log.debug("Terminating VL with id %s", self.vlr_id)
            self._state = VirtualLinkRecordState.TERMINATING
            try:
                yield from self.delete_network(xact)
            except Exception:
                # Best effort: still unpublish the record below
                self._log.exception("Caught exception while deleting VL %s", self.vlr_id)
            self._log.debug("Terminated VL with id %s", self.vlr_id)

        yield from self.unpublish(xact)
        self._state = VirtualLinkRecordState.TERMINATED

    @asyncio.coroutine
    def unpublish(self, xact):
        """ Unpublish this VLR """
        self._log.debug("UnPublishing VLR id %s", self.vlr_id)
        yield from self._vnsm.unpublish_vlr(xact, self.xpath)
        self._log.debug("UnPublished VLR id %s", self.vlr_id)
364
365
class VlrDtsHandler(object):
    """ DTS registration (publisher side) for the VLR catalog path """
    XPATH = "D,/vlr:vlr-catalog/vlr:vlr"

    def __init__(self, dts, log, loop, vnsm):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._vnsm = vnsm

        self._project = vnsm._project
        # Set by register(), cleared by deregister()
        self._regh = None

    @property
    def regh(self):
        """ Registration handle held by this handler (None when unregistered) """
        return self._regh

    @asyncio.coroutine
    def register(self):
        """ Register for the project-scoped VLR path """

        @asyncio.coroutine
        def reinstantiate_vlr(vlr):
            """Re-populate the virtual link information after restart

            Arguments:
                vlr - virtual link record to re-instantiate
            """
            with self._dts.transaction(flags=0) as restart_xact:
                # restart=1: read the existing network before creating one
                yield from vlr.instantiate(restart_xact, 1)

        @asyncio.coroutine
        def on_event(dts, g_reg, xact, xact_event, scratch_data):
            """ Group event callback; rebuilds records on INSTALL """
            if xact_event == rwdts.MemberEvent.INSTALL:
                for stored_msg in self.regh.elements:
                    record = self._vnsm.create_vlr(stored_msg)
                    self._loop.create_task(reinstantiate_vlr(record))

            self._log.debug("Got on_event")
            return rwdts.MemberRspCode.ACTION_OK

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """ Prepare callback: handles VLR create and delete requests """
            self._log.debug(
                "Got vlr on_prepare callback (xact_info: %s, action: %s): %s",
                xact_info, action, msg
            )

            if action == rwdts.QueryAction.CREATE:
                record = self._vnsm.create_vlr(msg)
                with self._dts.transaction(flags=0) as create_xact:
                    yield from record.instantiate(create_xact)
                self._log.debug("Responding to VL create request path:%s, msg:%s",
                                record.xpath, record.msg)
                xact_info.respond_xpath(rwdts.XactRspCode.ACK,
                                        xpath=record.xpath,
                                        msg=record.msg)
                return

            if action == rwdts.QueryAction.DELETE:
                # Delete a VLR record, identified by the key in the keyspec
                vlr_schema = RwVlrYang.YangData_RwProject_Project_VlrCatalog_Vlr.schema()
                entry = vlr_schema.keyspec_to_entry(ks_path)
                self._log.debug("Terminating VLR id %s", entry.key00.id)
                yield from self._vnsm.delete_vlr(entry.key00.id, xact_info.xact)
                xact_info.respond_xpath(rwdts.XactRspCode.ACK)
                return

            # Anything other than create/delete is rejected
            raise NotImplementedError(
                "%s action on VirtualLinkRecord not supported" % action)

        xpath = self._project.add_project(VlrDtsHandler.XPATH)
        self._log.debug("Registering for VLR using xpath: {}".
                        format(xpath))

        prepare_handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
        group_handler = rift.tasklets.Group.Handler(on_event=on_event)
        reg_flags = (rwdts.Flag.PUBLISHER |
                     rwdts.Flag.NO_PREP_READ |
                     rwdts.Flag.DATASTORE)
        with self._dts.group_create(handler=group_handler) as group:
            self._regh = group.register(
                xpath=xpath,
                handler=prepare_handler,
                flags=reg_flags,
            )

    def deregister(self):
        """ Drop the VLR registration, if any """
        self._log.debug("De-register VLR handler for project {}".
                        format(self._project.name))
        if not self._regh:
            return
        self._regh.deregister()
        self._regh = None

    @asyncio.coroutine
    def create(self, xact, xpath, msg):
        """
        Create a VLR record in DTS with path and message
        """
        full_path = self._project.add_project(xpath)
        self._log.debug("Creating VLR xact = %s, %s:%s",
                        xact, full_path, msg)
        self.regh.create_element(full_path, msg)
        self._log.debug("Created VLR xact = %s, %s:%s",
                        xact, full_path, msg)

    @asyncio.coroutine
    def update(self, xact, xpath, msg):
        """
        Update a VLR record in DTS with path and message
        """
        full_path = self._project.add_project(xpath)
        self._log.debug("Updating VLR xact = %s, %s:%s",
                        xact, full_path, msg)
        self.regh.update_element(full_path, msg)
        self._log.debug("Updated VLR xact = %s, %s:%s",
                        xact, full_path, msg)

    @asyncio.coroutine
    def delete(self, xact, xpath):
        """
        Delete the VLR record at the given path from DTS
        """
        full_path = self._project.add_project(xpath)
        self._log.debug("Deleting VLR xact = %s, %s", xact, full_path)
        self.regh.delete_element(full_path)
        self._log.debug("Deleted VLR xact = %s, %s", xact, full_path)
492
493
class VldDtsHandler(object):
    """
    DTS handler for the VLD registration.

    Subscribes to the VLD catalog path and rejects delete/update of a VLD
    for which a VLR record is found.
    """
    XPATH = "C,/vld:vld-catalog/vld:vld"

    def __init__(self, dts, log, loop, vnsm):
        """
        Arguments:
            dts  - DTS handle
            log  - logger
            loop - event loop
            vnsm - owning manager; must expose ``_project`` and
                   ``find_vlr_by_vld_id``
        """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._vnsm = vnsm

        self._regh = None

    @property
    def regh(self):
        """ The registration handle associated with this Handler"""
        return self._regh

    @asyncio.coroutine
    def register(self):
        """ Register the VLD path """
        @asyncio.coroutine
        def on_prepare(xact_info, query_action, ks_path, msg):
            """
            prepare callback on vld path

            ACKs everything except DELETE/UPDATE of a VLD that still has a
            VLR record, for which VlrRecordExistsError is raised.
            """
            self._log.debug(
                "Got on prepare for VLD update (ks_path: %s) (action: %s)",
                ks_path.to_xpath(VldYang.get_schema()), query_action)

            schema = VldYang.YangData_RwProject_Project_VldCatalog_Vld.schema()
            path_entry = schema.keyspec_to_entry(ks_path)
            # TODO: Check why on project delete this gets called
            if not path_entry:
                xact_info.respond_xpath(rwdts.XactRspCode.ACK)
                return

            vld_id = path_entry.key00.id

            # Only these actions are blocked while a VLR exists
            disabled_actions = [rwdts.QueryAction.DELETE, rwdts.QueryAction.UPDATE]
            if query_action not in disabled_actions:
                xact_info.respond_xpath(rwdts.XactRspCode.ACK)
                return

            vlr = self._vnsm.find_vlr_by_vld_id(vld_id)
            if vlr is None:
                self._log.debug(
                    "Did not find an existing VLR record for vld %s. "
                    "Permitting %s vld action", vld_id, query_action)
                xact_info.respond_xpath(rwdts.XactRspCode.ACK)
                return

            raise VlrRecordExistsError(
                "Vlr record(s) exists. "
                "Cannot perform %s action on VLD." % query_action)

        handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)

        self._regh = yield from self._dts.register(
            self._vnsm._project.add_project(VldDtsHandler.XPATH),
            flags=rwdts.Flag.SUBSCRIBER,
            handler=handler
        )

    def deregister(self):
        """ Drop the VLD registration, if any """
        self._log.debug("De-register VLD handler for project {}".
                        format(self._vnsm._project.name))
        if self._regh:
            self._regh.deregister()
            self._regh = None
561
class VirtualLinkEventListener(object):
    """ DTS Listener to listen on Virtual Link related events """
    XPATH = "D,/rw-resource-mgr:resource-mgmt/vlink-event/vlink-event-data"

    def __init__(self, dts, log, loop, vnsm):
        """
        Arguments:
            dts  - DTS handle
            log  - logger
            loop - event loop
            vnsm - owning manager; must expose ``_project``,
                   ``update_virual_link_event`` and
                   ``delete_virual_link_event``
        """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._vnsm = vnsm
        self._regh = None

    @property
    def regh(self):
        """ The registration handle associated with this Handler"""
        return self._regh

    def event_id_from_keyspec(self, ks):
        """
        Get the event id from the keyspec.

        Returns None when the keyspec entry carries no event id.
        """
        event_pe = RwResourceMgrYang.YangData_RwProject_Project_ResourceMgmt_VlinkEvent_VlinkEventData.schema().keyspec_to_entry(ks)
        try:
            # Can get just path without event id when
            # deleting project
            return event_pe.key00.event_id
        except AttributeError:
            return None

    @asyncio.coroutine
    def register(self):
        """ Register the Virtual Link Event path """
        @asyncio.coroutine
        def on_prepare(xact_info, query_action, ks_path, msg):
            """
            prepare callback on Virtual Link Events

            Forwards create/update and delete events to the manager. Any
            error is logged and the transaction is still ACKed.
            """
            try:
                self._log.debug(
                    "Got on prepare for Virtual Link Event id (ks_path: %s) (msg: %s)",
                    ks_path.to_xpath(RwResourceMgrYang.get_schema()), msg)
                event_id = self.event_id_from_keyspec(ks_path)
                if event_id:
                    if query_action in (rwdts.QueryAction.CREATE, rwdts.QueryAction.UPDATE):
                        yield from self._vnsm.update_virual_link_event(event_id, msg)
                    elif query_action == rwdts.QueryAction.DELETE:
                        self._vnsm.delete_virual_link_event(event_id)
            except Exception as e:
                # The message needs a placeholder for the argument; without
                # one the logging call itself fails to format the record.
                self._log.exception("Caught exception in Virtual Link Event handler: %s", e)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)

        self._regh = yield from self._dts.register(
            self._vnsm._project.add_project(VirtualLinkEventListener.XPATH),
            flags=rwdts.Flag.SUBSCRIBER,
            handler=handler
        )

    def deregister(self):
        """ Drop the Virtual Link Event registration, if any """
        if self._regh:
            self._regh.deregister()
            self._regh = None