update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwlaunchpad / plugins / rwnsm / rift / tasklets / rwnsmtasklet / rwvnffgmgr.py
1
2 #
3 # Copyright 2016-2017 RIFT.IO Inc
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import asyncio
19
20 from gi.repository import (
21 RwDts as rwdts,
22 RwsdnalYang,
23 RwTypes,
24 ProtobufC,
25 )
26
27 from gi.repository.RwTypes import RwStatus
28 import rw_peas
29 import rift.tasklets
30
class SdnGetPluginError(Exception):
    """Raised when fetching the SDN plugin fails."""
34
35
class SdnGetInterfaceError(Exception):
    """Raised when fetching the SDN interface fails."""
39
40
class SdnAccountError(Exception):
    """Raised when creating, deleting or updating an SDN account fails."""
44
class VnffgrDoesNotExist(Exception):
    """Raised when a VNFFGR id is not known to the VNFFG manager."""
48
class VnffgrAlreadyExist(Exception):
    """Raised when a VNFFGR with the same id already exists."""
52
class VnffgrCreationFailed(Exception):
    """Raised when creating a VNFFGR fails."""
56
57
class VnffgrUpdateFailed(Exception):
    """Raised when updating a VNFFGR fails."""
61
class VnffgMgr(object):
    """Manage VNF forwarding graph records (VNFFGRs) through SDN plugins.

    The manager keeps the SDN accounts it is told about, lazily loads and
    caches one SDN plugin per account, and uses that plugin to create,
    refresh and terminate the chains and classifiers of each VNFFGR.
    """

    def __init__(self, dts, log, log_hdl, loop, project, cloud_account_handler):
        """Store the collaborators and create the SDN account DTS handler.

        Args:
            dts: DTS handle, handed to the SDN account handler.
            log: Logger used for every message of this class.
            log_hdl: Log handle passed to an SDN plugin's ``init``.
            loop: Event loop (only stored here).
            project: Project object; its ``name`` is used when logging.
            cloud_account_handler: Only stored here.
        """
        self._account = {}       # SDN account name -> SDN account message
        self._dts = dts
        self._log = log
        self._log_hdl = log_hdl
        self._loop = loop
        self._cloud_account_handler = cloud_account_handler
        # Must be set before the handler is built: the handler reads it.
        self._project = project
        self._sdn = {}           # SDN account name -> loaded plugin interface
        self._sdn_handler = SDNAccountDtsHandler(self._dts, self._log, self)
        self._vnffgr_list = {}   # VNFFGR id -> VNFFGR message

    @asyncio.coroutine
    def register(self):
        """Register the SDN account handler."""
        yield from self._sdn_handler.register()

    def deregister(self):
        """Deregister the SDN account handler."""
        self._log.debug("Project {} de-register vnffgmgr".
                        format(self._project.name))
        self._sdn_handler.deregister()

    def set_sdn_account(self, account):
        """Remember a new SDN account; an already known name is only logged.

        Args:
            account: Account message providing ``name`` and ``as_dict()``.
        """
        if account.name in self._account:
            self._log.error("SDN Account is already set")
            return

        sdn_account = RwsdnalYang.YangData_RwProject_Project_SdnAccounts_SdnAccountList()
        sdn_account.from_dict(account.as_dict())
        sdn_account.name = account.name
        self._account[account.name] = sdn_account
        self._log.debug("Account set is %s , %s", type(self._account), self._account)

    def del_sdn_account(self, name):
        """Forget the SDN account called *name* and its cached plugin.

        Raises:
            KeyError: if no account called *name* is stored.
        """
        self._log.debug("Account deleted is %s , %s", type(self._account), name)
        del self._account[name]
        # Drop the cached plugin too, so an account re-added under the same
        # name does not silently reuse a plugin loaded for the old one.
        self._sdn.pop(name, None)

    def update_sdn_account(self, account):
        """Merge the fields of *account* into the stored account, if any.

        An account name that is not stored is ignored.
        """
        self._log.debug("Account updated is %s , %s", type(self._account), account)
        if account.name in self._account:
            # from_dict updates the stored message in place.
            self._account[account.name].from_dict(
                account.as_dict(),
                ignore_missing_keys=True,
            )

    def get_sdn_account(self, name):
        """Return the stored SDN account called *name*.

        Returns:
            The stored account message, or None (after logging an error)
            when no such account is stored.
        """
        if name in self._account:
            return self._account[name]
        self._log.error("SDN account is not configured")
        return None

    def get_sdn_plugin(self, name):
        """Return the SDN plugin interface for account *name*.

        The plugin is loaded via libpeas on first use and cached. A failed
        ``init`` is logged but the interface is still cached and returned.

        Raises:
            SdnGetPluginError: if no SDN account called *name* is stored.
        """
        if name in self._sdn:
            return self._sdn[name]

        account = self.get_sdn_account(name)
        if account is None:
            raise SdnGetPluginError(
                "SDN account {} is not configured".format(name))

        plugin_name = getattr(account, account.account_type).plugin_name
        self._log.debug("SDN plugin being created")
        plugin = rw_peas.PeasPlugin(plugin_name, 'RwSdn-1.0')
        # The returned objects are not needed here; the call itself is kept.
        # NOTE(review): presumably this instantiates the extension - confirm.
        _engine, _info, _extension = plugin()

        topology = plugin.get_interface("Topology")
        self._sdn[name] = topology
        try:
            rc = topology.init(self._log_hdl)
        except Exception:
            self._log.exception("ERROR:SDN plugin instantiation failed ")
        else:
            # Explicit comparison instead of `assert`, which -O strips.
            if rc == RwStatus.SUCCESS:
                self._log.debug("SDN plugin successfully instantiated")
            else:
                self._log.error("ERROR:SDN plugin instantiation failed ")
        return topology

    def _first_sdn_account_name(self):
        """Return the name of the first stored SDN account, or None."""
        for account in self._account.values():
            return account.name
        return None

    def _refresh_from_plugin(self, sdn_acct_name):
        """Refresh VNFFGRs from the plugin unless the account is 'openstack'."""
        sdn_acct = self.get_sdn_account(sdn_acct_name)
        self._log.debug("SDN account received during vnffg update is %s", sdn_acct)
        if sdn_acct.account_type != 'openstack':
            self.update_vnffgrs(sdn_acct_name)

    @staticmethod
    def _ip_to_nsh_ctx(ip_address):
        """Render a dotted IPv4 string as '0x' + two hex digits per octet."""
        return '0x' + ''.join("%0.2X" % int(octet)
                              for octet in ip_address.split('.'))

    def fetch_vnffgr(self, vnffgr_id):
        """Return a deep copy of the VNFFGR with id *vnffgr_id*.

        For non-'openstack' accounts the stored records are first refreshed
        from the SDN plugin.

        Raises:
            VnffgrDoesNotExist: if the id is not stored.
            VnffgrUpdateFailed: propagated from the refresh.
        """
        if vnffgr_id not in self._vnffgr_list:
            self._log.error("VNFFGR with id %s not present in VNFFGMgr", vnffgr_id)
            msg = "VNFFGR with id {} not present in VNFFGMgr".format(vnffgr_id)
            raise VnffgrDoesNotExist(msg)

        self._refresh_from_plugin(self._vnffgr_list[vnffgr_id].sdn_account)
        vnffgr = self._vnffgr_list[vnffgr_id].deep_copy()
        self._log.debug("VNFFGR for id %s is %s", vnffgr_id, vnffgr)
        return vnffgr

    def _build_chain_msg(self, rsp, sff_list):
        """Build the VNFFG chain message for one rendered service path."""
        vnffg = RwsdnalYang.YangData_RwProject_Project_Vnffgs_VnffgChain()
        vnffg.name = rsp.name
        vnffg.classifier_name = rsp.classifier_name

        vnfr_list = []
        for cp_ref in rsp.vnfr_connection_point_ref:
            cp_params = cp_ref.connection_point_params

            cpath = vnffg.vnf_chain_path.add()
            cpath.order = cp_ref.hop_number
            cpath.service_function_type = cp_ref.service_function_type
            cpath.nsh_aware = True
            cpath.transport_type = 'vxlan-gpe'

            vnfr = cpath.vnfr_ids.add()
            vnfr.vnfr_id = cp_ref.vnfr_id_ref
            vnfr.vnfr_name = cp_ref.vnfr_name_ref
            vnfr.mgmt_address = cp_params.mgmt_address
            vnfr.mgmt_port = 5000
            vnfr_list.append(vnfr)

            vdu = vnfr.vdu_list.add()
            vdu.name = cp_params.name
            vdu.port_id = cp_params.port_id
            vdu.vm_id = cp_params.vm_id
            vdu.address = cp_params.address
            vdu.port = cp_params.port

        for sff in sff_list.values():
            _sff = vnffg.sff.add()
            _sff.from_dict(sff.as_dict())
            if sff.function_type == 'SFF':
                # Every VNFR of this chain is attached to the SFF.
                for vnfr in vnfr_list:
                    vnfr.sff_name = sff.name
            self._log.debug("Recevied SFF %s, Created SFF is %s", sff, _sff)
        return vnffg

    def _build_classifier_msg(self, classifier, record, rsp_id, ctx1):
        """Build the classifier message sent to the SDN plugin.

        Args:
            classifier: Classifier descriptor (name and match attributes).
            record: The VNFFGR's classifier entry with the same id.
            rsp_id: Plugin id of the rendered path the classifier feeds.
            ctx1: Value for the first NSH metadata context field.
        """
        vnffgcl = RwsdnalYang.YangData_RwProject_Project_VnffgClassifiers_VnffgClassifier()
        vnffgcl.name = classifier.name
        vnffgcl.rsp_name = record.rsp_name
        vnffgcl.rsp_id = rsp_id
        vnffgcl.port_id = record.port_id
        vnffgcl.vm_id = record.vm_id

        vnffgcl.vnffg_metadata.ctx1 = ctx1
        vnffgcl.vnffg_metadata.ctx2 = '0'
        vnffgcl.vnffg_metadata.ctx3 = '0'
        vnffgcl.vnffg_metadata.ctx4 = '0'
        if record.has_field('sff_name'):
            vnffgcl.sff_name = record.sff_name

        for match_rule in classifier.match_attributes:
            acl = vnffgcl.match_attributes.add()
            acl.name = match_rule.id
            acl.ip_proto = match_rule.ip_proto
            # NOTE(review): ports are assumed to be copied whether or not an
            # IP address is present - confirm against the original layout.
            if match_rule.source_ip_address:
                acl.source_ip_address = match_rule.source_ip_address + '/32'
            acl.source_port = match_rule.source_port
            if match_rule.destination_ip_address:
                acl.destination_ip_address = match_rule.destination_ip_address + '/32'
            acl.destination_port = match_rule.destination_port
        return vnffgcl

    def create_vnffgr(self, vnffgr, classifier_list, sff_list):
        """Create the chains and classifiers of *vnffgr* through its SDN plugin.

        The record is stored under its id before anything else happens, so
        a failed creation stays visible with status 'failed'. When the
        record names no SDN account, the first stored account is used.

        Args:
            vnffgr: VNFFGR message; its status, ``sdn_account``, rsp ids and
                classifier ids are updated in place.
            classifier_list: Classifier descriptors to instantiate.
            sff_list: Mapping whose values are service function forwarders.

        Returns:
            *vnffgr* on success, or None when no SDN account is stored.

        Raises:
            VnffgrAlreadyExist: if the id is already stored.
            VnffgrCreationFailed: if the plugin fails to create a chain.
            VnffgrUpdateFailed: propagated from the final refresh.
        """
        self._log.debug("Received VNFFG chain Create msg %s", vnffgr)
        if vnffgr.id in self._vnffgr_list:
            self._log.error("VNFFGR with id %s already present in VNFFGMgr", vnffgr.id)
            vnffgr.operational_status = 'failed'
            msg = "VNFFGR with id {} already present in VNFFGMgr".format(vnffgr.id)
            raise VnffgrAlreadyExist(msg)

        self._vnffgr_list[vnffgr.id] = vnffgr
        vnffgr.operational_status = 'init'
        if not self._account:
            self._log.error("SDN Account not configured")
            vnffgr.operational_status = 'failed'
            return None

        if vnffgr.sdn_account:
            sdn_acct_name = vnffgr.sdn_account
        else:
            self._log.error("SDN Account is not associated to create VNFFGR")
            # TODO Fail the VNFFGR creation if SDN account is not associated
            sdn_acct_name = self._first_sdn_account_name()
            vnffgr.sdn_account = sdn_acct_name
        sdn_plugin = self.get_sdn_plugin(sdn_acct_name)

        for rsp in vnffgr.rsp:
            vnffg = self._build_chain_msg(rsp, sff_list)
            self._log.debug("VNFFG chain msg is %s", vnffg)
            rc, rs = sdn_plugin.create_vnffg_chain(self._account[sdn_acct_name], vnffg)
            if rc != RwTypes.RwStatus.SUCCESS:
                vnffgr.operational_status = 'failed'
                msg = "Instantiation of VNFFGR with id {} failed".format(vnffgr.id)
                raise VnffgrCreationFailed(msg)
            rsp.rsp_id = rs
            self._log.info("VNFFG chain created successfully for rsp with id %s", rsp.id)

        # With exactly two classifiers each one carries the other endpoint's
        # IP address (hex encoded) in NSH context field 1.
        meta = {}
        if len(classifier_list) == 2:
            first, second = vnffgr.classifier[0], vnffgr.classifier[1]
            meta[first.id] = self._ip_to_nsh_ctx(second.ip_address)
            meta[second.id] = self._ip_to_nsh_ctx(first.ip_address)

        self._log.debug("VNFFG Meta VNFFG chain is {}".format(meta))

        for classifier in classifier_list:
            records = [_classifier for _classifier in vnffgr.classifier
                       if classifier.id == _classifier.id]
            if not records:
                self._log.error("No RSP wiht name %s found; Skipping classifier %s creation",
                                classifier.rsp_id_ref, classifier.name)
                continue
            record = records[0]

            cl_rsp_name = record.rsp_name
            rsp_ids = [rsp.rsp_id for rsp in vnffgr.rsp if rsp.name == cl_rsp_name]
            self._log.debug("Received RSP id for Cl is %s", rsp_ids)
            if not rsp_ids:
                # No rendered path carries that name: skip instead of
                # failing with IndexError on rsp_ids[0].
                self._log.error("No RSP wiht name %s found; Skipping classifier %s creation",
                                cl_rsp_name, classifier.name)
                continue

            vnffgcl = self._build_classifier_msg(
                classifier, record, rsp_ids[0], meta.get(record.id, '0'))

            self._log.debug(" Creating VNFFG Classifier Classifier %s for RSP: %s",
                            vnffgcl.name, vnffgcl.rsp_name)
            rc, rs = sdn_plugin.create_vnffg_classifier(self._account[sdn_acct_name], vnffgcl)
            if rc != RwTypes.RwStatus.SUCCESS:
                # A classifier failure is logged only; creation carries on.
                self._log.error("VNFFG Classifier cretaion failed for Classifier %s for RSP ID: %s",
                                classifier.name, classifier.rsp_id_ref)
            else:
                record.classifier_id = rs

        vnffgr.operational_status = 'running'
        self._refresh_from_plugin(vnffgr.sdn_account)
        return vnffgr

    def update_vnffgrs(self, sdn_acct_name):
        """Update running VNFFGRs of an account from the plugin's rendered paths.

        Raises:
            VnffgrUpdateFailed: if the plugin read fails, a rendered path is
                missing, or its hop count differs from the record's
                connection point count. In the last two cases the record is
                marked 'failed' first.
        """
        sdn_plugin = self.get_sdn_plugin(sdn_acct_name)
        rc, rs = sdn_plugin.get_vnffg_rendered_paths(self._account[sdn_acct_name])
        if rc != RwTypes.RwStatus.SUCCESS:
            msg = "Reading of VNFFGR from SDN Plugin failed"
            raise VnffgrUpdateFailed(msg)

        running = [_vnffgr for _vnffgr in self._vnffgr_list.values()
                   if _vnffgr.sdn_account == sdn_acct_name
                   and _vnffgr.operational_status == 'running']

        for _vnffgr in running:
            for _vnffgr_rsp in _vnffgr.rsp:
                rendered = [vnffg_rsp for vnffg_rsp in rs.vnffg_rendered_path
                            if vnffg_rsp.name == _vnffgr_rsp.name]
                if not rendered:
                    _vnffgr.operational_status = 'failed'
                    self._log.error("VNFFGR RSP with name %s in VNFFG %s not found",
                                    _vnffgr_rsp.name, _vnffgr.id)
                    msg = "Fetching of VNFFGR with name {} failed".format(_vnffgr_rsp.name)
                    raise VnffgrUpdateFailed(msg)

                vnffg_rsp = rendered[0]
                if len(vnffg_rsp.rendered_path_hop) != len(_vnffgr_rsp.vnfr_connection_point_ref):
                    _vnffgr.operational_status = 'failed'
                    self._log.error("Received hop count %d doesnt match the VNFFGD hop count %d",
                                    len(vnffg_rsp.rendered_path_hop),
                                    len(_vnffgr_rsp.vnfr_connection_point_ref))
                    msg = "Fetching of VNFFGR with id {} failed".format(_vnffgr.id)
                    raise VnffgrUpdateFailed(msg)

                _vnffgr_rsp.path_id = vnffg_rsp.path_id
                # Hops are matched to connection points by VNFR name.
                for rendered_hop in vnffg_rsp.rendered_path_hop:
                    hop_sff = rendered_hop.service_function_forwarder
                    for vnfr_cp_ref in _vnffgr_rsp.vnfr_connection_point_ref:
                        if rendered_hop.vnfr_name == vnfr_cp_ref.vnfr_name_ref:
                            vnfr_cp_ref.hop_number = rendered_hop.hop_number
                            vnfr_cp_ref.service_index = rendered_hop.service_index
                            vnfr_cp_ref.service_function_forwarder.name = hop_sff.name
                            vnfr_cp_ref.service_function_forwarder.ip_address = hop_sff.ip_address
                            vnfr_cp_ref.service_function_forwarder.port = hop_sff.port

    def terminate_vnffgr(self, vnffgr_id, sdn_account_name=None):
        """Delete the chains and classifiers of a VNFFGR and forget it.

        An unknown id is logged and ignored. Without *sdn_account_name* the
        account recorded on the VNFFGR is used, falling back to the first
        stored account. If no account can be found at all, the record is
        just dropped (create_vnffgr returns before calling any plugin in
        that situation).

        Args:
            vnffgr_id: Id of the VNFFGR to terminate.
            sdn_account_name: Optional SDN account whose plugin to use.
        """
        if vnffgr_id not in self._vnffgr_list:
            self._log.error("VNFFGR with id %s not present in VNFFGMgr during termination",
                            vnffgr_id)
            return

        self._log.info("Received VNFFG chain terminate for id %s", vnffgr_id)
        vnffgr = self._vnffgr_list[vnffgr_id]
        if sdn_account_name is None:
            sdn_account_name = vnffgr.sdn_account or self._first_sdn_account_name()
        if sdn_account_name is None:
            self._log.error("SDN Account not configured")
            del self._vnffgr_list[vnffgr_id]
            return

        sdn_plugin = self.get_sdn_plugin(sdn_account_name)
        sdn_acct = self.get_sdn_account(vnffgr.sdn_account or sdn_account_name)
        self._log.debug("SDN account received during vnffg update is %s", sdn_acct)
        account = self._account[sdn_account_name]
        if sdn_acct.account_type == 'openstack':
            # Here every chain and classifier is removed by the id the
            # plugin returned for it at creation time.
            for rsp in vnffgr.rsp:
                sdn_plugin.terminate_vnffg_chain(account, rsp.rsp_id)
            for classifier in vnffgr.classifier:
                sdn_plugin.terminate_vnffg_classifier(account, classifier.classifier_id)
        else:
            sdn_plugin.terminate_vnffg_chain(account, vnffgr_id)
            sdn_plugin.terminate_vnffg_classifier(account, vnffgr_id)
        del self._vnffgr_list[vnffgr_id]
353
class SDNAccountDtsHandler(object):
    """DTS config subscriber that forwards SDN account changes to its parent.

    Account additions, updates and deletions received on ``XPATH`` are
    validated and then passed to the parent's ``set_sdn_account``,
    ``update_sdn_account`` and ``del_sdn_account`` callbacks.
    """
    XPATH = "C,/rw-sdn:sdn/rw-sdn:account"

    def __init__(self, dts, log, parent):
        """Store the collaborators.

        Args:
            dts: DTS handle used to create the appconf group.
            log: Logger used for every message of this class.
            parent: Object providing ``_project`` and the
                ``set/update/del_sdn_account`` callbacks.
        """
        self._dts = dts
        self._log = log
        self._parent = parent
        self._project = self._parent._project

        self._sdn_account = {}   # account name -> account message as received
        self._reg = None         # DTS registration, set by register()

    def _set_sdn_account(self, account):
        """Record a new account and forward it; a duplicate name is ignored."""
        self._log.info("Setting sdn account: {}".format(account))
        if account.name in self._sdn_account:
            self._log.error("SDN Account with name %s already exists. Ignoring config",
                            account.name)
            # Actually ignore it, as the message says.
            return
        self._sdn_account[account.name] = account
        self._parent.set_sdn_account(account)

    def _del_sdn_account(self, account_name):
        """Drop the account locally and tell the parent.

        Raises:
            KeyError: if *account_name* is not recorded.
        """
        self._log.info("Deleting sdn account: {}".format(account_name))
        del self._sdn_account[account_name]

        self._parent.del_sdn_account(account_name)

    def _update_sdn_account(self, account):
        """Forward an account update to the parent."""
        self._log.info("Updating sdn account: {}".format(account))
        # No need to update locally saved sdn_account's updated fields, as they
        # are not used anywhere. Call the parent's update callback.
        self._parent.update_sdn_account(account)

    def _reject(self, xact_info, errmsg):
        """Log *errmsg*, send it as a DTS error and raise SdnAccountError."""
        self._log.error(errmsg)
        xact_info.send_error_xpath(
            RwTypes.RwStatus.FAILURE,
            self._project.add_project(SDNAccountDtsHandler.XPATH),
            errmsg
        )
        raise SdnAccountError(errmsg)

    @asyncio.coroutine
    def register(self):
        """Register for SDN account config under the project's xpath."""

        def apply_config(dts, acg, xact, action, _):
            """Apply callback; re-adds the registered accounts on INSTALL."""
            self._log.debug("Got sdn account apply config (xact: %s) (action: %s)",
                            xact, action)
            if xact.id is None:
                if action == rwdts.AppconfAction.INSTALL:
                    for cfg in self._reg.elements:
                        self._log.info("SDN Account {} being re-added after restart.".
                                       format(cfg.name))
                        self._set_sdn_account(cfg)
                else:
                    self._log.debug("No xact handle. Skipping apply config")

            return RwTypes.RwStatus.SUCCESS

        @asyncio.coroutine
        def on_prepare(dts, acg, xact, xact_info, ks_path, msg, scratch):
            """ Prepare callback from DTS for SDN Account config """

            self._log.info("SDN Cloud account config received: %s", msg)

            fref = ProtobufC.FieldReference.alloc()
            fref.goto_whole_message(msg.to_pbcm())

            if fref.is_field_deleted():
                # Delete the sdn account record
                self._del_sdn_account(msg.name)
            elif msg.name in self._sdn_account:
                # The account already exists, so this is an update.
                self._log.debug("SDN account already exists. Invoking on_prepare update request")
                if msg.has_field("account_type"):
                    self._reject(xact_info, "Cannot update SDN account's account-type.")

                self._update_sdn_account(msg)
            else:
                self._log.debug("SDN account does not already exist. Invoking on_prepare add request")
                if not msg.has_field('account_type'):
                    self._reject(xact_info, "New SDN account must contain account-type field.")

                self._set_sdn_account(msg)

            xact_info.respond_xpath(rwdts.XactRspCode.ACK)

        xpath = self._project.add_project(SDNAccountDtsHandler.XPATH)
        self._log.debug("Registering for Sdn Account config using xpath: {}".
                        format(xpath))

        acg_handler = rift.tasklets.AppConfGroup.Handler(
            on_apply=apply_config,
        )

        with self._dts.appconf_group_create(acg_handler) as acg:
            self._reg = acg.register(
                xpath=xpath,
                flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.DELTA_READY,
                on_prepare=on_prepare
            )

    def deregister(self):
        """Drop the DTS registration; a no-op when nothing is registered."""
        self._log.debug("De-register SDN Account handler in vnffg for project {}".
                        format(self._project.name))
        if self._reg is None:
            return
        self._reg.deregister()
        self._reg = None