Merge from OSM SO master
[osm/SO.git] / rwlaunchpad / plugins / rwvns / rift / vlmgr / rwvlmgr.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import enum
20 import uuid
21 import time
22
23 import gi
24 gi.require_version('RwVlrYang', '1.0')
25 gi.require_version('RwDts', '1.0')
26 gi.require_version('RwResourceMgrYang', '1.0')
27 from gi.repository import (
28 RwVlrYang,
29 VldYang,
30 RwDts as rwdts,
31 RwResourceMgrYang,
32 )
33 import rift.tasklets
34
35
36 class NetworkResourceError(Exception):
37 """ Network Resource Error """
38 pass
39
40
41 class VlrRecordExistsError(Exception):
42 """ VLR record already exists"""
43 pass
44
45
46 class VlRecordError(Exception):
47 """ VLR record error """
48 pass
49
50
51 class VirtualLinkRecordState(enum.Enum):
52 """ Virtual Link record state """
53 INIT = 1
54 INSTANTIATING = 2
55 RESOURCE_ALLOC_PENDING = 3
56 READY = 4
57 TERMINATING = 5
58 TERMINATED = 6
59 FAILED = 10
60
61
62 class VirtualLinkRecord(object):
63 """
64 Virtual Link Record object
65 """
66 def __init__(self, dts, log, loop, vnsm, vlr_msg, req_id=None):
67 self._dts = dts
68 self._log = log
69 self._loop = loop
70 self._vnsm = vnsm
71 self._vlr_msg = vlr_msg
72
73 self._project = vnsm._project
74 self._network_id = None
75 self._network_pool = None
76 self._assigned_subnet = None
77 self._create_time = int(time.time())
78 if req_id == None:
79 self._request_id = str(uuid.uuid4())
80 else:
81 self._request_id = req_id
82
83 self._state = VirtualLinkRecordState.INIT
84 self._state_failed_reason = None
85
86 @property
87 def vld_xpath(self):
88 """ VLD xpath associated with this VLR record """
89 return self._project.add_project("C,/vld:vld-catalog/vld:vld[id='{}']".
90 format(self.vld_id))
91
92 @property
93 def vld_id(self):
94 """ VLD id associated with this VLR record """
95 return self._vlr_msg.vld_ref
96
97 @property
98 def vlr_id(self):
99 """ VLR id associated with this VLR record """
100 return self._vlr_msg.id
101
102 @property
103 def xpath(self):
104 """ path for this VLR """
105 return self._project.add_project("D,/vlr:vlr-catalog"
106 "/vlr:vlr[vlr:id='{}']".format(self.vlr_id))
107
108 @property
109 def name(self):
110 """ Name of this VLR """
111 return self._vlr_msg.name
112
113 @property
114 def cloud_account_name(self):
115 """ Cloud Account to instantiate the virtual link on """
116 return self._vlr_msg.cloud_account
117
118 @property
119 def resmgr_path(self):
120 """ path for resource-mgr"""
121 return self._project.add_project("D,/rw-resource-mgr:resource-mgmt" +
122 "/vlink-event/vlink-event-data[event-id='{}']".format(self._request_id))
123
124 @property
125 def operational_status(self):
126 """ Operational status of this VLR"""
127 op_stats_dict = {"INIT": "init",
128 "INSTANTIATING": "vl_alloc_pending",
129 "RESOURCE_ALLOC_PENDING": "vl_alloc_pending",
130 "READY": "running",
131 "FAILED": "failed",
132 "TERMINATING": "vl_terminate_pending",
133 "TERMINATED": "terminated"}
134
135 return op_stats_dict[self._state.name]
136
137 @property
138 def msg(self):
139 """ VLR message for this VLR """
140 msg = RwVlrYang.YangData_RwProject_Project_VlrCatalog_Vlr()
141 msg.copy_from(self._vlr_msg)
142
143 if self._network_id is not None:
144 msg.network_id = self._network_id
145
146 if self._network_pool is not None:
147 msg.network_pool = self._network_pool
148
149 if self._assigned_subnet is not None:
150 msg.assigned_subnet = self._assigned_subnet
151
152 msg.operational_status = self.operational_status
153 msg.operational_status_details = self._state_failed_reason
154 msg.res_id = self._request_id
155
156 return msg
157
158 @property
159 def resmgr_msg(self):
160 """ VLR message for this VLR """
161 msg = RwResourceMgrYang.VirtualLinkEventData()
162 msg.event_id = self._request_id
163 msg.cloud_account = self.cloud_account_name
164 msg.request_info.name = self.name
165 msg.request_info.vim_network_name = self._vlr_msg.vim_network_name
166 msg.request_info.provider_network.from_dict(
167 self._vlr_msg.provider_network.as_dict()
168 )
169 if self._vlr_msg.has_field('ip_profile_params'):
170 msg.request_info.ip_profile_params.from_dict(self._vlr_msg.ip_profile_params.as_dict())
171
172 return msg
173
174 @asyncio.coroutine
175 def create_network(self, xact):
176 """ Create network for this VL """
177 self._log.debug("Creating network req-id: %s", self._request_id)
178 return (yield from self.request_network(xact, "create"))
179
180 @asyncio.coroutine
181 def delete_network(self, xact):
182 """ Delete network for this VL """
183 self._log.debug("Deleting network - req-id: %s", self._request_id)
184 return (yield from self.request_network(xact, "delete"))
185
186 @asyncio.coroutine
187 def read_network(self, xact):
188 """ Read network for this VL """
189 self._log.debug("Reading network - req-id: %s", self._request_id)
190 return (yield from self.request_network(xact, "read"))
191
192 @asyncio.coroutine
193 def request_network(self, xact, action):
194 """Request creation/deletion network for this VL """
195
196 block = xact.block_create()
197
198 if action == "create":
199 self._log.debug("Creating network path:%s, msg:%s",
200 self.resmgr_path, self.resmgr_msg)
201 block.add_query_create(self.resmgr_path, self.resmgr_msg)
202 elif action == "delete":
203 self._log.debug("Deleting network path:%s", self.resmgr_path)
204 if self.resmgr_msg.request_info.name != "multisite":
205 block.add_query_delete(self.resmgr_path)
206 elif action == "read":
207 self._log.debug("Reading network path:%s", self.resmgr_path)
208 block.add_query_read(self.resmgr_path)
209 else:
210 raise VlRecordError("Invalid action %s received" % action)
211
212 res_iter = yield from block.execute(now=True)
213
214 resp = None
215
216 if action == "create" or action == "read":
217 for i in res_iter:
218 r = yield from i
219 resp = r.result
220
221 if resp is None:
222 raise NetworkResourceError("Did not get a network resource response (resp: %s)", resp)
223
224 if resp.has_field('resource_info') and resp.resource_info.resource_state == "failed":
225 raise NetworkResourceError(resp.resource_info.resource_errors)
226
227 if not (resp.has_field('resource_info') and
228 resp.resource_info.has_field('virtual_link_id')):
229 raise NetworkResourceError("Did not get a valid network resource response (resp: %s)", resp)
230
231 self._log.debug("Got network request response: %s", resp)
232
233 return resp
234
235 @asyncio.coroutine
236 def instantiate(self, xact, restart=0):
237 """ Instantiate this VL """
238 self._state = VirtualLinkRecordState.INSTANTIATING
239
240 self._log.debug("Instantiating VLR path = [%s]", self.xpath)
241
242 try:
243 self._state = VirtualLinkRecordState.RESOURCE_ALLOC_PENDING
244
245 if restart == 0:
246 network_resp = yield from self.create_network(xact)
247 else:
248 network_resp = yield from self.read_network(xact)
249 if network_resp == None:
250 network_resp = yield from self.create_network(xact)
251
252 # Note network_resp.virtual_link_id is CAL assigned network_id.
253
254 self._network_id = network_resp.resource_info.virtual_link_id
255 self._network_pool = network_resp.resource_info.pool_name
256 self._assigned_subnet = network_resp.resource_info.subnet
257
258 self._state = VirtualLinkRecordState.READY
259
260 yield from self.publish(xact)
261
262 except Exception as e:
263 self._log.error("Instantiatiation of VLR record failed: %s", str(e))
264 self._state = VirtualLinkRecordState.FAILED
265 self._state_failed_reason = str(e)
266 yield from self.publish(xact)
267
268 @asyncio.coroutine
269 def publish(self, xact):
270 """ publish this VLR """
271 vlr = self.msg
272 self._log.debug("Publishing VLR path = [%s], record = [%s]",
273 self.xpath, self.msg)
274 vlr.create_time = self._create_time
275 yield from self._vnsm.publish_vlr(xact, self.xpath, self.msg)
276 self._log.debug("Published VLR path = [%s], record = [%s]",
277 self.xpath, self.msg)
278
279 @asyncio.coroutine
280 def terminate(self, xact):
281 """ Terminate this VL """
282 if self._state not in [VirtualLinkRecordState.READY, VirtualLinkRecordState.FAILED]:
283 self._log.error("Ignoring terminate for VL %s is in %s state",
284 self.vlr_id, self._state)
285 return
286
287 if self._state == VirtualLinkRecordState.READY:
288 self._log.debug("Terminating VL with id %s", self.vlr_id)
289 self._state = VirtualLinkRecordState.TERMINATING
290 try:
291 yield from self.delete_network(xact)
292 except Exception:
293 self._log.exception("Caught exception while deleting VL %s", self.vlr_id)
294 self._log.debug("Terminated VL with id %s", self.vlr_id)
295
296 yield from self.unpublish(xact)
297 self._state = VirtualLinkRecordState.TERMINATED
298
299 @asyncio.coroutine
300 def unpublish(self, xact):
301 """ Unpublish this VLR """
302 self._log.debug("UnPublishing VLR id %s", self.vlr_id)
303 yield from self._vnsm.unpublish_vlr(xact, self.xpath)
304 self._log.debug("UnPublished VLR id %s", self.vlr_id)
305
306
307 class VlrDtsHandler(object):
308 """ Handles DTS interactions for the VLR registration """
309 XPATH = "D,/vlr:vlr-catalog/vlr:vlr"
310
311 def __init__(self, dts, log, loop, vnsm):
312 self._dts = dts
313 self._log = log
314 self._loop = loop
315 self._vnsm = vnsm
316
317 self._regh = None
318 self._project = vnsm._project
319
320 @property
321 def regh(self):
322 """ The registration handle assocaited with this Handler"""
323 return self._regh
324
325 @asyncio.coroutine
326 def register(self):
327 """ Register for the VLR path """
328 def on_commit(xact_info):
329 """ The transaction has been committed """
330 self._log.debug("Got vlr commit (xact_info: %s)", xact_info)
331
332 return rwdts.MemberRspCode.ACTION_OK
333
334 @asyncio.coroutine
335 def on_event(dts, g_reg, xact, xact_event, scratch_data):
336 @asyncio.coroutine
337 def instantiate_realloc_vlr(vlr):
338 """Re-populate the virtual link information after restart
339
340 Arguments:
341 vlink
342
343 """
344
345 with self._dts.transaction(flags=0) as xact:
346 yield from vlr.instantiate(xact, 1)
347
348 if (xact_event == rwdts.MemberEvent.INSTALL):
349 curr_cfg = self.regh.elements
350 for cfg in curr_cfg:
351 vlr = self._vnsm.create_vlr(cfg)
352 self._loop.create_task(instantiate_realloc_vlr(vlr))
353
354 self._log.debug("Got on_event")
355 return rwdts.MemberRspCode.ACTION_OK
356
357 @asyncio.coroutine
358 def on_prepare(xact_info, action, ks_path, msg):
359 """ prepare for VLR registration"""
360 self._log.debug(
361 "Got vlr on_prepare callback (xact_info: %s, action: %s): %s",
362 xact_info, action, msg
363 )
364
365 if action == rwdts.QueryAction.CREATE:
366 vlr = self._vnsm.create_vlr(msg)
367 with self._dts.transaction(flags=0) as xact:
368 yield from vlr.instantiate(xact)
369 self._log.debug("Responding to VL create request path:%s, msg:%s",
370 vlr.xpath, vlr.msg)
371 xact_info.respond_xpath(rwdts.XactRspCode.ACK, xpath=vlr.xpath, msg=vlr.msg)
372 return
373 elif action == rwdts.QueryAction.DELETE:
374 # Delete an VLR record
375 schema = RwVlrYang.YangData_RwProject_Project_VlrCatalog_Vlr.schema()
376 path_entry = schema.keyspec_to_entry(ks_path)
377 self._log.debug("Terminating VLR id %s", path_entry.key00.id)
378 yield from self._vnsm.delete_vlr(path_entry.key00.id, xact_info.xact)
379 else:
380 err = "%s action on VirtualLinkRecord not supported" % action
381 raise NotImplementedError(err)
382 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
383 return
384
385 xpath = self._project.add_project(VlrDtsHandler.XPATH)
386 self._log.debug("Registering for VLR using xpath: {}".
387 format(xpath))
388
389 reg_handle = rift.tasklets.DTS.RegistrationHandler(
390 on_commit=on_commit,
391 on_prepare=on_prepare,
392 )
393 handlers = rift.tasklets.Group.Handler(on_event=on_event,)
394 with self._dts.group_create(handler=handlers) as group:
395 self._regh = group.register(
396 xpath=xpath,
397 handler=reg_handle,
398 flags=rwdts.Flag.PUBLISHER | rwdts.Flag.NO_PREP_READ| rwdts.Flag.DATASTORE,
399 )
400
401 def deregister(self):
402 self._log.debug("De-register VLR handler for project {}".
403 format(self._project.name))
404 if self._regh:
405 self._regh.deregister()
406 self._regh = None
407
408 @asyncio.coroutine
409 def create(self, xact, xpath, msg):
410 """
411 Create a VLR record in DTS with path and message
412 """
413 path = self._project.add_project(xpath)
414 self._log.debug("Creating VLR xact = %s, %s:%s",
415 xact, path, msg)
416 self.regh.create_element(path, msg)
417 self._log.debug("Created VLR xact = %s, %s:%s",
418 xact, path, msg)
419
420 @asyncio.coroutine
421 def update(self, xact, xpath, msg):
422 """
423 Update a VLR record in DTS with path and message
424 """
425 path = self._project.add_project(xpath)
426 self._log.debug("Updating VLR xact = %s, %s:%s",
427 xact, path, msg)
428 self.regh.update_element(path, msg)
429 self._log.debug("Updated VLR xact = %s, %s:%s",
430 xact, path, msg)
431
432 @asyncio.coroutine
433 def delete(self, xact, xpath):
434 """
435 Delete a VLR record in DTS with path and message
436 """
437 path = self._project.add_project(xpath)
438 self._log.debug("Deleting VLR xact = %s, %s", xact, path)
439 self.regh.delete_element(path)
440 self._log.debug("Deleted VLR xact = %s, %s", xact, path)
441
442
443 class VldDtsHandler(object):
444 """ DTS handler for the VLD registration """
445 XPATH = "C,/vld:vld-catalog/vld:vld"
446
447 def __init__(self, dts, log, loop, vnsm):
448 self._dts = dts
449 self._log = log
450 self._loop = loop
451 self._vnsm = vnsm
452
453 self._regh = None
454
455 @property
456 def regh(self):
457 """ The registration handle assocaited with this Handler"""
458 return self._regh
459
460 @asyncio.coroutine
461 def register(self):
462 """ Register the VLD path """
463 @asyncio.coroutine
464 def on_prepare(xact_info, query_action, ks_path, msg):
465 """ prepare callback on vld path """
466 self._log.debug(
467 "Got on prepare for VLD update (ks_path: %s) (action: %s)",
468 ks_path.to_xpath(VldYang.get_schema()), msg)
469
470 schema = VldYang.YangData_RwProject_Project_VldCatalog_Vld.schema()
471 path_entry = schema.keyspec_to_entry(ks_path)
472 # TODO: Check why on project delete this gets called
473 if not path_entry:
474 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
475 return
476
477 vld_id = path_entry.key00.id
478
479 disabled_actions = [rwdts.QueryAction.DELETE, rwdts.QueryAction.UPDATE]
480 if query_action not in disabled_actions:
481 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
482 return
483
484 vlr = self._vnsm.find_vlr_by_vld_id(vld_id)
485 if vlr is None:
486 self._log.debug(
487 "Did not find an existing VLR record for vld %s. "
488 "Permitting %s vld action", vld_id, query_action)
489 xact_info.respond_xpath(rwdts.XactRspCode.ACK)
490 return
491
492 raise VlrRecordExistsError(
493 "Vlr record(s) exists."
494 "Cannot perform %s action on VLD." % query_action)
495
496 handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)
497
498 yield from self._dts.register(
499 self._vnsm._project.add_project(VldDtsHandler.XPATH),
500 flags=rwdts.Flag.SUBSCRIBER,
501 handler=handler
502 )
503
504 def deregister(self):
505 self._log.debug("De-register VLD handler for project {}".
506 format(self._vnsm._project.name))
507 if self._regh:
508 self._regh.deregister()
509 self._regh = None