Merge from OSM SO master
[osm/SO.git] / rwlaunchpad / plugins / rwnsm / rift / tasklets / rwnsmtasklet / rwvnffgmgr.py
1
2 #
3 # Copyright 2016-2017 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19
20 from gi.repository import (
21 RwDts as rwdts,
22 RwsdnalYang,
23 RwTypes,
24 ProtobufC,
25 )
26
27 from gi.repository.RwTypes import RwStatus
28 import rw_peas
29 import rift.tasklets
30
class SdnGetPluginError(Exception):
    """Signals a failure to obtain the SDN plugin."""
34
35
class SdnGetInterfaceError(Exception):
    """Signals a failure to obtain the SDN interface."""
39
40
class SdnAccountError(Exception):
    """Signals a failure while creating, deleting or updating an SDN account."""
44
class VnffgrDoesNotExist(Exception):
    """Raised when a VNFFGR id is not known to the VNFFG manager."""
48
class VnffgrAlreadyExist(Exception):
    """Signals that a VNFFGR with the same id is already registered."""
52
class VnffgrCreationFailed(Exception):
    """Signals that a VNFFGR could not be created."""
56
57
class VnffgrUpdateFailed(Exception):
    """Signals that a VNFFGR could not be updated."""
61
class VnffgMgr(object):
    """Manage VNFFG records (VNFFGRs) through the SDN backend plugins.

    The manager keeps three dictionaries:

    * ``_account``     -- SDN account name -> ``RwsdnalYang.SDNAccount``
    * ``_sdn``         -- SDN account name -> loaded "Topology" plugin interface
    * ``_vnffgr_list`` -- VNFFGR id -> VNFFGR message handed to create_vnffgr()
    """

    def __init__(self, dts, log, log_hdl, loop, project):
        self._account = {}
        self._dts = dts
        self._log = log
        self._log_hdl = log_hdl
        self._loop = loop
        # NOTE: _project must be set before the handler is built, because the
        # handler reads parent._project in its constructor.
        self._project = project
        self._sdn = {}
        self._sdn_handler = SDNAccountDtsHandler(self._dts, self._log, self)
        self._vnffgr_list = {}

    @asyncio.coroutine
    def register(self):
        """Register the SDN account config handler."""
        yield from self._sdn_handler.register()

    def deregister(self):
        """De-register the SDN account config handler."""
        self._log.debug("Project {} de-register vnffgmgr".
                        format(self._project.name))
        self._sdn_handler.deregister()

    def set_sdn_account(self, account):
        """Store a copy of ``account``; an already known name is ignored."""
        if account.name in self._account:
            self._log.error("SDN Account is already set")
            return

        sdn_account = RwsdnalYang.SDNAccount()
        sdn_account.from_dict(account.as_dict())
        sdn_account.name = account.name
        self._account[account.name] = sdn_account
        self._log.debug("Account set is %s , %s", type(self._account), self._account)

    def del_sdn_account(self, name):
        """Forget the SDN account ``name`` and any plugin cached for it.

        Raises:
            KeyError: if no account with that name is stored.
        """
        self._log.debug("Account deleted is %s , %s", type(self._account), name)
        # Drop the cached plugin too; otherwise an account re-created under the
        # same name would silently reuse the plugin loaded for the old one.
        self._sdn.pop(name, None)
        del self._account[name]

    def update_sdn_account(self, account):
        """Merge the fields present in ``account`` into the stored account.

        Unknown account names are ignored.
        """
        self._log.debug("Account updated is %s , %s", type(self._account), account)
        if account.name not in self._account:
            return

        sdn_account = self._account[account.name]
        sdn_account.from_dict(
            account.as_dict(),
            ignore_missing_keys=True,
        )
        self._account[account.name] = sdn_account

    def get_sdn_account(self, name):
        """Return the stored SDN account ``name``, or None if it is unknown."""
        if name in self._account:
            return self._account[name]

        self._log.error("SDN account is not configured")
        return None

    def get_sdn_plugin(self, name):
        """Return the "Topology" plugin interface for SDN account ``name``.

        The rw.sdn plugin is loaded via libpeas on first use and cached per
        account name. A failing plugin init() is logged but, as before, the
        interface is still cached and returned.

        Raises:
            SdnGetPluginError: if no SDN account called ``name`` is stored.
        """
        if name in self._sdn:
            return self._sdn[name]

        account = self.get_sdn_account(name)
        if account is None:
            # Previously this fell through to an AttributeError on None.
            raise SdnGetPluginError(
                "SDN account {} is not configured".format(name))

        plugin_name = getattr(account, account.account_type).plugin_name
        self._log.debug("SDN plugin being created")
        plugin = rw_peas.PeasPlugin(plugin_name, 'RwSdn-1.0')
        # The plugin object is called as in the original code; the returned
        # (engine, info, extension) triple is not needed here.
        plugin()

        topology = plugin.get_interface("Topology")
        self._sdn[name] = topology
        try:
            rc = topology.init(self._log_hdl)
        except Exception:
            self._log.exception("ERROR:SDN plugin instantiation failed ")
        else:
            # Explicit comparison instead of ``assert`` so the status check
            # still happens when Python runs with -O.
            if rc == RwStatus.SUCCESS:
                self._log.debug("SDN plugin successfully instantiated")
            else:
                self._log.error("ERROR:SDN plugin instantiation failed ")
        return topology

    def fetch_vnffgr(self, vnffgr_id):
        """Refresh the VNFFGR from the SDN plugin and return a deep copy of it.

        Raises:
            VnffgrDoesNotExist: if ``vnffgr_id`` is not managed here.
            VnffgrUpdateFailed: propagated from update_vnffgrs().
        """
        if vnffgr_id not in self._vnffgr_list:
            self._log.error("VNFFGR with id %s not present in VNFFGMgr", vnffgr_id)
            msg = "VNFFGR with id {} not present in VNFFGMgr".format(vnffgr_id)
            raise VnffgrDoesNotExist(msg)

        self.update_vnffgrs(self._vnffgr_list[vnffgr_id].sdn_account)
        vnffgr = self._vnffgr_list[vnffgr_id].deep_copy()
        self._log.debug("VNFFGR for id %s is %s", vnffgr_id, vnffgr)
        return vnffgr

    @staticmethod
    def _ipv4_to_hex(ip_address):
        """Render a dotted-quad address as '0x' plus two hex digits per octet."""
        return '0x' + ''.join("%0.2X" % int(octet) for octet in ip_address.split('.'))

    def _default_account_name(self, preferred=None):
        """Return ``preferred`` if it is a stored account name, else the name of
        the first stored account, else None when no account is stored."""
        if preferred and preferred in self._account:
            return preferred
        for account in self._account.values():
            return account.name
        return None

    def _build_vnffg_chain(self, rsp, sff_list):
        """Build the RwsdnalYang.VNFFGChain message for one rendered path."""
        chain = RwsdnalYang.VNFFGChain()
        chain.name = rsp.name
        chain.classifier_name = rsp.classifier_name

        vnfrs = []
        for cp_ref in rsp.vnfr_connection_point_ref:
            cpath = chain.vnf_chain_path.add()
            cpath.order = cp_ref.hop_number
            cpath.service_function_type = cp_ref.service_function_type
            cpath.nsh_aware = True
            cpath.transport_type = 'vxlan-gpe'

            vnfr = cpath.vnfr_ids.add()
            vnfr.vnfr_id = cp_ref.vnfr_id_ref
            vnfr.vnfr_name = cp_ref.vnfr_name_ref
            vnfr.mgmt_address = cp_ref.connection_point_params.mgmt_address
            vnfr.mgmt_port = 5000
            vnfrs.append(vnfr)

            vdu = vnfr.vdu_list.add()
            vdu.name = cp_ref.connection_point_params.name
            vdu.port_id = cp_ref.connection_point_params.port_id
            vdu.vm_id = cp_ref.connection_point_params.vm_id
            vdu.address = cp_ref.connection_point_params.address
            vdu.port = cp_ref.connection_point_params.port

        for sff in sff_list.values():
            created_sff = chain.sff.add()
            created_sff.from_dict(sff.as_dict())
            if sff.function_type == 'SFF':
                # Every VNFR of this path is attached to the forwarder.
                for vnfr in vnfrs:
                    vnfr.sff_name = sff.name
            self._log.debug("Recevied SFF %s, Created SFF is %s", sff, created_sff)

        return chain

    def _build_vnffg_classifier(self, classifier, record_cl, meta):
        """Build the RwsdnalYang.VNFFGClassifier message for one classifier.

        ``record_cl`` is the matching classifier entry of the VNFFGR and
        ``meta`` maps classifier ids to the value placed in NSH ctx1.
        """
        vnffgcl = RwsdnalYang.VNFFGClassifier()
        vnffgcl.name = classifier.name
        vnffgcl.rsp_name = record_cl.rsp_name
        vnffgcl.port_id = record_cl.port_id
        vnffgcl.vm_id = record_cl.vm_id

        # ctx1 carries the symmetric classifier endpoint ip when known.
        vnffgcl.vnffg_metadata.ctx1 = meta.get(record_cl.id, '0')
        vnffgcl.vnffg_metadata.ctx2 = '0'
        vnffgcl.vnffg_metadata.ctx3 = '0'
        vnffgcl.vnffg_metadata.ctx4 = '0'
        if record_cl.has_field('sff_name'):
            vnffgcl.sff_name = record_cl.sff_name

        for match_rule in classifier.match_attributes:
            acl = vnffgcl.match_attributes.add()
            acl.name = match_rule.id
            acl.ip_proto = match_rule.ip_proto
            acl.source_ip_address = match_rule.source_ip_address + '/32'
            acl.source_port = match_rule.source_port
            acl.destination_ip_address = match_rule.destination_ip_address + '/32'
            acl.destination_port = match_rule.destination_port

        return vnffgcl

    def create_vnffgr(self, vnffgr, classifier_list, sff_list):
        """Create the chains and classifiers of ``vnffgr`` on the SDN plugin.

        The record is stored under its id before any plugin call, so it stays
        registered (with operational_status 'failed') when creation fails.

        Returns:
            ``vnffgr`` with operational_status 'running', or None when no SDN
            account is configured (the record is then marked 'failed').

        Raises:
            VnffgrAlreadyExist: a record with the same id is already stored.
            VnffgrCreationFailed: the plugin rejected a chain.
            SdnGetPluginError: the selected SDN account is not stored.
            VnffgrUpdateFailed: propagated from the final update_vnffgrs().
        """
        self._log.debug("Received VNFFG chain Create msg %s", vnffgr)
        if vnffgr.id in self._vnffgr_list:
            self._log.error("VNFFGR with id %s already present in VNFFGMgr", vnffgr.id)
            vnffgr.operational_status = 'failed'
            msg = "VNFFGR with id {} already present in VNFFGMgr".format(vnffgr.id)
            raise VnffgrAlreadyExist(msg)

        self._vnffgr_list[vnffgr.id] = vnffgr
        vnffgr.operational_status = 'init'
        if not self._account:
            self._log.error("SDN Account not configured")
            vnffgr.operational_status = 'failed'
            return None

        if vnffgr.sdn_account:
            sdn_acct_name = vnffgr.sdn_account
        else:
            self._log.error("SDN Account is not associated to create VNFFGR")
            # TODO Fail the VNFFGR creation if SDN account is not associated.
            # Until then fall back to the first stored account.
            sdn_acct_name = self._default_account_name()
            vnffgr.sdn_account = sdn_acct_name
        sdn_plugin = self.get_sdn_plugin(sdn_acct_name)

        for rsp in vnffgr.rsp:
            vnffg = self._build_vnffg_chain(rsp, sff_list)
            self._log.debug("VNFFG chain msg is %s", vnffg)
            rc, _ = sdn_plugin.create_vnffg_chain(self._account[sdn_acct_name], vnffg)
            if rc != RwTypes.RwStatus.SUCCESS:
                vnffgr.operational_status = 'failed'
                msg = "Instantiation of VNFFGR with id {} failed".format(vnffgr.id)
                raise VnffgrCreationFailed(msg)

            self._log.info("VNFFG chain created successfully for rsp with id %s", rsp.id)

        # With exactly two classifiers each one gets the other's address as
        # metadata (the symmetric endpoint).
        meta = {}
        if len(classifier_list) == 2:
            first, second = vnffgr.classifier[0], vnffgr.classifier[1]
            meta[first.id] = self._ipv4_to_hex(second.ip_address)
            meta[second.id] = self._ipv4_to_hex(first.ip_address)

        self._log.debug("VNFFG Meta VNFFG chain is {}".format(meta))

        for classifier in classifier_list:
            vnffgr_cl = [_classifier for _classifier in vnffgr.classifier
                         if classifier.id == _classifier.id]
            if not vnffgr_cl:
                self._log.error("No RSP wiht name %s found; Skipping classifier %s creation",
                                classifier.rsp_id_ref, classifier.name)
                continue

            vnffgcl = self._build_vnffg_classifier(classifier, vnffgr_cl[0], meta)
            self._log.debug(" Creating VNFFG Classifier Classifier %s for RSP: %s",
                            vnffgcl.name, vnffgcl.rsp_name)
            rc, _ = sdn_plugin.create_vnffg_classifier(self._account[sdn_acct_name], vnffgcl)
            if rc != RwTypes.RwStatus.SUCCESS:
                # A classifier failure is only logged; it does not fail the
                # VNFFGR (unchanged from the original behaviour).
                self._log.error("VNFFG Classifier cretaion failed for Classifier %s for RSP ID: %s",
                                classifier.name, classifier.rsp_id_ref)

        vnffgr.operational_status = 'running'
        self.update_vnffgrs(vnffgr.sdn_account)
        return vnffgr

    def update_vnffgrs(self, sdn_acct_name):
        """Update the 'running' VNFFGRs of an account from the SDN plugin.

        Reads the rendered paths from the plugin and copies path id and
        per-hop data into every stored VNFFGR that uses ``sdn_acct_name``.

        Raises:
            VnffgrUpdateFailed: the read failed, a rendered path is missing,
                or its hop count differs from the record's connection points.
        """
        sdn_plugin = self.get_sdn_plugin(sdn_acct_name)
        rc, rs = sdn_plugin.get_vnffg_rendered_paths(self._account[sdn_acct_name])
        if rc != RwTypes.RwStatus.SUCCESS:
            msg = "Reading of VNFFGR from SDN Plugin failed"
            raise VnffgrUpdateFailed(msg)

        running = [_vnffgr for _vnffgr in self._vnffgr_list.values()
                   if _vnffgr.sdn_account == sdn_acct_name
                   and _vnffgr.operational_status == 'running']

        for _vnffgr in running:
            for _vnffgr_rsp in _vnffgr.rsp:
                matches = [path for path in rs.vnffg_rendered_path
                           if path.name == _vnffgr_rsp.name]
                if not matches:
                    _vnffgr.operational_status = 'failed'
                    self._log.error("VNFFGR RSP with name %s in VNFFG %s not found",
                                    _vnffgr_rsp.name, _vnffgr.id)
                    msg = "Fetching of VNFFGR with name {} failed".format(_vnffgr_rsp.name)
                    raise VnffgrUpdateFailed(msg)

                vnffg_rsp = matches[0]
                if len(vnffg_rsp.rendered_path_hop) != len(_vnffgr_rsp.vnfr_connection_point_ref):
                    _vnffgr.operational_status = 'failed'
                    self._log.error("Received hop count %d doesnt match the VNFFGD hop count %d",
                                    len(vnffg_rsp.rendered_path_hop),
                                    len(_vnffgr_rsp.vnfr_connection_point_ref))
                    msg = "Fetching of VNFFGR with id {} failed".format(_vnffgr.id)
                    raise VnffgrUpdateFailed(msg)

                _vnffgr_rsp.path_id = vnffg_rsp.path_id
                # Hops and connection points are matched by VNFR name.
                for rendered_hop in vnffg_rsp.rendered_path_hop:
                    for vnfr_cp_ref in _vnffgr_rsp.vnfr_connection_point_ref:
                        if rendered_hop.vnfr_name != vnfr_cp_ref.vnfr_name_ref:
                            continue
                        forwarder = rendered_hop.service_function_forwarder
                        vnfr_cp_ref.hop_number = rendered_hop.hop_number
                        vnfr_cp_ref.service_index = rendered_hop.service_index
                        vnfr_cp_ref.service_function_forwarder.name = forwarder.name
                        vnfr_cp_ref.service_function_forwarder.ip_address = forwarder.ip_address
                        vnfr_cp_ref.service_function_forwarder.port = forwarder.port

    def terminate_vnffgr(self, vnffgr_id, sdn_account_name=None):
        """Delete the VNFFG chain and classifier and forget the record.

        An unknown ``vnffgr_id`` is logged and ignored. When
        ``sdn_account_name`` is not given, the account recorded on the VNFFGR
        is used if it is still stored, otherwise the first stored account.
        When no SDN account is stored at all, the record is only dropped
        locally instead of failing with an IndexError.
        """
        if vnffgr_id not in self._vnffgr_list:
            self._log.error("VNFFGR with id %s not present in VNFFGMgr during termination", vnffgr_id)
            return

        self._log.info("Received VNFFG chain terminate for id %s", vnffgr_id)
        if sdn_account_name is None:
            sdn_account_name = self._default_account_name(
                self._vnffgr_list[vnffgr_id].sdn_account)

        if sdn_account_name is None:
            # Happens e.g. for a record that create_vnffgr() marked 'failed'
            # because no account was configured: no plugin can be called.
            self._log.warning(
                "No SDN account configured; dropping VNFFGR %s without SDN cleanup",
                vnffgr_id)
            del self._vnffgr_list[vnffgr_id]
            return

        sdn_plugin = self.get_sdn_plugin(sdn_account_name)
        sdn_plugin.terminate_vnffg_chain(self._account[sdn_account_name], vnffgr_id)
        sdn_plugin.terminate_vnffg_classifier(self._account[sdn_account_name], vnffgr_id)
        del self._vnffgr_list[vnffgr_id]
330
class SDNAccountDtsHandler(object):
    """DTS config subscriber that forwards SDN account changes to its parent.

    ``parent`` must provide ``_project`` plus the ``set_sdn_account``,
    ``del_sdn_account`` and ``update_sdn_account`` callbacks.
    """
    XPATH = "C,/rw-sdn:sdn/rw-sdn:account"

    def __init__(self, dts, log, parent):
        self._dts = dts
        self._log = log
        self._parent = parent
        self._project = self._parent._project

        # Account name -> last account message accepted by _set_sdn_account().
        self._sdn_account = {}
        self._reg = None

    def _set_sdn_account(self, account):
        """Record a new account and pass it to the parent.

        A name that is already recorded is logged and ignored, as the log
        message states (previously the entry was overwritten regardless).
        """
        self._log.info("Setting sdn account: {}".format(account))
        if account.name in self._sdn_account:
            self._log.error("SDN Account with name %s already exists. Ignoring config", account.name)
            return

        self._sdn_account[account.name] = account
        self._parent.set_sdn_account(account)

    def _del_sdn_account(self, account_name):
        """Forget an account locally and in the parent.

        Raises:
            KeyError: if ``account_name`` is not recorded.
        """
        self._log.info("Deleting sdn account: {}".format(account_name))
        del self._sdn_account[account_name]

        self._parent.del_sdn_account(account_name)

    def _update_sdn_account(self, account):
        """Pass an account update to the parent."""
        self._log.info("Updating sdn account: {}".format(account))
        # No need to update locally saved sdn_account's updated fields, as they
        # are not used anywhere. Call the parent's update callback.
        self._parent.update_sdn_account(account)

    def _reject(self, xact_info, errmsg):
        """Log ``errmsg``, report it on the transaction and raise SdnAccountError."""
        self._log.error(errmsg)
        xact_info.send_error_xpath(
            RwTypes.RwStatus.FAILURE,
            self._project.add_project(SDNAccountDtsHandler.XPATH),
            errmsg
        )
        raise SdnAccountError(errmsg)

    @asyncio.coroutine
    def register(self):
        """Register as a delta-ready subscriber for SDN account config."""
        def apply_config(dts, acg, xact, action, _):
            """Apply callback; all the work is done in on_prepare."""
            self._log.debug("Got sdn account apply config (xact: %s) (action: %s)", xact, action)
            if action == rwdts.AppconfAction.INSTALL and xact.id is None:
                self._log.debug("No xact handle. Skipping apply config")

            return RwTypes.RwStatus.SUCCESS

        @asyncio.coroutine
        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
            """ Prepare callback from DTS for SDN Account config """

            self._log.info("SDN Cloud account config received: %s", msg)

            fref = ProtobufC.FieldReference.alloc()
            fref.goto_whole_message(msg.to_pbcm())

            if fref.is_field_deleted():
                # Delete the sdn account record
                self._del_sdn_account(msg.name)
            elif msg.name in self._sdn_account:
                # The account already exists, so this is an update.
                self._log.debug("SDN account already exists. Invoking on_prepare update request")
                if msg.has_field("account_type"):
                    self._reject(xact_info, "Cannot update SDN account's account-type.")

                # Update the sdn account record
                self._update_sdn_account(msg)
            else:
                self._log.debug("SDN account does not already exist. Invoking on_prepare add request")
                if not msg.has_field('account_type'):
                    self._reject(xact_info, "New SDN account must contain account-type field.")

                # Set the sdn account record
                self._set_sdn_account(msg)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        xpath = self._project.add_project(SDNAccountDtsHandler.XPATH)
        self._log.debug("Registering for Sdn Account config using xpath: {}".
                        format(xpath))

        acg_handler = rift.tasklets.AppConfGroup.Handler(
            on_apply=apply_config,
        )

        with self._dts.appconf_group_create(acg_handler) as acg:
            self._reg = acg.register(
                xpath=xpath,
                flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
                on_prepare=on_prepare
            )

    def deregister(self):
        """Drop the DTS registration; a no-op when nothing is registered."""
        # The original format string had no placeholder, so the project name
        # was never part of the message.
        self._log.debug("De-register SDN Account handler in vnffg for project {}".
                        format(self._project.name))
        if self._reg is not None:
            self._reg.deregister()
            self._reg = None