2 # Copyright 2016 RIFT.IO Inc
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
20 import ncclient
.asyncio_manager
32 from collections
import deque
33 from collections
import defaultdict
37 gi
.require_version('RwYang', '1.0')
38 gi
.require_version('RwNsdYang', '1.0')
39 gi
.require_version('RwDts', '1.0')
40 gi
.require_version('RwNsmYang', '1.0')
41 gi
.require_version('RwNsrYang', '1.0')
42 gi
.require_version('RwTypes', '1.0')
43 gi
.require_version('RwVlrYang', '1.0')
44 gi
.require_version('RwVnfrYang', '1.0')
45 from gi
.repository
import (
61 import rift
.mano
.ncclient
62 import rift
.mano
.config_data
.config
63 import rift
.mano
.dts
as mano_dts
65 from . import rwnsm_conman
as conman
67 from . import publisher
69 from . import config_value_pool
70 from . import rwvnffgmgr
71 from . import scale_group
74 class NetworkServiceRecordState(Enum
):
75 """ Network Service Record State """
79 VNFFG_INIT_PHASE
= 104
85 VL_TERMINATE_PHASE
= 111
86 VNF_TERMINATE_PHASE
= 112
87 VNFFG_TERMINATE_PHASE
= 113
94 class NetworkServiceRecordError(Exception):
95 """ Network Service Record Error """
99 class NetworkServiceDescriptorError(Exception):
100 """ Network Service Descriptor Error """
104 class VirtualNetworkFunctionRecordError(Exception):
105 """ Virtual Network Function Record Error """
109 class NetworkServiceDescriptorNotFound(Exception):
110 """ Cannot find Network Service Descriptor"""
114 class NetworkServiceDescriptorRefCountExists(Exception):
115 """ Network Service Descriptor reference count exists """
119 class NetworkServiceDescriptorUnrefError(Exception):
120 """ Failed to unref a network service descriptor """
124 class NsrInstantiationFailed(Exception):
125 """ Failed to instantiate network service """
129 class VnfInstantiationFailed(Exception):
130 """ Failed to instantiate virtual network function"""
134 class VnffgInstantiationFailed(Exception):
135 """ Failed to instantiate virtual network function"""
139 class VnfDescriptorError(Exception):
140 """Failed to instantiate virtual network function"""
144 class ScalingOperationError(Exception):
148 class ScaleGroupMissingError(Exception):
152 class PlacementGroupError(Exception):
156 class NsrNsdUpdateError(Exception):
160 class NsrVlUpdateError(NsrNsdUpdateError
):
164 class VlRecordState(Enum
):
165 """ VL Record State """
167 INSTANTIATION_PENDING
= 102
169 TERMINATE_PENDING
= 104
174 class VnffgRecordState(Enum
):
175 """ VNFFG Record State """
177 INSTANTIATION_PENDING
= 102
179 TERMINATE_PENDING
= 104
184 class VnffgRecord(object):
185 """ Vnffg Records class"""
188 def __init__(self
, dts
, log
, loop
, vnffgmgr
, nsr
, nsr_name
, vnffgd_msg
, sdn_account_name
):
193 self
._vnffgmgr
= vnffgmgr
195 self
._nsr
_name
= nsr_name
196 self
._vnffgd
_msg
= vnffgd_msg
197 if sdn_account_name
is None:
198 self
._sdn
_account
_name
= ''
200 self
._sdn
_account
_name
= sdn_account_name
202 self
._vnffgr
_id
= str(uuid
.uuid4())
203 self
._vnffgr
_rsp
_id
= list()
204 self
._vnffgr
_state
= VnffgRecordState
.INIT
209 return self
._vnffgr
_id
213 """ state of this VNF """
214 return self
._vnffgr
_state
216 def fetch_vnffgr(self
):
218 Get VNFFGR message to be published
221 if self
._vnffgr
_state
== VnffgRecordState
.INIT
:
222 vnffgr_dict
= {"id": self
._vnffgr
_id
,
223 "vnffgd_id_ref": self
._vnffgd
_msg
.id,
224 "vnffgd_name_ref": self
._vnffgd
_msg
.name
,
225 "sdn_account": self
._sdn
_account
_name
,
226 "operational_status": 'init',
228 vnffgr
= NsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr
.from_dict(vnffgr_dict
)
229 elif self
._vnffgr
_state
== VnffgRecordState
.TERMINATED
:
230 vnffgr_dict
= {"id": self
._vnffgr
_id
,
231 "vnffgd_id_ref": self
._vnffgd
_msg
.id,
232 "vnffgd_name_ref": self
._vnffgd
_msg
.name
,
233 "sdn_account": self
._sdn
_account
_name
,
234 "operational_status": 'terminated',
236 vnffgr
= NsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr
.from_dict(vnffgr_dict
)
239 vnffgr
= self
._vnffgmgr
.fetch_vnffgr(self
._vnffgr
_id
)
241 self
._log
.exception("Fetching VNFFGR for VNFFG with id %s failed", self
._vnffgr
_id
)
242 self
._vnffgr
_state
= VnffgRecordState
.FAILED
243 vnffgr_dict
= {"id": self
._vnffgr
_id
,
244 "vnffgd_id_ref": self
._vnffgd
_msg
.id,
245 "vnffgd_name_ref": self
._vnffgd
_msg
.name
,
246 "sdn_account": self
._sdn
_account
_name
,
247 "operational_status": 'failed',
249 vnffgr
= NsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr
.from_dict(vnffgr_dict
)
254 def vnffgr_create_msg(self
):
255 """ Virtual Link Record message for Creating VLR in VNS """
256 vnffgr_dict
= {"id": self
._vnffgr
_id
,
257 "vnffgd_id_ref": self
._vnffgd
_msg
.id,
258 "vnffgd_name_ref": self
._vnffgd
_msg
.name
,
259 "sdn_account": self
._sdn
_account
_name
,
261 vnffgr
= NsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr
.from_dict(vnffgr_dict
)
262 for rsp
in self
._vnffgd
_msg
.rsp
:
263 vnffgr_rsp
= vnffgr
.rsp
.add()
264 vnffgr_rsp
.id = str(uuid
.uuid4())
265 vnffgr_rsp
.name
= self
._nsr
.name
+ '.' + rsp
.name
266 self
._vnffgr
_rsp
_id
.append(vnffgr_rsp
.id)
267 vnffgr_rsp
.vnffgd_rsp_id_ref
= rsp
.id
268 vnffgr_rsp
.vnffgd_rsp_name_ref
= rsp
.name
269 for rsp_cp_ref
in rsp
.vnfd_connection_point_ref
:
270 vnfd
= [vnfr
.vnfd
for vnfr
in self
._nsr
.vnfrs
.values() if vnfr
.vnfd
.id == rsp_cp_ref
.vnfd_id_ref
]
271 self
._log
.debug("VNFD message during VNFFG instantiation is %s",vnfd
)
272 if len(vnfd
) > 0 and vnfd
[0].has_field('service_function_type'):
273 self
._log
.debug("Service Function Type for VNFD ID %s is %s",rsp_cp_ref
.vnfd_id_ref
, vnfd
[0].service_function_type
)
275 self
._log
.error("Service Function Type not available for VNFD ID %s; Skipping in chain",rsp_cp_ref
.vnfd_id_ref
)
278 vnfr_cp_ref
= vnffgr_rsp
.vnfr_connection_point_ref
.add()
279 vnfr_cp_ref
.member_vnf_index_ref
= rsp_cp_ref
.member_vnf_index_ref
280 vnfr_cp_ref
.hop_number
= rsp_cp_ref
.order
281 vnfr_cp_ref
.vnfd_id_ref
=rsp_cp_ref
.vnfd_id_ref
282 vnfr_cp_ref
.service_function_type
= vnfd
[0].service_function_type
283 for nsr_vnfr
in self
._nsr
.vnfrs
.values():
284 if (nsr_vnfr
.vnfd
.id == vnfr_cp_ref
.vnfd_id_ref
and
285 nsr_vnfr
.member_vnf_index
== vnfr_cp_ref
.member_vnf_index_ref
):
286 vnfr_cp_ref
.vnfr_id_ref
= nsr_vnfr
.id
287 vnfr_cp_ref
.vnfr_name_ref
= nsr_vnfr
.name
288 vnfr_cp_ref
.vnfr_connection_point_ref
= rsp_cp_ref
.vnfd_connection_point_ref
290 vnfr
= yield from self
._nsr
.fetch_vnfr(nsr_vnfr
.xpath
)
291 self
._log
.debug(" Received VNFR is %s", vnfr
)
292 while vnfr
.operational_status
!= 'running':
293 self
._log
.info("Received vnf op status is %s; retrying",vnfr
.operational_status
)
294 if vnfr
.operational_status
== 'failed':
295 self
._log
.error("Fetching VNFR for %s failed", vnfr
.id)
296 raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" % (self
.id, vnfr
.id))
297 yield from asyncio
.sleep(2, loop
=self
._loop
)
298 vnfr
= yield from self
._nsr
.fetch_vnfr(nsr_vnfr
.xpath
)
299 self
._log
.debug("Received VNFR is %s", vnfr
)
301 vnfr_cp_ref
.connection_point_params
.mgmt_address
= vnfr
.mgmt_interface
.ip_address
302 for cp
in vnfr
.connection_point
:
303 if cp
.name
== vnfr_cp_ref
.vnfr_connection_point_ref
:
304 vnfr_cp_ref
.connection_point_params
.port_id
= cp
.connection_point_id
305 vnfr_cp_ref
.connection_point_params
.name
= self
._nsr
.name
+ '.' + cp
.name
306 for vdu
in vnfr
.vdur
:
307 for ext_intf
in vdu
.external_interface
:
308 if ext_intf
.name
== vnfr_cp_ref
.vnfr_connection_point_ref
:
309 vnfr_cp_ref
.connection_point_params
.vm_id
= vdu
.vim_id
310 self
._log
.debug("VIM ID for CP %s in VNFR %s is %s",cp
.name
,nsr_vnfr
.id,
311 vnfr_cp_ref
.connection_point_params
.vm_id
)
314 vnfr_cp_ref
.connection_point_params
.address
= cp
.ip_address
315 vnfr_cp_ref
.connection_point_params
.port
= VnffgRecord
.SFF_DP_PORT
317 for vnffgd_classifier
in self
._vnffgd
_msg
.classifier
:
318 _rsp
= [rsp
for rsp
in vnffgr
.rsp
if rsp
.vnffgd_rsp_id_ref
== vnffgd_classifier
.rsp_id_ref
]
320 rsp_id_ref
= _rsp
[0].id
321 rsp_name
= _rsp
[0].name
323 self
._log
.error("RSP with ID %s not found during classifier creation for classifier id %s",vnffgd_classifier
.rsp_id_ref
,vnffgd_classifier
.id)
325 vnffgr_classifier
= vnffgr
.classifier
.add()
326 vnffgr_classifier
.id = vnffgd_classifier
.id
327 vnffgr_classifier
.name
= self
._nsr
.name
+ '.' + vnffgd_classifier
.name
328 _rsp
[0].classifier_name
= vnffgr_classifier
.name
329 vnffgr_classifier
.rsp_id_ref
= rsp_id_ref
330 vnffgr_classifier
.rsp_name
= rsp_name
331 for nsr_vnfr
in self
._nsr
.vnfrs
.values():
332 if (nsr_vnfr
.vnfd
.id == vnffgd_classifier
.vnfd_id_ref
and
333 nsr_vnfr
.member_vnf_index
== vnffgd_classifier
.member_vnf_index_ref
):
334 vnffgr_classifier
.vnfr_id_ref
= nsr_vnfr
.id
335 vnffgr_classifier
.vnfr_name_ref
= nsr_vnfr
.name
336 vnffgr_classifier
.vnfr_connection_point_ref
= vnffgd_classifier
.vnfd_connection_point_ref
338 if nsr_vnfr
.vnfd
.service_function_chain
== 'CLASSIFIER':
339 vnffgr_classifier
.sff_name
= nsr_vnfr
.name
341 vnfr
= yield from self
._nsr
.fetch_vnfr(nsr_vnfr
.xpath
)
342 self
._log
.debug(" Received VNFR is %s", vnfr
)
343 while vnfr
.operational_status
!= 'running':
344 self
._log
.info("Received vnf op status is %s; retrying",vnfr
.operational_status
)
345 if vnfr
.operational_status
== 'failed':
346 self
._log
.error("Fetching VNFR for %s failed", vnfr
.id)
347 raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" % (self
.id, vnfr
.id))
348 yield from asyncio
.sleep(2, loop
=self
._loop
)
349 vnfr
= yield from self
._nsr
.fetch_vnfr(nsr_vnfr
.xpath
)
350 self
._log
.debug("Received VNFR is %s", vnfr
)
352 for cp
in vnfr
.connection_point
:
353 if cp
.name
== vnffgr_classifier
.vnfr_connection_point_ref
:
354 vnffgr_classifier
.port_id
= cp
.connection_point_id
355 vnffgr_classifier
.ip_address
= cp
.ip_address
356 for vdu
in vnfr
.vdur
:
357 for ext_intf
in vdu
.external_interface
:
358 if ext_intf
.name
== vnffgr_classifier
.vnfr_connection_point_ref
:
359 vnffgr_classifier
.vm_id
= vdu
.vim_id
360 self
._log
.debug("VIM ID for CP %s in VNFR %s is %s",cp
.name
,nsr_vnfr
.id,
361 vnfr_cp_ref
.connection_point_params
.vm_id
)
364 self
._log
.info("VNFFGR msg to be sent is %s", vnffgr
)
368 def vnffgr_nsr_sff_list(self
):
369 """ SFF List for VNFR """
371 sf_list
= [nsr_vnfr
.name
for nsr_vnfr
in self
._nsr
.vnfrs
.values() if nsr_vnfr
.vnfd
.service_function_chain
== 'SF']
373 for nsr_vnfr
in self
._nsr
.vnfrs
.values():
374 if (nsr_vnfr
.vnfd
.service_function_chain
== 'CLASSIFIER' or nsr_vnfr
.vnfd
.service_function_chain
== 'SFF'):
375 vnfr
= yield from self
._nsr
.fetch_vnfr(nsr_vnfr
.xpath
)
376 self
._log
.debug(" Received VNFR is %s", vnfr
)
377 while vnfr
.operational_status
!= 'running':
378 self
._log
.info("Received vnf op status is %s; retrying",vnfr
.operational_status
)
379 if vnfr
.operational_status
== 'failed':
380 self
._log
.error("Fetching VNFR for %s failed", vnfr
.id)
381 raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" % (self
.id, vnfr
.id))
382 yield from asyncio
.sleep(2, loop
=self
._loop
)
383 vnfr
= yield from self
._nsr
.fetch_vnfr(nsr_vnfr
.xpath
)
384 self
._log
.debug("Received VNFR is %s", vnfr
)
386 sff
= RwsdnYang
.VNFFGSff()
387 sff_list
[nsr_vnfr
.vnfd
.id] = sff
388 sff
.name
= nsr_vnfr
.name
389 sff
.function_type
= nsr_vnfr
.vnfd
.service_function_chain
391 sff
.mgmt_address
= vnfr
.mgmt_interface
.ip_address
392 sff
.mgmt_port
= VnffgRecord
.SFF_MGMT_PORT
393 for cp
in vnfr
.connection_point
:
394 sff_dp
= sff
.dp_endpoints
.add()
395 sff_dp
.name
= self
._nsr
.name
+ '.' + cp
.name
396 sff_dp
.address
= cp
.ip_address
397 sff_dp
.port
= VnffgRecord
.SFF_DP_PORT
398 if nsr_vnfr
.vnfd
.service_function_chain
== 'SFF':
399 for sf_name
in sf_list
:
400 _sf
= sff
.vnfr_list
.add()
401 _sf
.vnfr_name
= sf_name
406 def instantiate(self
):
407 """ Instantiate this VNFFG """
409 self
._log
.info("Instaniating VNFFGR with vnffgd %s",
413 vnffgr_request
= yield from self
.vnffgr_create_msg()
414 vnffg_sff_list
= yield from self
.vnffgr_nsr_sff_list()
417 vnffgr
= self
._vnffgmgr
.create_vnffgr(vnffgr_request
,self
._vnffgd
_msg
.classifier
,vnffg_sff_list
)
418 except Exception as e
:
419 self
._log
.exception("VNFFG instantiation failed: %s", str(e
))
420 self
._vnffgr
_state
= VnffgRecordState
.FAILED
421 raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFFGR %s failure" % (self
.id, vnffgr_request
.id))
423 self
._vnffgr
_state
= VnffgRecordState
.INSTANTIATION_PENDING
425 self
._log
.info("Instantiated VNFFGR :%s", vnffgr
)
426 self
._vnffgr
_state
= VnffgRecordState
.ACTIVE
428 self
._log
.info("Invoking update_state to update NSR state for NSR ID: %s", self
._nsr
.id)
429 yield from self
._nsr
.update_state()
431 def vnffgr_in_vnffgrm(self
):
432 """ Is there a VNFR record in VNFM """
433 if (self
._vnffgr
_state
== VnffgRecordState
.ACTIVE
or
434 self
._vnffgr
_state
== VnffgRecordState
.INSTANTIATION_PENDING
or
435 self
._vnffgr
_state
== VnffgRecordState
.FAILED
):
442 """ Terminate this VNFFGR """
443 if not self
.vnffgr_in_vnffgrm():
444 self
._log
.error("Ignoring terminate request for id %s in state %s",
445 self
.id, self
._vnffgr
_state
)
448 self
._log
.info("Terminating VNFFGR id:%s", self
.id)
449 self
._vnffgr
_state
= VnffgRecordState
.TERMINATE_PENDING
451 self
._vnffgmgr
.terminate_vnffgr(self
._vnffgr
_id
)
453 self
._vnffgr
_state
= VnffgRecordState
.TERMINATED
454 self
._log
.debug("Terminated VNFFGR id:%s", self
.id)
457 class VirtualLinkRecord(object):
458 """ Virtual Link Records class"""
459 XPATH
= "D,/vlr:vlr-catalog/vlr:vlr"
462 def create_record(dts
, log
, loop
, nsr_name
, vld_msg
, cloud_account_name
, om_datacenter
, ip_profile
, nsr_id
, restart_mode
=False):
463 """Creates a new VLR object based on the given data.
465 If restart mode is enabled, then we look for existing records in the
466 DTS and create a VLR records using the exiting data(ID)
471 vlr_obj
= VirtualLinkRecord(
484 res_iter
= yield from dts
.query_read(
485 "D,/vlr:vlr-catalog/vlr:vlr",
486 rwdts
.XactFlag
.MERGE
)
489 response
= yield from fut
490 vlr
= response
.result
492 # Check if the record is already present, if so use the ID of
493 # the existing record. Since the name of the record is uniquely
494 # formed we can use it as a search key!
495 if vlr
.name
== vlr_obj
.name
:
496 vlr_obj
.reset_id(vlr
.id)
501 def __init__(self
, dts
, log
, loop
, nsr_name
, vld_msg
, cloud_account_name
, om_datacenter
, ip_profile
, nsr_id
):
505 self
._nsr
_name
= nsr_name
506 self
._vld
_msg
= vld_msg
507 self
._cloud
_account
_name
= cloud_account_name
508 self
._om
_datacenter
_name
= om_datacenter
509 self
._assigned
_subnet
= None
510 self
._nsr
_id
= nsr_id
511 self
._ip
_profile
= ip_profile
512 self
._vlr
_id
= str(uuid
.uuid4())
513 self
._state
= VlRecordState
.INIT
514 self
._prev
_state
= None
515 self
._create
_time
= int(time
.time())
519 """ path for this object """
520 return "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']".format(self
._vlr
_id
)
529 """ Get NSR name for this VL """
534 """ Virtual Link Desciptor """
538 def assigned_subnet(self
):
539 """ Subnet assigned to this VL"""
540 return self
._assigned
_subnet
545 Get the name for this VLR.
546 VLR name is "nsr name:VLD name"
548 if self
.vld_msg
.vim_network_name
:
549 return self
.vld_msg
.vim_network_name
550 elif self
.vld_msg
.name
== "multisite":
551 # This is a temporary hack to identify manually provisioned inter-site network
552 return self
.vld_msg
.name
554 return self
._nsr
_name
+ "." + self
.vld_msg
.name
557 def cloud_account_name(self
):
558 """ Cloud account that this VLR should be created in """
559 return self
._cloud
_account
_name
562 def om_datacenter_name(self
):
563 """ Datacenter that this VLR should be created in """
564 return self
._om
_datacenter
_name
568 """ Get the VLR path from VLR """
569 return (VirtualLinkRecord
.XPATH
+ "[vlr:id = '{}']").format(vlr
.id)
577 def state(self
, value
):
578 """ VLR set state """
582 def prev_state(self
):
583 """ VLR previous state """
584 return self
._prev
_state
587 def prev_state(self
, value
):
588 """ VLR set previous state """
589 self
._prev
_state
= value
593 """ Virtual Link Record message for Creating VLR in VNS """
594 vld_fields
= ["short_name",
602 vld_copy_dict
= {k
: v
for k
, v
in self
.vld_msg
.as_dict().items()
605 vlr_dict
= {"id": self
._vlr
_id
,
606 "nsr_id_ref": self
._nsr
_id
,
607 "vld_ref": self
.vld_msg
.id,
609 "create_time": self
._create
_time
,
610 "cloud_account": self
.cloud_account_name
,
611 "om_datacenter": self
.om_datacenter_name
,
614 if self
._ip
_profile
and self
._ip
_profile
.has_field('ip_profile_params'):
615 vlr_dict
['ip_profile_params' ] = self
._ip
_profile
.ip_profile_params
.as_dict()
617 vlr_dict
.update(vld_copy_dict
)
618 vlr
= RwVlrYang
.YangData_Vlr_VlrCatalog_Vlr
.from_dict(vlr_dict
)
621 def reset_id(self
, vlr_id
):
622 self
._vlr
_id
= vlr_id
624 def create_nsr_vlr_msg(self
, vnfrs
):
625 """ The VLR message"""
626 nsr_vlr
= RwNsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr_Vlr()
627 nsr_vlr
.vlr_ref
= self
._vlr
_id
628 nsr_vlr
.assigned_subnet
= self
.assigned_subnet
629 nsr_vlr
.cloud_account
= self
.cloud_account_name
630 nsr_vlr
.om_datacenter
= self
.om_datacenter_name
632 for conn
in self
.vld_msg
.vnfd_connection_point_ref
:
634 if (vnfr
.vnfd
.id == conn
.vnfd_id_ref
and
635 vnfr
.member_vnf_index
== conn
.member_vnf_index_ref
and
636 self
.cloud_account_name
== vnfr
.cloud_account_name
and
637 self
.om_datacenter_name
== vnfr
.om_datacenter_name
):
638 cp_entry
= nsr_vlr
.vnfr_connection_point_ref
.add()
639 cp_entry
.vnfr_id
= vnfr
.id
640 cp_entry
.connection_point
= conn
.vnfd_connection_point_ref
645 def instantiate(self
):
646 """ Instantiate this VL """
647 self
._log
.debug("Instaniating VLR key %s, vld %s",
648 self
.xpath
, self
._vld
_msg
)
650 self
._state
= VlRecordState
.INSTANTIATION_PENDING
651 self
._log
.debug("Executing VL create path:%s msg:%s",
652 self
.xpath
, self
.vlr_msg
)
654 with self
._dts
.transaction(flags
=0) as xact
:
655 block
= xact
.block_create()
656 block
.add_query_create(self
.xpath
, self
.vlr_msg
)
657 self
._log
.debug("Executing VL create path:%s msg:%s",
658 self
.xpath
, self
.vlr_msg
)
659 res_iter
= yield from block
.execute(now
=True)
665 self
._state
= VlRecordState
.FAILED
666 raise NsrInstantiationFailed("Failed NS %s instantiation due to empty response" % self
.id)
668 if vlr
.operational_status
== 'failed':
669 self
._log
.debug("NS Id:%s VL creation failed for vlr id %s", self
.id, vlr
.id)
670 self
._state
= VlRecordState
.FAILED
671 raise NsrInstantiationFailed("Failed VL %s instantiation (%s)" % (vlr
.id, vlr
.operational_status_details
))
673 self
._log
.info("Instantiated VL with xpath %s and vlr:%s",
675 self
._state
= VlRecordState
.ACTIVE
676 self
._assigned
_subnet
= vlr
.assigned_subnet
678 def vlr_in_vns(self
):
679 """ Is there a VLR record in VNS """
680 if (self
._state
== VlRecordState
.ACTIVE
or
681 self
._state
== VlRecordState
.INSTANTIATION_PENDING
or
682 self
._state
== VlRecordState
.TERMINATE_PENDING
or
683 self
._state
== VlRecordState
.FAILED
):
690 """ Terminate this VL """
691 if not self
.vlr_in_vns():
692 self
._log
.debug("Ignoring terminate request for id %s in state %s",
693 self
.id, self
._state
)
696 self
._log
.debug("Terminating VL id:%s", self
.id)
697 self
._state
= VlRecordState
.TERMINATE_PENDING
699 with self
._dts
.transaction(flags
=0) as xact
:
700 block
= xact
.block_create()
701 block
.add_query_delete(self
.xpath
)
702 yield from block
.execute(flags
=0, now
=True)
704 self
._state
= VlRecordState
.TERMINATED
705 self
._log
.debug("Terminated VL id:%s", self
.id)
708 class VnfRecordState(Enum
):
709 """ Vnf Record State """
711 INSTANTIATION_PENDING
= 102
713 TERMINATE_PENDING
= 104
718 class VirtualNetworkFunctionRecord(object):
719 """ Virtual Network Function Record class"""
720 XPATH
= "D,/vnfr:vnfr-catalog/vnfr:vnfr"
724 def create_record(dts
, log
, loop
, vnfd
, const_vnfd_msg
, nsd_id
, nsr_name
,
725 cloud_account_name
, om_datacenter_name
, nsr_id
, group_name
, group_instance_id
,
726 placement_groups
, restart_mode
=False):
727 """Creates a new VNFR object based on the given data.
729 If restart mode is enabled, then we look for existing records in the
730 DTS and create a VNFR records using the exiting data(ID)
733 VirtualNetworkFunctionRecord
735 vnfr_obj
= VirtualNetworkFunctionRecord(
749 restart_mode
=restart_mode
)
752 res_iter
= yield from dts
.query_read(
753 "D,/vnfr:vnfr-catalog/vnfr:vnfr",
754 rwdts
.XactFlag
.MERGE
)
757 response
= yield from fut
758 vnfr
= response
.result
760 if vnfr
.name
== vnfr_obj
.name
:
761 vnfr_obj
.reset_id(vnfr
.id)
778 group_instance_id
=None,
779 placement_groups
= [],
780 restart_mode
= False):
785 self
._const
_vnfd
_msg
= const_vnfd_msg
786 self
._nsd
_id
= nsd_id
787 self
._nsr
_name
= nsr_name
788 self
._nsr
_id
= nsr_id
789 self
._cloud
_account
_name
= cloud_account_name
790 self
._om
_datacenter
_name
= om_datacenter_name
791 self
._group
_name
= group_name
792 self
._group
_instance
_id
= group_instance_id
793 self
._placement
_groups
= placement_groups
794 self
._config
_status
= NsrYang
.ConfigStates
.INIT
795 self
._create
_time
= int(time
.time())
797 self
._prev
_state
= VnfRecordState
.INIT
798 self
._state
= VnfRecordState
.INIT
799 self
._state
_failed
_reason
= None
801 self
.config_store
= rift
.mano
.config_data
.config
.ConfigStore(self
._log
)
804 self
._vnfr
_id
= str(uuid
.uuid4())
806 self
._vnfr
_msg
= self
.create_vnfr_msg()
807 self
._log
.debug("Set VNFR {} config type to {}".
808 format(self
.name
, self
.config_type
))
809 self
.restart_mode
= restart_mode
812 if group_name
is None and group_instance_id
is not None:
813 raise ValueError("Group instance id must not be provided with an empty group name")
823 return "D,/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id = '{}']".format(self
.id)
828 return self
._vnfr
_msg
831 def const_vnfr_msg(self
):
833 return RwNsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr_ConstituentVnfrRef(vnfr_id
=self
.id,cloud_account
=self
.cloud_account_name
,om_datacenter
=self
._om
_datacenter
_name
)
841 def cloud_account_name(self
):
842 """ Cloud account that this VNF should be created in """
843 return self
._cloud
_account
_name
846 def om_datacenter_name(self
):
847 """ Datacenter that this VNF should be created in """
848 return self
._om
_datacenter
_name
853 """ Is this VNF actve """
854 return True if self
._state
== VnfRecordState
.ACTIVE
else False
858 """ state of this VNF """
862 def state_failed_reason(self
):
863 """ Error message in case this VNF is in failed state """
864 return self
._state
_failed
_reason
867 def member_vnf_index(self
):
868 """ Member VNF index """
869 return self
._const
_vnfd
_msg
.member_vnf_index
874 return self
._nsr
_name
878 """ Name of this VNFR """
879 if self
._name
is not None:
882 name_tags
= [self
._nsr
_name
]
884 if self
._group
_name
is not None:
885 name_tags
.append(self
._group
_name
)
887 if self
._group
_instance
_id
is not None:
888 name_tags
.append(str(self
._group
_instance
_id
))
890 name_tags
.extend([self
.vnfd
.name
, str(self
.member_vnf_index
)])
892 self
._name
= "__".join(name_tags
)
897 def vnfr_xpath(vnfr
):
898 """ Get the VNFR path from VNFR """
899 return (VirtualNetworkFunctionRecord
.XPATH
+ "[vnfr:id = '{}']").format(vnfr
.id)
902 def config_type(self
):
903 cfg_types
= ['netconf', 'juju', 'script']
904 for method
in cfg_types
:
905 if self
._vnfd
.vnf_configuration
.has_field(method
):
910 def config_status(self
):
911 """Return the config status as YANG ENUM string"""
912 self
._log
.debug("Map VNFR {} config status {} ({})".
913 format(self
.name
, self
._config
_status
, self
.config_type
))
914 if self
.config_type
== 'none':
915 return 'config_not_needed'
916 elif self
._config
_status
== NsrYang
.ConfigStates
.CONFIGURED
:
918 elif self
._config
_status
== NsrYang
.ConfigStates
.FAILED
:
923 def set_state(self
, state
):
924 """ set the state of this object """
925 self
._prev
_state
= self
._state
928 def reset_id(self
, vnfr_id
):
929 self
._vnfr
_id
= vnfr_id
930 self
._vnfr
_msg
= self
.create_vnfr_msg()
933 self
.config_store
.merge_vnfd_config(
936 self
.member_vnf_index
,
939 def create_vnfr_msg(self
):
940 """ VNFR message for this VNFR """
948 vnfd_copy_dict
= {k
: v
for k
, v
in self
._vnfd
.as_dict().items() if k
in vnfd_fields
}
951 "nsr_id_ref": self
._nsr
_id
,
953 "cloud_account": self
._cloud
_account
_name
,
954 "om_datacenter": self
._om
_datacenter
_name
,
955 "config_status": self
.config_status
957 vnfr_dict
.update(vnfd_copy_dict
)
959 vnfr
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.from_dict(vnfr_dict
)
961 vnfr
.vnfd
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_Vnfd
.from_dict(self
.vnfd
.as_dict())
962 vnfr
.member_vnf_index_ref
= self
.member_vnf_index
963 vnfr
.vnf_configuration
.from_dict(self
._vnfd
.vnf_configuration
.as_dict())
965 if self
._vnfd
.mgmt_interface
.has_field("port"):
966 vnfr
.mgmt_interface
.port
= self
._vnfd
.mgmt_interface
.port
968 for group_info
in self
._placement
_groups
:
969 group
= vnfr
.placement_groups_info
.add()
970 group
.from_dict(group_info
.as_dict())
972 # UI expects the monitoring param field to exist
973 vnfr
.monitoring_param
= []
975 self
._log
.debug("Get vnfr_msg for VNFR {} : {}".format(self
.name
, vnfr
))
979 def update_vnfm(self
):
980 self
._log
.debug("Send an update to VNFM for VNFR {} with {}".
981 format(self
.name
, self
.vnfr_msg
))
982 yield from self
._dts
.query_update(
984 rwdts
.XactFlag
.TRACE
,
988 def get_config_status(self
):
989 """Return the config status as YANG ENUM"""
990 return self
._config
_status
993 def set_config_status(self
, status
):
995 def status_to_string(status
):
997 NsrYang
.ConfigStates
.INIT
: 'init',
998 NsrYang
.ConfigStates
.CONFIGURING
: 'configuring',
999 NsrYang
.ConfigStates
.CONFIG_NOT_NEEDED
: 'config_not_needed',
1000 NsrYang
.ConfigStates
.CONFIGURED
: 'configured',
1001 NsrYang
.ConfigStates
.FAILED
: 'failed',
1004 return status_dc
[status
]
1006 self
._log
.debug("Update VNFR {} from {} ({}) to {}".
1007 format(self
.name
, self
._config
_status
,
1008 self
.config_type
, status
))
1009 if self
._config
_status
== NsrYang
.ConfigStates
.CONFIGURED
:
1010 self
._log
.error("Updating already configured VNFR {}".
1014 if self
._config
_status
!= status
:
1016 self
._config
_status
= status
1017 # I don't think this is used. Original implementor can check.
1018 # Caused Exception, so corrected it by status_to_string
1019 # But not sure whats the use of this variable?
1020 self
.vnfr_msg
.config_status
= status_to_string(status
)
1021 except Exception as e
:
1022 self
._log
.error("Exception=%s", str(e
))
1025 self
._log
.debug("Updated VNFR {} status to {}".format(self
.name
, status
))
1027 if self
._config
_status
!= NsrYang
.ConfigStates
.INIT
:
1029 # Publish only after VNFM has the VNFR created
1030 yield from self
.update_vnfm()
1031 except Exception as e
:
1032 self
._log
.error("Exception updating VNFM with new status {} of VNFR {}: {}".
1033 format(status
, self
.name
, e
))
1034 self
._log
.exception(e
)
1036 def is_configured(self
):
1037 if self
.config_type
== 'none':
1040 if self
._config
_status
== NsrYang
.ConfigStates
.CONFIGURED
:
1046 def instantiate(self
, nsr
):
1047 """ Instantiate this VNFR"""
1049 self
._log
.debug("Instaniating VNFR key %s, vnfd %s",
1050 self
.xpath
, self
._vnfd
)
1052 self
._log
.debug("Create VNF with xpath %s and vnfr %s",
1053 self
.xpath
, self
.vnfr_msg
)
1055 self
.set_state(VnfRecordState
.INSTANTIATION_PENDING
)
1057 def find_vlr_for_cp(conn
):
1058 """ Find VLR for the given connection point """
1059 for vlr
in nsr
.vlrs
:
1060 for vnfd_cp
in vlr
.vld_msg
.vnfd_connection_point_ref
:
1061 if (vnfd_cp
.vnfd_id_ref
== self
._vnfd
.id and
1062 vnfd_cp
.vnfd_connection_point_ref
== conn
.name
and
1063 vnfd_cp
.member_vnf_index_ref
== self
.member_vnf_index
and
1064 vlr
.cloud_account_name
== self
.cloud_account_name
):
1065 self
._log
.debug("Found VLR for cp_name:%s and vnf-index:%d",
1066 conn
.name
, self
.member_vnf_index
)
1070 # For every connection point in the VNFD fill in the identifier
1071 for conn_p
in self
._vnfd
.connection_point
:
1072 cpr
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint()
1073 cpr
.name
= conn_p
.name
1074 cpr
.type_yang
= conn_p
.type_yang
1075 vlr_ref
= find_vlr_for_cp(conn_p
)
1077 msg
= "Failed to find VLR for cp = %s" % conn_p
.name
1078 self
._log
.debug("%s", msg
)
1079 # raise VirtualNetworkFunctionRecordError(msg)
1082 cpr
.vlr_ref
= vlr_ref
.id
1083 self
.vnfr_msg
.connection_point
.append(cpr
)
1084 self
._log
.debug("Connection point [%s] added, vnf id=%s vnfd id=%s",
1085 cpr
, self
.vnfr_msg
.id, self
.vnfr_msg
.vnfd
.id)
1087 if not self
.restart_mode
:
1088 yield from self
._dts
.query_create(self
.xpath
,
1092 yield from self
._dts
.query_update(self
.xpath
,
1096 self
._log
.info("Created VNF with xpath %s and vnfr %s",
1097 self
.xpath
, self
.vnfr_msg
)
1099 self
._log
.info("Instantiated VNFR with xpath %s and vnfd %s, vnfr %s",
1100 self
.xpath
, self
._vnfd
, self
.vnfr_msg
)
1103 def update_state(self
, vnfr_msg
):
1104 """ Update this VNFR"""
1105 if vnfr_msg
.operational_status
== "running":
1106 if self
.vnfr_msg
.operational_status
!= "running":
1107 yield from self
.is_active()
1108 elif vnfr_msg
.operational_status
== "failed":
1109 yield from self
.instantiation_failed(failed_reason
=vnfr_msg
.operational_status_details
)
1112 def is_active(self
):
1113 """ This VNFR is active """
1114 self
._log
.debug("VNFR %s is active", self
._vnfr
_id
)
1115 self
.set_state(VnfRecordState
.ACTIVE
)
1118 def instantiation_failed(self
, failed_reason
=None):
1119 """ This VNFR instantiation failed"""
1120 self
._log
.error("VNFR %s instantiation failed", self
._vnfr
_id
)
1121 self
.set_state(VnfRecordState
.FAILED
)
1122 self
._state
_failed
_reason
= failed_reason
1124 def vnfr_in_vnfm(self
):
1125 """ Is there a VNFR record in VNFM """
1126 if (self
._state
== VnfRecordState
.ACTIVE
or
1127 self
._state
== VnfRecordState
.INSTANTIATION_PENDING
or
1128 self
._state
== VnfRecordState
.FAILED
):
1134 def terminate(self
):
1135 """ Terminate this VNF """
1136 if not self
.vnfr_in_vnfm():
1137 self
._log
.debug("Ignoring terminate request for id %s in state %s",
1138 self
.id, self
._state
)
1141 self
._log
.debug("Terminating VNF id:%s", self
.id)
1142 self
.set_state(VnfRecordState
.TERMINATE_PENDING
)
1143 with self
._dts
.transaction(flags
=0) as xact
:
1144 block
= xact
.block_create()
1145 block
.add_query_delete(self
.xpath
)
1146 yield from block
.execute(flags
=0)
1147 self
.set_state(VnfRecordState
.TERMINATED
)
1148 self
._log
.debug("Terminated VNF id:%s", self
.id)
1151 class NetworkServiceStatus(object):
1152 """ A class representing the Network service's status """
1153 MAX_EVENTS_RECORDED
= 10
1154 """ Network service Status class"""
1155 def __init__(self
, dts
, log
, loop
):
1160 self
._state
= NetworkServiceRecordState
.INIT
1161 self
._events
= deque([])
1164 def create_notification(self
, evt
, evt_desc
, evt_details
):
1165 xp
= "N,/rw-nsr:nsm-notification"
1166 notif
= RwNsrYang
.YangNotif_RwNsr_NsmNotification()
1168 notif
.description
= evt_desc
1169 notif
.details
= evt_details
if evt_details
is not None else None
1171 yield from self
._dts
.query_create(xp
, rwdts
.XactFlag
.ADVISE
, notif
)
1172 self
._log
.info("Notification called by creating dts query: %s", notif
)
1174 def record_event(self
, evt
, evt_desc
, evt_details
):
1175 """ Record an event """
1176 self
._log
.debug("Recording event - evt %s, evt_descr %s len = %s",
1177 evt
, evt_desc
, len(self
._events
))
1178 if len(self
._events
) >= NetworkServiceStatus
.MAX_EVENTS_RECORDED
:
1179 self
._events
.popleft()
1180 self
._events
.append((int(time
.time()), evt
, evt_desc
,
1181 evt_details
if evt_details
is not None else None))
1183 self
._loop
.create_task(self
.create_notification(evt
,evt_desc
,evt_details
))
1185 def set_state(self
, state
):
1186 """ set the state of this status object """
1190 """ Return the state as a yang enum string """
1191 state_to_str_map
= {"INIT": "init",
1192 "VL_INIT_PHASE": "vl_init_phase",
1193 "VNF_INIT_PHASE": "vnf_init_phase",
1194 "VNFFG_INIT_PHASE": "vnffg_init_phase",
1195 "SCALING_GROUP_INIT_PHASE": "scaling_group_init_phase",
1196 "RUNNING": "running",
1197 "SCALING_OUT": "scaling_out",
1198 "SCALING_IN": "scaling_in",
1199 "TERMINATE_RCVD": "terminate_rcvd",
1200 "TERMINATE": "terminate",
1201 "VL_TERMINATE_PHASE": "vl_terminate_phase",
1202 "VNF_TERMINATE_PHASE": "vnf_terminate_phase",
1203 "VNFFG_TERMINATE_PHASE": "vnffg_terminate_phase",
1204 "TERMINATED": "terminated",
1206 "VL_INSTANTIATE": "vl_instantiate",
1207 "VL_TERMINATE": "vl_terminate",
1209 return state_to_str_map
[self
._state
.name
]
1213 """ State of this status object """
1218 """ Network Service Record as a message"""
1221 for entry
in self
._events
:
1222 event
= RwNsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr_OperationalEvents()
1225 event
.timestamp
, event
.event
, event
.description
, event
.details
= entry
1226 event_list
.append(event
)
1230 class NetworkServiceRecord(object):
1231 """ Network service record """
1232 XPATH
= "D,/nsr:ns-instance-opdata/nsr:nsr"
1234 def __init__(self
, dts
, log
, loop
, nsm
, nsm_plugin
, nsr_cfg_msg
, sdn_account_name
, key_pairs
, restart_mode
=False,
1240 self
._nsr
_cfg
_msg
= nsr_cfg_msg
1241 self
._nsm
_plugin
= nsm_plugin
1242 self
._sdn
_account
_name
= sdn_account_name
1243 self
._vlr
_handler
= vlr_handler
1246 self
._nsr
_msg
= None
1247 self
._nsr
_regh
= None
1248 self
._key
_pairs
= key_pairs
1253 self
._param
_pools
= {}
1254 self
._scaling
_groups
= {}
1255 self
._create
_time
= int(time
.time())
1256 self
._op
_status
= NetworkServiceStatus(dts
, log
, loop
)
1257 self
._config
_status
= NsrYang
.ConfigStates
.CONFIGURING
1258 self
._config
_status
_details
= None
1260 self
.restart_mode
= restart_mode
1261 self
.config_store
= rift
.mano
.config_data
.config
.ConfigStore(self
._log
)
1262 self
._debug
_running
= False
1263 self
._is
_active
= False
1264 self
._vl
_phase
_completed
= False
1265 self
._vnf
_phase
_completed
= False
1266 self
.vlr_uptime_tasks
= {}
1269 # Initalise the state to init
1270 # The NSR moves through the following transitions
1271 # 1. INIT -> VLS_READY once all the VLs in the NSD are created
1272 # 2. VLS_READY - VNFS_READY when all the VNFs in the NSD are created
1273 # 3. VNFS_READY - READY when the NSR is published
1275 self
.set_state(NetworkServiceRecordState
.INIT
)
1277 self
.substitute_input_parameters
= InputParameterSubstitution(self
._log
)
1280 def nsm_plugin(self
):
1282 return self
._nsm
_plugin
1284 def set_state(self
, state
):
1285 """ Set state for this NSR"""
1286 self
._log
.debug("Setting state to %s", state
)
1287 # We are in init phase and is moving to the next state
1288 # The new state could be a FAILED state or VNF_INIIT_PHASE
1289 if self
.state
== NetworkServiceRecordState
.VL_INIT_PHASE
:
1290 self
._vl
_phase
_completed
= True
1292 if self
.state
== NetworkServiceRecordState
.VNF_INIT_PHASE
:
1293 self
._vnf
_phase
_completed
= True
1295 self
._op
_status
.set_state(state
)
1299 """ Get id for this NSR"""
1300 return self
._nsr
_cfg
_msg
.id
1304 """ Name of this network service record """
1305 return self
._nsr
_cfg
_msg
.name
1308 def cloud_account_name(self
):
1309 return self
._nsr
_cfg
_msg
.cloud_account
1312 def om_datacenter_name(self
):
1313 if self
._nsr
_cfg
_msg
.has_field('om_datacenter'):
1314 return self
._nsr
_cfg
_msg
.om_datacenter
1319 """State of this NetworkServiceRecord"""
1320 return self
._op
_status
.state
1324 """ Is this NSR active ?"""
1325 return True if self
._op
_status
.state
== NetworkServiceRecordState
.RUNNING
else False
1329 """ VLRs associated with this NSR"""
1334 """ VNFRs associated with this NSR"""
1339 """ VNFFGRs associated with this NSR"""
1340 return self
._vnffgrs
1343 def scaling_groups(self
):
1344 """ Scaling groups associated with this NSR """
1345 return self
._scaling
_groups
1348 def param_pools(self
):
1349 """ Parameter value pools associated with this NSR"""
1350 return self
._param
_pools
1353 def nsr_cfg_msg(self
):
1354 return self
._nsr
_cfg
_msg
1357 def nsr_cfg_msg(self
, msg
):
1358 self
._nsr
_cfg
_msg
= msg
1362 """ NSD Protobuf for this NSR """
1363 if self
._nsd
is not None:
1365 self
._nsd
= self
._nsr
_cfg
_msg
.nsd
1370 """ NSD ID for this NSR """
1371 return self
.nsd_msg
.id
1375 ''' Get a new job id for config primitive'''
1380 def config_status(self
):
1381 """ Config status for NSR """
1382 return self
._config
_status
1384 def resolve_placement_group_cloud_construct(self
, input_group
):
1386 Returns the cloud specific construct for placement group
1388 copy_dict
= ['name', 'requirement', 'strategy']
1390 for group_info
in self
._nsr
_cfg
_msg
.nsd_placement_group_maps
:
1391 if group_info
.placement_group_ref
== input_group
.name
:
1392 group
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo()
1393 group_dict
= {k
:v
for k
,v
in
1394 group_info
.as_dict().items() if k
!= 'placement_group_ref'}
1395 for param
in copy_dict
:
1396 group_dict
.update({param
: getattr(input_group
, param
)})
1397 group
.from_dict(group_dict
)
1403 return "NSR(name={}, nsd_id={}, cloud_account={})".format(
1404 self
.name
, self
.nsd_id
, self
.cloud_account_name
1407 def _get_vnfd(self
, vnfd_id
, config_xact
):
1408 """ Fetch vnfd msg for the passed vnfd id """
1409 return self
._nsm
.get_vnfd(vnfd_id
, config_xact
)
1411 def _get_vnfd_cloud_account(self
, vnfd_member_index
):
1412 """ Fetch Cloud Account for the passed vnfd id """
1413 if self
._nsr
_cfg
_msg
.vnf_cloud_account_map
:
1414 vim_accounts
= [(vnf
.cloud_account
,vnf
.om_datacenter
) for vnf
in self
._nsr
_cfg
_msg
.vnf_cloud_account_map \
1415 if vnfd_member_index
== vnf
.member_vnf_index_ref
]
1416 if vim_accounts
and vim_accounts
[0]:
1417 return vim_accounts
[0]
1418 return (self
.cloud_account_name
,self
.om_datacenter_name
)
1420 def _get_constituent_vnfd_msg(self
, vnf_index
):
1421 for const_vnfd
in self
.nsd_msg
.constituent_vnfd
:
1422 if const_vnfd
.member_vnf_index
== vnf_index
:
1425 raise ValueError("Constituent VNF index %s not found" % vnf_index
)
1427 def record_event(self
, evt
, evt_desc
, evt_details
=None, state
=None):
1428 """ Record an event """
1429 self
._op
_status
.record_event(evt
, evt_desc
, evt_details
)
1430 if state
is not None:
1431 self
.set_state(state
)
1433 def scaling_trigger_str(self
, trigger
):
1434 SCALING_TRIGGER_STRS
= {
1435 NsdYang
.ScalingTrigger
.PRE_SCALE_IN
: 'pre-scale-in',
1436 NsdYang
.ScalingTrigger
.POST_SCALE_IN
: 'post-scale-in',
1437 NsdYang
.ScalingTrigger
.PRE_SCALE_OUT
: 'pre-scale-out',
1438 NsdYang
.ScalingTrigger
.POST_SCALE_OUT
: 'post-scale-out',
1441 return SCALING_TRIGGER_STRS
[trigger
]
1442 except Exception as e
:
1443 self
._log
.error("Scaling trigger mapping error for {} : {}".
1445 self
._log
.exception(e
)
1446 return "Unknown trigger"
1449 def instantiate_vls(self
):
1451 This function instantiates VLs for every VL in this Network Service
1453 self
._log
.debug("Instantiating %d VLs in NSD id %s", len(self
._vlrs
),
1455 for vlr
in self
._vlrs
:
1456 yield from self
.nsm_plugin
.instantiate_vl(self
, vlr
)
1457 vlr
.state
= VlRecordState
.ACTIVE
1458 self
.vlr_uptime_tasks
[vlr
.id] = self
._loop
.create_task(self
.vlr_uptime_update(vlr
))
1461 def vlr_uptime_update(self
, vlr
):
1464 vlr_
= RwVlrYang
.YangData_Vlr_VlrCatalog_Vlr
.from_dict({'id': vlr
.id})
1466 vlr_
.uptime
= int(time
.time()) - vlr
._create
_time
1467 yield from self
._vlr
_handler
.update(None, VirtualLinkRecord
.vlr_xpath(vlr
), vlr_
)
1468 yield from asyncio
.sleep(2, loop
=self
._loop
)
1469 except asyncio
.CancelledError
:
1470 self
._log
.debug("Received cancellation request for vlr_uptime_update task")
1471 yield from self
._vlr
_handler
.delete(None, VirtualLinkRecord
.vlr_xpath(vlr
))
1475 def create(self
, config_xact
):
1476 """ Create this network service"""
1477 # Create virtual links for all the external vnf
1478 # connection points in this NS
1479 yield from self
.create_vls()
1481 # Create VNFs in this network service
1482 yield from self
.create_vnfs(config_xact
)
1484 # Create VNFFG for network service
1485 self
.create_vnffgs()
1487 # Create Scaling Groups for each scaling group in NSD
1488 self
.create_scaling_groups()
1490 # Create Parameter Pools
1491 self
.create_param_pools()
1494 def apply_scale_group_config_script(self
, script
, group
, scale_instance
, trigger
, vnfrs
=None):
1495 """ Apply config based on script for scale group """
1498 def add_vnfrs_data(vnfrs_list
):
1499 """ Add as a dict each of the VNFRs data """
1501 for vnfr
in vnfrs_list
:
1502 self
._log
.debug("Add VNFR {} data".format(vnfr
))
1504 vnfr_data
['name'] = vnfr
.name
1505 if trigger
in [NsdYang
.ScalingTrigger
.PRE_SCALE_IN
, NsdYang
.ScalingTrigger
.POST_SCALE_OUT
]:
1506 # Get VNF management and other IPs, etc
1507 opdata
= yield from self
.fetch_vnfr(vnfr
.xpath
)
1508 self
._log
.debug("VNFR {} op data: {}".format(vnfr
.name
, opdata
))
1510 vnfr_data
['rw_mgmt_ip'] = opdata
.mgmt_interface
.ip_address
1511 vnfr_data
['rw_mgmt_port'] = opdata
.mgmt_interface
.port
1512 except Exception as e
:
1513 self
._log
.error("Unable to get management IP for vnfr {}:{}".
1514 format(vnfr
.name
, e
))
1517 vnfr_data
['connection_points'] = []
1518 for cp
in opdata
.connection_point
:
1520 con_pt
['name'] = cp
.name
1521 con_pt
['ip_address'] = cp
.ip_address
1522 vnfr_data
['connection_points'].append(con_pt
)
1523 except Exception as e
:
1524 self
._log
.error("Exception getting connections points for VNFR {}: {}".
1525 format(vnfr
.name
, e
))
1527 vnfrs_data
.append(vnfr_data
)
1528 self
._log
.debug("VNFRs data: {}".format(vnfrs_data
))
1532 def add_nsr_data(nsr
):
1534 nsr_data
['name'] = nsr
.name
1537 if script
is None or len(script
) == 0:
1538 self
._log
.error("Script not provided for scale group config: {}".format(group
.name
))
1541 if script
[0] == '/':
1544 path
= os
.path
.join(os
.environ
['RIFT_INSTALL'], "usr/bin", script
)
1545 if not os
.path
.exists(path
):
1546 self
._log
.error("Config faled for scale group {}: Script does not exist at {}".
1547 format(group
.name
, path
))
1550 # Build a YAML file with all parameters for the script to execute
1551 # The data consists of 5 sections
1553 # 2. Scale group config
1554 # 3. VNFRs in the scale group
1555 # 4. VNFRs outside scale group
1558 data
['trigger'] = group
.trigger_map(trigger
)
1559 data
['config'] = group
.group_msg
.as_dict()
1562 data
["vnfrs_in_group"] = yield from add_vnfrs_data(vnfrs
)
1564 data
["vnfrs_in_group"] = yield from add_vnfrs_data(scale_instance
.vnfrs
)
1566 data
["vnfrs_others"] = yield from add_vnfrs_data(self
.vnfrs
.values())
1567 data
["nsr"] = add_nsr_data(self
)
1570 with tempfile
.NamedTemporaryFile(delete
=False) as tmp_file
:
1571 tmp_file
.write(yaml
.dump(data
, default_flow_style
=True)
1574 self
._log
.debug("Creating a temp file: {} with input data: {}".
1575 format(tmp_file
.name
, data
))
1577 cmd
= "{} {}".format(path
, tmp_file
.name
)
1578 self
._log
.debug("Running the CMD: {}".format(cmd
))
1579 proc
= yield from asyncio
.create_subprocess_shell(cmd
, loop
=self
._loop
)
1580 rc
= yield from proc
.wait()
1582 self
._log
.error("The script {} for scale group {} config returned: {}".
1583 format(script
, group
.name
, rc
))
1591 def apply_scaling_group_config(self
, trigger
, group
, scale_instance
, vnfrs
=None):
1592 """ Apply the config for the scaling group based on trigger """
1593 if group
is None or scale_instance
is None:
1597 def update_config_status(success
=True, err_msg
=None):
1598 self
._log
.debug("Update %s config status to %r : %s",
1599 scale_instance
, success
, err_msg
)
1600 if (scale_instance
.config_status
== "failed"):
1601 # Do not update the config status if it is already in failed state
1604 if scale_instance
.config_status
== "configured":
1605 # Update only to failed state an already configured scale instance
1607 scale_instance
.config_status
= "failed"
1608 scale_instance
.config_err_msg
= err_msg
1609 yield from self
.update_state()
1611 # We are in configuring state
1612 # Only after post scale out mark instance as configured
1613 if trigger
== NsdYang
.ScalingTrigger
.POST_SCALE_OUT
:
1615 scale_instance
.config_status
= "configured"
1617 scale_instance
.config_status
= "failed"
1618 scale_instance
.config_err_msg
= err_msg
1619 yield from self
.update_state()
1621 config
= group
.trigger_config(trigger
)
1625 self
._log
.debug("Scaling group {} config: {}".format(group
.name
, config
))
1626 if config
.has_field("ns_config_primitive_name_ref"):
1627 config_name
= config
.ns_config_primitive_name_ref
1628 nsd_msg
= self
.nsd_msg
1629 config_primitive
= None
1630 for ns_cfg_prim
in nsd_msg
.service_primitive
:
1631 if ns_cfg_prim
.name
== config_name
:
1632 config_primitive
= ns_cfg_prim
1635 if config_primitive
is None:
1636 raise ValueError("Could not find ns_cfg_prim %s in nsr %s" % (config_name
, self
.name
))
1638 self
._log
.debug("Scaling group {} config primitive: {}".format(group
.name
, config_primitive
))
1639 if config_primitive
.has_field("user_defined_script"):
1640 rc
= yield from self
.apply_scale_group_config_script(config_primitive
.user_defined_script
,
1641 group
, scale_instance
, trigger
, vnfrs
)
1644 err_msg
= "Failed config for trigger {} using config script '{}'". \
1645 format(self
.scaling_trigger_str(trigger
),
1646 config_primitive
.user_defined_script
)
1647 yield from update_config_status(success
=rc
, err_msg
=err_msg
)
1650 err_msg
= "Failed config for trigger {} as config script is not specified". \
1651 format(self
.scaling_trigger_str(trigger
))
1652 yield from update_config_status(success
=False, err_msg
=err_msg
)
1653 raise NotImplementedError("Only script based config support for scale group for now: {}".
1656 err_msg
= "Failed config for trigger {} as config primitive is not specified".\
1657 format(self
.scaling_trigger_str(trigger
))
1658 yield from update_config_status(success
=False, err_msg
=err_msg
)
1659 self
._log
.error("Config primitive not specified for config action in scale group %s" %
1663 def create_scaling_groups(self
):
1664 """ This function creates a NSScalingGroup for every scaling
1665 group defined in he NSD"""
1667 for scaling_group_msg
in self
.nsd_msg
.scaling_group_descriptor
:
1668 self
._log
.debug("Found scaling_group %s in nsr id %s",
1669 scaling_group_msg
.name
, self
.id)
1671 group_record
= scale_group
.ScalingGroup(
1676 self
._scaling
_groups
[group_record
.name
] = group_record
1679 def create_scale_group_instance(self
, group_name
, index
, config_xact
, is_default
=False):
1680 group
= self
._scaling
_groups
[group_name
]
1681 scale_instance
= group
.create_instance(index
, is_default
)
1685 self
._log
.debug("Creating %u VNFs associated with NS id %s scaling group %s",
1686 len(self
.nsd_msg
.constituent_vnfd
), self
.id, self
)
1689 for vnf_index
, count
in group
.vnf_index_count_map
.items():
1690 const_vnfd_msg
= self
._get
_constituent
_vnfd
_msg
(vnf_index
)
1691 vnfd_msg
= self
._get
_vnfd
(const_vnfd_msg
.vnfd_id_ref
, config_xact
)
1693 cloud_account_name
, om_datacenter_name
= self
._get
_vnfd
_cloud
_account
(const_vnfd_msg
.member_vnf_index
)
1694 if cloud_account_name
is None:
1695 cloud_account_name
= self
.cloud_account_name
1696 for _
in range(count
):
1697 vnfr
= yield from self
.create_vnf_record(vnfd_msg
, const_vnfd_msg
, cloud_account_name
, om_datacenter_name
, group_name
, index
)
1698 scale_instance
.add_vnfr(vnfr
)
1704 def instantiate_instance():
1705 self
._log
.debug("Creating %s VNFRS", scale_instance
)
1706 vnfrs
= yield from create_vnfs()
1707 yield from self
.publish()
1709 self
._log
.debug("Instantiating %s VNFRS for %s", len(vnfrs
), scale_instance
)
1710 scale_instance
.operational_status
= "vnf_init_phase"
1711 yield from self
.update_state()
1714 rc
= yield from self
.apply_scaling_group_config(NsdYang
.ScalingTrigger
.PRE_SCALE_OUT
,
1715 group
, scale_instance
, vnfrs
)
1717 self
._log
.error("Pre scale out config for scale group {} ({}) failed".
1718 format(group
.name
, index
))
1719 scale_instance
.operational_status
= "failed"
1721 yield from self
.instantiate_vnfs(vnfrs
)
1723 except Exception as e
:
1724 self
._log
.exception("Failed to begin instantiatiation of vnfs for scale group {}: {}".
1725 format(group
.name
, e
))
1726 self
._log
.exception(e
)
1727 scale_instance
.operational_status
= "failed"
1729 yield from self
.update_state()
1731 yield from instantiate_instance()
1734 def delete_scale_group_instance(self
, group_name
, index
):
1735 group
= self
._scaling
_groups
[group_name
]
1736 scale_instance
= group
.get_instance(index
)
1737 if scale_instance
.is_default
:
1738 raise ScalingOperationError("Cannot terminate a default scaling group instance")
1740 scale_instance
.operational_status
= "terminate"
1741 yield from self
.update_state()
1744 def terminate_instance():
1745 self
._log
.debug("Terminating %s VNFRS" % scale_instance
)
1746 rc
= yield from self
.apply_scaling_group_config(NsdYang
.ScalingTrigger
.PRE_SCALE_IN
,
1747 group
, scale_instance
)
1749 self
._log
.error("Pre scale in config for scale group {} ({}) failed".
1750 format(group
.name
, index
))
1752 # Going ahead with terminate, even if there is an error in pre-scale-in config
1753 # as this could be result of scale out failure and we need to cleanup this group
1754 yield from self
.terminate_vnfrs(scale_instance
.vnfrs
)
1755 group
.delete_instance(index
)
1757 scale_instance
.operational_status
= "vnf_terminate_phase"
1758 yield from self
.update_state()
1760 yield from terminate_instance()
1763 def _update_scale_group_instances_status(self
):
1765 def post_scale_out_task(group
, instance
):
1766 # Apply post scale out config once all VNFRs are active
1767 rc
= yield from self
.apply_scaling_group_config(NsdYang
.ScalingTrigger
.POST_SCALE_OUT
,
1769 instance
.operational_status
= "running"
1771 self
._log
.debug("Scale out for group {} and instance {} succeeded".
1772 format(group
.name
, instance
.instance_id
))
1774 self
._log
.error("Post scale out config for scale group {} ({}) failed".
1775 format(group
.name
, instance
.instance_id
))
1777 yield from self
.update_state()
1779 group_instances
= {group
: group
.instances
for group
in self
._scaling
_groups
.values()}
1780 for group
, instances
in group_instances
.items():
1781 self
._log
.debug("Updating %s instance status", group
)
1782 for instance
in instances
:
1783 instance_vnf_state_list
= [vnfr
.state
for vnfr
in instance
.vnfrs
]
1784 self
._log
.debug("Got vnfr instance states: %s", instance_vnf_state_list
)
1785 if instance
.operational_status
== "vnf_init_phase":
1786 if all([state
== VnfRecordState
.ACTIVE
for state
in instance_vnf_state_list
]):
1787 instance
.operational_status
= "running"
1789 # Create a task for post scale out to allow us to sleep before attempting
1790 # to configure newly created VM's
1791 self
._loop
.create_task(post_scale_out_task(group
, instance
))
1793 elif any([state
== VnfRecordState
.FAILED
for state
in instance_vnf_state_list
]):
1794 self
._log
.debug("Scale out for group {} and instance {} failed".
1795 format(group
.name
, instance
.instance_id
))
1796 instance
.operational_status
= "failed"
1798 elif instance
.operational_status
== "vnf_terminate_phase":
1799 if all([state
== VnfRecordState
.TERMINATED
for state
in instance_vnf_state_list
]):
1800 instance
.operational_status
= "terminated"
1801 rc
= yield from self
.apply_scaling_group_config(NsdYang
.ScalingTrigger
.POST_SCALE_IN
,
1804 self
._log
.debug("Scale in for group {} and instance {} succeeded".
1805 format(group
.name
, instance
.instance_id
))
1807 self
._log
.error("Post scale in config for scale group {} ({}) failed".
1808 format(group
.name
, instance
.instance_id
))
1810 def create_vnffgs(self
):
1811 """ This function creates VNFFGs for every VNFFG in the NSD
1812 associated with this NSR"""
1814 for vnffgd
in self
.nsd_msg
.vnffgd
:
1815 self
._log
.debug("Found vnffgd %s in nsr id %s", vnffgd
, self
.id)
1816 vnffgr
= VnffgRecord(self
._dts
,
1819 self
._nsm
._vnffgmgr
,
1823 self
._sdn
_account
_name
1825 self
._vnffgrs
[vnffgr
.id] = vnffgr
1827 def resolve_vld_ip_profile(self
, nsd_msg
, vld
):
1828 self
._log
.debug("Receieved ip profile ref is %s",vld
.ip_profile_ref
)
1829 if not vld
.has_field('ip_profile_ref'):
1831 profile
= [profile
for profile
in nsd_msg
.ip_profiles
if profile
.name
== vld
.ip_profile_ref
]
1832 return profile
[0] if profile
else None
1835 def _create_vls(self
, vld
, cloud_account
,om_datacenter
):
1836 """Create a VLR in the cloud account specified using the given VLD
1840 cloud_account : Cloud account name
1845 vlr
= yield from VirtualLinkRecord
.create_record(
1853 self
.resolve_vld_ip_profile(self
.nsd_msg
, vld
),
1855 restart_mode
=self
.restart_mode
)
1859 def _extract_cloud_accounts_for_vl(self
, vld
):
1861 Extracts the list of cloud accounts from the NS Config obj
1864 1. Cloud accounts based connection point (vnf_cloud_account_map)
1866 vld : VLD yang object
1871 cloud_account_list
= []
1873 if self
._nsr
_cfg
_msg
.vnf_cloud_account_map
:
1874 # Handle case where cloud_account is None
1876 for vnf
in self
._nsr
_cfg
_msg
.vnf_cloud_account_map
:
1877 if vnf
.cloud_account
is not None or vnf
.om_datacenter
is not None:
1878 vnf_cloud_map
[vnf
.member_vnf_index_ref
] = (vnf
.cloud_account
,vnf
.om_datacenter
)
1880 for vnfc
in vld
.vnfd_connection_point_ref
:
1881 cloud_account
= vnf_cloud_map
.get(
1882 vnfc
.member_vnf_index_ref
,
1883 (self
.cloud_account_name
,self
.om_datacenter_name
))
1885 cloud_account_list
.append(cloud_account
)
1887 if self
._nsr
_cfg
_msg
.vl_cloud_account_map
:
1888 for vld_map
in self
._nsr
_cfg
_msg
.vl_cloud_account_map
:
1889 if vld_map
.vld_id_ref
== vld
.id:
1890 for cloud_account
in vld_map
.cloud_accounts
:
1891 cloud_account_list
.extend((cloud_account
,None))
1892 for om_datacenter
in vld_map
.om_datacenters
:
1893 cloud_account_list
.extend((None,om_datacenter
))
1895 # If no config has been provided then fall-back to the default
1897 if not cloud_account_list
:
1898 cloud_account_list
= [(self
.cloud_account_name
,self
.om_datacenter_name
)]
1900 self
._log
.debug("VL {} cloud accounts: {}".
1901 format(vld
.name
, cloud_account_list
))
1902 return set(cloud_account_list
)
1905 def create_vls(self
):
1906 """ This function creates VLs for every VLD in the NSD
1907 associated with this NSR"""
1908 for vld
in self
.nsd_msg
.vld
:
1910 self
._log
.debug("Found vld %s in nsr id %s", vld
, self
.id)
1911 cloud_account_list
= self
._extract
_cloud
_accounts
_for
_vl
(vld
)
1912 for cloud_account
,om_datacenter
in cloud_account_list
:
1913 vlr
= yield from self
._create
_vls
(vld
, cloud_account
,om_datacenter
)
1914 self
._vlrs
.append(vlr
)
1918 def create_vl_instance(self
, vld
):
1919 self
._log
.debug("Create VL for {}: {}".format(self
.id, vld
.as_dict()))
1920 # Check if the VL is already present
1922 for vl
in self
._vlrs
:
1923 if vl
.vld_msg
.id == vld
.id:
1924 self
._log
.debug("The VLD %s already in NSR %s as VLR %s with status %s",
1925 vld
.id, self
.id, vl
.id, vl
.state
)
1927 if vlr
.state
!= VlRecordState
.TERMINATED
:
1928 err_msg
= "VLR for VL %s in NSR %s already instantiated", \
1930 self
._log
.error(err_msg
)
1931 raise NsrVlUpdateError(err_msg
)
1935 cloud_account_list
= self
._extract
_cloud
_accounts
_for
_vl
(vld
)
1936 for account
,om_datacenter
in cloud_account_list
:
1937 vlr
= yield from self
._create
_vls
(vld
, account
,om_datacenter
)
1938 self
._vlrs
.append(vlr
)
1940 vlr
.state
= VlRecordState
.INSTANTIATION_PENDING
1941 yield from self
.update_state()
1944 yield from self
.nsm_plugin
.instantiate_vl(self
, vlr
)
1945 vlr
.state
= VlRecordState
.ACTIVE
1947 except Exception as e
:
1948 err_msg
= "Error instantiating VL for NSR {} and VLD {}: {}". \
1949 format(self
.id, vld
.id, e
)
1950 self
._log
.error(err_msg
)
1951 self
._log
.exception(e
)
1952 vlr
.state
= VlRecordState
.FAILED
1954 yield from self
.update_state()
1957 def delete_vl_instance(self
, vld
):
1958 for vlr
in self
._vlrs
:
1959 if vlr
.vld_msg
.id == vld
.id:
1960 self
._log
.debug("Found VLR %s for VLD %s in NSR %s",
1961 vlr
.id, vld
.id, self
.id)
1962 vlr
.state
= VlRecordState
.TERMINATE_PENDING
1963 yield from self
.update_state()
1966 yield from self
.nsm_plugin
.terminate_vl(vlr
)
1967 vlr
.state
= VlRecordState
.TERMINATED
1968 self
._vlrs
.remove(vlr
)
1970 except Exception as e
:
1971 err_msg
= "Error terminating VL for NSR {} and VLD {}: {}". \
1972 format(self
.id, vld
.id, e
)
1973 self
._log
.error(err_msg
)
1974 self
._log
.exception(e
)
1975 vlr
.state
= VlRecordState
.FAILED
1977 yield from self
.update_state()
1981 def create_vnfs(self
, config_xact
):
1983 This function creates VNFs for every VNF in the NSD
1984 associated with this NSR
1986 self
._log
.debug("Creating %u VNFs associated with this NS id %s",
1987 len(self
.nsd_msg
.constituent_vnfd
), self
.id)
1989 for const_vnfd
in self
.nsd_msg
.constituent_vnfd
:
1990 if not const_vnfd
.start_by_default
:
1991 self
._log
.debug("start_by_default set to False in constituent VNF (%s). Skipping start.",
1992 const_vnfd
.member_vnf_index
)
1995 vnfd_msg
= self
._get
_vnfd
(const_vnfd
.vnfd_id_ref
, config_xact
)
1996 cloud_account_name
,om_datacenter_name
= self
._get
_vnfd
_cloud
_account
(const_vnfd
.member_vnf_index
)
1997 if cloud_account_name
is None:
1998 cloud_account_name
= self
.cloud_account_name
1999 yield from self
.create_vnf_record(vnfd_msg
, const_vnfd
, cloud_account_name
, om_datacenter_name
)
2002 def get_placement_groups(self
, vnfd_msg
, const_vnfd
):
2003 placement_groups
= []
2004 for group
in self
.nsd_msg
.placement_groups
:
2005 for member_vnfd
in group
.member_vnfd
:
2006 if (member_vnfd
.vnfd_id_ref
== vnfd_msg
.id) and \
2007 (member_vnfd
.member_vnf_index_ref
== const_vnfd
.member_vnf_index
):
2008 group_info
= self
.resolve_placement_group_cloud_construct(group
)
2009 if group_info
is None:
2010 self
._log
.error("Could not resolve cloud-construct for placement group: %s", group
.name
)
2011 ### raise PlacementGroupError("Could not resolve cloud-construct for placement group: {}".format(group.name))
2013 self
._log
.info("Successfully resolved cloud construct for placement group: %s for VNF: %s (Member Index: %s)",
2016 const_vnfd
.member_vnf_index
)
2017 placement_groups
.append(group_info
)
2018 return placement_groups
2021 def create_vnf_record(self
, vnfd_msg
, const_vnfd
, cloud_account_name
, om_datacenter_name
, group_name
=None, group_instance_id
=None):
2022 # Fetch the VNFD associated with this VNF
2023 placement_groups
= self
.get_placement_groups(vnfd_msg
, const_vnfd
)
2024 self
._log
.info("Cloud Account for VNF %d is %s",const_vnfd
.member_vnf_index
,cloud_account_name
)
2025 self
._log
.info("Launching VNF: %s (Member Index: %s) in NSD plancement Groups: %s",
2027 const_vnfd
.member_vnf_index
,
2028 [ group
.name
for group
in placement_groups
])
2029 vnfr
= yield from VirtualNetworkFunctionRecord
.create_record(self
._dts
,
2042 restart_mode
=self
.restart_mode
,
2044 if vnfr
.id in self
._vnfrs
:
2045 err
= "VNF with VNFR id %s already in vnf list" % (vnfr
.id,)
2046 raise NetworkServiceRecordError(err
)
2048 self
._vnfrs
[vnfr
.id] = vnfr
2049 self
._nsm
.vnfrs
[vnfr
.id] = vnfr
2051 yield from vnfr
.set_config_status(NsrYang
.ConfigStates
.INIT
)
2053 self
._log
.debug("Added VNFR %s to NSM VNFR list with id %s",
2059 def create_param_pools(self
):
2060 for param_pool
in self
.nsd_msg
.parameter_pool
:
2061 self
._log
.debug("Found parameter pool %s in nsr id %s", param_pool
, self
.id)
2063 start_value
= param_pool
.range.start_value
2064 end_value
= param_pool
.range.end_value
2065 if end_value
< start_value
:
2066 raise NetworkServiceRecordError(
2067 "Parameter pool %s has invalid range (start: {}, end: {})".format(
2068 start_value
, end_value
2072 self
._param
_pools
[param_pool
.name
] = config_value_pool
.ParameterValuePool(
2075 range(start_value
, end_value
)
2079 def fetch_vnfr(self
, vnfr_path
):
2080 """ Fetch VNFR record """
2082 self
._log
.debug("Fetching VNFR with key %s while instantiating %s",
2084 res_iter
= yield from self
._dts
.query_read(vnfr_path
, rwdts
.XactFlag
.MERGE
)
2086 for ent
in res_iter
:
2087 res
= yield from ent
2093 def instantiate_vnfs(self
, vnfrs
):
2095 This function instantiates VNFs for every VNF in this Network Service
2097 self
._log
.debug("Instantiating %u VNFs in NS %s", len(vnfrs
), self
.id)
2099 self
._log
.debug("Instantiating VNF: %s in NS %s", vnf
, self
.id)
2100 yield from self
.nsm_plugin
.instantiate_vnf(self
, vnf
)
2103 def instantiate_vnffgs(self
):
2105 This function instantiates VNFFGs for every VNFFG in this Network Service
2107 self
._log
.debug("Instantiating %u VNFFGs in NS %s",
2108 len(self
.nsd_msg
.vnffgd
), self
.id)
2109 for _
, vnfr
in self
.vnfrs
.items():
2110 while vnfr
.state
in [VnfRecordState
.INSTANTIATION_PENDING
, VnfRecordState
.INIT
]:
2111 self
._log
.debug("Received vnfr state for vnfr %s is %s; retrying",vnfr
.name
,vnfr
.state
)
2112 yield from asyncio
.sleep(2, loop
=self
._loop
)
2113 if vnfr
.state
== VnfRecordState
.ACTIVE
:
2114 self
._log
.debug("Received vnfr state for vnfr %s is %s ",vnfr
.name
,vnfr
.state
)
2117 self
._log
.debug("Received vnfr state for vnfr %s is %s; failing vnffg creation",vnfr
.name
,vnfr
.state
)
2118 self
._vnffgr
_state
= VnffgRecordState
.FAILED
2121 self
._log
.info("Waiting for 90 seconds for VMs to come up")
2122 yield from asyncio
.sleep(90, loop
=self
._loop
)
2123 self
._log
.info("Starting VNFFG orchestration")
2124 for vnffg
in self
._vnffgrs
.values():
2125 self
._log
.debug("Instantiating VNFFG: %s in NS %s", vnffg
, self
.id)
2126 yield from vnffg
.instantiate()
2129 def instantiate_scaling_instances(self
, config_xact
):
2130 """ Instantiate any default scaling instances in this Network Service """
2131 for group
in self
._scaling
_groups
.values():
2132 for i
in range(group
.min_instance_count
):
2133 self
._log
.debug("Instantiating %s default scaling instance %s", group
, i
)
2134 yield from self
.create_scale_group_instance(
2135 group
.name
, i
, config_xact
, is_default
=True
2138 for group_msg
in self
._nsr
_cfg
_msg
.scaling_group
:
2139 if group_msg
.scaling_group_name_ref
!= group
.name
:
2142 for instance
in group_msg
.instance
:
2143 self
._log
.debug("Reloading %s scaling instance %s", group_msg
, instance
.id)
2144 yield from self
.create_scale_group_instance(
2145 group
.name
, instance
.id, config_xact
, is_default
=False
2148 def has_scaling_instances(self
):
2149 """ Return boolean indicating if the network service has default scaling groups """
2150 for group
in self
._scaling
_groups
.values():
2151 if group
.min_instance_count
> 0:
2154 for group_msg
in self
._nsr
_cfg
_msg
.scaling_group
:
2155 if len(group_msg
.instance
) > 0:
2162 """ This function publishes this NSR """
2163 self
._nsr
_msg
= self
.create_msg()
2165 self
._log
.debug("Publishing the NSR with xpath %s and nsr %s",
2169 if self
._debug
_running
:
2170 self
._log
.debug("Publishing NSR in RUNNING state!")
2173 with self
._dts
.transaction() as xact
:
2174 yield from self
._nsm
.nsr_handler
.update(xact
, self
.nsr_xpath
, self
._nsr
_msg
)
2175 if self
._op
_status
.state
== NetworkServiceRecordState
.RUNNING
:
2176 self
._debug
_running
= True
2179 def unpublish(self
, xact
):
2180 """ Unpublish this NSR object """
2181 self
._log
.debug("Unpublishing Network service id %s", self
.id)
2182 yield from self
._nsm
.nsr_handler
.delete(xact
, self
.nsr_xpath
)
2185 def nsr_xpath(self
):
2186 """ Returns the xpath associated with this NSR """
2188 "D,/nsr:ns-instance-opdata" +
2189 "/nsr:nsr[nsr:ns-instance-config-ref = '{}']"
2193 def xpath_from_nsr(nsr
):
2194 """ Returns the xpath associated with this NSR op data"""
2195 return (NetworkServiceRecord
.XPATH
+
2196 "[nsr:ns-instance-config-ref = '{}']").format(nsr
.id)
2199 def nsd_xpath(self
):
2200 """ Return NSD config xpath."""
2202 "C,/nsd:nsd-catalog/nsd:nsd[nsd:id = '{}']"
2203 ).format(self
.nsd_id
)
2206 def instantiate(self
, config_xact
):
2207 """"Instantiates a NetworkServiceRecord.
2209 This function instantiates a Network service
2210 which involves the following steps,
2212 * Instantiate every VL in NSD by sending create VLR request to DTS.
2213 * Instantiate every VNF in NSD by sending create VNF reuqest to DTS.
2214 * Publish the NSR details to DTS
2217 nsr: The NSR configuration request containing nsr-id and nsd
2218 config_xact: The configuration transaction which initiated the instatiation
2221 NetworkServiceRecordError if the NSR creation fails
2227 self
._log
.debug("Instantiating NS - %s xact - %s", self
, config_xact
)
2229 # Move the state to INIITALIZING
2230 self
.set_state(NetworkServiceRecordState
.INIT
)
2232 event_descr
= "Instantiation Request Received NSR Id:%s" % self
.id
2233 self
.record_event("instantiating", event_descr
)
2236 self
._nsd
= self
._nsr
_cfg
_msg
.nsd
2239 # Update ref count if nsd present in catalog
2240 self
._nsm
.get_nsd_ref(self
.nsd_id
)
2242 except NetworkServiceDescriptorError
:
2243 # This could be an NSD not in the nsd-catalog
2246 # Merge any config and initial config primitive values
2247 self
.config_store
.merge_nsd_config(self
.nsd_msg
)
2248 self
._log
.debug("Merged NSD: {}".format(self
.nsd_msg
.as_dict()))
2250 event_descr
= "Fetched NSD with descriptor id %s" % self
.nsd_id
2251 self
.record_event("nsd-fetched", event_descr
)
2253 if self
._nsd
is None:
2254 msg
= "Failed to fetch NSD with nsd-id [%s] for nsr-id %s"
2255 self
._log
.debug(msg
, self
.nsd_id
, self
.id)
2256 raise NetworkServiceRecordError(self
)
2258 self
._log
.debug("Got nsd result %s", self
._nsd
)
2260 # Substitute any input parameters
2261 self
.substitute_input_parameters(self
._nsd
, self
._nsr
_cfg
_msg
)
2264 yield from self
.create(config_xact
)
2266 # Publish the NSR to DTS
2267 yield from self
.publish()
2270 def do_instantiate():
2272 Instantiate network service
2274 self
._log
.debug("Instantiating VLs nsr id [%s] nsd id [%s]",
2275 self
.id, self
.nsd_id
)
2277 # instantiate the VLs
2278 event_descr
= ("Instantiating %s external VLs for NSR id %s" %
2279 (len(self
.nsd_msg
.vld
), self
.id))
2280 self
.record_event("begin-external-vls-instantiation", event_descr
)
2282 self
.set_state(NetworkServiceRecordState
.VL_INIT_PHASE
)
2284 yield from self
.instantiate_vls()
2286 # Publish the NSR to DTS
2287 yield from self
.publish()
2289 event_descr
= ("Finished instantiating %s external VLs for NSR id %s" %
2290 (len(self
.nsd_msg
.vld
), self
.id))
2291 self
.record_event("end-external-vls-instantiation", event_descr
)
2293 self
.set_state(NetworkServiceRecordState
.VNF_INIT_PHASE
)
2295 self
._log
.debug("Instantiating VNFs ...... nsr[%s], nsd[%s]",
2296 self
.id, self
.nsd_id
)
2298 # instantiate the VNFs
2299 event_descr
= ("Instantiating %s VNFS for NSR id %s" %
2300 (len(self
.nsd_msg
.constituent_vnfd
), self
.id))
2302 self
.record_event("begin-vnf-instantiation", event_descr
)
2304 yield from self
.instantiate_vnfs(self
._vnfrs
.values())
2306 self
._log
.debug(" Finished instantiating %d VNFs for NSR id %s",
2307 len(self
.nsd_msg
.constituent_vnfd
), self
.id)
2309 event_descr
= ("Finished instantiating %s VNFs for NSR id %s" %
2310 (len(self
.nsd_msg
.constituent_vnfd
), self
.id))
2311 self
.record_event("end-vnf-instantiation", event_descr
)
2313 if len(self
.vnffgrs
) > 0:
2314 #self.set_state(NetworkServiceRecordState.VNFFG_INIT_PHASE)
2315 event_descr
= ("Instantiating %s VNFFGS for NSR id %s" %
2316 (len(self
.nsd_msg
.vnffgd
), self
.id))
2318 self
.record_event("begin-vnffg-instantiation", event_descr
)
2320 yield from self
.instantiate_vnffgs()
2322 event_descr
= ("Finished instantiating %s VNFFGDs for NSR id %s" %
2323 (len(self
.nsd_msg
.vnffgd
), self
.id))
2324 self
.record_event("end-vnffg-instantiation", event_descr
)
2326 if self
.has_scaling_instances():
2327 event_descr
= ("Instantiating %s Scaling Groups for NSR id %s" %
2328 (len(self
._scaling
_groups
), self
.id))
2330 self
.record_event("begin-scaling-group-instantiation", event_descr
)
2331 yield from self
.instantiate_scaling_instances(config_xact
)
2332 self
.record_event("end-scaling-group-instantiation", event_descr
)
2334 # Give the plugin a chance to deploy the network service now that all
2335 # virtual links and vnfs are instantiated
2336 yield from self
.nsm_plugin
.deploy(self
._nsr
_msg
)
2338 self
._log
.debug("Publishing NSR...... nsr[%s], nsd[%s]",
2339 self
.id, self
.nsd_id
)
2341 # Publish the NSR to DTS
2342 yield from self
.publish()
2344 self
._log
.debug("Published NSR...... nsr[%s], nsd[%s]",
2345 self
.id, self
.nsd_id
)
2347 def on_instantiate_done(fut
):
2348 # If the do_instantiate fails, then publish NSR with failed result
2349 if fut
.exception() is not None:
2350 self
._log
.error("NSR instantiation failed for NSR id %s: %s", self
.id, str(fut
.exception()))
2351 self
._loop
.create_task(self
.instantiation_failed(failed_reason
=str(fut
.exception())))
2353 instantiate_task
= self
._loop
.create_task(do_instantiate())
2354 instantiate_task
.add_done_callback(on_instantiate_done
)
2357 def set_config_status(self
, status
, status_details
=None):
2358 if self
.config_status
!= status
:
2359 self
._log
.debug("Updating NSR {} status for {} to {}".
2360 format(self
.name
, self
.config_status
, status
))
2361 self
._config
_status
= status
2362 self
._config
_status
_details
= status_details
2364 if self
._config
_status
== NsrYang
.ConfigStates
.FAILED
:
2365 self
.record_event("config-failed", "NS configuration failed",
2366 evt_details
=self
._config
_status
_details
)
2368 yield from self
.publish()
2371 def is_active(self
):
2372 """ This NS is active """
2373 self
.set_state(NetworkServiceRecordState
.RUNNING
)
2377 # Publish the NSR to DTS
2378 self
._log
.debug("Network service %s is active ", self
.id)
2379 self
._is
_active
= True
2381 event_descr
= "NSR in running state for NSR id %s" % self
.id
2382 self
.record_event("ns-running", event_descr
)
2384 yield from self
.publish()
2387 def instantiation_failed(self
, failed_reason
=None):
2388 """ The NS instantiation failed"""
2389 self
._log
.error("Network service id:%s, name:%s instantiation failed",
2391 self
.set_state(NetworkServiceRecordState
.FAILED
)
2393 event_descr
= "Instantiation of NS %s failed" % self
.id
2394 self
.record_event("ns-failed", event_descr
, evt_details
=failed_reason
)
2396 # Publish the NSR to DTS
2397 yield from self
.publish()
2400 def terminate_vnfrs(self
, vnfrs
):
2401 """ Terminate VNFRS in this network service """
2402 self
._log
.debug("Terminating VNFs in network service %s", self
.id)
2404 yield from self
.nsm_plugin
.terminate_vnf(vnfr
)
2407 def terminate(self
):
2408 """ Terminate a NetworkServiceRecord."""
2409 def terminate_vnffgrs():
2410 """ Terminate VNFFGRS in this network service """
2411 self
._log
.debug("Terminating VNFFGRs in network service %s", self
.id)
2412 for vnffgr
in self
.vnffgrs
.values():
2413 yield from vnffgr
.terminate()
2415 def terminate_vlrs():
2416 """ Terminate VLRs in this netork service """
2417 self
._log
.debug("Terminating VLs in network service %s", self
.id)
2418 for vlr
in self
.vlrs
:
2419 yield from self
.nsm_plugin
.terminate_vl(vlr
)
2420 vlr
.state
= VlRecordState
.TERMINATED
2421 if vlr
.id in self
.vlr_uptime_tasks
:
2422 self
.vlr_uptime_tasks
[vlr
.id].cancel()
2424 self
._log
.debug("Terminating network service id %s", self
.id)
2426 # Move the state to TERMINATE
2427 self
.set_state(NetworkServiceRecordState
.TERMINATE
)
2428 event_descr
= "Terminate being processed for NS Id:%s" % self
.id
2429 self
.record_event("terminate", event_descr
)
2431 # Move the state to VNF_TERMINATE_PHASE
2432 self
._log
.debug("Terminating VNFFGs in NS ID: %s", self
.id)
2433 self
.set_state(NetworkServiceRecordState
.VNFFG_TERMINATE_PHASE
)
2434 event_descr
= "Terminating VNFFGS in NS Id:%s" % self
.id
2435 self
.record_event("terminating-vnffgss", event_descr
)
2436 yield from terminate_vnffgrs()
2438 # Move the state to VNF_TERMINATE_PHASE
2439 self
.set_state(NetworkServiceRecordState
.VNF_TERMINATE_PHASE
)
2440 event_descr
= "Terminating VNFS in NS Id:%s" % self
.id
2441 self
.record_event("terminating-vnfs", event_descr
)
2442 yield from self
.terminate_vnfrs(self
.vnfrs
.values())
2444 # Move the state to VL_TERMINATE_PHASE
2445 self
.set_state(NetworkServiceRecordState
.VL_TERMINATE_PHASE
)
2446 event_descr
= "Terminating VLs in NS Id:%s" % self
.id
2447 self
.record_event("terminating-vls", event_descr
)
2448 yield from terminate_vlrs()
2450 yield from self
.nsm_plugin
.terminate_ns(self
)
2452 # Move the state to TERMINATED
2453 self
.set_state(NetworkServiceRecordState
.TERMINATED
)
2454 event_descr
= "Terminated NS Id:%s" % self
.id
2455 self
.record_event("terminated", event_descr
)
2458 """"Enable a NetworkServiceRecord."""
2462 """"Disable a NetworkServiceRecord."""
2465 def map_config_status(self
):
2466 self
._log
.debug("Config status for ns {} is {}".
2467 format(self
.name
, self
._config
_status
))
2468 if self
._config
_status
== NsrYang
.ConfigStates
.CONFIGURING
:
2469 return 'configuring'
2470 if self
._config
_status
== NsrYang
.ConfigStates
.FAILED
:
2474 def vl_phase_completed(self
):
2475 """ Are VLs created in this NS?"""
2476 return self
._vl
_phase
_completed
2478 def vnf_phase_completed(self
):
2479 """ Are VLs created in this NS?"""
2480 return self
._vnf
_phase
_completed
2482 def create_msg(self
):
2483 """ The network serice record as a message """
2484 nsr_dict
= {"ns_instance_config_ref": self
.id}
2485 nsr
= RwNsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr
.from_dict(nsr_dict
)
2486 #nsr.cloud_account = self.cloud_account_name
2487 nsr
.sdn_account
= self
._sdn
_account
_name
2488 nsr
.name_ref
= self
.name
2489 nsr
.nsd_ref
= self
.nsd_id
2490 nsr
.nsd_name_ref
= self
.nsd_msg
.name
2491 nsr
.operational_events
= self
._op
_status
.msg
2492 nsr
.operational_status
= self
._op
_status
.yang_str()
2493 nsr
.config_status
= self
.map_config_status()
2494 nsr
.config_status_details
= self
._config
_status
_details
2495 nsr
.create_time
= self
._create
_time
2496 nsr
.uptime
= int(time
.time()) - self
._create
_time
2498 for cfg_prim
in self
.nsd_msg
.service_primitive
:
2499 cfg_prim
= NsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr_ServicePrimitive
.from_dict(
2501 nsr
.service_primitive
.append(cfg_prim
)
2503 for init_cfg
in self
.nsd_msg
.initial_config_primitive
:
2504 prim
= NsrYang
.NsrInitialConfigPrimitive
.from_dict(
2506 nsr
.initial_config_primitive
.append(prim
)
2508 if self
.vl_phase_completed():
2509 for vlr
in self
.vlrs
:
2510 nsr
.vlr
.append(vlr
.create_nsr_vlr_msg(self
.vnfrs
.values()))
2512 if self
.vnf_phase_completed():
2513 for vnfr_id
in self
.vnfrs
:
2514 nsr
.constituent_vnfr_ref
.append(self
.vnfrs
[vnfr_id
].const_vnfr_msg
)
2515 for vnffgr
in self
.vnffgrs
.values():
2516 nsr
.vnffgr
.append(vnffgr
.fetch_vnffgr())
2517 for scaling_group
in self
._scaling
_groups
.values():
2518 nsr
.scaling_group_record
.append(scaling_group
.create_record_msg())
2522 def all_vnfs_active(self
):
2523 """ Are all VNFS in this NS active? """
2524 for _
, vnfr
in self
.vnfrs
.items():
2525 if vnfr
.active
is not True:
2530 def update_state(self
):
2531 """ Re-evaluate this NS's state """
2532 curr_state
= self
._op
_status
.state
2534 if curr_state
== NetworkServiceRecordState
.TERMINATED
:
2535 self
._log
.debug("NS (%s) in terminated state, not updating state", self
.id)
2538 new_state
= NetworkServiceRecordState
.RUNNING
2539 self
._log
.info("Received update_state for nsr: %s, curr-state: %s",
2540 self
.id, curr_state
)
2542 # Check all the VNFRs are present
2543 for _
, vnfr
in self
.vnfrs
.items():
2544 if vnfr
.state
in [VnfRecordState
.ACTIVE
, VnfRecordState
.TERMINATED
]:
2546 elif vnfr
.state
== VnfRecordState
.FAILED
:
2547 if vnfr
._prev
_state
!= vnfr
.state
:
2548 event_descr
= "Instantiation of VNF %s failed" % vnfr
.id
2549 event_error_details
= vnfr
.state_failed_reason
2550 self
.record_event("vnf-failed", event_descr
, evt_details
=event_error_details
)
2551 vnfr
.set_state(VnfRecordState
.FAILED
)
2553 self
._log
.info("VNF state did not change, curr=%s, prev=%s",
2554 vnfr
.state
, vnfr
._prev
_state
)
2555 new_state
= NetworkServiceRecordState
.FAILED
2558 self
._log
.info("VNF %s in NSR %s is still not active; current state is: %s",
2559 vnfr
.id, self
.id, vnfr
.state
)
2560 new_state
= curr_state
2562 # If new state is RUNNING; check all VLs
2563 if new_state
== NetworkServiceRecordState
.RUNNING
:
2564 for vl
in self
.vlrs
:
2566 if vl
.state
in [VlRecordState
.ACTIVE
, VlRecordState
.TERMINATED
]:
2568 elif vl
.state
== VlRecordState
.FAILED
:
2569 if vl
.prev_state
!= vl
.state
:
2570 event_descr
= "Instantiation of VL %s failed" % vl
.id
2571 event_error_details
= vl
.state_failed_reason
2572 self
.record_event("vl-failed", event_descr
, evt_details
=event_error_details
)
2573 vl
.prev_state
= vl
.state
2575 self
._log
.debug("VL %s already in failed state")
2577 if vl
.state
in [VlRecordState
.INSTANTIATION_PENDING
, VlRecordState
.INIT
]:
2578 new_state
= NetworkServiceRecordState
.VL_INSTANTIATE
2581 if vl
.state
in [VlRecordState
.TERMINATE_PENDING
]:
2582 new_state
= NetworkServiceRecordState
.VL_TERMINATE
2585 # If new state is RUNNING; check VNFFGRs are also active
2586 if new_state
== NetworkServiceRecordState
.RUNNING
:
2587 for _
, vnffgr
in self
.vnffgrs
.items():
2588 self
._log
.info("Checking vnffgr state for nsr %s is: %s",
2589 self
.id, vnffgr
.state
)
2590 if vnffgr
.state
== VnffgRecordState
.ACTIVE
:
2592 elif vnffgr
.state
== VnffgRecordState
.FAILED
:
2593 event_descr
= "Instantiation of VNFFGR %s failed" % vnffgr
.id
2594 self
.record_event("vnffg-failed", event_descr
)
2595 new_state
= NetworkServiceRecordState
.FAILED
2598 self
._log
.info("VNFFGR %s in NSR %s is still not active; current state is: %s",
2599 vnffgr
.id, self
.id, vnffgr
.state
)
2600 new_state
= curr_state
2602 # Update all the scaling group instance operational status to
2603 # reflect the state of all VNFR within that instance
2604 yield from self
._update
_scale
_group
_instances
_status
()
2606 for _
, group
in self
._scaling
_groups
.items():
2607 if group
.state
== scale_group
.ScaleGroupState
.SCALING_OUT
:
2608 new_state
= NetworkServiceRecordState
.SCALING_OUT
2610 elif group
.state
== scale_group
.ScaleGroupState
.SCALING_IN
:
2611 new_state
= NetworkServiceRecordState
.SCALING_IN
2614 if new_state
!= curr_state
:
2615 self
._log
.debug("Changing state of Network service %s from %s to %s",
2616 self
.id, curr_state
, new_state
)
2617 if new_state
== NetworkServiceRecordState
.RUNNING
:
2618 yield from self
.is_active()
2619 elif new_state
== NetworkServiceRecordState
.FAILED
:
2620 # If the NS is already active and we entered scaling_in, scaling_out,
2621 # do not mark the NS as failing if scaling operation failed.
2622 if curr_state
in [NetworkServiceRecordState
.SCALING_OUT
,
2623 NetworkServiceRecordState
.SCALING_IN
] and self
._is
_active
:
2624 new_state
= NetworkServiceRecordState
.RUNNING
2625 self
.set_state(new_state
)
2627 yield from self
.instantiation_failed()
2629 self
.set_state(new_state
)
2631 yield from self
.publish()
2634 class InputParameterSubstitution(object):
2636 This class is responsible for substituting input parameters into an NSD.
2639 def __init__(self
, log
):
2640 """Create an instance of InputParameterSubstitution
2643 log - a logger for this object to use
2648 def __call__(self
, nsd
, nsr_config
):
2649 """Substitutes input parameters from the NSR config into the NSD
2651 This call modifies the provided NSD with the input parameters that are
2652 contained in the NSR config.
2655 nsd - a GI NSD object
2656 nsr_config - a GI NSR config object
2659 if nsd
is None or nsr_config
is None:
2662 # Create a lookup of the xpath elements that this descriptor allows
2664 optional_input_parameters
= set()
2665 for input_parameter
in nsd
.input_parameter_xpath
:
2666 optional_input_parameters
.add(input_parameter
.xpath
)
2668 # Apply the input parameters to the descriptor
2669 if nsr_config
.input_parameter
:
2670 for param
in nsr_config
.input_parameter
:
2671 if param
.xpath
not in optional_input_parameters
:
2672 msg
= "tried to set an invalid input parameter ({})"
2673 self
.log
.error(msg
.format(param
.xpath
))
2677 "input-parameter:{} = {}".format(
2684 xpath
.setxattr(nsd
, param
.xpath
, param
.value
)
2686 except Exception as e
:
2687 self
.log
.exception(e
)
2690 class NetworkServiceDescriptor(object):
2692 Network service descriptor class
2695 def __init__(self
, dts
, log
, loop
, nsd
, nsm
):
2707 """ Returns nsd id """
2712 """ Returns name of nsd """
2713 return self
._nsd
.name
2716 def ref_count(self
):
2717 """ Returns reference count"""
2718 return self
._ref
_count
2721 """ Returns whether nsd is in use or not """
2722 return True if self
.ref_count
> 0 else False
2725 """ Take a reference on this object """
2726 self
._ref
_count
+= 1
2729 """ Release reference on this object """
2730 if self
.ref_count
< 1:
2731 msg
= ("Unref on a NSD object - nsd id %s, ref_count = %s" %
2732 (self
.id, self
.ref_count
))
2733 self
._log
.critical(msg
)
2734 raise NetworkServiceDescriptorError(msg
)
2735 self
._ref
_count
-= 1
2739 """ Return the message associated with this NetworkServiceDescriptor"""
2743 def path_for_id(nsd_id
):
2744 """ Return path for the passed nsd_id"""
2745 return "C,/nsd:nsd-catalog/nsd:nsd[nsd:id = '{}'".format(nsd_id
)
2748 """ Return the message associated with this NetworkServiceDescriptor"""
2749 return NetworkServiceDescriptor
.path_for_id(self
.id)
2751 def update(self
, nsd
):
2752 """ Update the NSD descriptor """
2756 class NsdDtsHandler(object):
2757 """ The network service descriptor DTS handler """
2758 XPATH
= "C,/nsd:nsd-catalog/nsd:nsd"
2760 def __init__(self
, dts
, log
, loop
, nsm
):
2770 """ Return registration handle """
2775 """ Register for Nsd create/update/delete/read requests from dts """
2777 def on_apply(dts
, acg
, xact
, action
, scratch
):
2778 """Apply the configuration"""
2779 is_recovery
= xact
.xact
is None and action
== rwdts
.AppconfAction
.INSTALL
2780 self
._log
.debug("Got nsd apply cfg (xact:%s) (action:%s)",
2782 # Create/Update an NSD record
2783 for cfg
in self
._regh
.get_xact_elements(xact
):
2784 # Only interested in those NSD cfgs whose ID was received in prepare callback
2785 if cfg
.id in scratch
.get('nsds', []) or is_recovery
:
2786 self
._nsm
.update_nsd(cfg
)
2788 scratch
.pop('nsds', None)
2790 return RwTypes
.RwStatus
.SUCCESS
2793 def delete_nsd_libs(nsd_id
):
2794 """ Remove any files uploaded with NSD and stored under $RIFT_ARTIFACTS/libs/<id> """
2796 rift_artifacts_dir
= os
.environ
['RIFT_ARTIFACTS']
2797 nsd_dir
= os
.path
.join(rift_artifacts_dir
, 'launchpad/libs', nsd_id
)
2799 if os
.path
.exists (nsd_dir
):
2800 shutil
.rmtree(nsd_dir
, ignore_errors
=True)
2801 except Exception as e
:
2802 self
._log
.error("Exception in cleaning up NSD libs {}: {}".
2804 self
._log
.excpetion(e
)
2807 def on_prepare(dts
, acg
, xact
, xact_info
, ks_path
, msg
, scratch
):
2808 """ Prepare callback from DTS for NSD config """
2810 self
._log
.info("Got nsd prepare - config received nsd id %s, msg %s",
2813 fref
= ProtobufC
.FieldReference
.alloc()
2814 fref
.goto_whole_message(msg
.to_pbcm())
2816 if fref
.is_field_deleted():
2817 # Delete an NSD record
2818 self
._log
.debug("Deleting NSD with id %s", msg
.id)
2819 if self
._nsm
.nsd_in_use(msg
.id):
2820 self
._log
.debug("Cannot delete NSD in use - %s", msg
.id)
2821 err
= "Cannot delete an NSD in use - %s" % msg
.id
2822 raise NetworkServiceDescriptorRefCountExists(err
)
2824 yield from delete_nsd_libs(msg
.id)
2825 self
._nsm
.delete_nsd(msg
.id)
2827 # Add this NSD to scratch to create/update in apply callback
2828 nsds
= scratch
.setdefault('nsds', [])
2830 # acg._scratch['nsds'].append(msg.id)
2832 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
2835 "Registering for NSD config using xpath: %s",
2836 NsdDtsHandler
.XPATH
,
2839 acg_hdl
= rift
.tasklets
.AppConfGroup
.Handler(on_apply
=on_apply
)
2840 with self
._dts
.appconf_group_create(handler
=acg_hdl
) as acg
:
2841 # Need a list in scratch to store NSDs to create/update later
2842 # acg._scratch['nsds'] = list()
2843 self
._regh
= acg
.register(
2844 xpath
=NsdDtsHandler
.XPATH
,
2845 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY | rwdts
.Flag
.CACHE
,
2846 on_prepare
=on_prepare
)
2849 class VnfdDtsHandler(object):
2850 """ DTS handler for VNFD config changes """
2851 XPATH
= "C,/vnfd:vnfd-catalog/vnfd:vnfd"
2853 def __init__(self
, dts
, log
, loop
, nsm
):
2862 """ DTS registration handle """
2867 """ Register for VNFD configuration"""
2870 def on_apply(dts
, acg
, xact
, action
, scratch
):
2871 """Apply the configuration"""
2872 self
._log
.debug("Got NSM VNFD apply (xact: %s) (action: %s)(scr: %s)",
2873 xact
, action
, scratch
)
2875 # Create/Update a VNFD record
2876 for cfg
in self
._regh
.get_xact_elements(xact
):
2877 # Only interested in those VNFD cfgs whose ID was received in prepare callback
2878 if cfg
.id in scratch
.get('vnfds', []):
2879 self
._nsm
.update_vnfd(cfg
)
2881 for cfg
in self
._regh
.elements
:
2882 if cfg
.id in scratch
.get('deleted_vnfds', []):
2883 yield from self
._nsm
.delete_vnfd(cfg
.id)
2885 scratch
.pop('vnfds', None)
2886 scratch
.pop('deleted_vnfds', None)
2889 def on_prepare(dts
, acg
, xact
, xact_info
, ks_path
, msg
, scratch
):
2890 """ on prepare callback """
2891 self
._log
.debug("Got on prepare for VNFD (path: %s) (action: %s) (msg: %s)",
2892 ks_path
.to_xpath(RwNsmYang
.get_schema()), xact_info
.query_action
, msg
)
2894 fref
= ProtobufC
.FieldReference
.alloc()
2895 fref
.goto_whole_message(msg
.to_pbcm())
2897 # Handle deletes in prepare_callback, but adds/updates in apply_callback
2898 if fref
.is_field_deleted():
2899 self
._log
.debug("Adding msg to deleted field")
2900 deleted_vnfds
= scratch
.setdefault('deleted_vnfds', [])
2901 deleted_vnfds
.append(msg
.id)
2903 # Add this VNFD to scratch to create/update in apply callback
2904 vnfds
= scratch
.setdefault('vnfds', [])
2905 vnfds
.append(msg
.id)
2907 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
2910 "Registering for VNFD config using xpath: %s",
2911 VnfdDtsHandler
.XPATH
,
2913 acg_hdl
= rift
.tasklets
.AppConfGroup
.Handler(on_apply
=on_apply
)
2914 with self
._dts
.appconf_group_create(handler
=acg_hdl
) as acg
:
2915 # Need a list in scratch to store VNFDs to create/update later
2916 # acg._scratch['vnfds'] = list()
2917 # acg._scratch['deleted_vnfds'] = list()
2918 self
._regh
= acg
.register(
2919 xpath
=VnfdDtsHandler
.XPATH
,
2920 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY
,
2921 on_prepare
=on_prepare
)
2923 class NsrRpcDtsHandler(object):
2924 """ The network service instantiation RPC DTS handler """
2925 EXEC_NSR_CONF_XPATH
= "I,/nsr:start-network-service"
2926 EXEC_NSR_CONF_O_XPATH
= "O,/nsr:start-network-service"
2927 NETCONF_IP_ADDRESS
= "127.0.0.1"
2929 RESTCONF_PORT
= 8888
2930 NETCONF_USER
= "admin"
2931 NETCONF_PW
= "admin"
2932 REST_BASE_V2_URL
= 'https://{}:{}/v2/api/'.format("127.0.0.1",8888)
2934 def __init__(self
, dts
, log
, loop
, nsm
):
2941 self
._ns
_regh
= None
2943 self
._manager
= None
2944 self
._nsr
_config
_url
= NsrRpcDtsHandler
.REST_BASE_V2_URL
+ 'config/ns-instance-config'
2946 self
._model
= RwYang
.Model
.create_libncx()
2947 self
._model
.load_schema_ypbc(RwNsrYang
.get_schema())
2951 """ Return the NS manager instance """
2955 def wrap_netconf_config_xml(xml
):
2956 xml
= '<config xmlns:xc="urn:ietf:params:xml:ns:netconf:base:1.0">{}</config>'.format(xml
)
2960 def _connect(self
, timeout_secs
=240):
2962 start_time
= time
.time()
2963 while (time
.time() - start_time
) < timeout_secs
:
2966 self
._log
.debug("Attemping NsmTasklet netconf connection.")
2968 manager
= yield from ncclient
.asyncio_manager
.asyncio_connect(
2970 host
=NsrRpcDtsHandler
.NETCONF_IP_ADDRESS
,
2971 port
=NsrRpcDtsHandler
.NETCONF_PORT
,
2972 username
=NsrRpcDtsHandler
.NETCONF_USER
,
2973 password
=NsrRpcDtsHandler
.NETCONF_PW
,
2975 look_for_keys
=False,
2976 hostkey_verify
=False,
2981 except ncclient
.transport
.errors
.SSHError
as e
:
2982 self
._log
.warning("Netconf connection to launchpad %s failed: %s",
2983 NsrRpcDtsHandler
.NETCONF_IP_ADDRESS
, str(e
))
2985 yield from asyncio
.sleep(5, loop
=self
._loop
)
2987 raise NsrInstantiationFailed("Failed to connect to Launchpad within %s seconds" %
2990 def _apply_ns_instance_config(self
,payload_dict
):
2991 #self._log.debug("At apply NS instance config with payload %s",payload_dict)
2992 req_hdr
= {'accept':'application/vnd.yang.data+json','content-type':'application/vnd.yang.data+json'}
2993 response
=requests
.post(self
._nsr
_config
_url
, headers
=req_hdr
, auth
=('admin', 'admin'),data
=payload_dict
,verify
=False)
2998 """ Register for NS monitoring read from dts """
3000 def on_ns_config_prepare(xact_info
, action
, ks_path
, msg
):
3001 """ prepare callback from dts start-network-service"""
3002 assert action
== rwdts
.QueryAction
.RPC
3004 rpc_op
= NsrYang
.YangOutput_Nsr_StartNetworkService
.from_dict({
3005 "nsr_id":str(uuid
.uuid4())
3008 if not ('name' in rpc_ip
and 'nsd_ref' in rpc_ip
and ('cloud_account' in rpc_ip
or 'om_datacenter' in rpc_ip
)):
3009 self
._log
.error("Mandatory parameters name or nsd_ref or cloud account not found in start-network-service {}".format(rpc_ip
))
3012 self
._log
.debug("start-network-service RPC input: {}".format(rpc_ip
))
3015 # Add used value to the pool
3016 self
._log
.debug("RPC output: {}".format(rpc_op
))
3018 nsd_copy
= self
.nsm
.get_nsd(rpc_ip
.nsd_ref
)
3020 #if not self._manager:
3021 # self._manager = yield from self._connect()
3023 self
._log
.debug("Configuring ns-instance-config with name %s nsd-ref: %s",
3024 rpc_ip
.name
, rpc_ip
.nsd_ref
)
3026 ns_instance_config_dict
= {"id":rpc_op
.nsr_id
, "admin_status":"ENABLED"}
3027 ns_instance_config_copy_dict
= {k
:v
for k
, v
in rpc_ip
.as_dict().items()
3028 if k
in RwNsrYang
.YangData_Nsr_NsInstanceConfig_Nsr().fields
}
3029 ns_instance_config_dict
.update(ns_instance_config_copy_dict
)
3031 ns_instance_config
= RwNsrYang
.YangData_Nsr_NsInstanceConfig_Nsr
.from_dict(ns_instance_config_dict
)
3032 ns_instance_config
.nsd
= NsrYang
.YangData_Nsr_NsInstanceConfig_Nsr_Nsd()
3033 ns_instance_config
.nsd
.from_dict(nsd_copy
.msg
.as_dict())
3035 payload_dict
= ns_instance_config
.to_json(self
._model
)
3036 #xml = ns_instance_config.to_xml_v2(self._model)
3037 #netconf_xml = self.wrap_netconf_config_xml(xml)
3039 #self._log.debug("Sending configure ns-instance-config xml to %s: %s",
3040 # netconf_xml, NsrRpcDtsHandler.NETCONF_IP_ADDRESS)
3041 self
._log
.debug("Sending configure ns-instance-config json to %s: %s",
3042 self
._nsr
_config
_url
,ns_instance_config
)
3044 #response = yield from self._manager.edit_config(
3046 # config=netconf_xml,
3048 response
= yield from self
._loop
.run_in_executor(
3050 self
._apply
_ns
_instance
_config
,
3053 response
.raise_for_status()
3054 self
._log
.debug("Received edit config response: %s", response
.json())
3056 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
,
3057 NsrRpcDtsHandler
.EXEC_NSR_CONF_O_XPATH
,
3059 except Exception as e
:
3060 self
._log
.error("Exception processing the "
3061 "start-network-service: {}".format(e
))
3062 self
._log
.exception(e
)
3063 xact_info
.respond_xpath(rwdts
.XactRspCode
.NACK
,
3064 NsrRpcDtsHandler
.EXEC_NSR_CONF_O_XPATH
)
3067 hdl_ns
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_ns_config_prepare
,)
3069 with self
._dts
.group_create() as group
:
3070 self
._ns
_regh
= group
.register(xpath
=NsrRpcDtsHandler
.EXEC_NSR_CONF_XPATH
,
3072 flags
=rwdts
.Flag
.PUBLISHER
,
3076 class NsrDtsHandler(object):
3077 """ The network service DTS handler """
3078 NSR_XPATH
= "C,/nsr:ns-instance-config/nsr:nsr"
3079 SCALE_INSTANCE_XPATH
= "C,/nsr:ns-instance-config/nsr:nsr/nsr:scaling-group/nsr:instance"
3080 KEY_PAIR_XPATH
= "C,/nsr:key-pair"
3082 def __init__(self
, dts
, log
, loop
, nsm
):
3088 self
._nsr
_regh
= None
3089 self
._scale
_regh
= None
3090 self
._key
_pair
_regh
= None
3094 """ Return the NS manager instance """
3099 """ Register for Nsr create/update/delete/read requests from dts """
3101 def nsr_id_from_keyspec(ks
):
3102 nsr_path_entry
= NsrYang
.YangData_Nsr_NsInstanceConfig_Nsr
.schema().keyspec_to_entry(ks
)
3103 nsr_id
= nsr_path_entry
.key00
.id
3106 def group_name_from_keyspec(ks
):
3107 group_path_entry
= NsrYang
.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup
.schema().keyspec_to_entry(ks
)
3108 group_name
= group_path_entry
.key00
.scaling_group_name_ref
3111 def is_instance_in_reg_elements(nsr_id
, group_name
, instance_id
):
3112 """ Return boolean indicating if scaling group instance was already commited previously.
3114 By looking at the existing elements in this registration handle (elements not part
3115 of this current xact), we can tell if the instance was configured previously without
3116 keeping any application state.
3118 for instance_cfg
, keyspec
in self
._nsr
_regh
.get_xact_elements(include_keyspec
=True):
3119 elem_nsr_id
= nsr_id_from_keyspec(keyspec
)
3120 elem_group_name
= group_name_from_keyspec(keyspec
)
3122 if elem_nsr_id
!= nsr_id
or group_name
!= elem_group_name
:
3125 if instance_cfg
.id == instance_id
:
3130 def get_scale_group_instance_delta(nsr_id
, group_name
, xact
):
3131 delta
= {"added": [], "deleted": []}
3132 for instance_cfg
, keyspec
in self
._scale
_regh
.get_xact_elements(xact
, include_keyspec
=True):
3133 elem_nsr_id
= nsr_id_from_keyspec(keyspec
)
3134 if elem_nsr_id
!= nsr_id
:
3137 elem_group_name
= group_name_from_keyspec(keyspec
)
3138 if elem_group_name
!= group_name
:
3141 delta
["added"].append(instance_cfg
.id)
3143 for instance_cfg
, keyspec
in self
._scale
_regh
.get_xact_elements(include_keyspec
=True):
3144 elem_nsr_id
= nsr_id_from_keyspec(keyspec
)
3145 if elem_nsr_id
!= nsr_id
:
3148 elem_group_name
= group_name_from_keyspec(keyspec
)
3149 if elem_group_name
!= group_name
:
3152 if instance_cfg
.id in delta
["added"]:
3153 delta
["added"].remove(instance_cfg
.id)
3155 delta
["deleted"].append(instance_cfg
.id)
3160 def update_nsr_nsd(nsr_id
, xact
, scratch
):
3163 def get_nsr_vl_delta(nsr_id
, xact
, scratch
):
3164 delta
= {"added": [], "deleted": []}
3165 for instance_cfg
, keyspec
in self
._nsr
_regh
.get_xact_elements(xact
, include_keyspec
=True):
3166 elem_nsr_id
= nsr_id_from_keyspec(keyspec
)
3167 if elem_nsr_id
!= nsr_id
:
3170 if 'vld' in instance_cfg
.nsd
:
3171 for vld
in instance_cfg
.nsd
.vld
:
3172 delta
["added"].append(vld
)
3174 for instance_cfg
, keyspec
in self
._nsr
_regh
.get_xact_elements(include_keyspec
=True):
3175 self
._log
.debug("NSR update: %s", instance_cfg
)
3176 elem_nsr_id
= nsr_id_from_keyspec(keyspec
)
3177 if elem_nsr_id
!= nsr_id
:
3180 if 'vld' in instance_cfg
.nsd
:
3181 for vld
in instance_cfg
.nsd
.vld
:
3182 if vld
in delta
["added"]:
3183 delta
["added"].remove(vld
)
3185 delta
["deleted"].append(vld
)
3189 vl_delta
= yield from get_nsr_vl_delta(nsr_id
, xact
, scratch
)
3190 self
._log
.debug("Got NSR:%s VL instance delta: %s", nsr_id
, vl_delta
)
3192 for vld
in vl_delta
["added"]:
3193 yield from self
._nsm
.nsr_instantiate_vl(nsr_id
, vld
)
3195 for vld
in vl_delta
["deleted"]:
3196 yield from self
._nsm
.nsr_terminate_vl(nsr_id
, vld
)
3198 def get_add_delete_update_cfgs(dts_member_reg
, xact
, key_name
, scratch
):
3199 # Unfortunately, it is currently difficult to figure out what has exactly
3200 # changed in this xact without Pbdelta support (RIFT-4916)
3201 # As a workaround, we can fetch the pre and post xact elements and
3202 # perform a comparison to figure out adds/deletes/updates
3203 xact_cfgs
= list(dts_member_reg
.get_xact_elements(xact
))
3204 curr_cfgs
= list(dts_member_reg
.elements
)
3206 xact_key_map
= {getattr(cfg
, key_name
): cfg
for cfg
in xact_cfgs
}
3207 curr_key_map
= {getattr(cfg
, key_name
): cfg
for cfg
in curr_cfgs
}
3210 added_keys
= set(xact_key_map
) - set(curr_key_map
)
3211 added_cfgs
= [xact_key_map
[key
] for key
in added_keys
]
3214 deleted_keys
= set(curr_key_map
) - set(xact_key_map
)
3215 deleted_cfgs
= [curr_key_map
[key
] for key
in deleted_keys
]
3218 updated_keys
= set(curr_key_map
) & set(xact_key_map
)
3219 updated_cfgs
= [xact_key_map
[key
] for key
in updated_keys
3220 if xact_key_map
[key
] != curr_key_map
[key
]]
3222 return added_cfgs
, deleted_cfgs
, updated_cfgs
3224 def get_nsr_key_pairs(dts_member_reg
, xact
):
3226 for instance_cfg
, keyspec
in dts_member_reg
.get_xact_elements(xact
, include_keyspec
=True):
3227 self
._log
.debug("Key pair received is {} KS: {}".format(instance_cfg
, keyspec
))
3228 xpath
= keyspec
.to_xpath(RwNsrYang
.get_schema())
3229 key_pairs
[instance_cfg
.name
] = instance_cfg
3232 def on_apply(dts
, acg
, xact
, action
, scratch
):
3233 """Apply the configuration"""
3234 self
._log
.debug("Got nsr apply (xact: %s) (action: %s)(scr: %s)",
3235 xact
, action
, scratch
)
3237 def handle_create_nsr(msg
, key_pairs
=None, restart_mode
=False):
3238 # Handle create nsr requests """
3239 # Do some validations
3240 if not msg
.has_field("nsd"):
3241 err
= "NSD not provided"
3242 self
._log
.error(err
)
3243 raise NetworkServiceRecordError(err
)
3245 self
._log
.debug("Creating NetworkServiceRecord %s from nsr config %s",
3246 msg
.id, msg
.as_dict())
3247 nsr
= self
.nsm
.create_nsr(msg
, key_pairs
=key_pairs
, restart_mode
=restart_mode
)
3250 def handle_delete_nsr(msg
):
3252 def delete_instantiation(ns_id
):
3253 """ Delete instantiation """
3254 with self
._dts
.transaction() as xact
:
3255 yield from self
._nsm
.terminate_ns(ns_id
, xact
)
3257 # Handle delete NSR requests
3258 self
._log
.info("Delete req for NSR Id: %s received", msg
.id)
3259 # Terminate the NSR instance
3260 nsr
= self
._nsm
.get_ns_by_nsr_id(msg
.id)
3262 nsr
.set_state(NetworkServiceRecordState
.TERMINATE_RCVD
)
3263 event_descr
= "Terminate rcvd for NS Id:%s" % msg
.id
3264 nsr
.record_event("terminate-rcvd", event_descr
)
3266 self
._loop
.create_task(delete_instantiation(msg
.id))
3269 def begin_instantiation(nsr
):
3270 # Begin instantiation
3271 self
._log
.info("Beginning NS instantiation: %s", nsr
.id)
3272 yield from self
._nsm
.instantiate_ns(nsr
.id, xact
)
3274 self
._log
.debug("Got nsr apply (xact: %s) (action: %s)(scr: %s)",
3275 xact
, action
, scratch
)
3277 if action
== rwdts
.AppconfAction
.INSTALL
and xact
.id is None:
3279 for element
in self
._key
_pair
_regh
.elements
:
3280 key_pairs
.append(element
)
3281 for element
in self
._nsr
_regh
.elements
:
3282 nsr
= handle_create_nsr(element
, key_pairs
, restart_mode
=True)
3283 self
._loop
.create_task(begin_instantiation(nsr
))
3286 (added_msgs
, deleted_msgs
, updated_msgs
) = get_add_delete_update_cfgs(self
._nsr
_regh
,
3290 self
._log
.debug("Added: %s, Deleted: %s, Updated: %s", added_msgs
,
3291 deleted_msgs
, updated_msgs
)
3293 for msg
in added_msgs
:
3294 if msg
.id not in self
._nsm
.nsrs
:
3295 self
._log
.info("Create NSR received in on_apply to instantiate NS:%s", msg
.id)
3296 key_pairs
= get_nsr_key_pairs(self
._key
_pair
_regh
, xact
)
3297 nsr
= handle_create_nsr(msg
,key_pairs
)
3298 self
._loop
.create_task(begin_instantiation(nsr
))
3300 for msg
in deleted_msgs
:
3301 self
._log
.info("Delete NSR received in on_apply to terminate NS:%s", msg
.id)
3303 handle_delete_nsr(msg
)
3305 self
._log
.exception("Failed to terminate NS:%s", msg
.id)
3307 for msg
in updated_msgs
:
3308 self
._log
.info("Update NSR received in on_apply: %s", msg
)
3310 self
._nsm
.nsr_update_cfg(msg
.id, msg
)
3313 self
._loop
.create_task(update_nsr_nsd(msg
.id, xact
, scratch
))
3315 for group
in msg
.scaling_group
:
3316 instance_delta
= get_scale_group_instance_delta(msg
.id, group
.scaling_group_name_ref
, xact
)
3317 self
._log
.debug("Got NSR:%s scale group instance delta: %s", msg
.id, instance_delta
)
3319 for instance_id
in instance_delta
["added"]:
3320 self
._nsm
.scale_nsr_out(msg
.id, group
.scaling_group_name_ref
, instance_id
, xact
)
3322 for instance_id
in instance_delta
["deleted"]:
3323 self
._nsm
.scale_nsr_in(msg
.id, group
.scaling_group_name_ref
, instance_id
)
3326 return RwTypes
.RwStatus
.SUCCESS
3329 def on_prepare(dts
, acg
, xact
, xact_info
, ks_path
, msg
, scratch
):
3330 """ Prepare calllback from DTS for NSR """
3332 xpath
= ks_path
.to_xpath(RwNsrYang
.get_schema())
3333 action
= xact_info
.query_action
3335 "Got Nsr prepare callback (xact: %s) (action: %s) (info: %s), %s:%s)",
3336 xact
, action
, xact_info
, xpath
, msg
3340 def delete_instantiation(ns_id
):
3341 """ Delete instantiation """
3342 yield from self
._nsm
.terminate_ns(ns_id
, None)
3344 def handle_delete_nsr():
3345 """ Handle delete NSR requests """
3346 self
._log
.info("Delete req for NSR Id: %s received", msg
.id)
3347 # Terminate the NSR instance
3348 nsr
= self
._nsm
.get_ns_by_nsr_id(msg
.id)
3350 nsr
.set_state(NetworkServiceRecordState
.TERMINATE_RCVD
)
3351 event_descr
= "Terminate rcvd for NS Id:%s" % msg
.id
3352 nsr
.record_event("terminate-rcvd", event_descr
)
3354 self
._loop
.create_task(delete_instantiation(msg
.id))
3356 fref
= ProtobufC
.FieldReference
.alloc()
3357 fref
.goto_whole_message(msg
.to_pbcm())
3359 if action
in [rwdts
.QueryAction
.CREATE
, rwdts
.QueryAction
.UPDATE
, rwdts
.QueryAction
.DELETE
]:
3360 # if this is an NSR create
3361 if action
!= rwdts
.QueryAction
.DELETE
and msg
.id not in self
._nsm
.nsrs
:
3362 # Ensure the Cloud account/datacenter has been specified
3363 if not msg
.has_field("cloud_account") and not msg
.has_field("om_datacenter"):
3364 raise NsrInstantiationFailed("Cloud account or datacenter not specified in NSR")
3366 # Check if nsd is specified
3367 if not msg
.has_field("nsd"):
3368 raise NsrInstantiationFailed("NSD not specified in NSR")
3371 nsr
= self
._nsm
.nsrs
[msg
.id]
3373 if msg
.has_field("nsd"):
3374 if nsr
.state
!= NetworkServiceRecordState
.RUNNING
:
3375 raise NsrVlUpdateError("Unable to update VL when NSR not in running state")
3376 if 'vld' not in msg
.nsd
or len(msg
.nsd
.vld
) == 0:
3377 raise NsrVlUpdateError("NS config NSD should have atleast 1 VLD defined")
3379 if msg
.has_field("scaling_group"):
3380 if nsr
.state
!= NetworkServiceRecordState
.RUNNING
:
3381 raise ScalingOperationError("Unable to perform scaling action when NS is not in running state")
3383 if len(msg
.scaling_group
) > 1:
3384 raise ScalingOperationError("Only a single scaling group can be configured at a time")
3386 for group_msg
in msg
.scaling_group
:
3387 num_new_group_instances
= len(group_msg
.instance
)
3388 if num_new_group_instances
> 1:
3389 raise ScalingOperationError("Only a single scaling instance can be modified at a time")
3391 elif num_new_group_instances
== 1:
3392 scale_group
= nsr
.scaling_groups
[group_msg
.scaling_group_name_ref
]
3393 if action
in [rwdts
.QueryAction
.CREATE
, rwdts
.QueryAction
.UPDATE
]:
3394 if len(scale_group
.instances
) == scale_group
.max_instance_count
:
3395 raise ScalingOperationError("Max instances for %s reached" % scale_group
)
3397 acg
.handle
.prepare_complete_ok(xact_info
.handle
)
3400 self
._log
.debug("Registering for NSR config using xpath: %s",
3401 NsrDtsHandler
.NSR_XPATH
)
3403 acg_hdl
= rift
.tasklets
.AppConfGroup
.Handler(on_apply
=on_apply
)
3404 with self
._dts
.appconf_group_create(handler
=acg_hdl
) as acg
:
3405 self
._nsr
_regh
= acg
.register(xpath
=NsrDtsHandler
.NSR_XPATH
,
3406 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY | rwdts
.Flag
.CACHE
,
3407 on_prepare
=on_prepare
)
3409 self
._scale
_regh
= acg
.register(
3410 xpath
=NsrDtsHandler
.SCALE_INSTANCE_XPATH
,
3411 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY| rwdts
.Flag
.CACHE
,
3414 self
._key
_pair
_regh
= acg
.register(
3415 xpath
=NsrDtsHandler
.KEY_PAIR_XPATH
,
3416 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY | rwdts
.Flag
.CACHE
,
3420 class NsrOpDataDtsHandler(object):
3421 """ The network service op data DTS handler """
3422 XPATH
= "D,/nsr:ns-instance-opdata/nsr:nsr"
3424 def __init__(self
, dts
, log
, loop
, nsm
):
3433 """ Return the registration handle"""
3438 """ Return the NS manager instance """
3443 """ Register for Nsr op data publisher registration"""
3444 self
._log
.debug("Registering Nsr op data path %s as publisher",
3445 NsrOpDataDtsHandler
.XPATH
)
3447 hdl
= rift
.tasklets
.DTS
.RegistrationHandler()
3448 handlers
= rift
.tasklets
.Group
.Handler()
3449 with self
._dts
.group_create(handler
=handlers
) as group
:
3450 self
._regh
= group
.register(xpath
=NsrOpDataDtsHandler
.XPATH
,
3452 flags
=rwdts
.Flag
.PUBLISHER | rwdts
.Flag
.NO_PREP_READ | rwdts
.Flag
.DATASTORE
)
3455 def create(self
, path
, msg
):
3457 Create an NS record in DTS with the path and message
3459 self
._log
.debug("Creating NSR %s:%s", path
, msg
)
3460 self
.regh
.create_element(path
, msg
)
3461 self
._log
.debug("Created NSR, %s:%s", path
, msg
)
3464 def update(self
, path
, msg
, flags
=rwdts
.XactFlag
.REPLACE
):
3466 Update an NS record in DTS with the path and message
3468 self
._log
.debug("Updating NSR, %s:%s regh = %s", path
, msg
, self
.regh
)
3469 self
.regh
.update_element(path
, msg
, flags
)
3470 self
._log
.debug("Updated NSR, %s:%s", path
, msg
)
3473 def delete(self
, path
):
3475 Update an NS record in DTS with the path and message
3477 self
._log
.debug("Deleting NSR path:%s", path
)
3478 self
.regh
.delete_element(path
)
3479 self
._log
.debug("Deleted NSR path:%s", path
)
3482 class VnfrDtsHandler(object):
3483 """ The virtual network service DTS handler """
3484 XPATH
= "D,/vnfr:vnfr-catalog/vnfr:vnfr"
3486 def __init__(self
, dts
, log
, loop
, nsm
):
3496 """ Return registration handle """
3501 """ Return the NS manager instance """
3506 """ Register for vnfr create/update/delete/ advises from dts """
3508 def on_commit(xact_info
):
3509 """ The transaction has been committed """
3510 self
._log
.debug("Got vnfr commit (xact_info: %s)", xact_info
)
3511 return rwdts
.MemberRspCode
.ACTION_OK
3514 def on_prepare(xact_info
, action
, ks_path
, msg
):
3515 """ prepare callback from dts """
3516 xpath
= ks_path
.to_xpath(RwNsrYang
.get_schema())
3518 "Got vnfr on_prepare cb (xact_info: %s, action: %s): %s:%s",
3519 xact_info
, action
, ks_path
, msg
3522 schema
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.schema()
3523 path_entry
= schema
.keyspec_to_entry(ks_path
)
3524 if path_entry
.key00
.id not in self
._nsm
._vnfrs
:
3525 self
._log
.error("%s request for non existent record path %s",
3527 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
, xpath
)
3531 self
._log
.debug("Deleting VNFR with id %s", path_entry
.key00
.id)
3532 if action
== rwdts
.QueryAction
.CREATE
or action
== rwdts
.QueryAction
.UPDATE
:
3533 yield from self
._nsm
.update_vnfr(msg
)
3534 elif action
== rwdts
.QueryAction
.DELETE
:
3535 self
._log
.debug("Deleting VNFR with id %s", path_entry
.key00
.id)
3536 self
._nsm
.delete_vnfr(path_entry
.key00
.id)
3538 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
, xpath
)
3540 self
._log
.debug("Registering for VNFR using xpath: %s",
3541 VnfrDtsHandler
.XPATH
,)
3543 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_commit
=on_commit
,
3544 on_prepare
=on_prepare
,)
3545 with self
._dts
.group_create() as group
:
3546 self
._regh
= group
.register(xpath
=VnfrDtsHandler
.XPATH
,
3548 flags
=(rwdts
.Flag
.SUBSCRIBER
),)
3551 class NsdRefCountDtsHandler(object):
3552 """ The NSD Ref Count DTS handler """
3553 XPATH
= "D,/nsr:ns-instance-opdata/rw-nsr:nsd-ref-count"
3555 def __init__(self
, dts
, log
, loop
, nsm
):
3565 """ Return registration handle """
3570 """ Return the NS manager instance """
3575 """ Register for NSD ref count read from dts """
3578 def on_prepare(xact_info
, action
, ks_path
, msg
):
3579 """ prepare callback from dts """
3580 xpath
= ks_path
.to_xpath(RwNsrYang
.get_schema())
3582 if action
== rwdts
.QueryAction
.READ
:
3583 schema
= RwNsrYang
.YangData_Nsr_NsInstanceOpdata_NsdRefCount
.schema()
3584 path_entry
= schema
.keyspec_to_entry(ks_path
)
3585 nsd_list
= yield from self
._nsm
.get_nsd_refcount(path_entry
.key00
.nsd_id_ref
)
3586 for xpath
, msg
in nsd_list
:
3587 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.MORE
,
3590 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
3592 raise NetworkServiceRecordError("Not supported operation %s" % action
)
3594 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
,)
3595 with self
._dts
.group_create() as group
:
3596 self
._regh
= group
.register(xpath
=NsdRefCountDtsHandler
.XPATH
,
3598 flags
=rwdts
.Flag
.PUBLISHER
,)
3601 class NsManager(object):
3602 """ The Network Service Manager class"""
3603 def __init__(self
, dts
, log
, loop
,
3604 nsr_handler
, vnfr_handler
, vlr_handler
, ro_plugin_selector
,
3605 vnffgmgr
, vnfd_pub_handler
, cloud_account_handler
):
3609 self
._nsr
_handler
= nsr_handler
3610 self
._vnfr
_pub
_handler
= vnfr_handler
3611 self
._vlr
_pub
_handler
= vlr_handler
3612 self
._vnffgmgr
= vnffgmgr
3613 self
._vnfd
_pub
_handler
= vnfd_pub_handler
3614 self
._cloud
_account
_handler
= cloud_account_handler
3616 self
._ro
_plugin
_selector
= ro_plugin_selector
3617 self
._ncclient
= rift
.mano
.ncclient
.NcClient(
3629 self
.cfgmgr_obj
= conman
.ROConfigManager(log
, loop
, dts
, self
)
3631 # TODO: All these handlers should move to tasklet level.
3632 # Passing self is often an indication of bad design
3633 self
._nsd
_dts
_handler
= NsdDtsHandler(dts
, log
, loop
, self
)
3634 self
._vnfd
_dts
_handler
= VnfdDtsHandler(dts
, log
, loop
, self
)
3635 self
._dts
_handlers
= [self
._nsd
_dts
_handler
,
3636 VnfrDtsHandler(dts
, log
, loop
, self
),
3637 NsdRefCountDtsHandler(dts
, log
, loop
, self
),
3638 NsrDtsHandler(dts
, log
, loop
, self
),
3639 ScalingRpcHandler(log
, dts
, loop
, self
.scale_rpc_callback
),
3640 NsrRpcDtsHandler(dts
,log
,loop
,self
),
3641 self
._vnfd
_dts
_handler
,
3662 def nsr_handler(self
):
3663 """" NSR handler """
3664 return self
._nsr
_handler
3668 """" So Obj handler """
3673 """ NSRs in this NSM"""
3678 """ NSDs in this NSM"""
3683 """ VNFDs in this NSM"""
3688 """ VNFRs in this NSM"""
3692 def nsr_pub_handler(self
):
3693 """ NSR publication handler """
3694 return self
._nsr
_handler
3697 def vnfr_pub_handler(self
):
3698 """ VNFR publication handler """
3699 return self
._vnfr
_pub
_handler
3702 def vlr_pub_handler(self
):
3703 """ VLR publication handler """
3704 return self
._vlr
_pub
_handler
3707 def vnfd_pub_handler(self
):
3708 return self
._vnfd
_pub
_handler
3712 """ Register all static DTS handlers """
3713 for dts_handle
in self
._dts
_handlers
:
3714 yield from dts_handle
.register()
3717 def get_ns_by_nsr_id(self
, nsr_id
):
3718 """ get NSR by nsr id """
3719 if nsr_id
not in self
._nsrs
:
3720 raise NetworkServiceRecordError("NSR id %s not found" % nsr_id
)
3722 return self
._nsrs
[nsr_id
]
3724 def scale_nsr_out(self
, nsr_id
, scale_group_name
, instance_id
, config_xact
):
3725 self
.log
.debug("Scale out NetworkServiceRecord (nsr_id: %s) (scaling group: %s) (instance_id: %s)",
3730 nsr
= self
._nsrs
[nsr_id
]
3731 if nsr
.state
!= NetworkServiceRecordState
.RUNNING
:
3732 raise ScalingOperationError("Cannot perform scaling operation if NSR is not in running state")
3734 self
._loop
.create_task(nsr
.create_scale_group_instance(scale_group_name
, instance_id
, config_xact
))
3736 def scale_nsr_in(self
, nsr_id
, scale_group_name
, instance_id
):
3737 self
.log
.debug("Scale in NetworkServiceRecord (nsr_id: %s) (scaling group: %s) (instance_id: %s)",
3742 nsr
= self
._nsrs
[nsr_id
]
3743 if nsr
.state
!= NetworkServiceRecordState
.RUNNING
:
3744 raise ScalingOperationError("Cannot perform scaling operation if NSR is not in running state")
3746 self
._loop
.create_task(nsr
.delete_scale_group_instance(scale_group_name
, instance_id
))
3748 def scale_rpc_callback(self
, xact
, msg
, action
):
3749 """Callback handler for RPC calls
3751 xact : Transaction Handler
3753 action : Scaling Action
3755 ScalingGroupInstance
= NsrYang
.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup_Instance
3756 ScalingGroup
= NsrYang
.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup
3758 xpath
= ('C,/nsr:ns-instance-config/nsr:nsr[nsr:id="{}"]').format(
3760 instance
= ScalingGroupInstance
.from_dict({"id": msg
.instance_id
})
3763 def get_nsr_scaling_group():
3764 results
= yield from self
._dts
.query_read(xpath
, rwdts
.XactFlag
.MERGE
)
3766 for result
in results
:
3767 res
= yield from result
3768 nsr_config
= res
.result
3770 for scaling_group
in nsr_config
.scaling_group
:
3771 if scaling_group
.scaling_group_name_ref
== msg
.scaling_group_name_ref
:
3774 scaling_group
= nsr_config
.scaling_group
.add()
3775 scaling_group
.scaling_group_name_ref
= msg
.scaling_group_name_ref
3777 return (nsr_config
, scaling_group
)
3780 def update_config(nsr_config
):
3781 xml
= self
._ncclient
.convert_to_xml(RwNsrYang
, nsr_config
)
3782 xml
= '<config xmlns:xc="urn:ietf:params:xml:ns:netconf:base:1.0">{}</config>'.format(xml
)
3783 yield from self
._ncclient
.connect()
3784 yield from self
._ncclient
.manager
.edit_config(target
="running", config
=xml
, default_operation
="replace")
3788 nsr_config
, scaling_group
= yield from get_nsr_scaling_group()
3789 scaling_group
.instance
.append(instance
)
3790 yield from update_config(nsr_config
)
3794 nsr_config
, scaling_group
= yield from get_nsr_scaling_group()
3795 scaling_group
.instance
.remove(instance
)
3796 yield from update_config(nsr_config
)
3798 if action
== ScalingRpcHandler
.ACTION
.SCALE_OUT
:
3799 self
._loop
.create_task(scale_out())
3801 self
._loop
.create_task(scale_in())
3803 # Opdata based calls, disabled for now!
3804 # if action == ScalingRpcHandler.ACTION.SCALE_OUT:
3805 # self.scale_nsr_out(
3807 # msg.scaling_group_name_ref,
3811 # self.scale_nsr_in(
3813 # msg.scaling_group_name_ref,
3816 def nsr_update_cfg(self
, nsr_id
, msg
):
3817 nsr
= self
._nsrs
[nsr_id
]
3818 nsr
.nsr_cfg_msg
= msg
3820 def nsr_instantiate_vl(self
, nsr_id
, vld
):
3821 self
.log
.debug("NSR {} create VL {}".format(nsr_id
, vld
))
3822 nsr
= self
._nsrs
[nsr_id
]
3823 if nsr
.state
!= NetworkServiceRecordState
.RUNNING
:
3824 raise NsrVlUpdateError("Cannot perform VL instantiate if NSR is not in running state")
3826 # Not calling in a separate task as this is called from a separate task
3827 yield from nsr
.create_vl_instance(vld
)
3829 def nsr_terminate_vl(self
, nsr_id
, vld
):
3830 self
.log
.debug("NSR {} delete VL {}".format(nsr_id
, vld
.id))
3831 nsr
= self
._nsrs
[nsr_id
]
3832 if nsr
.state
!= NetworkServiceRecordState
.RUNNING
:
3833 raise NsrVlUpdateError("Cannot perform VL terminate if NSR is not in running state")
3835 # Not calling in a separate task as this is called from a separate task
3836 yield from nsr
.delete_vl_instance(vld
)
3838 def create_nsr(self
, nsr_msg
, key_pairs
=None,restart_mode
=False):
3839 """ Create an NSR instance """
3840 if nsr_msg
.id in self
._nsrs
:
3841 msg
= "NSR id %s already exists" % nsr_msg
.id
3842 self
._log
.error(msg
)
3843 raise NetworkServiceRecordError(msg
)
3845 self
._log
.info("Create NetworkServiceRecord nsr id %s from nsd_id %s",
3849 nsm_plugin
= self
._ro
_plugin
_selector
.ro_plugin
3850 sdn_account_name
= self
._cloud
_account
_handler
.get_cloud_account_sdn_name(nsr_msg
.cloud_account
)
3852 nsr
= NetworkServiceRecord(self
._dts
,
3860 restart_mode
=restart_mode
,
3861 vlr_handler
=self
._ro
_plugin
_selector
._records
_publisher
._vlr
_pub
_hdlr
3863 self
._nsrs
[nsr_msg
.id] = nsr
3864 nsm_plugin
.create_nsr(nsr_msg
, nsr_msg
.nsd
, key_pairs
)
3868 def delete_nsr(self
, nsr_id
):
3870 Delete NSR with the passed nsr id
3872 del self
._nsrs
[nsr_id
]
3875 def instantiate_ns(self
, nsr_id
, config_xact
):
3876 """ Instantiate an NS instance """
3877 self
._log
.debug("Instantiating Network service id %s", nsr_id
)
3878 if nsr_id
not in self
._nsrs
:
3879 err
= "NSR id %s not found " % nsr_id
3880 self
._log
.error(err
)
3881 raise NetworkServiceRecordError(err
)
3883 nsr
= self
._nsrs
[nsr_id
]
3884 yield from nsr
.nsm_plugin
.instantiate_ns(nsr
, config_xact
)
3887 def update_vnfr(self
, vnfr
):
3888 """Create/Update an VNFR """
3890 vnfr_state
= self
._vnfrs
[vnfr
.id].state
3891 self
._log
.debug("Updating VNFR with state %s: vnfr %s", vnfr_state
, vnfr
)
3893 yield from self
._vnfrs
[vnfr
.id].update_state(vnfr
)
3894 nsr
= self
.find_nsr_for_vnfr(vnfr
.id)
3895 yield from nsr
.update_state()
3897 def find_nsr_for_vnfr(self
, vnfr_id
):
3898 """ Find the NSR which )has the passed vnfr id"""
3899 for nsr
in list(self
.nsrs
.values()):
3900 for vnfr
in list(nsr
.vnfrs
.values()):
3901 if vnfr
.id == vnfr_id
:
3905 def delete_vnfr(self
, vnfr_id
):
3906 """ Delete VNFR with the passed id"""
3907 del self
._vnfrs
[vnfr_id
]
3909 def get_nsd_ref(self
, nsd_id
):
3910 """ Get network service descriptor for the passed nsd_id
3912 nsd
= self
.get_nsd(nsd_id
)
3917 def get_nsr_config(self
, nsd_id
):
3918 xpath
= "C,/nsr:ns-instance-config"
3919 results
= yield from self
._dts
.query_read(xpath
, rwdts
.XactFlag
.MERGE
)
3921 for result
in results
:
3922 entry
= yield from result
3923 ns_instance_config
= entry
.result
3925 for nsr
in ns_instance_config
.nsr
:
3926 if nsr
.nsd
.id == nsd_id
:
3932 def nsd_unref_by_nsr_id(self
, nsr_id
):
3933 """ Unref the network service descriptor based on NSR id """
3934 self
._log
.debug("NSR Unref called for Nsr Id:%s", nsr_id
)
3935 if nsr_id
in self
._nsrs
:
3936 nsr
= self
._nsrs
[nsr_id
]
3939 nsd
= self
.get_nsd(nsr
.nsd_id
)
3940 self
._log
.debug("Releasing ref on NSD %s held by NSR %s - Curr %d",
3941 nsd
.id, nsr
.id, nsd
.ref_count
)
3943 except NetworkServiceDescriptorError
:
3944 # We store a copy of NSD in NSR and the NSD in nsd-catalog
3949 self
._log
.error("Cannot find NSR with id %s", nsr_id
)
3950 raise NetworkServiceDescriptorUnrefError("No NSR with id" % nsr_id
)
3953 def nsd_unref(self
, nsd_id
):
3954 """ Unref the network service descriptor associated with the id """
3955 nsd
= self
.get_nsd(nsd_id
)
3958 def get_nsd(self
, nsd_id
):
3959 """ Get network service descriptor for the passed nsd_id"""
3960 if nsd_id
not in self
._nsds
:
3961 self
._log
.error("Cannot find NSD id:%s", nsd_id
)
3962 raise NetworkServiceDescriptorError("Cannot find NSD id:%s", nsd_id
)
3964 return self
._nsds
[nsd_id
]
3966 def create_nsd(self
, nsd_msg
):
3967 """ Create a network service descriptor """
3968 self
._log
.debug("Create network service descriptor - %s", nsd_msg
)
3969 if nsd_msg
.id in self
._nsds
:
3970 self
._log
.error("Cannot create NSD %s -NSD ID already exists", nsd_msg
)
3971 raise NetworkServiceDescriptorError("NSD already exists-%s", nsd_msg
.id)
3973 nsd
= NetworkServiceDescriptor(
3980 self
._nsds
[nsd_msg
.id] = nsd
3984 def update_nsd(self
, nsd
):
3985 """ update the Network service descriptor """
3986 self
._log
.debug("Update network service descriptor - %s", nsd
)
3987 if nsd
.id not in self
._nsds
:
3988 self
._log
.debug("No NSD found - creating NSD id = %s", nsd
.id)
3989 self
.create_nsd(nsd
)
3991 self
._log
.debug("Updating NSD id = %s, nsd = %s", nsd
.id, nsd
)
3992 self
._nsds
[nsd
.id].update(nsd
)
3994 def delete_nsd(self
, nsd_id
):
3995 """ Delete the Network service descriptor with the passed id """
3996 self
._log
.debug("Deleting the network service descriptor - %s", nsd_id
)
3997 if nsd_id
not in self
._nsds
:
3998 self
._log
.debug("Delete NSD failed - cannot find nsd-id %s", nsd_id
)
3999 raise NetworkServiceDescriptorNotFound("Cannot find %s", nsd_id
)
4001 if nsd_id
not in self
._nsds
:
4002 self
._log
.debug("Cannot delete NSD id %s reference exists %s",
4004 self
._nsds
[nsd_id
].ref_count
)
4005 raise NetworkServiceDescriptorRefCountExists(
4006 "Cannot delete :%s, ref_count:%s",
4008 self
._nsds
[nsd_id
].ref_count
)
4010 del self
._nsds
[nsd_id
]
4012 def get_vnfd_config(self
, xact
):
4013 vnfd_dts_reg
= self
._vnfd
_dts
_handler
.regh
4014 for cfg
in vnfd_dts_reg
.get_xact_elements(xact
):
4015 if cfg
.id not in self
._vnfds
:
4016 self
.create_vnfd(cfg
)
4018 def get_vnfd(self
, vnfd_id
, xact
):
4019 """ Get virtual network function descriptor for the passed vnfd_id"""
4020 if vnfd_id
not in self
._vnfds
:
4021 self
._log
.error("Cannot find VNFD id:%s", vnfd_id
)
4022 self
.get_vnfd_config(xact
)
4024 if vnfd_id
not in self
._vnfds
:
4025 self
._log
.error("Cannot find VNFD id:%s", vnfd_id
)
4026 raise VnfDescriptorError("Cannot find VNFD id:%s", vnfd_id
)
4028 return self
._vnfds
[vnfd_id
]
4030 def create_vnfd(self
, vnfd
):
4031 """ Create a virtual network function descriptor """
4032 self
._log
.debug("Create virtual network function descriptor - %s", vnfd
)
4033 if vnfd
.id in self
._vnfds
:
4034 self
._log
.error("Cannot create VNFD %s -VNFD ID already exists", vnfd
)
4035 raise VnfDescriptorError("VNFD already exists-%s", vnfd
.id)
4037 self
._vnfds
[vnfd
.id] = vnfd
4038 return self
._vnfds
[vnfd
.id]
4040 def update_vnfd(self
, vnfd
):
4041 """ Update the virtual network function descriptor """
4042 self
._log
.debug("Update virtual network function descriptor- %s", vnfd
)
4045 if vnfd
.id not in self
._vnfds
:
4046 self
._log
.debug("No VNFD found - creating VNFD id = %s", vnfd
.id)
4047 self
.create_vnfd(vnfd
)
4049 self
._log
.debug("Updating VNFD id = %s, vnfd = %s", vnfd
.id, vnfd
)
4050 self
._vnfds
[vnfd
.id] = vnfd
4053 def delete_vnfd(self
, vnfd_id
):
4054 """ Delete the virtual network function descriptor with the passed id """
4055 self
._log
.debug("Deleting the virtual network function descriptor - %s", vnfd_id
)
4056 if vnfd_id
not in self
._vnfds
:
4057 self
._log
.debug("Delete VNFD failed - cannot find vnfd-id %s", vnfd_id
)
4058 raise VnfDescriptorError("Cannot find %s", vnfd_id
)
4060 del self
._vnfds
[vnfd_id
]
4062 def nsd_in_use(self
, nsd_id
):
4063 """ Is the NSD with the passed id in use """
4064 self
._log
.debug("Is this NSD in use - msg:%s", nsd_id
)
4065 if nsd_id
in self
._nsds
:
4066 return self
._nsds
[nsd_id
].in_use()
4070 def publish_nsr(self
, xact
, path
, msg
):
4071 """ Publish a NSR """
4072 self
._log
.debug("Publish NSR with path %s, msg %s",
4074 yield from self
.nsr_handler
.update(xact
, path
, msg
)
4077 def unpublish_nsr(self
, xact
, path
):
4078 """ Un Publish an NSR """
4079 self
._log
.debug("Publishing delete NSR with path %s", path
)
4080 yield from self
.nsr_handler
.delete(path
, xact
)
4082 def vnfr_is_ready(self
, vnfr_id
):
4083 """ VNFR with the id is ready """
4084 self
._log
.debug("VNFR id %s ready", vnfr_id
)
4085 if vnfr_id
not in self
._vnfds
:
4086 err
= "Did not find VNFR ID with id %s" % vnfr_id
4087 self
._log
.critical("err")
4088 raise VirtualNetworkFunctionRecordError(err
)
4089 self
._vnfrs
[vnfr_id
].is_ready()
4092 def get_nsd_refcount(self
, nsd_id
):
4093 """ Get the nsd_list from this NSM"""
4095 def nsd_refcount_xpath(nsd_id
):
4096 """ xpath for ref count entry """
4097 return (NsdRefCountDtsHandler
.XPATH
+
4098 "[rw-nsr:nsd-id-ref = '{}']").format(nsd_id
)
4101 if nsd_id
is None or nsd_id
== "":
4102 for nsd
in self
._nsds
.values():
4103 nsd_msg
= RwNsrYang
.YangData_Nsr_NsInstanceOpdata_NsdRefCount()
4104 nsd_msg
.nsd_id_ref
= nsd
.id
4105 nsd_msg
.instance_ref_count
= nsd
.ref_count
4106 nsd_list
.append((nsd_refcount_xpath(nsd
.id), nsd_msg
))
4107 elif nsd_id
in self
._nsds
:
4108 nsd_msg
= RwNsrYang
.YangData_Nsr_NsInstanceOpdata_NsdRefCount()
4109 nsd_msg
.nsd_id_ref
= self
._nsds
[nsd_id
].id
4110 nsd_msg
.instance_ref_count
= self
._nsds
[nsd_id
].ref_count
4111 nsd_list
.append((nsd_refcount_xpath(nsd_id
), nsd_msg
))
4116 def terminate_ns(self
, nsr_id
, xact
):
4118 Terminate network service for the given NSR Id
4121 # Terminate the instances/networks assocaited with this nw service
4122 self
._log
.debug("Terminating the network service %s", nsr_id
)
4124 yield from self
._nsrs
[nsr_id
].terminate()
4125 except Exception as e
:
4126 self
.log
.exception("Failed to terminate NSR[id=%s]", nsr_id
)
4129 yield from self
.nsd_unref_by_nsr_id(nsr_id
)
4131 # Unpublish the NSR record
4132 self
._log
.debug("Unpublishing the network service %s", nsr_id
)
4133 yield from self
._nsrs
[nsr_id
].unpublish(xact
)
4135 # Finaly delete the NS instance from this NS Manager
4136 self
._log
.debug("Deletng the network service %s", nsr_id
)
4137 self
.delete_nsr(nsr_id
)
4140 class NsmRecordsPublisherProxy(object):
4141 """ This class provides a publisher interface that allows plugin objects
4142 to publish NSR/VNFR/VLR"""
4144 def __init__(self
, dts
, log
, loop
, nsr_pub_hdlr
, vnfr_pub_hdlr
, vlr_pub_hdlr
):
4148 self
._nsr
_pub
_hdlr
= nsr_pub_hdlr
4149 self
._vlr
_pub
_hdlr
= vlr_pub_hdlr
4150 self
._vnfr
_pub
_hdlr
= vnfr_pub_hdlr
4153 def publish_nsr(self
, xact
, nsr
):
4154 """ Publish an NSR """
4155 path
= NetworkServiceRecord
.xpath_from_nsr(nsr
)
4156 return (yield from self
._nsr
_pub
_hdlr
.update(xact
, path
, nsr
))
4159 def unpublish_nsr(self
, xact
, nsr
):
4160 """ Unpublish an NSR """
4161 path
= NetworkServiceRecord
.xpath_from_nsr(nsr
)
4162 return (yield from self
._nsr
_pub
_hdlr
.delete(xact
, path
))
4165 def publish_vnfr(self
, xact
, vnfr
):
4166 """ Publish an VNFR """
4167 path
= VirtualNetworkFunctionRecord
.vnfr_xpath(vnfr
)
4168 return (yield from self
._vnfr
_pub
_hdlr
.update(xact
, path
, vnfr
))
4171 def unpublish_vnfr(self
, xact
, vnfr
):
4172 """ Unpublish a VNFR """
4173 path
= VirtualNetworkFunctionRecord
.vnfr_xpath(vnfr
)
4174 return (yield from self
._vnfr
_pub
_hdlr
.delete(xact
, path
))
4177 def publish_vlr(self
, xact
, vlr
):
4178 """ Publish a VLR """
4179 path
= VirtualLinkRecord
.vlr_xpath(vlr
)
4180 return (yield from self
._vlr
_pub
_hdlr
.update(xact
, path
, vlr
))
4183 def unpublish_vlr(self
, xact
, vlr
):
4184 """ Unpublish a VLR """
4185 path
= VirtualLinkRecord
.vlr_xpath(vlr
)
4186 return (yield from self
._vlr
_pub
_hdlr
.delete(xact
, path
))
4189 class ScalingRpcHandler(mano_dts
.DtsHandler
):
4190 """ The Network service Monitor DTS handler """
4191 SCALE_IN_INPUT_XPATH
= "I,/nsr:exec-scale-in"
4192 SCALE_IN_OUTPUT_XPATH
= "O,/nsr:exec-scale-in"
4194 SCALE_OUT_INPUT_XPATH
= "I,/nsr:exec-scale-out"
4195 SCALE_OUT_OUTPUT_XPATH
= "O,/nsr:exec-scale-out"
4197 ACTION
= Enum('ACTION', 'SCALE_IN SCALE_OUT')
4199 def __init__(self
, log
, dts
, loop
, callback
=None):
4200 super().__init
__(log
, dts
, loop
)
4201 self
.callback
= callback
4202 self
.last_instance_id
= defaultdict(int)
4208 def on_scale_in_prepare(xact_info
, action
, ks_path
, msg
):
4209 assert action
== rwdts
.QueryAction
.RPC
4213 self
.callback(xact_info
.xact
, msg
, self
.ACTION
.SCALE_IN
)
4215 rpc_op
= NsrYang
.YangOutput_Nsr_ExecScaleIn
.from_dict({
4216 "instance_id": msg
.instance_id
})
4218 xact_info
.respond_xpath(
4219 rwdts
.XactRspCode
.ACK
,
4220 self
.__class
__.SCALE_IN_OUTPUT_XPATH
,
4223 except Exception as e
:
4224 self
.log
.exception(e
)
4225 xact_info
.respond_xpath(
4226 rwdts
.XactRspCode
.NACK
,
4227 self
.__class
__.SCALE_IN_OUTPUT_XPATH
)
4230 def on_scale_out_prepare(xact_info
, action
, ks_path
, msg
):
4231 assert action
== rwdts
.QueryAction
.RPC
4234 scaling_group
= msg
.scaling_group_name_ref
4235 if not msg
.instance_id
:
4236 last_instance_id
= self
.last_instance_id
[scale_group
]
4237 msg
.instance_id
= last_instance_id
+ 1
4238 self
.last_instance_id
[scale_group
] += 1
4241 self
.callback(xact_info
.xact
, msg
, self
.ACTION
.SCALE_OUT
)
4243 rpc_op
= NsrYang
.YangOutput_Nsr_ExecScaleOut
.from_dict({
4244 "instance_id": msg
.instance_id
})
4246 xact_info
.respond_xpath(
4247 rwdts
.XactRspCode
.ACK
,
4248 self
.__class
__.SCALE_OUT_OUTPUT_XPATH
,
4251 except Exception as e
:
4252 self
.log
.exception(e
)
4253 xact_info
.respond_xpath(
4254 rwdts
.XactRspCode
.NACK
,
4255 self
.__class
__.SCALE_OUT_OUTPUT_XPATH
)
4257 scale_in_hdl
= rift
.tasklets
.DTS
.RegistrationHandler(
4258 on_prepare
=on_scale_in_prepare
)
4259 scale_out_hdl
= rift
.tasklets
.DTS
.RegistrationHandler(
4260 on_prepare
=on_scale_out_prepare
)
4262 with self
.dts
.group_create() as group
:
4264 xpath
=self
.__class
__.SCALE_IN_INPUT_XPATH
,
4265 handler
=scale_in_hdl
,
4266 flags
=rwdts
.Flag
.PUBLISHER
)
4268 xpath
=self
.__class
__.SCALE_OUT_INPUT_XPATH
,
4269 handler
=scale_out_hdl
,
4270 flags
=rwdts
.Flag
.PUBLISHER
)
4273 class NsmTasklet(rift
.tasklets
.Tasklet
):
4275 The network service manager tasklet
4277 def __init__(self
, *args
, **kwargs
):
4278 super(NsmTasklet
, self
).__init
__(*args
, **kwargs
)
4279 self
.rwlog
.set_category("rw-mano-log")
4280 self
.rwlog
.set_subcategory("nsm")
4285 self
._ro
_plugin
_selector
= None
4286 self
._vnffgmgr
= None
4288 self
._nsr
_handler
= None
4289 self
._vnfr
_pub
_handler
= None
4290 self
._vlr
_pub
_handler
= None
4291 self
._vnfd
_pub
_handler
= None
4292 self
._scale
_cfg
_handler
= None
4294 self
._records
_publisher
_proxy
= None
4297 """ The task start callback """
4298 super(NsmTasklet
, self
).start()
4299 self
.log
.info("Starting NsmTasklet")
4301 self
.log
.debug("Registering with dts")
4302 self
._dts
= rift
.tasklets
.DTS(self
.tasklet_info
,
4303 RwNsmYang
.get_schema(),
4305 self
.on_dts_state_change
)
4307 self
.log
.debug("Created DTS Api GI Object: %s", self
._dts
)
4313 print("Caught Exception in NSM stop:", sys
.exc_info()[0])
4316 def on_instance_started(self
):
4317 """ Task instance started callback """
4318 self
.log
.debug("Got instance started callback")
4322 """ Task init callback """
4323 self
.log
.debug("Got instance started callback")
4325 self
.log
.debug("creating config account handler")
4327 self
._nsr
_pub
_handler
= publisher
.NsrOpDataDtsHandler(self
._dts
, self
.log
, self
.loop
)
4328 yield from self
._nsr
_pub
_handler
.register()
4330 self
._vnfr
_pub
_handler
= publisher
.VnfrPublisherDtsHandler(self
._dts
, self
.log
, self
.loop
)
4331 yield from self
._vnfr
_pub
_handler
.register()
4333 self
._vlr
_pub
_handler
= publisher
.VlrPublisherDtsHandler(self
._dts
, self
.log
, self
.loop
)
4334 yield from self
._vlr
_pub
_handler
.register()
4336 manifest
= self
.tasklet_info
.get_pb_manifest()
4337 use_ssl
= manifest
.bootstrap_phase
.rwsecurity
.use_ssl
4338 ssl_cert
= manifest
.bootstrap_phase
.rwsecurity
.cert
4339 ssl_key
= manifest
.bootstrap_phase
.rwsecurity
.key
4341 self
._vnfd
_pub
_handler
= publisher
.VnfdPublisher(use_ssl
, ssl_cert
, ssl_key
, self
.loop
)
4343 self
._records
_publisher
_proxy
= NsmRecordsPublisherProxy(
4347 self
._nsr
_pub
_handler
,
4348 self
._vnfr
_pub
_handler
,
4349 self
._vlr
_pub
_handler
,
4352 # Register the NSM to receive the nsm plugin
4353 # when cloud account is configured
4354 self
._ro
_plugin
_selector
= cloud
.ROAccountPluginSelector(
4358 self
._records
_publisher
_proxy
,
4360 yield from self
._ro
_plugin
_selector
.register()
4362 self
._cloud
_account
_handler
= cloud
.CloudAccountConfigSubscriber(
4367 yield from self
._cloud
_account
_handler
.register()
4369 self
._vnffgmgr
= rwvnffgmgr
.VnffgMgr(self
._dts
, self
.log
, self
.log_hdl
, self
.loop
)
4370 yield from self
._vnffgmgr
.register()
4372 self
._nsm
= NsManager(
4376 self
._nsr
_pub
_handler
,
4377 self
._vnfr
_pub
_handler
,
4378 self
._vlr
_pub
_handler
,
4379 self
._ro
_plugin
_selector
,
4381 self
._vnfd
_pub
_handler
,
4382 self
._cloud
_account
_handler
4385 yield from self
._nsm
.register()
4389 """ Task run callback """
4393 def on_dts_state_change(self
, state
):
4394 """Take action according to current dts state to transition
4395 application into the corresponding application state
4398 state - current dts state
4401 rwdts
.State
.INIT
: rwdts
.State
.REGN_COMPLETE
,
4402 rwdts
.State
.CONFIG
: rwdts
.State
.RUN
,
4406 rwdts
.State
.INIT
: self
.init
,
4407 rwdts
.State
.RUN
: self
.run
,
4410 # Transition application to next state
4411 handler
= handlers
.get(state
, None)
4412 if handler
is not None:
4413 yield from handler()
4415 # Transition dts to next state
4416 next_state
= switch
.get(state
, None)
4417 if next_state
is not None:
4418 self
.log
.debug("Changing state to %s", next_state
)
4419 self
._dts
.handle
.set_state(next_state
)