bdea4efce6f7a39dca3b4bfbcb86e8a6d1a0a623
[osm/SO.git] / rwlaunchpad / plugins / rwvns / rift / vlmgr / rwvlmgr.py
1
2 #
3 # Copyright 2016 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19 import enum
20 import uuid
21 import time
22
23 import gi
24 gi.require_version('RwVlrYang', '1.0')
25 gi.require_version('RwDts', '1.0')
26 gi.require_version('RwResourceMgrYang', '1.0')
27 from gi.repository import (
28 RwVlrYang,
29 VldYang,
30 RwDts as rwdts,
31 RwResourceMgrYang,
32 )
33 import rift.tasklets
34
35
class NetworkResourceError(Exception):
    """Raised when a network resource request yields no usable response."""
39
40
class VlrRecordExistsError(Exception):
    """Raised when an action is refused because a VLR record already exists."""
44
45
class VlRecordError(Exception):
    """Generic error raised while operating on a virtual link record."""
49
50
class VirtualLinkRecordState(enum.Enum):
    """Lifecycle states a virtual link record moves through."""
    # Bring-up
    INIT = 1
    INSTANTIATING = 2
    RESOURCE_ALLOC_PENDING = 3
    READY = 4
    # Tear-down
    TERMINATING = 5
    TERMINATED = 6
    # Error state; its value is deliberately set apart from the sequence above
    FAILED = 10
60
61
class VirtualLinkRecord(object):
    """
    Virtual Link Record object

    Holds the state of a single virtual link (VL): the VLR message it was
    created from, the network details returned by the resource manager, and
    the current lifecycle state. It issues create/read/delete requests to the
    resource manager over DTS and publishes/unpublishes the resulting VLR
    through the owning manager object (``vnsm``).
    """
    def __init__(self, dts, log, loop, vnsm, vlr_msg, req_id=None):
        """
        Arguments:
            dts     - DTS handle
            log     - logger
            loop    - asyncio event loop
            vnsm    - owner object providing publish_vlr()/unpublish_vlr()
            vlr_msg - VLR message this record is built from
            req_id  - resource-manager request (event) id to reuse; a new
                      UUID is generated when it is None
        """
        self._dts = dts
        self._log = log
        self._loop = loop
        self._vnsm = vnsm
        self._vlr_msg = vlr_msg

        # Filled in from the resource manager response in instantiate()
        self._network_id = None
        self._network_pool = None
        self._assigned_subnet = None
        self._create_time = int(time.time())
        if req_id is None:
            self._request_id = str(uuid.uuid4())
        else:
            self._request_id = req_id

        self._state = VirtualLinkRecordState.INIT
        self._state_failed_reason = None

    @property
    def vld_xpath(self):
        """ VLD xpath associated with this VLR record """
        return "C,/vld:vld-catalog/vld:vld[id='{}']".format(self.vld_id)

    @property
    def vld_id(self):
        """ VLD id associated with this VLR record """
        return self._vlr_msg.vld_ref

    @property
    def vlr_id(self):
        """ VLR id associated with this VLR record """
        return self._vlr_msg.id

    @property
    def xpath(self):
        """ path for this VLR """
        return("D,/vlr:vlr-catalog"
               "/vlr:vlr[vlr:id='{}']".format(self.vlr_id))

    @property
    def name(self):
        """ Name of this VLR """
        return self._vlr_msg.name

    @property
    def cloud_account_name(self):
        """ Cloud Account to instantiate the virtual link on """
        return self._vlr_msg.cloud_account

    @property
    def resmgr_path(self):
        """ path for resource-mgr, keyed by this record's request id """
        return ("D,/rw-resource-mgr:resource-mgmt" +
                "/vlink-event/vlink-event-data[event-id='{}']".format(self._request_id))

    @property
    def operational_status(self):
        """ Operational status of this VLR"""
        op_stats_dict = {"INIT": "init",
                         "INSTANTIATING": "vl_alloc_pending",
                         "RESOURCE_ALLOC_PENDING": "vl_alloc_pending",
                         "READY": "running",
                         "FAILED": "failed",
                         "TERMINATING": "vl_terminate_pending",
                         "TERMINATED": "terminated"}

        return op_stats_dict[self._state.name]

    @property
    def msg(self):
        """ VLR message for this VLR

        A new message is built on every access: a copy of the original VLR
        message overlaid with the network details (when known), the
        operational status and the request id.
        """
        msg = RwVlrYang.YangData_Vlr_VlrCatalog_Vlr()
        msg.copy_from(self._vlr_msg)

        if self._network_id is not None:
            msg.network_id = self._network_id

        if self._network_pool is not None:
            msg.network_pool = self._network_pool

        if self._assigned_subnet is not None:
            msg.assigned_subnet = self._assigned_subnet

        msg.operational_status = self.operational_status
        msg.operational_status_details = self._state_failed_reason
        msg.res_id = self._request_id

        return msg

    @property
    def resmgr_msg(self):
        """ Resource manager virtual-link event message for this VLR

        A new message is built on every access.
        """
        msg = RwResourceMgrYang.VirtualLinkEventData()
        msg.event_id = self._request_id
        msg.cloud_account = self.cloud_account_name
        msg.request_info.name = self.name
        msg.request_info.vim_network_name = self._vlr_msg.vim_network_name
        msg.request_info.provider_network.from_dict(
                self._vlr_msg.provider_network.as_dict()
                )
        if self._vlr_msg.has_field('ip_profile_params'):
            msg.request_info.ip_profile_params.from_dict(self._vlr_msg.ip_profile_params.as_dict())

        return msg

    @asyncio.coroutine
    def create_network(self, xact):
        """ Create network for this VL """
        self._log.debug("Creating network req-id: %s", self._request_id)
        return (yield from self.request_network(xact, "create"))

    @asyncio.coroutine
    def delete_network(self, xact):
        """ Delete network for this VL """
        self._log.debug("Deleting network - req-id: %s", self._request_id)
        return (yield from self.request_network(xact, "delete"))

    @asyncio.coroutine
    def read_network(self, xact):
        """ Read network for this VL """
        self._log.debug("Reading network - req-id: %s", self._request_id)
        return (yield from self.request_network(xact, "read"))

    @asyncio.coroutine
    def request_network(self, xact, action):
        """Request creation/deletion/read of the network for this VL

        Arguments:
            xact   - DTS transaction used to issue the query
            action - one of "create", "delete" or "read"

        Returns:
            The last response received for "create"/"read"; None for
            "delete".

        Raises:
            VlRecordError        - action is not one of the supported values
            NetworkResourceError - no response, a response in "failed"
                                   state, or a response without a
                                   virtual_link_id (create/read only)
        """

        block = xact.block_create()

        if action == "create":
            self._log.debug("Creating network path:%s, msg:%s",
                            self.resmgr_path, self.resmgr_msg)
            block.add_query_create(self.resmgr_path, self.resmgr_msg)
        elif action == "delete":
            self._log.debug("Deleting network path:%s", self.resmgr_path)
            # No delete query is added for a VL named "multisite"
            if self.resmgr_msg.request_info.name != "multisite":
                block.add_query_delete(self.resmgr_path)
        elif action == "read":
            self._log.debug("Reading network path:%s", self.resmgr_path)
            block.add_query_read(self.resmgr_path)
        else:
            raise VlRecordError("Invalid action %s received" % action)

        res_iter = yield from block.execute(now=True)

        resp = None

        if action == "create" or action == "read":
            # Keep only the last result delivered by the query
            for i in res_iter:
                r = yield from i
                resp = r.result

            # The messages below are formatted before raising so that str(e),
            # which instantiate() stores as the failure reason, is readable
            # text rather than the repr of an (fmt, arg) tuple.
            if resp is None:
                raise NetworkResourceError(
                    "Did not get a network resource response (resp: %s)" % (resp,))

            if resp.has_field('resource_info') and resp.resource_info.resource_state == "failed":
                raise NetworkResourceError(resp.resource_info.resource_errors)

            if not (resp.has_field('resource_info') and
                    resp.resource_info.has_field('virtual_link_id')):
                raise NetworkResourceError(
                    "Did not get a valid network resource response (resp: %s)" % (resp,))

            self._log.debug("Got network request response: %s", resp)

        return resp

    @asyncio.coroutine
    def instantiate(self, xact, restart=0):
        """ Instantiate this VL

        Arguments:
            xact    - DTS transaction used for the resource request and the
                      publish
            restart - 0 to create the network; any other value to read the
                      existing network resource instead

        Any exception is caught: the record moves to FAILED, the exception
        text is kept as the failure reason, and the record is published in
        that state.
        """
        self._state = VirtualLinkRecordState.INSTANTIATING

        self._log.debug("Instantiating VLR path = [%s]", self.xpath)

        try:
            self._state = VirtualLinkRecordState.RESOURCE_ALLOC_PENDING

            if restart == 0:
                network_resp = yield from self.create_network(xact)
            else:
                network_resp = yield from self.read_network(xact)
                # NOTE(review): request_network() raises NetworkResourceError
                # instead of returning None for "read", so this fallback is
                # currently never taken; a read with no response ends up in
                # the except block below.
                if network_resp is None:
                    network_resp = yield from self.create_network(xact)

            # Note network_resp.virtual_link_id is CAL assigned network_id.

            self._network_id = network_resp.resource_info.virtual_link_id
            self._network_pool = network_resp.resource_info.pool_name
            self._assigned_subnet = network_resp.resource_info.subnet

            self._state = VirtualLinkRecordState.READY

            yield from self.publish(xact)

        except Exception as e:
            self._log.error("Instantiatiation of VLR record failed: %s", str(e))
            self._state = VirtualLinkRecordState.FAILED
            self._state_failed_reason = str(e)
            yield from self.publish(xact)

    @asyncio.coroutine
    def publish(self, xact):
        """ publish this VLR """
        # self.msg builds a new message on every access, so the create time
        # has to be set on the very object that gets published.
        vlr = self.msg
        vlr.create_time = self._create_time
        self._log.debug("Publishing VLR path = [%s], record = [%s]",
                        self.xpath, vlr)
        yield from self._vnsm.publish_vlr(xact, self.xpath, vlr)
        self._log.debug("Published VLR path = [%s], record = [%s]",
                        self.xpath, vlr)

    @asyncio.coroutine
    def terminate(self, xact):
        """ Terminate this VL

        Only records in READY or FAILED state are terminated; the call is
        logged and ignored in any other state. The network is deleted only
        for READY records; a failure to delete is logged and does not stop
        the record from being unpublished.
        """
        if self._state not in [VirtualLinkRecordState.READY, VirtualLinkRecordState.FAILED]:
            self._log.error("Ignoring terminate for VL %s is in %s state",
                            self.vlr_id, self._state)
            return

        if self._state == VirtualLinkRecordState.READY:
            self._log.debug("Terminating VL with id %s", self.vlr_id)
            self._state = VirtualLinkRecordState.TERMINATING
            try:
                yield from self.delete_network(xact)
            except Exception:
                # Best effort: keep going so the record is still unpublished
                self._log.exception("Caught exception while deleting VL %s", self.vlr_id)
            self._log.debug("Terminated VL with id %s", self.vlr_id)

        yield from self.unpublish(xact)
        self._state = VirtualLinkRecordState.TERMINATED

    @asyncio.coroutine
    def unpublish(self, xact):
        """ Unpublish this VLR """
        self._log.debug("UnPublishing VLR id %s", self.vlr_id)
        yield from self._vnsm.unpublish_vlr(xact, self.xpath)
        self._log.debug("UnPublished VLR id %s", self.vlr_id)
303
304
class VlrDtsHandler(object):
    """ DTS publisher-side handler for the VLR catalog path """
    XPATH = "D,/vlr:vlr-catalog/vlr:vlr"

    def __init__(self, dts, log, loop, vnsm):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._vnsm = vnsm

        # Set by register()
        self._regh = None

    @property
    def regh(self):
        """ The registration handle associated with this Handler"""
        return self._regh

    @asyncio.coroutine
    def register(self):
        """ Register for the VLR path

        Installs the commit/prepare callbacks and a group-level event
        callback, and stores the resulting registration handle.
        """
        def on_commit(xact_info):
            """ Commit callback: only logs and acknowledges """
            self._log.debug("Got vlr commit (xact_info: %s)", xact_info)

            return rwdts.MemberRspCode.ACTION_OK

        @asyncio.coroutine
        def on_event(dts, g_reg, xact, xact_event, scratch_data):
            """ Group event callback

            On INSTALL, a VLR is created for every element currently held by
            the registration and its instantiation (in restart mode) is
            scheduled as a task on the loop.
            """
            @asyncio.coroutine
            def _reinstantiate(record):
                """ Instantiate 'record' in restart mode in its own transaction """
                with self._dts.transaction(flags=0) as restart_xact:
                    yield from record.instantiate(restart_xact, 1)

            if xact_event == rwdts.MemberEvent.INSTALL:
                for element in self.regh.elements:
                    record = self._vnsm.create_vlr(element)
                    self._loop.create_task(_reinstantiate(record))

            self._log.debug("Got on_event")
            return rwdts.MemberRspCode.ACTION_OK

        @asyncio.coroutine
        def on_prepare(xact_info, action, ks_path, msg):
            """ Prepare callback for the VLR path

            CREATE instantiates a new VLR and responds with its path and
            message; DELETE removes the VLR named by the keyspec; any other
            action raises NotImplementedError.
            """
            self._log.debug(
                "Got vlr on_prepare callback (xact_info: %s, action: %s): %s",
                xact_info, action, msg
            )

            if action == rwdts.QueryAction.CREATE:
                record = self._vnsm.create_vlr(msg)
                with self._dts.transaction(flags=0) as create_xact:
                    yield from record.instantiate(create_xact)

                record_path = record.xpath
                record_msg = record.msg
                self._log.debug("Responding to VL create request path:%s, msg:%s",
                                record_path, record_msg)
                xact_info.respond_xpath(rwdts.XactRspCode.ACK,
                                        xpath=record_path,
                                        msg=record_msg)
                return

            if action != rwdts.QueryAction.DELETE:
                raise NotImplementedError(
                    "%s action on VirtualLinkRecord not supported" % action)

            # DELETE: the VLR id is the key carried in the keyspec
            vlr_schema = RwVlrYang.YangData_Vlr_VlrCatalog_Vlr.schema()
            entry = vlr_schema.keyspec_to_entry(ks_path)
            self._log.debug("Terminating VLR id %s", entry.key00.id)
            yield from self._vnsm.delete_vlr(entry.key00.id, xact_info.xact)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        self._log.debug("Registering for VLR using xpath: %s",
                        VlrDtsHandler.XPATH)

        registration_handler = rift.tasklets.DTS.RegistrationHandler(
            on_commit=on_commit,
            on_prepare=on_prepare,
        )
        group_handler = rift.tasklets.Group.Handler(on_event=on_event)
        registration_flags = (rwdts.Flag.PUBLISHER |
                              rwdts.Flag.NO_PREP_READ |
                              rwdts.Flag.DATASTORE)

        with self._dts.group_create(handler=group_handler) as group:
            self._regh = group.register(
                xpath=VlrDtsHandler.XPATH,
                handler=registration_handler,
                flags=registration_flags,
            )

    @asyncio.coroutine
    def create(self, xact, path, msg):
        """
        Create the VLR element at 'path' with 'msg' via the registration handle
        """
        self._log.debug("Creating VLR xact = %s, %s:%s", xact, path, msg)
        self.regh.create_element(path, msg)
        self._log.debug("Created VLR xact = %s, %s:%s", xact, path, msg)

    @asyncio.coroutine
    def update(self, xact, path, msg):
        """
        Update the VLR element at 'path' with 'msg' via the registration handle
        """
        self._log.debug("Updating VLR xact = %s, %s:%s", xact, path, msg)
        self.regh.update_element(path, msg)
        self._log.debug("Updated VLR xact = %s, %s:%s", xact, path, msg)

    @asyncio.coroutine
    def delete(self, xact, path):
        """
        Delete the VLR element at 'path' via the registration handle
        """
        self._log.debug("Deleting VLR xact = %s, %s", xact, path)
        self.regh.delete_element(path)
        self._log.debug("Deleted VLR xact = %s, %s", xact, path)
427
428
class VldDtsHandler(object):
    """ DTS handler for the VLD registration

    Subscribes to the VLD catalog and refuses DELETE/UPDATE of a VLD while a
    VLR record for it still exists.
    """
    XPATH = "C,/vld:vld-catalog/vld:vld"

    def __init__(self, dts, log, loop, vnsm):
        self._dts = dts
        self._log = log
        self._loop = loop
        self._vnsm = vnsm

        self._regh = None

    @property
    def regh(self):
        """ The registration handle associated with this Handler"""
        return self._regh

    @asyncio.coroutine
    def register(self):
        """ Register the VLD path """
        @asyncio.coroutine
        def on_prepare(xact_info, query_action, ks_path, msg):
            """ prepare callback on vld path

            Acknowledges every action other than DELETE/UPDATE. DELETE and
            UPDATE are acknowledged only when no VLR record exists for the
            VLD; otherwise VlrRecordExistsError is raised.
            """
            # Log the action itself; the placeholder is labelled "action"
            self._log.debug(
                "Got on prepare for VLD update (ks_path: %s) (action: %s)",
                ks_path.to_xpath(VldYang.get_schema()), query_action)

            schema = VldYang.YangData_Vld_VldCatalog_Vld.schema()
            path_entry = schema.keyspec_to_entry(ks_path)
            vld_id = path_entry.key00.id

            # Only these actions are blocked while a VLR exists for the VLD
            disabled_actions = [rwdts.QueryAction.DELETE, rwdts.QueryAction.UPDATE]
            if query_action not in disabled_actions:
                xact_info.respond_xpath(rwdts.XactRspCode.ACK)
                return

            vlr = self._vnsm.find_vlr_by_vld_id(vld_id)
            if vlr is None:
                self._log.debug(
                    "Did not find an existing VLR record for vld %s. "
                    "Permitting %s vld action", vld_id, query_action)
                xact_info.respond_xpath(rwdts.XactRspCode.ACK)
                return

            # A space separates the two sentences of the message
            raise VlrRecordExistsError(
                "Vlr record(s) exists. "
                "Cannot perform %s action on VLD." % query_action)

        handler = rift.tasklets.DTS.RegistrationHandler(on_prepare=on_prepare)

        yield from self._dts.register(
            VldDtsHandler.XPATH,
            flags=rwdts.Flag.SUBSCRIBER,
            handler=handler
        )