2 # Copyright 2016 RIFT.IO Inc
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
20 import ncclient
.asyncio_manager
30 from collections
import deque
31 from collections
import defaultdict
35 gi
.require_version('RwYang', '1.0')
36 gi
.require_version('RwNsdYang', '1.0')
37 gi
.require_version('RwDts', '1.0')
38 gi
.require_version('RwNsmYang', '1.0')
39 gi
.require_version('RwNsrYang', '1.0')
40 gi
.require_version('RwTypes', '1.0')
41 gi
.require_version('RwVlrYang', '1.0')
42 gi
.require_version('RwVnfrYang', '1.0')
43 from gi
.repository
import (
59 import rift
.mano
.ncclient
60 import rift
.mano
.config_data
.config
61 import rift
.mano
.dts
as mano_dts
63 from . import rwnsm_conman
as conman
65 from . import publisher
67 from . import config_value_pool
68 from . import rwvnffgmgr
69 from . import scale_group
72 class NetworkServiceRecordState(Enum
):
73 """ Network Service Record State """
77 VNFFG_INIT_PHASE
= 104
83 VL_TERMINATE_PHASE
= 111
84 VNF_TERMINATE_PHASE
= 112
85 VNFFG_TERMINATE_PHASE
= 113
92 class NetworkServiceRecordError(Exception):
93 """ Network Service Record Error """
97 class NetworkServiceDescriptorError(Exception):
98 """ Network Service Descriptor Error """
102 class VirtualNetworkFunctionRecordError(Exception):
103 """ Virtual Network Function Record Error """
107 class NetworkServiceDescriptorNotFound(Exception):
108 """ Cannot find Network Service Descriptor"""
112 class NetworkServiceDescriptorRefCountExists(Exception):
113 """ Network Service Descriptor reference count exists """
117 class NetworkServiceDescriptorUnrefError(Exception):
118 """ Failed to unref a network service descriptor """
122 class NsrInstantiationFailed(Exception):
123 """ Failed to instantiate network service """
127 class VnfInstantiationFailed(Exception):
128 """ Failed to instantiate virtual network function"""
132 class VnffgInstantiationFailed(Exception):
133 """ Failed to instantiate virtual network function"""
137 class VnfDescriptorError(Exception):
138 """Failed to instantiate virtual network function"""
142 class ScalingOperationError(Exception):
146 class ScaleGroupMissingError(Exception):
150 class PlacementGroupError(Exception):
154 class NsrNsdUpdateError(Exception):
158 class NsrVlUpdateError(NsrNsdUpdateError
):
162 class VlRecordState(Enum
):
163 """ VL Record State """
165 INSTANTIATION_PENDING
= 102
167 TERMINATE_PENDING
= 104
172 class VnffgRecordState(Enum
):
173 """ VNFFG Record State """
175 INSTANTIATION_PENDING
= 102
177 TERMINATE_PENDING
= 104
182 class VnffgRecord(object):
183 """ Vnffg Records class"""
186 def __init__(self
, dts
, log
, loop
, vnffgmgr
, nsr
, nsr_name
, vnffgd_msg
, sdn_account_name
):
191 self
._vnffgmgr
= vnffgmgr
193 self
._nsr
_name
= nsr_name
194 self
._vnffgd
_msg
= vnffgd_msg
195 if sdn_account_name
is None:
196 self
._sdn
_account
_name
= ''
198 self
._sdn
_account
_name
= sdn_account_name
200 self
._vnffgr
_id
= str(uuid
.uuid4())
201 self
._vnffgr
_rsp
_id
= list()
202 self
._vnffgr
_state
= VnffgRecordState
.INIT
207 return self
._vnffgr
_id
211 """ state of this VNF """
212 return self
._vnffgr
_state
214 def fetch_vnffgr(self
):
216 Get VNFFGR message to be published
219 if self
._vnffgr
_state
== VnffgRecordState
.INIT
:
220 vnffgr_dict
= {"id": self
._vnffgr
_id
,
221 "nsd_id": self
._nsr
.nsd_id
,
222 "vnffgd_id_ref": self
._vnffgd
_msg
.id,
223 "vnffgd_name_ref": self
._vnffgd
_msg
.name
,
224 "sdn_account": self
._sdn
_account
_name
,
225 "operational_status": 'init',
227 vnffgr
= NsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr
.from_dict(vnffgr_dict
)
228 elif self
._vnffgr
_state
== VnffgRecordState
.TERMINATED
:
229 vnffgr_dict
= {"id": self
._vnffgr
_id
,
230 "nsd_id": self
._nsr
.nsd_id
,
231 "vnffgd_id_ref": self
._vnffgd
_msg
.id,
232 "vnffgd_name_ref": self
._vnffgd
_msg
.name
,
233 "sdn_account": self
._sdn
_account
_name
,
234 "operational_status": 'terminated',
236 vnffgr
= NsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr
.from_dict(vnffgr_dict
)
239 vnffgr
= self
._vnffgmgr
.fetch_vnffgr(self
._vnffgr
_id
)
241 self
._log
.exception("Fetching VNFFGR for VNFFG with id %s failed", self
._vnffgr
_id
)
242 self
._vnffgr
_state
= VnffgRecordState
.FAILED
243 vnffgr_dict
= {"id": self
._vnffgr
_id
,
244 "nsd_id": self
._nsr
.nsd_id
,
245 "vnffgd_id_ref": self
._vnffgd
_msg
.id,
246 "vnffgd_name_ref": self
._vnffgd
_msg
.name
,
247 "sdn_account": self
._sdn
_account
_name
,
248 "operational_status": 'failed',
250 vnffgr
= NsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr
.from_dict(vnffgr_dict
)
255 def vnffgr_create_msg(self
):
256 """ Virtual Link Record message for Creating VLR in VNS """
257 vnffgr_dict
= {"id": self
._vnffgr
_id
,
258 "nsd_id": self
._nsr
.nsd_id
,
259 "vnffgd_id_ref": self
._vnffgd
_msg
.id,
260 "vnffgd_name_ref": self
._vnffgd
_msg
.name
,
261 "sdn_account": self
._sdn
_account
_name
,
263 vnffgr
= NsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr_Vnffgr
.from_dict(vnffgr_dict
)
264 for rsp
in self
._vnffgd
_msg
.rsp
:
265 vnffgr_rsp
= vnffgr
.rsp
.add()
266 vnffgr_rsp
.id = str(uuid
.uuid4())
267 vnffgr_rsp
.name
= self
._nsr
.name
+ '.' + rsp
.name
268 self
._vnffgr
_rsp
_id
.append(vnffgr_rsp
.id)
269 vnffgr_rsp
.vnffgd_rsp_id_ref
= rsp
.id
270 vnffgr_rsp
.vnffgd_rsp_name_ref
= rsp
.name
271 for rsp_cp_ref
in rsp
.vnfd_connection_point_ref
:
272 vnfd
= [vnfr
.vnfd
for vnfr
in self
._nsr
.vnfrs
.values() if vnfr
.vnfd
.id == rsp_cp_ref
.vnfd_id_ref
]
273 self
._log
.debug("VNFD message during VNFFG instantiation is %s",vnfd
)
274 if len(vnfd
) > 0 and vnfd
[0].has_field('service_function_type'):
275 self
._log
.debug("Service Function Type for VNFD ID %s is %s",rsp_cp_ref
.vnfd_id_ref
, vnfd
[0].service_function_type
)
277 self
._log
.error("Service Function Type not available for VNFD ID %s; Skipping in chain",rsp_cp_ref
.vnfd_id_ref
)
280 vnfr_cp_ref
= vnffgr_rsp
.vnfr_connection_point_ref
.add()
281 vnfr_cp_ref
.member_vnf_index_ref
= rsp_cp_ref
.member_vnf_index_ref
282 vnfr_cp_ref
.hop_number
= rsp_cp_ref
.order
283 vnfr_cp_ref
.vnfd_id_ref
=rsp_cp_ref
.vnfd_id_ref
284 vnfr_cp_ref
.service_function_type
= vnfd
[0].service_function_type
285 for nsr_vnfr
in self
._nsr
.vnfrs
.values():
286 if (nsr_vnfr
.vnfd
.id == vnfr_cp_ref
.vnfd_id_ref
and
287 nsr_vnfr
.member_vnf_index
== vnfr_cp_ref
.member_vnf_index_ref
):
288 vnfr_cp_ref
.vnfr_id_ref
= nsr_vnfr
.id
289 vnfr_cp_ref
.vnfr_name_ref
= nsr_vnfr
.name
290 vnfr_cp_ref
.vnfr_connection_point_ref
= rsp_cp_ref
.vnfd_connection_point_ref
292 vnfr
= yield from self
._nsr
.fetch_vnfr(nsr_vnfr
.xpath
)
293 self
._log
.debug(" Received VNFR is %s", vnfr
)
294 while vnfr
.operational_status
!= 'running':
295 self
._log
.info("Received vnf op status is %s; retrying",vnfr
.operational_status
)
296 if vnfr
.operational_status
== 'failed':
297 self
._log
.error("Fetching VNFR for %s failed", vnfr
.id)
298 raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" % (self
.id, vnfr
.id))
299 yield from asyncio
.sleep(2, loop
=self
._loop
)
300 vnfr
= yield from self
._nsr
.fetch_vnfr(nsr_vnfr
.xpath
)
301 self
._log
.debug("Received VNFR is %s", vnfr
)
303 vnfr_cp_ref
.connection_point_params
.mgmt_address
= vnfr
.mgmt_interface
.ip_address
304 for cp
in vnfr
.connection_point
:
305 if cp
.name
== vnfr_cp_ref
.vnfr_connection_point_ref
:
306 vnfr_cp_ref
.connection_point_params
.port_id
= cp
.connection_point_id
307 vnfr_cp_ref
.connection_point_params
.name
= self
._nsr
.name
+ '.' + cp
.name
308 for vdu
in vnfr
.vdur
:
309 for ext_intf
in vdu
.external_interface
:
310 if ext_intf
.name
== vnfr_cp_ref
.vnfr_connection_point_ref
:
311 vnfr_cp_ref
.connection_point_params
.vm_id
= vdu
.vim_id
312 self
._log
.debug("VIM ID for CP %s in VNFR %s is %s",cp
.name
,nsr_vnfr
.id,
313 vnfr_cp_ref
.connection_point_params
.vm_id
)
316 vnfr_cp_ref
.connection_point_params
.address
= cp
.ip_address
317 vnfr_cp_ref
.connection_point_params
.port
= VnffgRecord
.SFF_DP_PORT
319 for vnffgd_classifier
in self
._vnffgd
_msg
.classifier
:
320 _rsp
= [rsp
for rsp
in vnffgr
.rsp
if rsp
.vnffgd_rsp_id_ref
== vnffgd_classifier
.rsp_id_ref
]
322 rsp_id_ref
= _rsp
[0].id
323 rsp_name
= _rsp
[0].name
325 self
._log
.error("RSP with ID %s not found during classifier creation for classifier id %s",vnffgd_classifier
.rsp_id_ref
,vnffgd_classifier
.id)
327 vnffgr_classifier
= vnffgr
.classifier
.add()
328 vnffgr_classifier
.id = vnffgd_classifier
.id
329 vnffgr_classifier
.name
= self
._nsr
.name
+ '.' + vnffgd_classifier
.name
330 _rsp
[0].classifier_name
= vnffgr_classifier
.name
331 vnffgr_classifier
.rsp_id_ref
= rsp_id_ref
332 vnffgr_classifier
.rsp_name
= rsp_name
333 for nsr_vnfr
in self
._nsr
.vnfrs
.values():
334 if (nsr_vnfr
.vnfd
.id == vnffgd_classifier
.vnfd_id_ref
and
335 nsr_vnfr
.member_vnf_index
== vnffgd_classifier
.member_vnf_index_ref
):
336 vnffgr_classifier
.vnfr_id_ref
= nsr_vnfr
.id
337 vnffgr_classifier
.vnfr_name_ref
= nsr_vnfr
.name
338 vnffgr_classifier
.vnfr_connection_point_ref
= vnffgd_classifier
.vnfd_connection_point_ref
340 if nsr_vnfr
.vnfd
.service_function_chain
== 'CLASSIFIER':
341 vnffgr_classifier
.sff_name
= nsr_vnfr
.name
343 vnfr
= yield from self
._nsr
.fetch_vnfr(nsr_vnfr
.xpath
)
344 self
._log
.debug(" Received VNFR is %s", vnfr
)
345 while vnfr
.operational_status
!= 'running':
346 self
._log
.info("Received vnf op status is %s; retrying",vnfr
.operational_status
)
347 if vnfr
.operational_status
== 'failed':
348 self
._log
.error("Fetching VNFR for %s failed", vnfr
.id)
349 raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" % (self
.id, vnfr
.id))
350 yield from asyncio
.sleep(2, loop
=self
._loop
)
351 vnfr
= yield from self
._nsr
.fetch_vnfr(nsr_vnfr
.xpath
)
352 self
._log
.debug("Received VNFR is %s", vnfr
)
354 for cp
in vnfr
.connection_point
:
355 if cp
.name
== vnffgr_classifier
.vnfr_connection_point_ref
:
356 vnffgr_classifier
.port_id
= cp
.connection_point_id
357 vnffgr_classifier
.ip_address
= cp
.ip_address
358 for vdu
in vnfr
.vdur
:
359 for ext_intf
in vdu
.external_interface
:
360 if ext_intf
.name
== vnffgr_classifier
.vnfr_connection_point_ref
:
361 vnffgr_classifier
.vm_id
= vdu
.vim_id
362 self
._log
.debug("VIM ID for CP %s in VNFR %s is %s",cp
.name
,nsr_vnfr
.id,
363 vnfr_cp_ref
.connection_point_params
.vm_id
)
366 self
._log
.info("VNFFGR msg to be sent is %s", vnffgr
)
370 def vnffgr_nsr_sff_list(self
):
371 """ SFF List for VNFR """
373 sf_list
= [nsr_vnfr
.name
for nsr_vnfr
in self
._nsr
.vnfrs
.values() if nsr_vnfr
.vnfd
.service_function_chain
== 'SF']
375 for nsr_vnfr
in self
._nsr
.vnfrs
.values():
376 if (nsr_vnfr
.vnfd
.service_function_chain
== 'CLASSIFIER' or nsr_vnfr
.vnfd
.service_function_chain
== 'SFF'):
377 vnfr
= yield from self
._nsr
.fetch_vnfr(nsr_vnfr
.xpath
)
378 self
._log
.debug(" Received VNFR is %s", vnfr
)
379 while vnfr
.operational_status
!= 'running':
380 self
._log
.info("Received vnf op status is %s; retrying",vnfr
.operational_status
)
381 if vnfr
.operational_status
== 'failed':
382 self
._log
.error("Fetching VNFR for %s failed", vnfr
.id)
383 raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFR %s failure" % (self
.id, vnfr
.id))
384 yield from asyncio
.sleep(2, loop
=self
._loop
)
385 vnfr
= yield from self
._nsr
.fetch_vnfr(nsr_vnfr
.xpath
)
386 self
._log
.debug("Received VNFR is %s", vnfr
)
388 sff
= RwsdnYang
.VNFFGSff()
389 sff_list
[nsr_vnfr
.vnfd
.id] = sff
390 sff
.name
= nsr_vnfr
.name
391 sff
.function_type
= nsr_vnfr
.vnfd
.service_function_chain
393 sff
.mgmt_address
= vnfr
.mgmt_interface
.ip_address
394 sff
.mgmt_port
= VnffgRecord
.SFF_MGMT_PORT
395 for cp
in vnfr
.connection_point
:
396 sff_dp
= sff
.dp_endpoints
.add()
397 sff_dp
.name
= self
._nsr
.name
+ '.' + cp
.name
398 sff_dp
.address
= cp
.ip_address
399 sff_dp
.port
= VnffgRecord
.SFF_DP_PORT
400 if nsr_vnfr
.vnfd
.service_function_chain
== 'SFF':
401 for sf_name
in sf_list
:
402 _sf
= sff
.vnfr_list
.add()
403 _sf
.vnfr_name
= sf_name
408 def instantiate(self
):
409 """ Instantiate this VNFFG """
411 self
._log
.info("Instaniating VNFFGR with vnffgd %s",
415 vnffgr_request
= yield from self
.vnffgr_create_msg()
416 vnffg_sff_list
= yield from self
.vnffgr_nsr_sff_list()
419 vnffgr
= self
._vnffgmgr
.create_vnffgr(vnffgr_request
,self
._vnffgd
_msg
.classifier
,vnffg_sff_list
)
420 except Exception as e
:
421 self
._log
.exception("VNFFG instantiation failed: %s", str(e
))
422 self
._vnffgr
_state
= VnffgRecordState
.FAILED
423 raise NsrInstantiationFailed("Failed NS %s instantiation due to VNFFGR %s failure" % (self
.id, vnffgr_request
.id))
425 self
._vnffgr
_state
= VnffgRecordState
.INSTANTIATION_PENDING
427 self
._log
.info("Instantiated VNFFGR :%s", vnffgr
)
428 self
._vnffgr
_state
= VnffgRecordState
.ACTIVE
430 self
._log
.info("Invoking update_state to update NSR state for NSR ID: %s", self
._nsr
.id)
431 yield from self
._nsr
.update_state()
433 def vnffgr_in_vnffgrm(self
):
434 """ Is there a VNFR record in VNFM """
435 if (self
._vnffgr
_state
== VnffgRecordState
.ACTIVE
or
436 self
._vnffgr
_state
== VnffgRecordState
.INSTANTIATION_PENDING
or
437 self
._vnffgr
_state
== VnffgRecordState
.FAILED
):
444 """ Terminate this VNFFGR """
445 if not self
.vnffgr_in_vnffgrm():
446 self
._log
.error("Ignoring terminate request for id %s in state %s",
447 self
.id, self
._vnffgr
_state
)
450 self
._log
.info("Terminating VNFFGR id:%s", self
.id)
451 self
._vnffgr
_state
= VnffgRecordState
.TERMINATE_PENDING
453 self
._vnffgmgr
.terminate_vnffgr(self
._vnffgr
_id
)
455 self
._vnffgr
_state
= VnffgRecordState
.TERMINATED
456 self
._log
.debug("Terminated VNFFGR id:%s", self
.id)
459 class VirtualLinkRecord(object):
460 """ Virtual Link Records class"""
463 def create_record(dts
, log
, loop
, nsr_name
, vld_msg
, cloud_account_name
, om_datacenter
, ip_profile
, nsr_id
, restart_mode
=False):
464 """Creates a new VLR object based on the given data.
466 If restart mode is enabled, then we look for existing records in the
467 DTS and create a VLR records using the exiting data(ID)
472 vlr_obj
= VirtualLinkRecord(
485 res_iter
= yield from dts
.query_read(
486 "D,/vlr:vlr-catalog/vlr:vlr",
487 rwdts
.XactFlag
.MERGE
)
490 response
= yield from fut
491 vlr
= response
.result
493 # Check if the record is already present, if so use the ID of
494 # the existing record. Since the name of the record is uniquely
495 # formed we can use it as a search key!
496 if vlr
.name
== vlr_obj
.name
:
497 vlr_obj
.reset_id(vlr
.id)
502 def __init__(self
, dts
, log
, loop
, nsr_name
, vld_msg
, cloud_account_name
, om_datacenter
, ip_profile
, nsr_id
):
506 self
._nsr
_name
= nsr_name
507 self
._vld
_msg
= vld_msg
508 self
._cloud
_account
_name
= cloud_account_name
509 self
._om
_datacenter
_name
= om_datacenter
510 self
._assigned
_subnet
= None
511 self
._nsr
_id
= nsr_id
512 self
._ip
_profile
= ip_profile
513 self
._vlr
_id
= str(uuid
.uuid4())
514 self
._state
= VlRecordState
.INIT
515 self
._prev
_state
= None
519 """ path for this object """
520 return "D,/vlr:vlr-catalog/vlr:vlr[vlr:id = '{}']".format(self
._vlr
_id
)
529 """ Get NSR name for this VL """
534 """ Virtual Link Desciptor """
538 def assigned_subnet(self
):
539 """ Subnet assigned to this VL"""
540 return self
._assigned
_subnet
545 Get the name for this VLR.
546 VLR name is "nsr name:VLD name"
548 if self
.vld_msg
.vim_network_name
:
549 return self
.vld_msg
.vim_network_name
550 elif self
.vld_msg
.name
== "multisite":
551 # This is a temporary hack to identify manually provisioned inter-site network
552 return self
.vld_msg
.name
554 return self
._nsr
_name
+ "." + self
.vld_msg
.name
557 def cloud_account_name(self
):
558 """ Cloud account that this VLR should be created in """
559 return self
._cloud
_account
_name
562 def om_datacenter_name(self
):
563 """ Datacenter that this VLR should be created in """
564 return self
._om
_datacenter
_name
568 """ Get the VLR path from VLR """
569 return (VirtualLinkRecord
.XPATH
+ "[vlr:id = '{}']").format(vlr
.id)
577 def state(self
, value
):
578 """ VLR set state """
582 def prev_state(self
):
583 """ VLR previous state """
584 return self
._prev
_state
587 def prev_state(self
, value
):
588 """ VLR set previous state """
589 self
._prev
_state
= value
593 """ Virtual Link Record message for Creating VLR in VNS """
594 vld_fields
= ["short_name",
602 vld_copy_dict
= {k
: v
for k
, v
in self
.vld_msg
.as_dict().items()
605 vlr_dict
= {"id": self
._vlr
_id
,
606 "nsr_id_ref": self
._nsr
_id
,
607 "vld_ref": self
.vld_msg
.id,
609 "cloud_account": self
.cloud_account_name
,
610 "om_datacenter": self
.om_datacenter_name
,
613 if self
._ip
_profile
and self
._ip
_profile
.has_field('ip_profile_params'):
614 vlr_dict
['ip_profile_params' ] = self
._ip
_profile
.ip_profile_params
.as_dict()
616 vlr_dict
.update(vld_copy_dict
)
617 vlr
= RwVlrYang
.YangData_Vlr_VlrCatalog_Vlr
.from_dict(vlr_dict
)
620 def reset_id(self
, vlr_id
):
621 self
._vlr
_id
= vlr_id
623 def create_nsr_vlr_msg(self
, vnfrs
):
624 """ The VLR message"""
625 nsr_vlr
= RwNsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr_Vlr()
626 nsr_vlr
.vlr_ref
= self
._vlr
_id
627 nsr_vlr
.assigned_subnet
= self
.assigned_subnet
628 nsr_vlr
.cloud_account
= self
.cloud_account_name
629 nsr_vlr
.om_datacenter
= self
.om_datacenter_name
631 for conn
in self
.vld_msg
.vnfd_connection_point_ref
:
633 if (vnfr
.vnfd
.id == conn
.vnfd_id_ref
and
634 vnfr
.member_vnf_index
== conn
.member_vnf_index_ref
and
635 self
.cloud_account_name
== vnfr
.cloud_account_name
and
636 self
.om_datacenter_name
== vnfr
.om_datacenter_name
):
637 cp_entry
= nsr_vlr
.vnfr_connection_point_ref
.add()
638 cp_entry
.vnfr_id
= vnfr
.id
639 cp_entry
.connection_point
= conn
.vnfd_connection_point_ref
644 def instantiate(self
):
645 """ Instantiate this VL """
647 self
._log
.debug("Instaniating VLR key %s, vld %s",
648 self
.xpath
, self
._vld
_msg
)
650 self
._state
= VlRecordState
.INSTANTIATION_PENDING
651 self
._log
.debug("Executing VL create path:%s msg:%s",
652 self
.xpath
, self
.vlr_msg
)
654 with self
._dts
.transaction(flags
=0) as xact
:
655 block
= xact
.block_create()
656 block
.add_query_create(self
.xpath
, self
.vlr_msg
)
657 self
._log
.debug("Executing VL create path:%s msg:%s",
658 self
.xpath
, self
.vlr_msg
)
659 res_iter
= yield from block
.execute(now
=True)
665 self
._state
= VlRecordState
.FAILED
666 raise NsrInstantiationFailed("Failed NS %s instantiation due to empty response" % self
.id)
668 if vlr
.operational_status
== 'failed':
669 self
._log
.debug("NS Id:%s VL creation failed for vlr id %s", self
.id, vlr
.id)
670 self
._state
= VlRecordState
.FAILED
671 raise NsrInstantiationFailed("Failed VL %s instantiation (%s)" % (vlr
.id, vlr
.operational_status_details
))
673 self
._log
.info("Instantiated VL with xpath %s and vlr:%s",
675 self
._state
= VlRecordState
.ACTIVE
676 self
._assigned
_subnet
= vlr
.assigned_subnet
678 def vlr_in_vns(self
):
679 """ Is there a VLR record in VNS """
680 if (self
._state
== VlRecordState
.ACTIVE
or
681 self
._state
== VlRecordState
.INSTANTIATION_PENDING
or
682 self
._state
== VlRecordState
.TERMINATE_PENDING
or
683 self
._state
== VlRecordState
.FAILED
):
690 """ Terminate this VL """
691 if not self
.vlr_in_vns():
692 self
._log
.debug("Ignoring terminate request for id %s in state %s",
693 self
.id, self
._state
)
696 self
._log
.debug("Terminating VL id:%s", self
.id)
697 self
._state
= VlRecordState
.TERMINATE_PENDING
699 with self
._dts
.transaction(flags
=0) as xact
:
700 block
= xact
.block_create()
701 block
.add_query_delete(self
.xpath
)
702 yield from block
.execute(flags
=0, now
=True)
704 self
._state
= VlRecordState
.TERMINATED
705 self
._log
.debug("Terminated VL id:%s", self
.id)
708 class VnfRecordState(Enum
):
709 """ Vnf Record State """
711 INSTANTIATION_PENDING
= 102
713 TERMINATE_PENDING
= 104
718 class VirtualNetworkFunctionRecord(object):
719 """ Virtual Network Function Record class"""
720 XPATH
= "D,/vnfr:vnfr-catalog/vnfr:vnfr"
724 def create_record(dts
, log
, loop
, vnfd
, const_vnfd_msg
, nsd_id
, nsr_name
,
725 cloud_account_name
, om_datacenter_name
, nsr_id
, group_name
, group_instance_id
,
726 placement_groups
, restart_mode
=False):
727 """Creates a new VNFR object based on the given data.
729 If restart mode is enabled, then we look for existing records in the
730 DTS and create a VNFR records using the exiting data(ID)
733 VirtualNetworkFunctionRecord
735 vnfr_obj
= VirtualNetworkFunctionRecord(
749 restart_mode
=restart_mode
)
752 res_iter
= yield from dts
.query_read(
753 "D,/vnfr:vnfr-catalog/vnfr:vnfr",
754 rwdts
.XactFlag
.MERGE
)
757 response
= yield from fut
758 vnfr
= response
.result
760 if vnfr
.name
== vnfr_obj
.name
:
761 vnfr_obj
.reset_id(vnfr
.id)
778 group_instance_id
=None,
779 placement_groups
= [],
780 restart_mode
= False):
785 self
._const
_vnfd
_msg
= const_vnfd_msg
786 self
._nsd
_id
= nsd_id
787 self
._nsr
_name
= nsr_name
788 self
._nsr
_id
= nsr_id
789 self
._cloud
_account
_name
= cloud_account_name
790 self
._om
_datacenter
_name
= om_datacenter_name
791 self
._group
_name
= group_name
792 self
._group
_instance
_id
= group_instance_id
793 self
._placement
_groups
= placement_groups
794 self
._config
_status
= NsrYang
.ConfigStates
.INIT
796 self
._prev
_state
= VnfRecordState
.INIT
797 self
._state
= VnfRecordState
.INIT
798 self
._state
_failed
_reason
= None
800 self
.config_store
= rift
.mano
.config_data
.config
.ConfigStore(self
._log
)
803 self
._vnfr
_id
= str(uuid
.uuid4())
805 self
._vnfr
_msg
= self
.create_vnfr_msg()
806 self
._log
.debug("Set VNFR {} config type to {}".
807 format(self
.name
, self
.config_type
))
808 self
.restart_mode
= restart_mode
811 if group_name
is None and group_instance_id
is not None:
812 raise ValueError("Group instance id must not be provided with an empty group name")
822 return "D,/vnfr:vnfr-catalog/vnfr:vnfr[vnfr:id = '{}']".format(self
.id)
827 return self
._vnfr
_msg
830 def const_vnfr_msg(self
):
832 return RwNsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr_ConstituentVnfrRef(vnfr_id
=self
.id,cloud_account
=self
.cloud_account_name
,om_datacenter
=self
._om
_datacenter
_name
)
840 def cloud_account_name(self
):
841 """ Cloud account that this VNF should be created in """
842 return self
._cloud
_account
_name
845 def om_datacenter_name(self
):
846 """ Datacenter that this VNF should be created in """
847 return self
._om
_datacenter
_name
852 """ Is this VNF actve """
853 return True if self
._state
== VnfRecordState
.ACTIVE
else False
857 """ state of this VNF """
861 def state_failed_reason(self
):
862 """ Error message in case this VNF is in failed state """
863 return self
._state
_failed
_reason
866 def member_vnf_index(self
):
867 """ Member VNF index """
868 return self
._const
_vnfd
_msg
.member_vnf_index
873 return self
._nsr
_name
877 """ Name of this VNFR """
878 if self
._name
is not None:
881 name_tags
= [self
._nsr
_name
]
883 if self
._group
_name
is not None:
884 name_tags
.append(self
._group
_name
)
886 if self
._group
_instance
_id
is not None:
887 name_tags
.append(str(self
._group
_instance
_id
))
889 name_tags
.extend([self
.vnfd
.name
, str(self
.member_vnf_index
)])
891 self
._name
= "__".join(name_tags
)
896 def vnfr_xpath(vnfr
):
897 """ Get the VNFR path from VNFR """
898 return (VirtualNetworkFunctionRecord
.XPATH
+ "[vnfr:id = '{}']").format(vnfr
.id)
901 def config_type(self
):
902 cfg_types
= ['netconf', 'juju', 'script']
903 for method
in cfg_types
:
904 if self
._vnfd
.vnf_configuration
.has_field(method
):
909 def config_status(self
):
910 """Return the config status as YANG ENUM string"""
911 self
._log
.debug("Map VNFR {} config status {} ({})".
912 format(self
.name
, self
._config
_status
, self
.config_type
))
913 if self
.config_type
== 'none':
914 return 'config_not_needed'
915 elif self
._config
_status
== NsrYang
.ConfigStates
.CONFIGURED
:
917 elif self
._config
_status
== NsrYang
.ConfigStates
.FAILED
:
922 def set_state(self
, state
):
923 """ set the state of this object """
924 self
._prev
_state
= self
._state
927 def reset_id(self
, vnfr_id
):
928 self
._vnfr
_id
= vnfr_id
929 self
._vnfr
_msg
= self
.create_vnfr_msg()
932 self
.config_store
.merge_vnfd_config(
935 self
.member_vnf_index
,
938 def create_vnfr_msg(self
):
939 """ VNFR message for this VNFR """
947 vnfd_copy_dict
= {k
: v
for k
, v
in self
._vnfd
.as_dict().items() if k
in vnfd_fields
}
950 "nsr_id_ref": self
._nsr
_id
,
951 "vnfd_ref": self
.vnfd
.id,
953 "cloud_account": self
._cloud
_account
_name
,
954 "om_datacenter": self
._om
_datacenter
_name
,
955 "config_status": self
.config_status
957 vnfr_dict
.update(vnfd_copy_dict
)
959 vnfr
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.from_dict(vnfr_dict
)
960 vnfr
.member_vnf_index_ref
= self
.member_vnf_index
961 vnfr
.vnf_configuration
.from_dict(self
._vnfd
.vnf_configuration
.as_dict())
963 if self
._vnfd
.mgmt_interface
.has_field("port"):
964 vnfr
.mgmt_interface
.port
= self
._vnfd
.mgmt_interface
.port
966 for group_info
in self
._placement
_groups
:
967 group
= vnfr
.placement_groups_info
.add()
968 group
.from_dict(group_info
.as_dict())
970 # UI expects the monitoring param field to exist
971 vnfr
.monitoring_param
= []
973 self
._log
.debug("Get vnfr_msg for VNFR {} : {}".format(self
.name
, vnfr
))
977 def update_vnfm(self
):
978 self
._log
.debug("Send an update to VNFM for VNFR {} with {}".
979 format(self
.name
, self
.vnfr_msg
))
980 yield from self
._dts
.query_update(
982 rwdts
.XactFlag
.TRACE
,
986 def get_config_status(self
):
987 """Return the config status as YANG ENUM"""
988 return self
._config
_status
991 def set_config_status(self
, status
):
993 def status_to_string(status
):
995 NsrYang
.ConfigStates
.INIT
: 'init',
996 NsrYang
.ConfigStates
.CONFIGURING
: 'configuring',
997 NsrYang
.ConfigStates
.CONFIG_NOT_NEEDED
: 'config_not_needed',
998 NsrYang
.ConfigStates
.CONFIGURED
: 'configured',
999 NsrYang
.ConfigStates
.FAILED
: 'failed',
1002 return status_dc
[status
]
1004 self
._log
.debug("Update VNFR {} from {} ({}) to {}".
1005 format(self
.name
, self
._config
_status
,
1006 self
.config_type
, status
))
1007 if self
._config
_status
== NsrYang
.ConfigStates
.CONFIGURED
:
1008 self
._log
.error("Updating already configured VNFR {}".
1012 if self
._config
_status
!= status
:
1014 self
._config
_status
= status
1015 # I don't think this is used. Original implementor can check.
1016 # Caused Exception, so corrected it by status_to_string
1017 # But not sure whats the use of this variable?
1018 self
.vnfr_msg
.config_status
= status_to_string(status
)
1019 except Exception as e
:
1020 self
._log
.error("Exception=%s", str(e
))
1023 self
._log
.debug("Updated VNFR {} status to {}".format(self
.name
, status
))
1025 if self
._config
_status
!= NsrYang
.ConfigStates
.INIT
:
1027 # Publish only after VNFM has the VNFR created
1028 yield from self
.update_vnfm()
1029 except Exception as e
:
1030 self
._log
.error("Exception updating VNFM with new status {} of VNFR {}: {}".
1031 format(status
, self
.name
, e
))
1032 self
._log
.exception(e
)
1034 def is_configured(self
):
1035 if self
.config_type
== 'none':
1038 if self
._config
_status
== NsrYang
.ConfigStates
.CONFIGURED
:
1044 def instantiate(self
, nsr
):
1045 """ Instantiate this VNFR"""
1047 self
._log
.debug("Instaniating VNFR key %s, vnfd %s",
1048 self
.xpath
, self
._vnfd
)
1050 self
._log
.debug("Create VNF with xpath %s and vnfr %s",
1051 self
.xpath
, self
.vnfr_msg
)
1053 self
.set_state(VnfRecordState
.INSTANTIATION_PENDING
)
1055 def find_vlr_for_cp(conn
):
1056 """ Find VLR for the given connection point """
1057 for vlr
in nsr
.vlrs
:
1058 for vnfd_cp
in vlr
.vld_msg
.vnfd_connection_point_ref
:
1059 if (vnfd_cp
.vnfd_id_ref
== self
._vnfd
.id and
1060 vnfd_cp
.vnfd_connection_point_ref
== conn
.name
and
1061 vnfd_cp
.member_vnf_index_ref
== self
.member_vnf_index
and
1062 vlr
.cloud_account_name
== self
.cloud_account_name
):
1063 self
._log
.debug("Found VLR for cp_name:%s and vnf-index:%d",
1064 conn
.name
, self
.member_vnf_index
)
1068 # For every connection point in the VNFD fill in the identifier
1069 for conn_p
in self
._vnfd
.connection_point
:
1070 cpr
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_ConnectionPoint()
1071 cpr
.name
= conn_p
.name
1072 cpr
.type_yang
= conn_p
.type_yang
1073 vlr_ref
= find_vlr_for_cp(conn_p
)
1075 msg
= "Failed to find VLR for cp = %s" % conn_p
.name
1076 self
._log
.debug("%s", msg
)
1077 # raise VirtualNetworkFunctionRecordError(msg)
1080 cpr
.vlr_ref
= vlr_ref
.id
1081 self
.vnfr_msg
.connection_point
.append(cpr
)
1082 self
._log
.debug("Connection point [%s] added, vnf id=%s vnfd id=%s",
1083 cpr
, self
.vnfr_msg
.id, self
.vnfr_msg
.vnfd_ref
)
1085 if not self
.restart_mode
:
1086 yield from self
._dts
.query_create(self
.xpath
,
1090 yield from self
._dts
.query_update(self
.xpath
,
1094 self
._log
.info("Created VNF with xpath %s and vnfr %s",
1095 self
.xpath
, self
.vnfr_msg
)
1097 self
._log
.info("Instantiated VNFR with xpath %s and vnfd %s, vnfr %s",
1098 self
.xpath
, self
._vnfd
, self
.vnfr_msg
)
1101 def update_state(self
, vnfr_msg
):
1102 """ Update this VNFR"""
1103 if vnfr_msg
.operational_status
== "running":
1104 if self
.vnfr_msg
.operational_status
!= "running":
1105 yield from self
.is_active()
1106 elif vnfr_msg
.operational_status
== "failed":
1107 yield from self
.instantiation_failed(failed_reason
=vnfr_msg
.operational_status_details
)
1110 def is_active(self
):
1111 """ This VNFR is active """
1112 self
._log
.debug("VNFR %s is active", self
._vnfr
_id
)
1113 self
.set_state(VnfRecordState
.ACTIVE
)
1116 def instantiation_failed(self
, failed_reason
=None):
1117 """ This VNFR instantiation failed"""
1118 self
._log
.error("VNFR %s instantiation failed", self
._vnfr
_id
)
1119 self
.set_state(VnfRecordState
.FAILED
)
1120 self
._state
_failed
_reason
= failed_reason
1122 def vnfr_in_vnfm(self
):
1123 """ Is there a VNFR record in VNFM """
1124 if (self
._state
== VnfRecordState
.ACTIVE
or
1125 self
._state
== VnfRecordState
.INSTANTIATION_PENDING
or
1126 self
._state
== VnfRecordState
.FAILED
):
1132 def terminate(self
):
1133 """ Terminate this VNF """
1134 if not self
.vnfr_in_vnfm():
1135 self
._log
.debug("Ignoring terminate request for id %s in state %s",
1136 self
.id, self
._state
)
1139 self
._log
.debug("Terminating VNF id:%s", self
.id)
1140 self
.set_state(VnfRecordState
.TERMINATE_PENDING
)
1141 with self
._dts
.transaction(flags
=0) as xact
:
1142 block
= xact
.block_create()
1143 block
.add_query_delete(self
.xpath
)
1144 yield from block
.execute(flags
=0)
1145 self
.set_state(VnfRecordState
.TERMINATED
)
1146 self
._log
.debug("Terminated VNF id:%s", self
.id)
1149 class NetworkServiceStatus(object):
1150 """ A class representing the Network service's status """
1151 MAX_EVENTS_RECORDED
= 10
1152 """ Network service Status class"""
1153 def __init__(self
, dts
, log
, loop
):
1158 self
._state
= NetworkServiceRecordState
.INIT
1159 self
._events
= deque([])
1162 def create_notification(self
, evt
, evt_desc
, evt_details
):
1163 xp
= "N,/rw-nsr:nsm-notification"
1164 notif
= RwNsrYang
.YangNotif_RwNsr_NsmNotification()
1166 notif
.description
= evt_desc
1167 notif
.details
= evt_details
if evt_details
is not None else None
1169 yield from self
._dts
.query_create(xp
, rwdts
.XactFlag
.ADVISE
, notif
)
1170 self
._log
.info("Notification called by creating dts query: %s", notif
)
1172 def record_event(self
, evt
, evt_desc
, evt_details
):
1173 """ Record an event """
1174 self
._log
.debug("Recording event - evt %s, evt_descr %s len = %s",
1175 evt
, evt_desc
, len(self
._events
))
1176 if len(self
._events
) >= NetworkServiceStatus
.MAX_EVENTS_RECORDED
:
1177 self
._events
.popleft()
1178 self
._events
.append((int(time
.time()), evt
, evt_desc
,
1179 evt_details
if evt_details
is not None else None))
1181 self
._loop
.create_task(self
.create_notification(evt
,evt_desc
,evt_details
))
1183 def set_state(self
, state
):
1184 """ set the state of this status object """
1188 """ Return the state as a yang enum string """
1189 state_to_str_map
= {"INIT": "init",
1190 "VL_INIT_PHASE": "vl_init_phase",
1191 "VNF_INIT_PHASE": "vnf_init_phase",
1192 "VNFFG_INIT_PHASE": "vnffg_init_phase",
1193 "SCALING_GROUP_INIT_PHASE": "scaling_group_init_phase",
1194 "RUNNING": "running",
1195 "SCALING_OUT": "scaling_out",
1196 "SCALING_IN": "scaling_in",
1197 "TERMINATE_RCVD": "terminate_rcvd",
1198 "TERMINATE": "terminate",
1199 "VL_TERMINATE_PHASE": "vl_terminate_phase",
1200 "VNF_TERMINATE_PHASE": "vnf_terminate_phase",
1201 "VNFFG_TERMINATE_PHASE": "vnffg_terminate_phase",
1202 "TERMINATED": "terminated",
1204 "VL_INSTANTIATE": "vl_instantiate",
1205 "VL_TERMINATE": "vl_terminate",
1207 return state_to_str_map
[self
._state
.name
]
1211 """ State of this status object """
1216 """ Network Service Record as a message"""
1219 for entry
in self
._events
:
1220 event
= RwNsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr_OperationalEvents()
1223 event
.timestamp
, event
.event
, event
.description
, event
.details
= entry
1224 event_list
.append(event
)
1228 class NetworkServiceRecord(object):
1229 """ Network service record """
1230 XPATH
= "D,/nsr:ns-instance-opdata/nsr:nsr"
1232 def __init__(self
, dts
, log
, loop
, nsm
, nsm_plugin
, nsr_cfg_msg
, sdn_account_name
, restart_mode
=False):
1237 self
._nsr
_cfg
_msg
= nsr_cfg_msg
1238 self
._nsm
_plugin
= nsm_plugin
1239 self
._sdn
_account
_name
= sdn_account_name
1242 self
._nsr
_msg
= None
1243 self
._nsr
_regh
= None
1248 self
._param
_pools
= {}
1249 self
._scaling
_groups
= {}
1250 self
._create
_time
= int(time
.time())
1251 self
._op
_status
= NetworkServiceStatus(dts
, log
, loop
)
1252 self
._config
_status
= NsrYang
.ConfigStates
.CONFIGURING
1253 self
._config
_status
_details
= None
1255 self
.restart_mode
= restart_mode
1256 self
.config_store
= rift
.mano
.config_data
.config
.ConfigStore(self
._log
)
1257 self
._debug
_running
= False
1258 self
._is
_active
= False
1259 self
._vl
_phase
_completed
= False
1260 self
._vnf
_phase
_completed
= False
1263 # Initalise the state to init
1264 # The NSR moves through the following transitions
1265 # 1. INIT -> VLS_READY once all the VLs in the NSD are created
1266 # 2. VLS_READY - VNFS_READY when all the VNFs in the NSD are created
1267 # 3. VNFS_READY - READY when the NSR is published
1269 self
.set_state(NetworkServiceRecordState
.INIT
)
1271 self
.substitute_input_parameters
= InputParameterSubstitution(self
._log
)
1274 def nsm_plugin(self
):
1276 return self
._nsm
_plugin
1278 def set_state(self
, state
):
1279 """ Set state for this NSR"""
1280 self
._log
.debug("Setting state to %s", state
)
1281 # We are in init phase and is moving to the next state
1282 # The new state could be a FAILED state or VNF_INIIT_PHASE
1283 if self
.state
== NetworkServiceRecordState
.VL_INIT_PHASE
:
1284 self
._vl
_phase
_completed
= True
1286 if self
.state
== NetworkServiceRecordState
.VNF_INIT_PHASE
:
1287 self
._vnf
_phase
_completed
= True
1289 self
._op
_status
.set_state(state
)
1293 """ Get id for this NSR"""
1294 return self
._nsr
_cfg
_msg
.id
1298 """ Name of this network service record """
1299 return self
._nsr
_cfg
_msg
.name
1302 def cloud_account_name(self
):
1303 return self
._nsr
_cfg
_msg
.cloud_account
1306 def om_datacenter_name(self
):
1307 if self
._nsr
_cfg
_msg
.has_field('om_datacenter'):
1308 return self
._nsr
_cfg
_msg
.om_datacenter
1313 """State of this NetworkServiceRecord"""
1314 return self
._op
_status
.state
1318 """ Is this NSR active ?"""
1319 return True if self
._op
_status
.state
== NetworkServiceRecordState
.RUNNING
else False
1323 """ VLRs associated with this NSR"""
1328 """ VNFRs associated with this NSR"""
1333 """ VNFFGRs associated with this NSR"""
1334 return self
._vnffgrs
1337 def scaling_groups(self
):
1338 """ Scaling groups associated with this NSR """
1339 return self
._scaling
_groups
1342 def param_pools(self
):
1343 """ Parameter value pools associated with this NSR"""
1344 return self
._param
_pools
1347 def nsr_cfg_msg(self
):
1348 return self
._nsr
_cfg
_msg
1351 def nsr_cfg_msg(self
, msg
):
1352 self
._nsr
_cfg
_msg
= msg
1356 """ NSD Protobuf for this NSR """
1357 if self
._nsd
is not None:
1359 self
._nsd
= self
._nsr
_cfg
_msg
.nsd
1364 """ NSD ID for this NSR """
1365 return self
.nsd_msg
.id
1369 ''' Get a new job id for config primitive'''
1374 def config_status(self
):
1375 """ Config status for NSR """
1376 return self
._config
_status
1378 def resolve_placement_group_cloud_construct(self
, input_group
):
1380 Returns the cloud specific construct for placement group
1382 copy_dict
= ['name', 'requirement', 'strategy']
1384 for group_info
in self
._nsr
_cfg
_msg
.nsd_placement_group_maps
:
1385 if group_info
.placement_group_ref
== input_group
.name
:
1386 group
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr_PlacementGroupsInfo()
1387 group_dict
= {k
:v
for k
,v
in
1388 group_info
.as_dict().items() if k
!= 'placement_group_ref'}
1389 for param
in copy_dict
:
1390 group_dict
.update({param
: getattr(input_group
, param
)})
1391 group
.from_dict(group_dict
)
1397 return "NSR(name={}, nsd_id={}, cloud_account={})".format(
1398 self
.name
, self
.nsd_id
, self
.cloud_account_name
1401 def _get_vnfd(self
, vnfd_id
, config_xact
):
1402 """ Fetch vnfd msg for the passed vnfd id """
1403 return self
._nsm
.get_vnfd(vnfd_id
, config_xact
)
1405 def _get_vnfd_cloud_account(self
, vnfd_member_index
):
1406 """ Fetch Cloud Account for the passed vnfd id """
1407 if self
._nsr
_cfg
_msg
.vnf_cloud_account_map
:
1408 vim_accounts
= [(vnf
.cloud_account
,vnf
.om_datacenter
) for vnf
in self
._nsr
_cfg
_msg
.vnf_cloud_account_map \
1409 if vnfd_member_index
== vnf
.member_vnf_index_ref
]
1410 if vim_accounts
and vim_accounts
[0]:
1411 return vim_accounts
[0]
1412 return (self
.cloud_account_name
,self
.om_datacenter_name
)
1414 def _get_constituent_vnfd_msg(self
, vnf_index
):
1415 for const_vnfd
in self
.nsd_msg
.constituent_vnfd
:
1416 if const_vnfd
.member_vnf_index
== vnf_index
:
1419 raise ValueError("Constituent VNF index %s not found" % vnf_index
)
1421 def record_event(self
, evt
, evt_desc
, evt_details
=None, state
=None):
1422 """ Record an event """
1423 self
._op
_status
.record_event(evt
, evt_desc
, evt_details
)
1424 if state
is not None:
1425 self
.set_state(state
)
1427 def scaling_trigger_str(self
, trigger
):
1428 SCALING_TRIGGER_STRS
= {
1429 NsdYang
.ScalingTrigger
.PRE_SCALE_IN
: 'pre-scale-in',
1430 NsdYang
.ScalingTrigger
.POST_SCALE_IN
: 'post-scale-in',
1431 NsdYang
.ScalingTrigger
.PRE_SCALE_OUT
: 'pre-scale-out',
1432 NsdYang
.ScalingTrigger
.POST_SCALE_OUT
: 'post-scale-out',
1435 return SCALING_TRIGGER_STRS
[trigger
]
1436 except Exception as e
:
1437 self
._log
.error("Scaling trigger mapping error for {} : {}".
1439 self
._log
.exception(e
)
1440 return "Unknown trigger"
1443 def instantiate_vls(self
):
1445 This function instantiates VLs for every VL in this Network Service
1447 self
._log
.debug("Instantiating %d VLs in NSD id %s", len(self
._vlrs
),
1449 for vlr
in self
._vlrs
:
1450 yield from self
.nsm_plugin
.instantiate_vl(self
, vlr
)
1451 vlr
.state
= VlRecordState
.ACTIVE
1454 def create(self
, config_xact
):
1455 """ Create this network service"""
1456 # Create virtual links for all the external vnf
1457 # connection points in this NS
1458 yield from self
.create_vls()
1460 # Create VNFs in this network service
1461 yield from self
.create_vnfs(config_xact
)
1463 # Create VNFFG for network service
1464 self
.create_vnffgs()
1466 # Create Scaling Groups for each scaling group in NSD
1467 self
.create_scaling_groups()
1469 # Create Parameter Pools
1470 self
.create_param_pools()
1473 def apply_scale_group_config_script(self
, script
, group
, scale_instance
, trigger
, vnfrs
=None):
1474 """ Apply config based on script for scale group """
1477 def add_vnfrs_data(vnfrs_list
):
1478 """ Add as a dict each of the VNFRs data """
1480 for vnfr
in vnfrs_list
:
1481 self
._log
.debug("Add VNFR {} data".format(vnfr
))
1483 vnfr_data
['name'] = vnfr
.name
1484 if trigger
in [NsdYang
.ScalingTrigger
.PRE_SCALE_IN
, NsdYang
.ScalingTrigger
.POST_SCALE_OUT
]:
1485 # Get VNF management and other IPs, etc
1486 opdata
= yield from self
.fetch_vnfr(vnfr
.xpath
)
1487 self
._log
.debug("VNFR {} op data: {}".format(vnfr
.name
, opdata
))
1489 vnfr_data
['rw_mgmt_ip'] = opdata
.mgmt_interface
.ip_address
1490 vnfr_data
['rw_mgmt_port'] = opdata
.mgmt_interface
.port
1491 except Exception as e
:
1492 self
._log
.error("Unable to get management IP for vnfr {}:{}".
1493 format(vnfr
.name
, e
))
1496 vnfr_data
['connection_points'] = []
1497 for cp
in opdata
.connection_point
:
1499 con_pt
['name'] = cp
.name
1500 con_pt
['ip_address'] = cp
.ip_address
1501 vnfr_data
['connection_points'].append(con_pt
)
1502 except Exception as e
:
1503 self
._log
.error("Exception getting connections points for VNFR {}: {}".
1504 format(vnfr
.name
, e
))
1506 vnfrs_data
.append(vnfr_data
)
1507 self
._log
.debug("VNFRs data: {}".format(vnfrs_data
))
1511 def add_nsr_data(nsr
):
1513 nsr_data
['name'] = nsr
.name
1516 if script
is None or len(script
) == 0:
1517 self
._log
.error("Script not provided for scale group config: {}".format(group
.name
))
1520 if script
[0] == '/':
1523 path
= os
.path
.join(os
.environ
['RIFT_INSTALL'], "usr/bin", script
)
1524 if not os
.path
.exists(path
):
1525 self
._log
.error("Config faled for scale group {}: Script does not exist at {}".
1526 format(group
.name
, path
))
1529 # Build a YAML file with all parameters for the script to execute
1530 # The data consists of 5 sections
1532 # 2. Scale group config
1533 # 3. VNFRs in the scale group
1534 # 4. VNFRs outside scale group
1537 data
['trigger'] = group
.trigger_map(trigger
)
1538 data
['config'] = group
.group_msg
.as_dict()
1541 data
["vnfrs_in_group"] = yield from add_vnfrs_data(vnfrs
)
1543 data
["vnfrs_in_group"] = yield from add_vnfrs_data(scale_instance
.vnfrs
)
1545 data
["vnfrs_others"] = yield from add_vnfrs_data(self
.vnfrs
.values())
1546 data
["nsr"] = add_nsr_data(self
)
1549 with tempfile
.NamedTemporaryFile(delete
=False) as tmp_file
:
1550 tmp_file
.write(yaml
.dump(data
, default_flow_style
=True)
1553 self
._log
.debug("Creating a temp file: {} with input data: {}".
1554 format(tmp_file
.name
, data
))
1556 cmd
= "{} {}".format(path
, tmp_file
.name
)
1557 self
._log
.debug("Running the CMD: {}".format(cmd
))
1558 proc
= yield from asyncio
.create_subprocess_shell(cmd
, loop
=self
._loop
)
1559 rc
= yield from proc
.wait()
1561 self
._log
.error("The script {} for scale group {} config returned: {}".
1562 format(script
, group
.name
, rc
))
1570 def apply_scaling_group_config(self
, trigger
, group
, scale_instance
, vnfrs
=None):
1571 """ Apply the config for the scaling group based on trigger """
1572 if group
is None or scale_instance
is None:
1576 def update_config_status(success
=True, err_msg
=None):
1577 self
._log
.debug("Update %s config status to %r : %s",
1578 scale_instance
, success
, err_msg
)
1579 if (scale_instance
.config_status
== "failed"):
1580 # Do not update the config status if it is already in failed state
1583 if scale_instance
.config_status
== "configured":
1584 # Update only to failed state an already configured scale instance
1586 scale_instance
.config_status
= "failed"
1587 scale_instance
.config_err_msg
= err_msg
1588 yield from self
.update_state()
1590 # We are in configuring state
1591 # Only after post scale out mark instance as configured
1592 if trigger
== NsdYang
.ScalingTrigger
.POST_SCALE_OUT
:
1594 scale_instance
.config_status
= "configured"
1596 scale_instance
.config_status
= "failed"
1597 scale_instance
.config_err_msg
= err_msg
1598 yield from self
.update_state()
1600 config
= group
.trigger_config(trigger
)
1604 self
._log
.debug("Scaling group {} config: {}".format(group
.name
, config
))
1605 if config
.has_field("ns_config_primitive_name_ref"):
1606 config_name
= config
.ns_config_primitive_name_ref
1607 nsd_msg
= self
.nsd_msg
1608 config_primitive
= None
1609 for ns_cfg_prim
in nsd_msg
.service_primitive
:
1610 if ns_cfg_prim
.name
== config_name
:
1611 config_primitive
= ns_cfg_prim
1614 if config_primitive
is None:
1615 raise ValueError("Could not find ns_cfg_prim %s in nsr %s" % (config_name
, self
.name
))
1617 self
._log
.debug("Scaling group {} config primitive: {}".format(group
.name
, config_primitive
))
1618 if config_primitive
.has_field("user_defined_script"):
1619 rc
= yield from self
.apply_scale_group_config_script(config_primitive
.user_defined_script
,
1620 group
, scale_instance
, trigger
, vnfrs
)
1623 err_msg
= "Failed config for trigger {} using config script '{}'". \
1624 format(self
.scaling_trigger_str(trigger
),
1625 config_primitive
.user_defined_script
)
1626 yield from update_config_status(success
=rc
, err_msg
=err_msg
)
1629 err_msg
= "Failed config for trigger {} as config script is not specified". \
1630 format(self
.scaling_trigger_str(trigger
))
1631 yield from update_config_status(success
=False, err_msg
=err_msg
)
1632 raise NotImplementedError("Only script based config support for scale group for now: {}".
1635 err_msg
= "Failed config for trigger {} as config primitive is not specified".\
1636 format(self
.scaling_trigger_str(trigger
))
1637 yield from update_config_status(success
=False, err_msg
=err_msg
)
1638 self
._log
.error("Config primitive not specified for config action in scale group %s" %
1642 def create_scaling_groups(self
):
1643 """ This function creates a NSScalingGroup for every scaling
1644 group defined in he NSD"""
1646 for scaling_group_msg
in self
.nsd_msg
.scaling_group_descriptor
:
1647 self
._log
.debug("Found scaling_group %s in nsr id %s",
1648 scaling_group_msg
.name
, self
.id)
1650 group_record
= scale_group
.ScalingGroup(
1655 self
._scaling
_groups
[group_record
.name
] = group_record
1658 def create_scale_group_instance(self
, group_name
, index
, config_xact
, is_default
=False):
1659 group
= self
._scaling
_groups
[group_name
]
1660 scale_instance
= group
.create_instance(index
, is_default
)
1664 self
._log
.debug("Creating %u VNFs associated with NS id %s scaling group %s",
1665 len(self
.nsd_msg
.constituent_vnfd
), self
.id, self
)
1668 for vnf_index
, count
in group
.vnf_index_count_map
.items():
1669 const_vnfd_msg
= self
._get
_constituent
_vnfd
_msg
(vnf_index
)
1670 vnfd_msg
= self
._get
_vnfd
(const_vnfd_msg
.vnfd_id_ref
, config_xact
)
1672 cloud_account_name
, om_datacenter_name
= self
._get
_vnfd
_cloud
_account
(const_vnfd_msg
.member_vnf_index
)
1673 if cloud_account_name
is None:
1674 cloud_account_name
= self
.cloud_account_name
1675 for _
in range(count
):
1676 vnfr
= yield from self
.create_vnf_record(vnfd_msg
, const_vnfd_msg
, cloud_account_name
, om_datacenter_name
, group_name
, index
)
1677 scale_instance
.add_vnfr(vnfr
)
1683 def instantiate_instance():
1684 self
._log
.debug("Creating %s VNFRS", scale_instance
)
1685 vnfrs
= yield from create_vnfs()
1686 yield from self
.publish()
1688 self
._log
.debug("Instantiating %s VNFRS for %s", len(vnfrs
), scale_instance
)
1689 scale_instance
.operational_status
= "vnf_init_phase"
1690 yield from self
.update_state()
1693 rc
= yield from self
.apply_scaling_group_config(NsdYang
.ScalingTrigger
.PRE_SCALE_OUT
,
1694 group
, scale_instance
, vnfrs
)
1696 self
._log
.error("Pre scale out config for scale group {} ({}) failed".
1697 format(group
.name
, index
))
1698 scale_instance
.operational_status
= "failed"
1700 yield from self
.instantiate_vnfs(vnfrs
)
1702 except Exception as e
:
1703 self
._log
.exception("Failed to begin instantiatiation of vnfs for scale group {}: {}".
1704 format(group
.name
, e
))
1705 self
._log
.exception(e
)
1706 scale_instance
.operational_status
= "failed"
1708 yield from self
.update_state()
1710 yield from instantiate_instance()
1713 def delete_scale_group_instance(self
, group_name
, index
):
1714 group
= self
._scaling
_groups
[group_name
]
1715 scale_instance
= group
.get_instance(index
)
1716 if scale_instance
.is_default
:
1717 raise ScalingOperationError("Cannot terminate a default scaling group instance")
1719 scale_instance
.operational_status
= "terminate"
1720 yield from self
.update_state()
1723 def terminate_instance():
1724 self
._log
.debug("Terminating %s VNFRS" % scale_instance
)
1725 rc
= yield from self
.apply_scaling_group_config(NsdYang
.ScalingTrigger
.PRE_SCALE_IN
,
1726 group
, scale_instance
)
1728 self
._log
.error("Pre scale in config for scale group {} ({}) failed".
1729 format(group
.name
, index
))
1731 # Going ahead with terminate, even if there is an error in pre-scale-in config
1732 # as this could be result of scale out failure and we need to cleanup this group
1733 yield from self
.terminate_vnfrs(scale_instance
.vnfrs
)
1734 group
.delete_instance(index
)
1736 scale_instance
.operational_status
= "vnf_terminate_phase"
1737 yield from self
.update_state()
1739 yield from terminate_instance()
1742 def _update_scale_group_instances_status(self
):
1744 def post_scale_out_task(group
, instance
):
1745 # Apply post scale out config once all VNFRs are active
1746 rc
= yield from self
.apply_scaling_group_config(NsdYang
.ScalingTrigger
.POST_SCALE_OUT
,
1748 instance
.operational_status
= "running"
1750 self
._log
.debug("Scale out for group {} and instance {} succeeded".
1751 format(group
.name
, instance
.instance_id
))
1753 self
._log
.error("Post scale out config for scale group {} ({}) failed".
1754 format(group
.name
, instance
.instance_id
))
1756 yield from self
.update_state()
1758 group_instances
= {group
: group
.instances
for group
in self
._scaling
_groups
.values()}
1759 for group
, instances
in group_instances
.items():
1760 self
._log
.debug("Updating %s instance status", group
)
1761 for instance
in instances
:
1762 instance_vnf_state_list
= [vnfr
.state
for vnfr
in instance
.vnfrs
]
1763 self
._log
.debug("Got vnfr instance states: %s", instance_vnf_state_list
)
1764 if instance
.operational_status
== "vnf_init_phase":
1765 if all([state
== VnfRecordState
.ACTIVE
for state
in instance_vnf_state_list
]):
1766 instance
.operational_status
= "running"
1768 # Create a task for post scale out to allow us to sleep before attempting
1769 # to configure newly created VM's
1770 self
._loop
.create_task(post_scale_out_task(group
, instance
))
1772 elif any([state
== VnfRecordState
.FAILED
for state
in instance_vnf_state_list
]):
1773 self
._log
.debug("Scale out for group {} and instance {} failed".
1774 format(group
.name
, instance
.instance_id
))
1775 instance
.operational_status
= "failed"
1777 elif instance
.operational_status
== "vnf_terminate_phase":
1778 if all([state
== VnfRecordState
.TERMINATED
for state
in instance_vnf_state_list
]):
1779 instance
.operational_status
= "terminated"
1780 rc
= yield from self
.apply_scaling_group_config(NsdYang
.ScalingTrigger
.POST_SCALE_IN
,
1783 self
._log
.debug("Scale in for group {} and instance {} succeeded".
1784 format(group
.name
, instance
.instance_id
))
1786 self
._log
.error("Post scale in config for scale group {} ({}) failed".
1787 format(group
.name
, instance
.instance_id
))
1789 def create_vnffgs(self
):
1790 """ This function creates VNFFGs for every VNFFG in the NSD
1791 associated with this NSR"""
1793 for vnffgd
in self
.nsd_msg
.vnffgd
:
1794 self
._log
.debug("Found vnffgd %s in nsr id %s", vnffgd
, self
.id)
1795 vnffgr
= VnffgRecord(self
._dts
,
1798 self
._nsm
._vnffgmgr
,
1802 self
._sdn
_account
_name
1804 self
._vnffgrs
[vnffgr
.id] = vnffgr
1806 def resolve_vld_ip_profile(self
, nsd_msg
, vld
):
1807 if not vld
.has_field('ip_profile_ref'):
1809 profile
= [ profile
for profile
in nsd_msg
.ip_profiles
if profile
.name
== vld
.ip_profile_ref
]
1810 return profile
[0] if profile
else None
1813 def _create_vls(self
, vld
, cloud_account
,om_datacenter
):
1814 """Create a VLR in the cloud account specified using the given VLD
1818 cloud_account : Cloud account name
1823 vlr
= yield from VirtualLinkRecord
.create_record(
1831 self
.resolve_vld_ip_profile(self
.nsd_msg
, vld
),
1833 restart_mode
=self
.restart_mode
)
1837 def _extract_cloud_accounts_for_vl(self
, vld
):
1839 Extracts the list of cloud accounts from the NS Config obj
1842 1. Cloud accounts based connection point (vnf_cloud_account_map)
1844 vld : VLD yang object
1849 cloud_account_list
= []
1851 if self
._nsr
_cfg
_msg
.vnf_cloud_account_map
:
1852 # Handle case where cloud_account is None
1854 for vnf
in self
._nsr
_cfg
_msg
.vnf_cloud_account_map
:
1855 if vnf
.cloud_account
is not None or vnf
.om_datacenter
is not None:
1856 vnf_cloud_map
[vnf
.member_vnf_index_ref
] = (vnf
.cloud_account
,vnf
.om_datacenter
)
1858 for vnfc
in vld
.vnfd_connection_point_ref
:
1859 cloud_account
= vnf_cloud_map
.get(
1860 vnfc
.member_vnf_index_ref
,
1861 (self
.cloud_account_name
,self
.om_datacenter_name
))
1863 cloud_account_list
.append(cloud_account
)
1865 if self
._nsr
_cfg
_msg
.vl_cloud_account_map
:
1866 for vld_map
in self
._nsr
_cfg
_msg
.vl_cloud_account_map
:
1867 if vld_map
.vld_id_ref
== vld
.id:
1868 for cloud_account
in vld_map
.cloud_accounts
:
1869 cloud_account_list
.extend((cloud_account
,None))
1870 for om_datacenter
in vld_map
.om_datacenters
:
1871 cloud_account_list
.extend((None,om_datacenter
))
1873 # If no config has been provided then fall-back to the default
1875 if not cloud_account_list
:
1876 cloud_account_list
= [(self
.cloud_account_name
,self
.om_datacenter_name
)]
1878 self
._log
.debug("VL {} cloud accounts: {}".
1879 format(vld
.name
, cloud_account_list
))
1880 return set(cloud_account_list
)
1883 def create_vls(self
):
1884 """ This function creates VLs for every VLD in the NSD
1885 associated with this NSR"""
1886 for vld
in self
.nsd_msg
.vld
:
1887 self
._log
.debug("Found vld %s in nsr id %s", vld
, self
.id)
1888 cloud_account_list
= self
._extract
_cloud
_accounts
_for
_vl
(vld
)
1889 for cloud_account
,om_datacenter
in cloud_account_list
:
1890 vlr
= yield from self
._create
_vls
(vld
, cloud_account
,om_datacenter
)
1891 self
._vlrs
.append(vlr
)
1895 def create_vl_instance(self
, vld
):
1896 self
._log
.debug("Create VL for {}: {}".format(self
.id, vld
.as_dict()))
1897 # Check if the VL is already present
1899 for vl
in self
._vlrs
:
1900 if vl
.vld_msg
.id == vld
.id:
1901 self
._log
.debug("The VLD %s already in NSR %s as VLR %s with status %s",
1902 vld
.id, self
.id, vl
.id, vl
.state
)
1904 if vlr
.state
!= VlRecordState
.TERMINATED
:
1905 err_msg
= "VLR for VL %s in NSR %s already instantiated", \
1907 self
._log
.error(err_msg
)
1908 raise NsrVlUpdateError(err_msg
)
1912 cloud_account_list
= self
._extract
_cloud
_accounts
_for
_vl
(vld
)
1913 for account
in cloud_account_list
:
1914 vlr
= yield from self
._create
_vls
(vld
, account
)
1915 self
._vlrs
.append(vlr
)
1917 vlr
.state
= VlRecordState
.INSTANTIATION_PENDING
1918 yield from self
.update_state()
1921 yield from self
.nsm_plugin
.instantiate_vl(self
, vlr
)
1922 vlr
.state
= VlRecordState
.ACTIVE
1924 except Exception as e
:
1925 err_msg
= "Error instantiating VL for NSR {} and VLD {}: {}". \
1926 format(self
.id, vld
.id, e
)
1927 self
._log
.error(err_msg
)
1928 self
._log
.exception(e
)
1929 vlr
.state
= VlRecordState
.FAILED
1931 yield from self
.update_state()
1934 def delete_vl_instance(self
, vld
):
1935 for vlr
in self
._vlrs
:
1936 if vlr
.vld_msg
.id == vld
.id:
1937 self
._log
.debug("Found VLR %s for VLD %s in NSR %s",
1938 vlr
.id, vld
.id, self
.id)
1939 vlr
.state
= VlRecordState
.TERMINATE_PENDING
1940 yield from self
.update_state()
1943 yield from self
.nsm_plugin
.terminate_vl(vlr
)
1944 vlr
.state
= VlRecordState
.TERMINATED
1945 self
._vlrs
.remove(vlr
)
1947 except Exception as e
:
1948 err_msg
= "Error terminating VL for NSR {} and VLD {}: {}". \
1949 format(self
.id, vld
.id, e
)
1950 self
._log
.error(err_msg
)
1951 self
._log
.exception(e
)
1952 vlr
.state
= VlRecordState
.FAILED
1954 yield from self
.update_state()
1958 def create_vnfs(self
, config_xact
):
1960 This function creates VNFs for every VNF in the NSD
1961 associated with this NSR
1963 self
._log
.debug("Creating %u VNFs associated with this NS id %s",
1964 len(self
.nsd_msg
.constituent_vnfd
), self
.id)
1966 for const_vnfd
in self
.nsd_msg
.constituent_vnfd
:
1967 if not const_vnfd
.start_by_default
:
1968 self
._log
.debug("start_by_default set to False in constituent VNF (%s). Skipping start.",
1969 const_vnfd
.member_vnf_index
)
1972 vnfd_msg
= self
._get
_vnfd
(const_vnfd
.vnfd_id_ref
, config_xact
)
1973 cloud_account_name
,om_datacenter_name
= self
._get
_vnfd
_cloud
_account
(const_vnfd
.member_vnf_index
)
1974 if cloud_account_name
is None:
1975 cloud_account_name
= self
.cloud_account_name
1976 yield from self
.create_vnf_record(vnfd_msg
, const_vnfd
, cloud_account_name
, om_datacenter_name
)
1979 def get_placement_groups(self
, vnfd_msg
, const_vnfd
):
1980 placement_groups
= []
1981 for group
in self
.nsd_msg
.placement_groups
:
1982 for member_vnfd
in group
.member_vnfd
:
1983 if (member_vnfd
.vnfd_id_ref
== vnfd_msg
.id) and \
1984 (member_vnfd
.member_vnf_index_ref
== const_vnfd
.member_vnf_index
):
1985 group_info
= self
.resolve_placement_group_cloud_construct(group
)
1986 if group_info
is None:
1987 self
._log
.error("Could not resolve cloud-construct for placement group: %s", group
.name
)
1988 ### raise PlacementGroupError("Could not resolve cloud-construct for placement group: {}".format(group.name))
1990 self
._log
.info("Successfully resolved cloud construct for placement group: %s for VNF: %s (Member Index: %s)",
1993 const_vnfd
.member_vnf_index
)
1994 placement_groups
.append(group_info
)
1995 return placement_groups
1998 def create_vnf_record(self
, vnfd_msg
, const_vnfd
, cloud_account_name
, om_datacenter_name
, group_name
=None, group_instance_id
=None):
1999 # Fetch the VNFD associated with this VNF
2000 placement_groups
= self
.get_placement_groups(vnfd_msg
, const_vnfd
)
2001 self
._log
.info("Cloud Account for VNF %d is %s",const_vnfd
.member_vnf_index
,cloud_account_name
)
2002 self
._log
.info("Launching VNF: %s (Member Index: %s) in NSD plancement Groups: %s",
2004 const_vnfd
.member_vnf_index
,
2005 [ group
.name
for group
in placement_groups
])
2006 vnfr
= yield from VirtualNetworkFunctionRecord
.create_record(self
._dts
,
2019 restart_mode
=self
.restart_mode
,
2021 if vnfr
.id in self
._vnfrs
:
2022 err
= "VNF with VNFR id %s already in vnf list" % (vnfr
.id,)
2023 raise NetworkServiceRecordError(err
)
2025 self
._vnfrs
[vnfr
.id] = vnfr
2026 self
._nsm
.vnfrs
[vnfr
.id] = vnfr
2028 yield from vnfr
.set_config_status(NsrYang
.ConfigStates
.INIT
)
2030 self
._log
.debug("Added VNFR %s to NSM VNFR list with id %s",
2036 def create_param_pools(self
):
2037 for param_pool
in self
.nsd_msg
.parameter_pool
:
2038 self
._log
.debug("Found parameter pool %s in nsr id %s", param_pool
, self
.id)
2040 start_value
= param_pool
.range.start_value
2041 end_value
= param_pool
.range.end_value
2042 if end_value
< start_value
:
2043 raise NetworkServiceRecordError(
2044 "Parameter pool %s has invalid range (start: {}, end: {})".format(
2045 start_value
, end_value
2049 self
._param
_pools
[param_pool
.name
] = config_value_pool
.ParameterValuePool(
2052 range(start_value
, end_value
)
2056 def fetch_vnfr(self
, vnfr_path
):
2057 """ Fetch VNFR record """
2059 self
._log
.debug("Fetching VNFR with key %s while instantiating %s",
2061 res_iter
= yield from self
._dts
.query_read(vnfr_path
, rwdts
.XactFlag
.MERGE
)
2063 for ent
in res_iter
:
2064 res
= yield from ent
2070 def instantiate_vnfs(self
, vnfrs
):
2072 This function instantiates VNFs for every VNF in this Network Service
2074 self
._log
.debug("Instantiating %u VNFs in NS %s", len(vnfrs
), self
.id)
2076 self
._log
.debug("Instantiating VNF: %s in NS %s", vnf
, self
.id)
2077 yield from self
.nsm_plugin
.instantiate_vnf(self
, vnf
)
2080 def instantiate_vnffgs(self
):
2082 This function instantiates VNFFGs for every VNFFG in this Network Service
2084 self
._log
.debug("Instantiating %u VNFFGs in NS %s",
2085 len(self
.nsd_msg
.vnffgd
), self
.id)
2086 for _
, vnfr
in self
.vnfrs
.items():
2087 while vnfr
.state
in [VnfRecordState
.INSTANTIATION_PENDING
, VnfRecordState
.INIT
]:
2088 self
._log
.debug("Received vnfr state for vnfr %s is %s; retrying",vnfr
.name
,vnfr
.state
)
2089 yield from asyncio
.sleep(2, loop
=self
._loop
)
2090 if vnfr
.state
== VnfRecordState
.ACTIVE
:
2091 self
._log
.debug("Received vnfr state for vnfr %s is %s ",vnfr
.name
,vnfr
.state
)
2094 self
._log
.debug("Received vnfr state for vnfr %s is %s; failing vnffg creation",vnfr
.name
,vnfr
.state
)
2095 self
._vnffgr
_state
= VnffgRecordState
.FAILED
2098 self
._log
.info("Waiting for 90 seconds for VMs to come up")
2099 yield from asyncio
.sleep(90, loop
=self
._loop
)
2100 self
._log
.info("Starting VNFFG orchestration")
2101 for vnffg
in self
._vnffgrs
.values():
2102 self
._log
.debug("Instantiating VNFFG: %s in NS %s", vnffg
, self
.id)
2103 yield from vnffg
.instantiate()
2106 def instantiate_scaling_instances(self
, config_xact
):
2107 """ Instantiate any default scaling instances in this Network Service """
2108 for group
in self
._scaling
_groups
.values():
2109 for i
in range(group
.min_instance_count
):
2110 self
._log
.debug("Instantiating %s default scaling instance %s", group
, i
)
2111 yield from self
.create_scale_group_instance(
2112 group
.name
, i
, config_xact
, is_default
=True
2115 for group_msg
in self
._nsr
_cfg
_msg
.scaling_group
:
2116 if group_msg
.scaling_group_name_ref
!= group
.name
:
2119 for instance
in group_msg
.instance
:
2120 self
._log
.debug("Reloading %s scaling instance %s", group_msg
, instance
.id)
2121 yield from self
.create_scale_group_instance(
2122 group
.name
, instance
.id, config_xact
, is_default
=False
2125 def has_scaling_instances(self
):
2126 """ Return boolean indicating if the network service has default scaling groups """
2127 for group
in self
._scaling
_groups
.values():
2128 if group
.min_instance_count
> 0:
2131 for group_msg
in self
._nsr
_cfg
_msg
.scaling_group
:
2132 if len(group_msg
.instance
) > 0:
2139 """ This function publishes this NSR """
2140 self
._nsr
_msg
= self
.create_msg()
2142 self
._log
.debug("Publishing the NSR with xpath %s and nsr %s",
2146 if self
._debug
_running
:
2147 self
._log
.debug("Publishing NSR in RUNNING state!")
2150 with self
._dts
.transaction() as xact
:
2151 yield from self
._nsm
.nsr_handler
.update(xact
, self
.nsr_xpath
, self
._nsr
_msg
)
2152 if self
._op
_status
.state
== NetworkServiceRecordState
.RUNNING
:
2153 self
._debug
_running
= True
2156 def unpublish(self
, xact
):
2157 """ Unpublish this NSR object """
2158 self
._log
.debug("Unpublishing Network service id %s", self
.id)
2159 yield from self
._nsm
.nsr_handler
.delete(xact
, self
.nsr_xpath
)
2162 def nsr_xpath(self
):
2163 """ Returns the xpath associated with this NSR """
2165 "D,/nsr:ns-instance-opdata" +
2166 "/nsr:nsr[nsr:ns-instance-config-ref = '{}']"
2170 def xpath_from_nsr(nsr
):
2171 """ Returns the xpath associated with this NSR op data"""
2172 return (NetworkServiceRecord
.XPATH
+
2173 "[nsr:ns-instance-config-ref = '{}']").format(nsr
.id)
2176 def nsd_xpath(self
):
2177 """ Return NSD config xpath."""
2179 "C,/nsd:nsd-catalog/nsd:nsd[nsd:id = '{}']"
2180 ).format(self
.nsd_id
)
2183 def instantiate(self
, config_xact
):
2184 """"Instantiates a NetworkServiceRecord.
2186 This function instantiates a Network service
2187 which involves the following steps,
2189 * Instantiate every VL in NSD by sending create VLR request to DTS.
2190 * Instantiate every VNF in NSD by sending create VNF reuqest to DTS.
2191 * Publish the NSR details to DTS
2194 nsr: The NSR configuration request containing nsr-id and nsd
2195 config_xact: The configuration transaction which initiated the instatiation
2198 NetworkServiceRecordError if the NSR creation fails
2204 self
._log
.debug("Instantiating NS - %s xact - %s", self
, config_xact
)
2206 # Move the state to INIITALIZING
2207 self
.set_state(NetworkServiceRecordState
.INIT
)
2209 event_descr
= "Instantiation Request Received NSR Id:%s" % self
.id
2210 self
.record_event("instantiating", event_descr
)
2213 self
._nsd
= self
._nsr
_cfg
_msg
.nsd
2216 # Update ref count if nsd present in catalog
2217 self
._nsm
.get_nsd_ref(self
.nsd_id
)
2219 except NetworkServiceDescriptorError
:
2220 # This could be an NSD not in the nsd-catalog
2223 # Merge any config and initial config primitive values
2224 self
.config_store
.merge_nsd_config(self
.nsd_msg
)
2225 self
._log
.debug("Merged NSD: {}".format(self
.nsd_msg
.as_dict()))
2227 event_descr
= "Fetched NSD with descriptor id %s" % self
.nsd_id
2228 self
.record_event("nsd-fetched", event_descr
)
2230 if self
._nsd
is None:
2231 msg
= "Failed to fetch NSD with nsd-id [%s] for nsr-id %s"
2232 self
._log
.debug(msg
, self
.nsd_id
, self
.id)
2233 raise NetworkServiceRecordError(self
)
2235 self
._log
.debug("Got nsd result %s", self
._nsd
)
2237 # Substitute any input parameters
2238 self
.substitute_input_parameters(self
._nsd
, self
._nsr
_cfg
_msg
)
2241 yield from self
.create(config_xact
)
2243 # Publish the NSR to DTS
2244 yield from self
.publish()
2247 def do_instantiate():
2249 Instantiate network service
2251 self
._log
.debug("Instantiating VLs nsr id [%s] nsd id [%s]",
2252 self
.id, self
.nsd_id
)
2254 # instantiate the VLs
2255 event_descr
= ("Instantiating %s external VLs for NSR id %s" %
2256 (len(self
.nsd_msg
.vld
), self
.id))
2257 self
.record_event("begin-external-vls-instantiation", event_descr
)
2259 self
.set_state(NetworkServiceRecordState
.VL_INIT_PHASE
)
2261 yield from self
.instantiate_vls()
2263 # Publish the NSR to DTS
2264 yield from self
.publish()
2266 event_descr
= ("Finished instantiating %s external VLs for NSR id %s" %
2267 (len(self
.nsd_msg
.vld
), self
.id))
2268 self
.record_event("end-external-vls-instantiation", event_descr
)
2270 self
.set_state(NetworkServiceRecordState
.VNF_INIT_PHASE
)
2272 self
._log
.debug("Instantiating VNFs ...... nsr[%s], nsd[%s]",
2273 self
.id, self
.nsd_id
)
2275 # instantiate the VNFs
2276 event_descr
= ("Instantiating %s VNFS for NSR id %s" %
2277 (len(self
.nsd_msg
.constituent_vnfd
), self
.id))
2279 self
.record_event("begin-vnf-instantiation", event_descr
)
2281 yield from self
.instantiate_vnfs(self
._vnfrs
.values())
2283 self
._log
.debug(" Finished instantiating %d VNFs for NSR id %s",
2284 len(self
.nsd_msg
.constituent_vnfd
), self
.id)
2286 event_descr
= ("Finished instantiating %s VNFs for NSR id %s" %
2287 (len(self
.nsd_msg
.constituent_vnfd
), self
.id))
2288 self
.record_event("end-vnf-instantiation", event_descr
)
2290 if len(self
.vnffgrs
) > 0:
2291 #self.set_state(NetworkServiceRecordState.VNFFG_INIT_PHASE)
2292 event_descr
= ("Instantiating %s VNFFGS for NSR id %s" %
2293 (len(self
.nsd_msg
.vnffgd
), self
.id))
2295 self
.record_event("begin-vnffg-instantiation", event_descr
)
2297 yield from self
.instantiate_vnffgs()
2299 event_descr
= ("Finished instantiating %s VNFFGDs for NSR id %s" %
2300 (len(self
.nsd_msg
.vnffgd
), self
.id))
2301 self
.record_event("end-vnffg-instantiation", event_descr
)
2303 if self
.has_scaling_instances():
2304 event_descr
= ("Instantiating %s Scaling Groups for NSR id %s" %
2305 (len(self
._scaling
_groups
), self
.id))
2307 self
.record_event("begin-scaling-group-instantiation", event_descr
)
2308 yield from self
.instantiate_scaling_instances(config_xact
)
2309 self
.record_event("end-scaling-group-instantiation", event_descr
)
2311 # Give the plugin a chance to deploy the network service now that all
2312 # virtual links and vnfs are instantiated
2313 yield from self
.nsm_plugin
.deploy(self
._nsr
_msg
)
2315 self
._log
.debug("Publishing NSR...... nsr[%s], nsd[%s]",
2316 self
.id, self
.nsd_id
)
2318 # Publish the NSR to DTS
2319 yield from self
.publish()
2321 self
._log
.debug("Published NSR...... nsr[%s], nsd[%s]",
2322 self
.id, self
.nsd_id
)
2324 def on_instantiate_done(fut
):
2325 # If the do_instantiate fails, then publish NSR with failed result
2326 if fut
.exception() is not None:
2327 self
._log
.error("NSR instantiation failed for NSR id %s: %s", self
.id, str(fut
.exception()))
2328 self
._loop
.create_task(self
.instantiation_failed(failed_reason
=str(fut
.exception())))
2330 instantiate_task
= self
._loop
.create_task(do_instantiate())
2331 instantiate_task
.add_done_callback(on_instantiate_done
)
2334 def set_config_status(self
, status
, status_details
=None):
2335 if self
.config_status
!= status
:
2336 self
._log
.debug("Updating NSR {} status for {} to {}".
2337 format(self
.name
, self
.config_status
, status
))
2338 self
._config
_status
= status
2339 self
._config
_status
_details
= status_details
2341 if self
._config
_status
== NsrYang
.ConfigStates
.FAILED
:
2342 self
.record_event("config-failed", "NS configuration failed",
2343 evt_details
=self
._config
_status
_details
)
2345 yield from self
.publish()
2348 def is_active(self
):
2349 """ This NS is active """
2350 self
.set_state(NetworkServiceRecordState
.RUNNING
)
2354 # Publish the NSR to DTS
2355 self
._log
.debug("Network service %s is active ", self
.id)
2356 self
._is
_active
= True
2358 event_descr
= "NSR in running state for NSR id %s" % self
.id
2359 self
.record_event("ns-running", event_descr
)
2361 yield from self
.publish()
2364 def instantiation_failed(self
, failed_reason
=None):
2365 """ The NS instantiation failed"""
2366 self
._log
.error("Network service id:%s, name:%s instantiation failed",
2368 self
.set_state(NetworkServiceRecordState
.FAILED
)
2370 event_descr
= "Instantiation of NS %s failed" % self
.id
2371 self
.record_event("ns-failed", event_descr
, evt_details
=failed_reason
)
2373 # Publish the NSR to DTS
2374 yield from self
.publish()
2377 def terminate_vnfrs(self
, vnfrs
):
2378 """ Terminate VNFRS in this network service """
2379 self
._log
.debug("Terminating VNFs in network service %s", self
.id)
2381 yield from self
.nsm_plugin
.terminate_vnf(vnfr
)
2384 def terminate(self
):
2385 """ Terminate a NetworkServiceRecord."""
2386 def terminate_vnffgrs():
2387 """ Terminate VNFFGRS in this network service """
2388 self
._log
.debug("Terminating VNFFGRs in network service %s", self
.id)
2389 for vnffgr
in self
.vnffgrs
.values():
2390 yield from vnffgr
.terminate()
2392 def terminate_vlrs():
2393 """ Terminate VLRs in this netork service """
2394 self
._log
.debug("Terminating VLs in network service %s", self
.id)
2395 for vlr
in self
.vlrs
:
2396 yield from self
.nsm_plugin
.terminate_vl(vlr
)
2397 vlr
.state
= VlRecordState
.TERMINATED
2399 self
._log
.debug("Terminating network service id %s", self
.id)
2401 # Move the state to TERMINATE
2402 self
.set_state(NetworkServiceRecordState
.TERMINATE
)
2403 event_descr
= "Terminate being processed for NS Id:%s" % self
.id
2404 self
.record_event("terminate", event_descr
)
2406 # Move the state to VNF_TERMINATE_PHASE
2407 self
._log
.debug("Terminating VNFFGs in NS ID: %s", self
.id)
2408 self
.set_state(NetworkServiceRecordState
.VNFFG_TERMINATE_PHASE
)
2409 event_descr
= "Terminating VNFFGS in NS Id:%s" % self
.id
2410 self
.record_event("terminating-vnffgss", event_descr
)
2411 yield from terminate_vnffgrs()
2413 # Move the state to VNF_TERMINATE_PHASE
2414 self
.set_state(NetworkServiceRecordState
.VNF_TERMINATE_PHASE
)
2415 event_descr
= "Terminating VNFS in NS Id:%s" % self
.id
2416 self
.record_event("terminating-vnfs", event_descr
)
2417 yield from self
.terminate_vnfrs(self
.vnfrs
.values())
2419 # Move the state to VL_TERMINATE_PHASE
2420 self
.set_state(NetworkServiceRecordState
.VL_TERMINATE_PHASE
)
2421 event_descr
= "Terminating VLs in NS Id:%s" % self
.id
2422 self
.record_event("terminating-vls", event_descr
)
2423 yield from terminate_vlrs()
2425 yield from self
.nsm_plugin
.terminate_ns(self
)
2427 # Move the state to TERMINATED
2428 self
.set_state(NetworkServiceRecordState
.TERMINATED
)
2429 event_descr
= "Terminated NS Id:%s" % self
.id
2430 self
.record_event("terminated", event_descr
)
2433 """"Enable a NetworkServiceRecord."""
2437 """"Disable a NetworkServiceRecord."""
2440 def map_config_status(self
):
2441 self
._log
.debug("Config status for ns {} is {}".
2442 format(self
.name
, self
._config
_status
))
2443 if self
._config
_status
== NsrYang
.ConfigStates
.CONFIGURING
:
2444 return 'configuring'
2445 if self
._config
_status
== NsrYang
.ConfigStates
.FAILED
:
2449 def vl_phase_completed(self
):
2450 """ Are VLs created in this NS?"""
2451 return self
._vl
_phase
_completed
2453 def vnf_phase_completed(self
):
2454 """ Are VLs created in this NS?"""
2455 return self
._vnf
_phase
_completed
2457 def create_msg(self
):
2458 """ The network serice record as a message """
2459 nsr_dict
= {"ns_instance_config_ref": self
.id}
2460 nsr
= RwNsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr
.from_dict(nsr_dict
)
2461 #nsr.cloud_account = self.cloud_account_name
2462 nsr
.sdn_account
= self
._sdn
_account
_name
2463 nsr
.name_ref
= self
.name
2464 nsr
.nsd_ref
= self
.nsd_id
2465 nsr
.nsd_name_ref
= self
.nsd_msg
.name
2466 nsr
.operational_events
= self
._op
_status
.msg
2467 nsr
.operational_status
= self
._op
_status
.yang_str()
2468 nsr
.config_status
= self
.map_config_status()
2469 nsr
.config_status_details
= self
._config
_status
_details
2470 nsr
.create_time
= self
._create
_time
2472 for cfg_prim
in self
.nsd_msg
.service_primitive
:
2473 cfg_prim
= NsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr_ServicePrimitive
.from_dict(
2475 nsr
.service_primitive
.append(cfg_prim
)
2477 for init_cfg
in self
.nsd_msg
.initial_config_primitive
:
2478 prim
= NsrYang
.NsrInitialConfigPrimitive
.from_dict(
2480 nsr
.initial_config_primitive
.append(prim
)
2482 if self
.vl_phase_completed():
2483 for vlr
in self
.vlrs
:
2484 nsr
.vlr
.append(vlr
.create_nsr_vlr_msg(self
.vnfrs
.values()))
2486 if self
.vnf_phase_completed():
2487 for vnfr_id
in self
.vnfrs
:
2488 nsr
.constituent_vnfr_ref
.append(self
.vnfrs
[vnfr_id
].const_vnfr_msg
)
2489 for vnffgr
in self
.vnffgrs
.values():
2490 nsr
.vnffgr
.append(vnffgr
.fetch_vnffgr())
2491 for scaling_group
in self
._scaling
_groups
.values():
2492 nsr
.scaling_group_record
.append(scaling_group
.create_record_msg())
2496 def all_vnfs_active(self
):
2497 """ Are all VNFS in this NS active? """
2498 for _
, vnfr
in self
.vnfrs
.items():
2499 if vnfr
.active
is not True:
2504 def update_state(self
):
2505 """ Re-evaluate this NS's state """
2506 curr_state
= self
._op
_status
.state
2508 if curr_state
== NetworkServiceRecordState
.TERMINATED
:
2509 self
._log
.debug("NS (%s) in terminated state, not updating state", self
.id)
2512 new_state
= NetworkServiceRecordState
.RUNNING
2513 self
._log
.info("Received update_state for nsr: %s, curr-state: %s",
2514 self
.id, curr_state
)
2516 # Check all the VNFRs are present
2517 for _
, vnfr
in self
.vnfrs
.items():
2518 if vnfr
.state
in [VnfRecordState
.ACTIVE
, VnfRecordState
.TERMINATED
]:
2520 elif vnfr
.state
== VnfRecordState
.FAILED
:
2521 if vnfr
._prev
_state
!= vnfr
.state
:
2522 event_descr
= "Instantiation of VNF %s failed" % vnfr
.id
2523 event_error_details
= vnfr
.state_failed_reason
2524 self
.record_event("vnf-failed", event_descr
, evt_details
=event_error_details
)
2525 vnfr
.set_state(VnfRecordState
.FAILED
)
2527 self
._log
.info("VNF state did not change, curr=%s, prev=%s",
2528 vnfr
.state
, vnfr
._prev
_state
)
2529 new_state
= NetworkServiceRecordState
.FAILED
2532 self
._log
.info("VNF %s in NSR %s is still not active; current state is: %s",
2533 vnfr
.id, self
.id, vnfr
.state
)
2534 new_state
= curr_state
2536 # If new state is RUNNING; check all VLs
2537 if new_state
== NetworkServiceRecordState
.RUNNING
:
2538 for vl
in self
.vlrs
:
2540 if vl
.state
in [VlRecordState
.ACTIVE
, VlRecordState
.TERMINATED
]:
2542 elif vl
.state
== VlRecordState
.FAILED
:
2543 if vl
.prev_state
!= vl
.state
:
2544 event_descr
= "Instantiation of VL %s failed" % vl
.id
2545 event_error_details
= vl
.state_failed_reason
2546 self
.record_event("vl-failed", event_descr
, evt_details
=event_error_details
)
2547 vl
.prev_state
= vl
.state
2549 self
._log
.debug("VL %s already in failed state")
2551 if vl
.state
in [VlRecordState
.INSTANTIATION_PENDING
, VlRecordState
.INIT
]:
2552 new_state
= NetworkServiceRecordState
.VL_INSTANTIATE
2555 if vl
.state
in [VlRecordState
.TERMINATE_PENDING
]:
2556 new_state
= NetworkServiceRecordState
.VL_TERMINATE
2559 # If new state is RUNNING; check VNFFGRs are also active
2560 if new_state
== NetworkServiceRecordState
.RUNNING
:
2561 for _
, vnffgr
in self
.vnffgrs
.items():
2562 self
._log
.info("Checking vnffgr state for nsr %s is: %s",
2563 self
.id, vnffgr
.state
)
2564 if vnffgr
.state
== VnffgRecordState
.ACTIVE
:
2566 elif vnffgr
.state
== VnffgRecordState
.FAILED
:
2567 event_descr
= "Instantiation of VNFFGR %s failed" % vnffgr
.id
2568 self
.record_event("vnffg-failed", event_descr
)
2569 new_state
= NetworkServiceRecordState
.FAILED
2572 self
._log
.info("VNFFGR %s in NSR %s is still not active; current state is: %s",
2573 vnffgr
.id, self
.id, vnffgr
.state
)
2574 new_state
= curr_state
2576 # Update all the scaling group instance operational status to
2577 # reflect the state of all VNFR within that instance
2578 yield from self
._update
_scale
_group
_instances
_status
()
2580 for _
, group
in self
._scaling
_groups
.items():
2581 if group
.state
== scale_group
.ScaleGroupState
.SCALING_OUT
:
2582 new_state
= NetworkServiceRecordState
.SCALING_OUT
2584 elif group
.state
== scale_group
.ScaleGroupState
.SCALING_IN
:
2585 new_state
= NetworkServiceRecordState
.SCALING_IN
2588 if new_state
!= curr_state
:
2589 self
._log
.debug("Changing state of Network service %s from %s to %s",
2590 self
.id, curr_state
, new_state
)
2591 if new_state
== NetworkServiceRecordState
.RUNNING
:
2592 yield from self
.is_active()
2593 elif new_state
== NetworkServiceRecordState
.FAILED
:
2594 # If the NS is already active and we entered scaling_in, scaling_out,
2595 # do not mark the NS as failing if scaling operation failed.
2596 if curr_state
in [NetworkServiceRecordState
.SCALING_OUT
,
2597 NetworkServiceRecordState
.SCALING_IN
] and self
._is
_active
:
2598 new_state
= NetworkServiceRecordState
.RUNNING
2599 self
.set_state(new_state
)
2601 yield from self
.instantiation_failed()
2603 self
.set_state(new_state
)
2605 yield from self
.publish()
2608 class InputParameterSubstitution(object):
2610 This class is responsible for substituting input parameters into an NSD.
2613 def __init__(self
, log
):
2614 """Create an instance of InputParameterSubstitution
2617 log - a logger for this object to use
2622 def __call__(self
, nsd
, nsr_config
):
2623 """Substitutes input parameters from the NSR config into the NSD
2625 This call modifies the provided NSD with the input parameters that are
2626 contained in the NSR config.
2629 nsd - a GI NSD object
2630 nsr_config - a GI NSR config object
2633 if nsd
is None or nsr_config
is None:
2636 # Create a lookup of the xpath elements that this descriptor allows
2638 optional_input_parameters
= set()
2639 for input_parameter
in nsd
.input_parameter_xpath
:
2640 optional_input_parameters
.add(input_parameter
.xpath
)
2642 # Apply the input parameters to the descriptor
2643 if nsr_config
.input_parameter
:
2644 for param
in nsr_config
.input_parameter
:
2645 if param
.xpath
not in optional_input_parameters
:
2646 msg
= "tried to set an invalid input parameter ({})"
2647 self
.log
.error(msg
.format(param
.xpath
))
2651 "input-parameter:{} = {}".format(
2658 xpath
.setxattr(nsd
, param
.xpath
, param
.value
)
2660 except Exception as e
:
2661 self
.log
.exception(e
)
2664 class NetworkServiceDescriptor(object):
2666 Network service descriptor class
2669 def __init__(self
, dts
, log
, loop
, nsd
, nsm
):
2681 """ Returns nsd id """
2686 """ Returns name of nsd """
2687 return self
._nsd
.name
2690 def ref_count(self
):
2691 """ Returns reference count"""
2692 return self
._ref
_count
2695 """ Returns whether nsd is in use or not """
2696 return True if self
.ref_count
> 0 else False
2699 """ Take a reference on this object """
2700 self
._ref
_count
+= 1
2703 """ Release reference on this object """
2704 if self
.ref_count
< 1:
2705 msg
= ("Unref on a NSD object - nsd id %s, ref_count = %s" %
2706 (self
.id, self
.ref_count
))
2707 self
._log
.critical(msg
)
2708 raise NetworkServiceDescriptorError(msg
)
2709 self
._ref
_count
-= 1
2713 """ Return the message associated with this NetworkServiceDescriptor"""
2717 def path_for_id(nsd_id
):
2718 """ Return path for the passed nsd_id"""
2719 return "C,/nsd:nsd-catalog/nsd:nsd[nsd:id = '{}'".format(nsd_id
)
2722 """ Return the message associated with this NetworkServiceDescriptor"""
2723 return NetworkServiceDescriptor
.path_for_id(self
.id)
2725 def update(self
, nsd
):
2726 """ Update the NSD descriptor """
2730 class NsdDtsHandler(object):
2731 """ The network service descriptor DTS handler """
2732 XPATH
= "C,/nsd:nsd-catalog/nsd:nsd"
2734 def __init__(self
, dts
, log
, loop
, nsm
):
2744 """ Return registration handle """
2749 """ Register for Nsd create/update/delete/read requests from dts """
2751 def on_apply(dts
, acg
, xact
, action
, scratch
):
2752 """Apply the configuration"""
2753 is_recovery
= xact
.xact
is None and action
== rwdts
.AppconfAction
.INSTALL
2754 self
._log
.debug("Got nsd apply cfg (xact:%s) (action:%s)",
2756 # Create/Update an NSD record
2757 for cfg
in self
._regh
.get_xact_elements(xact
):
2758 # Only interested in those NSD cfgs whose ID was received in prepare callback
2759 if cfg
.id in scratch
.get('nsds', []) or is_recovery
:
2760 self
._nsm
.update_nsd(cfg
)
2762 scratch
.pop('nsds', None)
2764 return RwTypes
.RwStatus
.SUCCESS
2767 def delete_nsd_libs(nsd_id
):
2768 """ Remove any files uploaded with NSD and stored under $RIFT_ARTIFACTS/libs/<id> """
2770 rift_artifacts_dir
= os
.environ
['RIFT_ARTIFACTS']
2771 nsd_dir
= os
.path
.join(rift_artifacts_dir
, 'launchpad/libs', nsd_id
)
2773 if os
.path
.exists (nsd_dir
):
2774 shutil
.rmtree(nsd_dir
, ignore_errors
=True)
2775 except Exception as e
:
2776 self
._log
.error("Exception in cleaning up NSD libs {}: {}".
2778 self
._log
.excpetion(e
)
2781 def on_prepare(dts
, acg
, xact
, xact_info
, ks_path
, msg
, scratch
):
2782 """ Prepare callback from DTS for NSD config """
2784 self
._log
.info("Got nsd prepare - config received nsd id %s, msg %s",
2787 fref
= ProtobufC
.FieldReference
.alloc()
2788 fref
.goto_whole_message(msg
.to_pbcm())
2790 if fref
.is_field_deleted():
2791 # Delete an NSD record
2792 self
._log
.debug("Deleting NSD with id %s", msg
.id)
2793 if self
._nsm
.nsd_in_use(msg
.id):
2794 self
._log
.debug("Cannot delete NSD in use - %s", msg
.id)
2795 err
= "Cannot delete an NSD in use - %s" % msg
.id
2796 raise NetworkServiceDescriptorRefCountExists(err
)
2798 yield from delete_nsd_libs(msg
.id)
2799 self
._nsm
.delete_nsd(msg
.id)
2801 # Add this NSD to scratch to create/update in apply callback
2802 nsds
= scratch
.setdefault('nsds', [])
2804 # acg._scratch['nsds'].append(msg.id)
2806 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
2809 "Registering for NSD config using xpath: %s",
2810 NsdDtsHandler
.XPATH
,
2813 acg_hdl
= rift
.tasklets
.AppConfGroup
.Handler(on_apply
=on_apply
)
2814 with self
._dts
.appconf_group_create(handler
=acg_hdl
) as acg
:
2815 # Need a list in scratch to store NSDs to create/update later
2816 # acg._scratch['nsds'] = list()
2817 self
._regh
= acg
.register(
2818 xpath
=NsdDtsHandler
.XPATH
,
2819 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY | rwdts
.Flag
.CACHE
,
2820 on_prepare
=on_prepare
)
2823 class VnfdDtsHandler(object):
2824 """ DTS handler for VNFD config changes """
2825 XPATH
= "C,/vnfd:vnfd-catalog/vnfd:vnfd"
2827 def __init__(self
, dts
, log
, loop
, nsm
):
2836 """ DTS registration handle """
2841 """ Register for VNFD configuration"""
2844 def on_apply(dts
, acg
, xact
, action
, scratch
):
2845 """Apply the configuration"""
2846 self
._log
.debug("Got NSM VNFD apply (xact: %s) (action: %s)(scr: %s)",
2847 xact
, action
, scratch
)
2849 # Create/Update a VNFD record
2850 for cfg
in self
._regh
.get_xact_elements(xact
):
2851 # Only interested in those VNFD cfgs whose ID was received in prepare callback
2852 if cfg
.id in scratch
.get('vnfds', []):
2853 self
._nsm
.update_vnfd(cfg
)
2855 for cfg
in self
._regh
.elements
:
2856 if cfg
.id in scratch
.get('deleted_vnfds', []):
2857 yield from self
._nsm
.delete_vnfd(cfg
.id)
2859 scratch
.pop('vnfds', None)
2860 scratch
.pop('deleted_vnfds', None)
2863 def on_prepare(dts
, acg
, xact
, xact_info
, ks_path
, msg
, scratch
):
2864 """ on prepare callback """
2865 self
._log
.debug("Got on prepare for VNFD (path: %s) (action: %s) (msg: %s)",
2866 ks_path
.to_xpath(RwNsmYang
.get_schema()), xact_info
.query_action
, msg
)
2868 fref
= ProtobufC
.FieldReference
.alloc()
2869 fref
.goto_whole_message(msg
.to_pbcm())
2871 # Handle deletes in prepare_callback, but adds/updates in apply_callback
2872 if fref
.is_field_deleted():
2873 self
._log
.debug("Adding msg to deleted field")
2874 deleted_vnfds
= scratch
.setdefault('deleted_vnfds', [])
2875 deleted_vnfds
.append(msg
.id)
2877 # Add this VNFD to scratch to create/update in apply callback
2878 vnfds
= scratch
.setdefault('vnfds', [])
2879 vnfds
.append(msg
.id)
2881 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
2884 "Registering for VNFD config using xpath: %s",
2885 VnfdDtsHandler
.XPATH
,
2887 acg_hdl
= rift
.tasklets
.AppConfGroup
.Handler(on_apply
=on_apply
)
2888 with self
._dts
.appconf_group_create(handler
=acg_hdl
) as acg
:
2889 # Need a list in scratch to store VNFDs to create/update later
2890 # acg._scratch['vnfds'] = list()
2891 # acg._scratch['deleted_vnfds'] = list()
2892 self
._regh
= acg
.register(
2893 xpath
=VnfdDtsHandler
.XPATH
,
2894 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY
,
2895 on_prepare
=on_prepare
)
2897 class NsrRpcDtsHandler(object):
2898 """ The network service instantiation RPC DTS handler """
2899 EXEC_NSR_CONF_XPATH
= "I,/nsr:start-network-service"
2900 EXEC_NSR_CONF_O_XPATH
= "O,/nsr:start-network-service"
2901 NETCONF_IP_ADDRESS
= "127.0.0.1"
2903 NETCONF_USER
= "admin"
2904 NETCONF_PW
= "admin"
2906 def __init__(self
, dts
, log
, loop
, nsm
):
2913 self
._ns
_regh
= None
2915 self
._manager
= None
2917 self
._model
= RwYang
.Model
.create_libncx()
2918 self
._model
.load_schema_ypbc(RwNsrYang
.get_schema())
2922 """ Return the NS manager instance """
2926 def wrap_netconf_config_xml(xml
):
2927 xml
= '<config xmlns:xc="urn:ietf:params:xml:ns:netconf:base:1.0">{}</config>'.format(xml
)
2931 def _connect(self
, timeout_secs
=240):
2933 start_time
= time
.time()
2934 while (time
.time() - start_time
) < timeout_secs
:
2937 self
._log
.debug("Attemping NsmTasklet netconf connection.")
2939 manager
= yield from ncclient
.asyncio_manager
.asyncio_connect(
2941 host
=NsrRpcDtsHandler
.NETCONF_IP_ADDRESS
,
2942 port
=NsrRpcDtsHandler
.NETCONF_PORT
,
2943 username
=NsrRpcDtsHandler
.NETCONF_USER
,
2944 password
=NsrRpcDtsHandler
.NETCONF_PW
,
2946 look_for_keys
=False,
2947 hostkey_verify
=False,
2952 except ncclient
.transport
.errors
.SSHError
as e
:
2953 self
._log
.warning("Netconf connection to launchpad %s failed: %s",
2954 NsrRpcDtsHandler
.NETCONF_IP_ADDRESS
, str(e
))
2956 yield from asyncio
.sleep(5, loop
=self
._loop
)
2958 raise NsrInstantiationFailed("Failed to connect to Launchpad within %s seconds" %
2963 """ Register for NS monitoring read from dts """
2965 def on_ns_config_prepare(xact_info
, action
, ks_path
, msg
):
2966 """ prepare callback from dts start-network-service"""
2967 assert action
== rwdts
.QueryAction
.RPC
2969 rpc_op
= NsrYang
.YangOutput_Nsr_StartNetworkService
.from_dict({
2970 "nsr_id":str(uuid
.uuid4())
2973 if not ('name' in rpc_ip
and 'nsd_ref' in rpc_ip
and 'cloud_account' in rpc_ip
):
2974 self
._log
.error("Mandatory parameters name or nsd_ref or cloud account not found in start-network-service {}".format(rpc_ip
))
2977 self
._log
.debug("start-network-service RPC input: {}".format(rpc_ip
))
2980 # Add used value to the pool
2981 self
._log
.debug("RPC output: {}".format(rpc_op
))
2982 nsd_copy
= self
.nsm
.get_nsd(rpc_ip
.nsd_ref
)
2984 if not self
._manager
:
2985 self
._manager
= yield from self
._connect
()
2987 self
._log
.debug("Configuring ns-instance-config with name %s nsd-ref: %s",
2988 rpc_ip
.name
, rpc_ip
.nsd_ref
)
2990 ns_instance_config_dict
= {"id":rpc_op
.nsr_id
, "admin_status":"ENABLED"}
2991 ns_instance_config_copy_dict
= {k
:v
for k
, v
in rpc_ip
.as_dict().items()
2992 if k
in RwNsrYang
.YangData_Nsr_NsInstanceConfig_Nsr().fields
}
2993 ns_instance_config_dict
.update(ns_instance_config_copy_dict
)
2995 ns_instance_config
= RwNsrYang
.YangData_Nsr_NsInstanceConfig_Nsr
.from_dict(ns_instance_config_dict
)
2996 ns_instance_config
.nsd
= NsrYang
.YangData_Nsr_NsInstanceConfig_Nsr_Nsd()
2997 ns_instance_config
.nsd
.from_dict(nsd_copy
.msg
.as_dict())
2999 xml
= ns_instance_config
.to_xml_v2(self
._model
)
3000 netconf_xml
= self
.wrap_netconf_config_xml(xml
)
3002 self
._log
.debug("Sending configure ns-instance-config xml to %s: %s",
3003 netconf_xml
, NsrRpcDtsHandler
.NETCONF_IP_ADDRESS
)
3005 response
= yield from self
._manager
.edit_config(
3009 self
._log
.debug("Received edit config response: %s", str(response
))
3011 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
,
3012 NsrRpcDtsHandler
.EXEC_NSR_CONF_O_XPATH
,
3014 except Exception as e
:
3015 self
._log
.error("Exception processing the "
3016 "start-network-service: {}".format(e
))
3017 self
._log
.exception(e
)
3018 xact_info
.respond_xpath(rwdts
.XactRspCode
.NACK
,
3019 NsrRpcDtsHandler
.EXEC_NSR_CONF_O_XPATH
)
3022 hdl_ns
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_ns_config_prepare
,)
3024 with self
._dts
.group_create() as group
:
3025 self
._ns
_regh
= group
.register(xpath
=NsrRpcDtsHandler
.EXEC_NSR_CONF_XPATH
,
3027 flags
=rwdts
.Flag
.PUBLISHER
,
3031 class NsrDtsHandler(object):
3032 """ The network service DTS handler """
3033 NSR_XPATH
= "C,/nsr:ns-instance-config/nsr:nsr"
3034 SCALE_INSTANCE_XPATH
= "C,/nsr:ns-instance-config/nsr:nsr/nsr:scaling-group/nsr:instance"
3036 def __init__(self
, dts
, log
, loop
, nsm
):
3042 self
._nsr
_regh
= None
3043 self
._scale
_regh
= None
3047 """ Return the NS manager instance """
3052 """ Register for Nsr create/update/delete/read requests from dts """
3054 def nsr_id_from_keyspec(ks
):
3055 nsr_path_entry
= NsrYang
.YangData_Nsr_NsInstanceConfig_Nsr
.schema().keyspec_to_entry(ks
)
3056 nsr_id
= nsr_path_entry
.key00
.id
3059 def group_name_from_keyspec(ks
):
3060 group_path_entry
= NsrYang
.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup
.schema().keyspec_to_entry(ks
)
3061 group_name
= group_path_entry
.key00
.scaling_group_name_ref
3064 def is_instance_in_reg_elements(nsr_id
, group_name
, instance_id
):
3065 """ Return boolean indicating if scaling group instance was already commited previously.
3067 By looking at the existing elements in this registration handle (elements not part
3068 of this current xact), we can tell if the instance was configured previously without
3069 keeping any application state.
3071 for instance_cfg
, keyspec
in self
._nsr
_regh
.get_xact_elements(include_keyspec
=True):
3072 elem_nsr_id
= nsr_id_from_keyspec(keyspec
)
3073 elem_group_name
= group_name_from_keyspec(keyspec
)
3075 if elem_nsr_id
!= nsr_id
or group_name
!= elem_group_name
:
3078 if instance_cfg
.id == instance_id
:
3083 def get_scale_group_instance_delta(nsr_id
, group_name
, xact
):
3084 delta
= {"added": [], "deleted": []}
3085 for instance_cfg
, keyspec
in self
._scale
_regh
.get_xact_elements(xact
, include_keyspec
=True):
3086 elem_nsr_id
= nsr_id_from_keyspec(keyspec
)
3087 if elem_nsr_id
!= nsr_id
:
3090 elem_group_name
= group_name_from_keyspec(keyspec
)
3091 if elem_group_name
!= group_name
:
3094 delta
["added"].append(instance_cfg
.id)
3096 for instance_cfg
, keyspec
in self
._scale
_regh
.get_xact_elements(include_keyspec
=True):
3097 elem_nsr_id
= nsr_id_from_keyspec(keyspec
)
3098 if elem_nsr_id
!= nsr_id
:
3101 elem_group_name
= group_name_from_keyspec(keyspec
)
3102 if elem_group_name
!= group_name
:
3105 if instance_cfg
.id in delta
["added"]:
3106 delta
["added"].remove(instance_cfg
.id)
3108 delta
["deleted"].append(instance_cfg
.id)
3113 def update_nsr_nsd(nsr_id
, xact
, scratch
):
3116 def get_nsr_vl_delta(nsr_id
, xact
, scratch
):
3117 delta
= {"added": [], "deleted": []}
3118 for instance_cfg
, keyspec
in self
._nsr
_regh
.get_xact_elements(xact
, include_keyspec
=True):
3119 elem_nsr_id
= nsr_id_from_keyspec(keyspec
)
3120 if elem_nsr_id
!= nsr_id
:
3123 if 'vld' in instance_cfg
.nsd
:
3124 for vld
in instance_cfg
.nsd
.vld
:
3125 delta
["added"].append(vld
)
3127 for instance_cfg
, keyspec
in self
._nsr
_regh
.get_xact_elements(include_keyspec
=True):
3128 self
._log
.debug("NSR update: %s", instance_cfg
)
3129 elem_nsr_id
= nsr_id_from_keyspec(keyspec
)
3130 if elem_nsr_id
!= nsr_id
:
3133 if 'vld' in instance_cfg
.nsd
:
3134 for vld
in instance_cfg
.nsd
.vld
:
3135 if vld
in delta
["added"]:
3136 delta
["added"].remove(vld
)
3138 delta
["deleted"].append(vld
)
3142 vl_delta
= yield from get_nsr_vl_delta(nsr_id
, xact
, scratch
)
3143 self
._log
.debug("Got NSR:%s VL instance delta: %s", nsr_id
, vl_delta
)
3145 for vld
in vl_delta
["added"]:
3146 yield from self
._nsm
.nsr_instantiate_vl(nsr_id
, vld
)
3148 for vld
in vl_delta
["deleted"]:
3149 yield from self
._nsm
.nsr_terminate_vl(nsr_id
, vld
)
3151 def get_add_delete_update_cfgs(dts_member_reg
, xact
, key_name
, scratch
):
3152 # Unfortunately, it is currently difficult to figure out what has exactly
3153 # changed in this xact without Pbdelta support (RIFT-4916)
3154 # As a workaround, we can fetch the pre and post xact elements and
3155 # perform a comparison to figure out adds/deletes/updates
3156 xact_cfgs
= list(dts_member_reg
.get_xact_elements(xact
))
3157 curr_cfgs
= list(dts_member_reg
.elements
)
3159 xact_key_map
= {getattr(cfg
, key_name
): cfg
for cfg
in xact_cfgs
}
3160 curr_key_map
= {getattr(cfg
, key_name
): cfg
for cfg
in curr_cfgs
}
3163 added_keys
= set(xact_key_map
) - set(curr_key_map
)
3164 added_cfgs
= [xact_key_map
[key
] for key
in added_keys
]
3167 deleted_keys
= set(curr_key_map
) - set(xact_key_map
)
3168 deleted_cfgs
= [curr_key_map
[key
] for key
in deleted_keys
]
3171 updated_keys
= set(curr_key_map
) & set(xact_key_map
)
3172 updated_cfgs
= [xact_key_map
[key
] for key
in updated_keys
3173 if xact_key_map
[key
] != curr_key_map
[key
]]
3175 return added_cfgs
, deleted_cfgs
, updated_cfgs
3177 def on_apply(dts
, acg
, xact
, action
, scratch
):
3178 """Apply the configuration"""
3179 self
._log
.debug("Got nsr apply (xact: %s) (action: %s)(scr: %s)",
3180 xact
, action
, scratch
)
3182 def handle_create_nsr(msg
, restart_mode
=False):
3183 # Handle create nsr requests """
3184 # Do some validations
3185 if not msg
.has_field("nsd"):
3186 err
= "NSD not provided"
3187 self
._log
.error(err
)
3188 raise NetworkServiceRecordError(err
)
3190 self
._log
.debug("Creating NetworkServiceRecord %s from nsr config %s",
3191 msg
.id, msg
.as_dict())
3192 nsr
= self
.nsm
.create_nsr(msg
, restart_mode
=restart_mode
)
3195 def handle_delete_nsr(msg
):
3197 def delete_instantiation(ns_id
):
3198 """ Delete instantiation """
3199 with self
._dts
.transaction() as xact
:
3200 yield from self
._nsm
.terminate_ns(ns_id
, xact
)
3202 # Handle delete NSR requests
3203 self
._log
.info("Delete req for NSR Id: %s received", msg
.id)
3204 # Terminate the NSR instance
3205 nsr
= self
._nsm
.get_ns_by_nsr_id(msg
.id)
3207 nsr
.set_state(NetworkServiceRecordState
.TERMINATE_RCVD
)
3208 event_descr
= "Terminate rcvd for NS Id:%s" % msg
.id
3209 nsr
.record_event("terminate-rcvd", event_descr
)
3211 self
._loop
.create_task(delete_instantiation(msg
.id))
3214 def begin_instantiation(nsr
):
3215 # Begin instantiation
3216 self
._log
.info("Beginning NS instantiation: %s", nsr
.id)
3217 yield from self
._nsm
.instantiate_ns(nsr
.id, xact
)
3219 self
._log
.debug("Got nsr apply (xact: %s) (action: %s)(scr: %s)",
3220 xact
, action
, scratch
)
3222 if action
== rwdts
.AppconfAction
.INSTALL
and xact
.id is None:
3223 for element
in self
._nsr
_regh
.elements
:
3224 nsr
= handle_create_nsr(element
, restart_mode
=True)
3225 self
._loop
.create_task(begin_instantiation(nsr
))
3228 (added_msgs
, deleted_msgs
, updated_msgs
) = get_add_delete_update_cfgs(self
._nsr
_regh
,
3232 self
._log
.debug("Added: %s, Deleted: %s, Updated: %s", added_msgs
,
3233 deleted_msgs
, updated_msgs
)
3235 for msg
in added_msgs
:
3236 if msg
.id not in self
._nsm
.nsrs
:
3237 self
._log
.info("Create NSR received in on_apply to instantiate NS:%s", msg
.id)
3238 nsr
= handle_create_nsr(msg
)
3239 self
._loop
.create_task(begin_instantiation(nsr
))
3241 for msg
in deleted_msgs
:
3242 self
._log
.info("Delete NSR received in on_apply to terminate NS:%s", msg
.id)
3244 handle_delete_nsr(msg
)
3246 self
._log
.exception("Failed to terminate NS:%s", msg
.id)
3248 for msg
in updated_msgs
:
3249 self
._log
.info("Update NSR received in on_apply: %s", msg
)
3251 self
._nsm
.nsr_update_cfg(msg
.id, msg
)
3254 self
._loop
.create_task(update_nsr_nsd(msg
.id, xact
, scratch
))
3256 for group
in msg
.scaling_group
:
3257 instance_delta
= get_scale_group_instance_delta(msg
.id, group
.scaling_group_name_ref
, xact
)
3258 self
._log
.debug("Got NSR:%s scale group instance delta: %s", msg
.id, instance_delta
)
3260 for instance_id
in instance_delta
["added"]:
3261 self
._nsm
.scale_nsr_out(msg
.id, group
.scaling_group_name_ref
, instance_id
, xact
)
3263 for instance_id
in instance_delta
["deleted"]:
3264 self
._nsm
.scale_nsr_in(msg
.id, group
.scaling_group_name_ref
, instance_id
)
3267 return RwTypes
.RwStatus
.SUCCESS
3270 def on_prepare(dts
, acg
, xact
, xact_info
, ks_path
, msg
, scratch
):
3271 """ Prepare calllback from DTS for NSR """
3273 xpath
= ks_path
.to_xpath(RwNsrYang
.get_schema())
3274 action
= xact_info
.query_action
3276 "Got Nsr prepare callback (xact: %s) (action: %s) (info: %s), %s:%s)",
3277 xact
, action
, xact_info
, xpath
, msg
3281 def delete_instantiation(ns_id
):
3282 """ Delete instantiation """
3283 yield from self
._nsm
.terminate_ns(ns_id
, None)
3285 def handle_delete_nsr():
3286 """ Handle delete NSR requests """
3287 self
._log
.info("Delete req for NSR Id: %s received", msg
.id)
3288 # Terminate the NSR instance
3289 nsr
= self
._nsm
.get_ns_by_nsr_id(msg
.id)
3291 nsr
.set_state(NetworkServiceRecordState
.TERMINATE_RCVD
)
3292 event_descr
= "Terminate rcvd for NS Id:%s" % msg
.id
3293 nsr
.record_event("terminate-rcvd", event_descr
)
3295 self
._loop
.create_task(delete_instantiation(msg
.id))
3297 fref
= ProtobufC
.FieldReference
.alloc()
3298 fref
.goto_whole_message(msg
.to_pbcm())
3300 if action
in [rwdts
.QueryAction
.CREATE
, rwdts
.QueryAction
.UPDATE
, rwdts
.QueryAction
.DELETE
]:
3301 # if this is an NSR create
3302 if action
!= rwdts
.QueryAction
.DELETE
and msg
.id not in self
._nsm
.nsrs
:
3303 # Ensure the Cloud account/datacenter has been specified
3304 if not msg
.has_field("cloud_account") and not msg
.has_field("om_datacenter"):
3305 raise NsrInstantiationFailed("Cloud account or datacenter not specified in NSR")
3307 # Check if nsd is specified
3308 if not msg
.has_field("nsd"):
3309 raise NsrInstantiationFailed("NSD not specified in NSR")
3312 nsr
= self
._nsm
.nsrs
[msg
.id]
3314 if msg
.has_field("nsd"):
3315 if nsr
.state
!= NetworkServiceRecordState
.RUNNING
:
3316 raise NsrVlUpdateError("Unable to update VL when NSR not in running state")
3317 if 'vld' not in msg
.nsd
or len(msg
.nsd
.vld
) == 0:
3318 raise NsrVlUpdateError("NS config NSD should have atleast 1 VLD defined")
3320 if msg
.has_field("scaling_group"):
3321 if nsr
.state
!= NetworkServiceRecordState
.RUNNING
:
3322 raise ScalingOperationError("Unable to perform scaling action when NS is not in running state")
3324 if len(msg
.scaling_group
) > 1:
3325 raise ScalingOperationError("Only a single scaling group can be configured at a time")
3327 for group_msg
in msg
.scaling_group
:
3328 num_new_group_instances
= len(group_msg
.instance
)
3329 if num_new_group_instances
> 1:
3330 raise ScalingOperationError("Only a single scaling instance can be modified at a time")
3332 elif num_new_group_instances
== 1:
3333 scale_group
= nsr
.scaling_groups
[group_msg
.scaling_group_name_ref
]
3334 if action
in [rwdts
.QueryAction
.CREATE
, rwdts
.QueryAction
.UPDATE
]:
3335 if len(scale_group
.instances
) == scale_group
.max_instance_count
:
3336 raise ScalingOperationError("Max instances for %s reached" % scale_group
)
3338 acg
.handle
.prepare_complete_ok(xact_info
.handle
)
3341 self
._log
.debug("Registering for NSR config using xpath: %s",
3342 NsrDtsHandler
.NSR_XPATH
)
3344 acg_hdl
= rift
.tasklets
.AppConfGroup
.Handler(on_apply
=on_apply
)
3345 with self
._dts
.appconf_group_create(handler
=acg_hdl
) as acg
:
3346 self
._nsr
_regh
= acg
.register(xpath
=NsrDtsHandler
.NSR_XPATH
,
3347 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY | rwdts
.Flag
.CACHE
,
3348 on_prepare
=on_prepare
)
3350 self
._scale
_regh
= acg
.register(
3351 xpath
=NsrDtsHandler
.SCALE_INSTANCE_XPATH
,
3352 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.DELTA_READY| rwdts
.Flag
.CACHE
,
3356 class NsrOpDataDtsHandler(object):
3357 """ The network service op data DTS handler """
3358 XPATH
= "D,/nsr:ns-instance-opdata/nsr:nsr"
3360 def __init__(self
, dts
, log
, loop
, nsm
):
3369 """ Return the registration handle"""
3374 """ Return the NS manager instance """
3379 """ Register for Nsr op data publisher registration"""
3380 self
._log
.debug("Registering Nsr op data path %s as publisher",
3381 NsrOpDataDtsHandler
.XPATH
)
3383 hdl
= rift
.tasklets
.DTS
.RegistrationHandler()
3384 handlers
= rift
.tasklets
.Group
.Handler()
3385 with self
._dts
.group_create(handler
=handlers
) as group
:
3386 self
._regh
= group
.register(xpath
=NsrOpDataDtsHandler
.XPATH
,
3388 flags
=rwdts
.Flag
.PUBLISHER | rwdts
.Flag
.NO_PREP_READ | rwdts
.Flag
.DATASTORE
)
3391 def create(self
, path
, msg
):
3393 Create an NS record in DTS with the path and message
3395 self
._log
.debug("Creating NSR %s:%s", path
, msg
)
3396 self
.regh
.create_element(path
, msg
)
3397 self
._log
.debug("Created NSR, %s:%s", path
, msg
)
3400 def update(self
, path
, msg
, flags
=rwdts
.XactFlag
.REPLACE
):
3402 Update an NS record in DTS with the path and message
3404 self
._log
.debug("Updating NSR, %s:%s regh = %s", path
, msg
, self
.regh
)
3405 self
.regh
.update_element(path
, msg
, flags
)
3406 self
._log
.debug("Updated NSR, %s:%s", path
, msg
)
3409 def delete(self
, path
):
3411 Update an NS record in DTS with the path and message
3413 self
._log
.debug("Deleting NSR path:%s", path
)
3414 self
.regh
.delete_element(path
)
3415 self
._log
.debug("Deleted NSR path:%s", path
)
3418 class VnfrDtsHandler(object):
3419 """ The virtual network service DTS handler """
3420 XPATH
= "D,/vnfr:vnfr-catalog/vnfr:vnfr"
3422 def __init__(self
, dts
, log
, loop
, nsm
):
3432 """ Return registration handle """
3437 """ Return the NS manager instance """
3442 """ Register for vnfr create/update/delete/ advises from dts """
3444 def on_commit(xact_info
):
3445 """ The transaction has been committed """
3446 self
._log
.debug("Got vnfr commit (xact_info: %s)", xact_info
)
3447 return rwdts
.MemberRspCode
.ACTION_OK
3450 def on_prepare(xact_info
, action
, ks_path
, msg
):
3451 """ prepare callback from dts """
3452 xpath
= ks_path
.to_xpath(RwNsrYang
.get_schema())
3454 "Got vnfr on_prepare cb (xact_info: %s, action: %s): %s:%s",
3455 xact_info
, action
, ks_path
, msg
3458 schema
= VnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.schema()
3459 path_entry
= schema
.keyspec_to_entry(ks_path
)
3460 if path_entry
.key00
.id not in self
._nsm
._vnfrs
:
3461 self
._log
.error("%s request for non existent record path %s",
3463 xact_info
.respond_xpath(rwdts
.XactRspCode
.NA
, xpath
)
3467 self
._log
.debug("Deleting VNFR with id %s", path_entry
.key00
.id)
3468 if action
== rwdts
.QueryAction
.CREATE
or action
== rwdts
.QueryAction
.UPDATE
:
3469 yield from self
._nsm
.update_vnfr(msg
)
3470 elif action
== rwdts
.QueryAction
.DELETE
:
3471 self
._log
.debug("Deleting VNFR with id %s", path_entry
.key00
.id)
3472 self
._nsm
.delete_vnfr(path_entry
.key00
.id)
3474 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
, xpath
)
3476 self
._log
.debug("Registering for VNFR using xpath: %s",
3477 VnfrDtsHandler
.XPATH
,)
3479 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_commit
=on_commit
,
3480 on_prepare
=on_prepare
,)
3481 with self
._dts
.group_create() as group
:
3482 self
._regh
= group
.register(xpath
=VnfrDtsHandler
.XPATH
,
3484 flags
=(rwdts
.Flag
.SUBSCRIBER
),)
3487 class NsdRefCountDtsHandler(object):
3488 """ The NSD Ref Count DTS handler """
3489 XPATH
= "D,/nsr:ns-instance-opdata/rw-nsr:nsd-ref-count"
3491 def __init__(self
, dts
, log
, loop
, nsm
):
3501 """ Return registration handle """
3506 """ Return the NS manager instance """
3511 """ Register for NSD ref count read from dts """
3514 def on_prepare(xact_info
, action
, ks_path
, msg
):
3515 """ prepare callback from dts """
3516 xpath
= ks_path
.to_xpath(RwNsrYang
.get_schema())
3518 if action
== rwdts
.QueryAction
.READ
:
3519 schema
= RwNsrYang
.YangData_Nsr_NsInstanceOpdata_NsdRefCount
.schema()
3520 path_entry
= schema
.keyspec_to_entry(ks_path
)
3521 nsd_list
= yield from self
._nsm
.get_nsd_refcount(path_entry
.key00
.nsd_id_ref
)
3522 for xpath
, msg
in nsd_list
:
3523 xact_info
.respond_xpath(rsp_code
=rwdts
.XactRspCode
.MORE
,
3526 xact_info
.respond_xpath(rwdts
.XactRspCode
.ACK
)
3528 raise NetworkServiceRecordError("Not supported operation %s" % action
)
3530 hdl
= rift
.tasklets
.DTS
.RegistrationHandler(on_prepare
=on_prepare
,)
3531 with self
._dts
.group_create() as group
:
3532 self
._regh
= group
.register(xpath
=NsdRefCountDtsHandler
.XPATH
,
3534 flags
=rwdts
.Flag
.PUBLISHER
,)
3537 class NsManager(object):
3538 """ The Network Service Manager class"""
3539 def __init__(self
, dts
, log
, loop
,
3540 nsr_handler
, vnfr_handler
, vlr_handler
, ro_plugin_selector
,
3541 vnffgmgr
, vnfd_pub_handler
, cloud_account_handler
):
3545 self
._nsr
_handler
= nsr_handler
3546 self
._vnfr
_pub
_handler
= vnfr_handler
3547 self
._vlr
_pub
_handler
= vlr_handler
3548 self
._vnffgmgr
= vnffgmgr
3549 self
._vnfd
_pub
_handler
= vnfd_pub_handler
3550 self
._cloud
_account
_handler
= cloud_account_handler
3552 self
._ro
_plugin
_selector
= ro_plugin_selector
3553 self
._ncclient
= rift
.mano
.ncclient
.NcClient(
3565 self
.cfgmgr_obj
= conman
.ROConfigManager(log
, loop
, dts
, self
)
3567 # TODO: All these handlers should move to tasklet level.
3568 # Passing self is often an indication of bad design
3569 self
._nsd
_dts
_handler
= NsdDtsHandler(dts
, log
, loop
, self
)
3570 self
._vnfd
_dts
_handler
= VnfdDtsHandler(dts
, log
, loop
, self
)
3571 self
._dts
_handlers
= [self
._nsd
_dts
_handler
,
3572 VnfrDtsHandler(dts
, log
, loop
, self
),
3573 NsdRefCountDtsHandler(dts
, log
, loop
, self
),
3574 NsrDtsHandler(dts
, log
, loop
, self
),
3575 ScalingRpcHandler(log
, dts
, loop
, self
.scale_rpc_callback
),
3576 NsrRpcDtsHandler(dts
,log
,loop
,self
),
3577 self
._vnfd
_dts
_handler
,
3598 def nsr_handler(self
):
3599 """" NSR handler """
3600 return self
._nsr
_handler
3604 """" So Obj handler """
3609 """ NSRs in this NSM"""
3614 """ NSDs in this NSM"""
3619 """ VNFDs in this NSM"""
3624 """ VNFRs in this NSM"""
3628 def nsr_pub_handler(self
):
3629 """ NSR publication handler """
3630 return self
._nsr
_handler
3633 def vnfr_pub_handler(self
):
3634 """ VNFR publication handler """
3635 return self
._vnfr
_pub
_handler
3638 def vlr_pub_handler(self
):
3639 """ VLR publication handler """
3640 return self
._vlr
_pub
_handler
3643 def vnfd_pub_handler(self
):
3644 return self
._vnfd
_pub
_handler
3648 """ Register all static DTS handlers """
3649 for dts_handle
in self
._dts
_handlers
:
3650 yield from dts_handle
.register()
3653 def get_ns_by_nsr_id(self
, nsr_id
):
3654 """ get NSR by nsr id """
3655 if nsr_id
not in self
._nsrs
:
3656 raise NetworkServiceRecordError("NSR id %s not found" % nsr_id
)
3658 return self
._nsrs
[nsr_id
]
3660 def scale_nsr_out(self
, nsr_id
, scale_group_name
, instance_id
, config_xact
):
3661 self
.log
.debug("Scale out NetworkServiceRecord (nsr_id: %s) (scaling group: %s) (instance_id: %s)",
3666 nsr
= self
._nsrs
[nsr_id
]
3667 if nsr
.state
!= NetworkServiceRecordState
.RUNNING
:
3668 raise ScalingOperationError("Cannot perform scaling operation if NSR is not in running state")
3670 self
._loop
.create_task(nsr
.create_scale_group_instance(scale_group_name
, instance_id
, config_xact
))
3672 def scale_nsr_in(self
, nsr_id
, scale_group_name
, instance_id
):
3673 self
.log
.debug("Scale in NetworkServiceRecord (nsr_id: %s) (scaling group: %s) (instance_id: %s)",
3678 nsr
= self
._nsrs
[nsr_id
]
3679 if nsr
.state
!= NetworkServiceRecordState
.RUNNING
:
3680 raise ScalingOperationError("Cannot perform scaling operation if NSR is not in running state")
3682 self
._loop
.create_task(nsr
.delete_scale_group_instance(scale_group_name
, instance_id
))
3684 def scale_rpc_callback(self
, xact
, msg
, action
):
3685 """Callback handler for RPC calls
3687 xact : Transaction Handler
3689 action : Scaling Action
3691 ScalingGroupInstance
= NsrYang
.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup_Instance
3692 ScalingGroup
= NsrYang
.YangData_Nsr_NsInstanceConfig_Nsr_ScalingGroup
3694 xpath
= ('C,/nsr:ns-instance-config/nsr:nsr[nsr:id="{}"]').format(
3696 instance
= ScalingGroupInstance
.from_dict({"id": msg
.instance_id
})
3699 def get_nsr_scaling_group():
3700 results
= yield from self
._dts
.query_read(xpath
, rwdts
.XactFlag
.MERGE
)
3702 for result
in results
:
3703 res
= yield from result
3704 nsr_config
= res
.result
3706 for scaling_group
in nsr_config
.scaling_group
:
3707 if scaling_group
.scaling_group_name_ref
== msg
.scaling_group_name_ref
:
3710 scaling_group
= nsr_config
.scaling_group
.add()
3711 scaling_group
.scaling_group_name_ref
= msg
.scaling_group_name_ref
3713 return (nsr_config
, scaling_group
)
3716 def update_config(nsr_config
):
3717 xml
= self
._ncclient
.convert_to_xml(RwNsrYang
, nsr_config
)
3718 xml
= '<config xmlns:xc="urn:ietf:params:xml:ns:netconf:base:1.0">{}</config>'.format(xml
)
3719 yield from self
._ncclient
.connect()
3720 yield from self
._ncclient
.manager
.edit_config(target
="running", config
=xml
, default_operation
="replace")
3724 nsr_config
, scaling_group
= yield from get_nsr_scaling_group()
3725 scaling_group
.instance
.append(instance
)
3726 yield from update_config(nsr_config
)
3730 nsr_config
, scaling_group
= yield from get_nsr_scaling_group()
3731 scaling_group
.instance
.remove(instance
)
3732 yield from update_config(nsr_config
)
3734 if action
== ScalingRpcHandler
.ACTION
.SCALE_OUT
:
3735 self
._loop
.create_task(scale_out())
3737 self
._loop
.create_task(scale_in())
3739 # Opdata based calls, disabled for now!
3740 # if action == ScalingRpcHandler.ACTION.SCALE_OUT:
3741 # self.scale_nsr_out(
3743 # msg.scaling_group_name_ref,
3747 # self.scale_nsr_in(
3749 # msg.scaling_group_name_ref,
3752 def nsr_update_cfg(self
, nsr_id
, msg
):
3753 nsr
= self
._nsrs
[nsr_id
]
3754 nsr
.nsr_cfg_msg
= msg
3756 def nsr_instantiate_vl(self
, nsr_id
, vld
):
3757 self
.log
.debug("NSR {} create VL {}".format(nsr_id
, vld
))
3758 nsr
= self
._nsrs
[nsr_id
]
3759 if nsr
.state
!= NetworkServiceRecordState
.RUNNING
:
3760 raise NsrVlUpdateError("Cannot perform VL instantiate if NSR is not in running state")
3762 # Not calling in a separate task as this is called from a separate task
3763 yield from nsr
.create_vl_instance(vld
)
3765 def nsr_terminate_vl(self
, nsr_id
, vld
):
3766 self
.log
.debug("NSR {} delete VL {}".format(nsr_id
, vld
.id))
3767 nsr
= self
._nsrs
[nsr_id
]
3768 if nsr
.state
!= NetworkServiceRecordState
.RUNNING
:
3769 raise NsrVlUpdateError("Cannot perform VL terminate if NSR is not in running state")
3771 # Not calling in a separate task as this is called from a separate task
3772 yield from nsr
.delete_vl_instance(vld
)
3774 def create_nsr(self
, nsr_msg
, restart_mode
=False):
3775 """ Create an NSR instance """
3776 if nsr_msg
.id in self
._nsrs
:
3777 msg
= "NSR id %s already exists" % nsr_msg
.id
3778 self
._log
.error(msg
)
3779 raise NetworkServiceRecordError(msg
)
3781 self
._log
.info("Create NetworkServiceRecord nsr id %s from nsd_id %s",
3785 nsm_plugin
= self
._ro
_plugin
_selector
.ro_plugin
3786 sdn_account_name
= self
._cloud
_account
_handler
.get_cloud_account_sdn_name(nsr_msg
.cloud_account
)
3788 nsr
= NetworkServiceRecord(self
._dts
,
3795 restart_mode
=restart_mode
3797 self
._nsrs
[nsr_msg
.id] = nsr
3798 nsm_plugin
.create_nsr(nsr_msg
, nsr_msg
.nsd
)
3802 def delete_nsr(self
, nsr_id
):
3804 Delete NSR with the passed nsr id
3806 del self
._nsrs
[nsr_id
]
3809 def instantiate_ns(self
, nsr_id
, config_xact
):
3810 """ Instantiate an NS instance """
3811 self
._log
.debug("Instantiating Network service id %s", nsr_id
)
3812 if nsr_id
not in self
._nsrs
:
3813 err
= "NSR id %s not found " % nsr_id
3814 self
._log
.error(err
)
3815 raise NetworkServiceRecordError(err
)
3817 nsr
= self
._nsrs
[nsr_id
]
3818 yield from nsr
.nsm_plugin
.instantiate_ns(nsr
, config_xact
)
3821 def update_vnfr(self
, vnfr
):
3822 """Create/Update an VNFR """
3824 vnfr_state
= self
._vnfrs
[vnfr
.id].state
3825 self
._log
.debug("Updating VNFR with state %s: vnfr %s", vnfr_state
, vnfr
)
3827 yield from self
._vnfrs
[vnfr
.id].update_state(vnfr
)
3828 nsr
= self
.find_nsr_for_vnfr(vnfr
.id)
3829 yield from nsr
.update_state()
3831 def find_nsr_for_vnfr(self
, vnfr_id
):
3832 """ Find the NSR which )has the passed vnfr id"""
3833 for nsr
in list(self
.nsrs
.values()):
3834 for vnfr
in list(nsr
.vnfrs
.values()):
3835 if vnfr
.id == vnfr_id
:
3839 def delete_vnfr(self
, vnfr_id
):
3840 """ Delete VNFR with the passed id"""
3841 del self
._vnfrs
[vnfr_id
]
3843 def get_nsd_ref(self
, nsd_id
):
3844 """ Get network service descriptor for the passed nsd_id
3846 nsd
= self
.get_nsd(nsd_id
)
3851 def get_nsr_config(self
, nsd_id
):
3852 xpath
= "C,/nsr:ns-instance-config"
3853 results
= yield from self
._dts
.query_read(xpath
, rwdts
.XactFlag
.MERGE
)
3855 for result
in results
:
3856 entry
= yield from result
3857 ns_instance_config
= entry
.result
3859 for nsr
in ns_instance_config
.nsr
:
3860 if nsr
.nsd
.id == nsd_id
:
3866 def nsd_unref_by_nsr_id(self
, nsr_id
):
3867 """ Unref the network service descriptor based on NSR id """
3868 self
._log
.debug("NSR Unref called for Nsr Id:%s", nsr_id
)
3869 if nsr_id
in self
._nsrs
:
3870 nsr
= self
._nsrs
[nsr_id
]
3873 nsd
= self
.get_nsd(nsr
.nsd_id
)
3874 self
._log
.debug("Releasing ref on NSD %s held by NSR %s - Curr %d",
3875 nsd
.id, nsr
.id, nsd
.ref_count
)
3877 except NetworkServiceDescriptorError
:
3878 # We store a copy of NSD in NSR and the NSD in nsd-catalog
3883 self
._log
.error("Cannot find NSR with id %s", nsr_id
)
3884 raise NetworkServiceDescriptorUnrefError("No NSR with id" % nsr_id
)
3887 def nsd_unref(self
, nsd_id
):
3888 """ Unref the network service descriptor associated with the id """
3889 nsd
= self
.get_nsd(nsd_id
)
3892 def get_nsd(self
, nsd_id
):
3893 """ Get network service descriptor for the passed nsd_id"""
3894 if nsd_id
not in self
._nsds
:
3895 self
._log
.error("Cannot find NSD id:%s", nsd_id
)
3896 raise NetworkServiceDescriptorError("Cannot find NSD id:%s", nsd_id
)
3898 return self
._nsds
[nsd_id
]
3900 def create_nsd(self
, nsd_msg
):
3901 """ Create a network service descriptor """
3902 self
._log
.debug("Create network service descriptor - %s", nsd_msg
)
3903 if nsd_msg
.id in self
._nsds
:
3904 self
._log
.error("Cannot create NSD %s -NSD ID already exists", nsd_msg
)
3905 raise NetworkServiceDescriptorError("NSD already exists-%s", nsd_msg
.id)
3907 nsd
= NetworkServiceDescriptor(
3914 self
._nsds
[nsd_msg
.id] = nsd
3918 def update_nsd(self
, nsd
):
3919 """ update the Network service descriptor """
3920 self
._log
.debug("Update network service descriptor - %s", nsd
)
3921 if nsd
.id not in self
._nsds
:
3922 self
._log
.debug("No NSD found - creating NSD id = %s", nsd
.id)
3923 self
.create_nsd(nsd
)
3925 self
._log
.debug("Updating NSD id = %s, nsd = %s", nsd
.id, nsd
)
3926 self
._nsds
[nsd
.id].update(nsd
)
3928 def delete_nsd(self
, nsd_id
):
3929 """ Delete the Network service descriptor with the passed id """
3930 self
._log
.debug("Deleting the network service descriptor - %s", nsd_id
)
3931 if nsd_id
not in self
._nsds
:
3932 self
._log
.debug("Delete NSD failed - cannot find nsd-id %s", nsd_id
)
3933 raise NetworkServiceDescriptorNotFound("Cannot find %s", nsd_id
)
3935 if nsd_id
not in self
._nsds
:
3936 self
._log
.debug("Cannot delete NSD id %s reference exists %s",
3938 self
._nsds
[nsd_id
].ref_count
)
3939 raise NetworkServiceDescriptorRefCountExists(
3940 "Cannot delete :%s, ref_count:%s",
3942 self
._nsds
[nsd_id
].ref_count
)
3944 del self
._nsds
[nsd_id
]
3946 def get_vnfd_config(self
, xact
):
3947 vnfd_dts_reg
= self
._vnfd
_dts
_handler
.regh
3948 for cfg
in vnfd_dts_reg
.get_xact_elements(xact
):
3949 if cfg
.id not in self
._vnfds
:
3950 self
.create_vnfd(cfg
)
3952 def get_vnfd(self
, vnfd_id
, xact
):
3953 """ Get virtual network function descriptor for the passed vnfd_id"""
3954 if vnfd_id
not in self
._vnfds
:
3955 self
._log
.error("Cannot find VNFD id:%s", vnfd_id
)
3956 self
.get_vnfd_config(xact
)
3958 if vnfd_id
not in self
._vnfds
:
3959 self
._log
.error("Cannot find VNFD id:%s", vnfd_id
)
3960 raise VnfDescriptorError("Cannot find VNFD id:%s", vnfd_id
)
3962 return self
._vnfds
[vnfd_id
]
3964 def create_vnfd(self
, vnfd
):
3965 """ Create a virtual network function descriptor """
3966 self
._log
.debug("Create virtual network function descriptor - %s", vnfd
)
3967 if vnfd
.id in self
._vnfds
:
3968 self
._log
.error("Cannot create VNFD %s -VNFD ID already exists", vnfd
)
3969 raise VnfDescriptorError("VNFD already exists-%s", vnfd
.id)
3971 self
._vnfds
[vnfd
.id] = vnfd
3972 return self
._vnfds
[vnfd
.id]
3974 def update_vnfd(self
, vnfd
):
3975 """ Update the virtual network function descriptor """
3976 self
._log
.debug("Update virtual network function descriptor- %s", vnfd
)
3978 # Hack to remove duplicates from leaf-lists - to be fixed by RIFT-6511
3979 for ivld
in vnfd
.internal_vld
:
3980 ivld
.internal_connection_point_ref
= list(set(ivld
.internal_connection_point_ref
))
3982 if vnfd
.id not in self
._vnfds
:
3983 self
._log
.debug("No VNFD found - creating VNFD id = %s", vnfd
.id)
3984 self
.create_vnfd(vnfd
)
3986 self
._log
.debug("Updating VNFD id = %s, vnfd = %s", vnfd
.id, vnfd
)
3987 self
._vnfds
[vnfd
.id] = vnfd
3990 def delete_vnfd(self
, vnfd_id
):
3991 """ Delete the virtual network function descriptor with the passed id """
3992 self
._log
.debug("Deleting the virtual network function descriptor - %s", vnfd_id
)
3993 if vnfd_id
not in self
._vnfds
:
3994 self
._log
.debug("Delete VNFD failed - cannot find vnfd-id %s", vnfd_id
)
3995 raise VnfDescriptorError("Cannot find %s", vnfd_id
)
3997 del self
._vnfds
[vnfd_id
]
3999 def nsd_in_use(self
, nsd_id
):
4000 """ Is the NSD with the passed id in use """
4001 self
._log
.debug("Is this NSD in use - msg:%s", nsd_id
)
4002 if nsd_id
in self
._nsds
:
4003 return self
._nsds
[nsd_id
].in_use()
4007 def publish_nsr(self
, xact
, path
, msg
):
4008 """ Publish a NSR """
4009 self
._log
.debug("Publish NSR with path %s, msg %s",
4011 yield from self
.nsr_handler
.update(xact
, path
, msg
)
4014 def unpublish_nsr(self
, xact
, path
):
4015 """ Un Publish an NSR """
4016 self
._log
.debug("Publishing delete NSR with path %s", path
)
4017 yield from self
.nsr_handler
.delete(path
, xact
)
4019 def vnfr_is_ready(self
, vnfr_id
):
4020 """ VNFR with the id is ready """
4021 self
._log
.debug("VNFR id %s ready", vnfr_id
)
4022 if vnfr_id
not in self
._vnfds
:
4023 err
= "Did not find VNFR ID with id %s" % vnfr_id
4024 self
._log
.critical("err")
4025 raise VirtualNetworkFunctionRecordError(err
)
4026 self
._vnfrs
[vnfr_id
].is_ready()
4029 def get_nsd_refcount(self
, nsd_id
):
4030 """ Get the nsd_list from this NSM"""
4032 def nsd_refcount_xpath(nsd_id
):
4033 """ xpath for ref count entry """
4034 return (NsdRefCountDtsHandler
.XPATH
+
4035 "[rw-nsr:nsd-id-ref = '{}']").format(nsd_id
)
4038 if nsd_id
is None or nsd_id
== "":
4039 for nsd
in self
._nsds
.values():
4040 nsd_msg
= RwNsrYang
.YangData_Nsr_NsInstanceOpdata_NsdRefCount()
4041 nsd_msg
.nsd_id_ref
= nsd
.id
4042 nsd_msg
.instance_ref_count
= nsd
.ref_count
4043 nsd_list
.append((nsd_refcount_xpath(nsd
.id), nsd_msg
))
4044 elif nsd_id
in self
._nsds
:
4045 nsd_msg
= RwNsrYang
.YangData_Nsr_NsInstanceOpdata_NsdRefCount()
4046 nsd_msg
.nsd_id_ref
= self
._nsds
[nsd_id
].id
4047 nsd_msg
.instance_ref_count
= self
._nsds
[nsd_id
].ref_count
4048 nsd_list
.append((nsd_refcount_xpath(nsd_id
), nsd_msg
))
4053 def terminate_ns(self
, nsr_id
, xact
):
4055 Terminate network service for the given NSR Id
4058 # Terminate the instances/networks assocaited with this nw service
4059 self
._log
.debug("Terminating the network service %s", nsr_id
)
4060 yield from self
._nsrs
[nsr_id
].terminate()
4063 yield from self
.nsd_unref_by_nsr_id(nsr_id
)
4065 # Unpublish the NSR record
4066 self
._log
.debug("Unpublishing the network service %s", nsr_id
)
4067 yield from self
._nsrs
[nsr_id
].unpublish(xact
)
4069 # Finaly delete the NS instance from this NS Manager
4070 self
._log
.debug("Deletng the network service %s", nsr_id
)
4071 self
.delete_nsr(nsr_id
)
4074 class NsmRecordsPublisherProxy(object):
4075 """ This class provides a publisher interface that allows plugin objects
4076 to publish NSR/VNFR/VLR"""
4078 def __init__(self
, dts
, log
, loop
, nsr_pub_hdlr
, vnfr_pub_hdlr
, vlr_pub_hdlr
):
4082 self
._nsr
_pub
_hdlr
= nsr_pub_hdlr
4083 self
._vlr
_pub
_hdlr
= vlr_pub_hdlr
4084 self
._vnfr
_pub
_hdlr
= vnfr_pub_hdlr
4087 def publish_nsr(self
, xact
, nsr
):
4088 """ Publish an NSR """
4089 path
= NetworkServiceRecord
.xpath_from_nsr(nsr
)
4090 return (yield from self
._nsr
_pub
_hdlr
.update(xact
, path
, nsr
))
4093 def unpublish_nsr(self
, xact
, nsr
):
4094 """ Unpublish an NSR """
4095 path
= NetworkServiceRecord
.xpath_from_nsr(nsr
)
4096 return (yield from self
._nsr
_pub
_hdlr
.delete(xact
, path
))
4099 def publish_vnfr(self
, xact
, vnfr
):
4100 """ Publish an VNFR """
4101 path
= VirtualNetworkFunctionRecord
.vnfr_xpath(vnfr
)
4102 return (yield from self
._vnfr
_pub
_hdlr
.update(xact
, path
, vnfr
))
4105 def unpublish_vnfr(self
, xact
, vnfr
):
4106 """ Unpublish a VNFR """
4107 path
= VirtualNetworkFunctionRecord
.vnfr_xpath(vnfr
)
4108 return (yield from self
._vnfr
_pub
_hdlr
.delete(xact
, path
))
4111 def publish_vlr(self
, xact
, vlr
):
4112 """ Publish a VLR """
4113 path
= VirtualLinkRecord
.vlr_xpath(vlr
)
4114 return (yield from self
._vlr
_pub
_hdlr
.update(xact
, path
, vlr
))
4117 def unpublish_vlr(self
, xact
, vlr
):
4118 """ Unpublish a VLR """
4119 path
= VirtualLinkRecord
.vlr_xpath(vlr
)
4120 return (yield from self
._vlr
_pub
_hdlr
.delete(xact
, path
))
4123 class ScalingRpcHandler(mano_dts
.DtsHandler
):
4124 """ The Network service Monitor DTS handler """
4125 SCALE_IN_INPUT_XPATH
= "I,/nsr:exec-scale-in"
4126 SCALE_IN_OUTPUT_XPATH
= "O,/nsr:exec-scale-in"
4128 SCALE_OUT_INPUT_XPATH
= "I,/nsr:exec-scale-out"
4129 SCALE_OUT_OUTPUT_XPATH
= "O,/nsr:exec-scale-out"
4131 ACTION
= Enum('ACTION', 'SCALE_IN SCALE_OUT')
4133 def __init__(self
, log
, dts
, loop
, callback
=None):
4134 super().__init
__(log
, dts
, loop
)
4135 self
.callback
= callback
4136 self
.last_instance_id
= defaultdict(int)
4142 def on_scale_in_prepare(xact_info
, action
, ks_path
, msg
):
4143 assert action
== rwdts
.QueryAction
.RPC
4147 self
.callback(xact_info
.xact
, msg
, self
.ACTION
.SCALE_IN
)
4149 rpc_op
= NsrYang
.YangOutput_Nsr_ExecScaleIn
.from_dict({
4150 "instance_id": msg
.instance_id
})
4152 xact_info
.respond_xpath(
4153 rwdts
.XactRspCode
.ACK
,
4154 self
.__class
__.SCALE_IN_OUTPUT_XPATH
,
4157 except Exception as e
:
4158 self
.log
.exception(e
)
4159 xact_info
.respond_xpath(
4160 rwdts
.XactRspCode
.NACK
,
4161 self
.__class
__.SCALE_IN_OUTPUT_XPATH
)
4164 def on_scale_out_prepare(xact_info
, action
, ks_path
, msg
):
4165 assert action
== rwdts
.QueryAction
.RPC
4168 scaling_group
= msg
.scaling_group_name_ref
4169 if not msg
.instance_id
:
4170 last_instance_id
= self
.last_instance_id
[scale_group
]
4171 msg
.instance_id
= last_instance_id
+ 1
4172 self
.last_instance_id
[scale_group
] += 1
4175 self
.callback(xact_info
.xact
, msg
, self
.ACTION
.SCALE_OUT
)
4177 rpc_op
= NsrYang
.YangOutput_Nsr_ExecScaleOut
.from_dict({
4178 "instance_id": msg
.instance_id
})
4180 xact_info
.respond_xpath(
4181 rwdts
.XactRspCode
.ACK
,
4182 self
.__class
__.SCALE_OUT_OUTPUT_XPATH
,
4185 except Exception as e
:
4186 self
.log
.exception(e
)
4187 xact_info
.respond_xpath(
4188 rwdts
.XactRspCode
.NACK
,
4189 self
.__class
__.SCALE_OUT_OUTPUT_XPATH
)
4191 scale_in_hdl
= rift
.tasklets
.DTS
.RegistrationHandler(
4192 on_prepare
=on_scale_in_prepare
)
4193 scale_out_hdl
= rift
.tasklets
.DTS
.RegistrationHandler(
4194 on_prepare
=on_scale_out_prepare
)
4196 with self
.dts
.group_create() as group
:
4198 xpath
=self
.__class
__.SCALE_IN_INPUT_XPATH
,
4199 handler
=scale_in_hdl
,
4200 flags
=rwdts
.Flag
.PUBLISHER
)
4202 xpath
=self
.__class
__.SCALE_OUT_INPUT_XPATH
,
4203 handler
=scale_out_hdl
,
4204 flags
=rwdts
.Flag
.PUBLISHER
)
4207 class NsmTasklet(rift
.tasklets
.Tasklet
):
4209 The network service manager tasklet
4211 def __init__(self
, *args
, **kwargs
):
4212 super(NsmTasklet
, self
).__init
__(*args
, **kwargs
)
4213 self
.rwlog
.set_category("rw-mano-log")
4214 self
.rwlog
.set_subcategory("nsm")
4219 self
._ro
_plugin
_selector
= None
4220 self
._vnffgmgr
= None
4222 self
._nsr
_handler
= None
4223 self
._vnfr
_pub
_handler
= None
4224 self
._vlr
_pub
_handler
= None
4225 self
._vnfd
_pub
_handler
= None
4226 self
._scale
_cfg
_handler
= None
4228 self
._records
_publisher
_proxy
= None
4231 """ The task start callback """
4232 super(NsmTasklet
, self
).start()
4233 self
.log
.info("Starting NsmTasklet")
4235 self
.log
.debug("Registering with dts")
4236 self
._dts
= rift
.tasklets
.DTS(self
.tasklet_info
,
4237 RwNsmYang
.get_schema(),
4239 self
.on_dts_state_change
)
4241 self
.log
.debug("Created DTS Api GI Object: %s", self
._dts
)
4247 print("Caught Exception in NSM stop:", sys
.exc_info()[0])
4250 def on_instance_started(self
):
4251 """ Task instance started callback """
4252 self
.log
.debug("Got instance started callback")
4256 """ Task init callback """
4257 self
.log
.debug("Got instance started callback")
4259 self
.log
.debug("creating config account handler")
4261 self
._nsr
_pub
_handler
= publisher
.NsrOpDataDtsHandler(self
._dts
, self
.log
, self
.loop
)
4262 yield from self
._nsr
_pub
_handler
.register()
4264 self
._vnfr
_pub
_handler
= publisher
.VnfrPublisherDtsHandler(self
._dts
, self
.log
, self
.loop
)
4265 yield from self
._vnfr
_pub
_handler
.register()
4267 self
._vlr
_pub
_handler
= publisher
.VlrPublisherDtsHandler(self
._dts
, self
.log
, self
.loop
)
4268 yield from self
._vlr
_pub
_handler
.register()
4270 manifest
= self
.tasklet_info
.get_pb_manifest()
4271 use_ssl
= manifest
.bootstrap_phase
.rwsecurity
.use_ssl
4272 ssl_cert
= manifest
.bootstrap_phase
.rwsecurity
.cert
4273 ssl_key
= manifest
.bootstrap_phase
.rwsecurity
.key
4275 self
._vnfd
_pub
_handler
= publisher
.VnfdPublisher(use_ssl
, ssl_cert
, ssl_key
, self
.loop
)
4277 self
._records
_publisher
_proxy
= NsmRecordsPublisherProxy(
4281 self
._nsr
_pub
_handler
,
4282 self
._vnfr
_pub
_handler
,
4283 self
._vlr
_pub
_handler
,
4286 # Register the NSM to receive the nsm plugin
4287 # when cloud account is configured
4288 self
._ro
_plugin
_selector
= cloud
.ROAccountPluginSelector(
4292 self
._records
_publisher
_proxy
,
4294 yield from self
._ro
_plugin
_selector
.register()
4296 self
._cloud
_account
_handler
= cloud
.CloudAccountConfigSubscriber(
4301 yield from self
._cloud
_account
_handler
.register()
4303 self
._vnffgmgr
= rwvnffgmgr
.VnffgMgr(self
._dts
, self
.log
, self
.log_hdl
, self
.loop
)
4304 yield from self
._vnffgmgr
.register()
4306 self
._nsm
= NsManager(
4310 self
._nsr
_pub
_handler
,
4311 self
._vnfr
_pub
_handler
,
4312 self
._vlr
_pub
_handler
,
4313 self
._ro
_plugin
_selector
,
4315 self
._vnfd
_pub
_handler
,
4316 self
._cloud
_account
_handler
4319 yield from self
._nsm
.register()
4323 """ Task run callback """
4327 def on_dts_state_change(self
, state
):
4328 """Take action according to current dts state to transition
4329 application into the corresponding application state
4332 state - current dts state
4335 rwdts
.State
.INIT
: rwdts
.State
.REGN_COMPLETE
,
4336 rwdts
.State
.CONFIG
: rwdts
.State
.RUN
,
4340 rwdts
.State
.INIT
: self
.init
,
4341 rwdts
.State
.RUN
: self
.run
,
4344 # Transition application to next state
4345 handler
= handlers
.get(state
, None)
4346 if handler
is not None:
4347 yield from handler()
4349 # Transition dts to next state
4350 next_state
= switch
.get(state
, None)
4351 if next_state
is not None:
4352 self
.log
.debug("Changing state to %s", next_state
)
4353 self
._dts
.handle
.set_state(next_state
)